feat: bot inbox (#77)

* feat: bot inbox

* feat: unified header

* fix: missing tool_call usage

* feat: add group name in header
This commit is contained in:
Acbox Liu
2026-02-22 01:27:24 +08:00
committed by GitHub
parent 2c6b5e5565
commit c591af14b0
42 changed files with 3367 additions and 260 deletions
+189 -60
View File
@@ -21,6 +21,7 @@ import (
"github.com/memohai/memoh/internal/conversation"
"github.com/memohai/memoh/internal/db"
"github.com/memohai/memoh/internal/db/sqlc"
"github.com/memohai/memoh/internal/inbox"
"github.com/memohai/memoh/internal/memory"
messagepkg "github.com/memohai/memoh/internal/message"
"github.com/memohai/memoh/internal/models"
@@ -75,6 +76,7 @@ type Resolver struct {
conversationSvc ConversationSettingsReader
messageService messagepkg.Service
settingsService *settings.Service
inboxService *inbox.Service
skillLoader SkillLoader
assetLoader gatewayAssetLoader
gatewayBaseURL string
@@ -129,6 +131,12 @@ func (r *Resolver) SetGatewayAssetLoader(loader gatewayAssetLoader) {
r.assetLoader = loader
}
// SetInboxService configures inbox support: when a service is set, unread
// inbox items are attached to the gateway request and marked as read after a
// response. Leaving it unset (nil) disables inbox handling, because the
// resolver checks the field for nil before every use.
func (r *Resolver) SetInboxService(service *inbox.Service) {
r.inboxService = service
}
// --- gateway payload ---
type gatewayModelConfig struct {
@@ -156,6 +164,13 @@ type gatewaySkill struct {
Metadata map[string]any `json:"metadata,omitempty"`
}
// gatewayInboxItem is one unread inbox entry as serialized into the "inbox"
// array of the gateway request.
type gatewayInboxItem struct {
// ID identifies the inbox item; the same IDs are later used to mark the
// items as read.
ID string `json:"id"`
// Source is copied verbatim from the inbox item's Source field.
Source string `json:"source"`
// Content is the item's free-form payload, forwarded as a JSON object.
Content map[string]any `json:"content"`
// CreatedAt is the item's creation time formatted as RFC 3339.
CreatedAt string `json:"createdAt"`
}
type gatewayRequest struct {
Model gatewayModelConfig `json:"model"`
ActiveContextTime int `json:"activeContextTime"`
@@ -168,12 +183,14 @@ type gatewayRequest struct {
Query string `json:"query"`
Identity gatewayIdentity `json:"identity"`
Attachments []any `json:"attachments"`
Inbox []gatewayInboxItem `json:"inbox,omitempty"`
}
// gatewayResponse is the JSON shape decoded from a gateway reply (and from
// the payload of a streaming "done" event).
type gatewayResponse struct {
// Messages are the model messages produced for this round.
Messages []conversation.ModelMessage `json:"messages"`
// Skills lists skill names returned by the gateway.
Skills []string `json:"skills"`
// Usage is the raw usage JSON for the round as a whole; when persisting it
// is attached to the last message if no per-message usage is available.
Usage json.RawMessage `json:"usage,omitempty"`
// Usages holds raw per-message usage JSON, indexed in parallel with
// Messages; empty or null entries are ignored when persisting.
Usages []json.RawMessage `json:"usages,omitempty"`
}
type gatewayUsage struct {
@@ -219,9 +236,10 @@ func (t triggerScheduleRequest) MarshalJSON() ([]byte, error) {
// --- resolved context (shared by Chat / StreamChat / TriggerSchedule) ---
type resolvedContext struct {
payload gatewayRequest
model models.GetResponse
provider sqlc.LlmProvider
payload gatewayRequest
model models.GetResponse
provider sqlc.LlmProvider
inboxItemIDs []string
}
func (r *Resolver) resolve(ctx context.Context, req conversation.ChatRequest) (resolvedContext, error) {
@@ -285,6 +303,13 @@ func (r *Resolver) resolve(ctx context.Context, req conversation.ChatRequest) (r
historyBudget = 0
}
r.logger.Debug("context token budget",
slog.Int("max_tokens", maxTokens),
slog.Int("overhead", overhead),
slog.Int("system_prompt_reserve", systemPromptReserve),
slog.Int("history_budget", historyBudget),
)
var messages []conversation.ModelMessage
if !skipHistory && r.conversationSvc != nil {
loaded, loadErr := r.loadMessages(ctx, req.ChatID, maxCtx)
@@ -293,6 +318,12 @@ func (r *Resolver) resolve(ctx context.Context, req conversation.ChatRequest) (r
}
loaded = pruneHistoryForGateway(loaded)
messages = trimMessagesByTokens(loaded, historyBudget)
r.logger.Debug("context trim result",
slog.Int("loaded_messages", len(loaded)),
slog.Int("kept_messages", len(messages)),
slog.Int("trimmed_messages", len(loaded)-len(messages)),
slog.Int("history_budget", historyBudget),
)
}
if memoryMsg != nil {
messages = append(messages, *memoryMsg)
@@ -322,6 +353,31 @@ func (r *Resolver) resolve(ctx context.Context, req conversation.ChatRequest) (r
usableSkills = []gatewaySkill{}
}
var inboxGatewayItems []gatewayInboxItem
var inboxItemIDs []string
if r.inboxService != nil {
maxInbox := botSettings.MaxInboxItems
if maxInbox <= 0 {
maxInbox = settings.DefaultMaxInboxItems
}
items, err := r.inboxService.ListUnread(ctx, req.BotID, maxInbox)
if err != nil {
r.logger.Warn("failed to load inbox items", slog.String("bot_id", req.BotID), slog.Any("error", err))
} else if len(items) > 0 {
inboxGatewayItems = make([]gatewayInboxItem, 0, len(items))
inboxItemIDs = make([]string, 0, len(items))
for _, item := range items {
inboxGatewayItems = append(inboxGatewayItems, gatewayInboxItem{
ID: item.ID,
Source: item.Source,
Content: item.Content,
CreatedAt: item.CreatedAt.Format(time.RFC3339),
})
inboxItemIDs = append(inboxItemIDs, item.ID)
}
}
}
attachments := r.routeAndMergeAttachments(ctx, chatModel, req)
displayName := r.resolveDisplayName(ctx, req)
@@ -330,6 +386,7 @@ func (r *Resolver) resolve(ctx context.Context, req conversation.ChatRequest) (r
displayName,
req.CurrentChannel,
strings.TrimSpace(req.ConversationType),
strings.TrimSpace(req.ConversationName),
extractFileRefPaths(attachments),
req.Query,
)
@@ -360,9 +417,10 @@ func (r *Resolver) resolve(ctx context.Context, req conversation.ChatRequest) (r
SessionToken: req.ChatToken,
},
Attachments: attachments,
Inbox: inboxGatewayItems,
}
return resolvedContext{payload: payload, model: chatModel, provider: provider}, nil
return resolvedContext{payload: payload, model: chatModel, provider: provider, inboxItemIDs: inboxItemIDs}, nil
}
// --- Chat ---
@@ -378,9 +436,10 @@ func (r *Resolver) Chat(ctx context.Context, req conversation.ChatRequest) (conv
if err != nil {
return conversation.ChatResponse{}, err
}
if err := r.storeRound(ctx, req, resp.Messages, resp.Usage); err != nil {
if err := r.storeRound(ctx, req, resp.Messages, resp.Usage, resp.Usages); err != nil {
return conversation.ChatResponse{}, err
}
r.markInboxRead(ctx, req.BotID, rc.inboxItemIDs)
return conversation.ChatResponse{
Messages: resp.Messages,
Skills: resp.Skills,
@@ -432,7 +491,7 @@ func (r *Resolver) TriggerSchedule(ctx context.Context, botID string, payload sc
if err != nil {
return err
}
return r.storeRound(ctx, req, resp.Messages, resp.Usage)
return r.storeRound(ctx, req, resp.Messages, resp.Usage, resp.Usages)
}
// --- StreamChat ---
@@ -481,7 +540,9 @@ func (r *Resolver) StreamChat(ctx context.Context, req conversation.ChatRequest)
slog.Any("error", err),
)
errCh <- err
return
}
r.markInboxRead(ctx, streamReq.BotID, rc.inboxItemIDs)
}()
return chunkCh, errCh
}
@@ -679,15 +740,16 @@ func (r *Resolver) tryStoreStream(ctx context.Context, req conversation.ChatRequ
Data json.RawMessage `json:"data"`
Messages []conversation.ModelMessage `json:"messages"`
Usage json.RawMessage `json:"usage,omitempty"`
Usages []json.RawMessage `json:"usages,omitempty"`
}
if err := json.Unmarshal(data, &envelope); err == nil {
if (envelope.Type == "agent_end" || envelope.Type == "done") && len(envelope.Messages) > 0 {
return true, r.storeRound(ctx, req, envelope.Messages, envelope.Usage)
return true, r.storeRound(ctx, req, envelope.Messages, envelope.Usage, envelope.Usages)
}
if envelope.Type == "done" && len(envelope.Data) > 0 {
var resp gatewayResponse
if err := json.Unmarshal(envelope.Data, &resp); err == nil && len(resp.Messages) > 0 {
return true, r.storeRound(ctx, req, resp.Messages, resp.Usage)
return true, r.storeRound(ctx, req, resp.Messages, resp.Usage, resp.Usages)
}
}
}
@@ -695,7 +757,7 @@ func (r *Resolver) tryStoreStream(ctx context.Context, req conversation.ChatRequ
// fallback: data: {messages: [...]}
var resp gatewayResponse
if err := json.Unmarshal(data, &resp); err == nil && len(resp.Messages) > 0 {
return true, r.storeRound(ctx, req, resp.Messages, resp.Usage)
return true, r.storeRound(ctx, req, resp.Messages, resp.Usage, resp.Usages)
}
return false, nil
}
@@ -959,8 +1021,9 @@ func (r *Resolver) resolveContainerID(ctx context.Context, botID, explicit strin
// --- message loading ---
type messageWithUsage struct {
Message conversation.ModelMessage
UsageInputTokens *int
Message conversation.ModelMessage
UsageInputTokens *int
UsageOutputTokens *int
}
func (r *Resolver) loadMessages(ctx context.Context, chatID string, maxContextMinutes int) ([]messageWithUsage, error) {
@@ -968,7 +1031,7 @@ func (r *Resolver) loadMessages(ctx context.Context, chatID string, maxContextMi
return nil, nil
}
since := time.Now().UTC().Add(-time.Duration(maxContextMinutes) * time.Minute)
msgs, err := r.messageService.ListSince(ctx, chatID, since)
msgs, err := r.messageService.ListActiveSince(ctx, chatID, since)
if err != nil {
return nil, err
}
@@ -983,13 +1046,15 @@ func (r *Resolver) loadMessages(ctx context.Context, chatID string, maxContextMi
mm.Role = m.Role
}
var inputTokens *int
var outputTokens *int
if len(m.Usage) > 0 {
var u gatewayUsage
if json.Unmarshal(m.Usage, &u) == nil {
inputTokens = u.InputTokens
outputTokens = u.OutputTokens
}
}
result = append(result, messageWithUsage{Message: mm, UsageInputTokens: inputTokens})
result = append(result, messageWithUsage{Message: mm, UsageInputTokens: inputTokens, UsageOutputTokens: outputTokens})
}
return result, nil
}
@@ -1012,44 +1077,21 @@ func trimMessagesByTokens(messages []messageWithUsage, maxTokens int) []conversa
return result
}
// Scan backwards. When a message with UsageInputTokens is found, that value
// represents the cumulative input tokens for all messages up to and including
// that message. Messages after it are estimated with chars/4.
// Scan from newest to oldest, accumulating per-message outputTokens from
// stored usage data. Messages without usage (user / tool) are included for
// free — the outputTokens of surrounding assistant turns already account
// for the context they consumed.
totalTokens := 0
anchorFound := false
cutoff := 0
tailEstimate := 0
messagesWithUsage := 0
for i := len(messages) - 1; i >= 0; i-- {
if !anchorFound && messages[i].UsageInputTokens != nil {
anchorFound = true
totalTokens = *messages[i].UsageInputTokens + tailEstimate
if totalTokens > maxTokens {
cutoff = i + 1
break
}
continue
if messages[i].UsageOutputTokens != nil {
totalTokens += *messages[i].UsageOutputTokens
messagesWithUsage++
}
est := estimateMessageTokens(messages[i].Message)
if anchorFound {
totalTokens += est
if totalTokens > maxTokens {
cutoff = i + 1
break
}
} else {
tailEstimate += est
}
}
if !anchorFound {
totalTokens = 0
for i := len(messages) - 1; i >= 0; i-- {
totalTokens += estimateMessageTokens(messages[i].Message)
if totalTokens > maxTokens {
cutoff = i + 1
break
}
if totalTokens > maxTokens {
cutoff = i + 1
break
}
}
@@ -1060,6 +1102,15 @@ func trimMessagesByTokens(messages []messageWithUsage, maxTokens int) []conversa
cutoff++
}
slog.Debug("trimMessagesByTokens",
slog.Int("total_messages", len(messages)),
slog.Int("messages_with_usage", messagesWithUsage),
slog.Int("accumulated_output_tokens", totalTokens),
slog.Int("max_tokens", maxTokens),
slog.Int("cutoff_index", cutoff),
slog.Int("kept_messages", len(messages)-cutoff),
)
result := make([]conversation.ModelMessage, 0, len(messages)-cutoff)
for _, m := range messages[cutoff:] {
result = append(result, m.Message)
@@ -1187,24 +1238,28 @@ func (r *Resolver) persistUserMessage(ctx context.Context, req conversation.Chat
return err
}
func (r *Resolver) storeRound(ctx context.Context, req conversation.ChatRequest, messages []conversation.ModelMessage, usage json.RawMessage) error {
func (r *Resolver) storeRound(ctx context.Context, req conversation.ChatRequest, messages []conversation.ModelMessage, usage json.RawMessage, usages []json.RawMessage) error {
fullRound := make([]conversation.ModelMessage, 0, len(messages))
for _, m := range messages {
roundUsages := make([]json.RawMessage, 0, len(usages))
for i, m := range messages {
if req.UserMessagePersisted && m.Role == "user" && strings.TrimSpace(m.TextContent()) == strings.TrimSpace(req.Query) {
continue
}
fullRound = append(fullRound, m)
if i < len(usages) {
roundUsages = append(roundUsages, usages[i])
}
}
if len(fullRound) == 0 {
return nil
}
r.storeMessages(ctx, req, fullRound, usage)
r.storeMessages(ctx, req, fullRound, usage, roundUsages)
go r.storeMemory(context.WithoutCancel(ctx), req.BotID, fullRound)
return nil
}
func (r *Resolver) storeMessages(ctx context.Context, req conversation.ChatRequest, messages []conversation.ModelMessage, usage json.RawMessage) {
func (r *Resolver) storeMessages(ctx context.Context, req conversation.ChatRequest, messages []conversation.ModelMessage, usage json.RawMessage, usages []json.RawMessage) {
if r.messageService == nil {
return
}
@@ -1254,7 +1309,9 @@ func (r *Resolver) storeMessages(ctx context.Context, req conversation.ChatReque
assets = append(assets, outboundAssets...)
}
var msgUsage json.RawMessage
if i == len(messages)-1 && len(usage) > 0 {
if i < len(usages) && len(usages[i]) > 0 && !isJSONNull(usages[i]) {
msgUsage = usages[i]
} else if i == len(messages)-1 && len(usage) > 0 {
msgUsage = usage
}
if _, err := r.messageService.Persist(ctx, messagepkg.PersistInput{
@@ -1276,6 +1333,10 @@ func (r *Resolver) storeMessages(ctx context.Context, req conversation.ChatReque
}
}
// isJSONNull reports whether data carries no usable JSON value: it is either
// empty (zero length) or, once surrounding whitespace is removed, exactly the
// literal null.
func isJSONNull(data json.RawMessage) bool {
	if len(data) == 0 {
		return true
	}
	trimmed := bytes.TrimSpace(data)
	return string(trimmed) == "null"
}
// outboundAssetRefsToMessageRefs converts outbound asset refs from the streaming
// collector into message-level asset refs for persistence.
func outboundAssetRefsToMessageRefs(refs []conversation.OutboundAssetRef) []messagepkg.AssetRef {
@@ -1569,6 +1630,17 @@ func (r *Resolver) listCandidates(ctx context.Context, providerFilter string) ([
return filtered, nil
}
// --- inbox ---
// markInboxRead marks the given inbox items as read for botID. It does
// nothing when ids is empty or no inbox service is configured. A failure is
// logged as a warning and never returned to the caller.
func (r *Resolver) markInboxRead(ctx context.Context, botID string, ids []string) {
	if len(ids) == 0 || r.inboxService == nil {
		return
	}
	markErr := r.inboxService.MarkRead(ctx, botID, ids)
	if markErr == nil {
		return
	}
	r.logger.Warn(
		"failed to mark inbox items as read",
		slog.String("bot_id", botID),
		slog.Any("error", markErr),
	)
}
// --- settings ---
func (r *Resolver) loadBotSettings(ctx context.Context, botID string) (settings.Settings, error) {
@@ -1696,21 +1768,78 @@ func parseResolverUUID(id string) (pgtype.UUID, error) {
return db.ParseUUID(id)
}
// UserMessageMeta holds the structured metadata attached to every user
// message. It is the single source of truth shared by the YAML header
// (sent to the LLM) and the inbox content JSONB.
type UserMessageMeta struct {
	// ChannelIdentityID identifies the sender on the originating channel.
	ChannelIdentityID string `json:"channel-identity-id"`
	// DisplayName is the sender's human-readable name.
	DisplayName string `json:"display-name"`
	// Channel is the channel the message arrived on.
	Channel string `json:"channel"`
	// ConversationType is the conversation kind as supplied by the caller.
	ConversationType string `json:"conversation-type"`
	// ConversationName is optional and omitted from all outputs when empty.
	ConversationName string `json:"conversation-name,omitempty"`
	// Time is the RFC 3339 timestamp (UTC) at which the metadata was built.
	Time string `json:"time"`
	// AttachmentPaths lists attachment paths; it encodes as a JSON array.
	AttachmentPaths []string `json:"attachments"`
}

// BuildUserMessageMeta constructs a UserMessageMeta from the inbound
// parameters. Both FormatUserHeader and inbox content use this.
//
// Time is set to the current UTC time in RFC 3339 format. A nil
// attachmentPaths is replaced by an empty slice so the field always encodes
// as a JSON array rather than null.
func BuildUserMessageMeta(channelIdentityID, displayName, channel, conversationType, conversationName string, attachmentPaths []string) UserMessageMeta {
	if attachmentPaths == nil {
		attachmentPaths = []string{}
	}
	return UserMessageMeta{
		ChannelIdentityID: channelIdentityID,
		DisplayName:       displayName,
		Channel:           channel,
		ConversationType:  conversationType,
		ConversationName:  conversationName,
		Time:              time.Now().UTC().Format(time.RFC3339),
		AttachmentPaths:   attachmentPaths,
	}
}

// ToMap returns the metadata as a map with the same keys used in the YAML
// header, suitable for storing as inbox content JSONB.
//
// The "conversation-name" key is present only when ConversationName is
// non-empty. The "attachments" value is always a non-nil []string, even when
// the receiver was not created through BuildUserMessageMeta, so the stored
// JSON consistently contains an array instead of null.
func (m UserMessageMeta) ToMap() map[string]any {
	attachments := m.AttachmentPaths
	if attachments == nil {
		// Mirror the normalization done by BuildUserMessageMeta: a nil slice
		// would otherwise be encoded as JSON null.
		attachments = []string{}
	}
	result := map[string]any{
		"channel-identity-id": m.ChannelIdentityID,
		"display-name":        m.DisplayName,
		"channel":             m.Channel,
		"conversation-type":   m.ConversationType,
		"time":                m.Time,
		"attachments":         attachments,
	}
	if m.ConversationName != "" {
		result["conversation-name"] = m.ConversationName
	}
	return result
}
// FormatUserHeader wraps a user query with YAML front-matter metadata so
// the LLM sees structured context (sender, channel, time, attachments)
// alongside the raw message. This must be the single source of truth for
// user-message formatting — the agent gateway must NOT add its own header.
func FormatUserHeader(channelIdentityID, displayName, channel, conversationType string, attachmentPaths []string, query string) string {
func FormatUserHeader(channelIdentityID, displayName, channel, conversationType, conversationName string, attachmentPaths []string, query string) string {
meta := BuildUserMessageMeta(channelIdentityID, displayName, channel, conversationType, conversationName, attachmentPaths)
return FormatUserHeaderFromMeta(meta, query)
}
// FormatUserHeaderFromMeta formats a pre-built UserMessageMeta into the
// YAML front-matter string sent to the LLM.
func FormatUserHeaderFromMeta(meta UserMessageMeta, query string) string {
var sb strings.Builder
sb.WriteString("---\n")
writeYAMLString(&sb, "channel-identity-id", channelIdentityID)
writeYAMLString(&sb, "display-name", displayName)
writeYAMLString(&sb, "channel", channel)
writeYAMLString(&sb, "conversation-type", conversationType)
writeYAMLString(&sb, "time", time.Now().UTC().Format(time.RFC3339))
if len(attachmentPaths) > 0 {
writeYAMLString(&sb, "channel-identity-id", meta.ChannelIdentityID)
writeYAMLString(&sb, "display-name", meta.DisplayName)
writeYAMLString(&sb, "channel", meta.Channel)
writeYAMLString(&sb, "conversation-type", meta.ConversationType)
if meta.ConversationName != "" {
writeYAMLString(&sb, "conversation-name", meta.ConversationName)
}
writeYAMLString(&sb, "time", meta.Time)
if len(meta.AttachmentPaths) > 0 {
sb.WriteString("attachments:\n")
for _, p := range attachmentPaths {
for _, p := range meta.AttachmentPaths {
sb.WriteString(" - ")
sb.WriteString(p)
sb.WriteByte('\n')