diff --git a/DEPLOYMENT.md b/DEPLOYMENT.md index 6b57052b..b53393b0 100644 --- a/DEPLOYMENT.md +++ b/DEPLOYMENT.md @@ -1,394 +1,64 @@ -# Memoh Docker Deployment Guide +# Memoh Deployment Guide -Deploy Memoh AI Agent System with Docker Compose in one command. +## Quick Deploy -## Quick Start - -### 1. Clone the Repository ```bash git clone https://github.com/memohai/Memoh.git cd Memoh -``` - -### 2. One-Click Deployment -```bash ./deploy.sh ``` -The script will automatically: -- Check Docker and Docker Compose installation -- Create `config.toml` configuration file (if not exists) -- Build MCP image -- Start all services - -### 3. Access the Application +Access: - Web UI: http://localhost -- API Service: http://localhost:8080 -- Agent Gateway: http://localhost:8081 +- API: http://localhost:8080 +- Agent: http://localhost:8081 -Default admin credentials: -- Username: `admin` -- Password: `admin123` (change in `config.toml`) +Default credentials: `admin` / `admin123` -## Manual Deployment - -If you prefer not to use the automated script: +## Manual Deploy ```bash -# 1. Create configuration file cp docker/config/config.docker.toml config.toml - -# 2. Edit configuration (Important!) -nano config.toml - -# 3. Build MCP image -docker build -f docker/Dockerfile.mcp -t memoh-mcp:latest . - -# 4. Start services +nano config.toml # Change passwords and secrets +nerdctl build -f docker/Dockerfile.mcp -t memoh-mcp:latest . docker compose up -d - -# 5. View logs -docker compose logs -f ``` -## Architecture +## Required Configuration -This deployment uses the host's Docker daemon to manage Bot containers: - -``` -Host Docker -├── memoh-postgres (PostgreSQL) -├── memoh-qdrant (Qdrant) -├── memoh-server (Main Service) ← Manages Bot containers via /var/run/docker.sock -├── memoh-agent (Agent Gateway) -├── memoh-web (Web Frontend) -└── memoh-bot-* (Bot containers, dynamically created by main service) -``` - -Advantages: -- ✅ Lightweight, no additional Docker daemon needed -- ✅ Better performance, uses host container runtime directly -- ✅ Easier to manage and debug -- ✅ Lower resource consumption +Must change in `config.toml`: +- `admin.password` - Admin password +- `auth.jwt_secret` - JWT secret (generate with `openssl rand -base64 32`) +- `postgres.password` - Database password ## Common Commands -### Using Docker Compose ```bash -docker compose up -d # Start services -docker compose down # Stop services -docker compose logs -f # View logs -docker compose ps # View status -docker compose restart # Restart services +docker compose up -d # Start +docker compose down # Stop +docker compose logs -f # View logs +nerdctl ps -a | grep memoh-bot # View bot containers ``` -### Bot Container Management +## Production -View all Bot containers: -```bash -docker ps -a | grep memoh-bot -``` - -## Configuration - -### Environment Variables - -Configuration is managed through `config.toml` file. Key configuration items: - -```toml -# Admin account -[admin] -username = "admin" -password = "admin123" # Must change -email = "admin@yourdomain.com" - -# Auth configuration -[auth] -jwt_secret = "YZq8kXrW5dFpNt9mLxQvHbRjKsMnOePw" # Must change -jwt_expires_in = "168h" - -# PostgreSQL password -[postgres] -host = "postgres" -port = 5432 -user = "memoh" -password = "memoh123" # Must change -database = "memoh" -sslmode = "disable" -``` - -### Application Configuration (config.toml) - -Main configuration items: - -```toml -[postgres] -host = "postgres" -password = "your_secure_password" # Must change in config.toml - -[containerd] -socket_path = "/run/containerd/containerd.sock" - -[qdrant] -base_url = "http://qdrant:6334" -``` - -## Service Overview - -| Service | Container Name | Ports | Description | -|---------|---------------|-------|-------------| -| postgres | memoh-postgres | - | PostgreSQL database (internal only) | -| qdrant | memoh-qdrant | - | Qdrant vector database (internal only) | -| docker-cli | memoh-docker-cli | - | Docker CLI (uses host Docker) | -| server | memoh-server | 8080 | Main service (Go) | -| agent | memoh-agent | 8081 | Agent Gateway (Bun) | -| web | memoh-web | 80 | Web frontend (Nginx) | - -## Data Persistence - -Data is stored in Docker volumes: - -```bash -# View volumes -docker volume ls | grep memoh - -# Backup database -docker compose exec postgres pg_dump -U memoh memoh > backup.sql -``` - -### Bot Container Management - -Bot containers are dynamically created by the main service and run directly on the host: - -```bash -# View all Bot containers -docker ps -a | grep memoh-bot - -# View Bot logs -docker logs - -# Enter Bot container -docker exec -it sh - -# Stop Bot container -docker stop -``` - -## Backup and Restore - -### Backup -```bash -# Create backup directory -mkdir -p backups - -# Backup database -docker compose exec postgres pg_dump -U memoh memoh > backups/postgres_$(date +%Y%m%d).sql - -# Backup Bot data -docker run --rm -v memoh_memoh_bot_data:/data -v $(pwd)/backups:/backup alpine \ - tar czf /backup/bot_data_$(date +%Y%m%d).tar.gz -C /data . - -# Backup configuration files -tar czf backups/config_$(date +%Y%m%d).tar.gz config.toml -``` - -### Restore -```bash -# Restore database -docker compose exec -T postgres psql -U memoh memoh < backups/postgres_20240101.sql - -# Restore Bot data -docker run --rm -v memoh_memoh_bot_data:/data -v $(pwd)/backups:/backup alpine \ - tar xzf /backup/bot_data_20240101.tar.gz -C /data -``` +1. Configure HTTPS (create `docker-compose.override.yml` with SSL certs) +2. Change all default passwords +3. Configure firewall +4. Set resource limits +5. Regular backups ## Troubleshooting -### Services Won't Start ```bash -# View detailed logs -docker compose logs server - -# Check configuration -docker compose config - -# Rebuild -docker compose build --no-cache -docker compose up -d +docker compose logs server # View service logs +docker compose config # Check configuration +docker compose build --no-cache && docker compose up -d # Rebuild ``` -### Database Connection Failed -```bash -# Check if database is ready -docker compose exec postgres pg_isready -U memoh +## Security Warnings -# Test connection -docker compose exec postgres psql -U memoh -d memoh +⚠️ Main service has host Docker access - only run in trusted environments +⚠️ Must change all default passwords and secrets +⚠️ Use HTTPS in production -# View database logs -docker compose logs postgres -``` - -### Port Conflicts -```bash -# Check port usage -sudo netstat -tlnp | grep :8080 -sudo netstat -tlnp | grep :80 - -# Modify port mapping in docker-compose.yml -# Example: change "80:80" to "8000:80" -``` - -### Docker Socket Permission Issues -```bash -# Add user to docker group -sudo usermod -aG docker $USER -newgrp docker - -# Check permissions -ls -la /var/run/docker.sock -``` - -## Production Deployment - -### 1. Use HTTPS - -Create `docker-compose.override.yml`: -```yaml -services: - web: - ports: - - "443:443" - volumes: - - ./ssl:/etc/nginx/ssl:ro - - ./docker/config/nginx-https.conf:/etc/nginx/conf.d/default.conf:ro -``` - -Create `docker/config/nginx-https.conf`: -```nginx -server { - listen 80; - server_name your-domain.com; - return 301 https://$server_name$request_uri; -} - -server { - listen 443 ssl http2; - server_name your-domain.com; - - ssl_certificate /etc/nginx/ssl/cert.pem; - ssl_certificate_key /etc/nginx/ssl/key.pem; - - # SSL configuration - ssl_protocols TLSv1.2 TLSv1.3; - ssl_ciphers HIGH:!aNULL:!MD5; - ssl_prefer_server_ciphers on; - - # Other configurations same as docker/config/nginx.conf - # ... -} -``` - -### 2. Resource Limits - -Edit `docker-compose.yml` to add resource limits: -```yaml -services: - server: - deploy: - resources: - limits: - cpus: '2' - memory: 2G - reservations: - cpus: '1' - memory: 1G -``` - -### 3. Security Recommendations - -Production environment recommendations: -- Change all default passwords in `config.toml` -- Use strong JWT secret -- Configure firewall rules -- Use HTTPS -- Regular data backups -- Limit containerd socket access permissions -- Run services as non-root user -- Configure log rotation - -## Performance Optimization - -### PostgreSQL Optimization -Create `postgres-custom.conf`: -``` -shared_buffers = 2GB -effective_cache_size = 6GB -maintenance_work_mem = 512MB -checkpoint_completion_target = 0.9 -wal_buffers = 16MB -``` - -Mount in `docker-compose.yml`: -```yaml -postgres: - volumes: - - ./postgres-custom.conf:/etc/postgresql/postgresql.conf:ro - command: postgres -c config_file=/etc/postgresql/postgresql.conf -``` - -### Network Optimization -```yaml -networks: - memoh-network: - driver: bridge - driver_opts: - com.docker.network.driver.mtu: 1500 -``` - -## Update Application - -```bash -# Pull latest code -git pull - -# Rebuild and restart -docker compose up -d --build -``` - -## Complete Uninstall - -```bash -# Stop and remove all containers -docker compose down - -# Remove data volumes (Warning! This deletes all data) -docker compose down -v - -# Remove images -docker rmi memoh-mcp:latest -docker rmi $(docker images | grep memoh | awk '{print $3}') -``` - -## Security Considerations - -⚠️ Important Security Notes: - -1. **Docker Socket Access**: The main service container has access to the host Docker socket, which means the application can manage other containers on the host. Only run in trusted environments. -2. **Change Default Passwords**: Must change all default passwords in `config.toml` -3. **Strong JWT Secret**: Use a strong random JWT secret (generate with `openssl rand -base64 32`) -4. **Firewall**: Configure firewall to only open necessary ports -5. **HTTPS**: Use HTTPS in production -6. **Regular Backups**: Regularly backup data -7. **Updates**: Regularly update images and dependencies - -## Get Help - -- Detailed Documentation: [DOCKER_DEPLOYMENT_CN.md](DOCKER_DEPLOYMENT_CN.md) (Chinese) -- GitHub Issues: https://github.com/memohai/Memoh/issues -- Telegram Group: https://t.me/memohai -- Email: business@memoh.net - ---- - -**That's it! Deploy Memoh in minutes!** diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 4251adde..1ec0c4d6 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -2,14 +2,24 @@ package main import ( "context" + "errors" "fmt" "log/slog" + "net/http" "os" "strings" "time" + containerd "github.com/containerd/containerd/v2/client" + "github.com/jackc/pgx/v5/pgtype" + "github.com/jackc/pgx/v5/pgxpool" + "go.uber.org/fx" + "go.uber.org/fx/fxevent" + "golang.org/x/crypto/bcrypt" + "github.com/memohai/memoh/internal/accounts" "github.com/memohai/memoh/internal/bind" + "github.com/memohai/memoh/internal/boot" "github.com/memohai/memoh/internal/bots" "github.com/memohai/memoh/internal/channel" "github.com/memohai/memoh/internal/channel/adapters/feishu" @@ -46,194 +56,441 @@ import ( "github.com/memohai/memoh/internal/settings" "github.com/memohai/memoh/internal/subagent" "github.com/memohai/memoh/internal/version" - - "github.com/jackc/pgx/v5/pgtype" - "golang.org/x/crypto/bcrypt" ) func main() { - fmt.Printf("Starting Memoh Agent %s\n", version.GetInfo()) - ctx := context.Background() + fx.New( + fx.Provide( + provideConfig, + boot.ProvideRuntimeConfig, + provideLogger, + provideContainerdClient, + provideDBConn, + provideDBQueries, + + // containerd & mcp infrastructure + fx.Annotate(ctr.NewDefaultService, fx.As(new(ctr.Service))), + provideMCPManager, + + // memory pipeline + provideMemoryLLM, + provideEmbeddingsResolver, + provideEmbeddingSetup, + provideTextEmbedderForMemory, + provideQdrantStore, + memory.NewBM25Indexer, + provideMemoryService, + + // domain services (auto-wired) + models.NewService, + bots.NewService, + accounts.NewService, + settings.NewService, + providers.NewService, + policy.NewService, + preauth.NewService, + mcp.NewConnectionService, + subagent.NewService, + conversation.NewService, + identities.NewService, + bind.NewService, + event.NewHub, + + // services requiring provide functions + provideRouteService, + provideMessageService, + + // channel infrastructure + local.NewRouteHub, + provideChannelRegistry, + channel.NewService, + provideChannelRouter, + provideChannelManager, + + // conversation flow + provideChatResolver, + provideScheduleTriggerer, + schedule.NewService, + + // containerd handler & tool gateway + provideContainerdHandler, + provideToolGatewayService, + + // http handlers (group:"server_handlers") + provideServerHandler(handlers.NewPingHandler), + provideServerHandler(provideAuthHandler), + provideServerHandler(handlers.NewMemoryHandler), + provideServerHandler(handlers.NewEmbeddingsHandler), + provideServerHandler(provideMessageHandler), + provideServerHandler(handlers.NewSwaggerHandler), + provideServerHandler(handlers.NewProvidersHandler), + provideServerHandler(handlers.NewModelsHandler), + provideServerHandler(handlers.NewSettingsHandler), + provideServerHandler(handlers.NewPreauthHandler), + provideServerHandler(handlers.NewBindHandler), + provideServerHandler(handlers.NewScheduleHandler), + provideServerHandler(handlers.NewSubagentHandler), + provideServerHandler(handlers.NewChannelHandler), + provideServerHandler(provideUsersHandler), + provideServerHandler(handlers.NewMCPHandler), + provideServerHandler(provideCLIHandler), + provideServerHandler(provideWebHandler), + + provideServer, + ), + fx.Invoke( + startMemoryWarmup, + startScheduleService, + startChannelManager, + startContainerReconciliation, + startServer, + ), + fx.WithLogger(func(logger *slog.Logger) fxevent.Logger { + return &fxevent.SlogLogger{Logger: logger.With(slog.String("component", "fx"))} + }), + ).Run() +} + +// --------------------------------------------------------------------------- +// fx helper +// --------------------------------------------------------------------------- + +func provideServerHandler(fn any) any { + return fx.Annotate( + fn, + fx.As(new(server.Handler)), + fx.ResultTags(`group:"server_handlers"`), + ) +} + +// --------------------------------------------------------------------------- +// infrastructure providers +// --------------------------------------------------------------------------- + +func provideConfig() (config.Config, error) { cfgPath := os.Getenv("CONFIG_PATH") cfg, err := config.Load(cfgPath) if err != nil { - fmt.Fprintf(os.Stderr, "load config: %v\n", err) - os.Exit(1) + return config.Config{}, fmt.Errorf("load config: %w", err) } + return cfg, nil +} +func provideLogger(cfg config.Config) *slog.Logger { logger.Init(cfg.Log.Level, cfg.Log.Format) + return logger.L +} - if strings.TrimSpace(cfg.Auth.JWTSecret) == "" { - logger.Error("jwt secret is required") - os.Exit(1) - } - jwtExpiresIn, err := time.ParseDuration(cfg.Auth.JWTExpiresIn) +func provideContainerdClient(lc fx.Lifecycle, rc *boot.RuntimeConfig) (*containerd.Client, error) { + factory := ctr.DefaultClientFactory{SocketPath: rc.ContainerdSocketPath} + client, err := factory.New(context.Background()) if err != nil { - logger.Error("invalid jwt expires in", slog.Any("error", err)) - os.Exit(1) + return nil, fmt.Errorf("connect containerd: %w", err) } + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return client.Close() + }, + }) + return client, nil +} - addr := cfg.Server.Addr - if value := os.Getenv("HTTP_ADDR"); value != "" { - addr = value - } - - socketPath := cfg.Containerd.SocketPath - if value := os.Getenv("CONTAINERD_SOCKET"); value != "" { - socketPath = value - } - factory := ctr.DefaultClientFactory{SocketPath: socketPath} - client, err := factory.New(ctx) +func provideDBConn(lc fx.Lifecycle, cfg config.Config) (*pgxpool.Pool, error) { + conn, err := db.Open(context.Background(), cfg.Postgres) if err != nil { - logger.Error("connect containerd", slog.Any("error", err)) - os.Exit(1) + return nil, fmt.Errorf("db connect: %w", err) } - defer client.Close() + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + conn.Close() + return nil + }, + }) + return conn, nil +} - service := ctr.NewDefaultService(logger.L, client, cfg.Containerd.Namespace) - manager := mcp.NewManager(logger.L, service, cfg.MCP, cfg.Containerd.Namespace) +func provideDBQueries(conn *pgxpool.Pool) *dbsqlc.Queries { + return dbsqlc.New(conn) +} - pingHandler := handlers.NewPingHandler(logger.L) - // containerdHandler is created later after DB services are initialized +func provideMCPManager(log *slog.Logger, service ctr.Service, cfg config.Config, conn *pgxpool.Pool) *mcp.Manager { + return mcp.NewManager(log, service, cfg.MCP, cfg.Containerd.Namespace, conn) +} - conn, err := db.Open(ctx, cfg.Postgres) - if err != nil { - logger.Error("db connect", slog.Any("error", err)) - os.Exit(1) - } - defer conn.Close() - manager.WithDB(conn) - queries := dbsqlc.New(conn) - modelsService := models.NewService(logger.L, queries) - botService := bots.NewService(logger.L, queries) - accountService := accounts.NewService(logger.L, queries) - settingsService := settings.NewService(logger.L, queries) - policyService := policy.NewService(logger.L, botService, settingsService) +// --------------------------------------------------------------------------- +// memory providers +// --------------------------------------------------------------------------- - containerdHandler := handlers.NewContainerdHandler(logger.L, service, cfg.MCP, cfg.Containerd.Namespace, botService, accountService, policyService, queries) - botService.SetContainerLifecycle(containerdHandler) - - if err := ensureAdminUser(ctx, logger.L, queries, cfg); err != nil { - logger.Error("ensure admin user", slog.Any("error", err)) - os.Exit(1) - } - - authHandler := handlers.NewAuthHandler(logger.L, accountService, cfg.Auth.JWTSecret, jwtExpiresIn) - - // Initialize conversation runner after memory service is configured. - var chatResolver *flow.Resolver - - // Create LLM client for memory operations (deferred model/provider selection). - var llmClient memory.LLM = &lazyLLMClient{ +func provideMemoryLLM(modelsService *models.Service, queries *dbsqlc.Queries, log *slog.Logger) memory.LLM { + return &lazyLLMClient{ modelsService: modelsService, queries: queries, timeout: 30 * time.Second, - logger: logger.L, + logger: log, } +} + +func provideEmbeddingsResolver(log *slog.Logger, modelsService *models.Service, queries *dbsqlc.Queries) *embeddings.Resolver { + return embeddings.NewResolver(log, modelsService, queries, 10*time.Second) +} + +type embeddingSetup struct { + Vectors map[string]int + TextModel models.GetResponse + MultimodalModel models.GetResponse + HasEmbeddingModels bool +} + +func provideEmbeddingSetup(log *slog.Logger, modelsService *models.Service) (embeddingSetup, error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() - resolver := embeddings.NewResolver(logger.L, modelsService, queries, 10*time.Second) vectors, textModel, multimodalModel, hasEmbeddingModels, err := embeddings.CollectEmbeddingVectors(ctx, modelsService) if err != nil { - logger.Error("embedding models", slog.Any("error", err)) - os.Exit(1) + return embeddingSetup{}, fmt.Errorf("embedding models: %w", err) } - - textEmbedder := buildTextEmbedder(resolver, textModel, hasEmbeddingModels, logger.L) if hasEmbeddingModels && multimodalModel.ModelID == "" { - logger.Warn("No multimodal embedding model configured. Multimodal embedding features will be limited.") + log.Warn("No multimodal embedding model configured. Multimodal embedding features will be limited.") } - store := buildQdrantStore(logger.L, cfg.Qdrant, vectors, hasEmbeddingModels, textModel.Dimensions) + return embeddingSetup{ + Vectors: vectors, + TextModel: textModel, + MultimodalModel: multimodalModel, + HasEmbeddingModels: hasEmbeddingModels, + }, nil +} - bm25Indexer := memory.NewBM25Indexer(logger.L) - memoryService := memory.NewService(logger.L, llmClient, textEmbedder, store, resolver, bm25Indexer, textModel.ModelID, multimodalModel.ModelID) - go func() { - if err := memoryService.WarmupBM25(ctx, 200); err != nil { - logger.Warn("bm25 warmup failed", slog.Any("error", err)) +func provideTextEmbedderForMemory(resolver *embeddings.Resolver, setup embeddingSetup, log *slog.Logger) embeddings.Embedder { + return buildTextEmbedder(resolver, setup.TextModel, setup.HasEmbeddingModels, log) +} + +func provideQdrantStore(log *slog.Logger, cfg config.Config, setup embeddingSetup) (*memory.QdrantStore, error) { + qcfg := cfg.Qdrant + timeout := time.Duration(qcfg.TimeoutSeconds) * time.Second + if setup.HasEmbeddingModels && len(setup.Vectors) > 0 { + store, err := memory.NewQdrantStoreWithVectors(log, qcfg.BaseURL, qcfg.APIKey, qcfg.Collection, setup.Vectors, "sparse_hash", timeout) + if err != nil { + return nil, fmt.Errorf("qdrant named vectors init: %w", err) } - }() + return store, nil + } + store, err := memory.NewQdrantStore(log, qcfg.BaseURL, qcfg.APIKey, qcfg.Collection, setup.TextModel.Dimensions, "sparse_hash", timeout) + if err != nil { + return nil, fmt.Errorf("qdrant init: %w", err) + } + return store, nil +} - // Initialize providers and models handlers - providersService := providers.NewService(logger.L, queries) - providersHandler := handlers.NewProvidersHandler(logger.L, providersService, modelsService) - settingsHandler := handlers.NewSettingsHandler(logger.L, settingsService, botService, accountService) - modelsHandler := handlers.NewModelsHandler(logger.L, modelsService, settingsService) - chatService := conversation.NewService(logger.L, queries) - routeService := route.NewService(logger.L, queries, chatService) - messageEvents := event.NewHub() - messageService := message.NewService(logger.L, queries, messageEvents) - memoryHandler := handlers.NewMemoryHandler(logger.L, memoryService, chatService, accountService) - channelIdentitySvc := identities.NewService(logger.L, queries) - preauthService := preauth.NewService(queries) - preauthHandler := handlers.NewPreauthHandler(preauthService, botService, accountService) - bindService := bind.NewService(logger.L, conn, queries) - bindHandler := handlers.NewBindHandler(logger.L, bindService) - mcpConnectionsService := mcp.NewConnectionService(logger.L, queries) - mcpHandler := handlers.NewMCPHandler(logger.L, mcpConnectionsService, botService, accountService) - chatResolver = flow.NewResolver(logger.L, modelsService, queries, memoryService, chatService, messageService, settingsService, mcpConnectionsService, cfg.AgentGateway.BaseURL(), 120*time.Second) - chatResolver.SetSkillLoader(&skillLoaderAdapter{handler: containerdHandler}) - embeddingsHandler := handlers.NewEmbeddingsHandler(logger.L, modelsService, queries) - swaggerHandler := handlers.NewSwaggerHandler(logger.L) - conversationHandler := handlers.NewMessageHandler(logger.L, chatResolver, chatService, messageService, botService, accountService, channelIdentitySvc, messageEvents) - channelRegistry := channel.NewRegistry() - routeHub := local.NewRouteHub() - channelRegistry.MustRegister(telegram.NewTelegramAdapter(logger.L)) - channelRegistry.MustRegister(feishu.NewFeishuAdapter(logger.L)) - channelRegistry.MustRegister(local.NewCLIAdapter(routeHub)) - channelRegistry.MustRegister(local.NewWebAdapter(routeHub)) - channelService := channel.NewService(queries, channelRegistry) - channelRouter := router.NewChannelInboundProcessor(logger.L, channelRegistry, routeService, messageService, chatResolver, channelIdentitySvc, botService, policyService, preauthService, bindService, cfg.Auth.JWTSecret, 5*time.Minute) - channelManager := channel.NewManager(logger.L, channelRegistry, channelService, channelRouter) +func provideMemoryService(log *slog.Logger, llm memory.LLM, embedder embeddings.Embedder, store *memory.QdrantStore, resolver *embeddings.Resolver, bm25 *memory.BM25Indexer, setup embeddingSetup) *memory.Service { + return memory.NewService(log, llm, embedder, store, resolver, bm25, setup.TextModel.ModelID, setup.MultimodalModel.ModelID) +} + +// --------------------------------------------------------------------------- +// domain service providers (interface adapters) +// --------------------------------------------------------------------------- + +func provideRouteService(log *slog.Logger, queries *dbsqlc.Queries, chatService *conversation.Service) *route.DBService { + return route.NewService(log, queries, chatService) +} + +func provideMessageService(log *slog.Logger, queries *dbsqlc.Queries, hub *event.Hub) *message.DBService { + return message.NewService(log, queries, hub) +} + +func provideScheduleTriggerer(resolver *flow.Resolver) schedule.Triggerer { + return flow.NewScheduleGateway(resolver) +} + +// --------------------------------------------------------------------------- +// conversation flow +// --------------------------------------------------------------------------- + +func provideChatResolver(log *slog.Logger, cfg config.Config, modelsService *models.Service, queries *dbsqlc.Queries, memoryService *memory.Service, chatService *conversation.Service, msgService *message.DBService, settingsService *settings.Service, mcpConnService *mcp.ConnectionService, containerdHandler *handlers.ContainerdHandler) *flow.Resolver { + resolver := flow.NewResolver(log, modelsService, queries, memoryService, chatService, msgService, settingsService, mcpConnService, cfg.AgentGateway.BaseURL(), 120*time.Second) + resolver.SetSkillLoader(&skillLoaderAdapter{handler: containerdHandler}) + return resolver +} + +// --------------------------------------------------------------------------- +// channel providers +// --------------------------------------------------------------------------- + +func provideChannelRegistry(log *slog.Logger, hub *local.RouteHub) *channel.Registry { + registry := channel.NewRegistry() + registry.MustRegister(telegram.NewTelegramAdapter(log)) + registry.MustRegister(feishu.NewFeishuAdapter(log)) + registry.MustRegister(local.NewCLIAdapter(hub)) + registry.MustRegister(local.NewWebAdapter(hub)) + return registry +} + +func provideChannelRouter(log *slog.Logger, registry *channel.Registry, routeService *route.DBService, msgService *message.DBService, resolver *flow.Resolver, identityService *identities.Service, botService *bots.Service, policyService *policy.Service, preauthService *preauth.Service, bindService *bind.Service, rc *boot.RuntimeConfig) *router.ChannelInboundProcessor { + return router.NewChannelInboundProcessor(log, registry, routeService, msgService, resolver, identityService, botService, policyService, preauthService, bindService, rc.JwtSecret, 5*time.Minute) +} + +func provideChannelManager(log *slog.Logger, registry *channel.Registry, channelService *channel.Service, channelRouter *router.ChannelInboundProcessor) *channel.Manager { + mgr := channel.NewManager(log, registry, channelService, channelRouter) if mw := channelRouter.IdentityMiddleware(); mw != nil { - channelManager.Use(mw) + mgr.Use(mw) } - channelManager.Start(ctx) - channelHandler := handlers.NewChannelHandler(channelService, channelRegistry) - usersHandler := handlers.NewUsersHandler(logger.L, accountService, channelIdentitySvc, botService, routeService, channelService, channelManager, channelRegistry) - cliHandler := handlers.NewLocalChannelHandler(local.CLIType, channelManager, channelService, chatService, routeHub, botService, accountService) - webHandler := handlers.NewLocalChannelHandler(local.WebType, channelManager, channelService, chatService, routeHub, botService, accountService) - scheduleGateway := flow.NewScheduleGateway(chatResolver) - scheduleService := schedule.NewService(logger.L, queries, scheduleGateway, cfg.Auth.JWTSecret) - if err := scheduleService.Bootstrap(ctx); err != nil { - logger.Error("schedule bootstrap", slog.Any("error", err)) - os.Exit(1) - } - scheduleHandler := handlers.NewScheduleHandler(logger.L, scheduleService, botService, accountService) - subagentService := subagent.NewService(logger.L, queries) - subagentHandler := handlers.NewSubagentHandler(logger.L, subagentService, botService, accountService) - messageToolExecutor := mcpmessage.NewExecutor(logger.L, channelManager, channelRegistry) - directoryToolExecutor := mcpdirectory.NewExecutor(logger.L, channelRegistry, channelService, channelRegistry) - scheduleToolExecutor := mcpschedule.NewExecutor(logger.L, scheduleService) - memoryToolExecutor := mcpmemory.NewExecutor(logger.L, memoryService, chatService, accountService) + return mgr +} + +// --------------------------------------------------------------------------- +// containerd handler & tool gateway +// --------------------------------------------------------------------------- + +func provideContainerdHandler(log *slog.Logger, service ctr.Service, cfg config.Config, botService *bots.Service, accountService *accounts.Service, policyService *policy.Service, queries *dbsqlc.Queries) *handlers.ContainerdHandler { + return handlers.NewContainerdHandler(log, service, cfg.MCP, cfg.Containerd.Namespace, botService, accountService, policyService, queries) +} + +func provideToolGatewayService(log *slog.Logger, cfg config.Config, channelManager *channel.Manager, registry *channel.Registry, channelService *channel.Service, scheduleService *schedule.Service, memoryService *memory.Service, chatService *conversation.Service, accountService *accounts.Service, manager *mcp.Manager, containerdHandler *handlers.ContainerdHandler, mcpConnService *mcp.ConnectionService) *mcp.ToolGatewayService { + messageExec := mcpmessage.NewExecutor(log, channelManager, registry) + directoryExec := mcpdirectory.NewExecutor(log, registry, channelService, registry) + scheduleExec := mcpschedule.NewExecutor(log, scheduleService) + memoryExec := mcpmemory.NewExecutor(log, memoryService, chatService, accountService) execWorkDir := cfg.MCP.DataMount if strings.TrimSpace(execWorkDir) == "" { execWorkDir = config.DefaultDataMount } - fsToolExecutor := mcpcontainer.NewExecutor(logger.L, manager, execWorkDir) - federationGateway := handlers.NewMCPFederationGateway(logger.L, containerdHandler) - federatedToolSource := mcpfederation.NewSource(logger.L, federationGateway, mcpConnectionsService) - toolGatewayService := mcp.NewToolGatewayService( - logger.L, - []mcp.ToolExecutor{ - messageToolExecutor, - directoryToolExecutor, - scheduleToolExecutor, - memoryToolExecutor, - fsToolExecutor, - }, - []mcp.ToolSource{ - federatedToolSource, - }, - ) - containerdHandler.SetToolGatewayService(toolGatewayService) - go containerdHandler.ReconcileContainers(ctx) - srv := server.NewServer(logger.L, addr, cfg.Auth.JWTSecret, pingHandler, authHandler, memoryHandler, embeddingsHandler, conversationHandler, swaggerHandler, providersHandler, modelsHandler, settingsHandler, preauthHandler, bindHandler, scheduleHandler, subagentHandler, containerdHandler, channelHandler, usersHandler, mcpHandler, cliHandler, webHandler) + fsExec := mcpcontainer.NewExecutor(log, manager, execWorkDir) - if err := srv.Start(); err != nil { - logger.Error("server failed", slog.Any("error", err)) - os.Exit(1) - } + fedGateway := handlers.NewMCPFederationGateway(log, containerdHandler) + fedSource := mcpfederation.NewSource(log, fedGateway, mcpConnService) + + svc := mcp.NewToolGatewayService( + log, + []mcp.ToolExecutor{messageExec, directoryExec, scheduleExec, memoryExec, fsExec}, + []mcp.ToolSource{fedSource}, + ) + containerdHandler.SetToolGatewayService(svc) + return svc } +// --------------------------------------------------------------------------- +// handler providers (interface adaptation / config extraction) +// --------------------------------------------------------------------------- + +func provideAuthHandler(log *slog.Logger, accountService *accounts.Service, rc *boot.RuntimeConfig) *handlers.AuthHandler { + return handlers.NewAuthHandler(log, accountService, rc.JwtSecret, rc.JwtExpiresIn) +} + +func provideMessageHandler(log *slog.Logger, resolver *flow.Resolver, chatService *conversation.Service, msgService *message.DBService, botService *bots.Service, accountService *accounts.Service, identityService *identities.Service, hub *event.Hub) *handlers.MessageHandler { + return handlers.NewMessageHandler(log, resolver, chatService, msgService, botService, accountService, identityService, hub) +} + +func provideUsersHandler(log *slog.Logger, accountService *accounts.Service, identityService *identities.Service, botService *bots.Service, routeService *route.DBService, channelService *channel.Service, channelManager *channel.Manager, registry *channel.Registry) *handlers.UsersHandler { + return handlers.NewUsersHandler(log, accountService, identityService, botService, routeService, channelService, channelManager, registry) +} + +func provideCLIHandler(channelManager *channel.Manager, channelService *channel.Service, chatService *conversation.Service, hub *local.RouteHub, botService *bots.Service, accountService *accounts.Service) *handlers.LocalChannelHandler { + return handlers.NewLocalChannelHandler(local.CLIType, channelManager, channelService, chatService, hub, botService, accountService) +} + +func provideWebHandler(channelManager *channel.Manager, channelService *channel.Service, chatService *conversation.Service, hub *local.RouteHub, botService *bots.Service, accountService *accounts.Service) *handlers.LocalChannelHandler { + return handlers.NewLocalChannelHandler(local.WebType, channelManager, channelService, chatService, hub, botService, accountService) +} + +// --------------------------------------------------------------------------- +// server +// --------------------------------------------------------------------------- + +type serverParams struct { + fx.In + + Logger *slog.Logger + RuntimeConfig *boot.RuntimeConfig + Config config.Config + ServerHandlers []server.Handler `group:"server_handlers"` + ContainerdHandler *handlers.ContainerdHandler +} + +func provideServer(params serverParams) *server.Server { + allHandlers := make([]server.Handler, 0, len(params.ServerHandlers)+1) + allHandlers = append(allHandlers, params.ServerHandlers...) + allHandlers = append(allHandlers, params.ContainerdHandler) + return server.NewServer(params.Logger, params.RuntimeConfig.ServerAddr, params.Config.Auth.JWTSecret, allHandlers...) +} + +// --------------------------------------------------------------------------- +// lifecycle hooks +// --------------------------------------------------------------------------- + +func startMemoryWarmup(lc fx.Lifecycle, memoryService *memory.Service, logger *slog.Logger) { + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + go func() { + if err := memoryService.WarmupBM25(context.Background(), 200); err != nil { + logger.Warn("bm25 warmup failed", slog.Any("error", err)) + } + }() + return nil + }, + }) +} + +func startScheduleService(lc fx.Lifecycle, scheduleService *schedule.Service) { + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + return scheduleService.Bootstrap(ctx) + }, + }) +} + +func startChannelManager(lc fx.Lifecycle, channelManager *channel.Manager) { + ctx, cancel := context.WithCancel(context.Background()) + lc.Append(fx.Hook{ + OnStart: func(_ context.Context) error { + channelManager.Start(ctx) + return nil + }, + OnStop: func(stopCtx context.Context) error { + cancel() + return channelManager.Shutdown(stopCtx) + }, + }) +} + +func startContainerReconciliation(lc fx.Lifecycle, containerdHandler *handlers.ContainerdHandler, _ *mcp.ToolGatewayService) { + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + go containerdHandler.ReconcileContainers(ctx) + return nil + }, + }) +} + +func startServer(lc fx.Lifecycle, logger *slog.Logger, srv *server.Server, shutdowner fx.Shutdowner, cfg config.Config, queries *dbsqlc.Queries, botService *bots.Service, containerdHandler *handlers.ContainerdHandler) { + fmt.Printf("Starting Memoh Agent %s\n", version.GetInfo()) + + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + if err := ensureAdminUser(ctx, logger, queries, cfg); err != nil { + return err + } + botService.SetContainerLifecycle(containerdHandler) + + go func() { + if err := srv.Start(); err != nil && !errors.Is(err, http.ErrServerClosed) { + logger.Error("server failed", slog.Any("error", err)) + _ = shutdowner.Shutdown() + } + }() + return nil + }, + OnStop: func(ctx context.Context) error { + if err := srv.Stop(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) { + return fmt.Errorf("server stop: %w", err) + } + return nil + }, + }) +} + +// --------------------------------------------------------------------------- +// helpers +// --------------------------------------------------------------------------- + func buildTextEmbedder(resolver *embeddings.Resolver, textModel models.GetResponse, hasModels bool, log *slog.Logger) embeddings.Embedder { if !hasModels { return nil @@ -249,40 +506,6 @@ func buildTextEmbedder(resolver *embeddings.Resolver, textModel models.GetRespon } } -func buildQdrantStore(log *slog.Logger, cfg config.QdrantConfig, vectors map[string]int, hasModels bool, textDims int) *memory.QdrantStore { - timeout := time.Duration(cfg.TimeoutSeconds) * time.Second - if hasModels && len(vectors) > 0 { - store, err := memory.NewQdrantStoreWithVectors( - log, - cfg.BaseURL, - cfg.APIKey, - cfg.Collection, - vectors, - "sparse_hash", - timeout, - ) - if err != nil { - log.Error("qdrant named vectors init", slog.Any("error", err)) - os.Exit(1) - } - return store - } - store, err := memory.NewQdrantStore( - log, - cfg.BaseURL, - cfg.APIKey, - cfg.Collection, - textDims, - "sparse_hash", - timeout, - ) - if err != nil { - log.Error("qdrant init", slog.Any("error", err)) - os.Exit(1) - } - return store -} - func ensureAdminUser(ctx context.Context, log *slog.Logger, queries *dbsqlc.Queries, cfg config.Config) error { if queries == nil { return fmt.Errorf("db queries not configured") @@ -343,6 +566,10 @@ func ensureAdminUser(ctx context.Context, log *slog.Logger, queries *dbsqlc.Quer return nil } +// --------------------------------------------------------------------------- +// lazy LLM client +// --------------------------------------------------------------------------- + type lazyLLMClient struct { modelsService *models.Service queries *dbsqlc.Queries diff --git a/docker-compose.yml b/docker-compose.yml index 28f554ca..189513b8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -49,7 +49,7 @@ services: - /run/containerd/containerd.sock:/run/containerd/containerd.sock - /var/lib/containerd:/var/lib/containerd - server_cni_state:/var/lib/cni - - ${MEMOH_DATA_ROOT:-/opt/memoh/data}:${MEMOH_DATA_ROOT:-/opt/memoh/data} + - /app/data:/app/data cap_add: - SYS_ADMIN - NET_ADMIN @@ -58,6 +58,12 @@ services: - apparmor:unconfined ports: - "8080:8080" + healthcheck: + test: ["CMD-SHELL", "netstat -tln | grep :8080 || exit 1 "] + interval: 10s + timeout: 5s + retries: 5 + start_period: 30s depends_on: postgres: condition: service_healthy @@ -76,8 +82,15 @@ services: - ./config.toml:/config.toml:ro ports: - "8081:8081" + healthcheck: + test: ["CMD-SHELL", "netstat -tln | grep :8081 || exit 1"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 20s depends_on: - - server + server: + condition: service_healthy restart: unless-stopped networks: - memoh-network @@ -92,9 +105,17 @@ services: container_name: memoh-web ports: - "80:80" + healthcheck: + test: ["CMD-SHELL", "wget --no-verbose --tries=1 --spider http://localhost:80 || exit 1"] + interval: 10s + timeout: 5s + retries: 3 + start_period: 10s depends_on: - - server - - agent + server: + condition: service_healthy + agent: + condition: service_healthy restart: unless-stopped networks: - memoh-network diff --git a/go.mod b/go.mod index 2005a394..a3f6bbe3 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/containerd/containerd/api v1.10.0 github.com/containerd/containerd/v2 v2.2.1 github.com/containerd/errdefs v1.0.0 + github.com/containerd/go-cni v1.1.13 github.com/containerd/platforms v1.0.0-rc.2 github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1 github.com/golang-jwt/jwt/v5 v5.3.1 @@ -24,7 +25,10 @@ require ( github.com/robfig/cron/v3 v3.0.1 github.com/stretchr/testify v1.11.1 github.com/swaggo/swag v1.16.6 + go.uber.org/fx v1.24.0 golang.org/x/crypto v0.47.0 + google.golang.org/grpc v1.78.0 + gopkg.in/yaml.v3 v3.0.1 ) require ( @@ -45,7 +49,6 @@ require ( github.com/containerd/continuity v0.4.5 // indirect github.com/containerd/errdefs/pkg v0.3.0 // indirect github.com/containerd/fifo v1.1.0 // indirect - github.com/containerd/go-cni v1.1.13 // indirect github.com/containerd/log v0.1.0 // indirect github.com/containerd/plugin v1.0.0 // indirect github.com/containerd/ttrpc v1.2.7 // indirect @@ -103,6 +106,9 @@ require ( go.opentelemetry.io/otel v1.39.0 // indirect go.opentelemetry.io/otel/metric v1.39.0 // indirect go.opentelemetry.io/otel/trace v1.39.0 // indirect + go.uber.org/dig v1.19.0 // indirect + go.uber.org/multierr v1.10.0 // indirect + go.uber.org/zap v1.26.0 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/mod v0.32.0 // indirect golang.org/x/net v0.49.0 // indirect @@ -113,7 +119,5 @@ require ( golang.org/x/time v0.14.0 // indirect golang.org/x/tools v0.41.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect - google.golang.org/grpc v1.78.0 // indirect google.golang.org/protobuf v1.36.11 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 8864560c..6283effb 100644 --- a/go.sum +++ b/go.sum @@ -109,6 +109,8 @@ github.com/go-openapi/testify/enable/yaml/v2 v2.0.2 h1:0+Y41Pz1NkbTHz8NngxTuAXxE github.com/go-openapi/testify/enable/yaml/v2 v2.0.2/go.mod h1:kme83333GCtJQHXQ8UKX3IBZu6z8T5Dvy5+CW3NLUUg= github.com/go-openapi/testify/v2 v2.0.2 h1:X999g3jeLcoY8qctY/c/Z8iBHTbwLz7R2WXd6Ub6wls= github.com/go-openapi/testify/v2 v2.0.2/go.mod h1:HCPmvFFnheKK2BuwSA0TbbdxJ3I16pjwMkYkP4Ywn54= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1 h1:wG8n/XJQ07TmjbITcGiUaOtXxdrINDz1b0J1w0SzqDc= github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1/go.mod h1:A2S0CWkNylc2phvKXWBBdD3K0iGnDBGbzRpISP2zBl8= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= @@ -142,6 +144,8 @@ github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/jsonschema-go v0.4.2 h1:tmrUohrwoLZZS/P3x7ex0WAVknEkBZM46iALbcqoRA8= github.com/google/jsonschema-go v0.4.2/go.mod h1:r5quNTdLOYEz95Ru18zA0ydNbBuYoo9tgaYcxEYhJVE= +github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 h1:FKHo8hFI3A+7w0aUQuYXQ+6EN5stWmeY/AZqtM8xk9k= +github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -198,6 +202,10 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFdJifH4BDsTlE89Zl93FEloxaWZfGcifgq8= github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/onsi/ginkgo/v2 v2.20.1 h1:YlVIbqct+ZmnEph770q9Q7NVAz4wwIiVNahee6JyUzo= +github.com/onsi/ginkgo/v2 v2.20.1/go.mod h1:lG9ey2Z29hR41WMVthyJBGUBcBhGOtoPF2VFMvBXFCI= +github.com/onsi/gomega v1.34.1 h1:EUMJIKUjM8sKjYbtxQI9A4z2o+rruxnzNvpknOXie6k= +github.com/onsi/gomega v1.34.1/go.mod h1:kU1QgUvBDLXBJq618Xvm2LUX6rSAfRaFRTcdOeDLwwY= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040= @@ -229,6 +237,8 @@ github.com/sirupsen/logrus v1.9.4/go.mod h1:ftWc9WdOfJ0a92nsE2jF5u5ZwH8Bv2zdeOC4 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -242,6 +252,8 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo= github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= +github.com/vishvananda/netns v0.0.5 h1:DfiHV+j8bA32MFM7bfEunvT8IAqQ/NzSJHtcmW5zdEY= +github.com/vishvananda/netns v0.0.5/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -262,6 +274,16 @@ go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2W go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= +go.uber.org/dig v1.19.0 h1:BACLhebsYdpQ7IROQ1AGPjrXcP5dF80U3gKoFzbaq/4= +go.uber.org/dig v1.19.0/go.mod h1:Us0rSJiThwCv2GteUN0Q7OKvU7n5J4dxZ9JKUXozFdE= +go.uber.org/fx v1.24.0 h1:wE8mruvpg2kiiL1Vqd0CC+tr0/24XIB10Iwp2lLWzkg= +go.uber.org/fx v1.24.0/go.mod h1:AmDeGyS+ZARGKM4tlH4FY2Jr63VjbEDJHtqXTGP5hbo= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -270,6 +292,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8= golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f h1:XdNn9LlyWAhLVp6P/i8QYBW+hlyhrhei9uErw2B5GJo= +golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f/go.mod h1:D5SMRVC3C2/4+F/DB1wZsLRnSNimn2Sp/NPsCrsv8ak= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= diff --git a/internal/boot/runtime.go b/internal/boot/runtime.go new file mode 100644 index 00000000..0ef1acbc --- /dev/null +++ b/internal/boot/runtime.go @@ -0,0 +1,45 @@ +package boot + +import ( + "errors" + "fmt" + "os" + "strings" + "time" + + "github.com/memohai/memoh/internal/config" +) + +type RuntimeConfig struct { + JwtSecret string + JwtExpiresIn time.Duration + ServerAddr string + ContainerdSocketPath string +} + +func ProvideRuntimeConfig(cfg config.Config) (*RuntimeConfig, error) { + if strings.TrimSpace(cfg.Auth.JWTSecret) == "" { + return nil, errors.New("jwt secret is required") + } + + jwtExpiresIn, err := time.ParseDuration(cfg.Auth.JWTExpiresIn) + if err != nil { + return nil, fmt.Errorf("invalid jwt expires in: %w", err) + } + + ret := &RuntimeConfig{ + JwtSecret: cfg.Auth.JWTSecret, + JwtExpiresIn: jwtExpiresIn, + ServerAddr: cfg.Server.Addr, + ContainerdSocketPath: cfg.Containerd.SocketPath, + } + + if value := os.Getenv("HTTP_ADDR"); value != "" { + ret.ServerAddr = value + } + + if value := os.Getenv("CONTAINERD_SOCKET"); value != "" { + ret.ContainerdSocketPath = value + } + return ret, nil +} diff --git a/internal/channel/adapters/feishu/feishu.go b/internal/channel/adapters/feishu/feishu.go index cca6d370..d07be6a8 100644 --- a/internal/channel/adapters/feishu/feishu.go +++ b/internal/channel/adapters/feishu/feishu.go @@ -279,6 +279,9 @@ func (a *FeishuAdapter) Connect(ctx context.Context, cfg channel.ChannelConfig, feishuCfg.EncryptKey, ) eventDispatcher.OnP2MessageReceiveV1(func(_ context.Context, event *larkim.P2MessageReceiveV1) error { + if connCtx.Err() != nil { + return nil + } msg := extractFeishuInbound(event) text := msg.Message.PlainText() rawMessageID := "" diff --git a/internal/channel/adapters/telegram/telegram.go b/internal/channel/adapters/telegram/telegram.go index 9f76b655..1c1544de 100644 --- a/internal/channel/adapters/telegram/telegram.go +++ b/internal/channel/adapters/telegram/telegram.go @@ -165,10 +165,6 @@ func (a *TelegramAdapter) Connect(ctx context.Context, cfg channel.ChannelConfig for { select { case <-connCtx.Done(): - if a.logger != nil { - a.logger.Info("stop", slog.String("config_id", cfg.ID)) - } - bot.StopReceivingUpdates() return case update, ok := <-updates: if !ok { @@ -251,12 +247,19 @@ func (a *TelegramAdapter) Connect(ctx context.Context, cfg channel.ChannelConfig } }() - stop := func(context.Context) error { + stop := func(_ context.Context) error { if a.logger != nil { a.logger.Info("stop", slog.String("config_id", cfg.ID)) } - cancel() bot.StopReceivingUpdates() + cancel() + // Drain remaining updates so the library's polling goroutine can + // finish writing and exit. Without this, the in-flight long-poll + // HTTP request keeps the old getUpdates session alive, causing + // "Conflict: terminated by other getUpdates request" when a new + // connection starts with the same bot token. + for range updates { + } return nil } return channel.NewConnection(cfg, stop), nil diff --git a/internal/channel/manager_core_test.go b/internal/channel/manager_core_test.go deleted file mode 100644 index 9283bcd6..00000000 --- a/internal/channel/manager_core_test.go +++ /dev/null @@ -1,128 +0,0 @@ -package channel - -import ( - "context" - "fmt" - "log/slog" - "testing" -) - -// mockAdapter 专门用于 Manager 路由测试 -type mockAdapter struct { - sentMessages []OutboundMessage -} - -func (m *mockAdapter) Type() ChannelType { return ChannelType("test") } -func (m *mockAdapter) Descriptor() Descriptor { - return Descriptor{Type: ChannelType("test"), DisplayName: "Test", Capabilities: ChannelCapabilities{Text: true}} -} -func (m *mockAdapter) Send(ctx context.Context, cfg ChannelConfig, msg OutboundMessage) error { - m.sentMessages = append(m.sentMessages, msg) - return nil -} - -type fakeInboundProcessor struct { - resp *OutboundMessage - err error - gotCfg ChannelConfig - gotMsg InboundMessage -} - -func (f *fakeInboundProcessor) HandleInbound(ctx context.Context, cfg ChannelConfig, msg InboundMessage, sender ReplySender) error { - f.gotCfg = cfg - f.gotMsg = msg - if f.err != nil { - return f.err - } - if f.resp == nil { - return nil - } - if sender == nil { - return fmt.Errorf("sender missing") - } - return sender.Send(ctx, *f.resp) -} - -func TestManager_HandleInbound_CoreLogic(t *testing.T) { - logger := slog.Default() - - t.Run("返回回复_发送成功", func(t *testing.T) { - processor := &fakeInboundProcessor{ - resp: &OutboundMessage{ - Target: "target-id", - Message: Message{ - Text: "AI回复内容", - }, - }, - } - - reg := NewRegistry() - m := NewManager(logger, reg, &fakeConfigStore{}, processor) - adapter := &mockAdapter{} - m.RegisterAdapter(adapter) - - cfg := ChannelConfig{ID: "bot-1", BotID: "bot-1", ChannelType: ChannelType("test")} - msg := InboundMessage{ - Channel: ChannelType("test"), - Message: Message{Text: "你好"}, - ReplyTarget: "target-id", - Conversation: Conversation{ - ID: "chat-1", - Type: "p2p", - }, - } - - err := m.handleInbound(context.Background(), cfg, msg) - if err != nil { - t.Fatalf("不应报错: %v", err) - } - - // 验证: 是否正确调用了 Adapter 发送回复 - if len(adapter.sentMessages) != 1 { - t.Fatalf("应该发送 1 条回复,实际发送: %d", len(adapter.sentMessages)) - } - if adapter.sentMessages[0].Message.PlainText() != "AI回复内容" { - t.Errorf("回复内容错误: %s", adapter.sentMessages[0].Message.PlainText()) - } - if adapter.sentMessages[0].Target != "target-id" { - t.Errorf("回复目标错误: %s", adapter.sentMessages[0].Target) - } - }) - - t.Run("无回复_不发送", func(t *testing.T) { - processor := &fakeInboundProcessor{resp: nil} - reg := NewRegistry() - m := NewManager(logger, reg, &fakeConfigStore{}, processor) - adapter := &mockAdapter{} - m.RegisterAdapter(adapter) - - cfg := ChannelConfig{ID: "bot-1", BotID: "bot-1", ChannelType: ChannelType("test")} - msg := InboundMessage{ - Channel: ChannelType("test"), - Message: Message{Text: "你好"}, - ReplyTarget: "target-id", - } - - err := m.handleInbound(context.Background(), cfg, msg) - if err != nil { - t.Fatalf("不应报错: %v", err) - } - - if len(adapter.sentMessages) != 0 { - t.Errorf("不应发送回复,实际发送: %+v", adapter.sentMessages) - } - }) - - t.Run("处理失败_返回错误", func(t *testing.T) { - processor := &fakeInboundProcessor{err: context.Canceled} - reg := NewRegistry() - m := NewManager(logger, reg, &fakeConfigStore{}, processor) - cfg := ChannelConfig{ID: "bot-1"} - msg := InboundMessage{Message: Message{Text: " "}} // 空格消息 - - err := m.handleInbound(context.Background(), cfg, msg) - if err == nil { - t.Errorf("应返回处理错误") - } - }) -} diff --git a/internal/containerd/service.go b/internal/containerd/service.go index 44ee8633..c825905a 100644 --- a/internal/containerd/service.go +++ b/internal/containerd/service.go @@ -26,6 +26,7 @@ import ( "github.com/containerd/containerd/v2/pkg/oci" "github.com/containerd/errdefs" "github.com/containerd/platforms" + "github.com/memohai/memoh/internal/config" "github.com/opencontainers/go-digest" "github.com/opencontainers/image-spec/identity" "github.com/opencontainers/runtime-spec/specs-go" @@ -147,7 +148,8 @@ type DefaultService struct { logger *slog.Logger } -func NewDefaultService(log *slog.Logger, client *containerd.Client, namespace string) *DefaultService { +func NewDefaultService(log *slog.Logger, client *containerd.Client, cfg config.Config) *DefaultService { + namespace := cfg.Containerd.Namespace if namespace == "" { namespace = DefaultNamespace } diff --git a/internal/mcp/manager.go b/internal/mcp/manager.go index 0a7f2256..9ad442b8 100644 --- a/internal/mcp/manager.go +++ b/internal/mcp/manager.go @@ -60,7 +60,7 @@ type Manager struct { logger *slog.Logger } -func NewManager(log *slog.Logger, service ctr.Service, cfg config.MCPConfig, namespace string) *Manager { +func NewManager(log *slog.Logger, service ctr.Service, cfg config.MCPConfig, namespace string, conn *pgxpool.Pool) *Manager { if namespace == "" { namespace = config.DefaultNamespace } @@ -68,6 +68,8 @@ func NewManager(log *slog.Logger, service ctr.Service, cfg config.MCPConfig, nam service: service, cfg: cfg, namespace: namespace, + db: conn, + queries: dbsqlc.New(conn), logger: log.With(slog.String("component", "mcp")), containerID: func(botID string) string { return ContainerPrefix + botID @@ -75,12 +77,6 @@ func NewManager(log *slog.Logger, service ctr.Service, cfg config.MCPConfig, nam } } -func (m *Manager) WithDB(db *pgxpool.Pool) *Manager { - m.db = db - m.queries = dbsqlc.New(db) - return m -} - func (m *Manager) Init(ctx context.Context) error { image := DefaultImageRef diff --git a/internal/router/identity.go b/internal/router/identity.go index 399fda71..ae1995e4 100644 --- a/internal/router/identity.go +++ b/internal/router/identity.go @@ -494,7 +494,7 @@ func (r *IdentityResolver) resolveDisplayNameFromDirectory(ctx context.Context, if ctx == nil { ctx = context.Background() } - lookupCtx, cancel := context.WithTimeout(ctx, 2*time.Second) + lookupCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() entry, err := directoryAdapter.ResolveEntry(lookupCtx, cfg, subjectID, channel.DirectoryEntryUser) if err != nil { diff --git a/internal/schedule/service.go b/internal/schedule/service.go index b7843723..c9cd2e43 100644 --- a/internal/schedule/service.go +++ b/internal/schedule/service.go @@ -14,6 +14,7 @@ import ( "github.com/robfig/cron/v3" "github.com/memohai/memoh/internal/auth" + "github.com/memohai/memoh/internal/boot" "github.com/memohai/memoh/internal/db" "github.com/memohai/memoh/internal/db/sqlc" ) @@ -29,7 +30,7 @@ type Service struct { jobs map[string]cron.EntryID } -func NewService(log *slog.Logger, queries *sqlc.Queries, triggerer Triggerer, jwtSecret string) *Service { +func NewService(log *slog.Logger, queries *sqlc.Queries, triggerer Triggerer, runtimeConfig *boot.RuntimeConfig) *Service { parser := cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor) c := cron.New(cron.WithParser(parser)) service := &Service{ @@ -37,7 +38,7 @@ func NewService(log *slog.Logger, queries *sqlc.Queries, triggerer Triggerer, jw cron: c, parser: parser, triggerer: triggerer, - jwtSecret: jwtSecret, + jwtSecret: runtimeConfig.JwtSecret, logger: log.With(slog.String("service", "schedule")), jobs: map[string]cron.EntryID{}, } diff --git a/internal/server/server.go b/internal/server/server.go index 8d1b64c6..f2e64937 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -1,6 +1,7 @@ package server import ( + "context" "log/slog" "strings" @@ -8,7 +9,6 @@ import ( "github.com/labstack/echo/v4/middleware" "github.com/memohai/memoh/internal/auth" - "github.com/memohai/memoh/internal/handlers" ) type Server struct { @@ -17,7 +17,13 @@ type Server struct { logger *slog.Logger } -func NewServer(log *slog.Logger, addr string, jwtSecret string, pingHandler *handlers.PingHandler, authHandler *handlers.AuthHandler, memoryHandler *handlers.MemoryHandler, embeddingsHandler *handlers.EmbeddingsHandler, conversationHandler *handlers.MessageHandler, swaggerHandler *handlers.SwaggerHandler, providersHandler *handlers.ProvidersHandler, modelsHandler *handlers.ModelsHandler, settingsHandler *handlers.SettingsHandler, preauthHandler *handlers.PreauthHandler, bindHandler *handlers.BindHandler, scheduleHandler *handlers.ScheduleHandler, subagentHandler *handlers.SubagentHandler, containerdHandler *handlers.ContainerdHandler, channelHandler *handlers.ChannelHandler, usersHandler *handlers.UsersHandler, mcpHandler *handlers.MCPHandler, cliHandler *handlers.LocalChannelHandler, webHandler *handlers.LocalChannelHandler) *Server { +type Handler interface { + Register(e *echo.Echo) +} + +func NewServer(log *slog.Logger, addr string, jwtSecret string, + handlers ...Handler, +) *Server { if addr == "" { addr = ":8080" } @@ -51,62 +57,10 @@ func NewServer(log *slog.Logger, addr string, jwtSecret string, pingHandler *han return false })) - if pingHandler != nil { - pingHandler.Register(e) - } - if authHandler != nil { - authHandler.Register(e) - } - if memoryHandler != nil { - memoryHandler.Register(e) - } - if embeddingsHandler != nil { - embeddingsHandler.Register(e) - } - if conversationHandler != nil { - conversationHandler.Register(e) - } - if swaggerHandler != nil { - swaggerHandler.Register(e) - } - if settingsHandler != nil { - settingsHandler.Register(e) - } - if preauthHandler != nil { - preauthHandler.Register(e) - } - if bindHandler != nil { - bindHandler.Register(e) - } - if scheduleHandler != nil { - scheduleHandler.Register(e) - } - if subagentHandler != nil { - subagentHandler.Register(e) - } - if providersHandler != nil { - providersHandler.Register(e) - } - if modelsHandler != nil { - modelsHandler.Register(e) - } - if containerdHandler != nil { - containerdHandler.Register(e) - } - if channelHandler != nil { - channelHandler.Register(e) - } - if usersHandler != nil { - usersHandler.Register(e) - } - if mcpHandler != nil { - mcpHandler.Register(e) - } - if cliHandler != nil { - cliHandler.Register(e) - } - if webHandler != nil { - webHandler.Register(e) + for _, h := range handlers { + if h != nil { + h.Register(e) + } } return &Server{ @@ -119,3 +73,7 @@ func NewServer(log *slog.Logger, addr string, jwtSecret string, pingHandler *han func (s *Server) Start() error { return s.echo.Start(s.addr) } + +func (s *Server) Stop(ctx context.Context) error { + return s.echo.Shutdown(ctx) +}