From 8c4e9e218e2c0046a45afd935189bd08af2f71b6 Mon Sep 17 00:00:00 2001 From: zhangxx Date: Wed, 8 Apr 2026 01:25:09 +0800 Subject: [PATCH] Merge pull request #323 from mx1700/feat/stop-command feat: add /stop command to abort agent generation on external channels --- go.sum | 6 -- internal/channel/inbound/channel.go | 104 +++++++++++++++++++++++++++- internal/command/handler.go | 3 +- internal/command/registry.go | 3 +- 4 files changed, 105 insertions(+), 11 deletions(-) diff --git a/go.sum b/go.sum index 735d7fb1..f2832a0a 100644 --- a/go.sum +++ b/go.sum @@ -228,12 +228,6 @@ github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= github.com/memohai/acgo v0.0.0-20260221232113-babac0d6acd7 h1:beehwOQperqGWj4m4EhcPhnSZKtDiuHK/7ZMoTPaQjw= github.com/memohai/acgo v0.0.0-20260221232113-babac0d6acd7/go.mod h1:OvmxM7JmnXBmwJWWVqtreL3HSHSKuzPbtbhlg5MvBg0= -github.com/memohai/twilight-ai v0.3.4-0.20260329101319-3ebcc563f5d9 h1:vpq3FgZ7UJAWr63M4mAtB8wvUWHSAdrgWibTFocXFBk= -github.com/memohai/twilight-ai v0.3.4-0.20260329101319-3ebcc563f5d9/go.mod h1:GZTT9GUT3uSs6zram/FcF24GLTZMFSpiybbYmjr+gH8= -github.com/memohai/twilight-ai v0.3.4-0.20260402145441-9e9f798cbd2d h1:tZYwJ0YDc6FUlh3mXYv+Tkw5dLfMD6IgP8VY5/+AfB0= -github.com/memohai/twilight-ai v0.3.4-0.20260402145441-9e9f798cbd2d/go.mod h1:GZTT9GUT3uSs6zram/FcF24GLTZMFSpiybbYmjr+gH8= -github.com/memohai/twilight-ai v0.3.4-0.20260402155501-497ad09c6724 h1:/Hw5vHfAeHRGx+duPKyetT5n2t6J5cYMfUysN/Xh9U0= -github.com/memohai/twilight-ai v0.3.4-0.20260402155501-497ad09c6724/go.mod h1:GZTT9GUT3uSs6zram/FcF24GLTZMFSpiybbYmjr+gH8= github.com/memohai/twilight-ai v0.3.4-0.20260402160505-00db38ee4442 h1:mTy+OSkMCOvF1S6D5asKRdKx0A+icQvnu6A/f7aZolg= github.com/memohai/twilight-ai v0.3.4-0.20260402160505-00db38ee4442/go.mod h1:GZTT9GUT3uSs6zram/FcF24GLTZMFSpiybbYmjr+gH8= github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= diff --git a/internal/channel/inbound/channel.go b/internal/channel/inbound/channel.go index 1a116bae..03b86539 100644 --- a/internal/channel/inbound/channel.go +++ b/internal/channel/inbound/channel.go @@ -111,6 +111,11 @@ type ChannelInboundProcessor struct { pipeline *pipelinepkg.Pipeline eventStore *pipelinepkg.EventStore discussDriver *pipelinepkg.DiscussDriver + + // activeStreams maps "botID:routeID" to a context.CancelFunc for the + // currently running agent stream. Used by /stop to abort generation + // on external channels (Telegram, Discord, etc.). + activeStreams sync.Map } // NewChannelInboundProcessor creates a processor with channel identity-based resolution. @@ -290,11 +295,14 @@ func (p *ChannelInboundProcessor) HandleInbound(ctx context.Context, cfg channel // (via @mention or reply) to avoid all bots responding to the same command. cmdText := rawTextForCommand(msg, text) - // /new requires route context, so it is handled separately from the - // general command handler (which runs before route resolution). + // /new and /stop require route context, so they are handled separately + // from the general command handler (which runs before route resolution). if isNewSessionCommand(cmdText) && isDirectedAtBot(msg) { return p.handleNewSessionCommand(ctx, cfg, msg, sender, identity) } + if isStopCommand(cmdText) && isDirectedAtBot(msg) { + return p.handleStopCommand(ctx, cfg, msg, sender, identity) + } // Skip generic command handler for mode-prefix commands (/btw, /now, /next) // so they pass through to mode detection below. @@ -716,7 +724,15 @@ startStream: if re, _ := msg.Metadata["reasoning_effort"].(string); strings.TrimSpace(re) != "" { chatReq.ReasoningEffort = strings.TrimSpace(re) } - chunkCh, streamErrCh := p.runner.StreamChat(ctx, chatReq) + // Create a cancellable context so /stop can abort the stream. + streamCtx, streamCancel := context.WithCancel(ctx) + defer streamCancel() + + streamKey := strings.TrimSpace(identity.BotID) + ":" + strings.TrimSpace(resolved.RouteID) + p.activeStreams.Store(streamKey, streamCancel) + defer p.activeStreams.Delete(streamKey) + + chunkCh, streamErrCh := p.runner.StreamChat(streamCtx, chatReq) var ( finalMessages []conversation.ModelMessage @@ -2487,6 +2503,88 @@ func (p *ChannelInboundProcessor) enrichConversationAvatar(ctx context.Context, } } +// isStopCommand returns true when the command text is "/stop" (with +// optional Telegram-style @botname suffix and trailing whitespace). +func isStopCommand(cmdText string) bool { + extracted := command.ExtractCommandText(cmdText) + if extracted == "" { + return false + } + parsed, err := command.Parse(extracted) + if err != nil { + return false + } + return parsed.Resource == "stop" +} + +// handleStopCommand resolves the route for the current conversation and +// cancels any active agent stream, effectively aborting the generation. +func (p *ChannelInboundProcessor) handleStopCommand( + ctx context.Context, + cfg channel.ChannelConfig, + msg channel.InboundMessage, + sender channel.StreamReplySender, + identity InboundIdentity, +) error { + target := strings.TrimSpace(msg.ReplyTarget) + if target == "" { + return errors.New("reply target missing for /stop command") + } + + if p.routeResolver == nil { + return sender.Send(ctx, channel.OutboundMessage{ + Target: target, + Message: channel.Message{Text: "Error: route resolver not configured."}, + }) + } + + threadID := extractThreadID(msg) + routeMetadata := buildRouteMetadata(msg, identity) + p.enrichConversationAvatar(ctx, cfg, msg, routeMetadata) + resolved, err := p.routeResolver.ResolveConversation(ctx, route.ResolveInput{ + BotID: identity.BotID, + Platform: msg.Channel.String(), + ConversationID: msg.Conversation.ID, + ThreadID: threadID, + ConversationType: msg.Conversation.Type, + ChannelIdentityID: identity.UserID, + ChannelConfigID: identity.ChannelConfigID, + ReplyTarget: target, + Metadata: routeMetadata, + }) + if err != nil { + if p.logger != nil { + p.logger.Warn("resolve route for /stop command failed", slog.Any("error", err)) + } + return sender.Send(ctx, channel.OutboundMessage{ + Target: target, + Message: channel.Message{Text: "Error: failed to resolve conversation route."}, + }) + } + + streamKey := strings.TrimSpace(identity.BotID) + ":" + strings.TrimSpace(resolved.RouteID) + cancelVal, loaded := p.activeStreams.LoadAndDelete(streamKey) + if !loaded { + // No active stream — silently ignore. + return nil + } + + cancelFn, ok := cancelVal.(context.CancelFunc) + if !ok { + return nil + } + + cancelFn() + if p.logger != nil { + p.logger.Info("agent stream aborted via /stop command", + slog.String("bot_id", strings.TrimSpace(identity.BotID)), + slog.String("route_id", strings.TrimSpace(resolved.RouteID)), + slog.String("channel", msg.Channel.String()), + ) + } + return nil +} + // isNewSessionCommand returns true when the command text is "/new" (with // optional Telegram-style @botname suffix and trailing whitespace). func isNewSessionCommand(cmdText string) bool { diff --git a/internal/command/handler.go b/internal/command/handler.go index cfe97600..ecc510be 100644 --- a/internal/command/handler.go +++ b/internal/command/handler.go @@ -112,7 +112,8 @@ func NewHandler( // the regular resource-group dispatch (e.g. in the channel inbound // processor which has the required routing context). var topLevelCommands = map[string]string{ - "new": "Start a new conversation (resets session context)", + "new": "Start a new conversation (resets session context)", + "stop": "Stop the current generation", } // IsCommand reports whether the text contains a slash command. diff --git a/internal/command/registry.go b/internal/command/registry.go index bbf01fbe..80de168a 100644 --- a/internal/command/registry.go +++ b/internal/command/registry.go @@ -81,7 +81,8 @@ func (r *Registry) GlobalHelp() string { var b strings.Builder b.WriteString("Available commands:\n\n") b.WriteString("/help - Show this help message\n") - b.WriteString("/new - Start a new conversation (resets session context)\n\n") + b.WriteString("/new - Start a new conversation (resets session context)\n") + b.WriteString("/stop - Stop the current generation\n\n") for i, name := range r.order { if i > 0 { b.WriteByte('\n')