diff --git a/.gitignore b/.gitignore index 525f6b6e..727465b3 100644 --- a/.gitignore +++ b/.gitignore @@ -91,8 +91,8 @@ Thumbs.db tmp/ # compiled files -memoh -agent +/memoh +/agent docs/docs/.vitepress/cache .pnpm-store diff --git a/apps/web/src/composables/api/useChat.types.ts b/apps/web/src/composables/api/useChat.types.ts index 217bbc88..af288f76 100644 --- a/apps/web/src/composables/api/useChat.types.ts +++ b/apps/web/src/composables/api/useChat.types.ts @@ -46,7 +46,7 @@ export interface StreamEvent { | 'text_start' | 'text_delta' | 'text_end' | 'reasoning_start' | 'reasoning_delta' | 'reasoning_end' | 'tool_call_start' | 'tool_call_end' - | 'attachment_delta' + | 'attachment_delta' | 'reaction_delta' | 'agent_start' | 'agent_end' | 'agent_abort' | 'processing_started' | 'processing_completed' | 'processing_failed' | 'error' diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 968cc290..8c88553a 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -448,6 +448,7 @@ func provideChannelManager(log *slog.Logger, registry *channel.Registry, channel if mw := channelRouter.IdentityMiddleware(); mw != nil { mgr.Use(mw) } + channelRouter.SetReactor(mgr) return mgr } diff --git a/internal/channel/inbound/channel.go b/internal/channel/inbound/channel.go index babd569f..2fcbf3e1 100644 --- a/internal/channel/inbound/channel.go +++ b/internal/channel/inbound/channel.go @@ -39,6 +39,10 @@ type RouteResolver interface { ResolveConversation(ctx context.Context, input route.ResolveInput) (route.ResolveConversationResult, error) } +type channelReactor interface { + React(ctx context.Context, botID string, channelType channel.ChannelType, req channel.ReactRequest) error +} + type mediaIngestor interface { Ingest(ctx context.Context, input media.IngestInput) (media.Asset, error) // GetByStorageKey resolves an asset by reading its sidecar JSON. @@ -55,6 +59,7 @@ type ChannelInboundProcessor struct { routeResolver RouteResolver message messagepkg.Writer mediaService mediaIngestor + reactor channelReactor inboxService *inbox.Service registry *channel.Registry logger *slog.Logger @@ -114,6 +119,14 @@ func (p *ChannelInboundProcessor) SetMediaService(mediaService mediaIngestor) { p.mediaService = mediaService } +// SetReactor configures the channel reactor for handling inline emoji reactions. +func (p *ChannelInboundProcessor) SetReactor(reactor channelReactor) { + if p == nil { + return + } + p.reactor = reactor +} + // SetStreamObserver configures an observer that receives copies of all stream // events produced for non-local channels (e.g. Telegram, Feishu). This enables // cross-channel visibility in the WebUI without coupling adapters to the hub. @@ -443,6 +456,10 @@ func (p *ChannelInboundProcessor) HandleInbound(ctx context.Context, cfg channel } assetMu.Unlock() } + if event.Type == channel.StreamEventReaction && len(event.Reactions) > 0 { + p.dispatchReactions(ctx, identity.BotID, msg.Channel, target, sourceMessageID, event.Reactions) + continue + } if pushErr := stream.Push(ctx, events[i]); pushErr != nil { streamErr = pushErr break @@ -861,6 +878,7 @@ type gatewayStreamEnvelope struct { Input json.RawMessage `json:"input"` Result json.RawMessage `json:"result"` Attachments json.RawMessage `json:"attachments"` + Reactions json.RawMessage `json:"reactions"` } type gatewayStreamDoneData struct { @@ -954,6 +972,14 @@ func mapStreamChunkToChannelEvents(chunk conversation.StreamChunk) ([]channel.St return []channel.StreamEvent{ {Type: channel.StreamEventAttachment, Attachments: attachments}, }, finalMessages, nil + case "reaction_delta": + reactions := parseReactionDelta(envelope.Reactions) + if len(reactions) == 0 { + return nil, finalMessages, nil + } + return []channel.StreamEvent{ + {Type: channel.StreamEventReaction, Reactions: reactions}, + }, finalMessages, nil case "agent_start": return []channel.StreamEvent{ { @@ -1935,6 +1961,73 @@ func parseAttachmentDelta(raw json.RawMessage) []channel.Attachment { return attachments } +// parseReactionDelta converts raw JSON reaction data to channel ReactRequests. +func parseReactionDelta(raw json.RawMessage) []channel.ReactRequest { + if len(raw) == 0 { + return nil + } + var items []struct { + Emoji string `json:"emoji"` + } + if err := json.Unmarshal(raw, &items); err != nil { + return nil + } + reactions := make([]channel.ReactRequest, 0, len(items)) + for _, item := range items { + emoji := strings.TrimSpace(item.Emoji) + if emoji == "" { + continue + } + reactions = append(reactions, channel.ReactRequest{ + Emoji: emoji, + }) + } + return reactions +} + +// dispatchReactions sends emoji reactions to the channel for the source message. +func (p *ChannelInboundProcessor) dispatchReactions( + ctx context.Context, + botID string, + channelType channel.ChannelType, + target string, + sourceMessageID string, + reactions []channel.ReactRequest, +) { + if p.reactor == nil { + return + } + target = strings.TrimSpace(target) + sourceMessageID = strings.TrimSpace(sourceMessageID) + if target == "" || sourceMessageID == "" { + if p.logger != nil { + p.logger.Warn("cannot dispatch reactions: missing target or source message ID", + slog.String("bot_id", botID), + slog.String("channel", channelType.String()), + ) + } + return + } + for _, reaction := range reactions { + req := channel.ReactRequest{ + Target: target, + MessageID: sourceMessageID, + Emoji: reaction.Emoji, + } + if err := p.reactor.React(ctx, strings.TrimSpace(botID), channelType, req); err != nil { + if p.logger != nil { + p.logger.Warn("inline reaction failed", + slog.String("bot_id", botID), + slog.String("channel", channelType.String()), + slog.String("emoji", reaction.Emoji), + slog.String("message_id", sourceMessageID), + slog.Any("error", err), + ) + } + } + } +} + // buildRouteMetadata extracts user/conversation information for route metadata persistence. func buildRouteMetadata(msg channel.InboundMessage, identity InboundIdentity) map[string]any { m := make(map[string]any) diff --git a/internal/channel/types.go b/internal/channel/types.go index 268616f4..c1da3065 100644 --- a/internal/channel/types.go +++ b/internal/channel/types.go @@ -101,6 +101,7 @@ const ( StreamEventAttachment StreamEventType = "attachment" StreamEventAgentStart StreamEventType = "agent_start" StreamEventAgentEnd StreamEventType = "agent_end" + StreamEventReaction StreamEventType = "reaction" StreamEventProcessingStarted StreamEventType = "processing_started" StreamEventProcessingCompleted StreamEventType = "processing_completed" StreamEventProcessingFailed StreamEventType = "processing_failed" @@ -146,6 +147,7 @@ type StreamEvent struct { ToolCall *StreamToolCall `json:"tool_call,omitempty"` Phase StreamPhase `json:"phase,omitempty"` Attachments []Attachment `json:"attachments,omitempty"` + Reactions []ReactRequest `json:"reactions,omitempty"` Metadata map[string]any `json:"metadata,omitempty"` } diff --git a/packages/agent/src/agent.ts b/packages/agent/src/agent.ts index b1b4d3ca..f193fa96 100644 --- a/packages/agent/src/agent.ts +++ b/packages/agent/src/agent.ts @@ -25,11 +25,13 @@ import { system, schedule, heartbeat, subagentSystem } from './prompts' import { AuthFetcher } from './types' import { createModel } from './model' import { - extractAttachmentsFromText, stripAttachmentsFromMessages, dedupeAttachments, - AttachmentsStreamExtractor, + attachmentsResolver, } from './utils/attachments' +import type { ContainerFileAttachment } from './types/attachment' +import { reactionsResolver, type ReactionItem } from './utils/reactions' +import { StreamTagExtractor, extractTagsFromText, type TagEvent } from './utils/tag-extractor' import { createImagePartFromAttachment } from './utils/image-parts' import type { GatewayInputAttachment } from './types/attachment' import { getMCPTools } from './tools/mcp' @@ -330,10 +332,16 @@ export const createAgent = ( basePrepareStep: () => ({ system: systemPrompt }), }) const stepUsages = buildStepUsages(steps) - const { cleanedText, attachments: textAttachments } = - extractAttachmentsFromText(text) + const tagResolvers = [attachmentsResolver, reactionsResolver] + const { cleanedText, events } = extractTagsFromText(text, tagResolvers) + const textAttachments = events + .filter((e) => e.tag === 'attachments') + .flatMap((e) => e.data as ContainerFileAttachment[]) + const reactions = events + .filter((e) => e.tag === 'reactions') + .flatMap((e) => e.data as ReactionItem[]) const { messages: strippedMessages, attachments: messageAttachments } = - stripAttachmentsFromMessages(response.messages) + stripAttachmentsFromMessages(response.messages, [reactionsResolver]) const allAttachments = dedupeAttachments([ ...textAttachments, ...messageAttachments, @@ -348,6 +356,7 @@ export const createAgent = ( usage, text: cleanedText, attachments: allAttachments, + reactions, skills: getEnabledSkills(), } } @@ -472,12 +481,34 @@ export const createAgent = ( return 'Model stream failed' } + function* emitTagEvents(events: TagEvent[]): Generator { + for (const event of events) { + switch (event.tag) { + case 'attachments': { + const attachments = dedupeAttachments(event.data as ContainerFileAttachment[]) as ContainerFileAttachment[] + if (attachments.length) { + yield { type: 'attachment_delta', attachments } + } + break + } + case 'reactions': { + const reactions = event.data as ReactionItem[] + if (reactions.length) { + yield { type: 'reaction_delta', reactions } + } + break + } + } + } + } + async function* stream(input: AgentInput): AsyncGenerator { const userPrompt = generateUserPrompt(input) const messages = [...input.messages, userPrompt] input.skills.forEach((skill) => enableSkill(skill)) const systemPrompt = await generateSystemPrompt() - const attachmentsExtractor = new AttachmentsStreamExtractor() + const tagResolvers = [attachmentsResolver, reactionsResolver] + const tagExtractor = new StreamTagExtractor(tagResolvers) const textLoopGuard = loopDetectionEnabled ? createTextLoopGuard({ consecutiveHitsToAbort: LOOP_DETECTED_STREAK_THRESHOLD, @@ -598,9 +629,7 @@ export const createAgent = ( } break case 'text-delta': { - const { visibleText, attachments } = attachmentsExtractor.push( - chunk.text, - ) + const { visibleText, events } = tagExtractor.push(chunk.text) if (visibleText) { if (textLoopProbeBuffer) { textLoopProbeBuffer.push(visibleText) @@ -610,16 +639,11 @@ export const createAgent = ( delta: visibleText, } } - if (attachments.length) { - yield { - type: 'attachment_delta', - attachments, - } - } + yield* emitTagEvents(events) break } case 'text-end': { - const remainder = attachmentsExtractor.flushRemainder() + const remainder = tagExtractor.flushRemainder() if (remainder.visibleText) { if (textLoopProbeBuffer) { textLoopProbeBuffer.push(remainder.visibleText) @@ -632,20 +656,15 @@ export const createAgent = ( if (textLoopProbeBuffer) { textLoopProbeBuffer.flush() } - if (remainder.attachments.length) { - yield { - type: 'attachment_delta', - attachments: remainder.attachments, - } - } + yield* emitTagEvents(remainder.events) yield { type: 'text_end', metadata: chunk, } break } - case 'tool-call': - const remainder = attachmentsExtractor.flushRemainder() + case 'tool-call': { + const remainder = tagExtractor.flushRemainder() if (remainder.visibleText) { if (textLoopProbeBuffer) { textLoopProbeBuffer.push(remainder.visibleText) @@ -658,12 +677,7 @@ export const createAgent = ( if (textLoopProbeBuffer) { textLoopProbeBuffer.flush() } - if (remainder.attachments.length) { - yield { - type: 'attachment_delta', - attachments: remainder.attachments, - } - } + yield* emitTagEvents(remainder.events) yield { type: 'tool_call_start', toolName: chunk.toolName, @@ -672,7 +686,8 @@ export const createAgent = ( metadata: chunk, } break - case 'tool-result': + } + case 'tool-result': { const shouldAbortForToolLoop = toolLoopAbortCallIds.delete(chunk.toolCallId) yield { type: 'tool_call_end', @@ -691,6 +706,7 @@ export const createAgent = ( throw new Error(TOOL_LOOP_DETECTED_ABORT_MESSAGE) } break + } case 'file': yield { type: 'attachment_delta', @@ -710,6 +726,7 @@ export const createAgent = ( const { messages: strippedMessages } = stripAttachmentsFromMessages( result.messages, + [reactionsResolver], ) yield { type: 'agent_end', diff --git a/packages/agent/src/prompts/system.ts b/packages/agent/src/prompts/system.ts index d123e06c..6034cea9 100644 --- a/packages/agent/src/prompts/system.ts +++ b/packages/agent/src/prompts/system.ts @@ -206,6 +206,22 @@ Rules: - No extra text inside ${quote('...')} - The block can appear anywhere in your response; it will be parsed and stripped from visible text +## Reactions + +To react with an emoji to the message you are replying to, use this format in your direct response: + +${block([ + '', + '- 👍', + '', +].join('\n'))} + +Rules: +- One emoji per line, prefixed by ${quote('- ')} +- The block can appear anywhere in your response; it will be parsed and stripped from visible text +- This reacts to the **source message** of the current conversation (the message you are responding to) +- For reacting to messages in other channels or removing reactions, use the ${quote('react')} tool instead + ## Schedule Tasks You can create and manage schedule tasks via cron. diff --git a/packages/agent/src/types/action.ts b/packages/agent/src/types/action.ts index 327a91c8..f07e0848 100644 --- a/packages/agent/src/types/action.ts +++ b/packages/agent/src/types/action.ts @@ -1,6 +1,7 @@ import { LanguageModelUsage, ModelMessage } from 'ai' import { AgentInput } from './agent' import { AgentAttachment } from './attachment' +import { ReactionItem } from '../utils/reactions' export interface BaseAction { type: string @@ -39,6 +40,11 @@ export interface AttachmentDeltaAction extends BaseAction { attachments: AgentAttachment[] } +export interface ReactionDeltaAction extends BaseAction { + type: 'reaction_delta' + reactions: ReactionItem[] +} + export interface TextEndAction extends BaseAction { type: 'text_end' } @@ -84,6 +90,7 @@ export type AgentStreamAction = | TextStartAction | TextDeltaAction | AttachmentDeltaAction + | ReactionDeltaAction | TextEndAction | ToolCallStartAction | ToolCallEndAction diff --git a/packages/agent/src/utils/attachments.ts b/packages/agent/src/utils/attachments.ts index 87e767f2..bf809bf9 100644 --- a/packages/agent/src/utils/attachments.ts +++ b/packages/agent/src/utils/attachments.ts @@ -3,9 +3,12 @@ import type { AgentAttachment, ContainerFileAttachment, } from '../types/attachment' +import type { TagResolver } from './tag-extractor' +import { StreamTagExtractor, extractTagsFromText } from './tag-extractor' -const ATTACHMENTS_START = '' -const ATTACHMENTS_END = '' +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- /** * Get a unique key for deduplication of attachments. @@ -39,26 +42,41 @@ export const parseAttachmentPaths = (content: string): string[] => { .filter(Boolean) } +// --------------------------------------------------------------------------- +// TagResolver for +// --------------------------------------------------------------------------- + +export const attachmentsResolver: TagResolver = { + tag: 'attachments', + parse(content: string): ContainerFileAttachment[] { + const paths = Array.from(new Set(parseAttachmentPaths(content))) + return paths.map((path): ContainerFileAttachment => ({ type: 'file', path })) + }, +} + +// --------------------------------------------------------------------------- +// Batch extraction (backward-compatible wrapper) +// --------------------------------------------------------------------------- + /** * Extract all `...` blocks from a text string. * Returns the cleaned text (blocks removed) and the parsed file attachments. */ export const extractAttachmentsFromText = (text: string): { cleanedText: string; attachments: ContainerFileAttachment[] } => { - const paths: string[] = [] - const cleanedText = text.replace( - /([\s\S]*?)<\/attachments>/g, - (_match, inner: string) => { - paths.push(...parseAttachmentPaths(inner)) - return '' - } - ) - const uniquePaths = Array.from(new Set(paths)) + const { cleanedText, events } = extractTagsFromText(text, [attachmentsResolver]) + const attachments = events + .filter((e) => e.tag === 'attachments') + .flatMap((e) => e.data as ContainerFileAttachment[]) return { - cleanedText: cleanedText.replace(/\n{3,}/g, '\n\n').trim(), - attachments: uniquePaths.map((path): ContainerFileAttachment => ({ type: 'file', path })), + cleanedText, + attachments: dedupeAttachments(attachments) as ContainerFileAttachment[], } } +// --------------------------------------------------------------------------- +// Message-level stripping +// --------------------------------------------------------------------------- + /** * Type guard: checks whether a content part is a TextPart. */ @@ -72,13 +90,25 @@ const isTextPart = (part: unknown): part is TextPart => { } /** - * Strip `` blocks from all assistant messages in a message list. + * Strip all registered tag blocks from assistant messages in a message list. + * Accepts additional resolvers to strip beyond `` (e.g. ``). * Returns the cleaned messages and a deduplicated list of attachments found. */ export const stripAttachmentsFromMessages = ( - messages: ModelMessage[] + messages: ModelMessage[], + extraResolvers: TagResolver[] = [], ): { messages: ModelMessage[]; attachments: ContainerFileAttachment[] } => { const allAttachments: ContainerFileAttachment[] = [] + const resolvers: TagResolver[] = [attachmentsResolver, ...extraResolvers] + + const cleanText = (text: string): string => { + const { cleanedText, events } = extractTagsFromText(text, resolvers) + const attachments = events + .filter((e) => e.tag === 'attachments') + .flatMap((e) => e.data as ContainerFileAttachment[]) + allAttachments.push(...attachments) + return cleanedText + } const stripped = messages.map((msg): ModelMessage => { if (msg.role !== 'assistant') return msg @@ -87,17 +117,13 @@ export const stripAttachmentsFromMessages = ( const { content } = assistantMsg if (typeof content === 'string') { - const { cleanedText, attachments } = extractAttachmentsFromText(content) - allAttachments.push(...attachments) - return { ...assistantMsg, content: cleanedText } + return { ...assistantMsg, content: cleanText(content) } } if (Array.isArray(content)) { const newParts = content.map(part => { if (!isTextPart(part)) return part - const { cleanedText, attachments } = extractAttachmentsFromText(part.text) - allAttachments.push(...attachments) - return { ...part, text: cleanedText } + return { ...part, text: cleanText(part.text) } }) return { ...assistantMsg, content: newParts } } @@ -112,7 +138,7 @@ export const stripAttachmentsFromMessages = ( } // --------------------------------------------------------------------------- -// Streaming extractor +// Streaming extractor (backward-compatible wrapper) // --------------------------------------------------------------------------- export interface AttachmentsStreamResult { @@ -121,81 +147,35 @@ export interface AttachmentsStreamResult { } /** - * Incremental state-machine that intercepts `...` - * blocks from a stream of text deltas. Text outside those blocks is passed - * through as `visibleText`; completed blocks are emitted as `attachments`. + * Backward-compatible streaming extractor that delegates to {@link StreamTagExtractor}. + * Intercepts `...` blocks from a stream of text deltas. */ export class AttachmentsStreamExtractor { - private state: 'text' | 'attachments' = 'text' - private buffer = '' - private attachmentsBuffer = '' + private inner: StreamTagExtractor - /** - * Feed a new text delta into the extractor. - */ - push(delta: string): AttachmentsStreamResult { - this.buffer += delta - let visible = '' - const attachments: ContainerFileAttachment[] = [] - - while (this.buffer.length > 0) { - if (this.state === 'text') { - const idx = this.buffer.indexOf(ATTACHMENTS_START) - if (idx === -1) { - // Emit everything except a small tail that could be the start of the opening tag. - const keep = Math.min(this.buffer.length, ATTACHMENTS_START.length - 1) - const emit = this.buffer.slice(0, this.buffer.length - keep) - visible += emit - this.buffer = this.buffer.slice(this.buffer.length - keep) - break - } - - visible += this.buffer.slice(0, idx) - this.buffer = this.buffer.slice(idx + ATTACHMENTS_START.length) - this.attachmentsBuffer = '' - this.state = 'attachments' - continue - } - - // state === 'attachments' - const endIdx = this.buffer.indexOf(ATTACHMENTS_END) - if (endIdx === -1) { - const keep = Math.min(this.buffer.length, ATTACHMENTS_END.length - 1) - const take = this.buffer.slice(0, this.buffer.length - keep) - this.attachmentsBuffer += take - this.buffer = this.buffer.slice(this.buffer.length - keep) - break - } - - this.attachmentsBuffer += this.buffer.slice(0, endIdx) - const paths = parseAttachmentPaths(this.attachmentsBuffer) - for (const path of paths) { - attachments.push({ type: 'file', path }) - } - this.buffer = this.buffer.slice(endIdx + ATTACHMENTS_END.length) - this.attachmentsBuffer = '' - this.state = 'text' - } - - return { visibleText: visible, attachments: dedupeAttachments(attachments) as ContainerFileAttachment[] } + constructor() { + this.inner = new StreamTagExtractor([attachmentsResolver]) } - /** - * Flush any remaining buffered content. Call this when the stream ends. - * If an `` block was not properly closed, the raw text is - * returned as `visibleText` to avoid data loss. - */ - flushRemainder(): AttachmentsStreamResult { - if (this.state === 'text') { - const out = this.buffer - this.buffer = '' - return { visibleText: out, attachments: [] } + push(delta: string): AttachmentsStreamResult { + const { visibleText, events } = this.inner.push(delta) + const attachments = events + .filter((e) => e.tag === 'attachments') + .flatMap((e) => e.data as ContainerFileAttachment[]) + return { + visibleText, + attachments: dedupeAttachments(attachments) as ContainerFileAttachment[], + } + } + + flushRemainder(): AttachmentsStreamResult { + const { visibleText, events } = this.inner.flushRemainder() + const attachments = events + .filter((e) => e.tag === 'attachments') + .flatMap((e) => e.data as ContainerFileAttachment[]) + return { + visibleText, + attachments: dedupeAttachments(attachments) as ContainerFileAttachment[], } - // Unclosed attachments block — treat it as literal text to avoid data loss. - const out = `${ATTACHMENTS_START}${this.attachmentsBuffer}${this.buffer}` - this.state = 'text' - this.buffer = '' - this.attachmentsBuffer = '' - return { visibleText: out, attachments: [] } } } diff --git a/packages/agent/src/utils/reactions.ts b/packages/agent/src/utils/reactions.ts new file mode 100644 index 00000000..aed42b4c --- /dev/null +++ b/packages/agent/src/utils/reactions.ts @@ -0,0 +1,28 @@ +import type { TagResolver } from './tag-extractor' + +export interface ReactionItem { + emoji: string +} + +/** + * Parse emoji entries from the inner content of a `` block. + * Each line should be formatted as `- 👍`. + */ +export const parseReactionEmojis = (content: string): string[] => { + return content + .split('\n') + .map(line => line.trim()) + .map(line => { + if (!line.startsWith('-')) return '' + return line.slice(1).trim() + }) + .filter(Boolean) +} + +export const reactionsResolver: TagResolver = { + tag: 'reactions', + parse(content: string): ReactionItem[] { + const emojis = Array.from(new Set(parseReactionEmojis(content))) + return emojis.map((emoji): ReactionItem => ({ emoji })) + }, +} diff --git a/packages/agent/src/utils/tag-extractor.ts b/packages/agent/src/utils/tag-extractor.ts new file mode 100644 index 00000000..6195645d --- /dev/null +++ b/packages/agent/src/utils/tag-extractor.ts @@ -0,0 +1,183 @@ +/** + * Generic extensible tag-interception system. + * + * Register TagResolver instances (e.g. attachments, reactions) and both the + * batch extractor and the streaming state-machine will intercept the + * corresponding `...` blocks, stripping them from visible text and + * forwarding the parsed payload through {@link TagEvent} objects. + */ + +// --------------------------------------------------------------------------- +// Public interfaces +// --------------------------------------------------------------------------- + +export interface TagResolver { + tag: string + parse(content: string): T[] +} + +export interface TagEvent { + tag: string + data: unknown[] +} + +export interface TagStreamResult { + visibleText: string + events: TagEvent[] +} + +// --------------------------------------------------------------------------- +// Batch extractor +// --------------------------------------------------------------------------- + +/** + * Extract all registered tag blocks from a complete text string. + * Returns the cleaned text (blocks removed) and a list of tag events. + */ +export function extractTagsFromText( + text: string, + resolvers: TagResolver[], +): { cleanedText: string; events: TagEvent[] } { + const events: TagEvent[] = [] + let cleaned = text + for (const resolver of resolvers) { + const open = `<${resolver.tag}>` + const close = `` + const pattern = new RegExp( + `${escapeRegExp(open)}([\\s\\S]*?)${escapeRegExp(close)}`, + 'g', + ) + cleaned = cleaned.replace(pattern, (_match, inner: string) => { + const parsed = resolver.parse(inner) + if (parsed.length > 0) { + events.push({ tag: resolver.tag, data: parsed }) + } + return '' + }) + } + return { + cleanedText: cleaned.replace(/\n{3,}/g, '\n\n').trim(), + events, + } +} + +// --------------------------------------------------------------------------- +// Streaming extractor +// --------------------------------------------------------------------------- + +interface ResolverMeta { + resolver: TagResolver + openTag: string + closeTag: string +} + +/** + * Incremental state-machine that intercepts multiple `...` blocks + * from a stream of text deltas. + * + * Text outside registered blocks is passed through as `visibleText`; completed + * blocks are emitted as {@link TagEvent} entries. + */ +export class StreamTagExtractor { + private metas: ResolverMeta[] + private maxOpenLen: number + private state: 'text' | 'inside' = 'text' + private activeMeta: ResolverMeta | null = null + private buffer = '' + private tagBuffer = '' + + constructor(resolvers: TagResolver[]) { + this.metas = resolvers.map((resolver) => ({ + resolver, + openTag: `<${resolver.tag}>`, + closeTag: ``, + })) + this.maxOpenLen = Math.max(...this.metas.map((m) => m.openTag.length), 0) + } + + push(delta: string): TagStreamResult { + this.buffer += delta + let visible = '' + const events: TagEvent[] = [] + + while (this.buffer.length > 0) { + if (this.state === 'text') { + let earliestIdx = -1 + let matchedMeta: ResolverMeta | null = null + + for (const meta of this.metas) { + const idx = this.buffer.indexOf(meta.openTag) + if (idx !== -1 && (earliestIdx === -1 || idx < earliestIdx)) { + earliestIdx = idx + matchedMeta = meta + } + } + + if (earliestIdx === -1) { + const keep = Math.min(this.buffer.length, this.maxOpenLen - 1) + const emit = this.buffer.slice(0, this.buffer.length - keep) + visible += emit + this.buffer = this.buffer.slice(this.buffer.length - keep) + break + } + + visible += this.buffer.slice(0, earliestIdx) + this.buffer = this.buffer.slice(earliestIdx + matchedMeta!.openTag.length) + this.tagBuffer = '' + this.activeMeta = matchedMeta + this.state = 'inside' + continue + } + + // state === 'inside' + const closeTag = this.activeMeta!.closeTag + const endIdx = this.buffer.indexOf(closeTag) + if (endIdx === -1) { + const keep = Math.min(this.buffer.length, closeTag.length - 1) + const take = this.buffer.slice(0, this.buffer.length - keep) + this.tagBuffer += take + this.buffer = this.buffer.slice(this.buffer.length - keep) + break + } + + this.tagBuffer += this.buffer.slice(0, endIdx) + const parsed = this.activeMeta!.resolver.parse(this.tagBuffer) + if (parsed.length > 0) { + events.push({ tag: this.activeMeta!.resolver.tag, data: parsed }) + } + this.buffer = this.buffer.slice(endIdx + closeTag.length) + this.tagBuffer = '' + this.activeMeta = null + this.state = 'text' + } + + return { visibleText: visible, events } + } + + /** + * Flush remaining buffered content. Call when the stream ends. + * Unclosed tag blocks are returned as literal `visibleText` to avoid data loss. + */ + flushRemainder(): TagStreamResult { + if (this.state === 'text') { + const out = this.buffer + this.buffer = '' + return { visibleText: out, events: [] } + } + const meta = this.activeMeta! + const out = `${meta.openTag}${this.tagBuffer}${this.buffer}` + this.state = 'text' + this.buffer = '' + this.tagBuffer = '' + this.activeMeta = null + return { visibleText: out, events: [] } + } +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function escapeRegExp(s: string): string { + return s.replace(/[.*+?^${}()|[\]\\]/g, '\\$&') +}