fix(flow): stabilize chunked SSE and unify prune limits for read/exec/gateway (#71)

* fix(agent): emit chunked SSE data

fix(flow): reassemble chunked SSE and prune tool payloads

fix: avoid whitespace prune bypass; optimize chunked SSE builder

* refactor: LLM provider pruning uses the shared textprune library

* chore: smaller range
This commit is contained in:
Ringo.Typowriter
2026-02-21 17:06:02 +08:00
committed by GitHub
parent 2de8095c75
commit 9461f923df
11 changed files with 1160 additions and 59 deletions
+79 -52
View File
@@ -2,9 +2,11 @@ package flow
import (
"bufio"
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
@@ -34,6 +36,12 @@ const (
sharedMemoryNamespace = "bot"
// Keep gateway payload bounded when inlining binary attachments as data URLs.
gatewayInlineAttachmentMaxBytes int64 = 20 * 1024 * 1024
// SSE payloads (especially attachment/tool results) can be very large.
// bufio.Scanner fails with bufio.ErrTooLong ("token too long") if a single line exceeds
// its max token size, so the stream parser passes this value to Scanner.Buffer as an
// explicit per-line cap and reports a descriptive error when a line exceeds it. The agent
// gateway stream is expected to chunk large JSON payloads across multiple SSE "data:"
// lines, so this limit should stay relatively small.
gatewaySSEMaxLineBytes = 256 * 1024
)
// SkillEntry represents a skill loaded from the container.
@@ -255,11 +263,16 @@ func (r *Resolver) resolve(ctx context.Context, req conversation.ChatRequest) (r
// Build non-history parts first so we can reserve their token cost before
// trimming history messages.
memoryMsg := r.loadMemoryContextMessage(ctx, req)
reqMessages := pruneMessagesForGateway(nonNilModelMessages(req.Messages))
if memoryMsg != nil {
pruned, _ := pruneMessageForGateway(*memoryMsg)
memoryMsg = &pruned
}
var overhead int
if memoryMsg != nil {
overhead += estimateMessageTokens(*memoryMsg)
}
for _, m := range req.Messages {
for _, m := range reqMessages {
overhead += estimateMessageTokens(m)
}
// Reserve space for the system prompt built by the agent gateway
@@ -278,12 +291,13 @@ func (r *Resolver) resolve(ctx context.Context, req conversation.ChatRequest) (r
if loadErr != nil {
return resolvedContext{}, loadErr
}
loaded = pruneHistoryForGateway(loaded)
messages = trimMessagesByTokens(loaded, historyBudget)
}
if memoryMsg != nil {
messages = append(messages, *memoryMsg)
}
messages = append(messages, req.Messages...)
messages = append(messages, reqMessages...)
messages = sanitizeMessages(messages)
skills := dedup(req.Skills)
containerID := r.resolveContainerID(ctx, req.BotID, req.ContainerID)
@@ -580,39 +594,66 @@ func (r *Resolver) streamChat(ctx context.Context, payload gatewayRequest, req c
return fmt.Errorf("agent gateway error: %s", strings.TrimSpace(string(errBody)))
}
scanner := bufio.NewScanner(resp.Body)
scanner.Buffer(make([]byte, 0, 64*1024), 2*1024*1024)
currentEvent := ""
stored := false
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
if strings.HasPrefix(line, "event:") {
currentEvent = strings.TrimSpace(strings.TrimPrefix(line, "event:"))
continue
}
if !strings.HasPrefix(line, "data:") {
continue
}
data := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
if data == "" || data == "[DONE]" {
continue
}
chunkCh <- conversation.StreamChunk([]byte(data))
var dataBuf bytes.Buffer
if stored {
flushEvent := func() error {
if dataBuf.Len() == 0 {
return nil
}
out := append([]byte(nil), dataBuf.Bytes()...)
dataBuf.Reset()
if len(out) == 0 || bytes.Equal(bytes.TrimSpace(out), []byte("[DONE]")) {
return nil
}
// Persist final messages before forwarding the "done"/"agent_end" event so the
// next user turn can immediately see the assistant output in history.
if !stored {
if handled, storeErr := r.tryStoreStream(ctx, req, out); storeErr != nil {
return storeErr
} else if handled {
stored = true
}
}
chunkCh <- conversation.StreamChunk(out)
return nil
}
scanner := bufio.NewScanner(resp.Body)
scanner.Buffer(make([]byte, 64*1024), gatewaySSEMaxLineBytes)
for scanner.Scan() {
line := scanner.Bytes()
if len(line) == 0 {
if err := flushEvent(); err != nil {
return err
}
continue
}
if handled, storeErr := r.tryStoreStream(ctx, req, currentEvent, data); storeErr != nil {
return storeErr
} else if handled {
stored = true
if len(line) > 0 && line[0] == ':' {
continue
}
if !bytes.HasPrefix(line, []byte("data:")) {
continue
}
part := bytes.TrimPrefix(line, []byte("data:"))
// Backward-compat: older SSE writers used "data: <payload>" (note the space).
// Only strip the first leading space for the *first* fragment to avoid corrupting
// chunked payloads split inside JSON string values.
if dataBuf.Len() == 0 && len(part) > 0 && part[0] == ' ' {
part = part[1:]
}
if len(part) == 0 {
continue
}
_, _ = dataBuf.Write(part)
}
return scanner.Err()
if err := scanner.Err(); err != nil {
if errors.Is(err, bufio.ErrTooLong) {
return fmt.Errorf("sse line too long (max %d bytes)", gatewaySSEMaxLineBytes)
}
return err
}
return flushEvent()
}
func newJSONRequestWithContext(ctx context.Context, method, url string, payload any) (*http.Request, error) {
@@ -631,24 +672,15 @@ func newJSONRequestWithContext(ctx context.Context, method, url string, payload
}
// tryStoreStream attempts to extract final messages from a stream event and persist them.
func (r *Resolver) tryStoreStream(ctx context.Context, req conversation.ChatRequest, eventType, data string) (bool, error) {
// event: done + data: {messages: [...]}
if eventType == "done" {
var resp gatewayResponse
if err := json.Unmarshal([]byte(data), &resp); err == nil && len(resp.Messages) > 0 {
return true, r.storeRound(ctx, req, resp.Messages, resp.Usage)
}
}
func (r *Resolver) tryStoreStream(ctx context.Context, req conversation.ChatRequest, data []byte) (bool, error) {
// data: {"type":"text_delta"|"agent_end"|"done", ...}
var envelope struct {
Type string `json:"type"`
Data json.RawMessage `json:"data"`
Messages []conversation.ModelMessage `json:"messages"`
Skills []string `json:"skills"`
Usage json.RawMessage `json:"usage,omitempty"`
}
if err := json.Unmarshal([]byte(data), &envelope); err == nil {
if err := json.Unmarshal(data, &envelope); err == nil {
if (envelope.Type == "agent_end" || envelope.Type == "done") && len(envelope.Messages) > 0 {
return true, r.storeRound(ctx, req, envelope.Messages, envelope.Usage)
}
@@ -662,7 +694,7 @@ func (r *Resolver) tryStoreStream(ctx context.Context, req conversation.ChatRequ
// fallback: data: {messages: [...]}
var resp gatewayResponse
if err := json.Unmarshal([]byte(data), &resp); err == nil && len(resp.Messages) > 0 {
if err := json.Unmarshal(data, &resp); err == nil && len(resp.Messages) > 0 {
return true, r.storeRound(ctx, req, resp.Messages, resp.Usage)
}
return false, nil
@@ -763,19 +795,14 @@ func normalizeGatewayAttachmentPayload(item gatewayAttachment) gatewayAttachment
if payload == "" {
return item
}
lower := strings.ToLower(payload)
if strings.HasPrefix(lower, "data:") {
if strings.TrimSpace(item.Mime) == "" || strings.EqualFold(strings.TrimSpace(item.Mime), "application/octet-stream") {
if start := strings.Index(payload, ":"); start >= 0 {
rest := payload[start+1:]
if end := strings.Index(rest, ";"); end > 0 {
mime := strings.TrimSpace(rest[:end])
if mime != "" {
item.Mime = mime
}
}
if strings.HasPrefix(strings.ToLower(payload), "data:") {
mime := strings.TrimSpace(item.Mime)
if mime == "" || strings.EqualFold(mime, "application/octet-stream") {
if extracted := attachmentpkg.MimeFromDataURL(payload); extracted != "" {
item.Mime = extracted
}
}
item.Payload = payload
return item
}
mime := strings.TrimSpace(item.Mime)