feat: container

This commit is contained in:
Acbox
2026-01-15 20:10:37 +08:00
parent 31d8103ed2
commit 9ee8b19475
24 changed files with 1527 additions and 53 deletions
+52 -29
View File
@@ -32,10 +32,14 @@ export const createAgent = (params: AgentParams) => {
}
})
} else if (connection.type === 'stdio') {
// Build exec command for container execution
const commands = params.onBuildExecCommand?.([connection.command]) ?? [connection.command]
commands.push(...connection.args)
const [command, ...args] = commands
return await createMCPClient({
transport: new StdioClientTransport({
command: connection.command,
args: connection.args,
command,
args,
env: connection.env,
cwd: connection.cwd,
}),
@@ -123,35 +127,54 @@ export const createAgent = (params: AgentParams) => {
}
async function* ask(input: string) {
await loadContext()
const user = {
role: 'user',
content: input,
} as UserModelMessage
messages.push(user)
const { fullStream, response } = streamText({
model: gateway,
system: getSystemPrompt(),
prepareStep: async () => {
return {
system: getSystemPrompt(),
try {
await loadContext()
const user = {
role: 'user',
content: input,
} as UserModelMessage
messages.push(user)
const { fullStream, response } = streamText({
model: gateway,
system: getSystemPrompt(),
prepareStep: async () => {
return {
system: getSystemPrompt(),
}
},
stopWhen: stepCountIs(10),
messages,
tools: await getTools(),
onFinish: async () => {
await onComplete()
},
})
for await (const event of fullStream) {
yield event
}
// Wait for response and save to memory
try {
const newMessages = (await response).messages
await params.onFinish?.([
user as ModelMessage,
...newMessages,
])
} catch (finishError) {
console.error('Error in onFinish callback:', finishError)
// Yield error event but don't throw - let the stream complete
yield {
type: 'error' as const,
error: finishError instanceof Error ? finishError.message : 'Failed to save conversation'
}
},
stopWhen: stepCountIs(10),
messages,
tools: await getTools(),
onFinish: async () => {
await onComplete()
},
})
for await (const event of fullStream) {
yield event
}
} catch (error) {
console.error('Error in agent.ask():', error)
yield {
type: 'error' as const,
error: error instanceof Error ? error.message : 'Unknown error occurred'
}
}
const newMessages = (await response).messages
await params.onFinish?.([
user as ModelMessage,
...newMessages,
])
}
const triggerSchedule = async (schedule: Schedule) => {
+4
View File
@@ -28,6 +28,10 @@ export interface AgentParams {
mcpConnections?: MCPConnection[]
onBuildExecCommand?: (command: string[]) => string[]
onExecCommand?: (command: string[]) => Promise<{ stdout: string, stderr: string, exitCode: number }>
onSendMessage?: (platform: string, options: SendMessageOptions) => Promise<void>
onReadMemory?: (from: Date, to: Date) => Promise<MemoryUnit[]>
+1
View File
@@ -21,6 +21,7 @@
"@memoh/db": "workspace:*",
"@memoh/memory": "workspace:*",
"@memoh/shared": "workspace:*",
"@memoh/container": "workspace:*",
"@memoh/platform": "workspace:*",
"@memoh/platform-telegram": "workspace:*",
"drizzle-orm": "^0.45.1",
+2
View File
@@ -10,6 +10,7 @@ import {
platformModule,
memoryModule,
mcpModule,
containerModule,
} from './modules'
import openapi from '@elysiajs/openapi'
@@ -28,6 +29,7 @@ export const app = new Elysia()
.use(userModule)
.use(platformModule)
.use(mcpModule)
.use(containerModule)
.listen(port)
console.log(
+9
View File
@@ -58,22 +58,31 @@ export const agentModule = new Elysia({
try {
const encoder = new TextEncoder()
console.log('📨 Starting agent stream for message:', body.message.substring(0, 50))
// Send events as they come
for await (const event of agent.ask(body.message)) {
const data = JSON.stringify(event)
controller.enqueue(encoder.encode(`data: ${data}\n\n`))
}
console.log('✅ Agent stream completed successfully')
// Send done event
controller.enqueue(encoder.encode('data: [DONE]\n\n'))
controller.close()
} catch (error) {
console.error('❌ Error in agent stream:', error)
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
const errorStack = error instanceof Error ? error.stack : undefined
console.error('Error stack:', errorStack)
const errorData = JSON.stringify({
type: 'error',
error: errorMessage
})
controller.enqueue(new TextEncoder().encode(`data: ${errorData}\n\n`))
controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n'))
controller.close()
}
},
+25
View File
@@ -1,9 +1,11 @@
import { createAgent as createAgentService } from '@memoh/agent'
import { createMemory, filterByTimestamp, MemoryUnit } from '@memoh/memory'
import { ChatModel, EmbeddingModel, Platform, Schedule } from '@memoh/shared'
import { useContainer } from '@memoh/container'
import { createSchedule, deleteSchedule, getActiveSchedules } from '../schedule/service'
import { getActivePlatforms, sendMessageToPlatform } from '../platform/service'
import { getActiveMCPConnections } from '../mcp/service'
import { getUserContainerInfo } from '../container/service'
// Type for messages passed to onFinish callback
type MessageType = Record<string, unknown>
@@ -39,7 +41,24 @@ export async function createAgent(params: CreateAgentStreamParams) {
const platforms = await getActivePlatforms()
const mcpConnections = await getActiveMCPConnections(userId)
const containerInfo = await getUserContainerInfo(userId)
if (!containerInfo) {
throw new Error('Container not found')
}
// Ensure container is running before creating agent
const container = useContainer(containerInfo.containerName, {
namespace: containerInfo.namespace,
})
// Check and start container if not running
const info = await container.info()
if (info.status !== 'running') {
console.log(`🚀 Starting container ${containerInfo.containerName} for agent...`)
await container.start()
// Wait a bit for container to be fully ready
await new Promise(resolve => setTimeout(resolve, 2000))
}
// Create agent
const agent = createAgentService({
model: chatModel,
@@ -96,6 +115,12 @@ export async function createAgent(params: CreateAgentStreamParams) {
maxCalls: schedule.maxCalls || undefined,
})
},
onBuildExecCommand(command) {
return container.buildExecCommand(command)
},
async onExecCommand(command) {
return await container.exec(command)
},
})
return agent
+268
View File
@@ -0,0 +1,268 @@
import { Elysia } from 'elysia'
import { adminMiddleware } from '../../middlewares'
import {
createUserContainer,
startUserContainer,
stopUserContainer,
restartUserContainer,
pauseUserContainer,
resumeUserContainer,
deleteUserContainer,
getUserContainerInfo,
ensureUserContainer,
syncAllContainerStatus,
getAllContainers,
startAllAutoStartContainers,
pauseAllContainers,
} from './service'
import {
CreateContainerSchema,
ContainerActionSchema,
EnsureContainerSchema,
} from './model'
import { getUsers } from '../user/service'
/**
* Container Management Module
* All routes require admin privileges
*/
export const containerModule = new Elysia({ prefix: '/containers' })
// Protect all routes with admin middleware
.use(adminMiddleware)
.onStart(async () => {
console.log('\n📦 Initializing containers...')
try {
// 0. 初始化容器基础目录
const { initializeContainerBaseDirectory } = await import('./utils')
initializeContainerBaseDirectory()
// 1. 同步所有容器状态
await syncAllContainerStatus()
// 2. 检查所有用户是否有容器,没有则创建
const usersResult = await getUsers({ page: 1, limit: 1000 })
console.log(`👥 Found ${usersResult.items.length} users`)
for (const user of usersResult.items) {
try {
await ensureUserContainer(user.id)
console.log(`✅ Container ensured for user: ${user.username}`)
} catch (error) {
console.error(`❌ Failed to ensure container for user ${user.username}:`, error)
}
}
// 3. 启动所有自动启动的容器
await startAllAutoStartContainers()
console.log('✨ Container initialization complete\n')
} catch (error) {
console.error('❌ Container initialization failed:', error)
}
})
.onStop(async () => {
console.log('\n⏸️ Pausing all containers...')
try {
await pauseAllContainers()
console.log('✨ All containers paused\n')
} catch (error) {
console.error('❌ Failed to pause containers:', error)
}
})
.get(
'/',
async () => {
const containers = await getAllContainers()
return {
success: true,
data: containers,
}
},
{
detail: {
tags: ['Container'],
summary: 'Get all containers',
description: 'Retrieve information about all containers in the system',
},
}
)
.get(
'/user/:userId',
async ({ params: { userId } }) => {
const container = await getUserContainerInfo(userId)
if (!container) {
return {
success: false,
error: 'Container not found for user',
}
}
return {
success: true,
data: container,
}
},
{
detail: {
tags: ['Container'],
summary: 'Get user container',
description: 'Get container information for a specific user',
},
}
)
.post(
'/create',
async ({ body }) => {
try {
const container = await createUserContainer(
body.userId,
body.image,
body.namespace
)
return {
success: true,
data: container,
}
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Failed to create container',
}
}
},
{
body: CreateContainerSchema,
detail: {
tags: ['Container'],
summary: 'Create user container',
description: 'Create a new container for a specified user',
},
}
)
.post(
'/user/:userId/ensure',
async ({ params: { userId }, body }) => {
try {
const container = await ensureUserContainer(
userId,
body?.image,
body?.namespace
)
return {
success: true,
data: container,
}
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Failed to ensure container',
}
}
},
{
body: EnsureContainerSchema,
detail: {
tags: ['Container'],
summary: 'Ensure user has container',
description: 'Check if user has a container, create one if not exists',
},
}
)
.post(
'/user/:userId/action',
async ({ params: { userId }, body }) => {
try {
switch (body.action) {
case 'start':
await startUserContainer(userId)
break
case 'stop':
await stopUserContainer(userId)
break
case 'restart':
await restartUserContainer(userId)
break
case 'pause':
await pauseUserContainer(userId)
break
case 'resume':
await resumeUserContainer(userId)
break
default:
return {
success: false,
error: 'Invalid action',
}
}
return {
success: true,
message: `Container ${body.action} successful`,
}
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : `Failed to ${body.action} container`,
}
}
},
{
body: ContainerActionSchema,
detail: {
tags: ['Container'],
summary: 'Execute container action',
description: 'Perform start, stop, restart, pause, or resume actions on a user container',
},
}
)
.delete(
'/user/:userId',
async ({ params: { userId }, query }) => {
try {
const force = query.force === 'true'
await deleteUserContainer(userId, force)
return {
success: true,
message: 'Container deleted successfully',
}
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Failed to delete container',
}
}
},
{
detail: {
tags: ['Container'],
summary: 'Delete user container',
description: 'Delete the container for a specified user',
},
}
)
.post(
'/sync',
async () => {
try {
await syncAllContainerStatus()
return {
success: true,
message: 'Container status synced successfully',
}
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Failed to sync container status',
}
}
},
{
detail: {
tags: ['Container'],
summary: 'Sync all container statuses',
description: 'Synchronize all container statuses from containerd to the database',
},
}
)
@@ -0,0 +1,62 @@
import { z } from 'zod'
/**
*
*/
export const CreateContainerSchema = z.object({
userId: z.string(),
image: z.string().optional().default('docker.io/library/alpine:latest'),
namespace: z.string().optional().default('default'),
autoStart: z.boolean().optional().default(true),
})
export type CreateContainerInput = z.infer<typeof CreateContainerSchema>
/**
*
*/
export const UpdateContainerSchema = z.object({
autoStart: z.boolean().optional(),
})
export type UpdateContainerInput = z.infer<typeof UpdateContainerSchema>
/**
*
*/
export const ContainerActionSchema = z.object({
action: z.enum(['start', 'stop', 'restart', 'pause', 'resume']),
})
export type ContainerActionInput = z.infer<typeof ContainerActionSchema>
/**
*
*/
export const EnsureContainerSchema = z.object({
image: z.string().optional(),
namespace: z.string().optional(),
})
export type EnsureContainerInput = z.infer<typeof EnsureContainerSchema>
/**
*
*/
export const ContainerResponseSchema = z.object({
id: z.string().uuid(),
userId: z.string().uuid(),
containerId: z.string(),
containerName: z.string(),
image: z.string(),
status: z.string(),
namespace: z.string(),
autoStart: z.boolean(),
createdAt: z.date(),
updatedAt: z.date(),
lastStartedAt: z.date().nullable().optional(),
lastStoppedAt: z.date().nullable().optional(),
})
export type ContainerResponse = z.infer<typeof ContainerResponseSchema>
@@ -0,0 +1,375 @@
import {
getAllContainers as dbGetAllContainers,
getAutoStartContainers,
getContainerByUserId,
createContainerRecord,
updateContainerStatus,
deleteContainerRecord,
type ContainerInfo,
} from '@memoh/db'
import { createContainer, useContainer, containerExists, type ContainerConfig } from '@memoh/container'
import { getContainerPaths, ensureDirectoryExists } from './utils'
/**
*
*/
export const getAllContainers = async (): Promise<ContainerInfo[]> => {
return await dbGetAllContainers()
}
/**
*
*/
export const createUserContainer = async (
userId: string,
image: string = 'docker.io/library/node:20-alpine',
namespace: string = 'default'
): Promise<ContainerInfo> => {
// 检查用户是否已有容器
const existing = await getContainerByUserId(userId)
if (existing) {
throw new Error('User already has a container')
}
const containerName = `user-${userId.slice(0, 8)}-container`
// 检查 containerd 中是否已存在同名容器
try {
const exists = await containerExists(containerName, { namespace })
if (exists) {
console.log(`⚠️ Container ${containerName} already exists in containerd, syncing to database...` )
// 获取容器信息并同步到数据库
const ops = useContainer(containerName, { namespace })
const info = await ops.info()
const paths = getContainerPaths(userId)
const dbRecord = await createContainerRecord({
userId,
containerId: info.id,
containerName: info.name,
image: info.image,
namespace,
autoStart: true,
hostPath: paths.hostPath,
containerPath: paths.containerPath,
})
return dbRecord
}
} catch (error) {
console.error('Error checking container existence:', error)
}
// 获取挂载路径
const paths = getContainerPaths(userId)
// 确保宿主机目录存在
ensureDirectoryExists(paths.hostPath)
// 创建容器配置
const config: ContainerConfig = {
name: containerName,
image,
command: ['sh', '-c', 'while true; do sleep 3600; done'], // 保持容器运行
namespace,
labels: {
userId,
managedBy: 'memoh-api',
},
mounts: [
{
type: 'bind',
source: paths.hostPath,
target: paths.containerPath,
readonly: false,
},
],
}
// 在 containerd 中创建容器
const containerInfo = await createContainer(config, {
namespace,
ctrCommand: process.env.CTR_COMMAND || 'ctr',
})
// 在数据库中记录
const dbRecord = await createContainerRecord({
userId,
containerId: containerInfo.id,
containerName: containerInfo.name,
image: containerInfo.image,
namespace,
autoStart: true,
hostPath: paths.hostPath,
containerPath: paths.containerPath,
})
console.log(`✅ Created container with mount: ${paths.hostPath} -> ${paths.containerPath}`)
return dbRecord
}
/**
*
*/
export const startUserContainer = async (userId: string): Promise<void> => {
const container = await getContainerByUserId(userId)
if (!container) {
throw new Error('Container not found for user')
}
const ops = useContainer(container.containerName, { namespace: container.namespace })
await ops.start()
// 更新数据库状态
await updateContainerStatus(container.containerId, 'running')
}
/**
*
*/
export const stopUserContainer = async (userId: string, timeout: number = 10): Promise<void> => {
const container = await getContainerByUserId(userId)
if (!container) {
throw new Error('Container not found for user')
}
const ops = useContainer(container.containerName, { namespace: container.namespace })
await ops.stop(timeout)
// 更新数据库状态
await updateContainerStatus(container.containerId, 'stopped')
}
/**
*
*/
export const restartUserContainer = async (userId: string): Promise<void> => {
const container = await getContainerByUserId(userId)
if (!container) {
throw new Error('Container not found for user')
}
const ops = useContainer(container.containerName, { namespace: container.namespace })
await ops.restart()
// 更新数据库状态
await updateContainerStatus(container.containerId, 'running')
}
/**
*
*/
export const pauseUserContainer = async (userId: string): Promise<void> => {
const container = await getContainerByUserId(userId)
if (!container) {
throw new Error('Container not found for user')
}
const ops = useContainer(container.containerName, { namespace: container.namespace })
await ops.pause()
// 更新数据库状态
await updateContainerStatus(container.containerId, 'paused')
}
/**
*
*/
export const resumeUserContainer = async (userId: string): Promise<void> => {
const container = await getContainerByUserId(userId)
if (!container) {
throw new Error('Container not found for user')
}
const ops = useContainer(container.containerName, { namespace: container.namespace })
await ops.resume()
// 更新数据库状态
await updateContainerStatus(container.containerId, 'running')
}
/**
*
*/
export const deleteUserContainer = async (userId: string, force: boolean = false): Promise<void> => {
const container = await getContainerByUserId(userId)
if (!container) {
throw new Error('Container not found for user')
}
const ops = useContainer(container.containerName, { namespace: container.namespace })
await ops.remove(force)
// 从数据库删除记录
await deleteContainerRecord(container.id)
}
/**
*
*/
export const getUserContainerInfo = async (userId: string): Promise<ContainerInfo | undefined> => {
return await getContainerByUserId(userId)
}
/**
*
*/
export const startAllAutoStartContainers = async (): Promise<{ success: number; failed: number }> => {
const containers = await getAutoStartContainers()
let success = 0
let failed = 0
console.log(`🚀 Starting ${containers.length} auto-start containers...`)
for (const container of containers) {
try {
const ops = useContainer(container.containerName, { namespace: container.namespace })
// 获取当前状态
const info = await ops.info()
// 只有非运行状态才启动
if (info.status !== 'running') {
await ops.start()
await updateContainerStatus(container.containerId, 'running')
console.log(`✅ Started container: ${container.containerName}`)
success++
} else {
console.log(`⏭️ Container already running: ${container.containerName}`)
success++
}
} catch (error) {
console.error(`❌ Failed to start container ${container.containerName}:`, error)
failed++
// 更新状态为 unknown
await updateContainerStatus(container.containerId, 'unknown')
}
}
console.log(`✨ Container startup complete: ${success} succeeded, ${failed} failed`)
return { success, failed }
}
/**
*
*/
export const pauseAllContainers = async (): Promise<{ success: number; failed: number }> => {
const containers = await dbGetAllContainers()
let success = 0
let failed = 0
console.log(`⏸️ Pausing ${containers.length} containers...`)
for (const container of containers) {
try {
const ops = useContainer(container.containerName, { namespace: container.namespace })
// 获取当前状态
const info = await ops.info()
// 只暂停运行中的容器
if (info.status === 'running') {
await ops.pause()
await updateContainerStatus(container.containerId, 'paused')
console.log(`✅ Paused container: ${container.containerName}`)
success++
} else {
console.log(`⏭️ Container not running, skipped: ${container.containerName}`)
success++
}
} catch (error) {
console.error(`❌ Failed to pause container ${container.containerName}:`, error)
failed++
}
}
console.log(`✨ Container pause complete: ${success} succeeded, ${failed} failed`)
return { success, failed }
}
/**
*
*/
export const stopAllContainers = async (timeout: number = 10): Promise<{ success: number; failed: number }> => {
const containers = await dbGetAllContainers()
let success = 0
let failed = 0
console.log(`⏹️ Stopping ${containers.length} containers...`)
for (const container of containers) {
try {
const ops = useContainer(container.containerName, { namespace: container.namespace })
// 获取当前状态
const info = await ops.info()
// 只停止运行中的容器
if (info.status === 'running') {
await ops.stop(timeout)
await updateContainerStatus(container.containerId, 'stopped')
console.log(`✅ Stopped container: ${container.containerName}`)
success++
} else {
console.log(`⏭️ Container not running, skipped: ${container.containerName}`)
success++
}
} catch (error) {
console.error(`❌ Failed to stop container ${container.containerName}:`, error)
failed++
}
}
console.log(`✨ Container stop complete: ${success} succeeded, ${failed} failed`)
return { success, failed }
}
/**
*
*/
export const ensureUserContainer = async (
userId: string,
image?: string,
namespace?: string
): Promise<ContainerInfo> => {
const existing = await getContainerByUserId(userId)
if (existing) {
return existing
}
// 创建新容器
return await createUserContainer(userId, image, namespace)
}
/**
*
*/
export const syncAllContainerStatus = async (): Promise<void> => {
const containers = await dbGetAllContainers()
console.log(`🔄 Syncing ${containers.length} container statuses...`)
for (const container of containers) {
try {
const ops = useContainer(container.containerName, { namespace: container.namespace })
const info = await ops.info()
if (info.status !== container.status) {
await updateContainerStatus(container.containerId, info.status)
console.log(`✅ Updated container ${container.containerName}: ${container.status} -> ${info.status}`)
}
} catch (error) {
console.error(`❌ Failed to sync container ${container.containerName}:`, error)
await updateContainerStatus(container.containerId, 'unknown')
}
}
console.log('✨ Container status sync complete')
}
@@ -0,0 +1,48 @@
import { existsSync, mkdirSync } from 'fs'
import { join } from 'path'
/**
* Container data directory configuration
*/
const CONTAINER_BASE_DIR = process.env.CONTAINER_DATA_DIR || '/var/lib/memoh/containers'
/**
* Get host path for user container
* @param userId - User ID
* @returns Host path for the container
*/
export function getUserContainerHostPath(userId: string): string {
return join(CONTAINER_BASE_DIR, userId)
}
/**
* Ensure directory exists, create if not
* @param path - Directory path
*/
export function ensureDirectoryExists(path: string): void {
if (!existsSync(path)) {
mkdirSync(path, { recursive: true, mode: 0o755 })
console.log(`📁 Created directory: ${path}`)
}
}
/**
* Initialize container base directory
*/
export function initializeContainerBaseDirectory(): void {
ensureDirectoryExists(CONTAINER_BASE_DIR)
console.log(`✅ Container base directory initialized: ${CONTAINER_BASE_DIR}`)
}
/**
* Get container paths for a user
* @param userId - User ID
* @returns Object with host and container paths
*/
export function getContainerPaths(userId: string) {
return {
hostPath: getUserContainerHostPath(userId),
containerPath: '/data',
}
}
+2 -1
View File
@@ -7,4 +7,5 @@ export * from './user'
export * from './mcp'
export * from './platform'
export * from './schedule'
export * from './memory'
export * from './memory'
export * from './container'
+10
View File
@@ -157,6 +157,16 @@ export const createUser = async (data: CreateUserInput) => {
language: 'Same as user input',
})
// 自动创建用户的容器
try {
const { createUserContainer } = await import('../container/service')
await createUserContainer(newUser.id)
console.log(`✅ Container created for user: ${newUser.username}`)
} catch (error) {
console.error(`❌ Failed to create container for user ${newUser.username}:`, error)
// 不阻塞用户创建,容器可以后续创建
}
return newUser
}
+7
View File
@@ -83,6 +83,7 @@ async function performStreamChat(
}
let buffer = ''
let receivedDone = false
while (true) {
const { done, value } = await reader.read()
@@ -99,6 +100,7 @@ async function performStreamChat(
const data = line.slice(6).trim()
if (data === '[DONE]') {
receivedDone = true
await onEvent({ type: 'done' })
return
}
@@ -119,6 +121,11 @@ async function performStreamChat(
}
}
}
// If stream ended without [DONE], it's an error
if (!receivedDone) {
throw new Error('Connection closed unexpectedly - stream ended without completion signal')
}
}
/**
+21 -9
View File
@@ -1,10 +1,10 @@
# @memoh/container
基于 containerd 的容器化工具包,提供简单易用的容器管理 API。
基于 nerdctl (containerd) 的容器化工具包,提供简单易用的容器管理 API。
## 特性
- 🚀 基于 containerd 的高性能容器管理
- 🚀 基于 nerdctl 的现代容器管理(Docker 兼容)
- 📦 简洁的 API 设计
- 🔧 完整的容器生命周期管理
- 📝 TypeScript 支持
@@ -18,19 +18,31 @@ pnpm install @memoh/container
## 前置要求
系统需要安装 containerd 和 ctr 命令行工具:
### macOS (推荐使用 Lima)
```bash
# macOS (使用 Homebrew)
brew install containerd
# 安装 Lima
brew install lima
# Ubuntu/Debian
apt-get install containerd
# 启动 Lima(已包含 nerdctl
limactl start
# 启动 containerd 服务
sudo systemctl start containerd
# 验证
lima nerdctl version
```
### Linux
```bash
# 安装 nerdctl
# 参考: https://github.com/containerd/nerdctl/releases
# 或使用包管理器
brew install nerdctl # Homebrew on Linux
```
**详细 macOS 配置请参考 [NERDCTL_SETUP.md](./NERDCTL_SETUP.md)**
## 快速开始
### 创建容器
+11 -6
View File
@@ -2,7 +2,7 @@
* High-level container management API
*/
import { ContainerdClient } from './containerd'
import { NerdctlClient } from './nerdctl'
import type {
ContainerConfig,
ContainerInfo,
@@ -34,7 +34,7 @@ export async function createContainer(
config: ContainerConfig,
options?: ContainerdOptions
): Promise<ContainerInfo> {
const client = new ContainerdClient(options)
const client = new NerdctlClient(options)
// Ensure image is pulled
await client.pullImage(config.image)
@@ -76,7 +76,7 @@ export function useContainer(
containerIdOrName: string,
options?: ContainerdOptions
): ContainerOperations {
const client = new ContainerdClient(options)
const client = new NerdctlClient(options)
const containerName = containerIdOrName
return {
@@ -176,6 +176,11 @@ export function useContainer(
},
}
},
buildExecCommand(command: string[]): string[] {
// nerdctl exec with -i to keep STDIN open for MCP servers
return [...client.nerdctlCommand, 'exec', '-i', containerName, ...command]
}
}
}
@@ -194,7 +199,7 @@ export function useContainer(
* ```
*/
export async function listContainers(options?: ContainerdOptions): Promise<ContainerInfo[]> {
const client = new ContainerdClient(options)
const client = new NerdctlClient(options)
return await client.listContainers()
}
@@ -216,7 +221,7 @@ export async function containerExists(
containerIdOrName: string,
options?: ContainerdOptions
): Promise<boolean> {
const client = new ContainerdClient(options)
const client = new NerdctlClient(options)
return await client.containerExists(containerIdOrName)
}
@@ -236,7 +241,7 @@ export async function removeAllContainers(
force: boolean = false,
options?: ContainerdOptions
): Promise<void> {
const client = new ContainerdClient(options)
const client = new NerdctlClient(options)
const containers = await client.listContainers()
for (const container of containers) {
+20 -5
View File
@@ -5,7 +5,6 @@
import { execa } from 'execa'
import type { ContainerConfig, ContainerInfo, ContainerStatus, ContainerdOptions } from './types'
export const buildExecCommand = (name: string, command: string[]) => ['task', 'exec', '--exec-id', `exec-${Date.now()}`, name, ...command]
/**
* Containerd client for managing containers
@@ -14,18 +13,25 @@ export class ContainerdClient {
private namespace: string
private socket?: string
private timeout: number
private ctrCommand: string
constructor(options: ContainerdOptions = {}) {
this.namespace = options.namespace || 'default'
this.socket = options.socket
this.socket = options.socket || process.env.CONTAINERD_SOCKET
this.timeout = options.timeout || 30000
this.ctrCommand = options.ctrCommand || process.env.CTR_COMMAND || 'ctr'
}
buildExecCommand(name: string, command: string[]): string[] {
return this.buildCtrCommand(['task', 'exec', '--exec-id', `exec-${Date.now()}`, name, ...command])
}
/**
* Build ctr command with options
*/
private buildCtrCommand(args: string[]): string[] {
const cmd = ['ctr']
buildCtrCommand(args: string[]): string[] {
// Split ctrCommand and filter out empty strings (supports "lima sudo ctr")
const cmd = this.ctrCommand.split(' ').filter(part => part.length > 0)
if (this.socket) {
cmd.push('--address', this.socket)
@@ -72,6 +78,15 @@ export class ContainerdClient {
async createContainer(config: ContainerConfig): Promise<ContainerInfo> {
const args = ['container', 'create']
// Add mounts if specified
if (config.mounts && config.mounts.length > 0) {
for (const mount of config.mounts) {
// ctr uses 'src' and 'dst' instead of 'source' and 'target'
const mountStr = `type=${mount.type},src=${mount.source},dst=${mount.target}${mount.readonly ? ',readonly' : ''}`
args.push('--mount', mountStr)
}
}
// Add image
args.push(config.image)
@@ -155,7 +170,7 @@ export class ContainerdClient {
* Execute command in container
*/
async execInContainer(name: string, command: string[]): Promise<{ stdout: string; stderr: string; exitCode: number }> {
const args = buildExecCommand(name, command)
const args = this.buildExecCommand(name, command)
try {
const result = await this.exec(args)
+3 -2
View File
@@ -11,8 +11,9 @@ export {
removeAllContainers,
} from './container'
// Export client
export { ContainerdClient, buildExecCommand } from './containerd'
// Export clients
export { ContainerdClient } from './containerd'
export { NerdctlClient } from './nerdctl'
// Export types
export type {
+339
View File
@@ -0,0 +1,339 @@
/**
* Nerdctl client implementation (Docker-compatible CLI for containerd)
*/
import { execa } from 'execa'
import type { ContainerConfig, ContainerInfo, ContainerStatus, ContainerdOptions } from './types'
/**
* Nerdctl client for managing containers
* Provides a Docker-like interface for containerd
*/
export class NerdctlClient {
private namespace: string
private socket?: string
private timeout: number
nerdctlCommand: string[]
constructor(options: ContainerdOptions = {}) {
this.namespace = options.namespace || 'default'
this.socket = options.socket || process.env.CONTAINERD_SOCKET
this.timeout = options.timeout || 30000
// Support commands like "lima nerdctl"
const rawCommand = options.ctrCommand || process.env.CTR_COMMAND || 'nerdctl'
this.nerdctlCommand = rawCommand.split(' ').filter(part => part.length > 0)
}
/**
* Build nerdctl command with global options
*/
private buildCommand(args: string[]): string[] {
// Split command to support "lima nerdctl"
const cmd = [...this.nerdctlCommand]
// Add global options before the subcommand
if (this.socket) {
cmd.push('--address', this.socket)
}
cmd.push('--namespace', this.namespace)
cmd.push(...args)
return cmd
}
/**
* Execute nerdctl command
*/
private async exec(args: string[]): Promise<{ stdout: string; stderr: string }> {
const cmd = this.buildCommand(args)
const [program, ...programArgs] = cmd
try {
const result = await execa(program, programArgs, {
timeout: this.timeout,
})
return {
stdout: result.stdout,
stderr: result.stderr,
}
} catch (error: unknown) {
const message = error instanceof Error ? error.message : String(error)
throw new Error(`Nerdctl command failed: ${message}`)
}
}
/**
* Pull container image
*/
async pullImage(image: string): Promise<void> {
await this.exec(['pull', image])
}
/**
* Create a new container
*/
async createContainer(config: ContainerConfig): Promise<ContainerInfo> {
const args = ['container', 'create']
// Add container name
args.push('--name', config.name)
// Add environment variables
if (config.env) {
for (const [key, value] of Object.entries(config.env)) {
args.push('--env', `${key}=${value}`)
}
}
// Add working directory
if (config.workingDir) {
args.push('--workdir', config.workingDir)
}
// Add labels
if (config.labels) {
for (const [key, value] of Object.entries(config.labels)) {
args.push('--label', `${key}=${value}`)
}
}
// Add mounts (nerdctl uses Docker-style mount syntax)
if (config.mounts && config.mounts.length > 0) {
for (const mount of config.mounts) {
let mountStr = `type=${mount.type},src=${mount.source},dst=${mount.target}`
if (mount.readonly) {
mountStr += ',readonly'
}
args.push('--mount', mountStr)
}
}
// Add image
args.push(config.image)
// Add command if specified
if (config.command && config.command.length > 0) {
args.push(...config.command)
}
await this.exec(args)
// Return container info
return this.getContainerInfo(config.name)
}
/**
* Start a container
*/
async startContainer(name: string): Promise<void> {
// Check container status and handle accordingly
try {
const status = await this.getContainerStatus(name)
if (status === 'running') {
console.log(`Container ${name} is already running`)
return
}
if (status === 'paused') {
console.log(`Container ${name} is paused, unpausing first...`)
await this.exec(['unpause', name])
return
}
// For 'created' or 'stopped' status, we can start
} catch {
// Container might not exist, let start command handle it
}
await this.exec(['start', name])
}
/**
* Stop a container
*/
async stopContainer(name: string, timeout: number = 10): Promise<void> {
// Check if container is running
try {
const status = await this.getContainerStatus(name)
if (status !== 'running') {
console.log(`Container ${name} is not running (status: ${status})`)
return
}
} catch {
// Container might not exist, let stop command handle it
}
await this.exec(['stop', '--time', timeout.toString(), name])
}
/**
* Pause a container
*/
async pauseContainer(name: string): Promise<void> {
// Check if container is running before pausing
const status = await this.getContainerStatus(name)
if (status !== 'running') {
console.log(`Container ${name} cannot be paused (status: ${status})`)
return
}
await this.exec(['pause', name])
}
/**
* Resume a paused container
*/
async resumeContainer(name: string): Promise<void> {
// Check if container is paused before resuming
const status = await this.getContainerStatus(name)
if (status !== 'paused') {
console.log(`Container ${name} is not paused (status: ${status})`)
return
}
await this.exec(['unpause', name])
}
/**
* Remove a container
*/
async removeContainer(name: string, force: boolean = false): Promise<void> {
const args = ['rm']
if (force) {
args.push('--force')
}
args.push(name)
await this.exec(args)
}
/**
* Execute command in container
*/
async execInContainer(name: string, command: string[]): Promise<{ stdout: string; stderr: string; exitCode: number }> {
const args = ['exec', name, ...command]
try {
const result = await this.exec(args)
return {
stdout: result.stdout,
stderr: result.stderr,
exitCode: 0,
}
} catch (error: unknown) {
const err = error as { stdout?: string; stderr?: string; exitCode?: number; message?: string }
return {
stdout: err.stdout || '',
stderr: err.stderr || err.message || '',
exitCode: err.exitCode || 1,
}
}
}
/**
* Get container information
*/
async getContainerInfo(name: string): Promise<ContainerInfo> {
const result = await this.exec(['inspect', name])
try {
const data = JSON.parse(result.stdout)
const info = Array.isArray(data) ? data[0] : data
// Parse nerdctl inspect output (similar to Docker)
return {
id: info.Id || name,
name: info.Name?.replace(/^\//, '') || name,
image: info.Config?.Image || info.Image || '',
status: this.parseStatus(info.State),
namespace: this.namespace,
createdAt: info.Created ? new Date(info.Created) : new Date(),
labels: info.Config?.Labels || {},
}
} catch {
// Fallback if JSON parsing fails
return {
id: name,
name: name,
image: '',
status: 'unknown',
namespace: this.namespace,
createdAt: new Date(),
}
}
}
/**
* Parse container status from inspect output
*/
private parseStatus(state: unknown): ContainerStatus {
const s = state as { Running?: boolean; Paused?: boolean; Status?: string; Dead?: boolean }
if (!s) return 'unknown'
if (s.Running) return 'running'
if (s.Paused) return 'paused'
if (s.Status === 'created') return 'created'
if (s.Status === 'exited' || s.Dead) return 'stopped'
return 'unknown'
}
/**
* Get container status
*/
async getContainerStatus(name: string): Promise<ContainerStatus> {
try {
const info = await this.getContainerInfo(name)
return info.status
} catch {
return 'unknown'
}
}
/**
* Get container logs
*/
async getContainerLogs(name: string): Promise<string> {
try {
const result = await this.exec(['logs', name])
return result.stdout
} catch (error: unknown) {
return error instanceof Error ? error.message : ''
}
}
/**
* List all containers
*/
async listContainers(): Promise<ContainerInfo[]> {
const result = await this.exec(['ps', '--all', '--format', '{{.Names}}'])
const containerNames = result.stdout.split('\n').filter(name => name.trim())
const containers: ContainerInfo[] = []
for (const name of containerNames) {
try {
const info = await this.getContainerInfo(name)
containers.push(info)
} catch {
// Skip containers that can't be accessed
}
}
return containers
}
/**
* Check if container exists
*/
async containerExists(name: string): Promise<boolean> {
try {
await this.getContainerInfo(name)
return true
} catch {
return false
}
}
}
+4
View File
@@ -98,6 +98,8 @@ export interface ContainerStats {
* Container operations interface
*/
export interface ContainerOperations {
/** Build exec command */
buildExecCommand(command: string[]): string[];
/** Start the container */
start(): Promise<void>;
/** Stop the container */
@@ -130,5 +132,7 @@ export interface ContainerdOptions {
namespace?: string;
/** Timeout for operations (ms) */
timeout?: number;
/** ctr command */
ctrCommand?: string;
}
+203
View File
@@ -0,0 +1,203 @@
import { db } from './index'
import { containers } from './container'
import { eq } from 'drizzle-orm'
/**
*
*/
export type ContainerInfo = {
id: string
userId: string
containerId: string
containerName: string
image: string
status: string
namespace: string
autoStart: boolean
hostPath: string | null
containerPath: string
createdAt: Date
updatedAt: Date
lastStartedAt: Date | null
lastStoppedAt: Date | null
}
/**
*
*/
export type CreateContainerInput = {
userId: string
containerId: string
containerName: string
image: string
namespace?: string
autoStart?: boolean
hostPath?: string
containerPath?: string
}
/**
*
*/
export type UpdateContainerInput = {
status?: string
autoStart?: boolean
lastStartedAt?: Date
lastStoppedAt?: Date
}
/**
*
*/
export const getAllContainers = async (): Promise<ContainerInfo[]> => {
const containerList = await db.select().from(containers)
return containerList
}
/**
*
*/
export const getAutoStartContainers = async (): Promise<ContainerInfo[]> => {
const containerList = await db
.select()
.from(containers)
.where(eq(containers.autoStart, true))
return containerList
}
/**
* ID获取容器
*/
export const getContainerByUserId = async (userId: string): Promise<ContainerInfo | undefined> => {
const [container] = await db
.select()
.from(containers)
.where(eq(containers.userId, userId))
return container
}
/**
*
*/
export const getContainerByName = async (containerName: string): Promise<ContainerInfo | undefined> => {
const [container] = await db
.select()
.from(containers)
.where(eq(containers.containerName, containerName))
return container
}
/**
* ID获取容器
*/
export const getContainerById = async (id: string): Promise<ContainerInfo | undefined> => {
const [container] = await db
.select()
.from(containers)
.where(eq(containers.id, id))
return container
}
/**
*
*/
export const createContainerRecord = async (data: CreateContainerInput): Promise<ContainerInfo> => {
// 检查用户是否已有容器
const existing = await getContainerByUserId(data.userId)
if (existing) {
throw new Error('User already has a container')
}
const [newContainer] = await db
.insert(containers)
.values({
userId: data.userId,
containerId: data.containerId,
containerName: data.containerName,
image: data.image,
namespace: data.namespace || 'default',
autoStart: data.autoStart ?? true,
hostPath: data.hostPath || null,
containerPath: data.containerPath || '/data',
status: 'created',
})
.returning()
return newContainer
}
/**
*
*/
export const updateContainerStatus = async (
containerId: string,
status: string
): Promise<ContainerInfo | null> => {
const [updated] = await db
.update(containers)
.set({
status,
updatedAt: new Date(),
...(status === 'running' ? { lastStartedAt: new Date() } : {}),
...(status === 'stopped' || status === 'paused' ? { lastStoppedAt: new Date() } : {}),
})
.where(eq(containers.containerId, containerId))
.returning()
return updated || null
}
/**
*
*/
export const updateContainer = async (
id: string,
data: UpdateContainerInput
): Promise<ContainerInfo | null> => {
const [updated] = await db
.update(containers)
.set({
...data,
updatedAt: new Date(),
})
.where(eq(containers.id, id))
.returning()
return updated || null
}
/**
*
*/
export const deleteContainerRecord = async (id: string): Promise<ContainerInfo | null> => {
const [deleted] = await db
.delete(containers)
.where(eq(containers.id, id))
.returning()
return deleted || null
}
/**
* ID删除容器记录
*/
export const deleteContainerByUserId = async (userId: string): Promise<ContainerInfo | null> => {
const [deleted] = await db
.delete(containers)
.where(eq(containers.userId, userId))
.returning()
return deleted || null
}
/**
*
*/
export const userHasContainer = async (userId: string): Promise<boolean> => {
const container = await getContainerByUserId(userId)
return !!container
}
+52
View File
@@ -0,0 +1,52 @@
import { pgTable, text, timestamp, uuid, boolean } from 'drizzle-orm/pg-core'
import { users } from './users'
/**
* -
*/
export const containers = pgTable('containers', {
// 主键ID
id: uuid('id').primaryKey().defaultRandom(),
// 关联用户ID
userId: uuid('user_id')
.notNull()
.references(() => users.id, { onDelete: 'cascade' }),
// 容器IDcontainerd 中的实际容器ID
containerId: text('container_id').notNull().unique(),
// 容器名称
containerName: text('container_name').notNull().unique(),
// 容器镜像
image: text('image').notNull(),
// 容器状态:created, running, paused, stopped, unknown
status: text('status').notNull().default('created'),
// 容器命名空间
namespace: text('namespace').notNull().default('default'),
// 是否自动启动
autoStart: boolean('auto_start').notNull().default(true),
// 宿主机挂载目录
hostPath: text('host_path'),
// 容器内挂载目录
containerPath: text('container_path').notNull().default('/data'),
// 创建时间
createdAt: timestamp('created_at').notNull().defaultNow(),
// 更新时间
updatedAt: timestamp('updated_at').notNull().defaultNow(),
// 最后启动时间
lastStartedAt: timestamp('last_started_at'),
// 最后停止时间
lastStoppedAt: timestamp('last_stopped_at'),
})
+4
View File
@@ -4,3 +4,7 @@ import { config } from 'dotenv'
config({ path: '../../' })
export const db = drizzle(process.env.DATABASE_URL!)
// Export helpers
export * from './user-helpers'
export * from './container-helpers'
+2 -1
View File
@@ -4,4 +4,5 @@ export * from './settings'
export * from './schedule'
export * from './users'
export * from './platform'
export * from './mcp-connection'
export * from './mcp-connection'
export * from './container'
+3
View File
@@ -126,6 +126,9 @@ importers:
'@memoh/agent':
specifier: workspace:*
version: link:../agent
'@memoh/container':
specifier: workspace:*
version: link:../container
'@memoh/db':
specifier: workspace:*
version: link:../db