Merge pull request #18 from chen-ran/main

feat: mcp fs operate
This commit is contained in:
晨苒
2026-02-01 02:31:39 +08:00
committed by GitHub
10 changed files with 2422 additions and 48 deletions
+366
View File
@@ -386,6 +386,26 @@ const docTemplate = `{
}
},
"/mcp/containers": {
"get": {
"tags": [
"containerd"
],
"summary": "List containers",
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/handlers.ListContainersResponse"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
}
}
},
"post": {
"tags": [
"containerd"
@@ -464,7 +484,215 @@ const docTemplate = `{
}
}
},
"/mcp/fs/{id}": {
"post": {
"description": "Forwards MCP JSON-RPC requests to the MCP server inside the container.\nRequired:\n- container task is running\n- container has data mount (default /data) bound to \u003cdata_root\u003e/users/\u003cuser_id\u003e\n- container image contains the \"mcp\" binary\nAuth: Bearer JWT is used to determine user_id (sub or user_id).\nPaths must be relative (no leading slash) and must not contain \"..\".\n\nExample: tools/list\n{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"tools/list\"}\n\nExample: tools/call (fs.read)\n{\"jsonrpc\":\"2.0\",\"id\":2,\"method\":\"tools/call\",\"params\":{\"name\":\"fs.read\",\"arguments\":{\"path\":\"notes.txt\"}}}",
"tags": [
"containerd"
],
"summary": "MCP filesystem tools (JSON-RPC)",
"parameters": [
{
"type": "string",
"description": "Bearer \u003ctoken\u003e",
"name": "Authorization",
"in": "header",
"required": true
},
{
"type": "string",
"description": "Container ID",
"name": "id",
"in": "path",
"required": true
},
{
"description": "JSON-RPC request",
"name": "payload",
"in": "body",
"required": true,
"schema": {
"type": "object"
}
}
],
"responses": {
"200": {
"description": "JSON-RPC response: {jsonrpc,id,result|error}",
"schema": {
"type": "object"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
}
}
}
},
"/mcp/skills": {
"get": {
"tags": [
"containerd"
],
"summary": "List skills from container",
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/handlers.SkillsResponse"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
}
}
},
"post": {
"tags": [
"containerd"
],
"summary": "Upload skills into container",
"parameters": [
{
"description": "Skills payload",
"name": "payload",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/handlers.SkillsUpsertRequest"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/handlers.skillsOpResponse"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
}
}
},
"delete": {
"tags": [
"containerd"
],
"summary": "Delete skills from container",
"parameters": [
{
"description": "Delete skills payload",
"name": "payload",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/handlers.SkillsDeleteRequest"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/handlers.skillsOpResponse"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
}
}
}
},
"/mcp/snapshots": {
"get": {
"tags": [
"containerd"
],
"summary": "List snapshots",
"parameters": [
{
"type": "string",
"description": "Snapshotter name",
"name": "snapshotter",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/handlers.ListSnapshotsResponse"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
}
}
},
"post": {
"tags": [
"containerd"
@@ -2428,6 +2656,35 @@ const docTemplate = `{
"type": "object",
"additionalProperties": true
},
"handlers.ContainerInfo": {
"type": "object",
"properties": {
"created_at": {
"type": "string"
},
"id": {
"type": "string"
},
"image": {
"type": "string"
},
"labels": {
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"snapshot_key": {
"type": "string"
},
"snapshotter": {
"type": "string"
},
"updated_at": {
"type": "string"
}
}
},
"handlers.CreateContainerRequest": {
"type": "object",
"properties": {
@@ -2569,6 +2826,31 @@ const docTemplate = `{
}
}
},
"handlers.ListContainersResponse": {
"type": "object",
"properties": {
"containers": {
"type": "array",
"items": {
"$ref": "#/definitions/handlers.ContainerInfo"
}
}
}
},
"handlers.ListSnapshotsResponse": {
"type": "object",
"properties": {
"snapshots": {
"type": "array",
"items": {
"$ref": "#/definitions/handlers.SnapshotInfo"
}
},
"snapshotter": {
"type": "string"
}
}
},
"handlers.LoginRequest": {
"type": "object",
"properties": {
@@ -2606,6 +2888,90 @@ const docTemplate = `{
}
}
},
"handlers.SkillItem": {
"type": "object",
"properties": {
"content": {
"type": "string"
},
"description": {
"type": "string"
},
"name": {
"type": "string"
}
}
},
"handlers.SkillsDeleteRequest": {
"type": "object",
"properties": {
"names": {
"type": "array",
"items": {
"type": "string"
}
}
}
},
"handlers.SkillsResponse": {
"type": "object",
"properties": {
"skills": {
"type": "array",
"items": {
"$ref": "#/definitions/handlers.SkillItem"
}
}
}
},
"handlers.SkillsUpsertRequest": {
"type": "object",
"properties": {
"skills": {
"type": "array",
"items": {
"$ref": "#/definitions/handlers.SkillItem"
}
}
}
},
"handlers.SnapshotInfo": {
"type": "object",
"properties": {
"created_at": {
"type": "string"
},
"kind": {
"type": "string"
},
"labels": {
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"name": {
"type": "string"
},
"parent": {
"type": "string"
},
"snapshotter": {
"type": "string"
},
"updated_at": {
"type": "string"
}
}
},
"handlers.skillsOpResponse": {
"type": "object",
"properties": {
"ok": {
"type": "boolean"
}
}
},
"history.CreateRequest": {
"type": "object",
"properties": {
+366
View File
@@ -377,6 +377,26 @@
}
},
"/mcp/containers": {
"get": {
"tags": [
"containerd"
],
"summary": "List containers",
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/handlers.ListContainersResponse"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
}
}
},
"post": {
"tags": [
"containerd"
@@ -455,7 +475,215 @@
}
}
},
"/mcp/fs/{id}": {
"post": {
"description": "Forwards MCP JSON-RPC requests to the MCP server inside the container.\nRequired:\n- container task is running\n- container has data mount (default /data) bound to \u003cdata_root\u003e/users/\u003cuser_id\u003e\n- container image contains the \"mcp\" binary\nAuth: Bearer JWT is used to determine user_id (sub or user_id).\nPaths must be relative (no leading slash) and must not contain \"..\".\n\nExample: tools/list\n{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"tools/list\"}\n\nExample: tools/call (fs.read)\n{\"jsonrpc\":\"2.0\",\"id\":2,\"method\":\"tools/call\",\"params\":{\"name\":\"fs.read\",\"arguments\":{\"path\":\"notes.txt\"}}}",
"tags": [
"containerd"
],
"summary": "MCP filesystem tools (JSON-RPC)",
"parameters": [
{
"type": "string",
"description": "Bearer \u003ctoken\u003e",
"name": "Authorization",
"in": "header",
"required": true
},
{
"type": "string",
"description": "Container ID",
"name": "id",
"in": "path",
"required": true
},
{
"description": "JSON-RPC request",
"name": "payload",
"in": "body",
"required": true,
"schema": {
"type": "object"
}
}
],
"responses": {
"200": {
"description": "JSON-RPC response: {jsonrpc,id,result|error}",
"schema": {
"type": "object"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
}
}
}
},
"/mcp/skills": {
"get": {
"tags": [
"containerd"
],
"summary": "List skills from container",
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/handlers.SkillsResponse"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
}
}
},
"post": {
"tags": [
"containerd"
],
"summary": "Upload skills into container",
"parameters": [
{
"description": "Skills payload",
"name": "payload",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/handlers.SkillsUpsertRequest"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/handlers.skillsOpResponse"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
}
}
},
"delete": {
"tags": [
"containerd"
],
"summary": "Delete skills from container",
"parameters": [
{
"description": "Delete skills payload",
"name": "payload",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/handlers.SkillsDeleteRequest"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/handlers.skillsOpResponse"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
}
}
}
},
"/mcp/snapshots": {
"get": {
"tags": [
"containerd"
],
"summary": "List snapshots",
"parameters": [
{
"type": "string",
"description": "Snapshotter name",
"name": "snapshotter",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/handlers.ListSnapshotsResponse"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
}
}
},
"post": {
"tags": [
"containerd"
@@ -2419,6 +2647,35 @@
"type": "object",
"additionalProperties": true
},
"handlers.ContainerInfo": {
"type": "object",
"properties": {
"created_at": {
"type": "string"
},
"id": {
"type": "string"
},
"image": {
"type": "string"
},
"labels": {
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"snapshot_key": {
"type": "string"
},
"snapshotter": {
"type": "string"
},
"updated_at": {
"type": "string"
}
}
},
"handlers.CreateContainerRequest": {
"type": "object",
"properties": {
@@ -2560,6 +2817,31 @@
}
}
},
"handlers.ListContainersResponse": {
"type": "object",
"properties": {
"containers": {
"type": "array",
"items": {
"$ref": "#/definitions/handlers.ContainerInfo"
}
}
}
},
"handlers.ListSnapshotsResponse": {
"type": "object",
"properties": {
"snapshots": {
"type": "array",
"items": {
"$ref": "#/definitions/handlers.SnapshotInfo"
}
},
"snapshotter": {
"type": "string"
}
}
},
"handlers.LoginRequest": {
"type": "object",
"properties": {
@@ -2597,6 +2879,90 @@
}
}
},
"handlers.SkillItem": {
"type": "object",
"properties": {
"content": {
"type": "string"
},
"description": {
"type": "string"
},
"name": {
"type": "string"
}
}
},
"handlers.SkillsDeleteRequest": {
"type": "object",
"properties": {
"names": {
"type": "array",
"items": {
"type": "string"
}
}
}
},
"handlers.SkillsResponse": {
"type": "object",
"properties": {
"skills": {
"type": "array",
"items": {
"$ref": "#/definitions/handlers.SkillItem"
}
}
}
},
"handlers.SkillsUpsertRequest": {
"type": "object",
"properties": {
"skills": {
"type": "array",
"items": {
"$ref": "#/definitions/handlers.SkillItem"
}
}
}
},
"handlers.SnapshotInfo": {
"type": "object",
"properties": {
"created_at": {
"type": "string"
},
"kind": {
"type": "string"
},
"labels": {
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"name": {
"type": "string"
},
"parent": {
"type": "string"
},
"snapshotter": {
"type": "string"
},
"updated_at": {
"type": "string"
}
}
},
"handlers.skillsOpResponse": {
"type": "object",
"properties": {
"ok": {
"type": "boolean"
}
}
},
"history.CreateRequest": {
"type": "object",
"properties": {
+251
View File
@@ -40,6 +40,25 @@ definitions:
chat.GatewayMessage:
additionalProperties: true
type: object
handlers.ContainerInfo:
properties:
created_at:
type: string
id:
type: string
image:
type: string
labels:
additionalProperties:
type: string
type: object
snapshot_key:
type: string
snapshotter:
type: string
updated_at:
type: string
type: object
handlers.CreateContainerRequest:
properties:
container_id:
@@ -131,6 +150,22 @@ definitions:
message:
type: string
type: object
handlers.ListContainersResponse:
properties:
containers:
items:
$ref: '#/definitions/handlers.ContainerInfo'
type: array
type: object
handlers.ListSnapshotsResponse:
properties:
snapshots:
items:
$ref: '#/definitions/handlers.SnapshotInfo'
type: array
snapshotter:
type: string
type: object
handlers.LoginRequest:
properties:
password:
@@ -155,6 +190,60 @@ definitions:
username:
type: string
type: object
handlers.SkillItem:
properties:
content:
type: string
description:
type: string
name:
type: string
type: object
handlers.SkillsDeleteRequest:
properties:
names:
items:
type: string
type: array
type: object
handlers.SkillsResponse:
properties:
skills:
items:
$ref: '#/definitions/handlers.SkillItem'
type: array
type: object
handlers.SkillsUpsertRequest:
properties:
skills:
items:
$ref: '#/definitions/handlers.SkillItem'
type: array
type: object
handlers.SnapshotInfo:
properties:
created_at:
type: string
kind:
type: string
labels:
additionalProperties:
type: string
type: object
name:
type: string
parent:
type: string
snapshotter:
type: string
updated_at:
type: string
type: object
handlers.skillsOpResponse:
properties:
ok:
type: boolean
type: object
history.CreateRequest:
properties:
messages:
@@ -918,6 +1007,19 @@ paths:
tags:
- history
/mcp/containers:
get:
responses:
"200":
description: OK
schema:
$ref: '#/definitions/handlers.ListContainersResponse'
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/handlers.ErrorResponse'
summary: List containers
tags:
- containerd
post:
parameters:
- description: Create container payload
@@ -968,7 +1070,156 @@ paths:
summary: Delete MCP container
tags:
- containerd
/mcp/fs/{id}:
post:
description: |-
Forwards MCP JSON-RPC requests to the MCP server inside the container.
Required:
- container task is running
- container has data mount (default /data) bound to <data_root>/users/<user_id>
- container image contains the "mcp" binary
Auth: Bearer JWT is used to determine user_id (sub or user_id).
Paths must be relative (no leading slash) and must not contain "..".
Example: tools/list
{"jsonrpc":"2.0","id":1,"method":"tools/list"}
Example: tools/call (fs.read)
{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"fs.read","arguments":{"path":"notes.txt"}}}
parameters:
- description: Bearer <token>
in: header
name: Authorization
required: true
type: string
- description: Container ID
in: path
name: id
required: true
type: string
- description: JSON-RPC request
in: body
name: payload
required: true
schema:
type: object
responses:
"200":
description: 'JSON-RPC response: {jsonrpc,id,result|error}'
schema:
type: object
"400":
description: Bad Request
schema:
$ref: '#/definitions/handlers.ErrorResponse'
"404":
description: Not Found
schema:
$ref: '#/definitions/handlers.ErrorResponse'
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/handlers.ErrorResponse'
summary: MCP filesystem tools (JSON-RPC)
tags:
- containerd
/mcp/skills:
delete:
parameters:
- description: Delete skills payload
in: body
name: payload
required: true
schema:
$ref: '#/definitions/handlers.SkillsDeleteRequest'
responses:
"200":
description: OK
schema:
$ref: '#/definitions/handlers.skillsOpResponse'
"400":
description: Bad Request
schema:
$ref: '#/definitions/handlers.ErrorResponse'
"404":
description: Not Found
schema:
$ref: '#/definitions/handlers.ErrorResponse'
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/handlers.ErrorResponse'
summary: Delete skills from container
tags:
- containerd
get:
responses:
"200":
description: OK
schema:
$ref: '#/definitions/handlers.SkillsResponse'
"400":
description: Bad Request
schema:
$ref: '#/definitions/handlers.ErrorResponse'
"404":
description: Not Found
schema:
$ref: '#/definitions/handlers.ErrorResponse'
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/handlers.ErrorResponse'
summary: List skills from container
tags:
- containerd
post:
parameters:
- description: Skills payload
in: body
name: payload
required: true
schema:
$ref: '#/definitions/handlers.SkillsUpsertRequest'
responses:
"200":
description: OK
schema:
$ref: '#/definitions/handlers.skillsOpResponse'
"400":
description: Bad Request
schema:
$ref: '#/definitions/handlers.ErrorResponse'
"404":
description: Not Found
schema:
$ref: '#/definitions/handlers.ErrorResponse'
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/handlers.ErrorResponse'
summary: Upload skills into container
tags:
- containerd
/mcp/snapshots:
get:
parameters:
- description: Snapshotter name
in: query
name: snapshotter
type: string
responses:
"200":
description: OK
schema:
$ref: '#/definitions/handlers.ListSnapshotsResponse'
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/handlers.ErrorResponse'
summary: List snapshots
tags:
- containerd
post:
parameters:
- description: Create snapshot payload
+7 -7
View File
@@ -26,13 +26,13 @@ const (
)
type Config struct {
Server ServerConfig `toml:"server"`
Admin AdminConfig `toml:"admin"`
Auth AuthConfig `toml:"auth"`
Containerd ContainerdConfig `toml:"containerd"`
MCP MCPConfig `toml:"mcp"`
Postgres PostgresConfig `toml:"postgres"`
Qdrant QdrantConfig `toml:"qdrant"`
Server ServerConfig `toml:"server"`
Admin AdminConfig `toml:"admin"`
Auth AuthConfig `toml:"auth"`
Containerd ContainerdConfig `toml:"containerd"`
MCP MCPConfig `toml:"mcp"`
Postgres PostgresConfig `toml:"postgres"`
Qdrant QdrantConfig `toml:"qdrant"`
AgentGateway AgentGatewayConfig `toml:"agent_gateway"`
}
+274 -30
View File
@@ -1,24 +1,33 @@
package containerd
import (
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"runtime"
"syscall"
"strings"
"syscall"
"time"
tasksv1 "github.com/containerd/containerd/api/services/tasks/v1"
tasktypes "github.com/containerd/containerd/api/types/task"
containerd "github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/mount"
"github.com/containerd/containerd/v2/defaults"
"github.com/containerd/containerd/v2/core/snapshots"
"github.com/containerd/containerd/v2/pkg/cio"
"github.com/containerd/containerd/v2/pkg/namespaces"
"github.com/containerd/errdefs"
"github.com/containerd/containerd/v2/pkg/oci"
"github.com/containerd/errdefs"
"github.com/containerd/platforms"
"github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/identity"
"github.com/opencontainers/runtime-spec/specs-go"
)
@@ -71,6 +80,18 @@ type ExecTaskRequest struct {
WorkDir string
Terminal bool
UseStdio bool
FIFODir string
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
}
type ExecTaskSession struct {
Stdin io.WriteCloser
Stdout io.ReadCloser
Stderr io.ReadCloser
Wait func() (ExecTaskResult, error)
Close func() error
}
type ExecTaskResult struct {
@@ -111,8 +132,10 @@ type Service interface {
StopTask(ctx context.Context, containerID string, opts *StopTaskOptions) error
DeleteTask(ctx context.Context, containerID string, opts *DeleteTaskOptions) error
ExecTask(ctx context.Context, containerID string, req ExecTaskRequest) (ExecTaskResult, error)
ExecTaskStreaming(ctx context.Context, containerID string, req ExecTaskRequest) (*ExecTaskSession, error)
ListContainersByLabel(ctx context.Context, key, value string) ([]containerd.Container, error)
CommitSnapshot(ctx context.Context, snapshotter, name, key string) error
ListSnapshots(ctx context.Context, snapshotter string) ([]snapshots.Info, error)
PrepareSnapshot(ctx context.Context, snapshotter, key, parent string) error
CreateContainerFromSnapshot(ctx context.Context, req CreateContainerRequest) (containerd.Container, error)
SnapshotMounts(ctx context.Context, snapshotter, key string) ([]mount.Mount, error)
@@ -181,6 +204,11 @@ func (s *DefaultService) CreateContainer(ctx context.Context, req CreateContaine
}
ctx = s.withNamespace(ctx)
ctx, done, err := s.client.WithLease(ctx)
if err != nil {
return nil, err
}
defer done(ctx)
image, err := s.getImageWithFallback(ctx, req.ImageRef)
if err != nil {
pullOpts := &PullImageOptions{
@@ -192,12 +220,6 @@ func (s *DefaultService) CreateContainer(ctx context.Context, req CreateContaine
return nil, err
}
}
if req.Snapshotter != "" {
if err := image.Unpack(ctx, req.Snapshotter); err != nil && !errdefs.IsAlreadyExists(err) {
return nil, err
}
}
snapshotID := req.SnapshotID
if snapshotID == "" {
snapshotID = req.ID
@@ -213,20 +235,32 @@ func (s *DefaultService) CreateContainer(ctx context.Context, req CreateContaine
containerOpts := []containerd.NewContainerOpts{
containerd.WithImage(image),
containerd.WithNewSnapshot(snapshotID, image),
containerd.WithNewSpec(specOpts...),
}
runtimeName := s.client.Runtime()
if runtimeName == "" {
runtimeName = defaults.DefaultRuntime
if runtimeName == "" {
runtimeName = "io.containerd.runc.v2"
}
}
containerOpts = append(containerOpts, containerd.WithRuntime(runtimeName, nil))
if req.Snapshotter != "" {
containerOpts = append(containerOpts, containerd.WithSnapshotter(req.Snapshotter))
}
if req.Snapshotter != "" {
parent, err := s.snapshotParentFromLayers(ctx, image)
if err != nil {
return nil, err
}
ok, err := s.snapshotExists(ctx, req.Snapshotter, parent)
if err != nil {
return nil, err
}
if !ok {
return nil, fmt.Errorf("parent snapshot %s does not exist", parent)
}
if err := s.prepareSnapshot(ctx, req.Snapshotter, snapshotID, parent); err != nil {
return nil, err
}
containerOpts = append(containerOpts, containerd.WithSnapshot(snapshotID))
} else {
containerOpts = append(containerOpts, containerd.WithNewSnapshot(snapshotID, image))
}
containerOpts = append(containerOpts, containerd.WithNewSpec(specOpts...))
runtimeName := "io.containerd.runc.v2"
containerOpts = append(containerOpts, containerd.WithRuntime(runtimeName, nil))
if len(req.Labels) > 0 {
containerOpts = append(containerOpts, containerd.WithContainerLabels(req.Labels))
}
@@ -234,6 +268,73 @@ func (s *DefaultService) CreateContainer(ctx context.Context, req CreateContaine
return s.client.NewContainer(ctx, req.ID, containerOpts...)
}
func (s *DefaultService) snapshotParentFromLayers(ctx context.Context, image containerd.Image) (string, error) {
manifest, err := images.Manifest(ctx, s.client.ContentStore(), image.Target(), platforms.Default())
if err != nil {
return "", err
}
if len(manifest.Layers) == 0 {
return "", fmt.Errorf("image has no layer descriptors")
}
diffIDs := make([]digest.Digest, 0, len(manifest.Layers))
for _, layer := range manifest.Layers {
blob, err := content.ReadBlob(ctx, s.client.ContentStore(), layer)
if err != nil {
return "", err
}
reader := bytes.NewReader(blob)
var r io.ReadCloser
if strings.Contains(layer.MediaType, "gzip") {
r, err = gzip.NewReader(reader)
if err != nil {
return "", err
}
} else {
r = io.NopCloser(reader)
}
digester := digest.Canonical.Digester()
if _, err := io.Copy(digester.Hash(), r); err != nil {
_ = r.Close()
return "", err
}
_ = r.Close()
diffIDs = append(diffIDs, digester.Digest())
}
chainIDs := identity.ChainIDs(diffIDs)
return chainIDs[len(chainIDs)-1].String(), nil
}
func (s *DefaultService) snapshotExists(ctx context.Context, snapshotter, key string) (bool, error) {
if snapshotter == "" || key == "" {
return false, ErrInvalidArgument
}
_, err := s.client.SnapshotService(snapshotter).Stat(ctx, key)
if err == nil {
return true, nil
}
if errdefs.IsNotFound(err) {
return false, nil
}
return false, err
}
func (s *DefaultService) prepareSnapshot(ctx context.Context, snapshotter, key, parent string) error {
if snapshotter == "" || key == "" || parent == "" {
return ErrInvalidArgument
}
sn := s.client.SnapshotService(snapshotter)
if _, err := sn.Stat(ctx, key); err == nil {
if err := sn.Remove(ctx, key); err != nil {
return err
}
} else if !errdefs.IsNotFound(err) {
return err
}
_, err := sn.Prepare(ctx, key, parent)
return err
}
func (s *DefaultService) getImageWithFallback(ctx context.Context, ref string) (containerd.Image, error) {
image, err := s.GetImage(ctx, ref)
if err == nil {
@@ -481,12 +582,20 @@ func (s *DefaultService) ExecTask(ctx context.Context, containerID string, req E
}
ioOpts := []cio.Opt{}
if req.UseStdio {
if req.Stdin != nil || req.Stdout != nil || req.Stderr != nil {
ioOpts = append(ioOpts, cio.WithStreams(req.Stdin, req.Stdout, req.Stderr))
} else if req.UseStdio {
ioOpts = append(ioOpts, cio.WithStdio)
}
if req.Terminal {
ioOpts = append(ioOpts, cio.WithTerminal)
}
if strings.TrimSpace(req.FIFODir) != "" {
if err := os.MkdirAll(req.FIFODir, 0o755); err != nil {
return ExecTaskResult{}, err
}
ioOpts = append(ioOpts, cio.WithFIFODir(req.FIFODir))
}
ioCreator := cio.NewCreator(ioOpts...)
execID := fmt.Sprintf("exec-%d", time.Now().UnixNano())
@@ -513,6 +622,131 @@ func (s *DefaultService) ExecTask(ctx context.Context, containerID string, req E
return ExecTaskResult{ExitCode: code}, nil
}
func (s *DefaultService) ExecTaskStreaming(ctx context.Context, containerID string, req ExecTaskRequest) (*ExecTaskSession, error) {
if containerID == "" || len(req.Args) == 0 {
return nil, ErrInvalidArgument
}
ctx = s.withNamespace(ctx)
container, err := s.client.LoadContainer(ctx, containerID)
if err != nil {
return nil, err
}
spec, err := container.Spec(ctx)
if err != nil {
return nil, err
}
if spec.Process == nil {
spec.Process = &specs.Process{}
}
if len(req.Env) > 0 {
if err := oci.WithEnv(req.Env)(ctx, nil, nil, spec); err != nil {
return nil, err
}
}
spec.Process.Args = req.Args
if req.WorkDir != "" {
spec.Process.Cwd = req.WorkDir
}
if req.Terminal {
spec.Process.Terminal = true
}
task, err := container.Task(ctx, nil)
if err != nil {
return nil, err
}
stdinR, stdinW := io.Pipe()
stdoutR, stdoutW := io.Pipe()
stderrR, stderrW := io.Pipe()
ioOpts := []cio.Opt{
cio.WithStreams(stdinR, stdoutW, stderrW),
}
if req.Terminal {
ioOpts = append(ioOpts, cio.WithTerminal)
}
fifoDir := strings.TrimSpace(req.FIFODir)
if fifoDir == "" {
if homeDir, err := os.UserHomeDir(); err == nil && homeDir != "" {
fifoDir = filepath.Join(homeDir, ".memoh", "containerd-fifo")
} else {
fifoDir = "/tmp/memoh-containerd-fifo"
}
}
if err := os.MkdirAll(fifoDir, 0o755); err != nil {
_ = stdinR.Close()
_ = stdinW.Close()
_ = stdoutR.Close()
_ = stdoutW.Close()
_ = stderrR.Close()
_ = stderrW.Close()
return nil, err
}
ioOpts = append(ioOpts, cio.WithFIFODir(fifoDir))
ioCreator := cio.NewCreator(ioOpts...)
execID := fmt.Sprintf("exec-%d", time.Now().UnixNano())
process, err := task.Exec(ctx, execID, spec.Process, ioCreator)
if err != nil {
_ = stdinR.Close()
_ = stdinW.Close()
_ = stdoutR.Close()
_ = stdoutW.Close()
_ = stderrR.Close()
_ = stderrW.Close()
return nil, err
}
if err := process.Start(ctx); err != nil {
_, _ = process.Delete(ctx)
_ = stdinR.Close()
_ = stdinW.Close()
_ = stdoutR.Close()
_ = stdoutW.Close()
_ = stderrR.Close()
_ = stderrW.Close()
return nil, err
}
wait := func() (ExecTaskResult, error) {
statusC, err := process.Wait(ctx)
if err != nil {
return ExecTaskResult{}, err
}
status := <-statusC
code, _, err := status.Result()
if err != nil {
return ExecTaskResult{}, err
}
_, _ = process.Delete(ctx)
_ = stdoutW.Close()
_ = stderrW.Close()
return ExecTaskResult{ExitCode: code}, nil
}
closeFn := func() error {
_ = stdinW.Close()
_ = stdoutR.Close()
_ = stderrR.Close()
_ = stdinR.Close()
_ = stdoutW.Close()
_ = stderrW.Close()
_, err := process.Delete(ctx)
return err
}
return &ExecTaskSession{
Stdin: stdinW,
Stdout: stdoutR,
Stderr: stderrR,
Wait: wait,
Close: closeFn,
}, nil
}
func (s *DefaultService) ListContainersByLabel(ctx context.Context, key, value string) ([]containerd.Container, error) {
if key == "" {
return nil, ErrInvalidArgument
@@ -545,6 +779,21 @@ func (s *DefaultService) CommitSnapshot(ctx context.Context, snapshotter, name,
return s.client.SnapshotService(snapshotter).Commit(ctx, name, key)
}
func (s *DefaultService) ListSnapshots(ctx context.Context, snapshotter string) ([]snapshots.Info, error) {
if snapshotter == "" {
return nil, ErrInvalidArgument
}
ctx = s.withNamespace(ctx)
infos := []snapshots.Info{}
if err := s.client.SnapshotService(snapshotter).Walk(ctx, func(ctx context.Context, info snapshots.Info) error {
infos = append(infos, info)
return nil
}); err != nil {
return nil, err
}
return infos, nil
}
func (s *DefaultService) PrepareSnapshot(ctx context.Context, snapshotter, key, parent string) error {
if snapshotter == "" || key == "" || parent == "" {
return ErrInvalidArgument
@@ -587,23 +836,19 @@ func (s *DefaultService) CreateContainerFromSnapshot(ctx context.Context, req Cr
containerOpts := []containerd.NewContainerOpts{
containerd.WithImage(image),
containerd.WithSnapshot(req.SnapshotID),
containerd.WithNewSpec(specOpts...),
}
if req.Snapshotter != "" {
containerOpts = append(containerOpts, containerd.WithSnapshotter(req.Snapshotter))
}
containerOpts = append(containerOpts,
containerd.WithSnapshot(req.SnapshotID),
containerd.WithNewSpec(specOpts...),
)
if len(req.Labels) > 0 {
containerOpts = append(containerOpts, containerd.WithContainerLabels(req.Labels))
}
runtimeName := s.client.Runtime()
if runtimeName == "" {
runtimeName = defaults.DefaultRuntime
if runtimeName == "" {
runtimeName = "io.containerd.runc.v2"
}
}
runtimeName := "io.containerd.runc.v2"
containerOpts = append(containerOpts, containerd.WithRuntime(runtimeName, nil))
return s.client.NewContainer(ctx, req.ID, containerOpts...)
@@ -620,4 +865,3 @@ func (s *DefaultService) SnapshotMounts(ctx context.Context, snapshotter, key st
func (s *DefaultService) withNamespace(ctx context.Context) context.Context {
return namespaces.WithNamespace(ctx, s.namespace)
}
+249 -8
View File
@@ -1,22 +1,35 @@
package handlers
import (
"context"
"log"
"net/http"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"github.com/containerd/errdefs"
tasktypes "github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/v2/pkg/namespaces"
"github.com/containerd/containerd/v2/pkg/oci"
"github.com/containerd/errdefs"
"github.com/google/uuid"
"github.com/labstack/echo/v4"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/memohai/memoh/internal/config"
ctr "github.com/memohai/memoh/internal/containerd"
"github.com/memohai/memoh/internal/mcp"
)
type ContainerdHandler struct {
service ctr.Service
cfg config.MCPConfig
service ctr.Service
cfg config.MCPConfig
namespace string
mcpMu sync.Mutex
mcpSess map[string]*mcpSession
}
type CreateContainerRequest struct {
@@ -43,19 +56,55 @@ type CreateSnapshotResponse struct {
Snapshotter string `json:"snapshotter"`
}
type ContainerInfo struct {
ID string `json:"id"`
Image string `json:"image,omitempty"`
Snapshotter string `json:"snapshotter,omitempty"`
SnapshotKey string `json:"snapshot_key,omitempty"`
CreatedAt time.Time `json:"created_at,omitempty"`
UpdatedAt time.Time `json:"updated_at,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
}
type ListContainersResponse struct {
Containers []ContainerInfo `json:"containers"`
}
type SnapshotInfo struct {
Snapshotter string `json:"snapshotter"`
Name string `json:"name"`
Parent string `json:"parent,omitempty"`
Kind string `json:"kind"`
CreatedAt time.Time `json:"created_at,omitempty"`
UpdatedAt time.Time `json:"updated_at,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
}
type ListSnapshotsResponse struct {
Snapshotter string `json:"snapshotter"`
Snapshots []SnapshotInfo `json:"snapshots"`
}
func NewContainerdHandler(service ctr.Service, cfg config.MCPConfig, namespace string) *ContainerdHandler {
return &ContainerdHandler{
service: service,
cfg: cfg,
namespace: namespace,
mcpSess: make(map[string]*mcpSession),
}
}
func (h *ContainerdHandler) Register(e *echo.Echo) {
group := e.Group("/mcp")
group.POST("/containers", h.CreateContainer)
group.GET("/containers", h.ListContainers)
group.DELETE("/containers/:id", h.DeleteContainer)
group.POST("/snapshots", h.CreateSnapshot)
group.GET("/snapshots", h.ListSnapshots)
group.GET("/skills", h.ListSkills)
group.POST("/skills", h.UpsertSkills)
group.DELETE("/skills", h.DeleteSkills)
group.POST("/fs/:id", h.HandleMCPFS)
}
// CreateContainer godoc
@@ -67,12 +116,18 @@ func (h *ContainerdHandler) Register(e *echo.Echo) {
// @Failure 500 {object} ErrorResponse
// @Router /mcp/containers [post]
func (h *ContainerdHandler) CreateContainer(c echo.Context) error {
userID, err := h.requireUserID(c)
if err != nil {
return err
}
var req CreateContainerRequest
if err := c.Bind(&req); err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
}
if strings.TrimSpace(req.ContainerID) == "" {
return echo.NewHTTPError(http.StatusBadRequest, "container_id is required")
req.ContainerID = strings.TrimSpace(req.ContainerID)
if req.ContainerID == "" {
req.ContainerID = uuid.NewString()
}
image := strings.TrimSpace(req.Image)
@@ -86,24 +141,62 @@ func (h *ContainerdHandler) CreateContainer(c echo.Context) error {
if snapshotter == "" {
snapshotter = h.cfg.Snapshotter
}
if snapshotter == "" {
snapshotter = "overlayfs"
ctx := c.Request().Context()
if strings.TrimSpace(h.namespace) != "" {
ctx = namespaces.WithNamespace(ctx, h.namespace)
}
dataRoot := strings.TrimSpace(h.cfg.DataRoot)
if dataRoot == "" {
dataRoot = config.DefaultDataRoot
}
dataMount := strings.TrimSpace(h.cfg.DataMount)
if dataMount == "" {
dataMount = config.DefaultDataMount
}
dataDir := filepath.Join(dataRoot, "users", userID)
if err := os.MkdirAll(dataDir, 0o755); err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
}
if err := os.MkdirAll(filepath.Join(dataDir, ".skills"), 0o755); err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
}
_, err := h.service.CreateContainer(c.Request().Context(), ctr.CreateContainerRequest{
specOpts := []oci.SpecOpts{
oci.WithMounts([]specs.Mount{{
Destination: dataMount,
Type: "bind",
Source: dataDir,
Options: []string{"rbind", "rw"},
}}),
oci.WithProcessArgs("/bin/sh", "-lc", "sleep 2147483647"),
}
_, err = h.service.CreateContainer(ctx, ctr.CreateContainerRequest{
ID: req.ContainerID,
ImageRef: image,
Snapshotter: snapshotter,
Labels: map[string]string{
mcp.UserLabelKey: userID,
},
SpecOpts: specOpts,
})
if err != nil && !errdefs.IsAlreadyExists(err) {
return echo.NewHTTPError(http.StatusInternalServerError, "snapshotter="+snapshotter+" image="+image+" err="+err.Error())
}
started := false
fifoDir, err := h.taskFIFODir()
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
}
if _, err := h.service.StartTask(c.Request().Context(), req.ContainerID, &ctr.StartTaskOptions{
UseStdio: false,
FIFODir: fifoDir,
}); err == nil {
started = true
} else {
log.Printf("mcp container start failed: id=%s err=%v", req.ContainerID, err)
}
return c.JSON(http.StatusOK, CreateContainerResponse{
@@ -114,6 +207,111 @@ func (h *ContainerdHandler) CreateContainer(c echo.Context) error {
})
}
func (h *ContainerdHandler) taskFIFODir() (string, error) {
if homeDir, err := os.UserHomeDir(); err == nil && homeDir != "" {
fifoDir := filepath.Join(homeDir, ".memoh", "containerd-fifo")
if err := os.MkdirAll(fifoDir, 0o755); err != nil {
return "", err
}
return fifoDir, nil
}
fifoDir := "/tmp/memoh-containerd-fifo"
if err := os.MkdirAll(fifoDir, 0o755); err != nil {
return "", err
}
return fifoDir, nil
}
func (h *ContainerdHandler) ensureTaskRunning(ctx context.Context, containerID string) error {
tasks, err := h.service.ListTasks(ctx, &ctr.ListTasksOptions{
Filter: "container.id==" + containerID,
})
if err != nil {
return err
}
if len(tasks) > 0 {
if tasks[0].Status == tasktypes.Status_RUNNING {
return nil
}
_ = h.service.DeleteTask(ctx, containerID, &ctr.DeleteTaskOptions{Force: true})
}
fifoDir, err := h.taskFIFODir()
if err != nil {
return err
}
_, err = h.service.StartTask(ctx, containerID, &ctr.StartTaskOptions{
UseStdio: false,
FIFODir: fifoDir,
})
return err
}
func (h *ContainerdHandler) userContainerID(ctx context.Context, userID string) (string, error) {
containers, err := h.service.ListContainersByLabel(ctx, mcp.UserLabelKey, userID)
if err != nil {
return "", err
}
if len(containers) == 0 {
return "", echo.NewHTTPError(http.StatusNotFound, "container not found")
}
infoCtx := ctx
if strings.TrimSpace(h.namespace) != "" {
infoCtx = namespaces.WithNamespace(ctx, h.namespace)
}
bestID := ""
var bestUpdated time.Time
for _, container := range containers {
info, err := container.Info(infoCtx)
if err != nil {
return "", err
}
if bestID == "" || info.UpdatedAt.After(bestUpdated) {
bestID = info.ID
bestUpdated = info.UpdatedAt
}
}
return bestID, nil
}
// ListContainers godoc
// @Summary List containers
// @Tags containerd
// @Success 200 {object} ListContainersResponse
// @Failure 500 {object} ErrorResponse
// @Router /mcp/containers [get]
func (h *ContainerdHandler) ListContainers(c echo.Context) error {
ctx := c.Request().Context()
containers, err := h.service.ListContainers(ctx)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
}
infoCtx := ctx
if strings.TrimSpace(h.namespace) != "" {
infoCtx = namespaces.WithNamespace(ctx, h.namespace)
}
items := make([]ContainerInfo, 0, len(containers))
for _, container := range containers {
info, err := container.Info(infoCtx)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
}
items = append(items, ContainerInfo{
ID: info.ID,
Image: info.Image,
Snapshotter: info.Snapshotter,
SnapshotKey: info.SnapshotKey,
CreatedAt: info.CreatedAt,
UpdatedAt: info.UpdatedAt,
Labels: info.Labels,
})
}
sort.Slice(items, func(i, j int) bool {
return items[i].ID < items[j].ID
})
return c.JSON(http.StatusOK, ListContainersResponse{Containers: items})
}
// DeleteContainer godoc
// @Summary Delete MCP container
// @Tags containerd
@@ -186,3 +384,46 @@ func (h *ContainerdHandler) CreateSnapshot(c echo.Context) error {
Snapshotter: info.Snapshotter,
})
}
// ListSnapshots godoc
// @Summary List snapshots
// @Tags containerd
// @Param snapshotter query string false "Snapshotter name"
// @Success 200 {object} ListSnapshotsResponse
// @Failure 500 {object} ErrorResponse
// @Router /mcp/snapshots [get]
func (h *ContainerdHandler) ListSnapshots(c echo.Context) error {
snapshotter := strings.TrimSpace(c.QueryParam("snapshotter"))
if snapshotter == "" {
snapshotter = strings.TrimSpace(h.cfg.Snapshotter)
}
if snapshotter == "" {
snapshotter = "overlayfs"
}
snapshots, err := h.service.ListSnapshots(c.Request().Context(), snapshotter)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
}
items := make([]SnapshotInfo, 0, len(snapshots))
for _, info := range snapshots {
items = append(items, SnapshotInfo{
Snapshotter: snapshotter,
Name: info.Name,
Parent: info.Parent,
Kind: info.Kind.String(),
CreatedAt: info.Created,
UpdatedAt: info.Updated,
Labels: info.Labels,
})
}
sort.Slice(items, func(i, j int) bool {
if items[i].CreatedAt.Equal(items[j].CreatedAt) {
return items[i].Name < items[j].Name
}
return items[i].CreatedAt.Before(items[j].CreatedAt)
})
return c.JSON(http.StatusOK, ListSnapshotsResponse{
Snapshotter: snapshotter,
Snapshots: items,
})
}
+462
View File
@@ -0,0 +1,462 @@
package handlers
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os/exec"
"runtime"
"strings"
"sync"
"time"
"github.com/containerd/containerd/v2/pkg/namespaces"
"github.com/containerd/errdefs"
"github.com/labstack/echo/v4"
"github.com/memohai/memoh/internal/auth"
ctr "github.com/memohai/memoh/internal/containerd"
"github.com/memohai/memoh/internal/identity"
mcptools "github.com/memohai/memoh/internal/mcp"
)
// HandleMCPFS godoc
// @Summary MCP filesystem tools (JSON-RPC)
// @Description Forwards MCP JSON-RPC requests to the MCP server inside the container.
// @Description Required:
// @Description - container task is running
// @Description - container has data mount (default /data) bound to <data_root>/users/<user_id>
// @Description - container image contains the "mcp" binary
// @Description Auth: Bearer JWT is used to determine user_id (sub or user_id).
// @Description Paths must be relative (no leading slash) and must not contain "..".
// @Description
// @Description Example: tools/list
// @Description {"jsonrpc":"2.0","id":1,"method":"tools/list"}
// @Description
// @Description Example: tools/call (fs.read)
// @Description {"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"fs.read","arguments":{"path":"notes.txt"}}}
// @Tags containerd
// @Param Authorization header string true "Bearer <token>"
// @Param id path string true "Container ID"
// @Param payload body object true "JSON-RPC request"
// @Success 200 {object} object "JSON-RPC response: {jsonrpc,id,result|error}"
// @Failure 400 {object} ErrorResponse
// @Failure 404 {object} ErrorResponse
// @Failure 500 {object} ErrorResponse
// @Router /mcp/fs/{id} [post]
func (h *ContainerdHandler) HandleMCPFS(c echo.Context) error {
containerID := strings.TrimSpace(c.Param("id"))
if containerID == "" {
return echo.NewHTTPError(http.StatusBadRequest, "container id is required")
}
var req mcptools.JSONRPCRequest
if err := c.Bind(&req); err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
}
if req.JSONRPC != "" && req.JSONRPC != "2.0" {
return c.JSON(http.StatusOK, mcptools.JSONRPCResponse{
JSONRPC: "2.0",
ID: req.ID,
Error: &mcptools.JSONRPCError{Code: -32600, Message: "invalid jsonrpc version"},
})
}
userID, err := h.requireUserID(c)
if err != nil {
return err
}
if err := h.validateMCPContainer(c.Request().Context(), containerID, userID); err != nil {
return err
}
if err := h.ensureTaskRunning(c.Request().Context(), containerID); err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
}
switch req.Method {
case "tools/list":
payload, err := h.callMCPServer(c.Request().Context(), containerID, req)
if err != nil {
return err
}
return c.JSON(http.StatusOK, payload)
case "tools/call":
payload, err := h.callMCPServer(c.Request().Context(), containerID, req)
if err != nil {
return err
}
return c.JSON(http.StatusOK, payload)
default:
return c.JSON(http.StatusOK, mcptools.JSONRPCResponse{
JSONRPC: "2.0",
ID: req.ID,
Error: &mcptools.JSONRPCError{Code: -32601, Message: "method not found"},
})
}
}
func (h *ContainerdHandler) validateMCPContainer(ctx context.Context, containerID, userID string) error {
if strings.TrimSpace(userID) == "" {
return echo.NewHTTPError(http.StatusUnauthorized, "invalid token")
}
container, err := h.service.GetContainer(ctx, containerID)
if err != nil {
if errdefs.IsNotFound(err) {
return echo.NewHTTPError(http.StatusNotFound, "container not found")
}
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
}
infoCtx := ctx
if strings.TrimSpace(h.namespace) != "" {
infoCtx = namespaces.WithNamespace(ctx, h.namespace)
}
info, err := container.Info(infoCtx)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
}
labelUserID := strings.TrimSpace(info.Labels[mcptools.UserLabelKey])
if labelUserID != "" && labelUserID != userID {
return echo.NewHTTPError(http.StatusForbidden, "user mismatch")
}
return nil
}
func (h *ContainerdHandler) requireUserID(c echo.Context) (string, error) {
userID, err := auth.UserIDFromContext(c)
if err != nil {
return "", err
}
if err := identity.ValidateUserID(userID); err != nil {
return "", echo.NewHTTPError(http.StatusBadRequest, err.Error())
}
return userID, nil
}
func (h *ContainerdHandler) callMCPServer(ctx context.Context, containerID string, req mcptools.JSONRPCRequest) (map[string]any, error) {
session, err := h.getMCPSession(ctx, containerID)
if err != nil {
return nil, err
}
return session.call(ctx, req)
}
type mcpSession struct {
stdin io.WriteCloser
stdout io.ReadCloser
stderr io.ReadCloser
cmd *exec.Cmd
initOnce sync.Once
writeMu sync.Mutex
pendingMu sync.Mutex
pending map[string]chan mcptools.JSONRPCResponse
closed chan struct{}
closeOnce sync.Once
closeErr error
onClose func()
}
func (h *ContainerdHandler) getMCPSession(ctx context.Context, containerID string) (*mcpSession, error) {
h.mcpMu.Lock()
if sess, ok := h.mcpSess[containerID]; ok {
h.mcpMu.Unlock()
return sess, nil
}
h.mcpMu.Unlock()
var sess *mcpSession
var err error
if runtime.GOOS == "darwin" {
sess, err = h.startLimaMCPSession(containerID)
}
if err != nil || sess == nil {
sess, err = h.startContainerdMCPSession(ctx, containerID)
if err != nil {
return nil, err
}
}
h.mcpMu.Lock()
h.mcpSess[containerID] = sess
h.mcpMu.Unlock()
sess.onClose = func() {
h.mcpMu.Lock()
if current, ok := h.mcpSess[containerID]; ok && current == sess {
delete(h.mcpSess, containerID)
}
h.mcpMu.Unlock()
}
return sess, nil
}
func (h *ContainerdHandler) startContainerdMCPSession(ctx context.Context, containerID string) (*mcpSession, error) {
execSession, err := h.service.ExecTaskStreaming(ctx, containerID, ctr.ExecTaskRequest{
Args: []string{"/mcp"},
})
if err != nil {
return nil, err
}
sess := &mcpSession{
stdin: execSession.Stdin,
stdout: execSession.Stdout,
stderr: execSession.Stderr,
pending: make(map[string]chan mcptools.JSONRPCResponse),
closed: make(chan struct{}),
}
go sess.readLoop()
go func() {
_, err := execSession.Wait()
if err != nil {
sess.closeWithError(err)
} else {
sess.closeWithError(io.EOF)
}
}()
return sess, nil
}
func (h *ContainerdHandler) startLimaMCPSession(containerID string) (*mcpSession, error) {
execID := fmt.Sprintf("mcp-%d", time.Now().UnixNano())
cmd := exec.Command(
"limactl",
"shell",
"--tty=false",
"default",
"--",
"sudo",
"-n",
"ctr",
"-n",
"default",
"tasks",
"exec",
"--exec-id",
execID,
containerID,
"/mcp",
)
stdin, err := cmd.StdinPipe()
if err != nil {
return nil, err
}
stdout, err := cmd.StdoutPipe()
if err != nil {
_ = stdin.Close()
return nil, err
}
stderr, err := cmd.StderrPipe()
if err != nil {
_ = stdin.Close()
_ = stdout.Close()
return nil, err
}
if err := cmd.Start(); err != nil {
_ = stdin.Close()
_ = stdout.Close()
_ = stderr.Close()
return nil, err
}
sess := &mcpSession{
stdin: stdin,
stdout: stdout,
stderr: stderr,
cmd: cmd,
pending: make(map[string]chan mcptools.JSONRPCResponse),
closed: make(chan struct{}),
}
go sess.readLoop()
go func() {
if err := cmd.Wait(); err != nil {
sess.closeWithError(err)
} else {
sess.closeWithError(io.EOF)
}
}()
return sess, nil
}
func (s *mcpSession) closeWithError(err error) {
s.closeOnce.Do(func() {
s.closeErr = err
close(s.closed)
s.pendingMu.Lock()
for _, ch := range s.pending {
close(ch)
}
s.pending = map[string]chan mcptools.JSONRPCResponse{}
s.pendingMu.Unlock()
_ = s.stdin.Close()
_ = s.stdout.Close()
_ = s.stderr.Close()
if s.cmd != nil && s.cmd.Process != nil {
_ = s.cmd.Process.Kill()
}
if s.onClose != nil {
s.onClose()
}
})
}
func (s *mcpSession) readLoop() {
scanner := bufio.NewScanner(s.stdout)
scanner.Buffer(make([]byte, 0, 64*1024), 8*1024*1024)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
var resp mcptools.JSONRPCResponse
if err := json.Unmarshal([]byte(line), &resp); err != nil {
continue
}
id := strings.TrimSpace(string(resp.ID))
if id == "" {
continue
}
s.pendingMu.Lock()
ch, ok := s.pending[id]
if ok {
delete(s.pending, id)
}
s.pendingMu.Unlock()
if ok {
ch <- resp
close(ch)
}
}
if err := scanner.Err(); err != nil {
s.closeWithError(err)
} else {
s.closeWithError(io.EOF)
}
}
func (s *mcpSession) call(ctx context.Context, req mcptools.JSONRPCRequest) (map[string]any, error) {
payloads, targetID, err := buildMCPPayloads(req, &s.initOnce)
if err != nil {
return nil, err
}
target := strings.TrimSpace(string(targetID))
if target == "" {
return nil, fmt.Errorf("missing request id")
}
respCh := make(chan mcptools.JSONRPCResponse, 1)
s.pendingMu.Lock()
s.pending[target] = respCh
s.pendingMu.Unlock()
s.writeMu.Lock()
for _, payload := range payloads {
if _, err := s.stdin.Write([]byte(payload + "\n")); err != nil {
s.writeMu.Unlock()
return nil, err
}
}
s.writeMu.Unlock()
select {
case resp, ok := <-respCh:
if !ok {
if s.closeErr != nil {
return nil, s.closeErr
}
return nil, io.EOF
}
if resp.Error != nil {
return map[string]any{
"jsonrpc": "2.0",
"id": resp.ID,
"error": map[string]any{
"code": resp.Error.Code,
"message": resp.Error.Message,
},
}, nil
}
return map[string]any{
"jsonrpc": "2.0",
"id": resp.ID,
"result": resp.Result,
}, nil
case <-s.closed:
if s.closeErr != nil {
return nil, s.closeErr
}
return nil, io.EOF
case <-ctx.Done():
return nil, ctx.Err()
}
}
func buildMCPPayloads(req mcptools.JSONRPCRequest, initOnce *sync.Once) ([]string, json.RawMessage, error) {
if req.JSONRPC == "" {
req.JSONRPC = "2.0"
}
targetID := req.ID
payloads := []string{}
shouldInit := req.Method != "initialize" && req.Method != "notifications/initialized"
if initOnce != nil {
ran := false
initOnce.Do(func() {
ran = true
})
if ran {
// This is the first call on the session.
} else {
shouldInit = false
}
}
if shouldInit {
initReq := map[string]any{
"jsonrpc": "2.0",
"id": "init-1",
"method": "initialize",
"params": map[string]any{
"protocolVersion": "2025-06-18",
"capabilities": map[string]any{
"roots": map[string]any{
"listChanged": false,
},
},
"clientInfo": map[string]any{
"name": "memoh-http-proxy",
"version": "v0",
},
},
}
initBytes, err := json.Marshal(initReq)
if err != nil {
return nil, nil, err
}
payloads = append(payloads, string(initBytes))
initialized := map[string]any{
"jsonrpc": "2.0",
"method": "notifications/initialized",
}
initializedBytes, err := json.Marshal(initialized)
if err != nil {
return nil, nil, err
}
payloads = append(payloads, string(initializedBytes))
}
reqBytes, err := json.Marshal(req)
if err != nil {
return nil, nil, err
}
payloads = append(payloads, string(reqBytes))
return payloads, targetID, nil
}
+342
View File
@@ -0,0 +1,342 @@
package handlers
import (
"context"
"errors"
"net/http"
"os"
"path"
"strconv"
"strings"
"time"
"github.com/labstack/echo/v4"
"github.com/memohai/memoh/internal/config"
mcptools "github.com/memohai/memoh/internal/mcp"
)
type SkillItem struct {
Name string `json:"name"`
Description string `json:"description"`
Content string `json:"content"`
}
type SkillsResponse struct {
Skills []SkillItem `json:"skills"`
}
type SkillsUpsertRequest struct {
Skills []SkillItem `json:"skills"`
}
type SkillsDeleteRequest struct {
Names []string `json:"names"`
}
type skillsOpResponse struct {
OK bool `json:"ok"`
}
// ListSkills godoc
// @Summary List skills from container
// @Tags containerd
// @Success 200 {object} SkillsResponse
// @Failure 400 {object} ErrorResponse
// @Failure 404 {object} ErrorResponse
// @Failure 500 {object} ErrorResponse
// @Router /mcp/skills [get]
func (h *ContainerdHandler) ListSkills(c echo.Context) error {
userID, err := h.requireUserID(c)
if err != nil {
return err
}
ctx := c.Request().Context()
containerID, err := h.userContainerID(ctx, userID)
if err != nil {
return err
}
if err := h.ensureTaskRunning(ctx, containerID); err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
}
if err := h.ensureSkillsDirHost(userID); err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
}
listPayload, err := h.callMCPTool(ctx, containerID, "fs.list", map[string]any{
"path": ".skills",
"recursive": false,
})
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
}
entries, err := extractListEntries(listPayload)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
}
skills := make([]SkillItem, 0, len(entries))
for _, entry := range entries {
skillPath, name := skillPathForEntry(entry)
if skillPath == "" {
continue
}
content, err := h.readSkillFile(ctx, containerID, skillPath)
if err != nil {
continue
}
skills = append(skills, SkillItem{
Name: name,
Description: skillDescription(content),
Content: content,
})
}
return c.JSON(http.StatusOK, SkillsResponse{Skills: skills})
}
// UpsertSkills godoc
// @Summary Upload skills into container
// @Tags containerd
// @Param payload body SkillsUpsertRequest true "Skills payload"
// @Success 200 {object} skillsOpResponse
// @Failure 400 {object} ErrorResponse
// @Failure 404 {object} ErrorResponse
// @Failure 500 {object} ErrorResponse
// @Router /mcp/skills [post]
func (h *ContainerdHandler) UpsertSkills(c echo.Context) error {
userID, err := h.requireUserID(c)
if err != nil {
return err
}
var req SkillsUpsertRequest
if err := c.Bind(&req); err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
}
if len(req.Skills) == 0 {
return echo.NewHTTPError(http.StatusBadRequest, "skills is required")
}
ctx := c.Request().Context()
containerID, err := h.userContainerID(ctx, userID)
if err != nil {
return err
}
if err := h.ensureTaskRunning(ctx, containerID); err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
}
for _, skill := range req.Skills {
name := strings.TrimSpace(skill.Name)
if !isValidSkillName(name) {
return echo.NewHTTPError(http.StatusBadRequest, "invalid skill name")
}
content := strings.TrimSpace(skill.Content)
if content == "" {
content = buildSkillContent(name, strings.TrimSpace(skill.Description))
}
filePath := path.Join(".skills", name, "SKILL.md")
if _, err := h.callMCPTool(ctx, containerID, "fs.write", map[string]any{
"path": filePath,
"content": content,
}); err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
}
}
return c.JSON(http.StatusOK, skillsOpResponse{OK: true})
}
// DeleteSkills godoc
// @Summary Delete skills from container
// @Tags containerd
// @Param payload body SkillsDeleteRequest true "Delete skills payload"
// @Success 200 {object} skillsOpResponse
// @Failure 400 {object} ErrorResponse
// @Failure 404 {object} ErrorResponse
// @Failure 500 {object} ErrorResponse
// @Router /mcp/skills [delete]
func (h *ContainerdHandler) DeleteSkills(c echo.Context) error {
userID, err := h.requireUserID(c)
if err != nil {
return err
}
var req SkillsDeleteRequest
if err := c.Bind(&req); err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
}
if len(req.Names) == 0 {
return echo.NewHTTPError(http.StatusBadRequest, "names is required")
}
ctx := c.Request().Context()
containerID, err := h.userContainerID(ctx, userID)
if err != nil {
return err
}
if err := h.ensureTaskRunning(ctx, containerID); err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
}
for _, name := range req.Names {
skillName := strings.TrimSpace(name)
if !isValidSkillName(skillName) {
return echo.NewHTTPError(http.StatusBadRequest, "invalid skill name")
}
deletePath := path.Join(".skills", skillName)
if _, err := h.callMCPTool(ctx, containerID, "fs.delete", map[string]any{
"path": deletePath,
}); err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
}
}
return c.JSON(http.StatusOK, skillsOpResponse{OK: true})
}
func (h *ContainerdHandler) ensureSkillsDirHost(userID string) error {
dataRoot := strings.TrimSpace(h.cfg.DataRoot)
if dataRoot == "" {
dataRoot = config.DefaultDataRoot
}
skillsDir := path.Join(dataRoot, "users", userID, ".skills")
return os.MkdirAll(skillsDir, 0o755)
}
func (h *ContainerdHandler) readSkillFile(ctx context.Context, containerID, filePath string) (string, error) {
payload, err := h.callMCPTool(ctx, containerID, "fs.read", map[string]any{
"path": filePath,
})
if err != nil {
return "", err
}
content, err := extractContentString(payload)
if err != nil {
return "", err
}
return content, nil
}
func (h *ContainerdHandler) callMCPTool(ctx context.Context, containerID, toolName string, args map[string]any) (map[string]any, error) {
id := "skills-" + strconv.FormatInt(time.Now().UnixNano(), 10)
req, err := mcptools.NewToolCallRequest(id, toolName, args)
if err != nil {
return nil, err
}
payload, err := h.callMCPServer(ctx, containerID, req)
if err != nil {
return nil, err
}
if err := mcptools.PayloadError(payload); err != nil {
return nil, err
}
if err := mcptools.ResultError(payload); err != nil {
return nil, err
}
return payload, nil
}
func extractListEntries(payload map[string]any) ([]skillEntry, error) {
result, err := mcptools.StructuredContent(payload)
if err != nil {
return nil, err
}
rawEntries, ok := result["entries"].([]any)
if !ok {
return nil, errors.New("invalid list response")
}
entries := make([]skillEntry, 0, len(rawEntries))
for _, raw := range rawEntries {
entryMap, ok := raw.(map[string]any)
if !ok {
continue
}
entryPath, _ := entryMap["path"].(string)
if entryPath == "" {
continue
}
isDir, _ := entryMap["is_dir"].(bool)
entries = append(entries, skillEntry{Path: entryPath, IsDir: isDir})
}
return entries, nil
}
type skillEntry struct {
Path string
IsDir bool
}
func extractContentString(payload map[string]any) (string, error) {
result, err := mcptools.StructuredContent(payload)
if err != nil {
return "", err
}
content, _ := result["content"].(string)
if content == "" {
return "", errors.New("empty content")
}
return content, nil
}
func skillNameFromPath(rel string) string {
if rel == "" || rel == "SKILL.md" {
return "default"
}
parent := path.Dir(rel)
if parent == "." {
return "default"
}
return path.Base(parent)
}
func skillPathForEntry(entry skillEntry) (string, string) {
rel := strings.TrimPrefix(entry.Path, ".skills/")
if rel == entry.Path {
rel = strings.TrimPrefix(entry.Path, "./.skills/")
}
if entry.IsDir {
name := path.Base(rel)
if name == "." || name == "" {
return "", ""
}
return path.Join(".skills", name, "SKILL.md"), name
}
if path.Base(rel) == "SKILL.md" {
return path.Join(".skills", "SKILL.md"), skillNameFromPath(rel)
}
return "", ""
}
func skillDescription(content string) string {
lines := strings.Split(content, "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
if line == "" {
continue
}
if strings.HasPrefix(line, "#") {
return strings.TrimSpace(strings.TrimPrefix(line, "#"))
}
return line
}
return ""
}
func buildSkillContent(name, description string) string {
if description == "" {
return "# " + name
}
return "# " + name + "\n\n" + description
}
func isValidSkillName(name string) bool {
if name == "" {
return false
}
if strings.Contains(name, "..") {
return false
}
if strings.Contains(name, "/") || strings.Contains(name, "\\") {
return false
}
return true
}
+105
View File
@@ -0,0 +1,105 @@
package mcp
import (
"encoding/json"
"errors"
"strconv"
)
type JSONRPCRequest struct {
JSONRPC string `json:"jsonrpc"`
ID json.RawMessage `json:"id"`
Method string `json:"method"`
Params json.RawMessage `json:"params,omitempty"`
}
type JSONRPCResponse struct {
JSONRPC string `json:"jsonrpc"`
ID json.RawMessage `json:"id,omitempty"`
Result any `json:"result,omitempty"`
Error *JSONRPCError `json:"error,omitempty"`
}
type JSONRPCError struct {
Code int `json:"code"`
Message string `json:"message"`
}
func NewToolCallRequest(id string, toolName string, args map[string]any) (JSONRPCRequest, error) {
params := map[string]any{
"name": toolName,
"arguments": args,
}
rawParams, err := json.Marshal(params)
if err != nil {
return JSONRPCRequest{}, err
}
return JSONRPCRequest{
JSONRPC: "2.0",
ID: RawStringID(id),
Method: "tools/call",
Params: rawParams,
}, nil
}
func RawStringID(id string) json.RawMessage {
return json.RawMessage([]byte(strconv.Quote(id)))
}
func PayloadError(payload map[string]any) error {
if payload == nil {
return errors.New("empty payload")
}
if errObj, ok := payload["error"].(map[string]any); ok {
if msg, ok := errObj["message"].(string); ok && msg != "" {
return errors.New(msg)
}
return errors.New("mcp error")
}
return nil
}
func ResultError(payload map[string]any) error {
result, ok := payload["result"].(map[string]any)
if !ok {
return nil
}
if isErr, ok := result["isError"].(bool); ok && isErr {
msg := ContentText(result)
if msg == "" {
msg = "mcp tool error"
}
return errors.New(msg)
}
return nil
}
func StructuredContent(payload map[string]any) (map[string]any, error) {
result, ok := payload["result"].(map[string]any)
if !ok {
return nil, errors.New("missing result")
}
if structured, ok := result["structuredContent"].(map[string]any); ok {
return structured, nil
}
if content := ContentText(result); content != "" {
var out map[string]any
if err := json.Unmarshal([]byte(content), &out); err == nil {
return out, nil
}
}
return nil, errors.New("missing structured content")
}
func ContentText(result map[string]any) string {
rawContent, ok := result["content"].([]any)
if !ok || len(rawContent) == 0 {
return ""
}
first, ok := rawContent[0].(map[string]any)
if !ok {
return ""
}
text, _ := first["text"].(string)
return text
}
-3
View File
@@ -29,9 +29,6 @@ func NewServer(addr string, jwtSecret string, pingHandler *handlers.PingHandler,
if path == "/ping" || path == "/api/swagger.json" || path == "/auth/login" {
return true
}
if strings.HasPrefix(path, "/mcp/") {
return true
}
if strings.HasPrefix(path, "/api/docs") {
return true
}