From 3f8cb3292c82c4acfe31dd66fbb24f77bc2d9a79 Mon Sep 17 00:00:00 2001 From: Ran <16112591+chen-ran@users.noreply.github.com> Date: Sun, 8 Feb 2026 01:45:53 +0800 Subject: [PATCH 1/2] chore: optimize code structure --- internal/handlers/fs.go | 134 ++++++---------------------------------- internal/mcp/jsonrpc.go | 95 ++++++++++++++++++++++++++++ internal/mcp/manager.go | 44 +++++++------ 3 files changed, 139 insertions(+), 134 deletions(-) create mode 100644 internal/mcp/jsonrpc.go diff --git a/internal/handlers/fs.go b/internal/handlers/fs.go index 60db0dc1..8dbac89e 100644 --- a/internal/handlers/fs.go +++ b/internal/handlers/fs.go @@ -62,53 +62,33 @@ func (h *ContainerdHandler) HandleMCPFS(c echo.Context) error { return echo.NewHTTPError(http.StatusBadRequest, err.Error()) } if req.JSONRPC != "" && req.JSONRPC != "2.0" { - return c.JSON(http.StatusOK, mcptools.JSONRPCResponse{ - JSONRPC: "2.0", - ID: req.ID, - Error: &mcptools.JSONRPCError{Code: -32600, Message: "invalid jsonrpc version"}, - }) + return c.JSON(http.StatusOK, mcptools.JSONRPCErrorResponse(req.ID, -32600, "invalid jsonrpc version")) } - if err := h.validateMCPContainer(c.Request().Context(), containerID, botID); err != nil { + if err := h.validateMCPContainer(ctx, containerID, botID); err != nil { h.logger.Error("mcp fs validate failed", slog.Any("error", err), slog.String("bot_id", botID), slog.String("container_id", containerID)) - return c.JSON(http.StatusOK, mcptools.JSONRPCResponse{ - JSONRPC: "2.0", - ID: req.ID, - Error: &mcptools.JSONRPCError{Code: -32603, Message: err.Error()}, - }) + return c.JSON(http.StatusOK, mcptools.JSONRPCErrorResponse(req.ID, -32603, err.Error())) } - if err := h.ensureTaskRunning(c.Request().Context(), containerID); err != nil { + if err := h.ensureTaskRunning(ctx, containerID); err != nil { h.logger.Error("mcp fs ensure task failed", slog.Any("error", err), slog.String("bot_id", botID), slog.String("container_id", containerID)) - return c.JSON(http.StatusOK, mcptools.JSONRPCResponse{ - JSONRPC: "2.0", - ID: req.ID, - Error: &mcptools.JSONRPCError{Code: -32603, Message: err.Error()}, - }) + return c.JSON(http.StatusOK, mcptools.JSONRPCErrorResponse(req.ID, -32603, err.Error())) } if strings.TrimSpace(req.Method) == "" { - return c.JSON(http.StatusOK, mcptools.JSONRPCResponse{ - JSONRPC: "2.0", - ID: req.ID, - Error: &mcptools.JSONRPCError{Code: -32601, Message: "method not found"}, - }) + return c.JSON(http.StatusOK, mcptools.JSONRPCErrorResponse(req.ID, -32601, "method not found")) } - if len(req.ID) == 0 && strings.HasPrefix(req.Method, "notifications/") { - if err := h.notifyMCPServer(c.Request().Context(), containerID, req); err != nil { + if mcptools.IsNotification(req) { + if err := h.notifyMCPServer(ctx, containerID, req); err != nil { h.logger.Error("mcp fs notify failed", slog.Any("error", err), slog.String("method", req.Method), slog.String("bot_id", botID), slog.String("container_id", containerID)) return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) } // MCP Streamable HTTP spec: notifications must be answered with 202 Accepted and no body. return c.NoContent(http.StatusAccepted) } - payload, err := h.callMCPServer(c.Request().Context(), containerID, req) + payload, err := h.callMCPServer(ctx, containerID, req) if err != nil { h.logger.Error("mcp fs call failed", slog.Any("error", err), slog.String("method", req.Method), slog.String("bot_id", botID), slog.String("container_id", containerID)) - return c.JSON(http.StatusOK, mcptools.JSONRPCResponse{ - JSONRPC: "2.0", - ID: req.ID, - Error: &mcptools.JSONRPCError{Code: -32603, Message: err.Error()}, - }) + return c.JSON(http.StatusOK, mcptools.JSONRPCErrorResponse(req.ID, -32603, err.Error())) } return c.JSON(http.StatusOK, payload) } @@ -380,7 +360,7 @@ func (s *mcpSession) readLoop() { } func (s *mcpSession) call(ctx context.Context, req mcptools.JSONRPCRequest) (map[string]any, error) { - payloads, targetID, err := buildMCPPayloads(req, &s.initOnce) + payloads, targetID, err := mcptools.BuildPayloads(req, &s.initOnce) if err != nil { return nil, err } @@ -394,14 +374,9 @@ func (s *mcpSession) call(ctx context.Context, req mcptools.JSONRPCRequest) (map s.pending[target] = respCh s.pendingMu.Unlock() - s.writeMu.Lock() - for _, payload := range payloads { - if _, err := s.stdin.Write([]byte(payload + "\n")); err != nil { - s.writeMu.Unlock() - return nil, err - } + if err := s.writePayloads(payloads); err != nil { + return nil, err } - s.writeMu.Unlock() select { case resp, ok := <-respCh: @@ -437,92 +412,21 @@ func (s *mcpSession) call(ctx context.Context, req mcptools.JSONRPCRequest) (map } func (s *mcpSession) notify(ctx context.Context, req mcptools.JSONRPCRequest) error { - payloads, err := buildMCPNotificationPayloads(req) + payloads, err := mcptools.BuildNotificationPayloads(req) if err != nil { return err } + return s.writePayloads(payloads) +} + +func (s *mcpSession) writePayloads(payloads []string) error { s.writeMu.Lock() + defer s.writeMu.Unlock() for _, payload := range payloads { if _, err := s.stdin.Write([]byte(payload + "\n")); err != nil { - s.writeMu.Unlock() return err } } - s.writeMu.Unlock() return nil } -func buildMCPPayloads(req mcptools.JSONRPCRequest, initOnce *sync.Once) ([]string, json.RawMessage, error) { - if req.JSONRPC == "" { - req.JSONRPC = "2.0" - } - targetID := req.ID - payloads := []string{} - shouldInit := req.Method != "initialize" && req.Method != "notifications/initialized" - if initOnce != nil { - ran := false - initOnce.Do(func() { - ran = true - }) - if ran { - // This is the first call on the session. - } else { - shouldInit = false - } - } - if shouldInit { - initReq := map[string]any{ - "jsonrpc": "2.0", - "id": "init-1", - "method": "initialize", - "params": map[string]any{ - "protocolVersion": "2025-06-18", - "capabilities": map[string]any{ - "roots": map[string]any{ - "listChanged": false, - }, - }, - "clientInfo": map[string]any{ - "name": "memoh-http-proxy", - "version": "v0", - }, - }, - } - initBytes, err := json.Marshal(initReq) - if err != nil { - return nil, nil, err - } - payloads = append(payloads, string(initBytes)) - - initialized := map[string]any{ - "jsonrpc": "2.0", - "method": "notifications/initialized", - } - initializedBytes, err := json.Marshal(initialized) - if err != nil { - return nil, nil, err - } - payloads = append(payloads, string(initializedBytes)) - } - - reqBytes, err := json.Marshal(req) - if err != nil { - return nil, nil, err - } - payloads = append(payloads, string(reqBytes)) - return payloads, targetID, nil -} - -func buildMCPNotificationPayloads(req mcptools.JSONRPCRequest) ([]string, error) { - if req.JSONRPC == "" { - req.JSONRPC = "2.0" - } - if strings.TrimSpace(req.Method) == "" { - return nil, fmt.Errorf("missing method") - } - reqBytes, err := json.Marshal(req) - if err != nil { - return nil, err - } - return []string{string(reqBytes)}, nil -} diff --git a/internal/mcp/jsonrpc.go b/internal/mcp/jsonrpc.go new file mode 100644 index 00000000..d6d4933e --- /dev/null +++ b/internal/mcp/jsonrpc.go @@ -0,0 +1,95 @@ +package mcp + +import ( + "encoding/json" + "fmt" + "strings" + "sync" +) + +func IsNotification(req JSONRPCRequest) bool { + return len(req.ID) == 0 && strings.HasPrefix(req.Method, "notifications/") +} + +func JSONRPCErrorResponse(id json.RawMessage, code int, message string) JSONRPCResponse { + return JSONRPCResponse{ + JSONRPC: "2.0", + ID: id, + Error: &JSONRPCError{Code: code, Message: message}, + } +} + +func BuildPayloads(req JSONRPCRequest, initOnce *sync.Once) ([]string, json.RawMessage, error) { + if req.JSONRPC == "" { + req.JSONRPC = "2.0" + } + targetID := req.ID + payloads := []string{} + shouldInit := req.Method != "initialize" && req.Method != "notifications/initialized" + if initOnce != nil { + ran := false + initOnce.Do(func() { + ran = true + }) + if ran { + // This is the first call on the session. + } else { + shouldInit = false + } + } + if shouldInit { + initReq := map[string]any{ + "jsonrpc": "2.0", + "id": "init-1", + "method": "initialize", + "params": map[string]any{ + "protocolVersion": "2025-06-18", + "capabilities": map[string]any{ + "roots": map[string]any{ + "listChanged": false, + }, + }, + "clientInfo": map[string]any{ + "name": "memoh-http-proxy", + "version": "v0", + }, + }, + } + initBytes, err := json.Marshal(initReq) + if err != nil { + return nil, nil, err + } + payloads = append(payloads, string(initBytes)) + + initialized := map[string]any{ + "jsonrpc": "2.0", + "method": "notifications/initialized", + } + initializedBytes, err := json.Marshal(initialized) + if err != nil { + return nil, nil, err + } + payloads = append(payloads, string(initializedBytes)) + } + + reqBytes, err := json.Marshal(req) + if err != nil { + return nil, nil, err + } + payloads = append(payloads, string(reqBytes)) + return payloads, targetID, nil +} + +func BuildNotificationPayloads(req JSONRPCRequest) ([]string, error) { + if req.JSONRPC == "" { + req.JSONRPC = "2.0" + } + if strings.TrimSpace(req.Method) == "" { + return nil, fmt.Errorf("missing method") + } + reqBytes, err := json.Marshal(req) + if err != nil { + return nil, err + } + return []string{string(reqBytes)}, nil +} diff --git a/internal/mcp/manager.go b/internal/mcp/manager.go index 06e7a925..845b46fe 100644 --- a/internal/mcp/manager.go +++ b/internal/mcp/manager.go @@ -88,15 +88,8 @@ func (m *Manager) EnsureBot(ctx context.Context, botID string) error { return err } - dataMount := m.cfg.DataMount - if dataMount == "" { - dataMount = config.DefaultDataMount - } - - image := m.cfg.BusyboxImage - if image == "" { - image = config.DefaultBusyboxImg - } + dataMount := m.dataMount() + image := m.imageRef() specOpts := []oci.SpecOpts{ oci.WithMounts([]specs.Mount{ @@ -235,25 +228,38 @@ func (m *Manager) DataDir(botID string) (string, error) { return "", err } - root := m.cfg.DataRoot - if root == "" { - root = config.DefaultDataRoot - } - return filepath.Join(root, "bots", botID), nil + return filepath.Join(m.dataRoot(), "bots", botID), nil } func (m *Manager) ensureBotDir(botID string) (string, error) { - root := m.cfg.DataRoot - if root == "" { - root = config.DefaultDataRoot - } - dir := filepath.Join(root, "bots", botID) + dir := filepath.Join(m.dataRoot(), "bots", botID) if err := os.MkdirAll(dir, 0o755); err != nil { return "", err } return dir, nil } +func (m *Manager) dataRoot() string { + if m.cfg.DataRoot == "" { + return config.DefaultDataRoot + } + return m.cfg.DataRoot +} + +func (m *Manager) dataMount() string { + if m.cfg.DataMount == "" { + return config.DefaultDataMount + } + return m.cfg.DataMount +} + +func (m *Manager) imageRef() string { + if m.cfg.BusyboxImage == "" { + return config.DefaultBusyboxImg + } + return m.cfg.BusyboxImage +} + func validateBotID(botID string) error { return identity.ValidateUserID(botID) } From 26dd8651b7b69b24da620b84891151917295db58 Mon Sep 17 00:00:00 2001 From: Ran <16112591+chen-ran@users.noreply.github.com> Date: Sun, 8 Feb 2026 21:39:34 +0800 Subject: [PATCH 2/2] feat: go cni lifecycle manage --- cmd/cli/main.go | 147 ++++++++++++++++++++++++++++ go.mod | 4 + go.sum | 8 ++ internal/containerd/network.go | 164 ++++++++++++++++++++++++++++++++ internal/containerd/resolv.go | 73 ++++++++++++++ internal/handlers/containerd.go | 69 +++++++++++--- internal/mcp/manager.go | 24 ++++- internal/mcp/versioning.go | 20 ++++ mise.toml | 12 +-- scripts/lima-up.sh | 44 +++++++++ 10 files changed, 544 insertions(+), 21 deletions(-) create mode 100644 internal/containerd/network.go create mode 100644 internal/containerd/resolv.go create mode 100644 scripts/lima-up.sh diff --git a/cmd/cli/main.go b/cmd/cli/main.go index 45f8c675..1836dfaf 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -1,14 +1,21 @@ package main import ( + "context" + "encoding/json" "flag" + "fmt" "io" "os" "os/exec" + "path/filepath" "runtime" "strconv" + "strings" "sync" "time" + + gocni "github.com/containerd/go-cni" ) func main() { @@ -16,6 +23,19 @@ func main() { containerID := flag.String("container-id", "", "") flag.Parse() + if len(flag.Args()) > 0 { + switch flag.Arg(0) { + case "cni-setup": + os.Exit(runCNISetup(flag.Args()[1:])) + case "cni-remove": + os.Exit(runCNIRemove(flag.Args()[1:])) + case "cni-check": + os.Exit(runCNICheck(flag.Args()[1:])) + case "cni-status": + os.Exit(runCNIStatus(flag.Args()[1:])) + } + } + if *containerID == "" { os.Exit(2) } @@ -108,3 +128,130 @@ func runWithStdio(cmd *exec.Cmd) error { wg.Wait() return err } + +func runCNISetup(args []string) int { + id, netns, err := parseCNIArgs(args) + if err != nil { + return exitWithError(err) + } + cni, err := newCNIFromArgs(args) + if err != nil { + return exitWithError(err) + } + if err := cni.Load(gocni.WithLoNetwork, gocni.WithDefaultConf); err != nil { + return exitWithError(err) + } + result, err := cni.Setup(context.Background(), id, netns) + if err != nil { + return exitWithError(err) + } + if result != nil { + _ = json.NewEncoder(os.Stdout).Encode(result) + } + return 0 +} + +func runCNIRemove(args []string) int { + id, netns, err := parseCNIArgs(args) + if err != nil { + return exitWithError(err) + } + cni, err := newCNIFromArgs(args) + if err != nil { + return exitWithError(err) + } + if err := cni.Load(gocni.WithLoNetwork, gocni.WithDefaultConf); err != nil { + return exitWithError(err) + } + if err := cni.Remove(context.Background(), id, netns); err != nil { + return exitWithError(err) + } + return 0 +} + +func runCNICheck(args []string) int { + id, netns, err := parseCNIArgs(args) + if err != nil { + return exitWithError(err) + } + cni, err := newCNIFromArgs(args) + if err != nil { + return exitWithError(err) + } + if err := cni.Load(gocni.WithLoNetwork, gocni.WithDefaultConf); err != nil { + return exitWithError(err) + } + if err := cni.Check(context.Background(), id, netns); err != nil { + return exitWithError(err) + } + return 0 +} + +func runCNIStatus(args []string) int { + cni, err := newCNIFromArgs(args) + if err != nil { + return exitWithError(err) + } + if err := cni.Load(gocni.WithLoNetwork, gocni.WithDefaultConf); err != nil { + return exitWithError(err) + } + if err := cni.Status(); err != nil { + return exitWithError(err) + } + return 0 +} + +func parseCNIArgs(args []string) (string, string, error) { + fs := flag.NewFlagSet("cni", flag.ContinueOnError) + fs.SetOutput(io.Discard) + id := fs.String("id", "", "") + netns := fs.String("netns", "", "") + pid := fs.Int("pid", 0, "") + _ = fs.String("conf-dir", "", "") + _ = fs.String("bin-dir", "", "") + _ = fs.String("if-prefix", "", "") + if err := fs.Parse(args); err != nil { + return "", "", err + } + if *id == "" { + return "", "", fmt.Errorf("missing --id") + } + if *netns == "" && *pid == 0 { + return "", "", fmt.Errorf("missing --netns or --pid") + } + if *netns == "" { + *netns = filepath.Join("/proc", strconv.Itoa(*pid), "ns", "net") + } + return *id, *netns, nil +} + +func newCNIFromArgs(args []string) (gocni.CNI, error) { + fs := flag.NewFlagSet("cni", flag.ContinueOnError) + fs.SetOutput(io.Discard) + confDir := fs.String("conf-dir", "", "") + binDir := fs.String("bin-dir", "", "") + ifPrefix := fs.String("if-prefix", "", "") + _ = fs.String("id", "", "") + _ = fs.String("netns", "", "") + _ = fs.Int("pid", 0, "") + if err := fs.Parse(args); err != nil { + return nil, err + } + + opts := []gocni.Opt{} + if strings.TrimSpace(*binDir) != "" { + opts = append(opts, gocni.WithPluginDir([]string{*binDir})) + } + if strings.TrimSpace(*confDir) != "" { + opts = append(opts, gocni.WithPluginConfDir(*confDir)) + } + if strings.TrimSpace(*ifPrefix) != "" { + opts = append(opts, gocni.WithInterfacePrefix(*ifPrefix)) + } + return gocni.New(opts...) +} + +func exitWithError(err error) int { + _, _ = fmt.Fprintln(os.Stderr, err.Error()) + return 1 +} diff --git a/go.mod b/go.mod index c39ed073..2005a394 100644 --- a/go.mod +++ b/go.mod @@ -45,10 +45,12 @@ require ( github.com/containerd/continuity v0.4.5 // indirect github.com/containerd/errdefs/pkg v0.3.0 // indirect github.com/containerd/fifo v1.1.0 // indirect + github.com/containerd/go-cni v1.1.13 // indirect github.com/containerd/log v0.1.0 // indirect github.com/containerd/plugin v1.0.0 // indirect github.com/containerd/ttrpc v1.2.7 // indirect github.com/containerd/typeurl/v2 v2.2.3 // indirect + github.com/containernetworking/cni v1.3.0 // indirect github.com/cyphar/filepath-securejoin v0.6.1 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/distribution/reference v0.6.0 // indirect @@ -87,8 +89,10 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect github.com/opencontainers/selinux v1.13.1 // indirect + github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/sasha-s/go-deadlock v0.3.5 // indirect github.com/sirupsen/logrus v1.9.4 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.2 // indirect diff --git a/go.sum b/go.sum index 44d2b2db..8864560c 100644 --- a/go.sum +++ b/go.sum @@ -49,6 +49,8 @@ github.com/containerd/errdefs/pkg v0.3.0 h1:9IKJ06FvyNlexW690DXuQNx2KA2cUJXx151X github.com/containerd/errdefs/pkg v0.3.0/go.mod h1:NJw6s9HwNuRhnjJhM7pylWwMyAkmCQvQ4GpJHEqRLVk= github.com/containerd/fifo v1.1.0 h1:4I2mbh5stb1u6ycIABlBw9zgtlK8viPI9QkQNRQEEmY= github.com/containerd/fifo v1.1.0/go.mod h1:bmC4NWMbXlt2EZ0Hc7Fx7QzTFxgPID13eH0Qu+MAb2o= +github.com/containerd/go-cni v1.1.13 h1:eFSGOKlhoYNxpJ51KRIMHZNlg5UgocXEIEBGkY7Hnis= +github.com/containerd/go-cni v1.1.13/go.mod h1:nTieub0XDRmvCZ9VI/SBG6PyqT95N4FIhxsauF1vSBI= github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= github.com/containerd/platforms v1.0.0-rc.2 h1:0SPgaNZPVWGEi4grZdV8VRYQn78y+nm6acgLGv/QzE4= @@ -59,6 +61,8 @@ github.com/containerd/ttrpc v1.2.7 h1:qIrroQvuOL9HQ1X6KHe2ohc7p+HP/0VE6XPU7elJRq github.com/containerd/ttrpc v1.2.7/go.mod h1:YCXHsb32f+Sq5/72xHubdiJRQY9inL4a4ZQrAbN1q9o= github.com/containerd/typeurl/v2 v2.2.3 h1:yNA/94zxWdvYACdYO8zofhrTVuQY73fFU1y++dYSw40= github.com/containerd/typeurl/v2 v2.2.3/go.mod h1:95ljDnPfD3bAbDJRugOiShd/DlAAsxGtUBhJxIn7SCk= +github.com/containernetworking/cni v1.3.0 h1:v6EpN8RznAZj9765HhXQrtXgX+ECGebEYEmnuFjskwo= +github.com/containernetworking/cni v1.3.0/go.mod h1:Bs8glZjjFfGPHMw6hQu82RUgEPNGEaBb9KS5KtNMnJ4= github.com/cyphar/filepath-securejoin v0.6.1 h1:5CeZ1jPXEiYt3+Z6zqprSAgSWiggmpVyciv8syjIpVE= github.com/cyphar/filepath-securejoin v0.6.1/go.mod h1:A8hd4EnAeyujCJRrICiOWqjS1AX0a9kM5XL+NwKoYSc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -202,6 +206,8 @@ github.com/opencontainers/runtime-spec v1.3.0 h1:YZupQUdctfhpZy3TM39nN9Ika5CBWT5 github.com/opencontainers/runtime-spec v1.3.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opencontainers/selinux v1.13.1 h1:A8nNeceYngH9Ow++M+VVEwJVpdFmrlxsN22F+ISDCJE= github.com/opencontainers/selinux v1.13.1/go.mod h1:S10WXZ/osk2kWOYKy1x2f/eXF5ZHJoUs8UU/2caNRbg= +github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7 h1:Dx7Ovyv/SFnMFw3fD4oEoeorXc6saIiQ23LrGLth0Gw= +github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -216,6 +222,8 @@ github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/sasha-s/go-deadlock v0.3.5 h1:tNCOEEDG6tBqrNDOX35j/7hL5FcFViG6awUGROb2NsU= +github.com/sasha-s/go-deadlock v0.3.5/go.mod h1:bugP6EGbdGYObIlx7pUZtWqlvo8k9H6vCBBsiChJQ5U= github.com/sirupsen/logrus v1.9.4 h1:TsZE7l11zFCLZnZ+teH4Umoq5BhEIfIzfRDZ1Uzql2w= github.com/sirupsen/logrus v1.9.4/go.mod h1:ftWc9WdOfJ0a92nsE2jF5u5ZwH8Bv2zdeOC42RjbV2g= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/internal/containerd/network.go b/internal/containerd/network.go new file mode 100644 index 00000000..7dc5ab7c --- /dev/null +++ b/internal/containerd/network.go @@ -0,0 +1,164 @@ +package containerd + +import ( + "bytes" + "context" + "fmt" + "os" + "os/exec" + "path/filepath" + "runtime" + "strings" + + "github.com/containerd/containerd/v2/client" + gocni "github.com/containerd/go-cni" +) + +const ( + defaultCNIConfDir = "/etc/cni/net.d" + defaultCNIBinDir = "/opt/cni/bin" +) + +// SetupNetwork attaches CNI networking to a running task. +func SetupNetwork(ctx context.Context, task client.Task, containerID string) error { + if task == nil { + return ErrInvalidArgument + } + if containerID == "" { + containerID = task.ID() + } + if containerID == "" { + return ErrInvalidArgument + } + + pid := task.Pid() + if pid == 0 { + return fmt.Errorf("task pid not available for %s", containerID) + } + if runtime.GOOS == "darwin" { + return setupNetworkWithCLI(ctx, containerID, pid) + } + + if _, err := os.Stat(defaultCNIConfDir); err != nil { + return fmt.Errorf("cni config dir missing: %s: %w", defaultCNIConfDir, err) + } + if _, err := os.Stat(defaultCNIBinDir); err != nil { + return fmt.Errorf("cni bin dir missing: %s: %w", defaultCNIBinDir, err) + } + netnsPath := filepath.Join("/proc", fmt.Sprint(pid), "ns", "net") + if _, err := os.Stat(netnsPath); err != nil { + return fmt.Errorf("netns not found: %s: %w", netnsPath, err) + } + + cni, err := gocni.New( + gocni.WithPluginDir([]string{defaultCNIBinDir}), + gocni.WithPluginConfDir(defaultCNIConfDir), + ) + if err != nil { + return err + } + if err := cni.Load(gocni.WithLoNetwork, gocni.WithDefaultConf); err != nil { + return err + } + _, err = cni.Setup(ctx, containerID, netnsPath) + return err +} + +func setupNetworkWithCLI(ctx context.Context, containerID string, pid uint32) error { + args := []string{ + "shell", + "--tty=false", + "default", + "--", + "sudo", + "-n", + "memoh-cli", + "cni-setup", + "--id", containerID, + "--pid", fmt.Sprint(pid), + "--conf-dir", defaultCNIConfDir, + "--bin-dir", defaultCNIBinDir, + } + cmd := exec.CommandContext(ctx, "limactl", args...) + var stderr bytes.Buffer + cmd.Stderr = &stderr + if err := cmd.Run(); err != nil { + if stderr.Len() > 0 { + return fmt.Errorf("cni cli failed: %s", strings.TrimSpace(stderr.String())) + } + return err + } + return nil +} + +// RemoveNetwork detaches CNI networking for a running task. +func RemoveNetwork(ctx context.Context, task client.Task, containerID string) error { + if task == nil { + return ErrInvalidArgument + } + if containerID == "" { + containerID = task.ID() + } + if containerID == "" { + return ErrInvalidArgument + } + + pid := task.Pid() + if pid == 0 { + return fmt.Errorf("task pid not available for %s", containerID) + } + if runtime.GOOS == "darwin" { + return removeNetworkWithCLI(ctx, containerID, pid) + } + + if _, err := os.Stat(defaultCNIConfDir); err != nil { + return fmt.Errorf("cni config dir missing: %s: %w", defaultCNIConfDir, err) + } + if _, err := os.Stat(defaultCNIBinDir); err != nil { + return fmt.Errorf("cni bin dir missing: %s: %w", defaultCNIBinDir, err) + } + + netnsPath := filepath.Join("/proc", fmt.Sprint(pid), "ns", "net") + if _, err := os.Stat(netnsPath); err != nil { + return fmt.Errorf("netns not found: %s: %w", netnsPath, err) + } + + cni, err := gocni.New( + gocni.WithPluginDir([]string{defaultCNIBinDir}), + gocni.WithPluginConfDir(defaultCNIConfDir), + ) + if err != nil { + return err + } + if err := cni.Load(gocni.WithLoNetwork, gocni.WithDefaultConf); err != nil { + return err + } + return cni.Remove(ctx, containerID, netnsPath) +} + +func removeNetworkWithCLI(ctx context.Context, containerID string, pid uint32) error { + args := []string{ + "shell", + "--tty=false", + "default", + "--", + "sudo", + "-n", + "memoh-cli", + "cni-remove", + "--id", containerID, + "--pid", fmt.Sprint(pid), + "--conf-dir", defaultCNIConfDir, + "--bin-dir", defaultCNIBinDir, + } + cmd := exec.CommandContext(ctx, "limactl", args...) + var stderr bytes.Buffer + cmd.Stderr = &stderr + if err := cmd.Run(); err != nil { + if stderr.Len() > 0 { + return fmt.Errorf("cni cli failed: %s", strings.TrimSpace(stderr.String())) + } + return err + } + return nil +} diff --git a/internal/containerd/resolv.go b/internal/containerd/resolv.go new file mode 100644 index 00000000..8455c0c1 --- /dev/null +++ b/internal/containerd/resolv.go @@ -0,0 +1,73 @@ +package containerd + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "runtime" + "strings" +) + +const ( + systemdResolvConf = "/run/systemd/resolve/resolv.conf" + fallbackResolv = "nameserver 1.1.1.1\nnameserver 8.8.8.8\n" +) + +// ResolveConfSource returns a host path to mount as /etc/resolv.conf. +// If systemd-resolved config is available, use it. Otherwise write a fallback +// resolv.conf under dataDir and return that path. +func ResolveConfSource(dataDir string) (string, error) { + if strings.TrimSpace(dataDir) == "" { + return "", ErrInvalidArgument + } + if runtime.GOOS == "darwin" { + if ok, err := limaFileExists(systemdResolvConf); err != nil { + return "", err + } else if ok { + return systemdResolvConf, nil + } + } else if _, err := os.Stat(systemdResolvConf); err == nil { + return systemdResolvConf, nil + } + + if err := os.MkdirAll(dataDir, 0o755); err != nil { + return "", err + } + fallbackPath := filepath.Join(dataDir, "resolv.conf") + if _, err := os.Stat(fallbackPath); err == nil { + return fallbackPath, nil + } else if !os.IsNotExist(err) { + return "", err + } + if err := os.WriteFile(fallbackPath, []byte(fallbackResolv), 0o644); err != nil { + return "", err + } + return fallbackPath, nil +} + +func limaFileExists(path string) (bool, error) { + if strings.TrimSpace(path) == "" { + return false, ErrInvalidArgument + } + cmd := exec.Command( + "limactl", + "shell", + "--tty=false", + "default", + "--", + "test", + "-f", + path, + ) + if err := cmd.Run(); err == nil { + return true, nil + } else if exitErr, ok := err.(*exec.ExitError); ok { + if exitErr.ExitCode() == 1 { + return false, nil + } + return false, fmt.Errorf("lima test failed for %s: %w", path, err) + } else { + return false, fmt.Errorf("lima test failed for %s: %w", path, err) + } +} diff --git a/internal/handlers/containerd.go b/internal/handlers/containerd.go index e4704da3..1e194ae7 100644 --- a/internal/handlers/containerd.go +++ b/internal/handlers/containerd.go @@ -173,6 +173,10 @@ func (h *ContainerdHandler) CreateContainer(c echo.Context) error { if err := os.MkdirAll(filepath.Join(dataDir, ".skills"), 0o755); err != nil { return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) } + resolvPath, err := ctr.ResolveConfSource(dataDir) + if err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } specOpts := []oci.SpecOpts{ oci.WithMounts([]specs.Mount{ @@ -188,6 +192,12 @@ func (h *ContainerdHandler) CreateContainer(c echo.Context) error { Source: dataDir, Options: []string{"rbind", "rw"}, }, + { + Destination: "/etc/resolv.conf", + Type: "bind", + Source: resolvPath, + Options: []string{"rbind", "ro"}, + }, }), oci.WithProcessArgs("/bin/sh", "-lc", "bootstrap(){ [ -e /app/mcp ] || { mkdir -p /app; [ -f /opt/mcp ] && cp -a /opt/mcp /app/mcp 2>/dev/null || true; }; }; bootstrap; exec /app/mcp"), } @@ -227,14 +237,22 @@ func (h *ContainerdHandler) CreateContainer(c echo.Context) error { } started := false - if _, err := h.service.StartTask(ctx, containerID, &ctr.StartTaskOptions{ + if task, err := h.service.StartTask(ctx, containerID, &ctr.StartTaskOptions{ UseStdio: false, }); err == nil { - started = true - if h.queries != nil { - if pgBotID, parseErr := parsePgUUID(botID); parseErr == nil { - _ = h.queries.UpdateContainerStarted(c.Request().Context(), pgBotID) + if netErr := ctr.SetupNetwork(ctx, task, containerID); netErr == nil { + started = true + if h.queries != nil { + if pgBotID, parseErr := parsePgUUID(botID); parseErr == nil { + _ = h.queries.UpdateContainerStarted(c.Request().Context(), pgBotID) + } } + } else { + _ = h.service.StopTask(ctx, containerID, &ctr.StopTaskOptions{Force: true}) + h.logger.Error("mcp container network setup failed", + slog.String("container_id", containerID), + slog.Any("error", netErr), + ) } } else { h.logger.Error("mcp container start failed", @@ -265,9 +283,16 @@ func (h *ContainerdHandler) ensureTaskRunning(ctx context.Context, containerID s _ = h.service.DeleteTask(ctx, containerID, &ctr.DeleteTaskOptions{Force: true}) } - _, err = h.service.StartTask(ctx, containerID, &ctr.StartTaskOptions{ + task, err := h.service.StartTask(ctx, containerID, &ctr.StartTaskOptions{ UseStdio: false, }) + if err != nil { + return err + } + if err := ctr.SetupNetwork(ctx, task, containerID); err != nil { + _ = h.service.StopTask(ctx, containerID, &ctr.StopTaskOptions{Force: true}) + return err + } return err } @@ -646,6 +671,10 @@ func (h *ContainerdHandler) SetupBotContainer(ctx context.Context, botID string) if err := os.MkdirAll(filepath.Join(dataDir, ".skills"), 0o755); err != nil { return err } + resolvPath, err := ctr.ResolveConfSource(dataDir) + if err != nil { + return err + } specOpts := []oci.SpecOpts{ oci.WithMounts([]specs.Mount{ @@ -661,11 +690,17 @@ func (h *ContainerdHandler) SetupBotContainer(ctx context.Context, botID string) Source: dataDir, Options: []string{"rbind", "rw"}, }, + { + Destination: "/etc/resolv.conf", + Type: "bind", + Source: resolvPath, + Options: []string{"rbind", "ro"}, + }, }), oci.WithProcessArgs("/bin/sh", "-lc", "bootstrap(){ [ -e /app/mcp ] || { mkdir -p /app; [ -f /opt/mcp ] && cp -a /opt/mcp /app/mcp 2>/dev/null || true; }; }; bootstrap; exec /app/mcp"), } - _, err := h.service.CreateContainer(ctx, ctr.CreateContainerRequest{ + _, err = h.service.CreateContainer(ctx, ctr.CreateContainerRequest{ ID: containerID, ImageRef: image, Snapshotter: snapshotter, @@ -699,13 +734,22 @@ func (h *ContainerdHandler) SetupBotContainer(ctx context.Context, botID string) } } - if _, err := h.service.StartTask(ctx, containerID, &ctr.StartTaskOptions{ + if task, err := h.service.StartTask(ctx, containerID, &ctr.StartTaskOptions{ UseStdio: false, }); err == nil { - if h.queries != nil { - if pgBotID, parseErr := parsePgUUID(botID); parseErr == nil { - _ = h.queries.UpdateContainerStarted(ctx, pgBotID) + if netErr := ctr.SetupNetwork(ctx, task, containerID); netErr == nil { + if h.queries != nil { + if pgBotID, parseErr := parsePgUUID(botID); parseErr == nil { + _ = h.queries.UpdateContainerStarted(ctx, pgBotID) + } } + } else { + _ = h.service.StopTask(ctx, containerID, &ctr.StopTaskOptions{Force: true}) + h.logger.Error("setup bot container: network setup failed", + slog.String("bot_id", botID), + slog.String("container_id", containerID), + slog.Any("error", netErr), + ) } } else { h.logger.Error("setup bot container: task start failed", @@ -729,6 +773,9 @@ func (h *ContainerdHandler) CleanupBotContainer(ctx context.Context, botID strin return nil } + if task, taskErr := h.service.GetTask(ctx, containerID); taskErr == nil { + _ = ctr.RemoveNetwork(ctx, task, containerID) + } _ = h.service.StopTask(ctx, containerID, &ctr.StopTaskOptions{ Timeout: 5 * time.Second, Force: true, diff --git a/internal/mcp/manager.go b/internal/mcp/manager.go index 845b46fe..e2a46218 100644 --- a/internal/mcp/manager.go +++ b/internal/mcp/manager.go @@ -90,6 +90,10 @@ func (m *Manager) EnsureBot(ctx context.Context, botID string) error { dataMount := m.dataMount() image := m.imageRef() + resolvPath, err := ctr.ResolveConfSource(dataDir) + if err != nil { + return err + } specOpts := []oci.SpecOpts{ oci.WithMounts([]specs.Mount{ @@ -105,6 +109,12 @@ func (m *Manager) EnsureBot(ctx context.Context, botID string) error { Source: dataDir, Options: []string{"rbind", "rw"}, }, + { + Destination: "/etc/resolv.conf", + Type: "bind", + Source: resolvPath, + Options: []string{"rbind", "ro"}, + }, }), } @@ -155,10 +165,17 @@ func (m *Manager) Start(ctx context.Context, botID string) error { return err } - _, err := m.service.StartTask(ctx, m.containerID(botID), &ctr.StartTaskOptions{ + task, err := m.service.StartTask(ctx, m.containerID(botID), &ctr.StartTaskOptions{ UseStdio: false, }) - return err + if err != nil { + return err + } + if err := ctr.SetupNetwork(ctx, task, m.containerID(botID)); err != nil { + _ = m.service.StopTask(ctx, m.containerID(botID), &ctr.StopTaskOptions{Force: true}) + return err + } + return nil } func (m *Manager) Stop(ctx context.Context, botID string, timeout time.Duration) error { @@ -176,6 +193,9 @@ func (m *Manager) Delete(ctx context.Context, botID string) error { return err } + if task, taskErr := m.service.GetTask(ctx, m.containerID(botID)); taskErr == nil { + _ = ctr.RemoveNetwork(ctx, task, m.containerID(botID)) + } _ = m.service.DeleteTask(ctx, m.containerID(botID), &ctr.DeleteTaskOptions{Force: true}) return m.service.DeleteContainer(ctx, m.containerID(botID), &ctr.DeleteContainerOptions{ CleanupSnapshot: true, diff --git a/internal/mcp/versioning.go b/internal/mcp/versioning.go index a4f40ca5..0ceb603d 100644 --- a/internal/mcp/versioning.go +++ b/internal/mcp/versioning.go @@ -75,6 +75,10 @@ func (m *Manager) CreateVersion(ctx context.Context, userID string) (*VersionInf if dataMount == "" { dataMount = config.DefaultDataMount } + resolvPath, err := ctr.ResolveConfSource(dataDir) + if err != nil { + return nil, err + } specOpts := []oci.SpecOpts{ oci.WithMounts([]specs.Mount{ @@ -90,6 +94,12 @@ func (m *Manager) CreateVersion(ctx context.Context, userID string) (*VersionInf Source: dataDir, Options: []string{"rbind", "rw"}, }, + { + Destination: "/etc/resolv.conf", + Type: "bind", + Source: resolvPath, + Options: []string{"rbind", "ro"}, + }, }), } @@ -202,6 +212,10 @@ func (m *Manager) RollbackVersion(ctx context.Context, userID string, version in if dataMount == "" { dataMount = config.DefaultDataMount } + resolvPath, err := ctr.ResolveConfSource(dataDir) + if err != nil { + return err + } specOpts := []oci.SpecOpts{ oci.WithMounts([]specs.Mount{ { @@ -216,6 +230,12 @@ func (m *Manager) RollbackVersion(ctx context.Context, userID string, version in Source: dataDir, Options: []string{"rbind", "rw"}, }, + { + Destination: "/etc/resolv.conf", + Type: "bind", + Source: resolvPath, + Options: []string{"rbind", "ro"}, + }, }), } diff --git a/mise.toml b/mise.toml index fe3f3b40..95643f94 100644 --- a/mise.toml +++ b/mise.toml @@ -27,11 +27,7 @@ description = "Install Go dependencies" run = "go mod download" [tasks.lima-up] -run = """ -if [ "$(uname -s)" = "Darwin" ]; then - limactl start default -fi -""" +run = "scripts/lima-up.sh" [tasks.lima-down] run = """ @@ -69,9 +65,9 @@ run = "cd packages/cli && npm install -g" description = "Build Go CLI binary and install to local bin" run = """ mkdir -p ~/.local/bin -go build -trimpath -ldflags "-s -w" -o ~/.local/bin/container-cli ./cmd/cli -chmod +x ~/.local/bin/container-cli -echo "✓ CLI binary installed to ~/.local/bin/container-cli" +go build -trimpath -ldflags "-s -w" -o ~/.local/bin/memoh-cli ./cmd/cli +chmod +x ~/.local/bin/memoh-cli +echo "✓ CLI binary installed to ~/.local/bin/memoh-cli" """ [tasks.mcp-image-up] diff --git a/scripts/lima-up.sh b/scripts/lima-up.sh new file mode 100644 index 00000000..2aa6ce34 --- /dev/null +++ b/scripts/lima-up.sh @@ -0,0 +1,44 @@ +#!/usr/bin/env sh +set -e + +if [ "$(uname -s)" != "Darwin" ]; then + exit 0 +fi + +limactl start default + +if ! limactl shell default -- sh -lc 'command -v memoh-cli >/dev/null 2>&1'; then + vm_arch=$(limactl shell default -- uname -m) + if [ "$vm_arch" = "aarch64" ] || [ "$vm_arch" = "arm64" ]; then + go_arch="arm64" + else + go_arch="amd64" + fi + bin_path="/tmp/memoh-cli-linux-$go_arch" + GOOS=linux GOARCH=$go_arch go build -trimpath -ldflags "-s -w" -o "$bin_path" ./cmd/cli + limactl shell default -- sudo -n mkdir -p /usr/local/bin + limactl shell default -- sudo -n tee /usr/local/bin/memoh-cli >/dev/null < "$bin_path" + limactl shell default -- sudo -n chmod +x /usr/local/bin/memoh-cli +fi + +limactl shell default -- sh -lc 'command -v curl >/dev/null 2>&1' || { + echo "curl not found in Lima VM; install curl and rerun" + exit 1 +} + +limactl shell default -- sh -lc 'test -x /opt/cni/bin/bridge' || { + vm_arch=$(limactl shell default -- uname -m) + if [ "$vm_arch" = "aarch64" ] || [ "$vm_arch" = "arm64" ]; then + cni_arch="arm64" + else + cni_arch="amd64" + fi + url="https://github.com/containernetworking/plugins/releases/download/v1.9.0/cni-plugins-linux-${cni_arch}-v1.9.0.tgz" + limactl shell default -- sudo -n mkdir -p /opt/cni/bin + limactl shell default -- sudo -n curl -L -o /tmp/cni-plugins.tgz "$url" + limactl shell default -- sudo -n tar -C /opt/cni/bin -xzf /tmp/cni-plugins.tgz +} + +limactl shell default -- sudo -n mkdir -p /etc/cni/net.d +limactl shell default -- sudo -n sh -lc 'test -f /etc/cni/net.d/10-memoh-bridge.conflist' || \ +limactl shell default -- sudo -n sh -lc 'printf "%s\n" "{" " \"cniVersion\": \"0.4.0\"," " \"name\": \"memoh-bridge\"," " \"plugins\": [" " {" " \"type\": \"bridge\"," " \"bridge\": \"cni0\"," " \"isGateway\": true," " \"ipMasq\": true," " \"promiscMode\": false," " \"hairpinMode\": true," " \"ipam\": {" " \"type\": \"host-local\"," " \"subnet\": \"10.88.0.0/16\"," " \"routes\": [" " {\"dst\": \"0.0.0.0/0\"}" " ]" " }" " }," " {\"type\": \"portmap\", \"capabilities\": {\"portMappings\": true}}," " {\"type\": \"firewall\"}," " {\"type\": \"tuning\"}" " ]" "}" > /etc/cni/net.d/10-memoh-bridge.conflist'