From e9059fddda507fe4481d921e7abce8126b9f8511 Mon Sep 17 00:00:00 2001 From: Acbox Date: Thu, 12 Mar 2026 18:59:33 +0800 Subject: [PATCH] refactor: defer user message persistence to storeRound for atomic writes User messages from channel inbound (Telegram, Discord, Feishu, etc.) were previously persisted before the agent runs. Now they are written together with assistant/tool messages at the end of a conversation turn (or on abort), matching the behavior of WebSocket and sync chat paths. --- internal/channel/inbound/channel.go | 99 +----------------------- internal/channel/inbound/channel_test.go | 37 +++------ 2 files changed, 12 insertions(+), 124 deletions(-) diff --git a/internal/channel/inbound/channel.go b/internal/channel/inbound/channel.go index fa73479d..ee4f6aaf 100644 --- a/internal/channel/inbound/channel.go +++ b/internal/channel/inbound/channel.go @@ -279,8 +279,6 @@ func (p *ChannelInboundProcessor) HandleInbound(ctx context.Context, cfg channel p.markInboxItemRead(ctx, inboxItem) } - userMessagePersisted := p.persistInboundUser(ctx, resolved.RouteID, identity, msg, text, attachments, "active_chat") - // Issue chat token for reply routing. chatToken := "" if p.jwtSecret != "" && strings.TrimSpace(msg.ReplyTarget) != "" { @@ -425,7 +423,7 @@ func (p *ChannelInboundProcessor) HandleInbound(ctx context.Context, cfg channel Query: text, CurrentChannel: msg.Channel.String(), Channels: []string{msg.Channel.String()}, - UserMessagePersisted: userMessagePersisted, + UserMessagePersisted: false, Attachments: attachments, OutboundAssetCollector: assetCollector, }) @@ -680,71 +678,6 @@ func metadataBool(metadata map[string]any, key string) bool { } } -func (p *ChannelInboundProcessor) persistInboundUser( - ctx context.Context, - routeID string, - identity InboundIdentity, - msg channel.InboundMessage, - query string, - attachments []conversation.ChatAttachment, - triggerMode string, -) bool { - if p.message == nil { - return false - } - botID := strings.TrimSpace(identity.BotID) - if botID == "" { - return false - } - var attachmentPaths []string - for _, att := range attachments { - if ap := strings.TrimSpace(att.Path); ap != "" { - attachmentPaths = append(attachmentPaths, ap) - } - } - headerifiedQuery := flow.FormatUserHeader( - strings.TrimSpace(msg.Message.ID), - strings.TrimSpace(identity.ChannelIdentityID), - strings.TrimSpace(identity.DisplayName), - msg.Channel.String(), - strings.TrimSpace(msg.Conversation.Type), - strings.TrimSpace(msg.Conversation.Name), - attachmentPaths, - query, - ) - payload, err := json.Marshal(conversation.ModelMessage{ - Role: "user", - Content: conversation.NewTextContent(headerifiedQuery), - }) - if err != nil { - if p.logger != nil { - p.logger.Warn("marshal inbound user message failed", slog.Any("error", err)) - } - return false - } - meta := map[string]any{ - "route_id": strings.TrimSpace(routeID), - "platform": msg.Channel.String(), - "trigger_mode": strings.TrimSpace(triggerMode), - } - if _, err := p.message.Persist(ctx, messagepkg.PersistInput{ - BotID: botID, - RouteID: strings.TrimSpace(routeID), - SenderChannelIdentityID: strings.TrimSpace(identity.ChannelIdentityID), - SenderUserID: strings.TrimSpace(identity.UserID), - Platform: msg.Channel.String(), - ExternalMessageID: strings.TrimSpace(msg.Message.ID), - Role: "user", - Content: payload, - Metadata: meta, - Assets: chatAttachmentsToAssetRefs(attachments), - }); err != nil && p.logger != nil { - p.logger.Warn("persist inbound user message failed", slog.Any("error", err)) - return false - } - return true -} - func (p *ChannelInboundProcessor) createInboxItem( ctx context.Context, ident InboundIdentity, @@ -1887,36 +1820,6 @@ func channelAttachmentsToAssetRefs(attachments []channel.Attachment, role string return refs } -func chatAttachmentsToAssetRefs(attachments []conversation.ChatAttachment) []messagepkg.AssetRef { - if len(attachments) == 0 { - return nil - } - refs := make([]messagepkg.AssetRef, 0, len(attachments)) - for idx, att := range attachments { - contentHash := strings.TrimSpace(att.ContentHash) - if contentHash == "" { - continue - } - ref := messagepkg.AssetRef{ - ContentHash: contentHash, - Role: "attachment", - Ordinal: idx, - Mime: strings.TrimSpace(att.Mime), - SizeBytes: att.Size, - } - if att.Metadata != nil { - if sk, ok := att.Metadata["storage_key"].(string); ok { - ref.StorageKey = sk - } - } - refs = append(refs, ref) - } - if len(refs) == 0 { - return nil - } - return refs -} - func mapChannelToChatAttachments(attachments []channel.Attachment) []conversation.ChatAttachment { if len(attachments) == 0 { return nil diff --git a/internal/channel/inbound/channel_test.go b/internal/channel/inbound/channel_test.go index cc6d4f32..8092f686 100644 --- a/internal/channel/inbound/channel_test.go +++ b/internal/channel/inbound/channel_test.go @@ -629,8 +629,8 @@ func TestChannelInboundProcessorGroupMentionTriggersReply(t *testing.T) { if len(sender.sent) != 1 { t.Fatalf("expected one outbound reply, got %d", len(sender.sent)) } - if !gateway.gotReq.UserMessagePersisted { - t.Fatalf("expected UserMessagePersisted=true for pre-persisted inbound message") + if gateway.gotReq.UserMessagePersisted { + t.Fatalf("expected UserMessagePersisted=false: user message persistence is deferred to storeRound") } } @@ -649,7 +649,7 @@ func (s *failingOpenStreamSender) OpenStream(_ context.Context, _ string, _ chan return nil, errors.New("open stream failed") } -func TestChannelInboundProcessorPersistsActiveChatBeforeOpenStream(t *testing.T) { +func TestChannelInboundProcessorDoesNotPersistBeforeOpenStream(t *testing.T) { channelIdentitySvc := &fakeChannelIdentityService{channelIdentity: identities.ChannelIdentity{ID: "channelIdentity-openstream"}} memberSvc := &fakeMemberService{isMember: true} chatSvc := &fakeChatService{resolveResult: route.ResolveConversationResult{ChatID: "chat-openstream", RouteID: "route-openstream"}} @@ -674,11 +674,8 @@ func TestChannelInboundProcessorPersistsActiveChatBeforeOpenStream(t *testing.T) if err == nil || err.Error() != "stream unavailable" { t.Fatalf("expected open stream error, got: %v", err) } - if len(chatSvc.persistedIn) != 1 { - t.Fatalf("expected active-chat user turn to be persisted before stream open, got %d", len(chatSvc.persistedIn)) - } - if got := chatSvc.persistedIn[0].ExternalMessageID; got != "msg-openstream-1" { - t.Fatalf("unexpected persisted external_message_id: %q", got) + if len(chatSvc.persistedIn) != 0 { + t.Fatalf("user message persistence is deferred to storeRound; expected 0 persisted, got %d", len(chatSvc.persistedIn)) } if gateway.gotReq.Query != "" { t.Fatalf("runner should not be called when stream open fails") @@ -727,14 +724,8 @@ func TestChannelInboundProcessorPersistsAttachmentAssetRefs(t *testing.T) { if err := processor.HandleInbound(context.Background(), cfg, msg, sender); err != nil { t.Fatalf("unexpected error: %v", err) } - if len(chatSvc.persistedIn) != 1 { - t.Fatalf("expected one persisted input, got %d", len(chatSvc.persistedIn)) - } - if len(chatSvc.persistedIn[0].Assets) != 1 { - t.Fatalf("expected one persisted asset ref, got %d", len(chatSvc.persistedIn[0].Assets)) - } - if got := chatSvc.persistedIn[0].Assets[0].ContentHash; got != "asset-1" { - t.Fatalf("expected persisted content_hash asset-1, got %q", got) + if len(chatSvc.persistedIn) != 0 { + t.Fatalf("user message persistence is deferred to storeRound; expected 0 persisted, got %d", len(chatSvc.persistedIn)) } if len(gateway.gotReq.Attachments) != 1 { t.Fatalf("expected one gateway attachment, got %d", len(gateway.gotReq.Attachments)) @@ -796,11 +787,8 @@ func TestChannelInboundProcessorIngestsPlatformKeyWithResolver(t *testing.T) { if got := gateway.gotReq.Attachments[0].ContentHash; got != "asset-resolved-1" { t.Fatalf("expected resolved asset id, got %q", got) } - if len(chatSvc.persistedIn) != 1 || len(chatSvc.persistedIn[0].Assets) != 1 { - t.Fatalf("expected one persisted asset ref, got %+v", chatSvc.persistedIn) - } - if got := chatSvc.persistedIn[0].Assets[0].ContentHash; got != "asset-resolved-1" { - t.Fatalf("expected persisted content_hash asset-resolved-1, got %q", got) + if len(chatSvc.persistedIn) != 0 { + t.Fatalf("user message persistence is deferred to storeRound; expected 0 persisted, got %d", len(chatSvc.persistedIn)) } } @@ -871,11 +859,8 @@ func TestChannelInboundProcessorIngestsBase64Attachment(t *testing.T) { if !strings.HasPrefix(gotAttachment.Path, "/data/media/") { t.Fatalf("expected attachment path under /data/media/, got %q", gotAttachment.Path) } - if len(chatSvc.persistedIn) != 1 || len(chatSvc.persistedIn[0].Assets) != 1 { - t.Fatalf("expected one persisted asset ref, got %+v", chatSvc.persistedIn) - } - if got := chatSvc.persistedIn[0].Assets[0].ContentHash; got != "asset-base64-1" { - t.Fatalf("expected persisted content_hash asset-base64-1, got %q", got) + if len(chatSvc.persistedIn) != 0 { + t.Fatalf("user message persistence is deferred to storeRound; expected 0 persisted, got %d", len(chatSvc.persistedIn)) } }