fix(cli): stream

This commit is contained in:
Acbox
2026-02-07 01:37:36 +08:00
parent 29e6ddd1f9
commit a9596ab3a8
3 changed files with 82 additions and 77 deletions
+2
View File
@@ -23,6 +23,7 @@ const AgentModel = z.object({
export const chatModule = new Elysia({ prefix: '/chat' })
.use(bearerMiddleware)
.post('/', async ({ body, bearer }) => {
console.log('chat', body)
const authFetcher = createAuthFetcher(bearer)
const { ask } = createAgent({
model: body.model as ModelConfig,
@@ -45,6 +46,7 @@ export const chatModule = new Elysia({ prefix: '/chat' })
}),
})
.post('/stream', async function* ({ body, bearer }) {
console.log('stream', body)
try {
const authFetcher = createAuthFetcher(bearer)
const { stream } = createAgent({
+32 -1
View File
@@ -102,16 +102,43 @@ const streamChat = async (query: string, botId: string, sessionId: string, token
return true
}
const extractTextFromMessage = (message: unknown) => {
if (typeof message === 'string') return message
if (message && typeof message === 'object') {
const value = message as { text?: unknown; parts?: unknown[] }
if (typeof value.text === 'string') return value.text
if (Array.isArray(value.parts)) {
const lines = value.parts
.map((part) => {
if (!part || typeof part !== 'object') return ''
const typed = part as { text?: unknown; url?: unknown; emoji?: unknown }
if (typeof typed.text === 'string' && typed.text.trim()) return typed.text
if (typeof typed.url === 'string' && typed.url.trim()) return typed.url
if (typeof typed.emoji === 'string' && typed.emoji.trim()) return typed.emoji
return ''
})
.filter(Boolean)
if (lines.length) return lines.join('\n')
}
}
return null
}
const extractTextFromEvent = (payload: string) => {
try {
const event = JSON.parse(payload)
if (typeof event === 'string') return event
if (typeof event?.text === 'string') return event.text
const messageText = extractTextFromMessage(event?.message)
if (messageText) return messageText
if (typeof event?.delta === 'string') return event.delta
if (typeof event?.delta?.content === 'string') return event.delta.content
if (typeof event?.content === 'string') return event.content
if (typeof event?.data === 'string') return event.data
if (typeof event?.data?.text === 'string') return event.data.text
if (typeof event?.data?.delta?.content === 'string') return event.data.delta.content
const nestedMessageText = extractTextFromMessage(event?.data?.message)
if (nestedMessageText) return nestedMessageText
return null
} catch {
return payload
@@ -275,7 +302,11 @@ export const registerBotCommands = (program: Command) => {
console.log(chalk.green(`Chatting with ${chalk.bold(botId)} (session ${sessionId}). Type \`exit\` to quit.`))
while (true) {
const line = (await rl.question(chalk.cyan('> '))).trim()
if (!line || line.toLowerCase() === 'exit') {
if (!line) {
if (input.readableEnded) break
continue
}
if (line.toLowerCase() === 'exit') {
break
}
try {
+48 -76
View File
@@ -6,6 +6,7 @@ import ora from 'ora'
import { table } from 'table'
import readline from 'node:readline/promises'
import { stdin as input, stdout as output } from 'node:process'
import { randomUUID } from 'node:crypto'
import packageJson from '../../package.json'
import { apiRequest } from '../core/api'
@@ -709,30 +710,29 @@ program
await ensureModelsReady()
const token = ensureAuth()
const botId = await resolveBotId(token, program.opts().bot)
const session = await createLocalSession(botId, token)
const sessionId = session.session_id
const abortStream = startLocalStream(botId, sessionId, token, (text) => {
if (text) {
process.stdout.write(`\n${chalk.white(text)}\n`)
}
})
const sessionId = `cli:${randomUUID()}`
const rl = readline.createInterface({ input, output })
console.log(chalk.green(`Chatting with ${chalk.bold(botId)}. Type \`exit\` to quit.`))
while (true) {
const line = (await rl.question(chalk.cyan('> '))).trim()
if (!line || line.toLowerCase() === 'exit') {
if (!line) {
if (input.readableEnded) break
continue
}
if (line.toLowerCase() === 'exit') {
break
}
try {
await postLocalMessage(botId, sessionId, line, token)
const ok = await streamChat(line, botId, sessionId, token)
if (!ok) {
console.log(chalk.red('Chat failed or stream unavailable.'))
}
} catch (err: unknown) {
console.log(chalk.red(getErrorMessage(err) || 'Chat failed'))
}
}
abortStream()
rl.close()
})
@@ -798,100 +798,72 @@ const streamChat = async (query: string, botId: string, sessionId: string, token
return true
}
const extractTextFromMessage = (message: unknown) => {
if (typeof message === 'string') return message
if (message && typeof message === 'object') {
const value = message as { text?: unknown; parts?: unknown[] }
if (typeof value.text === 'string') return value.text
if (Array.isArray(value.parts)) {
const lines = value.parts
.map((part) => {
if (!part || typeof part !== 'object') return ''
const typed = part as { text?: unknown; url?: unknown; emoji?: unknown }
if (typeof typed.text === 'string' && typed.text.trim()) return typed.text
if (typeof typed.url === 'string' && typed.url.trim()) return typed.url
if (typeof typed.emoji === 'string' && typed.emoji.trim()) return typed.emoji
return ''
})
.filter(Boolean)
if (lines.length) return lines.join('\n')
}
}
return null
}
const extractTextFromEvent = (payload: string) => {
try {
const event = JSON.parse(payload)
if (typeof event === 'string') return event
if (typeof event?.text === 'string') return event.text
const messageText = extractTextFromMessage(event?.message)
if (messageText) return messageText
if (typeof event?.delta === 'string') return event.delta
if (typeof event?.delta?.content === 'string') return event.delta.content
if (typeof event?.content === 'string') return event.content
if (typeof event?.data === 'string') return event.data
if (typeof event?.data?.text === 'string') return event.data.text
if (typeof event?.data?.delta?.content === 'string') return event.data.delta.content
const nestedMessageText = extractTextFromMessage(event?.data?.message)
if (nestedMessageText) return nestedMessageText
return null
} catch {
return payload
}
}
type LocalSessionResponse = {
session_id: string
stream_url: string
}
const createLocalSession = async (botId: string, token: TokenInfo) => {
return apiRequest<LocalSessionResponse>(`/bots/${botId}/cli/sessions`, { method: 'POST' }, token)
}
const postLocalMessage = async (botId: string, sessionId: string, text: string, token: TokenInfo) => {
return apiRequest(`/bots/${botId}/cli/sessions/${sessionId}/messages`, {
method: 'POST',
body: JSON.stringify({ message: { text } }),
}, token)
}
const startLocalStream = (botId: string, sessionId: string, token: TokenInfo, onText: (text: string) => void) => {
const config = readConfig()
const baseURL = getBaseURL(config)
const controller = new AbortController()
void (async () => {
const resp = await fetch(`${baseURL}/bots/${botId}/chat/stream`, {
method: 'GET',
headers: {
'Authorization': `Bearer ${token.access_token}`,
},
signal: controller.signal,
}).catch(() => null)
if (!resp || !resp.ok || !resp.body) return
const stream = resp.body
const reader = stream.getReader()
const decoder = new TextDecoder()
let buffer = ''
while (true) {
const { value, done } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
let idx
while ((idx = buffer.indexOf('\n')) >= 0) {
const line = buffer.slice(0, idx).trim()
buffer = buffer.slice(idx + 1)
if (!line.startsWith('data:')) continue
const payload = line.slice(5).trim()
if (!payload || payload === '[DONE]') continue
const text = extractTextFromEvent(payload)
if (text) {
onText(text)
}
}
}
})()
return () => controller.abort()
}
const runTui = async (botId: string, token: TokenInfo) => {
const session = await createLocalSession(botId, token)
const sessionId = session.session_id
const abortStream = startLocalStream(botId, sessionId, token, (text) => {
if (text) {
process.stdout.write(`\n${chalk.white(text)}\n`)
}
})
const sessionId = `cli:${randomUUID()}`
const rl = readline.createInterface({ input, output })
console.log(chalk.green(`TUI session (line mode) with ${chalk.bold(botId)}. Type \`exit\` to quit.`))
while (true) {
const line = (await rl.question(chalk.cyan('> '))).trim()
if (!line || line.toLowerCase() === 'exit') {
if (!line) {
if (input.readableEnded) break
continue
}
if (line.toLowerCase() === 'exit') {
break
}
try {
await postLocalMessage(botId, sessionId, line, token)
const ok = await streamChat(line, botId, sessionId, token)
if (!ok) {
console.log(chalk.red('Chat failed or stream unavailable.'))
}
} catch (err: unknown) {
console.log(chalk.red(getErrorMessage(err) || 'Chat failed'))
}
}
abortStream()
rl.close()
}