Files
Memoh/internal/agent/tools/subagent.go
T
Acbox Liu 8d5c38f0e5 refactor: unify providers and models tables (#338)
* refactor: unify providers and models tables

- Rename `llm_providers` → `providers`, `llm_provider_oauth_tokens` → `provider_oauth_tokens`
- Remove `tts_providers` and `tts_models` tables; speech models now live in the unified `models` table with `type = 'speech'`
- Replace top-level `api_key`/`base_url` columns with a JSONB `config` field on `providers`
- Rename `llm_provider_id` → `provider_id` across all references
- Add `edge-speech` client type and `conf/providers/edge.yaml` default provider
- Create new read-only speech endpoints (`/speech-providers`, `/speech-models`) backed by filtered views of the unified tables
- Remove old TTS CRUD handlers; simplify speech page to read-only + test
- Update registry loader to skip malformed YAML files instead of failing entirely
- Fix YAML quoting for model names containing colons in openrouter.yaml
- Regenerate sqlc, swagger, and TypeScript SDK

* fix: exclude speech providers from providers list endpoint

ListProviders now filters out client_type matching '%-speech' so Edge
and future speech providers no longer appear on the Providers page.
ListSpeechProviders uses the same pattern match instead of hard-coding
'edge-speech'.

* fix: use explicit client_type list instead of LIKE pattern

Replace '%-speech' pattern with explicit IN ('edge-speech') for both
ListProviders (exclusion) and ListSpeechProviders (inclusion). New
speech client types must be added to both queries.

* fix: use EXECUTE for dynamic SQL in migrations referencing old schema

PL/pgSQL pre-validates column/table references in static SQL statements
inside DO blocks before evaluating IF/RETURN guards. This caused
migrations 0010-0061 to fail on fresh databases where the canonical
schema uses `providers`/`provider_id` instead of `llm_providers`/
`llm_provider_id`.

Wrap all SQL that references potentially non-existent old schema objects
(llm_providers, llm_provider_id, tts_providers, tts_models, etc.) in
EXECUTE strings so they are only parsed at runtime when actually reached.

* fix: revert canonical schema to use llm_providers for migration compatibility

The CI migrations workflow (up → down → up) failed because 0061 down
renames `providers` back to `llm_providers`, but 0001 down only dropped
`providers` — leaving `llm_providers` as a remnant. On the second
migrate up, 0010 found the stale `llm_providers` and tried to reference
`models.llm_provider_id` which no longer existed.

Revert 0001 canonical schema to use original names (llm_providers,
tts_providers, tts_models) so incremental migrations work naturally and
0061 handles the final rename. Remove EXECUTE wrappers and unnecessary
guards from migrations that now always operate on llm_providers.

* fix: icons

* fix: sync canonical schema with 0061 migration to fix sqlc column mismatch

0001_init.up.sql still used old names (llm_providers, llm_provider_id)
and included dropped tts_providers/tts_models tables. sqlc could not
parse the PL/pgSQL EXECUTE in migration 0061, so generated code retained
stale columns (input_modalities, supports_reasoning) causing runtime
"column does not exist" errors when adding models.

- Update 0001_init.up.sql to current schema (providers, provider_id,
  no tts tables, add provider_oauth_tokens)
- Use ALTER TABLE IF EXISTS in 0010/0041/0042 for backward compat
- Regenerate sqlc

* fix: guard all legacy migrations against fresh schema for CI compat

On fresh databases, 0001_init.up.sql creates providers/provider_id
(not llm_providers/llm_provider_id). Migrations 0013, 0041, 0046, 0047
referenced the old names without guards, causing CI migration failures.

- 0013: check llm_provider_id column exists before adding old constraint
- 0041: check llm_providers table exists before backfill/constraint DDL
- 0046: wrap CREATE TABLE in DO block with llm_providers existence check
- 0047: use ALTER TABLE IF EXISTS + DO block guard
2026-04-08 01:03:44 +08:00

371 lines
10 KiB
Go

package tools
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
"strings"
"sync"
sdk "github.com/memohai/twilight-ai/sdk"
"github.com/memohai/memoh/internal/db/sqlc"
messagepkg "github.com/memohai/memoh/internal/message"
"github.com/memohai/memoh/internal/models"
"github.com/memohai/memoh/internal/providers"
sessionpkg "github.com/memohai/memoh/internal/session"
"github.com/memohai/memoh/internal/settings"
)
// SpawnAgent is the interface the spawn tool uses to run subagent tasks.
// It is satisfied by *agent.Agent and avoids an import cycle.
type SpawnAgent interface {
// Generate runs one subagent task described by cfg and returns its result.
// In this file it is invoked once per task by runSubagentTask.
Generate(ctx context.Context, cfg SpawnRunConfig) (*SpawnResult, error)
}
// SpawnRunConfig mirrors agent.RunConfig fields needed by spawn.
// runSubagentTask fills in Model, System, Query, SessionType, Identity and
// LoopDetection; Messages and ReasoningEffort are left at their zero values
// by the code in this file.
type SpawnRunConfig struct {
// Model is the SDK model the subagent runs on.
Model *sdk.Model
// System is the system prompt text (empty when no prompt function is set).
System string
// Query is the task instruction handed to the subagent.
Query string
// SessionType is the session kind; spawn passes sessionpkg.TypeSubagent.
SessionType string
// Identity carries the bot/chat/session identifiers for the run.
Identity SpawnIdentity
// LoopDetection toggles loop detection for the run.
LoopDetection SpawnLoopConfig
// Messages is presumably prior conversation history — not set by spawn;
// confirm against agent.RunConfig.
Messages []sdk.Message
// ReasoningEffort is not set by spawn; NOTE(review): semantics live in
// agent.RunConfig — confirm there.
ReasoningEffort string
}
// SpawnIdentity mirrors agent.SessionContext fields needed by spawn.
// runSubagentTask copies every field except SessionID and IsSubagent from
// the parent session.
type SpawnIdentity struct {
// BotID is the parent session's bot ID.
BotID string
// ChatID is the parent session's chat ID.
ChatID string
// SessionID is the subagent's own session ID; it is empty when no session
// service is configured or session creation failed.
SessionID string
// ChannelIdentityID is copied from the parent session.
ChannelIdentityID string
// CurrentPlatform is copied from the parent session.
CurrentPlatform string
// SessionToken is copied from the parent session.
SessionToken string //nolint:gosec // #nosec G117 -- session identifier, not a secret
// IsSubagent is set to true for every run started by spawn.
IsSubagent bool
}
// SpawnLoopConfig mirrors agent.LoopDetectionConfig.
type SpawnLoopConfig struct {
// Enabled switches loop detection on; spawn always passes true.
Enabled bool
}
// SpawnResult mirrors agent.GenerateResult.
type SpawnResult struct {
// Messages are the messages produced by the run; persistMessages stores
// every one whose role is not the user role.
Messages []sdk.Message
// Text is the text returned to the parent agent as the task's result.
Text string
// Usage is not read by the code in this file; presumably aggregate token
// usage for the run — confirm against agent.GenerateResult.
Usage *sdk.Usage
}
// SpawnProvider exposes a "spawn" tool that runs one or more subagent tasks
// concurrently and returns results to the parent agent.
type SpawnProvider struct {
// agent runs the subagent tasks; injected via SetAgent. Tools exposes
// nothing while it is nil.
agent SpawnAgent
// settings, models and queries are used by resolveModel to look up the
// bot's chat model and its provider; all three must be non-nil there.
settings *settings.Service
models *models.Service
queries *sqlc.Queries
// sessionService, when non-nil, creates one subagent session per task.
sessionService *sessionpkg.Service
// messageService, when non-nil, persists subagent conversation history;
// injected via SetMessageService.
messageService messagepkg.Writer
// systemPromptFn, when non-nil, produces the system prompt for subagent
// runs; injected via SetSystemPromptFunc.
systemPromptFn func(sessionType string) string
// modelCreator builds the sdk.Model in resolveModel; injected via
// SetModelCreator.
modelCreator ModelCreator
// logger is tagged with tool=spawn by NewSpawnProvider.
logger *slog.Logger
}
// NewSpawnProvider builds a SpawnProvider from its service dependencies.
// A nil log falls back to slog.Default(); the stored logger carries the
// attribute tool=spawn. The agent is not set here: it must be injected
// afterwards via SetAgent to avoid a dependency cycle. The message writer,
// system-prompt function and model creator are likewise left unset.
func NewSpawnProvider(
	log *slog.Logger,
	settingsSvc *settings.Service,
	modelsSvc *models.Service,
	queries *sqlc.Queries,
	sessionService *sessionpkg.Service,
) *SpawnProvider {
	base := log
	if base == nil {
		base = slog.Default()
	}

	provider := new(SpawnProvider)
	provider.logger = base.With(slog.String("tool", "spawn"))
	provider.settings = settingsSvc
	provider.models = modelsSvc
	provider.queries = queries
	provider.sessionService = sessionService
	return provider
}
// SetAgent injects the agent after construction (breaking the DI cycle).
// Until it is called with a non-nil agent, Tools returns no tools.
func (p *SpawnProvider) SetAgent(a SpawnAgent) {
p.agent = a
}
// SetMessageService injects an optional message writer for persisting
// subagent conversation history. When it is left nil, runSubagentTask skips
// persistence entirely.
func (p *SpawnProvider) SetMessageService(w messagepkg.Writer) {
p.messageService = w
}
// SetSystemPromptFunc injects the function used to generate the system prompt
// (typically agent.GenerateSystemPrompt). execSpawn calls it with
// sessionpkg.TypeSubagent; when it is nil, subagents run with an empty
// system prompt.
func (p *SpawnProvider) SetSystemPromptFunc(fn func(sessionType string) string) {
p.systemPromptFn = fn
}
// Tools returns the tools this provider contributes to a session: a single
// "spawn" tool whose input is an object with a required "tasks" array of
// strings. No tools are returned when the session is itself a subagent or
// when no agent has been injected yet. The returned error is always nil.
func (p *SpawnProvider) Tools(_ context.Context, session SessionContext) ([]sdk.Tool, error) {
	if p.agent == nil || session.IsSubagent {
		return nil, nil
	}

	// JSON-schema description of the tool input, built inside-out.
	tasksSchema := map[string]any{
		"type":        "array",
		"description": "List of task instructions. Each string is a self-contained prompt for one subagent.",
		"items":       map[string]any{"type": "string"},
	}
	inputSchema := map[string]any{
		"type":       "object",
		"properties": map[string]any{"tasks": tasksSchema},
		"required":   []string{"tasks"},
	}

	// The handler captures the session passed to this call.
	run := func(ctx *sdk.ToolExecContext, input any) (any, error) {
		return p.execSpawn(ctx.Context, session, inputAsMap(input))
	}

	spawnTool := sdk.Tool{
		Name:        "spawn",
		Description: "Spawn one or more subagents to work on tasks in parallel. Each task runs in its own context with file, exec, and web tools. All results are returned together.",
		Parameters:  inputSchema,
		Execute:     run,
	}
	return []sdk.Tool{spawnTool}, nil
}
// spawnResult is the per-task entry returned to the parent agent under the
// "results" key of the spawn tool's output.
type spawnResult struct {
// Task is the task instruction exactly as it was passed in.
Task string `json:"task"`
// SessionID is the subagent session's ID; omitted when no session was created.
SessionID string `json:"session_id,omitempty"`
// Text is the subagent's generated text; empty when the run failed.
Text string `json:"text"`
// Success is true only when the subagent run returned without error.
Success bool `json:"success"`
// Error is the run's error message; omitted on success.
Error string `json:"error,omitempty"`
}
// execSpawn handles one invocation of the "spawn" tool.
//
// It requires a non-blank bot ID on the session and a non-empty "tasks"
// array of strings in args, resolves the bot's chat model once, and then
// runs every task concurrently (one goroutine per task), waiting for all of
// them to finish. The returned value is a map with a single "results" key
// holding one spawnResult per task, in the same order as the input tasks.
//
// An error is returned only for invalid input or model-resolution failure;
// failures of individual tasks are reported inside their result entries.
func (p *SpawnProvider) execSpawn(ctx context.Context, session SessionContext, args map[string]any) (any, error) {
	botID := strings.TrimSpace(session.BotID)
	if len(botID) == 0 {
		return nil, errors.New("bot_id is required")
	}

	rawTasks, present := args["tasks"]
	if !present {
		return nil, errors.New("tasks is required")
	}
	prompts, err := toStringSlice(rawTasks)
	switch {
	case err != nil:
		return nil, fmt.Errorf("invalid tasks: %w", err)
	case len(prompts) == 0:
		return nil, errors.New("at least one task is required")
	}

	model, modelID, err := p.resolveModel(ctx, botID)
	if err != nil {
		return nil, fmt.Errorf("resolve model: %w", err)
	}

	var systemPrompt string
	if p.systemPromptFn != nil {
		systemPrompt = p.systemPromptFn(sessionpkg.TypeSubagent)
	}

	// Each goroutine writes only to its own slot, so no locking is needed.
	outcomes := make([]spawnResult, len(prompts))
	var pending sync.WaitGroup
	for i := range prompts {
		pending.Add(1)
		go func(slot *spawnResult, prompt string) {
			defer pending.Done()
			*slot = p.runSubagentTask(ctx, session, model, modelID, systemPrompt, prompt)
		}(&outcomes[i], prompts[i])
	}
	pending.Wait()

	return map[string]any{"results": outcomes}, nil
}
// runSubagentTask executes one task as a subagent and reports its outcome.
//
// When a session service is configured it first creates a subagent session,
// titled with the truncated query and linked to the parent session. A
// failure to create that session is only logged: the task still runs, just
// without a session ID and therefore without persisted history.
//
// The subagent runs with the parent's bot, chat, channel identity, platform
// and session token, with IsSubagent set and loop detection enabled. On
// success the generated text is returned and, if both a message writer and a
// session exist, the conversation is persisted. Failures are reported
// through the Error field of the returned value, never as a Go error.
func (p *SpawnProvider) runSubagentTask(
	ctx context.Context,
	parentSession SessionContext,
	model *sdk.Model,
	modelID string,
	systemPrompt string,
	query string,
) spawnResult {
	res := spawnResult{Task: query}

	var sessionID string
	if p.sessionService != nil {
		sess, err := p.sessionService.Create(ctx, sessionpkg.CreateInput{
			BotID:           parentSession.BotID,
			Type:            sessionpkg.TypeSubagent,
			Title:           truncateTitle(query, 100),
			ParentSessionID: parentSession.SessionID,
		})
		if err != nil {
			// Best effort: the task can still run without a session.
			p.logger.Warn("failed to create subagent session", slog.Any("error", err))
		} else {
			sessionID = sess.ID
			res.SessionID = sessionID
		}
	}

	cfg := SpawnRunConfig{
		Model:       model,
		System:      systemPrompt,
		Query:       query,
		SessionType: sessionpkg.TypeSubagent,
		Identity: SpawnIdentity{
			BotID:             parentSession.BotID,
			ChatID:            parentSession.ChatID,
			SessionID:         sessionID,
			ChannelIdentityID: parentSession.ChannelIdentityID,
			CurrentPlatform:   parentSession.CurrentPlatform,
			SessionToken:      parentSession.SessionToken,
			IsSubagent:        true,
		},
		LoopDetection: SpawnLoopConfig{Enabled: true},
	}

	genResult, err := p.agent.Generate(ctx, cfg)
	if err != nil {
		res.Error = err.Error()
		return res
	}
	if genResult == nil {
		// Generate's signature permits (nil, nil); report that as a failed
		// task instead of dereferencing a nil result inside a goroutine.
		res.Error = "subagent returned no result"
		return res
	}

	res.Text = genResult.Text
	res.Success = true

	if p.messageService != nil && sessionID != "" {
		p.persistMessages(ctx, parentSession.BotID, sessionID, modelID, query, genResult)
	}
	return res
}
// persistMessages stores a subagent conversation in the given session.
//
// It first writes the task query as a user message (a JSON object with
// "role" and "content"), then writes every message in result.Messages whose
// role is not the user role, JSON-encoded, together with its usage (when
// present) and the model ID. Persistence is best effort: encoding and write
// failures are logged as warnings and the remaining messages are still
// attempted. A nil result persists only the user message.
//
// The caller must ensure p.messageService is non-nil.
func (p *SpawnProvider) persistMessages(
	ctx context.Context,
	botID, sessionID, modelID, query string,
	result *SpawnResult,
) {
	userContent, err := json.Marshal(map[string]any{
		"role":    "user",
		"content": query,
	})
	if err != nil {
		p.logger.Warn("marshal subagent user message failed", slog.Any("error", err))
	} else if _, err := p.messageService.Persist(ctx, messagepkg.PersistInput{
		BotID:     botID,
		SessionID: sessionID,
		Role:      "user",
		Content:   userContent,
	}); err != nil {
		p.logger.Warn("persist subagent user message failed", slog.Any("error", err))
	}

	if result == nil {
		return
	}

	for _, msg := range result.Messages {
		// The user turn was already written above from the raw query.
		if msg.Role == sdk.MessageRoleUser {
			continue
		}

		content, err := json.Marshal(msg)
		if err != nil {
			// Skip the message, but leave a trace so lost history is visible.
			p.logger.Warn("marshal subagent message failed", slog.Any("error", err))
			continue
		}

		var usage json.RawMessage
		if msg.Usage != nil {
			encoded, err := json.Marshal(msg.Usage)
			if err != nil {
				// Persist the message without usage rather than dropping it.
				p.logger.Warn("marshal subagent message usage failed", slog.Any("error", err))
			} else {
				usage = encoded
			}
		}

		if _, err := p.messageService.Persist(ctx, messagepkg.PersistInput{
			BotID:     botID,
			SessionID: sessionID,
			Role:      string(msg.Role),
			Content:   content,
			Usage:     usage,
			ModelID:   modelID,
		}); err != nil {
			p.logger.Warn("persist subagent message failed", slog.Any("error", err))
		}
	}
}
// ModelCreator creates an sdk.Model from provider config. Set via SetModelCreator.
// resolveModel calls it with the model's ModelID, the provider's client type,
// the resolved API key and Codex account ID, the provider's "base_url" config
// value, and a nil httpClient.
// NOTE(review): how a nil httpClient is treated is up to the implementation — confirm there.
type ModelCreator func(modelID, clientType, apiKey, codexAccountID, baseURL string, httpClient *http.Client) *sdk.Model
// SetModelCreator injects the function used to create SDK models
// (typically agent.CreateModel wrapped to match the signature).
// While it is unset, resolveModel fails with "model creator not configured".
func (p *SpawnProvider) SetModelCreator(fn ModelCreator) {
p.modelCreator = fn
}
// resolveModel builds the SDK model a bot's subagents should run on.
//
// It reads the bot's settings, takes the configured chat model ID, loads
// that model and its provider, resolves the provider's credentials and hands
// everything to the injected model creator. It returns the created model
// together with the model record's ID.
//
// Errors: the settings, models or queries dependency is missing; the bot has
// no chat model configured; no model creator has been injected; or any of
// the lookups / credential resolution fails (those errors are returned as-is).
func (p *SpawnProvider) resolveModel(ctx context.Context, botID string) (*sdk.Model, string, error) {
	switch {
	case p.settings == nil, p.models == nil, p.queries == nil:
		return nil, "", errors.New("model resolution services not configured")
	}

	botCfg, err := p.settings.GetBot(ctx, botID)
	if err != nil {
		return nil, "", err
	}

	chatModelID := strings.TrimSpace(botCfg.ChatModelID)
	if len(chatModelID) == 0 {
		return nil, "", errors.New("no chat model configured for bot")
	}

	info, err := p.models.GetByID(ctx, chatModelID)
	if err != nil {
		return nil, "", err
	}

	providerRow, err := models.FetchProviderByID(ctx, p.queries, info.ProviderID)
	if err != nil {
		return nil, "", err
	}

	if p.modelCreator == nil {
		return nil, "", errors.New("model creator not configured")
	}

	credResolver := providers.NewService(nil, p.queries, "")
	creds, err := credResolver.ResolveModelCredentials(ctx, providerRow)
	if err != nil {
		return nil, "", err
	}

	baseURL := providers.ProviderConfigString(providerRow, "base_url")
	// No explicit HTTP client is supplied to the creator.
	created := p.modelCreator(
		info.ModelID,
		providerRow.ClientType,
		creds.APIKey,
		creds.CodexAccountID,
		baseURL,
		nil,
	)
	return created, info.ID, nil
}
// toStringSlice converts a loosely typed value into a []string.
//
// A []string is returned unchanged (same backing array). A []any is
// accepted when every element is a string and is copied into a new slice.
// Anything else, including nil, yields an "expected array" error, and a
// non-string element yields an "expected string" error naming its type.
func toStringSlice(v any) ([]string, error) {
	if strs, ok := v.([]string); ok {
		return strs, nil
	}

	items, ok := v.([]any)
	if !ok {
		return nil, fmt.Errorf("expected array, got %T", v)
	}

	out := make([]string, len(items))
	for i := range items {
		str, isString := items[i].(string)
		if !isString {
			return nil, fmt.Errorf("expected string, got %T", items[i])
		}
		out[i] = str
	}
	return out, nil
}
// truncateTitle flattens s onto one line (each "\n" becomes a space) and
// limits it to at most maxRunes runes.
//
// A string that already fits is returned whole. A longer one is cut and
// given a trailing "..." so that the result is exactly maxRunes runes long.
// When maxRunes is smaller than the ellipsis itself the string is cut hard
// with no ellipsis, and a non-positive maxRunes yields "". Lengths are
// counted in runes, so multi-byte characters are never split.
func truncateTitle(s string, maxRunes int) string {
	const ellipsis = "..."

	s = strings.ReplaceAll(s, "\n", " ")
	if maxRunes <= 0 {
		return ""
	}

	runes := []rune(s)
	if len(runes) <= maxRunes {
		return s
	}
	if maxRunes < len(ellipsis) {
		// No room for the ellipsis; slicing at maxRunes-3 would be a
		// negative bound and panic.
		return string(runes[:maxRunes])
	}
	return string(runes[:maxRunes-len(ellipsis)]) + ellipsis
}