From ba34fb156dd88e14cbaa2e10d79f06ec4d1732df Mon Sep 17 00:00:00 2001 From: Acbox Date: Fri, 30 Jan 2026 23:31:43 +0800 Subject: [PATCH] feat: schedule task --- agent/package.json | 1 + agent/src/agent.ts | 49 ++++- agent/src/config.ts | 3 + agent/src/index.ts | 22 ++ agent/src/middlewares/bearer.ts | 3 + agent/src/modules/chat.ts | 45 +++-- agent/src/prompts/system.ts | 8 +- agent/src/tools/index.ts | 0 agent/src/tools/memory.ts | 24 +++ agent/src/tools/schedule.ts | 98 +++++++++ cmd/agent/main.go | 8 +- db/migrations/0001_init.down.sql | 1 + db/migrations/0001_init.up.sql | 17 ++ db/queries/schedule.sql | 49 +++++ docs/docs.go | 284 ++++++++++++++++++++++++++ docs/swagger.json | 284 ++++++++++++++++++++++++++ docs/swagger.yaml | 187 +++++++++++++++++ go.mod | 1 + go.sum | 2 + internal/chat/resolver.go | 11 +- internal/chat/types.go | 10 + internal/handlers/chat.go | 4 + internal/handlers/schedule.go | 182 +++++++++++++++++ internal/schedule/service.go | 335 +++++++++++++++++++++++++++++++ internal/schedule/types.go | 40 ++++ internal/server/server.go | 5 +- mise.toml | 2 +- pnpm-lock.yaml | 42 +++- 28 files changed, 1674 insertions(+), 43 deletions(-) create mode 100644 agent/src/middlewares/bearer.ts create mode 100644 agent/src/tools/index.ts create mode 100644 agent/src/tools/memory.ts create mode 100644 agent/src/tools/schedule.ts create mode 100644 db/queries/schedule.sql create mode 100644 internal/handlers/schedule.go create mode 100644 internal/schedule/service.go create mode 100644 internal/schedule/types.go diff --git a/agent/package.json b/agent/package.json index da792335..3996dce1 100644 --- a/agent/package.json +++ b/agent/package.json @@ -11,6 +11,7 @@ "@ai-sdk/google": "^3.0.6", "@ai-sdk/mcp": "^1.0.6", "@ai-sdk/openai": "^3.0.7", + "@elysiajs/bearer": "^1.4.2", "@elysiajs/cors": "^1.4.1", "@modelcontextprotocol/sdk": "^1.25.2", "ai": "^6.0.25", diff --git a/agent/src/agent.ts b/agent/src/agent.ts index 0826f509..7bd03645 100644 --- a/agent/src/agent.ts +++ b/agent/src/agent.ts @@ -2,6 +2,8 @@ import { generateText, ModelMessage, stepCountIs, streamText, TextStreamPart, To import { createChatGateway } from './gateway' import { ClientType, Schedule } from './types' import { system, schedule } from './prompts' +import { AuthFetcher } from './index' +import { getScheduleTools } from './tools/schedule' export interface AgentParams { apiKey: string @@ -25,12 +27,22 @@ export interface AgentResult { messages: ModelMessage[] } -export const createAgent = (params: AgentParams) => { +export const createAgent = ( + params: AgentParams, + fetcher: AuthFetcher = fetch, +) => { const gateway = createChatGateway(params.clientType) const messages: ModelMessage[] = [] const maxSteps = params.maxSteps ?? 50 + const getTools = () => { + const scheduleTools = getScheduleTools({ fetch: fetcher }) + return { + ...scheduleTools, + } + } + const generateSystem = () => { return system({ date: new Date(), @@ -44,10 +56,11 @@ export const createAgent = (params: AgentParams) => { const ask = async (input: AgentInput): Promise => { messages.push(...input.messages) - messages.push({ + const user: ModelMessage = { role: 'user', content: input.query, - }) + } + messages.push(user) const { response } = await generateText({ model: gateway({ apiKey: params.apiKey, @@ -56,18 +69,20 @@ export const createAgent = (params: AgentParams) => { system: generateSystem(), stopWhen: stepCountIs(maxSteps), messages, + tools: getTools(), }) return { - messages: response.messages, + messages: [user, ...response.messages], } } async function* stream(input: AgentInput): AsyncGenerator, AgentResult> { messages.push(...input.messages) - messages.push({ + const user: ModelMessage = { role: 'user', content: input.query, - }) + } + messages.push(user) const { response, fullStream } = streamText({ model: gateway({ apiKey: params.apiKey, @@ -76,29 +91,43 @@ export const createAgent = (params: AgentParams) => { system: generateSystem(), stopWhen: stepCountIs(maxSteps), messages, + tools: getTools(), }) for await (const event of fullStream) { yield event } return { - messages: (await response).messages, + messages: [user, ...(await response).messages], } } const triggerSchedule = async ( input: AgentInput, scheduleData: Schedule - ) => { + ): Promise => { messages.push(...input.messages) - messages.push({ + const user: ModelMessage = { role: 'user', content: schedule({ schedule: scheduleData, locale: params.locale, date: new Date(), }), + } + messages.push(user) + const { response } = await generateText({ + model: gateway({ + apiKey: params.apiKey, + baseURL: params.baseUrl, + })(params.model), + system: generateSystem(), + stopWhen: stepCountIs(maxSteps), + messages, + tools: getTools(), }) - return await ask(input) + return { + messages: [user, ...response.messages], + } } return { diff --git a/agent/src/config.ts b/agent/src/config.ts index 4e0202b0..1b88f1ba 100644 --- a/agent/src/config.ts +++ b/agent/src/config.ts @@ -5,6 +5,9 @@ type AgentGatewayConfig = { 'agent_gateway': { host?: string port?: number + }, + 'server': { + addr?: string } } diff --git a/agent/src/index.ts b/agent/src/index.ts index 60742b2a..b8467289 100644 --- a/agent/src/index.ts +++ b/agent/src/index.ts @@ -3,9 +3,31 @@ import { chatModule } from './modules/chat' import { corsMiddleware } from './middlewares/cors' import { errorMiddleware } from './middlewares/error' import { loadConfig } from './config' +import { join } from 'path' const config = loadConfig('../config.toml') +export type AuthFetcher = (url: string, options: RequestInit) => Promise +export const createAuthFetcher = (bearer: string | undefined): AuthFetcher => { + return async (url: string, options: RequestInit) => { + const headers = new Headers(options.headers || {}) + if (bearer) { + headers.set('Authorization', `Bearer ${bearer}`) + } + let baseUrl = '' + if (!baseUrl) { + baseUrl = 'http://127.0.0.1' + } + if (typeof config.server.addr === 'string' && config.server.addr.startsWith(':')) { + baseUrl = `http://127.0.0.1${config.server.addr}` + } + return await fetch(join(baseUrl, url), { + ...options, + headers, + }) + } +} + const app = new Elysia() .use(corsMiddleware) .use(errorMiddleware) diff --git a/agent/src/middlewares/bearer.ts b/agent/src/middlewares/bearer.ts new file mode 100644 index 00000000..a0829c84 --- /dev/null +++ b/agent/src/middlewares/bearer.ts @@ -0,0 +1,3 @@ +import { bearer } from '@elysiajs/bearer' + +export const bearerMiddleware = bearer() \ No newline at end of file diff --git a/agent/src/modules/chat.ts b/agent/src/modules/chat.ts index 6e41f8db..ae851f37 100644 --- a/agent/src/modules/chat.ts +++ b/agent/src/modules/chat.ts @@ -1,8 +1,10 @@ import { Elysia, sse } from 'elysia' import z from 'zod' import { createAgent } from '../agent' +import { createAuthFetcher } from '../index' import { ClientType } from '../types' import { ModelMessage } from 'ai' +import { bearerMiddleware } from '../middlewares/bearer' const ChatBody = z.object({ apiKey: z.string().min(1, 'API key is required'), @@ -36,13 +38,8 @@ const ScheduleBody = z.object({ }).and(ChatBody) export const chatModule = new Elysia({ prefix: '/chat' }) - .post('/', async ({ body }) => { - console.log('[Chat] request', { - type: 'chat', - clientType: body.clientType, - model: body.model, - baseUrl: body.baseUrl, - }) + .use(bearerMiddleware) + .post('/', async ({ body, bearer }) => { const { ask } = createAgent({ apiKey: body.apiKey, baseUrl: body.baseUrl, @@ -54,7 +51,7 @@ export const chatModule = new Elysia({ prefix: '/chat' }) maxContextLoadTime: body.maxContextLoadTime, platforms: body.platforms, currentPlatform: body.currentPlatform, - }) + }, createAuthFetcher(bearer)) try { const result = await ask({ messages: body.messages as unknown as ModelMessage[], @@ -75,12 +72,13 @@ export const chatModule = new Elysia({ prefix: '/chat' }) }, { body: ChatBody, }) - .post('/stream', async function* ({ body }) { + .post('/stream', async function* ({ body, bearer }) { console.log('[Chat] request', { type: 'stream', clientType: body.clientType, model: body.model, baseUrl: body.baseUrl, + bearer, }) const { stream } = createAgent({ apiKey: body.apiKey, @@ -93,7 +91,7 @@ export const chatModule = new Elysia({ prefix: '/chat' }) maxContextLoadTime: body.maxContextLoadTime, platforms: body.platforms, currentPlatform: body.currentPlatform, - }) + }, createAuthFetcher(bearer)) try { const streanGenerator = stream({ messages: body.messages as unknown as ModelMessage[], @@ -127,7 +125,12 @@ export const chatModule = new Elysia({ prefix: '/chat' }) }, { body: ChatBody, }) - .post('/schedule', async ({ body }) => { + .post('/schedule', async ({ body, bearer }) => { + console.log('[Chat] schedule request', { + type: 'schedule', + bearer, + body, + }) const { triggerSchedule } = createAgent({ apiKey: body.apiKey, baseUrl: body.baseUrl, @@ -139,11 +142,21 @@ export const chatModule = new Elysia({ prefix: '/chat' }) maxContextLoadTime: body.maxContextLoadTime, platforms: body.platforms, currentPlatform: body.currentPlatform, - }) - return await triggerSchedule({ - messages: body.messages as unknown as ModelMessage[], - query: body.query, - }, body.schedule) + }, createAuthFetcher(bearer)) + try { + return await triggerSchedule({ + messages: body.messages as unknown as ModelMessage[], + query: body.query, + }, body.schedule) + } catch (error) { + console.error('[Chat] schedule error', { + type: 'schedule', + bearer, + body, + error, + }) + throw error + } }, { body: ScheduleBody, }) \ No newline at end of file diff --git a/agent/src/prompts/system.ts b/agent/src/prompts/system.ts index 6bc2edfe..26a7d804 100644 --- a/agent/src/prompts/system.ts +++ b/agent/src/prompts/system.ts @@ -32,12 +32,12 @@ Your abilities: **Schedule** - We use **Cron Syntax** to schedule tasks. -- You can use ${quote('get-schedules')} to get the list of schedules. -- You can use ${quote('remove-schedule')} to remove a schedule by id. -- You can use ${quote('schedule')} to schedule a task. +- You can use ${quote('schedule_list')} to get the list of schedules. +- You can use ${quote('schedule_delete')} to remove a schedule by id. +- You can use ${quote('schedule_create')} to create a new schedule. + The ${quote('pattern')} is the pattern of the schedule with **Cron Syntax**. + The ${quote('command')} is the natural language command to execute, will send to you when the schedule is triggered, which means the command will be executed by presence of you. - + The ${quote('maxCalls')} is the maximum number of calls to the schedule, If you want to run the task only once, set it to 1. + + The ${quote('max_calls')} is the maximum number of calls to the schedule, If you want to run the task only once, set it to 1. - The ${quote('command')} should include the method (e.g. ${quote('send-message')}) for returning the task result. If the user does not specify otherwise, the user should be asked how they would like to be notified. **Message** diff --git a/agent/src/tools/index.ts b/agent/src/tools/index.ts new file mode 100644 index 00000000..e69de29b diff --git a/agent/src/tools/memory.ts b/agent/src/tools/memory.ts new file mode 100644 index 00000000..dd8d96c8 --- /dev/null +++ b/agent/src/tools/memory.ts @@ -0,0 +1,24 @@ +import { tool } from 'ai' +import { AuthFetcher } from '..' +import { z } from 'zod' + +export type MemoryToolParams = { + fetch: AuthFetcher +} + +export const getMemoryTools = ({ fetch }: MemoryToolParams) => { + const searchMemory = tool({ + description: 'Search for memories', + inputSchema: z.object({ + query: z.string().describe('The query to search for memories'), + }), + execute: async ({ query }) => { + const response = await fetch(`/memory/search?query=${query}`) + return response.json() + }, + }) + + return { + 'search_memory': searchMemory, + } +} \ No newline at end of file diff --git a/agent/src/tools/schedule.ts b/agent/src/tools/schedule.ts new file mode 100644 index 00000000..0d740b90 --- /dev/null +++ b/agent/src/tools/schedule.ts @@ -0,0 +1,98 @@ +import { tool } from 'ai' +import { z } from 'zod' +import { AuthFetcher } from '..' + +export type ScheduleToolParams = { + fetch: AuthFetcher +} + +const ScheduleSchema = z.object({ + id: z.string(), + name: z.string(), + description: z.string(), + pattern: z.string(), + max_calls: z.number().nullable().optional(), + current_calls: z.number().optional(), + created_at: z.string().optional(), + updated_at: z.string().optional(), + enabled: z.boolean(), + command: z.string(), + user_id: z.string().optional(), +}) + +export const getScheduleTools = ({ fetch }: ScheduleToolParams) => { + const listSchedules = tool({ + description: 'List schedules for current user', + inputSchema: z.object({}), + execute: async () => { + const response = await fetch('/schedule', { method: 'GET' }) + return response.json() + }, + }) + + const getSchedule = tool({ + description: 'Get a schedule by id', + inputSchema: z.object({ + id: z.string().describe('Schedule ID'), + }), + execute: async ({ id }) => { + const response = await fetch(`/schedule/${id}`, { method: 'GET' }) + return response.json() + }, + }) + + const createSchedule = tool({ + description: 'Create a new schedule', + inputSchema: z.object({ + name: z.string(), + description: z.string(), + pattern: z.string(), + max_calls: z.number().optional(), + enabled: z.boolean().optional(), + command: z.string(), + }), + execute: async (payload) => { + const response = await fetch('/schedule', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(payload), + }) + return response.json() + }, + }) + + const updateSchedule = tool({ + description: 'Update an existing schedule', + inputSchema: ScheduleSchema.partial().extend({ + id: z.string(), + }), + execute: async (payload) => { + const { id, ...body } = payload + const response = await fetch(`/schedule/${id}`, { + method: 'PUT', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(body), + }) + return response.json() + }, + }) + + const deleteSchedule = tool({ + description: 'Delete a schedule', + inputSchema: z.object({ + id: z.string(), + }), + execute: async ({ id }) => { + const response = await fetch(`/schedule/${id}`, { method: 'DELETE' }) + return response.status === 204 ? { success: true } : response.json() + }, + }) + + return { + 'schedule_list': listSchedules, + 'schedule_get': getSchedule, + 'schedule_create': createSchedule, + 'schedule_update': updateSchedule, + 'schedule_delete': deleteSchedule, + } +} \ No newline at end of file diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 5d202baa..56052a81 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -20,6 +20,7 @@ import ( "github.com/memohai/memoh/internal/memory" "github.com/memohai/memoh/internal/models" "github.com/memohai/memoh/internal/providers" + "github.com/memohai/memoh/internal/schedule" "github.com/memohai/memoh/internal/settings" "github.com/memohai/memoh/internal/server" @@ -162,7 +163,12 @@ func main() { settingsHandler := handlers.NewSettingsHandler(settingsService) historyService := history.NewService(queries) historyHandler := handlers.NewHistoryHandler(historyService) - srv := server.NewServer(addr, cfg.Auth.JWTSecret, pingHandler, authHandler, memoryHandler, embeddingsHandler, chatHandler, swaggerHandler, providersHandler, modelsHandler, settingsHandler, historyHandler, containerdHandler) + scheduleService := schedule.NewService(queries, chatResolver, cfg.Auth.JWTSecret) + if err := scheduleService.Bootstrap(ctx); err != nil { + log.Fatalf("schedule bootstrap: %v", err) + } + scheduleHandler := handlers.NewScheduleHandler(scheduleService) + srv := server.NewServer(addr, cfg.Auth.JWTSecret, pingHandler, authHandler, memoryHandler, embeddingsHandler, chatHandler, swaggerHandler, providersHandler, modelsHandler, settingsHandler, historyHandler, scheduleHandler, containerdHandler) if err := srv.Start(); err != nil { log.Fatalf("server failed: %v", err) diff --git a/db/migrations/0001_init.down.sql b/db/migrations/0001_init.down.sql index e34862bb..d76577b2 100644 --- a/db/migrations/0001_init.down.sql +++ b/db/migrations/0001_init.down.sql @@ -1,5 +1,6 @@ DROP TABLE IF EXISTS user_settings; DROP TABLE IF EXISTS history; +DROP TABLE IF EXISTS schedule; DROP TABLE IF EXISTS lifecycle_events; DROP TABLE IF EXISTS container_versions; DROP TABLE IF EXISTS models; diff --git a/db/migrations/0001_init.up.sql b/db/migrations/0001_init.up.sql index 51f662c2..89d0e270 100644 --- a/db/migrations/0001_init.up.sql +++ b/db/migrations/0001_init.up.sql @@ -146,3 +146,20 @@ CREATE TABLE IF NOT EXISTS user_settings ( max_context_load_time INTEGER NOT NULL DEFAULT 1440, language TEXT NOT NULL DEFAULT 'Same as user input' ); + +CREATE TABLE IF NOT EXISTS schedule ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + name TEXT NOT NULL, + description TEXT NOT NULL, + pattern TEXT NOT NULL, + max_calls INTEGER, + current_calls INTEGER NOT NULL DEFAULT 0, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + enabled BOOLEAN NOT NULL DEFAULT true, + command TEXT NOT NULL, + user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_schedule_user_id ON schedule(user_id); +CREATE INDEX IF NOT EXISTS idx_schedule_enabled ON schedule(enabled); diff --git a/db/queries/schedule.sql b/db/queries/schedule.sql new file mode 100644 index 00000000..014fa0b1 --- /dev/null +++ b/db/queries/schedule.sql @@ -0,0 +1,49 @@ +-- name: CreateSchedule :one +INSERT INTO schedule (name, description, pattern, max_calls, enabled, command, user_id) +VALUES ($1, $2, $3, $4, $5, $6, $7) +RETURNING id, name, description, pattern, max_calls, current_calls, created_at, updated_at, enabled, command, user_id; + +-- name: GetScheduleByID :one +SELECT id, name, description, pattern, max_calls, current_calls, created_at, updated_at, enabled, command, user_id +FROM schedule +WHERE id = $1; + +-- name: ListSchedulesByUser :many +SELECT id, name, description, pattern, max_calls, current_calls, created_at, updated_at, enabled, command, user_id +FROM schedule +WHERE user_id = $1 +ORDER BY created_at DESC; + +-- name: ListEnabledSchedules :many +SELECT id, name, description, pattern, max_calls, current_calls, created_at, updated_at, enabled, command, user_id +FROM schedule +WHERE enabled = true +ORDER BY created_at DESC; + +-- name: UpdateSchedule :one +UPDATE schedule +SET name = $2, + description = $3, + pattern = $4, + max_calls = $5, + enabled = $6, + command = $7, + updated_at = now() +WHERE id = $1 +RETURNING id, name, description, pattern, max_calls, current_calls, created_at, updated_at, enabled, command, user_id; + +-- name: DeleteSchedule :exec +DELETE FROM schedule +WHERE id = $1; + +-- name: IncrementScheduleCalls :one +UPDATE schedule +SET current_calls = current_calls + 1, + enabled = CASE + WHEN max_calls IS NOT NULL AND current_calls + 1 >= max_calls THEN false + ELSE enabled + END, + updated_at = now() +WHERE id = $1 +RETURNING id, name, description, pattern, max_calls, current_calls, created_at, updated_at, enabled, command, user_id; + diff --git a/docs/docs.go b/docs/docs.go index 435015a7..1abeec7a 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -1604,6 +1604,195 @@ const docTemplate = `{ } } }, + "/schedule": { + "get": { + "description": "List schedules for current user", + "tags": [ + "schedule" + ], + "summary": "List schedules", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/schedule.ListResponse" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + } + } + }, + "post": { + "description": "Create a schedule for current user", + "tags": [ + "schedule" + ], + "summary": "Create schedule", + "parameters": [ + { + "description": "Schedule payload", + "name": "payload", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/schedule.CreateRequest" + } + } + ], + "responses": { + "201": { + "description": "Created", + "schema": { + "$ref": "#/definitions/schedule.Schedule" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + } + } + } + }, + "/schedule/{id}": { + "get": { + "description": "Get a schedule by ID", + "tags": [ + "schedule" + ], + "summary": "Get schedule", + "parameters": [ + { + "type": "string", + "description": "Schedule ID", + "name": "id", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/schedule.Schedule" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + } + } + }, + "put": { + "description": "Update a schedule by ID", + "tags": [ + "schedule" + ], + "summary": "Update schedule", + "parameters": [ + { + "type": "string", + "description": "Schedule ID", + "name": "id", + "in": "path", + "required": true + }, + { + "description": "Schedule payload", + "name": "payload", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/schedule.UpdateRequest" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/schedule.Schedule" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + } + } + }, + "delete": { + "description": "Delete a schedule by ID", + "tags": [ + "schedule" + ], + "summary": "Delete schedule", + "parameters": [ + { + "type": "string", + "description": "Schedule ID", + "name": "id", + "in": "path", + "required": true + } + ], + "responses": { + "204": { + "description": "No Content" + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + } + } + } + }, "/settings": { "get": { "description": "Get agent settings for current user", @@ -2470,6 +2659,101 @@ const docTemplate = `{ } } }, + "schedule.CreateRequest": { + "type": "object", + "properties": { + "command": { + "type": "string" + }, + "description": { + "type": "string" + }, + "enabled": { + "type": "boolean" + }, + "max_calls": { + "type": "integer" + }, + "name": { + "type": "string" + }, + "pattern": { + "type": "string" + } + } + }, + "schedule.ListResponse": { + "type": "object", + "properties": { + "items": { + "type": "array", + "items": { + "$ref": "#/definitions/schedule.Schedule" + } + } + } + }, + "schedule.Schedule": { + "type": "object", + "properties": { + "command": { + "type": "string" + }, + "created_at": { + "type": "string" + }, + "current_calls": { + "type": "integer" + }, + "description": { + "type": "string" + }, + "enabled": { + "type": "boolean" + }, + "id": { + "type": "string" + }, + "max_calls": { + "type": "integer" + }, + "name": { + "type": "string" + }, + "pattern": { + "type": "string" + }, + "updated_at": { + "type": "string" + }, + "user_id": { + "type": "string" + } + } + }, + "schedule.UpdateRequest": { + "type": "object", + "properties": { + "command": { + "type": "string" + }, + "description": { + "type": "string" + }, + "enabled": { + "type": "boolean" + }, + "max_calls": { + "type": "integer" + }, + "name": { + "type": "string" + }, + "pattern": { + "type": "string" + } + } + }, "settings.Settings": { "type": "object", "properties": { diff --git a/docs/swagger.json b/docs/swagger.json index a8c2b653..cbb90be5 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -1595,6 +1595,195 @@ } } }, + "/schedule": { + "get": { + "description": "List schedules for current user", + "tags": [ + "schedule" + ], + "summary": "List schedules", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/schedule.ListResponse" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + } + } + }, + "post": { + "description": "Create a schedule for current user", + "tags": [ + "schedule" + ], + "summary": "Create schedule", + "parameters": [ + { + "description": "Schedule payload", + "name": "payload", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/schedule.CreateRequest" + } + } + ], + "responses": { + "201": { + "description": "Created", + "schema": { + "$ref": "#/definitions/schedule.Schedule" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + } + } + } + }, + "/schedule/{id}": { + "get": { + "description": "Get a schedule by ID", + "tags": [ + "schedule" + ], + "summary": "Get schedule", + "parameters": [ + { + "type": "string", + "description": "Schedule ID", + "name": "id", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/schedule.Schedule" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + } + } + }, + "put": { + "description": "Update a schedule by ID", + "tags": [ + "schedule" + ], + "summary": "Update schedule", + "parameters": [ + { + "type": "string", + "description": "Schedule ID", + "name": "id", + "in": "path", + "required": true + }, + { + "description": "Schedule payload", + "name": "payload", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/schedule.UpdateRequest" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/schedule.Schedule" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + } + } + }, + "delete": { + "description": "Delete a schedule by ID", + "tags": [ + "schedule" + ], + "summary": "Delete schedule", + "parameters": [ + { + "type": "string", + "description": "Schedule ID", + "name": "id", + "in": "path", + "required": true + } + ], + "responses": { + "204": { + "description": "No Content" + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/handlers.ErrorResponse" + } + } + } + } + }, "/settings": { "get": { "description": "Get agent settings for current user", @@ -2461,6 +2650,101 @@ } } }, + "schedule.CreateRequest": { + "type": "object", + "properties": { + "command": { + "type": "string" + }, + "description": { + "type": "string" + }, + "enabled": { + "type": "boolean" + }, + "max_calls": { + "type": "integer" + }, + "name": { + "type": "string" + }, + "pattern": { + "type": "string" + } + } + }, + "schedule.ListResponse": { + "type": "object", + "properties": { + "items": { + "type": "array", + "items": { + "$ref": "#/definitions/schedule.Schedule" + } + } + } + }, + "schedule.Schedule": { + "type": "object", + "properties": { + "command": { + "type": "string" + }, + "created_at": { + "type": "string" + }, + "current_calls": { + "type": "integer" + }, + "description": { + "type": "string" + }, + "enabled": { + "type": "boolean" + }, + "id": { + "type": "string" + }, + "max_calls": { + "type": "integer" + }, + "name": { + "type": "string" + }, + "pattern": { + "type": "string" + }, + "updated_at": { + "type": "string" + }, + "user_id": { + "type": "string" + } + } + }, + "schedule.UpdateRequest": { + "type": "object", + "properties": { + "command": { + "type": "string" + }, + "description": { + "type": "string" + }, + "enabled": { + "type": "boolean" + }, + "max_calls": { + "type": "integer" + }, + "name": { + "type": "string" + }, + "pattern": { + "type": "string" + } + } + }, "settings.Settings": { "type": "object", "properties": { diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 97b7dd77..28e762d9 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -487,6 +487,68 @@ definitions: name: type: string type: object + schedule.CreateRequest: + properties: + command: + type: string + description: + type: string + enabled: + type: boolean + max_calls: + type: integer + name: + type: string + pattern: + type: string + type: object + schedule.ListResponse: + properties: + items: + items: + $ref: '#/definitions/schedule.Schedule' + type: array + type: object + schedule.Schedule: + properties: + command: + type: string + created_at: + type: string + current_calls: + type: integer + description: + type: string + enabled: + type: boolean + id: + type: string + max_calls: + type: integer + name: + type: string + pattern: + type: string + updated_at: + type: string + user_id: + type: string + type: object + schedule.UpdateRequest: + properties: + command: + type: string + description: + type: string + enabled: + type: boolean + max_calls: + type: integer + name: + type: string + pattern: + type: string + type: object settings.Settings: properties: language: @@ -1556,6 +1618,131 @@ paths: summary: Get provider by name tags: - providers + /schedule: + get: + description: List schedules for current user + responses: + "200": + description: OK + schema: + $ref: '#/definitions/schedule.ListResponse' + "400": + description: Bad Request + schema: + $ref: '#/definitions/handlers.ErrorResponse' + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/handlers.ErrorResponse' + summary: List schedules + tags: + - schedule + post: + description: Create a schedule for current user + parameters: + - description: Schedule payload + in: body + name: payload + required: true + schema: + $ref: '#/definitions/schedule.CreateRequest' + responses: + "201": + description: Created + schema: + $ref: '#/definitions/schedule.Schedule' + "400": + description: Bad Request + schema: + $ref: '#/definitions/handlers.ErrorResponse' + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/handlers.ErrorResponse' + summary: Create schedule + tags: + - schedule + /schedule/{id}: + delete: + description: Delete a schedule by ID + parameters: + - description: Schedule ID + in: path + name: id + required: true + type: string + responses: + "204": + description: No Content + "400": + description: Bad Request + schema: + $ref: '#/definitions/handlers.ErrorResponse' + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/handlers.ErrorResponse' + summary: Delete schedule + tags: + - schedule + get: + description: Get a schedule by ID + parameters: + - description: Schedule ID + in: path + name: id + required: true + type: string + responses: + "200": + description: OK + schema: + $ref: '#/definitions/schedule.Schedule' + "400": + description: Bad Request + schema: + $ref: '#/definitions/handlers.ErrorResponse' + "404": + description: Not Found + schema: + $ref: '#/definitions/handlers.ErrorResponse' + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/handlers.ErrorResponse' + summary: Get schedule + tags: + - schedule + put: + description: Update a schedule by ID + parameters: + - description: Schedule ID + in: path + name: id + required: true + type: string + - description: Schedule payload + in: body + name: payload + required: true + schema: + $ref: '#/definitions/schedule.UpdateRequest' + responses: + "200": + description: OK + schema: + $ref: '#/definitions/schedule.Schedule' + "400": + description: Bad Request + schema: + $ref: '#/definitions/handlers.ErrorResponse' + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/handlers.ErrorResponse' + summary: Update schedule + tags: + - schedule /settings: delete: description: Remove agent settings for current user diff --git a/go.mod b/go.mod index 8f8c0202..833eebd1 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/modelcontextprotocol/go-sdk v1.2.0 github.com/opencontainers/runtime-spec v1.3.0 github.com/qdrant/go-client v1.16.2 + github.com/robfig/cron/v3 v3.0.1 github.com/stretchr/testify v1.11.1 github.com/swaggo/swag v1.16.6 golang.org/x/crypto v0.47.0 diff --git a/go.sum b/go.sum index 264e972f..dbe9cdc2 100644 --- a/go.sum +++ b/go.sum @@ -176,6 +176,8 @@ github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzM github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= github.com/qdrant/go-client v1.16.2 h1:UUMJJfvXTByhwhH1DwWdbkhZ2cTdvSqVkXSIfBrVWSg= github.com/qdrant/go-client v1.16.2/go.mod h1:I+EL3h4HRoRTeHtbfOd/4kDXwCukZfkd41j/9wryGkw= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/sirupsen/logrus v1.9.4 h1:TsZE7l11zFCLZnZ+teH4Umoq5BhEIfIzfRDZ1Uzql2w= diff --git a/internal/chat/resolver.go b/internal/chat/resolver.go index 9b02f2ad..0bd90077 100644 --- a/internal/chat/resolver.go +++ b/internal/chat/resolver.go @@ -131,7 +131,7 @@ func (r *Resolver) Chat(ctx context.Context, req ChatRequest) (ChatResponse, err }, nil } -func (r *Resolver) TriggerSchedule(ctx context.Context, userID string, schedule SchedulePayload) error { +func (r *Resolver) TriggerSchedule(ctx context.Context, userID string, schedule SchedulePayload, token string) error { if strings.TrimSpace(userID) == "" { return fmt.Errorf("user id is required") } @@ -180,7 +180,7 @@ func (r *Resolver) TriggerSchedule(ctx context.Context, userID string, schedule Schedule: schedule, } - resp, err := r.postSchedule(ctx, payload, "") + resp, err := r.postSchedule(ctx, payload, token) if err != nil { return err } @@ -482,12 +482,7 @@ func (r *Resolver) storeHistory(ctx context.Context, userID, query string, respo if strings.TrimSpace(query) == "" && len(responseMessages) == 0 { return nil } - userMessage := GatewayMessage{ - "role": "user", - "content": query, - } - messages := append([]GatewayMessage{userMessage}, responseMessages...) - payload, err := json.Marshal(messages) + payload, err := json.Marshal(responseMessages) if err != nil { return err } diff --git a/internal/chat/types.go b/internal/chat/types.go index 10cff90e..49f62dbc 100644 --- a/internal/chat/types.go +++ b/internal/chat/types.go @@ -11,6 +11,7 @@ type GatewayMessage map[string]interface{} type ChatRequest struct { UserID string `json:"-"` + Token string `json:"-"` Query string `json:"query"` Model string `json:"model,omitempty"` Provider string `json:"provider,omitempty"` @@ -30,3 +31,12 @@ type ChatResponse struct { } type StreamChunk = json.RawMessage + +type SchedulePayload struct { + ID string `json:"id"` + Name string `json:"name"` + Description string `json:"description"` + Pattern string `json:"pattern"` + MaxCalls *int `json:"maxCalls,omitempty"` + Command string `json:"command"` +} diff --git a/internal/handlers/chat.go b/internal/handlers/chat.go index aa7329c8..9d097938 100644 --- a/internal/handlers/chat.go +++ b/internal/handlers/chat.go @@ -53,6 +53,8 @@ func (h *ChatHandler) Chat(c echo.Context) error { return echo.NewHTTPError(http.StatusBadRequest, "query is required") } req.UserID = userID + req.Token = c.Request().Header.Get("Authorization") + req.Token = c.Request().Header.Get("Authorization") resp, err := h.resolver.Chat(c.Request().Context(), req) if err != nil { @@ -88,6 +90,8 @@ func (h *ChatHandler) StreamChat(c echo.Context) error { return echo.NewHTTPError(http.StatusBadRequest, "query is required") } req.UserID = userID + req.Token = c.Request().Header.Get("Authorization") + req.Token = c.Request().Header.Get("Authorization") // Set headers for SSE c.Response().Header().Set(echo.HeaderContentType, "text/event-stream") diff --git a/internal/handlers/schedule.go b/internal/handlers/schedule.go new file mode 100644 index 00000000..bd7bc0ae --- /dev/null +++ b/internal/handlers/schedule.go @@ -0,0 +1,182 @@ +package handlers + +import ( + "net/http" + + "github.com/labstack/echo/v4" + + "github.com/memohai/memoh/internal/auth" + "github.com/memohai/memoh/internal/identity" + "github.com/memohai/memoh/internal/schedule" +) + +type ScheduleHandler struct { + service *schedule.Service +} + +func NewScheduleHandler(service *schedule.Service) *ScheduleHandler { + return &ScheduleHandler{service: service} +} + +func (h *ScheduleHandler) Register(e *echo.Echo) { + group := e.Group("/schedule") + group.POST("", h.Create) + group.GET("", h.List) + group.GET("/:id", h.Get) + group.PUT("/:id", h.Update) + group.DELETE("/:id", h.Delete) +} + +// Create godoc +// @Summary Create schedule +// @Description Create a schedule for current user +// @Tags schedule +// @Param payload body schedule.CreateRequest true "Schedule payload" +// @Success 201 {object} schedule.Schedule +// @Failure 400 {object} ErrorResponse +// @Failure 500 {object} ErrorResponse +// @Router /schedule [post] +func (h *ScheduleHandler) Create(c echo.Context) error { + userID, err := h.requireUserID(c) + if err != nil { + return err + } + var req schedule.CreateRequest + if err := c.Bind(&req); err != nil { + return echo.NewHTTPError(http.StatusBadRequest, err.Error()) + } + resp, err := h.service.Create(c.Request().Context(), userID, req) + if err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } + return c.JSON(http.StatusCreated, resp) +} + +// List godoc +// @Summary List schedules +// @Description List schedules for current user +// @Tags schedule +// @Success 200 {object} schedule.ListResponse +// @Failure 400 {object} ErrorResponse +// @Failure 500 {object} ErrorResponse +// @Router /schedule [get] +func (h *ScheduleHandler) List(c echo.Context) error { + userID, err := h.requireUserID(c) + if err != nil { + return err + } + items, err := h.service.List(c.Request().Context(), userID) + if err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } + return c.JSON(http.StatusOK, schedule.ListResponse{Items: items}) +} + +// Get godoc +// @Summary Get schedule +// @Description Get a schedule by ID +// @Tags schedule +// @Param id path string true "Schedule ID" +// @Success 200 {object} schedule.Schedule +// @Failure 400 {object} ErrorResponse +// @Failure 404 {object} ErrorResponse +// @Failure 500 {object} ErrorResponse +// @Router /schedule/{id} [get] +func (h *ScheduleHandler) Get(c echo.Context) error { + userID, err := h.requireUserID(c) + if err != nil { + return err + } + id := c.Param("id") + if id == "" { + return echo.NewHTTPError(http.StatusBadRequest, "id is required") + } + item, err := h.service.Get(c.Request().Context(), id) + if err != nil { + return echo.NewHTTPError(http.StatusNotFound, err.Error()) + } + if item.UserID != userID { + return echo.NewHTTPError(http.StatusForbidden, "user mismatch") + } + return c.JSON(http.StatusOK, item) +} + +// Update godoc +// @Summary Update schedule +// @Description Update a schedule by ID +// @Tags schedule +// @Param id path string true "Schedule ID" +// @Param payload body schedule.UpdateRequest true "Schedule payload" +// @Success 200 {object} schedule.Schedule +// @Failure 400 {object} ErrorResponse +// @Failure 500 {object} ErrorResponse +// @Router /schedule/{id} [put] +func (h *ScheduleHandler) Update(c echo.Context) error { + userID, err := h.requireUserID(c) + if err != nil { + return err + } + id := c.Param("id") + if id == "" { + return echo.NewHTTPError(http.StatusBadRequest, "id is required") + } + var req schedule.UpdateRequest + if err := c.Bind(&req); err != nil { + return echo.NewHTTPError(http.StatusBadRequest, err.Error()) + } + item, err := h.service.Get(c.Request().Context(), id) + if err != nil { + return echo.NewHTTPError(http.StatusNotFound, err.Error()) + } + if item.UserID != userID { + return echo.NewHTTPError(http.StatusForbidden, "user mismatch") + } + resp, err := h.service.Update(c.Request().Context(), id, req) + if err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } + return c.JSON(http.StatusOK, resp) +} + +// Delete godoc +// @Summary Delete schedule +// @Description Delete a schedule by ID +// @Tags schedule +// @Param id path string true "Schedule ID" +// @Success 204 "No Content" +// @Failure 400 {object} ErrorResponse +// @Failure 500 {object} ErrorResponse +// @Router /schedule/{id} [delete] +func (h *ScheduleHandler) Delete(c echo.Context) error { + userID, err := h.requireUserID(c) + if err != nil { + return err + } + id := c.Param("id") + if id == "" { + return echo.NewHTTPError(http.StatusBadRequest, "id is required") + } + item, err := h.service.Get(c.Request().Context(), id) + if err != nil { + return echo.NewHTTPError(http.StatusNotFound, err.Error()) + } + if item.UserID != userID { + return echo.NewHTTPError(http.StatusForbidden, "user mismatch") + } + if err := h.service.Delete(c.Request().Context(), id); err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } + return c.NoContent(http.StatusNoContent) +} + +func (h *ScheduleHandler) requireUserID(c echo.Context) (string, error) { + userID, err := auth.UserIDFromContext(c) + if err != nil { + return "", err + } + if err := identity.ValidateUserID(userID); err != nil { + return "", echo.NewHTTPError(http.StatusBadRequest, err.Error()) + } + return userID, nil +} + diff --git a/internal/schedule/service.go b/internal/schedule/service.go new file mode 100644 index 00000000..3fe0241b --- /dev/null +++ b/internal/schedule/service.go @@ -0,0 +1,335 @@ +package schedule + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + "time" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" + "github.com/robfig/cron/v3" + + "github.com/memohai/memoh/internal/auth" + "github.com/memohai/memoh/internal/chat" + "github.com/memohai/memoh/internal/db/sqlc" +) + +type Service struct { + queries *sqlc.Queries + cron *cron.Cron + parser cron.Parser + chat *chat.Resolver + jwtSecret string + mu sync.Mutex + jobs map[string]cron.EntryID +} + +func NewService(queries *sqlc.Queries, chatResolver *chat.Resolver, jwtSecret string) *Service { + parser := cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor) + c := cron.New(cron.WithParser(parser)) + service := &Service{ + queries: queries, + cron: c, + parser: parser, + chat: chatResolver, + jwtSecret: jwtSecret, + jobs: map[string]cron.EntryID{}, + } + c.Start() + return service +} + +func (s *Service) Bootstrap(ctx context.Context) error { + if s.queries == nil { + return fmt.Errorf("schedule queries not configured") + } + items, err := s.queries.ListEnabledSchedules(ctx) + if err != nil { + return err + } + for _, item := range items { + if err := s.scheduleJob(item); err != nil { + return err + } + } + return nil +} + +func (s *Service) Create(ctx context.Context, userID string, req CreateRequest) (Schedule, error) { + if s.queries == nil { + return Schedule{}, fmt.Errorf("schedule queries not configured") + } + if strings.TrimSpace(req.Name) == "" || strings.TrimSpace(req.Description) == "" || strings.TrimSpace(req.Pattern) == "" || strings.TrimSpace(req.Command) == "" { + return Schedule{}, fmt.Errorf("name, description, pattern, command are required") + } + if _, err := s.parser.Parse(req.Pattern); err != nil { + return Schedule{}, fmt.Errorf("invalid cron pattern: %w", err) + } + pgUserID, err := parseUUID(userID) + if err != nil { + return Schedule{}, err + } + maxCalls := pgtype.Int4{Valid: false} + if req.MaxCalls != nil { + maxCalls = pgtype.Int4{Int32: int32(*req.MaxCalls), Valid: true} + } + enabled := true + if req.Enabled != nil { + enabled = *req.Enabled + } + row, err := s.queries.CreateSchedule(ctx, sqlc.CreateScheduleParams{ + Name: req.Name, + Description: req.Description, + Pattern: req.Pattern, + MaxCalls: maxCalls, + Enabled: enabled, + Command: req.Command, + UserID: pgUserID, + }) + if err != nil { + return Schedule{}, err + } + if row.Enabled { + if err := s.scheduleJob(row); err != nil { + return Schedule{}, err + } + } + return toSchedule(row), nil +} + +func (s *Service) Get(ctx context.Context, id string) (Schedule, error) { + pgID, err := parseUUID(id) + if err != nil { + return Schedule{}, err + } + row, err := s.queries.GetScheduleByID(ctx, pgID) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return Schedule{}, fmt.Errorf("schedule not found") + } + return Schedule{}, err + } + return toSchedule(row), nil +} + +func (s *Service) List(ctx context.Context, userID string) ([]Schedule, error) { + pgUserID, err := parseUUID(userID) + if err != nil { + return nil, err + } + rows, err := s.queries.ListSchedulesByUser(ctx, pgUserID) + if err != nil { + return nil, err + } + items := make([]Schedule, 0, len(rows)) + for _, row := range rows { + items = append(items, toSchedule(row)) + } + return items, nil +} + +func (s *Service) Update(ctx context.Context, id string, req UpdateRequest) (Schedule, error) { + pgID, err := parseUUID(id) + if err != nil { + return Schedule{}, err + } + existing, err := s.queries.GetScheduleByID(ctx, pgID) + if err != nil { + return Schedule{}, err + } + name := existing.Name + if req.Name != nil { + name = *req.Name + } + description := existing.Description + if req.Description != nil { + description = *req.Description + } + pattern := existing.Pattern + if req.Pattern != nil { + if _, err := s.parser.Parse(*req.Pattern); err != nil { + return Schedule{}, fmt.Errorf("invalid cron pattern: %w", err) + } + pattern = *req.Pattern + } + command := existing.Command + if req.Command != nil { + command = *req.Command + } + maxCalls := existing.MaxCalls + if req.MaxCalls != nil { + maxCalls = pgtype.Int4{Int32: int32(*req.MaxCalls), Valid: true} + } + enabled := existing.Enabled + if req.Enabled != nil { + enabled = *req.Enabled + } + updated, err := s.queries.UpdateSchedule(ctx, sqlc.UpdateScheduleParams{ + ID: pgID, + Name: name, + Description: description, + Pattern: pattern, + MaxCalls: maxCalls, + Enabled: enabled, + Command: command, + }) + if err != nil { + return Schedule{}, err + } + s.rescheduleJob(updated) + return toSchedule(updated), nil +} + +func (s *Service) Delete(ctx context.Context, id string) error { + pgID, err := parseUUID(id) + if err != nil { + return err + } + if err := s.queries.DeleteSchedule(ctx, pgID); err != nil { + return err + } + s.removeJob(id) + return nil +} + +func (s *Service) Trigger(ctx context.Context, scheduleID string) error { + if s.chat == nil { + return fmt.Errorf("chat resolver not configured") + } + schedule, err := s.Get(ctx, scheduleID) + if err != nil { + return err + } + if !schedule.Enabled { + return fmt.Errorf("schedule is disabled") + } + return s.runSchedule(ctx, schedule) +} + +func (s *Service) runSchedule(ctx context.Context, schedule Schedule) error { + if s.chat == nil { + return fmt.Errorf("chat resolver not configured") + } + updated, err := s.queries.IncrementScheduleCalls(ctx, toUUID(schedule.ID)) + if err != nil { + return err + } + if !updated.Enabled { + s.removeJob(schedule.ID) + } + token := "" + if s.jwtSecret != "" { + if signed, _, err := auth.GenerateToken(schedule.UserID, s.jwtSecret, 10*time.Minute); err == nil { + token = "Bearer " + signed + } + } + if err := s.chat.TriggerSchedule(ctx, schedule.UserID, chat.SchedulePayload{ + ID: schedule.ID, + Name: schedule.Name, + Description: schedule.Description, + Pattern: schedule.Pattern, + MaxCalls: schedule.MaxCalls, + Command: schedule.Command, + }, token); err != nil { + return err + } + return nil +} + +func (s *Service) scheduleJob(schedule sqlc.Schedule) error { + id := toUUIDString(schedule.ID) + if id == "" { + return fmt.Errorf("schedule id missing") + } + job := func() { + _ = s.runSchedule(context.Background(), toSchedule(schedule)) + } + entryID, err := s.cron.AddFunc(schedule.Pattern, job) + if err != nil { + return err + } + s.mu.Lock() + s.jobs[id] = entryID + s.mu.Unlock() + return nil +} + +func (s *Service) rescheduleJob(schedule sqlc.Schedule) { + id := toUUIDString(schedule.ID) + if id == "" { + return + } + s.removeJob(id) + if schedule.Enabled { + _ = s.scheduleJob(schedule) + } +} + +func (s *Service) removeJob(id string) { + s.mu.Lock() + defer s.mu.Unlock() + entryID, ok := s.jobs[id] + if ok { + s.cron.Remove(entryID) + delete(s.jobs, id) + } +} + +func toSchedule(row sqlc.Schedule) Schedule { + item := Schedule{ + ID: toUUIDString(row.ID), + Name: row.Name, + Description: row.Description, + Pattern: row.Pattern, + CurrentCalls: int(row.CurrentCalls), + Enabled: row.Enabled, + Command: row.Command, + UserID: toUUIDString(row.UserID), + } + if row.MaxCalls.Valid { + max := int(row.MaxCalls.Int32) + item.MaxCalls = &max + } + if row.CreatedAt.Valid { + item.CreatedAt = row.CreatedAt.Time + } + if row.UpdatedAt.Valid { + item.UpdatedAt = row.UpdatedAt.Time + } + return item +} + +func parseUUID(id string) (pgtype.UUID, error) { + parsed, err := uuid.Parse(strings.TrimSpace(id)) + if err != nil { + return pgtype.UUID{}, fmt.Errorf("invalid UUID: %w", err) + } + var pgID pgtype.UUID + pgID.Valid = true + copy(pgID.Bytes[:], parsed[:]) + return pgID, nil +} + +func toUUID(id string) pgtype.UUID { + pgID, err := parseUUID(id) + if err != nil { + return pgtype.UUID{} + } + return pgID +} + +func toUUIDString(value pgtype.UUID) string { + if !value.Valid { + return "" + } + id, err := uuid.FromBytes(value.Bytes[:]) + if err != nil { + return "" + } + return id.String() +} + diff --git a/internal/schedule/types.go b/internal/schedule/types.go new file mode 100644 index 00000000..29a04d8a --- /dev/null +++ b/internal/schedule/types.go @@ -0,0 +1,40 @@ +package schedule + +import "time" + +type Schedule struct { + ID string `json:"id"` + Name string `json:"name"` + Description string `json:"description"` + Pattern string `json:"pattern"` + MaxCalls *int `json:"max_calls,omitempty"` + CurrentCalls int `json:"current_calls"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + Enabled bool `json:"enabled"` + Command string `json:"command"` + UserID string `json:"user_id"` +} + +type CreateRequest struct { + Name string `json:"name"` + Description string `json:"description"` + Pattern string `json:"pattern"` + MaxCalls *int `json:"max_calls,omitempty"` + Command string `json:"command"` + Enabled *bool `json:"enabled,omitempty"` +} + +type UpdateRequest struct { + Name *string `json:"name,omitempty"` + Description *string `json:"description,omitempty"` + Pattern *string `json:"pattern,omitempty"` + MaxCalls *int `json:"max_calls,omitempty"` + Command *string `json:"command,omitempty"` + Enabled *bool `json:"enabled,omitempty"` +} + +type ListResponse struct { + Items []Schedule `json:"items"` +} + diff --git a/internal/server/server.go b/internal/server/server.go index 91affb96..8becfced 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -15,7 +15,7 @@ type Server struct { addr string } -func NewServer(addr string, jwtSecret string, pingHandler *handlers.PingHandler, authHandler *handlers.AuthHandler, memoryHandler *handlers.MemoryHandler, embeddingsHandler *handlers.EmbeddingsHandler, chatHandler *handlers.ChatHandler, swaggerHandler *handlers.SwaggerHandler, providersHandler *handlers.ProvidersHandler, modelsHandler *handlers.ModelsHandler, settingsHandler *handlers.SettingsHandler, historyHandler *handlers.HistoryHandler, containerdHandler *handlers.ContainerdHandler) *Server { +func NewServer(addr string, jwtSecret string, pingHandler *handlers.PingHandler, authHandler *handlers.AuthHandler, memoryHandler *handlers.MemoryHandler, embeddingsHandler *handlers.EmbeddingsHandler, chatHandler *handlers.ChatHandler, swaggerHandler *handlers.SwaggerHandler, providersHandler *handlers.ProvidersHandler, modelsHandler *handlers.ModelsHandler, settingsHandler *handlers.SettingsHandler, historyHandler *handlers.HistoryHandler, scheduleHandler *handlers.ScheduleHandler, containerdHandler *handlers.ContainerdHandler) *Server { if addr == "" { addr = ":8080" } @@ -62,6 +62,9 @@ func NewServer(addr string, jwtSecret string, pingHandler *handlers.PingHandler, if historyHandler != nil { historyHandler.Register(e) } + if scheduleHandler != nil { + scheduleHandler.Register(e) + } if providersHandler != nil { providersHandler.Register(e) } diff --git a/mise.toml b/mise.toml index 3c4dd121..693326b3 100644 --- a/mise.toml +++ b/mise.toml @@ -38,7 +38,7 @@ run = "scripts/db-up.sh" [tasks.db-down] description = "Drop Database" -run = "scripts/db-down.sh" +run = "scripts/db-drop.sh" [tasks.dev] description = "Start development environment" diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 89c7f67d..5ef21cd6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -51,6 +51,9 @@ importers: '@ai-sdk/openai': specifier: ^3.0.7 version: 3.0.7(zod@4.3.5) + '@elysiajs/bearer': + specifier: ^1.4.2 + version: 1.4.2(elysia@1.4.22(@sinclair/typebox@0.34.47)(@types/bun@1.3.7)(exact-mirror@0.2.6(@sinclair/typebox@0.34.47))(file-type@21.3.0)(openapi-types@12.1.3)(typescript@5.9.3)) '@elysiajs/cors': specifier: ^1.4.1 version: 1.4.1(elysia@1.4.22(@sinclair/typebox@0.34.47)(@types/bun@1.3.7)(exact-mirror@0.2.6(@sinclair/typebox@0.34.47))(file-type@21.3.0)(openapi-types@12.1.3)(typescript@5.9.3)) @@ -72,7 +75,7 @@ importers: devDependencies: bun-types: specifier: latest - version: 1.3.7 + version: 1.3.8 docs: devDependencies: @@ -124,7 +127,7 @@ importers: version: 22.19.5 bun-types: specifier: latest - version: 1.3.7 + version: 1.3.8 packages/shared: {} @@ -636,6 +639,11 @@ packages: '@drizzle-team/brocli@0.10.2': resolution: {integrity: sha512-z33Il7l5dKjUgGULTqBsQBQwckHh5AbIuxhdsIxDDiZAzBOrZO6q9ogcWC65kU382AfynTfgNumVcNIjuIua6w==} + '@elysiajs/bearer@1.4.2': + resolution: {integrity: sha512-MK2aCFqnFMqMNSa1e/A6+Ow5uNl5LpKd8K4lCB2LIsyDrI6juxOUHAgqq+esgdSoh3urD1UIMqFC//TsqCQViA==} + peerDependencies: + elysia: '>= 1.4.3' + '@elysiajs/cors@1.4.1': resolution: {integrity: sha512-lQfad+F3r4mNwsxRKbXyJB8Jg43oAOXjRwn7sKUL6bcOW3KjUqUimTS+woNpO97efpzjtDE0tEjGk9DTw8lqTQ==} peerDependencies: @@ -1584,56 +1592,67 @@ packages: resolution: {integrity: sha512-EHMUcDwhtdRGlXZsGSIuXSYwD5kOT9NVnx9sqzYiwAc91wfYOE1g1djOEDseZJKKqtHAHGwnGPQu3kytmfaXLQ==} cpu: [arm] os: [linux] + libc: [glibc] '@rollup/rollup-linux-arm-musleabihf@4.54.0': resolution: {integrity: sha512-+pBrqEjaakN2ySv5RVrj/qLytYhPKEUwk+e3SFU5jTLHIcAtqh2rLrd/OkbNuHJpsBgxsD8ccJt5ga/SeG0JmA==} cpu: [arm] os: [linux] + libc: [musl] '@rollup/rollup-linux-arm64-gnu@4.54.0': resolution: {integrity: sha512-NSqc7rE9wuUaRBsBp5ckQ5CVz5aIRKCwsoa6WMF7G01sX3/qHUw/z4pv+D+ahL1EIKy6Enpcnz1RY8pf7bjwng==} cpu: [arm64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-arm64-musl@4.54.0': resolution: {integrity: sha512-gr5vDbg3Bakga5kbdpqx81m2n9IX8M6gIMlQQIXiLTNeQW6CucvuInJ91EuCJ/JYvc+rcLLsDFcfAD1K7fMofg==} cpu: [arm64] os: [linux] + libc: [musl] '@rollup/rollup-linux-loong64-gnu@4.54.0': resolution: {integrity: sha512-gsrtB1NA3ZYj2vq0Rzkylo9ylCtW/PhpLEivlgWe0bpgtX5+9j9EZa0wtZiCjgu6zmSeZWyI/e2YRX1URozpIw==} cpu: [loong64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-ppc64-gnu@4.54.0': resolution: {integrity: sha512-y3qNOfTBStmFNq+t4s7Tmc9hW2ENtPg8FeUD/VShI7rKxNW7O4fFeaYbMsd3tpFlIg1Q8IapFgy7Q9i2BqeBvA==} cpu: [ppc64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-riscv64-gnu@4.54.0': resolution: {integrity: sha512-89sepv7h2lIVPsFma8iwmccN7Yjjtgz0Rj/Ou6fEqg3HDhpCa+Et+YSufy27i6b0Wav69Qv4WBNl3Rs6pwhebQ==} cpu: [riscv64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-riscv64-musl@4.54.0': resolution: {integrity: sha512-ZcU77ieh0M2Q8Ur7D5X7KvK+UxbXeDHwiOt/CPSBTI1fBmeDMivW0dPkdqkT4rOgDjrDDBUed9x4EgraIKoR2A==} cpu: [riscv64] os: [linux] + libc: [musl] '@rollup/rollup-linux-s390x-gnu@4.54.0': resolution: {integrity: sha512-2AdWy5RdDF5+4YfG/YesGDDtbyJlC9LHmL6rZw6FurBJ5n4vFGupsOBGfwMRjBYH7qRQowT8D/U4LoSvVwOhSQ==} cpu: [s390x] os: [linux] + libc: [glibc] '@rollup/rollup-linux-x64-gnu@4.54.0': resolution: {integrity: sha512-WGt5J8Ij/rvyqpFexxk3ffKqqbLf9AqrTBbWDk7ApGUzaIs6V+s2s84kAxklFwmMF/vBNGrVdYgbblCOFFezMQ==} cpu: [x64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-x64-musl@4.54.0': resolution: {integrity: sha512-JzQmb38ATzHjxlPHuTH6tE7ojnMKM2kYNzt44LO/jJi8BpceEC8QuXYA908n8r3CNuG/B3BV8VR3Hi1rYtmPiw==} cpu: [x64] os: [linux] + libc: [musl] '@rollup/rollup-openharmony-arm64@4.54.0': resolution: {integrity: sha512-huT3fd0iC7jigGh7n3q/+lfPcXxBi+om/Rs3yiFxjvSxbSB6aohDFXbWvlspaqjeOh+hx7DDHS+5Es5qRkWkZg==} @@ -1782,24 +1801,28 @@ packages: engines: {node: '>= 10'} cpu: [arm64] os: [linux] + libc: [glibc] '@tailwindcss/oxide-linux-arm64-musl@4.1.18': resolution: {integrity: sha512-1px92582HkPQlaaCkdRcio71p8bc8i/ap5807tPRDK/uw953cauQBT8c5tVGkOwrHMfc2Yh6UuxaH4vtTjGvHg==} engines: {node: '>= 10'} cpu: [arm64] os: [linux] + libc: [musl] '@tailwindcss/oxide-linux-x64-gnu@4.1.18': resolution: {integrity: sha512-v3gyT0ivkfBLoZGF9LyHmts0Isc8jHZyVcbzio6Wpzifg/+5ZJpDiRiUhDLkcr7f/r38SWNe7ucxmGW3j3Kb/g==} engines: {node: '>= 10'} cpu: [x64] os: [linux] + libc: [glibc] '@tailwindcss/oxide-linux-x64-musl@4.1.18': resolution: {integrity: sha512-bhJ2y2OQNlcRwwgOAGMY0xTFStt4/wyU6pvI6LSuZpRgKQwxTec0/3Scu91O8ir7qCR3AuepQKLU/kX99FouqQ==} engines: {node: '>= 10'} cpu: [x64] os: [linux] + libc: [musl] '@tailwindcss/oxide-wasm32-wasi@4.1.18': resolution: {integrity: sha512-LffYTvPjODiP6PT16oNeUQJzNVyJl1cjIebq/rWWBF+3eDst5JGEFSc5cWxyRCJ0Mxl+KyIkqRxk1XPEs9x8TA==} @@ -2413,6 +2436,9 @@ packages: bun-types@1.3.7: resolution: {integrity: sha512-qyschsA03Qz+gou+apt6HNl6HnI+sJJLL4wLDke4iugsE6584CMupOtTY1n+2YC9nGVrEKUlTs99jjRLKgWnjQ==} + bun-types@1.3.8: + resolution: {integrity: sha512-fL99nxdOWvV4LqjmC+8Q9kW3M4QTtTR1eePs94v5ctGqU8OeceWrSUaRw3JYb7tU3FkMIAjkueehrHPPPGKi5Q==} + bundle-name@4.1.0: resolution: {integrity: sha512-tjwM5exMg6BGRI+kNmTntNsvdZS1X8BFYS6tnJ2hdH0kVxM6/eVZ2xy+FqStSWvYmtfFMDLIxurorHwDKfDz5Q==} engines: {node: '>=18'} @@ -3396,24 +3422,28 @@ packages: engines: {node: '>= 12.0.0'} cpu: [arm64] os: [linux] + libc: [glibc] lightningcss-linux-arm64-musl@1.30.2: resolution: {integrity: sha512-5Vh9dGeblpTxWHpOx8iauV02popZDsCYMPIgiuw97OJ5uaDsL86cnqSFs5LZkG3ghHoX5isLgWzMs+eD1YzrnA==} engines: {node: '>= 12.0.0'} cpu: [arm64] os: [linux] + libc: [musl] lightningcss-linux-x64-gnu@1.30.2: resolution: {integrity: sha512-Cfd46gdmj1vQ+lR6VRTTadNHu6ALuw2pKR9lYq4FnhvgBc4zWY1EtZcAc6EffShbb1MFrIPfLDXD6Xprbnni4w==} engines: {node: '>= 12.0.0'} cpu: [x64] os: [linux] + libc: [glibc] lightningcss-linux-x64-musl@1.30.2: resolution: {integrity: sha512-XJaLUUFXb6/QG2lGIW6aIk6jKdtjtcffUT0NKvIqhSBY3hh9Ch+1LCeH80dR9q9LBjG3ewbDjnumefsLsP6aiA==} engines: {node: '>= 12.0.0'} cpu: [x64] os: [linux] + libc: [musl] lightningcss-win32-arm64-msvc@1.30.2: resolution: {integrity: sha512-FZn+vaj7zLv//D/192WFFVA0RgHawIcHqLX9xuWiQt7P0PtdFEVaxgF9rjM/IRYHQXNnk61/H/gb2Ei+kUQ4xQ==} @@ -5039,6 +5069,10 @@ snapshots: '@drizzle-team/brocli@0.10.2': {} + '@elysiajs/bearer@1.4.2(elysia@1.4.22(@sinclair/typebox@0.34.47)(@types/bun@1.3.7)(exact-mirror@0.2.6(@sinclair/typebox@0.34.47))(file-type@21.3.0)(openapi-types@12.1.3)(typescript@5.9.3))': + dependencies: + elysia: 1.4.22(@sinclair/typebox@0.34.47)(@types/bun@1.3.7)(exact-mirror@0.2.6(@sinclair/typebox@0.34.47))(file-type@21.3.0)(openapi-types@12.1.3)(typescript@5.9.3) + '@elysiajs/cors@1.4.1(elysia@1.4.22(@sinclair/typebox@0.34.47)(@types/bun@1.3.7)(exact-mirror@0.2.6(@sinclair/typebox@0.34.47))(file-type@21.3.0)(openapi-types@12.1.3)(typescript@5.9.3))': dependencies: elysia: 1.4.22(@sinclair/typebox@0.34.47)(@types/bun@1.3.7)(exact-mirror@0.2.6(@sinclair/typebox@0.34.47))(file-type@21.3.0)(openapi-types@12.1.3)(typescript@5.9.3) @@ -6672,6 +6706,10 @@ snapshots: dependencies: '@types/node': 24.10.4 + bun-types@1.3.8: + dependencies: + '@types/node': 24.10.4 + bundle-name@4.1.0: dependencies: run-applescript: 7.1.0