mirror of
https://github.com/memohai/Memoh.git
synced 2026-04-25 07:00:48 +09:00
fix(telegram): dedupe repeated inbound updates
This commit is contained in:
@@ -27,6 +27,7 @@ import (
|
||||
const (
|
||||
telegramMaxMessageLength = 4096
|
||||
telegramMediaGroupCollectWindow = 700 * time.Millisecond
|
||||
telegramUpdateDedupeTTL = 10 * time.Minute
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -52,6 +53,8 @@ type TelegramAdapter struct {
|
||||
fileEndpoints map[*tgbotapi.BotAPI]string // bot instance → file endpoint format string
|
||||
assets assetOpener
|
||||
streamLimiter *rate.Limiter // global rate limiter for all streaming API calls
|
||||
seenUpdatesMu sync.Mutex
|
||||
seenUpdates map[string]time.Time
|
||||
}
|
||||
|
||||
// NewTelegramAdapter creates a TelegramAdapter with the given logger.
|
||||
@@ -64,6 +67,7 @@ func NewTelegramAdapter(log *slog.Logger) *TelegramAdapter {
|
||||
bots: make(map[string]*tgbotapi.BotAPI),
|
||||
fileEndpoints: make(map[*tgbotapi.BotAPI]string),
|
||||
streamLimiter: rate.NewLimiter(rate.Every(time.Second), 3), // 1 req/s sustained, burst of 3
|
||||
seenUpdates: make(map[string]time.Time),
|
||||
}
|
||||
initTelegramBotLogger(adapter.logger)
|
||||
return adapter
|
||||
@@ -371,11 +375,20 @@ func (a *TelegramAdapter) Connect(ctx context.Context, cfg channel.ChannelConfig
|
||||
if update.Message == nil {
|
||||
continue
|
||||
}
|
||||
if a.seenTelegramUpdate(cfg.ID, update.UpdateID, time.Now()) {
|
||||
if a.logger != nil {
|
||||
a.logger.Debug("skip duplicate telegram update",
|
||||
slog.String("config_id", cfg.ID),
|
||||
slog.Int("update_id", update.UpdateID),
|
||||
)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if queueMediaGroup(update.Message) {
|
||||
continue
|
||||
}
|
||||
flushMediaGroupsByChat(telegramChatID(update.Message))
|
||||
msg, ok := a.buildTelegramInboundMessage(bot, cfg, update.Message)
|
||||
msg, ok := a.buildTelegramInboundMessage(bot, cfg, update)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
@@ -437,14 +450,20 @@ func (a *TelegramAdapter) dispatchInbound(ctx context.Context, cfg channel.Chann
|
||||
}()
|
||||
}
|
||||
|
||||
func (a *TelegramAdapter) buildTelegramInboundMessage(bot *tgbotapi.BotAPI, cfg channel.ChannelConfig, raw *tgbotapi.Message) (channel.InboundMessage, bool) {
|
||||
func (a *TelegramAdapter) buildTelegramInboundMessage(bot *tgbotapi.BotAPI, cfg channel.ChannelConfig, update tgbotapi.Update) (channel.InboundMessage, bool) {
|
||||
raw := update.Message
|
||||
if raw == nil {
|
||||
return channel.InboundMessage{}, false
|
||||
}
|
||||
text := strings.TrimSpace(raw.Text)
|
||||
caption := strings.TrimSpace(raw.Caption)
|
||||
if text == "" && caption != "" {
|
||||
text = caption
|
||||
}
|
||||
attachments := a.collectTelegramAttachments(bot, raw)
|
||||
return a.toInboundTelegramMessage(bot, cfg, raw, text, attachments, nil)
|
||||
return a.toInboundTelegramMessage(bot, cfg, raw, text, attachments, map[string]any{
|
||||
"update_id": update.UpdateID,
|
||||
})
|
||||
}
|
||||
|
||||
func (a *TelegramAdapter) buildTelegramMediaGroupInboundMessage(
|
||||
@@ -506,6 +525,33 @@ func (a *TelegramAdapter) buildTelegramMediaGroupInboundMessage(
|
||||
return a.toInboundTelegramMessage(bot, cfg, anchor, text, attachments, metadata)
|
||||
}
|
||||
|
||||
func (a *TelegramAdapter) seenTelegramUpdate(configID string, updateID int, now time.Time) bool {
|
||||
if a == nil || updateID <= 0 {
|
||||
return false
|
||||
}
|
||||
key := strings.TrimSpace(configID) + ":" + strconv.Itoa(updateID)
|
||||
if key == ":" {
|
||||
return false
|
||||
}
|
||||
|
||||
cutoff := now.Add(-telegramUpdateDedupeTTL)
|
||||
|
||||
a.seenUpdatesMu.Lock()
|
||||
defer a.seenUpdatesMu.Unlock()
|
||||
|
||||
for seenKey, seenAt := range a.seenUpdates {
|
||||
if seenAt.Before(cutoff) {
|
||||
delete(a.seenUpdates, seenKey)
|
||||
}
|
||||
}
|
||||
|
||||
if _, exists := a.seenUpdates[key]; exists {
|
||||
return true
|
||||
}
|
||||
a.seenUpdates[key] = now
|
||||
return false
|
||||
}
|
||||
|
||||
func (a *TelegramAdapter) toInboundTelegramMessage(
|
||||
bot *tgbotapi.BotAPI,
|
||||
_ channel.ChannelConfig,
|
||||
|
||||
@@ -258,6 +258,62 @@ func TestBuildTelegramMediaGroupInboundMessageAggregatesAttachments(t *testing.T
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildTelegramInboundMessageIncludesUpdateIDMetadata(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
adapter := NewTelegramAdapter(nil)
|
||||
bot := &tgbotapi.BotAPI{
|
||||
Token: "test",
|
||||
Self: tgbotapi.User{ID: 1001, UserName: "memohbot"},
|
||||
}
|
||||
update := tgbotapi.Update{
|
||||
UpdateID: 777,
|
||||
Message: &tgbotapi.Message{
|
||||
MessageID: 101,
|
||||
Date: 1710000000,
|
||||
Text: "hello",
|
||||
Chat: &tgbotapi.Chat{ID: 123, Type: "private"},
|
||||
From: &tgbotapi.User{ID: 10, UserName: "alice"},
|
||||
},
|
||||
}
|
||||
|
||||
inbound, ok := adapter.buildTelegramInboundMessage(bot, channel.ChannelConfig{}, update)
|
||||
if !ok {
|
||||
t.Fatal("expected inbound message")
|
||||
}
|
||||
if got := inbound.Metadata["update_id"]; got != 777 {
|
||||
t.Fatalf("unexpected update_id metadata: %#v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSeenTelegramUpdate(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
adapter := NewTelegramAdapter(nil)
|
||||
now := time.Unix(1710000000, 0)
|
||||
|
||||
if adapter.seenTelegramUpdate("cfg-1", 42, now) {
|
||||
t.Fatal("first update should not be treated as duplicate")
|
||||
}
|
||||
if !adapter.seenTelegramUpdate("cfg-1", 42, now.Add(time.Second)) {
|
||||
t.Fatal("second update should be treated as duplicate")
|
||||
}
|
||||
if adapter.seenTelegramUpdate("cfg-2", 42, now.Add(time.Second)) {
|
||||
t.Fatal("same update_id under different config should not collide")
|
||||
}
|
||||
if adapter.seenTelegramUpdate("cfg-1", 43, now.Add(time.Second)) {
|
||||
t.Fatal("different update_id should not collide")
|
||||
}
|
||||
if adapter.seenTelegramUpdate("cfg-1", 0, now.Add(time.Second)) {
|
||||
t.Fatal("zero update_id should bypass dedupe")
|
||||
}
|
||||
|
||||
later := now.Add(telegramUpdateDedupeTTL + time.Second)
|
||||
if adapter.seenTelegramUpdate("cfg-1", 42, later) {
|
||||
t.Fatal("expired dedupe entry should be accepted again")
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsTelegramMediaGroupForChat(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user