From ffda558d246a21180734e61ebb8ec5efa93b409b Mon Sep 17 00:00:00 2001 From: Ran <16112591+chen-ran@users.noreply.github.com> Date: Tue, 24 Feb 2026 19:39:01 +0800 Subject: [PATCH] feat(memoh): unify embedded runtime serving and release binaries --- .github/workflows/release.yml | 60 +++ agent/src/index.ts | 4 +- cmd/memoh/main.go | 48 ++ cmd/memoh/migrate.go | 41 ++ cmd/memoh/serve.go | 744 +++++++++++++++++++++++++++++++ cmd/memoh/version.go | 12 + conf/app.windows.toml | 52 +++ go.mod | 3 + go.sum | 8 + internal/auth/jwt_test.go | 3 +- internal/bun/runtime/manager.go | 255 +++++++++++ internal/embedded/.gitignore | 6 + internal/embedded/assets.go | 46 ++ internal/embedded/bun/.gitignore | 2 + internal/handlers/file_embed.go | 100 +++++ internal/server/server.go | 5 +- mise.toml | 15 + scripts/release.sh | 164 +++++++ 18 files changed, 1564 insertions(+), 4 deletions(-) create mode 100644 cmd/memoh/main.go create mode 100644 cmd/memoh/migrate.go create mode 100644 cmd/memoh/serve.go create mode 100644 cmd/memoh/version.go create mode 100644 conf/app.windows.toml create mode 100644 internal/bun/runtime/manager.go create mode 100644 internal/embedded/.gitignore create mode 100644 internal/embedded/assets.go create mode 100644 internal/embedded/bun/.gitignore create mode 100644 internal/handlers/file_embed.go create mode 100755 scripts/release.sh diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index ccce14cd..e735ae3f 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -28,6 +28,66 @@ jobs: env: GITHUB_TOKEN: ${{secrets.GITHUB_TOKEN}} + build-binaries: + needs: release + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + include: + - goos: linux + goarch: amd64 + - goos: linux + goarch: arm64 + - goos: linux + goarch: riscv64 + - goos: darwin + goarch: amd64 + - goos: darwin + goarch: arm64 + - goos: windows + goarch: amd64 + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - uses: pnpm/action-setup@v4 + with: + version: 10 + + - uses: actions/setup-node@v4 + with: + node-version: lts/* + + - uses: oven-sh/setup-bun@v2 + + - uses: actions/setup-go@v5 + with: + go-version-file: go.mod + + - name: Install JS dependencies + run: pnpm install --frozen-lockfile + + - name: Build release binary + env: + TARGET_OS: ${{ matrix.goos }} + TARGET_ARCH: ${{ matrix.goarch }} + VERSION: ${{ github.ref_name }} + COMMIT_HASH: ${{ github.sha }} + OUTPUT_DIR: dist + run: scripts/release.sh + + - name: Upload release assets + uses: softprops/action-gh-release@v2 + with: + files: | + dist/*.tar.gz + dist/*.zip + tag_name: ${{ github.ref_name }} + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + # # Uncomment the following lines to publish to npm on CI # # - run: pnpm install diff --git a/agent/src/index.ts b/agent/src/index.ts index 8a82c950..d89fc2be 100644 --- a/agent/src/index.ts +++ b/agent/src/index.ts @@ -5,7 +5,9 @@ import { errorMiddleware } from './middlewares/error' import { loadConfig, getBaseUrl as getBaseUrlByConfig } from '@memoh/config' import { AgentAuthContext, AuthFetcher } from '@memoh/agent' -const config = loadConfig('../config.toml') +const configuredPath = process.env.MEMOH_CONFIG_PATH?.trim() || process.env.CONFIG_PATH?.trim() +const configPath = configuredPath && configuredPath.length > 0 ? configuredPath : '../config.toml' +const config = loadConfig(configPath) export const getBaseUrl = () => { return getBaseUrlByConfig(config) diff --git a/cmd/memoh/main.go b/cmd/memoh/main.go new file mode 100644 index 00000000..f4df8a0c --- /dev/null +++ b/cmd/memoh/main.go @@ -0,0 +1,48 @@ +package main + +import ( + "os" + + "github.com/spf13/cobra" +) + +func main() { + rootCmd := &cobra.Command{ + Use: "memoh", + Short: "Memoh unified binary", + RunE: func(cmd *cobra.Command, args []string) error { + runServe() + return nil + }, + } + + rootCmd.AddCommand(&cobra.Command{ + Use: "serve", + Short: "Start the server", + RunE: func(cmd *cobra.Command, args []string) error { + runServe() + return nil + }, + }) + + rootCmd.AddCommand(&cobra.Command{ + Use: "migrate ", + Short: "Run database migrations", + Args: cobra.MinimumNArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return runMigrate(args) + }, + }) + + rootCmd.AddCommand(&cobra.Command{ + Use: "version", + Short: "Print version information", + RunE: func(cmd *cobra.Command, args []string) error { + return runVersion() + }, + }) + + if err := rootCmd.Execute(); err != nil { + os.Exit(1) + } +} diff --git a/cmd/memoh/migrate.go b/cmd/memoh/migrate.go new file mode 100644 index 00000000..a1778afd --- /dev/null +++ b/cmd/memoh/migrate.go @@ -0,0 +1,41 @@ +package main + +import ( + "fmt" + "io/fs" + "log/slog" + + dbembed "github.com/memohai/memoh/db" + "github.com/memohai/memoh/internal/db" + "github.com/memohai/memoh/internal/logger" +) + +func migrationsFS() fs.FS { + sub, err := fs.Sub(dbembed.MigrationsFS, "migrations") + if err != nil { + panic(fmt.Sprintf("embedded migrations: %v", err)) + } + return sub +} + +func runMigrate(args []string) error { + cfg, err := provideConfig() + if err != nil { + return fmt.Errorf("config: %w", err) + } + + logger.Init(cfg.Log.Level, cfg.Log.Format) + log := logger.L + + migrateCmd := args[0] + var migrateArgs []string + if len(args) > 1 { + migrateArgs = args[1:] + } + + if err := db.RunMigrate(log, cfg.Postgres, migrationsFS(), migrateCmd, migrateArgs); err != nil { + log.Error("migration failed", slog.Any("error", err)) + return err + } + return nil +} diff --git a/cmd/memoh/serve.go b/cmd/memoh/serve.go new file mode 100644 index 00000000..65dff89f --- /dev/null +++ b/cmd/memoh/serve.go @@ -0,0 +1,744 @@ +package main + +import ( + "context" + "errors" + "fmt" + "io" + "log/slog" + "net/http" + "os" + "strings" + "time" + + "github.com/jackc/pgx/v5/pgtype" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/labstack/echo/v4" + "github.com/labstack/echo/v4/middleware" + "go.uber.org/fx" + "go.uber.org/fx/fxevent" + "golang.org/x/crypto/bcrypt" + + "github.com/memohai/memoh/internal/accounts" + "github.com/memohai/memoh/internal/auth" + "github.com/memohai/memoh/internal/bind" + "github.com/memohai/memoh/internal/boot" + "github.com/memohai/memoh/internal/bots" + agentruntime "github.com/memohai/memoh/internal/bun/runtime" + "github.com/memohai/memoh/internal/channel" + "github.com/memohai/memoh/internal/channel/adapters/discord" + "github.com/memohai/memoh/internal/channel/adapters/feishu" + "github.com/memohai/memoh/internal/channel/adapters/local" + "github.com/memohai/memoh/internal/channel/adapters/telegram" + "github.com/memohai/memoh/internal/channel/identities" + "github.com/memohai/memoh/internal/channel/inbound" + "github.com/memohai/memoh/internal/channel/route" + "github.com/memohai/memoh/internal/config" + ctr "github.com/memohai/memoh/internal/containerd" + "github.com/memohai/memoh/internal/conversation" + "github.com/memohai/memoh/internal/conversation/flow" + "github.com/memohai/memoh/internal/db" + dbsqlc "github.com/memohai/memoh/internal/db/sqlc" + "github.com/memohai/memoh/internal/embeddings" + "github.com/memohai/memoh/internal/handlers" + "github.com/memohai/memoh/internal/healthcheck" + channelchecker "github.com/memohai/memoh/internal/healthcheck/checkers/channel" + mcpchecker "github.com/memohai/memoh/internal/healthcheck/checkers/mcp" + "github.com/memohai/memoh/internal/inbox" + "github.com/memohai/memoh/internal/logger" + "github.com/memohai/memoh/internal/mcp" + mcpcontacts "github.com/memohai/memoh/internal/mcp/providers/contacts" + mcpcontainer "github.com/memohai/memoh/internal/mcp/providers/container" + mcpinbox "github.com/memohai/memoh/internal/mcp/providers/inbox" + mcpmemory "github.com/memohai/memoh/internal/mcp/providers/memory" + mcpmessage "github.com/memohai/memoh/internal/mcp/providers/message" + mcpschedule "github.com/memohai/memoh/internal/mcp/providers/schedule" + mcpweb "github.com/memohai/memoh/internal/mcp/providers/web" + mcpfederation "github.com/memohai/memoh/internal/mcp/sources/federation" + "github.com/memohai/memoh/internal/media" + "github.com/memohai/memoh/internal/memory" + "github.com/memohai/memoh/internal/message" + "github.com/memohai/memoh/internal/message/event" + "github.com/memohai/memoh/internal/models" + "github.com/memohai/memoh/internal/policy" + "github.com/memohai/memoh/internal/preauth" + "github.com/memohai/memoh/internal/providers" + "github.com/memohai/memoh/internal/schedule" + "github.com/memohai/memoh/internal/searchproviders" + "github.com/memohai/memoh/internal/server" + "github.com/memohai/memoh/internal/settings" + "github.com/memohai/memoh/internal/storage/providers/containerfs" + "github.com/memohai/memoh/internal/subagent" + "github.com/memohai/memoh/internal/version" +) + +func runServe() { + fx.New( + fx.Provide( + provideConfig, + boot.ProvideRuntimeConfig, + provideLogger, + provideContainerService, + provideDBConn, + provideDBQueries, + provideMCPManager, + provideAgentRuntimeManager, + provideMemoryLLM, + provideEmbeddingsResolver, + provideEmbeddingSetup, + provideTextEmbedderForMemory, + provideQdrantStore, + memory.NewBM25Indexer, + provideMemoryService, + models.NewService, + bots.NewService, + accounts.NewService, + settings.NewService, + providers.NewService, + searchproviders.NewService, + policy.NewService, + preauth.NewService, + mcp.NewConnectionService, + subagent.NewService, + conversation.NewService, + identities.NewService, + bind.NewService, + event.NewHub, + inbox.NewService, + provideRouteService, + provideMessageService, + provideMediaService, + local.NewRouteHub, + provideChannelRegistry, + channel.NewStore, + provideChannelRouter, + provideChannelManager, + provideChannelLifecycleService, + provideChatResolver, + provideScheduleTriggerer, + schedule.NewService, + provideContainerdHandler, + provideToolGatewayService, + provideServerHandler(handlers.NewPingHandler), + provideServerHandler(provideMemohAuthHandler), + provideServerHandler(provideMemoryHandler), + provideServerHandler(handlers.NewEmbeddingsHandler), + provideServerHandler(provideMessageHandler), + provideServerHandler(handlers.NewSwaggerHandler), + provideServerHandler(handlers.NewProvidersHandler), + provideServerHandler(handlers.NewSearchProvidersHandler), + provideServerHandler(handlers.NewModelsHandler), + provideServerHandler(handlers.NewSettingsHandler), + provideServerHandler(handlers.NewPreauthHandler), + provideServerHandler(handlers.NewBindHandler), + provideServerHandler(handlers.NewScheduleHandler), + provideServerHandler(handlers.NewSubagentHandler), + provideServerHandler(handlers.NewChannelHandler), + provideServerHandler(feishu.NewWebhookServerHandler), + provideServerHandler(provideUsersHandler), + provideServerHandler(handlers.NewMCPHandler), + provideServerHandler(handlers.NewInboxHandler), + provideServerHandler(provideCLIHandler), + provideServerHandler(provideWebHandler), + provideServerHandler(handlers.NewEmbeddedWebHandler), + provideServer, + ), + fx.Invoke( + startMemoryWarmup, + startScheduleService, + startChannelManager, + startContainerReconciliation, + startAgentRuntime, + startServer, + ), + fx.WithLogger(func(logger *slog.Logger) fxevent.Logger { + return &fxevent.SlogLogger{Logger: logger.With(slog.String("component", "fx"))} + }), + ).Run() +} + +func provideServerHandler(fn any) any { + return fx.Annotate( + fn, + fx.As(new(server.Handler)), + fx.ResultTags(`group:"server_handlers"`), + ) +} + +func provideConfig() (config.Config, error) { + cfgPath := os.Getenv("CONFIG_PATH") + cfg, err := config.Load(cfgPath) + if err != nil { + return config.Config{}, fmt.Errorf("load config: %w", err) + } + return cfg, nil +} + +func provideLogger(cfg config.Config) *slog.Logger { + logger.Init(cfg.Log.Level, cfg.Log.Format) + return logger.L +} + +func provideContainerService(lc fx.Lifecycle, log *slog.Logger, cfg config.Config, rc *boot.RuntimeConfig) (ctr.Service, error) { + svc, cleanup, err := ctr.ProvideService(context.Background(), log, cfg, rc.ContainerBackend) + if err != nil { + return nil, err + } + lc.Append(fx.Hook{OnStop: func(ctx context.Context) error { cleanup(); return nil }}) + return svc, nil +} + +func provideDBConn(lc fx.Lifecycle, cfg config.Config) (*pgxpool.Pool, error) { + conn, err := db.Open(context.Background(), cfg.Postgres) + if err != nil { + return nil, fmt.Errorf("db connect: %w", err) + } + lc.Append(fx.Hook{OnStop: func(ctx context.Context) error { conn.Close(); return nil }}) + return conn, nil +} + +func provideDBQueries(conn *pgxpool.Pool) *dbsqlc.Queries { return dbsqlc.New(conn) } +func provideMCPManager(log *slog.Logger, service ctr.Service, cfg config.Config, conn *pgxpool.Pool) *mcp.Manager { + return mcp.NewManager(log, service, cfg.MCP, cfg.Containerd.Namespace, conn) +} +func provideAgentRuntimeManager(log *slog.Logger, cfg config.Config) *agentruntime.Manager { + return agentruntime.NewManager(log, cfg) +} +func provideMemoryLLM(modelsService *models.Service, queries *dbsqlc.Queries, log *slog.Logger) memory.LLM { + return &lazyLLMClient{modelsService: modelsService, queries: queries, timeout: 30 * time.Second, logger: log} +} +func provideEmbeddingsResolver(log *slog.Logger, modelsService *models.Service, queries *dbsqlc.Queries) *embeddings.Resolver { + return embeddings.NewResolver(log, modelsService, queries, 10*time.Second) +} + +type embeddingSetup struct { + Vectors map[string]int + TextModel models.GetResponse + MultimodalModel models.GetResponse + HasEmbeddingModels bool +} + +func provideEmbeddingSetup(log *slog.Logger, modelsService *models.Service) (embeddingSetup, error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + vectors, textModel, multimodalModel, hasEmbeddingModels, err := embeddings.CollectEmbeddingVectors(ctx, modelsService) + if err != nil { + return embeddingSetup{}, fmt.Errorf("embedding models: %w", err) + } + if hasEmbeddingModels && multimodalModel.ModelID == "" { + log.Warn("No multimodal embedding model configured. Multimodal embedding features will be limited.") + } + return embeddingSetup{Vectors: vectors, TextModel: textModel, MultimodalModel: multimodalModel, HasEmbeddingModels: hasEmbeddingModels}, nil +} +func provideTextEmbedderForMemory(resolver *embeddings.Resolver, setup embeddingSetup, log *slog.Logger) embeddings.Embedder { + return buildTextEmbedder(resolver, setup.TextModel, setup.HasEmbeddingModels, log) +} +func provideQdrantStore(log *slog.Logger, cfg config.Config, setup embeddingSetup) (*memory.QdrantStore, error) { + qcfg := cfg.Qdrant + timeout := time.Duration(qcfg.TimeoutSeconds) * time.Second + if setup.HasEmbeddingModels && len(setup.Vectors) > 0 { + store, err := memory.NewQdrantStoreWithVectors(log, qcfg.BaseURL, qcfg.APIKey, qcfg.Collection, setup.Vectors, "sparse_hash", timeout) + if err != nil { + return nil, fmt.Errorf("qdrant named vectors init: %w", err) + } + return store, nil + } + store, err := memory.NewQdrantStore(log, qcfg.BaseURL, qcfg.APIKey, qcfg.Collection, setup.TextModel.Dimensions, "sparse_hash", timeout) + if err != nil { + return nil, fmt.Errorf("qdrant init: %w", err) + } + return store, nil +} +func provideMemoryService(log *slog.Logger, llm memory.LLM, embedder embeddings.Embedder, store *memory.QdrantStore, resolver *embeddings.Resolver, bm25 *memory.BM25Indexer, setup embeddingSetup) *memory.Service { + return memory.NewService(log, llm, embedder, store, resolver, bm25, setup.TextModel.ModelID, setup.MultimodalModel.ModelID) +} +func provideRouteService(log *slog.Logger, queries *dbsqlc.Queries, chatService *conversation.Service) *route.DBService { + return route.NewService(log, queries, chatService) +} +func provideMessageService(log *slog.Logger, queries *dbsqlc.Queries, hub *event.Hub) *message.DBService { + return message.NewService(log, queries, hub) +} +func provideScheduleTriggerer(resolver *flow.Resolver) schedule.Triggerer { + return flow.NewScheduleGateway(resolver) +} +func provideChatResolver(log *slog.Logger, cfg config.Config, modelsService *models.Service, queries *dbsqlc.Queries, memoryService *memory.Service, chatService *conversation.Service, msgService *message.DBService, settingsService *settings.Service, mediaService *media.Service, containerdHandler *handlers.ContainerdHandler, inboxService *inbox.Service) *flow.Resolver { + resolver := flow.NewResolver(log, modelsService, queries, memoryService, chatService, msgService, settingsService, cfg.AgentGateway.BaseURL(), 120*time.Second) + resolver.SetSkillLoader(&skillLoaderAdapter{handler: containerdHandler}) + resolver.SetGatewayAssetLoader(&gatewayAssetLoaderAdapter{media: mediaService}) + resolver.SetInboxService(inboxService) + return resolver +} +func provideChannelRegistry(log *slog.Logger, hub *local.RouteHub, mediaService *media.Service) *channel.Registry { + registry := channel.NewRegistry() + tgAdapter := telegram.NewTelegramAdapter(log) + tgAdapter.SetAssetOpener(mediaService) + registry.MustRegister(tgAdapter) + discordAdapter := discord.NewDiscordAdapter(log) + registry.MustRegister(discordAdapter) + feishuAdapter := feishu.NewFeishuAdapter(log) + feishuAdapter.SetAssetOpener(mediaService) + registry.MustRegister(feishuAdapter) + registry.MustRegister(local.NewCLIAdapter(hub)) + registry.MustRegister(local.NewWebAdapter(hub)) + return registry +} +func provideChannelRouter(log *slog.Logger, registry *channel.Registry, hub *local.RouteHub, routeService *route.DBService, msgService *message.DBService, resolver *flow.Resolver, identityService *identities.Service, botService *bots.Service, policyService *policy.Service, preauthService *preauth.Service, bindService *bind.Service, mediaService *media.Service, inboxService *inbox.Service, rc *boot.RuntimeConfig) *inbound.ChannelInboundProcessor { + processor := inbound.NewChannelInboundProcessor(log, registry, routeService, msgService, resolver, identityService, botService, policyService, preauthService, bindService, rc.JwtSecret, 5*time.Minute) + processor.SetMediaService(mediaService) + processor.SetStreamObserver(local.NewRouteHubBroadcaster(hub)) + processor.SetInboxService(inboxService) + return processor +} +func provideChannelManager(log *slog.Logger, registry *channel.Registry, channelStore *channel.Store, channelRouter *inbound.ChannelInboundProcessor) *channel.Manager { + mgr := channel.NewManager(log, registry, channelStore, channelRouter) + if mw := channelRouter.IdentityMiddleware(); mw != nil { + mgr.Use(mw) + } + return mgr +} +func provideChannelLifecycleService(channelStore *channel.Store, channelManager *channel.Manager) *channel.Lifecycle { + return channel.NewLifecycle(channelStore, channelManager) +} +func provideContainerdHandler(log *slog.Logger, service ctr.Service, manager *mcp.Manager, cfg config.Config, rc *boot.RuntimeConfig, botService *bots.Service, accountService *accounts.Service, policyService *policy.Service, queries *dbsqlc.Queries) *handlers.ContainerdHandler { + return handlers.NewContainerdHandler(log, service, manager, cfg.MCP, cfg.Containerd.Namespace, rc.ContainerBackend, botService, accountService, policyService, queries) +} +func provideToolGatewayService(log *slog.Logger, cfg config.Config, channelManager *channel.Manager, registry *channel.Registry, routeService *route.DBService, scheduleService *schedule.Service, memoryService *memory.Service, chatService *conversation.Service, accountService *accounts.Service, settingsService *settings.Service, searchProviderService *searchproviders.Service, manager *mcp.Manager, containerdHandler *handlers.ContainerdHandler, mcpConnService *mcp.ConnectionService, mediaService *media.Service, inboxService *inbox.Service) *mcp.ToolGatewayService { + var assetResolver mcpmessage.AssetResolver + if mediaService != nil { + assetResolver = &mediaAssetResolverAdapter{media: mediaService} + } + messageExec := mcpmessage.NewExecutor(log, channelManager, channelManager, registry, assetResolver) + contactsExec := mcpcontacts.NewExecutor(log, routeService) + scheduleExec := mcpschedule.NewExecutor(log, scheduleService) + memoryExec := mcpmemory.NewExecutor(log, memoryService, chatService, accountService) + webExec := mcpweb.NewExecutor(log, settingsService, searchProviderService) + inboxExec := mcpinbox.NewExecutor(log, inboxService) + fsExec := mcpcontainer.NewExecutor(log, manager, config.DefaultDataMount) + fedGateway := handlers.NewMCPFederationGateway(log, containerdHandler) + fedSource := mcpfederation.NewSource(log, fedGateway, mcpConnService) + svc := mcp.NewToolGatewayService(log, []mcp.ToolExecutor{messageExec, contactsExec, scheduleExec, memoryExec, webExec, fsExec, inboxExec}, []mcp.ToolSource{fedSource}) + containerdHandler.SetToolGatewayService(svc) + return svc +} +func provideMemoryHandler(log *slog.Logger, service *memory.Service, chatService *conversation.Service, accountService *accounts.Service, cfg config.Config, manager *mcp.Manager) *handlers.MemoryHandler { + h := handlers.NewMemoryHandler(log, service, chatService, accountService) + if manager != nil { + execWorkDir := config.DefaultDataMount + h.SetMemoryFS(memory.NewMemoryFS(log, manager, execWorkDir)) + } + return h +} +func provideAuthHandler(log *slog.Logger, accountService *accounts.Service, rc *boot.RuntimeConfig) *handlers.AuthHandler { + return handlers.NewAuthHandler(log, accountService, rc.JwtSecret, rc.JwtExpiresIn) +} +func provideMemohAuthHandler(log *slog.Logger, accountService *accounts.Service, rc *boot.RuntimeConfig) *memohAuthHandler { + return &memohAuthHandler{inner: handlers.NewAuthHandler(log, accountService, rc.JwtSecret, rc.JwtExpiresIn)} +} +func provideMessageHandler(log *slog.Logger, chatService *conversation.Service, msgService *message.DBService, mediaService *media.Service, botService *bots.Service, accountService *accounts.Service, hub *event.Hub) *handlers.MessageHandler { + h := handlers.NewMessageHandler(log, chatService, msgService, botService, accountService, hub) + h.SetMediaService(mediaService) + return h +} + +type memohAuthHandler struct{ inner *handlers.AuthHandler } + +func (h *memohAuthHandler) Register(e *echo.Echo) { + e.POST("/api/auth/login", h.inner.Login) + e.POST("/api/auth/refresh", h.inner.Refresh) +} +func provideMediaService(log *slog.Logger, cfg config.Config) (*media.Service, error) { + dataRoot := strings.TrimSpace(cfg.MCP.DataRoot) + if dataRoot == "" { + dataRoot = config.DefaultDataRoot + } + provider, err := containerfs.New(dataRoot) + if err != nil { + return nil, fmt.Errorf("init media provider: %w", err) + } + return media.NewService(log, provider), nil +} +func provideUsersHandler(log *slog.Logger, accountService *accounts.Service, identityService *identities.Service, botService *bots.Service, routeService *route.DBService, channelStore *channel.Store, channelLifecycle *channel.Lifecycle, channelManager *channel.Manager, registry *channel.Registry) *handlers.UsersHandler { + return handlers.NewUsersHandler(log, accountService, identityService, botService, routeService, channelStore, channelLifecycle, channelManager, registry) +} +func provideCLIHandler(channelManager *channel.Manager, channelStore *channel.Store, chatService *conversation.Service, hub *local.RouteHub, botService *bots.Service, accountService *accounts.Service) *handlers.LocalChannelHandler { + return handlers.NewLocalChannelHandler(local.CLIType, channelManager, channelStore, chatService, hub, botService, accountService) +} +func provideWebHandler(channelManager *channel.Manager, channelStore *channel.Store, chatService *conversation.Service, hub *local.RouteHub, botService *bots.Service, accountService *accounts.Service) *handlers.LocalChannelHandler { + return handlers.NewLocalChannelHandler(local.WebType, channelManager, channelStore, chatService, hub, botService, accountService) +} + +type serverParams struct { + fx.In + Logger *slog.Logger + RuntimeConfig *boot.RuntimeConfig + Config config.Config + ServerHandlers []server.Handler `group:"server_handlers"` + ContainerdHandler *handlers.ContainerdHandler +} + +type memohServer struct { + echo *echo.Echo + addr string +} + +var ( + memohJWTExactSkipPaths = map[string]struct{}{ + "/": {}, + "/ping": {}, + "/health": {}, + "/api/swagger.json": {}, + "/api/auth/login": {}, + "/logo.png": {}, + "/channels/telegram.webp": {}, + "/channels/feishu.png": {}, + } + memohJWTPrefixSkipPaths = []string{ + "/assets/", + "/api/docs", + "/channels/feishu/webhook/", + } + memohSPABackendPrefixes = []string{ + "/api", + "/auth", + "/channels", + "/containers", + "/inbox", + "/users", + "/bots", + "/models", + "/providers", + "/search_providers", + "/settings", + "/memory", + "/message", + "/mcp", + "/schedule", + "/bind", + "/preauth", + "/subagents", + "/embeddings", + "/ping", + "/health", + } + memohAPIRewriteBypassExact = map[string]struct{}{ + "/api/swagger.json": {}, + } + memohAPIRewriteBypassPrefixes = []string{ + "/api/docs", + "/api/auth/", + } +) + +func (s *memohServer) Start() error { return s.echo.Start(s.addr) } +func (s *memohServer) Stop(ctx context.Context) error { return s.echo.Shutdown(ctx) } + +func provideServer(params serverParams) *memohServer { + allHandlers := make([]server.Handler, 0, len(params.ServerHandlers)+1) + allHandlers = append(allHandlers, params.ServerHandlers...) + allHandlers = append(allHandlers, params.ContainerdHandler) + + addr := params.RuntimeConfig.ServerAddr + if addr == "" { + addr = ":8080" + } + e := echo.New() + e.HideBanner = true + e.Pre(func(next echo.HandlerFunc) echo.HandlerFunc { + return func(c echo.Context) error { + rewriteAPIPathForMemoh(c.Request()) + return next(c) + } + }) + e.Use(middleware.Recover()) + e.Use(middleware.RequestLoggerWithConfig(middleware.RequestLoggerConfig{ + LogStatus: true, + LogURI: true, + LogMethod: true, + LogValuesFunc: func(c echo.Context, v middleware.RequestLoggerValues) error { + params.Logger.Info("request", + slog.String("method", v.Method), + slog.String("uri", v.URI), + slog.Int("status", v.Status), + slog.Duration("latency", v.Latency), + slog.String("remote_ip", c.RealIP()), + ) + return nil + }, + })) + e.Use(auth.JWTMiddleware(params.Config.Auth.JWTSecret, func(c echo.Context) bool { + return shouldSkipJWTForMemoh(c.Request().URL.Path) + })) + for _, h := range allHandlers { + if h != nil { + h.Register(e) + } + } + return &memohServer{echo: e, addr: addr} +} +func startMemoryWarmup(lc fx.Lifecycle, memoryService *memory.Service, logger *slog.Logger) { + lc.Append(fx.Hook{OnStart: func(ctx context.Context) error { + go func() { + if err := memoryService.WarmupBM25(context.Background(), 200); err != nil { + logger.Warn("bm25 warmup failed", slog.Any("error", err)) + } + }() + return nil + }}) +} +func startScheduleService(lc fx.Lifecycle, scheduleService *schedule.Service) { + lc.Append(fx.Hook{OnStart: func(ctx context.Context) error { return scheduleService.Bootstrap(ctx) }}) +} +func startChannelManager(lc fx.Lifecycle, channelManager *channel.Manager) { + ctx, cancel := context.WithCancel(context.Background()) + lc.Append(fx.Hook{ + OnStart: func(_ context.Context) error { channelManager.Start(ctx); return nil }, + OnStop: func(stopCtx context.Context) error { cancel(); return channelManager.Shutdown(stopCtx) }, + }) +} +func startContainerReconciliation(lc fx.Lifecycle, containerdHandler *handlers.ContainerdHandler, _ *mcp.ToolGatewayService) { + lc.Append(fx.Hook{OnStart: func(ctx context.Context) error { go containerdHandler.ReconcileContainers(ctx); return nil }}) +} +func startAgentRuntime(lc fx.Lifecycle, manager *agentruntime.Manager) { + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { return manager.Start(ctx) }, + OnStop: func(ctx context.Context) error { return manager.Stop(ctx) }, + }) +} +func startServer(lc fx.Lifecycle, logger *slog.Logger, srv *memohServer, shutdowner fx.Shutdowner, cfg config.Config, queries *dbsqlc.Queries, botService *bots.Service, containerdHandler *handlers.ContainerdHandler, mcpConnService *mcp.ConnectionService, toolGateway *mcp.ToolGatewayService, channelManager *channel.Manager) { + fmt.Printf("Starting Memoh Agent %s\n", version.GetInfo()) + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + if err := ensureAdminUser(ctx, logger, queries, cfg); err != nil { + return err + } + botService.SetContainerLifecycle(containerdHandler) + botService.AddRuntimeChecker(healthcheck.NewRuntimeCheckerAdapter(mcpchecker.NewChecker(logger, mcpConnService, toolGateway))) + botService.AddRuntimeChecker(healthcheck.NewRuntimeCheckerAdapter(channelchecker.NewChecker(logger, channelManager))) + go func() { + if err := srv.Start(); err != nil && !errors.Is(err, http.ErrServerClosed) { + logger.Error("server failed", slog.Any("error", err)) + _ = shutdowner.Shutdown() + } + }() + return nil + }, + OnStop: func(ctx context.Context) error { + if err := srv.Stop(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) { + return fmt.Errorf("server stop: %w", err) + } + return nil + }, + }) +} + +func shouldSkipJWTForMemoh(path string) bool { + if _, ok := memohJWTExactSkipPaths[path]; ok { + return true + } + if hasAnyPrefix(path, memohJWTPrefixSkipPaths) { + return true + } + // Treat non-backend, extension-less paths as SPA routes (e.g. /chat, /settings/profile). + return shouldServeSPARouteForMemoh(path) +} + +func shouldServeSPARouteForMemoh(path string) bool { + if path == "" || path == "/" { + return true + } + if strings.Contains(path, ".") { + return false + } + if hasAnyPrefix(path, memohSPABackendPrefixes) { + return false + } + return true +} + +func rewriteAPIPathForMemoh(r *http.Request) { + if r == nil || r.URL == nil { + return + } + path := r.URL.Path + if !strings.HasPrefix(path, "/api/") { + return + } + if _, ok := memohAPIRewriteBypassExact[path]; ok { + return + } + if hasAnyPrefix(path, memohAPIRewriteBypassPrefixes) { + return + } + rewritten := strings.TrimPrefix(path, "/api") + if rewritten == "" { + rewritten = "/" + } + r.URL.Path = rewritten +} + +func hasAnyPrefix(path string, prefixes []string) bool { + for _, prefix := range prefixes { + if strings.HasPrefix(path, prefix) { + return true + } + } + return false +} +func buildTextEmbedder(resolver *embeddings.Resolver, textModel models.GetResponse, hasModels bool, log *slog.Logger) embeddings.Embedder { + if !hasModels { + return nil + } + if textModel.ModelID == "" || textModel.Dimensions <= 0 { + log.Warn("No text embedding model configured. Text embedding features will be limited.") + return nil + } + return &embeddings.ResolverTextEmbedder{Resolver: resolver, ModelID: textModel.ModelID, Dims: textModel.Dimensions} +} +func ensureAdminUser(ctx context.Context, log *slog.Logger, queries *dbsqlc.Queries, cfg config.Config) error { + if queries == nil { + return fmt.Errorf("db queries not configured") + } + count, err := queries.CountAccounts(ctx) + if err != nil { + return err + } + if count > 0 { + return nil + } + username := strings.TrimSpace(cfg.Admin.Username) + password := strings.TrimSpace(cfg.Admin.Password) + email := strings.TrimSpace(cfg.Admin.Email) + if username == "" || password == "" { + return fmt.Errorf("admin username/password required in config.toml") + } + if password == "change-your-password-here" { + log.Warn("admin password uses default placeholder; please update config.toml") + } + hashed, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost) + if err != nil { + return err + } + user, err := queries.CreateUser(ctx, dbsqlc.CreateUserParams{IsActive: true, Metadata: []byte("{}")}) + if err != nil { + return fmt.Errorf("create admin user: %w", err) + } + emailValue := pgtype.Text{Valid: false} + if email != "" { + emailValue = pgtype.Text{String: email, Valid: true} + } + displayName := pgtype.Text{String: username, Valid: true} + dataRoot := pgtype.Text{String: cfg.MCP.DataRoot, Valid: cfg.MCP.DataRoot != ""} + _, err = queries.CreateAccount(ctx, dbsqlc.CreateAccountParams{ + UserID: user.ID, Username: pgtype.Text{String: username, Valid: true}, Email: emailValue, + PasswordHash: pgtype.Text{String: string(hashed), Valid: true}, Role: "admin", + DisplayName: displayName, AvatarUrl: pgtype.Text{Valid: false}, IsActive: true, DataRoot: dataRoot, + }) + if err != nil { + return err + } + log.Info("Admin user created", slog.String("username", username)) + return nil +} + +type lazyLLMClient struct { + modelsService *models.Service + queries *dbsqlc.Queries + timeout time.Duration + logger *slog.Logger +} + +func (c *lazyLLMClient) Extract(ctx context.Context, req memory.ExtractRequest) (memory.ExtractResponse, error) { + client, err := c.resolve(ctx) + if err != nil { + return memory.ExtractResponse{}, err + } + return client.Extract(ctx, req) +} +func (c *lazyLLMClient) Decide(ctx context.Context, req memory.DecideRequest) (memory.DecideResponse, error) { + client, err := c.resolve(ctx) + if err != nil { + return memory.DecideResponse{}, err + } + return client.Decide(ctx, req) +} +func (c *lazyLLMClient) Compact(ctx context.Context, req memory.CompactRequest) (memory.CompactResponse, error) { + client, err := c.resolve(ctx) + if err != nil { + return memory.CompactResponse{}, err + } + return client.Compact(ctx, req) +} +func (c *lazyLLMClient) DetectLanguage(ctx context.Context, text string) (string, error) { + client, err := c.resolve(ctx) + if err != nil { + return "", err + } + return client.DetectLanguage(ctx, text) +} +func (c *lazyLLMClient) resolve(ctx context.Context) (memory.LLM, error) { + if c.modelsService == nil || c.queries == nil { + return nil, fmt.Errorf("models service not configured") + } + botID := memory.BotIDFromContext(ctx) + memoryModel, memoryProvider, err := models.SelectMemoryModelForBot(ctx, c.modelsService, c.queries, botID) + if err != nil { + return nil, err + } + clientType := string(memoryModel.ClientType) + switch clientType { + case "openai-responses", "openai-completions", "anthropic-messages", "google-generative-ai": + default: + return nil, fmt.Errorf("memory model client type not supported: %s", clientType) + } + return memory.NewLLMClient(c.logger, memoryProvider.BaseUrl, memoryProvider.ApiKey, memoryModel.ModelID, c.timeout) +} + +type skillLoaderAdapter struct{ handler *handlers.ContainerdHandler } + +func (a *skillLoaderAdapter) LoadSkills(ctx context.Context, botID string) ([]flow.SkillEntry, error) { + items, err := a.handler.LoadSkills(ctx, botID) + if err != nil { + return nil, err + } + entries := make([]flow.SkillEntry, len(items)) + for i, item := range items { + entries[i] = flow.SkillEntry{Name: item.Name, Description: item.Description, Content: item.Content, Metadata: item.Metadata} + } + return entries, nil +} + +type mediaAssetResolverAdapter struct{ media *media.Service } + +func (a *mediaAssetResolverAdapter) GetByStorageKey(ctx context.Context, botID, storageKey string) (mcpmessage.AssetMeta, error) { + if a == nil || a.media == nil { + return mcpmessage.AssetMeta{}, fmt.Errorf("media service not configured") + } + asset, err := a.media.GetByStorageKey(ctx, botID, storageKey) + if err != nil { + return mcpmessage.AssetMeta{}, err + } + return mcpmessage.AssetMeta{ContentHash: asset.ContentHash, Mime: asset.Mime, SizeBytes: asset.SizeBytes, StorageKey: asset.StorageKey}, nil +} +func (a *mediaAssetResolverAdapter) IngestContainerFile(ctx context.Context, botID, containerPath string) (mcpmessage.AssetMeta, error) { + if a == nil || a.media == nil { + return mcpmessage.AssetMeta{}, fmt.Errorf("media service not configured") + } + asset, err := a.media.IngestContainerFile(ctx, botID, containerPath) + if err != nil { + return mcpmessage.AssetMeta{}, err + } + return mcpmessage.AssetMeta{ContentHash: asset.ContentHash, Mime: asset.Mime, SizeBytes: asset.SizeBytes, StorageKey: asset.StorageKey}, nil +} + +type gatewayAssetLoaderAdapter struct{ media *media.Service } + +func (a *gatewayAssetLoaderAdapter) OpenForGateway(ctx context.Context, botID, contentHash string) (io.ReadCloser, string, error) { + if a == nil || a.media == nil { + return nil, "", fmt.Errorf("media service not configured") + } + reader, asset, err := a.media.Open(ctx, botID, contentHash) + if err != nil { + return nil, "", err + } + return reader, strings.TrimSpace(asset.Mime), nil +} diff --git a/cmd/memoh/version.go b/cmd/memoh/version.go new file mode 100644 index 00000000..74833b32 --- /dev/null +++ b/cmd/memoh/version.go @@ -0,0 +1,12 @@ +package main + +import ( + "fmt" + + "github.com/memohai/memoh/internal/version" +) + +func runVersion() error { + fmt.Printf("memoh %s\n", version.GetInfo()) + return nil +} diff --git a/conf/app.windows.toml b/conf/app.windows.toml new file mode 100644 index 00000000..bafa1792 --- /dev/null +++ b/conf/app.windows.toml @@ -0,0 +1,52 @@ +# Memoh Windows configuration template +# It is an experimental feature and not recommended for production. + +[log] +level = "debug" +format = "text" + +[server] +addr = ":8080" + +[admin] +username = "admin" +password = "admin123" +email = "dev@memoh.local" + +[auth] +jwt_secret = "memoh-dev-secret-do-not-use-in-production" +jwt_expires_in = "168h" + +[containerd] +# Windows containerd usually exposes a named pipe endpoint. +# If your environment uses another pipe name or tcp endpoint, change this value. +socket_path = "npipe:////./pipe/containerd-containerd" +namespace = "default" + +[mcp] +image = "docker.io/library/memoh-mcp:dev" +snapshotter = "overlayfs" +data_root = "data" + +[postgres] +host = "127.0.0.1" +port = 5432 +user = "memoh" +password = "memoh123" +database = "memoh" +sslmode = "disable" + +[qdrant] +base_url = "http://127.0.0.1:6334" +api_key = "" +collection = "memory" +timeout_seconds = 10 + +[agent_gateway] +host = "127.0.0.1" +port = 8081 +server_addr = ":8080" + +[web] +host = "127.0.0.1" +port = 8082 diff --git a/go.mod b/go.mod index 8437ec9e..ac82d617 100644 --- a/go.mod +++ b/go.mod @@ -78,6 +78,7 @@ require ( github.com/google/go-cmp v0.7.0 // indirect github.com/google/jsonschema-go v0.4.2 // indirect github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect @@ -101,6 +102,8 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/sasha-s/go-deadlock v0.3.6 // indirect github.com/sirupsen/logrus v1.9.4 // indirect + github.com/spf13/cobra v1.10.2 // indirect + github.com/spf13/pflag v1.0.9 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.2 // indirect github.com/yosida95/uritemplate/v3 v3.0.2 // indirect diff --git a/go.sum b/go.sum index 01da829c..39854861 100644 --- a/go.sum +++ b/go.sum @@ -67,6 +67,7 @@ github.com/containerd/typeurl/v2 v2.2.3 h1:yNA/94zxWdvYACdYO8zofhrTVuQY73fFU1y++ github.com/containerd/typeurl/v2 v2.2.3/go.mod h1:95ljDnPfD3bAbDJRugOiShd/DlAAsxGtUBhJxIn7SCk= github.com/containernetworking/cni v1.3.0 h1:v6EpN8RznAZj9765HhXQrtXgX+ECGebEYEmnuFjskwo= github.com/containernetworking/cni v1.3.0/go.mod h1:Bs8glZjjFfGPHMw6hQu82RUgEPNGEaBb9KS5KtNMnJ4= +github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/cyphar/filepath-securejoin v0.6.1 h1:5CeZ1jPXEiYt3+Z6zqprSAgSWiggmpVyciv8syjIpVE= github.com/cyphar/filepath-securejoin v0.6.1/go.mod h1:A8hd4EnAeyujCJRrICiOWqjS1AX0a9kM5XL+NwKoYSc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -167,6 +168,8 @@ github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo= github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= @@ -257,10 +260,15 @@ github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sasha-s/go-deadlock v0.3.6 h1:TR7sfOnZ7x00tWPfD397Peodt57KzMDo+9Ae9rMiUmw= github.com/sasha-s/go-deadlock v0.3.6/go.mod h1:CUqNyyvMxTyjFqDT7MRg9mb4Dv/btmGTqSR+rky/UXo= github.com/sirupsen/logrus v1.9.4 h1:TsZE7l11zFCLZnZ+teH4Umoq5BhEIfIzfRDZ1Uzql2w= github.com/sirupsen/logrus v1.9.4/go.mod h1:ftWc9WdOfJ0a92nsE2jF5u5ZwH8Bv2zdeOC42RjbV2g= +github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU= +github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4= +github.com/spf13/pflag v1.0.9 h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY= +github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= diff --git a/internal/auth/jwt_test.go b/internal/auth/jwt_test.go index 5ccbf8ba..2b48f6eb 100644 --- a/internal/auth/jwt_test.go +++ b/internal/auth/jwt_test.go @@ -45,7 +45,6 @@ func TestRefreshTokenFromContext(t *testing.T) { originalClaims, ok := token.Claims.(jwt.MapClaims) assert.True(t, ok) origIat := int64(originalClaims["iat"].(float64)) - origExp := int64(originalClaims["exp"].(float64)) // Parse the new token newToken, err := jwt.Parse(newTokenStr, func(token *jwt.Token) (interface{}, error) { @@ -89,7 +88,7 @@ func TestRefreshTokenFromContext_MissingUser(t *testing.T) { // Context without the "user" key _, _, err := RefreshTokenFromContext(c, secret, defaultDuration) assert.Error(t, err) - + httpErr, ok := err.(*echo.HTTPError) assert.True(t, ok) assert.Equal(t, http.StatusUnauthorized, httpErr.Code) diff --git a/internal/bun/runtime/manager.go b/internal/bun/runtime/manager.go new file mode 100644 index 00000000..b6d1814e --- /dev/null +++ b/internal/bun/runtime/manager.go @@ -0,0 +1,255 @@ +package runtime + +import ( + "context" + "errors" + "fmt" + "io" + "io/fs" + "log/slog" + "net/http" + "os" + "os/exec" + "path/filepath" + "runtime" + "sync" + "syscall" + "time" + + "github.com/BurntSushi/toml" + + "github.com/memohai/memoh/internal/config" + "github.com/memohai/memoh/internal/embedded" +) + +type Manager struct { + log *slog.Logger + cfg config.Config + host string + port int + workdir string + cmd *exec.Cmd + stopOnce sync.Once +} + +const ( + defaultGatewayHost = "127.0.0.1" + defaultGatewayPort = 8081 + agentConfigFileName = "config.toml" + healthCheckTimeout = 30 * time.Second + healthCheckRetryBackoff = 400 * time.Millisecond + processStopTimeout = 5 * time.Second +) + +func NewManager(log *slog.Logger, cfg config.Config) *Manager { + host := cfg.AgentGateway.Host + if host == "" { + host = defaultGatewayHost + } + port := cfg.AgentGateway.Port + if port == 0 { + port = defaultGatewayPort + } + return &Manager{ + log: log.With(slog.String("component", "agent-runtime")), + cfg: cfg, + host: host, + port: port, + } +} + +func (m *Manager) Start(ctx context.Context) error { + workdir, err := os.MkdirTemp("", "memoh-agent-runtime-*") + if err != nil { + return fmt.Errorf("create runtime temp dir: %w", err) + } + m.workdir = workdir + + agentFS, err := embedded.AgentFS() + if err != nil { + return err + } + bunFS, bunBinName, err := embedded.BunFS("", "") + if err != nil { + return err + } + + agentDir := filepath.Join(workdir, "agent") + bunDir := filepath.Join(workdir, "bun") + if err := extractFS(agentFS, agentDir); err != nil { + return fmt.Errorf("extract agent assets: %w", err) + } + if err := extractFS(bunFS, bunDir); err != nil { + return fmt.Errorf("extract bun runtime: %w", err) + } + + bunPath := filepath.Join(bunDir, bunBinName) + if _, err := os.Stat(bunPath); err != nil { + if errors.Is(err, os.ErrNotExist) { + m.log.Warn("bundled bun runtime unavailable for current platform; falling back to configured agent gateway", slog.String("platform", runtimePlatform())) + return nil + } + return err + } + if err := os.Chmod(bunPath, 0o755); err != nil { + return fmt.Errorf("chmod bun binary: %w", err) + } + agentConfigPath := filepath.Join(agentDir, agentConfigFileName) + if err := writeAgentConfig(agentConfigPath, m.cfg); err != nil { + return err + } + + cmd := exec.Command(bunPath, "run", "dist/index.js") + cmd.Dir = agentDir + cmd.Env = append( + os.Environ(), + "MEMOH_CONFIG_PATH="+agentConfigPath, + "CONFIG_PATH="+agentConfigPath, + ) + cmd.Stdout = &logWriter{log: m.log, level: slog.LevelInfo} + cmd.Stderr = &logWriter{log: m.log, level: slog.LevelError} + + if err := cmd.Start(); err != nil { + return fmt.Errorf("start bundled agent runtime: %w", err) + } + m.cmd = cmd + + m.log.Info("bundled agent runtime started", slog.Int("pid", cmd.Process.Pid), slog.String("addr", m.address())) + if err := m.waitHealthy(ctx); err != nil { + return err + } + return nil +} + +func (m *Manager) Stop(ctx context.Context) error { + var retErr error + m.stopOnce.Do(func() { + if m.cmd == nil || m.cmd.Process == nil { + return + } + + _ = m.cmd.Process.Signal(os.Interrupt) + done := make(chan error, 1) + go func() { + done <- m.cmd.Wait() + }() + + select { + case err := <-done: + if err != nil && !errors.Is(err, syscall.EINTR) { + retErr = err + } + case <-ctx.Done(): + _ = m.cmd.Process.Kill() + retErr = ctx.Err() + case <-time.After(processStopTimeout): + _ = m.cmd.Process.Kill() + <-done + } + + if m.workdir != "" { + _ = os.RemoveAll(m.workdir) + } + }) + return retErr +} + +func (m *Manager) waitHealthy(ctx context.Context) error { + client := &http.Client{Timeout: 2 * time.Second} + healthURL := fmt.Sprintf("http://%s/health", m.address()) + deadline := time.Now().Add(healthCheckTimeout) + for time.Now().Before(deadline) { + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, healthURL, nil) + resp, err := client.Do(req) + if err == nil { + _ = resp.Body.Close() + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + return nil + } + } + time.Sleep(healthCheckRetryBackoff) + } + return fmt.Errorf("bundled agent runtime health check timeout: %s", healthURL) +} + +func (m *Manager) address() string { + return fmt.Sprintf("%s:%d", m.host, m.port) +} + +func extractFS(src fs.FS, targetDir string) error { + if err := os.MkdirAll(targetDir, 0o755); err != nil { + return err + } + return fs.WalkDir(src, ".", func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if path == "." { + return nil + } + target := filepath.Join(targetDir, path) + if d.IsDir() { + return os.MkdirAll(target, 0o755) + } + + if err := os.MkdirAll(filepath.Dir(target), 0o755); err != nil { + return err + } + r, err := src.Open(path) + if err != nil { + return err + } + defer r.Close() + + w, err := os.OpenFile(target, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o644) + if err != nil { + return err + } + if _, err := io.Copy(w, r); err != nil { + _ = w.Close() + return err + } + return w.Close() + }) +} + +func writeAgentConfig(path string, cfg config.Config) error { + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + return err + } + f, err := os.Create(path) + if err != nil { + return fmt.Errorf("create agent config: %w", err) + } + defer f.Close() + return toml.NewEncoder(f).Encode(cfg) +} + +type logWriter struct { + log *slog.Logger + level slog.Level +} + +func (w *logWriter) Write(p []byte) (n int, err error) { + msg := string(p) + msg = trimTrailingNewline(msg) + if msg != "" { + w.log.Log(context.Background(), w.level, msg) + } + return len(p), nil +} + +func trimTrailingNewline(s string) string { + for len(s) > 0 { + last := s[len(s)-1] + if last != '\n' && last != '\r' { + break + } + s = s[:len(s)-1] + } + return s +} + +func runtimePlatform() string { + return fmt.Sprintf("%s/%s", runtime.GOOS, runtime.GOARCH) +} diff --git a/internal/embedded/.gitignore b/internal/embedded/.gitignore new file mode 100644 index 00000000..729f3666 --- /dev/null +++ b/internal/embedded/.gitignore @@ -0,0 +1,6 @@ +web/** +agent/** +bun/** +!web/.gitignore +!agent/.gitignore +!bun/.gitignore diff --git a/internal/embedded/assets.go b/internal/embedded/assets.go new file mode 100644 index 00000000..10a19110 --- /dev/null +++ b/internal/embedded/assets.go @@ -0,0 +1,46 @@ +package embedded + +import ( + "embed" + "fmt" + "io/fs" + "path/filepath" + "runtime" +) + +// Include underscore/dot-prefixed files from bundled build output (e.g. Vite helper chunks). +// Keep the explicit _plugin pattern to ensure helper chunks are embedded for production SPA routing. +// +//go:embed all:web all:web/assets/* all:web/assets/_plugin-vue_export-helper-*.js all:web/channels/* all:agent all:bun +var assetsFS embed.FS + +func AssetsFS() fs.FS { + return assetsFS +} + +func WebFS() (fs.FS, error) { + return fs.Sub(assetsFS, "web") +} + +func AgentFS() (fs.FS, error) { + return fs.Sub(assetsFS, "agent") +} + +func BunFS(goos, goarch string) (fs.FS, string, error) { + if goos == "" { + goos = runtime.GOOS + } + if goarch == "" { + goarch = runtime.GOARCH + } + sub := filepath.ToSlash(filepath.Join("bun", goos+"-"+goarch)) + dirFS, err := fs.Sub(assetsFS, sub) + if err != nil { + return nil, "", fmt.Errorf("bun runtime not bundled for %s/%s: %w", goos, goarch, err) + } + bin := "bun" + if goos == "windows" { + bin = "bun.exe" + } + return dirFS, bin, nil +} diff --git a/internal/embedded/bun/.gitignore b/internal/embedded/bun/.gitignore new file mode 100644 index 00000000..d6b7ef32 --- /dev/null +++ b/internal/embedded/bun/.gitignore @@ -0,0 +1,2 @@ +* +!.gitignore diff --git a/internal/handlers/file_embed.go b/internal/handlers/file_embed.go new file mode 100644 index 00000000..852d6212 --- /dev/null +++ b/internal/handlers/file_embed.go @@ -0,0 +1,100 @@ +package handlers + +import ( + "io/fs" + "log/slog" + "mime" + "net/http" + "path" + "path/filepath" + "strings" + + "github.com/labstack/echo/v4" + + "github.com/memohai/memoh/internal/embedded" +) + +type EmbeddedWebHandler struct { + log *slog.Logger + webFS fs.FS +} + +var embeddedStaticRoutes = map[string]struct { + assetPath string + contentType string +}{ + "/logo.png": {assetPath: "logo.png", contentType: "image/png"}, + "/channels/telegram.webp": {assetPath: "channels/telegram.webp", contentType: "image/webp"}, + "/channels/feishu.png": {assetPath: "channels/feishu.png", contentType: "image/png"}, +} + +func NewEmbeddedWebHandler(log *slog.Logger) (*EmbeddedWebHandler, error) { + webFS, err := embedded.WebFS() + if err != nil { + return nil, err + } + return &EmbeddedWebHandler{log: log, webFS: webFS}, nil +} + +func (h *EmbeddedWebHandler) Register(e *echo.Echo) { + e.GET("/assets/*", h.serveAsset) + for route, meta := range embeddedStaticRoutes { + e.GET(route, h.serveStatic(meta.assetPath, meta.contentType)) + } + e.GET("/", h.serveIndex) + e.GET("/*", func(c echo.Context) error { + reqPath := c.Request().URL.Path + if isBackendPath(reqPath) || strings.Contains(path.Base(reqPath), ".") { + return echo.ErrNotFound + } + return h.serveIndex(c) + }) +} + +func (h *EmbeddedWebHandler) serveIndex(c echo.Context) error { + content, err := fs.ReadFile(h.webFS, "index.html") + if err != nil { + h.log.Error("read embedded index.html failed", slog.Any("error", err)) + return echo.ErrNotFound + } + return c.Blob(http.StatusOK, "text/html; charset=utf-8", content) +} + +func (h *EmbeddedWebHandler) serveStatic(targetPath, contentType string) echo.HandlerFunc { + return func(c echo.Context) error { + content, err := fs.ReadFile(h.webFS, targetPath) + if err != nil { + return echo.ErrNotFound + } + return c.Blob(http.StatusOK, contentType, content) + } +} + +func (h *EmbeddedWebHandler) serveAsset(c echo.Context) error { + assetPath := strings.TrimPrefix(c.Param("*"), "/") + if assetPath == "" { + return echo.ErrNotFound + } + + fullPath := path.Join("assets", assetPath) + content, err := fs.ReadFile(h.webFS, fullPath) + if err != nil { + return echo.ErrNotFound + } + + contentType := mime.TypeByExtension(filepath.Ext(assetPath)) + if contentType == "" { + contentType = "application/octet-stream" + } + return c.Blob(http.StatusOK, contentType, content) +} + +func isBackendPath(p string) bool { + return p == "/ping" || + p == "/health" || + strings.HasPrefix(p, "/api") || + strings.HasPrefix(p, "/auth") || + strings.HasPrefix(p, "/channels") || + strings.HasPrefix(p, "/containers") || + strings.HasPrefix(p, "/inbox") +} diff --git a/internal/server/server.go b/internal/server/server.go index 68131bd8..a6452b4f 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -72,7 +72,10 @@ func (s *Server) Stop(ctx context.Context) error { } func shouldSkipJWT(path string) bool { - if path == "/ping" || path == "/health" || path == "/api/swagger.json" || path == "/auth/login" { + if path == "/" || path == "/ping" || path == "/health" || path == "/api/swagger.json" || path == "/auth/login" { + return true + } + if strings.HasPrefix(path, "/assets/") { return true } if strings.HasPrefix(path, "/api/docs") { diff --git a/mise.toml b/mise.toml index 07b9e40a..9068b584 100644 --- a/mise.toml +++ b/mise.toml @@ -72,6 +72,21 @@ run = "scripts/db-drop.sh" description = "Release new version" run = "pnpm release" +[tasks.build-embedded-assets] +description = "Build and stage embedded web/agent/bun assets" +run = "scripts/release.sh --prepare-assets" +depends = ["//:pnpm-install"] + +[tasks.build-unified] +description = "Build unified memoh binary" +depends = ["//:build-embedded-assets"] +run = "go build -o bin/memoh ./cmd/memoh" + +[tasks.release-binaries] +description = "Build release archive for one target (requires TARGET_OS TARGET_ARCH)" +depends = ["//:pnpm-install"] +run = "scripts/release.sh" + [tasks.install-cli] description = "Install CLI" depends = ["//:pnpm-install"] diff --git a/scripts/release.sh b/scripts/release.sh new file mode 100755 index 00000000..07095a4c --- /dev/null +++ b/scripts/release.sh @@ -0,0 +1,164 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "$0")/.." && pwd)" +TARGET_OS="${TARGET_OS:-$(go env GOOS)}" +TARGET_ARCH="${TARGET_ARCH:-$(go env GOARCH)}" +BUN_VERSION="${BUN_VERSION:-latest}" +VERSION="${VERSION:-dev}" +COMMIT_HASH="${COMMIT_HASH:-unknown}" +BUILD_TIME="${BUILD_TIME:-$(date -u +"%Y-%m-%dT%H:%M:%SZ")}" +OUTPUT_DIR="${OUTPUT_DIR:-$ROOT_DIR/dist}" +PREPARE_ASSETS_ONLY="false" + +while [[ $# -gt 0 ]]; do + case "$1" in + --os) + TARGET_OS="$2" + shift 2 + ;; + --arch) + TARGET_ARCH="$2" + shift 2 + ;; + --bun-version) + BUN_VERSION="$2" + shift 2 + ;; + --version) + VERSION="$2" + shift 2 + ;; + --commit-hash) + COMMIT_HASH="$2" + shift 2 + ;; + --output-dir) + OUTPUT_DIR="$2" + shift 2 + ;; + --prepare-assets) + PREPARE_ASSETS_ONLY="true" + shift + ;; + *) + echo "Unknown arg: $1" >&2 + exit 1 + ;; + esac +done + +prepare_assets() { + local web_dir="$ROOT_DIR/internal/embedded/web" + local agent_dir="$ROOT_DIR/internal/embedded/agent" + local bun_dir="$ROOT_DIR/internal/embedded/bun/${TARGET_OS}-${TARGET_ARCH}" + + rm -rf "$web_dir" "$agent_dir" "$bun_dir" + mkdir -p "$web_dir" "$agent_dir" "$bun_dir" + + echo "[release] building web assets" + pnpm --dir "$ROOT_DIR" web:build + cp -R "$ROOT_DIR/packages/web/dist/." "$web_dir/" + + echo "[release] building agent bundle" + pnpm --dir "$ROOT_DIR" agent:build + mkdir -p "$agent_dir/dist" + cp "$ROOT_DIR/agent/dist/index.js" "$agent_dir/dist/index.js" + if [[ -f "$ROOT_DIR/agent/package.json" ]]; then + cp "$ROOT_DIR/agent/package.json" "$agent_dir/package.json" + fi + + local bun_target="" + case "${TARGET_OS}-${TARGET_ARCH}" in + linux-amd64) bun_target="bun-linux-x64.zip" ;; + linux-arm64) bun_target="bun-linux-aarch64.zip" ;; + darwin-amd64) bun_target="bun-darwin-x64.zip" ;; + darwin-arm64) bun_target="bun-darwin-aarch64.zip" ;; + windows-amd64) bun_target="bun-windows-x64.zip" ;; + windows-arm64) bun_target="bun-windows-aarch64.zip" ;; + *) + echo "bun runtime not available for ${TARGET_OS}-${TARGET_ARCH}" > "$bun_dir/UNAVAILABLE" + echo "[release] skipped bun bundle for unsupported target ${TARGET_OS}-${TARGET_ARCH}" + return 0 + ;; + esac + + local tmp_dir + tmp_dir="$(mktemp -d)" + trap 'rm -rf "$tmp_dir"' RETURN + + local url + if [[ "$BUN_VERSION" == "latest" ]]; then + url="https://github.com/oven-sh/bun/releases/latest/download/${bun_target}" + else + url="https://github.com/oven-sh/bun/releases/download/bun-v${BUN_VERSION}/${bun_target}" + fi + + echo "[release] downloading ${url}" + curl -fsSL "$url" -o "$tmp_dir/bun.zip" + unzip -q -o "$tmp_dir/bun.zip" -d "$tmp_dir" + + local bun_bin_name="bun" + if [[ "$TARGET_OS" == "windows" ]]; then + bun_bin_name="bun.exe" + fi + + local bun_source_path="" + if [[ -f "$tmp_dir/${bun_target%.zip}/${bun_bin_name}" ]]; then + bun_source_path="$tmp_dir/${bun_target%.zip}/${bun_bin_name}" + else + for candidate in "$tmp_dir"/bun-"${TARGET_OS}"-*/"${bun_bin_name}"; do + if [[ -f "$candidate" ]]; then + bun_source_path="$candidate" + break + fi + done + fi + + if [[ -z "$bun_source_path" ]]; then + echo "failed to locate bun binary in downloaded archive" >&2 + exit 1 + fi + + cp "$bun_source_path" "$bun_dir/$bun_bin_name" + chmod +x "$bun_dir/$bun_bin_name" || true + + echo "[release] embedded assets prepared (${TARGET_OS}-${TARGET_ARCH})" +} + +build_archive() { + mkdir -p "$OUTPUT_DIR" + + local ext="" + if [[ "$TARGET_OS" == "windows" ]]; then + ext=".exe" + fi + + local binary_name="memoh${ext}" + local target_dir="$OUTPUT_DIR/memoh_${VERSION}_${TARGET_OS}_${TARGET_ARCH}" + mkdir -p "$target_dir" + + echo "[release] building binary ${TARGET_OS}/${TARGET_ARCH}" + CGO_ENABLED=0 GOOS="$TARGET_OS" GOARCH="$TARGET_ARCH" \ + go build \ + -trimpath \ + -ldflags "-s -w -X github.com/memohai/memoh/internal/version.Version=${VERSION} -X github.com/memohai/memoh/internal/version.CommitHash=${COMMIT_HASH} -X github.com/memohai/memoh/internal/version.BuildTime=${BUILD_TIME}" \ + -o "$target_dir/$binary_name" \ + "$ROOT_DIR/cmd/memoh" + + if [[ "$TARGET_OS" == "windows" ]]; then + (cd "$OUTPUT_DIR" && zip -q -r "memoh_${VERSION}_${TARGET_OS}_${TARGET_ARCH}.zip" "memoh_${VERSION}_${TARGET_OS}_${TARGET_ARCH}") + else + tar -C "$OUTPUT_DIR" -czf "$OUTPUT_DIR/memoh_${VERSION}_${TARGET_OS}_${TARGET_ARCH}.tar.gz" "memoh_${VERSION}_${TARGET_OS}_${TARGET_ARCH}" + fi + + echo "[release] archive created (${TARGET_OS}-${TARGET_ARCH})" +} + +prepare_assets +if [[ "$PREPARE_ASSETS_ONLY" == "true" ]]; then + echo "[release] prepare-assets only mode completed" + exit 0 +fi + +build_archive