diff --git a/devenv/server-entrypoint.sh b/devenv/server-entrypoint.sh
index e2587438..69e03bfc 100644
--- a/devenv/server-entrypoint.sh
+++ b/devenv/server-entrypoint.sh
@@ -1,6 +1,12 @@
 #!/bin/sh
 set -e
 
+# Clean up stale CNI state from previous runs. After a container restart the
+# cni0 bridge may linger with a zeroed MAC (00:00:00:00:00:00), causing the
+# bridge plugin to fail with "could not set bridge's mac: invalid argument".
+ip link delete cni0 2>/dev/null || true
+rm -rf /var/lib/cni/networks/* /var/lib/cni/results/* 2>/dev/null || true
+
 # Ensure IP forwarding and subnet MASQUERADE for CNI.
 sysctl -w net.ipv4.ip_forward=1 2>/dev/null || true
 iptables -t nat -C POSTROUTING -s 10.88.0.0/16 ! -o cni0 -j MASQUERADE 2>/dev/null || \
diff --git a/docker/server-entrypoint.sh b/docker/server-entrypoint.sh
index fd6bc15c..6dbf1df1 100644
--- a/docker/server-entrypoint.sh
+++ b/docker/server-entrypoint.sh
@@ -3,6 +3,12 @@ set -e
 
 MCP_IMAGE="${MCP_IMAGE:-docker.io/library/memoh-mcp:latest}"
 
+# ---- Clean up stale CNI state from previous runs ----
+# After a container restart the cni0 bridge may linger with a zeroed MAC
+# (00:00:00:00:00:00), causing "could not set bridge's mac: invalid argument".
+ip link delete cni0 2>/dev/null || true
+rm -rf /var/lib/cni/networks/* /var/lib/cni/results/* 2>/dev/null || true
+
 # ---- Ensure IP forwarding and subnet MASQUERADE for CNI ----
 sysctl -w net.ipv4.ip_forward=1 2>/dev/null || true
 iptables -t nat -C POSTROUTING -s 10.88.0.0/16 ! -o cni0 -j MASQUERADE 2>/dev/null || \
diff --git a/internal/containerd/network.go b/internal/containerd/network.go
index b08b7be3..31074568 100644
--- a/internal/containerd/network.go
+++ b/internal/containerd/network.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"os"
+	"os/exec"
 	"path/filepath"
 	"strconv"
 	"strings"
@@ -51,19 +52,26 @@ func setupCNINetwork(ctx context.Context, task client.Task, containerID string,
 	}
 	result, err := cni.Setup(ctx, containerID, netnsPath)
 	if err != nil {
-		if !isDuplicateAllocationError(err) && !isVethExistsError(err) {
+		retryable := isDuplicateAllocationError(err) || isVethExistsError(err) || isBridgeMACError(err)
+		if !retryable {
 			return "", err
 		}
-		// Stale IPAM allocation or veth exists (e.g. after container restart with persisted
-		// /var/lib/cni). Remove may fail if the previous iptables/veth state
-		// is already gone; ignore the error so the retry Setup still runs.
+		if isBridgeMACError(err) {
+			// Stale bridge with zeroed MAC after container restart; delete it so
+			// the plugin can recreate a healthy one.
+			_ = exec.CommandContext(ctx, "ip", "link", "delete", "cni0").Run()
+		}
 		_ = cni.Remove(ctx, containerID, netnsPath)
 		result, err = cni.Setup(ctx, containerID, netnsPath)
 		if err != nil {
 			return "", err
 		}
 	}
-	return extractIP(result), nil
+	ip := extractIP(result)
+	if ip == "" {
+		return "", fmt.Errorf("cni setup returned no usable IP for %s", containerID)
+	}
+	return ip, nil
 }
 
 func extractIP(result *gocni.Result) string {
@@ -139,3 +147,13 @@ func isVethExistsError(err error) bool {
 	}
 	return strings.Contains(err.Error(), "already exists")
 }
+
+// isBridgeMACError returns true if the CNI bridge plugin failed because the
+// stale cni0 bridge has a zeroed MAC address (common after container restart).
+func isBridgeMACError(err error) bool {
+	if err == nil {
+		return false
+	}
+	msg := err.Error()
+	return strings.Contains(msg, "set bridge") && strings.Contains(msg, "mac")
+}
diff --git a/internal/handlers/containerd.go b/internal/handlers/containerd.go
index 188d60cc..1310ad2e 100644
--- a/internal/handlers/containerd.go
+++ b/internal/handlers/containerd.go
@@ -247,15 +247,8 @@ func (h *ContainerdHandler) ensureContainerAndTask(ctx context.Context, containe
 	}
 	if len(tasks) > 0 {
 		if tasks[0].Status == ctr.TaskStatusRunning {
-			if netResult, netErr := h.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{
-				ContainerID: containerID,
-				CNIBinDir:   h.cfg.CNIBinaryDir,
-				CNIConfDir:  h.cfg.CNIConfigDir,
-			}); netErr != nil {
-				h.logger.Warn("network re-setup failed for running task",
-					slog.String("container_id", containerID), slog.Any("error", netErr))
-			} else if netResult.IP != "" && h.manager != nil {
-				h.manager.SetContainerIP(botID, netResult.IP)
+			if err := h.setupNetworkOrFail(ctx, containerID, botID); err != nil {
+				return err
 			}
 			return nil
 		}
@@ -270,17 +263,37 @@
 	if err := h.service.StartContainer(ctx, containerID, nil); err != nil {
 		return err
 	}
-	if netResult, netErr := h.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{
-		ContainerID: containerID,
-		CNIBinDir:   h.cfg.CNIBinaryDir,
-		CNIConfDir:  h.cfg.CNIConfigDir,
-	}); netErr != nil {
-		h.logger.Warn("network setup failed, task kept running",
-			slog.String("container_id", containerID), slog.Any("error", netErr))
-	} else if netResult.IP != "" && h.manager != nil {
-		h.manager.SetContainerIP(botID, netResult.IP)
+	return h.setupNetworkOrFail(ctx, containerID, botID)
+}
+
+// setupNetworkOrFail attempts CNI network setup with one retry. Returns an error
+// if no usable IP is obtained — callers must not silently ignore this.
+func (h *ContainerdHandler) setupNetworkOrFail(ctx context.Context, containerID, botID string) error {
+	var lastErr error
+	for attempt := 0; attempt < 2; attempt++ {
+		netResult, err := h.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{
+			ContainerID: containerID,
+			CNIBinDir:   h.cfg.CNIBinaryDir,
+			CNIConfDir:  h.cfg.CNIConfigDir,
+		})
+		if err != nil {
+			lastErr = err
+			h.logger.Warn("network setup attempt failed",
+				slog.String("container_id", containerID),
+				slog.Int("attempt", attempt+1),
+				slog.Any("error", err))
+			continue
+		}
+		if netResult.IP == "" {
+			lastErr = fmt.Errorf("network setup returned no IP for %s", containerID)
+			continue
+		}
+		if h.manager != nil {
+			h.manager.SetContainerIP(botID, netResult.IP)
+		}
+		return nil
 	}
-	return nil
+	return fmt.Errorf("network setup failed for container %s: %w", containerID, lastErr)
 }
 
 // botContainerID resolves container_id for a bot from the database.
@@ -967,20 +980,15 @@ func (h *ContainerdHandler) ReconcileContainers(ctx context.Context) {
 					slog.String("bot_id", botID), slog.Any("error", dbErr))
 				}
 			}
-			if netResult, netErr := h.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{
-				ContainerID: containerID,
-				CNIBinDir:   h.cfg.CNIBinaryDir,
-				CNIConfDir:  h.cfg.CNIConfigDir,
-			}); netErr != nil {
-				h.logger.Warn("reconcile: network re-setup failed for running task",
+			if netErr := h.setupNetworkOrFail(ctx, containerID, botID); netErr != nil {
+				h.logger.Error("reconcile: network setup failed for running task, container unreachable",
 					slog.String("bot_id", botID), slog.String("container_id", containerID),
 					slog.Any("error", netErr))
-			} else if netResult.IP != "" && h.manager != nil {
-				h.manager.SetContainerIP(botID, netResult.IP)
+			} else {
+				h.logger.Info("reconcile: container healthy",
+					slog.String("bot_id", botID), slog.String("container_id", containerID))
 			}
-			h.logger.Info("reconcile: container healthy",
-				slog.String("bot_id", botID), slog.String("container_id", containerID))
 			continue
 		}
diff --git a/internal/mcp/manager.go b/internal/mcp/manager.go
index e6626635..51d553f2 100644
--- a/internal/mcp/manager.go
+++ b/internal/mcp/manager.go
@@ -116,34 +116,39 @@ func (m *Manager) SetContainerIP(botID, ip string) {
 }
 
 // recoverContainerIP attempts to restore the container IP by re-running CNI setup.
-// CNI plugins are idempotent - calling Setup again returns the existing IP allocation.
+// CNI plugins are idempotent — calling Setup again returns the existing IP allocation.
+// Retries up to 2 times to tolerate transient CNI failures (IPAM lock contention, etc.).
 func (m *Manager) recoverContainerIP(botID string) (string, error) {
 	ctx := context.Background()
 	containerID := m.containerID(botID)
 
-	// First check if container exists and get basic info
 	info, err := m.service.GetContainer(ctx, containerID)
 	if err != nil {
 		return "", err
 	}
 
-	// Check if IP is stored in labels (if we ever add label persistence)
 	if ip, ok := info.Labels["mcp.container_ip"]; ok {
 		return ip, nil
 	}
 
-	// Container exists but IP not cached - need to re-setup network to get IP
-	// This happens after server restart when in-memory cache is lost
-	netResult, err := m.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{
-		ContainerID: containerID,
-		CNIBinDir:   m.cfg.CNIBinaryDir,
-		CNIConfDir:  m.cfg.CNIConfigDir,
-	})
-	if err != nil {
-		return "", fmt.Errorf("network setup for IP recovery: %w", err)
+	const maxAttempts = 2
+	var lastErr error
+	for i := 0; i < maxAttempts; i++ {
+		netResult, err := m.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{
+			ContainerID: containerID,
+			CNIBinDir:   m.cfg.CNIBinaryDir,
+			CNIConfDir:  m.cfg.CNIConfigDir,
+		})
+		if err != nil {
+			lastErr = err
+			m.logger.Warn("IP recovery attempt failed",
+				slog.String("bot_id", botID), slog.Int("attempt", i+1), slog.Any("error", err))
+			time.Sleep(time.Duration(i+1) * 500 * time.Millisecond)
+			continue
+		}
+		return netResult.IP, nil
 	}
-
-	return netResult.IP, nil
+	return "", fmt.Errorf("network setup for IP recovery after %d attempts: %w", maxAttempts, lastErr)
 }
 
 // MCPClient returns a gRPC client for the given bot's container.
@@ -295,12 +300,14 @@ func (m *Manager) Start(ctx context.Context, botID string) error {
 		}
 		return err
 	}
-	if netResult.IP != "" {
-		m.mu.Lock()
-		m.containerIPs[botID] = netResult.IP
-		m.mu.Unlock()
-		m.logger.Info("container network ready", slog.String("bot_id", botID), slog.String("ip", netResult.IP))
+	if netResult.IP == "" {
+		if stopErr := m.service.StopContainer(ctx, m.containerID(botID), &ctr.StopTaskOptions{Force: true}); stopErr != nil {
+			m.logger.Warn("cleanup: stop task failed", slog.String("container_id", m.containerID(botID)), slog.Any("error", stopErr))
+		}
+		return fmt.Errorf("network setup returned no IP for bot %s", botID)
 	}
+	m.SetContainerIP(botID, netResult.IP)
+	m.logger.Info("container network ready", slog.String("bot_id", botID), slog.String("ip", netResult.IP))
 	return nil
 }
 
diff --git a/internal/mcp/mcpclient/client.go b/internal/mcp/mcpclient/client.go
index f8b9728b..c1d8e12c 100644
--- a/internal/mcp/mcpclient/client.go
+++ b/internal/mcp/mcpclient/client.go
@@ -11,6 +11,7 @@ import (
 	"fmt"
 	"io"
 	"sync"
+	"time"
 
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/connectivity"
@@ -20,11 +21,14 @@ import (
 	pb "github.com/memohai/memoh/internal/mcp/mcpcontainer"
 )
 
+const connectingTimeout = 30 * time.Second
+
 // Client wraps a gRPC connection to a single MCP container.
 type Client struct {
-	conn   *grpc.ClientConn
-	svc    pb.ContainerServiceClient
-	target string
+	conn      *grpc.ClientConn
+	svc       pb.ContainerServiceClient
+	target    string
+	createdAt time.Time
 }
 
 // NewClientFromConn wraps an existing gRPC connection into a Client.
@@ -47,9 +51,10 @@ func Dial(_ context.Context, ip string) (*Client, error) {
 		return nil, fmt.Errorf("grpc dial %s: %w", target, err)
 	}
 	return &Client{
-		conn:   conn,
-		svc:    pb.NewContainerServiceClient(conn),
-		target: target,
+		conn:      conn,
+		svc:       pb.NewContainerServiceClient(conn),
+		target:    target,
+		createdAt: time.Now(),
 	}, nil
 }
 
@@ -308,12 +313,14 @@ func (p *Pool) MCPClient(ctx context.Context, botID string) (*Client, error) {
 }
 // Get returns a cached client or dials a new one.
-// Stale connections (Shutdown / TransientFailure) are evicted automatically.
+// Stale connections (Shutdown / TransientFailure / stuck Connecting) are evicted automatically.
 func (p *Pool) Get(ctx context.Context, botID string) (*Client, error) {
 	p.mu.RLock()
 	if c, ok := p.clients[botID]; ok {
 		state := c.conn.GetState()
-		if state != connectivity.Shutdown && state != connectivity.TransientFailure {
+		stale := state == connectivity.Shutdown || state == connectivity.TransientFailure ||
+			(state == connectivity.Connecting && time.Since(c.createdAt) > connectingTimeout)
+		if !stale {
 			p.mu.RUnlock()
 			return c, nil
 		}
 	}
diff --git a/internal/mcp/versioning.go b/internal/mcp/versioning.go
index 8775aee7..17505ffb 100644
--- a/internal/mcp/versioning.go
+++ b/internal/mcp/versioning.go
@@ -5,7 +5,6 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"log/slog"
 	"math"
 	"strings"
 	"time"
@@ -392,16 +391,18 @@ func (m *Manager) replaceContainerSnapshot(ctx context.Context, botID, container
 	// unconditionally so the next call dials fresh to the new process.
 	m.grpcPool.Remove(botID)
 
-	if netResult, err := m.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{
+	netResult, err := m.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{
 		ContainerID: containerID,
 		CNIBinDir:   m.cfg.CNIBinaryDir,
 		CNIConfDir:  m.cfg.CNIConfigDir,
-	}); err != nil {
-		m.logger.Warn("network setup failed after snapshot replace",
-			slog.String("container_id", containerID), slog.Any("error", err))
-	} else {
-		m.SetContainerIP(botID, netResult.IP)
+	})
+	if err != nil {
+		return fmt.Errorf("network setup after snapshot replace: %w", err)
 	}
+	if netResult.IP == "" {
+		return fmt.Errorf("network setup returned no IP after snapshot replace for %s", containerID)
+	}
+	m.SetContainerIP(botID, netResult.IP)
 	return nil
 }
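
Illustrative sketch only, not part of the patch above: a minimal table-driven test for the isBridgeMACError matcher added in internal/containerd/network.go. The file name, the assumed test placement in the containerd package, and the exact error strings (taken from the message quoted in the entrypoint comments, plus a hypothetical unrelated IPAM failure) are assumptions, not code from the repository.

// network_bridge_mac_test.go (hypothetical, alongside network.go)
package containerd

import (
	"errors"
	"testing"
)

func TestIsBridgeMACError(t *testing.T) {
	cases := []struct {
		name string
		err  error
		want bool
	}{
		// nil errors must never be treated as a bridge MAC failure.
		{"nil error", nil, false},
		// The bridge-plugin message quoted in the entrypoint comments.
		{"stale bridge mac", errors.New(`could not set bridge's mac: invalid argument`), true},
		// An unrelated CNI failure (assumed wording) should not match.
		{"unrelated failure", errors.New("failed to allocate for range 0: no IP addresses available"), false},
	}
	for _, tc := range cases {
		if got := isBridgeMACError(tc.err); got != tc.want {
			t.Errorf("%s: isBridgeMACError() = %v, want %v", tc.name, got, tc.want)
		}
	}
}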