mirror of https://github.com/memohai/Memoh.git (synced 2026-04-25 07:00:48 +09:00)
fix(containerd): prevent silent network failures from leaving containers unreachable (#202)
* fix(containerd): prevent silent network failures from leaving containers unreachable

  Container network setup failures were silently swallowed at multiple points
  in the call chain, leaving containers in a "running but unreachable" ghost
  state. This patch closes every silent-failure path:

  - setupCNINetwork: return an error when CNI yields no usable IP
  - Manager.Start: roll back the container when the IP is empty instead of
    returning success
  - ensureContainerAndTask: extract setupNetworkOrFail with one retry,
    propagating the error to callers
  - ReconcileContainers: stop reporting "healthy" when network setup fails
  - recoverContainerIP: retry up to 2 times with backoff for transient CNI
    failures (IPAM lock contention, etc.)
  - gRPC Pool: evict connections stuck in the Connecting state for more than 30s

* fix(containerd): clean stale cni0 bridge on startup to prevent MAC error

  After a Docker container restart, the cni0 bridge interface can linger with
  a zeroed MAC (00:00:00:00:00:00) and DOWN state. The CNI bridge plugin then
  fails with "could not set bridge's mac: invalid argument", making all MCP
  containers unreachable.

  Two-layer fix:
  - Entrypoint: delete cni0 and flush IPAM state before starting containerd
  - Go: detect bridge MAC errors in setupCNINetwork and auto-delete cni0
    before retrying, as defense-in-depth for runtime recovery

* fix(containerd): use exec.CommandContext to satisfy the noctx linter
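The invariant all of the first commit's bullet points circle around can be sketched in isolation: network setup must either produce a usable IP or fail loudly. The following standalone Go sketch uses illustrative names (mustHaveIP is not the repository's API):

package main

import (
	"errors"
	"fmt"
)

// mustHaveIP sketches the invariant this patch enforces: a network setup
// result only counts as success if it carries a non-empty IP; an empty IP
// is promoted to an error instead of being silently treated as "running".
func mustHaveIP(ip string, err error) error {
	if err != nil {
		return fmt.Errorf("network setup: %w", err)
	}
	if ip == "" {
		return errors.New("network setup returned no usable IP")
	}
	return nil
}

func main() {
	// The old "ghost container" case: no error, but no IP either.
	fmt.Println(mustHaveIP("", nil))
}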
@@ -1,6 +1,12 @@
 #!/bin/sh
 set -e
 
+# Clean up stale CNI state from previous runs. After a container restart the
+# cni0 bridge may linger with a zeroed MAC (00:00:00:00:00:00), causing the
+# bridge plugin to fail with "could not set bridge's mac: invalid argument".
+ip link delete cni0 2>/dev/null || true
+rm -rf /var/lib/cni/networks/* /var/lib/cni/results/* 2>/dev/null || true
+
 # Ensure IP forwarding and subnet MASQUERADE for CNI.
 sysctl -w net.ipv4.ip_forward=1 2>/dev/null || true
 iptables -t nat -C POSTROUTING -s 10.88.0.0/16 ! -o cni0 -j MASQUERADE 2>/dev/null || \
@@ -3,6 +3,12 @@ set -e
 
 MCP_IMAGE="${MCP_IMAGE:-docker.io/library/memoh-mcp:latest}"
 
+# ---- Clean up stale CNI state from previous runs ----
+# After a container restart the cni0 bridge may linger with a zeroed MAC
+# (00:00:00:00:00:00), causing "could not set bridge's mac: invalid argument".
+ip link delete cni0 2>/dev/null || true
+rm -rf /var/lib/cni/networks/* /var/lib/cni/results/* 2>/dev/null || true
+
 # ---- Ensure IP forwarding and subnet MASQUERADE for CNI ----
 sysctl -w net.ipv4.ip_forward=1 2>/dev/null || true
 iptables -t nat -C POSTROUTING -s 10.88.0.0/16 ! -o cni0 -j MASQUERADE 2>/dev/null || \
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"os"
+	"os/exec"
 	"path/filepath"
 	"strconv"
 	"strings"
@@ -51,19 +52,26 @@ func setupCNINetwork(ctx context.Context, task client.Task, containerID string,
 	}
 	result, err := cni.Setup(ctx, containerID, netnsPath)
 	if err != nil {
-		if !isDuplicateAllocationError(err) && !isVethExistsError(err) {
+		retryable := isDuplicateAllocationError(err) || isVethExistsError(err) || isBridgeMACError(err)
+		if !retryable {
 			return "", err
 		}
 		// Stale IPAM allocation or veth exists (e.g. after container restart with persisted
 		// /var/lib/cni). Remove may fail if the previous iptables/veth state
 		// is already gone; ignore the error so the retry Setup still runs.
+		if isBridgeMACError(err) {
+			// Stale bridge with zeroed MAC after container restart; delete it so
+			// the plugin can recreate a healthy one.
+			_ = exec.CommandContext(ctx, "ip", "link", "delete", "cni0").Run()
+		}
 		_ = cni.Remove(ctx, containerID, netnsPath)
 		result, err = cni.Setup(ctx, containerID, netnsPath)
 		if err != nil {
 			return "", err
 		}
 	}
-	return extractIP(result), nil
+	ip := extractIP(result)
+	if ip == "" {
+		return "", fmt.Errorf("cni setup returned no usable IP for %s", containerID)
+	}
+	return ip, nil
 }
 
 func extractIP(result *gocni.Result) string {
@@ -139,3 +147,13 @@ func isVethExistsError(err error) bool {
 	}
 	return strings.Contains(err.Error(), "already exists")
 }
+
+// isBridgeMACError returns true if the CNI bridge plugin failed because the
+// stale cni0 bridge has a zeroed MAC address (common after container restart).
+func isBridgeMACError(err error) bool {
+	if err == nil {
+		return false
+	}
+	msg := err.Error()
+	return strings.Contains(msg, "set bridge") && strings.Contains(msg, "mac")
+}
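The substring match above is deliberately loose. A quick table-driven test, a standalone sketch that reproduces the helper so it runs on its own, shows it catching the exact message from the bridge plugin while leaving unrelated CNI errors alone:

package main

import (
	"errors"
	"strings"
	"testing"
)

// isBridgeMACError mirrors the helper added in the hunk above, copied here
// so the test sketch is self-contained (run with `go test`).
func isBridgeMACError(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	return strings.Contains(msg, "set bridge") && strings.Contains(msg, "mac")
}

func TestIsBridgeMACError(t *testing.T) {
	cases := []struct {
		err  error
		want bool
	}{
		{nil, false},
		{errors.New("could not set bridge's mac: invalid argument"), true},
		{errors.New("failed to allocate IP"), false},
	}
	for _, c := range cases {
		if got := isBridgeMACError(c.err); got != c.want {
			t.Errorf("isBridgeMACError(%v) = %v, want %v", c.err, got, c.want)
		}
	}
}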
@@ -247,15 +247,8 @@ func (h *ContainerdHandler) ensureContainerAndTask(ctx context.Context, containe
 	}
 	if len(tasks) > 0 {
 		if tasks[0].Status == ctr.TaskStatusRunning {
-			if netResult, netErr := h.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{
-				ContainerID: containerID,
-				CNIBinDir:   h.cfg.CNIBinaryDir,
-				CNIConfDir:  h.cfg.CNIConfigDir,
-			}); netErr != nil {
-				h.logger.Warn("network re-setup failed for running task",
-					slog.String("container_id", containerID), slog.Any("error", netErr))
-			} else if netResult.IP != "" && h.manager != nil {
-				h.manager.SetContainerIP(botID, netResult.IP)
+			if err := h.setupNetworkOrFail(ctx, containerID, botID); err != nil {
+				return err
 			}
 			return nil
 		}
@@ -270,17 +263,37 @@
 	if err := h.service.StartContainer(ctx, containerID, nil); err != nil {
 		return err
 	}
-	if netResult, netErr := h.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{
-		ContainerID: containerID,
-		CNIBinDir:   h.cfg.CNIBinaryDir,
-		CNIConfDir:  h.cfg.CNIConfigDir,
-	}); netErr != nil {
-		h.logger.Warn("network setup failed, task kept running",
-			slog.String("container_id", containerID), slog.Any("error", netErr))
-	} else if netResult.IP != "" && h.manager != nil {
-		h.manager.SetContainerIP(botID, netResult.IP)
-	}
-	return nil
+	return h.setupNetworkOrFail(ctx, containerID, botID)
 }
 
+// setupNetworkOrFail attempts CNI network setup with one retry. Returns an error
+// if no usable IP is obtained — callers must not silently ignore this.
+func (h *ContainerdHandler) setupNetworkOrFail(ctx context.Context, containerID, botID string) error {
+	var lastErr error
+	for attempt := 0; attempt < 2; attempt++ {
+		netResult, err := h.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{
+			ContainerID: containerID,
+			CNIBinDir:   h.cfg.CNIBinaryDir,
+			CNIConfDir:  h.cfg.CNIConfigDir,
+		})
+		if err != nil {
+			lastErr = err
+			h.logger.Warn("network setup attempt failed",
+				slog.String("container_id", containerID),
+				slog.Int("attempt", attempt+1),
+				slog.Any("error", err))
+			continue
+		}
+		if netResult.IP == "" {
+			lastErr = fmt.Errorf("network setup returned no IP for %s", containerID)
+			continue
+		}
+		if h.manager != nil {
+			h.manager.SetContainerIP(botID, netResult.IP)
+		}
+		return nil
+	}
+	return fmt.Errorf("network setup failed for container %s: %w", containerID, lastErr)
+}
+
 // botContainerID resolves container_id for a bot from the database.
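setupNetworkOrFail is an instance of a fixed-attempt retry that records the last error so the final failure message explains why. A stripped-down sketch of the same shape (not the repository's code, minus logging and the IP check):

package main

import (
	"errors"
	"fmt"
)

// retry runs fn up to attempts times, keeping the last error so the caller
// sees why the final attempt failed; wrapping with %w preserves the cause.
func retry(attempts int, fn func() error) error {
	var lastErr error
	for i := 0; i < attempts; i++ {
		if err := fn(); err != nil {
			lastErr = err
			continue
		}
		return nil
	}
	return fmt.Errorf("all %d attempts failed: %w", attempts, lastErr)
}

func main() {
	err := retry(2, func() error { return errors.New("IPAM lock contention") })
	fmt.Println(err) // all 2 attempts failed: IPAM lock contention
}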
@@ -967,20 +980,15 @@ func (h *ContainerdHandler) ReconcileContainers(ctx context.Context) {
 					slog.String("bot_id", botID), slog.Any("error", dbErr))
 			}
 		}
-		if netResult, netErr := h.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{
-			ContainerID: containerID,
-			CNIBinDir:   h.cfg.CNIBinaryDir,
-			CNIConfDir:  h.cfg.CNIConfigDir,
-		}); netErr != nil {
-			h.logger.Warn("reconcile: network re-setup failed for running task",
+		if netErr := h.setupNetworkOrFail(ctx, containerID, botID); netErr != nil {
+			h.logger.Error("reconcile: network setup failed for running task, container unreachable",
 				slog.String("bot_id", botID),
 				slog.String("container_id", containerID),
 				slog.Any("error", netErr))
-		} else if netResult.IP != "" && h.manager != nil {
-			h.manager.SetContainerIP(botID, netResult.IP)
+		} else {
+			h.logger.Info("reconcile: container healthy",
+				slog.String("bot_id", botID), slog.String("container_id", containerID))
 		}
-		h.logger.Info("reconcile: container healthy",
-			slog.String("bot_id", botID), slog.String("container_id", containerID))
 		continue
 	}
 
@@ -116,34 +116,39 @@ func (m *Manager) SetContainerIP(botID, ip string) {
 }
 
 // recoverContainerIP attempts to restore the container IP by re-running CNI setup.
-// CNI plugins are idempotent - calling Setup again returns the existing IP allocation.
+// CNI plugins are idempotent — calling Setup again returns the existing IP allocation.
+// Retries up to 2 times to tolerate transient CNI failures (IPAM lock contention, etc.).
 func (m *Manager) recoverContainerIP(botID string) (string, error) {
 	ctx := context.Background()
 	containerID := m.containerID(botID)
 
 	// First check if container exists and get basic info
 	info, err := m.service.GetContainer(ctx, containerID)
 	if err != nil {
 		return "", err
 	}
 
 	// Check if IP is stored in labels (if we ever add label persistence)
 	if ip, ok := info.Labels["mcp.container_ip"]; ok {
 		return ip, nil
 	}
 
 	// Container exists but IP not cached - need to re-setup network to get IP
 	// This happens after server restart when in-memory cache is lost
-	netResult, err := m.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{
-		ContainerID: containerID,
-		CNIBinDir:   m.cfg.CNIBinaryDir,
-		CNIConfDir:  m.cfg.CNIConfigDir,
-	})
-	if err != nil {
-		return "", fmt.Errorf("network setup for IP recovery: %w", err)
-	}
-
-	return netResult.IP, nil
+	const maxAttempts = 2
+	var lastErr error
+	for i := 0; i < maxAttempts; i++ {
+		netResult, err := m.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{
+			ContainerID: containerID,
+			CNIBinDir:   m.cfg.CNIBinaryDir,
+			CNIConfDir:  m.cfg.CNIConfigDir,
+		})
+		if err != nil {
+			lastErr = err
+			m.logger.Warn("IP recovery attempt failed",
+				slog.String("bot_id", botID), slog.Int("attempt", i+1), slog.Any("error", err))
+			time.Sleep(time.Duration(i+1) * 500 * time.Millisecond)
+			continue
+		}
+		return netResult.IP, nil
+	}
+	return "", fmt.Errorf("network setup for IP recovery after %d attempts: %w", maxAttempts, lastErr)
 }
 
 // MCPClient returns a gRPC client for the given bot's container.
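The backoff here is linear: after a failed attempt i (zero-based) the loop sleeps (i+1) x 500 ms, so with maxAttempts = 2 a transient first failure costs 500 ms before the retry. Note that, as written, the sleep also runs after the last failed attempt, so the error path pays an extra second before returning. A tiny runnable sketch of the schedule:

package main

import (
	"fmt"
	"time"
)

// Prints the sleep schedule recoverContainerIP uses after each failed
// attempt: 500ms after the first, 1s after the second (and final) one.
func main() {
	const maxAttempts = 2
	for i := 0; i < maxAttempts; i++ {
		fmt.Printf("after failed attempt %d: sleep %v\n",
			i+1, time.Duration(i+1)*500*time.Millisecond)
	}
}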
@@ -295,12 +300,14 @@ func (m *Manager) Start(ctx context.Context, botID string) error {
 		}
 		return err
 	}
-	if netResult.IP != "" {
-		m.mu.Lock()
-		m.containerIPs[botID] = netResult.IP
-		m.mu.Unlock()
-		m.logger.Info("container network ready", slog.String("bot_id", botID), slog.String("ip", netResult.IP))
+	if netResult.IP == "" {
+		if stopErr := m.service.StopContainer(ctx, m.containerID(botID), &ctr.StopTaskOptions{Force: true}); stopErr != nil {
+			m.logger.Warn("cleanup: stop task failed", slog.String("container_id", m.containerID(botID)), slog.Any("error", stopErr))
+		}
+		return fmt.Errorf("network setup returned no IP for bot %s", botID)
 	}
+	m.SetContainerIP(botID, netResult.IP)
+	m.logger.Info("container network ready", slog.String("bot_id", botID), slog.String("ip", netResult.IP))
 	return nil
 }
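Stopping the task when no IP comes back is a rollback-on-partial-failure: a running-but-unreachable container is worse than a stopped one, because callers keep routing to it. A minimal sketch of the pattern (illustrative names, not the repository's API):

package main

import (
	"errors"
	"fmt"
)

// startWithRollback sketches Manager.Start's new behavior: if the second
// phase (networking) fails, undo the first phase (the running task) so the
// system never holds a half-initialized container.
func startWithRollback(start, network func() error, stop func()) error {
	if err := start(); err != nil {
		return err
	}
	if err := network(); err != nil {
		stop() // roll back: do not leave a running-but-unreachable task behind
		return fmt.Errorf("network phase failed, task stopped: %w", err)
	}
	return nil
}

func main() {
	err := startWithRollback(
		func() error { return nil },
		func() error { return errors.New("no usable IP") },
		func() { fmt.Println("task stopped") },
	)
	fmt.Println(err)
}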
@@ -11,6 +11,7 @@ import (
 	"fmt"
 	"io"
 	"sync"
+	"time"
 
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/connectivity"
@@ -20,11 +21,14 @@ import (
 	pb "github.com/memohai/memoh/internal/mcp/mcpcontainer"
 )
 
+const connectingTimeout = 30 * time.Second
+
 // Client wraps a gRPC connection to a single MCP container.
 type Client struct {
-	conn   *grpc.ClientConn
-	svc    pb.ContainerServiceClient
-	target string
+	conn      *grpc.ClientConn
+	svc       pb.ContainerServiceClient
+	target    string
+	createdAt time.Time
 }
 
 // NewClientFromConn wraps an existing gRPC connection into a Client.
@@ -50,6 +54,7 @@ func Dial(_ context.Context, ip string) (*Client, error) {
-		conn:   conn,
-		svc:    pb.NewContainerServiceClient(conn),
-		target: target,
+		conn:      conn,
+		svc:       pb.NewContainerServiceClient(conn),
+		target:    target,
+		createdAt: time.Now(),
 	}, nil
 }
@@ -333,12 +338,14 @@ func (p *Pool) MCPClient(ctx context.Context, botID string) (*Client, error) {
 }
 
 // Get returns a cached client or dials a new one.
-// Stale connections (Shutdown / TransientFailure) are evicted automatically.
+// Stale connections (Shutdown / TransientFailure / stuck Connecting) are evicted automatically.
 func (p *Pool) Get(ctx context.Context, botID string) (*Client, error) {
 	p.mu.RLock()
 	if c, ok := p.clients[botID]; ok {
 		state := c.conn.GetState()
-		if state != connectivity.Shutdown && state != connectivity.TransientFailure {
+		stale := state == connectivity.Shutdown || state == connectivity.TransientFailure ||
+			(state == connectivity.Connecting && time.Since(c.createdAt) > connectingTimeout)
+		if !stale {
 			p.mu.RUnlock()
 			return c, nil
 		}
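The eviction predicate reads well in isolation; below is a self-contained sketch using the real connectivity states (the pool wiring is omitted). One subtlety worth noting: the timeout is measured from the client's createdAt, not from when the connection entered Connecting, so a long-lived connection that flaps back into Connecting is evicted on the next Get. That errs on the side of redialing, which is the safer failure mode here.

package main

import (
	"fmt"
	"time"

	"google.golang.org/grpc/connectivity"
)

const connectingTimeout = 30 * time.Second

// stale mirrors the pool's rule: dead states are always evicted, and a
// connection still Connecting past the timeout is presumed dead too
// (e.g. dialing an IP whose container has since been replaced).
func stale(state connectivity.State, createdAt time.Time) bool {
	return state == connectivity.Shutdown ||
		state == connectivity.TransientFailure ||
		(state == connectivity.Connecting && time.Since(createdAt) > connectingTimeout)
}

func main() {
	fmt.Println(stale(connectivity.Connecting, time.Now().Add(-time.Minute))) // true: stuck dial
	fmt.Println(stale(connectivity.Ready, time.Now()))                        // false: healthy
	fmt.Println(stale(connectivity.Idle, time.Now().Add(-time.Hour)))         // false: idle is fine
}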