package containerd import ( "context" "errors" "fmt" "runtime" "syscall" "strings" "time" tasksv1 "github.com/containerd/containerd/api/services/tasks/v1" tasktypes "github.com/containerd/containerd/api/types/task" containerd "github.com/containerd/containerd/v2/client" "github.com/containerd/containerd/v2/core/images" "github.com/containerd/containerd/v2/core/mount" "github.com/containerd/containerd/v2/defaults" "github.com/containerd/containerd/v2/pkg/cio" "github.com/containerd/containerd/v2/pkg/namespaces" "github.com/containerd/errdefs" "github.com/containerd/containerd/v2/pkg/oci" "github.com/opencontainers/runtime-spec/specs-go" ) var ( ErrInvalidArgument = errors.New("invalid argument") ErrTaskStopTimeout = errors.New("timeout waiting for task to stop") ) type PullImageOptions struct { Unpack bool Snapshotter string } type DeleteImageOptions struct { Synchronous bool } type CreateContainerRequest struct { ID string ImageRef string SnapshotID string Snapshotter string Labels map[string]string SpecOpts []oci.SpecOpts } type DeleteContainerOptions struct { CleanupSnapshot bool } type StartTaskOptions struct { UseStdio bool Terminal bool FIFODir string } type StopTaskOptions struct { Signal syscall.Signal Timeout time.Duration Force bool } type DeleteTaskOptions struct { Force bool } type ExecTaskRequest struct { Args []string Env []string WorkDir string Terminal bool UseStdio bool } type ExecTaskResult struct { ExitCode uint32 } type SnapshotCommitResult struct { VersionSnapshotID string ActiveSnapshotID string } type ListTasksOptions struct { Filter string } type TaskInfo struct { ContainerID string ID string PID uint32 Status tasktypes.Status ExitStatus uint32 } type Service interface { PullImage(ctx context.Context, ref string, opts *PullImageOptions) (containerd.Image, error) GetImage(ctx context.Context, ref string) (containerd.Image, error) ListImages(ctx context.Context) ([]containerd.Image, error) DeleteImage(ctx context.Context, ref string, opts *DeleteImageOptions) error CreateContainer(ctx context.Context, req CreateContainerRequest) (containerd.Container, error) GetContainer(ctx context.Context, id string) (containerd.Container, error) ListContainers(ctx context.Context) ([]containerd.Container, error) DeleteContainer(ctx context.Context, id string, opts *DeleteContainerOptions) error StartTask(ctx context.Context, containerID string, opts *StartTaskOptions) (containerd.Task, error) GetTask(ctx context.Context, containerID string) (containerd.Task, error) ListTasks(ctx context.Context, opts *ListTasksOptions) ([]TaskInfo, error) StopTask(ctx context.Context, containerID string, opts *StopTaskOptions) error DeleteTask(ctx context.Context, containerID string, opts *DeleteTaskOptions) error ExecTask(ctx context.Context, containerID string, req ExecTaskRequest) (ExecTaskResult, error) ListContainersByLabel(ctx context.Context, key, value string) ([]containerd.Container, error) CommitSnapshot(ctx context.Context, snapshotter, name, key string) error PrepareSnapshot(ctx context.Context, snapshotter, key, parent string) error CreateContainerFromSnapshot(ctx context.Context, req CreateContainerRequest) (containerd.Container, error) SnapshotMounts(ctx context.Context, snapshotter, key string) ([]mount.Mount, error) } type DefaultService struct { client *containerd.Client namespace string } func NewDefaultService(client *containerd.Client, namespace string) *DefaultService { if namespace == "" { namespace = DefaultNamespace } return &DefaultService{ client: client, namespace: namespace, } } func (s *DefaultService) PullImage(ctx context.Context, ref string, opts *PullImageOptions) (containerd.Image, error) { if ref == "" { return nil, ErrInvalidArgument } ctx = s.withNamespace(ctx) pullOpts := []containerd.RemoteOpt{} if opts == nil || opts.Unpack { pullOpts = append(pullOpts, containerd.WithPullUnpack) } if opts != nil && opts.Snapshotter != "" { pullOpts = append(pullOpts, containerd.WithPullSnapshotter(opts.Snapshotter)) } return s.client.Pull(ctx, ref, pullOpts...) } func (s *DefaultService) GetImage(ctx context.Context, ref string) (containerd.Image, error) { if ref == "" { return nil, ErrInvalidArgument } ctx = s.withNamespace(ctx) return s.client.GetImage(ctx, ref) } func (s *DefaultService) ListImages(ctx context.Context) ([]containerd.Image, error) { ctx = s.withNamespace(ctx) return s.client.ListImages(ctx) } func (s *DefaultService) DeleteImage(ctx context.Context, ref string, opts *DeleteImageOptions) error { if ref == "" { return ErrInvalidArgument } ctx = s.withNamespace(ctx) deleteOpts := []images.DeleteOpt{} if opts != nil && opts.Synchronous { deleteOpts = append(deleteOpts, images.SynchronousDelete()) } return s.client.ImageService().Delete(ctx, ref, deleteOpts...) } func (s *DefaultService) CreateContainer(ctx context.Context, req CreateContainerRequest) (containerd.Container, error) { if req.ID == "" || req.ImageRef == "" { return nil, ErrInvalidArgument } ctx = s.withNamespace(ctx) image, err := s.getImageWithFallback(ctx, req.ImageRef) if err != nil { pullOpts := &PullImageOptions{ Unpack: true, Snapshotter: req.Snapshotter, } image, err = s.PullImage(ctx, req.ImageRef, pullOpts) if err != nil { return nil, err } } if req.Snapshotter != "" { if err := image.Unpack(ctx, req.Snapshotter); err != nil && !errdefs.IsAlreadyExists(err) { return nil, err } } snapshotID := req.SnapshotID if snapshotID == "" { snapshotID = req.ID } specOpts := []oci.SpecOpts{ oci.WithDefaultSpecForPlatform("linux/" + runtime.GOARCH), oci.WithImageConfig(image), } if len(req.SpecOpts) > 0 { specOpts = append(specOpts, req.SpecOpts...) } containerOpts := []containerd.NewContainerOpts{ containerd.WithImage(image), containerd.WithNewSnapshot(snapshotID, image), containerd.WithNewSpec(specOpts...), } runtimeName := s.client.Runtime() if runtimeName == "" { runtimeName = defaults.DefaultRuntime if runtimeName == "" { runtimeName = "io.containerd.runc.v2" } } containerOpts = append(containerOpts, containerd.WithRuntime(runtimeName, nil)) if req.Snapshotter != "" { containerOpts = append(containerOpts, containerd.WithSnapshotter(req.Snapshotter)) } if len(req.Labels) > 0 { containerOpts = append(containerOpts, containerd.WithContainerLabels(req.Labels)) } return s.client.NewContainer(ctx, req.ID, containerOpts...) } func (s *DefaultService) getImageWithFallback(ctx context.Context, ref string) (containerd.Image, error) { image, err := s.GetImage(ctx, ref) if err == nil { return image, nil } if strings.HasPrefix(ref, "docker.io/library/") { alt := strings.TrimPrefix(ref, "docker.io/library/") image, altErr := s.GetImage(ctx, alt) if altErr == nil { return image, nil } } images, listErr := s.ListImages(ctx) if listErr == nil { for _, img := range images { name := img.Name() if name == ref || strings.HasSuffix(ref, "/"+name) || strings.HasSuffix(name, "/"+ref) { return img, nil } if strings.HasPrefix(ref, "docker.io/library/") { alt := strings.TrimPrefix(ref, "docker.io/library/") if name == alt || strings.HasSuffix(name, "/"+alt) { return img, nil } } } } return nil, err } func (s *DefaultService) GetContainer(ctx context.Context, id string) (containerd.Container, error) { if id == "" { return nil, ErrInvalidArgument } ctx = s.withNamespace(ctx) return s.client.LoadContainer(ctx, id) } func (s *DefaultService) ListContainers(ctx context.Context) ([]containerd.Container, error) { ctx = s.withNamespace(ctx) return s.client.Containers(ctx) } func (s *DefaultService) DeleteContainer(ctx context.Context, id string, opts *DeleteContainerOptions) error { if id == "" { return ErrInvalidArgument } ctx = s.withNamespace(ctx) container, err := s.client.LoadContainer(ctx, id) if err != nil { return err } deleteOpts := []containerd.DeleteOpts{} cleanupSnapshot := true if opts != nil { cleanupSnapshot = opts.CleanupSnapshot } if cleanupSnapshot { deleteOpts = append(deleteOpts, containerd.WithSnapshotCleanup) } return container.Delete(ctx, deleteOpts...) } func (s *DefaultService) StartTask(ctx context.Context, containerID string, opts *StartTaskOptions) (containerd.Task, error) { if containerID == "" { return nil, ErrInvalidArgument } ctx = s.withNamespace(ctx) container, err := s.client.LoadContainer(ctx, containerID) if err != nil { return nil, err } var cioOpts []cio.Opt if opts == nil || opts.UseStdio { cioOpts = append(cioOpts, cio.WithStdio) } if opts != nil && opts.Terminal { cioOpts = append(cioOpts, cio.WithTerminal) } if opts != nil && opts.FIFODir != "" { cioOpts = append(cioOpts, cio.WithFIFODir(opts.FIFODir)) } ioCreator := cio.NewCreator(cioOpts...) task, err := container.NewTask(ctx, ioCreator) if err != nil { return nil, err } if err := task.Start(ctx); err != nil { return nil, err } return task, nil } func (s *DefaultService) GetTask(ctx context.Context, containerID string) (containerd.Task, error) { if containerID == "" { return nil, ErrInvalidArgument } ctx = s.withNamespace(ctx) container, err := s.client.LoadContainer(ctx, containerID) if err != nil { return nil, err } return container.Task(ctx, nil) } func (s *DefaultService) ListTasks(ctx context.Context, opts *ListTasksOptions) ([]TaskInfo, error) { ctx = s.withNamespace(ctx) request := &tasksv1.ListTasksRequest{} if opts != nil { request.Filter = opts.Filter } response, err := s.client.TaskService().List(ctx, request) if err != nil { return nil, err } tasks := make([]TaskInfo, 0, len(response.Tasks)) for _, task := range response.Tasks { tasks = append(tasks, TaskInfo{ ContainerID: task.ContainerID, ID: task.ID, PID: task.Pid, Status: task.Status, ExitStatus: task.ExitStatus, }) } return tasks, nil } func (s *DefaultService) StopTask(ctx context.Context, containerID string, opts *StopTaskOptions) error { if containerID == "" { return ErrInvalidArgument } ctx = s.withNamespace(ctx) task, err := s.GetTask(ctx, containerID) if err != nil { return err } signal := syscall.SIGTERM timeout := 10 * time.Second force := false if opts != nil { if opts.Signal != 0 { signal = opts.Signal } if opts.Timeout != 0 { timeout = opts.Timeout } force = opts.Force } if err := task.Kill(ctx, signal); err != nil { return err } statusC, err := task.Wait(ctx) if err != nil { return err } timer := time.NewTimer(timeout) defer timer.Stop() select { case <-statusC: return nil case <-timer.C: if force { if err := task.Kill(ctx, syscall.SIGKILL); err != nil { return fmt.Errorf("force kill failed: %w", err) } <-statusC return nil } return ErrTaskStopTimeout } } func (s *DefaultService) DeleteTask(ctx context.Context, containerID string, opts *DeleteTaskOptions) error { if containerID == "" { return ErrInvalidArgument } ctx = s.withNamespace(ctx) task, err := s.GetTask(ctx, containerID) if err != nil { return err } if opts != nil && opts.Force { _ = task.Kill(ctx, syscall.SIGKILL) } _, err = task.Delete(ctx) return err } func (s *DefaultService) ExecTask(ctx context.Context, containerID string, req ExecTaskRequest) (ExecTaskResult, error) { if containerID == "" || len(req.Args) == 0 { return ExecTaskResult{}, ErrInvalidArgument } ctx = s.withNamespace(ctx) container, err := s.client.LoadContainer(ctx, containerID) if err != nil { return ExecTaskResult{}, err } spec, err := container.Spec(ctx) if err != nil { return ExecTaskResult{}, err } if spec.Process == nil { spec.Process = &specs.Process{} } if len(req.Env) > 0 { if err := oci.WithEnv(req.Env)(ctx, nil, nil, spec); err != nil { return ExecTaskResult{}, err } } spec.Process.Args = req.Args if req.WorkDir != "" { spec.Process.Cwd = req.WorkDir } if req.Terminal { spec.Process.Terminal = true } task, err := container.Task(ctx, nil) if err != nil { return ExecTaskResult{}, err } ioOpts := []cio.Opt{} if req.UseStdio { ioOpts = append(ioOpts, cio.WithStdio) } if req.Terminal { ioOpts = append(ioOpts, cio.WithTerminal) } ioCreator := cio.NewCreator(ioOpts...) execID := fmt.Sprintf("exec-%d", time.Now().UnixNano()) process, err := task.Exec(ctx, execID, spec.Process, ioCreator) if err != nil { return ExecTaskResult{}, err } defer process.Delete(ctx) statusC, err := process.Wait(ctx) if err != nil { return ExecTaskResult{}, err } if err := process.Start(ctx); err != nil { return ExecTaskResult{}, err } status := <-statusC code, _, err := status.Result() if err != nil { return ExecTaskResult{}, err } return ExecTaskResult{ExitCode: code}, nil } func (s *DefaultService) ListContainersByLabel(ctx context.Context, key, value string) ([]containerd.Container, error) { if key == "" { return nil, ErrInvalidArgument } ctx = s.withNamespace(ctx) containers, err := s.client.Containers(ctx) if err != nil { return nil, err } filtered := make([]containerd.Container, 0, len(containers)) for _, container := range containers { info, err := container.Info(ctx) if err != nil { return nil, err } if labelValue, ok := info.Labels[key]; ok && (value == "" || value == labelValue) { filtered = append(filtered, container) } } return filtered, nil } func (s *DefaultService) CommitSnapshot(ctx context.Context, snapshotter, name, key string) error { if snapshotter == "" || name == "" || key == "" { return ErrInvalidArgument } ctx = s.withNamespace(ctx) return s.client.SnapshotService(snapshotter).Commit(ctx, name, key) } func (s *DefaultService) PrepareSnapshot(ctx context.Context, snapshotter, key, parent string) error { if snapshotter == "" || key == "" || parent == "" { return ErrInvalidArgument } ctx = s.withNamespace(ctx) _, err := s.client.SnapshotService(snapshotter).Prepare(ctx, key, parent) return err } func (s *DefaultService) CreateContainerFromSnapshot(ctx context.Context, req CreateContainerRequest) (containerd.Container, error) { if req.ID == "" || req.SnapshotID == "" { return nil, ErrInvalidArgument } ctx = s.withNamespace(ctx) imageRef := req.ImageRef if imageRef == "" { return nil, ErrInvalidArgument } image, err := s.GetImage(ctx, imageRef) if err != nil { image, err = s.PullImage(ctx, imageRef, &PullImageOptions{ Unpack: true, Snapshotter: req.Snapshotter, }) if err != nil { return nil, err } } specOpts := []oci.SpecOpts{ oci.WithDefaultSpecForPlatform("linux/" + runtime.GOARCH), oci.WithImageConfig(image), } if len(req.SpecOpts) > 0 { specOpts = append(specOpts, req.SpecOpts...) } containerOpts := []containerd.NewContainerOpts{ containerd.WithImage(image), containerd.WithSnapshot(req.SnapshotID), containerd.WithNewSpec(specOpts...), } if req.Snapshotter != "" { containerOpts = append(containerOpts, containerd.WithSnapshotter(req.Snapshotter)) } if len(req.Labels) > 0 { containerOpts = append(containerOpts, containerd.WithContainerLabels(req.Labels)) } runtimeName := s.client.Runtime() if runtimeName == "" { runtimeName = defaults.DefaultRuntime if runtimeName == "" { runtimeName = "io.containerd.runc.v2" } } containerOpts = append(containerOpts, containerd.WithRuntime(runtimeName, nil)) return s.client.NewContainer(ctx, req.ID, containerOpts...) } func (s *DefaultService) SnapshotMounts(ctx context.Context, snapshotter, key string) ([]mount.Mount, error) { if snapshotter == "" || key == "" { return nil, ErrInvalidArgument } ctx = s.withNamespace(ctx) return s.client.SnapshotService(snapshotter).Mounts(ctx, key) } func (s *DefaultService) withNamespace(ctx context.Context) context.Context { return namespaces.WithNamespace(ctx, s.namespace) }