Merge pull request #323 from mx1700/feat/stop-command

feat: add /stop command to abort agent generation on external channels
This commit is contained in:
zhangxx
2026-04-08 01:25:09 +08:00
committed by GitHub
parent 8d5c38f0e5
commit 8c4e9e218e
4 changed files with 105 additions and 11 deletions
+101 -3
View File
@@ -111,6 +111,11 @@ type ChannelInboundProcessor struct {
pipeline *pipelinepkg.Pipeline
eventStore *pipelinepkg.EventStore
discussDriver *pipelinepkg.DiscussDriver
// activeStreams maps "botID:routeID" to a context.CancelFunc for the
// currently running agent stream. Used by /stop to abort generation
// on external channels (Telegram, Discord, etc.).
activeStreams sync.Map
}
// NewChannelInboundProcessor creates a processor with channel identity-based resolution.
@@ -290,11 +295,14 @@ func (p *ChannelInboundProcessor) HandleInbound(ctx context.Context, cfg channel
// (via @mention or reply) to avoid all bots responding to the same command.
cmdText := rawTextForCommand(msg, text)
// /new requires route context, so it is handled separately from the
// general command handler (which runs before route resolution).
// /new and /stop require route context, so they are handled separately
// from the general command handler (which runs before route resolution).
if isNewSessionCommand(cmdText) && isDirectedAtBot(msg) {
return p.handleNewSessionCommand(ctx, cfg, msg, sender, identity)
}
if isStopCommand(cmdText) && isDirectedAtBot(msg) {
return p.handleStopCommand(ctx, cfg, msg, sender, identity)
}
// Skip generic command handler for mode-prefix commands (/btw, /now, /next)
// so they pass through to mode detection below.
@@ -716,7 +724,15 @@ startStream:
if re, _ := msg.Metadata["reasoning_effort"].(string); strings.TrimSpace(re) != "" {
chatReq.ReasoningEffort = strings.TrimSpace(re)
}
chunkCh, streamErrCh := p.runner.StreamChat(ctx, chatReq)
// Create a cancellable context so /stop can abort the stream.
streamCtx, streamCancel := context.WithCancel(ctx)
defer streamCancel()
streamKey := strings.TrimSpace(identity.BotID) + ":" + strings.TrimSpace(resolved.RouteID)
p.activeStreams.Store(streamKey, streamCancel)
defer p.activeStreams.Delete(streamKey)
chunkCh, streamErrCh := p.runner.StreamChat(streamCtx, chatReq)
var (
finalMessages []conversation.ModelMessage
@@ -2487,6 +2503,88 @@ func (p *ChannelInboundProcessor) enrichConversationAvatar(ctx context.Context,
}
}
// isStopCommand returns true when the command text is "/stop" (with
// optional Telegram-style @botname suffix and trailing whitespace).
func isStopCommand(cmdText string) bool {
extracted := command.ExtractCommandText(cmdText)
if extracted == "" {
return false
}
parsed, err := command.Parse(extracted)
if err != nil {
return false
}
return parsed.Resource == "stop"
}
// handleStopCommand resolves the route for the current conversation and
// cancels any active agent stream, effectively aborting the generation.
func (p *ChannelInboundProcessor) handleStopCommand(
ctx context.Context,
cfg channel.ChannelConfig,
msg channel.InboundMessage,
sender channel.StreamReplySender,
identity InboundIdentity,
) error {
target := strings.TrimSpace(msg.ReplyTarget)
if target == "" {
return errors.New("reply target missing for /stop command")
}
if p.routeResolver == nil {
return sender.Send(ctx, channel.OutboundMessage{
Target: target,
Message: channel.Message{Text: "Error: route resolver not configured."},
})
}
threadID := extractThreadID(msg)
routeMetadata := buildRouteMetadata(msg, identity)
p.enrichConversationAvatar(ctx, cfg, msg, routeMetadata)
resolved, err := p.routeResolver.ResolveConversation(ctx, route.ResolveInput{
BotID: identity.BotID,
Platform: msg.Channel.String(),
ConversationID: msg.Conversation.ID,
ThreadID: threadID,
ConversationType: msg.Conversation.Type,
ChannelIdentityID: identity.UserID,
ChannelConfigID: identity.ChannelConfigID,
ReplyTarget: target,
Metadata: routeMetadata,
})
if err != nil {
if p.logger != nil {
p.logger.Warn("resolve route for /stop command failed", slog.Any("error", err))
}
return sender.Send(ctx, channel.OutboundMessage{
Target: target,
Message: channel.Message{Text: "Error: failed to resolve conversation route."},
})
}
streamKey := strings.TrimSpace(identity.BotID) + ":" + strings.TrimSpace(resolved.RouteID)
cancelVal, loaded := p.activeStreams.LoadAndDelete(streamKey)
if !loaded {
// No active stream — silently ignore.
return nil
}
cancelFn, ok := cancelVal.(context.CancelFunc)
if !ok {
return nil
}
cancelFn()
if p.logger != nil {
p.logger.Info("agent stream aborted via /stop command",
slog.String("bot_id", strings.TrimSpace(identity.BotID)),
slog.String("route_id", strings.TrimSpace(resolved.RouteID)),
slog.String("channel", msg.Channel.String()),
)
}
return nil
}
// isNewSessionCommand returns true when the command text is "/new" (with
// optional Telegram-style @botname suffix and trailing whitespace).
func isNewSessionCommand(cmdText string) bool {
+2 -1
View File
@@ -112,7 +112,8 @@ func NewHandler(
// the regular resource-group dispatch (e.g. in the channel inbound
// processor which has the required routing context).
var topLevelCommands = map[string]string{
"new": "Start a new conversation (resets session context)",
"new": "Start a new conversation (resets session context)",
"stop": "Stop the current generation",
}
// IsCommand reports whether the text contains a slash command.
+2 -1
View File
@@ -81,7 +81,8 @@ func (r *Registry) GlobalHelp() string {
var b strings.Builder
b.WriteString("Available commands:\n\n")
b.WriteString("/help - Show this help message\n")
b.WriteString("/new - Start a new conversation (resets session context)\n\n")
b.WriteString("/new - Start a new conversation (resets session context)\n")
b.WriteString("/stop - Stop the current generation\n\n")
for i, name := range r.order {
if i > 0 {
b.WriteByte('\n')