fix(telegram): aggregate media_group inbound and preserve ordering with (#69)

attachment fallback query
This commit is contained in:
Ringo.Typowriter
2026-02-19 17:14:12 +08:00
committed by GitHub
parent 51faa1f29f
commit 53f080503c
4 changed files with 435 additions and 65 deletions
+281 -64
View File
@@ -8,6 +8,7 @@ import (
"io"
"log/slog"
"net/http"
"sort"
"strconv"
"strings"
"sync"
@@ -22,6 +23,12 @@ import (
)
const telegramMaxMessageLength = 4096
const telegramMediaGroupCollectWindow = 700 * time.Millisecond
type telegramMediaGroupBuffer struct {
messages []*tgbotapi.Message
timer *time.Timer
}
// assetOpener reads stored asset bytes by content hash.
type assetOpener interface {
@@ -182,14 +189,93 @@ func (a *TelegramAdapter) Connect(ctx context.Context, cfg channel.ChannelConfig
updateConfig.Timeout = 30
updates := bot.GetUpdatesChan(updateConfig)
connCtx, cancel := context.WithCancel(ctx)
mediaGroups := make(map[string]*telegramMediaGroupBuffer)
var mediaGroupsMu sync.Mutex
var flushMediaGroup func(groupKey string)
flushMediaGroup = func(groupKey string) {
var batch []*tgbotapi.Message
mediaGroupsMu.Lock()
buffer, ok := mediaGroups[groupKey]
if ok {
delete(mediaGroups, groupKey)
batch = append(batch, buffer.messages...)
}
mediaGroupsMu.Unlock()
if !ok || len(batch) == 0 {
return
}
msg, ok := a.buildTelegramMediaGroupInboundMessage(bot, cfg, batch)
if !ok {
return
}
a.dispatchInbound(connCtx, cfg, handler, msg)
}
flushAllMediaGroups := func() {
mediaGroupsMu.Lock()
keys := make([]string, 0, len(mediaGroups))
for key, buffer := range mediaGroups {
keys = append(keys, key)
if buffer != nil && buffer.timer != nil {
buffer.timer.Stop()
}
}
mediaGroupsMu.Unlock()
for _, key := range keys {
flushMediaGroup(key)
}
}
flushMediaGroupsByChat := func(chatID int64) {
if chatID == 0 {
return
}
mediaGroupsMu.Lock()
keys := make([]string, 0, len(mediaGroups))
for key, buffer := range mediaGroups {
if !isTelegramMediaGroupForChat(key, chatID) {
continue
}
keys = append(keys, key)
if buffer != nil && buffer.timer != nil {
buffer.timer.Stop()
}
}
mediaGroupsMu.Unlock()
for _, key := range keys {
flushMediaGroup(key)
}
}
queueMediaGroup := func(msg *tgbotapi.Message) bool {
groupKey := telegramMediaGroupKey(msg)
if groupKey == "" {
return false
}
mediaGroupsMu.Lock()
buffer, ok := mediaGroups[groupKey]
if !ok {
buffer = &telegramMediaGroupBuffer{}
mediaGroups[groupKey] = buffer
}
buffer.messages = append(buffer.messages, msg)
if buffer.timer != nil {
buffer.timer.Stop()
}
buffer.timer = time.AfterFunc(telegramMediaGroupCollectWindow, func() {
flushMediaGroup(groupKey)
})
mediaGroupsMu.Unlock()
return true
}
go func() {
for {
select {
case <-connCtx.Done():
flushAllMediaGroups()
return
case update, ok := <-updates:
if !ok {
flushAllMediaGroups()
if a.logger != nil {
a.logger.Info("updates channel closed", slog.String("config_id", cfg.ID))
}
@@ -198,73 +284,15 @@ func (a *TelegramAdapter) Connect(ctx context.Context, cfg channel.ChannelConfig
if update.Message == nil {
continue
}
text := strings.TrimSpace(update.Message.Text)
caption := strings.TrimSpace(update.Message.Caption)
if text == "" && caption != "" {
text = caption
}
attachments := a.collectTelegramAttachments(bot, update.Message)
if text == "" && len(attachments) == 0 {
if queueMediaGroup(update.Message) {
continue
}
subjectID, displayName, attrs := resolveTelegramSender(update.Message)
chatID := ""
chatType := ""
chatName := ""
if update.Message.Chat != nil {
chatID = strconv.FormatInt(update.Message.Chat.ID, 10)
chatType = strings.TrimSpace(update.Message.Chat.Type)
chatName = strings.TrimSpace(update.Message.Chat.Title)
flushMediaGroupsByChat(telegramChatID(update.Message))
msg, ok := a.buildTelegramInboundMessage(bot, cfg, update.Message)
if !ok {
continue
}
replyRef := buildTelegramReplyRef(update.Message, chatID)
isReplyToBot := update.Message.ReplyToMessage != nil &&
update.Message.ReplyToMessage.From != nil &&
update.Message.ReplyToMessage.From.ID == bot.Self.ID
isMentioned := isTelegramBotMentioned(update.Message, bot.Self.UserName)
msg := channel.InboundMessage{
Channel: Type,
Message: channel.Message{
ID: strconv.Itoa(update.Message.MessageID),
Format: channel.MessageFormatPlain,
Text: text,
Attachments: attachments,
Reply: replyRef,
},
BotID: cfg.BotID,
ReplyTarget: chatID,
Sender: channel.Identity{
SubjectID: subjectID,
DisplayName: displayName,
Attributes: attrs,
},
Conversation: channel.Conversation{
ID: chatID,
Type: chatType,
Name: chatName,
},
ReceivedAt: time.Unix(int64(update.Message.Date), 0).UTC(),
Source: "telegram",
Metadata: map[string]any{
"is_mentioned": isMentioned,
"is_reply_to_bot": isReplyToBot,
},
}
if a.logger != nil {
a.logger.Info(
"inbound received",
slog.String("config_id", cfg.ID),
slog.String("chat_type", msg.Conversation.Type),
slog.String("chat_id", msg.Conversation.ID),
slog.String("user_id", attrs["user_id"]),
slog.String("username", attrs["username"]),
slog.String("text", common.SummarizeText(text)),
)
}
go func() {
if err := handler(connCtx, cfg, msg); err != nil && a.logger != nil {
a.logger.Error("handle inbound failed", slog.String("config_id", cfg.ID), slog.Any("error", err))
}
}()
a.dispatchInbound(connCtx, cfg, handler, msg)
}
}
}()
@@ -287,6 +315,195 @@ func (a *TelegramAdapter) Connect(ctx context.Context, cfg channel.ChannelConfig
return channel.NewConnection(cfg, stop), nil
}
func telegramMediaGroupKey(msg *tgbotapi.Message) string {
if msg == nil {
return ""
}
mediaGroupID := strings.TrimSpace(msg.MediaGroupID)
if mediaGroupID == "" {
return ""
}
chatID := telegramChatID(msg)
return fmt.Sprintf("%d:%s", chatID, mediaGroupID)
}
func telegramChatID(msg *tgbotapi.Message) int64 {
if msg == nil || msg.Chat == nil {
return 0
}
return msg.Chat.ID
}
func isTelegramMediaGroupForChat(groupKey string, chatID int64) bool {
if chatID == 0 || strings.TrimSpace(groupKey) == "" {
return false
}
return strings.HasPrefix(groupKey, fmt.Sprintf("%d:", chatID))
}
func (a *TelegramAdapter) dispatchInbound(ctx context.Context, cfg channel.ChannelConfig, handler channel.InboundHandler, msg channel.InboundMessage) {
a.logTelegramInbound(cfg.ID, msg)
go func() {
if err := handler(ctx, cfg, msg); err != nil && a.logger != nil {
a.logger.Error("handle inbound failed", slog.String("config_id", cfg.ID), slog.Any("error", err))
}
}()
}
func (a *TelegramAdapter) buildTelegramInboundMessage(bot *tgbotapi.BotAPI, cfg channel.ChannelConfig, raw *tgbotapi.Message) (channel.InboundMessage, bool) {
text := strings.TrimSpace(raw.Text)
caption := strings.TrimSpace(raw.Caption)
if text == "" && caption != "" {
text = caption
}
attachments := a.collectTelegramAttachments(bot, raw)
return a.toInboundTelegramMessage(bot, cfg, raw, text, attachments, nil)
}
func (a *TelegramAdapter) buildTelegramMediaGroupInboundMessage(
bot *tgbotapi.BotAPI,
cfg channel.ChannelConfig,
raw []*tgbotapi.Message,
) (channel.InboundMessage, bool) {
if len(raw) == 0 {
return channel.InboundMessage{}, false
}
items := make([]*tgbotapi.Message, 0, len(raw))
for _, msg := range raw {
if msg != nil {
items = append(items, msg)
}
}
if len(items) == 0 {
return channel.InboundMessage{}, false
}
sort.SliceStable(items, func(i, j int) bool {
return items[i].MessageID < items[j].MessageID
})
anchor := items[0]
text := ""
attachments := make([]channel.Attachment, 0, len(items))
isMentioned := false
isReplyToBot := false
botUsername := ""
botID := int64(0)
if bot != nil {
botUsername = bot.Self.UserName
botID = bot.Self.ID
}
for _, msg := range items {
candidate := strings.TrimSpace(msg.Text)
if candidate == "" {
candidate = strings.TrimSpace(msg.Caption)
}
if text == "" && candidate != "" {
text = candidate
anchor = msg
}
attachments = append(attachments, a.collectTelegramAttachments(bot, msg)...)
if !isMentioned {
isMentioned = isTelegramBotMentioned(msg, botUsername)
}
if !isReplyToBot {
isReplyToBot = msg.ReplyToMessage != nil &&
msg.ReplyToMessage.From != nil &&
msg.ReplyToMessage.From.ID == botID
}
}
metadata := map[string]any{
"is_mentioned": isMentioned,
"is_reply_to_bot": isReplyToBot,
"media_group_id": strings.TrimSpace(anchor.MediaGroupID),
"media_group_size": len(items),
}
return a.toInboundTelegramMessage(bot, cfg, anchor, text, attachments, metadata)
}
func (a *TelegramAdapter) toInboundTelegramMessage(
bot *tgbotapi.BotAPI,
cfg channel.ChannelConfig,
raw *tgbotapi.Message,
text string,
attachments []channel.Attachment,
metadata map[string]any,
) (channel.InboundMessage, bool) {
if raw == nil {
return channel.InboundMessage{}, false
}
text = strings.TrimSpace(text)
if text == "" && len(attachments) == 0 {
return channel.InboundMessage{}, false
}
subjectID, displayName, attrs := resolveTelegramSender(raw)
chatID := ""
chatType := ""
chatName := ""
if raw.Chat != nil {
chatID = strconv.FormatInt(raw.Chat.ID, 10)
chatType = strings.TrimSpace(raw.Chat.Type)
chatName = strings.TrimSpace(raw.Chat.Title)
}
replyRef := buildTelegramReplyRef(raw, chatID)
botUsername := ""
botID := int64(0)
if bot != nil {
botUsername = bot.Self.UserName
botID = bot.Self.ID
}
isReplyToBot := raw.ReplyToMessage != nil &&
raw.ReplyToMessage.From != nil &&
raw.ReplyToMessage.From.ID == botID
isMentioned := isTelegramBotMentioned(raw, botUsername)
meta := map[string]any{
"is_mentioned": isMentioned,
"is_reply_to_bot": isReplyToBot,
}
for key, value := range metadata {
meta[key] = value
}
return channel.InboundMessage{
Channel: Type,
Message: channel.Message{
ID: strconv.Itoa(raw.MessageID),
Format: channel.MessageFormatPlain,
Text: text,
Attachments: attachments,
Reply: replyRef,
},
BotID: cfg.BotID,
ReplyTarget: chatID,
Sender: channel.Identity{
SubjectID: subjectID,
DisplayName: displayName,
Attributes: attrs,
},
Conversation: channel.Conversation{
ID: chatID,
Type: chatType,
Name: chatName,
},
ReceivedAt: time.Unix(int64(raw.Date), 0).UTC(),
Source: "telegram",
Metadata: meta,
}, true
}
func (a *TelegramAdapter) logTelegramInbound(configID string, msg channel.InboundMessage) {
if a.logger == nil {
return
}
a.logger.Info(
"inbound received",
slog.String("config_id", configID),
slog.String("chat_type", msg.Conversation.Type),
slog.String("chat_id", msg.Conversation.ID),
slog.String("user_id", msg.Sender.Attribute("user_id")),
slog.String("username", msg.Sender.Attribute("username")),
slog.String("text", common.SummarizeText(msg.Message.Text)),
slog.Int("attachments", len(msg.Message.Attachments)),
)
}
// Send delivers an outbound message to Telegram, handling text, attachments, and replies.
func (a *TelegramAdapter) Send(ctx context.Context, cfg channel.ChannelConfig, msg channel.OutboundMessage) error {
telegramCfg, err := parseConfig(cfg.Credentials)
@@ -197,6 +197,81 @@ func TestPickTelegramPhoto(t *testing.T) {
}
}
func TestBuildTelegramMediaGroupInboundMessageAggregatesAttachments(t *testing.T) {
t.Parallel()
adapter := NewTelegramAdapter(nil)
bot := &tgbotapi.BotAPI{
Token: "test",
Self: tgbotapi.User{ID: 1001, UserName: "memohbot"},
}
cfg := channel.ChannelConfig{BotID: "bot-1"}
first := &tgbotapi.Message{
MessageID: 101,
MediaGroupID: "group-1",
Date: 1710000000,
Chat: &tgbotapi.Chat{ID: -10001, Type: "group", Title: "G1"},
From: &tgbotapi.User{ID: 10, UserName: "alice"},
Photo: []tgbotapi.PhotoSize{
{FileID: "photo-1", Width: 320, Height: 240, FileSize: 10},
},
}
second := &tgbotapi.Message{
MessageID: 102,
MediaGroupID: "group-1",
Date: 1710000001,
Chat: &tgbotapi.Chat{ID: -10001, Type: "group", Title: "G1"},
From: &tgbotapi.User{ID: 10, UserName: "alice"},
Caption: "album caption",
Photo: []tgbotapi.PhotoSize{
{FileID: "photo-2", Width: 640, Height: 480, FileSize: 20},
},
}
inbound, ok := adapter.buildTelegramMediaGroupInboundMessage(bot, cfg, []*tgbotapi.Message{first, second})
if !ok {
t.Fatal("expected grouped inbound message")
}
if inbound.Message.Text != "album caption" {
t.Fatalf("unexpected grouped text: %q", inbound.Message.Text)
}
if len(inbound.Message.Attachments) != 2 {
t.Fatalf("expected 2 attachments, got %d", len(inbound.Message.Attachments))
}
if inbound.Message.Attachments[0].PlatformKey != "photo-1" || inbound.Message.Attachments[1].PlatformKey != "photo-2" {
t.Fatalf("unexpected attachment order: %#v", inbound.Message.Attachments)
}
if inbound.Message.ID != "102" {
t.Fatalf("expected anchor message id 102, got %s", inbound.Message.ID)
}
if inbound.ReplyTarget != "-10001" {
t.Fatalf("unexpected reply target: %q", inbound.ReplyTarget)
}
if got := inbound.Metadata["media_group_id"]; got != "group-1" {
t.Fatalf("unexpected media_group_id metadata: %#v", got)
}
if got := inbound.Metadata["media_group_size"]; got != 2 {
t.Fatalf("unexpected media_group_size metadata: %#v", got)
}
}
func TestIsTelegramMediaGroupForChat(t *testing.T) {
t.Parallel()
if isTelegramMediaGroupForChat("12:group-a", 12) == false {
t.Fatal("expected same chat key to match")
}
if isTelegramMediaGroupForChat("123:group-a", 12) {
t.Fatal("expected different chat key to not match")
}
if isTelegramMediaGroupForChat("", 12) {
t.Fatal("expected empty key to not match")
}
if isTelegramMediaGroupForChat("12:group-a", 0) {
t.Fatal("expected zero chat id to not match")
}
}
func TestTelegramAdapter_Type(t *testing.T) {
t.Parallel()
+12 -1
View File
@@ -939,7 +939,18 @@ func mapStreamChunkToChannelEvents(chunk conversation.StreamChunk) ([]channel.St
}
func buildInboundQuery(message channel.Message) string {
return strings.TrimSpace(message.PlainText())
text := strings.TrimSpace(message.PlainText())
if text != "" {
return text
}
if len(message.Attachments) == 0 {
return ""
}
count := len(message.Attachments)
if count == 1 {
return "[User sent 1 attachment]"
}
return fmt.Sprintf("[User sent %d attachments]", count)
}
func normalizeContentPartType(raw string) channel.MessagePartType {
+67
View File
@@ -374,6 +374,73 @@ func TestChannelInboundProcessorIgnoreEmpty(t *testing.T) {
}
}
func TestBuildInboundQueryAttachmentFallback(t *testing.T) {
t.Parallel()
one := channel.Message{
Attachments: []channel.Attachment{
{Type: channel.AttachmentImage},
},
}
if got := buildInboundQuery(one); got != "[User sent 1 attachment]" {
t.Fatalf("unexpected single attachment fallback: %q", got)
}
two := channel.Message{
Attachments: []channel.Attachment{
{Type: channel.AttachmentImage},
{Type: channel.AttachmentImage},
},
}
if got := buildInboundQuery(two); got != "[User sent 2 attachments]" {
t.Fatalf("unexpected multiple attachment fallback: %q", got)
}
}
func TestChannelInboundProcessorAttachmentOnlyUsesFallbackQuery(t *testing.T) {
channelIdentitySvc := &fakeChannelIdentityService{channelIdentity: identities.ChannelIdentity{ID: "channelIdentity-fallback"}}
memberSvc := &fakeMemberService{isMember: true}
chatSvc := &fakeChatService{resolveResult: route.ResolveConversationResult{ChatID: "chat-fallback", RouteID: "route-fallback"}}
gateway := &fakeChatGateway{
resp: conversation.ChatResponse{
Messages: []conversation.ModelMessage{
{Role: "assistant", Content: conversation.NewTextContent("AI reply")},
},
},
}
processor := NewChannelInboundProcessor(slog.Default(), nil, chatSvc, chatSvc, gateway, channelIdentitySvc, memberSvc, nil, nil, nil, "", 0)
sender := &fakeReplySender{}
cfg := channel.ChannelConfig{ID: "cfg-1", BotID: "bot-1", ChannelType: channel.ChannelType("telegram")}
msg := channel.InboundMessage{
BotID: "bot-1",
Channel: channel.ChannelType("telegram"),
Message: channel.Message{
Attachments: []channel.Attachment{
{Type: channel.AttachmentImage, URL: "https://example.com/a.png"},
{Type: channel.AttachmentImage, URL: "https://example.com/b.png"},
},
},
ReplyTarget: "chat-123",
Sender: channel.Identity{SubjectID: "ext-1"},
Conversation: channel.Conversation{
ID: "conv-1",
Type: "p2p",
},
}
err := processor.HandleInbound(context.Background(), cfg, msg, sender)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if gateway.gotReq.Query != "[User sent 2 attachments]" {
t.Fatalf("unexpected fallback query: %q", gateway.gotReq.Query)
}
if len(gateway.gotReq.Attachments) != 2 {
t.Fatalf("expected attachments to pass through, got %d", len(gateway.gotReq.Attachments))
}
}
func TestChannelInboundProcessorSilentReply(t *testing.T) {
channelIdentitySvc := &fakeChannelIdentityService{channelIdentity: identities.ChannelIdentity{ID: "channelIdentity-4"}}
memberSvc := &fakeMemberService{isMember: true}