mirror of
https://github.com/memohai/Memoh.git
synced 2026-04-27 07:16:19 +09:00
+293
-128
@@ -2,12 +2,16 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
containerd "github.com/containerd/containerd/v2/client"
|
||||
"github.com/memohai/memoh/internal/boot"
|
||||
"github.com/memohai/memoh/internal/bots"
|
||||
"github.com/memohai/memoh/internal/channel"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/feishu"
|
||||
@@ -36,164 +40,317 @@ import (
|
||||
"github.com/memohai/memoh/internal/subagent"
|
||||
"github.com/memohai/memoh/internal/users"
|
||||
"github.com/memohai/memoh/internal/version"
|
||||
"go.uber.org/fx"
|
||||
"go.uber.org/fx/fxevent"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"golang.org/x/crypto/bcrypt"
|
||||
)
|
||||
|
||||
func main() {
|
||||
fmt.Printf("Starting Memoh Agent %s\n", version.GetInfo())
|
||||
ctx := context.Background()
|
||||
func provideConfig() (config.Config, error) {
|
||||
cfgPath := os.Getenv("CONFIG_PATH")
|
||||
cfg, err := config.Load(cfgPath)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "load config: %v\n", err)
|
||||
os.Exit(1)
|
||||
return config.Config{}, fmt.Errorf("load config: %v\n", err)
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func provideLogger(cfg config.Config) *slog.Logger {
|
||||
logger.Init(cfg.Log.Level, cfg.Log.Format)
|
||||
return logger.L
|
||||
}
|
||||
|
||||
if strings.TrimSpace(cfg.Auth.JWTSecret) == "" {
|
||||
logger.Error("jwt secret is required")
|
||||
os.Exit(1)
|
||||
}
|
||||
jwtExpiresIn, err := time.ParseDuration(cfg.Auth.JWTExpiresIn)
|
||||
func provideContainerdClient(lc fx.Lifecycle, runtimeConfig *boot.RuntimeConfig) (*containerd.Client, error) {
|
||||
factory := ctr.DefaultClientFactory{SocketPath: runtimeConfig.ContainerdSocketPath}
|
||||
client, err := factory.New(context.Background())
|
||||
if err != nil {
|
||||
logger.Error("invalid jwt expires in", slog.Any("error", err))
|
||||
os.Exit(1)
|
||||
return nil, fmt.Errorf("connect containerd: %w", err)
|
||||
}
|
||||
|
||||
addr := cfg.Server.Addr
|
||||
if value := os.Getenv("HTTP_ADDR"); value != "" {
|
||||
addr = value
|
||||
}
|
||||
lc.Append(fx.Hook{
|
||||
OnStop: func(ctx context.Context) error {
|
||||
if err := client.Close(); err != nil {
|
||||
return fmt.Errorf("close containerd client: %w", err)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
})
|
||||
return client, nil
|
||||
}
|
||||
|
||||
socketPath := cfg.Containerd.SocketPath
|
||||
if value := os.Getenv("CONTAINERD_SOCKET"); value != "" {
|
||||
socketPath = value
|
||||
}
|
||||
factory := ctr.DefaultClientFactory{SocketPath: socketPath}
|
||||
client, err := factory.New(ctx)
|
||||
if err != nil {
|
||||
logger.Error("connect containerd", slog.Any("error", err))
|
||||
os.Exit(1)
|
||||
}
|
||||
defer client.Close()
|
||||
func main() {
|
||||
fx.New(
|
||||
fx.Provide(
|
||||
provideConfig,
|
||||
boot.ProvideRuntimeConfig,
|
||||
provideLogger,
|
||||
|
||||
service := ctr.NewDefaultService(logger.L, client, cfg.Containerd.Namespace)
|
||||
manager := mcp.NewManager(logger.L, service, cfg.MCP)
|
||||
// misc
|
||||
provideContainerdClient,
|
||||
provideDBConn,
|
||||
provideDBQueries,
|
||||
|
||||
pingHandler := handlers.NewPingHandler(logger.L)
|
||||
// containerdHandler is created later after DB services are initialized
|
||||
fx.Annotate(ctr.NewDefaultService, fx.As(new(ctr.Service))),
|
||||
mcp.NewManager,
|
||||
|
||||
provideMemoryLLM,
|
||||
provideEmbeddingsResolver,
|
||||
provideEmbeddingSetup,
|
||||
provideTextEmbedderForMemory,
|
||||
provideQdrantStore,
|
||||
memory.NewBM25Indexer,
|
||||
provideChatResolver,
|
||||
local.NewSessionHub,
|
||||
provideChannelRegistry,
|
||||
|
||||
provideChannelRouter,
|
||||
provideChannelManager,
|
||||
|
||||
chat.NewScheduleGateway,
|
||||
fx.Annotate(func(scheduleGateway *chat.ScheduleGateway) schedule.Triggerer {
|
||||
return scheduleGateway
|
||||
}, fx.As(new(schedule.Triggerer))),
|
||||
|
||||
models.NewService,
|
||||
bots.NewService,
|
||||
users.NewService,
|
||||
providers.NewService,
|
||||
settings.NewService,
|
||||
history.NewService,
|
||||
contacts.NewService,
|
||||
preauth.NewService,
|
||||
mcp.NewConnectionService,
|
||||
subagent.NewService,
|
||||
schedule.NewService,
|
||||
channel.NewService,
|
||||
policy.NewService,
|
||||
provideMemoryService,
|
||||
|
||||
provideServerHandler(handlers.NewPingHandler),
|
||||
provideServerHandler(handlers.NewAuthHandler),
|
||||
provideServerHandler(handlers.NewMemoryHandler),
|
||||
provideServerHandler(handlers.NewEmbeddingsHandler),
|
||||
provideServerHandler(handlers.NewChatHandler),
|
||||
provideServerHandler(handlers.NewSwaggerHandler),
|
||||
provideServerHandler(handlers.NewProvidersHandler),
|
||||
provideServerHandler(handlers.NewModelsHandler),
|
||||
provideServerHandler(handlers.NewSettingsHandler),
|
||||
provideServerHandler(handlers.NewHistoryHandler),
|
||||
provideServerHandler(handlers.NewContactsHandler),
|
||||
provideServerHandler(handlers.NewPreauthHandler),
|
||||
provideServerHandler(handlers.NewScheduleHandler),
|
||||
provideServerHandler(handlers.NewSubagentHandler),
|
||||
handlers.NewContainerdHandler,
|
||||
provideServerHandler(handlers.NewContainerdHandler),
|
||||
provideServerHandler(handlers.NewChannelHandler),
|
||||
provideServerHandler(handlers.NewUsersHandler),
|
||||
provideServerHandler(handlers.NewMCPHandler),
|
||||
provideServerHandler(provideCLIHandler),
|
||||
provideServerHandler(provideWebHandler),
|
||||
|
||||
provideServer,
|
||||
),
|
||||
fx.Invoke(
|
||||
startMemoryWarmup,
|
||||
startScheduleService,
|
||||
startChannelManager,
|
||||
startServer,
|
||||
),
|
||||
fx.WithLogger(func(logger *slog.Logger) fxevent.Logger {
|
||||
l := &fxevent.SlogLogger{Logger: logger.With(slog.String("component", "fx"))}
|
||||
// l.UseLogLevel(slog.LevelInfo)
|
||||
return l
|
||||
}),
|
||||
).Run()
|
||||
}
|
||||
|
||||
func provideServerHandler(fn any) any {
|
||||
return fx.Annotate(
|
||||
fn,
|
||||
fx.As(new(server.Handler)),
|
||||
fx.ResultTags(`group:"server_handlers"`),
|
||||
)
|
||||
}
|
||||
|
||||
func provideDBConn(lc fx.Lifecycle, cfg config.Config) (*pgxpool.Pool, error) {
|
||||
ctx := context.Background() // TODO: use timeout context
|
||||
|
||||
conn, err := db.Open(ctx, cfg.Postgres)
|
||||
if err != nil {
|
||||
logger.Error("db connect", slog.Any("error", err))
|
||||
os.Exit(1)
|
||||
return nil, fmt.Errorf("db connect: %w", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
manager.WithDB(conn)
|
||||
queries := dbsqlc.New(conn)
|
||||
modelsService := models.NewService(logger.L, queries)
|
||||
botService := bots.NewService(logger.L, queries)
|
||||
usersService := users.NewService(logger.L, queries)
|
||||
lc.Append(fx.Hook{
|
||||
OnStop: func(ctx context.Context) error {
|
||||
conn.Close()
|
||||
return nil
|
||||
},
|
||||
})
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
containerdHandler := handlers.NewContainerdHandler(logger.L, service, cfg.MCP, cfg.Containerd.Namespace, botService, usersService, queries)
|
||||
botService.SetContainerLifecycle(containerdHandler)
|
||||
func provideDBQueries(conn *pgxpool.Pool) *dbsqlc.Queries {
|
||||
return dbsqlc.New(conn)
|
||||
}
|
||||
|
||||
if err := ensureAdminUser(ctx, logger.L, queries, cfg); err != nil {
|
||||
logger.Error("ensure admin user", slog.Any("error", err))
|
||||
os.Exit(1)
|
||||
}
|
||||
func provideEmbeddingsResolver(log *slog.Logger, modelsService *models.Service, queries *dbsqlc.Queries) *embeddings.Resolver {
|
||||
return embeddings.NewResolver(log, modelsService, queries, 10*time.Second)
|
||||
}
|
||||
|
||||
authHandler := handlers.NewAuthHandler(logger.L, usersService, cfg.Auth.JWTSecret, jwtExpiresIn)
|
||||
type embeddingSetup struct {
|
||||
Vectors map[string]int
|
||||
TextModel models.GetResponse
|
||||
MultimodalModel models.GetResponse
|
||||
HasEmbeddingModels bool
|
||||
}
|
||||
|
||||
// Initialize chat resolver after memory service is configured.
|
||||
var chatResolver *chat.Resolver
|
||||
func provideEmbeddingSetup(log *slog.Logger, modelsService *models.Service) (embeddingSetup, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Create LLM client for memory operations (deferred model/provider selection).
|
||||
var llmClient memory.LLM = &lazyLLMClient{
|
||||
modelsService: modelsService,
|
||||
queries: queries,
|
||||
timeout: 30 * time.Second,
|
||||
logger: logger.L,
|
||||
}
|
||||
|
||||
resolver := embeddings.NewResolver(logger.L, modelsService, queries, 10*time.Second)
|
||||
vectors, textModel, multimodalModel, hasEmbeddingModels, err := embeddings.CollectEmbeddingVectors(ctx, modelsService)
|
||||
if err != nil {
|
||||
logger.Error("embedding models", slog.Any("error", err))
|
||||
os.Exit(1)
|
||||
return embeddingSetup{}, fmt.Errorf("embedding models: %w", err)
|
||||
}
|
||||
|
||||
textEmbedder := buildTextEmbedder(resolver, textModel, hasEmbeddingModels, logger.L)
|
||||
if hasEmbeddingModels && multimodalModel.ModelID == "" {
|
||||
logger.Warn("No multimodal embedding model configured. Multimodal embedding features will be limited.")
|
||||
log.Warn("No multimodal embedding model configured. Multimodal embedding features will be limited.")
|
||||
}
|
||||
store := buildQdrantStore(logger.L, cfg.Qdrant, vectors, hasEmbeddingModels, textModel.Dimensions)
|
||||
return embeddingSetup{
|
||||
Vectors: vectors,
|
||||
TextModel: textModel,
|
||||
MultimodalModel: multimodalModel,
|
||||
HasEmbeddingModels: hasEmbeddingModels,
|
||||
}, nil
|
||||
}
|
||||
|
||||
bm25Indexer := memory.NewBM25Indexer(logger.L)
|
||||
memoryService := memory.NewService(logger.L, llmClient, textEmbedder, store, resolver, bm25Indexer, textModel.ModelID, multimodalModel.ModelID)
|
||||
memoryHandler := handlers.NewMemoryHandler(logger.L, memoryService, botService, usersService)
|
||||
go func() {
|
||||
if err := memoryService.WarmupBM25(ctx, 200); err != nil {
|
||||
logger.Warn("bm25 warmup failed", slog.Any("error", err))
|
||||
}
|
||||
}()
|
||||
func provideTextEmbedderForMemory(resolver *embeddings.Resolver, setup embeddingSetup, log *slog.Logger) embeddings.Embedder {
|
||||
return buildTextEmbedder(resolver, setup.TextModel, setup.HasEmbeddingModels, log)
|
||||
}
|
||||
|
||||
// Initialize providers and models handlers
|
||||
providersService := providers.NewService(logger.L, queries)
|
||||
providersHandler := handlers.NewProvidersHandler(logger.L, providersService, modelsService)
|
||||
settingsService := settings.NewService(logger.L, queries)
|
||||
settingsHandler := handlers.NewSettingsHandler(logger.L, settingsService, botService, usersService)
|
||||
modelsHandler := handlers.NewModelsHandler(logger.L, modelsService, settingsService)
|
||||
policyService := policy.NewService(logger.L, botService, settingsService)
|
||||
historyService := history.NewService(logger.L, queries)
|
||||
historyHandler := handlers.NewHistoryHandler(logger.L, historyService, botService, usersService)
|
||||
contactsService := contacts.NewService(queries)
|
||||
contactsHandler := handlers.NewContactsHandler(contactsService, botService, usersService)
|
||||
preauthService := preauth.NewService(queries)
|
||||
preauthHandler := handlers.NewPreauthHandler(preauthService, botService, usersService)
|
||||
mcpConnectionsService := mcp.NewConnectionService(logger.L, queries)
|
||||
mcpHandler := handlers.NewMCPHandler(logger.L, mcpConnectionsService, botService, usersService)
|
||||
func provideMemoryService(log *slog.Logger, llm memory.LLM, embedder embeddings.Embedder, store *memory.QdrantStore, resolver *embeddings.Resolver, bm25Indexer *memory.BM25Indexer, setup embeddingSetup) *memory.Service {
|
||||
return memory.NewService(log, llm, embedder, store, resolver, bm25Indexer, setup.TextModel.ModelID, setup.MultimodalModel.ModelID)
|
||||
}
|
||||
|
||||
chatResolver = chat.NewResolver(logger.L, modelsService, queries, memoryService, historyService, settingsService, mcpConnectionsService, cfg.AgentGateway.BaseURL(), 120*time.Second)
|
||||
func provideChatResolver(log *slog.Logger, cfg config.Config, modelsService *models.Service, queries *dbsqlc.Queries, memoryService *memory.Service, historyService *history.Service, settingsService *settings.Service, mcpConnectionsService *mcp.ConnectionService, containerdHandler *handlers.ContainerdHandler) *chat.Resolver {
|
||||
chatResolver := chat.NewResolver(log, modelsService, queries, memoryService, historyService, settingsService, mcpConnectionsService, cfg.AgentGateway.BaseURL(), 120*time.Second)
|
||||
chatResolver.SetSkillLoader(&skillLoaderAdapter{handler: containerdHandler})
|
||||
embeddingsHandler := handlers.NewEmbeddingsHandler(logger.L, modelsService, queries)
|
||||
swaggerHandler := handlers.NewSwaggerHandler(logger.L)
|
||||
chatHandler := handlers.NewChatHandler(logger.L, chatResolver, botService, usersService)
|
||||
channelRegistry := channel.NewRegistry()
|
||||
sessionHub := local.NewSessionHub()
|
||||
channelRegistry.MustRegister(telegram.NewTelegramAdapter(logger.L))
|
||||
channelRegistry.MustRegister(feishu.NewFeishuAdapter(logger.L))
|
||||
channelRegistry.MustRegister(local.NewCLIAdapter(sessionHub))
|
||||
channelRegistry.MustRegister(local.NewWebAdapter(sessionHub))
|
||||
channelService := channel.NewService(queries, channelRegistry)
|
||||
channelRouter := router.NewChannelInboundProcessor(logger.L, channelRegistry, channelService, chatResolver, contactsService, policyService, preauthService, cfg.Auth.JWTSecret, 5*time.Minute)
|
||||
channelManager := channel.NewManager(logger.L, channelRegistry, channelService, channelRouter)
|
||||
return chatResolver
|
||||
}
|
||||
|
||||
func provideChannelRegistry(log *slog.Logger, sessionHub *local.SessionHub) *channel.Registry {
|
||||
registry := channel.NewRegistry()
|
||||
registry.MustRegister(telegram.NewTelegramAdapter(log))
|
||||
registry.MustRegister(feishu.NewFeishuAdapter(log))
|
||||
registry.MustRegister(local.NewCLIAdapter(sessionHub))
|
||||
registry.MustRegister(local.NewWebAdapter(sessionHub))
|
||||
return registry
|
||||
}
|
||||
|
||||
func provideChannelRouter(log *slog.Logger, registry *channel.Registry, channelService *channel.Service, chatResolver *chat.Resolver, contactsService *contacts.Service, policyService *policy.Service, preauthService *preauth.Service, cfg config.Config) *router.ChannelInboundProcessor {
|
||||
return router.NewChannelInboundProcessor(log, registry, channelService, chatResolver, contactsService, policyService, preauthService, cfg.Auth.JWTSecret, 5*time.Minute)
|
||||
}
|
||||
|
||||
func provideChannelManager(log *slog.Logger, registry *channel.Registry, channelService *channel.Service, channelRouter *router.ChannelInboundProcessor) *channel.Manager {
|
||||
channelManager := channel.NewManager(log, registry, channelService, channelRouter)
|
||||
if mw := channelRouter.IdentityMiddleware(); mw != nil {
|
||||
channelManager.Use(mw)
|
||||
}
|
||||
channelManager.Start(ctx)
|
||||
channelHandler := handlers.NewChannelHandler(channelService, channelRegistry)
|
||||
usersHandler := handlers.NewUsersHandler(logger.L, usersService, botService, channelService, channelManager, channelRegistry)
|
||||
cliHandler := handlers.NewLocalChannelHandler(local.CLIType, channelManager, channelService, sessionHub, botService, usersService)
|
||||
webHandler := handlers.NewLocalChannelHandler(local.WebType, channelManager, channelService, sessionHub, botService, usersService)
|
||||
scheduleGateway := chat.NewScheduleGateway(chatResolver)
|
||||
scheduleService := schedule.NewService(logger.L, queries, scheduleGateway, cfg.Auth.JWTSecret)
|
||||
if err := scheduleService.Bootstrap(ctx); err != nil {
|
||||
logger.Error("schedule bootstrap", slog.Any("error", err))
|
||||
os.Exit(1)
|
||||
}
|
||||
scheduleHandler := handlers.NewScheduleHandler(logger.L, scheduleService, botService, usersService)
|
||||
subagentService := subagent.NewService(logger.L, queries)
|
||||
subagentHandler := handlers.NewSubagentHandler(logger.L, subagentService, botService, usersService)
|
||||
srv := server.NewServer(logger.L, addr, cfg.Auth.JWTSecret, pingHandler, authHandler, memoryHandler, embeddingsHandler, chatHandler, swaggerHandler, providersHandler, modelsHandler, settingsHandler, historyHandler, contactsHandler, preauthHandler, scheduleHandler, subagentHandler, containerdHandler, channelHandler, usersHandler, mcpHandler, cliHandler, webHandler)
|
||||
return channelManager
|
||||
}
|
||||
|
||||
if err := srv.Start(); err != nil {
|
||||
logger.Error("server failed", slog.Any("error", err))
|
||||
os.Exit(1)
|
||||
}
|
||||
func provideCLIHandler(channelManager *channel.Manager, channelService *channel.Service, sessionHub *local.SessionHub, botService *bots.Service, usersService *users.Service) *handlers.LocalChannelHandler {
|
||||
return handlers.NewLocalChannelHandler(local.CLIType, channelManager, channelService, sessionHub, botService, usersService)
|
||||
}
|
||||
|
||||
func provideWebHandler(channelManager *channel.Manager, channelService *channel.Service, sessionHub *local.SessionHub, botService *bots.Service, usersService *users.Service) *handlers.LocalChannelHandler {
|
||||
return handlers.NewLocalChannelHandler(local.WebType, channelManager, channelService, sessionHub, botService, usersService)
|
||||
}
|
||||
|
||||
type serverParams struct {
|
||||
fx.In
|
||||
|
||||
Logger *slog.Logger
|
||||
RuntimeConfig *boot.RuntimeConfig
|
||||
Config config.Config
|
||||
ServerHandlers []server.Handler `group:"server_handlers"`
|
||||
}
|
||||
|
||||
func provideServer(params serverParams) *server.Server {
|
||||
return server.NewServer(params.Logger, params.RuntimeConfig.ServerAddr, params.Config.Auth.JWTSecret, params.ServerHandlers...)
|
||||
}
|
||||
|
||||
func startMemoryWarmup(lc fx.Lifecycle, memoryService *memory.Service, logger *slog.Logger) {
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
go func() {
|
||||
if err := memoryService.WarmupBM25(context.Background(), 200); err != nil {
|
||||
logger.Warn("bm25 warmup failed", slog.Any("error", err))
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
},
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func startChannelManager(lc fx.Lifecycle, channelManager *channel.Manager, logger *slog.Logger) {
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
channelManager.Start(ctx)
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func startScheduleService(lc fx.Lifecycle, scheduleService *schedule.Service, logger *slog.Logger) {
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
return scheduleService.Bootstrap(ctx)
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func startServer(
|
||||
lc fx.Lifecycle,
|
||||
logger *slog.Logger,
|
||||
srv *server.Server,
|
||||
shutdowner fx.Shutdowner,
|
||||
cfg config.Config,
|
||||
queries *dbsqlc.Queries,
|
||||
scheduleService *schedule.Service,
|
||||
channelManager *channel.Manager,
|
||||
botService *bots.Service,
|
||||
containerdHandler *handlers.ContainerdHandler,
|
||||
) {
|
||||
fmt.Printf("Starting Memoh Agent %s\n", version.GetInfo())
|
||||
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
|
||||
if err := ensureAdminUser(ctx, logger, queries, cfg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
botService.SetContainerLifecycle(containerdHandler)
|
||||
|
||||
go func() {
|
||||
if err := srv.Start(); err != nil { // block until server is stopped
|
||||
logger.Error("server failed", slog.Any("error", err))
|
||||
_ = shutdowner.Shutdown() // shutdown the application if the server fails to start
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
},
|
||||
OnStop: func(ctx context.Context) error {
|
||||
// graceful shutdown
|
||||
if err := srv.Stop(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
return fmt.Errorf("server stop: %w", err)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func buildTextEmbedder(resolver *embeddings.Resolver, textModel models.GetResponse, hasModels bool, log *slog.Logger) embeddings.Embedder {
|
||||
@@ -211,38 +368,37 @@ func buildTextEmbedder(resolver *embeddings.Resolver, textModel models.GetRespon
|
||||
}
|
||||
}
|
||||
|
||||
func buildQdrantStore(log *slog.Logger, cfg config.QdrantConfig, vectors map[string]int, hasModels bool, textDims int) *memory.QdrantStore {
|
||||
func provideQdrantStore(log *slog.Logger, cfgAll config.Config, setup embeddingSetup) (*memory.QdrantStore, error) {
|
||||
cfg := cfgAll.Qdrant
|
||||
timeout := time.Duration(cfg.TimeoutSeconds) * time.Second
|
||||
if hasModels && len(vectors) > 0 {
|
||||
if setup.HasEmbeddingModels && len(setup.Vectors) > 0 {
|
||||
store, err := memory.NewQdrantStoreWithVectors(
|
||||
log,
|
||||
cfg.BaseURL,
|
||||
cfg.APIKey,
|
||||
cfg.Collection,
|
||||
vectors,
|
||||
setup.Vectors,
|
||||
"sparse_hash",
|
||||
timeout,
|
||||
)
|
||||
if err != nil {
|
||||
log.Error("qdrant named vectors init", slog.Any("error", err))
|
||||
os.Exit(1)
|
||||
return nil, fmt.Errorf("qdrant named vectors init: %w", err)
|
||||
}
|
||||
return store
|
||||
return store, nil
|
||||
}
|
||||
store, err := memory.NewQdrantStore(
|
||||
log,
|
||||
cfg.BaseURL,
|
||||
cfg.APIKey,
|
||||
cfg.Collection,
|
||||
textDims,
|
||||
setup.TextModel.Dimensions,
|
||||
"sparse_hash",
|
||||
timeout,
|
||||
)
|
||||
if err != nil {
|
||||
log.Error("qdrant init", slog.Any("error", err))
|
||||
os.Exit(1)
|
||||
return nil, fmt.Errorf("qdrant init: %w", err)
|
||||
}
|
||||
return store
|
||||
return store, nil
|
||||
}
|
||||
|
||||
func ensureAdminUser(ctx context.Context, log *slog.Logger, queries *dbsqlc.Queries, cfg config.Config) error {
|
||||
@@ -296,6 +452,15 @@ func ensureAdminUser(ctx context.Context, log *slog.Logger, queries *dbsqlc.Quer
|
||||
return nil
|
||||
}
|
||||
|
||||
func provideMemoryLLM(modelsService *models.Service, queries *dbsqlc.Queries, log *slog.Logger) memory.LLM {
|
||||
return &lazyLLMClient{
|
||||
modelsService: modelsService,
|
||||
queries: queries,
|
||||
timeout: 30 * time.Second,
|
||||
logger: log,
|
||||
}
|
||||
}
|
||||
|
||||
type lazyLLMClient struct {
|
||||
modelsService *models.Service
|
||||
queries *dbsqlc.Queries
|
||||
|
||||
Reference in New Issue
Block a user