mirror of
https://github.com/memohai/Memoh.git
synced 2026-04-27 07:16:19 +09:00
Merge branch 'main' into pr/350
This commit is contained in:
+121
-39
@@ -24,18 +24,22 @@ import (
|
||||
"github.com/memohai/memoh/internal/accounts"
|
||||
"github.com/memohai/memoh/internal/acl"
|
||||
agentpkg "github.com/memohai/memoh/internal/agent"
|
||||
"github.com/memohai/memoh/internal/agent/background"
|
||||
agenttools "github.com/memohai/memoh/internal/agent/tools"
|
||||
"github.com/memohai/memoh/internal/bind"
|
||||
"github.com/memohai/memoh/internal/boot"
|
||||
"github.com/memohai/memoh/internal/bots"
|
||||
"github.com/memohai/memoh/internal/browsercontexts"
|
||||
"github.com/memohai/memoh/internal/channel"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/dingtalk"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/discord"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/feishu"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/local"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/matrix"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/misskey"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/qq"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/telegram"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/wechatoa"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/wecom"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/weixin"
|
||||
"github.com/memohai/memoh/internal/channel/identities"
|
||||
@@ -227,6 +231,7 @@ func runServe() {
|
||||
provideContainerdHandler,
|
||||
provideFederationGateway,
|
||||
provideToolGatewayService,
|
||||
provideBackgroundManager,
|
||||
provideToolProviders,
|
||||
|
||||
// http handlers (group:"server_handlers")
|
||||
@@ -247,7 +252,7 @@ func runServe() {
|
||||
provideServerHandler(handlers.NewHeartbeatHandler),
|
||||
provideServerHandler(handlers.NewCompactionHandler),
|
||||
provideServerHandler(handlers.NewChannelHandler),
|
||||
provideServerHandler(feishu.NewWebhookServerHandler),
|
||||
provideServerHandler(channel.NewWebhookServerHandler),
|
||||
provideServerHandler(weixin.NewQRServerHandler),
|
||||
provideServerHandler(provideUsersHandler),
|
||||
provideServerHandler(handlers.NewMemoryProvidersHandler),
|
||||
@@ -277,9 +282,11 @@ func runServe() {
|
||||
|
||||
startScheduleService,
|
||||
startHeartbeatService,
|
||||
wireResolverOutbound,
|
||||
startChannelManager,
|
||||
startEmailManager,
|
||||
startContainerReconciliation,
|
||||
startBackgroundTaskCleanup,
|
||||
startTtsTempStoreCleanup,
|
||||
startServer,
|
||||
),
|
||||
@@ -359,12 +366,13 @@ func provideWorkspaceManager(log *slog.Logger, service ctr.Service, cfg config.C
|
||||
// memory providers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func provideMemoryLLM(modelsService *models.Service, queries *dbsqlc.Queries, log *slog.Logger) memprovider.LLM {
|
||||
func provideMemoryLLM(modelsService *models.Service, settingsService *settings.Service, queries *dbsqlc.Queries, log *slog.Logger) memprovider.LLM {
|
||||
return &lazyLLMClient{
|
||||
modelsService: modelsService,
|
||||
queries: queries,
|
||||
timeout: 30 * time.Second,
|
||||
logger: log,
|
||||
modelsService: modelsService,
|
||||
settingsService: settingsService,
|
||||
queries: queries,
|
||||
timeout: 30 * time.Second,
|
||||
logger: log,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -482,15 +490,20 @@ func injectToolProviders(a *agentpkg.Agent, msgService *message.DBService, provi
|
||||
}
|
||||
}
|
||||
|
||||
func provideChatResolver(log *slog.Logger, a *agentpkg.Agent, modelsService *models.Service, queries *dbsqlc.Queries, chatService *conversation.Service, msgService *message.DBService, settingsService *settings.Service, accountService *accounts.Service, mediaService *media.Service, containerdHandler *handlers.ContainerdHandler, memoryRegistry *memprovider.Registry, sessionService *sessionpkg.Service, eventHub *event.Hub, compactionService *compaction.Service, pipeline *pipelinepkg.Pipeline, rc *boot.RuntimeConfig) *flow.Resolver {
|
||||
func provideChatResolver(log *slog.Logger, a *agentpkg.Agent, modelsService *models.Service, queries *dbsqlc.Queries, chatService *conversation.Service, msgService *message.DBService, settingsService *settings.Service, accountService *accounts.Service, mediaService *media.Service, containerdHandler *handlers.ContainerdHandler, memoryRegistry *memprovider.Registry, routeService *route.DBService, sessionService *sessionpkg.Service, eventHub *event.Hub, compactionService *compaction.Service, pipeline *pipelinepkg.Pipeline, rc *boot.RuntimeConfig, bgManager *background.Manager) *flow.Resolver {
|
||||
resolver := flow.NewResolver(log, modelsService, queries, chatService, msgService, settingsService, accountService, a, rc.TimezoneLocation, 120*time.Second)
|
||||
resolver.SetMemoryRegistry(memoryRegistry)
|
||||
resolver.SetSkillLoader(&skillLoaderAdapter{handler: containerdHandler})
|
||||
resolver.SetGatewayAssetLoader(&gatewayAssetLoaderAdapter{media: mediaService})
|
||||
resolver.SetRouteService(routeService)
|
||||
resolver.SetSessionService(sessionService)
|
||||
resolver.SetEventPublisher(eventHub)
|
||||
resolver.SetCompactionService(compactionService)
|
||||
resolver.SetPipeline(pipeline)
|
||||
resolver.SetBackgroundManager(bgManager)
|
||||
bgManager.SetWakeFunc(func(botID, sessionID string) {
|
||||
resolver.TriggerBackgroundNotification(context.Background(), botID, sessionID)
|
||||
})
|
||||
return resolver
|
||||
}
|
||||
|
||||
@@ -522,10 +535,17 @@ func provideChannelRegistry(log *slog.Logger, hub *local.RouteHub, mediaService
|
||||
feishuAdapter.SetAssetOpener(mediaService)
|
||||
registry.MustRegister(feishuAdapter)
|
||||
registry.MustRegister(wecom.NewWeComAdapter(log))
|
||||
dingTalkAdapter := dingtalk.NewDingTalkAdapter(log)
|
||||
registry.MustRegister(dingTalkAdapter)
|
||||
registry.MustRegister(wechatoa.NewWeChatOAAdapter(log))
|
||||
weixinAdapter := weixin.NewWeixinAdapter(log)
|
||||
weixinAdapter.SetAssetOpener(mediaService)
|
||||
registry.MustRegister(weixinAdapter)
|
||||
registry.MustRegister(local.NewWebAdapter(hub))
|
||||
|
||||
// Misskey
|
||||
registry.MustRegister(misskey.NewMisskeyAdapter(log))
|
||||
|
||||
return registry
|
||||
}
|
||||
|
||||
@@ -599,19 +619,21 @@ func provideChannelRouter(
|
||||
emailOutboxService,
|
||||
heartbeatService,
|
||||
queries,
|
||||
aclService,
|
||||
&commandSkillLoaderAdapter{handler: containerdHandler},
|
||||
&commandContainerFSAdapter{manager: manager},
|
||||
))
|
||||
return processor
|
||||
}
|
||||
|
||||
func provideChannelManager(log *slog.Logger, registry *channel.Registry, channelStore *channel.Store, channelRouter *inbound.ChannelInboundProcessor) *channel.Manager {
|
||||
func provideChannelManager(log *slog.Logger, registry *channel.Registry, channelStore *channel.Store, channelRouter *inbound.ChannelInboundProcessor, mediaService *media.Service) *channel.Manager {
|
||||
if adapter, ok := registry.Get(matrix.Type); ok {
|
||||
if matrixAdapter, ok := adapter.(*matrix.MatrixAdapter); ok {
|
||||
matrixAdapter.SetSyncStateSaver(channelStore.SaveMatrixSyncSinceToken)
|
||||
}
|
||||
}
|
||||
mgr := channel.NewManager(log, registry, channelStore, channelRouter)
|
||||
mgr.SetAttachmentStore(mediaService)
|
||||
if mw := channelRouter.IdentityMiddleware(); mw != nil {
|
||||
mgr.Use(mw)
|
||||
}
|
||||
@@ -656,7 +678,11 @@ func provideToolGatewayService(log *slog.Logger, fedGateway *handlers.MCPFederat
|
||||
return svc
|
||||
}
|
||||
|
||||
func provideToolProviders(log *slog.Logger, cfg config.Config, channelManager *channel.Manager, registry *channel.Registry, routeService *route.DBService, scheduleService *schedule.Service, settingsService *settings.Service, searchProviderService *searchproviders.Service, manager *workspace.Manager, mediaService *media.Service, memoryRegistry *memprovider.Registry, emailService *emailpkg.Service, emailManager *emailpkg.Manager, fedGateway *handlers.MCPFederationGateway, mcpConnService *mcp.ConnectionService, modelsService *models.Service, browserContextService *browsercontexts.Service, queries *dbsqlc.Queries, ttsService *ttspkg.Service, sessionService *sessionpkg.Service) []agenttools.ToolProvider {
|
||||
func provideBackgroundManager(log *slog.Logger) *background.Manager {
|
||||
return background.New(log)
|
||||
}
|
||||
|
||||
func provideToolProviders(log *slog.Logger, cfg config.Config, channelManager *channel.Manager, registry *channel.Registry, routeService *route.DBService, scheduleService *schedule.Service, settingsService *settings.Service, searchProviderService *searchproviders.Service, manager *workspace.Manager, mediaService *media.Service, memoryRegistry *memprovider.Registry, emailService *emailpkg.Service, emailManager *emailpkg.Manager, fedGateway *handlers.MCPFederationGateway, mcpConnService *mcp.ConnectionService, modelsService *models.Service, browserContextService *browsercontexts.Service, queries *dbsqlc.Queries, ttsService *ttspkg.Service, sessionService *sessionpkg.Service, bgManager *background.Manager) []agenttools.ToolProvider {
|
||||
var assetResolver messaging.AssetResolver
|
||||
if mediaService != nil {
|
||||
assetResolver = &mediaAssetResolverAdapter{media: mediaService}
|
||||
@@ -668,7 +694,7 @@ func provideToolProviders(log *slog.Logger, cfg config.Config, channelManager *c
|
||||
agenttools.NewScheduleProvider(log, scheduleService),
|
||||
agenttools.NewMemoryProvider(log, memoryRegistry, settingsService),
|
||||
agenttools.NewWebProvider(log, settingsService, searchProviderService),
|
||||
agenttools.NewContainerProvider(log, manager, config.DefaultDataMount),
|
||||
agenttools.NewContainerProvider(log, manager, bgManager, config.DefaultDataMount),
|
||||
agenttools.NewEmailProvider(log, emailService, emailManager),
|
||||
agenttools.NewWebFetchProvider(log),
|
||||
agenttools.NewSpawnProvider(log, settingsService, modelsService, queries, sessionService),
|
||||
@@ -758,6 +784,20 @@ func startTtsTempStoreCleanup(lc fx.Lifecycle, store *ttspkg.TempStore) {
|
||||
})
|
||||
}
|
||||
|
||||
func startBackgroundTaskCleanup(lc fx.Lifecycle, mgr *background.Manager) {
|
||||
done := make(chan struct{})
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(_ context.Context) error {
|
||||
go mgr.StartCleanupLoop(done, background.DefaultCleanupInterval, background.DefaultTaskRetention)
|
||||
return nil
|
||||
},
|
||||
OnStop: func(_ context.Context) error {
|
||||
close(done)
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// settingsTtsModelResolver adapts settings.Service to the ttsModelResolver interface
|
||||
// expected by ChannelInboundProcessor and LocalChannelHandler.
|
||||
// sessionEnsurerAdapter adapts session.Service to the inbound sessionEnsurer interface.
|
||||
@@ -773,6 +813,14 @@ func (a *sessionEnsurerAdapter) EnsureActiveSession(ctx context.Context, botID,
|
||||
return inbound.SessionResult{ID: sess.ID, Type: sess.Type}, nil
|
||||
}
|
||||
|
||||
func (a *sessionEnsurerAdapter) GetActiveSession(ctx context.Context, routeID string) (inbound.SessionResult, error) {
|
||||
sess, err := a.svc.GetActiveForRoute(ctx, routeID)
|
||||
if err != nil {
|
||||
return inbound.SessionResult{}, err
|
||||
}
|
||||
return inbound.SessionResult{ID: sess.ID, Type: sess.Type}, nil
|
||||
}
|
||||
|
||||
func (a *sessionEnsurerAdapter) CreateNewSession(ctx context.Context, botID, routeID, channelType, sessionType string) (inbound.SessionResult, error) {
|
||||
sess, err := a.svc.CreateNewSession(ctx, botID, routeID, channelType, sessionType)
|
||||
if err != nil {
|
||||
@@ -937,6 +985,15 @@ func startHeartbeatService(lc fx.Lifecycle, heartbeatService *heartbeat.Service)
|
||||
})
|
||||
}
|
||||
|
||||
func wireResolverOutbound(resolver *flow.Resolver, channelManager *channel.Manager) {
|
||||
resolver.SetOutboundFn(func(ctx context.Context, botID, channelType, target, text string) error {
|
||||
return channelManager.Send(ctx, botID, channel.ChannelType(channelType), channel.SendRequest{
|
||||
Target: target,
|
||||
Message: channel.Message{Text: text},
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func startChannelManager(lc fx.Lifecycle, channelManager *channel.Manager) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
lc.Append(fx.Hook{
|
||||
@@ -1069,14 +1126,15 @@ func ensureAdminUser(ctx context.Context, log *slog.Logger, queries *dbsqlc.Quer
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type lazyLLMClient struct {
|
||||
modelsService *models.Service
|
||||
queries *dbsqlc.Queries
|
||||
timeout time.Duration
|
||||
logger *slog.Logger
|
||||
modelsService *models.Service
|
||||
settingsService *settings.Service
|
||||
queries *dbsqlc.Queries
|
||||
timeout time.Duration
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
func (c *lazyLLMClient) Extract(ctx context.Context, req memprovider.ExtractRequest) (memprovider.ExtractResponse, error) {
|
||||
client, err := c.resolve(ctx)
|
||||
client, err := c.resolve(ctx, req.BotID)
|
||||
if err != nil {
|
||||
return memprovider.ExtractResponse{}, err
|
||||
}
|
||||
@@ -1084,7 +1142,7 @@ func (c *lazyLLMClient) Extract(ctx context.Context, req memprovider.ExtractRequ
|
||||
}
|
||||
|
||||
func (c *lazyLLMClient) Decide(ctx context.Context, req memprovider.DecideRequest) (memprovider.DecideResponse, error) {
|
||||
client, err := c.resolve(ctx)
|
||||
client, err := c.resolve(ctx, req.BotID)
|
||||
if err != nil {
|
||||
return memprovider.DecideResponse{}, err
|
||||
}
|
||||
@@ -1092,7 +1150,7 @@ func (c *lazyLLMClient) Decide(ctx context.Context, req memprovider.DecideReques
|
||||
}
|
||||
|
||||
func (c *lazyLLMClient) Compact(ctx context.Context, req memprovider.CompactRequest) (memprovider.CompactResponse, error) {
|
||||
client, err := c.resolve(ctx)
|
||||
client, err := c.resolve(ctx, "")
|
||||
if err != nil {
|
||||
return memprovider.CompactResponse{}, err
|
||||
}
|
||||
@@ -1100,18 +1158,32 @@ func (c *lazyLLMClient) Compact(ctx context.Context, req memprovider.CompactRequ
|
||||
}
|
||||
|
||||
func (c *lazyLLMClient) DetectLanguage(ctx context.Context, text string) (string, error) {
|
||||
client, err := c.resolve(ctx)
|
||||
client, err := c.resolve(ctx, "")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return client.DetectLanguage(ctx, text)
|
||||
}
|
||||
|
||||
func (c *lazyLLMClient) resolve(ctx context.Context) (memprovider.LLM, error) {
|
||||
func (c *lazyLLMClient) resolve(ctx context.Context, botID string) (memprovider.LLM, error) {
|
||||
if c.modelsService == nil || c.queries == nil {
|
||||
return nil, errors.New("models service not configured")
|
||||
}
|
||||
memoryModel, memoryProvider, err := models.SelectMemoryModelForBot(ctx, c.modelsService, c.queries, "")
|
||||
|
||||
// Try to use the bot's configured chat model for memory operations.
|
||||
chatModelID := ""
|
||||
if c.settingsService != nil && strings.TrimSpace(botID) != "" {
|
||||
if botSettings, err := c.settingsService.GetBot(ctx, botID); err == nil {
|
||||
// Prefer compaction model (smaller/cheaper), then chat model.
|
||||
if id := strings.TrimSpace(botSettings.CompactionModelID); id != "" {
|
||||
chatModelID = id
|
||||
} else if id := strings.TrimSpace(botSettings.ChatModelID); id != "" {
|
||||
chatModelID = id
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
memoryModel, memoryProvider, err := models.SelectMemoryModelForBot(ctx, c.modelsService, c.queries, chatModelID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1151,36 +1223,46 @@ type mediaAssetResolverAdapter struct {
|
||||
media *media.Service
|
||||
}
|
||||
|
||||
func (a *mediaAssetResolverAdapter) Stat(ctx context.Context, botID, contentHash string) (media.Asset, error) {
|
||||
if a == nil || a.media == nil {
|
||||
return media.Asset{}, errors.New("media service not configured")
|
||||
}
|
||||
return a.media.Stat(ctx, botID, contentHash)
|
||||
}
|
||||
|
||||
func (a *mediaAssetResolverAdapter) Open(ctx context.Context, botID, contentHash string) (io.ReadCloser, media.Asset, error) {
|
||||
if a == nil || a.media == nil {
|
||||
return nil, media.Asset{}, errors.New("media service not configured")
|
||||
}
|
||||
return a.media.Open(ctx, botID, contentHash)
|
||||
}
|
||||
|
||||
func (a *mediaAssetResolverAdapter) Ingest(ctx context.Context, input media.IngestInput) (media.Asset, error) {
|
||||
if a == nil || a.media == nil {
|
||||
return media.Asset{}, errors.New("media service not configured")
|
||||
}
|
||||
return a.media.Ingest(ctx, input)
|
||||
}
|
||||
|
||||
func (a *mediaAssetResolverAdapter) GetByStorageKey(ctx context.Context, botID, storageKey string) (messaging.AssetMeta, error) {
|
||||
if a == nil || a.media == nil {
|
||||
return messaging.AssetMeta{}, errors.New("media service not configured")
|
||||
}
|
||||
asset, err := a.media.GetByStorageKey(ctx, botID, storageKey)
|
||||
if err != nil {
|
||||
return messaging.AssetMeta{}, err
|
||||
return a.media.GetByStorageKey(ctx, botID, storageKey)
|
||||
}
|
||||
|
||||
func (a *mediaAssetResolverAdapter) AccessPath(asset media.Asset) string {
|
||||
if a == nil || a.media == nil {
|
||||
return ""
|
||||
}
|
||||
return messaging.AssetMeta{
|
||||
ContentHash: asset.ContentHash,
|
||||
Mime: asset.Mime,
|
||||
SizeBytes: asset.SizeBytes,
|
||||
StorageKey: asset.StorageKey,
|
||||
}, nil
|
||||
return a.media.AccessPath(asset)
|
||||
}
|
||||
|
||||
func (a *mediaAssetResolverAdapter) IngestContainerFile(ctx context.Context, botID, containerPath string) (messaging.AssetMeta, error) {
|
||||
if a == nil || a.media == nil {
|
||||
return messaging.AssetMeta{}, errors.New("media service not configured")
|
||||
}
|
||||
asset, err := a.media.IngestContainerFile(ctx, botID, containerPath)
|
||||
if err != nil {
|
||||
return messaging.AssetMeta{}, err
|
||||
}
|
||||
return messaging.AssetMeta{
|
||||
ContentHash: asset.ContentHash,
|
||||
Mime: asset.Mime,
|
||||
SizeBytes: asset.SizeBytes,
|
||||
StorageKey: asset.StorageKey,
|
||||
}, nil
|
||||
return a.media.IngestContainerFile(ctx, botID, containerPath)
|
||||
}
|
||||
|
||||
// gatewayAssetLoaderAdapter bridges media service to flow gateway asset loader.
|
||||
|
||||
+17
-2
@@ -12,6 +12,7 @@ import (
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/reflection"
|
||||
|
||||
"github.com/memohai/memoh/internal/logger"
|
||||
@@ -49,7 +50,7 @@ func initDataDir() {
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if err := os.WriteFile(dst, data, fs.FileMode(0o644)); err != nil {
|
||||
if err := os.WriteFile(dst, data, fs.FileMode(0o644)); err != nil { //nolint:gosec // G703: dst is built from filepath.Join(defaultWorkDir, e.Name()) where e comes from os.ReadDir
|
||||
logger.Warn("failed to seed template", slog.String("file", e.Name()), slog.Any("error", err))
|
||||
}
|
||||
}
|
||||
@@ -91,7 +92,21 @@ func main() {
|
||||
return
|
||||
}
|
||||
|
||||
srv := grpc.NewServer()
|
||||
srv := grpc.NewServer(
|
||||
grpc.MaxRecvMsgSize(16*1024*1024),
|
||||
grpc.MaxSendMsgSize(16*1024*1024),
|
||||
grpc.KeepaliveParams(keepalive.ServerParameters{
|
||||
MaxConnectionIdle: 5 * time.Minute,
|
||||
MaxConnectionAge: 30 * time.Minute,
|
||||
MaxConnectionAgeGrace: 10 * time.Second,
|
||||
Time: 60 * time.Second,
|
||||
Timeout: 15 * time.Second,
|
||||
}),
|
||||
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
|
||||
MinTime: 10 * time.Second,
|
||||
PermitWithoutStream: true,
|
||||
}),
|
||||
)
|
||||
pb.RegisterContainerServiceServer(srv, &containerServer{})
|
||||
reflection.Register(srv)
|
||||
|
||||
|
||||
+46
-19
@@ -24,14 +24,15 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
readMaxLines = 200
|
||||
readMaxBytes = 5120
|
||||
readMaxLineLen = 1000
|
||||
listMaxEntries = 200
|
||||
binaryProbeBytes = 8 * 1024
|
||||
rawChunkSize = 64 * 1024
|
||||
defaultWorkDir = "/data"
|
||||
defaultTimeout = 30
|
||||
readMaxLines = 2000
|
||||
readMaxBytes = 0 // 0 = no byte limit (line count only)
|
||||
readMaxLineLen = 0 // 0 = no per-line truncation
|
||||
listMaxEntries = 200
|
||||
binaryProbeBytes = 8 * 1024
|
||||
rawChunkSize = 64 * 1024
|
||||
defaultWorkDir = "/data"
|
||||
defaultTimeout = 30
|
||||
defaultPTYTimeout = 5 * 60 // 5 minutes max for PTY sessions (agent tool calls)
|
||||
)
|
||||
|
||||
type containerServer struct {
|
||||
@@ -89,12 +90,12 @@ func (*containerServer) ReadFile(_ context.Context, req *pb.ReadFileRequest) (*p
|
||||
}
|
||||
|
||||
line := scanner.Text()
|
||||
if utf8.RuneCountInString(line) > readMaxLineLen {
|
||||
if readMaxLineLen > 0 && utf8.RuneCountInString(line) > readMaxLineLen {
|
||||
line = truncateRunes(line, readMaxLineLen) + "..."
|
||||
}
|
||||
|
||||
entry := line + "\n"
|
||||
if bytesWritten+len(entry) > readMaxBytes {
|
||||
if readMaxBytes > 0 && bytesWritten+len(entry) > readMaxBytes {
|
||||
break
|
||||
}
|
||||
out.WriteString(entry)
|
||||
@@ -288,11 +289,18 @@ func execPTY(stream pb.ContainerService_ExecServer, firstMsg *pb.ExecInput) erro
|
||||
workDir = defaultWorkDir
|
||||
}
|
||||
|
||||
timeout := int(firstMsg.GetTimeoutSeconds())
|
||||
if timeout <= 0 {
|
||||
timeout = defaultPTYTimeout
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(stream.Context(), time.Duration(timeout)*time.Second)
|
||||
defer cancel()
|
||||
|
||||
var cmd *exec.Cmd
|
||||
if isBarePath(command) {
|
||||
cmd = exec.CommandContext(stream.Context(), command) //nolint:gosec // G204: intentional
|
||||
cmd = exec.CommandContext(ctx, command) //nolint:gosec // G204: intentional
|
||||
} else {
|
||||
cmd = exec.CommandContext(stream.Context(), "/bin/sh", "-c", command) //nolint:gosec // G204: intentional
|
||||
cmd = exec.CommandContext(ctx, "/bin/sh", "-c", command) //nolint:gosec // G204: intentional
|
||||
}
|
||||
cmd.Dir = workDir
|
||||
cmd.Env = append(os.Environ(), firstMsg.GetEnv()...)
|
||||
@@ -361,13 +369,14 @@ func execPipe(stream pb.ContainerService_ExecServer, firstMsg *pb.ExecInput) err
|
||||
timeout = defaultTimeout
|
||||
}
|
||||
|
||||
// Keep non-PTY execs alive across transport cancellation so a dropped
|
||||
// stream does not rewrite a successful command into exit -1. The timeout
|
||||
// still bounds command lifetime, and stream shutdown still closes stdin.
|
||||
ctx, cancel := context.WithTimeout(context.WithoutCancel(stream.Context()), time.Duration(timeout)*time.Second)
|
||||
defer cancel()
|
||||
// Process context is independent of the gRPC stream so the process keeps
|
||||
// running even if the stream is cancelled (e.g. background tasks whose client
|
||||
// disconnects or whose stream context dies after the process completes).
|
||||
// Only the process-level timeout kills the process, not stream death.
|
||||
procCtx, procCancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
|
||||
defer procCancel()
|
||||
|
||||
cmd := exec.CommandContext(ctx, "/bin/sh", "-c", command) //nolint:gosec // G204: MCP exec tool intentionally executes agent-issued shell commands inside the container
|
||||
cmd := exec.CommandContext(procCtx, "/bin/sh", "-c", command) //nolint:gosec // G204: MCP exec tool intentionally executes agent-issued shell commands inside the container
|
||||
cmd.Dir = workDir
|
||||
if len(firstMsg.GetEnv()) > 0 {
|
||||
cmd.Env = append(os.Environ(), firstMsg.GetEnv()...)
|
||||
@@ -391,6 +400,19 @@ func execPipe(stream pb.ContainerService_ExecServer, firstMsg *pb.ExecInput) err
|
||||
return status.Errorf(codes.Internal, "start: %v", err)
|
||||
}
|
||||
|
||||
// Close pipes when EITHER the process finishes (procCtx done) OR the gRPC
|
||||
// stream dies (stream.Context done). Closing unblocks streamPipe's Read so
|
||||
// the goroutines can exit. We do NOT cancel procCtx on stream death — the
|
||||
// process keeps running so background tasks survive client disconnects.
|
||||
go func() {
|
||||
select {
|
||||
case <-procCtx.Done():
|
||||
case <-stream.Context().Done():
|
||||
}
|
||||
_ = stdoutPipe.Close()
|
||||
_ = stderrPipe.Close()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
msg, recvErr := stream.Recv()
|
||||
@@ -423,10 +445,15 @@ func execPipe(stream pb.ContainerService_ExecServer, firstMsg *pb.ExecInput) err
|
||||
}
|
||||
}
|
||||
|
||||
return stream.Send(&pb.ExecOutput{
|
||||
// Send exit code to the client. If the stream is already gone (e.g. the
|
||||
// client is a background task manager that got a stream error when the
|
||||
// process completed), the send will fail but we return nil so the gRPC
|
||||
// handler does not propagate a spurious "context canceled" error status.
|
||||
_ = stream.Send(&pb.ExecOutput{
|
||||
Stream: pb.ExecOutput_EXIT,
|
||||
ExitCode: exitCode,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*containerServer) ReadRaw(req *pb.ReadRawRequest, stream pb.ContainerService_ReadRawServer) error {
|
||||
|
||||
+107
-26
@@ -24,6 +24,7 @@ import (
|
||||
"github.com/memohai/memoh/internal/accounts"
|
||||
"github.com/memohai/memoh/internal/acl"
|
||||
agentpkg "github.com/memohai/memoh/internal/agent"
|
||||
"github.com/memohai/memoh/internal/agent/background"
|
||||
agenttools "github.com/memohai/memoh/internal/agent/tools"
|
||||
"github.com/memohai/memoh/internal/auth"
|
||||
"github.com/memohai/memoh/internal/bind"
|
||||
@@ -31,12 +32,15 @@ import (
|
||||
"github.com/memohai/memoh/internal/bots"
|
||||
"github.com/memohai/memoh/internal/browsercontexts"
|
||||
"github.com/memohai/memoh/internal/channel"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/dingtalk"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/discord"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/feishu"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/local"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/matrix"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/misskey"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/qq"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/telegram"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/wechatoa"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/wecom"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/weixin"
|
||||
"github.com/memohai/memoh/internal/channel/identities"
|
||||
@@ -153,6 +157,7 @@ func runServe() {
|
||||
provideContainerdHandler,
|
||||
provideFederationGateway,
|
||||
provideToolGatewayService,
|
||||
provideBackgroundManager,
|
||||
provideToolProviders,
|
||||
provideServerHandler(handlers.NewPingHandler),
|
||||
provideServerHandler(provideMemohAuthHandler),
|
||||
@@ -171,7 +176,7 @@ func runServe() {
|
||||
provideServerHandler(handlers.NewHeartbeatHandler),
|
||||
provideServerHandler(handlers.NewCompactionHandler),
|
||||
provideServerHandler(handlers.NewChannelHandler),
|
||||
provideServerHandler(feishu.NewWebhookServerHandler),
|
||||
provideServerHandler(channel.NewWebhookServerHandler),
|
||||
provideServerHandler(weixin.NewQRServerHandler),
|
||||
provideServerHandler(provideUsersHandler),
|
||||
provideServerHandler(handlers.NewMemoryProvidersHandler),
|
||||
@@ -204,6 +209,7 @@ func runServe() {
|
||||
startChannelManager,
|
||||
startEmailManager,
|
||||
startContainerReconciliation,
|
||||
startBackgroundTaskCleanup,
|
||||
startTtsTempStoreCleanup,
|
||||
startServer,
|
||||
),
|
||||
@@ -258,8 +264,8 @@ func provideWorkspaceManager(log *slog.Logger, service ctr.Service, cfg config.C
|
||||
return workspace.NewManager(log, service, cfg.Workspace, cfg.Containerd.Namespace, conn)
|
||||
}
|
||||
|
||||
func provideMemoryLLM(modelsService *models.Service, queries *dbsqlc.Queries, log *slog.Logger) memprovider.LLM {
|
||||
return &lazyLLMClient{modelsService: modelsService, queries: queries, timeout: 30 * time.Second, logger: log}
|
||||
func provideMemoryLLM(modelsService *models.Service, settingsService *settings.Service, queries *dbsqlc.Queries, log *slog.Logger) memprovider.LLM {
|
||||
return &lazyLLMClient{modelsService: modelsService, settingsService: settingsService, queries: queries, timeout: 30 * time.Second, logger: log}
|
||||
}
|
||||
|
||||
func provideMemoryProviderRegistry(log *slog.Logger, llm memprovider.LLM, chatService *conversation.Service, accountService *accounts.Service, manager *workspace.Manager, queries *dbsqlc.Queries, cfg config.Config) *memprovider.Registry {
|
||||
@@ -414,15 +420,20 @@ func injectToolProviders(a *agentpkg.Agent, msgService *message.DBService, provi
|
||||
}
|
||||
}
|
||||
|
||||
func provideChatResolver(log *slog.Logger, a *agentpkg.Agent, modelsService *models.Service, queries *dbsqlc.Queries, chatService *conversation.Service, msgService *message.DBService, settingsService *settings.Service, accountService *accounts.Service, mediaService *media.Service, containerdHandler *handlers.ContainerdHandler, memoryRegistry *memprovider.Registry, sessionService *sessionpkg.Service, eventHub *event.Hub, compactionService *compaction.Service, pipeline *pipelinepkg.Pipeline, rc *boot.RuntimeConfig) *flow.Resolver {
|
||||
func provideChatResolver(log *slog.Logger, a *agentpkg.Agent, modelsService *models.Service, queries *dbsqlc.Queries, chatService *conversation.Service, msgService *message.DBService, settingsService *settings.Service, accountService *accounts.Service, mediaService *media.Service, containerdHandler *handlers.ContainerdHandler, memoryRegistry *memprovider.Registry, routeService *route.DBService, sessionService *sessionpkg.Service, eventHub *event.Hub, compactionService *compaction.Service, pipeline *pipelinepkg.Pipeline, rc *boot.RuntimeConfig, bgManager *background.Manager) *flow.Resolver {
|
||||
resolver := flow.NewResolver(log, modelsService, queries, chatService, msgService, settingsService, accountService, a, rc.TimezoneLocation, 120*time.Second)
|
||||
resolver.SetMemoryRegistry(memoryRegistry)
|
||||
resolver.SetSkillLoader(&skillLoaderAdapter{handler: containerdHandler})
|
||||
resolver.SetGatewayAssetLoader(&gatewayAssetLoaderAdapter{media: mediaService})
|
||||
resolver.SetRouteService(routeService)
|
||||
resolver.SetSessionService(sessionService)
|
||||
resolver.SetEventPublisher(eventHub)
|
||||
resolver.SetCompactionService(compactionService)
|
||||
resolver.SetPipeline(pipeline)
|
||||
resolver.SetBackgroundManager(bgManager)
|
||||
bgManager.SetWakeFunc(func(botID, sessionID string) {
|
||||
resolver.TriggerBackgroundNotification(context.Background(), botID, sessionID)
|
||||
})
|
||||
return resolver
|
||||
}
|
||||
|
||||
@@ -444,10 +455,17 @@ func provideChannelRegistry(log *slog.Logger, hub *local.RouteHub, mediaService
|
||||
feishuAdapter.SetAssetOpener(mediaService)
|
||||
registry.MustRegister(feishuAdapter)
|
||||
registry.MustRegister(wecom.NewWeComAdapter(log))
|
||||
dingTalkAdapter := dingtalk.NewDingTalkAdapter(log)
|
||||
registry.MustRegister(dingTalkAdapter)
|
||||
registry.MustRegister(wechatoa.NewWeChatOAAdapter(log))
|
||||
weixinAdapter := weixin.NewWeixinAdapter(log)
|
||||
weixinAdapter.SetAssetOpener(mediaService)
|
||||
registry.MustRegister(weixinAdapter)
|
||||
registry.MustRegister(local.NewWebAdapter(hub))
|
||||
|
||||
// Misskey
|
||||
registry.MustRegister(misskey.NewMisskeyAdapter(log))
|
||||
|
||||
return registry
|
||||
}
|
||||
|
||||
@@ -487,19 +505,21 @@ func provideChannelRouter(log *slog.Logger, registry *channel.Registry, hub *loc
|
||||
emailOutboxService,
|
||||
heartbeatService,
|
||||
queries,
|
||||
aclService,
|
||||
&commandSkillLoaderAdapter{handler: containerdHandler},
|
||||
&commandContainerFSAdapter{manager: manager},
|
||||
))
|
||||
return processor
|
||||
}
|
||||
|
||||
func provideChannelManager(log *slog.Logger, registry *channel.Registry, channelStore *channel.Store, channelRouter *inbound.ChannelInboundProcessor) *channel.Manager {
|
||||
func provideChannelManager(log *slog.Logger, registry *channel.Registry, channelStore *channel.Store, channelRouter *inbound.ChannelInboundProcessor, mediaService *media.Service) *channel.Manager {
|
||||
if adapter, ok := registry.Get(matrix.Type); ok {
|
||||
if matrixAdapter, ok := adapter.(*matrix.MatrixAdapter); ok {
|
||||
matrixAdapter.SetSyncStateSaver(channelStore.SaveMatrixSyncSinceToken)
|
||||
}
|
||||
}
|
||||
mgr := channel.NewManager(log, registry, channelStore, channelRouter)
|
||||
mgr.SetAttachmentStore(mediaService)
|
||||
if mw := channelRouter.IdentityMiddleware(); mw != nil {
|
||||
mgr.Use(mw)
|
||||
}
|
||||
@@ -540,7 +560,11 @@ func provideToolGatewayService(log *slog.Logger, fedGateway *handlers.MCPFederat
|
||||
return svc
|
||||
}
|
||||
|
||||
func provideToolProviders(log *slog.Logger, cfg config.Config, channelManager *channel.Manager, registry *channel.Registry, routeService *route.DBService, scheduleService *schedule.Service, settingsService *settings.Service, searchProviderService *searchproviders.Service, manager *workspace.Manager, mediaService *media.Service, memoryRegistry *memprovider.Registry, emailService *emailpkg.Service, emailManager *emailpkg.Manager, fedGateway *handlers.MCPFederationGateway, mcpConnService *mcp.ConnectionService, modelsService *models.Service, browserContextService *browsercontexts.Service, queries *dbsqlc.Queries, ttsService *ttspkg.Service, sessionService *sessionpkg.Service) []agenttools.ToolProvider {
|
||||
func provideBackgroundManager(log *slog.Logger) *background.Manager {
|
||||
return background.New(log)
|
||||
}
|
||||
|
||||
func provideToolProviders(log *slog.Logger, cfg config.Config, channelManager *channel.Manager, registry *channel.Registry, routeService *route.DBService, scheduleService *schedule.Service, settingsService *settings.Service, searchProviderService *searchproviders.Service, manager *workspace.Manager, mediaService *media.Service, memoryRegistry *memprovider.Registry, emailService *emailpkg.Service, emailManager *emailpkg.Manager, fedGateway *handlers.MCPFederationGateway, mcpConnService *mcp.ConnectionService, modelsService *models.Service, browserContextService *browsercontexts.Service, queries *dbsqlc.Queries, ttsService *ttspkg.Service, sessionService *sessionpkg.Service, bgManager *background.Manager) []agenttools.ToolProvider {
|
||||
var assetResolver messaging.AssetResolver
|
||||
if mediaService != nil {
|
||||
assetResolver = &mediaAssetResolverAdapter{media: mediaService}
|
||||
@@ -552,7 +576,7 @@ func provideToolProviders(log *slog.Logger, cfg config.Config, channelManager *c
|
||||
agenttools.NewScheduleProvider(log, scheduleService),
|
||||
agenttools.NewMemoryProvider(log, memoryRegistry, settingsService),
|
||||
agenttools.NewWebProvider(log, settingsService, searchProviderService),
|
||||
agenttools.NewContainerProvider(log, manager, config.DefaultDataMount),
|
||||
agenttools.NewContainerProvider(log, manager, bgManager, config.DefaultDataMount),
|
||||
agenttools.NewEmailProvider(log, emailService, emailManager),
|
||||
agenttools.NewWebFetchProvider(log),
|
||||
agenttools.NewSpawnProvider(log, settingsService, modelsService, queries, sessionService),
|
||||
@@ -857,6 +881,20 @@ func startTtsTempStoreCleanup(lc fx.Lifecycle, store *ttspkg.TempStore) {
|
||||
})
|
||||
}
|
||||
|
||||
func startBackgroundTaskCleanup(lc fx.Lifecycle, mgr *background.Manager) {
|
||||
done := make(chan struct{})
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(_ context.Context) error {
|
||||
go mgr.StartCleanupLoop(done, background.DefaultCleanupInterval, background.DefaultTaskRetention)
|
||||
return nil
|
||||
},
|
||||
OnStop: func(_ context.Context) error {
|
||||
close(done)
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// settingsTtsModelResolver adapts settings.Service to the ttsModelResolver interface
|
||||
// expected by ChannelInboundProcessor and LocalChannelHandler.
|
||||
type sessionEnsurerAdapter struct {
|
||||
@@ -871,6 +909,14 @@ func (a *sessionEnsurerAdapter) EnsureActiveSession(ctx context.Context, botID,
|
||||
return inbound.SessionResult{ID: sess.ID, Type: sess.Type}, nil
|
||||
}
|
||||
|
||||
func (a *sessionEnsurerAdapter) GetActiveSession(ctx context.Context, routeID string) (inbound.SessionResult, error) {
|
||||
sess, err := a.svc.GetActiveForRoute(ctx, routeID)
|
||||
if err != nil {
|
||||
return inbound.SessionResult{}, err
|
||||
}
|
||||
return inbound.SessionResult{ID: sess.ID, Type: sess.Type}, nil
|
||||
}
|
||||
|
||||
func (a *sessionEnsurerAdapter) CreateNewSession(ctx context.Context, botID, routeID, channelType, sessionType string) (inbound.SessionResult, error) {
|
||||
sess, err := a.svc.CreateNewSession(ctx, botID, routeID, channelType, sessionType)
|
||||
if err != nil {
|
||||
@@ -991,14 +1037,15 @@ func ensureAdminUser(ctx context.Context, log *slog.Logger, queries *dbsqlc.Quer
|
||||
}
|
||||
|
||||
type lazyLLMClient struct {
|
||||
modelsService *models.Service
|
||||
queries *dbsqlc.Queries
|
||||
timeout time.Duration
|
||||
logger *slog.Logger
|
||||
modelsService *models.Service
|
||||
settingsService *settings.Service
|
||||
queries *dbsqlc.Queries
|
||||
timeout time.Duration
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
func (c *lazyLLMClient) Extract(ctx context.Context, req memprovider.ExtractRequest) (memprovider.ExtractResponse, error) {
|
||||
client, err := c.resolve(ctx)
|
||||
client, err := c.resolve(ctx, req.BotID)
|
||||
if err != nil {
|
||||
return memprovider.ExtractResponse{}, err
|
||||
}
|
||||
@@ -1006,7 +1053,7 @@ func (c *lazyLLMClient) Extract(ctx context.Context, req memprovider.ExtractRequ
|
||||
}
|
||||
|
||||
func (c *lazyLLMClient) Decide(ctx context.Context, req memprovider.DecideRequest) (memprovider.DecideResponse, error) {
|
||||
client, err := c.resolve(ctx)
|
||||
client, err := c.resolve(ctx, req.BotID)
|
||||
if err != nil {
|
||||
return memprovider.DecideResponse{}, err
|
||||
}
|
||||
@@ -1014,7 +1061,7 @@ func (c *lazyLLMClient) Decide(ctx context.Context, req memprovider.DecideReques
|
||||
}
|
||||
|
||||
func (c *lazyLLMClient) Compact(ctx context.Context, req memprovider.CompactRequest) (memprovider.CompactResponse, error) {
|
||||
client, err := c.resolve(ctx)
|
||||
client, err := c.resolve(ctx, "")
|
||||
if err != nil {
|
||||
return memprovider.CompactResponse{}, err
|
||||
}
|
||||
@@ -1022,18 +1069,32 @@ func (c *lazyLLMClient) Compact(ctx context.Context, req memprovider.CompactRequ
|
||||
}
|
||||
|
||||
func (c *lazyLLMClient) DetectLanguage(ctx context.Context, text string) (string, error) {
|
||||
client, err := c.resolve(ctx)
|
||||
client, err := c.resolve(ctx, "")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return client.DetectLanguage(ctx, text)
|
||||
}
|
||||
|
||||
func (c *lazyLLMClient) resolve(ctx context.Context) (memprovider.LLM, error) {
|
||||
func (c *lazyLLMClient) resolve(ctx context.Context, botID string) (memprovider.LLM, error) {
|
||||
if c.modelsService == nil || c.queries == nil {
|
||||
return nil, errors.New("models service not configured")
|
||||
}
|
||||
memoryModel, memoryProvider, err := models.SelectMemoryModelForBot(ctx, c.modelsService, c.queries, "")
|
||||
|
||||
// Try to use the bot's configured chat model for memory operations.
|
||||
chatModelID := ""
|
||||
if c.settingsService != nil && strings.TrimSpace(botID) != "" {
|
||||
if botSettings, err := c.settingsService.GetBot(ctx, botID); err == nil {
|
||||
// Prefer compaction model (smaller/cheaper), then chat model.
|
||||
if id := strings.TrimSpace(botSettings.CompactionModelID); id != "" {
|
||||
chatModelID = id
|
||||
} else if id := strings.TrimSpace(botSettings.ChatModelID); id != "" {
|
||||
chatModelID = id
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
memoryModel, memoryProvider, err := models.SelectMemoryModelForBot(ctx, c.modelsService, c.queries, chatModelID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1062,26 +1123,46 @@ func (a *skillLoaderAdapter) LoadSkills(ctx context.Context, botID string) ([]fl
|
||||
|
||||
type mediaAssetResolverAdapter struct{ media *media.Service }
|
||||
|
||||
func (a *mediaAssetResolverAdapter) Stat(ctx context.Context, botID, contentHash string) (media.Asset, error) {
|
||||
if a == nil || a.media == nil {
|
||||
return media.Asset{}, errors.New("media service not configured")
|
||||
}
|
||||
return a.media.Stat(ctx, botID, contentHash)
|
||||
}
|
||||
|
||||
func (a *mediaAssetResolverAdapter) Open(ctx context.Context, botID, contentHash string) (io.ReadCloser, media.Asset, error) {
|
||||
if a == nil || a.media == nil {
|
||||
return nil, media.Asset{}, errors.New("media service not configured")
|
||||
}
|
||||
return a.media.Open(ctx, botID, contentHash)
|
||||
}
|
||||
|
||||
func (a *mediaAssetResolverAdapter) Ingest(ctx context.Context, input media.IngestInput) (media.Asset, error) {
|
||||
if a == nil || a.media == nil {
|
||||
return media.Asset{}, errors.New("media service not configured")
|
||||
}
|
||||
return a.media.Ingest(ctx, input)
|
||||
}
|
||||
|
||||
func (a *mediaAssetResolverAdapter) GetByStorageKey(ctx context.Context, botID, storageKey string) (messaging.AssetMeta, error) {
|
||||
if a == nil || a.media == nil {
|
||||
return messaging.AssetMeta{}, errors.New("media service not configured")
|
||||
}
|
||||
asset, err := a.media.GetByStorageKey(ctx, botID, storageKey)
|
||||
if err != nil {
|
||||
return messaging.AssetMeta{}, err
|
||||
return a.media.GetByStorageKey(ctx, botID, storageKey)
|
||||
}
|
||||
|
||||
func (a *mediaAssetResolverAdapter) AccessPath(asset media.Asset) string {
|
||||
if a == nil || a.media == nil {
|
||||
return ""
|
||||
}
|
||||
return messaging.AssetMeta{ContentHash: asset.ContentHash, Mime: asset.Mime, SizeBytes: asset.SizeBytes, StorageKey: asset.StorageKey}, nil
|
||||
return a.media.AccessPath(asset)
|
||||
}
|
||||
|
||||
func (a *mediaAssetResolverAdapter) IngestContainerFile(ctx context.Context, botID, containerPath string) (messaging.AssetMeta, error) {
|
||||
if a == nil || a.media == nil {
|
||||
return messaging.AssetMeta{}, errors.New("media service not configured")
|
||||
}
|
||||
asset, err := a.media.IngestContainerFile(ctx, botID, containerPath)
|
||||
if err != nil {
|
||||
return messaging.AssetMeta{}, err
|
||||
}
|
||||
return messaging.AssetMeta{ContentHash: asset.ContentHash, Mime: asset.Mime, SizeBytes: asset.SizeBytes, StorageKey: asset.StorageKey}, nil
|
||||
return a.media.IngestContainerFile(ctx, botID, containerPath)
|
||||
}
|
||||
|
||||
type gatewayAssetLoaderAdapter struct{ media *media.Service }
|
||||
|
||||
Reference in New Issue
Block a user