Files
Memoh/internal/agent/guard_state.go
Fodesu db777b98ac fix(agent): stream loop abort, mid-stream retry parity, collector cleanup (#376)
* fix(agent): align stream retry abort and event collection

* fix(agent): cancel stream on loop detect, harden retry and tool events

* fix(agent): drain previous stream before retry

* fix(lint): ctx ci lint

---------

Co-authored-by: 晨苒 <16112591+chen-ran@users.noreply.github.com>
2026-04-18 03:19:58 +08:00

115 lines
2.0 KiB
Go

package agent
import (
"sync"
"github.com/memohai/memoh/internal/agent/tools"
)
type toolAbortRegistry struct {
mu sync.Mutex
ids map[string]struct{}
}
func newToolAbortRegistry() *toolAbortRegistry {
return &toolAbortRegistry{
ids: make(map[string]struct{}),
}
}
func (r *toolAbortRegistry) Add(toolCallID string) {
if r == nil || toolCallID == "" {
return
}
r.mu.Lock()
defer r.mu.Unlock()
r.ids[toolCallID] = struct{}{}
}
func (r *toolAbortRegistry) Take(toolCallID string) bool {
if r == nil || toolCallID == "" {
return false
}
r.mu.Lock()
defer r.mu.Unlock()
if _, ok := r.ids[toolCallID]; !ok {
return false
}
delete(r.ids, toolCallID)
return true
}
func (r *toolAbortRegistry) Any() bool {
if r == nil {
return false
}
r.mu.Lock()
defer r.mu.Unlock()
return len(r.ids) > 0
}
type toolEventCollector struct {
mu sync.Mutex
closed bool
events []tools.ToolStreamEvent
}
func newToolEventCollector() *toolEventCollector {
return &toolEventCollector{}
}
func (c *toolEventCollector) Add(evt tools.ToolStreamEvent) bool {
if c == nil {
return false
}
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return false
}
c.events = append(c.events, evt)
return true
}
func (c *toolEventCollector) Close() {
if c == nil {
return
}
c.mu.Lock()
defer c.mu.Unlock()
c.closed = true
}
func (c *toolEventCollector) CloseAndSnapshot() []tools.ToolStreamEvent {
if c == nil {
return nil
}
c.mu.Lock()
defer c.mu.Unlock()
c.closed = true
snapshot := make([]tools.ToolStreamEvent, len(c.events))
copy(snapshot, c.events)
return snapshot
}
// Snapshot returns a copy of collected events without closing the collector.
// Callers that own the collector lifetime should still invoke Close (or
// CloseAndSnapshot) so late emits are rejected.
func (c *toolEventCollector) Snapshot() []tools.ToolStreamEvent {
if c == nil {
return nil
}
c.mu.Lock()
defer c.mu.Unlock()
out := make([]tools.ToolStreamEvent, len(c.events))
copy(out, c.events)
return out
}