Files
Memoh/internal/containerd/service_apple.go
T
BBQ 9ceabf68c4 feat(mcp): replace bind-mount+exec with in-container gRPC service (#179)
Replace the host bind-mount + containerd exec approach with a per-bot
in-container gRPC server (ContainerService, port 9090). All file I/O,
exec, and MCP stdio sessions now go through gRPC instead of running
shell commands or reading host-mounted directories.

Architecture changes:
- cmd/mcp: rewritten as a gRPC server (ContainerService) with full
  file and exec API (ReadFile, WriteFile, ListDir, ReadRaw, WriteRaw,
  Exec, Stat, Mkdir, Rename, DeleteFile)
- internal/mcp/mcpcontainer: protobuf definitions and generated stubs
- internal/mcp/mcpclient: gRPC client wrapper with connection pool
  (Pool) and Provider interface for dependency injection
- mcp.Manager: add per-bot IP cache, gRPC connection pool, and
  SetContainerIP/MCPClient methods; remove DataDir/Exec helpers
- containerd.Service: remove ExecTask/ExecTaskStreaming; network setup
  now returns NetworkResult{IP} for pool routing
- internal/fs/service.go: deleted (replaced by mcpclient)
- handlers/fs.go: deleted; MCP stdio session logic moved to mcp_stdio.go
- container provider Executor: all tools (read/write/list/edit/exec)
  now call gRPC client instead of running shell via exec
- storefs, containerfs, media, skills, memory: all I/O ported to
  mcpclient.Provider

Database:
- migration 0022: drop host_path column from containers table

One-time data migration:
- migrateBindMountData: on first Start() after upgrade, copies old
  bind-mount data into the container via gRPC, then renames src dir
  to prevent re-migration; runs in background goroutine

Bug fixes:
- mcp_stdio: callRaw now returns full JSON-RPC envelope
  {"jsonrpc","id","result"|"error"} matching protocol spec;
  explicit "initialize" call now advances session init state to
  prevent duplicate handshake on next non-initialize call
- mcpclient Pool: properly evict stale gRPC connection after snapshot
  replace (container process recreated); use SetContainerIP instead
  of direct map write so IP changes always evict pool entry
- migrateBindMountData: walkErr on directories now counted as failure
  so partially-walked trees don't get incorrectly marked as migrated
- cmd/mcp/Dockerfile: removed dead file (docker/Dockerfile.mcp is the
  canonical production build)

Tests:
- provider_test.go: restored with bufconn in-process gRPC mock
  (fakeContainerService + staticProvider), 14 cases covering all 5
  tools plus edge cases
- mcp_session_test.go: new, covers JSON-RPC envelope, init state
  machine, pending cleanup on cancel/close, readLoop cancel
- storefs/service_test.go: restored (pure function roundtrip tests)
2026-03-04 21:50:08 +08:00

465 lines
12 KiB
Go

package containerd
import (
"context"
"fmt"
"log/slog"
"os"
"path/filepath"
"strings"
"sync"
"syscall"
"github.com/memohai/acgo"
"github.com/memohai/acgo/socktainer"
)
// ---------------------------------------------------------------------------
// Service & lifecycle
// ---------------------------------------------------------------------------
type AppleService struct {
client *acgo.Client
manager *socktainer.Manager
managerOpts []socktainer.Option
socketPath string
logger *slog.Logger
mu sync.Mutex
}
type AppleServiceConfig struct {
SocketPath string
BinaryPath string
}
func NewAppleService(ctx context.Context, log *slog.Logger, cfg AppleServiceConfig) (*AppleService, error) {
var managerOpts []socktainer.Option
if cfg.BinaryPath != "" {
managerOpts = append(managerOpts, socktainer.WithBinary(cfg.BinaryPath))
}
if cfg.SocketPath != "" {
managerOpts = append(managerOpts, socktainer.WithSocket(expandHome(cfg.SocketPath)))
}
svc := &AppleService{
managerOpts: managerOpts,
logger: log.With(slog.String("service", "apple-container")),
}
if err := svc.startSocktainer(ctx); err != nil {
return nil, err
}
return svc, nil
}
func (s *AppleService) startSocktainer(ctx context.Context) error {
mgr := socktainer.NewManager(s.managerOpts...)
if err := mgr.Start(ctx); err != nil {
return fmt.Errorf("start socktainer: %w", err)
}
client, err := acgo.New(acgo.WithSocketPath(mgr.SocketPath()))
if err != nil {
_ = mgr.Stop()
return fmt.Errorf("create acgo client: %w", err)
}
s.manager = mgr
s.client = client
s.socketPath = mgr.SocketPath()
return nil
}
func (s *AppleService) ensureHealthy(ctx context.Context) error {
if ok, _ := s.client.IsServing(ctx); ok {
return nil
}
s.mu.Lock()
defer s.mu.Unlock()
if ok, _ := s.client.IsServing(ctx); ok {
return nil
}
s.logger.Warn("socktainer not responding, restarting")
_ = s.client.Close()
_ = s.manager.Stop()
_ = os.Remove(s.socketPath)
if err := s.startSocktainer(ctx); err != nil {
s.logger.Error("socktainer restart failed", slog.Any("error", err))
return err
}
s.logger.Info("socktainer restarted successfully")
return nil
}
func (s *AppleService) Close() error {
_ = s.client.Close()
return s.manager.Stop()
}
// ---------------------------------------------------------------------------
// Images
// ---------------------------------------------------------------------------
func (s *AppleService) PullImage(ctx context.Context, ref string, opts *PullImageOptions) (ImageInfo, error) {
if ref == "" {
return ImageInfo{}, ErrInvalidArgument
}
if err := s.ensureHealthy(ctx); err != nil {
return ImageInfo{}, err
}
img, err := s.client.Pull(ctx, ref)
if err != nil {
return ImageInfo{}, err
}
return toAcgoImageInfo(img), nil
}
func (s *AppleService) GetImage(ctx context.Context, ref string) (ImageInfo, error) {
if ref == "" {
return ImageInfo{}, ErrInvalidArgument
}
if err := s.ensureHealthy(ctx); err != nil {
return ImageInfo{}, err
}
img, err := s.client.GetImage(ctx, ref)
if err != nil {
return ImageInfo{}, err
}
return toAcgoImageInfo(img), nil
}
func (s *AppleService) ListImages(ctx context.Context) ([]ImageInfo, error) {
if err := s.ensureHealthy(ctx); err != nil {
return nil, err
}
imgs, err := s.client.ListImages(ctx)
if err != nil {
return nil, err
}
out := make([]ImageInfo, len(imgs))
for i, img := range imgs {
out[i] = toAcgoImageInfo(img)
}
return out, nil
}
func (s *AppleService) DeleteImage(ctx context.Context, ref string, opts *DeleteImageOptions) error {
if ref == "" {
return ErrInvalidArgument
}
if err := s.ensureHealthy(ctx); err != nil {
return err
}
return s.client.DeleteImage(ctx, ref)
}
// ---------------------------------------------------------------------------
// Containers
// ---------------------------------------------------------------------------
func (s *AppleService) CreateContainer(ctx context.Context, req CreateContainerRequest) (ContainerInfo, error) {
if req.ID == "" || req.ImageRef == "" {
return ContainerInfo{}, ErrInvalidArgument
}
if err := s.ensureHealthy(ctx); err != nil {
return ContainerInfo{}, err
}
if _, err := s.client.GetImage(ctx, req.ImageRef); err != nil {
s.logger.Info("image not found locally, pulling", slog.String("image", req.ImageRef))
if _, pullErr := s.client.Pull(ctx, req.ImageRef); pullErr != nil {
return ContainerInfo{}, fmt.Errorf("pull image %s: %w", req.ImageRef, pullErr)
}
}
ctr, err := s.client.NewContainer(ctx, req.ID, specToCreateOpts(req)...)
if err != nil {
return ContainerInfo{}, err
}
return acgoContainerToInfo(ctx, ctr)
}
func (s *AppleService) GetContainer(ctx context.Context, id string) (ContainerInfo, error) {
if id == "" {
return ContainerInfo{}, ErrInvalidArgument
}
if err := s.ensureHealthy(ctx); err != nil {
return ContainerInfo{}, err
}
ctr, err := s.client.LoadContainer(ctx, id)
if err != nil {
return ContainerInfo{}, err
}
return acgoContainerToInfo(ctx, ctr)
}
func (s *AppleService) ListContainers(ctx context.Context) ([]ContainerInfo, error) {
if err := s.ensureHealthy(ctx); err != nil {
return nil, err
}
ctrs, err := s.client.Containers(ctx, acgo.WithListAll())
if err != nil {
return nil, err
}
out := make([]ContainerInfo, 0, len(ctrs))
for _, c := range ctrs {
info, err := acgoContainerToInfo(ctx, c)
if err != nil {
return nil, err
}
out = append(out, info)
}
return out, nil
}
func (s *AppleService) DeleteContainer(ctx context.Context, id string, opts *DeleteContainerOptions) error {
if id == "" {
return ErrInvalidArgument
}
if err := s.ensureHealthy(ctx); err != nil {
return err
}
ctr, err := s.client.LoadContainer(ctx, id)
if err != nil {
return err
}
var deleteOpts []acgo.DeleteOpt
if opts != nil && opts.CleanupSnapshot {
deleteOpts = append(deleteOpts, acgo.WithRemoveVolumes())
}
deleteOpts = append(deleteOpts, acgo.WithForceDelete())
return ctr.Delete(ctx, deleteOpts...)
}
func (s *AppleService) ListContainersByLabel(ctx context.Context, key, value string) ([]ContainerInfo, error) {
if key == "" {
return nil, ErrInvalidArgument
}
if err := s.ensureHealthy(ctx); err != nil {
return nil, err
}
filtersJSON := fmt.Sprintf(`{"label":["%s=%s"]}`, key, value)
ctrs, err := s.client.Containers(ctx, acgo.WithListAll(), acgo.WithListFilters(filtersJSON))
if err != nil {
return nil, err
}
var out []ContainerInfo
for _, c := range ctrs {
info, err := acgoContainerToInfo(ctx, c)
if err != nil {
return nil, err
}
if v, ok := info.Labels[key]; ok && (value == "" || v == value) {
out = append(out, info)
}
}
return out, nil
}
// ---------------------------------------------------------------------------
// Task / process lifecycle
// ---------------------------------------------------------------------------
func (s *AppleService) StartContainer(ctx context.Context, containerID string, opts *StartTaskOptions) error {
if containerID == "" {
return ErrInvalidArgument
}
if err := s.ensureHealthy(ctx); err != nil {
return err
}
ctr, err := s.client.LoadContainer(ctx, containerID)
if err != nil {
return err
}
return ctr.Start(ctx)
}
func (s *AppleService) StopContainer(ctx context.Context, containerID string, opts *StopTaskOptions) error {
if containerID == "" {
return ErrInvalidArgument
}
if err := s.ensureHealthy(ctx); err != nil {
return err
}
ctr, err := s.client.LoadContainer(ctx, containerID)
if err != nil {
return err
}
timeout := 10
if opts != nil && opts.Timeout > 0 {
timeout = int(opts.Timeout.Seconds())
}
var stopOpts []acgo.StopOpt
stopOpts = append(stopOpts, acgo.WithStopTimeout(timeout))
if opts != nil && opts.Signal != 0 {
stopOpts = append(stopOpts, acgo.WithStopSignal(syscall.Signal(opts.Signal).String()))
}
if err := ctr.Stop(ctx, stopOpts...); err != nil && opts != nil && opts.Force {
return ctr.Kill(ctx)
}
return nil
}
func (s *AppleService) DeleteTask(context.Context, string, *DeleteTaskOptions) error {
return nil
}
func (s *AppleService) GetTaskInfo(ctx context.Context, containerID string) (TaskInfo, error) {
if containerID == "" {
return TaskInfo{}, ErrInvalidArgument
}
if err := s.ensureHealthy(ctx); err != nil {
return TaskInfo{}, err
}
ctr, err := s.client.LoadContainer(ctx, containerID)
if err != nil {
return TaskInfo{}, err
}
info, err := ctr.Info(ctx)
if err != nil {
return TaskInfo{}, err
}
return TaskInfo{
ContainerID: containerID,
ID: containerID,
Status: containerStateToTaskStatus(info.State),
}, nil
}
func (s *AppleService) ListTasks(ctx context.Context, opts *ListTasksOptions) ([]TaskInfo, error) {
if err := s.ensureHealthy(ctx); err != nil {
return nil, err
}
ctrs, err := s.client.Containers(ctx, acgo.WithListAll())
if err != nil {
return nil, err
}
var out []TaskInfo
for _, c := range ctrs {
info, err := c.Info(ctx)
if err != nil {
continue
}
if opts != nil && opts.Filter != "" {
if strings.Contains(opts.Filter, "container.id==") {
if strings.TrimPrefix(opts.Filter, "container.id==") != info.ID {
continue
}
}
}
out = append(out, TaskInfo{
ContainerID: info.ID,
ID: info.ID,
Status: containerStateToTaskStatus(info.State),
})
}
return out, nil
}
// ---------------------------------------------------------------------------
// Network (no-op — Apple Container handles networking natively)
// ---------------------------------------------------------------------------
func (s *AppleService) SetupNetwork(context.Context, NetworkSetupRequest) (NetworkResult, error) {
return NetworkResult{}, nil
}
func (s *AppleService) RemoveNetwork(context.Context, NetworkSetupRequest) error { return nil }
// ---------------------------------------------------------------------------
// Snapshots (not supported on Apple Container)
// ---------------------------------------------------------------------------
func (s *AppleService) CommitSnapshot(context.Context, string, string, string) error {
return ErrNotSupported
}
func (s *AppleService) ListSnapshots(context.Context, string) ([]SnapshotInfo, error) {
return nil, ErrNotSupported
}
func (s *AppleService) PrepareSnapshot(context.Context, string, string, string) error {
return ErrNotSupported
}
func (s *AppleService) CreateContainerFromSnapshot(context.Context, CreateContainerRequest) (ContainerInfo, error) {
return ContainerInfo{}, ErrNotSupported
}
func (s *AppleService) SnapshotMounts(context.Context, string, string) ([]MountInfo, error) {
return nil, ErrNotSupported
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
func specToCreateOpts(req CreateContainerRequest) []acgo.CreateOpt {
var opts []acgo.CreateOpt
opts = append(opts, acgo.WithImage(req.ImageRef))
if len(req.Spec.Cmd) > 0 {
opts = append(opts, acgo.WithEntrypoint(req.Spec.Cmd[0]))
if len(req.Spec.Cmd) > 1 {
opts = append(opts, acgo.WithCmd(req.Spec.Cmd[1:]...))
}
}
if req.Spec.WorkDir != "" {
opts = append(opts, acgo.WithWorkdir(req.Spec.WorkDir))
}
if req.Spec.User != "" {
opts = append(opts, acgo.WithUser(req.Spec.User))
}
if req.Spec.TTY {
opts = append(opts, acgo.WithTTY())
}
for _, env := range req.Spec.Env {
if k, v, ok := strings.Cut(env, "="); ok {
opts = append(opts, acgo.WithEnv(k, v))
}
}
for _, m := range req.Spec.Mounts {
opts = append(opts, acgo.WithVolume(m.Source, m.Destination))
}
for _, dns := range req.Spec.DNS {
opts = append(opts, acgo.WithDNS(dns))
}
for k, v := range req.Labels {
opts = append(opts, acgo.WithLabel(k, v))
}
return opts
}
func toAcgoImageInfo(img acgo.Image) ImageInfo {
return ImageInfo{Name: img.Name(), ID: img.ID(), Tags: img.RepoTags()}
}
func acgoContainerToInfo(ctx context.Context, c acgo.Container) (ContainerInfo, error) {
info, err := c.Info(ctx)
if err != nil {
return ContainerInfo{}, err
}
return ContainerInfo{
ID: info.ID,
Image: info.Image,
Labels: info.Labels,
Runtime: RuntimeInfo{Name: "apple-container"},
CreatedAt: info.CreatedAt,
UpdatedAt: info.CreatedAt,
}, nil
}
func containerStateToTaskStatus(state string) TaskStatus {
switch state {
case "running":
return TaskStatusRunning
case "created":
return TaskStatusCreated
case "exited", "dead":
return TaskStatusStopped
case "paused":
return TaskStatusPaused
default:
return TaskStatusUnknown
}
}
func expandHome(path string) string {
if !strings.HasPrefix(path, "~/") {
return path
}
if home, err := os.UserHomeDir(); err == nil {
return filepath.Join(home, path[2:])
}
return path
}