fix(telegram): improve stream edit throttle and 429 rate limit handling

Increase edit throttle from 250ms to 1500ms to respect Telegram per-chat limits,
remove newline bypass that effectively disabled throttling, add dedicated
editStreamMessageFinal with retry for final message delivery, and simplify
editTelegramMessageText by removing blocking sleep in favor of caller-level retry.
This commit is contained in:
BBQ
2026-02-13 20:21:10 +08:00
parent c46f284556
commit f1d53e1c2c
4 changed files with 149 additions and 45 deletions
+60 -8
View File
@@ -14,7 +14,7 @@ import (
"github.com/memohai/memoh/internal/channel"
)
const telegramStreamEditThrottle = 250 * time.Millisecond
const telegramStreamEditThrottle = 5000 * time.Millisecond
var testEditFunc func(bot *tgbotapi.BotAPI, chatID int64, msgID int, text string, parseMode string) error
@@ -86,7 +86,7 @@ func (s *telegramOutboundStream) editStreamMessage(ctx context.Context, text str
if strings.TrimSpace(text) == lastEdited {
return nil
}
if time.Since(lastEditedAt) < telegramStreamEditThrottle && !strings.Contains(text, "\n") {
if time.Since(lastEditedAt) < telegramStreamEditThrottle {
return nil
}
bot, _, err := s.getBotAndReply(ctx)
@@ -101,11 +101,13 @@ func (s *telegramOutboundStream) editStreamMessage(ctx context.Context, text str
}
if editErr != nil {
if isTelegramTooManyRequests(editErr) {
if d := getTelegramRetryAfter(editErr); d > 0 {
s.mu.Lock()
s.lastEditedAt = time.Now().Add(d)
s.mu.Unlock()
d := getTelegramRetryAfter(editErr)
if d <= 0 {
d = telegramStreamEditThrottle
}
s.mu.Lock()
s.lastEditedAt = time.Now().Add(d)
s.mu.Unlock()
return nil
}
return editErr
@@ -117,6 +119,56 @@ func (s *telegramOutboundStream) editStreamMessage(ctx context.Context, text str
return nil
}
const telegramFinalEditMaxRetries = 3
// editStreamMessageFinal edits the streamed message for the final content.
// Retries on 429 with server-provided backoff to ensure delivery.
func (s *telegramOutboundStream) editStreamMessageFinal(ctx context.Context, text string) error {
s.mu.Lock()
chatID := s.streamChatID
msgID := s.streamMsgID
lastEdited := s.lastEdited
s.mu.Unlock()
if msgID == 0 {
return nil
}
if strings.TrimSpace(text) == lastEdited {
return nil
}
bot, _, err := s.getBotAndReply(ctx)
if err != nil {
return err
}
for attempt := range telegramFinalEditMaxRetries {
editErr := error(nil)
if testEditFunc != nil {
editErr = testEditFunc(bot, chatID, msgID, text, s.parseMode)
} else {
editErr = editTelegramMessageText(bot, chatID, msgID, text, s.parseMode)
}
if editErr == nil {
s.mu.Lock()
s.lastEdited = text
s.lastEditedAt = time.Now()
s.mu.Unlock()
return nil
}
if !isTelegramTooManyRequests(editErr) {
return editErr
}
d := getTelegramRetryAfter(editErr)
if d <= 0 {
d = time.Duration(attempt+1) * time.Second
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(d):
}
}
return nil
}
func (s *telegramOutboundStream) Push(ctx context.Context, event channel.StreamEvent) error {
if s == nil || s.adapter == nil {
return fmt.Errorf("telegram stream not configured")
@@ -153,7 +205,7 @@ func (s *telegramOutboundStream) Push(ctx context.Context, event channel.StreamE
if err := s.ensureStreamMessage(ctx, finalText); err != nil {
slog.Warn("telegram: ensure stream message failed", slog.Any("error", err))
}
if err := s.editStreamMessage(ctx, finalText); err != nil {
if err := s.editStreamMessageFinal(ctx, finalText); err != nil {
slog.Warn("telegram: edit stream message failed", slog.Any("error", err))
}
}
@@ -177,7 +229,7 @@ func (s *telegramOutboundStream) Push(ctx context.Context, event channel.StreamE
if err := s.ensureStreamMessage(ctx, finalText); err != nil {
return err
}
if err := s.editStreamMessage(ctx, finalText); err != nil {
if err := s.editStreamMessageFinal(ctx, finalText); err != nil {
return err
}
if len(msg.Attachments) > 0 {
@@ -175,13 +175,13 @@ func TestEditStreamMessage_NoEditWhenThrottled(t *testing.T) {
streamChatID: 1,
streamMsgID: 1,
lastEdited: "a",
lastEditedAt: time.Now(), // just now, within 350ms
lastEditedAt: time.Now(), // just now, within throttle window
}
ctx := context.Background()
err := s.editStreamMessage(ctx, "ab")
if err != nil {
t.Fatalf("editStreamMessage within throttle window and no newline should skip edit and return nil: %v", err)
t.Fatalf("editStreamMessage within throttle window should skip edit and return nil: %v", err)
}
}
@@ -232,3 +232,74 @@ func TestEditStreamMessage_429SetsBackoffAndReturnsNil(t *testing.T) {
t.Fatalf("on 429 lastEditedAt should be pushed forward for backoff: got %v", lastEditedAt)
}
}
func TestEditStreamMessageFinal_Success(t *testing.T) {
t.Parallel()
adapter := NewTelegramAdapter(nil)
s := &telegramOutboundStream{
adapter: adapter,
cfg: channel.ChannelConfig{ID: "test", Credentials: map[string]any{"bot_token": "fake"}},
streamChatID: 1,
streamMsgID: 1,
lastEdited: "a",
lastEditedAt: time.Now().Add(-time.Minute),
}
ctx := context.Background()
origGetBot := getOrCreateBotForTest
origEdit := testEditFunc
getOrCreateBotForTest = func(_ *TelegramAdapter, _, _ string) (*tgbotapi.BotAPI, error) {
return &tgbotapi.BotAPI{Token: "fake"}, nil
}
testEditFunc = func(*tgbotapi.BotAPI, int64, int, string, string) error {
return nil
}
defer func() {
getOrCreateBotForTest = origGetBot
testEditFunc = origEdit
}()
err := s.editStreamMessageFinal(ctx, "final text")
if err != nil {
t.Fatalf("editStreamMessageFinal should succeed: %v", err)
}
s.mu.Lock()
lastEdited := s.lastEdited
s.mu.Unlock()
if lastEdited != "final text" {
t.Fatalf("expected lastEdited to be updated: got %q", lastEdited)
}
}
func TestEditStreamMessageFinal_SameContentNoOp(t *testing.T) {
t.Parallel()
adapter := NewTelegramAdapter(nil)
s := &telegramOutboundStream{
adapter: adapter,
streamChatID: 1,
streamMsgID: 1,
lastEdited: "same",
lastEditedAt: time.Now(),
}
ctx := context.Background()
err := s.editStreamMessageFinal(ctx, "same")
if err != nil {
t.Fatalf("editStreamMessageFinal with same content should return nil: %v", err)
}
}
func TestEditStreamMessageFinal_NoMessageNoOp(t *testing.T) {
t.Parallel()
adapter := NewTelegramAdapter(nil)
s := &telegramOutboundStream{adapter: adapter, streamMsgID: 0}
ctx := context.Background()
err := s.editStreamMessageFinal(ctx, "any")
if err != nil {
t.Fatalf("editStreamMessageFinal when streamMsgID==0 should return nil: %v", err)
}
}
+3 -17
View File
@@ -452,11 +452,10 @@ func sendTelegramTextReturnMessage(bot *tgbotapi.BotAPI, target string, text str
return chatID, messageID, nil
}
var (
sendEditForTest func(bot *tgbotapi.BotAPI, edit tgbotapi.EditMessageTextConfig) error
sleepForTest func(time.Duration)
)
var sendEditForTest func(bot *tgbotapi.BotAPI, edit tgbotapi.EditMessageTextConfig) error
// editTelegramMessageText sends an edit request. It handles "message is not modified"
// silently but returns 429 and other errors to the caller for higher-level retry decisions.
func editTelegramMessageText(bot *tgbotapi.BotAPI, chatID int64, messageID int, text string, parseMode string) error {
if len(text) > telegramMaxMessageLength {
text = text[:telegramMaxMessageLength-3] + "..."
@@ -467,23 +466,10 @@ func editTelegramMessageText(bot *tgbotapi.BotAPI, chatID int64, messageID int,
if send == nil {
send = func(b *tgbotapi.BotAPI, e tgbotapi.EditMessageTextConfig) error { _, err := b.Send(e); return err }
}
sleep := sleepForTest
if sleep == nil {
sleep = time.Sleep
}
err := send(bot, edit)
if err != nil && isTelegramMessageNotModified(err) {
return nil
}
if err != nil && isTelegramTooManyRequests(err) {
if d := getTelegramRetryAfter(err); d > 0 {
sleep(d)
err = send(bot, edit)
if err != nil && isTelegramMessageNotModified(err) {
return nil
}
}
}
return err
}
@@ -380,35 +380,30 @@ func TestGetTelegramRetryAfter(t *testing.T) {
}
}
func TestEditTelegramMessageText_429RetryThenSuccess(t *testing.T) {
func TestEditTelegramMessageText_429ReturnsError(t *testing.T) {
t.Parallel()
var sendCalls int
origSend := sendEditForTest
origSleep := sleepForTest
sendEditForTest = func(_ *tgbotapi.BotAPI, _ tgbotapi.EditMessageTextConfig) error {
sendCalls++
if sendCalls == 1 {
return tgbotapi.Error{
Code: 429,
Message: "Too Many Requests",
ResponseParameters: tgbotapi.ResponseParameters{RetryAfter: 1},
}
return tgbotapi.Error{
Code: 429,
Message: "Too Many Requests",
ResponseParameters: tgbotapi.ResponseParameters{RetryAfter: 1},
}
return nil
}
sleepForTest = func(time.Duration) {}
defer func() {
sendEditForTest = origSend
sleepForTest = origSleep
}()
defer func() { sendEditForTest = origSend }()
bot := &tgbotapi.BotAPI{Token: "test"}
err := editTelegramMessageText(bot, 1, 1, "hi", "")
if err != nil {
t.Fatalf("editTelegramMessageText after 429 retry should return nil: %v", err)
if err == nil {
t.Fatal("editTelegramMessageText on 429 should return error for caller to handle")
}
if sendCalls != 2 {
t.Fatalf("send should be called twice (first 429, then retry): got %d", sendCalls)
if !isTelegramTooManyRequests(err) {
t.Fatalf("expected 429 error: %v", err)
}
if sendCalls != 1 {
t.Fatalf("send should be called once (no internal retry): got %d", sendCalls)
}
}