feat: schedule task

This commit is contained in:
Acbox
2026-01-30 23:31:43 +08:00
parent 0273b45141
commit ba34fb156d
28 changed files with 1674 additions and 43 deletions
+39 -10
View File
@@ -2,6 +2,8 @@ import { generateText, ModelMessage, stepCountIs, streamText, TextStreamPart, To
import { createChatGateway } from './gateway'
import { ClientType, Schedule } from './types'
import { system, schedule } from './prompts'
import { AuthFetcher } from './index'
import { getScheduleTools } from './tools/schedule'
export interface AgentParams {
apiKey: string
@@ -25,12 +27,22 @@ export interface AgentResult {
messages: ModelMessage[]
}
export const createAgent = (params: AgentParams) => {
export const createAgent = (
params: AgentParams,
fetcher: AuthFetcher = fetch,
) => {
const gateway = createChatGateway(params.clientType)
const messages: ModelMessage[] = []
const maxSteps = params.maxSteps ?? 50
const getTools = () => {
const scheduleTools = getScheduleTools({ fetch: fetcher })
return {
...scheduleTools,
}
}
const generateSystem = () => {
return system({
date: new Date(),
@@ -44,10 +56,11 @@ export const createAgent = (params: AgentParams) => {
const ask = async (input: AgentInput): Promise<AgentResult> => {
messages.push(...input.messages)
messages.push({
const user: ModelMessage = {
role: 'user',
content: input.query,
})
}
messages.push(user)
const { response } = await generateText({
model: gateway({
apiKey: params.apiKey,
@@ -56,18 +69,20 @@ export const createAgent = (params: AgentParams) => {
system: generateSystem(),
stopWhen: stepCountIs(maxSteps),
messages,
tools: getTools(),
})
return {
messages: response.messages,
messages: [user, ...response.messages],
}
}
async function* stream(input: AgentInput): AsyncGenerator<TextStreamPart<ToolSet>, AgentResult> {
messages.push(...input.messages)
messages.push({
const user: ModelMessage = {
role: 'user',
content: input.query,
})
}
messages.push(user)
const { response, fullStream } = streamText({
model: gateway({
apiKey: params.apiKey,
@@ -76,29 +91,43 @@ export const createAgent = (params: AgentParams) => {
system: generateSystem(),
stopWhen: stepCountIs(maxSteps),
messages,
tools: getTools(),
})
for await (const event of fullStream) {
yield event
}
return {
messages: (await response).messages,
messages: [user, ...(await response).messages],
}
}
const triggerSchedule = async (
input: AgentInput,
scheduleData: Schedule
) => {
): Promise<AgentResult> => {
messages.push(...input.messages)
messages.push({
const user: ModelMessage = {
role: 'user',
content: schedule({
schedule: scheduleData,
locale: params.locale,
date: new Date(),
}),
}
messages.push(user)
const { response } = await generateText({
model: gateway({
apiKey: params.apiKey,
baseURL: params.baseUrl,
})(params.model),
system: generateSystem(),
stopWhen: stepCountIs(maxSteps),
messages,
tools: getTools(),
})
return await ask(input)
return {
messages: [user, ...response.messages],
}
}
return {
+3
View File
@@ -5,6 +5,9 @@ type AgentGatewayConfig = {
'agent_gateway': {
host?: string
port?: number
},
'server': {
addr?: string
}
}
+22
View File
@@ -3,9 +3,31 @@ import { chatModule } from './modules/chat'
import { corsMiddleware } from './middlewares/cors'
import { errorMiddleware } from './middlewares/error'
import { loadConfig } from './config'
import { join } from 'path'
const config = loadConfig('../config.toml')
export type AuthFetcher = (url: string, options: RequestInit) => Promise<Response>
export const createAuthFetcher = (bearer: string | undefined): AuthFetcher => {
return async (url: string, options: RequestInit) => {
const headers = new Headers(options.headers || {})
if (bearer) {
headers.set('Authorization', `Bearer ${bearer}`)
}
let baseUrl = ''
if (!baseUrl) {
baseUrl = 'http://127.0.0.1'
}
if (typeof config.server.addr === 'string' && config.server.addr.startsWith(':')) {
baseUrl = `http://127.0.0.1${config.server.addr}`
}
return await fetch(join(baseUrl, url), {
...options,
headers,
})
}
}
const app = new Elysia()
.use(corsMiddleware)
.use(errorMiddleware)
+3
View File
@@ -0,0 +1,3 @@
import { bearer } from '@elysiajs/bearer'
export const bearerMiddleware = bearer()
+29 -16
View File
@@ -1,8 +1,10 @@
import { Elysia, sse } from 'elysia'
import z from 'zod'
import { createAgent } from '../agent'
import { createAuthFetcher } from '../index'
import { ClientType } from '../types'
import { ModelMessage } from 'ai'
import { bearerMiddleware } from '../middlewares/bearer'
const ChatBody = z.object({
apiKey: z.string().min(1, 'API key is required'),
@@ -36,13 +38,8 @@ const ScheduleBody = z.object({
}).and(ChatBody)
export const chatModule = new Elysia({ prefix: '/chat' })
.post('/', async ({ body }) => {
console.log('[Chat] request', {
type: 'chat',
clientType: body.clientType,
model: body.model,
baseUrl: body.baseUrl,
})
.use(bearerMiddleware)
.post('/', async ({ body, bearer }) => {
const { ask } = createAgent({
apiKey: body.apiKey,
baseUrl: body.baseUrl,
@@ -54,7 +51,7 @@ export const chatModule = new Elysia({ prefix: '/chat' })
maxContextLoadTime: body.maxContextLoadTime,
platforms: body.platforms,
currentPlatform: body.currentPlatform,
})
}, createAuthFetcher(bearer))
try {
const result = await ask({
messages: body.messages as unknown as ModelMessage[],
@@ -75,12 +72,13 @@ export const chatModule = new Elysia({ prefix: '/chat' })
}, {
body: ChatBody,
})
.post('/stream', async function* ({ body }) {
.post('/stream', async function* ({ body, bearer }) {
console.log('[Chat] request', {
type: 'stream',
clientType: body.clientType,
model: body.model,
baseUrl: body.baseUrl,
bearer,
})
const { stream } = createAgent({
apiKey: body.apiKey,
@@ -93,7 +91,7 @@ export const chatModule = new Elysia({ prefix: '/chat' })
maxContextLoadTime: body.maxContextLoadTime,
platforms: body.platforms,
currentPlatform: body.currentPlatform,
})
}, createAuthFetcher(bearer))
try {
const streanGenerator = stream({
messages: body.messages as unknown as ModelMessage[],
@@ -127,7 +125,12 @@ export const chatModule = new Elysia({ prefix: '/chat' })
}, {
body: ChatBody,
})
.post('/schedule', async ({ body }) => {
.post('/schedule', async ({ body, bearer }) => {
console.log('[Chat] schedule request', {
type: 'schedule',
bearer,
body,
})
const { triggerSchedule } = createAgent({
apiKey: body.apiKey,
baseUrl: body.baseUrl,
@@ -139,11 +142,21 @@ export const chatModule = new Elysia({ prefix: '/chat' })
maxContextLoadTime: body.maxContextLoadTime,
platforms: body.platforms,
currentPlatform: body.currentPlatform,
})
return await triggerSchedule({
messages: body.messages as unknown as ModelMessage[],
query: body.query,
}, body.schedule)
}, createAuthFetcher(bearer))
try {
return await triggerSchedule({
messages: body.messages as unknown as ModelMessage[],
query: body.query,
}, body.schedule)
} catch (error) {
console.error('[Chat] schedule error', {
type: 'schedule',
bearer,
body,
error,
})
throw error
}
}, {
body: ScheduleBody,
})
+4 -4
View File
@@ -32,12 +32,12 @@ Your abilities:
**Schedule**
- We use **Cron Syntax** to schedule tasks.
- You can use ${quote('get-schedules')} to get the list of schedules.
- You can use ${quote('remove-schedule')} to remove a schedule by id.
- You can use ${quote('schedule')} to schedule a task.
- You can use ${quote('schedule_list')} to get the list of schedules.
- You can use ${quote('schedule_delete')} to remove a schedule by id.
- You can use ${quote('schedule_create')} to create a new schedule.
+ The ${quote('pattern')} is the pattern of the schedule with **Cron Syntax**.
+ The ${quote('command')} is the natural language command to execute, will send to you when the schedule is triggered, which means the command will be executed by presence of you.
+ The ${quote('maxCalls')} is the maximum number of calls to the schedule, If you want to run the task only once, set it to 1.
+ The ${quote('max_calls')} is the maximum number of calls to the schedule, If you want to run the task only once, set it to 1.
- The ${quote('command')} should include the method (e.g. ${quote('send-message')}) for returning the task result. If the user does not specify otherwise, the user should be asked how they would like to be notified.
**Message**
View File
+24
View File
@@ -0,0 +1,24 @@
import { tool } from 'ai'
import { AuthFetcher } from '..'
import { z } from 'zod'
export type MemoryToolParams = {
fetch: AuthFetcher
}
export const getMemoryTools = ({ fetch }: MemoryToolParams) => {
const searchMemory = tool({
description: 'Search for memories',
inputSchema: z.object({
query: z.string().describe('The query to search for memories'),
}),
execute: async ({ query }) => {
const response = await fetch(`/memory/search?query=${query}`)
return response.json()
},
})
return {
'search_memory': searchMemory,
}
}
+98
View File
@@ -0,0 +1,98 @@
import { tool } from 'ai'
import { z } from 'zod'
import { AuthFetcher } from '..'
export type ScheduleToolParams = {
fetch: AuthFetcher
}
const ScheduleSchema = z.object({
id: z.string(),
name: z.string(),
description: z.string(),
pattern: z.string(),
max_calls: z.number().nullable().optional(),
current_calls: z.number().optional(),
created_at: z.string().optional(),
updated_at: z.string().optional(),
enabled: z.boolean(),
command: z.string(),
user_id: z.string().optional(),
})
export const getScheduleTools = ({ fetch }: ScheduleToolParams) => {
const listSchedules = tool({
description: 'List schedules for current user',
inputSchema: z.object({}),
execute: async () => {
const response = await fetch('/schedule', { method: 'GET' })
return response.json()
},
})
const getSchedule = tool({
description: 'Get a schedule by id',
inputSchema: z.object({
id: z.string().describe('Schedule ID'),
}),
execute: async ({ id }) => {
const response = await fetch(`/schedule/${id}`, { method: 'GET' })
return response.json()
},
})
const createSchedule = tool({
description: 'Create a new schedule',
inputSchema: z.object({
name: z.string(),
description: z.string(),
pattern: z.string(),
max_calls: z.number().optional(),
enabled: z.boolean().optional(),
command: z.string(),
}),
execute: async (payload) => {
const response = await fetch('/schedule', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
})
return response.json()
},
})
const updateSchedule = tool({
description: 'Update an existing schedule',
inputSchema: ScheduleSchema.partial().extend({
id: z.string(),
}),
execute: async (payload) => {
const { id, ...body } = payload
const response = await fetch(`/schedule/${id}`, {
method: 'PUT',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(body),
})
return response.json()
},
})
const deleteSchedule = tool({
description: 'Delete a schedule',
inputSchema: z.object({
id: z.string(),
}),
execute: async ({ id }) => {
const response = await fetch(`/schedule/${id}`, { method: 'DELETE' })
return response.status === 204 ? { success: true } : response.json()
},
})
return {
'schedule_list': listSchedules,
'schedule_get': getSchedule,
'schedule_create': createSchedule,
'schedule_update': updateSchedule,
'schedule_delete': deleteSchedule,
}
}