From 53f080503c8ff7ac78bc70f5b06e9fa54a90e6c9 Mon Sep 17 00:00:00 2001
From: "Ringo.Typowriter"
Date: Thu, 19 Feb 2026 17:14:12 +0800
Subject: [PATCH] fix(telegram): aggregate media_group inbound and preserve
 ordering with (#69) attachment fallback query

---
 .../channel/adapters/telegram/telegram.go     | 345 ++++++++++++++----
 .../adapters/telegram/telegram_test.go        |  75 ++++
 internal/channel/inbound/channel.go           |  13 +-
 internal/channel/inbound/channel_test.go      |  67 ++++
 4 files changed, 435 insertions(+), 65 deletions(-)

diff --git a/internal/channel/adapters/telegram/telegram.go b/internal/channel/adapters/telegram/telegram.go
index 66c55229..41bc5bae 100644
--- a/internal/channel/adapters/telegram/telegram.go
+++ b/internal/channel/adapters/telegram/telegram.go
@@ -8,6 +8,7 @@ import (
 	"io"
 	"log/slog"
 	"net/http"
+	"sort"
 	"strconv"
 	"strings"
 	"sync"
@@ -22,6 +23,12 @@ import (
 )
 
 const telegramMaxMessageLength = 4096
+const telegramMediaGroupCollectWindow = 700 * time.Millisecond
+
+type telegramMediaGroupBuffer struct {
+	messages []*tgbotapi.Message
+	timer    *time.Timer
+}
 
 // assetOpener reads stored asset bytes by content hash.
 type assetOpener interface {
@@ -182,14 +189,93 @@ func (a *TelegramAdapter) Connect(ctx context.Context, cfg channel.ChannelConfig
 	updateConfig.Timeout = 30
 	updates := bot.GetUpdatesChan(updateConfig)
 	connCtx, cancel := context.WithCancel(ctx)
+	mediaGroups := make(map[string]*telegramMediaGroupBuffer)
+	var mediaGroupsMu sync.Mutex
+
+	var flushMediaGroup func(groupKey string)
+	flushMediaGroup = func(groupKey string) {
+		var batch []*tgbotapi.Message
+		mediaGroupsMu.Lock()
+		buffer, ok := mediaGroups[groupKey]
+		if ok {
+			delete(mediaGroups, groupKey)
+			batch = append(batch, buffer.messages...)
+		}
+		mediaGroupsMu.Unlock()
+		if !ok || len(batch) == 0 {
+			return
+		}
+		msg, ok := a.buildTelegramMediaGroupInboundMessage(bot, cfg, batch)
+		if !ok {
+			return
+		}
+		a.dispatchInbound(connCtx, cfg, handler, msg)
+	}
+	flushAllMediaGroups := func() {
+		mediaGroupsMu.Lock()
+		keys := make([]string, 0, len(mediaGroups))
+		for key, buffer := range mediaGroups {
+			keys = append(keys, key)
+			if buffer != nil && buffer.timer != nil {
+				buffer.timer.Stop()
+			}
+		}
+		mediaGroupsMu.Unlock()
+		for _, key := range keys {
+			flushMediaGroup(key)
+		}
+	}
+	flushMediaGroupsByChat := func(chatID int64) {
+		if chatID == 0 {
+			return
+		}
+		mediaGroupsMu.Lock()
+		keys := make([]string, 0, len(mediaGroups))
+		for key, buffer := range mediaGroups {
+			if !isTelegramMediaGroupForChat(key, chatID) {
+				continue
+			}
+			keys = append(keys, key)
+			if buffer != nil && buffer.timer != nil {
+				buffer.timer.Stop()
+			}
+		}
+		mediaGroupsMu.Unlock()
+		for _, key := range keys {
+			flushMediaGroup(key)
+		}
+	}
+	queueMediaGroup := func(msg *tgbotapi.Message) bool {
+		groupKey := telegramMediaGroupKey(msg)
+		if groupKey == "" {
+			return false
+		}
+		mediaGroupsMu.Lock()
+		buffer, ok := mediaGroups[groupKey]
+		if !ok {
+			buffer = &telegramMediaGroupBuffer{}
+			mediaGroups[groupKey] = buffer
+		}
+		buffer.messages = append(buffer.messages, msg)
+		if buffer.timer != nil {
+			buffer.timer.Stop()
+		}
+		buffer.timer = time.AfterFunc(telegramMediaGroupCollectWindow, func() {
+			flushMediaGroup(groupKey)
+		})
+		mediaGroupsMu.Unlock()
+		return true
+	}
 
 	go func() {
 		for {
 			select {
 			case <-connCtx.Done():
+				flushAllMediaGroups()
 				return
 			case update, ok := <-updates:
 				if !ok {
+					flushAllMediaGroups()
 					if a.logger != nil {
 						a.logger.Info("updates channel closed", slog.String("config_id", cfg.ID))
 					}
@@ -198,73 +284,15 @@ func (a *TelegramAdapter) Connect(ctx context.Context, cfg channel.ChannelConfig
 				if update.Message == nil {
 					continue
 				}
-				text := strings.TrimSpace(update.Message.Text)
-				caption := strings.TrimSpace(update.Message.Caption)
-				if text == "" && caption != "" {
-					text = caption
-				}
-				attachments := a.collectTelegramAttachments(bot, update.Message)
-				if text == "" && len(attachments) == 0 {
+				if queueMediaGroup(update.Message) {
 					continue
 				}
-				subjectID, displayName, attrs := resolveTelegramSender(update.Message)
-				chatID := ""
-				chatType := ""
-				chatName := ""
-				if update.Message.Chat != nil {
-					chatID = strconv.FormatInt(update.Message.Chat.ID, 10)
-					chatType = strings.TrimSpace(update.Message.Chat.Type)
-					chatName = strings.TrimSpace(update.Message.Chat.Title)
+				flushMediaGroupsByChat(telegramChatID(update.Message))
+				msg, ok := a.buildTelegramInboundMessage(bot, cfg, update.Message)
+				if !ok {
+					continue
 				}
-				replyRef := buildTelegramReplyRef(update.Message, chatID)
-				isReplyToBot := update.Message.ReplyToMessage != nil &&
-					update.Message.ReplyToMessage.From != nil &&
-					update.Message.ReplyToMessage.From.ID == bot.Self.ID
-				isMentioned := isTelegramBotMentioned(update.Message, bot.Self.UserName)
-				msg := channel.InboundMessage{
-					Channel: Type,
-					Message: channel.Message{
-						ID:          strconv.Itoa(update.Message.MessageID),
-						Format:      channel.MessageFormatPlain,
-						Text:        text,
-						Attachments: attachments,
-						Reply:       replyRef,
-					},
-					BotID:       cfg.BotID,
-					ReplyTarget: chatID,
-					Sender: channel.Identity{
-						SubjectID:   subjectID,
-						DisplayName: displayName,
-						Attributes:  attrs,
-					},
-					Conversation: channel.Conversation{
-						ID:   chatID,
-						Type: chatType,
-						Name: chatName,
-					},
-					ReceivedAt: time.Unix(int64(update.Message.Date), 0).UTC(),
-					Source:     "telegram",
-					Metadata: map[string]any{
-						"is_mentioned":    isMentioned,
-						"is_reply_to_bot": isReplyToBot,
-					},
-				}
-				if a.logger != nil {
-					a.logger.Info(
-						"inbound received",
-						slog.String("config_id", cfg.ID),
-						slog.String("chat_type", msg.Conversation.Type),
-						slog.String("chat_id", msg.Conversation.ID),
-						slog.String("user_id", attrs["user_id"]),
-						slog.String("username", attrs["username"]),
-						slog.String("text", common.SummarizeText(text)),
-					)
-				}
-				go func() {
-					if err := handler(connCtx, cfg, msg); err != nil && a.logger != nil {
-						a.logger.Error("handle inbound failed", slog.String("config_id", cfg.ID), slog.Any("error", err))
-					}
-				}()
+				a.dispatchInbound(connCtx, cfg, handler, msg)
 			}
 		}
 	}()
@@ -287,6 +315,195 @@ func (a *TelegramAdapter) Connect(ctx context.Context, cfg channel.ChannelConfig
 	return channel.NewConnection(cfg, stop), nil
 }
 
+func telegramMediaGroupKey(msg *tgbotapi.Message) string {
+	if msg == nil {
+		return ""
+	}
+	mediaGroupID := strings.TrimSpace(msg.MediaGroupID)
+	if mediaGroupID == "" {
+		return ""
+	}
+	chatID := telegramChatID(msg)
+	return fmt.Sprintf("%d:%s", chatID, mediaGroupID)
+}
+
+func telegramChatID(msg *tgbotapi.Message) int64 {
+	if msg == nil || msg.Chat == nil {
+		return 0
+	}
+	return msg.Chat.ID
+}
+
+func isTelegramMediaGroupForChat(groupKey string, chatID int64) bool {
+	if chatID == 0 || strings.TrimSpace(groupKey) == "" {
+		return false
+	}
+	return strings.HasPrefix(groupKey, fmt.Sprintf("%d:", chatID))
+}
+
+func (a *TelegramAdapter) dispatchInbound(ctx context.Context, cfg channel.ChannelConfig, handler channel.InboundHandler, msg channel.InboundMessage) {
+	a.logTelegramInbound(cfg.ID, msg)
+	go func() {
+		if err := handler(ctx, cfg, msg); err != nil && a.logger != nil {
+			a.logger.Error("handle inbound failed", slog.String("config_id", cfg.ID), slog.Any("error", err))
+		}
+	}()
+}
+
+func (a *TelegramAdapter) buildTelegramInboundMessage(bot *tgbotapi.BotAPI, cfg channel.ChannelConfig, raw *tgbotapi.Message) (channel.InboundMessage, bool) {
+	text := strings.TrimSpace(raw.Text)
+	caption := strings.TrimSpace(raw.Caption)
+	if text == "" && caption != "" {
+		text = caption
+	}
+	attachments := a.collectTelegramAttachments(bot, raw)
+	return a.toInboundTelegramMessage(bot, cfg, raw, text, attachments, nil)
+}
+
+func (a *TelegramAdapter) buildTelegramMediaGroupInboundMessage(
+	bot *tgbotapi.BotAPI,
+	cfg channel.ChannelConfig,
+	raw []*tgbotapi.Message,
+) (channel.InboundMessage, bool) {
+	if len(raw) == 0 {
+		return channel.InboundMessage{}, false
+	}
+	items := make([]*tgbotapi.Message, 0, len(raw))
+	for _, msg := range raw {
+		if msg != nil {
+			items = append(items, msg)
+		}
+	}
+	if len(items) == 0 {
+		return channel.InboundMessage{}, false
+	}
+	sort.SliceStable(items, func(i, j int) bool {
+		return items[i].MessageID < items[j].MessageID
+	})
+	anchor := items[0]
+	text := ""
+	attachments := make([]channel.Attachment, 0, len(items))
+	isMentioned := false
+	isReplyToBot := false
+	botUsername := ""
+	botID := int64(0)
+	if bot != nil {
+		botUsername = bot.Self.UserName
+		botID = bot.Self.ID
+	}
+	for _, msg := range items {
+		candidate := strings.TrimSpace(msg.Text)
+		if candidate == "" {
+			candidate = strings.TrimSpace(msg.Caption)
+		}
+		if text == "" && candidate != "" {
+			text = candidate
+			anchor = msg
+		}
+		attachments = append(attachments, a.collectTelegramAttachments(bot, msg)...)
+		if !isMentioned {
+			isMentioned = isTelegramBotMentioned(msg, botUsername)
+		}
+		if !isReplyToBot {
+			isReplyToBot = msg.ReplyToMessage != nil &&
+				msg.ReplyToMessage.From != nil &&
+				msg.ReplyToMessage.From.ID == botID
+		}
+	}
+	metadata := map[string]any{
+		"is_mentioned":     isMentioned,
+		"is_reply_to_bot":  isReplyToBot,
+		"media_group_id":   strings.TrimSpace(anchor.MediaGroupID),
+		"media_group_size": len(items),
+	}
+	return a.toInboundTelegramMessage(bot, cfg, anchor, text, attachments, metadata)
+}
+
+func (a *TelegramAdapter) toInboundTelegramMessage(
+	bot *tgbotapi.BotAPI,
+	cfg channel.ChannelConfig,
+	raw *tgbotapi.Message,
+	text string,
+	attachments []channel.Attachment,
+	metadata map[string]any,
+) (channel.InboundMessage, bool) {
+	if raw == nil {
+		return channel.InboundMessage{}, false
+	}
+	text = strings.TrimSpace(text)
+	if text == "" && len(attachments) == 0 {
+		return channel.InboundMessage{}, false
+	}
+	subjectID, displayName, attrs := resolveTelegramSender(raw)
+	chatID := ""
+	chatType := ""
+	chatName := ""
+	if raw.Chat != nil {
+		chatID = strconv.FormatInt(raw.Chat.ID, 10)
+		chatType = strings.TrimSpace(raw.Chat.Type)
+		chatName = strings.TrimSpace(raw.Chat.Title)
+	}
+	replyRef := buildTelegramReplyRef(raw, chatID)
+	botUsername := ""
+	botID := int64(0)
+	if bot != nil {
+		botUsername = bot.Self.UserName
+		botID = bot.Self.ID
+	}
+	isReplyToBot := raw.ReplyToMessage != nil &&
+		raw.ReplyToMessage.From != nil &&
+		raw.ReplyToMessage.From.ID == botID
+	isMentioned := isTelegramBotMentioned(raw, botUsername)
+	meta := map[string]any{
+		"is_mentioned":    isMentioned,
+		"is_reply_to_bot": isReplyToBot,
+	}
+	for key, value := range metadata {
+		meta[key] = value
+	}
+	return channel.InboundMessage{
+		Channel: Type,
+		Message: channel.Message{
+			ID:          strconv.Itoa(raw.MessageID),
+			Format:      channel.MessageFormatPlain,
+			Text:        text,
+			Attachments: attachments,
+			Reply:       replyRef,
+		},
+		BotID:       cfg.BotID,
+		ReplyTarget: chatID,
+		Sender: channel.Identity{
+			SubjectID:   subjectID,
+			DisplayName: displayName,
+			Attributes:  attrs,
+		},
+		Conversation: channel.Conversation{
+			ID:   chatID,
+			Type: chatType,
+			Name: chatName,
+		},
+		ReceivedAt: time.Unix(int64(raw.Date), 0).UTC(),
+		Source:     "telegram",
+		Metadata:   meta,
+	}, true
+}
+
+func (a *TelegramAdapter) logTelegramInbound(configID string, msg channel.InboundMessage) {
+	if a.logger == nil {
+		return
+	}
+	a.logger.Info(
+		"inbound received",
+		slog.String("config_id", configID),
+		slog.String("chat_type", msg.Conversation.Type),
+		slog.String("chat_id", msg.Conversation.ID),
+		slog.String("user_id", msg.Sender.Attribute("user_id")),
+		slog.String("username", msg.Sender.Attribute("username")),
+		slog.String("text", common.SummarizeText(msg.Message.Text)),
+		slog.Int("attachments", len(msg.Message.Attachments)),
+	)
+}
+
 // Send delivers an outbound message to Telegram, handling text, attachments, and replies.
 func (a *TelegramAdapter) Send(ctx context.Context, cfg channel.ChannelConfig, msg channel.OutboundMessage) error {
 	telegramCfg, err := parseConfig(cfg.Credentials)
diff --git a/internal/channel/adapters/telegram/telegram_test.go b/internal/channel/adapters/telegram/telegram_test.go
index c845613b..88b02aae 100644
--- a/internal/channel/adapters/telegram/telegram_test.go
+++ b/internal/channel/adapters/telegram/telegram_test.go
@@ -197,6 +197,81 @@ func TestPickTelegramPhoto(t *testing.T) {
 	}
 }
 
+func TestBuildTelegramMediaGroupInboundMessageAggregatesAttachments(t *testing.T) {
+	t.Parallel()
+
+	adapter := NewTelegramAdapter(nil)
+	bot := &tgbotapi.BotAPI{
+		Token: "test",
+		Self:  tgbotapi.User{ID: 1001, UserName: "memohbot"},
+	}
+	cfg := channel.ChannelConfig{BotID: "bot-1"}
+	first := &tgbotapi.Message{
+		MessageID:    101,
+		MediaGroupID: "group-1",
+		Date:         1710000000,
+		Chat:         &tgbotapi.Chat{ID: -10001, Type: "group", Title: "G1"},
+		From:         &tgbotapi.User{ID: 10, UserName: "alice"},
+		Photo: []tgbotapi.PhotoSize{
+			{FileID: "photo-1", Width: 320, Height: 240, FileSize: 10},
+		},
+	}
+	second := &tgbotapi.Message{
+		MessageID:    102,
+		MediaGroupID: "group-1",
+		Date:         1710000001,
+		Chat:         &tgbotapi.Chat{ID: -10001, Type: "group", Title: "G1"},
+		From:         &tgbotapi.User{ID: 10, UserName: "alice"},
+		Caption:      "album caption",
+		Photo: []tgbotapi.PhotoSize{
+			{FileID: "photo-2", Width: 640, Height: 480, FileSize: 20},
+		},
+	}
+
+	inbound, ok := adapter.buildTelegramMediaGroupInboundMessage(bot, cfg, []*tgbotapi.Message{first, second})
+	if !ok {
+		t.Fatal("expected grouped inbound message")
+	}
+	if inbound.Message.Text != "album caption" {
+		t.Fatalf("unexpected grouped text: %q", inbound.Message.Text)
+	}
+	if len(inbound.Message.Attachments) != 2 {
+		t.Fatalf("expected 2 attachments, got %d", len(inbound.Message.Attachments))
+	}
+	if inbound.Message.Attachments[0].PlatformKey != "photo-1" || inbound.Message.Attachments[1].PlatformKey != "photo-2" {
+		t.Fatalf("unexpected attachment order: %#v", inbound.Message.Attachments)
+	}
+	if inbound.Message.ID != "102" {
+		t.Fatalf("expected anchor message id 102, got %s", inbound.Message.ID)
+	}
+	if inbound.ReplyTarget != "-10001" {
+		t.Fatalf("unexpected reply target: %q", inbound.ReplyTarget)
+	}
+	if got := inbound.Metadata["media_group_id"]; got != "group-1" {
+		t.Fatalf("unexpected media_group_id metadata: %#v", got)
+	}
+	if got := inbound.Metadata["media_group_size"]; got != 2 {
+		t.Fatalf("unexpected media_group_size metadata: %#v", got)
+	}
+}
+
+func TestIsTelegramMediaGroupForChat(t *testing.T) {
+	t.Parallel()
+
+	if isTelegramMediaGroupForChat("12:group-a", 12) == false {
+		t.Fatal("expected same chat key to match")
+	}
+	if isTelegramMediaGroupForChat("123:group-a", 12) {
+		t.Fatal("expected different chat key to not match")
+	}
+	if isTelegramMediaGroupForChat("", 12) {
+		t.Fatal("expected empty key to not match")
+	}
+	if isTelegramMediaGroupForChat("12:group-a", 0) {
+		t.Fatal("expected zero chat id to not match")
+	}
+}
+
 func TestTelegramAdapter_Type(t *testing.T) {
 	t.Parallel()
 
diff --git a/internal/channel/inbound/channel.go b/internal/channel/inbound/channel.go
index a94cc35a..14c1efdb 100644
--- a/internal/channel/inbound/channel.go
+++ b/internal/channel/inbound/channel.go
@@ -939,7 +939,18 @@ func mapStreamChunkToChannelEvents(chunk conversation.StreamChunk) ([]channel.St
 }
 
 func buildInboundQuery(message channel.Message) string {
-	return strings.TrimSpace(message.PlainText())
+	text := strings.TrimSpace(message.PlainText())
+	if text != "" {
+		return text
+	}
+	if len(message.Attachments) == 0 {
+		return ""
+	}
+	count := len(message.Attachments)
+	if count == 1 {
+		return "[User sent 1 attachment]"
+	}
+	return fmt.Sprintf("[User sent %d attachments]", count)
 }
 
 func normalizeContentPartType(raw string) channel.MessagePartType {
diff --git a/internal/channel/inbound/channel_test.go b/internal/channel/inbound/channel_test.go
index ca980962..d5a78476 100644
--- a/internal/channel/inbound/channel_test.go
+++ b/internal/channel/inbound/channel_test.go
@@ -374,6 +374,73 @@ func TestChannelInboundProcessorIgnoreEmpty(t *testing.T) {
 	}
 }
 
+func TestBuildInboundQueryAttachmentFallback(t *testing.T) {
+	t.Parallel()
+
+	one := channel.Message{
+		Attachments: []channel.Attachment{
+			{Type: channel.AttachmentImage},
+		},
+	}
+	if got := buildInboundQuery(one); got != "[User sent 1 attachment]" {
+		t.Fatalf("unexpected single attachment fallback: %q", got)
+	}
+
+	two := channel.Message{
+		Attachments: []channel.Attachment{
+			{Type: channel.AttachmentImage},
+			{Type: channel.AttachmentImage},
+		},
+	}
+	if got := buildInboundQuery(two); got != "[User sent 2 attachments]" {
+		t.Fatalf("unexpected multiple attachment fallback: %q", got)
+	}
+}
+
+func TestChannelInboundProcessorAttachmentOnlyUsesFallbackQuery(t *testing.T) {
+	channelIdentitySvc := &fakeChannelIdentityService{channelIdentity: identities.ChannelIdentity{ID: "channelIdentity-fallback"}}
+	memberSvc := &fakeMemberService{isMember: true}
+	chatSvc := &fakeChatService{resolveResult: route.ResolveConversationResult{ChatID: "chat-fallback", RouteID: "route-fallback"}}
+	gateway := &fakeChatGateway{
+		resp: conversation.ChatResponse{
+			Messages: []conversation.ModelMessage{
+				{Role: "assistant", Content: conversation.NewTextContent("AI reply")},
+			},
+		},
+	}
+	processor := NewChannelInboundProcessor(slog.Default(), nil, chatSvc, chatSvc, gateway, channelIdentitySvc, memberSvc, nil, nil, nil, "", 0)
+	sender := &fakeReplySender{}
+
+	cfg := channel.ChannelConfig{ID: "cfg-1", BotID: "bot-1", ChannelType: channel.ChannelType("telegram")}
+	msg := channel.InboundMessage{
+		BotID:   "bot-1",
+		Channel: channel.ChannelType("telegram"),
+		Message: channel.Message{
+			Attachments: []channel.Attachment{
+				{Type: channel.AttachmentImage, URL: "https://example.com/a.png"},
+				{Type: channel.AttachmentImage, URL: "https://example.com/b.png"},
+			},
+		},
+		ReplyTarget: "chat-123",
+		Sender:      channel.Identity{SubjectID: "ext-1"},
+		Conversation: channel.Conversation{
+			ID:   "conv-1",
+			Type: "p2p",
+		},
+	}
+
+	err := processor.HandleInbound(context.Background(), cfg, msg, sender)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if gateway.gotReq.Query != "[User sent 2 attachments]" {
+		t.Fatalf("unexpected fallback query: %q", gateway.gotReq.Query)
+	}
+	if len(gateway.gotReq.Attachments) != 2 {
+		t.Fatalf("expected attachments to pass through, got %d", len(gateway.gotReq.Attachments))
+	}
+}
+
 func TestChannelInboundProcessorSilentReply(t *testing.T) {
 	channelIdentitySvc := &fakeChannelIdentityService{channelIdentity: identities.ChannelIdentity{ID: "channelIdentity-4"}}
 	memberSvc := &fakeMemberService{isMember: true}