diff --git a/agent/src/modules/chat.ts b/agent/src/modules/chat.ts index 290c1a5b..e90874a2 100644 --- a/agent/src/modules/chat.ts +++ b/agent/src/modules/chat.ts @@ -23,6 +23,7 @@ const AgentModel = z.object({ export const chatModule = new Elysia({ prefix: '/chat' }) .use(bearerMiddleware) .post('/', async ({ body, bearer }) => { + console.log('chat', body) const authFetcher = createAuthFetcher(bearer) const { ask } = createAgent({ model: body.model as ModelConfig, @@ -45,6 +46,7 @@ export const chatModule = new Elysia({ prefix: '/chat' }) }), }) .post('/stream', async function* ({ body, bearer }) { + console.log('stream', body) try { const authFetcher = createAuthFetcher(bearer) const { stream } = createAgent({ diff --git a/packages/cli/src/cli/bot.ts b/packages/cli/src/cli/bot.ts index 6a967783..4b71a702 100644 --- a/packages/cli/src/cli/bot.ts +++ b/packages/cli/src/cli/bot.ts @@ -102,16 +102,43 @@ const streamChat = async (query: string, botId: string, sessionId: string, token return true } +const extractTextFromMessage = (message: unknown) => { + if (typeof message === 'string') return message + if (message && typeof message === 'object') { + const value = message as { text?: unknown; parts?: unknown[] } + if (typeof value.text === 'string') return value.text + if (Array.isArray(value.parts)) { + const lines = value.parts + .map((part) => { + if (!part || typeof part !== 'object') return '' + const typed = part as { text?: unknown; url?: unknown; emoji?: unknown } + if (typeof typed.text === 'string' && typed.text.trim()) return typed.text + if (typeof typed.url === 'string' && typed.url.trim()) return typed.url + if (typeof typed.emoji === 'string' && typed.emoji.trim()) return typed.emoji + return '' + }) + .filter(Boolean) + if (lines.length) return lines.join('\n') + } + } + return null +} + const extractTextFromEvent = (payload: string) => { try { const event = JSON.parse(payload) if (typeof event === 'string') return event if (typeof event?.text === 'string') return event.text + const messageText = extractTextFromMessage(event?.message) + if (messageText) return messageText + if (typeof event?.delta === 'string') return event.delta if (typeof event?.delta?.content === 'string') return event.delta.content if (typeof event?.content === 'string') return event.content if (typeof event?.data === 'string') return event.data if (typeof event?.data?.text === 'string') return event.data.text if (typeof event?.data?.delta?.content === 'string') return event.data.delta.content + const nestedMessageText = extractTextFromMessage(event?.data?.message) + if (nestedMessageText) return nestedMessageText return null } catch { return payload @@ -275,7 +302,11 @@ export const registerBotCommands = (program: Command) => { console.log(chalk.green(`Chatting with ${chalk.bold(botId)} (session ${sessionId}). Type \`exit\` to quit.`)) while (true) { const line = (await rl.question(chalk.cyan('> '))).trim() - if (!line || line.toLowerCase() === 'exit') { + if (!line) { + if (input.readableEnded) break + continue + } + if (line.toLowerCase() === 'exit') { break } try { diff --git a/packages/cli/src/cli/index.ts b/packages/cli/src/cli/index.ts index 4f3dd925..9912d51c 100755 --- a/packages/cli/src/cli/index.ts +++ b/packages/cli/src/cli/index.ts @@ -6,6 +6,7 @@ import ora from 'ora' import { table } from 'table' import readline from 'node:readline/promises' import { stdin as input, stdout as output } from 'node:process' +import { randomUUID } from 'node:crypto' import packageJson from '../../package.json' import { apiRequest } from '../core/api' @@ -709,30 +710,29 @@ program await ensureModelsReady() const token = ensureAuth() const botId = await resolveBotId(token, program.opts().bot) - const session = await createLocalSession(botId, token) - const sessionId = session.session_id - - const abortStream = startLocalStream(botId, sessionId, token, (text) => { - if (text) { - process.stdout.write(`\n${chalk.white(text)}\n`) - } - }) + const sessionId = `cli:${randomUUID()}` const rl = readline.createInterface({ input, output }) console.log(chalk.green(`Chatting with ${chalk.bold(botId)}. Type \`exit\` to quit.`)) while (true) { const line = (await rl.question(chalk.cyan('> '))).trim() - if (!line || line.toLowerCase() === 'exit') { + if (!line) { + if (input.readableEnded) break + continue + } + if (line.toLowerCase() === 'exit') { break } try { - await postLocalMessage(botId, sessionId, line, token) + const ok = await streamChat(line, botId, sessionId, token) + if (!ok) { + console.log(chalk.red('Chat failed or stream unavailable.')) + } } catch (err: unknown) { console.log(chalk.red(getErrorMessage(err) || 'Chat failed')) } } - abortStream() rl.close() }) @@ -798,100 +798,72 @@ const streamChat = async (query: string, botId: string, sessionId: string, token return true } +const extractTextFromMessage = (message: unknown) => { + if (typeof message === 'string') return message + if (message && typeof message === 'object') { + const value = message as { text?: unknown; parts?: unknown[] } + if (typeof value.text === 'string') return value.text + if (Array.isArray(value.parts)) { + const lines = value.parts + .map((part) => { + if (!part || typeof part !== 'object') return '' + const typed = part as { text?: unknown; url?: unknown; emoji?: unknown } + if (typeof typed.text === 'string' && typed.text.trim()) return typed.text + if (typeof typed.url === 'string' && typed.url.trim()) return typed.url + if (typeof typed.emoji === 'string' && typed.emoji.trim()) return typed.emoji + return '' + }) + .filter(Boolean) + if (lines.length) return lines.join('\n') + } + } + return null +} + const extractTextFromEvent = (payload: string) => { try { const event = JSON.parse(payload) if (typeof event === 'string') return event if (typeof event?.text === 'string') return event.text + const messageText = extractTextFromMessage(event?.message) + if (messageText) return messageText + if (typeof event?.delta === 'string') return event.delta if (typeof event?.delta?.content === 'string') return event.delta.content if (typeof event?.content === 'string') return event.content if (typeof event?.data === 'string') return event.data if (typeof event?.data?.text === 'string') return event.data.text if (typeof event?.data?.delta?.content === 'string') return event.data.delta.content + const nestedMessageText = extractTextFromMessage(event?.data?.message) + if (nestedMessageText) return nestedMessageText return null } catch { return payload } } -type LocalSessionResponse = { - session_id: string - stream_url: string -} - -const createLocalSession = async (botId: string, token: TokenInfo) => { - return apiRequest(`/bots/${botId}/cli/sessions`, { method: 'POST' }, token) -} - -const postLocalMessage = async (botId: string, sessionId: string, text: string, token: TokenInfo) => { - return apiRequest(`/bots/${botId}/cli/sessions/${sessionId}/messages`, { - method: 'POST', - body: JSON.stringify({ message: { text } }), - }, token) -} - -const startLocalStream = (botId: string, sessionId: string, token: TokenInfo, onText: (text: string) => void) => { - const config = readConfig() - const baseURL = getBaseURL(config) - const controller = new AbortController() - void (async () => { - const resp = await fetch(`${baseURL}/bots/${botId}/chat/stream`, { - method: 'GET', - headers: { - 'Authorization': `Bearer ${token.access_token}`, - }, - signal: controller.signal, - }).catch(() => null) - if (!resp || !resp.ok || !resp.body) return - - const stream = resp.body - const reader = stream.getReader() - const decoder = new TextDecoder() - let buffer = '' - while (true) { - const { value, done } = await reader.read() - if (done) break - buffer += decoder.decode(value, { stream: true }) - let idx - while ((idx = buffer.indexOf('\n')) >= 0) { - const line = buffer.slice(0, idx).trim() - buffer = buffer.slice(idx + 1) - if (!line.startsWith('data:')) continue - const payload = line.slice(5).trim() - if (!payload || payload === '[DONE]') continue - const text = extractTextFromEvent(payload) - if (text) { - onText(text) - } - } - } - })() - return () => controller.abort() -} - const runTui = async (botId: string, token: TokenInfo) => { - const session = await createLocalSession(botId, token) - const sessionId = session.session_id - const abortStream = startLocalStream(botId, sessionId, token, (text) => { - if (text) { - process.stdout.write(`\n${chalk.white(text)}\n`) - } - }) + const sessionId = `cli:${randomUUID()}` const rl = readline.createInterface({ input, output }) console.log(chalk.green(`TUI session (line mode) with ${chalk.bold(botId)}. Type \`exit\` to quit.`)) while (true) { const line = (await rl.question(chalk.cyan('> '))).trim() - if (!line || line.toLowerCase() === 'exit') { + if (!line) { + if (input.readableEnded) break + continue + } + if (line.toLowerCase() === 'exit') { break } try { - await postLocalMessage(botId, sessionId, line, token) + const ok = await streamChat(line, botId, sessionId, token) + if (!ok) { + console.log(chalk.red('Chat failed or stream unavailable.')) + } } catch (err: unknown) { console.log(chalk.red(getErrorMessage(err) || 'Chat failed')) } } - abortStream() rl.close() }