From ee587b8ef5b387466133b92e1e1475617c8d54bd Mon Sep 17 00:00:00 2001 From: BBQ <35603386+HoneyBBQ@users.noreply.github.com> Date: Tue, 3 Mar 2026 15:59:57 +0800 Subject: [PATCH] fix(mcp): fix snapshot management and encapsulate locking (#169) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix DeleteContainer FAILED_PRECONDITION by cleaning up stopped task entries before container deletion - Fix CreateSnapshot leaving container in broken state: commit turns the active snapshot read-only, so the full cycle (stop → commit → prepare → delete → recreate → start) is now applied consistently - Use context.WithoutCancel for atomic container replacement sequences to prevent cancelled HTTP requests from corrupting container state - Use dctx for DB operations (recordSnapshotVersion/insertEvent) to avoid orphan snapshots in containerd without matching DB records - Restart task + network after snapshot replacement, fixing Exec after CreateVersion where the container had no running task - Extract replaceContainerSnapshot helper to deduplicate the prepare → delete → recreate → start pattern across three call sites - Move snapshot list data fetching into Manager.ListBotSnapshotData to encapsulate per-container locking; remove exported LockBot method - Use UnixNano for snapshot names to avoid second-precision collisions --- internal/containerd/service.go | 10 ++ internal/handlers/containerd.go | 88 +++++--------- internal/mcp/versioning.go | 197 +++++++++++++++++++++++--------- 3 files changed, 180 insertions(+), 115 deletions(-) diff --git a/internal/containerd/service.go b/internal/containerd/service.go index a82c51f1..5299a616 100644 --- a/internal/containerd/service.go +++ b/internal/containerd/service.go @@ -407,6 +407,16 @@ func (s *DefaultService) DeleteContainer(ctx context.Context, id string, opts *D return err } + // A stopped task still holds an entry in containerd; container.Delete fails + // with FAILED_PRECONDITION if any task entry exists. Delete it first. + if task, err := container.Task(ctx, nil); err == nil { + if _, err := task.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { + return err + } + } else if !errdefs.IsNotFound(err) { + return err + } + deleteOpts := []containerd.DeleteOpts{} cleanupSnapshot := true if opts != nil { diff --git a/internal/handlers/containerd.go b/internal/handlers/containerd.go index bd02263d..f6726ace 100644 --- a/internal/handlers/containerd.go +++ b/internal/handlers/containerd.go @@ -583,12 +583,11 @@ func (h *ContainerdHandler) ListSnapshots(c echo.Context) error { if err != nil { return err } - ctx := c.Request().Context() - containerID, err := h.botContainerID(ctx, botID) - if err != nil { - return echo.NewHTTPError(http.StatusNotFound, "container not found for bot") + if h.manager == nil { + return echo.NewHTTPError(http.StatusInternalServerError, "snapshot manager not configured") } - containerInfo, err := h.service.GetContainer(ctx, containerID) + + data, err := h.manager.ListBotSnapshotData(c.Request().Context(), botID) if err != nil { if errdefs.IsNotFound(err) { return echo.NewHTTPError(http.StatusNotFound, "container not found") @@ -596,80 +595,48 @@ func (h *ContainerdHandler) ListSnapshots(c echo.Context) error { return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) } - requestedSnapshotter := strings.TrimSpace(c.QueryParam("snapshotter")) - snapshotter := strings.TrimSpace(containerInfo.Snapshotter) - if requestedSnapshotter != "" { - if snapshotter != "" && requestedSnapshotter != snapshotter { - return echo.NewHTTPError(http.StatusBadRequest, "snapshotter does not match container snapshotter") - } - snapshotter = requestedSnapshotter + if req := strings.TrimSpace(c.QueryParam("snapshotter")); req != "" && req != data.Snapshotter { + return echo.NewHTTPError(http.StatusBadRequest, "snapshotter does not match container snapshotter") } - if snapshotter == "" { - snapshotter = strings.TrimSpace(h.cfg.Snapshotter) - } - if snapshotter == "" { - snapshotter = "overlayfs" - } - snapshotKey := strings.TrimSpace(containerInfo.SnapshotKey) + + snapshotKey := strings.TrimSpace(data.Info.SnapshotKey) if snapshotKey == "" { return echo.NewHTTPError(http.StatusInternalServerError, "container snapshot key is empty") } - allSnapshots, err := h.service.ListSnapshots(ctx, snapshotter) - if err != nil { - return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) - } - runtimeByName := make(map[string]ctr.SnapshotInfo, len(allSnapshots)) - for _, info := range allSnapshots { + runtimeByName := make(map[string]ctr.SnapshotInfo, len(data.RuntimeSnapshots)) + for _, info := range data.RuntimeSnapshots { name := strings.TrimSpace(info.Name) if name == "" { continue } runtimeByName[name] = info } - lineage, ok := snapshotLineage(snapshotKey, allSnapshots) + lineage, ok := snapshotLineage(snapshotKey, data.RuntimeSnapshots) if !ok { h.logger.Warn("container snapshot chain root not found", - slog.String("container_id", containerID), - slog.String("snapshotter", snapshotter), + slog.String("container_id", data.ContainerID), + slog.String("snapshotter", data.Snapshotter), slog.String("snapshot_key", snapshotKey), ) return echo.NewHTTPError(http.StatusInternalServerError, "container snapshot chain not found") } - metadataByName := map[string]dbsqlc.ListSnapshotsWithVersionByContainerIDRow{} - if h.queries != nil { - managedRows, dbErr := h.queries.ListSnapshotsWithVersionByContainerID(ctx, containerID) - if dbErr != nil { - return echo.NewHTTPError(http.StatusInternalServerError, dbErr.Error()) - } - for _, row := range managedRows { - name := strings.TrimSpace(row.RuntimeSnapshotName) - if name == "" { - continue - } - metadataByName[name] = row - } - } - - items := make([]SnapshotInfo, 0, len(lineage)+len(metadataByName)) - seen := make(map[string]struct{}, len(lineage)+len(metadataByName)) - appendRuntime := func(runtimeInfo ctr.SnapshotInfo, fallbackSource string, meta *dbsqlc.ListSnapshotsWithVersionByContainerIDRow) { + items := make([]SnapshotInfo, 0, len(lineage)+len(data.ManagedMeta)) + seen := make(map[string]struct{}, len(lineage)+len(data.ManagedMeta)) + appendRuntime := func(runtimeInfo ctr.SnapshotInfo, fallbackSource string, meta *mcp.ManagedSnapshotMeta) { source := fallbackSource managed := false var version *int if meta != nil { - if strings.TrimSpace(meta.Source) != "" { - source = strings.TrimSpace(meta.Source) + if meta.Source != "" { + source = meta.Source } managed = true - if meta.Version.Valid { - v := int(meta.Version.Int32) - version = &v - } + version = meta.Version } items = append(items, SnapshotInfo{ - Snapshotter: snapshotter, + Snapshotter: data.Snapshotter, Name: runtimeInfo.Name, Parent: runtimeInfo.Parent, Kind: runtimeInfo.Kind, @@ -685,28 +652,27 @@ func (h *ContainerdHandler) ListSnapshots(c echo.Context) error { for _, runtimeInfo := range lineage { name := strings.TrimSpace(runtimeInfo.Name) - row, hasMeta := metadataByName[name] - if hasMeta { - appendRuntime(runtimeInfo, "image_layer", &row) + if meta, hasMeta := data.ManagedMeta[name]; hasMeta { + appendRuntime(runtimeInfo, "image_layer", &meta) continue } appendRuntime(runtimeInfo, "image_layer", nil) } - for name, row := range metadataByName { + for name, meta := range data.ManagedMeta { if _, exists := seen[name]; exists { continue } runtimeInfo, exists := runtimeByName[name] if !exists { h.logger.Warn("managed snapshot not found in runtime", - slog.String("container_id", containerID), + slog.String("container_id", data.ContainerID), slog.String("snapshot_name", name), - slog.String("snapshotter", snapshotter), + slog.String("snapshotter", data.Snapshotter), ) continue } - appendRuntime(runtimeInfo, "managed", &row) + appendRuntime(runtimeInfo, "managed", &meta) } sort.Slice(items, func(i, j int) bool { if items[i].CreatedAt.Equal(items[j].CreatedAt) { @@ -715,7 +681,7 @@ func (h *ContainerdHandler) ListSnapshots(c echo.Context) error { return items[i].CreatedAt.Before(items[j].CreatedAt) }) return c.JSON(http.StatusOK, ListSnapshotsResponse{ - Snapshotter: snapshotter, + Snapshotter: data.Snapshotter, Snapshots: items, }) } diff --git a/internal/mcp/versioning.go b/internal/mcp/versioning.go index b4d5e65b..c1b04eed 100644 --- a/internal/mcp/versioning.go +++ b/internal/mcp/versioning.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "log/slog" "strings" "time" @@ -39,6 +40,19 @@ type SnapshotCreateInfo struct { CreatedAt time.Time } +type ManagedSnapshotMeta struct { + Source string + Version *int +} + +type BotSnapshotData struct { + ContainerID string + Info ctr.ContainerInfo + Snapshotter string + RuntimeSnapshots []ctr.SnapshotInfo + ManagedMeta map[string]ManagedSnapshotMeta +} + func (m *Manager) CreateSnapshot(ctx context.Context, botID, snapshotName, source string) (*SnapshotCreateInfo, error) { if m.db == nil || m.queries == nil { return nil, fmt.Errorf("db is not configured") @@ -61,16 +75,30 @@ func (m *Manager) CreateSnapshot(ctx context.Context, botID, snapshotName, sourc normalizedSnapshotName := strings.TrimSpace(snapshotName) if normalizedSnapshotName == "" { - normalizedSnapshotName = fmt.Sprintf("%s-%s", containerID, time.Now().Format("20060102150405")) + normalizedSnapshotName = fmt.Sprintf("%s-%d", containerID, time.Now().UnixNano()) } normalizedSource := normalizeSnapshotSource(source) - if err := m.service.CommitSnapshot(ctx, info.Snapshotter, normalizedSnapshotName, info.SnapshotKey); err != nil { + // The sequence below (stop → commit → replace → start) is atomic from the + // container's perspective: interrupting it mid-way leaves the container missing. + // Use a detached context so a cancelled HTTP request cannot break it. + dctx := context.WithoutCancel(ctx) + + if err := m.safeStopTask(dctx, containerID); err != nil { + return nil, err + } + + if err := m.service.CommitSnapshot(dctx, info.Snapshotter, normalizedSnapshotName, info.SnapshotKey); err != nil { + return nil, err + } + + activeSnapshotName := fmt.Sprintf("%s-active-%d", containerID, time.Now().UnixNano()) + if err := m.replaceContainerSnapshot(dctx, botID, containerID, info, activeSnapshotName, normalizedSnapshotName); err != nil { return nil, err } _, versionNumber, createdAt, err := m.recordSnapshotVersion( - ctx, + dctx, containerID, normalizedSnapshotName, info.SnapshotKey, @@ -80,7 +108,7 @@ func (m *Manager) CreateSnapshot(ctx context.Context, botID, snapshotName, sourc if err != nil { return nil, err } - if err := m.insertEvent(ctx, containerID, "snapshot_create", map[string]any{ + if err := m.insertEvent(dctx, containerID, "snapshot_create", map[string]any{ "snapshot_name": normalizedSnapshotName, "snapshotter": info.Snapshotter, "source": normalizedSource, @@ -119,43 +147,24 @@ func (m *Manager) CreateVersion(ctx context.Context, botID string) (*VersionInfo return nil, err } - if err := m.safeStopTask(ctx, containerID); err != nil { + dctx := context.WithoutCancel(ctx) + + if err := m.safeStopTask(dctx, containerID); err != nil { return nil, err } versionSnapshotName := fmt.Sprintf("%s-v%d", containerID, time.Now().UnixNano()) - if err := m.service.CommitSnapshot(ctx, info.Snapshotter, versionSnapshotName, info.SnapshotKey); err != nil { + if err := m.service.CommitSnapshot(dctx, info.Snapshotter, versionSnapshotName, info.SnapshotKey); err != nil { return nil, err } activeSnapshotName := fmt.Sprintf("%s-active-%d", containerID, time.Now().UnixNano()) - if err := m.service.PrepareSnapshot(ctx, info.Snapshotter, activeSnapshotName, versionSnapshotName); err != nil { - return nil, err - } - - if err := m.service.DeleteContainer(ctx, containerID, &ctr.DeleteContainerOptions{CleanupSnapshot: false}); err != nil { - return nil, err - } - - spec, err := m.buildVersionSpec(botID) - if err != nil { - return nil, err - } - - _, err = m.service.CreateContainerFromSnapshot(ctx, ctr.CreateContainerRequest{ - ID: containerID, - ImageRef: info.Image, - SnapshotID: activeSnapshotName, - Snapshotter: info.Snapshotter, - Labels: info.Labels, - Spec: spec, - }) - if err != nil { + if err := m.replaceContainerSnapshot(dctx, botID, containerID, info, activeSnapshotName, versionSnapshotName); err != nil { return nil, err } versionID, versionNumber, createdAt, err := m.recordSnapshotVersion( - ctx, + dctx, containerID, versionSnapshotName, info.SnapshotKey, @@ -166,7 +175,7 @@ func (m *Manager) CreateVersion(ctx context.Context, botID string) (*VersionInfo return nil, err } - if err := m.insertEvent(ctx, containerID, "version_create", map[string]any{ + if err := m.insertEvent(dctx, containerID, "version_create", map[string]any{ "snapshot_name": versionSnapshotName, "version": versionNumber, "version_id": versionID, @@ -182,6 +191,67 @@ func (m *Manager) CreateVersion(ctx context.Context, botID string) (*VersionInfo }, nil } +// ListBotSnapshotData returns the raw snapshot data for a bot under the +// per-container lock, so callers never observe transient state during +// snapshot/version operations. +func (m *Manager) ListBotSnapshotData(ctx context.Context, botID string) (*BotSnapshotData, error) { + if err := validateBotID(botID); err != nil { + return nil, err + } + + containerID := m.containerID(botID) + unlock := m.lockContainer(containerID) + defer unlock() + + info, err := m.service.GetContainer(ctx, containerID) + if err != nil { + return nil, err + } + + snapshotter := strings.TrimSpace(info.Snapshotter) + if snapshotter == "" { + snapshotter = m.cfg.Snapshotter + } + if snapshotter == "" { + snapshotter = "overlayfs" + } + + runtimeSnapshots, err := m.service.ListSnapshots(ctx, snapshotter) + if err != nil { + return nil, err + } + + managedMeta := make(map[string]ManagedSnapshotMeta) + if m.queries != nil { + rows, err := m.queries.ListSnapshotsWithVersionByContainerID(ctx, containerID) + if err != nil { + return nil, err + } + for _, row := range rows { + name := strings.TrimSpace(row.RuntimeSnapshotName) + if name == "" { + continue + } + meta := ManagedSnapshotMeta{ + Source: strings.TrimSpace(row.Source), + } + if row.Version.Valid { + v := int(row.Version.Int32) + meta.Version = &v + } + managedMeta[name] = meta + } + } + + return &BotSnapshotData{ + ContainerID: containerID, + Info: info, + Snapshotter: snapshotter, + RuntimeSnapshots: runtimeSnapshots, + ManagedMeta: managedMeta, + }, nil +} + func (m *Manager) ListVersions(ctx context.Context, botID string) ([]VersionInfo, error) { if m.db == nil || m.queries == nil { return nil, fmt.Errorf("db is not configured") @@ -237,37 +307,18 @@ func (m *Manager) RollbackVersion(ctx context.Context, botID string, version int return err } - if err := m.safeStopTask(ctx, containerID); err != nil { + dctx := context.WithoutCancel(ctx) + + if err := m.safeStopTask(dctx, containerID); err != nil { return err } activeSnapshotName := fmt.Sprintf("%s-rollback-%d", containerID, time.Now().UnixNano()) - if err := m.service.PrepareSnapshot(ctx, info.Snapshotter, activeSnapshotName, snapshotName); err != nil { + if err := m.replaceContainerSnapshot(dctx, botID, containerID, info, activeSnapshotName, snapshotName); err != nil { return err } - if err := m.service.DeleteContainer(ctx, containerID, &ctr.DeleteContainerOptions{CleanupSnapshot: false}); err != nil { - return err - } - - spec, err := m.buildVersionSpec(botID) - if err != nil { - return err - } - - _, err = m.service.CreateContainerFromSnapshot(ctx, ctr.CreateContainerRequest{ - ID: containerID, - ImageRef: info.Image, - SnapshotID: activeSnapshotName, - Snapshotter: info.Snapshotter, - Labels: info.Labels, - Spec: spec, - }) - if err != nil { - return err - } - - return m.insertEvent(ctx, containerID, "version_rollback", map[string]any{ + return m.insertEvent(dctx, containerID, "version_rollback", map[string]any{ "snapshot_name": snapshotName, "version": version, "source": SnapshotSourceRollback, @@ -289,6 +340,44 @@ func (m *Manager) VersionSnapshotName(ctx context.Context, botID string, version }) } +// replaceContainerSnapshot prepares a new active snapshot from parentSnapshot, +// deletes the old container, recreates it on the new snapshot, and restarts the task. +// Caller must pass a detached context (context.WithoutCancel) to guarantee atomicity. +func (m *Manager) replaceContainerSnapshot(ctx context.Context, botID, containerID string, info ctr.ContainerInfo, activeSnapshotName, parentSnapshot string) error { + if err := m.service.PrepareSnapshot(ctx, info.Snapshotter, activeSnapshotName, parentSnapshot); err != nil { + return err + } + if err := m.service.DeleteContainer(ctx, containerID, &ctr.DeleteContainerOptions{CleanupSnapshot: false}); err != nil { + return err + } + spec, err := m.buildVersionSpec(botID) + if err != nil { + return err + } + if _, err := m.service.CreateContainerFromSnapshot(ctx, ctr.CreateContainerRequest{ + ID: containerID, + ImageRef: info.Image, + SnapshotID: activeSnapshotName, + Snapshotter: info.Snapshotter, + Labels: info.Labels, + Spec: spec, + }); err != nil { + return err + } + if err := m.service.StartContainer(ctx, containerID, &ctr.StartTaskOptions{UseStdio: false}); err != nil { + return err + } + if err := m.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{ + ContainerID: containerID, + CNIBinDir: m.cfg.CNIBinaryDir, + CNIConfDir: m.cfg.CNIConfigDir, + }); err != nil { + m.logger.Warn("network setup failed after snapshot replace", + slog.String("container_id", containerID), slog.Any("error", err)) + } + return nil +} + func (m *Manager) buildVersionSpec(botID string) (ctr.ContainerSpec, error) { dataDir, err := m.ensureBotDir(botID) if err != nil {