Merge branch 'refactor/channel-gateway'

This commit is contained in:
Acbox
2026-02-08 22:34:50 +08:00
12 changed files with 682 additions and 154 deletions
+147
View File
@@ -1,14 +1,21 @@
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"time"
gocni "github.com/containerd/go-cni"
)
func main() {
@@ -16,6 +23,19 @@ func main() {
containerID := flag.String("container-id", "", "")
flag.Parse()
if len(flag.Args()) > 0 {
switch flag.Arg(0) {
case "cni-setup":
os.Exit(runCNISetup(flag.Args()[1:]))
case "cni-remove":
os.Exit(runCNIRemove(flag.Args()[1:]))
case "cni-check":
os.Exit(runCNICheck(flag.Args()[1:]))
case "cni-status":
os.Exit(runCNIStatus(flag.Args()[1:]))
}
}
if *containerID == "" {
os.Exit(2)
}
@@ -108,3 +128,130 @@ func runWithStdio(cmd *exec.Cmd) error {
wg.Wait()
return err
}
func runCNISetup(args []string) int {
id, netns, err := parseCNIArgs(args)
if err != nil {
return exitWithError(err)
}
cni, err := newCNIFromArgs(args)
if err != nil {
return exitWithError(err)
}
if err := cni.Load(gocni.WithLoNetwork, gocni.WithDefaultConf); err != nil {
return exitWithError(err)
}
result, err := cni.Setup(context.Background(), id, netns)
if err != nil {
return exitWithError(err)
}
if result != nil {
_ = json.NewEncoder(os.Stdout).Encode(result)
}
return 0
}
func runCNIRemove(args []string) int {
id, netns, err := parseCNIArgs(args)
if err != nil {
return exitWithError(err)
}
cni, err := newCNIFromArgs(args)
if err != nil {
return exitWithError(err)
}
if err := cni.Load(gocni.WithLoNetwork, gocni.WithDefaultConf); err != nil {
return exitWithError(err)
}
if err := cni.Remove(context.Background(), id, netns); err != nil {
return exitWithError(err)
}
return 0
}
func runCNICheck(args []string) int {
id, netns, err := parseCNIArgs(args)
if err != nil {
return exitWithError(err)
}
cni, err := newCNIFromArgs(args)
if err != nil {
return exitWithError(err)
}
if err := cni.Load(gocni.WithLoNetwork, gocni.WithDefaultConf); err != nil {
return exitWithError(err)
}
if err := cni.Check(context.Background(), id, netns); err != nil {
return exitWithError(err)
}
return 0
}
func runCNIStatus(args []string) int {
cni, err := newCNIFromArgs(args)
if err != nil {
return exitWithError(err)
}
if err := cni.Load(gocni.WithLoNetwork, gocni.WithDefaultConf); err != nil {
return exitWithError(err)
}
if err := cni.Status(); err != nil {
return exitWithError(err)
}
return 0
}
func parseCNIArgs(args []string) (string, string, error) {
fs := flag.NewFlagSet("cni", flag.ContinueOnError)
fs.SetOutput(io.Discard)
id := fs.String("id", "", "")
netns := fs.String("netns", "", "")
pid := fs.Int("pid", 0, "")
_ = fs.String("conf-dir", "", "")
_ = fs.String("bin-dir", "", "")
_ = fs.String("if-prefix", "", "")
if err := fs.Parse(args); err != nil {
return "", "", err
}
if *id == "" {
return "", "", fmt.Errorf("missing --id")
}
if *netns == "" && *pid == 0 {
return "", "", fmt.Errorf("missing --netns or --pid")
}
if *netns == "" {
*netns = filepath.Join("/proc", strconv.Itoa(*pid), "ns", "net")
}
return *id, *netns, nil
}
func newCNIFromArgs(args []string) (gocni.CNI, error) {
fs := flag.NewFlagSet("cni", flag.ContinueOnError)
fs.SetOutput(io.Discard)
confDir := fs.String("conf-dir", "", "")
binDir := fs.String("bin-dir", "", "")
ifPrefix := fs.String("if-prefix", "", "")
_ = fs.String("id", "", "")
_ = fs.String("netns", "", "")
_ = fs.Int("pid", 0, "")
if err := fs.Parse(args); err != nil {
return nil, err
}
opts := []gocni.Opt{}
if strings.TrimSpace(*binDir) != "" {
opts = append(opts, gocni.WithPluginDir([]string{*binDir}))
}
if strings.TrimSpace(*confDir) != "" {
opts = append(opts, gocni.WithPluginConfDir(*confDir))
}
if strings.TrimSpace(*ifPrefix) != "" {
opts = append(opts, gocni.WithInterfacePrefix(*ifPrefix))
}
return gocni.New(opts...)
}
func exitWithError(err error) int {
_, _ = fmt.Fprintln(os.Stderr, err.Error())
return 1
}
+4
View File
@@ -45,10 +45,12 @@ require (
github.com/containerd/continuity v0.4.5 // indirect
github.com/containerd/errdefs/pkg v0.3.0 // indirect
github.com/containerd/fifo v1.1.0 // indirect
github.com/containerd/go-cni v1.1.13 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/containerd/plugin v1.0.0 // indirect
github.com/containerd/ttrpc v1.2.7 // indirect
github.com/containerd/typeurl/v2 v2.2.3 // indirect
github.com/containernetworking/cni v1.3.0 // indirect
github.com/cyphar/filepath-securejoin v0.6.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/distribution/reference v0.6.0 // indirect
@@ -87,8 +89,10 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
github.com/opencontainers/selinux v1.13.1 // indirect
github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/sasha-s/go-deadlock v0.3.5 // indirect
github.com/sirupsen/logrus v1.9.4 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
+8
View File
@@ -49,6 +49,8 @@ github.com/containerd/errdefs/pkg v0.3.0 h1:9IKJ06FvyNlexW690DXuQNx2KA2cUJXx151X
github.com/containerd/errdefs/pkg v0.3.0/go.mod h1:NJw6s9HwNuRhnjJhM7pylWwMyAkmCQvQ4GpJHEqRLVk=
github.com/containerd/fifo v1.1.0 h1:4I2mbh5stb1u6ycIABlBw9zgtlK8viPI9QkQNRQEEmY=
github.com/containerd/fifo v1.1.0/go.mod h1:bmC4NWMbXlt2EZ0Hc7Fx7QzTFxgPID13eH0Qu+MAb2o=
github.com/containerd/go-cni v1.1.13 h1:eFSGOKlhoYNxpJ51KRIMHZNlg5UgocXEIEBGkY7Hnis=
github.com/containerd/go-cni v1.1.13/go.mod h1:nTieub0XDRmvCZ9VI/SBG6PyqT95N4FIhxsauF1vSBI=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
github.com/containerd/platforms v1.0.0-rc.2 h1:0SPgaNZPVWGEi4grZdV8VRYQn78y+nm6acgLGv/QzE4=
@@ -59,6 +61,8 @@ github.com/containerd/ttrpc v1.2.7 h1:qIrroQvuOL9HQ1X6KHe2ohc7p+HP/0VE6XPU7elJRq
github.com/containerd/ttrpc v1.2.7/go.mod h1:YCXHsb32f+Sq5/72xHubdiJRQY9inL4a4ZQrAbN1q9o=
github.com/containerd/typeurl/v2 v2.2.3 h1:yNA/94zxWdvYACdYO8zofhrTVuQY73fFU1y++dYSw40=
github.com/containerd/typeurl/v2 v2.2.3/go.mod h1:95ljDnPfD3bAbDJRugOiShd/DlAAsxGtUBhJxIn7SCk=
github.com/containernetworking/cni v1.3.0 h1:v6EpN8RznAZj9765HhXQrtXgX+ECGebEYEmnuFjskwo=
github.com/containernetworking/cni v1.3.0/go.mod h1:Bs8glZjjFfGPHMw6hQu82RUgEPNGEaBb9KS5KtNMnJ4=
github.com/cyphar/filepath-securejoin v0.6.1 h1:5CeZ1jPXEiYt3+Z6zqprSAgSWiggmpVyciv8syjIpVE=
github.com/cyphar/filepath-securejoin v0.6.1/go.mod h1:A8hd4EnAeyujCJRrICiOWqjS1AX0a9kM5XL+NwKoYSc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -202,6 +206,8 @@ github.com/opencontainers/runtime-spec v1.3.0 h1:YZupQUdctfhpZy3TM39nN9Ika5CBWT5
github.com/opencontainers/runtime-spec v1.3.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
github.com/opencontainers/selinux v1.13.1 h1:A8nNeceYngH9Ow++M+VVEwJVpdFmrlxsN22F+ISDCJE=
github.com/opencontainers/selinux v1.13.1/go.mod h1:S10WXZ/osk2kWOYKy1x2f/eXF5ZHJoUs8UU/2caNRbg=
github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7 h1:Dx7Ovyv/SFnMFw3fD4oEoeorXc6saIiQ23LrGLth0Gw=
github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@@ -216,6 +222,8 @@ github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/sasha-s/go-deadlock v0.3.5 h1:tNCOEEDG6tBqrNDOX35j/7hL5FcFViG6awUGROb2NsU=
github.com/sasha-s/go-deadlock v0.3.5/go.mod h1:bugP6EGbdGYObIlx7pUZtWqlvo8k9H6vCBBsiChJQ5U=
github.com/sirupsen/logrus v1.9.4 h1:TsZE7l11zFCLZnZ+teH4Umoq5BhEIfIzfRDZ1Uzql2w=
github.com/sirupsen/logrus v1.9.4/go.mod h1:ftWc9WdOfJ0a92nsE2jF5u5ZwH8Bv2zdeOC42RjbV2g=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+164
View File
@@ -0,0 +1,164 @@
package containerd
import (
"bytes"
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"github.com/containerd/containerd/v2/client"
gocni "github.com/containerd/go-cni"
)
const (
defaultCNIConfDir = "/etc/cni/net.d"
defaultCNIBinDir = "/opt/cni/bin"
)
// SetupNetwork attaches CNI networking to a running task.
func SetupNetwork(ctx context.Context, task client.Task, containerID string) error {
if task == nil {
return ErrInvalidArgument
}
if containerID == "" {
containerID = task.ID()
}
if containerID == "" {
return ErrInvalidArgument
}
pid := task.Pid()
if pid == 0 {
return fmt.Errorf("task pid not available for %s", containerID)
}
if runtime.GOOS == "darwin" {
return setupNetworkWithCLI(ctx, containerID, pid)
}
if _, err := os.Stat(defaultCNIConfDir); err != nil {
return fmt.Errorf("cni config dir missing: %s: %w", defaultCNIConfDir, err)
}
if _, err := os.Stat(defaultCNIBinDir); err != nil {
return fmt.Errorf("cni bin dir missing: %s: %w", defaultCNIBinDir, err)
}
netnsPath := filepath.Join("/proc", fmt.Sprint(pid), "ns", "net")
if _, err := os.Stat(netnsPath); err != nil {
return fmt.Errorf("netns not found: %s: %w", netnsPath, err)
}
cni, err := gocni.New(
gocni.WithPluginDir([]string{defaultCNIBinDir}),
gocni.WithPluginConfDir(defaultCNIConfDir),
)
if err != nil {
return err
}
if err := cni.Load(gocni.WithLoNetwork, gocni.WithDefaultConf); err != nil {
return err
}
_, err = cni.Setup(ctx, containerID, netnsPath)
return err
}
func setupNetworkWithCLI(ctx context.Context, containerID string, pid uint32) error {
args := []string{
"shell",
"--tty=false",
"default",
"--",
"sudo",
"-n",
"memoh-cli",
"cni-setup",
"--id", containerID,
"--pid", fmt.Sprint(pid),
"--conf-dir", defaultCNIConfDir,
"--bin-dir", defaultCNIBinDir,
}
cmd := exec.CommandContext(ctx, "limactl", args...)
var stderr bytes.Buffer
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
if stderr.Len() > 0 {
return fmt.Errorf("cni cli failed: %s", strings.TrimSpace(stderr.String()))
}
return err
}
return nil
}
// RemoveNetwork detaches CNI networking for a running task.
func RemoveNetwork(ctx context.Context, task client.Task, containerID string) error {
if task == nil {
return ErrInvalidArgument
}
if containerID == "" {
containerID = task.ID()
}
if containerID == "" {
return ErrInvalidArgument
}
pid := task.Pid()
if pid == 0 {
return fmt.Errorf("task pid not available for %s", containerID)
}
if runtime.GOOS == "darwin" {
return removeNetworkWithCLI(ctx, containerID, pid)
}
if _, err := os.Stat(defaultCNIConfDir); err != nil {
return fmt.Errorf("cni config dir missing: %s: %w", defaultCNIConfDir, err)
}
if _, err := os.Stat(defaultCNIBinDir); err != nil {
return fmt.Errorf("cni bin dir missing: %s: %w", defaultCNIBinDir, err)
}
netnsPath := filepath.Join("/proc", fmt.Sprint(pid), "ns", "net")
if _, err := os.Stat(netnsPath); err != nil {
return fmt.Errorf("netns not found: %s: %w", netnsPath, err)
}
cni, err := gocni.New(
gocni.WithPluginDir([]string{defaultCNIBinDir}),
gocni.WithPluginConfDir(defaultCNIConfDir),
)
if err != nil {
return err
}
if err := cni.Load(gocni.WithLoNetwork, gocni.WithDefaultConf); err != nil {
return err
}
return cni.Remove(ctx, containerID, netnsPath)
}
func removeNetworkWithCLI(ctx context.Context, containerID string, pid uint32) error {
args := []string{
"shell",
"--tty=false",
"default",
"--",
"sudo",
"-n",
"memoh-cli",
"cni-remove",
"--id", containerID,
"--pid", fmt.Sprint(pid),
"--conf-dir", defaultCNIConfDir,
"--bin-dir", defaultCNIBinDir,
}
cmd := exec.CommandContext(ctx, "limactl", args...)
var stderr bytes.Buffer
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
if stderr.Len() > 0 {
return fmt.Errorf("cni cli failed: %s", strings.TrimSpace(stderr.String()))
}
return err
}
return nil
}
+73
View File
@@ -0,0 +1,73 @@
package containerd
import (
"fmt"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
)
const (
systemdResolvConf = "/run/systemd/resolve/resolv.conf"
fallbackResolv = "nameserver 1.1.1.1\nnameserver 8.8.8.8\n"
)
// ResolveConfSource returns a host path to mount as /etc/resolv.conf.
// If systemd-resolved config is available, use it. Otherwise write a fallback
// resolv.conf under dataDir and return that path.
func ResolveConfSource(dataDir string) (string, error) {
if strings.TrimSpace(dataDir) == "" {
return "", ErrInvalidArgument
}
if runtime.GOOS == "darwin" {
if ok, err := limaFileExists(systemdResolvConf); err != nil {
return "", err
} else if ok {
return systemdResolvConf, nil
}
} else if _, err := os.Stat(systemdResolvConf); err == nil {
return systemdResolvConf, nil
}
if err := os.MkdirAll(dataDir, 0o755); err != nil {
return "", err
}
fallbackPath := filepath.Join(dataDir, "resolv.conf")
if _, err := os.Stat(fallbackPath); err == nil {
return fallbackPath, nil
} else if !os.IsNotExist(err) {
return "", err
}
if err := os.WriteFile(fallbackPath, []byte(fallbackResolv), 0o644); err != nil {
return "", err
}
return fallbackPath, nil
}
func limaFileExists(path string) (bool, error) {
if strings.TrimSpace(path) == "" {
return false, ErrInvalidArgument
}
cmd := exec.Command(
"limactl",
"shell",
"--tty=false",
"default",
"--",
"test",
"-f",
path,
)
if err := cmd.Run(); err == nil {
return true, nil
} else if exitErr, ok := err.(*exec.ExitError); ok {
if exitErr.ExitCode() == 1 {
return false, nil
}
return false, fmt.Errorf("lima test failed for %s: %w", path, err)
} else {
return false, fmt.Errorf("lima test failed for %s: %w", path, err)
}
}
+58 -11
View File
@@ -173,6 +173,10 @@ func (h *ContainerdHandler) CreateContainer(c echo.Context) error {
if err := os.MkdirAll(filepath.Join(dataDir, ".skills"), 0o755); err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
}
resolvPath, err := ctr.ResolveConfSource(dataDir)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
}
specOpts := []oci.SpecOpts{
oci.WithMounts([]specs.Mount{
@@ -188,6 +192,12 @@ func (h *ContainerdHandler) CreateContainer(c echo.Context) error {
Source: dataDir,
Options: []string{"rbind", "rw"},
},
{
Destination: "/etc/resolv.conf",
Type: "bind",
Source: resolvPath,
Options: []string{"rbind", "ro"},
},
}),
oci.WithProcessArgs("/bin/sh", "-lc", "bootstrap(){ [ -e /app/mcp ] || { mkdir -p /app; [ -f /opt/mcp ] && cp -a /opt/mcp /app/mcp 2>/dev/null || true; }; }; bootstrap; exec /app/mcp"),
}
@@ -227,14 +237,22 @@ func (h *ContainerdHandler) CreateContainer(c echo.Context) error {
}
started := false
if _, err := h.service.StartTask(ctx, containerID, &ctr.StartTaskOptions{
if task, err := h.service.StartTask(ctx, containerID, &ctr.StartTaskOptions{
UseStdio: false,
}); err == nil {
started = true
if h.queries != nil {
if pgBotID, parseErr := parsePgUUID(botID); parseErr == nil {
_ = h.queries.UpdateContainerStarted(c.Request().Context(), pgBotID)
if netErr := ctr.SetupNetwork(ctx, task, containerID); netErr == nil {
started = true
if h.queries != nil {
if pgBotID, parseErr := parsePgUUID(botID); parseErr == nil {
_ = h.queries.UpdateContainerStarted(c.Request().Context(), pgBotID)
}
}
} else {
_ = h.service.StopTask(ctx, containerID, &ctr.StopTaskOptions{Force: true})
h.logger.Error("mcp container network setup failed",
slog.String("container_id", containerID),
slog.Any("error", netErr),
)
}
} else {
h.logger.Error("mcp container start failed",
@@ -265,9 +283,16 @@ func (h *ContainerdHandler) ensureTaskRunning(ctx context.Context, containerID s
_ = h.service.DeleteTask(ctx, containerID, &ctr.DeleteTaskOptions{Force: true})
}
_, err = h.service.StartTask(ctx, containerID, &ctr.StartTaskOptions{
task, err := h.service.StartTask(ctx, containerID, &ctr.StartTaskOptions{
UseStdio: false,
})
if err != nil {
return err
}
if err := ctr.SetupNetwork(ctx, task, containerID); err != nil {
_ = h.service.StopTask(ctx, containerID, &ctr.StopTaskOptions{Force: true})
return err
}
return err
}
@@ -646,6 +671,10 @@ func (h *ContainerdHandler) SetupBotContainer(ctx context.Context, botID string)
if err := os.MkdirAll(filepath.Join(dataDir, ".skills"), 0o755); err != nil {
return err
}
resolvPath, err := ctr.ResolveConfSource(dataDir)
if err != nil {
return err
}
specOpts := []oci.SpecOpts{
oci.WithMounts([]specs.Mount{
@@ -661,11 +690,17 @@ func (h *ContainerdHandler) SetupBotContainer(ctx context.Context, botID string)
Source: dataDir,
Options: []string{"rbind", "rw"},
},
{
Destination: "/etc/resolv.conf",
Type: "bind",
Source: resolvPath,
Options: []string{"rbind", "ro"},
},
}),
oci.WithProcessArgs("/bin/sh", "-lc", "bootstrap(){ [ -e /app/mcp ] || { mkdir -p /app; [ -f /opt/mcp ] && cp -a /opt/mcp /app/mcp 2>/dev/null || true; }; }; bootstrap; exec /app/mcp"),
}
_, err := h.service.CreateContainer(ctx, ctr.CreateContainerRequest{
_, err = h.service.CreateContainer(ctx, ctr.CreateContainerRequest{
ID: containerID,
ImageRef: image,
Snapshotter: snapshotter,
@@ -699,13 +734,22 @@ func (h *ContainerdHandler) SetupBotContainer(ctx context.Context, botID string)
}
}
if _, err := h.service.StartTask(ctx, containerID, &ctr.StartTaskOptions{
if task, err := h.service.StartTask(ctx, containerID, &ctr.StartTaskOptions{
UseStdio: false,
}); err == nil {
if h.queries != nil {
if pgBotID, parseErr := parsePgUUID(botID); parseErr == nil {
_ = h.queries.UpdateContainerStarted(ctx, pgBotID)
if netErr := ctr.SetupNetwork(ctx, task, containerID); netErr == nil {
if h.queries != nil {
if pgBotID, parseErr := parsePgUUID(botID); parseErr == nil {
_ = h.queries.UpdateContainerStarted(ctx, pgBotID)
}
}
} else {
_ = h.service.StopTask(ctx, containerID, &ctr.StopTaskOptions{Force: true})
h.logger.Error("setup bot container: network setup failed",
slog.String("bot_id", botID),
slog.String("container_id", containerID),
slog.Any("error", netErr),
)
}
} else {
h.logger.Error("setup bot container: task start failed",
@@ -729,6 +773,9 @@ func (h *ContainerdHandler) CleanupBotContainer(ctx context.Context, botID strin
return nil
}
if task, taskErr := h.service.GetTask(ctx, containerID); taskErr == nil {
_ = ctr.RemoveNetwork(ctx, task, containerID)
}
_ = h.service.StopTask(ctx, containerID, &ctr.StopTaskOptions{
Timeout: 5 * time.Second,
Force: true,
+19 -115
View File
@@ -62,53 +62,33 @@ func (h *ContainerdHandler) HandleMCPFS(c echo.Context) error {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
}
if req.JSONRPC != "" && req.JSONRPC != "2.0" {
return c.JSON(http.StatusOK, mcptools.JSONRPCResponse{
JSONRPC: "2.0",
ID: req.ID,
Error: &mcptools.JSONRPCError{Code: -32600, Message: "invalid jsonrpc version"},
})
return c.JSON(http.StatusOK, mcptools.JSONRPCErrorResponse(req.ID, -32600, "invalid jsonrpc version"))
}
if err := h.validateMCPContainer(c.Request().Context(), containerID, botID); err != nil {
if err := h.validateMCPContainer(ctx, containerID, botID); err != nil {
h.logger.Error("mcp fs validate failed", slog.Any("error", err), slog.String("bot_id", botID), slog.String("container_id", containerID))
return c.JSON(http.StatusOK, mcptools.JSONRPCResponse{
JSONRPC: "2.0",
ID: req.ID,
Error: &mcptools.JSONRPCError{Code: -32603, Message: err.Error()},
})
return c.JSON(http.StatusOK, mcptools.JSONRPCErrorResponse(req.ID, -32603, err.Error()))
}
if err := h.ensureTaskRunning(c.Request().Context(), containerID); err != nil {
if err := h.ensureTaskRunning(ctx, containerID); err != nil {
h.logger.Error("mcp fs ensure task failed", slog.Any("error", err), slog.String("bot_id", botID), slog.String("container_id", containerID))
return c.JSON(http.StatusOK, mcptools.JSONRPCResponse{
JSONRPC: "2.0",
ID: req.ID,
Error: &mcptools.JSONRPCError{Code: -32603, Message: err.Error()},
})
return c.JSON(http.StatusOK, mcptools.JSONRPCErrorResponse(req.ID, -32603, err.Error()))
}
if strings.TrimSpace(req.Method) == "" {
return c.JSON(http.StatusOK, mcptools.JSONRPCResponse{
JSONRPC: "2.0",
ID: req.ID,
Error: &mcptools.JSONRPCError{Code: -32601, Message: "method not found"},
})
return c.JSON(http.StatusOK, mcptools.JSONRPCErrorResponse(req.ID, -32601, "method not found"))
}
if len(req.ID) == 0 && strings.HasPrefix(req.Method, "notifications/") {
if err := h.notifyMCPServer(c.Request().Context(), containerID, req); err != nil {
if mcptools.IsNotification(req) {
if err := h.notifyMCPServer(ctx, containerID, req); err != nil {
h.logger.Error("mcp fs notify failed", slog.Any("error", err), slog.String("method", req.Method), slog.String("bot_id", botID), slog.String("container_id", containerID))
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
}
// MCP Streamable HTTP spec: notifications must be answered with 202 Accepted and no body.
return c.NoContent(http.StatusAccepted)
}
payload, err := h.callMCPServer(c.Request().Context(), containerID, req)
payload, err := h.callMCPServer(ctx, containerID, req)
if err != nil {
h.logger.Error("mcp fs call failed", slog.Any("error", err), slog.String("method", req.Method), slog.String("bot_id", botID), slog.String("container_id", containerID))
return c.JSON(http.StatusOK, mcptools.JSONRPCResponse{
JSONRPC: "2.0",
ID: req.ID,
Error: &mcptools.JSONRPCError{Code: -32603, Message: err.Error()},
})
return c.JSON(http.StatusOK, mcptools.JSONRPCErrorResponse(req.ID, -32603, err.Error()))
}
return c.JSON(http.StatusOK, payload)
}
@@ -380,7 +360,7 @@ func (s *mcpSession) readLoop() {
}
func (s *mcpSession) call(ctx context.Context, req mcptools.JSONRPCRequest) (map[string]any, error) {
payloads, targetID, err := buildMCPPayloads(req, &s.initOnce)
payloads, targetID, err := mcptools.BuildPayloads(req, &s.initOnce)
if err != nil {
return nil, err
}
@@ -394,14 +374,9 @@ func (s *mcpSession) call(ctx context.Context, req mcptools.JSONRPCRequest) (map
s.pending[target] = respCh
s.pendingMu.Unlock()
s.writeMu.Lock()
for _, payload := range payloads {
if _, err := s.stdin.Write([]byte(payload + "\n")); err != nil {
s.writeMu.Unlock()
return nil, err
}
if err := s.writePayloads(payloads); err != nil {
return nil, err
}
s.writeMu.Unlock()
select {
case resp, ok := <-respCh:
@@ -437,92 +412,21 @@ func (s *mcpSession) call(ctx context.Context, req mcptools.JSONRPCRequest) (map
}
func (s *mcpSession) notify(ctx context.Context, req mcptools.JSONRPCRequest) error {
payloads, err := buildMCPNotificationPayloads(req)
payloads, err := mcptools.BuildNotificationPayloads(req)
if err != nil {
return err
}
return s.writePayloads(payloads)
}
func (s *mcpSession) writePayloads(payloads []string) error {
s.writeMu.Lock()
defer s.writeMu.Unlock()
for _, payload := range payloads {
if _, err := s.stdin.Write([]byte(payload + "\n")); err != nil {
s.writeMu.Unlock()
return err
}
}
s.writeMu.Unlock()
return nil
}
func buildMCPPayloads(req mcptools.JSONRPCRequest, initOnce *sync.Once) ([]string, json.RawMessage, error) {
if req.JSONRPC == "" {
req.JSONRPC = "2.0"
}
targetID := req.ID
payloads := []string{}
shouldInit := req.Method != "initialize" && req.Method != "notifications/initialized"
if initOnce != nil {
ran := false
initOnce.Do(func() {
ran = true
})
if ran {
// This is the first call on the session.
} else {
shouldInit = false
}
}
if shouldInit {
initReq := map[string]any{
"jsonrpc": "2.0",
"id": "init-1",
"method": "initialize",
"params": map[string]any{
"protocolVersion": "2025-06-18",
"capabilities": map[string]any{
"roots": map[string]any{
"listChanged": false,
},
},
"clientInfo": map[string]any{
"name": "memoh-http-proxy",
"version": "v0",
},
},
}
initBytes, err := json.Marshal(initReq)
if err != nil {
return nil, nil, err
}
payloads = append(payloads, string(initBytes))
initialized := map[string]any{
"jsonrpc": "2.0",
"method": "notifications/initialized",
}
initializedBytes, err := json.Marshal(initialized)
if err != nil {
return nil, nil, err
}
payloads = append(payloads, string(initializedBytes))
}
reqBytes, err := json.Marshal(req)
if err != nil {
return nil, nil, err
}
payloads = append(payloads, string(reqBytes))
return payloads, targetID, nil
}
func buildMCPNotificationPayloads(req mcptools.JSONRPCRequest) ([]string, error) {
if req.JSONRPC == "" {
req.JSONRPC = "2.0"
}
if strings.TrimSpace(req.Method) == "" {
return nil, fmt.Errorf("missing method")
}
reqBytes, err := json.Marshal(req)
if err != nil {
return nil, err
}
return []string{string(reqBytes)}, nil
}
+95
View File
@@ -0,0 +1,95 @@
package mcp
import (
"encoding/json"
"fmt"
"strings"
"sync"
)
func IsNotification(req JSONRPCRequest) bool {
return len(req.ID) == 0 && strings.HasPrefix(req.Method, "notifications/")
}
func JSONRPCErrorResponse(id json.RawMessage, code int, message string) JSONRPCResponse {
return JSONRPCResponse{
JSONRPC: "2.0",
ID: id,
Error: &JSONRPCError{Code: code, Message: message},
}
}
func BuildPayloads(req JSONRPCRequest, initOnce *sync.Once) ([]string, json.RawMessage, error) {
if req.JSONRPC == "" {
req.JSONRPC = "2.0"
}
targetID := req.ID
payloads := []string{}
shouldInit := req.Method != "initialize" && req.Method != "notifications/initialized"
if initOnce != nil {
ran := false
initOnce.Do(func() {
ran = true
})
if ran {
// This is the first call on the session.
} else {
shouldInit = false
}
}
if shouldInit {
initReq := map[string]any{
"jsonrpc": "2.0",
"id": "init-1",
"method": "initialize",
"params": map[string]any{
"protocolVersion": "2025-06-18",
"capabilities": map[string]any{
"roots": map[string]any{
"listChanged": false,
},
},
"clientInfo": map[string]any{
"name": "memoh-http-proxy",
"version": "v0",
},
},
}
initBytes, err := json.Marshal(initReq)
if err != nil {
return nil, nil, err
}
payloads = append(payloads, string(initBytes))
initialized := map[string]any{
"jsonrpc": "2.0",
"method": "notifications/initialized",
}
initializedBytes, err := json.Marshal(initialized)
if err != nil {
return nil, nil, err
}
payloads = append(payloads, string(initializedBytes))
}
reqBytes, err := json.Marshal(req)
if err != nil {
return nil, nil, err
}
payloads = append(payloads, string(reqBytes))
return payloads, targetID, nil
}
func BuildNotificationPayloads(req JSONRPCRequest) ([]string, error) {
if req.JSONRPC == "" {
req.JSONRPC = "2.0"
}
if strings.TrimSpace(req.Method) == "" {
return nil, fmt.Errorf("missing method")
}
reqBytes, err := json.Marshal(req)
if err != nil {
return nil, err
}
return []string{string(reqBytes)}, nil
}
+46 -20
View File
@@ -88,14 +88,11 @@ func (m *Manager) EnsureBot(ctx context.Context, botID string) error {
return err
}
dataMount := m.cfg.DataMount
if dataMount == "" {
dataMount = config.DefaultDataMount
}
image := m.cfg.BusyboxImage
if image == "" {
image = config.DefaultBusyboxImg
dataMount := m.dataMount()
image := m.imageRef()
resolvPath, err := ctr.ResolveConfSource(dataDir)
if err != nil {
return err
}
specOpts := []oci.SpecOpts{
@@ -112,6 +109,12 @@ func (m *Manager) EnsureBot(ctx context.Context, botID string) error {
Source: dataDir,
Options: []string{"rbind", "rw"},
},
{
Destination: "/etc/resolv.conf",
Type: "bind",
Source: resolvPath,
Options: []string{"rbind", "ro"},
},
}),
}
@@ -162,10 +165,17 @@ func (m *Manager) Start(ctx context.Context, botID string) error {
return err
}
_, err := m.service.StartTask(ctx, m.containerID(botID), &ctr.StartTaskOptions{
task, err := m.service.StartTask(ctx, m.containerID(botID), &ctr.StartTaskOptions{
UseStdio: false,
})
return err
if err != nil {
return err
}
if err := ctr.SetupNetwork(ctx, task, m.containerID(botID)); err != nil {
_ = m.service.StopTask(ctx, m.containerID(botID), &ctr.StopTaskOptions{Force: true})
return err
}
return nil
}
func (m *Manager) Stop(ctx context.Context, botID string, timeout time.Duration) error {
@@ -183,6 +193,9 @@ func (m *Manager) Delete(ctx context.Context, botID string) error {
return err
}
if task, taskErr := m.service.GetTask(ctx, m.containerID(botID)); taskErr == nil {
_ = ctr.RemoveNetwork(ctx, task, m.containerID(botID))
}
_ = m.service.DeleteTask(ctx, m.containerID(botID), &ctr.DeleteTaskOptions{Force: true})
return m.service.DeleteContainer(ctx, m.containerID(botID), &ctr.DeleteContainerOptions{
CleanupSnapshot: true,
@@ -235,25 +248,38 @@ func (m *Manager) DataDir(botID string) (string, error) {
return "", err
}
root := m.cfg.DataRoot
if root == "" {
root = config.DefaultDataRoot
}
return filepath.Join(root, "bots", botID), nil
return filepath.Join(m.dataRoot(), "bots", botID), nil
}
func (m *Manager) ensureBotDir(botID string) (string, error) {
root := m.cfg.DataRoot
if root == "" {
root = config.DefaultDataRoot
}
dir := filepath.Join(root, "bots", botID)
dir := filepath.Join(m.dataRoot(), "bots", botID)
if err := os.MkdirAll(dir, 0o755); err != nil {
return "", err
}
return dir, nil
}
func (m *Manager) dataRoot() string {
if m.cfg.DataRoot == "" {
return config.DefaultDataRoot
}
return m.cfg.DataRoot
}
func (m *Manager) dataMount() string {
if m.cfg.DataMount == "" {
return config.DefaultDataMount
}
return m.cfg.DataMount
}
func (m *Manager) imageRef() string {
if m.cfg.BusyboxImage == "" {
return config.DefaultBusyboxImg
}
return m.cfg.BusyboxImage
}
func validateBotID(botID string) error {
return identity.ValidateUserID(botID)
}
+20
View File
@@ -75,6 +75,10 @@ func (m *Manager) CreateVersion(ctx context.Context, userID string) (*VersionInf
if dataMount == "" {
dataMount = config.DefaultDataMount
}
resolvPath, err := ctr.ResolveConfSource(dataDir)
if err != nil {
return nil, err
}
specOpts := []oci.SpecOpts{
oci.WithMounts([]specs.Mount{
@@ -90,6 +94,12 @@ func (m *Manager) CreateVersion(ctx context.Context, userID string) (*VersionInf
Source: dataDir,
Options: []string{"rbind", "rw"},
},
{
Destination: "/etc/resolv.conf",
Type: "bind",
Source: resolvPath,
Options: []string{"rbind", "ro"},
},
}),
}
@@ -202,6 +212,10 @@ func (m *Manager) RollbackVersion(ctx context.Context, userID string, version in
if dataMount == "" {
dataMount = config.DefaultDataMount
}
resolvPath, err := ctr.ResolveConfSource(dataDir)
if err != nil {
return err
}
specOpts := []oci.SpecOpts{
oci.WithMounts([]specs.Mount{
{
@@ -216,6 +230,12 @@ func (m *Manager) RollbackVersion(ctx context.Context, userID string, version in
Source: dataDir,
Options: []string{"rbind", "rw"},
},
{
Destination: "/etc/resolv.conf",
Type: "bind",
Source: resolvPath,
Options: []string{"rbind", "ro"},
},
}),
}
+4 -8
View File
@@ -27,11 +27,7 @@ description = "Install Go dependencies"
run = "go mod download"
[tasks.lima-up]
run = """
if [ "$(uname -s)" = "Darwin" ]; then
limactl start default
fi
"""
run = "scripts/lima-up.sh"
[tasks.lima-down]
run = """
@@ -69,9 +65,9 @@ run = "cd packages/cli && npm install -g"
description = "Build Go CLI binary and install to local bin"
run = """
mkdir -p ~/.local/bin
go build -trimpath -ldflags "-s -w" -o ~/.local/bin/container-cli ./cmd/cli
chmod +x ~/.local/bin/container-cli
echo "✓ CLI binary installed to ~/.local/bin/container-cli"
go build -trimpath -ldflags "-s -w" -o ~/.local/bin/memoh-cli ./cmd/cli
chmod +x ~/.local/bin/memoh-cli
echo "✓ CLI binary installed to ~/.local/bin/memoh-cli"
"""
[tasks.mcp-image-up]
+44
View File
@@ -0,0 +1,44 @@
#!/usr/bin/env sh
set -e
if [ "$(uname -s)" != "Darwin" ]; then
exit 0
fi
limactl start default
if ! limactl shell default -- sh -lc 'command -v memoh-cli >/dev/null 2>&1'; then
vm_arch=$(limactl shell default -- uname -m)
if [ "$vm_arch" = "aarch64" ] || [ "$vm_arch" = "arm64" ]; then
go_arch="arm64"
else
go_arch="amd64"
fi
bin_path="/tmp/memoh-cli-linux-$go_arch"
GOOS=linux GOARCH=$go_arch go build -trimpath -ldflags "-s -w" -o "$bin_path" ./cmd/cli
limactl shell default -- sudo -n mkdir -p /usr/local/bin
limactl shell default -- sudo -n tee /usr/local/bin/memoh-cli >/dev/null < "$bin_path"
limactl shell default -- sudo -n chmod +x /usr/local/bin/memoh-cli
fi
limactl shell default -- sh -lc 'command -v curl >/dev/null 2>&1' || {
echo "curl not found in Lima VM; install curl and rerun"
exit 1
}
limactl shell default -- sh -lc 'test -x /opt/cni/bin/bridge' || {
vm_arch=$(limactl shell default -- uname -m)
if [ "$vm_arch" = "aarch64" ] || [ "$vm_arch" = "arm64" ]; then
cni_arch="arm64"
else
cni_arch="amd64"
fi
url="https://github.com/containernetworking/plugins/releases/download/v1.9.0/cni-plugins-linux-${cni_arch}-v1.9.0.tgz"
limactl shell default -- sudo -n mkdir -p /opt/cni/bin
limactl shell default -- sudo -n curl -L -o /tmp/cni-plugins.tgz "$url"
limactl shell default -- sudo -n tar -C /opt/cni/bin -xzf /tmp/cni-plugins.tgz
}
limactl shell default -- sudo -n mkdir -p /etc/cni/net.d
limactl shell default -- sudo -n sh -lc 'test -f /etc/cni/net.d/10-memoh-bridge.conflist' || \
limactl shell default -- sudo -n sh -lc 'printf "%s\n" "{" " \"cniVersion\": \"0.4.0\"," " \"name\": \"memoh-bridge\"," " \"plugins\": [" " {" " \"type\": \"bridge\"," " \"bridge\": \"cni0\"," " \"isGateway\": true," " \"ipMasq\": true," " \"promiscMode\": false," " \"hairpinMode\": true," " \"ipam\": {" " \"type\": \"host-local\"," " \"subnet\": \"10.88.0.0/16\"," " \"routes\": [" " {\"dst\": \"0.0.0.0/0\"}" " ]" " }" " }," " {\"type\": \"portmap\", \"capabilities\": {\"portMappings\": true}}," " {\"type\": \"firewall\"}," " {\"type\": \"tuning\"}" " ]" "}" > /etc/cni/net.d/10-memoh-bridge.conflist'