refactor: defer user message persistence to storeRound for atomic writes

User messages from channel inbound (Telegram, Discord, Feishu, etc.)
were previously persisted before the agent runs. Now they are written
together with assistant/tool messages at the end of a conversation turn
(or on abort), matching the behavior of WebSocket and sync chat paths.
This commit is contained in:
Acbox
2026-03-12 18:59:33 +08:00
parent 0ed4fb69fc
commit e9059fddda
2 changed files with 12 additions and 124 deletions
+1 -98
View File
@@ -279,8 +279,6 @@ func (p *ChannelInboundProcessor) HandleInbound(ctx context.Context, cfg channel
p.markInboxItemRead(ctx, inboxItem)
}
userMessagePersisted := p.persistInboundUser(ctx, resolved.RouteID, identity, msg, text, attachments, "active_chat")
// Issue chat token for reply routing.
chatToken := ""
if p.jwtSecret != "" && strings.TrimSpace(msg.ReplyTarget) != "" {
@@ -425,7 +423,7 @@ func (p *ChannelInboundProcessor) HandleInbound(ctx context.Context, cfg channel
Query: text,
CurrentChannel: msg.Channel.String(),
Channels: []string{msg.Channel.String()},
UserMessagePersisted: userMessagePersisted,
UserMessagePersisted: false,
Attachments: attachments,
OutboundAssetCollector: assetCollector,
})
@@ -680,71 +678,6 @@ func metadataBool(metadata map[string]any, key string) bool {
}
}
func (p *ChannelInboundProcessor) persistInboundUser(
ctx context.Context,
routeID string,
identity InboundIdentity,
msg channel.InboundMessage,
query string,
attachments []conversation.ChatAttachment,
triggerMode string,
) bool {
if p.message == nil {
return false
}
botID := strings.TrimSpace(identity.BotID)
if botID == "" {
return false
}
var attachmentPaths []string
for _, att := range attachments {
if ap := strings.TrimSpace(att.Path); ap != "" {
attachmentPaths = append(attachmentPaths, ap)
}
}
headerifiedQuery := flow.FormatUserHeader(
strings.TrimSpace(msg.Message.ID),
strings.TrimSpace(identity.ChannelIdentityID),
strings.TrimSpace(identity.DisplayName),
msg.Channel.String(),
strings.TrimSpace(msg.Conversation.Type),
strings.TrimSpace(msg.Conversation.Name),
attachmentPaths,
query,
)
payload, err := json.Marshal(conversation.ModelMessage{
Role: "user",
Content: conversation.NewTextContent(headerifiedQuery),
})
if err != nil {
if p.logger != nil {
p.logger.Warn("marshal inbound user message failed", slog.Any("error", err))
}
return false
}
meta := map[string]any{
"route_id": strings.TrimSpace(routeID),
"platform": msg.Channel.String(),
"trigger_mode": strings.TrimSpace(triggerMode),
}
if _, err := p.message.Persist(ctx, messagepkg.PersistInput{
BotID: botID,
RouteID: strings.TrimSpace(routeID),
SenderChannelIdentityID: strings.TrimSpace(identity.ChannelIdentityID),
SenderUserID: strings.TrimSpace(identity.UserID),
Platform: msg.Channel.String(),
ExternalMessageID: strings.TrimSpace(msg.Message.ID),
Role: "user",
Content: payload,
Metadata: meta,
Assets: chatAttachmentsToAssetRefs(attachments),
}); err != nil && p.logger != nil {
p.logger.Warn("persist inbound user message failed", slog.Any("error", err))
return false
}
return true
}
func (p *ChannelInboundProcessor) createInboxItem(
ctx context.Context,
ident InboundIdentity,
@@ -1887,36 +1820,6 @@ func channelAttachmentsToAssetRefs(attachments []channel.Attachment, role string
return refs
}
func chatAttachmentsToAssetRefs(attachments []conversation.ChatAttachment) []messagepkg.AssetRef {
if len(attachments) == 0 {
return nil
}
refs := make([]messagepkg.AssetRef, 0, len(attachments))
for idx, att := range attachments {
contentHash := strings.TrimSpace(att.ContentHash)
if contentHash == "" {
continue
}
ref := messagepkg.AssetRef{
ContentHash: contentHash,
Role: "attachment",
Ordinal: idx,
Mime: strings.TrimSpace(att.Mime),
SizeBytes: att.Size,
}
if att.Metadata != nil {
if sk, ok := att.Metadata["storage_key"].(string); ok {
ref.StorageKey = sk
}
}
refs = append(refs, ref)
}
if len(refs) == 0 {
return nil
}
return refs
}
func mapChannelToChatAttachments(attachments []channel.Attachment) []conversation.ChatAttachment {
if len(attachments) == 0 {
return nil
+11 -26
View File
@@ -629,8 +629,8 @@ func TestChannelInboundProcessorGroupMentionTriggersReply(t *testing.T) {
if len(sender.sent) != 1 {
t.Fatalf("expected one outbound reply, got %d", len(sender.sent))
}
if !gateway.gotReq.UserMessagePersisted {
t.Fatalf("expected UserMessagePersisted=true for pre-persisted inbound message")
if gateway.gotReq.UserMessagePersisted {
t.Fatalf("expected UserMessagePersisted=false: user message persistence is deferred to storeRound")
}
}
@@ -649,7 +649,7 @@ func (s *failingOpenStreamSender) OpenStream(_ context.Context, _ string, _ chan
return nil, errors.New("open stream failed")
}
func TestChannelInboundProcessorPersistsActiveChatBeforeOpenStream(t *testing.T) {
func TestChannelInboundProcessorDoesNotPersistBeforeOpenStream(t *testing.T) {
channelIdentitySvc := &fakeChannelIdentityService{channelIdentity: identities.ChannelIdentity{ID: "channelIdentity-openstream"}}
memberSvc := &fakeMemberService{isMember: true}
chatSvc := &fakeChatService{resolveResult: route.ResolveConversationResult{ChatID: "chat-openstream", RouteID: "route-openstream"}}
@@ -674,11 +674,8 @@ func TestChannelInboundProcessorPersistsActiveChatBeforeOpenStream(t *testing.T)
if err == nil || err.Error() != "stream unavailable" {
t.Fatalf("expected open stream error, got: %v", err)
}
if len(chatSvc.persistedIn) != 1 {
t.Fatalf("expected active-chat user turn to be persisted before stream open, got %d", len(chatSvc.persistedIn))
}
if got := chatSvc.persistedIn[0].ExternalMessageID; got != "msg-openstream-1" {
t.Fatalf("unexpected persisted external_message_id: %q", got)
if len(chatSvc.persistedIn) != 0 {
t.Fatalf("user message persistence is deferred to storeRound; expected 0 persisted, got %d", len(chatSvc.persistedIn))
}
if gateway.gotReq.Query != "" {
t.Fatalf("runner should not be called when stream open fails")
@@ -727,14 +724,8 @@ func TestChannelInboundProcessorPersistsAttachmentAssetRefs(t *testing.T) {
if err := processor.HandleInbound(context.Background(), cfg, msg, sender); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(chatSvc.persistedIn) != 1 {
t.Fatalf("expected one persisted input, got %d", len(chatSvc.persistedIn))
}
if len(chatSvc.persistedIn[0].Assets) != 1 {
t.Fatalf("expected one persisted asset ref, got %d", len(chatSvc.persistedIn[0].Assets))
}
if got := chatSvc.persistedIn[0].Assets[0].ContentHash; got != "asset-1" {
t.Fatalf("expected persisted content_hash asset-1, got %q", got)
if len(chatSvc.persistedIn) != 0 {
t.Fatalf("user message persistence is deferred to storeRound; expected 0 persisted, got %d", len(chatSvc.persistedIn))
}
if len(gateway.gotReq.Attachments) != 1 {
t.Fatalf("expected one gateway attachment, got %d", len(gateway.gotReq.Attachments))
@@ -796,11 +787,8 @@ func TestChannelInboundProcessorIngestsPlatformKeyWithResolver(t *testing.T) {
if got := gateway.gotReq.Attachments[0].ContentHash; got != "asset-resolved-1" {
t.Fatalf("expected resolved asset id, got %q", got)
}
if len(chatSvc.persistedIn) != 1 || len(chatSvc.persistedIn[0].Assets) != 1 {
t.Fatalf("expected one persisted asset ref, got %+v", chatSvc.persistedIn)
}
if got := chatSvc.persistedIn[0].Assets[0].ContentHash; got != "asset-resolved-1" {
t.Fatalf("expected persisted content_hash asset-resolved-1, got %q", got)
if len(chatSvc.persistedIn) != 0 {
t.Fatalf("user message persistence is deferred to storeRound; expected 0 persisted, got %d", len(chatSvc.persistedIn))
}
}
@@ -871,11 +859,8 @@ func TestChannelInboundProcessorIngestsBase64Attachment(t *testing.T) {
if !strings.HasPrefix(gotAttachment.Path, "/data/media/") {
t.Fatalf("expected attachment path under /data/media/, got %q", gotAttachment.Path)
}
if len(chatSvc.persistedIn) != 1 || len(chatSvc.persistedIn[0].Assets) != 1 {
t.Fatalf("expected one persisted asset ref, got %+v", chatSvc.persistedIn)
}
if got := chatSvc.persistedIn[0].Assets[0].ContentHash; got != "asset-base64-1" {
t.Fatalf("expected persisted content_hash asset-base64-1, got %q", got)
if len(chatSvc.persistedIn) != 0 {
t.Fatalf("user message persistence is deferred to storeRound; expected 0 persisted, got %d", len(chatSvc.persistedIn))
}
}