From 3c1ab85349d6c2061e090c12507e69bb24abdfae Mon Sep 17 00:00:00 2001 From: Acbox Date: Thu, 19 Feb 2026 23:41:25 +0800 Subject: [PATCH] Revert "refactor: agent (#60)" This reverts commit 19f87dbae84538cce2e4d81846d96fa26ee33b96. --- cmd/agent/main.go | 728 +++++++++++++++++++++++++++- cmd/agent/modules/channel.go | 75 --- cmd/agent/modules/containerd.go | 80 --- cmd/agent/modules/conversation.go | 73 --- cmd/agent/modules/domain.go | 61 --- cmd/agent/modules/handle.go | 98 ---- cmd/agent/modules/infrastructure.go | 87 ---- cmd/agent/modules/memory.go | 191 -------- cmd/agent/modules/server.go | 141 ------ 9 files changed, 719 insertions(+), 815 deletions(-) delete mode 100644 cmd/agent/modules/channel.go delete mode 100644 cmd/agent/modules/containerd.go delete mode 100644 cmd/agent/modules/conversation.go delete mode 100644 cmd/agent/modules/domain.go delete mode 100644 cmd/agent/modules/handle.go delete mode 100644 cmd/agent/modules/infrastructure.go delete mode 100644 cmd/agent/modules/memory.go delete mode 100644 cmd/agent/modules/server.go diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 79b2da21..4f2d23f7 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -1,12 +1,71 @@ package main import ( + "context" + "errors" + "fmt" + "io" + "io/fs" "log/slog" + "net/http" + "os" + "strings" + "time" + containerd "github.com/containerd/containerd/v2/client" + "github.com/jackc/pgx/v5/pgtype" + "github.com/jackc/pgx/v5/pgxpool" "go.uber.org/fx" "go.uber.org/fx/fxevent" + "golang.org/x/crypto/bcrypt" - "github.com/memohai/memoh/cmd/agent/modules" + dbembed "github.com/memohai/memoh/db" + "github.com/memohai/memoh/internal/accounts" + "github.com/memohai/memoh/internal/bind" + "github.com/memohai/memoh/internal/boot" + "github.com/memohai/memoh/internal/bots" + "github.com/memohai/memoh/internal/channel" + "github.com/memohai/memoh/internal/channel/adapters/feishu" + "github.com/memohai/memoh/internal/channel/adapters/local" + "github.com/memohai/memoh/internal/channel/adapters/telegram" + "github.com/memohai/memoh/internal/channel/identities" + "github.com/memohai/memoh/internal/channel/inbound" + "github.com/memohai/memoh/internal/channel/route" + "github.com/memohai/memoh/internal/config" + ctr "github.com/memohai/memoh/internal/containerd" + "github.com/memohai/memoh/internal/conversation" + "github.com/memohai/memoh/internal/conversation/flow" + "github.com/memohai/memoh/internal/db" + dbsqlc "github.com/memohai/memoh/internal/db/sqlc" + "github.com/memohai/memoh/internal/embeddings" + "github.com/memohai/memoh/internal/handlers" + "github.com/memohai/memoh/internal/healthcheck" + channelchecker "github.com/memohai/memoh/internal/healthcheck/checkers/channel" + mcpchecker "github.com/memohai/memoh/internal/healthcheck/checkers/mcp" + "github.com/memohai/memoh/internal/logger" + "github.com/memohai/memoh/internal/mcp" + mcpcontainer "github.com/memohai/memoh/internal/mcp/providers/container" + mcpdirectory "github.com/memohai/memoh/internal/mcp/providers/directory" + mcpmemory "github.com/memohai/memoh/internal/mcp/providers/memory" + mcpmessage "github.com/memohai/memoh/internal/mcp/providers/message" + mcpschedule "github.com/memohai/memoh/internal/mcp/providers/schedule" + mcpweb "github.com/memohai/memoh/internal/mcp/providers/web" + mcpfederation "github.com/memohai/memoh/internal/mcp/sources/federation" + "github.com/memohai/memoh/internal/media" + "github.com/memohai/memoh/internal/memory" + "github.com/memohai/memoh/internal/message" + "github.com/memohai/memoh/internal/message/event" + "github.com/memohai/memoh/internal/models" + "github.com/memohai/memoh/internal/policy" + "github.com/memohai/memoh/internal/preauth" + "github.com/memohai/memoh/internal/providers" + "github.com/memohai/memoh/internal/schedule" + "github.com/memohai/memoh/internal/searchproviders" + "github.com/memohai/memoh/internal/server" + "github.com/memohai/memoh/internal/settings" + "github.com/memohai/memoh/internal/storage/providers/containerfs" + "github.com/memohai/memoh/internal/subagent" + "github.com/memohai/memoh/internal/version" ) func migrationsFS() fs.FS { @@ -65,17 +124,668 @@ func runMigrate(args []string) { func runServe() { fx.New( - modules.InfraModule, - modules.DomainModule, - modules.MemoryModule, - modules.ChannelModule, - modules.ConversationModule, - modules.ContainerdModule, - modules.HandlersModule, - modules.ServerModule, + fx.Provide( + provideConfig, + boot.ProvideRuntimeConfig, + provideLogger, + provideContainerdClient, + provideDBConn, + provideDBQueries, + // containerd & mcp infrastructure + fx.Annotate(ctr.NewDefaultService, fx.As(new(ctr.Service))), + provideMCPManager, + + // memory pipeline + provideMemoryLLM, + provideEmbeddingsResolver, + provideEmbeddingSetup, + provideTextEmbedderForMemory, + provideQdrantStore, + memory.NewBM25Indexer, + provideMemoryService, + + // domain services (auto-wired) + models.NewService, + bots.NewService, + accounts.NewService, + settings.NewService, + providers.NewService, + searchproviders.NewService, + policy.NewService, + preauth.NewService, + mcp.NewConnectionService, + subagent.NewService, + conversation.NewService, + identities.NewService, + bind.NewService, + event.NewHub, + + // services requiring provide functions + provideRouteService, + provideMessageService, + provideMediaService, + + // channel infrastructure + local.NewRouteHub, + provideChannelRegistry, + channel.NewStore, + provideChannelRouter, + provideChannelManager, + provideChannelLifecycleService, + + // conversation flow + provideChatResolver, + provideScheduleTriggerer, + schedule.NewService, + + // containerd handler & tool gateway + provideContainerdHandler, + provideToolGatewayService, + + // http handlers (group:"server_handlers") + provideServerHandler(handlers.NewPingHandler), + provideServerHandler(provideAuthHandler), + provideServerHandler(provideMemoryHandler), + provideServerHandler(handlers.NewEmbeddingsHandler), + provideServerHandler(provideMessageHandler), + provideServerHandler(handlers.NewSwaggerHandler), + provideServerHandler(handlers.NewProvidersHandler), + provideServerHandler(handlers.NewSearchProvidersHandler), + provideServerHandler(handlers.NewModelsHandler), + provideServerHandler(handlers.NewSettingsHandler), + provideServerHandler(handlers.NewPreauthHandler), + provideServerHandler(handlers.NewBindHandler), + provideServerHandler(handlers.NewScheduleHandler), + provideServerHandler(handlers.NewSubagentHandler), + provideServerHandler(handlers.NewChannelHandler), + provideServerHandler(provideUsersHandler), + provideServerHandler(handlers.NewMCPHandler), + provideServerHandler(provideCLIHandler), + provideServerHandler(provideWebHandler), + + provideServer, + ), + fx.Invoke( + startMemoryWarmup, + startScheduleService, + startChannelManager, + startContainerReconciliation, + startServer, + ), fx.WithLogger(func(logger *slog.Logger) fxevent.Logger { return &fxevent.SlogLogger{Logger: logger.With(slog.String("component", "fx"))} }), ).Run() } + +// --------------------------------------------------------------------------- +// fx helper +// --------------------------------------------------------------------------- + +func provideServerHandler(fn any) any { + return fx.Annotate( + fn, + fx.As(new(server.Handler)), + fx.ResultTags(`group:"server_handlers"`), + ) +} + +// --------------------------------------------------------------------------- +// infrastructure providers +// --------------------------------------------------------------------------- + +func provideConfig() (config.Config, error) { + cfgPath := os.Getenv("CONFIG_PATH") + cfg, err := config.Load(cfgPath) + if err != nil { + return config.Config{}, fmt.Errorf("load config: %w", err) + } + return cfg, nil +} + +func provideLogger(cfg config.Config) *slog.Logger { + logger.Init(cfg.Log.Level, cfg.Log.Format) + return logger.L +} + +func provideContainerdClient(lc fx.Lifecycle, rc *boot.RuntimeConfig) (*containerd.Client, error) { + factory := ctr.DefaultClientFactory{SocketPath: rc.ContainerdSocketPath} + client, err := factory.New(context.Background()) + if err != nil { + return nil, fmt.Errorf("connect containerd: %w", err) + } + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return client.Close() + }, + }) + return client, nil +} + +func provideDBConn(lc fx.Lifecycle, cfg config.Config) (*pgxpool.Pool, error) { + conn, err := db.Open(context.Background(), cfg.Postgres) + if err != nil { + return nil, fmt.Errorf("db connect: %w", err) + } + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + conn.Close() + return nil + }, + }) + return conn, nil +} + +func provideDBQueries(conn *pgxpool.Pool) *dbsqlc.Queries { + return dbsqlc.New(conn) +} + +func provideMCPManager(log *slog.Logger, service ctr.Service, cfg config.Config, conn *pgxpool.Pool) *mcp.Manager { + return mcp.NewManager(log, service, cfg.MCP, cfg.Containerd.Namespace, conn) +} + +// --------------------------------------------------------------------------- +// memory providers +// --------------------------------------------------------------------------- + +func provideMemoryLLM(modelsService *models.Service, queries *dbsqlc.Queries, log *slog.Logger) memory.LLM { + return &lazyLLMClient{ + modelsService: modelsService, + queries: queries, + timeout: 30 * time.Second, + logger: log, + } +} + +func provideEmbeddingsResolver(log *slog.Logger, modelsService *models.Service, queries *dbsqlc.Queries) *embeddings.Resolver { + return embeddings.NewResolver(log, modelsService, queries, 10*time.Second) +} + +type embeddingSetup struct { + Vectors map[string]int + TextModel models.GetResponse + MultimodalModel models.GetResponse + HasEmbeddingModels bool +} + +func provideEmbeddingSetup(log *slog.Logger, modelsService *models.Service) (embeddingSetup, error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + vectors, textModel, multimodalModel, hasEmbeddingModels, err := embeddings.CollectEmbeddingVectors(ctx, modelsService) + if err != nil { + return embeddingSetup{}, fmt.Errorf("embedding models: %w", err) + } + if hasEmbeddingModels && multimodalModel.ModelID == "" { + log.Warn("No multimodal embedding model configured. Multimodal embedding features will be limited.") + } + return embeddingSetup{ + Vectors: vectors, + TextModel: textModel, + MultimodalModel: multimodalModel, + HasEmbeddingModels: hasEmbeddingModels, + }, nil +} + +func provideTextEmbedderForMemory(resolver *embeddings.Resolver, setup embeddingSetup, log *slog.Logger) embeddings.Embedder { + return buildTextEmbedder(resolver, setup.TextModel, setup.HasEmbeddingModels, log) +} + +func provideQdrantStore(log *slog.Logger, cfg config.Config, setup embeddingSetup) (*memory.QdrantStore, error) { + qcfg := cfg.Qdrant + timeout := time.Duration(qcfg.TimeoutSeconds) * time.Second + if setup.HasEmbeddingModels && len(setup.Vectors) > 0 { + store, err := memory.NewQdrantStoreWithVectors(log, qcfg.BaseURL, qcfg.APIKey, qcfg.Collection, setup.Vectors, "sparse_hash", timeout) + if err != nil { + return nil, fmt.Errorf("qdrant named vectors init: %w", err) + } + return store, nil + } + store, err := memory.NewQdrantStore(log, qcfg.BaseURL, qcfg.APIKey, qcfg.Collection, setup.TextModel.Dimensions, "sparse_hash", timeout) + if err != nil { + return nil, fmt.Errorf("qdrant init: %w", err) + } + return store, nil +} + +func provideMemoryService(log *slog.Logger, llm memory.LLM, embedder embeddings.Embedder, store *memory.QdrantStore, resolver *embeddings.Resolver, bm25 *memory.BM25Indexer, setup embeddingSetup) *memory.Service { + return memory.NewService(log, llm, embedder, store, resolver, bm25, setup.TextModel.ModelID, setup.MultimodalModel.ModelID) +} + +// --------------------------------------------------------------------------- +// domain service providers (interface adapters) +// --------------------------------------------------------------------------- + +func provideRouteService(log *slog.Logger, queries *dbsqlc.Queries, chatService *conversation.Service) *route.DBService { + return route.NewService(log, queries, chatService) +} + +func provideMessageService(log *slog.Logger, queries *dbsqlc.Queries, hub *event.Hub) *message.DBService { + return message.NewService(log, queries, hub) +} + +func provideScheduleTriggerer(resolver *flow.Resolver) schedule.Triggerer { + return flow.NewScheduleGateway(resolver) +} + +// --------------------------------------------------------------------------- +// conversation flow +// --------------------------------------------------------------------------- + +func provideChatResolver(log *slog.Logger, cfg config.Config, modelsService *models.Service, queries *dbsqlc.Queries, memoryService *memory.Service, chatService *conversation.Service, msgService *message.DBService, settingsService *settings.Service, mediaService *media.Service, containerdHandler *handlers.ContainerdHandler) *flow.Resolver { + resolver := flow.NewResolver(log, modelsService, queries, memoryService, chatService, msgService, settingsService, cfg.AgentGateway.BaseURL(), 120*time.Second) + resolver.SetSkillLoader(&skillLoaderAdapter{handler: containerdHandler}) + resolver.SetGatewayAssetLoader(&gatewayAssetLoaderAdapter{media: mediaService}) + return resolver +} + +// --------------------------------------------------------------------------- +// channel providers +// --------------------------------------------------------------------------- + +func provideChannelRegistry(log *slog.Logger, hub *local.RouteHub, mediaService *media.Service) *channel.Registry { + registry := channel.NewRegistry() + tgAdapter := telegram.NewTelegramAdapter(log) + tgAdapter.SetAssetOpener(mediaService) + registry.MustRegister(tgAdapter) + registry.MustRegister(feishu.NewFeishuAdapter(log)) + registry.MustRegister(local.NewCLIAdapter(hub)) + registry.MustRegister(local.NewWebAdapter(hub)) + return registry +} + +func provideChannelRouter( + log *slog.Logger, + registry *channel.Registry, + hub *local.RouteHub, + routeService *route.DBService, + msgService *message.DBService, + resolver *flow.Resolver, + identityService *identities.Service, + botService *bots.Service, + policyService *policy.Service, + preauthService *preauth.Service, + bindService *bind.Service, + mediaService *media.Service, + rc *boot.RuntimeConfig, +) *inbound.ChannelInboundProcessor { + processor := inbound.NewChannelInboundProcessor(log, registry, routeService, msgService, resolver, identityService, botService, policyService, preauthService, bindService, rc.JwtSecret, 5*time.Minute) + processor.SetMediaService(mediaService) + processor.SetStreamObserver(local.NewRouteHubBroadcaster(hub)) + return processor +} + +func provideChannelManager(log *slog.Logger, registry *channel.Registry, channelStore *channel.Store, channelRouter *inbound.ChannelInboundProcessor) *channel.Manager { + mgr := channel.NewManager(log, registry, channelStore, channelRouter) + if mw := channelRouter.IdentityMiddleware(); mw != nil { + mgr.Use(mw) + } + return mgr +} + +func provideChannelLifecycleService(channelStore *channel.Store, channelManager *channel.Manager) *channel.Lifecycle { + return channel.NewLifecycle(channelStore, channelManager) +} + +// --------------------------------------------------------------------------- +// containerd handler & tool gateway +// --------------------------------------------------------------------------- + +func provideContainerdHandler(log *slog.Logger, service ctr.Service, manager *mcp.Manager, cfg config.Config, botService *bots.Service, accountService *accounts.Service, policyService *policy.Service, queries *dbsqlc.Queries) *handlers.ContainerdHandler { + return handlers.NewContainerdHandler(log, service, manager, cfg.MCP, cfg.Containerd.Namespace, botService, accountService, policyService, queries) +} + +func provideToolGatewayService(log *slog.Logger, cfg config.Config, channelManager *channel.Manager, registry *channel.Registry, channelStore *channel.Store, scheduleService *schedule.Service, memoryService *memory.Service, chatService *conversation.Service, accountService *accounts.Service, settingsService *settings.Service, searchProviderService *searchproviders.Service, manager *mcp.Manager, containerdHandler *handlers.ContainerdHandler, mcpConnService *mcp.ConnectionService) *mcp.ToolGatewayService { + messageExec := mcpmessage.NewExecutor(log, channelManager, channelManager, registry) + directoryExec := mcpdirectory.NewExecutor(log, registry, channelStore, registry) + scheduleExec := mcpschedule.NewExecutor(log, scheduleService) + memoryExec := mcpmemory.NewExecutor(log, memoryService, chatService, accountService) + webExec := mcpweb.NewExecutor(log, settingsService, searchProviderService) + execWorkDir := cfg.MCP.DataMount + if strings.TrimSpace(execWorkDir) == "" { + execWorkDir = config.DefaultDataMount + } + fsExec := mcpcontainer.NewExecutor(log, manager, execWorkDir) + + fedGateway := handlers.NewMCPFederationGateway(log, containerdHandler) + fedSource := mcpfederation.NewSource(log, fedGateway, mcpConnService) + + svc := mcp.NewToolGatewayService( + log, + []mcp.ToolExecutor{messageExec, directoryExec, scheduleExec, memoryExec, webExec, fsExec}, + []mcp.ToolSource{fedSource}, + ) + containerdHandler.SetToolGatewayService(svc) + return svc +} + +// --------------------------------------------------------------------------- +// handler providers (interface adaptation / config extraction) +// --------------------------------------------------------------------------- + +func provideMemoryHandler(log *slog.Logger, service *memory.Service, chatService *conversation.Service, accountService *accounts.Service, cfg config.Config, manager *mcp.Manager) *handlers.MemoryHandler { + h := handlers.NewMemoryHandler(log, service, chatService, accountService) + if manager != nil { + execWorkDir := cfg.MCP.DataMount + if strings.TrimSpace(execWorkDir) == "" { + execWorkDir = config.DefaultDataMount + } + h.SetMemoryFS(memory.NewMemoryFS(log, manager, execWorkDir)) + } + return h +} + +func provideAuthHandler(log *slog.Logger, accountService *accounts.Service, rc *boot.RuntimeConfig) *handlers.AuthHandler { + return handlers.NewAuthHandler(log, accountService, rc.JwtSecret, rc.JwtExpiresIn) +} + +func provideMessageHandler(log *slog.Logger, chatService *conversation.Service, msgService *message.DBService, mediaService *media.Service, botService *bots.Service, accountService *accounts.Service, hub *event.Hub) *handlers.MessageHandler { + h := handlers.NewMessageHandler(log, chatService, msgService, botService, accountService, hub) + h.SetMediaService(mediaService) + return h +} + +func provideMediaService(log *slog.Logger, cfg config.Config) (*media.Service, error) { + dataRoot := strings.TrimSpace(cfg.MCP.DataRoot) + if dataRoot == "" { + dataRoot = config.DefaultDataRoot + } + provider, err := containerfs.New(dataRoot) + if err != nil { + return nil, fmt.Errorf("init media provider: %w", err) + } + return media.NewService(log, provider), nil +} + +func provideUsersHandler(log *slog.Logger, accountService *accounts.Service, identityService *identities.Service, botService *bots.Service, routeService *route.DBService, channelStore *channel.Store, channelLifecycle *channel.Lifecycle, channelManager *channel.Manager, registry *channel.Registry) *handlers.UsersHandler { + return handlers.NewUsersHandler(log, accountService, identityService, botService, routeService, channelStore, channelLifecycle, channelManager, registry) +} + +func provideCLIHandler(channelManager *channel.Manager, channelStore *channel.Store, chatService *conversation.Service, hub *local.RouteHub, botService *bots.Service, accountService *accounts.Service) *handlers.LocalChannelHandler { + return handlers.NewLocalChannelHandler(local.CLIType, channelManager, channelStore, chatService, hub, botService, accountService) +} + +func provideWebHandler(channelManager *channel.Manager, channelStore *channel.Store, chatService *conversation.Service, hub *local.RouteHub, botService *bots.Service, accountService *accounts.Service) *handlers.LocalChannelHandler { + return handlers.NewLocalChannelHandler(local.WebType, channelManager, channelStore, chatService, hub, botService, accountService) +} + +// --------------------------------------------------------------------------- +// server +// --------------------------------------------------------------------------- + +type serverParams struct { + fx.In + + Logger *slog.Logger + RuntimeConfig *boot.RuntimeConfig + Config config.Config + ServerHandlers []server.Handler `group:"server_handlers"` + ContainerdHandler *handlers.ContainerdHandler +} + +func provideServer(params serverParams) *server.Server { + allHandlers := make([]server.Handler, 0, len(params.ServerHandlers)+1) + allHandlers = append(allHandlers, params.ServerHandlers...) + allHandlers = append(allHandlers, params.ContainerdHandler) + return server.NewServer(params.Logger, params.RuntimeConfig.ServerAddr, params.Config.Auth.JWTSecret, allHandlers...) +} + +// --------------------------------------------------------------------------- +// lifecycle hooks +// --------------------------------------------------------------------------- + +func startMemoryWarmup(lc fx.Lifecycle, memoryService *memory.Service, logger *slog.Logger) { + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + go func() { + if err := memoryService.WarmupBM25(context.Background(), 200); err != nil { + logger.Warn("bm25 warmup failed", slog.Any("error", err)) + } + }() + return nil + }, + }) +} + +func startScheduleService(lc fx.Lifecycle, scheduleService *schedule.Service) { + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + return scheduleService.Bootstrap(ctx) + }, + }) +} + +func startChannelManager(lc fx.Lifecycle, channelManager *channel.Manager) { + ctx, cancel := context.WithCancel(context.Background()) + lc.Append(fx.Hook{ + OnStart: func(_ context.Context) error { + channelManager.Start(ctx) + return nil + }, + OnStop: func(stopCtx context.Context) error { + cancel() + return channelManager.Shutdown(stopCtx) + }, + }) +} + +func startContainerReconciliation(lc fx.Lifecycle, containerdHandler *handlers.ContainerdHandler, _ *mcp.ToolGatewayService) { + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + go containerdHandler.ReconcileContainers(ctx) + return nil + }, + }) +} + +func startServer(lc fx.Lifecycle, logger *slog.Logger, srv *server.Server, shutdowner fx.Shutdowner, cfg config.Config, queries *dbsqlc.Queries, botService *bots.Service, containerdHandler *handlers.ContainerdHandler, mcpConnService *mcp.ConnectionService, toolGateway *mcp.ToolGatewayService, channelManager *channel.Manager) { + fmt.Printf("Starting Memoh Agent %s\n", version.GetInfo()) + + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + if err := ensureAdminUser(ctx, logger, queries, cfg); err != nil { + return err + } + botService.SetContainerLifecycle(containerdHandler) + botService.AddRuntimeChecker(healthcheck.NewRuntimeCheckerAdapter( + mcpchecker.NewChecker(logger, mcpConnService, toolGateway), + )) + botService.AddRuntimeChecker(healthcheck.NewRuntimeCheckerAdapter( + channelchecker.NewChecker(logger, channelManager), + )) + + go func() { + if err := srv.Start(); err != nil && !errors.Is(err, http.ErrServerClosed) { + logger.Error("server failed", slog.Any("error", err)) + _ = shutdowner.Shutdown() + } + }() + return nil + }, + OnStop: func(ctx context.Context) error { + if err := srv.Stop(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) { + return fmt.Errorf("server stop: %w", err) + } + return nil + }, + }) +} + +// --------------------------------------------------------------------------- +// helpers +// --------------------------------------------------------------------------- + +func buildTextEmbedder(resolver *embeddings.Resolver, textModel models.GetResponse, hasModels bool, log *slog.Logger) embeddings.Embedder { + if !hasModels { + return nil + } + if textModel.ModelID == "" || textModel.Dimensions <= 0 { + log.Warn("No text embedding model configured. Text embedding features will be limited.") + return nil + } + return &embeddings.ResolverTextEmbedder{ + Resolver: resolver, + ModelID: textModel.ModelID, + Dims: textModel.Dimensions, + } +} + +func ensureAdminUser(ctx context.Context, log *slog.Logger, queries *dbsqlc.Queries, cfg config.Config) error { + if queries == nil { + return fmt.Errorf("db queries not configured") + } + count, err := queries.CountAccounts(ctx) + if err != nil { + return err + } + if count > 0 { + return nil + } + + username := strings.TrimSpace(cfg.Admin.Username) + password := strings.TrimSpace(cfg.Admin.Password) + email := strings.TrimSpace(cfg.Admin.Email) + if username == "" || password == "" { + return fmt.Errorf("admin username/password required in config.toml") + } + if password == "change-your-password-here" { + log.Warn("admin password uses default placeholder; please update config.toml") + } + + hashed, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost) + if err != nil { + return err + } + + user, err := queries.CreateUser(ctx, dbsqlc.CreateUserParams{ + IsActive: true, + Metadata: []byte("{}"), + }) + if err != nil { + return fmt.Errorf("create admin user: %w", err) + } + + emailValue := pgtype.Text{Valid: false} + if email != "" { + emailValue = pgtype.Text{String: email, Valid: true} + } + displayName := pgtype.Text{String: username, Valid: true} + dataRoot := pgtype.Text{String: cfg.MCP.DataRoot, Valid: cfg.MCP.DataRoot != ""} + + _, err = queries.CreateAccount(ctx, dbsqlc.CreateAccountParams{ + UserID: user.ID, + Username: pgtype.Text{String: username, Valid: true}, + Email: emailValue, + PasswordHash: pgtype.Text{String: string(hashed), Valid: true}, + Role: "admin", + DisplayName: displayName, + AvatarUrl: pgtype.Text{Valid: false}, + IsActive: true, + DataRoot: dataRoot, + }) + if err != nil { + return err + } + log.Info("Admin user created", slog.String("username", username)) + return nil +} + +// --------------------------------------------------------------------------- +// lazy LLM client +// --------------------------------------------------------------------------- + +type lazyLLMClient struct { + modelsService *models.Service + queries *dbsqlc.Queries + timeout time.Duration + logger *slog.Logger +} + +func (c *lazyLLMClient) Extract(ctx context.Context, req memory.ExtractRequest) (memory.ExtractResponse, error) { + client, err := c.resolve(ctx) + if err != nil { + return memory.ExtractResponse{}, err + } + return client.Extract(ctx, req) +} + +func (c *lazyLLMClient) Decide(ctx context.Context, req memory.DecideRequest) (memory.DecideResponse, error) { + client, err := c.resolve(ctx) + if err != nil { + return memory.DecideResponse{}, err + } + return client.Decide(ctx, req) +} + +func (c *lazyLLMClient) Compact(ctx context.Context, req memory.CompactRequest) (memory.CompactResponse, error) { + client, err := c.resolve(ctx) + if err != nil { + return memory.CompactResponse{}, err + } + return client.Compact(ctx, req) +} + +func (c *lazyLLMClient) DetectLanguage(ctx context.Context, text string) (string, error) { + client, err := c.resolve(ctx) + if err != nil { + return "", err + } + return client.DetectLanguage(ctx, text) +} + +func (c *lazyLLMClient) resolve(ctx context.Context) (memory.LLM, error) { + if c.modelsService == nil || c.queries == nil { + return nil, fmt.Errorf("models service not configured") + } + botID := memory.BotIDFromContext(ctx) + memoryModel, memoryProvider, err := models.SelectMemoryModelForBot(ctx, c.modelsService, c.queries, botID) + if err != nil { + return nil, err + } + clientType := string(memoryModel.ClientType) + switch clientType { + case "openai-responses", "openai-completions", "anthropic-messages", "google-generative-ai": + default: + return nil, fmt.Errorf("memory model client type not supported: %s", clientType) + } + return memory.NewLLMClient(c.logger, memoryProvider.BaseUrl, memoryProvider.ApiKey, memoryModel.ModelID, c.timeout) +} + +// skillLoaderAdapter bridges handlers.ContainerdHandler to flow.SkillLoader. +type skillLoaderAdapter struct { + handler *handlers.ContainerdHandler +} + +func (a *skillLoaderAdapter) LoadSkills(ctx context.Context, botID string) ([]flow.SkillEntry, error) { + items, err := a.handler.LoadSkills(ctx, botID) + if err != nil { + return nil, err + } + entries := make([]flow.SkillEntry, len(items)) + for i, item := range items { + entries[i] = flow.SkillEntry{ + Name: item.Name, + Description: item.Description, + Content: item.Content, + Metadata: item.Metadata, + } + } + return entries, nil +} + +// gatewayAssetLoaderAdapter bridges media service to flow gateway asset loader. +type gatewayAssetLoaderAdapter struct { + media *media.Service +} + +func (a *gatewayAssetLoaderAdapter) OpenForGateway(ctx context.Context, botID, contentHash string) (io.ReadCloser, string, error) { + if a == nil || a.media == nil { + return nil, "", fmt.Errorf("media service not configured") + } + reader, asset, err := a.media.Open(ctx, botID, contentHash) + if err != nil { + return nil, "", err + } + return reader, strings.TrimSpace(asset.Mime), nil +} diff --git a/cmd/agent/modules/channel.go b/cmd/agent/modules/channel.go deleted file mode 100644 index 867ff75a..00000000 --- a/cmd/agent/modules/channel.go +++ /dev/null @@ -1,75 +0,0 @@ -package modules - -import ( - "context" - "log/slog" - "time" - - "github.com/memohai/memoh/internal/bind" - "github.com/memohai/memoh/internal/boot" - "github.com/memohai/memoh/internal/bots" - "github.com/memohai/memoh/internal/channel" - "github.com/memohai/memoh/internal/channel/adapters/feishu" - "github.com/memohai/memoh/internal/channel/adapters/local" - "github.com/memohai/memoh/internal/channel/adapters/telegram" - "github.com/memohai/memoh/internal/channel/identities" - "github.com/memohai/memoh/internal/channel/inbound" - "github.com/memohai/memoh/internal/channel/route" - "github.com/memohai/memoh/internal/conversation/flow" - "github.com/memohai/memoh/internal/message" - "github.com/memohai/memoh/internal/policy" - "github.com/memohai/memoh/internal/preauth" - "go.uber.org/fx" -) - -var ChannelModule = fx.Module( - "channel", - fx.Provide( - local.NewRouteHub, - channel.NewService, - provideChannelRegistry, - provideChannelRouter, - provideChannelManager, - ), - fx.Invoke(startChannelManager), -) - -// --------------------------------------------------------------------------- -// channel providers -// --------------------------------------------------------------------------- - -func provideChannelRegistry(log *slog.Logger, hub *local.RouteHub) *channel.Registry { - registry := channel.NewRegistry() - registry.MustRegister(telegram.NewTelegramAdapter(log)) - registry.MustRegister(feishu.NewFeishuAdapter(log)) - registry.MustRegister(local.NewCLIAdapter(hub)) - registry.MustRegister(local.NewWebAdapter(hub)) - return registry -} - -func provideChannelRouter(log *slog.Logger, registry *channel.Registry, routeService *route.DBService, msgService *message.DBService, resolver *flow.Resolver, identityService *identities.Service, botService *bots.Service, policyService *policy.Service, preauthService *preauth.Service, bindService *bind.Service, rc *boot.RuntimeConfig) *inbound.ChannelInboundProcessor { - return inbound.NewChannelInboundProcessor(log, registry, routeService, msgService, resolver, identityService, botService, policyService, preauthService, bindService, rc.JwtSecret, 5*time.Minute) -} - -func provideChannelManager(log *slog.Logger, registry *channel.Registry, channelService *channel.Service, channelRouter *inbound.ChannelInboundProcessor) *channel.Manager { - mgr := channel.NewManager(log, registry, channelService, channelRouter) - if mw := channelRouter.IdentityMiddleware(); mw != nil { - mgr.Use(mw) - } - return mgr -} - -func startChannelManager(lc fx.Lifecycle, channelManager *channel.Manager) { - ctx, cancel := context.WithCancel(context.Background()) - lc.Append(fx.Hook{ - OnStart: func(_ context.Context) error { - channelManager.Start(ctx) - return nil - }, - OnStop: func(stopCtx context.Context) error { - cancel() - return channelManager.Shutdown(stopCtx) - }, - }) -} - diff --git a/cmd/agent/modules/containerd.go b/cmd/agent/modules/containerd.go deleted file mode 100644 index b4a6bf98..00000000 --- a/cmd/agent/modules/containerd.go +++ /dev/null @@ -1,80 +0,0 @@ -package modules - -import ( - "context" - "log/slog" - "strings" - - "github.com/memohai/memoh/internal/accounts" - "github.com/memohai/memoh/internal/bots" - "github.com/memohai/memoh/internal/channel" - "github.com/memohai/memoh/internal/config" - ctr "github.com/memohai/memoh/internal/containerd" - "github.com/memohai/memoh/internal/conversation" - dbsqlc "github.com/memohai/memoh/internal/db/sqlc" - "github.com/memohai/memoh/internal/handlers" - "github.com/memohai/memoh/internal/mcp" - mcpcontainer "github.com/memohai/memoh/internal/mcp/providers/container" - mcpdirectory "github.com/memohai/memoh/internal/mcp/providers/directory" - mcpmemory "github.com/memohai/memoh/internal/mcp/providers/memory" - mcpmessage "github.com/memohai/memoh/internal/mcp/providers/message" - mcpschedule "github.com/memohai/memoh/internal/mcp/providers/schedule" - mcpweb "github.com/memohai/memoh/internal/mcp/providers/web" - mcpfederation "github.com/memohai/memoh/internal/mcp/sources/federation" - "github.com/memohai/memoh/internal/memory" - "github.com/memohai/memoh/internal/policy" - "github.com/memohai/memoh/internal/schedule" - "github.com/memohai/memoh/internal/searchproviders" - "github.com/memohai/memoh/internal/settings" - "go.uber.org/fx" -) - -var ContainerdModule = fx.Module( - "containerd", - fx.Provide( - provideContainerdHandler, - provideToolGatewayService, - ), - fx.Invoke(startContainerReconciliation), -) - -// --------------------------------------------------------------------------- -// containerd handler & tool gateway -// --------------------------------------------------------------------------- - -func provideContainerdHandler(log *slog.Logger, service ctr.Service, cfg config.Config, botService *bots.Service, accountService *accounts.Service, policyService *policy.Service, queries *dbsqlc.Queries) *handlers.ContainerdHandler { - return handlers.NewContainerdHandler(log, service, cfg.MCP, cfg.Containerd.Namespace, botService, accountService, policyService, queries) -} - -func provideToolGatewayService(log *slog.Logger, cfg config.Config, channelManager *channel.Manager, registry *channel.Registry, channelService *channel.Service, scheduleService *schedule.Service, memoryService *memory.Service, chatService *conversation.Service, accountService *accounts.Service, settingsService *settings.Service, searchProviderService *searchproviders.Service, manager *mcp.Manager, containerdHandler *handlers.ContainerdHandler, mcpConnService *mcp.ConnectionService) *mcp.ToolGatewayService { - messageExec := mcpmessage.NewExecutor(log, channelManager, channelManager, registry) - directoryExec := mcpdirectory.NewExecutor(log, registry, channelService, registry) - scheduleExec := mcpschedule.NewExecutor(log, scheduleService) - memoryExec := mcpmemory.NewExecutor(log, memoryService, chatService, accountService) - webExec := mcpweb.NewExecutor(log, settingsService, searchProviderService) - execWorkDir := cfg.MCP.DataMount - if strings.TrimSpace(execWorkDir) == "" { - execWorkDir = config.DefaultDataMount - } - fsExec := mcpcontainer.NewExecutor(log, manager, execWorkDir) - - fedGateway := handlers.NewMCPFederationGateway(log, containerdHandler) - fedSource := mcpfederation.NewSource(log, fedGateway, mcpConnService) - - svc := mcp.NewToolGatewayService( - log, - []mcp.ToolExecutor{messageExec, directoryExec, scheduleExec, memoryExec, webExec, fsExec}, - []mcp.ToolSource{fedSource}, - ) - containerdHandler.SetToolGatewayService(svc) - return svc -} - -func startContainerReconciliation(lc fx.Lifecycle, containerdHandler *handlers.ContainerdHandler, _ *mcp.ToolGatewayService) { - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - go containerdHandler.ReconcileContainers(ctx) - return nil - }, - }) -} diff --git a/cmd/agent/modules/conversation.go b/cmd/agent/modules/conversation.go deleted file mode 100644 index d94f64b8..00000000 --- a/cmd/agent/modules/conversation.go +++ /dev/null @@ -1,73 +0,0 @@ -package modules - -import ( - "context" - "log/slog" - "time" - - "github.com/memohai/memoh/internal/config" - "github.com/memohai/memoh/internal/conversation" - "github.com/memohai/memoh/internal/conversation/flow" - dbsqlc "github.com/memohai/memoh/internal/db/sqlc" - "github.com/memohai/memoh/internal/handlers" - "github.com/memohai/memoh/internal/memory" - "github.com/memohai/memoh/internal/message" - "github.com/memohai/memoh/internal/models" - "github.com/memohai/memoh/internal/schedule" - "github.com/memohai/memoh/internal/settings" - "go.uber.org/fx" -) - -var ConversationModule = fx.Module( - "conversation", - fx.Provide( - provideChatResolver, - provideScheduleTriggerer, - schedule.NewService, - ), - fx.Invoke(startScheduleService), -) - -// --------------------------------------------------------------------------- -// conversation flow -// --------------------------------------------------------------------------- - -func provideChatResolver(log *slog.Logger, cfg config.Config, modelsService *models.Service, queries *dbsqlc.Queries, memoryService *memory.Service, chatService *conversation.Service, msgService *message.DBService, settingsService *settings.Service, containerdHandler *handlers.ContainerdHandler) *flow.Resolver { - resolver := flow.NewResolver(log, modelsService, queries, memoryService, chatService, msgService, settingsService, cfg.AgentGateway.BaseURL(), 120*time.Second) - resolver.SetSkillLoader(&skillLoaderAdapter{handler: containerdHandler}) - return resolver -} - -func provideScheduleTriggerer(resolver *flow.Resolver) schedule.Triggerer { - return flow.NewScheduleGateway(resolver) -} - -func startScheduleService(lc fx.Lifecycle, scheduleService *schedule.Service) { - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - return scheduleService.Bootstrap(ctx) - }, - }) -} - -// skillLoaderAdapter bridges handlers.ContainerdHandler to flow.SkillLoader. -type skillLoaderAdapter struct { - handler *handlers.ContainerdHandler -} - -func (a *skillLoaderAdapter) LoadSkills(ctx context.Context, botID string) ([]flow.SkillEntry, error) { - items, err := a.handler.LoadSkills(ctx, botID) - if err != nil { - return nil, err - } - entries := make([]flow.SkillEntry, len(items)) - for i, item := range items { - entries[i] = flow.SkillEntry{ - Name: item.Name, - Description: item.Description, - Content: item.Content, - Metadata: item.Metadata, - } - } - return entries, nil -} diff --git a/cmd/agent/modules/domain.go b/cmd/agent/modules/domain.go deleted file mode 100644 index 9f971fab..00000000 --- a/cmd/agent/modules/domain.go +++ /dev/null @@ -1,61 +0,0 @@ -package modules - -import ( - "log/slog" - - "github.com/memohai/memoh/internal/accounts" - "github.com/memohai/memoh/internal/bind" - "github.com/memohai/memoh/internal/bots" - "github.com/memohai/memoh/internal/channel/identities" - "github.com/memohai/memoh/internal/channel/route" - "github.com/memohai/memoh/internal/conversation" - dbsqlc "github.com/memohai/memoh/internal/db/sqlc" - "github.com/memohai/memoh/internal/mcp" - "github.com/memohai/memoh/internal/message" - "github.com/memohai/memoh/internal/message/event" - "github.com/memohai/memoh/internal/models" - "github.com/memohai/memoh/internal/policy" - "github.com/memohai/memoh/internal/preauth" - "github.com/memohai/memoh/internal/providers" - "github.com/memohai/memoh/internal/searchproviders" - "github.com/memohai/memoh/internal/settings" - "github.com/memohai/memoh/internal/subagent" - "go.uber.org/fx" -) - - -var DomainModule = fx.Module( - "domain", - fx.Provide( - models.NewService, - bots.NewService, - accounts.NewService, - settings.NewService, - providers.NewService, - searchproviders.NewService, - policy.NewService, - preauth.NewService, - mcp.NewConnectionService, - subagent.NewService, - conversation.NewService, - identities.NewService, - bind.NewService, - event.NewHub, - - provideRouteService, - provideMessageService, - ), -) - -// --------------------------------------------------------------------------- -// domain service providers (interface adapters) -// --------------------------------------------------------------------------- - -func provideRouteService(log *slog.Logger, queries *dbsqlc.Queries, chatService *conversation.Service) *route.DBService { - return route.NewService(log, queries, chatService) -} - -func provideMessageService(log *slog.Logger, queries *dbsqlc.Queries, hub *event.Hub) *message.DBService { - return message.NewService(log, queries, hub) -} - diff --git a/cmd/agent/modules/handle.go b/cmd/agent/modules/handle.go deleted file mode 100644 index 6b9d5efb..00000000 --- a/cmd/agent/modules/handle.go +++ /dev/null @@ -1,98 +0,0 @@ -package modules - -import ( - "log/slog" - "strings" - - "github.com/memohai/memoh/internal/accounts" - "github.com/memohai/memoh/internal/boot" - "github.com/memohai/memoh/internal/bots" - "github.com/memohai/memoh/internal/channel" - "github.com/memohai/memoh/internal/channel/adapters/local" - "github.com/memohai/memoh/internal/channel/identities" - "github.com/memohai/memoh/internal/channel/route" - "github.com/memohai/memoh/internal/config" - "github.com/memohai/memoh/internal/conversation" - "github.com/memohai/memoh/internal/conversation/flow" - "github.com/memohai/memoh/internal/handlers" - "github.com/memohai/memoh/internal/mcp" - "github.com/memohai/memoh/internal/memory" - "github.com/memohai/memoh/internal/message" - "github.com/memohai/memoh/internal/message/event" - "github.com/memohai/memoh/internal/server" - "go.uber.org/fx" -) - -var HandlersModule = fx.Module( - "handlers", - fx.Provide( - // Custom handlers with provide functions - annotateHandler(provideMemoryHandler), - annotateHandler(provideAuthHandler), - annotateHandler(provideMessageHandler), - annotateHandler(provideUsersHandler), - annotateHandler(provideCLIHandler), - annotateHandler(provideWebHandler), - - // Simple handlers from handlers package - annotateHandler(handlers.NewEmbeddingsHandler), - annotateHandler(handlers.NewPingHandler), - annotateHandler(handlers.NewSwaggerHandler), - annotateHandler(handlers.NewProvidersHandler), - annotateHandler(handlers.NewSearchProvidersHandler), - annotateHandler(handlers.NewModelsHandler), - annotateHandler(handlers.NewSettingsHandler), - annotateHandler(handlers.NewPreauthHandler), - annotateHandler(handlers.NewBindHandler), - annotateHandler(handlers.NewScheduleHandler), - annotateHandler(handlers.NewSubagentHandler), - annotateHandler(handlers.NewChannelHandler), - annotateHandler(handlers.NewMCPHandler), - ), -) - -// annotateHandler wraps a handler provider function with fx.Annotate -// to register it as a server.Handler with the correct group tag -func annotateHandler(fn any) any { - return fx.Annotate( - fn, - fx.As(new(server.Handler)), - fx.ResultTags(`group:"server_handlers"`), - ) -} - -// --------------------------------------------------------------------------- -// handler providers (interface adaptation / config extraction) -// --------------------------------------------------------------------------- - -func provideMemoryHandler(log *slog.Logger, service *memory.Service, chatService *conversation.Service, accountService *accounts.Service, cfg config.Config, manager *mcp.Manager) *handlers.MemoryHandler { - h := handlers.NewMemoryHandler(log, service, chatService, accountService) - if manager != nil { - execWorkDir := cfg.MCP.DataMount - if strings.TrimSpace(execWorkDir) == "" { - execWorkDir = config.DefaultDataMount - } - h.SetMemoryFS(memory.NewMemoryFS(log, manager, execWorkDir)) - } - return h -} - -func provideAuthHandler(log *slog.Logger, accountService *accounts.Service, rc *boot.RuntimeConfig) *handlers.AuthHandler { - return handlers.NewAuthHandler(log, accountService, rc.JwtSecret, rc.JwtExpiresIn) -} - -func provideMessageHandler(log *slog.Logger, resolver *flow.Resolver, chatService *conversation.Service, msgService *message.DBService, botService *bots.Service, accountService *accounts.Service, identityService *identities.Service, hub *event.Hub) *handlers.MessageHandler { - return handlers.NewMessageHandler(log, resolver, chatService, msgService, botService, accountService, identityService, hub) -} - -func provideUsersHandler(log *slog.Logger, accountService *accounts.Service, identityService *identities.Service, botService *bots.Service, routeService *route.DBService, channelService *channel.Service, channelManager *channel.Manager, registry *channel.Registry) *handlers.UsersHandler { - return handlers.NewUsersHandler(log, accountService, identityService, botService, routeService, channelService, channelManager, registry) -} - -func provideCLIHandler(channelManager *channel.Manager, channelService *channel.Service, chatService *conversation.Service, hub *local.RouteHub, botService *bots.Service, accountService *accounts.Service) *handlers.LocalChannelHandler { - return handlers.NewLocalChannelHandler(local.CLIType, channelManager, channelService, chatService, hub, botService, accountService) -} - -func provideWebHandler(channelManager *channel.Manager, channelService *channel.Service, chatService *conversation.Service, hub *local.RouteHub, botService *bots.Service, accountService *accounts.Service) *handlers.LocalChannelHandler { - return handlers.NewLocalChannelHandler(local.WebType, channelManager, channelService, chatService, hub, botService, accountService) -} diff --git a/cmd/agent/modules/infrastructure.go b/cmd/agent/modules/infrastructure.go deleted file mode 100644 index 34422778..00000000 --- a/cmd/agent/modules/infrastructure.go +++ /dev/null @@ -1,87 +0,0 @@ -package modules - -import ( - "context" - "fmt" - "log/slog" - "os" - - containerd "github.com/containerd/containerd/v2/client" - "github.com/jackc/pgx/v5/pgxpool" - "github.com/memohai/memoh/internal/boot" - "github.com/memohai/memoh/internal/config" - ctr "github.com/memohai/memoh/internal/containerd" - "github.com/memohai/memoh/internal/db" - dbsqlc "github.com/memohai/memoh/internal/db/sqlc" - "github.com/memohai/memoh/internal/logger" - "github.com/memohai/memoh/internal/mcp" - "go.uber.org/fx" -) - -var InfraModule = fx.Module( - "Infra", - fx.Provide( - provideConfig, - provideLogger, - provideContainerdClient, - provideDBConn, - provideDBQueries, - provideMCPManager, - boot.ProvideRuntimeConfig, - fx.Annotate(ctr.NewDefaultService, fx.As(new(ctr.Service))), - ), -) - -// --------------------------------------------------------------------------- -// infrastructure providers -// --------------------------------------------------------------------------- - -func provideConfig() (config.Config, error) { - cfgPath := os.Getenv("CONFIG_PATH") - cfg, err := config.Load(cfgPath) - if err != nil { - return config.Config{}, fmt.Errorf("load config: %w", err) - } - return cfg, nil -} - -func provideLogger(cfg config.Config) *slog.Logger { - logger.Init(cfg.Log.Level, cfg.Log.Format) - return logger.L -} - -func provideContainerdClient(lc fx.Lifecycle, rc *boot.RuntimeConfig) (*containerd.Client, error) { - factory := ctr.DefaultClientFactory{SocketPath: rc.ContainerdSocketPath} - client, err := factory.New(context.Background()) - if err != nil { - return nil, fmt.Errorf("connect containerd: %w", err) - } - lc.Append(fx.Hook{ - OnStop: func(ctx context.Context) error { - return client.Close() - }, - }) - return client, nil -} - -func provideDBConn(lc fx.Lifecycle, cfg config.Config) (*pgxpool.Pool, error) { - conn, err := db.Open(context.Background(), cfg.Postgres) - if err != nil { - return nil, fmt.Errorf("db connect: %w", err) - } - lc.Append(fx.Hook{ - OnStop: func(ctx context.Context) error { - conn.Close() - return nil - }, - }) - return conn, nil -} - -func provideDBQueries(conn *pgxpool.Pool) *dbsqlc.Queries { - return dbsqlc.New(conn) -} - -func provideMCPManager(log *slog.Logger, service ctr.Service, cfg config.Config, conn *pgxpool.Pool) *mcp.Manager { - return mcp.NewManager(log, service, cfg.MCP, cfg.Containerd.Namespace, conn) -} \ No newline at end of file diff --git a/cmd/agent/modules/memory.go b/cmd/agent/modules/memory.go deleted file mode 100644 index f5cf8a42..00000000 --- a/cmd/agent/modules/memory.go +++ /dev/null @@ -1,191 +0,0 @@ -package modules - -import ( - "context" - "fmt" - "log/slog" - "strings" - "time" - - "github.com/memohai/memoh/internal/config" - dbsqlc "github.com/memohai/memoh/internal/db/sqlc" - "github.com/memohai/memoh/internal/embeddings" - "github.com/memohai/memoh/internal/memory" - "github.com/memohai/memoh/internal/models" - "go.uber.org/fx" -) - - -var MemoryModule = fx.Module( - "memory", - fx.Provide( - provideMemoryLLM, - provideEmbeddingSetup, - provideEmbeddingsResolver, - provideTextEmbedderForMemory, - provideQdrantStore, - memory.NewBM25Indexer, - provideMemoryService, - ), - fx.Invoke(startMemoryWarmup), -) - -// --------------------------------------------------------------------------- -// memory providers -// --------------------------------------------------------------------------- - -func provideMemoryLLM(modelsService *models.Service, queries *dbsqlc.Queries, log *slog.Logger) memory.LLM { - return &lazyLLMClient{ - modelsService: modelsService, - queries: queries, - timeout: 30 * time.Second, - logger: log, - } -} - -func provideEmbeddingsResolver(log *slog.Logger, modelsService *models.Service, queries *dbsqlc.Queries) *embeddings.Resolver { - return embeddings.NewResolver(log, modelsService, queries, 10*time.Second) -} - -type embeddingSetup struct { - Vectors map[string]int - TextModel models.GetResponse - MultimodalModel models.GetResponse - HasEmbeddingModels bool -} - -func provideEmbeddingSetup(log *slog.Logger, modelsService *models.Service) (embeddingSetup, error) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - vectors, textModel, multimodalModel, hasEmbeddingModels, err := embeddings.CollectEmbeddingVectors(ctx, modelsService) - if err != nil { - return embeddingSetup{}, fmt.Errorf("embedding models: %w", err) - } - if hasEmbeddingModels && multimodalModel.ModelID == "" { - log.Warn("No multimodal embedding model configured. Multimodal embedding features will be limited.") - } - return embeddingSetup{ - Vectors: vectors, - TextModel: textModel, - MultimodalModel: multimodalModel, - HasEmbeddingModels: hasEmbeddingModels, - }, nil -} - -func provideTextEmbedderForMemory(resolver *embeddings.Resolver, setup embeddingSetup, log *slog.Logger) embeddings.Embedder { - return buildTextEmbedder(resolver, setup.TextModel, setup.HasEmbeddingModels, log) -} - -func provideQdrantStore(log *slog.Logger, cfg config.Config, setup embeddingSetup) (*memory.QdrantStore, error) { - qcfg := cfg.Qdrant - timeout := time.Duration(qcfg.TimeoutSeconds) * time.Second - if setup.HasEmbeddingModels && len(setup.Vectors) > 0 { - store, err := memory.NewQdrantStoreWithVectors(log, qcfg.BaseURL, qcfg.APIKey, qcfg.Collection, setup.Vectors, "sparse_hash", timeout) - if err != nil { - return nil, fmt.Errorf("qdrant named vectors init: %w", err) - } - return store, nil - } - store, err := memory.NewQdrantStore(log, qcfg.BaseURL, qcfg.APIKey, qcfg.Collection, setup.TextModel.Dimensions, "sparse_hash", timeout) - if err != nil { - return nil, fmt.Errorf("qdrant init: %w", err) - } - return store, nil -} - -func provideMemoryService(log *slog.Logger, llm memory.LLM, embedder embeddings.Embedder, store *memory.QdrantStore, resolver *embeddings.Resolver, bm25 *memory.BM25Indexer, setup embeddingSetup) *memory.Service { - return memory.NewService(log, llm, embedder, store, resolver, bm25, setup.TextModel.ModelID, setup.MultimodalModel.ModelID) -} - -func buildTextEmbedder(resolver *embeddings.Resolver, textModel models.GetResponse, hasModels bool, log *slog.Logger) embeddings.Embedder { - if !hasModels { - return nil - } - if textModel.ModelID == "" || textModel.Dimensions <= 0 { - log.Warn("No text embedding model configured. Text embedding features will be limited.") - return nil - } - return &embeddings.ResolverTextEmbedder{ - Resolver: resolver, - ModelID: textModel.ModelID, - Dims: textModel.Dimensions, - } -} - -func startMemoryWarmup(lc fx.Lifecycle, memoryService *memory.Service, logger *slog.Logger) { - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - go func() { - if err := memoryService.WarmupBM25(context.Background(), 200); err != nil { - logger.Warn("bm25 warmup failed", slog.Any("error", err)) - } - }() - return nil - }, - }) -} - - -// --------------------------------------------------------------------------- -// lazy LLM client -// --------------------------------------------------------------------------- - -type lazyLLMClient struct { - modelsService *models.Service - queries *dbsqlc.Queries - timeout time.Duration - logger *slog.Logger -} - -func (c *lazyLLMClient) Extract(ctx context.Context, req memory.ExtractRequest) (memory.ExtractResponse, error) { - client, err := c.resolve(ctx) - if err != nil { - return memory.ExtractResponse{}, err - } - return client.Extract(ctx, req) -} - -func (c *lazyLLMClient) Decide(ctx context.Context, req memory.DecideRequest) (memory.DecideResponse, error) { - client, err := c.resolve(ctx) - if err != nil { - return memory.DecideResponse{}, err - } - return client.Decide(ctx, req) -} - -func (c *lazyLLMClient) Compact(ctx context.Context, req memory.CompactRequest) (memory.CompactResponse, error) { - client, err := c.resolve(ctx) - if err != nil { - return memory.CompactResponse{}, err - } - return client.Compact(ctx, req) -} - -func (c *lazyLLMClient) DetectLanguage(ctx context.Context, text string) (string, error) { - client, err := c.resolve(ctx) - if err != nil { - return "", err - } - return client.DetectLanguage(ctx, text) -} - -func (c *lazyLLMClient) resolve(ctx context.Context) (memory.LLM, error) { - if c.modelsService == nil || c.queries == nil { - return nil, fmt.Errorf("models service not configured") - } - memoryModel, memoryProvider, err := models.SelectMemoryModel(ctx, c.modelsService, c.queries) - if err != nil { - return nil, err - } - clientType := strings.ToLower(strings.TrimSpace(memoryProvider.ClientType)) - switch clientType { - case "openai", "openai-compat", "azure", "mistral", "xai", "ollama", "dashscope": - // These providers support OpenAI-compatible /chat/completions endpoint - default: - return nil, fmt.Errorf("memory provider client type not supported: %s", memoryProvider.ClientType) - } - return memory.NewLLMClient(c.logger, memoryProvider.BaseUrl, memoryProvider.ApiKey, memoryModel.ModelID, c.timeout) -} - - diff --git a/cmd/agent/modules/server.go b/cmd/agent/modules/server.go deleted file mode 100644 index f1c72acc..00000000 --- a/cmd/agent/modules/server.go +++ /dev/null @@ -1,141 +0,0 @@ -package modules - -import ( - "context" - "errors" - "fmt" - "log/slog" - "net/http" - "strings" - - "github.com/jackc/pgx/v5/pgtype" - "github.com/memohai/memoh/internal/boot" - "github.com/memohai/memoh/internal/bots" - "github.com/memohai/memoh/internal/config" - dbsqlc "github.com/memohai/memoh/internal/db/sqlc" - "github.com/memohai/memoh/internal/handlers" - "github.com/memohai/memoh/internal/mcp" - "github.com/memohai/memoh/internal/server" - "github.com/memohai/memoh/internal/version" - "go.uber.org/fx" - "golang.org/x/crypto/bcrypt" -) - - -var ServerModule = fx.Module( - "server", - fx.Provide( - provideServer, - ), - fx.Invoke(startServer), -) - -// --------------------------------------------------------------------------- -// server -// --------------------------------------------------------------------------- - -type serverParams struct { - fx.In - - Logger *slog.Logger - RuntimeConfig *boot.RuntimeConfig - Config config.Config - ServerHandlers []server.Handler `group:"server_handlers"` - ContainerdHandler *handlers.ContainerdHandler -} - -func provideServer(params serverParams) *server.Server { - allHandlers := make([]server.Handler, 0, len(params.ServerHandlers)+1) - allHandlers = append(allHandlers, params.ServerHandlers...) - allHandlers = append(allHandlers, params.ContainerdHandler) - return server.NewServer(params.Logger, params.RuntimeConfig.ServerAddr, params.Config.Auth.JWTSecret, allHandlers...) -} - -func startServer(lc fx.Lifecycle, logger *slog.Logger, srv *server.Server, shutdowner fx.Shutdowner, cfg config.Config, queries *dbsqlc.Queries, botService *bots.Service, containerdHandler *handlers.ContainerdHandler, mcpConnService *mcp.ConnectionService, toolGateway *mcp.ToolGatewayService) { - fmt.Printf("Starting Memoh Agent %s\n", version.GetInfo()) - - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - if err := ensureAdminUser(ctx, logger, queries, cfg); err != nil { - return err - } - botService.SetContainerLifecycle(containerdHandler) - botService.AddRuntimeChecker(mcp.NewConnectionChecker(logger, mcpConnService, toolGateway)) - - go func() { - if err := srv.Start(); err != nil && !errors.Is(err, http.ErrServerClosed) { - logger.Error("server failed", slog.Any("error", err)) - _ = shutdowner.Shutdown() - } - }() - return nil - }, - OnStop: func(ctx context.Context) error { - if err := srv.Stop(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) { - return fmt.Errorf("server stop: %w", err) - } - return nil - }, - }) -} - -func ensureAdminUser(ctx context.Context, log *slog.Logger, queries *dbsqlc.Queries, cfg config.Config) error { - if queries == nil { - return fmt.Errorf("db queries not configured") - } - count, err := queries.CountAccounts(ctx) - if err != nil { - return err - } - if count > 0 { - return nil - } - - username := strings.TrimSpace(cfg.Admin.Username) - password := strings.TrimSpace(cfg.Admin.Password) - email := strings.TrimSpace(cfg.Admin.Email) - if username == "" || password == "" { - return fmt.Errorf("admin username/password required in config.toml") - } - if password == "change-your-password-here" { - log.Warn("admin password uses default placeholder; please update config.toml") - } - - hashed, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost) - if err != nil { - return err - } - - user, err := queries.CreateUser(ctx, dbsqlc.CreateUserParams{ - IsActive: true, - Metadata: []byte("{}"), - }) - if err != nil { - return fmt.Errorf("create admin user: %w", err) - } - - emailValue := pgtype.Text{Valid: false} - if email != "" { - emailValue = pgtype.Text{String: email, Valid: true} - } - displayName := pgtype.Text{String: username, Valid: true} - dataRoot := pgtype.Text{String: cfg.MCP.DataRoot, Valid: cfg.MCP.DataRoot != ""} - - _, err = queries.CreateAccount(ctx, dbsqlc.CreateAccountParams{ - UserID: user.ID, - Username: pgtype.Text{String: username, Valid: true}, - Email: emailValue, - PasswordHash: pgtype.Text{String: string(hashed), Valid: true}, - Role: "admin", - DisplayName: displayName, - AvatarUrl: pgtype.Text{Valid: false}, - IsActive: true, - DataRoot: dataRoot, - }) - if err != nil { - return err - } - log.Info("Admin user created", slog.String("username", username)) - return nil -} -