diff --git a/internal/channel/adapters/telegram/telegram.go b/internal/channel/adapters/telegram/telegram.go index b3e79e4a..f9ff2f0e 100644 --- a/internal/channel/adapters/telegram/telegram.go +++ b/internal/channel/adapters/telegram/telegram.go @@ -27,6 +27,7 @@ import ( const ( telegramMaxMessageLength = 4096 telegramMediaGroupCollectWindow = 700 * time.Millisecond + telegramUpdateDedupeTTL = 10 * time.Minute ) var ( @@ -52,6 +53,8 @@ type TelegramAdapter struct { fileEndpoints map[*tgbotapi.BotAPI]string // bot instance → file endpoint format string assets assetOpener streamLimiter *rate.Limiter // global rate limiter for all streaming API calls + seenUpdatesMu sync.Mutex + seenUpdates map[string]time.Time } // NewTelegramAdapter creates a TelegramAdapter with the given logger. @@ -64,6 +67,7 @@ func NewTelegramAdapter(log *slog.Logger) *TelegramAdapter { bots: make(map[string]*tgbotapi.BotAPI), fileEndpoints: make(map[*tgbotapi.BotAPI]string), streamLimiter: rate.NewLimiter(rate.Every(time.Second), 3), // 1 req/s sustained, burst of 3 + seenUpdates: make(map[string]time.Time), } initTelegramBotLogger(adapter.logger) return adapter @@ -371,11 +375,20 @@ func (a *TelegramAdapter) Connect(ctx context.Context, cfg channel.ChannelConfig if update.Message == nil { continue } + if a.seenTelegramUpdate(cfg.ID, update.UpdateID, time.Now()) { + if a.logger != nil { + a.logger.Debug("skip duplicate telegram update", + slog.String("config_id", cfg.ID), + slog.Int("update_id", update.UpdateID), + ) + } + continue + } if queueMediaGroup(update.Message) { continue } flushMediaGroupsByChat(telegramChatID(update.Message)) - msg, ok := a.buildTelegramInboundMessage(bot, cfg, update.Message) + msg, ok := a.buildTelegramInboundMessage(bot, cfg, update) if !ok { continue } @@ -437,14 +450,20 @@ func (a *TelegramAdapter) dispatchInbound(ctx context.Context, cfg channel.Chann }() } -func (a *TelegramAdapter) buildTelegramInboundMessage(bot *tgbotapi.BotAPI, cfg channel.ChannelConfig, raw *tgbotapi.Message) (channel.InboundMessage, bool) { +func (a *TelegramAdapter) buildTelegramInboundMessage(bot *tgbotapi.BotAPI, cfg channel.ChannelConfig, update tgbotapi.Update) (channel.InboundMessage, bool) { + raw := update.Message + if raw == nil { + return channel.InboundMessage{}, false + } text := strings.TrimSpace(raw.Text) caption := strings.TrimSpace(raw.Caption) if text == "" && caption != "" { text = caption } attachments := a.collectTelegramAttachments(bot, raw) - return a.toInboundTelegramMessage(bot, cfg, raw, text, attachments, nil) + return a.toInboundTelegramMessage(bot, cfg, raw, text, attachments, map[string]any{ + "update_id": update.UpdateID, + }) } func (a *TelegramAdapter) buildTelegramMediaGroupInboundMessage( @@ -506,6 +525,33 @@ func (a *TelegramAdapter) buildTelegramMediaGroupInboundMessage( return a.toInboundTelegramMessage(bot, cfg, anchor, text, attachments, metadata) } +func (a *TelegramAdapter) seenTelegramUpdate(configID string, updateID int, now time.Time) bool { + if a == nil || updateID <= 0 { + return false + } + key := strings.TrimSpace(configID) + ":" + strconv.Itoa(updateID) + if key == ":" { + return false + } + + cutoff := now.Add(-telegramUpdateDedupeTTL) + + a.seenUpdatesMu.Lock() + defer a.seenUpdatesMu.Unlock() + + for seenKey, seenAt := range a.seenUpdates { + if seenAt.Before(cutoff) { + delete(a.seenUpdates, seenKey) + } + } + + if _, exists := a.seenUpdates[key]; exists { + return true + } + a.seenUpdates[key] = now + return false +} + func (a *TelegramAdapter) toInboundTelegramMessage( bot *tgbotapi.BotAPI, _ channel.ChannelConfig, diff --git a/internal/channel/adapters/telegram/telegram_test.go b/internal/channel/adapters/telegram/telegram_test.go index e48f2024..70e01e92 100644 --- a/internal/channel/adapters/telegram/telegram_test.go +++ b/internal/channel/adapters/telegram/telegram_test.go @@ -258,6 +258,62 @@ func TestBuildTelegramMediaGroupInboundMessageAggregatesAttachments(t *testing.T } } +func TestBuildTelegramInboundMessageIncludesUpdateIDMetadata(t *testing.T) { + t.Parallel() + + adapter := NewTelegramAdapter(nil) + bot := &tgbotapi.BotAPI{ + Token: "test", + Self: tgbotapi.User{ID: 1001, UserName: "memohbot"}, + } + update := tgbotapi.Update{ + UpdateID: 777, + Message: &tgbotapi.Message{ + MessageID: 101, + Date: 1710000000, + Text: "hello", + Chat: &tgbotapi.Chat{ID: 123, Type: "private"}, + From: &tgbotapi.User{ID: 10, UserName: "alice"}, + }, + } + + inbound, ok := adapter.buildTelegramInboundMessage(bot, channel.ChannelConfig{}, update) + if !ok { + t.Fatal("expected inbound message") + } + if got := inbound.Metadata["update_id"]; got != 777 { + t.Fatalf("unexpected update_id metadata: %#v", got) + } +} + +func TestSeenTelegramUpdate(t *testing.T) { + t.Parallel() + + adapter := NewTelegramAdapter(nil) + now := time.Unix(1710000000, 0) + + if adapter.seenTelegramUpdate("cfg-1", 42, now) { + t.Fatal("first update should not be treated as duplicate") + } + if !adapter.seenTelegramUpdate("cfg-1", 42, now.Add(time.Second)) { + t.Fatal("second update should be treated as duplicate") + } + if adapter.seenTelegramUpdate("cfg-2", 42, now.Add(time.Second)) { + t.Fatal("same update_id under different config should not collide") + } + if adapter.seenTelegramUpdate("cfg-1", 43, now.Add(time.Second)) { + t.Fatal("different update_id should not collide") + } + if adapter.seenTelegramUpdate("cfg-1", 0, now.Add(time.Second)) { + t.Fatal("zero update_id should bypass dedupe") + } + + later := now.Add(telegramUpdateDedupeTTL + time.Second) + if adapter.seenTelegramUpdate("cfg-1", 42, later) { + t.Fatal("expired dedupe entry should be accepted again") + } +} + func TestIsTelegramMediaGroupForChat(t *testing.T) { t.Parallel()