diff --git a/internal/channel/adapters/telegram/stream.go b/internal/channel/adapters/telegram/stream.go index 93156d05..15acdfce 100644 --- a/internal/channel/adapters/telegram/stream.go +++ b/internal/channel/adapters/telegram/stream.go @@ -14,7 +14,9 @@ import ( "github.com/memohai/memoh/internal/channel" ) -const telegramStreamEditThrottle = 350 * time.Millisecond +const telegramStreamEditThrottle = 250 * time.Millisecond + +var testEditFunc func(bot *tgbotapi.BotAPI, chatID int64, msgID int, text string, parseMode string) error type telegramOutboundStream struct { adapter *TelegramAdapter @@ -91,8 +93,22 @@ func (s *telegramOutboundStream) editStreamMessage(ctx context.Context, text str if err != nil { return err } - if err := editTelegramMessageText(bot, chatID, msgID, text, s.parseMode); err != nil { - return err + editErr := error(nil) + if testEditFunc != nil { + editErr = testEditFunc(bot, chatID, msgID, text, s.parseMode) + } else { + editErr = editTelegramMessageText(bot, chatID, msgID, text, s.parseMode) + } + if editErr != nil { + if isTelegramTooManyRequests(editErr) { + if d := getTelegramRetryAfter(editErr); d > 0 { + s.mu.Lock() + s.lastEditedAt = time.Now().Add(d) + s.mu.Unlock() + } + return nil + } + return editErr } s.mu.Lock() s.lastEdited = text diff --git a/internal/channel/adapters/telegram/stream_test.go b/internal/channel/adapters/telegram/stream_test.go index 56ea4032..5a7cda8b 100644 --- a/internal/channel/adapters/telegram/stream_test.go +++ b/internal/channel/adapters/telegram/stream_test.go @@ -4,7 +4,9 @@ import ( "context" "strings" "testing" + "time" + tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5" "github.com/memohai/memoh/internal/channel" ) @@ -117,3 +119,116 @@ func TestTelegramOutboundStream_CloseContextCanceled(t *testing.T) { t.Fatalf("Close with canceled context should return context.Canceled: %v", err) } } + +// Test editStreamMessage dedup: no API call when content equals lastEdited (avoids Telegram "message is not modified" error). +func TestEditStreamMessage_NoEditWhenSameContent(t *testing.T) { + t.Parallel() + + adapter := NewTelegramAdapter(nil) + s := &telegramOutboundStream{ + adapter: adapter, + streamChatID: 1, + streamMsgID: 1, + lastEdited: "hello", + lastEditedAt: time.Now().Add(-time.Minute), + } + ctx := context.Background() + + tests := []struct { + name string + text string + }{ + {"exact same", "hello"}, + {"trimmed same", " hello "}, + {"leading space", " hello"}, + {"trailing space", "hello "}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := s.editStreamMessage(ctx, tt.text) + if err != nil { + t.Fatalf("editStreamMessage(same content) should return nil to avoid duplicate edit API call: %v", err) + } + }) + } +} + +func TestEditStreamMessage_NoEditWhenMessageNotSent(t *testing.T) { + t.Parallel() + + adapter := NewTelegramAdapter(nil) + s := &telegramOutboundStream{adapter: adapter, streamMsgID: 0} + ctx := context.Background() + + err := s.editStreamMessage(ctx, "any") + if err != nil { + t.Fatalf("editStreamMessage when streamMsgID==0 should return nil: %v", err) + } +} + +func TestEditStreamMessage_NoEditWhenThrottled(t *testing.T) { + t.Parallel() + + adapter := NewTelegramAdapter(nil) + s := &telegramOutboundStream{ + adapter: adapter, + streamChatID: 1, + streamMsgID: 1, + lastEdited: "a", + lastEditedAt: time.Now(), // just now, within 350ms + } + ctx := context.Background() + + err := s.editStreamMessage(ctx, "ab") + if err != nil { + t.Fatalf("editStreamMessage within throttle window and no newline should skip edit and return nil: %v", err) + } +} + +func TestEditStreamMessage_429SetsBackoffAndReturnsNil(t *testing.T) { + t.Parallel() + + adapter := NewTelegramAdapter(nil) + before := time.Now().Add(-time.Minute) + s := &telegramOutboundStream{ + adapter: adapter, + cfg: channel.ChannelConfig{ID: "test", Credentials: map[string]any{"bot_token": "fake"}}, + streamChatID: 1, + streamMsgID: 1, + lastEdited: "a", + lastEditedAt: before, + } + ctx := context.Background() + + origGetBot := getOrCreateBotForTest + origEdit := testEditFunc + getOrCreateBotForTest = func(_ *TelegramAdapter, _, _ string) (*tgbotapi.BotAPI, error) { + return &tgbotapi.BotAPI{Token: "fake"}, nil + } + testEditFunc = func(*tgbotapi.BotAPI, int64, int, string, string) error { + return tgbotapi.Error{ + Code: 429, + Message: "Too Many Requests", + ResponseParameters: tgbotapi.ResponseParameters{RetryAfter: 2}, + } + } + defer func() { + getOrCreateBotForTest = origGetBot + testEditFunc = origEdit + }() + + err := s.editStreamMessage(ctx, "b") + if err != nil { + t.Fatalf("editStreamMessage on 429 should return nil (backoff): %v", err) + } + s.mu.Lock() + lastEdited := s.lastEdited + lastEditedAt := s.lastEditedAt + s.mu.Unlock() + if lastEdited != "a" { + t.Fatalf("on 429 lastEdited should remain unchanged: got %q", lastEdited) + } + if !lastEditedAt.After(before) { + t.Fatalf("on 429 lastEditedAt should be pushed forward for backoff: got %v", lastEditedAt) + } +} diff --git a/internal/channel/adapters/telegram/telegram.go b/internal/channel/adapters/telegram/telegram.go index 1c1544de..77f2dd48 100644 --- a/internal/channel/adapters/telegram/telegram.go +++ b/internal/channel/adapters/telegram/telegram.go @@ -2,6 +2,7 @@ package telegram import ( "context" + "errors" "fmt" "log/slog" "strconv" @@ -37,7 +38,12 @@ func NewTelegramAdapter(log *slog.Logger) *TelegramAdapter { return adapter } +var getOrCreateBotForTest func(a *TelegramAdapter, token, configID string) (*tgbotapi.BotAPI, error) + func (a *TelegramAdapter) getOrCreateBot(token, configID string) (*tgbotapi.BotAPI, error) { + if getOrCreateBotForTest != nil { + return getOrCreateBotForTest(a, token, configID) + } a.mu.RLock() bot, ok := a.bots[token] a.mu.RUnlock() @@ -446,16 +452,74 @@ func sendTelegramTextReturnMessage(bot *tgbotapi.BotAPI, target string, text str return chatID, messageID, nil } +var ( + sendEditForTest func(bot *tgbotapi.BotAPI, edit tgbotapi.EditMessageTextConfig) error + sleepForTest func(time.Duration) +) + func editTelegramMessageText(bot *tgbotapi.BotAPI, chatID int64, messageID int, text string, parseMode string) error { if len(text) > telegramMaxMessageLength { text = text[:telegramMaxMessageLength-3] + "..." } edit := tgbotapi.NewEditMessageText(chatID, messageID, text) edit.ParseMode = parseMode - _, err := bot.Send(edit) + send := sendEditForTest + if send == nil { + send = func(b *tgbotapi.BotAPI, e tgbotapi.EditMessageTextConfig) error { _, err := b.Send(e); return err } + } + sleep := sleepForTest + if sleep == nil { + sleep = time.Sleep + } + err := send(bot, edit) + if err != nil && isTelegramMessageNotModified(err) { + return nil + } + if err != nil && isTelegramTooManyRequests(err) { + if d := getTelegramRetryAfter(err); d > 0 { + sleep(d) + err = send(bot, edit) + if err != nil && isTelegramMessageNotModified(err) { + return nil + } + } + } return err } +func isTelegramMessageNotModified(err error) bool { + if err == nil { + return false + } + var apiErr tgbotapi.Error + if errors.As(err, &apiErr) { + return apiErr.Code == 400 && strings.Contains(apiErr.Message, "message is not modified") + } + return false +} + +func isTelegramTooManyRequests(err error) bool { + if err == nil { + return false + } + var apiErr tgbotapi.Error + if errors.As(err, &apiErr) { + return apiErr.Code == 429 + } + return false +} + +func getTelegramRetryAfter(err error) time.Duration { + if err == nil { + return 0 + } + var apiErr tgbotapi.Error + if errors.As(err, &apiErr) && apiErr.RetryAfter > 0 { + return time.Duration(apiErr.RetryAfter) * time.Second + } + return 0 +} + func sendTelegramAttachment(bot *tgbotapi.BotAPI, target string, att channel.Attachment, caption string, replyTo int, parseMode string) error { urlRef := strings.TrimSpace(att.URL) keyRef := strings.TrimSpace(att.PlatformKey) diff --git a/internal/channel/adapters/telegram/telegram_test.go b/internal/channel/adapters/telegram/telegram_test.go index b3c4ca46..8101ca7a 100644 --- a/internal/channel/adapters/telegram/telegram_test.go +++ b/internal/channel/adapters/telegram/telegram_test.go @@ -2,8 +2,10 @@ package telegram import ( "context" + "fmt" "strings" "testing" + "time" tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5" "github.com/memohai/memoh/internal/channel" @@ -304,3 +306,109 @@ func TestTelegramAdapter_NormalizeAndResolve(t *testing.T) { t.Fatalf("ResolveTarget: %s", target) } } + +func TestIsTelegramMessageNotModified(t *testing.T) { + t.Parallel() + + // Exact production error from Telegram API (editMessageText when content unchanged). + const productionMessageNotModified = "Bad Request: message is not modified: specified new message content and reply markup are exactly the same as a current content and reply markup of the message" + + tests := []struct { + name string + err error + want bool + }{ + {"nil", nil, false}, + {"plain error", fmt.Errorf("network error"), false}, + {"other api error", tgbotapi.Error{Code: 400, Message: "Bad Request: chat not found"}, false}, + {"message is not modified", tgbotapi.Error{Code: 400, Message: productionMessageNotModified}, true}, + {"production exact", tgbotapi.Error{Code: 400, Message: productionMessageNotModified}, true}, + {"same text but code 500", tgbotapi.Error{Code: 500, Message: "message is not modified"}, false}, + {"wrapped same", fmt.Errorf("wrapped: %w", tgbotapi.Error{Code: 400, Message: "Bad Request: message is not modified"}), true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := isTelegramMessageNotModified(tt.err) + if got != tt.want { + t.Fatalf("isTelegramMessageNotModified() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestIsTelegramTooManyRequests(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + err error + want bool + }{ + {"nil", nil, false}, + {"429", tgbotapi.Error{Code: 429, Message: "Too Many Requests"}, true}, + {"400", tgbotapi.Error{Code: 400, Message: "Bad Request"}, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := isTelegramTooManyRequests(tt.err) + if got != tt.want { + t.Fatalf("isTelegramTooManyRequests() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestGetTelegramRetryAfter(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + err error + want time.Duration + }{ + {"nil", nil, 0}, + {"no retry_after", tgbotapi.Error{Code: 429, Message: "Too Many Requests"}, 0}, + {"retry_after 2", tgbotapi.Error{Code: 429, Message: "Too Many Requests", ResponseParameters: tgbotapi.ResponseParameters{RetryAfter: 2}}, 2 * time.Second}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := getTelegramRetryAfter(tt.err) + if got != tt.want { + t.Fatalf("getTelegramRetryAfter() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestEditTelegramMessageText_429RetryThenSuccess(t *testing.T) { + t.Parallel() + + var sendCalls int + origSend := sendEditForTest + origSleep := sleepForTest + sendEditForTest = func(_ *tgbotapi.BotAPI, _ tgbotapi.EditMessageTextConfig) error { + sendCalls++ + if sendCalls == 1 { + return tgbotapi.Error{ + Code: 429, + Message: "Too Many Requests", + ResponseParameters: tgbotapi.ResponseParameters{RetryAfter: 1}, + } + } + return nil + } + sleepForTest = func(time.Duration) {} + defer func() { + sendEditForTest = origSend + sleepForTest = origSleep + }() + + bot := &tgbotapi.BotAPI{Token: "test"} + err := editTelegramMessageText(bot, 1, 1, "hi", "") + if err != nil { + t.Fatalf("editTelegramMessageText after 429 retry should return nil: %v", err) + } + if sendCalls != 2 { + t.Fatalf("send should be called twice (first 429, then retry): got %d", sendCalls) + } +}