From b82444759a403a73e9a1f6f4de80170b12dd4e44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=AC=E9=A2=A8=E5=8D=83=E9=9B=AA?= Date: Sun, 1 Mar 2026 14:11:32 +0800 Subject: [PATCH] fix(telegram): several fixes of typing action in stream message (#136) --- internal/channel/adapters/telegram/stream.go | 52 ++++++++++++-------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/internal/channel/adapters/telegram/stream.go b/internal/channel/adapters/telegram/stream.go index cfdd801e..991464d8 100644 --- a/internal/channel/adapters/telegram/stream.go +++ b/internal/channel/adapters/telegram/stream.go @@ -21,18 +21,19 @@ const telegramStreamPendingSuffix = "\n……" var testEditFunc func(bot *tgbotapi.BotAPI, chatID int64, msgID int, text string, parseMode string) error type telegramOutboundStream struct { - adapter *TelegramAdapter - cfg channel.ChannelConfig - target string - reply *channel.ReplyRef - parseMode string - closed atomic.Bool - mu sync.Mutex - buf strings.Builder - streamChatID int64 - streamMsgID int - lastEdited string - lastEditedAt time.Time + adapter *TelegramAdapter + cfg channel.ChannelConfig + target string + reply *channel.ReplyRef + parseMode string + closed atomic.Bool + mu sync.Mutex + buf strings.Builder + streamChatID int64 + streamMsgID int + lastEdited string + lastEditedAt time.Time + lastTypingActionAt time.Time } func (s *telegramOutboundStream) getBot(ctx context.Context) (bot *tgbotapi.BotAPI, err error) { @@ -56,21 +57,30 @@ func (s *telegramOutboundStream) getBotAndReply(ctx context.Context) (bot *tgbot return bot, replyTo, nil } -func (s *telegramOutboundStream) refreshTypingAction(ctx context.Context) error { +func (s *telegramOutboundStream) refreshTypingAction(ctx context.Context) { // When ensureStreamMessage is called, always means that the message has not been completely generated // so always refresh the "typing" action to improve the user experience - bot, err := s.getBot(ctx) - if err != nil { - return err + s.mu.Lock() + if time.Since(s.lastTypingActionAt) < telegramStreamEditThrottle { + // typing action lasts for 5 seconds + s.mu.Unlock() + return } - action := tgbotapi.NewChatAction(s.streamChatID, tgbotapi.ChatTyping) - _, err = bot.Request(action) - return err + s.lastTypingActionAt = time.Now() + chatID := s.streamChatID + s.mu.Unlock() + go func() { + bot, err := s.getBot(ctx) + if err != nil { + return + } + action := tgbotapi.NewChatAction(chatID, tgbotapi.ChatTyping) + _, _ = bot.Request(action) + }() } func (s *telegramOutboundStream) ensureStreamMessage(ctx context.Context, text string) error { s.mu.Lock() - go s.refreshTypingAction(ctx) if s.streamMsgID != 0 { s.mu.Unlock() return nil @@ -217,6 +227,7 @@ func (s *telegramOutboundStream) Push(ctx context.Context, event channel.StreamE case channel.StreamEventStatus: return nil case channel.StreamEventToolCallStart: + s.refreshTypingAction(ctx) s.mu.Lock() bufText := strings.TrimSpace(s.buf.String()) hasMsg := s.streamMsgID != 0 @@ -264,6 +275,7 @@ func (s *telegramOutboundStream) Push(ctx context.Context, event channel.StreamE case channel.StreamEventProcessingFailed, channel.StreamEventAgentStart, channel.StreamEventAgentEnd, channel.StreamEventProcessingStarted, channel.StreamEventProcessingCompleted: return nil case channel.StreamEventDelta: + s.refreshTypingAction(ctx) if event.Delta == "" || event.Phase == channel.StreamPhaseReasoning { return nil }