Merge branch 'v0.4'

This commit is contained in:
Ran
2026-03-11 19:10:26 +08:00
2 changed files with 125 additions and 6 deletions
+98
View File
@@ -239,6 +239,104 @@ func (m *Manager) importLegacyDir(ctx context.Context, botID, srcDir string) err
return nil
}
// recoverOrphanedSnapshot detects a snapshot whose container was deleted
// (e.g. dev image rebuild, containerd metadata loss) and exports /data to a
// backup archive. The caller should invoke restorePreservedIntoSnapshot after
// creating the replacement container.
//
// The archive is written to a temporary sibling of the backup path and only
// renamed into place once the export has fully succeeded. A failed export, or
// a snapshot that contains no data directory, therefore never truncates or
// removes a backup that already exists at the final path.
//
// It returns true only when a data directory was found in the snapshot and
// archived to the backup path. All failures are reported as false; the ones
// that happen after the snapshot mounts were resolved are logged as warnings.
func (m *Manager) recoverOrphanedSnapshot(ctx context.Context, botID string) bool {
	snapshotter := m.cfg.Snapshotter
	if snapshotter == "" {
		return false
	}

	snapshotKey := m.containerID(botID)
	raw, err := m.service.SnapshotMounts(ctx, snapshotter, snapshotKey)
	if err != nil {
		// Mounts for this key could not be resolved; treat it as nothing to recover.
		return false
	}
	mounts := make([]mount.Mount, len(raw))
	for i, r := range raw {
		mounts[i] = mount.Mount{Type: r.Type, Source: r.Source, Options: r.Options}
	}

	backupPath := m.backupPath(botID)
	if err := os.MkdirAll(filepath.Dir(backupPath), 0o750); err != nil {
		m.logger.Warn("recover orphaned snapshot: mkdir failed",
			slog.String("bot_id", botID), slog.Any("error", err))
		return false
	}

	// Stage the archive next to the final path so the rename below stays on
	// the same filesystem and an existing backup is left untouched on failure.
	tmpPath := backupPath + ".tmp"
	f, err := os.Create(tmpPath) //nolint:gosec // G304: operator-controlled path
	if err != nil {
		m.logger.Warn("recover orphaned snapshot: create backup failed",
			slog.String("bot_id", botID), slog.Any("error", err))
		return false
	}

	// exported records whether an archive was actually written; a snapshot
	// without a data directory is not an error but must not count as recovered.
	exported := false
	writeErr := mount.WithReadonlyTempMount(ctx, mounts, func(root string) error {
		dataDir := mountedDataDir(root)
		if _, statErr := os.Stat(dataDir); statErr != nil {
			if os.IsNotExist(statErr) {
				return nil
			}
			return statErr
		}
		if err := tarGzDir(f, dataDir); err != nil {
			return err
		}
		exported = true
		return nil
	})
	closeErr := f.Close()

	switch {
	case writeErr != nil:
		_ = os.Remove(tmpPath) // best-effort cleanup of the partial archive
		m.logger.Warn("recover orphaned snapshot: export failed",
			slog.String("bot_id", botID), slog.Any("error", writeErr))
		return false
	case closeErr != nil:
		_ = os.Remove(tmpPath) // archive may be incomplete if close failed
		m.logger.Warn("recover orphaned snapshot: close backup failed",
			slog.String("bot_id", botID), slog.Any("error", closeErr))
		return false
	case !exported:
		_ = os.Remove(tmpPath) // nothing was archived; drop the empty file
		return false
	}

	if err := os.Rename(tmpPath, backupPath); err != nil {
		_ = os.Remove(tmpPath) // best-effort cleanup of the staged archive
		m.logger.Warn("recover orphaned snapshot: finalize backup failed",
			slog.String("bot_id", botID), slog.Any("error", err))
		return false
	}

	m.logger.Info("recovered data from orphaned snapshot",
		slog.String("bot_id", botID), slog.String("backup", backupPath))
	return true
}
// restorePreservedIntoSnapshot restores a preserved backup directly into
// the container's snapshot before the task is started. This avoids the
// stop/start cycle that RestorePreservedData (via ImportData) requires.
//
// The backup archive for botID is opened, the container's snapshot is
// temporarily mounted, and the archive is unpacked into the mounted data
// directory (created if missing). On any failure the error is returned and
// the backup file is left in place. On success the backup file is removed;
// a failure to remove it is logged but does not fail the restore.
func (m *Manager) restorePreservedIntoSnapshot(ctx context.Context, botID string) error {
	bp := m.backupPath(botID)
	f, err := os.Open(bp) //nolint:gosec // G304: operator-controlled path
	if err != nil {
		return err
	}
	defer func() { _ = f.Close() }()

	containerID := m.containerID(botID)
	info, err := m.service.GetContainer(ctx, containerID)
	if err != nil {
		return fmt.Errorf("get container: %w", err)
	}
	mounts, err := m.snapshotMounts(ctx, info)
	if err != nil {
		return err
	}

	unpack := func(root string) error {
		dataDir := mountedDataDir(root)
		if err := os.MkdirAll(dataDir, 0o750); err != nil {
			return err
		}
		return untarGzDir(f, dataDir)
	}
	if err := mount.WithTempMount(ctx, mounts, unpack); err != nil {
		return err
	}

	// The data is already restored at this point, so a failed removal is not
	// returned as an error. It is still surfaced because the archive remains
	// on disk and could be picked up again by a later restore attempt.
	if err := os.Remove(bp); err != nil && !os.IsNotExist(err) {
		m.logger.Warn("restore preserved data: remove backup failed",
			slog.String("bot_id", botID), slog.String("backup", bp), slog.Any("error", err))
	}
	m.logger.Info("restored preserved data into new container",
		slog.String("bot_id", botID))
	return nil
}
// errMountNotSupported is a sentinel error indicating that the backend doesn't
// support snapshot mounts (e.g. Apple Virtualization). Callers fall back to
// gRPC-based data operations when they see it.
var errMountNotSupported = errors.New("snapshot mount not supported on this backend")
+27 -6
View File
@@ -282,27 +282,48 @@ func (m *Manager) ListBots(ctx context.Context) ([]string, error) {
}
func (m *Manager) Start(ctx context.Context, botID string) error {
containerID := m.containerID(botID)
// Before creating a new container, check for an orphaned snapshot
// (container deleted but snapshot with /data survived). Export /data
// to a backup so it can be restored after EnsureBot creates a fresh
// container. This covers dev image rebuilds, containerd metadata loss,
// and manual container deletion.
if _, err := m.service.GetContainer(ctx, containerID); errdefs.IsNotFound(err) {
m.recoverOrphanedSnapshot(ctx, botID)
}
if err := m.EnsureBot(ctx, botID); err != nil {
return err
}
if err := m.service.StartContainer(ctx, m.containerID(botID), nil); err != nil {
// Restore preserved data (from orphaned snapshot recovery or a previous
// CleanupBotContainer with preserveData) into the fresh snapshot before
// starting the task, avoiding a redundant stop/start cycle.
if m.HasPreservedData(botID) {
if err := m.restorePreservedIntoSnapshot(ctx, botID); err != nil {
m.logger.Warn("restore preserved data into new container failed",
slog.String("bot_id", botID), slog.Any("error", err))
}
}
if err := m.service.StartContainer(ctx, containerID, nil); err != nil {
return err
}
netResult, err := m.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{
ContainerID: m.containerID(botID),
ContainerID: containerID,
CNIBinDir: m.cfg.CNIBinaryDir,
CNIConfDir: m.cfg.CNIConfigDir,
})
if err != nil {
if stopErr := m.service.StopContainer(ctx, m.containerID(botID), &ctr.StopTaskOptions{Force: true}); stopErr != nil {
m.logger.Warn("cleanup: stop task failed", slog.String("container_id", m.containerID(botID)), slog.Any("error", stopErr))
if stopErr := m.service.StopContainer(ctx, containerID, &ctr.StopTaskOptions{Force: true}); stopErr != nil {
m.logger.Warn("cleanup: stop task failed", slog.String("container_id", containerID), slog.Any("error", stopErr))
}
return err
}
if netResult.IP == "" {
if stopErr := m.service.StopContainer(ctx, m.containerID(botID), &ctr.StopTaskOptions{Force: true}); stopErr != nil {
m.logger.Warn("cleanup: stop task failed", slog.String("container_id", m.containerID(botID)), slog.Any("error", stopErr))
if stopErr := m.service.StopContainer(ctx, containerID, &ctr.StopTaskOptions{Force: true}); stopErr != nil {
m.logger.Warn("cleanup: stop task failed", slog.String("container_id", containerID), slog.Any("error", stopErr))
}
return fmt.Errorf("network setup returned no IP for bot %s", botID)
}