feat(telegram): use sendMessageDraft for streaming in private chats (#174)

* feat(telegram): use sendMessageDraft for streaming in private chats

Use Telegram Bot API 9.3's sendMessageDraft to stream partial messages
with smooth animation in private chats, replacing the sendMessage +
editMessageText approach. Group/channel chats keep the existing
edit-based streaming.

- Add sendTelegramDraft() for the sendMessageDraft API
- Detect private chats via conversation_type metadata in OpenStream
- Use 300ms throttle for drafts (vs 5s for edits)
- Send permanent messages at tool call boundaries and on final event
- Reset buffer atomically in StreamEventFinal to prevent duplicate
  messages when multiple final events fire (one per assistant output)

* test(telegram): improve draft mode test assertions

Add sendTextForTest hook for sendTelegramTextReturnMessage to enable
direct assertion of send calls. Clean up residual unused variables
and replace indirect assertions with explicit mock-based verification.
This commit is contained in:
Menci
2026-03-03 16:01:18 +08:00
committed by GitHub
parent 7730096696
commit b1925bf2be
4 changed files with 527 additions and 41 deletions
+126 -33
View File
@@ -15,24 +15,27 @@ import (
)
// Throttle intervals and fixed user-visible strings for Telegram streaming.
const (
	// telegramStreamEditThrottle is the throttle interval named for the
	// edit-based streaming path (5 seconds).
	telegramStreamEditThrottle = 5 * time.Second

	// telegramDraftThrottle is the minimum gap sendDraft leaves between two
	// sendMessageDraft calls; it is also the fallback backoff when a 429
	// response carries no usable retry-after value.
	telegramDraftThrottle = 300 * time.Millisecond

	// telegramStreamToolHintText is a placeholder string.
	// NOTE(review): presumably shown while tools run; the use site is outside this view.
	telegramStreamToolHintText = "Calling tools..."

	// telegramStreamPendingSuffix is a newline followed by two ellipsis characters.
	// NOTE(review): presumably appended to in-progress text; the use site is outside this view.
	telegramStreamPendingSuffix = "\n……"
)
// testEditFunc is a package-level function variable taking a bot, chat ID,
// message ID, text and parse mode, and returning an error.
// NOTE(review): the name suggests a test seam for message edits; the code that
// consults it is outside this view — confirm before relying on it.
var testEditFunc func(bot *tgbotapi.BotAPI, chatID int64, msgID int, text string, parseMode string) error
// telegramOutboundStream holds the state of one Telegram streaming session.
// Private chats stream partial text through sendMessageDraft and commit it
// with a regular message; other chats send one message and edit it in place.
type telegramOutboundStream struct {
adapter *TelegramAdapter
cfg channel.ChannelConfig
target string
reply *channel.ReplyRef
parseMode string
closed atomic.Bool
mu sync.Mutex
buf strings.Builder
streamChatID int64
streamMsgID int
lastEdited string
lastEditedAt time.Time
adapter *TelegramAdapter
cfg channel.ChannelConfig
target string
reply *channel.ReplyRef
parseMode string
// isPrivateChat switches Push to the draft-based path. OpenStream sets it
// when the "conversation_type" metadata is "private" and the target parses
// as an integer chat ID.
isPrivateChat bool
// draftID is forwarded as draft_id to sendMessageDraft; OpenStream
// initializes it to 1.
draftID int
closed atomic.Bool
// mu is held around reads and writes of buf and the lastEdited/stream
// bookkeeping in Push and sendDraft.
mu sync.Mutex
// buf accumulates streamed deltas until they are committed or reset.
buf strings.Builder
// streamChatID is the numeric chat ID. In private chats OpenStream fills it
// from the target and Push keeps it across tool-call boundaries; in other
// chats Push resets it to 0 at those boundaries.
streamChatID int64
streamMsgID int
lastEdited string
// lastEditedAt is the time of the last draft/edit and drives throttling.
// sendDraft may set it in the future to back off after a 429 response.
lastEditedAt time.Time
}
func (s *telegramOutboundStream) getBot(ctx context.Context) (bot *tgbotapi.BotAPI, err error) {
@@ -205,6 +208,59 @@ func (s *telegramOutboundStream) editStreamMessageFinal(ctx context.Context, tex
return nil
}
// sendDraft streams the current partial text to a private chat through
// sendMessageDraft, rate-limited by telegramDraftThrottle.
//
// The call is a silent no-op (nil error) when the previous draft was sent less
// than telegramDraftThrottle ago or when text is blank. A 429 response is not
// reported as an error: the next allowed send time is pushed into the future
// by the server-provided retry-after (or telegramDraftThrottle when none is
// given). Any other failure from getBot or the API call is returned.
func (s *telegramOutboundStream) sendDraft(ctx context.Context, text string) error {
	s.mu.Lock()
	previous := s.lastEditedAt
	s.mu.Unlock()

	if time.Since(previous) < telegramDraftThrottle || strings.TrimSpace(text) == "" {
		return nil
	}

	bot, err := s.getBot(ctx)
	if err != nil {
		return err
	}

	// backoff stays zero on success, so the stamp below is plain "now".
	var backoff time.Duration
	if sendErr := sendTelegramDraft(bot, s.streamChatID, s.draftID, text, ""); sendErr != nil {
		if !isTelegramTooManyRequests(sendErr) {
			return sendErr
		}
		backoff = getTelegramRetryAfter(sendErr)
		if backoff <= 0 {
			backoff = telegramDraftThrottle
		}
	}

	s.mu.Lock()
	s.lastEditedAt = time.Now().Add(backoff)
	s.mu.Unlock()
	return nil
}
// sendPermanentMessage commits text to the chat as a regular message via
// sendTelegramText, using the stream's target and reply reference.
// Draft mode calls it to persist streamed text once a phase is complete.
// Blank text is skipped and yields nil; errors from resolving the bot or from
// the send itself are returned unchanged.
func (s *telegramOutboundStream) sendPermanentMessage(ctx context.Context, text string, parseMode string) error {
	if len(strings.TrimSpace(text)) == 0 {
		return nil
	}
	bot, replyTo, err := s.getBotAndReply(ctx)
	if err == nil {
		err = sendTelegramText(bot, s.target, text, replyTo, parseMode)
	}
	return err
}
func (s *telegramOutboundStream) Push(ctx context.Context, event channel.StreamEvent) error {
if s == nil || s.adapter == nil {
return fmt.Errorf("telegram stream not configured")
@@ -225,12 +281,21 @@ func (s *telegramOutboundStream) Push(ctx context.Context, event channel.StreamE
bufText := strings.TrimSpace(s.buf.String())
hasMsg := s.streamMsgID != 0
s.mu.Unlock()
if hasMsg && bufText != "" {
if s.isPrivateChat {
// In draft mode, send buffered text as a permanent message before tool execution.
if bufText != "" {
if err := s.sendPermanentMessage(ctx, bufText, ""); err != nil {
slog.Warn("telegram: draft permanent message failed", slog.Any("error", err))
}
}
} else if hasMsg && bufText != "" {
_ = s.editStreamMessageFinal(ctx, bufText)
}
s.mu.Lock()
s.streamMsgID = 0
s.streamChatID = 0
if !s.isPrivateChat {
s.streamChatID = 0
}
s.lastEdited = ""
s.lastEditedAt = time.Time{}
s.buf.Reset()
@@ -239,7 +304,9 @@ func (s *telegramOutboundStream) Push(ctx context.Context, event channel.StreamE
case channel.StreamEventToolCallEnd:
s.mu.Lock()
s.streamMsgID = 0
s.streamChatID = 0
if !s.isPrivateChat {
s.streamChatID = 0
}
s.lastEdited = ""
s.lastEditedAt = time.Time{}
s.buf.Reset()
@@ -267,6 +334,11 @@ func (s *telegramOutboundStream) Push(ctx context.Context, event channel.StreamE
return nil
case channel.StreamEventPhaseEnd:
if event.Phase == channel.StreamPhaseText {
// In draft mode, skip phase-end finalization; StreamEventFinal sends the
// permanent formatted message.
if s.isPrivateChat {
return nil
}
s.mu.Lock()
finalText := strings.TrimSpace(s.buf.String())
s.mu.Unlock()
@@ -288,31 +360,43 @@ func (s *telegramOutboundStream) Push(ctx context.Context, event channel.StreamE
s.buf.WriteString(event.Delta)
content := s.buf.String()
s.mu.Unlock()
if s.isPrivateChat {
return s.sendDraft(ctx, content)
}
if err := s.ensureStreamMessage(ctx, content); err != nil {
return err
}
return s.editStreamMessage(ctx, content)
case channel.StreamEventFinal:
// In draft mode, read and reset buffer atomically to prevent duplicate
// permanent messages when multiple StreamEventFinal events fire
// (one per assistant output in multi-tool-call responses).
s.mu.Lock()
bufText := strings.TrimSpace(s.buf.String())
if s.isPrivateChat {
s.buf.Reset()
}
s.mu.Unlock()
if event.Final == nil || event.Final.Message.IsEmpty() {
s.mu.Lock()
finalText := strings.TrimSpace(s.buf.String())
s.mu.Unlock()
if finalText != "" {
if err := s.ensureStreamMessage(ctx, finalText); err != nil {
slog.Warn("telegram: ensure stream message failed", slog.Any("error", err))
}
if err := s.editStreamMessageFinal(ctx, finalText); err != nil {
slog.Warn("telegram: edit stream message failed", slog.Any("error", err))
if bufText != "" {
if s.isPrivateChat {
if err := s.sendPermanentMessage(ctx, bufText, ""); err != nil {
slog.Warn("telegram: draft final permanent message failed", slog.Any("error", err))
}
} else {
if err := s.ensureStreamMessage(ctx, bufText); err != nil {
slog.Warn("telegram: ensure stream message failed", slog.Any("error", err))
}
if err := s.editStreamMessageFinal(ctx, bufText); err != nil {
slog.Warn("telegram: edit stream message failed", slog.Any("error", err))
}
}
}
return nil
}
msg := event.Final.Message
s.mu.Lock()
bufText := strings.TrimSpace(s.buf.String())
s.mu.Unlock()
finalText := bufText
if finalText == "" {
if finalText == "" && !s.isPrivateChat {
finalText = strings.TrimSpace(msg.PlainText())
}
// Convert markdown to Telegram HTML for the final message.
@@ -323,11 +407,17 @@ func (s *telegramOutboundStream) Push(ctx context.Context, event channel.StreamE
s.mu.Unlock()
finalText = formatted
}
if err := s.ensureStreamMessage(ctx, finalText); err != nil {
return err
}
if err := s.editStreamMessageFinal(ctx, finalText); err != nil {
return err
if s.isPrivateChat {
if err := s.sendPermanentMessage(ctx, finalText, s.parseMode); err != nil {
return err
}
} else {
if err := s.ensureStreamMessage(ctx, finalText); err != nil {
return err
}
if err := s.editStreamMessageFinal(ctx, finalText); err != nil {
return err
}
}
if len(msg.Attachments) > 0 {
replyTo := parseReplyToMessageID(s.reply)
@@ -357,6 +447,9 @@ func (s *telegramOutboundStream) Push(ctx context.Context, event channel.StreamE
return nil
}
display := "Error: " + errText
if s.isPrivateChat {
return s.sendPermanentMessage(ctx, display, "")
}
if err := s.ensureStreamMessage(ctx, display); err != nil {
return err
}
@@ -300,3 +300,354 @@ func TestEditStreamMessageFinal_NoMessageNoOp(t *testing.T) {
t.Fatalf("editStreamMessageFinal when streamMsgID==0 should return nil: %v", err)
}
}
// --- Draft mode (sendMessageDraft) tests ---
func TestSendDraft_ThrottleSkip(t *testing.T) {
t.Parallel()
adapter := NewTelegramAdapter(nil)
s := &telegramOutboundStream{
adapter: adapter,
cfg: channel.ChannelConfig{ID: "test", Credentials: map[string]any{"bot_token": "fake"}},
isPrivateChat: true,
draftID: 1,
streamChatID: 123,
lastEditedAt: time.Now(), // just now, within draft throttle window
}
ctx := context.Background()
err := s.sendDraft(ctx, "hello")
if err != nil {
t.Fatalf("sendDraft within throttle window should skip and return nil: %v", err)
}
}
func TestSendDraft_EmptyTextSkip(t *testing.T) {
t.Parallel()
adapter := NewTelegramAdapter(nil)
s := &telegramOutboundStream{
adapter: adapter,
isPrivateChat: true,
draftID: 1,
streamChatID: 123,
lastEditedAt: time.Now().Add(-time.Minute),
}
ctx := context.Background()
err := s.sendDraft(ctx, " ")
if err != nil {
t.Fatalf("sendDraft with whitespace-only text should skip and return nil: %v", err)
}
}
// TestSendDraft_Success verifies that sendDraft forwards the stream's chat ID,
// draft ID and text to the draft sender once the throttle window has passed.
//
// This test deliberately does not call t.Parallel: it overwrites the
// package-level getOrCreateBotForTest and sendDraftForTest hooks, and running
// it concurrently with other tests that swap the same hooks would be a data
// race and could route calls to another test's stub.
func TestSendDraft_Success(t *testing.T) {
	adapter := NewTelegramAdapter(nil)
	s := &telegramOutboundStream{
		adapter:       adapter,
		cfg:           channel.ChannelConfig{ID: "test", Credentials: map[string]any{"bot_token": "fake"}},
		isPrivateChat: true,
		draftID:       1,
		streamChatID:  123,
		lastEditedAt:  time.Now().Add(-time.Minute),
	}
	ctx := context.Background()

	// Register restoration before installing the stubs so the hooks are reset
	// even if the test aborts early.
	origGetBot := getOrCreateBotForTest
	origDraft := sendDraftForTest
	t.Cleanup(func() {
		getOrCreateBotForTest = origGetBot
		sendDraftForTest = origDraft
	})

	getOrCreateBotForTest = func(_ *TelegramAdapter, _, _ string) (*tgbotapi.BotAPI, error) {
		return &tgbotapi.BotAPI{Token: "fake"}, nil
	}
	var capturedChatID int64
	var capturedDraftID int
	var capturedText string
	sendDraftForTest = func(_ *tgbotapi.BotAPI, chatID int64, draftID int, text string, _ string) error {
		capturedChatID = chatID
		capturedDraftID = draftID
		capturedText = text
		return nil
	}

	err := s.sendDraft(ctx, "streaming text")
	if err != nil {
		t.Fatalf("sendDraft should succeed: %v", err)
	}
	if capturedChatID != 123 {
		t.Fatalf("expected chatID 123, got %d", capturedChatID)
	}
	if capturedDraftID != 1 {
		t.Fatalf("expected draftID 1, got %d", capturedDraftID)
	}
	if capturedText != "streaming text" {
		t.Fatalf("expected text 'streaming text', got %q", capturedText)
	}
}
// TestSendDraft_429Backoff verifies that a 429 response from the draft sender
// is swallowed (nil error) and that lastEditedAt is moved forward so later
// drafts are throttled.
//
// This test deliberately does not call t.Parallel: it overwrites the
// package-level getOrCreateBotForTest and sendDraftForTest hooks, which are
// shared with other tests; swapping them concurrently is a data race.
func TestSendDraft_429Backoff(t *testing.T) {
	adapter := NewTelegramAdapter(nil)
	before := time.Now().Add(-time.Minute)
	s := &telegramOutboundStream{
		adapter:       adapter,
		cfg:           channel.ChannelConfig{ID: "test", Credentials: map[string]any{"bot_token": "fake"}},
		isPrivateChat: true,
		draftID:       1,
		streamChatID:  123,
		lastEditedAt:  before,
	}
	ctx := context.Background()

	// Register restoration before installing the stubs so the hooks are reset
	// even if the test aborts early.
	origGetBot := getOrCreateBotForTest
	origDraft := sendDraftForTest
	t.Cleanup(func() {
		getOrCreateBotForTest = origGetBot
		sendDraftForTest = origDraft
	})

	getOrCreateBotForTest = func(_ *TelegramAdapter, _, _ string) (*tgbotapi.BotAPI, error) {
		return &tgbotapi.BotAPI{Token: "fake"}, nil
	}
	sendDraftForTest = func(*tgbotapi.BotAPI, int64, int, string, string) error {
		return tgbotapi.Error{
			Code:               429,
			Message:            "Too Many Requests",
			ResponseParameters: tgbotapi.ResponseParameters{RetryAfter: 2},
		}
	}

	err := s.sendDraft(ctx, "hello")
	if err != nil {
		t.Fatalf("sendDraft on 429 should return nil (backoff): %v", err)
	}
	s.mu.Lock()
	lastEditedAt := s.lastEditedAt
	s.mu.Unlock()
	if !lastEditedAt.After(before) {
		t.Fatalf("on 429 lastEditedAt should be pushed forward for backoff")
	}
}
// TestDraftMode_DeltaUsesSendDraft verifies that a delta event on a private
// chat stream is appended to the buffer and forwarded through the draft sender
// exactly once.
//
// This test deliberately does not call t.Parallel: it overwrites the
// package-level getOrCreateBotForTest and sendDraftForTest hooks. Run
// concurrently with other hook-swapping tests, the call counter below could be
// bumped by, or miss, another test's traffic.
func TestDraftMode_DeltaUsesSendDraft(t *testing.T) {
	adapter := NewTelegramAdapter(nil)
	s := &telegramOutboundStream{
		adapter:       adapter,
		cfg:           channel.ChannelConfig{ID: "test", Credentials: map[string]any{"bot_token": "fake"}},
		isPrivateChat: true,
		draftID:       1,
		streamChatID:  123,
	}
	ctx := context.Background()

	// Register restoration before installing the stubs so the hooks are reset
	// even if the test aborts early.
	origGetBot := getOrCreateBotForTest
	origDraft := sendDraftForTest
	t.Cleanup(func() {
		getOrCreateBotForTest = origGetBot
		sendDraftForTest = origDraft
	})

	getOrCreateBotForTest = func(_ *TelegramAdapter, _, _ string) (*tgbotapi.BotAPI, error) {
		return &tgbotapi.BotAPI{Token: "fake"}, nil
	}
	draftCalls := 0
	sendDraftForTest = func(*tgbotapi.BotAPI, int64, int, string, string) error {
		draftCalls++
		return nil
	}

	err := s.Push(ctx, channel.StreamEvent{Type: channel.StreamEventDelta, Delta: "Hello "})
	if err != nil {
		t.Fatalf("Push delta should succeed: %v", err)
	}
	if draftCalls != 1 {
		t.Fatalf("expected 1 sendDraft call, got %d", draftCalls)
	}
	s.mu.Lock()
	buf := s.buf.String()
	s.mu.Unlock()
	if buf != "Hello " {
		t.Fatalf("expected buffer to be 'Hello ', got %q", buf)
	}
}
func TestDraftMode_PhaseEndTextIsNoOp(t *testing.T) {
t.Parallel()
adapter := NewTelegramAdapter(nil)
s := &telegramOutboundStream{
adapter: adapter,
isPrivateChat: true,
draftID: 1,
streamChatID: 123,
}
s.buf.WriteString("some content")
ctx := context.Background()
err := s.Push(ctx, channel.StreamEvent{
Type: channel.StreamEventPhaseEnd,
Phase: channel.StreamPhaseText,
})
if err != nil {
t.Fatalf("PhaseEnd in draft mode should be no-op: %v", err)
}
}
// TestDraftMode_ToolCallStartSendsPermanentMessage verifies that, on a private
// chat stream, a tool-call-start event commits the buffered text as a regular
// message (never an edit), clears the buffer, and keeps streamChatID.
//
// This test deliberately does not call t.Parallel: it overwrites the
// package-level getOrCreateBotForTest, sendTextForTest and sendEditForTest
// hooks. Those are shared with other tests, so swapping them concurrently is a
// data race and could trigger this test's t.Error stub from another test.
func TestDraftMode_ToolCallStartSendsPermanentMessage(t *testing.T) {
	adapter := NewTelegramAdapter(nil)
	s := &telegramOutboundStream{
		adapter:       adapter,
		cfg:           channel.ChannelConfig{ID: "test", Credentials: map[string]any{"bot_token": "fake"}},
		target:        "123",
		isPrivateChat: true,
		draftID:       1,
		streamChatID:  123,
	}
	s.buf.WriteString("partial text")
	ctx := context.Background()

	// Register restoration before installing the stubs so the hooks are reset
	// even if the test aborts early.
	origGetBot := getOrCreateBotForTest
	origSendEdit := sendEditForTest
	origSendText := sendTextForTest
	t.Cleanup(func() {
		getOrCreateBotForTest = origGetBot
		sendEditForTest = origSendEdit
		sendTextForTest = origSendText
	})

	getOrCreateBotForTest = func(_ *TelegramAdapter, _, _ string) (*tgbotapi.BotAPI, error) {
		return &tgbotapi.BotAPI{Token: "fake"}, nil
	}
	var sentText string
	sendTextForTest = func(_ *tgbotapi.BotAPI, _ string, text string, _ int, _ string) (int64, int, error) {
		sentText = text
		return 123, 1, nil
	}
	sendEditForTest = func(_ *tgbotapi.BotAPI, _ tgbotapi.EditMessageTextConfig) error {
		t.Error("editMessage should not be called in draft mode")
		return nil
	}

	err := s.Push(ctx, channel.StreamEvent{Type: channel.StreamEventToolCallStart})
	if err != nil {
		t.Fatalf("Push ToolCallStart should succeed: %v", err)
	}
	if sentText != "partial text" {
		t.Fatalf("expected sendPermanentMessage with 'partial text', got %q", sentText)
	}
	s.mu.Lock()
	bufAfter := s.buf.String()
	chatID := s.streamChatID
	s.mu.Unlock()
	if bufAfter != "" {
		t.Fatalf("buffer should be reset after ToolCallStart: got %q", bufAfter)
	}
	// streamChatID should be preserved in draft mode
	if chatID != 123 {
		t.Fatalf("streamChatID should be preserved in draft mode: got %d", chatID)
	}
}
// TestDraftMode_FinalEmptyBufferSkipsDuplicate verifies that a final event on
// a private chat stream sends nothing when the buffer is already empty, even
// though the final payload carries message text.
//
// This test deliberately does not call t.Parallel: it installs a
// sendTextForTest stub that fails the test when invoked. That hook is
// package-level and shared, so a concurrently running test that legitimately
// sends text would hit this stub and fail this test spuriously.
func TestDraftMode_FinalEmptyBufferSkipsDuplicate(t *testing.T) {
	adapter := NewTelegramAdapter(nil)
	s := &telegramOutboundStream{
		adapter:       adapter,
		cfg:           channel.ChannelConfig{ID: "test", Credentials: map[string]any{"bot_token": "fake"}},
		target:        "123",
		isPrivateChat: true,
		draftID:       1,
		streamChatID:  123,
	}
	ctx := context.Background()

	// Simulate: buffer was already committed during ToolCallStart, so it's empty.
	// StreamEventFinal should NOT re-send the message via PlainText() fallback.

	// Register restoration before installing the stubs so the hooks are reset
	// even if the test aborts early.
	origGetBot := getOrCreateBotForTest
	origSendText := sendTextForTest
	t.Cleanup(func() {
		getOrCreateBotForTest = origGetBot
		sendTextForTest = origSendText
	})

	getOrCreateBotForTest = func(_ *TelegramAdapter, _, _ string) (*tgbotapi.BotAPI, error) {
		return &tgbotapi.BotAPI{Token: "fake"}, nil
	}
	sendTextForTest = func(_ *tgbotapi.BotAPI, _ string, _ string, _ int, _ string) (int64, int, error) {
		t.Error("sendTelegramText should not be called when buffer is empty in draft mode")
		return 0, 0, nil
	}

	err := s.Push(ctx, channel.StreamEvent{
		Type: channel.StreamEventFinal,
		Final: &channel.StreamFinalizePayload{
			Message: channel.Message{Text: "already sent text"},
		},
	})
	if err != nil {
		t.Fatalf("StreamEventFinal with empty buffer in draft mode should succeed: %v", err)
	}
}
// TestDraftMode_MultipleFinalEventsOnlyOneSend verifies that when multiple
// StreamEventFinal events fire (one per assistant output in multi-tool-call
// responses), only the first one sends the buffer text as a permanent message.
// Subsequent finals find the buffer empty and skip sending.
//
// This test deliberately does not call t.Parallel: it counts calls through the
// package-level sendTextForTest hook, which other tests also overwrite.
// Running concurrently would be a data race and would make the count
// unreliable.
func TestDraftMode_MultipleFinalEventsOnlyOneSend(t *testing.T) {
	adapter := NewTelegramAdapter(nil)
	s := &telegramOutboundStream{
		adapter:       adapter,
		cfg:           channel.ChannelConfig{ID: "test", Credentials: map[string]any{"bot_token": "fake"}},
		target:        "123",
		isPrivateChat: true,
		draftID:       1,
		streamChatID:  123,
	}
	ctx := context.Background()

	// Simulate buffer containing the final summary text
	s.buf.WriteString("final summary")

	// Register restoration before installing the stubs so the hooks are reset
	// even if the test aborts early.
	origGetBot := getOrCreateBotForTest
	origSendText := sendTextForTest
	t.Cleanup(func() {
		getOrCreateBotForTest = origGetBot
		sendTextForTest = origSendText
	})

	getOrCreateBotForTest = func(_ *TelegramAdapter, _, _ string) (*tgbotapi.BotAPI, error) {
		return &tgbotapi.BotAPI{Token: "fake"}, nil
	}
	sendCount := 0
	sendTextForTest = func(_ *tgbotapi.BotAPI, _ string, _ string, _ int, _ string) (int64, int, error) {
		sendCount++
		return 123, 1, nil
	}

	// Push 3 StreamEventFinal events (simulating 3 assistant outputs).
	// Only the first should actually send a message.
	for i, text := range []string{"intermediate 1", "intermediate 2", "final summary"} {
		err := s.Push(ctx, channel.StreamEvent{
			Type: channel.StreamEventFinal,
			Final: &channel.StreamFinalizePayload{
				Message: channel.Message{Text: text},
			},
		})
		if err != nil {
			t.Fatalf("StreamEventFinal #%d should succeed: %v", i+1, err)
		}
	}

	if sendCount != 1 {
		t.Fatalf("expected exactly 1 sendTelegramText call, got %d", sendCount)
	}
}
+48 -7
View File
@@ -585,8 +585,10 @@ func (a *TelegramAdapter) Send(ctx context.Context, cfg channel.ChannelConfig, m
}
// OpenStream opens a Telegram streaming session.
// The adapter sends one message then edits it in place as deltas arrive (editMessageText),
// avoiding one message per delta and rate limits.
// For private chats, uses sendMessageDraft to stream partial content with smooth
// animation, then sends a final permanent message via sendMessage.
// For group/channel chats, sends one message then edits it in place as deltas
// arrive (editMessageText), avoiding one message per delta and rate limits.
func (a *TelegramAdapter) OpenStream(ctx context.Context, cfg channel.ChannelConfig, target string, opts channel.StreamOptions) (channel.OutboundStream, error) {
target = strings.TrimSpace(target)
if target == "" {
@@ -597,12 +599,25 @@ func (a *TelegramAdapter) OpenStream(ctx context.Context, cfg channel.ChannelCon
return nil, ctx.Err()
default:
}
isPrivateChat := false
var chatID int64
if opts.Metadata != nil {
if ct, ok := opts.Metadata["conversation_type"].(string); ok && ct == "private" {
if parsed, err := strconv.ParseInt(target, 10, 64); err == nil {
isPrivateChat = true
chatID = parsed
}
}
}
return &telegramOutboundStream{
adapter: a,
cfg: cfg,
target: target,
reply: opts.Reply,
parseMode: "",
adapter: a,
cfg: cfg,
target: target,
reply: opts.Reply,
parseMode: "",
isPrivateChat: isPrivateChat,
streamChatID: chatID,
draftID: 1,
}, nil
}
@@ -680,9 +695,14 @@ func sendTelegramText(bot *tgbotapi.BotAPI, target string, text string, replyTo
return err
}
var sendTextForTest func(bot *tgbotapi.BotAPI, target string, text string, replyTo int, parseMode string) (int64, int, error)
// sendTelegramTextReturnMessage sends a text message and returns the chat ID and message ID for later editing.
func sendTelegramTextReturnMessage(bot *tgbotapi.BotAPI, target string, text string, replyTo int, parseMode string) (chatID int64, messageID int, err error) {
text = truncateTelegramText(sanitizeTelegramText(text))
if sendTextForTest != nil {
return sendTextForTest(bot, target, text, replyTo, parseMode)
}
var sent tgbotapi.Message
if strings.HasPrefix(target, "@") {
message := tgbotapi.NewMessageToChannel(target, text)
@@ -735,6 +755,27 @@ func editTelegramMessageText(bot *tgbotapi.BotAPI, chatID int64, messageID int,
return err
}
// sendDraftForTest, when non-nil, is called by sendTelegramDraft in place of
// the real sendMessageDraft request. It receives the already sanitized and
// truncated text.
var sendDraftForTest func(bot *tgbotapi.BotAPI, chatID int64, draftID int, text string, parseMode string) error

// sendTelegramDraft issues the sendMessageDraft Bot API request, which streams
// a partially generated message to a private chat.
//
// The text is sanitized and truncated first; if nothing but whitespace
// remains, no request is made and nil is returned. draft_id and parse_mode are
// only included in the request when non-zero / non-empty.
func sendTelegramDraft(bot *tgbotapi.BotAPI, chatID int64, draftID int, text string, parseMode string) error {
	prepared := truncateTelegramText(sanitizeTelegramText(text))
	if len(strings.TrimSpace(prepared)) == 0 {
		return nil
	}
	if hook := sendDraftForTest; hook != nil {
		return hook(bot, chatID, draftID, prepared, parseMode)
	}

	params := tgbotapi.Params{}
	params.AddFirstValid("chat_id", chatID)
	params.AddNonZero("draft_id", draftID)
	params.AddNonEmpty("text", prepared)
	params.AddNonEmpty("parse_mode", parseMode)

	if _, err := bot.MakeRequest("sendMessageDraft", params); err != nil {
		return err
	}
	return nil
}
func isTelegramMessageNotModified(err error) bool {
if err == nil {
return false
+2 -1
View File
@@ -325,7 +325,8 @@ func (p *ChannelInboundProcessor) HandleInbound(ctx context.Context, cfg channel
Reply: replyRef,
SourceMessageID: sourceMessageID,
Metadata: map[string]any{
"route_id": resolved.RouteID,
"route_id": resolved.RouteID,
"conversation_type": msg.Conversation.Type,
},
})
if err != nil {