Memoh/internal/message/service.go
Acbox Liu 1680316c7f refactor(agent): remove agent gateway instead of twilight sdk (#264)
* refactor(agent): replace TypeScript agent gateway with in-process Go agent using twilight-ai SDK

- Remove apps/agent (Bun/Elysia gateway), packages/agent (@memoh/agent),
  internal/bun runtime manager, and all embedded agent/bun assets
- Add internal/agent package powered by twilight-ai SDK for LLM calls,
  tool execution, streaming, sentinel logic, tag extraction, and prompts
- Integrate ToolGatewayService in-process for both built-in and user MCP
  tools, eliminating HTTP round-trips to the old gateway
- Update resolver to convert between sdk.Message and ModelMessage at the
  boundary (resolver_messages.go), keeping agent package free of
  persistence concerns
- Prepend user message before storeRound since SDK only returns output
  messages (assistant + tool)
- Clean up all Docker configs, TOML configs, nginx proxy, Dockerfile.agent,
  and Go config structs related to the removed agent gateway
- Update cmd/agent and cmd/memoh entry points with setter-based
  ToolGateway injection to avoid FX dependency cycles

* fix(web): move form declaration before computed properties that reference it

The `form` reactive object was declared after computed properties like
`selectedMemoryProvider` and `isSelectedMemoryProviderPersisted` that
reference it, causing a TDZ ReferenceError during setup.

* fix: prevent UTF-8 character corruption in streaming text output

StreamTagExtractor.Push() used byte-level string slicing to hold back
buffer tails for tag detection, which could split multi-byte UTF-8
characters. After json.Marshal replaced invalid bytes with U+FFFD,
the corruption became permanent — causing garbled CJK characters (�)
in agent responses.

Add safeUTF8SplitIndex() to back up split points to valid character
boundaries. Also fix byte-level truncation in command/formatter.go
and command/fs.go to use rune-aware slicing.
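
The boundary fix described above can be illustrated with a minimal sketch. The helper names `safeSplitIndex` and `truncateRunes` are hypothetical and are not the repository's `safeUTF8SplitIndex()`, whose signature is not shown here; the sketch only assumes the goal stated in the commit, namely that a split point must never land inside a multi-byte character.

```go
package main

import (
	"fmt"
	"unicode/utf8"
)

// safeSplitIndex moves i left until it sits on a rune boundary of s,
// so that s[:i] and s[i:] are both valid UTF-8 when s is.
// Hypothetical helper for illustration only.
func safeSplitIndex(s string, i int) int {
	if i <= 0 {
		return 0
	}
	if i >= len(s) {
		return len(s)
	}
	// Continuation bytes (10xxxxxx) are not rune starts; back up past them.
	for i > 0 && !utf8.RuneStart(s[i]) {
		i--
	}
	return i
}

// truncateRunes keeps at most n characters of s, counting runes, not bytes.
func truncateRunes(s string, n int) string {
	if n <= 0 {
		return ""
	}
	count := 0
	for idx := range s { // ranging over a string yields rune start offsets
		if count == n {
			return s[:idx]
		}
		count++
	}
	return s
}

func main() {
	s := "你好, world"
	cut := safeSplitIndex(s, 4) // byte 4 is inside "好"
	fmt.Printf("%q | %q\n", s[:cut], s[cut:]) // "你" | "好, world"
	fmt.Println(truncateRunes(s, 2))          // 你好
}
```

Backing up rather than moving forward keeps the held-back tail at least as long as requested, which suits a buffer that retains a suffix for tag detection.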

* fix: add agent error logging and fix Gemini tool schema validation

- Log agent stream errors in both SSE and WebSocket paths with bot/model context
- Fix send tool `attachments` parameter: empty `items` schema rejected by
  Google Gemini API (INVALID_ARGUMENT), now specifies `{"type": "string"}`
- Upgrade twilight-ai to d898f0b (includes raw body in API error messages)

* chore(ci): remove agent gateway from Docker build and release pipelines

Agent gateway has been replaced by in-process Go agent; remove the
obsolete Docker image matrix entry, Bun/UPX CI steps, and agent-binary
build logic from the release script.

* fix: preserve attachment filename, metadata, and container path through persistence

- Add `name` column to `bot_history_message_assets` (migration 0034) to
  persist original filenames across page refreshes.
- Add `metadata` JSONB column (migration 0035) to store source_path,
  source_url, and other context alongside each asset.
- Update SQL queries, sqlc-generated code, and all Go types (MessageAsset,
  AssetRef, OutboundAssetRef, FileAttachment) to carry name and metadata
  through the full lifecycle.
- Extract filenames from path/URL in AttachmentsResolver before clearing
  raw paths; enrich streaming event metadata with name, source_path, and
  source_url in both the WebSocket and channel inbound ingestion paths.
- Implement `LinkAssets` on message service and `LinkOutboundAssets` on
  flow resolver so WebSocket-streamed bot attachments are persisted to the
  correct assistant message after streaming completes.
- Frontend: update MessageAsset type with metadata field, pass metadata
  through to attachment items, and reorder attachment-block.vue template
  so container files (identified by metadata.source_path) open in the
  sidebar file manager instead of triggering a download.

* refactor(agent): decouple built-in tools from MCP, load via ToolProvider interface

Migrate all 13 built-in tool providers from internal/mcp/providers/ to
internal/agent/tools/ using the twilight-ai sdk.Tool structure. The agent
now loads tools through a ToolProvider interface instead of the MCP
ToolGatewayService, which is simplified to only manage external federation
sources. This enables selective tool loading and removes the coupling
between business tools and the MCP protocol layer.

* refactor(flow): split monolithic resolver.go into focused modules

Break the 1959-line resolver.go into 12 files organized by concern:
- resolver.go: core orchestration (Resolver struct, resolve, Chat, prepareRunConfig)
- resolver_stream.go: streaming (StreamChat, StreamChatWS, tryStoreStream)
- resolver_trigger.go: schedule/heartbeat triggers
- resolver_attachments.go: attachment routing, inlining, encoding
- resolver_history.go: message loading, deduplication, token trimming
- resolver_store.go: persistence (storeRound, storeMessages, asset linking)
- resolver_memory.go: memory provider integration
- resolver_model_selection.go: model selection and candidate matching
- resolver_identity.go: display name and channel identity resolution
- resolver_settings.go: bot settings, loop detection, inbox
- user_header.go: YAML front-matter formatting
- resolver_util.go: shared utilities (sanitize, normalize, dedup, UUID)

* fix(agent): enable Anthropic extended thinking by passing ReasoningConfig to provider

Anthropic's thinking requires WithThinking() at provider creation time,
unlike OpenAI which uses per-request ReasoningEffort. The config was
never wired through, so Claude models could not trigger thinking.

* refactor(agent): extract prompts into embedded markdown templates

Move inline prompt strings from prompt.go into separate .md files under
internal/agent/prompts/, using {{key}} placeholders and a simple render
engine. Remove obsolete SystemPromptParams fields (Language,
MaxContextLoadTime, Channels, CurrentChannel) and their call-site usage.
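
A `{{key}}` render step of the kind described above could look roughly like the following. This is a sketch under assumptions: the function name `render` and the `map[string]string` input are invented for illustration, and the repository's actual render engine and its embedded `.md` loading are not reproduced here.

```go
package main

import (
	"fmt"
	"strings"
)

// render replaces every {{key}} in tmpl with vars[key].
// Placeholders without a matching key are left untouched, and substituted
// values are not scanned again, so a value containing "{{...}}" is kept as-is.
// Hypothetical helper for illustration only.
func render(tmpl string, vars map[string]string) string {
	pairs := make([]string, 0, len(vars)*2)
	for k, v := range vars {
		pairs = append(pairs, "{{"+k+"}}", v)
	}
	return strings.NewReplacer(pairs...).Replace(tmpl)
}

func main() {
	tmpl := "You are {{name}}. Today is {{date}}."
	out := render(tmpl, map[string]string{
		"name": "Memoh",
		"date": "2026-03-19",
	})
	fmt.Println(out) // You are Memoh. Today is 2026-03-19.
}
```

A single-pass replacer keeps the behavior predictable when user-supplied values happen to contain brace sequences.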

* fix: lint
2026-03-19 13:31:54 +08:00

613 lines
16 KiB
Go

package message

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"math"
	"strings"
	"time"

	"github.com/jackc/pgx/v5/pgtype"

	dbpkg "github.com/memohai/memoh/internal/db"
	"github.com/memohai/memoh/internal/db/sqlc"
	"github.com/memohai/memoh/internal/message/event"
)

// DBService persists and reads bot history messages.
type DBService struct {
	queries   *sqlc.Queries
	logger    *slog.Logger
	publisher event.Publisher
}

// NewService creates a message service.
// A nil logger falls back to slog.Default(). The publisher is optional:
// only the first one passed is used, and events are skipped when none is given.
func NewService(log *slog.Logger, queries *sqlc.Queries, publishers ...event.Publisher) *DBService {
	if log == nil {
		log = slog.Default()
	}
	var publisher event.Publisher
	if len(publishers) > 0 {
		publisher = publishers[0]
	}
	return &DBService{
		queries:   queries,
		logger:    log.With(slog.String("service", "message")),
		publisher: publisher,
	}
}

// Persist writes a single message to bot_history_messages.
func (s *DBService) Persist(ctx context.Context, input PersistInput) (Message, error) {
	pgBotID, err := dbpkg.ParseUUID(input.BotID)
	if err != nil {
		return Message{}, fmt.Errorf("invalid bot id: %w", err)
	}
	pgRouteID, err := parseOptionalUUID(input.RouteID)
	if err != nil {
		return Message{}, fmt.Errorf("invalid route id: %w", err)
	}
	pgSenderChannelIdentityID, err := parseOptionalUUID(input.SenderChannelIdentityID)
	if err != nil {
		return Message{}, fmt.Errorf("invalid sender channel identity id: %w", err)
	}
	pgSenderUserID, err := parseOptionalUUID(input.SenderUserID)
	if err != nil {
		return Message{}, fmt.Errorf("invalid sender user id: %w", err)
	}
	pgModelID, err := parseOptionalUUID(input.ModelID)
	if err != nil {
		return Message{}, fmt.Errorf("invalid model id: %w", err)
	}
	metaBytes, err := json.Marshal(nonNilMap(input.Metadata))
	if err != nil {
		return Message{}, fmt.Errorf("marshal message metadata: %w", err)
	}
	content := input.Content
	if len(content) == 0 {
		content = []byte("{}")
	}

	row, err := s.queries.CreateMessage(ctx, sqlc.CreateMessageParams{
		BotID:                   pgBotID,
		RouteID:                 pgRouteID,
		SenderChannelIdentityID: pgSenderChannelIdentityID,
		SenderUserID:            pgSenderUserID,
		Platform:                toPgText(input.Platform),
		ExternalMessageID:       toPgText(input.ExternalMessageID),
		SourceReplyToMessageID:  toPgText(input.SourceReplyToMessageID),
		Role:                    input.Role,
		Content:                 content,
		Metadata:                metaBytes,
		Usage:                   input.Usage,
		ModelID:                 pgModelID,
	})
	if err != nil {
		return Message{}, err
	}
	result := toMessageFromCreate(row)

	// Persist asset links if provided.
	for _, ref := range input.Assets {
		pgMsgID := row.ID
		role := ref.Role
		if strings.TrimSpace(role) == "" {
			role = "attachment"
		}
		contentHash := strings.TrimSpace(ref.ContentHash)
		if contentHash == "" {
			s.logger.Warn("skip asset ref without content_hash")
			continue
		}
		if ref.Ordinal < math.MinInt32 || ref.Ordinal > math.MaxInt32 {
			return Message{}, fmt.Errorf("asset ordinal out of range: %d", ref.Ordinal)
		}
		if _, assetErr := s.queries.CreateMessageAsset(ctx, sqlc.CreateMessageAssetParams{
			MessageID:   pgMsgID,
			Role:        role,
			Ordinal:     int32(ref.Ordinal),
			ContentHash: contentHash,
			Name:        ref.Name,
			Metadata:    marshalMetadata(ref.Metadata),
		}); assetErr != nil {
			s.logger.Warn("create message asset link failed", slog.String("message_id", result.ID), slog.Any("error", assetErr))
		}
	}

	// Populate assets from input refs for SSE so consumers see them immediately.
	// DB only stores the link (content_hash); mime/size/storage_key come from the caller.
	if len(input.Assets) > 0 {
		assets := make([]MessageAsset, 0, len(input.Assets))
		for _, ref := range input.Assets {
			ch := strings.TrimSpace(ref.ContentHash)
			if ch == "" {
				continue
			}
			assets = append(assets, MessageAsset{
				ContentHash: ch,
				Role:        coalesce(ref.Role, "attachment"),
				Ordinal:     ref.Ordinal,
				Mime:        ref.Mime,
				SizeBytes:   ref.SizeBytes,
				StorageKey:  ref.StorageKey,
				Name:        ref.Name,
				Metadata:    ref.Metadata,
			})
		}
		result.Assets = assets
	}

	s.publishMessageCreated(result)
	return result, nil
}

// List returns all messages for a bot.
func (s *DBService) List(ctx context.Context, botID string) ([]Message, error) {
	pgBotID, err := dbpkg.ParseUUID(botID)
	if err != nil {
		return nil, err
	}
	rows, err := s.queries.ListMessages(ctx, pgBotID)
	if err != nil {
		return nil, err
	}
	msgs := toMessagesFromList(rows)
	s.enrichAssets(ctx, msgs)
	return msgs, nil
}

// ListSince returns bot messages since a given time.
func (s *DBService) ListSince(ctx context.Context, botID string, since time.Time) ([]Message, error) {
	pgBotID, err := dbpkg.ParseUUID(botID)
	if err != nil {
		return nil, err
	}
	rows, err := s.queries.ListMessagesSince(ctx, sqlc.ListMessagesSinceParams{
		BotID:     pgBotID,
		CreatedAt: pgtype.Timestamptz{Time: since, Valid: true},
	})
	if err != nil {
		return nil, err
	}
	msgs := toMessagesFromSince(rows)
	s.enrichAssets(ctx, msgs)
	return msgs, nil
}

// ListActiveSince returns bot messages since a given time, excluding passive_sync messages.
func (s *DBService) ListActiveSince(ctx context.Context, botID string, since time.Time) ([]Message, error) {
	pgBotID, err := dbpkg.ParseUUID(botID)
	if err != nil {
		return nil, err
	}
	rows, err := s.queries.ListActiveMessagesSince(ctx, sqlc.ListActiveMessagesSinceParams{
		BotID:     pgBotID,
		CreatedAt: pgtype.Timestamptz{Time: since, Valid: true},
	})
	if err != nil {
		return nil, err
	}
	msgs := toMessagesFromActiveSince(rows)
	s.enrichAssets(ctx, msgs)
	return msgs, nil
}

// ListLatest returns the latest N bot messages (newest first in DB; caller may reverse for ASC).
func (s *DBService) ListLatest(ctx context.Context, botID string, limit int32) ([]Message, error) {
	pgBotID, err := dbpkg.ParseUUID(botID)
	if err != nil {
		return nil, err
	}
	rows, err := s.queries.ListMessagesLatest(ctx, sqlc.ListMessagesLatestParams{
		BotID:    pgBotID,
		MaxCount: limit,
	})
	if err != nil {
		return nil, err
	}
	msgs := toMessagesFromLatest(rows)
	s.enrichAssets(ctx, msgs)
	return msgs, nil
}

// ListBefore returns up to limit messages older than before (created_at < before), ordered oldest-first.
func (s *DBService) ListBefore(ctx context.Context, botID string, before time.Time, limit int32) ([]Message, error) {
	pgBotID, err := dbpkg.ParseUUID(botID)
	if err != nil {
		return nil, err
	}
	rows, err := s.queries.ListMessagesBefore(ctx, sqlc.ListMessagesBeforeParams{
		BotID:     pgBotID,
		CreatedAt: pgtype.Timestamptz{Time: before, Valid: true},
		MaxCount:  limit,
	})
	if err != nil {
		return nil, err
	}
	msgs := toMessagesFromBefore(rows)
	s.enrichAssets(ctx, msgs)
	return msgs, nil
}

// LinkAssets links asset refs to an existing persisted message.
// Refs with a blank content hash are skipped. A failed insert is logged and
// does not abort the remaining refs; only an invalid message ID or an
// out-of-range ordinal returns an error.
func (s *DBService) LinkAssets(ctx context.Context, messageID string, assets []AssetRef) error {
	pgMsgID, err := dbpkg.ParseUUID(messageID)
	if err != nil {
		return fmt.Errorf("invalid message id: %w", err)
	}
	for _, ref := range assets {
		contentHash := strings.TrimSpace(ref.ContentHash)
		if contentHash == "" {
			continue
		}
		role := ref.Role
		if strings.TrimSpace(role) == "" {
			role = "attachment"
		}
		if ref.Ordinal < math.MinInt32 || ref.Ordinal > math.MaxInt32 {
			return fmt.Errorf("asset ordinal out of range: %d", ref.Ordinal)
		}
		if _, assetErr := s.queries.CreateMessageAsset(ctx, sqlc.CreateMessageAssetParams{
			MessageID:   pgMsgID,
			Role:        role,
			Ordinal:     int32(ref.Ordinal),
			ContentHash: contentHash,
			Name:        ref.Name,
			Metadata:    marshalMetadata(ref.Metadata),
		}); assetErr != nil {
			s.logger.Warn("link asset failed", slog.String("message_id", messageID), slog.Any("error", assetErr))
		}
	}
	return nil
}

// DeleteByBot deletes all messages for a bot.
func (s *DBService) DeleteByBot(ctx context.Context, botID string) error {
	pgBotID, err := dbpkg.ParseUUID(botID)
	if err != nil {
		return err
	}
	return s.queries.DeleteMessagesByBot(ctx, pgBotID)
}
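
// toMessageFromCreate converts the row returned by CreateMessage to a Message.
// The sender display name and avatar URL are passed as empty values, so they
// are blank on the returned Message.
func toMessageFromCreate(row sqlc.CreateMessageRow) Message {
	return toMessageFields(
		row.ID,
		row.BotID,
		row.RouteID,
		row.SenderChannelIdentityID,
		row.SenderUserID,
		pgtype.Text{},
		pgtype.Text{},
		row.Platform,
		row.ExternalMessageID,
		row.SourceReplyToMessageID,
		row.Role,
		row.Content,
		row.Metadata,
		row.Usage,
		row.CreatedAt,
	)
}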

func toMessageFromListRow(row sqlc.ListMessagesRow) Message {
	return toMessageFields(
		row.ID,
		row.BotID,
		row.RouteID,
		row.SenderChannelIdentityID,
		row.SenderUserID,
		row.SenderDisplayName,
		row.SenderAvatarUrl,
		row.Platform,
		row.ExternalMessageID,
		row.SourceReplyToMessageID,
		row.Role,
		row.Content,
		row.Metadata,
		row.Usage,
		row.CreatedAt,
	)
}

func toMessageFromSinceRow(row sqlc.ListMessagesSinceRow) Message {
	return toMessageFields(
		row.ID,
		row.BotID,
		row.RouteID,
		row.SenderChannelIdentityID,
		row.SenderUserID,
		row.SenderDisplayName,
		row.SenderAvatarUrl,
		row.Platform,
		row.ExternalMessageID,
		row.SourceReplyToMessageID,
		row.Role,
		row.Content,
		row.Metadata,
		row.Usage,
		row.CreatedAt,
	)
}

func toMessageFromActiveSinceRow(row sqlc.ListActiveMessagesSinceRow) Message {
	return toMessageFields(
		row.ID,
		row.BotID,
		row.RouteID,
		row.SenderChannelIdentityID,
		row.SenderUserID,
		row.SenderDisplayName,
		row.SenderAvatarUrl,
		row.Platform,
		row.ExternalMessageID,
		row.SourceReplyToMessageID,
		row.Role,
		row.Content,
		row.Metadata,
		row.Usage,
		row.CreatedAt,
	)
}

func toMessageFromLatestRow(row sqlc.ListMessagesLatestRow) Message {
	return toMessageFields(
		row.ID,
		row.BotID,
		row.RouteID,
		row.SenderChannelIdentityID,
		row.SenderUserID,
		row.SenderDisplayName,
		row.SenderAvatarUrl,
		row.Platform,
		row.ExternalMessageID,
		row.SourceReplyToMessageID,
		row.Role,
		row.Content,
		row.Metadata,
		row.Usage,
		row.CreatedAt,
	)
}

// toMessageFields builds a Message from individual column values. It is the
// shared mapper behind the per-query row converters, whose row types have
// the same columns but distinct Go types.
func toMessageFields(
	id pgtype.UUID,
	botID pgtype.UUID,
	routeID pgtype.UUID,
	senderChannelIdentityID pgtype.UUID,
	senderUserID pgtype.UUID,
	senderDisplayName pgtype.Text,
	senderAvatarURL pgtype.Text,
	platform pgtype.Text,
	externalMessageID pgtype.Text,
	sourceReplyToMessageID pgtype.Text,
	role string,
	content []byte,
	metadata []byte,
	usage []byte,
	createdAt pgtype.Timestamptz,
) Message {
	return Message{
		ID:                      id.String(),
		BotID:                   botID.String(),
		RouteID:                 routeID.String(),
		SenderChannelIdentityID: senderChannelIdentityID.String(),
		SenderUserID:            senderUserID.String(),
		SenderDisplayName:       dbpkg.TextToString(senderDisplayName),
		SenderAvatarURL:         dbpkg.TextToString(senderAvatarURL),
		Platform:                dbpkg.TextToString(platform),
		ExternalMessageID:       dbpkg.TextToString(externalMessageID),
		SourceReplyToMessageID:  dbpkg.TextToString(sourceReplyToMessageID),
		Role:                    role,
		Content:                 json.RawMessage(content),
		Metadata:                parseJSONMap(metadata),
		Usage:                   json.RawMessage(usage),
		CreatedAt:               createdAt.Time,
	}
}

func toMessagesFromList(rows []sqlc.ListMessagesRow) []Message {
	messages := make([]Message, 0, len(rows))
	for _, row := range rows {
		messages = append(messages, toMessageFromListRow(row))
	}
	return messages
}

func toMessagesFromSince(rows []sqlc.ListMessagesSinceRow) []Message {
	messages := make([]Message, 0, len(rows))
	for _, row := range rows {
		messages = append(messages, toMessageFromSinceRow(row))
	}
	return messages
}

func toMessagesFromActiveSince(rows []sqlc.ListActiveMessagesSinceRow) []Message {
	messages := make([]Message, 0, len(rows))
	for _, row := range rows {
		messages = append(messages, toMessageFromActiveSinceRow(row))
	}
	return messages
}

func toMessagesFromLatest(rows []sqlc.ListMessagesLatestRow) []Message {
	messages := make([]Message, 0, len(rows))
	for _, row := range rows {
		messages = append(messages, toMessageFromLatestRow(row))
	}
	return messages
}

func toMessageFromBeforeRow(row sqlc.ListMessagesBeforeRow) Message {
	return toMessageFields(
		row.ID,
		row.BotID,
		row.RouteID,
		row.SenderChannelIdentityID,
		row.SenderUserID,
		row.SenderDisplayName,
		row.SenderAvatarUrl,
		row.Platform,
		row.ExternalMessageID,
		row.SourceReplyToMessageID,
		row.Role,
		row.Content,
		row.Metadata,
		row.Usage,
		row.CreatedAt,
	)
}

// toMessagesFromBefore returns messages in oldest-first order (ListMessagesBefore returns DESC; we reverse).
func toMessagesFromBefore(rows []sqlc.ListMessagesBeforeRow) []Message {
	messages := make([]Message, 0, len(rows))
	for i := len(rows) - 1; i >= 0; i-- {
		messages = append(messages, toMessageFromBeforeRow(rows[i]))
	}
	return messages
}
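
// parseOptionalUUID parses id, returning the zero pgtype.UUID
// (Valid == false, stored as NULL) when id is blank.
func parseOptionalUUID(id string) (pgtype.UUID, error) {
	if strings.TrimSpace(id) == "" {
		return pgtype.UUID{}, nil
	}
	return dbpkg.ParseUUID(id)
}

// toPgText trims value and returns an invalid (NULL) pgtype.Text when the
// result is empty.
func toPgText(value string) pgtype.Text {
	value = strings.TrimSpace(value)
	if value == "" {
		return pgtype.Text{}
	}
	return pgtype.Text{String: value, Valid: true}
}

// nonNilMap returns m, or an empty map when m is nil, so that it marshals
// to {} instead of null.
func nonNilMap(m map[string]any) map[string]any {
	if m == nil {
		return map[string]any{}
	}
	return m
}

// coalesce returns the first value that is not blank after trimming.
// The value itself is returned untrimmed.
func coalesce(values ...string) string {
	for _, v := range values {
		if strings.TrimSpace(v) != "" {
			return v
		}
	}
	return ""
}

// parseJSONMap decodes a JSON object into a map. Empty input returns nil;
// decode errors are ignored.
func parseJSONMap(data []byte) map[string]any {
	if len(data) == 0 {
		return nil
	}
	var m map[string]any
	_ = json.Unmarshal(data, &m)
	return m
}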

// publishMessageCreated publishes a message-created event when a publisher
// is configured. A marshal failure is logged and the event is dropped.
func (s *DBService) publishMessageCreated(message Message) {
	if s.publisher == nil {
		return
	}
	payload, err := json.Marshal(message)
	if err != nil {
		if s.logger != nil {
			s.logger.Warn("marshal message event failed", slog.Any("error", err))
		}
		return
	}
	s.publisher.Publish(event.Event{
		Type:  event.EventTypeMessageCreated,
		BotID: strings.TrimSpace(message.BotID),
		Data:  payload,
	})
}

// enrichAssets batch-loads asset links for a list of messages (single-table query).
// On DB error (e.g. missing content_hash column), we skip enrichment and leave Assets empty
// so the list request still returns all messages and does not fail.
func (s *DBService) enrichAssets(ctx context.Context, messages []Message) {
	if len(messages) == 0 {
		return
	}
	ids := make([]pgtype.UUID, 0, len(messages))
	for _, m := range messages {
		pgID, err := dbpkg.ParseUUID(m.ID)
		if err != nil {
			continue
		}
		ids = append(ids, pgID)
	}
	if len(ids) == 0 {
		return
	}
	rows, err := s.queries.ListMessageAssetsBatch(ctx, ids)
	if err != nil {
		s.logger.Warn("enrich assets failed, returning messages without assets", slog.Any("error", err))
		ensureAssetsSlice(messages)
		return
	}

	assetMap := map[string][]MessageAsset{}
	for _, row := range rows {
		msgID := row.MessageID.String()
		contentHash := strings.TrimSpace(row.ContentHash)
		if contentHash == "" {
			continue
		}
		assetMap[msgID] = append(assetMap[msgID], MessageAsset{
			ContentHash: contentHash,
			Role:        row.Role,
			Ordinal:     int(row.Ordinal),
			Name:        row.Name,
			Metadata:    unmarshalMetadata(row.Metadata),
		})
	}
	for i := range messages {
		if assets, ok := assetMap[messages[i].ID]; ok {
			messages[i].Assets = assets
		} else {
			messages[i].Assets = []MessageAsset{}
		}
	}
}

// ensureAssetsSlice sets Assets to a non-nil empty slice for each message so JSON is "assets": [].
// Used when enrich fails so frontend gets a consistent shape and does not treat missing assets as broken.
func ensureAssetsSlice(messages []Message) {
	for i := range messages {
		if messages[i].Assets == nil {
			messages[i].Assets = []MessageAsset{}
		}
	}
}
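
// marshalMetadata encodes m as JSON, falling back to {} when m is empty or
// cannot be encoded.
func marshalMetadata(m map[string]any) []byte {
	if len(m) == 0 {
		return []byte("{}")
	}
	b, err := json.Marshal(m)
	if err != nil {
		return []byte("{}")
	}
	return b
}

// unmarshalMetadata decodes asset metadata, returning nil for empty input,
// invalid JSON, or an empty object.
func unmarshalMetadata(b []byte) map[string]any {
	if len(b) == 0 {
		return nil
	}
	var m map[string]any
	if err := json.Unmarshal(b, &m); err != nil || len(m) == 0 {
		return nil
	}
	return m
}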
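
`Persist` and `LinkAssets` above apply the same three rules to every asset ref before inserting a link: skip refs with a blank content hash, default a blank role to `"attachment"`, and reject ordinals that do not fit in `int32`. The following standalone sketch isolates those rules so they can be exercised without a database. The types `assetRef` and `normalizedAsset` and the function `normalizeAssetRef` are hypothetical stand-ins and are not part of the package.

```go
package main

import (
	"fmt"
	"math"
	"strings"
)

// assetRef is a trimmed-down, hypothetical stand-in for the package's AssetRef.
type assetRef struct {
	ContentHash string
	Role        string
	Ordinal     int
}

// normalizedAsset holds the values that would be passed to the insert query.
type normalizedAsset struct {
	ContentHash string
	Role        string
	Ordinal     int32
}

// normalizeAssetRef applies the validation rules used before linking an asset.
// ok is false when the ref should be skipped (blank content hash); err is
// non-nil when the ordinal cannot be stored as int32.
func normalizeAssetRef(ref assetRef) (out normalizedAsset, ok bool, err error) {
	hash := strings.TrimSpace(ref.ContentHash)
	if hash == "" {
		return normalizedAsset{}, false, nil
	}
	if ref.Ordinal < math.MinInt32 || ref.Ordinal > math.MaxInt32 {
		return normalizedAsset{}, false, fmt.Errorf("asset ordinal out of range: %d", ref.Ordinal)
	}
	role := ref.Role
	if strings.TrimSpace(role) == "" {
		role = "attachment"
	}
	return normalizedAsset{ContentHash: hash, Role: role, Ordinal: int32(ref.Ordinal)}, true, nil
}

func main() {
	refs := []assetRef{
		{ContentHash: " abc123 ", Ordinal: 0},
		{ContentHash: "   ", Role: "image", Ordinal: 1},
		{ContentHash: "def456", Role: "image", Ordinal: 2},
	}
	for _, ref := range refs {
		a, ok, err := normalizeAssetRef(ref)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		if !ok {
			fmt.Println("skipped ref without content hash")
			continue
		}
		fmt.Printf("link %s role=%s ordinal=%d\n", a.ContentHash, a.Role, a.Ordinal)
	}
}
```

Checking the range before the `int32` conversion matters because the conversion itself never fails; an oversized `int` would silently wrap to a different ordinal.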