diff --git a/internal/channel/adapters/telegram/stream.go b/internal/channel/adapters/telegram/stream.go index ea774a9d..42eb0341 100644 --- a/internal/channel/adapters/telegram/stream.go +++ b/internal/channel/adapters/telegram/stream.go @@ -15,24 +15,27 @@ import ( ) const telegramStreamEditThrottle = 5000 * time.Millisecond +const telegramDraftThrottle = 300 * time.Millisecond const telegramStreamToolHintText = "Calling tools..." const telegramStreamPendingSuffix = "\n……" var testEditFunc func(bot *tgbotapi.BotAPI, chatID int64, msgID int, text string, parseMode string) error type telegramOutboundStream struct { - adapter *TelegramAdapter - cfg channel.ChannelConfig - target string - reply *channel.ReplyRef - parseMode string - closed atomic.Bool - mu sync.Mutex - buf strings.Builder - streamChatID int64 - streamMsgID int - lastEdited string - lastEditedAt time.Time + adapter *TelegramAdapter + cfg channel.ChannelConfig + target string + reply *channel.ReplyRef + parseMode string + isPrivateChat bool + draftID int + closed atomic.Bool + mu sync.Mutex + buf strings.Builder + streamChatID int64 + streamMsgID int + lastEdited string + lastEditedAt time.Time } func (s *telegramOutboundStream) getBot(ctx context.Context) (bot *tgbotapi.BotAPI, err error) { @@ -205,6 +208,59 @@ func (s *telegramOutboundStream) editStreamMessageFinal(ctx context.Context, tex return nil } +// sendDraft sends a partial message via sendMessageDraft with throttling. +// Only used for private chats. +func (s *telegramOutboundStream) sendDraft(ctx context.Context, text string) error { + s.mu.Lock() + lastEditedAt := s.lastEditedAt + s.mu.Unlock() + + if time.Since(lastEditedAt) < telegramDraftThrottle { + return nil + } + if strings.TrimSpace(text) == "" { + return nil + } + + bot, err := s.getBot(ctx) + if err != nil { + return err + } + + draftErr := sendTelegramDraft(bot, s.streamChatID, s.draftID, text, "") + if draftErr != nil { + if isTelegramTooManyRequests(draftErr) { + d := getTelegramRetryAfter(draftErr) + if d <= 0 { + d = telegramDraftThrottle + } + s.mu.Lock() + s.lastEditedAt = time.Now().Add(d) + s.mu.Unlock() + return nil + } + return draftErr + } + + s.mu.Lock() + s.lastEditedAt = time.Now() + s.mu.Unlock() + return nil +} + +// sendPermanentMessage sends a final, permanent message via sendMessage. +// Used in draft mode to commit text after streaming is complete for a phase. +func (s *telegramOutboundStream) sendPermanentMessage(ctx context.Context, text string, parseMode string) error { + if strings.TrimSpace(text) == "" { + return nil + } + bot, replyTo, err := s.getBotAndReply(ctx) + if err != nil { + return err + } + return sendTelegramText(bot, s.target, text, replyTo, parseMode) +} + func (s *telegramOutboundStream) Push(ctx context.Context, event channel.StreamEvent) error { if s == nil || s.adapter == nil { return fmt.Errorf("telegram stream not configured") @@ -225,12 +281,21 @@ func (s *telegramOutboundStream) Push(ctx context.Context, event channel.StreamE bufText := strings.TrimSpace(s.buf.String()) hasMsg := s.streamMsgID != 0 s.mu.Unlock() - if hasMsg && bufText != "" { + if s.isPrivateChat { + // In draft mode, send buffered text as a permanent message before tool execution. + if bufText != "" { + if err := s.sendPermanentMessage(ctx, bufText, ""); err != nil { + slog.Warn("telegram: draft permanent message failed", slog.Any("error", err)) + } + } + } else if hasMsg && bufText != "" { _ = s.editStreamMessageFinal(ctx, bufText) } s.mu.Lock() s.streamMsgID = 0 - s.streamChatID = 0 + if !s.isPrivateChat { + s.streamChatID = 0 + } s.lastEdited = "" s.lastEditedAt = time.Time{} s.buf.Reset() @@ -239,7 +304,9 @@ func (s *telegramOutboundStream) Push(ctx context.Context, event channel.StreamE case channel.StreamEventToolCallEnd: s.mu.Lock() s.streamMsgID = 0 - s.streamChatID = 0 + if !s.isPrivateChat { + s.streamChatID = 0 + } s.lastEdited = "" s.lastEditedAt = time.Time{} s.buf.Reset() @@ -267,6 +334,11 @@ func (s *telegramOutboundStream) Push(ctx context.Context, event channel.StreamE return nil case channel.StreamEventPhaseEnd: if event.Phase == channel.StreamPhaseText { + // In draft mode, skip phase-end finalization; StreamEventFinal sends the + // permanent formatted message. + if s.isPrivateChat { + return nil + } s.mu.Lock() finalText := strings.TrimSpace(s.buf.String()) s.mu.Unlock() @@ -288,31 +360,43 @@ func (s *telegramOutboundStream) Push(ctx context.Context, event channel.StreamE s.buf.WriteString(event.Delta) content := s.buf.String() s.mu.Unlock() + if s.isPrivateChat { + return s.sendDraft(ctx, content) + } if err := s.ensureStreamMessage(ctx, content); err != nil { return err } return s.editStreamMessage(ctx, content) case channel.StreamEventFinal: + // In draft mode, read and reset buffer atomically to prevent duplicate + // permanent messages when multiple StreamEventFinal events fire + // (one per assistant output in multi-tool-call responses). + s.mu.Lock() + bufText := strings.TrimSpace(s.buf.String()) + if s.isPrivateChat { + s.buf.Reset() + } + s.mu.Unlock() if event.Final == nil || event.Final.Message.IsEmpty() { - s.mu.Lock() - finalText := strings.TrimSpace(s.buf.String()) - s.mu.Unlock() - if finalText != "" { - if err := s.ensureStreamMessage(ctx, finalText); err != nil { - slog.Warn("telegram: ensure stream message failed", slog.Any("error", err)) - } - if err := s.editStreamMessageFinal(ctx, finalText); err != nil { - slog.Warn("telegram: edit stream message failed", slog.Any("error", err)) + if bufText != "" { + if s.isPrivateChat { + if err := s.sendPermanentMessage(ctx, bufText, ""); err != nil { + slog.Warn("telegram: draft final permanent message failed", slog.Any("error", err)) + } + } else { + if err := s.ensureStreamMessage(ctx, bufText); err != nil { + slog.Warn("telegram: ensure stream message failed", slog.Any("error", err)) + } + if err := s.editStreamMessageFinal(ctx, bufText); err != nil { + slog.Warn("telegram: edit stream message failed", slog.Any("error", err)) + } } } return nil } msg := event.Final.Message - s.mu.Lock() - bufText := strings.TrimSpace(s.buf.String()) - s.mu.Unlock() finalText := bufText - if finalText == "" { + if finalText == "" && !s.isPrivateChat { finalText = strings.TrimSpace(msg.PlainText()) } // Convert markdown to Telegram HTML for the final message. @@ -323,11 +407,17 @@ func (s *telegramOutboundStream) Push(ctx context.Context, event channel.StreamE s.mu.Unlock() finalText = formatted } - if err := s.ensureStreamMessage(ctx, finalText); err != nil { - return err - } - if err := s.editStreamMessageFinal(ctx, finalText); err != nil { - return err + if s.isPrivateChat { + if err := s.sendPermanentMessage(ctx, finalText, s.parseMode); err != nil { + return err + } + } else { + if err := s.ensureStreamMessage(ctx, finalText); err != nil { + return err + } + if err := s.editStreamMessageFinal(ctx, finalText); err != nil { + return err + } } if len(msg.Attachments) > 0 { replyTo := parseReplyToMessageID(s.reply) @@ -357,6 +447,9 @@ func (s *telegramOutboundStream) Push(ctx context.Context, event channel.StreamE return nil } display := "Error: " + errText + if s.isPrivateChat { + return s.sendPermanentMessage(ctx, display, "") + } if err := s.ensureStreamMessage(ctx, display); err != nil { return err } diff --git a/internal/channel/adapters/telegram/stream_test.go b/internal/channel/adapters/telegram/stream_test.go index b2e3628c..e6c92538 100644 --- a/internal/channel/adapters/telegram/stream_test.go +++ b/internal/channel/adapters/telegram/stream_test.go @@ -300,3 +300,354 @@ func TestEditStreamMessageFinal_NoMessageNoOp(t *testing.T) { t.Fatalf("editStreamMessageFinal when streamMsgID==0 should return nil: %v", err) } } + +// --- Draft mode (sendMessageDraft) tests --- + +func TestSendDraft_ThrottleSkip(t *testing.T) { + t.Parallel() + + adapter := NewTelegramAdapter(nil) + s := &telegramOutboundStream{ + adapter: adapter, + cfg: channel.ChannelConfig{ID: "test", Credentials: map[string]any{"bot_token": "fake"}}, + isPrivateChat: true, + draftID: 1, + streamChatID: 123, + lastEditedAt: time.Now(), // just now, within draft throttle window + } + ctx := context.Background() + + err := s.sendDraft(ctx, "hello") + if err != nil { + t.Fatalf("sendDraft within throttle window should skip and return nil: %v", err) + } +} + +func TestSendDraft_EmptyTextSkip(t *testing.T) { + t.Parallel() + + adapter := NewTelegramAdapter(nil) + s := &telegramOutboundStream{ + adapter: adapter, + isPrivateChat: true, + draftID: 1, + streamChatID: 123, + lastEditedAt: time.Now().Add(-time.Minute), + } + ctx := context.Background() + + err := s.sendDraft(ctx, " ") + if err != nil { + t.Fatalf("sendDraft with whitespace-only text should skip and return nil: %v", err) + } +} + +func TestSendDraft_Success(t *testing.T) { + t.Parallel() + + adapter := NewTelegramAdapter(nil) + s := &telegramOutboundStream{ + adapter: adapter, + cfg: channel.ChannelConfig{ID: "test", Credentials: map[string]any{"bot_token": "fake"}}, + isPrivateChat: true, + draftID: 1, + streamChatID: 123, + lastEditedAt: time.Now().Add(-time.Minute), + } + ctx := context.Background() + + origGetBot := getOrCreateBotForTest + origDraft := sendDraftForTest + getOrCreateBotForTest = func(_ *TelegramAdapter, _, _ string) (*tgbotapi.BotAPI, error) { + return &tgbotapi.BotAPI{Token: "fake"}, nil + } + var capturedChatID int64 + var capturedDraftID int + var capturedText string + sendDraftForTest = func(_ *tgbotapi.BotAPI, chatID int64, draftID int, text string, _ string) error { + capturedChatID = chatID + capturedDraftID = draftID + capturedText = text + return nil + } + defer func() { + getOrCreateBotForTest = origGetBot + sendDraftForTest = origDraft + }() + + err := s.sendDraft(ctx, "streaming text") + if err != nil { + t.Fatalf("sendDraft should succeed: %v", err) + } + if capturedChatID != 123 { + t.Fatalf("expected chatID 123, got %d", capturedChatID) + } + if capturedDraftID != 1 { + t.Fatalf("expected draftID 1, got %d", capturedDraftID) + } + if capturedText != "streaming text" { + t.Fatalf("expected text 'streaming text', got %q", capturedText) + } +} + +func TestSendDraft_429Backoff(t *testing.T) { + t.Parallel() + + adapter := NewTelegramAdapter(nil) + before := time.Now().Add(-time.Minute) + s := &telegramOutboundStream{ + adapter: adapter, + cfg: channel.ChannelConfig{ID: "test", Credentials: map[string]any{"bot_token": "fake"}}, + isPrivateChat: true, + draftID: 1, + streamChatID: 123, + lastEditedAt: before, + } + ctx := context.Background() + + origGetBot := getOrCreateBotForTest + origDraft := sendDraftForTest + getOrCreateBotForTest = func(_ *TelegramAdapter, _, _ string) (*tgbotapi.BotAPI, error) { + return &tgbotapi.BotAPI{Token: "fake"}, nil + } + sendDraftForTest = func(*tgbotapi.BotAPI, int64, int, string, string) error { + return tgbotapi.Error{ + Code: 429, + Message: "Too Many Requests", + ResponseParameters: tgbotapi.ResponseParameters{RetryAfter: 2}, + } + } + defer func() { + getOrCreateBotForTest = origGetBot + sendDraftForTest = origDraft + }() + + err := s.sendDraft(ctx, "hello") + if err != nil { + t.Fatalf("sendDraft on 429 should return nil (backoff): %v", err) + } + s.mu.Lock() + lastEditedAt := s.lastEditedAt + s.mu.Unlock() + if !lastEditedAt.After(before) { + t.Fatalf("on 429 lastEditedAt should be pushed forward for backoff") + } +} + +func TestDraftMode_DeltaUsesSendDraft(t *testing.T) { + t.Parallel() + + adapter := NewTelegramAdapter(nil) + s := &telegramOutboundStream{ + adapter: adapter, + cfg: channel.ChannelConfig{ID: "test", Credentials: map[string]any{"bot_token": "fake"}}, + isPrivateChat: true, + draftID: 1, + streamChatID: 123, + } + ctx := context.Background() + + origGetBot := getOrCreateBotForTest + origDraft := sendDraftForTest + getOrCreateBotForTest = func(_ *TelegramAdapter, _, _ string) (*tgbotapi.BotAPI, error) { + return &tgbotapi.BotAPI{Token: "fake"}, nil + } + draftCalls := 0 + sendDraftForTest = func(*tgbotapi.BotAPI, int64, int, string, string) error { + draftCalls++ + return nil + } + defer func() { + getOrCreateBotForTest = origGetBot + sendDraftForTest = origDraft + }() + + err := s.Push(ctx, channel.StreamEvent{Type: channel.StreamEventDelta, Delta: "Hello "}) + if err != nil { + t.Fatalf("Push delta should succeed: %v", err) + } + if draftCalls != 1 { + t.Fatalf("expected 1 sendDraft call, got %d", draftCalls) + } + s.mu.Lock() + buf := s.buf.String() + s.mu.Unlock() + if buf != "Hello " { + t.Fatalf("expected buffer to be 'Hello ', got %q", buf) + } +} + +func TestDraftMode_PhaseEndTextIsNoOp(t *testing.T) { + t.Parallel() + + adapter := NewTelegramAdapter(nil) + s := &telegramOutboundStream{ + adapter: adapter, + isPrivateChat: true, + draftID: 1, + streamChatID: 123, + } + s.buf.WriteString("some content") + ctx := context.Background() + + err := s.Push(ctx, channel.StreamEvent{ + Type: channel.StreamEventPhaseEnd, + Phase: channel.StreamPhaseText, + }) + if err != nil { + t.Fatalf("PhaseEnd in draft mode should be no-op: %v", err) + } +} + +func TestDraftMode_ToolCallStartSendsPermanentMessage(t *testing.T) { + t.Parallel() + + adapter := NewTelegramAdapter(nil) + s := &telegramOutboundStream{ + adapter: adapter, + cfg: channel.ChannelConfig{ID: "test", Credentials: map[string]any{"bot_token": "fake"}}, + target: "123", + isPrivateChat: true, + draftID: 1, + streamChatID: 123, + } + s.buf.WriteString("partial text") + ctx := context.Background() + + origGetBot := getOrCreateBotForTest + origSendEdit := sendEditForTest + origSendText := sendTextForTest + getOrCreateBotForTest = func(_ *TelegramAdapter, _, _ string) (*tgbotapi.BotAPI, error) { + return &tgbotapi.BotAPI{Token: "fake"}, nil + } + var sentText string + sendTextForTest = func(_ *tgbotapi.BotAPI, _ string, text string, _ int, _ string) (int64, int, error) { + sentText = text + return 123, 1, nil + } + sendEditForTest = func(_ *tgbotapi.BotAPI, edit tgbotapi.EditMessageTextConfig) error { + t.Error("editMessage should not be called in draft mode") + return nil + } + defer func() { + getOrCreateBotForTest = origGetBot + sendEditForTest = origSendEdit + sendTextForTest = origSendText + }() + + err := s.Push(ctx, channel.StreamEvent{Type: channel.StreamEventToolCallStart}) + if err != nil { + t.Fatalf("Push ToolCallStart should succeed: %v", err) + } + if sentText != "partial text" { + t.Fatalf("expected sendPermanentMessage with 'partial text', got %q", sentText) + } + + s.mu.Lock() + bufAfter := s.buf.String() + chatID := s.streamChatID + s.mu.Unlock() + if bufAfter != "" { + t.Fatalf("buffer should be reset after ToolCallStart: got %q", bufAfter) + } + // streamChatID should be preserved in draft mode + if chatID != 123 { + t.Fatalf("streamChatID should be preserved in draft mode: got %d", chatID) + } +} + +func TestDraftMode_FinalEmptyBufferSkipsDuplicate(t *testing.T) { + t.Parallel() + + adapter := NewTelegramAdapter(nil) + s := &telegramOutboundStream{ + adapter: adapter, + cfg: channel.ChannelConfig{ID: "test", Credentials: map[string]any{"bot_token": "fake"}}, + target: "123", + isPrivateChat: true, + draftID: 1, + streamChatID: 123, + } + ctx := context.Background() + + // Simulate: buffer was already committed during ToolCallStart, so it's empty. + // StreamEventFinal should NOT re-send the message via PlainText() fallback. + origGetBot := getOrCreateBotForTest + origSendText := sendTextForTest + getOrCreateBotForTest = func(_ *TelegramAdapter, _, _ string) (*tgbotapi.BotAPI, error) { + return &tgbotapi.BotAPI{Token: "fake"}, nil + } + sendTextForTest = func(_ *tgbotapi.BotAPI, _ string, _ string, _ int, _ string) (int64, int, error) { + t.Error("sendTelegramText should not be called when buffer is empty in draft mode") + return 0, 0, nil + } + defer func() { + getOrCreateBotForTest = origGetBot + sendTextForTest = origSendText + }() + + err := s.Push(ctx, channel.StreamEvent{ + Type: channel.StreamEventFinal, + Final: &channel.StreamFinalizePayload{ + Message: channel.Message{Text: "already sent text"}, + }, + }) + if err != nil { + t.Fatalf("StreamEventFinal with empty buffer in draft mode should succeed: %v", err) + } +} + +// TestDraftMode_MultipleFinalEventsOnlyOneSend verifies that when multiple +// StreamEventFinal events fire (one per assistant output in multi-tool-call +// responses), only the first one sends the buffer text as a permanent message. +// Subsequent finals find the buffer empty and skip sending. +func TestDraftMode_MultipleFinalEventsOnlyOneSend(t *testing.T) { + t.Parallel() + + adapter := NewTelegramAdapter(nil) + s := &telegramOutboundStream{ + adapter: adapter, + cfg: channel.ChannelConfig{ID: "test", Credentials: map[string]any{"bot_token": "fake"}}, + target: "123", + isPrivateChat: true, + draftID: 1, + streamChatID: 123, + } + ctx := context.Background() + + // Simulate buffer containing the final summary text + s.buf.WriteString("final summary") + + origGetBot := getOrCreateBotForTest + origSendText := sendTextForTest + getOrCreateBotForTest = func(_ *TelegramAdapter, _, _ string) (*tgbotapi.BotAPI, error) { + return &tgbotapi.BotAPI{Token: "fake"}, nil + } + sendCount := 0 + sendTextForTest = func(_ *tgbotapi.BotAPI, _ string, _ string, _ int, _ string) (int64, int, error) { + sendCount++ + return 123, 1, nil + } + defer func() { + getOrCreateBotForTest = origGetBot + sendTextForTest = origSendText + }() + + // Push 3 StreamEventFinal events (simulating 3 assistant outputs). + // Only the first should actually send a message. + for i, text := range []string{"intermediate 1", "intermediate 2", "final summary"} { + err := s.Push(ctx, channel.StreamEvent{ + Type: channel.StreamEventFinal, + Final: &channel.StreamFinalizePayload{ + Message: channel.Message{Text: text}, + }, + }) + if err != nil { + t.Fatalf("StreamEventFinal #%d should succeed: %v", i+1, err) + } + } + + if sendCount != 1 { + t.Fatalf("expected exactly 1 sendTelegramText call, got %d", sendCount) + } +} diff --git a/internal/channel/adapters/telegram/telegram.go b/internal/channel/adapters/telegram/telegram.go index 58e9946f..45078b27 100644 --- a/internal/channel/adapters/telegram/telegram.go +++ b/internal/channel/adapters/telegram/telegram.go @@ -585,8 +585,10 @@ func (a *TelegramAdapter) Send(ctx context.Context, cfg channel.ChannelConfig, m } // OpenStream opens a Telegram streaming session. -// The adapter sends one message then edits it in place as deltas arrive (editMessageText), -// avoiding one message per delta and rate limits. +// For private chats, uses sendMessageDraft to stream partial content with smooth +// animation, then sends a final permanent message via sendMessage. +// For group/channel chats, sends one message then edits it in place as deltas +// arrive (editMessageText), avoiding one message per delta and rate limits. func (a *TelegramAdapter) OpenStream(ctx context.Context, cfg channel.ChannelConfig, target string, opts channel.StreamOptions) (channel.OutboundStream, error) { target = strings.TrimSpace(target) if target == "" { @@ -597,12 +599,25 @@ func (a *TelegramAdapter) OpenStream(ctx context.Context, cfg channel.ChannelCon return nil, ctx.Err() default: } + isPrivateChat := false + var chatID int64 + if opts.Metadata != nil { + if ct, ok := opts.Metadata["conversation_type"].(string); ok && ct == "private" { + if parsed, err := strconv.ParseInt(target, 10, 64); err == nil { + isPrivateChat = true + chatID = parsed + } + } + } return &telegramOutboundStream{ - adapter: a, - cfg: cfg, - target: target, - reply: opts.Reply, - parseMode: "", + adapter: a, + cfg: cfg, + target: target, + reply: opts.Reply, + parseMode: "", + isPrivateChat: isPrivateChat, + streamChatID: chatID, + draftID: 1, }, nil } @@ -680,9 +695,14 @@ func sendTelegramText(bot *tgbotapi.BotAPI, target string, text string, replyTo return err } +var sendTextForTest func(bot *tgbotapi.BotAPI, target string, text string, replyTo int, parseMode string) (int64, int, error) + // sendTelegramTextReturnMessage sends a text message and returns the chat ID and message ID for later editing. func sendTelegramTextReturnMessage(bot *tgbotapi.BotAPI, target string, text string, replyTo int, parseMode string) (chatID int64, messageID int, err error) { text = truncateTelegramText(sanitizeTelegramText(text)) + if sendTextForTest != nil { + return sendTextForTest(bot, target, text, replyTo, parseMode) + } var sent tgbotapi.Message if strings.HasPrefix(target, "@") { message := tgbotapi.NewMessageToChannel(target, text) @@ -735,6 +755,27 @@ func editTelegramMessageText(bot *tgbotapi.BotAPI, chatID int64, messageID int, return err } +var sendDraftForTest func(bot *tgbotapi.BotAPI, chatID int64, draftID int, text string, parseMode string) error + +// sendTelegramDraft calls the sendMessageDraft Bot API method to stream a +// partial message to a private chat while it is being generated. +func sendTelegramDraft(bot *tgbotapi.BotAPI, chatID int64, draftID int, text string, parseMode string) error { + text = truncateTelegramText(sanitizeTelegramText(text)) + if strings.TrimSpace(text) == "" { + return nil + } + if sendDraftForTest != nil { + return sendDraftForTest(bot, chatID, draftID, text, parseMode) + } + params := tgbotapi.Params{} + params.AddFirstValid("chat_id", chatID) + params.AddNonZero("draft_id", draftID) + params.AddNonEmpty("text", text) + params.AddNonEmpty("parse_mode", parseMode) + _, err := bot.MakeRequest("sendMessageDraft", params) + return err +} + func isTelegramMessageNotModified(err error) bool { if err == nil { return false diff --git a/internal/channel/inbound/channel.go b/internal/channel/inbound/channel.go index 513006c1..436be583 100644 --- a/internal/channel/inbound/channel.go +++ b/internal/channel/inbound/channel.go @@ -325,7 +325,8 @@ func (p *ChannelInboundProcessor) HandleInbound(ctx context.Context, cfg channel Reply: replyRef, SourceMessageID: sourceMessageID, Metadata: map[string]any{ - "route_id": resolved.RouteID, + "route_id": resolved.RouteID, + "conversation_type": msg.Conversation.Type, }, }) if err != nil {