feat(agent): add background task execution and notifications (#365)

This commit is contained in:
EYHN
2026-04-13 20:28:42 +08:00
committed by GitHub
parent df8fbd8859
commit c4114227e5
18 changed files with 2934 additions and 31 deletions
+39 -3
View File
@@ -24,6 +24,7 @@ import (
"github.com/memohai/memoh/internal/accounts"
"github.com/memohai/memoh/internal/acl"
agentpkg "github.com/memohai/memoh/internal/agent"
"github.com/memohai/memoh/internal/agent/background"
agenttools "github.com/memohai/memoh/internal/agent/tools"
"github.com/memohai/memoh/internal/bind"
"github.com/memohai/memoh/internal/boot"
@@ -230,6 +231,7 @@ func runServe() {
provideContainerdHandler,
provideFederationGateway,
provideToolGatewayService,
provideBackgroundManager,
provideToolProviders,
// http handlers (group:"server_handlers")
@@ -280,9 +282,11 @@ func runServe() {
startScheduleService,
startHeartbeatService,
wireResolverOutbound,
startChannelManager,
startEmailManager,
startContainerReconciliation,
startBackgroundTaskCleanup,
startTtsTempStoreCleanup,
startServer,
),
@@ -486,15 +490,20 @@ func injectToolProviders(a *agentpkg.Agent, msgService *message.DBService, provi
}
}
func provideChatResolver(log *slog.Logger, a *agentpkg.Agent, modelsService *models.Service, queries *dbsqlc.Queries, chatService *conversation.Service, msgService *message.DBService, settingsService *settings.Service, accountService *accounts.Service, mediaService *media.Service, containerdHandler *handlers.ContainerdHandler, memoryRegistry *memprovider.Registry, sessionService *sessionpkg.Service, eventHub *event.Hub, compactionService *compaction.Service, pipeline *pipelinepkg.Pipeline, rc *boot.RuntimeConfig) *flow.Resolver {
func provideChatResolver(log *slog.Logger, a *agentpkg.Agent, modelsService *models.Service, queries *dbsqlc.Queries, chatService *conversation.Service, msgService *message.DBService, settingsService *settings.Service, accountService *accounts.Service, mediaService *media.Service, containerdHandler *handlers.ContainerdHandler, memoryRegistry *memprovider.Registry, routeService *route.DBService, sessionService *sessionpkg.Service, eventHub *event.Hub, compactionService *compaction.Service, pipeline *pipelinepkg.Pipeline, rc *boot.RuntimeConfig, bgManager *background.Manager) *flow.Resolver {
resolver := flow.NewResolver(log, modelsService, queries, chatService, msgService, settingsService, accountService, a, rc.TimezoneLocation, 120*time.Second)
resolver.SetMemoryRegistry(memoryRegistry)
resolver.SetSkillLoader(&skillLoaderAdapter{handler: containerdHandler})
resolver.SetGatewayAssetLoader(&gatewayAssetLoaderAdapter{media: mediaService})
resolver.SetRouteService(routeService)
resolver.SetSessionService(sessionService)
resolver.SetEventPublisher(eventHub)
resolver.SetCompactionService(compactionService)
resolver.SetPipeline(pipeline)
resolver.SetBackgroundManager(bgManager)
bgManager.SetWakeFunc(func(botID, sessionID string) {
resolver.TriggerBackgroundNotification(context.Background(), botID, sessionID)
})
return resolver
}
@@ -669,7 +678,11 @@ func provideToolGatewayService(log *slog.Logger, fedGateway *handlers.MCPFederat
return svc
}
func provideToolProviders(log *slog.Logger, cfg config.Config, channelManager *channel.Manager, registry *channel.Registry, routeService *route.DBService, scheduleService *schedule.Service, settingsService *settings.Service, searchProviderService *searchproviders.Service, manager *workspace.Manager, mediaService *media.Service, memoryRegistry *memprovider.Registry, emailService *emailpkg.Service, emailManager *emailpkg.Manager, fedGateway *handlers.MCPFederationGateway, mcpConnService *mcp.ConnectionService, modelsService *models.Service, browserContextService *browsercontexts.Service, queries *dbsqlc.Queries, ttsService *ttspkg.Service, sessionService *sessionpkg.Service) []agenttools.ToolProvider {
func provideBackgroundManager(log *slog.Logger) *background.Manager {
return background.New(log)
}
func provideToolProviders(log *slog.Logger, cfg config.Config, channelManager *channel.Manager, registry *channel.Registry, routeService *route.DBService, scheduleService *schedule.Service, settingsService *settings.Service, searchProviderService *searchproviders.Service, manager *workspace.Manager, mediaService *media.Service, memoryRegistry *memprovider.Registry, emailService *emailpkg.Service, emailManager *emailpkg.Manager, fedGateway *handlers.MCPFederationGateway, mcpConnService *mcp.ConnectionService, modelsService *models.Service, browserContextService *browsercontexts.Service, queries *dbsqlc.Queries, ttsService *ttspkg.Service, sessionService *sessionpkg.Service, bgManager *background.Manager) []agenttools.ToolProvider {
var assetResolver messaging.AssetResolver
if mediaService != nil {
assetResolver = &mediaAssetResolverAdapter{media: mediaService}
@@ -681,7 +694,7 @@ func provideToolProviders(log *slog.Logger, cfg config.Config, channelManager *c
agenttools.NewScheduleProvider(log, scheduleService),
agenttools.NewMemoryProvider(log, memoryRegistry, settingsService),
agenttools.NewWebProvider(log, settingsService, searchProviderService),
agenttools.NewContainerProvider(log, manager, config.DefaultDataMount),
agenttools.NewContainerProvider(log, manager, bgManager, config.DefaultDataMount),
agenttools.NewEmailProvider(log, emailService, emailManager),
agenttools.NewWebFetchProvider(log),
agenttools.NewSpawnProvider(log, settingsService, modelsService, queries, sessionService),
@@ -771,6 +784,20 @@ func startTtsTempStoreCleanup(lc fx.Lifecycle, store *ttspkg.TempStore) {
})
}
func startBackgroundTaskCleanup(lc fx.Lifecycle, mgr *background.Manager) {
done := make(chan struct{})
lc.Append(fx.Hook{
OnStart: func(_ context.Context) error {
go mgr.StartCleanupLoop(done, background.DefaultCleanupInterval, background.DefaultTaskRetention)
return nil
},
OnStop: func(_ context.Context) error {
close(done)
return nil
},
})
}
// settingsTtsModelResolver adapts settings.Service to the ttsModelResolver interface
// expected by ChannelInboundProcessor and LocalChannelHandler.
// sessionEnsurerAdapter adapts session.Service to the inbound sessionEnsurer interface.
@@ -958,6 +985,15 @@ func startHeartbeatService(lc fx.Lifecycle, heartbeatService *heartbeat.Service)
})
}
func wireResolverOutbound(resolver *flow.Resolver, channelManager *channel.Manager) {
resolver.SetOutboundFn(func(ctx context.Context, botID, channelType, target, text string) error {
return channelManager.Send(ctx, botID, channel.ChannelType(channelType), channel.SendRequest{
Target: target,
Message: channel.Message{Text: text},
})
})
}
func startChannelManager(lc fx.Lifecycle, channelManager *channel.Manager) {
ctx, cancel := context.WithCancel(context.Background())
lc.Append(fx.Hook{
+21 -10
View File
@@ -369,10 +369,14 @@ func execPipe(stream pb.ContainerService_ExecServer, firstMsg *pb.ExecInput) err
timeout = defaultTimeout
}
ctx, cancel := context.WithTimeout(stream.Context(), time.Duration(timeout)*time.Second)
defer cancel()
// Process context is independent of the gRPC stream so the process keeps
// running even if the stream is cancelled (e.g. background tasks whose client
// disconnects or whose stream context dies after the process completes).
// Only the process-level timeout kills the process, not stream death.
procCtx, procCancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
defer procCancel()
cmd := exec.CommandContext(ctx, "/bin/sh", "-c", command) //nolint:gosec // G204: MCP exec tool intentionally executes agent-issued shell commands inside the container
cmd := exec.CommandContext(procCtx, "/bin/sh", "-c", command) //nolint:gosec // G204: MCP exec tool intentionally executes agent-issued shell commands inside the container
cmd.Dir = workDir
if len(firstMsg.GetEnv()) > 0 {
cmd.Env = append(os.Environ(), firstMsg.GetEnv()...)
@@ -396,13 +400,15 @@ func execPipe(stream pb.ContainerService_ExecServer, firstMsg *pb.ExecInput) err
return status.Errorf(codes.Internal, "start: %v", err)
}
// When the context deadline fires, exec.CommandContext sends SIGKILL to the
// main process. However, child processes may still hold the stdout/stderr
// pipe file descriptors open, causing streamPipe's Read to block forever.
// Closing the pipes here unblocks those reads so the function can proceed
// to cmd.Wait and send the EXIT message back to the client.
// Close pipes when EITHER the process finishes (procCtx done) OR the gRPC
// stream dies (stream.Context done). Closing unblocks streamPipe's Read so
// the goroutines can exit. We do NOT cancel procCtx on stream death — the
// process keeps running so background tasks survive client disconnects.
go func() {
<-ctx.Done()
select {
case <-procCtx.Done():
case <-stream.Context().Done():
}
_ = stdoutPipe.Close()
_ = stderrPipe.Close()
}()
@@ -439,10 +445,15 @@ func execPipe(stream pb.ContainerService_ExecServer, firstMsg *pb.ExecInput) err
}
}
return stream.Send(&pb.ExecOutput{
// Send exit code to the client. If the stream is already gone (e.g. the
// client is a background task manager that got a stream error when the
// process completed), the send will fail but we return nil so the gRPC
// handler does not propagate a spurious "context canceled" error status.
_ = stream.Send(&pb.ExecOutput{
Stream: pb.ExecOutput_EXIT,
ExitCode: exitCode,
})
return nil
}
func (*containerServer) ReadRaw(req *pb.ReadRawRequest, stream pb.ContainerService_ReadRawServer) error {
+29 -3
View File
@@ -24,6 +24,7 @@ import (
"github.com/memohai/memoh/internal/accounts"
"github.com/memohai/memoh/internal/acl"
agentpkg "github.com/memohai/memoh/internal/agent"
"github.com/memohai/memoh/internal/agent/background"
agenttools "github.com/memohai/memoh/internal/agent/tools"
"github.com/memohai/memoh/internal/auth"
"github.com/memohai/memoh/internal/bind"
@@ -156,6 +157,7 @@ func runServe() {
provideContainerdHandler,
provideFederationGateway,
provideToolGatewayService,
provideBackgroundManager,
provideToolProviders,
provideServerHandler(handlers.NewPingHandler),
provideServerHandler(provideMemohAuthHandler),
@@ -207,6 +209,7 @@ func runServe() {
startChannelManager,
startEmailManager,
startContainerReconciliation,
startBackgroundTaskCleanup,
startTtsTempStoreCleanup,
startServer,
),
@@ -417,15 +420,20 @@ func injectToolProviders(a *agentpkg.Agent, msgService *message.DBService, provi
}
}
func provideChatResolver(log *slog.Logger, a *agentpkg.Agent, modelsService *models.Service, queries *dbsqlc.Queries, chatService *conversation.Service, msgService *message.DBService, settingsService *settings.Service, accountService *accounts.Service, mediaService *media.Service, containerdHandler *handlers.ContainerdHandler, memoryRegistry *memprovider.Registry, sessionService *sessionpkg.Service, eventHub *event.Hub, compactionService *compaction.Service, pipeline *pipelinepkg.Pipeline, rc *boot.RuntimeConfig) *flow.Resolver {
func provideChatResolver(log *slog.Logger, a *agentpkg.Agent, modelsService *models.Service, queries *dbsqlc.Queries, chatService *conversation.Service, msgService *message.DBService, settingsService *settings.Service, accountService *accounts.Service, mediaService *media.Service, containerdHandler *handlers.ContainerdHandler, memoryRegistry *memprovider.Registry, routeService *route.DBService, sessionService *sessionpkg.Service, eventHub *event.Hub, compactionService *compaction.Service, pipeline *pipelinepkg.Pipeline, rc *boot.RuntimeConfig, bgManager *background.Manager) *flow.Resolver {
resolver := flow.NewResolver(log, modelsService, queries, chatService, msgService, settingsService, accountService, a, rc.TimezoneLocation, 120*time.Second)
resolver.SetMemoryRegistry(memoryRegistry)
resolver.SetSkillLoader(&skillLoaderAdapter{handler: containerdHandler})
resolver.SetGatewayAssetLoader(&gatewayAssetLoaderAdapter{media: mediaService})
resolver.SetRouteService(routeService)
resolver.SetSessionService(sessionService)
resolver.SetEventPublisher(eventHub)
resolver.SetCompactionService(compactionService)
resolver.SetPipeline(pipeline)
resolver.SetBackgroundManager(bgManager)
bgManager.SetWakeFunc(func(botID, sessionID string) {
resolver.TriggerBackgroundNotification(context.Background(), botID, sessionID)
})
return resolver
}
@@ -552,7 +560,11 @@ func provideToolGatewayService(log *slog.Logger, fedGateway *handlers.MCPFederat
return svc
}
func provideToolProviders(log *slog.Logger, cfg config.Config, channelManager *channel.Manager, registry *channel.Registry, routeService *route.DBService, scheduleService *schedule.Service, settingsService *settings.Service, searchProviderService *searchproviders.Service, manager *workspace.Manager, mediaService *media.Service, memoryRegistry *memprovider.Registry, emailService *emailpkg.Service, emailManager *emailpkg.Manager, fedGateway *handlers.MCPFederationGateway, mcpConnService *mcp.ConnectionService, modelsService *models.Service, browserContextService *browsercontexts.Service, queries *dbsqlc.Queries, ttsService *ttspkg.Service, sessionService *sessionpkg.Service) []agenttools.ToolProvider {
func provideBackgroundManager(log *slog.Logger) *background.Manager {
return background.New(log)
}
func provideToolProviders(log *slog.Logger, cfg config.Config, channelManager *channel.Manager, registry *channel.Registry, routeService *route.DBService, scheduleService *schedule.Service, settingsService *settings.Service, searchProviderService *searchproviders.Service, manager *workspace.Manager, mediaService *media.Service, memoryRegistry *memprovider.Registry, emailService *emailpkg.Service, emailManager *emailpkg.Manager, fedGateway *handlers.MCPFederationGateway, mcpConnService *mcp.ConnectionService, modelsService *models.Service, browserContextService *browsercontexts.Service, queries *dbsqlc.Queries, ttsService *ttspkg.Service, sessionService *sessionpkg.Service, bgManager *background.Manager) []agenttools.ToolProvider {
var assetResolver messaging.AssetResolver
if mediaService != nil {
assetResolver = &mediaAssetResolverAdapter{media: mediaService}
@@ -564,7 +576,7 @@ func provideToolProviders(log *slog.Logger, cfg config.Config, channelManager *c
agenttools.NewScheduleProvider(log, scheduleService),
agenttools.NewMemoryProvider(log, memoryRegistry, settingsService),
agenttools.NewWebProvider(log, settingsService, searchProviderService),
agenttools.NewContainerProvider(log, manager, config.DefaultDataMount),
agenttools.NewContainerProvider(log, manager, bgManager, config.DefaultDataMount),
agenttools.NewEmailProvider(log, emailService, emailManager),
agenttools.NewWebFetchProvider(log),
agenttools.NewSpawnProvider(log, settingsService, modelsService, queries, sessionService),
@@ -869,6 +881,20 @@ func startTtsTempStoreCleanup(lc fx.Lifecycle, store *ttspkg.TempStore) {
})
}
func startBackgroundTaskCleanup(lc fx.Lifecycle, mgr *background.Manager) {
done := make(chan struct{})
lc.Append(fx.Hook{
OnStart: func(_ context.Context) error {
go mgr.StartCleanupLoop(done, background.DefaultCleanupInterval, background.DefaultTaskRetention)
return nil
},
OnStop: func(_ context.Context) error {
close(done)
return nil
},
})
}
// settingsTtsModelResolver adapts settings.Service to the ttsModelResolver interface
// expected by ChannelInboundProcessor and LocalChannelHandler.
type sessionEnsurerAdapter struct {