diff --git a/docs/docs.go b/docs/docs.go index 8a5cf17a..94fdb3c3 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -386,6 +386,26 @@ const docTemplate = `{ } }, "/mcp/containers": { + "get": { + "tags": [ + "containerd" + ], + "summary": "List containers", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/handlers.ListContainersResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + } + } + }, "post": { "tags": [ "containerd" @@ -464,7 +484,215 @@ const docTemplate = `{ } } }, + "/mcp/fs/{id}": { + "post": { + "description": "Forwards MCP JSON-RPC requests to the MCP server inside the container.\nRequired:\n- container task is running\n- container has data mount (default /data) bound to \u003cdata_root\u003e/users/\u003cuser_id\u003e\n- container image contains the \"mcp\" binary\nAuth: Bearer JWT is used to determine user_id (sub or user_id).\nPaths must be relative (no leading slash) and must not contain \"..\".\n\nExample: tools/list\n{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"tools/list\"}\n\nExample: tools/call (fs.read)\n{\"jsonrpc\":\"2.0\",\"id\":2,\"method\":\"tools/call\",\"params\":{\"name\":\"fs.read\",\"arguments\":{\"path\":\"notes.txt\"}}}", + "tags": [ + "containerd" + ], + "summary": "MCP filesystem tools (JSON-RPC)", + "parameters": [ + { + "type": "string", + "description": "Bearer \u003ctoken\u003e", + "name": "Authorization", + "in": "header", + "required": true + }, + { + "type": "string", + "description": "Container ID", + "name": "id", + "in": "path", + "required": true + }, + { + "description": "JSON-RPC request", + "name": "payload", + "in": "body", + "required": true, + "schema": { + "type": "object" + } + } + ], + "responses": { + "200": { + "description": "JSON-RPC response: {jsonrpc,id,result|error}", + "schema": { + "type": "object" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + } + } + } + }, + "/mcp/skills": { + "get": { + "tags": [ + "containerd" + ], + "summary": "List skills from container", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/handlers.SkillsResponse" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + } + } + }, + "post": { + "tags": [ + "containerd" + ], + "summary": "Upload skills into container", + "parameters": [ + { + "description": "Skills payload", + "name": "payload", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/handlers.SkillsUpsertRequest" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/handlers.skillsOpResponse" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + } + } + }, + "delete": { + "tags": [ + "containerd" + ], + "summary": "Delete skills from container", + "parameters": [ + { + "description": "Delete skills payload", + "name": "payload", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/handlers.SkillsDeleteRequest" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/handlers.skillsOpResponse" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + } + } + } + }, "/mcp/snapshots": { + "get": { + "tags": [ + "containerd" + ], + "summary": "List snapshots", + "parameters": [ + { + "type": "string", + "description": "Snapshotter name", + "name": "snapshotter", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/handlers.ListSnapshotsResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + } + } + }, "post": { "tags": [ "containerd" @@ -2428,6 +2656,35 @@ const docTemplate = `{ "type": "object", "additionalProperties": true }, + "handlers.ContainerInfo": { + "type": "object", + "properties": { + "created_at": { + "type": "string" + }, + "id": { + "type": "string" + }, + "image": { + "type": "string" + }, + "labels": { + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "snapshot_key": { + "type": "string" + }, + "snapshotter": { + "type": "string" + }, + "updated_at": { + "type": "string" + } + } + }, "handlers.CreateContainerRequest": { "type": "object", "properties": { @@ -2569,6 +2826,31 @@ const docTemplate = `{ } } }, + "handlers.ListContainersResponse": { + "type": "object", + "properties": { + "containers": { + "type": "array", + "items": { + "$ref": "#/definitions/handlers.ContainerInfo" + } + } + } + }, + "handlers.ListSnapshotsResponse": { + "type": "object", + "properties": { + "snapshots": { + "type": "array", + "items": { + "$ref": "#/definitions/handlers.SnapshotInfo" + } + }, + "snapshotter": { + "type": "string" + } + } + }, "handlers.LoginRequest": { "type": "object", "properties": { @@ -2606,6 +2888,90 @@ const docTemplate = `{ } } }, + "handlers.SkillItem": { + "type": "object", + "properties": { + "content": { + "type": "string" + }, + "description": { + "type": "string" + }, + "name": { + "type": "string" + } + } + }, + "handlers.SkillsDeleteRequest": { + "type": "object", + "properties": { + "names": { + "type": "array", + "items": { + "type": "string" + } + } + } + }, + "handlers.SkillsResponse": { + "type": "object", + "properties": { + "skills": { + "type": "array", + "items": { + "$ref": "#/definitions/handlers.SkillItem" + } + } + } + }, + "handlers.SkillsUpsertRequest": { + "type": "object", + "properties": { + "skills": { + "type": "array", + "items": { + "$ref": "#/definitions/handlers.SkillItem" + } + } + } + }, + "handlers.SnapshotInfo": { + "type": "object", + "properties": { + "created_at": { + "type": "string" + }, + "kind": { + "type": "string" + }, + "labels": { + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "name": { + "type": "string" + }, + "parent": { + "type": "string" + }, + "snapshotter": { + "type": "string" + }, + "updated_at": { + "type": "string" + } + } + }, + "handlers.skillsOpResponse": { + "type": "object", + "properties": { + "ok": { + "type": "boolean" + } + } + }, "history.CreateRequest": { "type": "object", "properties": { diff --git a/docs/swagger.json b/docs/swagger.json index 92a3c711..99940e31 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -377,6 +377,26 @@ } }, "/mcp/containers": { + "get": { + "tags": [ + "containerd" + ], + "summary": "List containers", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/handlers.ListContainersResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + } + } + }, "post": { "tags": [ "containerd" @@ -455,7 +475,215 @@ } } }, + "/mcp/fs/{id}": { + "post": { + "description": "Forwards MCP JSON-RPC requests to the MCP server inside the container.\nRequired:\n- container task is running\n- container has data mount (default /data) bound to \u003cdata_root\u003e/users/\u003cuser_id\u003e\n- container image contains the \"mcp\" binary\nAuth: Bearer JWT is used to determine user_id (sub or user_id).\nPaths must be relative (no leading slash) and must not contain \"..\".\n\nExample: tools/list\n{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"tools/list\"}\n\nExample: tools/call (fs.read)\n{\"jsonrpc\":\"2.0\",\"id\":2,\"method\":\"tools/call\",\"params\":{\"name\":\"fs.read\",\"arguments\":{\"path\":\"notes.txt\"}}}", + "tags": [ + "containerd" + ], + "summary": "MCP filesystem tools (JSON-RPC)", + "parameters": [ + { + "type": "string", + "description": "Bearer \u003ctoken\u003e", + "name": "Authorization", + "in": "header", + "required": true + }, + { + "type": "string", + "description": "Container ID", + "name": "id", + "in": "path", + "required": true + }, + { + "description": "JSON-RPC request", + "name": "payload", + "in": "body", + "required": true, + "schema": { + "type": "object" + } + } + ], + "responses": { + "200": { + "description": "JSON-RPC response: {jsonrpc,id,result|error}", + "schema": { + "type": "object" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + } + } + } + }, + "/mcp/skills": { + "get": { + "tags": [ + "containerd" + ], + "summary": "List skills from container", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/handlers.SkillsResponse" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + } + } + }, + "post": { + "tags": [ + "containerd" + ], + "summary": "Upload skills into container", + "parameters": [ + { + "description": "Skills payload", + "name": "payload", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/handlers.SkillsUpsertRequest" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/handlers.skillsOpResponse" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + } + } + }, + "delete": { + "tags": [ + "containerd" + ], + "summary": "Delete skills from container", + "parameters": [ + { + "description": "Delete skills payload", + "name": "payload", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/handlers.SkillsDeleteRequest" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/handlers.skillsOpResponse" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + } + } + } + }, "/mcp/snapshots": { + "get": { + "tags": [ + "containerd" + ], + "summary": "List snapshots", + "parameters": [ + { + "type": "string", + "description": "Snapshotter name", + "name": "snapshotter", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/handlers.ListSnapshotsResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + } + } + }, "post": { "tags": [ "containerd" @@ -2419,6 +2647,35 @@ "type": "object", "additionalProperties": true }, + "handlers.ContainerInfo": { + "type": "object", + "properties": { + "created_at": { + "type": "string" + }, + "id": { + "type": "string" + }, + "image": { + "type": "string" + }, + "labels": { + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "snapshot_key": { + "type": "string" + }, + "snapshotter": { + "type": "string" + }, + "updated_at": { + "type": "string" + } + } + }, "handlers.CreateContainerRequest": { "type": "object", "properties": { @@ -2560,6 +2817,31 @@ } } }, + "handlers.ListContainersResponse": { + "type": "object", + "properties": { + "containers": { + "type": "array", + "items": { + "$ref": "#/definitions/handlers.ContainerInfo" + } + } + } + }, + "handlers.ListSnapshotsResponse": { + "type": "object", + "properties": { + "snapshots": { + "type": "array", + "items": { + "$ref": "#/definitions/handlers.SnapshotInfo" + } + }, + "snapshotter": { + "type": "string" + } + } + }, "handlers.LoginRequest": { "type": "object", "properties": { @@ -2597,6 +2879,90 @@ } } }, + "handlers.SkillItem": { + "type": "object", + "properties": { + "content": { + "type": "string" + }, + "description": { + "type": "string" + }, + "name": { + "type": "string" + } + } + }, + "handlers.SkillsDeleteRequest": { + "type": "object", + "properties": { + "names": { + "type": "array", + "items": { + "type": "string" + } + } + } + }, + "handlers.SkillsResponse": { + "type": "object", + "properties": { + "skills": { + "type": "array", + "items": { + "$ref": "#/definitions/handlers.SkillItem" + } + } + } + }, + "handlers.SkillsUpsertRequest": { + "type": "object", + "properties": { + "skills": { + "type": "array", + "items": { + "$ref": "#/definitions/handlers.SkillItem" + } + } + } + }, + "handlers.SnapshotInfo": { + "type": "object", + "properties": { + "created_at": { + "type": "string" + }, + "kind": { + "type": "string" + }, + "labels": { + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "name": { + "type": "string" + }, + "parent": { + "type": "string" + }, + "snapshotter": { + "type": "string" + }, + "updated_at": { + "type": "string" + } + } + }, + "handlers.skillsOpResponse": { + "type": "object", + "properties": { + "ok": { + "type": "boolean" + } + } + }, "history.CreateRequest": { "type": "object", "properties": { diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 04608597..4542091d 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -40,6 +40,25 @@ definitions: chat.GatewayMessage: additionalProperties: true type: object + handlers.ContainerInfo: + properties: + created_at: + type: string + id: + type: string + image: + type: string + labels: + additionalProperties: + type: string + type: object + snapshot_key: + type: string + snapshotter: + type: string + updated_at: + type: string + type: object handlers.CreateContainerRequest: properties: container_id: @@ -131,6 +150,22 @@ definitions: message: type: string type: object + handlers.ListContainersResponse: + properties: + containers: + items: + $ref: '#/definitions/handlers.ContainerInfo' + type: array + type: object + handlers.ListSnapshotsResponse: + properties: + snapshots: + items: + $ref: '#/definitions/handlers.SnapshotInfo' + type: array + snapshotter: + type: string + type: object handlers.LoginRequest: properties: password: @@ -155,6 +190,60 @@ definitions: username: type: string type: object + handlers.SkillItem: + properties: + content: + type: string + description: + type: string + name: + type: string + type: object + handlers.SkillsDeleteRequest: + properties: + names: + items: + type: string + type: array + type: object + handlers.SkillsResponse: + properties: + skills: + items: + $ref: '#/definitions/handlers.SkillItem' + type: array + type: object + handlers.SkillsUpsertRequest: + properties: + skills: + items: + $ref: '#/definitions/handlers.SkillItem' + type: array + type: object + handlers.SnapshotInfo: + properties: + created_at: + type: string + kind: + type: string + labels: + additionalProperties: + type: string + type: object + name: + type: string + parent: + type: string + snapshotter: + type: string + updated_at: + type: string + type: object + handlers.skillsOpResponse: + properties: + ok: + type: boolean + type: object history.CreateRequest: properties: messages: @@ -918,6 +1007,19 @@ paths: tags: - history /mcp/containers: + get: + responses: + "200": + description: OK + schema: + $ref: '#/definitions/handlers.ListContainersResponse' + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/handlers.ErrorResponse' + summary: List containers + tags: + - containerd post: parameters: - description: Create container payload @@ -968,7 +1070,156 @@ paths: summary: Delete MCP container tags: - containerd + /mcp/fs/{id}: + post: + description: |- + Forwards MCP JSON-RPC requests to the MCP server inside the container. + Required: + - container task is running + - container has data mount (default /data) bound to /users/ + - container image contains the "mcp" binary + Auth: Bearer JWT is used to determine user_id (sub or user_id). + Paths must be relative (no leading slash) and must not contain "..". + + Example: tools/list + {"jsonrpc":"2.0","id":1,"method":"tools/list"} + + Example: tools/call (fs.read) + {"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"fs.read","arguments":{"path":"notes.txt"}}} + parameters: + - description: Bearer + in: header + name: Authorization + required: true + type: string + - description: Container ID + in: path + name: id + required: true + type: string + - description: JSON-RPC request + in: body + name: payload + required: true + schema: + type: object + responses: + "200": + description: 'JSON-RPC response: {jsonrpc,id,result|error}' + schema: + type: object + "400": + description: Bad Request + schema: + $ref: '#/definitions/handlers.ErrorResponse' + "404": + description: Not Found + schema: + $ref: '#/definitions/handlers.ErrorResponse' + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/handlers.ErrorResponse' + summary: MCP filesystem tools (JSON-RPC) + tags: + - containerd + /mcp/skills: + delete: + parameters: + - description: Delete skills payload + in: body + name: payload + required: true + schema: + $ref: '#/definitions/handlers.SkillsDeleteRequest' + responses: + "200": + description: OK + schema: + $ref: '#/definitions/handlers.skillsOpResponse' + "400": + description: Bad Request + schema: + $ref: '#/definitions/handlers.ErrorResponse' + "404": + description: Not Found + schema: + $ref: '#/definitions/handlers.ErrorResponse' + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/handlers.ErrorResponse' + summary: Delete skills from container + tags: + - containerd + get: + responses: + "200": + description: OK + schema: + $ref: '#/definitions/handlers.SkillsResponse' + "400": + description: Bad Request + schema: + $ref: '#/definitions/handlers.ErrorResponse' + "404": + description: Not Found + schema: + $ref: '#/definitions/handlers.ErrorResponse' + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/handlers.ErrorResponse' + summary: List skills from container + tags: + - containerd + post: + parameters: + - description: Skills payload + in: body + name: payload + required: true + schema: + $ref: '#/definitions/handlers.SkillsUpsertRequest' + responses: + "200": + description: OK + schema: + $ref: '#/definitions/handlers.skillsOpResponse' + "400": + description: Bad Request + schema: + $ref: '#/definitions/handlers.ErrorResponse' + "404": + description: Not Found + schema: + $ref: '#/definitions/handlers.ErrorResponse' + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/handlers.ErrorResponse' + summary: Upload skills into container + tags: + - containerd /mcp/snapshots: + get: + parameters: + - description: Snapshotter name + in: query + name: snapshotter + type: string + responses: + "200": + description: OK + schema: + $ref: '#/definitions/handlers.ListSnapshotsResponse' + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/handlers.ErrorResponse' + summary: List snapshots + tags: + - containerd post: parameters: - description: Create snapshot payload diff --git a/internal/config/config.go b/internal/config/config.go index 004498a5..557c2784 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -26,13 +26,13 @@ const ( ) type Config struct { - Server ServerConfig `toml:"server"` - Admin AdminConfig `toml:"admin"` - Auth AuthConfig `toml:"auth"` - Containerd ContainerdConfig `toml:"containerd"` - MCP MCPConfig `toml:"mcp"` - Postgres PostgresConfig `toml:"postgres"` - Qdrant QdrantConfig `toml:"qdrant"` + Server ServerConfig `toml:"server"` + Admin AdminConfig `toml:"admin"` + Auth AuthConfig `toml:"auth"` + Containerd ContainerdConfig `toml:"containerd"` + MCP MCPConfig `toml:"mcp"` + Postgres PostgresConfig `toml:"postgres"` + Qdrant QdrantConfig `toml:"qdrant"` AgentGateway AgentGatewayConfig `toml:"agent_gateway"` } diff --git a/internal/containerd/service.go b/internal/containerd/service.go index 481beed2..d90dd3e4 100644 --- a/internal/containerd/service.go +++ b/internal/containerd/service.go @@ -1,24 +1,33 @@ package containerd import ( + "bytes" + "compress/gzip" "context" "errors" "fmt" + "io" + "os" + "path/filepath" "runtime" - "syscall" "strings" + "syscall" "time" tasksv1 "github.com/containerd/containerd/api/services/tasks/v1" tasktypes "github.com/containerd/containerd/api/types/task" containerd "github.com/containerd/containerd/v2/client" + "github.com/containerd/containerd/v2/core/content" "github.com/containerd/containerd/v2/core/images" "github.com/containerd/containerd/v2/core/mount" - "github.com/containerd/containerd/v2/defaults" + "github.com/containerd/containerd/v2/core/snapshots" "github.com/containerd/containerd/v2/pkg/cio" "github.com/containerd/containerd/v2/pkg/namespaces" - "github.com/containerd/errdefs" "github.com/containerd/containerd/v2/pkg/oci" + "github.com/containerd/errdefs" + "github.com/containerd/platforms" + "github.com/opencontainers/go-digest" + "github.com/opencontainers/image-spec/identity" "github.com/opencontainers/runtime-spec/specs-go" ) @@ -71,6 +80,18 @@ type ExecTaskRequest struct { WorkDir string Terminal bool UseStdio bool + FIFODir string + Stdin io.Reader + Stdout io.Writer + Stderr io.Writer +} + +type ExecTaskSession struct { + Stdin io.WriteCloser + Stdout io.ReadCloser + Stderr io.ReadCloser + Wait func() (ExecTaskResult, error) + Close func() error } type ExecTaskResult struct { @@ -111,8 +132,10 @@ type Service interface { StopTask(ctx context.Context, containerID string, opts *StopTaskOptions) error DeleteTask(ctx context.Context, containerID string, opts *DeleteTaskOptions) error ExecTask(ctx context.Context, containerID string, req ExecTaskRequest) (ExecTaskResult, error) + ExecTaskStreaming(ctx context.Context, containerID string, req ExecTaskRequest) (*ExecTaskSession, error) ListContainersByLabel(ctx context.Context, key, value string) ([]containerd.Container, error) CommitSnapshot(ctx context.Context, snapshotter, name, key string) error + ListSnapshots(ctx context.Context, snapshotter string) ([]snapshots.Info, error) PrepareSnapshot(ctx context.Context, snapshotter, key, parent string) error CreateContainerFromSnapshot(ctx context.Context, req CreateContainerRequest) (containerd.Container, error) SnapshotMounts(ctx context.Context, snapshotter, key string) ([]mount.Mount, error) @@ -181,6 +204,11 @@ func (s *DefaultService) CreateContainer(ctx context.Context, req CreateContaine } ctx = s.withNamespace(ctx) + ctx, done, err := s.client.WithLease(ctx) + if err != nil { + return nil, err + } + defer done(ctx) image, err := s.getImageWithFallback(ctx, req.ImageRef) if err != nil { pullOpts := &PullImageOptions{ @@ -192,12 +220,6 @@ func (s *DefaultService) CreateContainer(ctx context.Context, req CreateContaine return nil, err } } - if req.Snapshotter != "" { - if err := image.Unpack(ctx, req.Snapshotter); err != nil && !errdefs.IsAlreadyExists(err) { - return nil, err - } - } - snapshotID := req.SnapshotID if snapshotID == "" { snapshotID = req.ID @@ -213,20 +235,32 @@ func (s *DefaultService) CreateContainer(ctx context.Context, req CreateContaine containerOpts := []containerd.NewContainerOpts{ containerd.WithImage(image), - containerd.WithNewSnapshot(snapshotID, image), - containerd.WithNewSpec(specOpts...), } - runtimeName := s.client.Runtime() - if runtimeName == "" { - runtimeName = defaults.DefaultRuntime - if runtimeName == "" { - runtimeName = "io.containerd.runc.v2" - } - } - containerOpts = append(containerOpts, containerd.WithRuntime(runtimeName, nil)) if req.Snapshotter != "" { containerOpts = append(containerOpts, containerd.WithSnapshotter(req.Snapshotter)) } + if req.Snapshotter != "" { + parent, err := s.snapshotParentFromLayers(ctx, image) + if err != nil { + return nil, err + } + ok, err := s.snapshotExists(ctx, req.Snapshotter, parent) + if err != nil { + return nil, err + } + if !ok { + return nil, fmt.Errorf("parent snapshot %s does not exist", parent) + } + if err := s.prepareSnapshot(ctx, req.Snapshotter, snapshotID, parent); err != nil { + return nil, err + } + containerOpts = append(containerOpts, containerd.WithSnapshot(snapshotID)) + } else { + containerOpts = append(containerOpts, containerd.WithNewSnapshot(snapshotID, image)) + } + containerOpts = append(containerOpts, containerd.WithNewSpec(specOpts...)) + runtimeName := "io.containerd.runc.v2" + containerOpts = append(containerOpts, containerd.WithRuntime(runtimeName, nil)) if len(req.Labels) > 0 { containerOpts = append(containerOpts, containerd.WithContainerLabels(req.Labels)) } @@ -234,6 +268,73 @@ func (s *DefaultService) CreateContainer(ctx context.Context, req CreateContaine return s.client.NewContainer(ctx, req.ID, containerOpts...) } +func (s *DefaultService) snapshotParentFromLayers(ctx context.Context, image containerd.Image) (string, error) { + manifest, err := images.Manifest(ctx, s.client.ContentStore(), image.Target(), platforms.Default()) + if err != nil { + return "", err + } + if len(manifest.Layers) == 0 { + return "", fmt.Errorf("image has no layer descriptors") + } + diffIDs := make([]digest.Digest, 0, len(manifest.Layers)) + for _, layer := range manifest.Layers { + blob, err := content.ReadBlob(ctx, s.client.ContentStore(), layer) + if err != nil { + return "", err + } + reader := bytes.NewReader(blob) + var r io.ReadCloser + if strings.Contains(layer.MediaType, "gzip") { + r, err = gzip.NewReader(reader) + if err != nil { + return "", err + } + } else { + r = io.NopCloser(reader) + } + + digester := digest.Canonical.Digester() + if _, err := io.Copy(digester.Hash(), r); err != nil { + _ = r.Close() + return "", err + } + _ = r.Close() + diffIDs = append(diffIDs, digester.Digest()) + } + chainIDs := identity.ChainIDs(diffIDs) + return chainIDs[len(chainIDs)-1].String(), nil +} + +func (s *DefaultService) snapshotExists(ctx context.Context, snapshotter, key string) (bool, error) { + if snapshotter == "" || key == "" { + return false, ErrInvalidArgument + } + _, err := s.client.SnapshotService(snapshotter).Stat(ctx, key) + if err == nil { + return true, nil + } + if errdefs.IsNotFound(err) { + return false, nil + } + return false, err +} + +func (s *DefaultService) prepareSnapshot(ctx context.Context, snapshotter, key, parent string) error { + if snapshotter == "" || key == "" || parent == "" { + return ErrInvalidArgument + } + sn := s.client.SnapshotService(snapshotter) + if _, err := sn.Stat(ctx, key); err == nil { + if err := sn.Remove(ctx, key); err != nil { + return err + } + } else if !errdefs.IsNotFound(err) { + return err + } + _, err := sn.Prepare(ctx, key, parent) + return err +} + func (s *DefaultService) getImageWithFallback(ctx context.Context, ref string) (containerd.Image, error) { image, err := s.GetImage(ctx, ref) if err == nil { @@ -481,12 +582,20 @@ func (s *DefaultService) ExecTask(ctx context.Context, containerID string, req E } ioOpts := []cio.Opt{} - if req.UseStdio { + if req.Stdin != nil || req.Stdout != nil || req.Stderr != nil { + ioOpts = append(ioOpts, cio.WithStreams(req.Stdin, req.Stdout, req.Stderr)) + } else if req.UseStdio { ioOpts = append(ioOpts, cio.WithStdio) } if req.Terminal { ioOpts = append(ioOpts, cio.WithTerminal) } + if strings.TrimSpace(req.FIFODir) != "" { + if err := os.MkdirAll(req.FIFODir, 0o755); err != nil { + return ExecTaskResult{}, err + } + ioOpts = append(ioOpts, cio.WithFIFODir(req.FIFODir)) + } ioCreator := cio.NewCreator(ioOpts...) execID := fmt.Sprintf("exec-%d", time.Now().UnixNano()) @@ -513,6 +622,131 @@ func (s *DefaultService) ExecTask(ctx context.Context, containerID string, req E return ExecTaskResult{ExitCode: code}, nil } +func (s *DefaultService) ExecTaskStreaming(ctx context.Context, containerID string, req ExecTaskRequest) (*ExecTaskSession, error) { + if containerID == "" || len(req.Args) == 0 { + return nil, ErrInvalidArgument + } + + ctx = s.withNamespace(ctx) + container, err := s.client.LoadContainer(ctx, containerID) + if err != nil { + return nil, err + } + + spec, err := container.Spec(ctx) + if err != nil { + return nil, err + } + if spec.Process == nil { + spec.Process = &specs.Process{} + } + if len(req.Env) > 0 { + if err := oci.WithEnv(req.Env)(ctx, nil, nil, spec); err != nil { + return nil, err + } + } + spec.Process.Args = req.Args + if req.WorkDir != "" { + spec.Process.Cwd = req.WorkDir + } + if req.Terminal { + spec.Process.Terminal = true + } + + task, err := container.Task(ctx, nil) + if err != nil { + return nil, err + } + + stdinR, stdinW := io.Pipe() + stdoutR, stdoutW := io.Pipe() + stderrR, stderrW := io.Pipe() + + ioOpts := []cio.Opt{ + cio.WithStreams(stdinR, stdoutW, stderrW), + } + if req.Terminal { + ioOpts = append(ioOpts, cio.WithTerminal) + } + fifoDir := strings.TrimSpace(req.FIFODir) + if fifoDir == "" { + if homeDir, err := os.UserHomeDir(); err == nil && homeDir != "" { + fifoDir = filepath.Join(homeDir, ".memoh", "containerd-fifo") + } else { + fifoDir = "/tmp/memoh-containerd-fifo" + } + } + if err := os.MkdirAll(fifoDir, 0o755); err != nil { + _ = stdinR.Close() + _ = stdinW.Close() + _ = stdoutR.Close() + _ = stdoutW.Close() + _ = stderrR.Close() + _ = stderrW.Close() + return nil, err + } + ioOpts = append(ioOpts, cio.WithFIFODir(fifoDir)) + ioCreator := cio.NewCreator(ioOpts...) + + execID := fmt.Sprintf("exec-%d", time.Now().UnixNano()) + process, err := task.Exec(ctx, execID, spec.Process, ioCreator) + if err != nil { + _ = stdinR.Close() + _ = stdinW.Close() + _ = stdoutR.Close() + _ = stdoutW.Close() + _ = stderrR.Close() + _ = stderrW.Close() + return nil, err + } + + if err := process.Start(ctx); err != nil { + _, _ = process.Delete(ctx) + _ = stdinR.Close() + _ = stdinW.Close() + _ = stdoutR.Close() + _ = stdoutW.Close() + _ = stderrR.Close() + _ = stderrW.Close() + return nil, err + } + + wait := func() (ExecTaskResult, error) { + statusC, err := process.Wait(ctx) + if err != nil { + return ExecTaskResult{}, err + } + status := <-statusC + code, _, err := status.Result() + if err != nil { + return ExecTaskResult{}, err + } + _, _ = process.Delete(ctx) + _ = stdoutW.Close() + _ = stderrW.Close() + return ExecTaskResult{ExitCode: code}, nil + } + + closeFn := func() error { + _ = stdinW.Close() + _ = stdoutR.Close() + _ = stderrR.Close() + _ = stdinR.Close() + _ = stdoutW.Close() + _ = stderrW.Close() + _, err := process.Delete(ctx) + return err + } + + return &ExecTaskSession{ + Stdin: stdinW, + Stdout: stdoutR, + Stderr: stderrR, + Wait: wait, + Close: closeFn, + }, nil +} + func (s *DefaultService) ListContainersByLabel(ctx context.Context, key, value string) ([]containerd.Container, error) { if key == "" { return nil, ErrInvalidArgument @@ -545,6 +779,21 @@ func (s *DefaultService) CommitSnapshot(ctx context.Context, snapshotter, name, return s.client.SnapshotService(snapshotter).Commit(ctx, name, key) } +func (s *DefaultService) ListSnapshots(ctx context.Context, snapshotter string) ([]snapshots.Info, error) { + if snapshotter == "" { + return nil, ErrInvalidArgument + } + ctx = s.withNamespace(ctx) + infos := []snapshots.Info{} + if err := s.client.SnapshotService(snapshotter).Walk(ctx, func(ctx context.Context, info snapshots.Info) error { + infos = append(infos, info) + return nil + }); err != nil { + return nil, err + } + return infos, nil +} + func (s *DefaultService) PrepareSnapshot(ctx context.Context, snapshotter, key, parent string) error { if snapshotter == "" || key == "" || parent == "" { return ErrInvalidArgument @@ -587,23 +836,19 @@ func (s *DefaultService) CreateContainerFromSnapshot(ctx context.Context, req Cr containerOpts := []containerd.NewContainerOpts{ containerd.WithImage(image), - containerd.WithSnapshot(req.SnapshotID), - containerd.WithNewSpec(specOpts...), } if req.Snapshotter != "" { containerOpts = append(containerOpts, containerd.WithSnapshotter(req.Snapshotter)) } + containerOpts = append(containerOpts, + containerd.WithSnapshot(req.SnapshotID), + containerd.WithNewSpec(specOpts...), + ) if len(req.Labels) > 0 { containerOpts = append(containerOpts, containerd.WithContainerLabels(req.Labels)) } - runtimeName := s.client.Runtime() - if runtimeName == "" { - runtimeName = defaults.DefaultRuntime - if runtimeName == "" { - runtimeName = "io.containerd.runc.v2" - } - } + runtimeName := "io.containerd.runc.v2" containerOpts = append(containerOpts, containerd.WithRuntime(runtimeName, nil)) return s.client.NewContainer(ctx, req.ID, containerOpts...) @@ -620,4 +865,3 @@ func (s *DefaultService) SnapshotMounts(ctx context.Context, snapshotter, key st func (s *DefaultService) withNamespace(ctx context.Context) context.Context { return namespaces.WithNamespace(ctx, s.namespace) } - diff --git a/internal/handlers/containerd.go b/internal/handlers/containerd.go index 5f623534..070d9706 100644 --- a/internal/handlers/containerd.go +++ b/internal/handlers/containerd.go @@ -1,22 +1,35 @@ package handlers import ( + "context" + "log" "net/http" + "os" + "path/filepath" + "sort" "strings" + "sync" "time" - "github.com/containerd/errdefs" + tasktypes "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/v2/pkg/namespaces" + "github.com/containerd/containerd/v2/pkg/oci" + "github.com/containerd/errdefs" + "github.com/google/uuid" "github.com/labstack/echo/v4" + "github.com/opencontainers/runtime-spec/specs-go" "github.com/memohai/memoh/internal/config" ctr "github.com/memohai/memoh/internal/containerd" + "github.com/memohai/memoh/internal/mcp" ) type ContainerdHandler struct { - service ctr.Service - cfg config.MCPConfig + service ctr.Service + cfg config.MCPConfig namespace string + mcpMu sync.Mutex + mcpSess map[string]*mcpSession } type CreateContainerRequest struct { @@ -43,19 +56,55 @@ type CreateSnapshotResponse struct { Snapshotter string `json:"snapshotter"` } +type ContainerInfo struct { + ID string `json:"id"` + Image string `json:"image,omitempty"` + Snapshotter string `json:"snapshotter,omitempty"` + SnapshotKey string `json:"snapshot_key,omitempty"` + CreatedAt time.Time `json:"created_at,omitempty"` + UpdatedAt time.Time `json:"updated_at,omitempty"` + Labels map[string]string `json:"labels,omitempty"` +} + +type ListContainersResponse struct { + Containers []ContainerInfo `json:"containers"` +} + +type SnapshotInfo struct { + Snapshotter string `json:"snapshotter"` + Name string `json:"name"` + Parent string `json:"parent,omitempty"` + Kind string `json:"kind"` + CreatedAt time.Time `json:"created_at,omitempty"` + UpdatedAt time.Time `json:"updated_at,omitempty"` + Labels map[string]string `json:"labels,omitempty"` +} + +type ListSnapshotsResponse struct { + Snapshotter string `json:"snapshotter"` + Snapshots []SnapshotInfo `json:"snapshots"` +} + func NewContainerdHandler(service ctr.Service, cfg config.MCPConfig, namespace string) *ContainerdHandler { return &ContainerdHandler{ service: service, cfg: cfg, namespace: namespace, + mcpSess: make(map[string]*mcpSession), } } func (h *ContainerdHandler) Register(e *echo.Echo) { group := e.Group("/mcp") group.POST("/containers", h.CreateContainer) + group.GET("/containers", h.ListContainers) group.DELETE("/containers/:id", h.DeleteContainer) group.POST("/snapshots", h.CreateSnapshot) + group.GET("/snapshots", h.ListSnapshots) + group.GET("/skills", h.ListSkills) + group.POST("/skills", h.UpsertSkills) + group.DELETE("/skills", h.DeleteSkills) + group.POST("/fs/:id", h.HandleMCPFS) } // CreateContainer godoc @@ -67,12 +116,18 @@ func (h *ContainerdHandler) Register(e *echo.Echo) { // @Failure 500 {object} ErrorResponse // @Router /mcp/containers [post] func (h *ContainerdHandler) CreateContainer(c echo.Context) error { + userID, err := h.requireUserID(c) + if err != nil { + return err + } + var req CreateContainerRequest if err := c.Bind(&req); err != nil { return echo.NewHTTPError(http.StatusBadRequest, err.Error()) } - if strings.TrimSpace(req.ContainerID) == "" { - return echo.NewHTTPError(http.StatusBadRequest, "container_id is required") + req.ContainerID = strings.TrimSpace(req.ContainerID) + if req.ContainerID == "" { + req.ContainerID = uuid.NewString() } image := strings.TrimSpace(req.Image) @@ -86,24 +141,62 @@ func (h *ContainerdHandler) CreateContainer(c echo.Context) error { if snapshotter == "" { snapshotter = h.cfg.Snapshotter } - if snapshotter == "" { - snapshotter = "overlayfs" + + ctx := c.Request().Context() + if strings.TrimSpace(h.namespace) != "" { + ctx = namespaces.WithNamespace(ctx, h.namespace) + } + dataRoot := strings.TrimSpace(h.cfg.DataRoot) + if dataRoot == "" { + dataRoot = config.DefaultDataRoot + } + dataMount := strings.TrimSpace(h.cfg.DataMount) + if dataMount == "" { + dataMount = config.DefaultDataMount + } + dataDir := filepath.Join(dataRoot, "users", userID) + if err := os.MkdirAll(dataDir, 0o755); err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } + if err := os.MkdirAll(filepath.Join(dataDir, ".skills"), 0o755); err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) } - _, err := h.service.CreateContainer(c.Request().Context(), ctr.CreateContainerRequest{ + specOpts := []oci.SpecOpts{ + oci.WithMounts([]specs.Mount{{ + Destination: dataMount, + Type: "bind", + Source: dataDir, + Options: []string{"rbind", "rw"}, + }}), + oci.WithProcessArgs("/bin/sh", "-lc", "sleep 2147483647"), + } + + _, err = h.service.CreateContainer(ctx, ctr.CreateContainerRequest{ ID: req.ContainerID, ImageRef: image, Snapshotter: snapshotter, + Labels: map[string]string{ + mcp.UserLabelKey: userID, + }, + SpecOpts: specOpts, }) if err != nil && !errdefs.IsAlreadyExists(err) { return echo.NewHTTPError(http.StatusInternalServerError, "snapshotter="+snapshotter+" image="+image+" err="+err.Error()) } started := false + fifoDir, err := h.taskFIFODir() + if err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } if _, err := h.service.StartTask(c.Request().Context(), req.ContainerID, &ctr.StartTaskOptions{ UseStdio: false, + FIFODir: fifoDir, }); err == nil { started = true + } else { + log.Printf("mcp container start failed: id=%s err=%v", req.ContainerID, err) } return c.JSON(http.StatusOK, CreateContainerResponse{ @@ -114,6 +207,111 @@ func (h *ContainerdHandler) CreateContainer(c echo.Context) error { }) } +func (h *ContainerdHandler) taskFIFODir() (string, error) { + if homeDir, err := os.UserHomeDir(); err == nil && homeDir != "" { + fifoDir := filepath.Join(homeDir, ".memoh", "containerd-fifo") + if err := os.MkdirAll(fifoDir, 0o755); err != nil { + return "", err + } + return fifoDir, nil + } + fifoDir := "/tmp/memoh-containerd-fifo" + if err := os.MkdirAll(fifoDir, 0o755); err != nil { + return "", err + } + return fifoDir, nil +} + +func (h *ContainerdHandler) ensureTaskRunning(ctx context.Context, containerID string) error { + tasks, err := h.service.ListTasks(ctx, &ctr.ListTasksOptions{ + Filter: "container.id==" + containerID, + }) + if err != nil { + return err + } + if len(tasks) > 0 { + if tasks[0].Status == tasktypes.Status_RUNNING { + return nil + } + _ = h.service.DeleteTask(ctx, containerID, &ctr.DeleteTaskOptions{Force: true}) + } + + fifoDir, err := h.taskFIFODir() + if err != nil { + return err + } + _, err = h.service.StartTask(ctx, containerID, &ctr.StartTaskOptions{ + UseStdio: false, + FIFODir: fifoDir, + }) + return err +} + +func (h *ContainerdHandler) userContainerID(ctx context.Context, userID string) (string, error) { + containers, err := h.service.ListContainersByLabel(ctx, mcp.UserLabelKey, userID) + if err != nil { + return "", err + } + if len(containers) == 0 { + return "", echo.NewHTTPError(http.StatusNotFound, "container not found") + } + infoCtx := ctx + if strings.TrimSpace(h.namespace) != "" { + infoCtx = namespaces.WithNamespace(ctx, h.namespace) + } + bestID := "" + var bestUpdated time.Time + for _, container := range containers { + info, err := container.Info(infoCtx) + if err != nil { + return "", err + } + if bestID == "" || info.UpdatedAt.After(bestUpdated) { + bestID = info.ID + bestUpdated = info.UpdatedAt + } + } + return bestID, nil +} + +// ListContainers godoc +// @Summary List containers +// @Tags containerd +// @Success 200 {object} ListContainersResponse +// @Failure 500 {object} ErrorResponse +// @Router /mcp/containers [get] +func (h *ContainerdHandler) ListContainers(c echo.Context) error { + ctx := c.Request().Context() + containers, err := h.service.ListContainers(ctx) + if err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } + infoCtx := ctx + if strings.TrimSpace(h.namespace) != "" { + infoCtx = namespaces.WithNamespace(ctx, h.namespace) + } + items := make([]ContainerInfo, 0, len(containers)) + for _, container := range containers { + info, err := container.Info(infoCtx) + if err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } + items = append(items, ContainerInfo{ + ID: info.ID, + Image: info.Image, + Snapshotter: info.Snapshotter, + SnapshotKey: info.SnapshotKey, + CreatedAt: info.CreatedAt, + UpdatedAt: info.UpdatedAt, + Labels: info.Labels, + }) + } + sort.Slice(items, func(i, j int) bool { + return items[i].ID < items[j].ID + }) + return c.JSON(http.StatusOK, ListContainersResponse{Containers: items}) +} + // DeleteContainer godoc // @Summary Delete MCP container // @Tags containerd @@ -186,3 +384,46 @@ func (h *ContainerdHandler) CreateSnapshot(c echo.Context) error { Snapshotter: info.Snapshotter, }) } + +// ListSnapshots godoc +// @Summary List snapshots +// @Tags containerd +// @Param snapshotter query string false "Snapshotter name" +// @Success 200 {object} ListSnapshotsResponse +// @Failure 500 {object} ErrorResponse +// @Router /mcp/snapshots [get] +func (h *ContainerdHandler) ListSnapshots(c echo.Context) error { + snapshotter := strings.TrimSpace(c.QueryParam("snapshotter")) + if snapshotter == "" { + snapshotter = strings.TrimSpace(h.cfg.Snapshotter) + } + if snapshotter == "" { + snapshotter = "overlayfs" + } + snapshots, err := h.service.ListSnapshots(c.Request().Context(), snapshotter) + if err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } + items := make([]SnapshotInfo, 0, len(snapshots)) + for _, info := range snapshots { + items = append(items, SnapshotInfo{ + Snapshotter: snapshotter, + Name: info.Name, + Parent: info.Parent, + Kind: info.Kind.String(), + CreatedAt: info.Created, + UpdatedAt: info.Updated, + Labels: info.Labels, + }) + } + sort.Slice(items, func(i, j int) bool { + if items[i].CreatedAt.Equal(items[j].CreatedAt) { + return items[i].Name < items[j].Name + } + return items[i].CreatedAt.Before(items[j].CreatedAt) + }) + return c.JSON(http.StatusOK, ListSnapshotsResponse{ + Snapshotter: snapshotter, + Snapshots: items, + }) +} diff --git a/internal/handlers/fs.go b/internal/handlers/fs.go new file mode 100644 index 00000000..b1c70cae --- /dev/null +++ b/internal/handlers/fs.go @@ -0,0 +1,462 @@ +package handlers + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "os/exec" + "runtime" + "strings" + "sync" + "time" + + "github.com/containerd/containerd/v2/pkg/namespaces" + "github.com/containerd/errdefs" + "github.com/labstack/echo/v4" + + "github.com/memohai/memoh/internal/auth" + ctr "github.com/memohai/memoh/internal/containerd" + "github.com/memohai/memoh/internal/identity" + mcptools "github.com/memohai/memoh/internal/mcp" +) + +// HandleMCPFS godoc +// @Summary MCP filesystem tools (JSON-RPC) +// @Description Forwards MCP JSON-RPC requests to the MCP server inside the container. +// @Description Required: +// @Description - container task is running +// @Description - container has data mount (default /data) bound to /users/ +// @Description - container image contains the "mcp" binary +// @Description Auth: Bearer JWT is used to determine user_id (sub or user_id). +// @Description Paths must be relative (no leading slash) and must not contain "..". +// @Description +// @Description Example: tools/list +// @Description {"jsonrpc":"2.0","id":1,"method":"tools/list"} +// @Description +// @Description Example: tools/call (fs.read) +// @Description {"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"fs.read","arguments":{"path":"notes.txt"}}} +// @Tags containerd +// @Param Authorization header string true "Bearer " +// @Param id path string true "Container ID" +// @Param payload body object true "JSON-RPC request" +// @Success 200 {object} object "JSON-RPC response: {jsonrpc,id,result|error}" +// @Failure 400 {object} ErrorResponse +// @Failure 404 {object} ErrorResponse +// @Failure 500 {object} ErrorResponse +// @Router /mcp/fs/{id} [post] +func (h *ContainerdHandler) HandleMCPFS(c echo.Context) error { + containerID := strings.TrimSpace(c.Param("id")) + if containerID == "" { + return echo.NewHTTPError(http.StatusBadRequest, "container id is required") + } + + var req mcptools.JSONRPCRequest + if err := c.Bind(&req); err != nil { + return echo.NewHTTPError(http.StatusBadRequest, err.Error()) + } + if req.JSONRPC != "" && req.JSONRPC != "2.0" { + return c.JSON(http.StatusOK, mcptools.JSONRPCResponse{ + JSONRPC: "2.0", + ID: req.ID, + Error: &mcptools.JSONRPCError{Code: -32600, Message: "invalid jsonrpc version"}, + }) + } + + userID, err := h.requireUserID(c) + if err != nil { + return err + } + if err := h.validateMCPContainer(c.Request().Context(), containerID, userID); err != nil { + return err + } + if err := h.ensureTaskRunning(c.Request().Context(), containerID); err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } + + switch req.Method { + case "tools/list": + payload, err := h.callMCPServer(c.Request().Context(), containerID, req) + if err != nil { + return err + } + return c.JSON(http.StatusOK, payload) + case "tools/call": + payload, err := h.callMCPServer(c.Request().Context(), containerID, req) + if err != nil { + return err + } + return c.JSON(http.StatusOK, payload) + default: + return c.JSON(http.StatusOK, mcptools.JSONRPCResponse{ + JSONRPC: "2.0", + ID: req.ID, + Error: &mcptools.JSONRPCError{Code: -32601, Message: "method not found"}, + }) + } +} + +func (h *ContainerdHandler) validateMCPContainer(ctx context.Context, containerID, userID string) error { + if strings.TrimSpace(userID) == "" { + return echo.NewHTTPError(http.StatusUnauthorized, "invalid token") + } + container, err := h.service.GetContainer(ctx, containerID) + if err != nil { + if errdefs.IsNotFound(err) { + return echo.NewHTTPError(http.StatusNotFound, "container not found") + } + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } + + infoCtx := ctx + if strings.TrimSpace(h.namespace) != "" { + infoCtx = namespaces.WithNamespace(ctx, h.namespace) + } + info, err := container.Info(infoCtx) + if err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } + labelUserID := strings.TrimSpace(info.Labels[mcptools.UserLabelKey]) + if labelUserID != "" && labelUserID != userID { + return echo.NewHTTPError(http.StatusForbidden, "user mismatch") + } + return nil +} + +func (h *ContainerdHandler) requireUserID(c echo.Context) (string, error) { + userID, err := auth.UserIDFromContext(c) + if err != nil { + return "", err + } + if err := identity.ValidateUserID(userID); err != nil { + return "", echo.NewHTTPError(http.StatusBadRequest, err.Error()) + } + return userID, nil +} + +func (h *ContainerdHandler) callMCPServer(ctx context.Context, containerID string, req mcptools.JSONRPCRequest) (map[string]any, error) { + session, err := h.getMCPSession(ctx, containerID) + if err != nil { + return nil, err + } + return session.call(ctx, req) +} + +type mcpSession struct { + stdin io.WriteCloser + stdout io.ReadCloser + stderr io.ReadCloser + cmd *exec.Cmd + initOnce sync.Once + writeMu sync.Mutex + pendingMu sync.Mutex + pending map[string]chan mcptools.JSONRPCResponse + closed chan struct{} + closeOnce sync.Once + closeErr error + onClose func() +} + +func (h *ContainerdHandler) getMCPSession(ctx context.Context, containerID string) (*mcpSession, error) { + h.mcpMu.Lock() + if sess, ok := h.mcpSess[containerID]; ok { + h.mcpMu.Unlock() + return sess, nil + } + h.mcpMu.Unlock() + + var sess *mcpSession + var err error + if runtime.GOOS == "darwin" { + sess, err = h.startLimaMCPSession(containerID) + } + if err != nil || sess == nil { + sess, err = h.startContainerdMCPSession(ctx, containerID) + if err != nil { + return nil, err + } + } + + h.mcpMu.Lock() + h.mcpSess[containerID] = sess + h.mcpMu.Unlock() + + sess.onClose = func() { + h.mcpMu.Lock() + if current, ok := h.mcpSess[containerID]; ok && current == sess { + delete(h.mcpSess, containerID) + } + h.mcpMu.Unlock() + } + + return sess, nil +} + +func (h *ContainerdHandler) startContainerdMCPSession(ctx context.Context, containerID string) (*mcpSession, error) { + execSession, err := h.service.ExecTaskStreaming(ctx, containerID, ctr.ExecTaskRequest{ + Args: []string{"/mcp"}, + }) + if err != nil { + return nil, err + } + + sess := &mcpSession{ + stdin: execSession.Stdin, + stdout: execSession.Stdout, + stderr: execSession.Stderr, + pending: make(map[string]chan mcptools.JSONRPCResponse), + closed: make(chan struct{}), + } + + go sess.readLoop() + go func() { + _, err := execSession.Wait() + if err != nil { + sess.closeWithError(err) + } else { + sess.closeWithError(io.EOF) + } + }() + + return sess, nil +} + +func (h *ContainerdHandler) startLimaMCPSession(containerID string) (*mcpSession, error) { + execID := fmt.Sprintf("mcp-%d", time.Now().UnixNano()) + cmd := exec.Command( + "limactl", + "shell", + "--tty=false", + "default", + "--", + "sudo", + "-n", + "ctr", + "-n", + "default", + "tasks", + "exec", + "--exec-id", + execID, + containerID, + "/mcp", + ) + + stdin, err := cmd.StdinPipe() + if err != nil { + return nil, err + } + stdout, err := cmd.StdoutPipe() + if err != nil { + _ = stdin.Close() + return nil, err + } + stderr, err := cmd.StderrPipe() + if err != nil { + _ = stdin.Close() + _ = stdout.Close() + return nil, err + } + if err := cmd.Start(); err != nil { + _ = stdin.Close() + _ = stdout.Close() + _ = stderr.Close() + return nil, err + } + + sess := &mcpSession{ + stdin: stdin, + stdout: stdout, + stderr: stderr, + cmd: cmd, + pending: make(map[string]chan mcptools.JSONRPCResponse), + closed: make(chan struct{}), + } + + go sess.readLoop() + go func() { + if err := cmd.Wait(); err != nil { + sess.closeWithError(err) + } else { + sess.closeWithError(io.EOF) + } + }() + + return sess, nil +} + +func (s *mcpSession) closeWithError(err error) { + s.closeOnce.Do(func() { + s.closeErr = err + close(s.closed) + s.pendingMu.Lock() + for _, ch := range s.pending { + close(ch) + } + s.pending = map[string]chan mcptools.JSONRPCResponse{} + s.pendingMu.Unlock() + _ = s.stdin.Close() + _ = s.stdout.Close() + _ = s.stderr.Close() + if s.cmd != nil && s.cmd.Process != nil { + _ = s.cmd.Process.Kill() + } + if s.onClose != nil { + s.onClose() + } + }) +} + +func (s *mcpSession) readLoop() { + scanner := bufio.NewScanner(s.stdout) + scanner.Buffer(make([]byte, 0, 64*1024), 8*1024*1024) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + var resp mcptools.JSONRPCResponse + if err := json.Unmarshal([]byte(line), &resp); err != nil { + continue + } + id := strings.TrimSpace(string(resp.ID)) + if id == "" { + continue + } + s.pendingMu.Lock() + ch, ok := s.pending[id] + if ok { + delete(s.pending, id) + } + s.pendingMu.Unlock() + if ok { + ch <- resp + close(ch) + } + } + if err := scanner.Err(); err != nil { + s.closeWithError(err) + } else { + s.closeWithError(io.EOF) + } +} + +func (s *mcpSession) call(ctx context.Context, req mcptools.JSONRPCRequest) (map[string]any, error) { + payloads, targetID, err := buildMCPPayloads(req, &s.initOnce) + if err != nil { + return nil, err + } + target := strings.TrimSpace(string(targetID)) + if target == "" { + return nil, fmt.Errorf("missing request id") + } + + respCh := make(chan mcptools.JSONRPCResponse, 1) + s.pendingMu.Lock() + s.pending[target] = respCh + s.pendingMu.Unlock() + + s.writeMu.Lock() + for _, payload := range payloads { + if _, err := s.stdin.Write([]byte(payload + "\n")); err != nil { + s.writeMu.Unlock() + return nil, err + } + } + s.writeMu.Unlock() + + select { + case resp, ok := <-respCh: + if !ok { + if s.closeErr != nil { + return nil, s.closeErr + } + return nil, io.EOF + } + if resp.Error != nil { + return map[string]any{ + "jsonrpc": "2.0", + "id": resp.ID, + "error": map[string]any{ + "code": resp.Error.Code, + "message": resp.Error.Message, + }, + }, nil + } + return map[string]any{ + "jsonrpc": "2.0", + "id": resp.ID, + "result": resp.Result, + }, nil + case <-s.closed: + if s.closeErr != nil { + return nil, s.closeErr + } + return nil, io.EOF + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +func buildMCPPayloads(req mcptools.JSONRPCRequest, initOnce *sync.Once) ([]string, json.RawMessage, error) { + if req.JSONRPC == "" { + req.JSONRPC = "2.0" + } + targetID := req.ID + payloads := []string{} + shouldInit := req.Method != "initialize" && req.Method != "notifications/initialized" + if initOnce != nil { + ran := false + initOnce.Do(func() { + ran = true + }) + if ran { + // This is the first call on the session. + } else { + shouldInit = false + } + } + if shouldInit { + initReq := map[string]any{ + "jsonrpc": "2.0", + "id": "init-1", + "method": "initialize", + "params": map[string]any{ + "protocolVersion": "2025-06-18", + "capabilities": map[string]any{ + "roots": map[string]any{ + "listChanged": false, + }, + }, + "clientInfo": map[string]any{ + "name": "memoh-http-proxy", + "version": "v0", + }, + }, + } + initBytes, err := json.Marshal(initReq) + if err != nil { + return nil, nil, err + } + payloads = append(payloads, string(initBytes)) + + initialized := map[string]any{ + "jsonrpc": "2.0", + "method": "notifications/initialized", + } + initializedBytes, err := json.Marshal(initialized) + if err != nil { + return nil, nil, err + } + payloads = append(payloads, string(initializedBytes)) + } + + reqBytes, err := json.Marshal(req) + if err != nil { + return nil, nil, err + } + payloads = append(payloads, string(reqBytes)) + return payloads, targetID, nil +} diff --git a/internal/handlers/skills.go b/internal/handlers/skills.go new file mode 100644 index 00000000..8a50da2d --- /dev/null +++ b/internal/handlers/skills.go @@ -0,0 +1,342 @@ +package handlers + +import ( + "context" + "errors" + "net/http" + "os" + "path" + "strconv" + "strings" + "time" + + "github.com/labstack/echo/v4" + + "github.com/memohai/memoh/internal/config" + mcptools "github.com/memohai/memoh/internal/mcp" +) + +type SkillItem struct { + Name string `json:"name"` + Description string `json:"description"` + Content string `json:"content"` +} + +type SkillsResponse struct { + Skills []SkillItem `json:"skills"` +} + +type SkillsUpsertRequest struct { + Skills []SkillItem `json:"skills"` +} + +type SkillsDeleteRequest struct { + Names []string `json:"names"` +} + +type skillsOpResponse struct { + OK bool `json:"ok"` +} + +// ListSkills godoc +// @Summary List skills from container +// @Tags containerd +// @Success 200 {object} SkillsResponse +// @Failure 400 {object} ErrorResponse +// @Failure 404 {object} ErrorResponse +// @Failure 500 {object} ErrorResponse +// @Router /mcp/skills [get] +func (h *ContainerdHandler) ListSkills(c echo.Context) error { + userID, err := h.requireUserID(c) + if err != nil { + return err + } + ctx := c.Request().Context() + containerID, err := h.userContainerID(ctx, userID) + if err != nil { + return err + } + if err := h.ensureTaskRunning(ctx, containerID); err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } + if err := h.ensureSkillsDirHost(userID); err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } + + listPayload, err := h.callMCPTool(ctx, containerID, "fs.list", map[string]any{ + "path": ".skills", + "recursive": false, + }) + if err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } + entries, err := extractListEntries(listPayload) + if err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } + + skills := make([]SkillItem, 0, len(entries)) + for _, entry := range entries { + skillPath, name := skillPathForEntry(entry) + if skillPath == "" { + continue + } + content, err := h.readSkillFile(ctx, containerID, skillPath) + if err != nil { + continue + } + skills = append(skills, SkillItem{ + Name: name, + Description: skillDescription(content), + Content: content, + }) + } + + return c.JSON(http.StatusOK, SkillsResponse{Skills: skills}) +} + +// UpsertSkills godoc +// @Summary Upload skills into container +// @Tags containerd +// @Param payload body SkillsUpsertRequest true "Skills payload" +// @Success 200 {object} skillsOpResponse +// @Failure 400 {object} ErrorResponse +// @Failure 404 {object} ErrorResponse +// @Failure 500 {object} ErrorResponse +// @Router /mcp/skills [post] +func (h *ContainerdHandler) UpsertSkills(c echo.Context) error { + userID, err := h.requireUserID(c) + if err != nil { + return err + } + var req SkillsUpsertRequest + if err := c.Bind(&req); err != nil { + return echo.NewHTTPError(http.StatusBadRequest, err.Error()) + } + if len(req.Skills) == 0 { + return echo.NewHTTPError(http.StatusBadRequest, "skills is required") + } + + ctx := c.Request().Context() + containerID, err := h.userContainerID(ctx, userID) + if err != nil { + return err + } + if err := h.ensureTaskRunning(ctx, containerID); err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } + for _, skill := range req.Skills { + name := strings.TrimSpace(skill.Name) + if !isValidSkillName(name) { + return echo.NewHTTPError(http.StatusBadRequest, "invalid skill name") + } + content := strings.TrimSpace(skill.Content) + if content == "" { + content = buildSkillContent(name, strings.TrimSpace(skill.Description)) + } + filePath := path.Join(".skills", name, "SKILL.md") + if _, err := h.callMCPTool(ctx, containerID, "fs.write", map[string]any{ + "path": filePath, + "content": content, + }); err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } + } + + return c.JSON(http.StatusOK, skillsOpResponse{OK: true}) +} + +// DeleteSkills godoc +// @Summary Delete skills from container +// @Tags containerd +// @Param payload body SkillsDeleteRequest true "Delete skills payload" +// @Success 200 {object} skillsOpResponse +// @Failure 400 {object} ErrorResponse +// @Failure 404 {object} ErrorResponse +// @Failure 500 {object} ErrorResponse +// @Router /mcp/skills [delete] +func (h *ContainerdHandler) DeleteSkills(c echo.Context) error { + userID, err := h.requireUserID(c) + if err != nil { + return err + } + var req SkillsDeleteRequest + if err := c.Bind(&req); err != nil { + return echo.NewHTTPError(http.StatusBadRequest, err.Error()) + } + if len(req.Names) == 0 { + return echo.NewHTTPError(http.StatusBadRequest, "names is required") + } + + ctx := c.Request().Context() + containerID, err := h.userContainerID(ctx, userID) + if err != nil { + return err + } + if err := h.ensureTaskRunning(ctx, containerID); err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } + + for _, name := range req.Names { + skillName := strings.TrimSpace(name) + if !isValidSkillName(skillName) { + return echo.NewHTTPError(http.StatusBadRequest, "invalid skill name") + } + deletePath := path.Join(".skills", skillName) + if _, err := h.callMCPTool(ctx, containerID, "fs.delete", map[string]any{ + "path": deletePath, + }); err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } + } + + return c.JSON(http.StatusOK, skillsOpResponse{OK: true}) +} + +func (h *ContainerdHandler) ensureSkillsDirHost(userID string) error { + dataRoot := strings.TrimSpace(h.cfg.DataRoot) + if dataRoot == "" { + dataRoot = config.DefaultDataRoot + } + skillsDir := path.Join(dataRoot, "users", userID, ".skills") + return os.MkdirAll(skillsDir, 0o755) +} + +func (h *ContainerdHandler) readSkillFile(ctx context.Context, containerID, filePath string) (string, error) { + payload, err := h.callMCPTool(ctx, containerID, "fs.read", map[string]any{ + "path": filePath, + }) + if err != nil { + return "", err + } + content, err := extractContentString(payload) + if err != nil { + return "", err + } + return content, nil +} + +func (h *ContainerdHandler) callMCPTool(ctx context.Context, containerID, toolName string, args map[string]any) (map[string]any, error) { + id := "skills-" + strconv.FormatInt(time.Now().UnixNano(), 10) + req, err := mcptools.NewToolCallRequest(id, toolName, args) + if err != nil { + return nil, err + } + payload, err := h.callMCPServer(ctx, containerID, req) + if err != nil { + return nil, err + } + if err := mcptools.PayloadError(payload); err != nil { + return nil, err + } + if err := mcptools.ResultError(payload); err != nil { + return nil, err + } + return payload, nil +} + +func extractListEntries(payload map[string]any) ([]skillEntry, error) { + result, err := mcptools.StructuredContent(payload) + if err != nil { + return nil, err + } + rawEntries, ok := result["entries"].([]any) + if !ok { + return nil, errors.New("invalid list response") + } + entries := make([]skillEntry, 0, len(rawEntries)) + for _, raw := range rawEntries { + entryMap, ok := raw.(map[string]any) + if !ok { + continue + } + entryPath, _ := entryMap["path"].(string) + if entryPath == "" { + continue + } + isDir, _ := entryMap["is_dir"].(bool) + entries = append(entries, skillEntry{Path: entryPath, IsDir: isDir}) + } + return entries, nil +} + +type skillEntry struct { + Path string + IsDir bool +} + +func extractContentString(payload map[string]any) (string, error) { + result, err := mcptools.StructuredContent(payload) + if err != nil { + return "", err + } + content, _ := result["content"].(string) + if content == "" { + return "", errors.New("empty content") + } + return content, nil +} + +func skillNameFromPath(rel string) string { + if rel == "" || rel == "SKILL.md" { + return "default" + } + parent := path.Dir(rel) + if parent == "." { + return "default" + } + return path.Base(parent) +} + +func skillPathForEntry(entry skillEntry) (string, string) { + rel := strings.TrimPrefix(entry.Path, ".skills/") + if rel == entry.Path { + rel = strings.TrimPrefix(entry.Path, "./.skills/") + } + if entry.IsDir { + name := path.Base(rel) + if name == "." || name == "" { + return "", "" + } + return path.Join(".skills", name, "SKILL.md"), name + } + if path.Base(rel) == "SKILL.md" { + return path.Join(".skills", "SKILL.md"), skillNameFromPath(rel) + } + return "", "" +} + +func skillDescription(content string) string { + lines := strings.Split(content, "\n") + for _, line := range lines { + line = strings.TrimSpace(line) + if line == "" { + continue + } + if strings.HasPrefix(line, "#") { + return strings.TrimSpace(strings.TrimPrefix(line, "#")) + } + return line + } + return "" +} + +func buildSkillContent(name, description string) string { + if description == "" { + return "# " + name + } + return "# " + name + "\n\n" + description +} + +func isValidSkillName(name string) bool { + if name == "" { + return false + } + if strings.Contains(name, "..") { + return false + } + if strings.Contains(name, "/") || strings.Contains(name, "\\") { + return false + } + return true +} diff --git a/internal/mcp/service.go b/internal/mcp/service.go new file mode 100644 index 00000000..ba055c4b --- /dev/null +++ b/internal/mcp/service.go @@ -0,0 +1,105 @@ +package mcp + +import ( + "encoding/json" + "errors" + "strconv" +) + +type JSONRPCRequest struct { + JSONRPC string `json:"jsonrpc"` + ID json.RawMessage `json:"id"` + Method string `json:"method"` + Params json.RawMessage `json:"params,omitempty"` +} + +type JSONRPCResponse struct { + JSONRPC string `json:"jsonrpc"` + ID json.RawMessage `json:"id,omitempty"` + Result any `json:"result,omitempty"` + Error *JSONRPCError `json:"error,omitempty"` +} + +type JSONRPCError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +func NewToolCallRequest(id string, toolName string, args map[string]any) (JSONRPCRequest, error) { + params := map[string]any{ + "name": toolName, + "arguments": args, + } + rawParams, err := json.Marshal(params) + if err != nil { + return JSONRPCRequest{}, err + } + return JSONRPCRequest{ + JSONRPC: "2.0", + ID: RawStringID(id), + Method: "tools/call", + Params: rawParams, + }, nil +} + +func RawStringID(id string) json.RawMessage { + return json.RawMessage([]byte(strconv.Quote(id))) +} + +func PayloadError(payload map[string]any) error { + if payload == nil { + return errors.New("empty payload") + } + if errObj, ok := payload["error"].(map[string]any); ok { + if msg, ok := errObj["message"].(string); ok && msg != "" { + return errors.New(msg) + } + return errors.New("mcp error") + } + return nil +} + +func ResultError(payload map[string]any) error { + result, ok := payload["result"].(map[string]any) + if !ok { + return nil + } + if isErr, ok := result["isError"].(bool); ok && isErr { + msg := ContentText(result) + if msg == "" { + msg = "mcp tool error" + } + return errors.New(msg) + } + return nil +} + +func StructuredContent(payload map[string]any) (map[string]any, error) { + result, ok := payload["result"].(map[string]any) + if !ok { + return nil, errors.New("missing result") + } + if structured, ok := result["structuredContent"].(map[string]any); ok { + return structured, nil + } + if content := ContentText(result); content != "" { + var out map[string]any + if err := json.Unmarshal([]byte(content), &out); err == nil { + return out, nil + } + } + return nil, errors.New("missing structured content") +} + +func ContentText(result map[string]any) string { + rawContent, ok := result["content"].([]any) + if !ok || len(rawContent) == 0 { + return "" + } + first, ok := rawContent[0].(map[string]any) + if !ok { + return "" + } + text, _ := first["text"].(string) + return text +} diff --git a/internal/server/server.go b/internal/server/server.go index 706e5f64..a40d7880 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -29,9 +29,6 @@ func NewServer(addr string, jwtSecret string, pingHandler *handlers.PingHandler, if path == "/ping" || path == "/api/swagger.json" || path == "/auth/login" { return true } - if strings.HasPrefix(path, "/mcp/") { - return true - } if strings.HasPrefix(path, "/api/docs") { return true }