mirror of
https://github.com/memohai/Memoh.git
synced 2026-04-27 07:16:19 +09:00
fix(telegram): handle stream edit errors and 429 rate limit
- Treat 400 "message is not modified" as success to avoid user-facing error - On 429: sleep retry_after and retry once in editTelegramMessageText; stream backs off lastEditedAt and returns nil - Require error code 400 for message-not-modified check; add production error string to unit tests - Lower base throttle to 250ms; add test hooks and tests for 429 retry and stream backoff
This commit is contained in:
@@ -14,7 +14,9 @@ import (
|
||||
"github.com/memohai/memoh/internal/channel"
|
||||
)
|
||||
|
||||
const telegramStreamEditThrottle = 350 * time.Millisecond
|
||||
const telegramStreamEditThrottle = 250 * time.Millisecond
|
||||
|
||||
var testEditFunc func(bot *tgbotapi.BotAPI, chatID int64, msgID int, text string, parseMode string) error
|
||||
|
||||
type telegramOutboundStream struct {
|
||||
adapter *TelegramAdapter
|
||||
@@ -91,8 +93,22 @@ func (s *telegramOutboundStream) editStreamMessage(ctx context.Context, text str
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := editTelegramMessageText(bot, chatID, msgID, text, s.parseMode); err != nil {
|
||||
return err
|
||||
editErr := error(nil)
|
||||
if testEditFunc != nil {
|
||||
editErr = testEditFunc(bot, chatID, msgID, text, s.parseMode)
|
||||
} else {
|
||||
editErr = editTelegramMessageText(bot, chatID, msgID, text, s.parseMode)
|
||||
}
|
||||
if editErr != nil {
|
||||
if isTelegramTooManyRequests(editErr) {
|
||||
if d := getTelegramRetryAfter(editErr); d > 0 {
|
||||
s.mu.Lock()
|
||||
s.lastEditedAt = time.Now().Add(d)
|
||||
s.mu.Unlock()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return editErr
|
||||
}
|
||||
s.mu.Lock()
|
||||
s.lastEdited = text
|
||||
|
||||
@@ -4,7 +4,9 @@ import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5"
|
||||
"github.com/memohai/memoh/internal/channel"
|
||||
)
|
||||
|
||||
@@ -117,3 +119,116 @@ func TestTelegramOutboundStream_CloseContextCanceled(t *testing.T) {
|
||||
t.Fatalf("Close with canceled context should return context.Canceled: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Test editStreamMessage dedup: no API call when content equals lastEdited (avoids Telegram "message is not modified" error).
|
||||
func TestEditStreamMessage_NoEditWhenSameContent(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
adapter := NewTelegramAdapter(nil)
|
||||
s := &telegramOutboundStream{
|
||||
adapter: adapter,
|
||||
streamChatID: 1,
|
||||
streamMsgID: 1,
|
||||
lastEdited: "hello",
|
||||
lastEditedAt: time.Now().Add(-time.Minute),
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
text string
|
||||
}{
|
||||
{"exact same", "hello"},
|
||||
{"trimmed same", " hello "},
|
||||
{"leading space", " hello"},
|
||||
{"trailing space", "hello "},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := s.editStreamMessage(ctx, tt.text)
|
||||
if err != nil {
|
||||
t.Fatalf("editStreamMessage(same content) should return nil to avoid duplicate edit API call: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEditStreamMessage_NoEditWhenMessageNotSent(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
adapter := NewTelegramAdapter(nil)
|
||||
s := &telegramOutboundStream{adapter: adapter, streamMsgID: 0}
|
||||
ctx := context.Background()
|
||||
|
||||
err := s.editStreamMessage(ctx, "any")
|
||||
if err != nil {
|
||||
t.Fatalf("editStreamMessage when streamMsgID==0 should return nil: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEditStreamMessage_NoEditWhenThrottled(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
adapter := NewTelegramAdapter(nil)
|
||||
s := &telegramOutboundStream{
|
||||
adapter: adapter,
|
||||
streamChatID: 1,
|
||||
streamMsgID: 1,
|
||||
lastEdited: "a",
|
||||
lastEditedAt: time.Now(), // just now, within 350ms
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
err := s.editStreamMessage(ctx, "ab")
|
||||
if err != nil {
|
||||
t.Fatalf("editStreamMessage within throttle window and no newline should skip edit and return nil: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEditStreamMessage_429SetsBackoffAndReturnsNil(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
adapter := NewTelegramAdapter(nil)
|
||||
before := time.Now().Add(-time.Minute)
|
||||
s := &telegramOutboundStream{
|
||||
adapter: adapter,
|
||||
cfg: channel.ChannelConfig{ID: "test", Credentials: map[string]any{"bot_token": "fake"}},
|
||||
streamChatID: 1,
|
||||
streamMsgID: 1,
|
||||
lastEdited: "a",
|
||||
lastEditedAt: before,
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
origGetBot := getOrCreateBotForTest
|
||||
origEdit := testEditFunc
|
||||
getOrCreateBotForTest = func(_ *TelegramAdapter, _, _ string) (*tgbotapi.BotAPI, error) {
|
||||
return &tgbotapi.BotAPI{Token: "fake"}, nil
|
||||
}
|
||||
testEditFunc = func(*tgbotapi.BotAPI, int64, int, string, string) error {
|
||||
return tgbotapi.Error{
|
||||
Code: 429,
|
||||
Message: "Too Many Requests",
|
||||
ResponseParameters: tgbotapi.ResponseParameters{RetryAfter: 2},
|
||||
}
|
||||
}
|
||||
defer func() {
|
||||
getOrCreateBotForTest = origGetBot
|
||||
testEditFunc = origEdit
|
||||
}()
|
||||
|
||||
err := s.editStreamMessage(ctx, "b")
|
||||
if err != nil {
|
||||
t.Fatalf("editStreamMessage on 429 should return nil (backoff): %v", err)
|
||||
}
|
||||
s.mu.Lock()
|
||||
lastEdited := s.lastEdited
|
||||
lastEditedAt := s.lastEditedAt
|
||||
s.mu.Unlock()
|
||||
if lastEdited != "a" {
|
||||
t.Fatalf("on 429 lastEdited should remain unchanged: got %q", lastEdited)
|
||||
}
|
||||
if !lastEditedAt.After(before) {
|
||||
t.Fatalf("on 429 lastEditedAt should be pushed forward for backoff: got %v", lastEditedAt)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package telegram
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"strconv"
|
||||
@@ -37,7 +38,12 @@ func NewTelegramAdapter(log *slog.Logger) *TelegramAdapter {
|
||||
return adapter
|
||||
}
|
||||
|
||||
var getOrCreateBotForTest func(a *TelegramAdapter, token, configID string) (*tgbotapi.BotAPI, error)
|
||||
|
||||
func (a *TelegramAdapter) getOrCreateBot(token, configID string) (*tgbotapi.BotAPI, error) {
|
||||
if getOrCreateBotForTest != nil {
|
||||
return getOrCreateBotForTest(a, token, configID)
|
||||
}
|
||||
a.mu.RLock()
|
||||
bot, ok := a.bots[token]
|
||||
a.mu.RUnlock()
|
||||
@@ -446,16 +452,74 @@ func sendTelegramTextReturnMessage(bot *tgbotapi.BotAPI, target string, text str
|
||||
return chatID, messageID, nil
|
||||
}
|
||||
|
||||
var (
|
||||
sendEditForTest func(bot *tgbotapi.BotAPI, edit tgbotapi.EditMessageTextConfig) error
|
||||
sleepForTest func(time.Duration)
|
||||
)
|
||||
|
||||
func editTelegramMessageText(bot *tgbotapi.BotAPI, chatID int64, messageID int, text string, parseMode string) error {
|
||||
if len(text) > telegramMaxMessageLength {
|
||||
text = text[:telegramMaxMessageLength-3] + "..."
|
||||
}
|
||||
edit := tgbotapi.NewEditMessageText(chatID, messageID, text)
|
||||
edit.ParseMode = parseMode
|
||||
_, err := bot.Send(edit)
|
||||
send := sendEditForTest
|
||||
if send == nil {
|
||||
send = func(b *tgbotapi.BotAPI, e tgbotapi.EditMessageTextConfig) error { _, err := b.Send(e); return err }
|
||||
}
|
||||
sleep := sleepForTest
|
||||
if sleep == nil {
|
||||
sleep = time.Sleep
|
||||
}
|
||||
err := send(bot, edit)
|
||||
if err != nil && isTelegramMessageNotModified(err) {
|
||||
return nil
|
||||
}
|
||||
if err != nil && isTelegramTooManyRequests(err) {
|
||||
if d := getTelegramRetryAfter(err); d > 0 {
|
||||
sleep(d)
|
||||
err = send(bot, edit)
|
||||
if err != nil && isTelegramMessageNotModified(err) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func isTelegramMessageNotModified(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
var apiErr tgbotapi.Error
|
||||
if errors.As(err, &apiErr) {
|
||||
return apiErr.Code == 400 && strings.Contains(apiErr.Message, "message is not modified")
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func isTelegramTooManyRequests(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
var apiErr tgbotapi.Error
|
||||
if errors.As(err, &apiErr) {
|
||||
return apiErr.Code == 429
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func getTelegramRetryAfter(err error) time.Duration {
|
||||
if err == nil {
|
||||
return 0
|
||||
}
|
||||
var apiErr tgbotapi.Error
|
||||
if errors.As(err, &apiErr) && apiErr.RetryAfter > 0 {
|
||||
return time.Duration(apiErr.RetryAfter) * time.Second
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func sendTelegramAttachment(bot *tgbotapi.BotAPI, target string, att channel.Attachment, caption string, replyTo int, parseMode string) error {
|
||||
urlRef := strings.TrimSpace(att.URL)
|
||||
keyRef := strings.TrimSpace(att.PlatformKey)
|
||||
|
||||
@@ -2,8 +2,10 @@ package telegram
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5"
|
||||
"github.com/memohai/memoh/internal/channel"
|
||||
@@ -304,3 +306,109 @@ func TestTelegramAdapter_NormalizeAndResolve(t *testing.T) {
|
||||
t.Fatalf("ResolveTarget: %s", target)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsTelegramMessageNotModified(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Exact production error from Telegram API (editMessageText when content unchanged).
|
||||
const productionMessageNotModified = "Bad Request: message is not modified: specified new message content and reply markup are exactly the same as a current content and reply markup of the message"
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
err error
|
||||
want bool
|
||||
}{
|
||||
{"nil", nil, false},
|
||||
{"plain error", fmt.Errorf("network error"), false},
|
||||
{"other api error", tgbotapi.Error{Code: 400, Message: "Bad Request: chat not found"}, false},
|
||||
{"message is not modified", tgbotapi.Error{Code: 400, Message: productionMessageNotModified}, true},
|
||||
{"production exact", tgbotapi.Error{Code: 400, Message: productionMessageNotModified}, true},
|
||||
{"same text but code 500", tgbotapi.Error{Code: 500, Message: "message is not modified"}, false},
|
||||
{"wrapped same", fmt.Errorf("wrapped: %w", tgbotapi.Error{Code: 400, Message: "Bad Request: message is not modified"}), true},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got := isTelegramMessageNotModified(tt.err)
|
||||
if got != tt.want {
|
||||
t.Fatalf("isTelegramMessageNotModified() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsTelegramTooManyRequests(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
err error
|
||||
want bool
|
||||
}{
|
||||
{"nil", nil, false},
|
||||
{"429", tgbotapi.Error{Code: 429, Message: "Too Many Requests"}, true},
|
||||
{"400", tgbotapi.Error{Code: 400, Message: "Bad Request"}, false},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got := isTelegramTooManyRequests(tt.err)
|
||||
if got != tt.want {
|
||||
t.Fatalf("isTelegramTooManyRequests() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetTelegramRetryAfter(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
err error
|
||||
want time.Duration
|
||||
}{
|
||||
{"nil", nil, 0},
|
||||
{"no retry_after", tgbotapi.Error{Code: 429, Message: "Too Many Requests"}, 0},
|
||||
{"retry_after 2", tgbotapi.Error{Code: 429, Message: "Too Many Requests", ResponseParameters: tgbotapi.ResponseParameters{RetryAfter: 2}}, 2 * time.Second},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got := getTelegramRetryAfter(tt.err)
|
||||
if got != tt.want {
|
||||
t.Fatalf("getTelegramRetryAfter() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEditTelegramMessageText_429RetryThenSuccess(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var sendCalls int
|
||||
origSend := sendEditForTest
|
||||
origSleep := sleepForTest
|
||||
sendEditForTest = func(_ *tgbotapi.BotAPI, _ tgbotapi.EditMessageTextConfig) error {
|
||||
sendCalls++
|
||||
if sendCalls == 1 {
|
||||
return tgbotapi.Error{
|
||||
Code: 429,
|
||||
Message: "Too Many Requests",
|
||||
ResponseParameters: tgbotapi.ResponseParameters{RetryAfter: 1},
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
sleepForTest = func(time.Duration) {}
|
||||
defer func() {
|
||||
sendEditForTest = origSend
|
||||
sleepForTest = origSleep
|
||||
}()
|
||||
|
||||
bot := &tgbotapi.BotAPI{Token: "test"}
|
||||
err := editTelegramMessageText(bot, 1, 1, "hi", "")
|
||||
if err != nil {
|
||||
t.Fatalf("editTelegramMessageText after 429 retry should return nil: %v", err)
|
||||
}
|
||||
if sendCalls != 2 {
|
||||
t.Fatalf("send should be called twice (first 429, then retry): got %d", sendCalls)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user