mirror of
https://github.com/memohai/Memoh.git
synced 2026-04-27 07:16:19 +09:00
fix(telegram): several fixes of typing action in stream message (#136)
This commit is contained in:
@@ -21,18 +21,19 @@ const telegramStreamPendingSuffix = "\n……"
|
||||
var testEditFunc func(bot *tgbotapi.BotAPI, chatID int64, msgID int, text string, parseMode string) error
|
||||
|
||||
type telegramOutboundStream struct {
|
||||
adapter *TelegramAdapter
|
||||
cfg channel.ChannelConfig
|
||||
target string
|
||||
reply *channel.ReplyRef
|
||||
parseMode string
|
||||
closed atomic.Bool
|
||||
mu sync.Mutex
|
||||
buf strings.Builder
|
||||
streamChatID int64
|
||||
streamMsgID int
|
||||
lastEdited string
|
||||
lastEditedAt time.Time
|
||||
adapter *TelegramAdapter
|
||||
cfg channel.ChannelConfig
|
||||
target string
|
||||
reply *channel.ReplyRef
|
||||
parseMode string
|
||||
closed atomic.Bool
|
||||
mu sync.Mutex
|
||||
buf strings.Builder
|
||||
streamChatID int64
|
||||
streamMsgID int
|
||||
lastEdited string
|
||||
lastEditedAt time.Time
|
||||
lastTypingActionAt time.Time
|
||||
}
|
||||
|
||||
func (s *telegramOutboundStream) getBot(ctx context.Context) (bot *tgbotapi.BotAPI, err error) {
|
||||
@@ -56,21 +57,30 @@ func (s *telegramOutboundStream) getBotAndReply(ctx context.Context) (bot *tgbot
|
||||
return bot, replyTo, nil
|
||||
}
|
||||
|
||||
func (s *telegramOutboundStream) refreshTypingAction(ctx context.Context) error {
|
||||
func (s *telegramOutboundStream) refreshTypingAction(ctx context.Context) {
|
||||
// When ensureStreamMessage is called, always means that the message has not been completely generated
|
||||
// so always refresh the "typing" action to improve the user experience
|
||||
bot, err := s.getBot(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
s.mu.Lock()
|
||||
if time.Since(s.lastTypingActionAt) < telegramStreamEditThrottle {
|
||||
// typing action lasts for 5 seconds
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
action := tgbotapi.NewChatAction(s.streamChatID, tgbotapi.ChatTyping)
|
||||
_, err = bot.Request(action)
|
||||
return err
|
||||
s.lastTypingActionAt = time.Now()
|
||||
chatID := s.streamChatID
|
||||
s.mu.Unlock()
|
||||
go func() {
|
||||
bot, err := s.getBot(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
action := tgbotapi.NewChatAction(chatID, tgbotapi.ChatTyping)
|
||||
_, _ = bot.Request(action)
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *telegramOutboundStream) ensureStreamMessage(ctx context.Context, text string) error {
|
||||
s.mu.Lock()
|
||||
go s.refreshTypingAction(ctx)
|
||||
if s.streamMsgID != 0 {
|
||||
s.mu.Unlock()
|
||||
return nil
|
||||
@@ -217,6 +227,7 @@ func (s *telegramOutboundStream) Push(ctx context.Context, event channel.StreamE
|
||||
case channel.StreamEventStatus:
|
||||
return nil
|
||||
case channel.StreamEventToolCallStart:
|
||||
s.refreshTypingAction(ctx)
|
||||
s.mu.Lock()
|
||||
bufText := strings.TrimSpace(s.buf.String())
|
||||
hasMsg := s.streamMsgID != 0
|
||||
@@ -264,6 +275,7 @@ func (s *telegramOutboundStream) Push(ctx context.Context, event channel.StreamE
|
||||
case channel.StreamEventProcessingFailed, channel.StreamEventAgentStart, channel.StreamEventAgentEnd, channel.StreamEventProcessingStarted, channel.StreamEventProcessingCompleted:
|
||||
return nil
|
||||
case channel.StreamEventDelta:
|
||||
s.refreshTypingAction(ctx)
|
||||
if event.Delta == "" || event.Phase == channel.StreamPhaseReasoning {
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user