mirror of https://github.com/memohai/Memoh.git
synced 2026-04-27 07:16:19 +09:00
dae772f729
* fix(containerd): prevent silent network failures from leaving containers unreachable (#202)

* fix(containerd): prevent silent network failures from leaving containers unreachable

  Container network setup failures were silently swallowed at multiple points in the call chain, leaving containers in a "running but unreachable" ghost state. This patch closes every silent-failure path:

  - setupCNINetwork: return error when CNI yields no usable IP
  - Manager.Start: roll back container when IP is empty instead of returning success
  - ensureContainerAndTask: extract setupNetworkOrFail with 1 retry, propagate error to callers
  - ReconcileContainers: stop reporting "healthy" when network setup fails
  - recoverContainerIP: retry up to 2 times with backoff for transient CNI failures (IPAM lock contention, etc.)
  - gRPC Pool: evict connections stuck in Connecting state for >30s

* fix(containerd): clean stale cni0 bridge on startup to prevent MAC error

  After a Docker container restart, the cni0 bridge interface can linger with a zeroed MAC (00:00:00:00:00:00) and DOWN state. The CNI bridge plugin then fails with "could not set bridge's mac: invalid argument", making all MCP containers unreachable. Two-layer fix:

  - Entrypoint: delete cni0 and flush IPAM state before starting containerd
  - Go: detect bridge MAC errors in setupCNINetwork and auto-delete cni0 before retrying, as defense-in-depth for runtime recovery

* fix(containerd): use exec.CommandContext to satisfy noctx linter

* fix(mcp): propagate network errors from replaceContainerSnapshot

  Network setup failure after snapshot replace (rollback/commit) was silently swallowed — the container would start but remain unreachable via gRPC. Return the error so callers (CreateSnapshot, RollbackVersion, etc.) surface the failure instead of reporting success.
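The gRPC-pool change referenced above ("evict connections stuck in Connecting state for >30s") lives in mcpclient.Pool, which is not part of the file shown below. A minimal sketch of the idea, with hypothetical field and method names (the actual Pool may be structured differently):

// Sketch only: evicting gRPC connections stuck in the Connecting state.
// Field and method names are assumptions, not the real mcpclient.Pool API.
package mcpclient

import (
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
)

const connectingEvictAfter = 30 * time.Second

type pooledConn struct {
	cc              *grpc.ClientConn
	connectingSince time.Time // zero when the connection is not in Connecting
}

type Pool struct {
	mu    sync.Mutex
	conns map[string]*pooledConn // keyed by bot ID
}

// evictStuck closes and drops connections that have sat in Connecting for
// longer than connectingEvictAfter, so the next Get dials a fresh connection.
func (p *Pool) evictStuck(now time.Time) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for botID, pc := range p.conns {
		switch pc.cc.GetState() {
		case connectivity.Connecting:
			if pc.connectingSince.IsZero() {
				pc.connectingSince = now
			} else if now.Sub(pc.connectingSince) > connectingEvictAfter {
				_ = pc.cc.Close()
				delete(p.conns, botID)
			}
		default:
			pc.connectingSince = time.Time{}
		}
	}
}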
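The stale-cni0 recovery and the exec.CommandContext change are likewise outside this file (in the containerd service layer and the image entrypoint). A rough sketch of the Go side under stated assumptions: the helper name and the error-string check are hypothetical, but exec.CommandContext is the standard-library call the noctx fix refers to.

// Sketch only: removing a stale cni0 bridge before retrying CNI setup.
// The helper name and the exact error match are assumptions; the real code
// detects the bridge MAC failure inside setupCNINetwork.
package containerd

import (
	"context"
	"fmt"
	"os/exec"
	"strings"
)

// isBridgeMACError reports whether a CNI error looks like the stale-bridge
// failure described in the commit message.
func isBridgeMACError(err error) bool {
	return err != nil && strings.Contains(err.Error(), "could not set bridge's mac")
}

// deleteStaleCNIBridge removes the cni0 link so the bridge plugin can
// recreate it with a valid MAC on the next setup attempt.
func deleteStaleCNIBridge(ctx context.Context) error {
	// exec.CommandContext (rather than exec.Command) ties the child process
	// to ctx, which is what the noctx linter enforces.
	out, err := exec.CommandContext(ctx, "ip", "link", "delete", "cni0").CombinedOutput()
	if err != nil {
		return fmt.Errorf("delete cni0: %w (%s)", err, strings.TrimSpace(string(out)))
	}
	return nil
}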
389 lines
11 KiB
Go
package mcp

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"strings"
	"sync"
	"time"

	"github.com/containerd/errdefs"
	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/memohai/memoh/internal/config"
	ctr "github.com/memohai/memoh/internal/containerd"
	dbsqlc "github.com/memohai/memoh/internal/db/sqlc"
	"github.com/memohai/memoh/internal/identity"
	"github.com/memohai/memoh/internal/mcp/mcpclient"
)

const (
	BotLabelKey     = "mcp.bot_id"
	ContainerPrefix = "mcp-"
)

type Manager struct {
	service         ctr.Service
	cfg             config.MCPConfig
	namespace       string
	containerID     func(string) string
	db              *pgxpool.Pool
	queries         *dbsqlc.Queries
	logger          *slog.Logger
	containerLockMu sync.Mutex
	containerLocks  map[string]*sync.Mutex
	mu              sync.RWMutex
	containerIPs    map[string]string
	grpcPool        *mcpclient.Pool
}

func NewManager(log *slog.Logger, service ctr.Service, cfg config.MCPConfig, namespace string, conn *pgxpool.Pool) *Manager {
	if namespace == "" {
		namespace = config.DefaultNamespace
	}
	m := &Manager{
		service:        service,
		cfg:            cfg,
		namespace:      namespace,
		db:             conn,
		queries:        dbsqlc.New(conn),
		logger:         log.With(slog.String("component", "mcp")),
		containerLocks: make(map[string]*sync.Mutex),
		containerIPs:   make(map[string]string),
		containerID: func(botID string) string {
			return ContainerPrefix + botID
		},
	}
	m.grpcPool = mcpclient.NewPool(m.ContainerIP)
	return m
}

func (m *Manager) lockContainer(containerID string) func() {
	m.containerLockMu.Lock()
	lock, ok := m.containerLocks[containerID]
	if !ok {
		lock = &sync.Mutex{}
		m.containerLocks[containerID] = lock
	}
	m.containerLockMu.Unlock()

	lock.Lock()
	return lock.Unlock
}

// ContainerIP returns the cached IP address for a bot's container.
// If not cached, it attempts to recover the IP by re-running CNI setup.
func (m *Manager) ContainerIP(botID string) string {
	m.mu.RLock()
	if ip, ok := m.containerIPs[botID]; ok {
		m.mu.RUnlock()
		return ip
	}
	m.mu.RUnlock()

	// Cache miss - try to recover IP via CNI setup (idempotent)
	ip, err := m.recoverContainerIP(botID)
	if err != nil {
		m.logger.Warn("container IP recovery failed", slog.String("bot_id", botID), slog.Any("error", err))
		return ""
	}
	if ip != "" {
		m.mu.Lock()
		m.containerIPs[botID] = ip
		m.mu.Unlock()
		m.logger.Info("container IP recovered", slog.String("bot_id", botID), slog.String("ip", ip))
	}
	return ip
}

// SetContainerIP stores the container IP in the cache.
// If the IP changed, the stale gRPC connection is evicted from the pool.
func (m *Manager) SetContainerIP(botID, ip string) {
	if ip == "" {
		return
	}
	m.mu.Lock()
	old := m.containerIPs[botID]
	m.containerIPs[botID] = ip
	m.mu.Unlock()

	if old != "" && old != ip {
		m.grpcPool.Remove(botID)
		m.logger.Info("evicted stale gRPC connection", slog.String("bot_id", botID), slog.String("old_ip", old), slog.String("new_ip", ip))
	}
}

// recoverContainerIP attempts to restore the container IP by re-running CNI setup.
// CNI plugins are idempotent — calling Setup again returns the existing IP allocation.
// Retries up to 2 times to tolerate transient CNI failures (IPAM lock contention, etc.).
func (m *Manager) recoverContainerIP(botID string) (string, error) {
	ctx := context.Background()
	containerID := m.containerID(botID)

	info, err := m.service.GetContainer(ctx, containerID)
	if err != nil {
		return "", err
	}

	if ip, ok := info.Labels["mcp.container_ip"]; ok {
		return ip, nil
	}

	const maxAttempts = 2
	var lastErr error
	for i := 0; i < maxAttempts; i++ {
		netResult, err := m.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{
			ContainerID: containerID,
			CNIBinDir:   m.cfg.CNIBinaryDir,
			CNIConfDir:  m.cfg.CNIConfigDir,
		})
		if err != nil {
			lastErr = err
			m.logger.Warn("IP recovery attempt failed",
				slog.String("bot_id", botID), slog.Int("attempt", i+1), slog.Any("error", err))
			time.Sleep(time.Duration(i+1) * 500 * time.Millisecond)
			continue
		}
		return netResult.IP, nil
	}
	return "", fmt.Errorf("network setup for IP recovery after %d attempts: %w", maxAttempts, lastErr)
}

// MCPClient returns a gRPC client for the given bot's container.
// Implements mcpclient.Provider.
func (m *Manager) MCPClient(ctx context.Context, botID string) (*mcpclient.Client, error) {
	return m.grpcPool.Get(ctx, botID)
}

// Init ensures the MCP image is available locally, pulling a newer version
// when the registry reports one and falling back to the cached image when
// the registry cannot be reached.
func (m *Manager) Init(ctx context.Context) error {
	image := m.imageRef()

	needsPull, remoteErr := m.checkImageUpgrade(ctx, image)
	if remoteErr != nil {
		// Remote check failed (network unavailable, registry down, etc.).
		// Fall back to local image if available; fail only when nothing is cached.
		m.logger.Warn("image upgrade check failed, falling back to local",
			slog.String("image", image), slog.Any("error", remoteErr))
		if _, err := m.service.GetImage(ctx, image); err != nil {
			_, err = m.service.PullImage(ctx, image, &ctr.PullImageOptions{
				Unpack:      true,
				Snapshotter: m.cfg.Snapshotter,
			})
			return err
		}
		return nil
	}

	if !needsPull {
		return nil
	}

	m.logger.Info("pulling updated MCP image", slog.String("image", image))
	if _, err := m.service.PullImage(ctx, image, &ctr.PullImageOptions{
		Unpack:      true,
		Snapshotter: m.cfg.Snapshotter,
	}); err != nil {
		m.logger.Warn("image pull failed, using existing version", slog.Any("error", err))
		if _, err2 := m.service.GetImage(ctx, image); err2 != nil {
			return err
		}
		return nil
	}

	// Existing bot containers keep running with their current image.
	// New containers created after this point will use the updated image.
	return nil
}

// checkImageUpgrade compares the local image digest against the remote registry.
// Returns (true, nil) when a newer image is available or no local image exists.
// Returns (false, err) when the remote cannot be reached.
func (m *Manager) checkImageUpgrade(ctx context.Context, image string) (needsPull bool, _ error) {
	checkCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
	defer cancel()

	remoteDigest, err := m.service.ResolveRemoteDigest(checkCtx, image)
	if err != nil {
		return false, err
	}

	localImg, err := m.service.GetImage(ctx, image)
	if err != nil {
		return true, nil // no local image
	}
	return localImg.ID != remoteDigest, nil
}

// EnsureBot creates the MCP container for a bot if it does not exist.
// Bot data lives in the container's writable layer (snapshot), not bind mounts.
func (m *Manager) EnsureBot(ctx context.Context, botID string) error {
	if err := validateBotID(botID); err != nil {
		return err
	}

	image := m.imageRef()
	resolvPath, err := ctr.ResolveConfSource(m.dataRoot())
	if err != nil {
		return err
	}

	mounts := []ctr.MountSpec{
		{
			Destination: "/etc/resolv.conf",
			Type:        "bind",
			Source:      resolvPath,
			Options:     []string{"rbind", "ro"},
		},
	}
	tzMounts, tzEnv := ctr.TimezoneSpec()
	mounts = append(mounts, tzMounts...)

	_, err = m.service.CreateContainer(ctx, ctr.CreateContainerRequest{
		ID:          m.containerID(botID),
		ImageRef:    image,
		Snapshotter: m.cfg.Snapshotter,
		Labels: map[string]string{
			BotLabelKey: botID,
		},
		Spec: ctr.ContainerSpec{
			Mounts: mounts,
			Env:    tzEnv,
		},
	})
	if err == nil {
		return nil
	}

	if !errdefs.IsAlreadyExists(err) {
		return err
	}

	return nil
}

// ListBots returns the bot IDs that have MCP containers.
func (m *Manager) ListBots(ctx context.Context) ([]string, error) {
	containers, err := m.service.ListContainers(ctx)
	if err != nil {
		return nil, err
	}

	botIDs := make([]string, 0, len(containers))
	for _, info := range containers {
		if strings.HasPrefix(info.ID, ContainerPrefix) {
			if botID, ok := info.Labels[BotLabelKey]; ok {
				botIDs = append(botIDs, botID)
			}
		}
	}
	return botIDs, nil
}

// Start ensures the bot's container exists, starts it, and sets up its CNI network.
// If network setup fails or returns no IP, the container is stopped again so it is
// never left running but unreachable.
func (m *Manager) Start(ctx context.Context, botID string) error {
	if err := m.EnsureBot(ctx, botID); err != nil {
		return err
	}

	if err := m.service.StartContainer(ctx, m.containerID(botID), nil); err != nil {
		return err
	}
	netResult, err := m.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{
		ContainerID: m.containerID(botID),
		CNIBinDir:   m.cfg.CNIBinaryDir,
		CNIConfDir:  m.cfg.CNIConfigDir,
	})
	if err != nil {
		if stopErr := m.service.StopContainer(ctx, m.containerID(botID), &ctr.StopTaskOptions{Force: true}); stopErr != nil {
			m.logger.Warn("cleanup: stop task failed", slog.String("container_id", m.containerID(botID)), slog.Any("error", stopErr))
		}
		return err
	}
	if netResult.IP == "" {
		if stopErr := m.service.StopContainer(ctx, m.containerID(botID), &ctr.StopTaskOptions{Force: true}); stopErr != nil {
			m.logger.Warn("cleanup: stop task failed", slog.String("container_id", m.containerID(botID)), slog.Any("error", stopErr))
		}
		return fmt.Errorf("network setup returned no IP for bot %s", botID)
	}
	m.SetContainerIP(botID, netResult.IP)
	m.logger.Info("container network ready", slog.String("bot_id", botID), slog.String("ip", netResult.IP))
	return nil
}

// Stop force-stops the bot's container task within the given timeout.
func (m *Manager) Stop(ctx context.Context, botID string, timeout time.Duration) error {
	if err := validateBotID(botID); err != nil {
		return err
	}
	return m.service.StopContainer(ctx, m.containerID(botID), &ctr.StopTaskOptions{
		Timeout: timeout,
		Force:   true,
	})
}

// Delete removes the bot's container, its task, network, and snapshot.
// When preserveData is set, the bot's data is exported first and the
// deletion is aborted if that export fails.
func (m *Manager) Delete(ctx context.Context, botID string, preserveData bool) error {
	if err := validateBotID(botID); err != nil {
		return err
	}

	containerID := m.containerID(botID)
	stoppedForPreserve := false

	if preserveData {
		info, err := m.service.GetContainer(ctx, containerID)
		if err != nil {
			return fmt.Errorf("get container for preserve: %w", err)
		}
		if _, err := m.snapshotMounts(ctx, info); errors.Is(err, errMountNotSupported) {
			// Apple backend fallback uses gRPC against a running container.
		} else if err != nil {
			return err
		} else {
			if err := m.safeStopTask(ctx, containerID); err != nil {
				return fmt.Errorf("stop for data preserve: %w", err)
			}
			stoppedForPreserve = true
		}

		if err := m.PreserveData(ctx, botID); err != nil {
			// Export failed — restart only if we stopped the task, and abort
			// deletion to prevent data loss.
			if stoppedForPreserve {
				m.restartContainer(ctx, botID, containerID)
			}
			return fmt.Errorf("preserve data: %w", err)
		}
	}

	m.grpcPool.Remove(botID)

	if err := m.service.RemoveNetwork(ctx, ctr.NetworkSetupRequest{
		ContainerID: containerID,
		CNIBinDir:   m.cfg.CNIBinaryDir,
		CNIConfDir:  m.cfg.CNIConfigDir,
	}); err != nil {
		m.logger.Warn("cleanup: remove network failed", slog.String("container_id", containerID), slog.Any("error", err))
	}
	if err := m.service.DeleteTask(ctx, containerID, &ctr.DeleteTaskOptions{Force: true}); err != nil {
		m.logger.Warn("cleanup: delete task failed", slog.String("container_id", containerID), slog.Any("error", err))
	}
	return m.service.DeleteContainer(ctx, containerID, &ctr.DeleteContainerOptions{
		CleanupSnapshot: true,
	})
}

func (m *Manager) dataRoot() string {
	if m.cfg.DataRoot == "" {
		return config.DefaultDataRoot
	}
	return m.cfg.DataRoot
}

func (m *Manager) imageRef() string {
	return m.cfg.ImageRef()
}

func validateBotID(botID string) error {
	return identity.ValidateChannelIdentityID(botID)
}