mirror of
https://github.com/memohai/Memoh.git
synced 2026-04-27 07:16:19 +09:00
9ceabf68c4
Replace the host bind-mount + containerd exec approach with a per-bot
in-container gRPC server (ContainerService, port 9090). All file I/O,
exec, and MCP stdio sessions now go through gRPC instead of running
shell commands or reading host-mounted directories.
Architecture changes:
- cmd/mcp: rewritten as a gRPC server (ContainerService) with full
file and exec API (ReadFile, WriteFile, ListDir, ReadRaw, WriteRaw,
Exec, Stat, Mkdir, Rename, DeleteFile)
- internal/mcp/mcpcontainer: protobuf definitions and generated stubs
- internal/mcp/mcpclient: gRPC client wrapper with connection pool
(Pool) and Provider interface for dependency injection
- mcp.Manager: add per-bot IP cache, gRPC connection pool, and
SetContainerIP/MCPClient methods; remove DataDir/Exec helpers
- containerd.Service: remove ExecTask/ExecTaskStreaming; network setup
now returns NetworkResult{IP} for pool routing
- internal/fs/service.go: deleted (replaced by mcpclient)
- handlers/fs.go: deleted; MCP stdio session logic moved to mcp_stdio.go
- container provider Executor: all tools (read/write/list/edit/exec)
now call gRPC client instead of running shell via exec
- storefs, containerfs, media, skills, memory: all I/O ported to
mcpclient.Provider
Database:
- migration 0022: drop host_path column from containers table
One-time data migration:
- migrateBindMountData: on first Start() after upgrade, copies old
bind-mount data into the container via gRPC, then renames src dir
to prevent re-migration; runs in background goroutine
Bug fixes:
- mcp_stdio: callRaw now returns full JSON-RPC envelope
{"jsonrpc","id","result"|"error"} matching protocol spec;
explicit "initialize" call now advances session init state to
prevent duplicate handshake on next non-initialize call
- mcpclient Pool: properly evict stale gRPC connection after snapshot
replace (container process recreated); use SetContainerIP instead
of direct map write so IP changes always evict pool entry
- migrateBindMountData: walkErr on directories now counted as failure
so partially-walked trees don't get incorrectly marked as migrated
- cmd/mcp/Dockerfile: removed dead file (docker/Dockerfile.mcp is the
canonical production build)
Tests:
- provider_test.go: restored with bufconn in-process gRPC mock
(fakeContainerService + staticProvider), 14 cases covering all 5
tools plus edge cases
- mcp_session_test.go: new, covers JSON-RPC envelope, init state
machine, pending cleanup on cancel/close, readLoop cancel
- storefs/service_test.go: restored (pure function roundtrip tests)
104 lines
2.8 KiB
Go
104 lines
2.8 KiB
Go
package mcp
|
|
|
|
import (
|
|
"context"
|
|
"io/fs"
|
|
"log/slog"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
|
|
"github.com/memohai/memoh/internal/mcp/mcpclient"
|
|
)
|
|
|
|
const migratedSuffix = ".migrated"
|
|
|
|
// migrateBindMountData copies bot data from the old host bind-mount directory
|
|
// into the container via gRPC, then renames the source to prevent re-migration.
|
|
// This is a one-time operation for bots that were created before the switch
|
|
// from bind mounts to container-local storage.
|
|
func (m *Manager) migrateBindMountData(ctx context.Context, botID string) {
|
|
srcDir := filepath.Join(m.dataRoot(), "bots", botID)
|
|
migratedDir := srcDir + migratedSuffix
|
|
|
|
if _, err := os.Stat(migratedDir); err == nil {
|
|
return // already migrated
|
|
}
|
|
info, err := os.Stat(srcDir)
|
|
if err != nil || !info.IsDir() {
|
|
return // no old data
|
|
}
|
|
|
|
// Quick check: is the directory empty?
|
|
entries, err := os.ReadDir(srcDir)
|
|
if err != nil || len(entries) == 0 {
|
|
return
|
|
}
|
|
|
|
client, err := m.grpcPool.Get(ctx, botID)
|
|
if err != nil {
|
|
m.logger.Warn("migrate: cannot connect to container",
|
|
slog.String("bot_id", botID), slog.Any("error", err))
|
|
return
|
|
}
|
|
|
|
m.logger.Info("migrating bind-mount data into container",
|
|
slog.String("bot_id", botID), slog.String("src", srcDir))
|
|
|
|
var migrated, failed int
|
|
err = filepath.WalkDir(srcDir, func(path string, d fs.DirEntry, walkErr error) error {
|
|
if walkErr != nil {
|
|
// A directory walk error means the entire subtree is skipped by
|
|
// WalkDir. Count it as a failure so the src dir is NOT renamed
|
|
// and migration is retried on next start.
|
|
m.logger.Warn("migrate: walk error",
|
|
slog.String("path", path), slog.Any("error", walkErr))
|
|
failed++
|
|
return nil
|
|
}
|
|
rel, relErr := filepath.Rel(srcDir, path)
|
|
if relErr != nil || rel == "." {
|
|
return nil
|
|
}
|
|
if d.IsDir() {
|
|
return nil // dirs are created implicitly by WriteFile
|
|
}
|
|
|
|
if err := copyFileToContainer(ctx, client, path, rel); err != nil {
|
|
m.logger.Warn("migrate: copy failed",
|
|
slog.String("file", rel), slog.Any("error", err))
|
|
failed++
|
|
return nil
|
|
}
|
|
migrated++
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
m.logger.Warn("migrate: walk failed", slog.String("bot_id", botID), slog.Any("error", err))
|
|
}
|
|
|
|
m.logger.Info("migration complete",
|
|
slog.String("bot_id", botID),
|
|
slog.Int("migrated", migrated),
|
|
slog.Int("failed", failed))
|
|
|
|
if failed == 0 {
|
|
if renameErr := os.Rename(srcDir, migratedDir); renameErr != nil {
|
|
m.logger.Warn("migrate: rename src dir failed",
|
|
slog.String("src", srcDir), slog.Any("error", renameErr))
|
|
}
|
|
}
|
|
}
|
|
|
|
func copyFileToContainer(ctx context.Context, client *mcpclient.Client, hostPath, containerRelPath string) error {
|
|
f, err := os.Open(hostPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
|
|
containerRelPath = strings.ReplaceAll(containerRelPath, string(filepath.Separator), "/")
|
|
_, err = client.WriteRaw(ctx, containerRelPath, f)
|
|
return err
|
|
}
|