mirror of
https://github.com/memohai/Memoh.git
synced 2026-04-25 07:00:48 +09:00
feat: Misskey channel adapter, agent reliability hardening & stream error resilience (#359)
This commit is contained in:
+36
-15
@@ -35,6 +35,7 @@ import (
|
||||
"github.com/memohai/memoh/internal/channel/adapters/feishu"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/local"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/matrix"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/misskey"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/qq"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/telegram"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/wechatoa"
|
||||
@@ -361,12 +362,13 @@ func provideWorkspaceManager(log *slog.Logger, service ctr.Service, cfg config.C
|
||||
// memory providers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func provideMemoryLLM(modelsService *models.Service, queries *dbsqlc.Queries, log *slog.Logger) memprovider.LLM {
|
||||
func provideMemoryLLM(modelsService *models.Service, settingsService *settings.Service, queries *dbsqlc.Queries, log *slog.Logger) memprovider.LLM {
|
||||
return &lazyLLMClient{
|
||||
modelsService: modelsService,
|
||||
queries: queries,
|
||||
timeout: 30 * time.Second,
|
||||
logger: log,
|
||||
modelsService: modelsService,
|
||||
settingsService: settingsService,
|
||||
queries: queries,
|
||||
timeout: 30 * time.Second,
|
||||
logger: log,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -531,6 +533,10 @@ func provideChannelRegistry(log *slog.Logger, hub *local.RouteHub, mediaService
|
||||
weixinAdapter.SetAssetOpener(mediaService)
|
||||
registry.MustRegister(weixinAdapter)
|
||||
registry.MustRegister(local.NewWebAdapter(hub))
|
||||
|
||||
// Misskey
|
||||
registry.MustRegister(misskey.NewMisskeyAdapter(log))
|
||||
|
||||
return registry
|
||||
}
|
||||
|
||||
@@ -1084,14 +1090,15 @@ func ensureAdminUser(ctx context.Context, log *slog.Logger, queries *dbsqlc.Quer
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type lazyLLMClient struct {
|
||||
modelsService *models.Service
|
||||
queries *dbsqlc.Queries
|
||||
timeout time.Duration
|
||||
logger *slog.Logger
|
||||
modelsService *models.Service
|
||||
settingsService *settings.Service
|
||||
queries *dbsqlc.Queries
|
||||
timeout time.Duration
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
func (c *lazyLLMClient) Extract(ctx context.Context, req memprovider.ExtractRequest) (memprovider.ExtractResponse, error) {
|
||||
client, err := c.resolve(ctx)
|
||||
client, err := c.resolve(ctx, req.BotID)
|
||||
if err != nil {
|
||||
return memprovider.ExtractResponse{}, err
|
||||
}
|
||||
@@ -1099,7 +1106,7 @@ func (c *lazyLLMClient) Extract(ctx context.Context, req memprovider.ExtractRequ
|
||||
}
|
||||
|
||||
func (c *lazyLLMClient) Decide(ctx context.Context, req memprovider.DecideRequest) (memprovider.DecideResponse, error) {
|
||||
client, err := c.resolve(ctx)
|
||||
client, err := c.resolve(ctx, req.BotID)
|
||||
if err != nil {
|
||||
return memprovider.DecideResponse{}, err
|
||||
}
|
||||
@@ -1107,7 +1114,7 @@ func (c *lazyLLMClient) Decide(ctx context.Context, req memprovider.DecideReques
|
||||
}
|
||||
|
||||
func (c *lazyLLMClient) Compact(ctx context.Context, req memprovider.CompactRequest) (memprovider.CompactResponse, error) {
|
||||
client, err := c.resolve(ctx)
|
||||
client, err := c.resolve(ctx, "")
|
||||
if err != nil {
|
||||
return memprovider.CompactResponse{}, err
|
||||
}
|
||||
@@ -1115,18 +1122,32 @@ func (c *lazyLLMClient) Compact(ctx context.Context, req memprovider.CompactRequ
|
||||
}
|
||||
|
||||
func (c *lazyLLMClient) DetectLanguage(ctx context.Context, text string) (string, error) {
|
||||
client, err := c.resolve(ctx)
|
||||
client, err := c.resolve(ctx, "")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return client.DetectLanguage(ctx, text)
|
||||
}
|
||||
|
||||
func (c *lazyLLMClient) resolve(ctx context.Context) (memprovider.LLM, error) {
|
||||
func (c *lazyLLMClient) resolve(ctx context.Context, botID string) (memprovider.LLM, error) {
|
||||
if c.modelsService == nil || c.queries == nil {
|
||||
return nil, errors.New("models service not configured")
|
||||
}
|
||||
memoryModel, memoryProvider, err := models.SelectMemoryModelForBot(ctx, c.modelsService, c.queries, "")
|
||||
|
||||
// Try to use the bot's configured chat model for memory operations.
|
||||
chatModelID := ""
|
||||
if c.settingsService != nil && strings.TrimSpace(botID) != "" {
|
||||
if botSettings, err := c.settingsService.GetBot(ctx, botID); err == nil {
|
||||
// Prefer compaction model (smaller/cheaper), then chat model.
|
||||
if id := strings.TrimSpace(botSettings.CompactionModelID); id != "" {
|
||||
chatModelID = id
|
||||
} else if id := strings.TrimSpace(botSettings.ChatModelID); id != "" {
|
||||
chatModelID = id
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
memoryModel, memoryProvider, err := models.SelectMemoryModelForBot(ctx, c.modelsService, c.queries, chatModelID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
+17
-2
@@ -12,6 +12,7 @@ import (
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/reflection"
|
||||
|
||||
"github.com/memohai/memoh/internal/logger"
|
||||
@@ -49,7 +50,7 @@ func initDataDir() {
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if err := os.WriteFile(dst, data, fs.FileMode(0o644)); err != nil {
|
||||
if err := os.WriteFile(dst, data, fs.FileMode(0o644)); err != nil { //nolint:gosec // G703: dst is built from filepath.Join(defaultWorkDir, e.Name()) where e comes from os.ReadDir
|
||||
logger.Warn("failed to seed template", slog.String("file", e.Name()), slog.Any("error", err))
|
||||
}
|
||||
}
|
||||
@@ -91,7 +92,21 @@ func main() {
|
||||
return
|
||||
}
|
||||
|
||||
srv := grpc.NewServer()
|
||||
srv := grpc.NewServer(
|
||||
grpc.MaxRecvMsgSize(16*1024*1024),
|
||||
grpc.MaxSendMsgSize(16*1024*1024),
|
||||
grpc.KeepaliveParams(keepalive.ServerParameters{
|
||||
MaxConnectionIdle: 5 * time.Minute,
|
||||
MaxConnectionAge: 30 * time.Minute,
|
||||
MaxConnectionAgeGrace: 10 * time.Second,
|
||||
Time: 60 * time.Second,
|
||||
Timeout: 15 * time.Second,
|
||||
}),
|
||||
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
|
||||
MinTime: 10 * time.Second,
|
||||
PermitWithoutStream: true,
|
||||
}),
|
||||
)
|
||||
pb.RegisterContainerServiceServer(srv, &containerServer{})
|
||||
reflection.Register(srv)
|
||||
|
||||
|
||||
+20
-12
@@ -24,14 +24,15 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
readMaxLines = 200
|
||||
readMaxBytes = 5120
|
||||
readMaxLineLen = 1000
|
||||
listMaxEntries = 200
|
||||
binaryProbeBytes = 8 * 1024
|
||||
rawChunkSize = 64 * 1024
|
||||
defaultWorkDir = "/data"
|
||||
defaultTimeout = 30
|
||||
readMaxLines = 2000
|
||||
readMaxBytes = 0 // 0 = no byte limit (line count only)
|
||||
readMaxLineLen = 0 // 0 = no per-line truncation
|
||||
listMaxEntries = 200
|
||||
binaryProbeBytes = 8 * 1024
|
||||
rawChunkSize = 64 * 1024
|
||||
defaultWorkDir = "/data"
|
||||
defaultTimeout = 30
|
||||
defaultPTYTimeout = 5 * 60 // 5 minutes max for PTY sessions (agent tool calls)
|
||||
)
|
||||
|
||||
type containerServer struct {
|
||||
@@ -89,12 +90,12 @@ func (*containerServer) ReadFile(_ context.Context, req *pb.ReadFileRequest) (*p
|
||||
}
|
||||
|
||||
line := scanner.Text()
|
||||
if utf8.RuneCountInString(line) > readMaxLineLen {
|
||||
if readMaxLineLen > 0 && utf8.RuneCountInString(line) > readMaxLineLen {
|
||||
line = truncateRunes(line, readMaxLineLen) + "..."
|
||||
}
|
||||
|
||||
entry := line + "\n"
|
||||
if bytesWritten+len(entry) > readMaxBytes {
|
||||
if readMaxBytes > 0 && bytesWritten+len(entry) > readMaxBytes {
|
||||
break
|
||||
}
|
||||
out.WriteString(entry)
|
||||
@@ -288,11 +289,18 @@ func execPTY(stream pb.ContainerService_ExecServer, firstMsg *pb.ExecInput) erro
|
||||
workDir = defaultWorkDir
|
||||
}
|
||||
|
||||
timeout := int(firstMsg.GetTimeoutSeconds())
|
||||
if timeout <= 0 {
|
||||
timeout = defaultPTYTimeout
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(stream.Context(), time.Duration(timeout)*time.Second)
|
||||
defer cancel()
|
||||
|
||||
var cmd *exec.Cmd
|
||||
if isBarePath(command) {
|
||||
cmd = exec.CommandContext(stream.Context(), command) //nolint:gosec // G204: intentional
|
||||
cmd = exec.CommandContext(ctx, command) //nolint:gosec // G204: intentional
|
||||
} else {
|
||||
cmd = exec.CommandContext(stream.Context(), "/bin/sh", "-c", command) //nolint:gosec // G204: intentional
|
||||
cmd = exec.CommandContext(ctx, "/bin/sh", "-c", command) //nolint:gosec // G204: intentional
|
||||
}
|
||||
cmd.Dir = workDir
|
||||
cmd.Env = append(os.Environ(), firstMsg.GetEnv()...)
|
||||
|
||||
+32
-12
@@ -36,6 +36,7 @@ import (
|
||||
"github.com/memohai/memoh/internal/channel/adapters/feishu"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/local"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/matrix"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/misskey"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/qq"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/telegram"
|
||||
"github.com/memohai/memoh/internal/channel/adapters/wechatoa"
|
||||
@@ -260,8 +261,8 @@ func provideWorkspaceManager(log *slog.Logger, service ctr.Service, cfg config.C
|
||||
return workspace.NewManager(log, service, cfg.Workspace, cfg.Containerd.Namespace, conn)
|
||||
}
|
||||
|
||||
func provideMemoryLLM(modelsService *models.Service, queries *dbsqlc.Queries, log *slog.Logger) memprovider.LLM {
|
||||
return &lazyLLMClient{modelsService: modelsService, queries: queries, timeout: 30 * time.Second, logger: log}
|
||||
func provideMemoryLLM(modelsService *models.Service, settingsService *settings.Service, queries *dbsqlc.Queries, log *slog.Logger) memprovider.LLM {
|
||||
return &lazyLLMClient{modelsService: modelsService, settingsService: settingsService, queries: queries, timeout: 30 * time.Second, logger: log}
|
||||
}
|
||||
|
||||
func provideMemoryProviderRegistry(log *slog.Logger, llm memprovider.LLM, chatService *conversation.Service, accountService *accounts.Service, manager *workspace.Manager, queries *dbsqlc.Queries, cfg config.Config) *memprovider.Registry {
|
||||
@@ -453,6 +454,10 @@ func provideChannelRegistry(log *slog.Logger, hub *local.RouteHub, mediaService
|
||||
weixinAdapter.SetAssetOpener(mediaService)
|
||||
registry.MustRegister(weixinAdapter)
|
||||
registry.MustRegister(local.NewWebAdapter(hub))
|
||||
|
||||
// Misskey
|
||||
registry.MustRegister(misskey.NewMisskeyAdapter(log))
|
||||
|
||||
return registry
|
||||
}
|
||||
|
||||
@@ -1006,14 +1011,15 @@ func ensureAdminUser(ctx context.Context, log *slog.Logger, queries *dbsqlc.Quer
|
||||
}
|
||||
|
||||
type lazyLLMClient struct {
|
||||
modelsService *models.Service
|
||||
queries *dbsqlc.Queries
|
||||
timeout time.Duration
|
||||
logger *slog.Logger
|
||||
modelsService *models.Service
|
||||
settingsService *settings.Service
|
||||
queries *dbsqlc.Queries
|
||||
timeout time.Duration
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
func (c *lazyLLMClient) Extract(ctx context.Context, req memprovider.ExtractRequest) (memprovider.ExtractResponse, error) {
|
||||
client, err := c.resolve(ctx)
|
||||
client, err := c.resolve(ctx, req.BotID)
|
||||
if err != nil {
|
||||
return memprovider.ExtractResponse{}, err
|
||||
}
|
||||
@@ -1021,7 +1027,7 @@ func (c *lazyLLMClient) Extract(ctx context.Context, req memprovider.ExtractRequ
|
||||
}
|
||||
|
||||
func (c *lazyLLMClient) Decide(ctx context.Context, req memprovider.DecideRequest) (memprovider.DecideResponse, error) {
|
||||
client, err := c.resolve(ctx)
|
||||
client, err := c.resolve(ctx, req.BotID)
|
||||
if err != nil {
|
||||
return memprovider.DecideResponse{}, err
|
||||
}
|
||||
@@ -1029,7 +1035,7 @@ func (c *lazyLLMClient) Decide(ctx context.Context, req memprovider.DecideReques
|
||||
}
|
||||
|
||||
func (c *lazyLLMClient) Compact(ctx context.Context, req memprovider.CompactRequest) (memprovider.CompactResponse, error) {
|
||||
client, err := c.resolve(ctx)
|
||||
client, err := c.resolve(ctx, "")
|
||||
if err != nil {
|
||||
return memprovider.CompactResponse{}, err
|
||||
}
|
||||
@@ -1037,18 +1043,32 @@ func (c *lazyLLMClient) Compact(ctx context.Context, req memprovider.CompactRequ
|
||||
}
|
||||
|
||||
func (c *lazyLLMClient) DetectLanguage(ctx context.Context, text string) (string, error) {
|
||||
client, err := c.resolve(ctx)
|
||||
client, err := c.resolve(ctx, "")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return client.DetectLanguage(ctx, text)
|
||||
}
|
||||
|
||||
func (c *lazyLLMClient) resolve(ctx context.Context) (memprovider.LLM, error) {
|
||||
func (c *lazyLLMClient) resolve(ctx context.Context, botID string) (memprovider.LLM, error) {
|
||||
if c.modelsService == nil || c.queries == nil {
|
||||
return nil, errors.New("models service not configured")
|
||||
}
|
||||
memoryModel, memoryProvider, err := models.SelectMemoryModelForBot(ctx, c.modelsService, c.queries, "")
|
||||
|
||||
// Try to use the bot's configured chat model for memory operations.
|
||||
chatModelID := ""
|
||||
if c.settingsService != nil && strings.TrimSpace(botID) != "" {
|
||||
if botSettings, err := c.settingsService.GetBot(ctx, botID); err == nil {
|
||||
// Prefer compaction model (smaller/cheaper), then chat model.
|
||||
if id := strings.TrimSpace(botSettings.CompactionModelID); id != "" {
|
||||
chatModelID = id
|
||||
} else if id := strings.TrimSpace(botSettings.ChatModelID); id != "" {
|
||||
chatModelID = id
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
memoryModel, memoryProvider, err := models.SelectMemoryModelForBot(ctx, c.modelsService, c.queries, chatModelID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user