diff --git a/internal/channel/inbound/channel.go b/internal/channel/inbound/channel.go index fa73479d..ee4f6aaf 100644 --- a/internal/channel/inbound/channel.go +++ b/internal/channel/inbound/channel.go @@ -279,8 +279,6 @@ func (p *ChannelInboundProcessor) HandleInbound(ctx context.Context, cfg channel p.markInboxItemRead(ctx, inboxItem) } - userMessagePersisted := p.persistInboundUser(ctx, resolved.RouteID, identity, msg, text, attachments, "active_chat") - // Issue chat token for reply routing. chatToken := "" if p.jwtSecret != "" && strings.TrimSpace(msg.ReplyTarget) != "" { @@ -425,7 +423,7 @@ func (p *ChannelInboundProcessor) HandleInbound(ctx context.Context, cfg channel Query: text, CurrentChannel: msg.Channel.String(), Channels: []string{msg.Channel.String()}, - UserMessagePersisted: userMessagePersisted, + UserMessagePersisted: false, Attachments: attachments, OutboundAssetCollector: assetCollector, }) @@ -680,71 +678,6 @@ func metadataBool(metadata map[string]any, key string) bool { } } -func (p *ChannelInboundProcessor) persistInboundUser( - ctx context.Context, - routeID string, - identity InboundIdentity, - msg channel.InboundMessage, - query string, - attachments []conversation.ChatAttachment, - triggerMode string, -) bool { - if p.message == nil { - return false - } - botID := strings.TrimSpace(identity.BotID) - if botID == "" { - return false - } - var attachmentPaths []string - for _, att := range attachments { - if ap := strings.TrimSpace(att.Path); ap != "" { - attachmentPaths = append(attachmentPaths, ap) - } - } - headerifiedQuery := flow.FormatUserHeader( - strings.TrimSpace(msg.Message.ID), - strings.TrimSpace(identity.ChannelIdentityID), - strings.TrimSpace(identity.DisplayName), - msg.Channel.String(), - strings.TrimSpace(msg.Conversation.Type), - strings.TrimSpace(msg.Conversation.Name), - attachmentPaths, - query, - ) - payload, err := json.Marshal(conversation.ModelMessage{ - Role: "user", - Content: conversation.NewTextContent(headerifiedQuery), - }) - if err != nil { - if p.logger != nil { - p.logger.Warn("marshal inbound user message failed", slog.Any("error", err)) - } - return false - } - meta := map[string]any{ - "route_id": strings.TrimSpace(routeID), - "platform": msg.Channel.String(), - "trigger_mode": strings.TrimSpace(triggerMode), - } - if _, err := p.message.Persist(ctx, messagepkg.PersistInput{ - BotID: botID, - RouteID: strings.TrimSpace(routeID), - SenderChannelIdentityID: strings.TrimSpace(identity.ChannelIdentityID), - SenderUserID: strings.TrimSpace(identity.UserID), - Platform: msg.Channel.String(), - ExternalMessageID: strings.TrimSpace(msg.Message.ID), - Role: "user", - Content: payload, - Metadata: meta, - Assets: chatAttachmentsToAssetRefs(attachments), - }); err != nil && p.logger != nil { - p.logger.Warn("persist inbound user message failed", slog.Any("error", err)) - return false - } - return true -} - func (p *ChannelInboundProcessor) createInboxItem( ctx context.Context, ident InboundIdentity, @@ -1887,36 +1820,6 @@ func channelAttachmentsToAssetRefs(attachments []channel.Attachment, role string return refs } -func chatAttachmentsToAssetRefs(attachments []conversation.ChatAttachment) []messagepkg.AssetRef { - if len(attachments) == 0 { - return nil - } - refs := make([]messagepkg.AssetRef, 0, len(attachments)) - for idx, att := range attachments { - contentHash := strings.TrimSpace(att.ContentHash) - if contentHash == "" { - continue - } - ref := messagepkg.AssetRef{ - ContentHash: contentHash, - Role: "attachment", - Ordinal: idx, - Mime: strings.TrimSpace(att.Mime), - SizeBytes: att.Size, - } - if att.Metadata != nil { - if sk, ok := att.Metadata["storage_key"].(string); ok { - ref.StorageKey = sk - } - } - refs = append(refs, ref) - } - if len(refs) == 0 { - return nil - } - return refs -} - func mapChannelToChatAttachments(attachments []channel.Attachment) []conversation.ChatAttachment { if len(attachments) == 0 { return nil diff --git a/internal/channel/inbound/channel_test.go b/internal/channel/inbound/channel_test.go index cc6d4f32..8092f686 100644 --- a/internal/channel/inbound/channel_test.go +++ b/internal/channel/inbound/channel_test.go @@ -629,8 +629,8 @@ func TestChannelInboundProcessorGroupMentionTriggersReply(t *testing.T) { if len(sender.sent) != 1 { t.Fatalf("expected one outbound reply, got %d", len(sender.sent)) } - if !gateway.gotReq.UserMessagePersisted { - t.Fatalf("expected UserMessagePersisted=true for pre-persisted inbound message") + if gateway.gotReq.UserMessagePersisted { + t.Fatalf("expected UserMessagePersisted=false: user message persistence is deferred to storeRound") } } @@ -649,7 +649,7 @@ func (s *failingOpenStreamSender) OpenStream(_ context.Context, _ string, _ chan return nil, errors.New("open stream failed") } -func TestChannelInboundProcessorPersistsActiveChatBeforeOpenStream(t *testing.T) { +func TestChannelInboundProcessorDoesNotPersistBeforeOpenStream(t *testing.T) { channelIdentitySvc := &fakeChannelIdentityService{channelIdentity: identities.ChannelIdentity{ID: "channelIdentity-openstream"}} memberSvc := &fakeMemberService{isMember: true} chatSvc := &fakeChatService{resolveResult: route.ResolveConversationResult{ChatID: "chat-openstream", RouteID: "route-openstream"}} @@ -674,11 +674,8 @@ func TestChannelInboundProcessorPersistsActiveChatBeforeOpenStream(t *testing.T) if err == nil || err.Error() != "stream unavailable" { t.Fatalf("expected open stream error, got: %v", err) } - if len(chatSvc.persistedIn) != 1 { - t.Fatalf("expected active-chat user turn to be persisted before stream open, got %d", len(chatSvc.persistedIn)) - } - if got := chatSvc.persistedIn[0].ExternalMessageID; got != "msg-openstream-1" { - t.Fatalf("unexpected persisted external_message_id: %q", got) + if len(chatSvc.persistedIn) != 0 { + t.Fatalf("user message persistence is deferred to storeRound; expected 0 persisted, got %d", len(chatSvc.persistedIn)) } if gateway.gotReq.Query != "" { t.Fatalf("runner should not be called when stream open fails") @@ -727,14 +724,8 @@ func TestChannelInboundProcessorPersistsAttachmentAssetRefs(t *testing.T) { if err := processor.HandleInbound(context.Background(), cfg, msg, sender); err != nil { t.Fatalf("unexpected error: %v", err) } - if len(chatSvc.persistedIn) != 1 { - t.Fatalf("expected one persisted input, got %d", len(chatSvc.persistedIn)) - } - if len(chatSvc.persistedIn[0].Assets) != 1 { - t.Fatalf("expected one persisted asset ref, got %d", len(chatSvc.persistedIn[0].Assets)) - } - if got := chatSvc.persistedIn[0].Assets[0].ContentHash; got != "asset-1" { - t.Fatalf("expected persisted content_hash asset-1, got %q", got) + if len(chatSvc.persistedIn) != 0 { + t.Fatalf("user message persistence is deferred to storeRound; expected 0 persisted, got %d", len(chatSvc.persistedIn)) } if len(gateway.gotReq.Attachments) != 1 { t.Fatalf("expected one gateway attachment, got %d", len(gateway.gotReq.Attachments)) @@ -796,11 +787,8 @@ func TestChannelInboundProcessorIngestsPlatformKeyWithResolver(t *testing.T) { if got := gateway.gotReq.Attachments[0].ContentHash; got != "asset-resolved-1" { t.Fatalf("expected resolved asset id, got %q", got) } - if len(chatSvc.persistedIn) != 1 || len(chatSvc.persistedIn[0].Assets) != 1 { - t.Fatalf("expected one persisted asset ref, got %+v", chatSvc.persistedIn) - } - if got := chatSvc.persistedIn[0].Assets[0].ContentHash; got != "asset-resolved-1" { - t.Fatalf("expected persisted content_hash asset-resolved-1, got %q", got) + if len(chatSvc.persistedIn) != 0 { + t.Fatalf("user message persistence is deferred to storeRound; expected 0 persisted, got %d", len(chatSvc.persistedIn)) } } @@ -871,11 +859,8 @@ func TestChannelInboundProcessorIngestsBase64Attachment(t *testing.T) { if !strings.HasPrefix(gotAttachment.Path, "/data/media/") { t.Fatalf("expected attachment path under /data/media/, got %q", gotAttachment.Path) } - if len(chatSvc.persistedIn) != 1 || len(chatSvc.persistedIn[0].Assets) != 1 { - t.Fatalf("expected one persisted asset ref, got %+v", chatSvc.persistedIn) - } - if got := chatSvc.persistedIn[0].Assets[0].ContentHash; got != "asset-base64-1" { - t.Fatalf("expected persisted content_hash asset-base64-1, got %q", got) + if len(chatSvc.persistedIn) != 0 { + t.Fatalf("user message persistence is deferred to storeRound; expected 0 persisted, got %d", len(chatSvc.persistedIn)) } }