// Source listing metadata: 5595 lines, 208 KiB, TypeScript
// biome-ignore-all assist/source/organizeImports: ANT-ONLY import markers must not be reordered
|
|
import { feature } from 'bun:bundle'
|
|
import { readFile, stat } from 'fs/promises'
|
|
import { dirname } from 'path'
|
|
import {
|
|
downloadUserSettings,
|
|
redownloadUserSettings,
|
|
} from 'src/services/settingsSync/index.js'
|
|
import { waitForRemoteManagedSettingsToLoad } from 'src/services/remoteManagedSettings/index.js'
|
|
import { StructuredIO } from 'src/cli/structuredIO.js'
|
|
import { RemoteIO } from 'src/cli/remoteIO.js'
|
|
import {
|
|
type Command,
|
|
formatDescriptionWithSource,
|
|
getCommandName,
|
|
} from 'src/commands.js'
|
|
import { createStreamlinedTransformer } from 'src/utils/streamlinedTransform.js'
|
|
import { installStreamJsonStdoutGuard } from 'src/utils/streamJsonStdoutGuard.js'
|
|
import type { ToolPermissionContext } from 'src/Tool.js'
|
|
import type { ThinkingConfig } from 'src/utils/thinking.js'
|
|
import { assembleToolPool, filterToolsByDenyRules } from 'src/tools.js'
|
|
import uniqBy from 'lodash-es/uniqBy.js'
|
|
import { uniq } from 'src/utils/array.js'
|
|
import { mergeAndFilterTools } from 'src/utils/toolPool.js'
|
|
import {
|
|
logEvent,
|
|
type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
|
|
} from 'src/services/analytics/index.js'
|
|
import { getFeatureValue_CACHED_MAY_BE_STALE } from 'src/services/analytics/growthbook.js'
|
|
import { logForDebugging } from 'src/utils/debug.js'
|
|
import {
|
|
logForDiagnosticsNoPII,
|
|
withDiagnosticsTiming,
|
|
} from 'src/utils/diagLogs.js'
|
|
import { toolMatchesName, type Tool, type Tools } from 'src/Tool.js'
|
|
import {
|
|
type AgentDefinition,
|
|
isBuiltInAgent,
|
|
parseAgentsFromJson,
|
|
} from 'src/tools/AgentTool/loadAgentsDir.js'
|
|
import type { Message, NormalizedUserMessage } from 'src/types/message.js'
|
|
import type { QueuedCommand } from 'src/types/textInputTypes.js'
|
|
import {
|
|
dequeue,
|
|
dequeueAllMatching,
|
|
enqueue,
|
|
hasCommandsInQueue,
|
|
peek,
|
|
subscribeToCommandQueue,
|
|
getCommandsByMaxPriority,
|
|
} from 'src/utils/messageQueueManager.js'
|
|
import { notifyCommandLifecycle } from 'src/utils/commandLifecycle.js'
|
|
import {
|
|
getSessionState,
|
|
notifySessionStateChanged,
|
|
notifySessionMetadataChanged,
|
|
setPermissionModeChangedListener,
|
|
type RequiresActionDetails,
|
|
type SessionExternalMetadata,
|
|
} from 'src/utils/sessionState.js'
|
|
import { externalMetadataToAppState } from 'src/state/onChangeAppState.js'
|
|
import { getInMemoryErrors, logError, logMCPDebug } from 'src/utils/log.js'
|
|
import {
|
|
writeToStdout,
|
|
registerProcessOutputErrorHandlers,
|
|
} from 'src/utils/process.js'
|
|
import type { Stream } from 'src/utils/stream.js'
|
|
import { EMPTY_USAGE } from 'src/services/api/logging.js'
|
|
import {
|
|
loadConversationForResume,
|
|
type TurnInterruptionState,
|
|
} from 'src/utils/conversationRecovery.js'
|
|
import type {
|
|
MCPServerConnection,
|
|
McpSdkServerConfig,
|
|
ScopedMcpServerConfig,
|
|
} from 'src/services/mcp/types.js'
|
|
import {
|
|
ChannelMessageNotificationSchema,
|
|
gateChannelServer,
|
|
wrapChannelMessage,
|
|
findChannelEntry,
|
|
} from 'src/services/mcp/channelNotification.js'
|
|
import {
|
|
isChannelAllowlisted,
|
|
isChannelsEnabled,
|
|
} from 'src/services/mcp/channelAllowlist.js'
|
|
import { parsePluginIdentifier } from 'src/utils/plugins/pluginIdentifier.js'
|
|
import { validateUuid } from 'src/utils/uuid.js'
|
|
import { fromArray } from 'src/utils/generators.js'
|
|
import { ask } from 'src/QueryEngine.js'
|
|
import type { PermissionPromptTool } from 'src/utils/queryHelpers.js'
|
|
import {
|
|
createFileStateCacheWithSizeLimit,
|
|
mergeFileStateCaches,
|
|
READ_FILE_STATE_CACHE_SIZE,
|
|
} from 'src/utils/fileStateCache.js'
|
|
import { expandPath } from 'src/utils/path.js'
|
|
import { extractReadFilesFromMessages } from 'src/utils/queryHelpers.js'
|
|
import { registerHookEventHandler } from 'src/utils/hooks/hookEvents.js'
|
|
import { executeFilePersistence } from 'src/utils/filePersistence/filePersistence.js'
|
|
import { finalizePendingAsyncHooks } from 'src/utils/hooks/AsyncHookRegistry.js'
|
|
import {
|
|
gracefulShutdown,
|
|
gracefulShutdownSync,
|
|
isShuttingDown,
|
|
} from 'src/utils/gracefulShutdown.js'
|
|
import { registerCleanup } from 'src/utils/cleanupRegistry.js'
|
|
import { createIdleTimeoutManager } from 'src/utils/idleTimeout.js'
|
|
import type {
|
|
SDKStatus,
|
|
ModelInfo,
|
|
SDKMessage,
|
|
SDKUserMessage,
|
|
SDKUserMessageReplay,
|
|
PermissionResult,
|
|
McpServerConfigForProcessTransport,
|
|
McpServerStatus,
|
|
RewindFilesResult,
|
|
} from 'src/entrypoints/agentSdkTypes.js'
|
|
import type {
|
|
StdoutMessage,
|
|
SDKControlInitializeRequest,
|
|
SDKControlInitializeResponse,
|
|
SDKControlRequest,
|
|
SDKControlResponse,
|
|
SDKControlMcpSetServersResponse,
|
|
SDKControlReloadPluginsResponse,
|
|
} from 'src/entrypoints/sdk/controlTypes.js'
|
|
import type { PermissionMode } from '@anthropic-ai/claude-agent-sdk'
|
|
import type { PermissionMode as InternalPermissionMode } from 'src/types/permissions.js'
|
|
import { cwd } from 'process'
|
|
import { getCwd } from 'src/utils/cwd.js'
|
|
import omit from 'lodash-es/omit.js'
|
|
import reject from 'lodash-es/reject.js'
|
|
import { isPolicyAllowed } from 'src/services/policyLimits/index.js'
|
|
import type { ReplBridgeHandle } from 'src/bridge/replBridge.js'
|
|
import { getRemoteSessionUrl } from 'src/constants/product.js'
|
|
import { buildBridgeConnectUrl } from 'src/bridge/bridgeStatusUtil.js'
|
|
import { extractInboundMessageFields } from 'src/bridge/inboundMessages.js'
|
|
import { resolveAndPrepend } from 'src/bridge/inboundAttachments.js'
|
|
import type { CanUseToolFn } from 'src/hooks/useCanUseTool.js'
|
|
import { hasPermissionsToUseTool } from 'src/utils/permissions/permissions.js'
|
|
import { safeParseJSON } from 'src/utils/json.js'
|
|
import {
|
|
outputSchema as permissionToolOutputSchema,
|
|
permissionPromptToolResultToPermissionDecision,
|
|
} from 'src/utils/permissions/PermissionPromptToolResultSchema.js'
|
|
import { createAbortController } from 'src/utils/abortController.js'
|
|
import { createCombinedAbortSignal } from 'src/utils/combinedAbortSignal.js'
|
|
import { generateSessionTitle } from 'src/utils/sessionTitle.js'
|
|
import { buildSideQuestionFallbackParams } from 'src/utils/queryContext.js'
|
|
import { runSideQuestion } from 'src/utils/sideQuestion.js'
|
|
import {
|
|
processSessionStartHooks,
|
|
processSetupHooks,
|
|
takeInitialUserMessage,
|
|
} from 'src/utils/sessionStart.js'
|
|
import {
|
|
DEFAULT_OUTPUT_STYLE_NAME,
|
|
getAllOutputStyles,
|
|
} from 'src/constants/outputStyles.js'
|
|
import { TEAMMATE_MESSAGE_TAG, TICK_TAG } from 'src/constants/xml.js'
|
|
import {
|
|
getSettings_DEPRECATED,
|
|
getSettingsWithSources,
|
|
} from 'src/utils/settings/settings.js'
|
|
import { settingsChangeDetector } from 'src/utils/settings/changeDetector.js'
|
|
import { applySettingsChange } from 'src/utils/settings/applySettingsChange.js'
|
|
import {
|
|
isFastModeAvailable,
|
|
isFastModeEnabled,
|
|
isFastModeSupportedByModel,
|
|
getFastModeState,
|
|
} from 'src/utils/fastMode.js'
|
|
import {
|
|
isAutoModeGateEnabled,
|
|
getAutoModeUnavailableNotification,
|
|
getAutoModeUnavailableReason,
|
|
isBypassPermissionsModeDisabled,
|
|
transitionPermissionMode,
|
|
} from 'src/utils/permissions/permissionSetup.js'
|
|
import {
|
|
tryGenerateSuggestion,
|
|
logSuggestionOutcome,
|
|
logSuggestionSuppressed,
|
|
type PromptVariant,
|
|
} from 'src/services/PromptSuggestion/promptSuggestion.js'
|
|
import { getLastCacheSafeParams } from 'src/utils/forkedAgent.js'
|
|
import { getAccountInformation } from 'src/utils/auth.js'
|
|
import { OAuthService } from 'src/services/oauth/index.js'
|
|
import { installOAuthTokens } from 'src/cli/handlers/auth.js'
|
|
import { getAPIProvider } from 'src/utils/model/providers.js'
|
|
import type { HookCallbackMatcher } from 'src/types/hooks.js'
|
|
import { AwsAuthStatusManager } from 'src/utils/awsAuthStatusManager.js'
|
|
import type { HookEvent } from 'src/entrypoints/agentSdkTypes.js'
|
|
import {
|
|
registerHookCallbacks,
|
|
setInitJsonSchema,
|
|
getInitJsonSchema,
|
|
setSdkAgentProgressSummariesEnabled,
|
|
} from 'src/bootstrap/state.js'
|
|
import { createSyntheticOutputTool } from 'src/tools/SyntheticOutputTool/SyntheticOutputTool.js'
|
|
import { parseSessionIdentifier } from 'src/utils/sessionUrl.js'
|
|
import {
|
|
hydrateRemoteSession,
|
|
hydrateFromCCRv2InternalEvents,
|
|
resetSessionFilePointer,
|
|
doesMessageExistInSession,
|
|
findUnresolvedToolUse,
|
|
recordAttributionSnapshot,
|
|
saveAgentSetting,
|
|
saveMode,
|
|
saveAiGeneratedTitle,
|
|
restoreSessionMetadata,
|
|
} from 'src/utils/sessionStorage.js'
|
|
import { incrementPromptCount } from 'src/utils/commitAttribution.js'
|
|
import {
|
|
setupSdkMcpClients,
|
|
connectToServer,
|
|
clearServerCache,
|
|
fetchToolsForClient,
|
|
areMcpConfigsEqual,
|
|
reconnectMcpServerImpl,
|
|
} from 'src/services/mcp/client.js'
|
|
import {
|
|
filterMcpServersByPolicy,
|
|
getMcpConfigByName,
|
|
isMcpServerDisabled,
|
|
setMcpServerEnabled,
|
|
} from 'src/services/mcp/config.js'
|
|
import {
|
|
performMCPOAuthFlow,
|
|
revokeServerTokens,
|
|
} from 'src/services/mcp/auth.js'
|
|
import {
|
|
runElicitationHooks,
|
|
runElicitationResultHooks,
|
|
} from 'src/services/mcp/elicitationHandler.js'
|
|
import { executeNotificationHooks } from 'src/utils/hooks.js'
|
|
import {
|
|
ElicitRequestSchema,
|
|
ElicitationCompleteNotificationSchema,
|
|
} from '@modelcontextprotocol/sdk/types.js'
|
|
import { getMcpPrefix } from 'src/services/mcp/mcpStringUtils.js'
|
|
import {
|
|
commandBelongsToServer,
|
|
filterToolsByServer,
|
|
} from 'src/services/mcp/utils.js'
|
|
import { setupVscodeSdkMcp } from 'src/services/mcp/vscodeSdkMcp.js'
|
|
import { getAllMcpConfigs } from 'src/services/mcp/config.js'
|
|
import {
|
|
isQualifiedForGrove,
|
|
checkGroveForNonInteractive,
|
|
} from 'src/services/api/grove.js'
|
|
import {
|
|
toInternalMessages,
|
|
toSDKRateLimitInfo,
|
|
} from 'src/utils/messages/mappers.js'
|
|
import { createModelSwitchBreadcrumbs } from 'src/utils/messages.js'
|
|
import { collectContextData } from 'src/commands/context/context-noninteractive.js'
|
|
import { LOCAL_COMMAND_STDOUT_TAG } from 'src/constants/xml.js'
|
|
import {
|
|
statusListeners,
|
|
type ClaudeAILimits,
|
|
} from 'src/services/claudeAiLimits.js'
|
|
import {
|
|
getDefaultMainLoopModel,
|
|
getMainLoopModel,
|
|
modelDisplayString,
|
|
parseUserSpecifiedModel,
|
|
} from 'src/utils/model/model.js'
|
|
import { getModelOptions } from 'src/utils/model/modelOptions.js'
|
|
import {
|
|
modelSupportsEffort,
|
|
modelSupportsMaxEffort,
|
|
EFFORT_LEVELS,
|
|
resolveAppliedEffort,
|
|
} from 'src/utils/effort.js'
|
|
import { modelSupportsAdaptiveThinking } from 'src/utils/thinking.js'
|
|
import { modelSupportsAutoMode } from 'src/utils/betas.js'
|
|
import { ensureModelStringsInitialized } from 'src/utils/model/modelStrings.js'
|
|
import {
|
|
getSessionId,
|
|
setMainLoopModelOverride,
|
|
setMainThreadAgentType,
|
|
switchSession,
|
|
isSessionPersistenceDisabled,
|
|
getIsRemoteMode,
|
|
getFlagSettingsInline,
|
|
setFlagSettingsInline,
|
|
getMainThreadAgentType,
|
|
getAllowedChannels,
|
|
setAllowedChannels,
|
|
type ChannelEntry,
|
|
} from 'src/bootstrap/state.js'
|
|
import { runWithWorkload, WORKLOAD_CRON } from 'src/utils/workloadContext.js'
|
|
import type { UUID } from 'crypto'
|
|
import { randomUUID } from 'crypto'
|
|
import type { ContentBlockParam } from '@anthropic-ai/sdk/resources/messages.mjs'
|
|
import type { AppState } from 'src/state/AppStateStore.js'
|
|
import {
|
|
fileHistoryRewind,
|
|
fileHistoryCanRestore,
|
|
fileHistoryEnabled,
|
|
fileHistoryGetDiffStats,
|
|
} from 'src/utils/fileHistory.js'
|
|
import {
|
|
restoreAgentFromSession,
|
|
restoreSessionStateFromLog,
|
|
} from 'src/utils/sessionRestore.js'
|
|
import { SandboxManager } from 'src/utils/sandbox/sandbox-adapter.js'
|
|
import {
|
|
headlessProfilerStartTurn,
|
|
headlessProfilerCheckpoint,
|
|
logHeadlessProfilerTurn,
|
|
} from 'src/utils/headlessProfiler.js'
|
|
import {
|
|
startQueryProfile,
|
|
logQueryProfileReport,
|
|
} from 'src/utils/queryProfiler.js'
|
|
import { asSessionId } from 'src/types/ids.js'
|
|
import { jsonStringify } from '../utils/slowOperations.js'
|
|
import { skillChangeDetector } from '../utils/skills/skillChangeDetector.js'
|
|
import { getCommands, clearCommandsCache } from '../commands.js'
|
|
import {
|
|
isBareMode,
|
|
isEnvTruthy,
|
|
isEnvDefinedFalsy,
|
|
} from '../utils/envUtils.js'
|
|
import { installPluginsForHeadless } from '../utils/plugins/headlessPluginInstall.js'
|
|
import { refreshActivePlugins } from '../utils/plugins/refresh.js'
|
|
import { loadAllPluginsCacheOnly } from '../utils/plugins/pluginLoader.js'
|
|
import {
|
|
isTeamLead,
|
|
hasActiveInProcessTeammates,
|
|
hasWorkingInProcessTeammates,
|
|
waitForTeammatesToBecomeIdle,
|
|
} from '../utils/teammate.js'
|
|
import {
|
|
readUnreadMessages,
|
|
markMessagesAsRead,
|
|
isShutdownApproved,
|
|
} from '../utils/teammateMailbox.js'
|
|
import { removeTeammateFromTeamFile } from '../utils/swarm/teamHelpers.js'
|
|
import { unassignTeammateTasks } from '../utils/tasks.js'
|
|
import { getRunningTasks } from '../utils/task/framework.js'
|
|
import { isBackgroundTask } from '../tasks/types.js'
|
|
import { stopTask } from '../tasks/stopTask.js'
|
|
import { drainSdkEvents } from '../utils/sdkEventQueue.js'
|
|
import { initializeGrowthBook } from '../services/analytics/growthbook.js'
|
|
import { errorMessage, toError } from '../utils/errors.js'
|
|
import { sleep } from '../utils/sleep.js'
|
|
import { isExtractModeActive } from '../memdir/paths.js'
|
|
|
|
// Dead code elimination: conditional imports.
// Each optional module below is loaded with a synchronous `require` that is
// guarded directly by a build-time `feature()` flag; when the flag is off the
// binding is `null`, so every use site must null-check it.
// NOTE(review): keep the literal `feature(...) ? require(...) : null` shape —
// presumably the bundler needs it to drop the disabled branch; confirm against
// the bun:bundle documentation before restructuring.
/* eslint-disable @typescript-eslint/no-require-imports */
const coordinatorModeModule = feature('COORDINATOR_MODE')
  ? (require('../coordinator/coordinatorMode.js') as typeof import('../coordinator/coordinatorMode.js'))
  : null
// Loaded when either the PROACTIVE or the KAIROS flag is enabled.
const proactiveModule =
  feature('PROACTIVE') || feature('KAIROS')
    ? (require('../proactive/index.js') as typeof import('../proactive/index.js'))
    : null
// The three cron-related modules are all gated by the same AGENT_TRIGGERS flag.
const cronSchedulerModule = feature('AGENT_TRIGGERS')
  ? (require('../utils/cronScheduler.js') as typeof import('../utils/cronScheduler.js'))
  : null
const cronJitterConfigModule = feature('AGENT_TRIGGERS')
  ? (require('../utils/cronJitterConfig.js') as typeof import('../utils/cronJitterConfig.js'))
  : null
const cronGate = feature('AGENT_TRIGGERS')
  ? (require('../tools/ScheduleCronTool/prompt.js') as typeof import('../tools/ScheduleCronTool/prompt.js'))
  : null
const extractMemoriesModule = feature('EXTRACT_MEMORIES')
  ? (require('../services/extractMemories/extractMemories.js') as typeof import('../services/extractMemories/extractMemories.js'))
  : null
/* eslint-enable @typescript-eslint/no-require-imports */
|
|
|
|
/**
 * Prompt text telling the model that, in non-interactive mode, it must shut
 * its team down (request shutdown, wait for approvals, clean up) before
 * giving its final response. The opening instructions are wrapped in a
 * `<system-reminder>` element; the last line is the plain request.
 * Where this prompt is injected is outside the visible part of the file.
 */
const SHUTDOWN_TEAM_PROMPT = `<system-reminder>
You are running in non-interactive mode and cannot return a response to the user until your team is shut down.

You MUST shut down your team before preparing your final response:
1. Use requestShutdown to ask each team member to shut down gracefully
2. Wait for shutdown approvals
3. Use the cleanup operation to clean up the team
4. Only then provide your final response to the user

The user cannot receive your response until the team is completely shut down.
</system-reminder>

Shut down your team and prepare your final response for the user.`
|
|
|
|
// Track message UUIDs received during the current session runtime.
// `receivedMessageUuids` backs the duplicate check; `receivedMessageUuidsOrder`
// records insertion order so the oldest entries can be evicted once more than
// MAX_RECEIVED_UUIDS are held. The two collections must stay in sync.
const MAX_RECEIVED_UUIDS = 10_000
const receivedMessageUuids = new Set<UUID>()
const receivedMessageUuidsOrder: UUID[] = []

/**
 * Records `uuid` as seen and reports whether it is new.
 *
 * @param uuid - UUID of the received message.
 * @returns `true` the first time a UUID is seen, `false` when it is already
 *   tracked. A UUID that has since been evicted counts as new again.
 */
function trackReceivedMessageUuid(uuid: UUID): boolean {
  if (receivedMessageUuids.has(uuid)) {
    return false // duplicate
  }
  receivedMessageUuids.add(uuid)
  receivedMessageUuidsOrder.push(uuid)
  // Evict oldest entries when at capacity
  if (receivedMessageUuidsOrder.length > MAX_RECEIVED_UUIDS) {
    // splice removes the overflow from the front (oldest first) and returns it
    const toEvict = receivedMessageUuidsOrder.splice(
      0,
      receivedMessageUuidsOrder.length - MAX_RECEIVED_UUIDS,
    )
    for (const old of toEvict) {
      receivedMessageUuids.delete(old)
    }
  }
  return true // new UUID
}
|
|
|
|
/** A prompt payload: either plain text or a list of content blocks. */
type PromptValue = string | ContentBlockParam[]

/**
 * Normalizes a prompt value to block form: a string becomes a single
 * `text` block, a block array is returned as-is (same reference).
 */
function toBlocks(v: PromptValue): ContentBlockParam[] {
  return typeof v === 'string' ? [{ type: 'text', text: v }] : v
}

/**
 * Join prompt values from multiple queued commands into one. Strings are
 * newline-joined; if any value is a block array, all values are normalized
 * to blocks and concatenated.
 *
 * @param values - Prompt values in the order they should appear.
 * @returns The single value unchanged when exactly one is given; a
 *   newline-joined string when every value is a string (an empty input
 *   therefore yields `''`); otherwise one flat block array.
 */
export function joinPromptValues(values: PromptValue[]): PromptValue {
  if (values.length === 1) return values[0]!
  if (values.every((v): v is string => typeof v === 'string')) {
    return values.join('\n')
  }
  return values.flatMap(toBlocks)
}
|
|
|
|
/**
 * Whether `next` can be batched into the same ask() call as `head`. Only
 * prompt-mode commands batch, and only when the workload tag matches (so the
 * combined turn is attributed correctly) and the isMeta flag matches (so a
 * proactive tick can't merge into a user prompt and lose its hidden-in-
 * transcript marking when the head is spread over the merged command).
 *
 * @param head - The command that starts the batch. Its own `mode` is not
 *   checked here.
 * @param next - The candidate to append, or `undefined` when there is none.
 * @returns `true` only if `next` exists, is in `'prompt'` mode, and has the
 *   same `workload` and `isMeta` values as `head`.
 *
 * NOTE(review): both comparisons use strict equality, so `undefined` and
 * `false` (or `undefined` and `''`) count as different — confirm callers
 * populate these fields consistently.
 */
export function canBatchWith(
  head: QueuedCommand,
  next: QueuedCommand | undefined,
): boolean {
  return (
    next !== undefined &&
    next.mode === 'prompt' &&
    next.workload === head.workload &&
    next.isMeta === head.isMeta
  )
}
|
|
|
|
export async function runHeadless(
|
|
inputPrompt: string | AsyncIterable<string>,
|
|
getAppState: () => AppState,
|
|
setAppState: (f: (prev: AppState) => AppState) => void,
|
|
commands: Command[],
|
|
tools: Tools,
|
|
sdkMcpConfigs: Record<string, McpSdkServerConfig>,
|
|
agents: AgentDefinition[],
|
|
options: {
|
|
continue: boolean | undefined
|
|
resume: string | boolean | undefined
|
|
resumeSessionAt: string | undefined
|
|
verbose: boolean | undefined
|
|
outputFormat: string | undefined
|
|
jsonSchema: Record<string, unknown> | undefined
|
|
permissionPromptToolName: string | undefined
|
|
allowedTools: string[] | undefined
|
|
thinkingConfig: ThinkingConfig | undefined
|
|
maxTurns: number | undefined
|
|
maxBudgetUsd: number | undefined
|
|
taskBudget: { total: number } | undefined
|
|
systemPrompt: string | undefined
|
|
appendSystemPrompt: string | undefined
|
|
userSpecifiedModel: string | undefined
|
|
fallbackModel: string | undefined
|
|
teleport: string | true | null | undefined
|
|
sdkUrl: string | undefined
|
|
replayUserMessages: boolean | undefined
|
|
includePartialMessages: boolean | undefined
|
|
forkSession: boolean | undefined
|
|
rewindFiles: string | undefined
|
|
enableAuthStatus: boolean | undefined
|
|
agent: string | undefined
|
|
workload: string | undefined
|
|
setupTrigger?: 'init' | 'maintenance' | undefined
|
|
sessionStartHooksPromise?: ReturnType<typeof processSessionStartHooks>
|
|
setSDKStatus?: (status: SDKStatus) => void
|
|
},
|
|
): Promise<void> {
|
|
if (
|
|
process.env.USER_TYPE === 'ant' &&
|
|
isEnvTruthy(process.env.CLAUDE_CODE_EXIT_AFTER_FIRST_RENDER)
|
|
) {
|
|
process.stderr.write(
|
|
`\nStartup time: ${Math.round(process.uptime() * 1000)}ms\n`,
|
|
)
|
|
// eslint-disable-next-line custom-rules/no-process-exit
|
|
process.exit(0)
|
|
}
|
|
|
|
// Fire user settings download now so it overlaps with the MCP/tool setup
|
|
// below. Managed settings already started in main.tsx preAction; this gives
|
|
// user settings a similar head start. The cached promise is joined in
|
|
// installPluginsAndApplyMcpInBackground before plugin install reads
|
|
// enabledPlugins.
|
|
if (
|
|
feature('DOWNLOAD_USER_SETTINGS') &&
|
|
(isEnvTruthy(process.env.CLAUDE_CODE_REMOTE) || getIsRemoteMode())
|
|
) {
|
|
void downloadUserSettings()
|
|
}
|
|
|
|
// In headless mode there is no React tree, so the useSettingsChange hook
|
|
// never runs. Subscribe directly so that settings changes (including
|
|
// managed-settings / policy updates) are fully applied.
|
|
settingsChangeDetector.subscribe(source => {
|
|
applySettingsChange(source, setAppState)
|
|
|
|
// In headless mode, also sync the denormalized fastMode field from
|
|
// settings. The TUI manages fastMode via the UI so it skips this.
|
|
if (isFastModeEnabled()) {
|
|
setAppState(prev => {
|
|
const s = prev.settings as Record<string, unknown>
|
|
const fastMode = s.fastMode === true && !s.fastModePerSessionOptIn
|
|
return { ...prev, fastMode }
|
|
})
|
|
}
|
|
})
|
|
|
|
// Proactive activation is now handled in main.tsx before getTools() so
|
|
// SleepTool passes isEnabled() filtering. This fallback covers the case
|
|
// where CLAUDE_CODE_PROACTIVE is set but main.tsx's check didn't fire
|
|
// (e.g. env was injected by the SDK transport after argv parsing).
|
|
if (
|
|
(feature('PROACTIVE') || feature('KAIROS')) &&
|
|
proactiveModule &&
|
|
!proactiveModule.isProactiveActive() &&
|
|
isEnvTruthy(process.env.CLAUDE_CODE_PROACTIVE)
|
|
) {
|
|
proactiveModule.activateProactive('command')
|
|
}
|
|
|
|
// Periodically force a full GC to keep memory usage in check
|
|
if (typeof Bun !== 'undefined') {
|
|
const gcTimer = setInterval(Bun.gc, 1000)
|
|
gcTimer.unref()
|
|
}
|
|
|
|
// Start headless profiler for first turn
|
|
headlessProfilerStartTurn()
|
|
headlessProfilerCheckpoint('runHeadless_entry')
|
|
|
|
// Check Grove requirements for non-interactive consumer subscribers
|
|
if (await isQualifiedForGrove()) {
|
|
await checkGroveForNonInteractive()
|
|
}
|
|
headlessProfilerCheckpoint('after_grove_check')
|
|
|
|
// Initialize GrowthBook so feature flags take effect in headless mode.
|
|
// Without this, the disk cache is empty and all flags fall back to defaults.
|
|
void initializeGrowthBook()
|
|
|
|
if (options.resumeSessionAt && !options.resume) {
|
|
process.stderr.write(`Error: --resume-session-at requires --resume\n`)
|
|
gracefulShutdownSync(1)
|
|
return
|
|
}
|
|
|
|
if (options.rewindFiles && !options.resume) {
|
|
process.stderr.write(`Error: --rewind-files requires --resume\n`)
|
|
gracefulShutdownSync(1)
|
|
return
|
|
}
|
|
|
|
if (options.rewindFiles && inputPrompt) {
|
|
process.stderr.write(
|
|
`Error: --rewind-files is a standalone operation and cannot be used with a prompt\n`,
|
|
)
|
|
gracefulShutdownSync(1)
|
|
return
|
|
}
|
|
|
|
const structuredIO = getStructuredIO(inputPrompt, options)
|
|
|
|
// When emitting NDJSON for SDK clients, any stray write to stdout (debug
|
|
// prints, dependency console.log, library banners) breaks the client's
|
|
// line-by-line JSON parser. Install a guard that diverts non-JSON lines to
|
|
// stderr so the stream stays clean. Must run before the first
|
|
// structuredIO.write below.
|
|
if (options.outputFormat === 'stream-json') {
|
|
installStreamJsonStdoutGuard()
|
|
}
|
|
|
|
// #34044: if user explicitly set sandbox.enabled=true but deps are missing,
|
|
// isSandboxingEnabled() returns false silently. Surface the reason so users
|
|
// know their security config isn't being enforced.
|
|
const sandboxUnavailableReason = SandboxManager.getSandboxUnavailableReason()
|
|
if (sandboxUnavailableReason) {
|
|
if (SandboxManager.isSandboxRequired()) {
|
|
process.stderr.write(
|
|
`\nError: sandbox required but unavailable: ${sandboxUnavailableReason}\n` +
|
|
` sandbox.failIfUnavailable is set — refusing to start without a working sandbox.\n\n`,
|
|
)
|
|
gracefulShutdownSync(1)
|
|
return
|
|
}
|
|
process.stderr.write(
|
|
`\n⚠ Sandbox disabled: ${sandboxUnavailableReason}\n` +
|
|
` Commands will run WITHOUT sandboxing. Network and filesystem restrictions will NOT be enforced.\n\n`,
|
|
)
|
|
} else if (SandboxManager.isSandboxingEnabled()) {
|
|
// Initialize sandbox with a callback that forwards network permission
|
|
// requests to the SDK host via the can_use_tool control_request protocol.
|
|
// This must happen after structuredIO is created so we can send requests.
|
|
try {
|
|
await SandboxManager.initialize(structuredIO.createSandboxAskCallback())
|
|
} catch (err) {
|
|
process.stderr.write(`\n❌ Sandbox Error: ${errorMessage(err)}\n`)
|
|
gracefulShutdownSync(1, 'other')
|
|
return
|
|
}
|
|
}
|
|
|
|
if (options.outputFormat === 'stream-json' && options.verbose) {
|
|
registerHookEventHandler(event => {
|
|
const message: StdoutMessage = (() => {
|
|
switch (event.type) {
|
|
case 'started':
|
|
return {
|
|
type: 'system' as const,
|
|
subtype: 'hook_started' as const,
|
|
hook_id: event.hookId,
|
|
hook_name: event.hookName,
|
|
hook_event: event.hookEvent,
|
|
uuid: randomUUID(),
|
|
session_id: getSessionId(),
|
|
}
|
|
case 'progress':
|
|
return {
|
|
type: 'system' as const,
|
|
subtype: 'hook_progress' as const,
|
|
hook_id: event.hookId,
|
|
hook_name: event.hookName,
|
|
hook_event: event.hookEvent,
|
|
stdout: event.stdout,
|
|
stderr: event.stderr,
|
|
output: event.output,
|
|
uuid: randomUUID(),
|
|
session_id: getSessionId(),
|
|
}
|
|
case 'response':
|
|
return {
|
|
type: 'system' as const,
|
|
subtype: 'hook_response' as const,
|
|
hook_id: event.hookId,
|
|
hook_name: event.hookName,
|
|
hook_event: event.hookEvent,
|
|
output: event.output,
|
|
stdout: event.stdout,
|
|
stderr: event.stderr,
|
|
exit_code: event.exitCode,
|
|
outcome: event.outcome,
|
|
uuid: randomUUID(),
|
|
session_id: getSessionId(),
|
|
}
|
|
}
|
|
})()
|
|
void structuredIO.write(message)
|
|
})
|
|
}
|
|
|
|
if (options.setupTrigger) {
|
|
await processSetupHooks(options.setupTrigger)
|
|
}
|
|
|
|
headlessProfilerCheckpoint('before_loadInitialMessages')
|
|
const appState = getAppState()
|
|
const {
|
|
messages: initialMessages,
|
|
turnInterruptionState,
|
|
agentSetting: resumedAgentSetting,
|
|
} = await loadInitialMessages(setAppState, {
|
|
continue: options.continue,
|
|
teleport: options.teleport,
|
|
resume: options.resume,
|
|
resumeSessionAt: options.resumeSessionAt,
|
|
forkSession: options.forkSession,
|
|
outputFormat: options.outputFormat,
|
|
sessionStartHooksPromise: options.sessionStartHooksPromise,
|
|
restoredWorkerState: structuredIO.restoredWorkerState,
|
|
})
|
|
|
|
// SessionStart hooks can emit initialUserMessage — the first user turn for
|
|
// headless orchestrator sessions where stdin is empty and additionalContext
|
|
// alone (an attachment, not a turn) would leave the REPL with nothing to
|
|
// respond to. The hook promise is awaited inside loadInitialMessages, so the
|
|
// module-level pending value is set by the time we get here.
|
|
const hookInitialUserMessage = takeInitialUserMessage()
|
|
if (hookInitialUserMessage) {
|
|
structuredIO.prependUserMessage(hookInitialUserMessage)
|
|
}
|
|
|
|
// Restore agent setting from the resumed session (if not overridden by current --agent flag
|
|
// or settings-based agent, which would already have set mainThreadAgentType in main.tsx)
|
|
if (!options.agent && !getMainThreadAgentType() && resumedAgentSetting) {
|
|
const { agentDefinition: restoredAgent } = restoreAgentFromSession(
|
|
resumedAgentSetting,
|
|
undefined,
|
|
{ activeAgents: agents, allAgents: agents },
|
|
)
|
|
if (restoredAgent) {
|
|
setAppState(prev => ({ ...prev, agent: restoredAgent.agentType }))
|
|
// Apply the agent's system prompt for non-built-in agents (mirrors main.tsx initial --agent path)
|
|
if (!options.systemPrompt && !isBuiltInAgent(restoredAgent)) {
|
|
const agentSystemPrompt = restoredAgent.getSystemPrompt()
|
|
if (agentSystemPrompt) {
|
|
options.systemPrompt = agentSystemPrompt
|
|
}
|
|
}
|
|
// Re-persist agent setting so future resumes maintain the agent
|
|
saveAgentSetting(restoredAgent.agentType)
|
|
}
|
|
}
|
|
|
|
// gracefulShutdownSync schedules an async shutdown and sets process.exitCode.
|
|
// If a loadInitialMessages error path triggered it, bail early to avoid
|
|
// unnecessary work while the process winds down.
|
|
if (initialMessages.length === 0 && process.exitCode !== undefined) {
|
|
return
|
|
}
|
|
|
|
// Handle --rewind-files: restore filesystem and exit immediately
|
|
if (options.rewindFiles) {
|
|
// File history snapshots are only created for user messages,
|
|
// so we require the target to be a user message
|
|
const targetMessage = initialMessages.find(
|
|
m => m.uuid === options.rewindFiles,
|
|
)
|
|
|
|
if (!targetMessage || targetMessage.type !== 'user') {
|
|
process.stderr.write(
|
|
`Error: --rewind-files requires a user message UUID, but ${options.rewindFiles} is not a user message in this session\n`,
|
|
)
|
|
gracefulShutdownSync(1)
|
|
return
|
|
}
|
|
|
|
const currentAppState = getAppState()
|
|
const result = await handleRewindFiles(
|
|
options.rewindFiles as UUID,
|
|
currentAppState,
|
|
setAppState,
|
|
false,
|
|
)
|
|
if (!result.canRewind) {
|
|
process.stderr.write(`Error: ${result.error || 'Unexpected error'}\n`)
|
|
gracefulShutdownSync(1)
|
|
return
|
|
}
|
|
|
|
// Rewind complete - exit successfully
|
|
process.stdout.write(
|
|
`Files rewound to state at message ${options.rewindFiles}\n`,
|
|
)
|
|
gracefulShutdownSync(0)
|
|
return
|
|
}
|
|
|
|
// Check if we need input prompt - skip if we're resuming with a valid session ID/JSONL file or using SDK URL
|
|
const hasValidResumeSessionId =
|
|
typeof options.resume === 'string' &&
|
|
(Boolean(validateUuid(options.resume)) || options.resume.endsWith('.jsonl'))
|
|
const isUsingSdkUrl = Boolean(options.sdkUrl)
|
|
|
|
if (!inputPrompt && !hasValidResumeSessionId && !isUsingSdkUrl) {
|
|
process.stderr.write(
|
|
`Error: Input must be provided either through stdin or as a prompt argument when using --print\n`,
|
|
)
|
|
gracefulShutdownSync(1)
|
|
return
|
|
}
|
|
|
|
if (options.outputFormat === 'stream-json' && !options.verbose) {
|
|
process.stderr.write(
|
|
'Error: When using --print, --output-format=stream-json requires --verbose\n',
|
|
)
|
|
gracefulShutdownSync(1)
|
|
return
|
|
}
|
|
|
|
// Filter out MCP tools that are in the deny list
|
|
const allowedMcpTools = filterToolsByDenyRules(
|
|
appState.mcp.tools,
|
|
appState.toolPermissionContext,
|
|
)
|
|
let filteredTools = [...tools, ...allowedMcpTools]
|
|
|
|
// When using SDK URL, always use stdio permission prompting to delegate to the SDK
|
|
const effectivePermissionPromptToolName = options.sdkUrl
|
|
? 'stdio'
|
|
: options.permissionPromptToolName
|
|
|
|
// Callback for when a permission prompt is shown
|
|
const onPermissionPrompt = (details: RequiresActionDetails) => {
|
|
if (feature('COMMIT_ATTRIBUTION')) {
|
|
setAppState(prev => ({
|
|
...prev,
|
|
attribution: {
|
|
...prev.attribution,
|
|
permissionPromptCount: prev.attribution.permissionPromptCount + 1,
|
|
},
|
|
}))
|
|
}
|
|
notifySessionStateChanged('requires_action', details)
|
|
}
|
|
|
|
const canUseTool = getCanUseToolFn(
|
|
effectivePermissionPromptToolName,
|
|
structuredIO,
|
|
() => getAppState().mcp.tools,
|
|
onPermissionPrompt,
|
|
)
|
|
if (options.permissionPromptToolName) {
|
|
// Remove the permission prompt tool from the list of available tools.
|
|
filteredTools = filteredTools.filter(
|
|
tool => !toolMatchesName(tool, options.permissionPromptToolName!),
|
|
)
|
|
}
|
|
|
|
// Install errors handlers to gracefully handle broken pipes (e.g., when parent process dies)
|
|
registerProcessOutputErrorHandlers()
|
|
|
|
headlessProfilerCheckpoint('after_loadInitialMessages')
|
|
|
|
// Ensure model strings are initialized before generating model options.
|
|
// For Bedrock users, this waits for the profile fetch to get correct region strings.
|
|
await ensureModelStringsInitialized()
|
|
headlessProfilerCheckpoint('after_modelStrings')
|
|
|
|
// UDS inbox store registration is deferred until after `run` is defined
|
|
// so we can pass `run` as the onEnqueue callback (see below).
|
|
|
|
// Only `json` + `verbose` needs the full array (jsonStringify(messages) below).
|
|
// For stream-json (SDK/CCR) and default text output, only the last message is
|
|
// read for the exit code / final result. Avoid accumulating every message in
|
|
// memory for the entire session.
|
|
const needsFullArray = options.outputFormat === 'json' && options.verbose
|
|
const messages: SDKMessage[] = []
|
|
let lastMessage: SDKMessage | undefined
|
|
// Streamlined mode transforms messages when CLAUDE_CODE_STREAMLINED_OUTPUT=true and using stream-json
|
|
// Build flag gates this out of external builds; env var is the runtime opt-in for ant builds
|
|
const transformToStreamlined =
|
|
feature('STREAMLINED_OUTPUT') &&
|
|
isEnvTruthy(process.env.CLAUDE_CODE_STREAMLINED_OUTPUT) &&
|
|
options.outputFormat === 'stream-json'
|
|
? createStreamlinedTransformer()
|
|
: null
|
|
|
|
headlessProfilerCheckpoint('before_runHeadlessStreaming')
|
|
for await (const message of runHeadlessStreaming(
|
|
structuredIO,
|
|
appState.mcp.clients,
|
|
[...commands, ...appState.mcp.commands],
|
|
filteredTools,
|
|
initialMessages,
|
|
canUseTool,
|
|
sdkMcpConfigs,
|
|
getAppState,
|
|
setAppState,
|
|
agents,
|
|
options,
|
|
turnInterruptionState,
|
|
)) {
|
|
if (transformToStreamlined) {
|
|
// Streamlined mode: transform messages and stream immediately
|
|
const transformed = transformToStreamlined(message)
|
|
if (transformed) {
|
|
await structuredIO.write(transformed)
|
|
}
|
|
} else if (options.outputFormat === 'stream-json' && options.verbose) {
|
|
await structuredIO.write(message)
|
|
}
|
|
// Should not be getting control messages or stream events in non-stream mode.
|
|
// Also filter out streamlined types since they're only produced by the transformer.
|
|
// SDK-only system events are excluded so lastMessage stays at the result
|
|
// (session_state_changed(idle) and any late task_notification drain after
|
|
// result in the finally block).
|
|
if (
|
|
message.type !== 'control_response' &&
|
|
message.type !== 'control_request' &&
|
|
message.type !== 'control_cancel_request' &&
|
|
!(
|
|
message.type === 'system' &&
|
|
(message.subtype === 'session_state_changed' ||
|
|
message.subtype === 'task_notification' ||
|
|
message.subtype === 'task_started' ||
|
|
message.subtype === 'task_progress' ||
|
|
message.subtype === 'post_turn_summary')
|
|
) &&
|
|
message.type !== 'stream_event' &&
|
|
message.type !== 'keep_alive' &&
|
|
message.type !== 'streamlined_text' &&
|
|
message.type !== 'streamlined_tool_use_summary' &&
|
|
message.type !== 'prompt_suggestion'
|
|
) {
|
|
if (needsFullArray) {
|
|
messages.push(message)
|
|
}
|
|
lastMessage = message
|
|
}
|
|
}
|
|
|
|
switch (options.outputFormat) {
|
|
case 'json':
|
|
if (!lastMessage || lastMessage.type !== 'result') {
|
|
throw new Error('No messages returned')
|
|
}
|
|
if (options.verbose) {
|
|
writeToStdout(jsonStringify(messages) + '\n')
|
|
break
|
|
}
|
|
writeToStdout(jsonStringify(lastMessage) + '\n')
|
|
break
|
|
case 'stream-json':
|
|
// already logged above
|
|
break
|
|
default:
|
|
if (!lastMessage || lastMessage.type !== 'result') {
|
|
throw new Error('No messages returned')
|
|
}
|
|
switch (lastMessage.subtype) {
|
|
case 'success':
|
|
writeToStdout(
|
|
lastMessage.result.endsWith('\n')
|
|
? lastMessage.result
|
|
: lastMessage.result + '\n',
|
|
)
|
|
break
|
|
case 'error_during_execution':
|
|
writeToStdout(`Execution error`)
|
|
break
|
|
case 'error_max_turns':
|
|
writeToStdout(`Error: Reached max turns (${options.maxTurns})`)
|
|
break
|
|
case 'error_max_budget_usd':
|
|
writeToStdout(`Error: Exceeded USD budget (${options.maxBudgetUsd})`)
|
|
break
|
|
case 'error_max_structured_output_retries':
|
|
writeToStdout(
|
|
`Error: Failed to provide valid structured output after maximum retries`,
|
|
)
|
|
}
|
|
}
|
|
|
|
// Log headless latency metrics for the final turn
|
|
logHeadlessProfilerTurn()
|
|
|
|
// Drain any in-flight memory extraction before shutdown. The response is
|
|
// already flushed above, so this adds no user-visible latency — it just
|
|
// delays process exit so gracefulShutdownSync's 5s failsafe doesn't kill
|
|
// the forked agent mid-flight. Gated by isExtractModeActive so the
|
|
// tengu_slate_thimble flag controls non-interactive extraction end-to-end.
|
|
if (feature('EXTRACT_MEMORIES') && isExtractModeActive()) {
|
|
await extractMemoriesModule!.drainPendingExtraction()
|
|
}
|
|
|
|
gracefulShutdownSync(
|
|
lastMessage?.type === 'result' && lastMessage?.is_error ? 1 : 0,
|
|
)
|
|
}
|
|
|
|
function runHeadlessStreaming(
|
|
structuredIO: StructuredIO,
|
|
mcpClients: MCPServerConnection[],
|
|
commands: Command[],
|
|
tools: Tools,
|
|
initialMessages: Message[],
|
|
canUseTool: CanUseToolFn,
|
|
sdkMcpConfigs: Record<string, McpSdkServerConfig>,
|
|
getAppState: () => AppState,
|
|
setAppState: (f: (prev: AppState) => AppState) => void,
|
|
agents: AgentDefinition[],
|
|
options: {
|
|
verbose: boolean | undefined
|
|
jsonSchema: Record<string, unknown> | undefined
|
|
permissionPromptToolName: string | undefined
|
|
allowedTools: string[] | undefined
|
|
thinkingConfig: ThinkingConfig | undefined
|
|
maxTurns: number | undefined
|
|
maxBudgetUsd: number | undefined
|
|
taskBudget: { total: number } | undefined
|
|
systemPrompt: string | undefined
|
|
appendSystemPrompt: string | undefined
|
|
userSpecifiedModel: string | undefined
|
|
fallbackModel: string | undefined
|
|
replayUserMessages?: boolean | undefined
|
|
includePartialMessages?: boolean | undefined
|
|
enableAuthStatus?: boolean | undefined
|
|
agent?: string | undefined
|
|
setSDKStatus?: (status: SDKStatus) => void
|
|
promptSuggestions?: boolean | undefined
|
|
workload?: string | undefined
|
|
},
|
|
turnInterruptionState?: TurnInterruptionState,
|
|
): AsyncIterable<StdoutMessage> {
|
|
let running = false
|
|
let runPhase:
|
|
| 'draining_commands'
|
|
| 'waiting_for_agents'
|
|
| 'finally_flush'
|
|
| 'finally_post_flush'
|
|
| undefined
|
|
let inputClosed = false
|
|
let shutdownPromptInjected = false
|
|
let heldBackResult: StdoutMessage | null = null
|
|
let abortController: AbortController | undefined
|
|
// Same queue sendRequest() enqueues to — one FIFO for everything.
|
|
const output = structuredIO.outbound
|
|
|
|
// Ctrl+C in -p mode: abort the in-flight query, then shut down gracefully.
|
|
// gracefulShutdown persists session state and flushes analytics, with a
|
|
// failsafe timer that force-exits if cleanup hangs.
|
|
/**
 * SIGINT handler: logs the signal, aborts the current query's controller if
 * one exists and is still live, then starts `gracefulShutdown(0)`.
 */
const sigintHandler = () => {
  logForDiagnosticsNoPII('info', 'shutdown_signal', { signal: 'SIGINT' })

  // Only abort when a controller is present and has not been aborted yet.
  if (abortController?.signal.aborted === false) {
    abortController.abort()
  }

  // Fire-and-forget: the shutdown promise is intentionally not awaited here.
  void gracefulShutdown(0)
}
|
|
process.on('SIGINT', sigintHandler)
|
|
|
|
// Dump run()'s state at SIGTERM so a stuck session's healthsweep can name
|
|
// the do/while(waitingForAgents) poll without reading the transcript.
|
|
registerCleanup(async () => {
|
|
const bg: Record<string, number> = {}
|
|
for (const t of getRunningTasks(getAppState())) {
|
|
if (isBackgroundTask(t)) bg[t.type] = (bg[t.type] ?? 0) + 1
|
|
}
|
|
logForDiagnosticsNoPII('info', 'run_state_at_shutdown', {
|
|
run_active: running,
|
|
run_phase: runPhase,
|
|
worker_status: getSessionState(),
|
|
internal_events_pending: structuredIO.internalEventsPending,
|
|
bg_tasks: bg,
|
|
})
|
|
})
|
|
|
|
// Wire the central onChangeAppState mode-diff hook to the SDK output stream.
|
|
// This fires whenever ANY code path mutates toolPermissionContext.mode —
|
|
// Shift+Tab, ExitPlanMode dialog, /plan slash command, rewind, bridge
|
|
// set_permission_mode, the query loop, stop_task — rather than the two
|
|
// paths that previously went through a bespoke wrapper.
|
|
// The wrapper's body was fully redundant (it enqueued here AND called
|
|
// notifySessionMetadataChanged, both of which onChangeAppState now covers);
|
|
// keeping it would double-emit status messages.
|
|
setPermissionModeChangedListener(newMode => {
|
|
// Only emit for SDK-exposed modes.
|
|
if (
|
|
newMode === 'default' ||
|
|
newMode === 'acceptEdits' ||
|
|
newMode === 'bypassPermissions' ||
|
|
newMode === 'plan' ||
|
|
newMode === (feature('TRANSCRIPT_CLASSIFIER') && 'auto') ||
|
|
newMode === 'dontAsk'
|
|
) {
|
|
output.enqueue({
|
|
type: 'system',
|
|
subtype: 'status',
|
|
status: null,
|
|
permissionMode: newMode as PermissionMode,
|
|
uuid: randomUUID(),
|
|
session_id: getSessionId(),
|
|
})
|
|
}
|
|
})
|
|
|
|
// Prompt suggestion tracking (push model)
|
|
const suggestionState: {
|
|
abortController: AbortController | null
|
|
inflightPromise: Promise<void> | null
|
|
lastEmitted: {
|
|
text: string
|
|
emittedAt: number
|
|
promptId: PromptVariant
|
|
generationRequestId: string | null
|
|
} | null
|
|
pendingSuggestion: {
|
|
type: 'prompt_suggestion'
|
|
suggestion: string
|
|
uuid: UUID
|
|
session_id: string
|
|
} | null
|
|
pendingLastEmittedEntry: {
|
|
text: string
|
|
promptId: PromptVariant
|
|
generationRequestId: string | null
|
|
} | null
|
|
} = {
|
|
abortController: null,
|
|
inflightPromise: null,
|
|
lastEmitted: null,
|
|
pendingSuggestion: null,
|
|
pendingLastEmittedEntry: null,
|
|
}
|
|
|
|
// Set up AWS auth status listener if enabled
|
|
let unsubscribeAuthStatus: (() => void) | undefined
|
|
if (options.enableAuthStatus) {
|
|
const authStatusManager = AwsAuthStatusManager.getInstance()
|
|
unsubscribeAuthStatus = authStatusManager.subscribe(status => {
|
|
output.enqueue({
|
|
type: 'auth_status',
|
|
isAuthenticating: status.isAuthenticating,
|
|
output: status.output,
|
|
error: status.error,
|
|
uuid: randomUUID(),
|
|
session_id: getSessionId(),
|
|
})
|
|
})
|
|
}
|
|
|
|
// Set up rate limit status listener to emit SDKRateLimitEvent for all status changes.
|
|
// Emitting for all statuses (including 'allowed') ensures consumers can clear warnings
|
|
// when rate limits reset. The upstream emitStatusChange already deduplicates via isEqual.
|
|
/**
 * Converts a limits update into an SDK `rate_limit_event` and queues it on the
 * outbound stream. Updates that `toSDKRateLimitInfo` maps to a falsy value are
 * dropped without emitting anything.
 */
const rateLimitListener = (limits: ClaudeAILimits) => {
  const info = toSDKRateLimitInfo(limits)
  if (!info) {
    return
  }

  output.enqueue({
    type: 'rate_limit_event',
    rate_limit_info: info,
    uuid: randomUUID(),
    session_id: getSessionId(),
  })
}
|
|
statusListeners.add(rateLimitListener)
|
|
|
|
// Messages for internal tracking, directly mutated by ask(). These messages
|
|
// include Assistant, User, Attachment, and Progress messages.
|
|
// TODO: Clean up this code to avoid passing around a mutable array.
|
|
const mutableMessages: Message[] = initialMessages
|
|
|
|
// Seed the readFileState cache from the transcript (content the model saw,
|
|
// with message timestamps) so getChangedFiles can detect external edits.
|
|
// This cache instance must persist across ask() calls, since the edit tool
|
|
// relies on this as a global state.
|
|
let readFileState = extractReadFilesFromMessages(
|
|
initialMessages,
|
|
cwd(),
|
|
READ_FILE_STATE_CACHE_SIZE,
|
|
)
|
|
|
|
// Client-supplied readFileState seeds (via seed_read_state control request).
|
|
// The stdin IIFE runs concurrently with ask() — a seed arriving mid-turn
|
|
// would be lost to ask()'s clone-then-replace (QueryEngine.ts finally block)
|
|
// if written directly into readFileState. Instead, seeds land here, merge
|
|
// into getReadFileCache's view (readFileState-wins-ties: seeds fill gaps),
|
|
// and are re-applied then CLEARED in setReadFileCache. One-shot: each seed
|
|
// survives exactly one clone-replace cycle, then becomes a regular
|
|
// readFileState entry subject to compact's clear like everything else.
|
|
const pendingSeeds = createFileStateCacheWithSizeLimit(
|
|
READ_FILE_STATE_CACHE_SIZE,
|
|
)
|
|
|
|
// Auto-resume interrupted turns on restart so CC continues from where it
|
|
// left off without requiring the SDK to re-send the prompt.
|
|
const resumeInterruptedTurnEnv =
|
|
process.env.CLAUDE_CODE_RESUME_INTERRUPTED_TURN
|
|
if (
|
|
turnInterruptionState &&
|
|
turnInterruptionState.kind !== 'none' &&
|
|
resumeInterruptedTurnEnv
|
|
) {
|
|
logForDebugging(
|
|
`[print.ts] Auto-resuming interrupted turn (kind: ${turnInterruptionState.kind})`,
|
|
)
|
|
|
|
// Remove the interrupted message and its sentinel, then re-enqueue so
|
|
// the model sees it exactly once. For mid-turn interruptions, the
|
|
// deserialization layer transforms them into interrupted_prompt by
|
|
// appending a synthetic "Continue from where you left off." message.
|
|
removeInterruptedMessage(mutableMessages, turnInterruptionState.message)
|
|
enqueue({
|
|
mode: 'prompt',
|
|
value: turnInterruptionState.message.message.content,
|
|
uuid: randomUUID(),
|
|
})
|
|
}
|
|
|
|
const modelOptions = getModelOptions()
|
|
const modelInfos = modelOptions.map(option => {
|
|
const modelId = option.value === null ? 'default' : option.value
|
|
const resolvedModel =
|
|
modelId === 'default'
|
|
? getDefaultMainLoopModel()
|
|
: parseUserSpecifiedModel(modelId)
|
|
const hasEffort = modelSupportsEffort(resolvedModel)
|
|
const hasAdaptiveThinking = modelSupportsAdaptiveThinking(resolvedModel)
|
|
const hasFastMode = isFastModeSupportedByModel(option.value)
|
|
const hasAutoMode = modelSupportsAutoMode(resolvedModel)
|
|
return {
|
|
value: modelId,
|
|
displayName: option.label,
|
|
description: option.description,
|
|
...(hasEffort && {
|
|
supportsEffort: true,
|
|
supportedEffortLevels: modelSupportsMaxEffort(resolvedModel)
|
|
? [...EFFORT_LEVELS]
|
|
: EFFORT_LEVELS.filter(l => l !== 'max'),
|
|
}),
|
|
...(hasAdaptiveThinking && { supportsAdaptiveThinking: true }),
|
|
...(hasFastMode && { supportsFastMode: true }),
|
|
...(hasAutoMode && { supportsAutoMode: true }),
|
|
}
|
|
})
|
|
let activeUserSpecifiedModel = options.userSpecifiedModel
|
|
|
|
/**
 * Appends model-switch breadcrumb messages to the conversation and replays
 * the ones carrying local-command stdout to the SDK output stream.
 *
 * @param modelArg The model argument handed to createModelSwitchBreadcrumbs.
 * @param resolvedModel The resolved model; its display string is embedded in the breadcrumbs.
 */
function injectModelSwitchBreadcrumbs(
  modelArg: string,
  resolvedModel: string,
): void {
  const displayName = modelDisplayString(resolvedModel)
  const breadcrumbs = createModelSwitchBreadcrumbs(modelArg, displayName)

  // Every breadcrumb joins the tracked conversation history.
  mutableMessages.push(...breadcrumbs)

  const stdoutMarker = `<${LOCAL_COMMAND_STDOUT_TAG}>`
  for (const crumb of breadcrumbs) {
    const { content } = crumb.message
    // Only string-content breadcrumbs containing the stdout tag are replayed.
    if (typeof content !== 'string' || !content.includes(stdoutMarker)) {
      continue
    }

    output.enqueue({
      type: 'user',
      message: crumb.message,
      session_id: getSessionId(),
      parent_tool_use_id: null,
      uuid: crumb.uuid,
      timestamp: crumb.timestamp,
      isReplay: true,
    } satisfies SDKUserMessageReplay)
  }
}
|
|
|
|
// Cache SDK MCP clients to avoid reconnecting on each run
|
|
let sdkClients: MCPServerConnection[] = []
|
|
let sdkTools: Tools = []
|
|
|
|
// Track which MCP clients have had elicitation handlers registered
|
|
const elicitationRegistered = new Set<string>()
|
|
|
|
/**
 * Attach elicitation request/completion handlers to each connected MCP client
 * that has not been registered yet (tracked by server name in
 * `elicitationRegistered`).
 *
 * SDK MCP servers are skipped because their elicitation routes through
 * SdkControlClientTransport. For every request, elicitation hooks run first
 * (matching REPL behavior); when no hook answers, the request is forwarded to
 * the SDK consumer over the control protocol and the consumer's answer is
 * passed through the elicitation result hooks before being returned.
 *
 * @param clients MCP server connections to inspect; entries that are not
 *   in the 'connected' state are ignored.
 */
function registerElicitationHandlers(clients: MCPServerConnection[]): void {
  for (const connection of clients) {
    if (connection.type !== 'connected') {
      continue
    }
    if (elicitationRegistered.has(connection.name)) {
      continue
    }
    // SDK MCP servers: elicitation flows through SdkControlClientTransport.
    if (connection.config.type === 'sdk') {
      continue
    }

    const serverName = connection.name

    // setRequestHandler throws if the client wasn't created with elicitation
    // capability declared (e.g., SDK-created clients), hence the try/catch.
    try {
      connection.client.setRequestHandler(
        ElicitRequestSchema,
        async (request, extra) => {
          const { params } = request
          const { signal } = extra

          logMCPDebug(
            serverName,
            `Elicitation request received in print mode: ${jsonStringify(request)}`,
          )

          // Anything that is not explicitly 'url' is treated as a form request.
          const mode = params.mode === 'url' ? 'url' : 'form'

          logEvent('tengu_mcp_elicitation_shown', {
            mode: mode as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
          })

          // Hooks get the first chance to answer programmatically.
          const hookResponse = await runElicitationHooks(
            serverName,
            params,
            signal,
          )
          if (hookResponse) {
            logMCPDebug(
              serverName,
              `Elicitation resolved by hook: ${jsonStringify(hookResponse)}`,
            )
            logEvent('tengu_mcp_elicitation_response', {
              mode: mode as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
              action:
                hookResponse.action as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
            })
            return hookResponse
          }

          // No hook answered: gather the optional fields and delegate to the
          // SDK consumer via the control protocol.
          const url = 'url' in params ? (params.url as string) : undefined
          const requestedSchema =
            'requestedSchema' in params
              ? (params.requestedSchema as Record<string, unknown> | undefined)
              : undefined
          const elicitationId =
            'elicitationId' in params
              ? (params.elicitationId as string | undefined)
              : undefined

          const consumerAnswer = await structuredIO.handleElicitation(
            serverName,
            params.message,
            requestedSchema,
            signal,
            mode,
            url,
            elicitationId,
          )

          // Result hooks see the consumer's answer before it is returned.
          const finalAnswer = await runElicitationResultHooks(
            serverName,
            consumerAnswer,
            signal,
            mode,
            elicitationId,
          )

          logEvent('tengu_mcp_elicitation_response', {
            mode: mode as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
            action:
              finalAnswer.action as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
          })
          return finalAnswer
        },
      )

      // Surface completion notifications to SDK consumers (URL mode).
      connection.client.setNotificationHandler(
        ElicitationCompleteNotificationSchema,
        notification => {
          const completedId = notification.params.elicitationId

          logMCPDebug(
            serverName,
            `Elicitation completion notification: ${completedId}`,
          )

          // Notification hooks run in the background; their result is not awaited.
          void executeNotificationHooks({
            message: `MCP server "${serverName}" confirmed elicitation ${completedId} complete`,
            notificationType: 'elicitation_complete',
          })

          output.enqueue({
            type: 'system',
            subtype: 'elicitation_complete',
            mcp_server_name: serverName,
            elicitation_id: completedId,
            uuid: randomUUID(),
            session_id: getSessionId(),
          })
        },
      )

      // Mark as registered only after both handlers were installed.
      elicitationRegistered.add(serverName)
    } catch {
      // setRequestHandler throws if the client wasn't created with
      // elicitation capability — skip silently and leave it unregistered.
    }
  }
}
|
|
|
|
/**
 * Reconciles the cached SDK MCP clients/tools with `sdkMcpConfigs`.
 *
 * A refresh happens when a configured server has no cached client, a cached
 * client has no config any more, or any cached client is 'pending' or
 * 'failed'. On refresh, connected clients of removed servers are cleaned up,
 * all SDK MCP servers are set up again from the current config, and
 * `appState.mcp.tools` is rewritten so it holds exactly the new SDK tools
 * alongside the non-SDK tools.
 */
async function updateSdkMcp() {
  const configuredNames = new Set(Object.keys(sdkMcpConfigs))
  const cachedNames = new Set(sdkClients.map(c => c.name))

  const needsRefresh =
    // Servers added to the config since the last setup.
    [...configuredNames].some(name => !cachedNames.has(name)) ||
    // Servers dropped from the config since the last setup.
    [...cachedNames].some(name => !configuredNames.has(name)) ||
    // Pending clients still need to be upgraded.
    sdkClients.some(c => c.type === 'pending') ||
    // Failed clients (e.g. handshake timeout on a WS reconnect race) would
    // otherwise stay failed forever: their name satisfies the name diff but
    // they contribute zero tools.
    sdkClients.some(c => c.type === 'failed')

  if (!needsRefresh) {
    return
  }

  // Clean up connected clients whose server is no longer configured.
  for (const client of sdkClients) {
    if (client.type === 'connected' && !configuredNames.has(client.name)) {
      await client.cleanup()
    }
  }

  // Re-initialize all SDK MCP servers with the current config.
  const sdkSetup = await setupSdkMcpClients(
    sdkMcpConfigs,
    (serverName, message) => structuredIO.sendMcpMessage(serverName, message),
  )
  sdkClients = sdkSetup.clients
  sdkTools = sdkSetup.tools

  // Store SDK MCP tools in appState so subagents can reach them via
  // assembleToolPool. Only tools are stored; SDK clients are merged
  // separately in the query loop and the mcp_status handler. Both the old
  // and the new server names are used so stale SDK tools get removed when
  // servers are added or removed.
  const allSdkNames = uniq([...cachedNames, ...configuredNames])
  const belongsToSdkServer = (toolName: string): boolean =>
    allSdkNames.some(name => toolName.startsWith(getMcpPrefix(name)))

  setAppState(prev => {
    const nonSdkTools = prev.mcp.tools.filter(t => !belongsToSdkServer(t.name))
    return {
      ...prev,
      mcp: {
        ...prev.mcp,
        tools: [...nonSdkTools, ...sdkTools],
      },
    }
  })

  // Set up the special internal VSCode MCP server if necessary.
  setupVscodeSdkMcp(sdkClients)
}
|
|
|
|
void updateSdkMcp()
|
|
|
|
// State for dynamically added MCP servers (via mcp_set_servers control message)
|
|
// These are separate from SDK MCP servers and support all transport types
|
|
let dynamicMcpState: DynamicMcpState = {
|
|
clients: [],
|
|
tools: [],
|
|
configs: {},
|
|
}
|
|
|
|
// Shared tool assembly for ask() and the get_context_usage control request.
|
|
// Closes over the mutable sdkTools/dynamicMcpState bindings so both call
|
|
// sites see late-connecting servers.
|
|
/**
 * Assembles the full tool list for the given app state.
 *
 * Merges the base tools, SDK MCP tools and dynamic MCP tools with the
 * assembled tool pool, de-duplicates by name, drops the permission prompt
 * tool when one is configured, and appends the synthetic output tool when an
 * init JSON schema exists and no `jsonSchema` option was given.
 *
 * @param appState State whose permission context and MCP tools are used.
 * @returns The tool list for this state.
 */
const buildAllTools = (appState: AppState): Tools => {
  const { toolPermissionContext, mcp } = appState

  const assembledTools = assembleToolPool(toolPermissionContext, mcp.tools)
  // sdkTools / dynamicMcpState are read on every call so late-connecting
  // servers are picked up.
  const candidateTools = [...tools, ...sdkTools, ...dynamicMcpState.tools]
  const mergedTools = mergeAndFilterTools(
    candidateTools,
    assembledTools,
    toolPermissionContext.mode,
  )
  let allTools = uniqBy(mergedTools, 'name')

  // The permission prompt tool itself must not be offered as a regular tool.
  const promptToolName = options.permissionPromptToolName
  if (promptToolName) {
    allTools = allTools.filter(tool => !toolMatchesName(tool, promptToolName))
  }

  const initJsonSchema = getInitJsonSchema()
  if (!initJsonSchema || options.jsonSchema) {
    return allTools
  }

  // createSyntheticOutputTool only yields a usable tool when its result has a `tool` field.
  const syntheticOutputResult = createSyntheticOutputTool(initJsonSchema)
  return 'tool' in syntheticOutputResult
    ? [...allTools, syntheticOutputResult.tool]
    : allTools
}
|
|
|
|
// Bridge handle for remote-control (SDK control message).
|
|
// Mirrors the REPL's useReplBridge hook: the handle is created when
|
|
// `remote_control` is enabled and torn down when disabled.
|
|
let bridgeHandle: ReplBridgeHandle | null = null
|
|
// Cursor into mutableMessages — tracks how far we've forwarded.
|
|
// Same index-based diff as useReplBridge's lastWrittenIndexRef.
|
|
let bridgeLastForwardedIndex = 0
|
|
|
|
// Forward new messages from mutableMessages to the bridge.
|
|
// Called incrementally during each turn (so claude.ai sees progress
|
|
// and stays alive during permission waits) and again after the turn.
|
|
//
|
|
// writeMessages has its own UUID-based dedup (initialMessageUUIDs,
|
|
// recentPostedUUIDs) — the index cursor here is a pre-filter to avoid
|
|
// O(n) re-scanning of already-sent messages on every call.
|
|
/**
 * Sends user/assistant messages added to `mutableMessages` since the last
 * call to the bridge, then moves the cursor to the end of the array.
 * Does nothing when no bridge handle is set.
 */
function forwardMessagesToBridge(): void {
  const bridge = bridgeHandle
  if (!bridge) {
    return
  }

  // Clamp the cursor: mutableMessages can shrink (compaction truncates it).
  const resumeAt = Math.min(bridgeLastForwardedIndex, mutableMessages.length)

  const pending: Message[] = []
  for (const candidate of mutableMessages.slice(resumeAt)) {
    // Only user and assistant messages are forwarded.
    if (candidate.type === 'user' || candidate.type === 'assistant') {
      pending.push(candidate)
    }
  }

  // The cursor advances even when nothing qualified for forwarding.
  bridgeLastForwardedIndex = mutableMessages.length

  if (pending.length > 0) {
    bridge.writeMessages(pending)
  }
}
|
|
|
|
// Helper to apply MCP server changes - used by both mcp_set_servers control message
|
|
// and background plugin installation.
|
|
// NOTE: Nested function required - mutates closure state (sdkMcpConfigs, sdkClients, etc.)
|
|
let mcpChangesPromise: Promise<{
|
|
response: SDKControlMcpSetServersResponse
|
|
sdkServersChanged: boolean
|
|
}> = Promise.resolve({
|
|
response: {
|
|
added: [] as string[],
|
|
removed: [] as string[],
|
|
errors: {} as Record<string, string>,
|
|
},
|
|
sdkServersChanged: false,
|
|
})
|
|
|
|
/**
 * Applies a new MCP server set and updates the closure's SDK/dynamic MCP state.
 *
 * Calls are chained onto `mcpChangesPromise`, so concurrent callers
 * (background plugin install and mcp_set_servers control messages) run one
 * after another. A previous call's failure does not stop the next one.
 *
 * @param servers Desired server configs keyed by server name.
 * @returns The set-servers response plus whether SDK servers changed.
 */
function applyMcpServerChanges(
  servers: Record<string, McpServerConfigForProcessTransport>,
): Promise<{
  response: SDKControlMcpSetServersResponse
  sdkServersChanged: boolean
}> {
  async function applyOnce(): Promise<{
    response: SDKControlMcpSetServersResponse
    sdkServersChanged: boolean
  }> {
    // Snapshot the SDK client names before the update so stale tools can be
    // removed afterwards.
    const previousSdkNames = sdkClients.map(c => c.name)

    const { response, sdkServersChanged, newSdkState, newDynamicState } =
      await handleMcpSetServers(
        servers,
        { configs: sdkMcpConfigs, clients: sdkClients, tools: sdkTools },
        dynamicMcpState,
        setAppState,
      )

    // sdkMcpConfigs is shared, so it is emptied and refilled in place rather
    // than replaced.
    Object.keys(sdkMcpConfigs).forEach(key => {
      delete sdkMcpConfigs[key]
    })
    Object.assign(sdkMcpConfigs, newSdkState.configs)
    sdkClients = newSdkState.clients
    sdkTools = newSdkState.tools
    dynamicMcpState = newDynamicState

    // Keep appState.mcp.tools in sync so subagents can see SDK MCP tools.
    // Both old and new SDK client names are used to remove stale tools.
    if (sdkServersChanged) {
      const allSdkNames = uniq([
        ...previousSdkNames,
        ...sdkClients.map(c => c.name),
      ])
      const belongsToSdkServer = (toolName: string): boolean =>
        allSdkNames.some(name => toolName.startsWith(getMcpPrefix(name)))

      setAppState(prev => {
        const nonSdkTools = prev.mcp.tools.filter(
          t => !belongsToSdkServer(t.name),
        )
        return {
          ...prev,
          mcp: {
            ...prev.mcp,
            tools: [...nonSdkTools, ...sdkTools],
          },
        }
      })
    }

    return { response, sdkServersChanged }
  }

  // Run after the previous call settles, whether it resolved or rejected.
  mcpChangesPromise = mcpChangesPromise.then(applyOnce, applyOnce)
  return mcpChangesPromise
}
|
|
|
|
// Build McpServerStatus[] for control responses. Shared by mcp_status and
|
|
// reload_plugins handlers. Reads closure state: sdkClients, dynamicMcpState.
|
|
/**
 * Builds the per-server status list used in control responses.
 *
 * Covers the app-state MCP clients, the SDK clients, and any dynamic clients
 * whose name is not already taken by one of those. Each entry carries a
 * trimmed config (sse/http, claudeai-proxy or stdio; otherwise none), the
 * server's tools when connected, and filtered experimental capabilities.
 *
 * @returns One status entry per server connection.
 */
function buildMcpServerStatuses(): McpServerStatus[] {
  const snapshot = getAppState()
  const appClients = snapshot.mcp.clients
  const toolCatalog = uniqBy(
    [...snapshot.mcp.tools, ...dynamicMcpState.tools],
    'name',
  )

  // Dynamic clients are listed only if no app-state or SDK client shares their name.
  const claimedNames = new Set(
    [...appClients, ...sdkClients].map(c => c.name),
  )
  const connections = [
    ...appClients,
    ...sdkClients,
    ...dynamicMcpState.clients.filter(c => !claimedNames.has(c.name)),
  ]

  const statuses: McpServerStatus[] = []
  for (const connection of connections) {
    const cfg = connection.config

    // Only a fixed subset of config fields is exposed, per transport type.
    // A missing type is treated as stdio; other types yield no config.
    const config =
      cfg.type === 'sse' || cfg.type === 'http'
        ? {
            type: cfg.type,
            url: cfg.url,
            headers: cfg.headers,
            oauth: cfg.oauth,
          }
        : cfg.type === 'claudeai-proxy'
          ? {
              type: 'claudeai-proxy' as const,
              url: cfg.url,
              id: cfg.id,
            }
          : cfg.type === 'stdio' || cfg.type === undefined
            ? {
                type: 'stdio' as const,
                command: cfg.command,
                args: cfg.args,
              }
            : undefined

    // Tools are reported only for connected servers. Falsy annotation values
    // collapse to undefined.
    const serverTools =
      connection.type === 'connected'
        ? filterToolsByServer(toolCatalog, connection.name).map(tool => ({
            name: tool.mcpInfo?.toolName ?? tool.name,
            annotations: {
              readOnly: tool.isReadOnly({}) || undefined,
              destructive: tool.isDestructive?.({}) || undefined,
              openWorld: tool.isOpenWorld?.({}) || undefined,
            },
          }))
        : undefined

    // Capabilities passthrough with allowlist pre-filter. The IDE reads
    // experimental['claude/channel'] to decide whether to show the
    // Enable-channel prompt, so it is echoed only when channel_enable would
    // pass the allowlist. Not a security boundary (the handler re-runs the
    // full gate); it just avoids dead buttons.
    let capabilities: { experimental?: Record<string, unknown> } | undefined
    if (
      (feature('KAIROS') || feature('KAIROS_CHANNELS')) &&
      connection.type === 'connected' &&
      connection.capabilities.experimental
    ) {
      const experimental = { ...connection.capabilities.experimental }
      if (
        experimental['claude/channel'] &&
        !(
          isChannelsEnabled() &&
          isChannelAllowlisted(connection.config.pluginSource)
        )
      ) {
        delete experimental['claude/channel']
      }
      // An empty experimental object is not reported at all.
      if (Object.keys(experimental).length > 0) {
        capabilities = { experimental }
      }
    }

    statuses.push({
      name: connection.name,
      status: connection.type,
      serverInfo:
        connection.type === 'connected' ? connection.serverInfo : undefined,
      error: connection.type === 'failed' ? connection.error : undefined,
      config,
      scope: connection.config.scope,
      tools: serverTools,
      capabilities,
    })
  }
  return statuses
}
|
|
|
|
// NOTE: Nested function required - needs closure access to applyMcpServerChanges and updateSdkMcp
|
|
/**
 * Waits for user and managed settings, installs plugins for headless mode,
 * and re-applies the plugin MCP diff when the install reports that plugins
 * were installed. Never rejects: any error is passed to `logError`.
 */
async function installPluginsAndApplyMcpInBackground(): Promise<void> {
  try {
    // Join point for user settings (fired at runHeadless entry) and managed
    // settings (fired in main.tsx preAction). downloadUserSettings() caches
    // its promise, so this awaits the same in-flight request.
    const userSettingsReady =
      feature('DOWNLOAD_USER_SETTINGS') &&
      (isEnvTruthy(process.env.CLAUDE_CODE_REMOTE) || getIsRemoteMode())
        ? withDiagnosticsTiming('headless_user_settings_download', () =>
            downloadUserSettings(),
          )
        : Promise.resolve()
    const managedSettingsReady = withDiagnosticsTiming(
      'headless_managed_settings_wait',
      () => waitForRemoteManagedSettingsToLoad(),
    )
    await Promise.all([userSettingsReady, managedSettingsReady])

    // The MCP diff is re-applied only when the install step returns truthy.
    if (await installPluginsForHeadless()) {
      await applyPluginMcpDiff()
    }
  } catch (error) {
    logError(error)
  }
}
|
|
|
|
// Background plugin installation for all headless users
|
|
// Installs marketplaces from extraKnownMarketplaces and missing enabled plugins
|
|
// CLAUDE_CODE_SYNC_PLUGIN_INSTALL=true: resolved in run() before the first
|
|
// query so plugins are guaranteed available on the first ask().
|
|
let pluginInstallPromise: Promise<void> | null = null
|
|
// --bare / SIMPLE: skip plugin install. Scripted calls don't add plugins
|
|
// mid-session; the next interactive run reconciles.
|
|
if (!isBareMode()) {
|
|
if (isEnvTruthy(process.env.CLAUDE_CODE_SYNC_PLUGIN_INSTALL)) {
|
|
pluginInstallPromise = installPluginsAndApplyMcpInBackground()
|
|
} else {
|
|
void installPluginsAndApplyMcpInBackground()
|
|
}
|
|
}
|
|
|
|
// Idle timeout management
|
|
const idleTimeout = createIdleTimeoutManager(() => !running)
|
|
|
|
// Mutable commands and agents for hot reloading
|
|
let currentCommands = commands
|
|
let currentAgents = agents
|
|
|
|
// Clear all plugin-related caches, then reload commands/agents/hooks.
// Called after CLAUDE_CODE_SYNC_PLUGIN_INSTALL completes (before first query)
// and after non-sync background install finishes.
// refreshActivePlugins calls clearAllCaches(), which is required because
// loadAllPlugins() may have run during main.tsx startup BEFORE managed
// settings were fetched. Without clearing, getCommands() would rebuild
// from a stale plugin list.
async function refreshPluginState(): Promise<void> {
  // refreshActivePlugins handles the full cache sweep (clearAllCaches),
  // reloads all plugin component loaders, writes AppState.plugins +
  // AppState.agentDefinitions, registers hooks, and bumps mcp.pluginReconnectKey.
  const refreshed = await refreshActivePlugins(setAppState)

  // Headless-specific: currentCommands/currentAgents are local mutable refs
  // captured by the query loop (REPL uses AppState instead). getCommands is
  // fresh because refreshActivePlugins cleared its cache.
  currentCommands = await getCommands(cwd())

  // Preserve SDK-provided agents (--agents CLI flag or SDK initialize
  // control_request) — both inject via parseAgentsFromJson with
  // source='flagSettings'. loadMarkdownFilesForSubdir never assigns this
  // source, so it cleanly discriminates "injected, not disk-loadable".
  //
  // The previous filter used a negative set-diff (!freshAgentTypes.has(a))
  // which also matched plugin agents that were in the poisoned initial
  // currentAgents but correctly excluded from the fresh definitions after
  // managed settings applied — leaking policy-blocked agents into the init
  // message. See gh-23085: isBridgeEnabled() at Commander-definition time
  // poisoned the settings cache before setEligibility(true) ran.
  const injectedAgents = currentAgents.filter(
    agent => agent.source === 'flagSettings',
  )
  currentAgents = [...refreshed.agentDefinitions.allAgents, ...injectedAgents]
}
|
|
|
|
// Re-diff MCP configs after plugin state changes. Filters to
// process-transport-supported types and carries SDK-mode servers through
// so applyMcpServerChanges' diff doesn't close their transports.
// Nested: needs closure access to sdkMcpConfigs, applyMcpServerChanges,
// updateSdkMcp.
async function applyPluginMcpDiff(): Promise<void> {
  const { servers: latestConfigs } = await getAllMcpConfigs()
  const desiredConfigs: Record<string, McpServerConfigForProcessTransport> = {}

  // Keep only the transport types this process can host. The discriminant is
  // read directly from the config so the compiler narrows it for assignment.
  for (const [serverName, serverConfig] of Object.entries(latestConfigs)) {
    if (
      serverConfig.type === undefined ||
      serverConfig.type === 'stdio' ||
      serverConfig.type === 'sse' ||
      serverConfig.type === 'http' ||
      serverConfig.type === 'sdk'
    ) {
      desiredConfigs[serverName] = serverConfig
    }
  }

  // Carry over SDK-mode servers that the fresh config set does not mention.
  for (const [serverName, serverConfig] of Object.entries(sdkMcpConfigs)) {
    if (serverConfig.type !== 'sdk' || serverName in desiredConfigs) {
      continue
    }
    desiredConfigs[serverName] = serverConfig
  }

  const diff = await applyMcpServerChanges(desiredConfigs)
  if (diff.sdkServersChanged) {
    // Not awaited: the SDK MCP refresh proceeds in the background.
    void updateSdkMcp()
  }
  logForDebugging(
    `Headless MCP refresh: added=${diff.response.added.length}, removed=${diff.response.removed.length}`,
  )
}
|
|
|
|
// Subscribe to skill changes for hot reloading. On every change the commands
// cache is cleared and the command list is rebuilt in the background; the
// returned function removes the subscription.
const unsubscribeSkillChanges = skillChangeDetector.subscribe(() => {
  clearCommandsCache()
  void getCommands(cwd())
    .then(newCommands => {
      currentCommands = newCommands
    })
    .catch(error => {
      // The reload is fire-and-forget, so a rejection here would otherwise
      // surface as an unhandled promise rejection. Keep the previous command
      // list and record the failure instead.
      logError(error)
    })
})
|
|
|
|
// Proactive mode: schedule a tick to keep the model looping autonomously.
// setTimeout(0) yields to the event loop so pending stdin messages
// (interrupts, user messages) are processed before the tick fires.
// Undefined when neither the PROACTIVE nor the KAIROS feature is enabled.
const scheduleProactiveTick =
  feature('PROACTIVE') || feature('KAIROS')
    ? () => {
        setTimeout(() => {
          // Re-check at fire time: proactive mode may have been paused or
          // disabled, or input may have closed, since the tick was scheduled.
          const shouldTick =
            proactiveModule?.isProactiveActive() &&
            !proactiveModule.isProactivePaused() &&
            !inputClosed
          if (!shouldTick) {
            return
          }
          const tickPrompt = `<${TICK_TAG}>${new Date().toLocaleTimeString()}</${TICK_TAG}>`
          enqueue({
            mode: 'prompt' as const,
            value: tickPrompt,
            uuid: randomUUID(),
            priority: 'later',
            isMeta: true,
          })
          void run()
        }, 0)
      }
    : undefined
|
|
|
|
// Abort the current operation when a 'now' priority message arrives.
subscribeToCommandQueue(() => {
  if (!abortController) {
    // No turn has created an abort controller yet, so there is nothing to
    // interrupt.
    return
  }
  if (getCommandsByMaxPriority('now').length > 0) {
    abortController.abort('interrupt')
  }
})
|
|
|
|
/**
 * Main headless processing loop. Guarded by the `running` flag so only one
 * invocation is active at a time; concurrent callers return immediately and
 * rely on the post-run queue re-check to pick up their work.
 *
 * One invocation:
 *  1. marks the session running, refreshes SDK MCP servers and, in
 *     sync-install mode, waits for (or times out on) plugin installation;
 *  2. drains the main-thread command queue, feeding each (possibly batched)
 *     command to ask() and streaming its messages to `output`;
 *  3. keeps looping while background tasks are running so their
 *     notifications are processed, holding back the final result meanwhile;
 *  4. on an error, writes an error result and starts a graceful shutdown;
 *  5. always flushes internal events, reports idle, and releases the mutex;
 *  6. afterwards handles proactive ticks, stranded queue entries, team-lead
 *     inbox polling, and final teardown once input is closed.
 *
 * NOTE(review): the awaits between `running = true` and the try block are
 * not covered by the finally, so a rejection there would leave `running`
 * set — confirm those helpers never reject.
 */
const run = async () => {
  if (running) {
    return
  }

  running = true
  runPhase = undefined
  notifySessionStateChanged('running')
  idleTimeout.stop()

  headlessProfilerCheckpoint('run_entry')
  // TODO(custom-tool-refactor): Should move to the init message, like browser

  await updateSdkMcp()
  headlessProfilerCheckpoint('after_updateSdkMcp')

  // Resolve deferred plugin installation (CLAUDE_CODE_SYNC_PLUGIN_INSTALL).
  // The promise was started eagerly so installation overlaps with other init.
  // Awaiting here guarantees plugins are available before the first ask().
  // If CLAUDE_CODE_SYNC_PLUGIN_INSTALL_TIMEOUT_MS is set, races against that
  // deadline and proceeds without plugins on timeout (logging an error).
  if (pluginInstallPromise) {
    // An unset or non-numeric value parses to NaN, which fails the `> 0`
    // check below and falls through to the unbounded await.
    const timeoutMs = parseInt(
      process.env.CLAUDE_CODE_SYNC_PLUGIN_INSTALL_TIMEOUT_MS || '',
      10,
    )
    if (timeoutMs > 0) {
      const timeout = sleep(timeoutMs).then(() => 'timeout' as const)
      const result = await Promise.race([pluginInstallPromise, timeout])
      if (result === 'timeout') {
        logError(
          new Error(
            `CLAUDE_CODE_SYNC_PLUGIN_INSTALL: plugin installation timed out after ${timeoutMs}ms`,
          ),
        )
        logEvent('tengu_sync_plugin_install_timeout', {
          timeout_ms: timeoutMs,
        })
      }
    } else {
      await pluginInstallPromise
    }
    // Cleared so later run() invocations skip this whole section.
    pluginInstallPromise = null

    // Refresh commands, agents, and hooks now that plugins are installed
    await refreshPluginState()

    // Set up hot-reload for plugin hooks now that the initial install is done.
    // In sync-install mode, setup.ts skips this to avoid racing with the install.
    const { setupPluginHookHotReload } = await import(
      '../utils/plugins/loadPluginHooks.js'
    )
    setupPluginHookHotReload()
  }

  // Only main-thread commands (agentId===undefined) — subagent
  // notifications are drained by the subagent's mid-turn gate in query.ts.
  // Defined outside the try block so it's accessible in the post-finally
  // queue re-checks at the bottom of run().
  const isMainThread = (cmd: QueuedCommand) => cmd.agentId === undefined

  try {
    let command: QueuedCommand | undefined
    let waitingForAgents = false

    // Extract command processing into a named function for the do-while pattern.
    // Drains the queue, batching consecutive prompt-mode commands into one
    // ask() call so messages that queued up during a long turn coalesce
    // into a single follow-up turn instead of N separate turns.
    const drainCommandQueue = async () => {
      while ((command = dequeue(isMainThread))) {
        if (
          command.mode !== 'prompt' &&
          command.mode !== 'orphaned-permission' &&
          command.mode !== 'task-notification'
        ) {
          throw new Error(
            'only prompt commands are supported in streaming mode',
          )
        }

        // Non-prompt commands (task-notification, orphaned-permission) carry
        // side effects or orphanedPermission state, so they process singly.
        // Prompt commands greedily collect followers with matching workload.
        const batch: QueuedCommand[] = [command]
        if (command.mode === 'prompt') {
          while (canBatchWith(command, peek(isMainThread))) {
            batch.push(dequeue(isMainThread)!)
          }
          if (batch.length > 1) {
            // The merged command takes the uuid of the last batch member
            // that has one, falling back to the first command's uuid.
            command = {
              ...command,
              value: joinPromptValues(batch.map(c => c.value)),
              uuid: batch.findLast(c => c.uuid)?.uuid ?? command.uuid,
            }
          }
        }
        const batchUuids = batch.map(c => c.uuid).filter(u => u !== undefined)

        // QueryEngine will emit a replay for command.uuid (the last uuid in
        // the batch) via its messagesToAck path. Emit replays here for the
        // rest so consumers that track per-uuid delivery (clank's
        // asyncMessages footer, CCR) see an ack for every message they sent,
        // not just the one that survived the merge.
        if (options.replayUserMessages && batch.length > 1) {
          for (const c of batch) {
            if (c.uuid && c.uuid !== command.uuid) {
              output.enqueue({
                type: 'user',
                message: { role: 'user', content: c.value },
                session_id: getSessionId(),
                parent_tool_use_id: null,
                uuid: c.uuid,
                isReplay: true,
              } satisfies SDKUserMessageReplay)
            }
          }
        }

        // Combine all MCP clients. appState.mcp is populated incrementally
        // per-server by main.tsx (mirrors useManageMCPConnections). Reading
        // fresh per-command means late-connecting servers are visible on the
        // next turn. registerElicitationHandlers is idempotent (tracking set).
        const appState = getAppState()
        const allMcpClients = [
          ...appState.mcp.clients,
          ...sdkClients,
          ...dynamicMcpState.clients,
        ]
        registerElicitationHandlers(allMcpClients)
        // Channel handlers for servers allowlisted via --channels at
        // construction time (or enableChannel() mid-session). Runs every
        // turn like registerElicitationHandlers — idempotent per-client
        // (setNotificationHandler replaces, not stacks) and no-ops for
        // non-allowlisted servers (one feature-flag check).
        for (const client of allMcpClients) {
          reregisterChannelHandlerAfterReconnect(client)
        }

        const allTools = buildAllTools(appState)

        for (const uuid of batchUuids) {
          notifyCommandLifecycle(uuid, 'started')
        }

        // Task notifications arrive when background agents complete.
        // Emit an SDK system event for SDK consumers, then fall through
        // to ask() so the model sees the agent result and can act on it.
        // This matches TUI behavior where useQueueProcessor always feeds
        // notifications to the model regardless of coordinator mode.
        if (command.mode === 'task-notification') {
          const notificationText =
            typeof command.value === 'string' ? command.value : ''
          // Parse the XML-formatted notification
          const taskIdMatch = notificationText.match(
            /<task-id>([^<]+)<\/task-id>/,
          )
          const toolUseIdMatch = notificationText.match(
            /<tool-use-id>([^<]+)<\/tool-use-id>/,
          )
          const outputFileMatch = notificationText.match(
            /<output-file>([^<]+)<\/output-file>/,
          )
          const statusMatch = notificationText.match(
            /<status>([^<]+)<\/status>/,
          )
          const summaryMatch = notificationText.match(
            /<summary>([^<]+)<\/summary>/,
          )

          const isValidStatus = (
            s: string | undefined,
          ): s is 'completed' | 'failed' | 'stopped' | 'killed' =>
            s === 'completed' ||
            s === 'failed' ||
            s === 'stopped' ||
            s === 'killed'
          const rawStatus = statusMatch?.[1]
          // 'killed' is reported as 'stopped'; an unrecognized status value
          // falls back to 'completed'.
          const status = isValidStatus(rawStatus)
            ? rawStatus === 'killed'
              ? 'stopped'
              : rawStatus
            : 'completed'

          const usageMatch = notificationText.match(
            /<usage>([\s\S]*?)<\/usage>/,
          )
          const usageContent = usageMatch?.[1] ?? ''
          const totalTokensMatch = usageContent.match(
            /<total_tokens>(\d+)<\/total_tokens>/,
          )
          const toolUsesMatch = usageContent.match(
            /<tool_uses>(\d+)<\/tool_uses>/,
          )
          const durationMsMatch = usageContent.match(
            /<duration_ms>(\d+)<\/duration_ms>/,
          )

          // Only emit a task_notification SDK event when a <status> tag is
          // present — that means this is a terminal notification (completed/
          // failed/stopped). Stream events from enqueueStreamEvent carry no
          // <status> (they're progress pings); emitting them here would
          // default to 'completed' and falsely close the task for SDK
          // consumers. Terminal bookends are now emitted directly via
          // emitTaskTerminatedSdk, so skipping statusless events is safe.
          if (statusMatch) {
            output.enqueue({
              type: 'system',
              subtype: 'task_notification',
              task_id: taskIdMatch?.[1] ?? '',
              tool_use_id: toolUseIdMatch?.[1],
              status,
              output_file: outputFileMatch?.[1] ?? '',
              summary: summaryMatch?.[1] ?? '',
              // Usage is reported only when both token and tool-use counts
              // are present; a missing duration is reported as 0.
              usage:
                totalTokensMatch && toolUsesMatch
                  ? {
                      total_tokens: parseInt(totalTokensMatch[1]!, 10),
                      tool_uses: parseInt(toolUsesMatch[1]!, 10),
                      duration_ms: durationMsMatch
                        ? parseInt(durationMsMatch[1]!, 10)
                        : 0,
                    }
                  : undefined,
              session_id: getSessionId(),
              uuid: randomUUID(),
            })
          }
          // No continue -- fall through to ask() so the model processes the result
        }

        const input = command.value

        if (structuredIO instanceof RemoteIO && command.mode === 'prompt') {
          logEvent('tengu_bridge_message_received', {
            is_repl: false,
          })
        }

        // Abort any in-flight suggestion generation and track acceptance
        suggestionState.abortController?.abort()
        suggestionState.abortController = null
        suggestionState.pendingSuggestion = null
        suggestionState.pendingLastEmittedEntry = null
        if (suggestionState.lastEmitted) {
          if (command.mode === 'prompt') {
            // SDK user messages enqueue ContentBlockParam[], not a plain string
            const inputText =
              typeof input === 'string'
                ? input
                : (
                    input.find(b => b.type === 'text') as
                      | { type: 'text'; text: string }
                      | undefined
                  )?.text
            if (typeof inputText === 'string') {
              logSuggestionOutcome(
                suggestionState.lastEmitted.text,
                inputText,
                suggestionState.lastEmitted.emittedAt,
                suggestionState.lastEmitted.promptId,
                suggestionState.lastEmitted.generationRequestId,
              )
            }
            suggestionState.lastEmitted = null
          }
        }

        abortController = createAbortController()
        const turnStartTime = feature('FILE_PERSISTENCE')
          ? Date.now()
          : undefined

        headlessProfilerCheckpoint('before_ask')
        startQueryProfile()
        // Per-iteration ALS context so bg agents spawned inside ask()
        // inherit workload across their detached awaits. In-process cron
        // stamps cmd.workload; the SDK --workload flag is options.workload.
        // const-capture: TS loses `while ((command = dequeue()))` narrowing
        // inside the closure.
        const cmd = command
        await runWithWorkload(cmd.workload ?? options.workload, async () => {
          for await (const message of ask({
            commands: uniqBy(
              [...currentCommands, ...appState.mcp.commands],
              'name',
            ),
            prompt: input,
            promptUuid: cmd.uuid,
            isMeta: cmd.isMeta,
            cwd: cwd(),
            tools: allTools,
            verbose: options.verbose,
            mcpClients: allMcpClients,
            thinkingConfig: options.thinkingConfig,
            maxTurns: options.maxTurns,
            maxBudgetUsd: options.maxBudgetUsd,
            taskBudget: options.taskBudget,
            canUseTool,
            userSpecifiedModel: activeUserSpecifiedModel,
            fallbackModel: options.fallbackModel,
            jsonSchema: getInitJsonSchema() ?? options.jsonSchema,
            mutableMessages,
            getReadFileCache: () =>
              pendingSeeds.size === 0
                ? readFileState
                : mergeFileStateCaches(readFileState, pendingSeeds),
            setReadFileCache: cache => {
              readFileState = cache
              // Fold pending seeds into the new cache, keeping whichever
              // entry has the newer timestamp, then drop the seeds.
              for (const [path, seed] of pendingSeeds.entries()) {
                const existing = readFileState.get(path)
                if (!existing || seed.timestamp > existing.timestamp) {
                  readFileState.set(path, seed)
                }
              }
              pendingSeeds.clear()
            },
            customSystemPrompt: options.systemPrompt,
            appendSystemPrompt: options.appendSystemPrompt,
            getAppState,
            setAppState,
            abortController,
            replayUserMessages: options.replayUserMessages,
            includePartialMessages: options.includePartialMessages,
            handleElicitation: (serverName, params, elicitSignal) =>
              structuredIO.handleElicitation(
                serverName,
                params.message,
                undefined,
                elicitSignal,
                params.mode,
                params.url,
                'elicitationId' in params ? params.elicitationId : undefined,
              ),
            agents: currentAgents,
            orphanedPermission: cmd.orphanedPermission,
            setSDKStatus: status => {
              output.enqueue({
                type: 'system',
                subtype: 'status',
                status,
                session_id: getSessionId(),
                uuid: randomUUID(),
              })
            },
          })) {
            // Forward messages to bridge incrementally (mid-turn) so
            // claude.ai sees progress and the connection stays alive
            // while blocked on permission requests.
            forwardMessagesToBridge()

            if (message.type === 'result') {
              // Flush pending SDK events so they appear before result on the stream.
              for (const event of drainSdkEvents()) {
                output.enqueue(event)
              }

              // Hold-back: don't emit result while background agents are running
              const currentState = getAppState()
              if (
                getRunningTasks(currentState).some(
                  t =>
                    (t.type === 'local_agent' ||
                      t.type === 'local_workflow') &&
                    isBackgroundTask(t),
                )
              ) {
                heldBackResult = message
              } else {
                heldBackResult = null
                output.enqueue(message)
              }
            } else {
              // Flush SDK events (task_started, task_progress) so background
              // agent progress is streamed in real-time, not batched until result.
              for (const event of drainSdkEvents()) {
                output.enqueue(event)
              }
              output.enqueue(message)
            }
          }
        }) // end runWithWorkload

        for (const uuid of batchUuids) {
          notifyCommandLifecycle(uuid, 'completed')
        }

        // Forward messages to bridge after each turn
        forwardMessagesToBridge()
        bridgeHandle?.sendResult()

        if (feature('FILE_PERSISTENCE') && turnStartTime !== undefined) {
          void executeFilePersistence(
            turnStartTime,
            abortController.signal,
            result => {
              output.enqueue({
                type: 'system' as const,
                subtype: 'files_persisted' as const,
                files: result.files,
                failed: result.failed,
                processed_at: new Date().toISOString(),
                uuid: randomUUID(),
                session_id: getSessionId(),
              })
            },
          )
        }

        // Generate and emit prompt suggestion for SDK consumers
        if (
          options.promptSuggestions &&
          !isEnvDefinedFalsy(process.env.CLAUDE_CODE_ENABLE_PROMPT_SUGGESTION)
        ) {
          // TS narrows suggestionState to never in the while loop body;
          // cast via unknown to reset narrowing.
          const state = suggestionState as unknown as typeof suggestionState
          state.abortController?.abort()
          const localAbort = new AbortController()
          suggestionState.abortController = localAbort

          const cacheSafeParams = getLastCacheSafeParams()
          if (!cacheSafeParams) {
            logSuggestionSuppressed(
              'sdk_no_params',
              undefined,
              undefined,
              'sdk',
            )
          } else {
            // Use a ref object so the IIFE's finally can compare against its own
            // promise without a self-reference (which upsets TypeScript's flow analysis).
            const ref: { promise: Promise<void> | null } = { promise: null }
            ref.promise = (async () => {
              try {
                const result = await tryGenerateSuggestion(
                  localAbort,
                  mutableMessages,
                  getAppState,
                  cacheSafeParams,
                  'sdk',
                )
                if (!result || localAbort.signal.aborted) return
                const suggestionMsg = {
                  type: 'prompt_suggestion' as const,
                  suggestion: result.suggestion,
                  uuid: randomUUID(),
                  session_id: getSessionId(),
                }
                const lastEmittedEntry = {
                  text: result.suggestion,
                  emittedAt: Date.now(),
                  promptId: result.promptId,
                  generationRequestId: result.generationRequestId,
                }
                // Defer emission if the result is being held for background agents,
                // so that prompt_suggestion always arrives after result.
                // Only set lastEmitted when the suggestion is actually delivered
                // to the consumer; deferred suggestions may be discarded before
                // delivery if a new command arrives first.
                if (heldBackResult) {
                  suggestionState.pendingSuggestion = suggestionMsg
                  suggestionState.pendingLastEmittedEntry = {
                    text: lastEmittedEntry.text,
                    promptId: lastEmittedEntry.promptId,
                    generationRequestId: lastEmittedEntry.generationRequestId,
                  }
                } else {
                  suggestionState.lastEmitted = lastEmittedEntry
                  output.enqueue(suggestionMsg)
                }
              } catch (error) {
                if (
                  error instanceof Error &&
                  (error.name === 'AbortError' ||
                    error.name === 'APIUserAbortError')
                ) {
                  logSuggestionSuppressed(
                    'aborted',
                    undefined,
                    undefined,
                    'sdk',
                  )
                  return
                }
                logError(toError(error))
              } finally {
                // Only clear the slot if a newer generation has not already
                // replaced this one.
                if (suggestionState.inflightPromise === ref.promise) {
                  suggestionState.inflightPromise = null
                }
              }
            })()
            suggestionState.inflightPromise = ref.promise
          }
        }

        // Log headless profiler metrics for this turn and start next turn
        logHeadlessProfilerTurn()
        logQueryProfileReport()
        headlessProfilerStartTurn()
      }
    }

    // Use a do-while loop to drain commands and then wait for any
    // background agents that are still running. When agents complete,
    // their notifications are enqueued and the loop re-drains.
    do {
      // Drain SDK events (task_started, task_progress) before command queue
      // so progress events precede task_notification on the stream.
      for (const event of drainSdkEvents()) {
        output.enqueue(event)
      }

      runPhase = 'draining_commands'
      await drainCommandQueue()

      // Check for running background tasks before exiting.
      // Exclude in_process_teammate — teammates are long-lived by design
      // (status: 'running' for their whole lifetime, cleaned up by the
      // shutdown protocol, not by transitioning to 'completed'). Waiting
      // on them here loops forever (gh-30008). Same exclusion already
      // exists at useBackgroundTaskNavigation.ts:55 for the same reason;
      // L1839 above is already narrower (type === 'local_agent') so it
      // doesn't hit this.
      waitingForAgents = false
      {
        const state = getAppState()
        const hasRunningBg = getRunningTasks(state).some(
          t => isBackgroundTask(t) && t.type !== 'in_process_teammate',
        )
        const hasMainThreadQueued = peek(isMainThread) !== undefined
        if (hasRunningBg || hasMainThreadQueued) {
          waitingForAgents = true
          if (!hasMainThreadQueued) {
            runPhase = 'waiting_for_agents'
            // No commands ready yet, wait for tasks to complete
            await sleep(100)
          }
          // Loop back to drain any newly queued commands
        }
      }
    } while (waitingForAgents)

    if (heldBackResult) {
      output.enqueue(heldBackResult)
      heldBackResult = null
      if (suggestionState.pendingSuggestion) {
        output.enqueue(suggestionState.pendingSuggestion)
        // Now that the suggestion is actually delivered, record it for acceptance tracking
        if (suggestionState.pendingLastEmittedEntry) {
          suggestionState.lastEmitted = {
            ...suggestionState.pendingLastEmittedEntry,
            emittedAt: Date.now(),
          }
          suggestionState.pendingLastEmittedEntry = null
        }
        suggestionState.pendingSuggestion = null
      }
    }
  } catch (error) {
    // Emit error result message before shutting down
    // Write directly to structuredIO to ensure immediate delivery
    try {
      await structuredIO.write({
        type: 'result',
        subtype: 'error_during_execution',
        duration_ms: 0,
        duration_api_ms: 0,
        is_error: true,
        num_turns: 0,
        stop_reason: null,
        session_id: getSessionId(),
        total_cost_usd: 0,
        usage: EMPTY_USAGE,
        modelUsage: {},
        permission_denials: [],
        uuid: randomUUID(),
        errors: [
          errorMessage(error),
          ...getInMemoryErrors().map(_ => _.error),
        ],
      })
    } catch {
      // If we can't emit the error result, continue with shutdown anyway
    }
    suggestionState.abortController?.abort()
    gracefulShutdownSync(1)
    return
  } finally {
    // The mutex release and idle-timer restart must happen even if the flush
    // or idle notification throws; otherwise `running` stays true and every
    // later run() call returns at the guard at the top, stranding queued
    // commands. Failures here are logged rather than rethrown.
    try {
      runPhase = 'finally_flush'
      // Flush pending internal events before going idle
      await structuredIO.flushInternalEvents()
      runPhase = 'finally_post_flush'
      if (!isShuttingDown()) {
        notifySessionStateChanged('idle')
        // Drain so the idle session_state_changed SDK event (plus any
        // terminal task_notification bookends emitted during bg-agent
        // teardown) reach the output stream before we block on the next
        // command. The do-while drain above only runs while
        // waitingForAgents; once we're here the next drain would be the
        // top of the next run(), which won't come if input is idle.
        for (const event of drainSdkEvents()) {
          output.enqueue(event)
        }
      }
    } catch (cleanupError) {
      logError(toError(cleanupError))
    } finally {
      running = false
      // Start idle timer when we finish processing and are waiting for input
      idleTimeout.start()
    }
  }

  // Proactive tick: if proactive is active and queue is empty, inject a tick
  if (
    (feature('PROACTIVE') || feature('KAIROS')) &&
    proactiveModule?.isProactiveActive() &&
    !proactiveModule.isProactivePaused()
  ) {
    if (peek(isMainThread) === undefined && !inputClosed) {
      scheduleProactiveTick!()
      return
    }
  }

  // Re-check the queue after releasing the mutex. A message may have
  // arrived (and called run()) between the last dequeue() returning
  // undefined and `running = false` above. In that case the caller
  // saw `running === true` and returned immediately, leaving the
  // message stranded in the queue with no one to process it.
  if (peek(isMainThread) !== undefined) {
    void run()
    return
  }

  // Check for unread teammate messages and process them
  // This mirrors what useInboxPoller does in interactive REPL mode
  // Poll until no more messages (teammates may still be working)
  {
    const currentAppState = getAppState()
    const teamContext = currentAppState.teamContext

    if (teamContext && isTeamLead(teamContext)) {
      const agentName = 'team-lead'

      // Poll for messages while teammates are active
      // This is needed because teammates may send messages while we're waiting
      // Keep polling until the team is shut down
      const POLL_INTERVAL_MS = 500

      while (true) {
        // Check if teammates are still active
        const refreshedState = getAppState()
        const hasActiveTeammates =
          hasActiveInProcessTeammates(refreshedState) ||
          (refreshedState.teamContext &&
            Object.keys(refreshedState.teamContext.teammates).length > 0)

        if (!hasActiveTeammates) {
          logForDebugging(
            '[print.ts] No more active teammates, stopping poll',
          )
          break
        }

        const unread = await readUnreadMessages(
          agentName,
          refreshedState.teamContext?.teamName,
        )

        if (unread.length > 0) {
          logForDebugging(
            `[print.ts] Team-lead found ${unread.length} unread messages`,
          )

          // Mark as read immediately to avoid duplicate processing
          await markMessagesAsRead(
            agentName,
            refreshedState.teamContext?.teamName,
          )

          // Process shutdown_approved messages - remove teammates from team file
          // This mirrors what useInboxPoller does in interactive mode (lines 546-606)
          const teamName = refreshedState.teamContext?.teamName
          for (const m of unread) {
            const shutdownApproval = isShutdownApproved(m.text)
            if (shutdownApproval && teamName) {
              const teammateToRemove = shutdownApproval.from
              logForDebugging(
                `[print.ts] Processing shutdown_approved from ${teammateToRemove}`,
              )

              // Find the teammate ID by name
              const teammateId = refreshedState.teamContext?.teammates
                ? Object.entries(refreshedState.teamContext.teammates).find(
                    ([, t]) => t.name === teammateToRemove,
                  )?.[0]
                : undefined

              if (teammateId) {
                // Remove from team file
                removeTeammateFromTeamFile(teamName, {
                  agentId: teammateId,
                  name: teammateToRemove,
                })
                logForDebugging(
                  `[print.ts] Removed ${teammateToRemove} from team file`,
                )

                // Unassign tasks owned by this teammate
                await unassignTeammateTasks(
                  teamName,
                  teammateId,
                  teammateToRemove,
                  'shutdown',
                )

                // Remove from teamContext in AppState
                setAppState(prev => {
                  if (!prev.teamContext?.teammates) return prev
                  if (!(teammateId in prev.teamContext.teammates)) return prev
                  const { [teammateId]: _, ...remainingTeammates } =
                    prev.teamContext.teammates
                  return {
                    ...prev,
                    teamContext: {
                      ...prev.teamContext,
                      teammates: remainingTeammates,
                    },
                  }
                })
              }
            }
          }

          // Format messages same as useInboxPoller
          const formatted = unread
            .map(
              (m: { from: string; text: string; color?: string }) =>
                `<${TEAMMATE_MESSAGE_TAG} teammate_id="${m.from}"${m.color ? ` color="${m.color}"` : ''}>\n${m.text}\n</${TEAMMATE_MESSAGE_TAG}>`,
            )
            .join('\n\n')

          // Enqueue and process
          enqueue({
            mode: 'prompt',
            value: formatted,
            uuid: randomUUID(),
          })
          void run()
          return // run() will come back here after processing
        }

        // No messages - check if we need to prompt for shutdown
        // If input is closed and teammates are active, inject shutdown prompt once
        if (inputClosed && !shutdownPromptInjected) {
          shutdownPromptInjected = true
          logForDebugging(
            '[print.ts] Input closed with active teammates, injecting shutdown prompt',
          )
          enqueue({
            mode: 'prompt',
            value: SHUTDOWN_TEAM_PROMPT,
            uuid: randomUUID(),
          })
          void run()
          return // run() will come back here after processing
        }

        // Wait and check again
        await sleep(POLL_INTERVAL_MS)
      }
    }
  }

  if (inputClosed) {
    // Check for active swarm that needs shutdown
    const hasActiveSwarm = await (async () => {
      // Wait for any working in-process team members to finish
      const currentAppState = getAppState()
      if (hasWorkingInProcessTeammates(currentAppState)) {
        await waitForTeammatesToBecomeIdle(setAppState, currentAppState)
      }

      // Re-fetch state after potential wait
      const refreshedAppState = getAppState()
      const refreshedTeamContext = refreshedAppState.teamContext
      const hasTeamMembersNotCleanedUp =
        refreshedTeamContext &&
        Object.keys(refreshedTeamContext.teammates).length > 0

      return (
        hasTeamMembersNotCleanedUp ||
        hasActiveInProcessTeammates(refreshedAppState)
      )
    })()

    if (hasActiveSwarm) {
      // Team members are idle or pane-based - inject prompt to shut down team
      enqueue({
        mode: 'prompt',
        value: SHUTDOWN_TEAM_PROMPT,
        uuid: randomUUID(),
      })
      void run()
    } else {
      // Wait for any in-flight push suggestion before closing the output stream.
      // The wait is capped at five seconds.
      if (suggestionState.inflightPromise) {
        await Promise.race([suggestionState.inflightPromise, sleep(5000)])
      }
      suggestionState.abortController?.abort()
      suggestionState.abortController = null
      await finalizePendingAsyncHooks()
      unsubscribeSkillChanges()
      unsubscribeAuthStatus?.()
      statusListeners.delete(rateLimitListener)
      output.done()
    }
  }
}
|
|
|
|
// Set up UDS inbox callback so the query loop is kicked off
// when a message arrives via the UDS socket in headless mode.
// The module is loaded lazily, only when the UDS_INBOX feature is enabled.
if (feature('UDS_INBOX')) {
  /* eslint-disable @typescript-eslint/no-require-imports */
  const { setOnEnqueue } = require('../utils/udsMessaging.js')
  /* eslint-enable @typescript-eslint/no-require-imports */
  setOnEnqueue(() => {
    if (inputClosed) {
      // Input is closed; do not start new work for late arrivals.
      return
    }
    void run()
  })
}
|
|
|
|
// Cron scheduler: runs scheduled_tasks.json tasks in SDK/-p mode.
// Mirrors REPL's useScheduledTasks hook. Fired prompts enqueue + kick
// off run() directly — unlike REPL, there's no queue subscriber here
// that drains on enqueue while idle. The run() mutex makes this safe
// during an active turn: the call no-ops and the post-run recheck at
// the end of run() picks up the queued command.
let cronScheduler: import('../utils/cronScheduler.js').CronScheduler | null =
  null
if (
  feature('AGENT_TRIGGERS') &&
  cronSchedulerModule &&
  cronGate?.isKairosCronEnabled()
) {
  const scheduler = cronSchedulerModule.createCronScheduler({
    onFire: prompt => {
      // Once input is closed, fired prompts are dropped.
      if (!inputClosed) {
        enqueue({
          mode: 'prompt',
          value: prompt,
          uuid: randomUUID(),
          priority: 'later',
          // System-generated — matches useScheduledTasks.ts REPL equivalent.
          // Without this, messages.ts metaProp eval is {} → prompt leaks
          // into visible transcript when cron fires mid-turn in -p mode.
          isMeta: true,
          // Threaded to cc_workload= in the billing-header attribution block
          // so the API can serve cron requests at lower QoS. drainCommandQueue
          // reads this per-iteration and hoists it into bootstrap state for
          // the ask() call.
          workload: WORKLOAD_CRON,
        })
        void run()
      }
    },
    // Reports busy while a turn is in progress or after input has closed.
    isLoading: () => running || inputClosed,
    getJitterConfig: cronJitterConfigModule?.getCronJitterConfig,
    // Re-evaluated on each call so the gate can turn the scheduler off later.
    isKilled: () => !cronGate?.isKairosCronEnabled(),
  })
  cronScheduler = scheduler
  scheduler.start()
}
|
|
|
|
// Emit a successful control_response for `message` on the output stream.
//
// message:  the control request being answered; its request_id is echoed
//           back so the response can be matched to the request.
// response: optional payload to attach. When omitted, the `response` field
//           of the emitted object is undefined.
const sendControlResponseSuccess = function (
  message: SDKControlRequest,
  response?: Record<string, unknown>,
) {
  output.enqueue({
    type: 'control_response',
    response: {
      subtype: 'success',
      request_id: message.request_id,
      response,
    },
  })
}
|
|
|
|
// Emit an error control_response for `message` on the output stream.
//
// message:      the control request being answered; its request_id is
//               echoed back so the response can be matched to the request.
// errorMessage: human-readable failure description, sent as `error`.
//               NOTE(review): this parameter shadows the `errorMessage`
//               helper that other code in this file calls; inside this
//               function the name refers to the string argument only.
const sendControlResponseError = function (
  message: SDKControlRequest,
  errorMessage: string,
) {
  output.enqueue({
    type: 'control_response',
    response: {
      subtype: 'error',
      request_id: message.request_id,
      error: errorMessage,
    },
  })
}
|
|
|
|
// Handle unexpected permission responses by looking up the unresolved tool
// call in the transcript and executing it.
//
// The same Set instance is handed to every invocation as handledToolUseIds.
// NOTE(review): presumably used to avoid handling a tool_use id twice —
// confirm against handleOrphanedPermissionResponse.
const handledOrphanedToolUseIds = new Set<string>()
structuredIO.setUnexpectedResponseCallback(async message => {
  await handleOrphanedPermissionResponse({
    message,
    setAppState,
    handledToolUseIds: handledOrphanedToolUseIds,
    onEnqueued: () => {
      // The first message of a session might be the orphaned permission
      // check rather than a user prompt, so kick off the loop.
      void run()
    },
  })
})
|
|
|
|
// Track active OAuth flows per server so we can abort a previous flow
// when a new mcp_authenticate request arrives for the same server.
// Keyed by server name.
const activeOAuthFlows = new Map<string, AbortController>()
// Track manual callback URL submit functions for active OAuth flows.
// Used when localhost is not reachable (e.g., browser-based IDEs).
// Keyed by server name.
const oauthCallbackSubmitters = new Map<
  string,
  (callbackUrl: string) => void
>()
// Track servers where the manual callback was actually invoked (so the
// automatic reconnect path knows to skip — the extension will reconnect).
const oauthManualCallbackUsed = new Set<string>()
// Track OAuth auth-only promises so mcp_oauth_callback_url can await
// token exchange completion. Reconnect is handled separately by the
// extension via handleAuthDone → mcp_reconnect.
const oauthAuthPromises = new Map<string, Promise<void>>()

// In-flight Anthropic OAuth flow (claude_authenticate). Single-slot: a
// second authenticate request cleans up the first. The service holds the
// PKCE verifier + localhost listener; the promise settles after
// installOAuthTokens — after it resolves, the in-process memoized token
// cache is already cleared and the next API call picks up the new creds.
// null means no flow is currently in flight.
let claudeOAuth: {
  service: OAuthService
  flow: Promise<void>
} | null = null
|
|
|
|
// This essentially spawns a parallel async task: two loops run
// concurrently — one reads from stdin and adds to the queue to be
// processed, and the other reads from the queue, processes each
// command, and returns the result of the generation.
// The process is complete when the input stream completes and
// the last generation of the queue has completed.
|
|
void (async () => {
|
|
let initialized = false
|
|
logForDiagnosticsNoPII('info', 'cli_message_loop_started')
|
|
for await (const message of structuredIO.structuredInput) {
|
|
// Non-user events are handled inline (no queue). started→completed in
|
|
// the same tick carries no information, so only fire completed.
|
|
// control_response is reported by StructuredIO.processLine (which also
|
|
// sees orphans that never yield here).
|
|
const eventId = 'uuid' in message ? message.uuid : undefined
|
|
if (
|
|
eventId &&
|
|
message.type !== 'user' &&
|
|
message.type !== 'control_response'
|
|
) {
|
|
notifyCommandLifecycle(eventId, 'completed')
|
|
}
|
|
|
|
if (message.type === 'control_request') {
|
|
if (message.request.subtype === 'interrupt') {
|
|
// Track escapes for attribution (ant-only feature)
|
|
if (feature('COMMIT_ATTRIBUTION')) {
|
|
setAppState(prev => ({
|
|
...prev,
|
|
attribution: {
|
|
...prev.attribution,
|
|
escapeCount: prev.attribution.escapeCount + 1,
|
|
},
|
|
}))
|
|
}
|
|
if (abortController) {
|
|
abortController.abort()
|
|
}
|
|
suggestionState.abortController?.abort()
|
|
suggestionState.abortController = null
|
|
suggestionState.lastEmitted = null
|
|
suggestionState.pendingSuggestion = null
|
|
sendControlResponseSuccess(message)
|
|
} else if (message.request.subtype === 'end_session') {
|
|
logForDebugging(
|
|
`[print.ts] end_session received, reason=${message.request.reason ?? 'unspecified'}`,
|
|
)
|
|
if (abortController) {
|
|
abortController.abort()
|
|
}
|
|
suggestionState.abortController?.abort()
|
|
suggestionState.abortController = null
|
|
suggestionState.lastEmitted = null
|
|
suggestionState.pendingSuggestion = null
|
|
sendControlResponseSuccess(message)
|
|
break // exits for-await → falls through to inputClosed=true drain below
|
|
} else if (message.request.subtype === 'initialize') {
|
|
// SDK MCP server names from the initialize message
|
|
// Populated by both browser and ProcessTransport sessions
|
|
if (
|
|
message.request.sdkMcpServers &&
|
|
message.request.sdkMcpServers.length > 0
|
|
) {
|
|
for (const serverName of message.request.sdkMcpServers) {
|
|
// Create placeholder config for SDK MCP servers
|
|
// The actual server connection is managed by the SDK Query class
|
|
sdkMcpConfigs[serverName] = {
|
|
type: 'sdk',
|
|
name: serverName,
|
|
}
|
|
}
|
|
}
|
|
|
|
await handleInitializeRequest(
|
|
message.request,
|
|
message.request_id,
|
|
initialized,
|
|
output,
|
|
commands,
|
|
modelInfos,
|
|
structuredIO,
|
|
!!options.enableAuthStatus,
|
|
options,
|
|
agents,
|
|
getAppState,
|
|
)
|
|
|
|
// Enable prompt suggestions in AppState when SDK consumer opts in.
|
|
// shouldEnablePromptSuggestion() returns false for non-interactive
|
|
// sessions, but the SDK consumer explicitly requested suggestions.
|
|
if (message.request.promptSuggestions) {
|
|
setAppState(prev => {
|
|
if (prev.promptSuggestionEnabled) return prev
|
|
return { ...prev, promptSuggestionEnabled: true }
|
|
})
|
|
}
|
|
|
|
if (
|
|
message.request.agentProgressSummaries &&
|
|
getFeatureValue_CACHED_MAY_BE_STALE('tengu_slate_prism', true)
|
|
) {
|
|
setSdkAgentProgressSummariesEnabled(true)
|
|
}
|
|
|
|
initialized = true
|
|
|
|
// If the auto-resume logic pre-enqueued a command, drain it now
|
|
// that initialize has set up systemPrompt, agents, hooks, etc.
|
|
if (hasCommandsInQueue()) {
|
|
void run()
|
|
}
|
|
} else if (message.request.subtype === 'set_permission_mode') {
|
|
const m = message.request // for typescript (TODO: use readonly types to avoid this)
|
|
setAppState(prev => ({
|
|
...prev,
|
|
toolPermissionContext: handleSetPermissionMode(
|
|
m,
|
|
message.request_id,
|
|
prev.toolPermissionContext,
|
|
output,
|
|
),
|
|
isUltraplanMode: m.ultraplan ?? prev.isUltraplanMode,
|
|
}))
|
|
// handleSetPermissionMode sends the control_response; the
|
|
// notifySessionMetadataChanged that used to follow here is
|
|
// now fired by onChangeAppState (with externalized mode name).
|
|
} else if (message.request.subtype === 'set_model') {
|
|
const requestedModel = message.request.model ?? 'default'
|
|
const model =
|
|
requestedModel === 'default'
|
|
? getDefaultMainLoopModel()
|
|
: requestedModel
|
|
activeUserSpecifiedModel = model
|
|
setMainLoopModelOverride(model)
|
|
notifySessionMetadataChanged({ model })
|
|
injectModelSwitchBreadcrumbs(requestedModel, model)
|
|
|
|
sendControlResponseSuccess(message)
|
|
} else if (message.request.subtype === 'set_max_thinking_tokens') {
|
|
if (message.request.max_thinking_tokens === null) {
|
|
options.thinkingConfig = undefined
|
|
} else if (message.request.max_thinking_tokens === 0) {
|
|
options.thinkingConfig = { type: 'disabled' }
|
|
} else {
|
|
options.thinkingConfig = {
|
|
type: 'enabled',
|
|
budgetTokens: message.request.max_thinking_tokens,
|
|
}
|
|
}
|
|
sendControlResponseSuccess(message)
|
|
} else if (message.request.subtype === 'mcp_status') {
|
|
sendControlResponseSuccess(message, {
|
|
mcpServers: buildMcpServerStatuses(),
|
|
})
|
|
} else if (message.request.subtype === 'get_context_usage') {
|
|
try {
|
|
const appState = getAppState()
|
|
const data = await collectContextData({
|
|
messages: mutableMessages,
|
|
getAppState,
|
|
options: {
|
|
mainLoopModel: getMainLoopModel(),
|
|
tools: buildAllTools(appState),
|
|
agentDefinitions: appState.agentDefinitions,
|
|
customSystemPrompt: options.systemPrompt,
|
|
appendSystemPrompt: options.appendSystemPrompt,
|
|
},
|
|
})
|
|
sendControlResponseSuccess(message, { ...data })
|
|
} catch (error) {
|
|
sendControlResponseError(message, errorMessage(error))
|
|
}
|
|
} else if (message.request.subtype === 'mcp_message') {
|
|
// Handle MCP notifications from SDK servers
|
|
const mcpRequest = message.request
|
|
const sdkClient = sdkClients.find(
|
|
client => client.name === mcpRequest.server_name,
|
|
)
|
|
// Check client exists - dynamically added SDK servers may have
|
|
// placeholder clients with null client until updateSdkMcp() runs
|
|
if (
|
|
sdkClient &&
|
|
sdkClient.type === 'connected' &&
|
|
sdkClient.client?.transport?.onmessage
|
|
) {
|
|
sdkClient.client.transport.onmessage(mcpRequest.message)
|
|
}
|
|
sendControlResponseSuccess(message)
|
|
} else if (message.request.subtype === 'rewind_files') {
|
|
const appState = getAppState()
|
|
const result = await handleRewindFiles(
|
|
message.request.user_message_id as UUID,
|
|
appState,
|
|
setAppState,
|
|
message.request.dry_run ?? false,
|
|
)
|
|
if (result.canRewind || message.request.dry_run) {
|
|
sendControlResponseSuccess(message, result)
|
|
} else {
|
|
sendControlResponseError(
|
|
message,
|
|
result.error ?? 'Unexpected error',
|
|
)
|
|
}
|
|
} else if (message.request.subtype === 'cancel_async_message') {
|
|
const targetUuid = message.request.message_uuid
|
|
const removed = dequeueAllMatching(cmd => cmd.uuid === targetUuid)
|
|
sendControlResponseSuccess(message, {
|
|
cancelled: removed.length > 0,
|
|
})
|
|
} else if (message.request.subtype === 'seed_read_state') {
|
|
// Client observed a Read that was later removed from context (e.g.
|
|
// by snip), so transcript-based seeding missed it. Queued into
|
|
// pendingSeeds; applied at the next clone-replace boundary.
|
|
try {
|
|
// expandPath: all other readFileState writers normalize (~, relative,
|
|
// session cwd vs process cwd). FileEditTool looks up by expandPath'd
|
|
// key — a verbatim client path would miss.
|
|
const normalizedPath = expandPath(message.request.path)
|
|
// Check disk mtime before reading content. If the file changed
|
|
// since the client's observation, readFile would return C_current
|
|
// but we'd store it with the client's M_observed — getChangedFiles
|
|
// then sees disk > cache.timestamp, re-reads, diffs C_current vs
|
|
// C_current = empty, emits no attachment, and the model is never
|
|
// told about the C_observed → C_current change. Skipping the seed
|
|
// makes Edit fail "file not read yet" → forces a fresh Read.
|
|
// Math.floor matches FileReadTool and getFileModificationTime.
|
|
const diskMtime = Math.floor((await stat(normalizedPath)).mtimeMs)
|
|
if (diskMtime <= message.request.mtime) {
|
|
const raw = await readFile(normalizedPath, 'utf-8')
|
|
// Strip BOM + normalize CRLF→LF to match readFileInRange and
|
|
// readFileSyncWithMetadata. FileEditTool's content-compare
|
|
// fallback (for Windows mtime bumps without content change)
|
|
// compares against LF-normalized disk reads.
|
|
const content = (
|
|
raw.charCodeAt(0) === 0xfeff ? raw.slice(1) : raw
|
|
).replaceAll('\r\n', '\n')
|
|
pendingSeeds.set(normalizedPath, {
|
|
content,
|
|
timestamp: diskMtime,
|
|
offset: undefined,
|
|
limit: undefined,
|
|
})
|
|
}
|
|
} catch {
|
|
// ENOENT etc — skip seeding but still succeed
|
|
}
|
|
sendControlResponseSuccess(message)
|
|
} else if (message.request.subtype === 'mcp_set_servers') {
|
|
const { response, sdkServersChanged } = await applyMcpServerChanges(
|
|
message.request.servers,
|
|
)
|
|
sendControlResponseSuccess(message, response)
|
|
|
|
// Connect SDK servers AFTER response to avoid deadlock
|
|
if (sdkServersChanged) {
|
|
void updateSdkMcp()
|
|
}
|
|
} else if (message.request.subtype === 'reload_plugins') {
|
|
try {
|
|
if (
|
|
feature('DOWNLOAD_USER_SETTINGS') &&
|
|
(isEnvTruthy(process.env.CLAUDE_CODE_REMOTE) || getIsRemoteMode())
|
|
) {
|
|
// Re-pull user settings so enabledPlugins pushed from the
|
|
// user's local CLI take effect before the cache sweep.
|
|
const applied = await redownloadUserSettings()
|
|
if (applied) {
|
|
settingsChangeDetector.notifyChange('userSettings')
|
|
}
|
|
}
|
|
|
|
const r = await refreshActivePlugins(setAppState)
|
|
|
|
const sdkAgents = currentAgents.filter(
|
|
a => a.source === 'flagSettings',
|
|
)
|
|
currentAgents = [...r.agentDefinitions.allAgents, ...sdkAgents]
|
|
|
|
// Reload succeeded — gather response data best-effort so a
|
|
// read failure doesn't mask the successful state change.
|
|
// allSettled so one failure doesn't discard the others.
|
|
let plugins: SDKControlReloadPluginsResponse['plugins'] = []
|
|
const [cmdsR, mcpR, pluginsR] = await Promise.allSettled([
|
|
getCommands(cwd()),
|
|
applyPluginMcpDiff(),
|
|
loadAllPluginsCacheOnly(),
|
|
])
|
|
if (cmdsR.status === 'fulfilled') {
|
|
currentCommands = cmdsR.value
|
|
} else {
|
|
logError(cmdsR.reason)
|
|
}
|
|
if (mcpR.status === 'rejected') {
|
|
logError(mcpR.reason)
|
|
}
|
|
if (pluginsR.status === 'fulfilled') {
|
|
plugins = pluginsR.value.enabled.map(p => ({
|
|
name: p.name,
|
|
path: p.path,
|
|
source: p.source,
|
|
}))
|
|
} else {
|
|
logError(pluginsR.reason)
|
|
}
|
|
|
|
sendControlResponseSuccess(message, {
|
|
commands: currentCommands
|
|
.filter(cmd => cmd.userInvocable !== false)
|
|
.map(cmd => ({
|
|
name: getCommandName(cmd),
|
|
description: formatDescriptionWithSource(cmd),
|
|
argumentHint: cmd.argumentHint || '',
|
|
})),
|
|
agents: currentAgents.map(a => ({
|
|
name: a.agentType,
|
|
description: a.whenToUse,
|
|
model: a.model === 'inherit' ? undefined : a.model,
|
|
})),
|
|
plugins,
|
|
mcpServers: buildMcpServerStatuses(),
|
|
error_count: r.error_count,
|
|
} satisfies SDKControlReloadPluginsResponse)
|
|
} catch (error) {
|
|
sendControlResponseError(message, errorMessage(error))
|
|
}
|
|
} else if (message.request.subtype === 'mcp_reconnect') {
|
|
const currentAppState = getAppState()
|
|
const { serverName } = message.request
|
|
elicitationRegistered.delete(serverName)
|
|
// Config-existence gate must cover the SAME sources as the
|
|
// operations below. SDK-injected servers (query({mcpServers:{...}}))
|
|
// and dynamically-added servers were missing here, so
|
|
// toggleMcpServer/reconnect returned "Server not found" even though
|
|
// the disconnect/reconnect would have worked (gh-31339 / CC-314).
|
|
const config =
|
|
getMcpConfigByName(serverName) ??
|
|
mcpClients.find(c => c.name === serverName)?.config ??
|
|
sdkClients.find(c => c.name === serverName)?.config ??
|
|
dynamicMcpState.clients.find(c => c.name === serverName)?.config ??
|
|
currentAppState.mcp.clients.find(c => c.name === serverName)
|
|
?.config ??
|
|
null
|
|
if (!config) {
|
|
sendControlResponseError(message, `Server not found: ${serverName}`)
|
|
} else {
|
|
const result = await reconnectMcpServerImpl(serverName, config)
|
|
// Update appState.mcp with the new client, tools, commands, and resources
|
|
const prefix = getMcpPrefix(serverName)
|
|
setAppState(prev => ({
|
|
...prev,
|
|
mcp: {
|
|
...prev.mcp,
|
|
clients: prev.mcp.clients.map(c =>
|
|
c.name === serverName ? result.client : c,
|
|
),
|
|
tools: [
|
|
...reject(prev.mcp.tools, t => t.name?.startsWith(prefix)),
|
|
...result.tools,
|
|
],
|
|
commands: [
|
|
...reject(prev.mcp.commands, c =>
|
|
commandBelongsToServer(c, serverName),
|
|
),
|
|
...result.commands,
|
|
],
|
|
resources:
|
|
result.resources && result.resources.length > 0
|
|
? { ...prev.mcp.resources, [serverName]: result.resources }
|
|
: omit(prev.mcp.resources, serverName),
|
|
},
|
|
}))
|
|
// Also update dynamicMcpState so run() picks up the new tools
|
|
// on the next turn (run() reads dynamicMcpState, not appState)
|
|
dynamicMcpState = {
|
|
...dynamicMcpState,
|
|
clients: [
|
|
...dynamicMcpState.clients.filter(c => c.name !== serverName),
|
|
result.client,
|
|
],
|
|
tools: [
|
|
...dynamicMcpState.tools.filter(
|
|
t => !t.name?.startsWith(prefix),
|
|
),
|
|
...result.tools,
|
|
],
|
|
}
|
|
if (result.client.type === 'connected') {
|
|
registerElicitationHandlers([result.client])
|
|
reregisterChannelHandlerAfterReconnect(result.client)
|
|
sendControlResponseSuccess(message)
|
|
} else {
|
|
const errorMessage =
|
|
result.client.type === 'failed'
|
|
? (result.client.error ?? 'Connection failed')
|
|
: `Server status: ${result.client.type}`
|
|
sendControlResponseError(message, errorMessage)
|
|
}
|
|
}
|
|
} else if (message.request.subtype === 'mcp_toggle') {
|
|
const currentAppState = getAppState()
|
|
const { serverName, enabled } = message.request
|
|
elicitationRegistered.delete(serverName)
|
|
// Gate must match the client-lookup spread below (which
|
|
// includes sdkClients and dynamicMcpState.clients). Same fix as
|
|
// mcp_reconnect above (gh-31339 / CC-314).
|
|
const config =
|
|
getMcpConfigByName(serverName) ??
|
|
mcpClients.find(c => c.name === serverName)?.config ??
|
|
sdkClients.find(c => c.name === serverName)?.config ??
|
|
dynamicMcpState.clients.find(c => c.name === serverName)?.config ??
|
|
currentAppState.mcp.clients.find(c => c.name === serverName)
|
|
?.config ??
|
|
null
|
|
|
|
if (!config) {
|
|
sendControlResponseError(message, `Server not found: ${serverName}`)
|
|
} else if (!enabled) {
|
|
// Disabling: persist + disconnect (matches TUI toggleMcpServer behavior)
|
|
setMcpServerEnabled(serverName, false)
|
|
const client = [
|
|
...mcpClients,
|
|
...sdkClients,
|
|
...dynamicMcpState.clients,
|
|
...currentAppState.mcp.clients,
|
|
].find(c => c.name === serverName)
|
|
if (client && client.type === 'connected') {
|
|
await clearServerCache(serverName, config)
|
|
}
|
|
// Update appState.mcp to reflect disabled status and remove tools/commands/resources
|
|
const prefix = getMcpPrefix(serverName)
|
|
setAppState(prev => ({
|
|
...prev,
|
|
mcp: {
|
|
...prev.mcp,
|
|
clients: prev.mcp.clients.map(c =>
|
|
c.name === serverName
|
|
? { name: serverName, type: 'disabled' as const, config }
|
|
: c,
|
|
),
|
|
tools: reject(prev.mcp.tools, t => t.name?.startsWith(prefix)),
|
|
commands: reject(prev.mcp.commands, c =>
|
|
commandBelongsToServer(c, serverName),
|
|
),
|
|
resources: omit(prev.mcp.resources, serverName),
|
|
},
|
|
}))
|
|
sendControlResponseSuccess(message)
|
|
} else {
|
|
// Enabling: persist + reconnect
|
|
setMcpServerEnabled(serverName, true)
|
|
const result = await reconnectMcpServerImpl(serverName, config)
|
|
// Update appState.mcp with the new client, tools, commands, and resources
|
|
// This ensures the LLM sees updated tools after enabling the server
|
|
const prefix = getMcpPrefix(serverName)
|
|
setAppState(prev => ({
|
|
...prev,
|
|
mcp: {
|
|
...prev.mcp,
|
|
clients: prev.mcp.clients.map(c =>
|
|
c.name === serverName ? result.client : c,
|
|
),
|
|
tools: [
|
|
...reject(prev.mcp.tools, t => t.name?.startsWith(prefix)),
|
|
...result.tools,
|
|
],
|
|
commands: [
|
|
...reject(prev.mcp.commands, c =>
|
|
commandBelongsToServer(c, serverName),
|
|
),
|
|
...result.commands,
|
|
],
|
|
resources:
|
|
result.resources && result.resources.length > 0
|
|
? { ...prev.mcp.resources, [serverName]: result.resources }
|
|
: omit(prev.mcp.resources, serverName),
|
|
},
|
|
}))
|
|
if (result.client.type === 'connected') {
|
|
registerElicitationHandlers([result.client])
|
|
reregisterChannelHandlerAfterReconnect(result.client)
|
|
sendControlResponseSuccess(message)
|
|
} else {
|
|
const errorMessage =
|
|
result.client.type === 'failed'
|
|
? (result.client.error ?? 'Connection failed')
|
|
: `Server status: ${result.client.type}`
|
|
sendControlResponseError(message, errorMessage)
|
|
}
|
|
}
|
|
} else if (message.request.subtype === 'channel_enable') {
|
|
const currentAppState = getAppState()
|
|
handleChannelEnable(
|
|
message.request_id,
|
|
message.request.serverName,
|
|
// Pool spread matches mcp_status — all three client sources.
|
|
[
|
|
...currentAppState.mcp.clients,
|
|
...sdkClients,
|
|
...dynamicMcpState.clients,
|
|
],
|
|
output,
|
|
)
|
|
} else if (message.request.subtype === 'mcp_authenticate') {
|
|
const { serverName } = message.request
|
|
const currentAppState = getAppState()
|
|
const config =
|
|
getMcpConfigByName(serverName) ??
|
|
mcpClients.find(c => c.name === serverName)?.config ??
|
|
currentAppState.mcp.clients.find(c => c.name === serverName)
|
|
?.config ??
|
|
null
|
|
if (!config) {
|
|
sendControlResponseError(message, `Server not found: ${serverName}`)
|
|
} else if (config.type !== 'sse' && config.type !== 'http') {
|
|
sendControlResponseError(
|
|
message,
|
|
`Server type "${config.type}" does not support OAuth authentication`,
|
|
)
|
|
} else {
|
|
try {
|
|
// Abort any previous in-flight OAuth flow for this server
|
|
activeOAuthFlows.get(serverName)?.abort()
|
|
const controller = new AbortController()
|
|
activeOAuthFlows.set(serverName, controller)
|
|
|
|
// Capture the auth URL from the callback
|
|
let resolveAuthUrl: (url: string) => void
|
|
const authUrlPromise = new Promise<string>(resolve => {
|
|
resolveAuthUrl = resolve
|
|
})
|
|
|
|
// Start the OAuth flow in the background
|
|
const oauthPromise = performMCPOAuthFlow(
|
|
serverName,
|
|
config,
|
|
url => resolveAuthUrl!(url),
|
|
controller.signal,
|
|
{
|
|
skipBrowserOpen: true,
|
|
onWaitingForCallback: submit => {
|
|
oauthCallbackSubmitters.set(serverName, submit)
|
|
},
|
|
},
|
|
)
|
|
|
|
// Wait for the auth URL (or the flow to complete without needing redirect)
|
|
const authUrl = await Promise.race([
|
|
authUrlPromise,
|
|
oauthPromise.then(() => null as string | null),
|
|
])
|
|
|
|
if (authUrl) {
|
|
sendControlResponseSuccess(message, {
|
|
authUrl,
|
|
requiresUserAction: true,
|
|
})
|
|
} else {
|
|
sendControlResponseSuccess(message, {
|
|
requiresUserAction: false,
|
|
})
|
|
}
|
|
|
|
// Store auth-only promise for mcp_oauth_callback_url handler.
|
|
// Don't swallow errors — the callback handler needs to detect
|
|
// auth failures and report them to the caller.
|
|
oauthAuthPromises.set(serverName, oauthPromise)
|
|
|
|
// Handle background completion — reconnect after auth.
|
|
// When manual callback is used, skip the reconnect here;
|
|
// the extension's handleAuthDone → mcp_reconnect handles it
|
|
// (which also updates dynamicMcpState for tool registration).
|
|
const fullFlowPromise = oauthPromise
|
|
.then(async () => {
|
|
// Don't reconnect if the server was disabled during the OAuth flow
|
|
if (isMcpServerDisabled(serverName)) {
|
|
return
|
|
}
|
|
// Skip reconnect if the manual callback path was used —
|
|
// handleAuthDone will do it via mcp_reconnect (which
|
|
// updates dynamicMcpState for tool registration).
|
|
if (oauthManualCallbackUsed.has(serverName)) {
|
|
return
|
|
}
|
|
// Reconnect the server after successful auth
|
|
const result = await reconnectMcpServerImpl(
|
|
serverName,
|
|
config,
|
|
)
|
|
const prefix = getMcpPrefix(serverName)
|
|
setAppState(prev => ({
|
|
...prev,
|
|
mcp: {
|
|
...prev.mcp,
|
|
clients: prev.mcp.clients.map(c =>
|
|
c.name === serverName ? result.client : c,
|
|
),
|
|
tools: [
|
|
...reject(prev.mcp.tools, t =>
|
|
t.name?.startsWith(prefix),
|
|
),
|
|
...result.tools,
|
|
],
|
|
commands: [
|
|
...reject(prev.mcp.commands, c =>
|
|
commandBelongsToServer(c, serverName),
|
|
),
|
|
...result.commands,
|
|
],
|
|
resources:
|
|
result.resources && result.resources.length > 0
|
|
? {
|
|
...prev.mcp.resources,
|
|
[serverName]: result.resources,
|
|
}
|
|
: omit(prev.mcp.resources, serverName),
|
|
},
|
|
}))
|
|
// Also update dynamicMcpState so run() picks up the new tools
|
|
// on the next turn (run() reads dynamicMcpState, not appState)
|
|
dynamicMcpState = {
|
|
...dynamicMcpState,
|
|
clients: [
|
|
...dynamicMcpState.clients.filter(
|
|
c => c.name !== serverName,
|
|
),
|
|
result.client,
|
|
],
|
|
tools: [
|
|
...dynamicMcpState.tools.filter(
|
|
t => !t.name?.startsWith(prefix),
|
|
),
|
|
...result.tools,
|
|
],
|
|
}
|
|
})
|
|
.catch(error => {
|
|
logForDebugging(
|
|
`MCP OAuth failed for ${serverName}: ${error}`,
|
|
{ level: 'error' },
|
|
)
|
|
})
|
|
.finally(() => {
|
|
// Clean up only if this is still the active flow
|
|
if (activeOAuthFlows.get(serverName) === controller) {
|
|
activeOAuthFlows.delete(serverName)
|
|
oauthCallbackSubmitters.delete(serverName)
|
|
oauthManualCallbackUsed.delete(serverName)
|
|
oauthAuthPromises.delete(serverName)
|
|
}
|
|
})
|
|
void fullFlowPromise
|
|
} catch (error) {
|
|
sendControlResponseError(message, errorMessage(error))
|
|
}
|
|
}
|
|
} else if (message.request.subtype === 'mcp_oauth_callback_url') {
|
|
const { serverName, callbackUrl } = message.request
|
|
const submit = oauthCallbackSubmitters.get(serverName)
|
|
if (submit) {
|
|
// Validate the callback URL before submitting. The submit
|
|
// callback in auth.ts silently ignores URLs missing a code
|
|
// param, which would leave the auth promise unresolved and
|
|
// block the control message loop until timeout.
|
|
let hasCodeOrError = false
|
|
try {
|
|
const parsed = new URL(callbackUrl)
|
|
hasCodeOrError =
|
|
parsed.searchParams.has('code') ||
|
|
parsed.searchParams.has('error')
|
|
} catch {
|
|
// Invalid URL
|
|
}
|
|
if (!hasCodeOrError) {
|
|
sendControlResponseError(
|
|
message,
|
|
'Invalid callback URL: missing authorization code. Please paste the full redirect URL including the code parameter.',
|
|
)
|
|
} else {
|
|
oauthManualCallbackUsed.add(serverName)
|
|
submit(callbackUrl)
|
|
// Wait for auth (token exchange) to complete before responding.
|
|
// Reconnect is handled by the extension via handleAuthDone →
|
|
// mcp_reconnect (which updates dynamicMcpState for tools).
|
|
const authPromise = oauthAuthPromises.get(serverName)
|
|
if (authPromise) {
|
|
try {
|
|
await authPromise
|
|
sendControlResponseSuccess(message)
|
|
} catch (error) {
|
|
sendControlResponseError(
|
|
message,
|
|
error instanceof Error
|
|
? error.message
|
|
: 'OAuth authentication failed',
|
|
)
|
|
}
|
|
} else {
|
|
sendControlResponseSuccess(message)
|
|
}
|
|
}
|
|
} else {
|
|
sendControlResponseError(
|
|
message,
|
|
`No active OAuth flow for server: ${serverName}`,
|
|
)
|
|
}
|
|
} else if (message.request.subtype === 'claude_authenticate') {
|
|
// Anthropic OAuth over the control channel. The SDK client owns
|
|
// the user's browser (we're headless in -p mode); we hand back
|
|
// both URLs and wait. Automatic URL → localhost listener catches
|
|
// the redirect if the browser is on this host; manual URL → the
|
|
// success page shows "code#state" for claude_oauth_callback.
|
|
const { loginWithClaudeAi } = message.request
|
|
|
|
// Clean up any prior flow. cleanup() closes the localhost listener
|
|
// and nulls the manual resolver. The prior `flow` promise is left
|
|
// pending (AuthCodeListener.close() does not reject) but its object
|
|
// graph becomes unreachable once the server handle is released and
|
|
// is GC'd — no fd or port is held.
|
|
claudeOAuth?.service.cleanup()
|
|
|
|
logEvent('tengu_oauth_flow_start', {
|
|
loginWithClaudeAi: loginWithClaudeAi ?? true,
|
|
})
|
|
|
|
const service = new OAuthService()
|
|
let urlResolver!: (urls: {
|
|
manualUrl: string
|
|
automaticUrl: string
|
|
}) => void
|
|
const urlPromise = new Promise<{
|
|
manualUrl: string
|
|
automaticUrl: string
|
|
}>(resolve => {
|
|
urlResolver = resolve
|
|
})
|
|
|
|
const flow = service
|
|
.startOAuthFlow(
|
|
async (manualUrl, automaticUrl) => {
|
|
// automaticUrl is always defined when skipBrowserOpen is set;
|
|
// the signature is optional only for the existing single-arg callers.
|
|
urlResolver({ manualUrl, automaticUrl: automaticUrl! })
|
|
},
|
|
{
|
|
loginWithClaudeAi: loginWithClaudeAi ?? true,
|
|
skipBrowserOpen: true,
|
|
},
|
|
)
|
|
.then(async tokens => {
|
|
// installOAuthTokens: performLogout (clear stale state) →
|
|
// store profile → saveOAuthTokensIfNeeded → clearOAuthTokenCache
|
|
// → clearAuthRelatedCaches. After this resolves, the memoized
|
|
// getClaudeAIOAuthTokens in this process is invalidated; the
|
|
// next API call re-reads keychain/file and works. No respawn.
|
|
await installOAuthTokens(tokens)
|
|
logEvent('tengu_oauth_success', {
|
|
loginWithClaudeAi: loginWithClaudeAi ?? true,
|
|
})
|
|
})
|
|
.finally(() => {
|
|
service.cleanup()
|
|
if (claudeOAuth?.service === service) {
|
|
claudeOAuth = null
|
|
}
|
|
})
|
|
|
|
claudeOAuth = { service, flow }
|
|
|
|
// Attach the rejection handler before awaiting so a synchronous
|
|
// startOAuthFlow failure doesn't surface as an unhandled rejection.
|
|
// The claude_oauth_callback handler re-awaits flow for the manual
|
|
// path and surfaces the real error to the client.
|
|
void flow.catch(err =>
|
|
logForDebugging(`claude_authenticate flow ended: ${err}`, {
|
|
level: 'info',
|
|
}),
|
|
)
|
|
|
|
try {
|
|
// Race against flow: if startOAuthFlow rejects before calling
|
|
// the authURLHandler (e.g. AuthCodeListener.start() fails with
|
|
// EACCES or fd exhaustion), urlPromise would pend forever and
|
|
// wedge the stdin loop. flow resolving first is unreachable in
|
|
// practice (it's suspended on the same urls we're waiting for).
|
|
const { manualUrl, automaticUrl } = await Promise.race([
|
|
urlPromise,
|
|
flow.then(() => {
|
|
throw new Error(
|
|
'OAuth flow completed without producing auth URLs',
|
|
)
|
|
}),
|
|
])
|
|
sendControlResponseSuccess(message, {
|
|
manualUrl,
|
|
automaticUrl,
|
|
})
|
|
} catch (error) {
|
|
sendControlResponseError(message, errorMessage(error))
|
|
}
|
|
} else if (
|
|
message.request.subtype === 'claude_oauth_callback' ||
|
|
message.request.subtype === 'claude_oauth_wait_for_completion'
|
|
) {
|
|
if (!claudeOAuth) {
|
|
sendControlResponseError(
|
|
message,
|
|
'No active claude_authenticate flow',
|
|
)
|
|
} else {
|
|
// Inject the manual code synchronously — must happen in stdin
|
|
// message order so a subsequent claude_authenticate doesn't
|
|
// replace the service before this code lands.
|
|
if (message.request.subtype === 'claude_oauth_callback') {
|
|
claudeOAuth.service.handleManualAuthCodeInput({
|
|
authorizationCode: message.request.authorizationCode,
|
|
state: message.request.state,
|
|
})
|
|
}
|
|
// Detach the await — the stdin reader is serial and blocking
|
|
// here deadlocks claude_oauth_wait_for_completion: flow may
|
|
// only resolve via a future claude_oauth_callback on stdin,
|
|
// which can't be read while we're parked. Capture the binding;
|
|
// claudeOAuth is nulled in flow's own .finally.
|
|
const { flow } = claudeOAuth
|
|
void flow.then(
|
|
() => {
|
|
const accountInfo = getAccountInformation()
|
|
sendControlResponseSuccess(message, {
|
|
account: {
|
|
email: accountInfo?.email,
|
|
organization: accountInfo?.organization,
|
|
subscriptionType: accountInfo?.subscription,
|
|
tokenSource: accountInfo?.tokenSource,
|
|
apiKeySource: accountInfo?.apiKeySource,
|
|
apiProvider: getAPIProvider(),
|
|
},
|
|
})
|
|
},
|
|
(error: unknown) =>
|
|
sendControlResponseError(message, errorMessage(error)),
|
|
)
|
|
}
|
|
} else if (message.request.subtype === 'mcp_clear_auth') {
|
|
const { serverName } = message.request
|
|
const currentAppState = getAppState()
|
|
const config =
|
|
getMcpConfigByName(serverName) ??
|
|
mcpClients.find(c => c.name === serverName)?.config ??
|
|
currentAppState.mcp.clients.find(c => c.name === serverName)
|
|
?.config ??
|
|
null
|
|
if (!config) {
|
|
sendControlResponseError(message, `Server not found: ${serverName}`)
|
|
} else if (config.type !== 'sse' && config.type !== 'http') {
|
|
sendControlResponseError(
|
|
message,
|
|
`Cannot clear auth for server type "${config.type}"`,
|
|
)
|
|
} else {
|
|
await revokeServerTokens(serverName, config)
|
|
const result = await reconnectMcpServerImpl(serverName, config)
|
|
const prefix = getMcpPrefix(serverName)
|
|
setAppState(prev => ({
|
|
...prev,
|
|
mcp: {
|
|
...prev.mcp,
|
|
clients: prev.mcp.clients.map(c =>
|
|
c.name === serverName ? result.client : c,
|
|
),
|
|
tools: [
|
|
...reject(prev.mcp.tools, t => t.name?.startsWith(prefix)),
|
|
...result.tools,
|
|
],
|
|
commands: [
|
|
...reject(prev.mcp.commands, c =>
|
|
commandBelongsToServer(c, serverName),
|
|
),
|
|
...result.commands,
|
|
],
|
|
resources:
|
|
result.resources && result.resources.length > 0
|
|
? {
|
|
...prev.mcp.resources,
|
|
[serverName]: result.resources,
|
|
}
|
|
: omit(prev.mcp.resources, serverName),
|
|
},
|
|
}))
|
|
sendControlResponseSuccess(message, {})
|
|
}
|
|
} else if (message.request.subtype === 'apply_flag_settings') {
|
|
// Snapshot the current model before applying — we need to detect
|
|
// model switches so we can inject breadcrumbs and notify listeners.
|
|
const prevModel = getMainLoopModel()
|
|
|
|
// Merge the provided settings into the in-memory flag settings
|
|
const existing = getFlagSettingsInline() ?? {}
|
|
const incoming = message.request.settings
|
|
// Shallow-merge top-level keys; getSettingsForSource handles
|
|
// the deep merge with file-based flag settings via mergeWith.
|
|
// JSON serialization drops `undefined`, so callers use `null`
|
|
// to signal "clear this key". Convert nulls to deletions so
|
|
// SettingsSchema().safeParse() doesn't reject the whole object
|
|
// (z.string().optional() accepts string | undefined, not null).
|
|
const merged = { ...existing, ...incoming }
|
|
for (const key of Object.keys(merged)) {
|
|
if (merged[key as keyof typeof merged] === null) {
|
|
delete merged[key as keyof typeof merged]
|
|
}
|
|
}
|
|
setFlagSettingsInline(merged)
|
|
// Route through notifyChange so fanOut() resets the settings cache
|
|
// before listeners run. The subscriber at :392 calls
|
|
// applySettingsChange for us. Pre-#20625 this was a direct
|
|
// applySettingsChange() call that relied on its own internal reset —
|
|
// now that the reset is centralized in fanOut, a direct call here
|
|
// would read stale cached settings and silently drop the update.
|
|
// Bonus: going through notifyChange also tells the other subscribers
|
|
// (loadPluginHooks, sandbox-adapter) about the change, which the
|
|
// previous direct call skipped.
|
|
settingsChangeDetector.notifyChange('flagSettings')
|
|
|
|
// If the incoming settings include a model change, update the
|
|
// override so getMainLoopModel() reflects it. The override has
|
|
// higher priority than the settings cascade in
|
|
// getUserSpecifiedModelSetting(), so without this update,
|
|
// getMainLoopModel() returns the stale override and the model
|
|
// change is silently ignored (matching set_model at :2811).
|
|
if ('model' in incoming) {
|
|
if (incoming.model != null) {
|
|
setMainLoopModelOverride(String(incoming.model))
|
|
} else {
|
|
setMainLoopModelOverride(undefined)
|
|
}
|
|
}
|
|
|
|
// If the model changed, inject breadcrumbs so the model sees the
|
|
// mid-conversation switch, and notify metadata listeners (CCR).
|
|
const newModel = getMainLoopModel()
|
|
if (newModel !== prevModel) {
|
|
activeUserSpecifiedModel = newModel
|
|
const modelArg = incoming.model ? String(incoming.model) : 'default'
|
|
notifySessionMetadataChanged({ model: newModel })
|
|
injectModelSwitchBreadcrumbs(modelArg, newModel)
|
|
}
|
|
|
|
sendControlResponseSuccess(message)
|
|
} else if (message.request.subtype === 'get_settings') {
|
|
const currentAppState = getAppState()
|
|
const model = getMainLoopModel()
|
|
// modelSupportsEffort gate matches claude.ts — applied.effort must
|
|
// mirror what actually goes to the API, not just what's configured.
|
|
const effort = modelSupportsEffort(model)
|
|
? resolveAppliedEffort(model, currentAppState.effortValue)
|
|
: undefined
|
|
sendControlResponseSuccess(message, {
|
|
...getSettingsWithSources(),
|
|
applied: {
|
|
model,
|
|
// Numeric effort (ant-only) → null; SDK schema is string-level only.
|
|
effort: typeof effort === 'string' ? effort : null,
|
|
},
|
|
})
|
|
} else if (message.request.subtype === 'stop_task') {
|
|
const { task_id: taskId } = message.request
|
|
try {
|
|
await stopTask(taskId, {
|
|
getAppState,
|
|
setAppState,
|
|
})
|
|
sendControlResponseSuccess(message, {})
|
|
} catch (error) {
|
|
sendControlResponseError(message, errorMessage(error))
|
|
}
|
|
} else if (message.request.subtype === 'generate_session_title') {
|
|
// Fire-and-forget so the Haiku call does not block the stdin loop
|
|
// (which would delay processing of subsequent user messages /
|
|
// interrupts for the duration of the API roundtrip).
|
|
const { description, persist } = message.request
|
|
// Reuse the live controller only if it has not already been aborted
|
|
// (e.g. by interrupt()); an aborted signal would cause queryHaiku to
|
|
// immediately throw APIUserAbortError → {title: null}.
|
|
const titleSignal = (
|
|
abortController && !abortController.signal.aborted
|
|
? abortController
|
|
: createAbortController()
|
|
).signal
|
|
void (async () => {
|
|
try {
|
|
const title = await generateSessionTitle(description, titleSignal)
|
|
if (title && persist) {
|
|
try {
|
|
saveAiGeneratedTitle(getSessionId() as UUID, title)
|
|
} catch (e) {
|
|
logError(e)
|
|
}
|
|
}
|
|
sendControlResponseSuccess(message, { title })
|
|
} catch (e) {
|
|
// Unreachable in practice — generateSessionTitle wraps its
|
|
// own body and returns null, saveAiGeneratedTitle is wrapped
|
|
// above. Propagate (not swallow) so unexpected failures are
|
|
// visible to the SDK caller (hostComms.ts catches and logs).
|
|
sendControlResponseError(message, errorMessage(e))
|
|
}
|
|
})()
|
|
} else if (message.request.subtype === 'side_question') {
|
|
// Same fire-and-forget pattern as generate_session_title above —
|
|
// the forked agent's API roundtrip must not block the stdin loop.
|
|
//
|
|
// The snapshot captured by stopHooks (for querySource === 'sdk')
|
|
// holds the exact systemPrompt/userContext/systemContext/messages
|
|
// sent on the last main-thread turn. Reusing them gives a byte-
|
|
// identical prefix → prompt cache hit.
|
|
//
|
|
// Fallback (resume before first turn completes — no snapshot yet):
|
|
// rebuild from scratch. buildSideQuestionFallbackParams mirrors
|
|
// QueryEngine.ts:ask()'s system prompt assembly (including
|
|
// --system-prompt / --append-system-prompt) so the rebuilt prefix
|
|
// matches in the common case. May still miss the cache for
|
|
// coordinator mode or memory-mechanics extras — acceptable, the
|
|
// alternative is the side question failing entirely.
|
|
const { question } = message.request
|
|
void (async () => {
|
|
try {
|
|
const saved = getLastCacheSafeParams()
|
|
const cacheSafeParams = saved
|
|
? {
|
|
...saved,
|
|
// If the last turn was interrupted, the snapshot holds an
|
|
// already-aborted controller; createChildAbortController in
|
|
// createSubagentContext would propagate it and the fork
|
|
// would die before sending a request. The controller is
|
|
// not part of the cache key — swapping in a fresh one is
|
|
// safe. Same guard as generate_session_title above.
|
|
toolUseContext: {
|
|
...saved.toolUseContext,
|
|
abortController: createAbortController(),
|
|
},
|
|
}
|
|
: await buildSideQuestionFallbackParams({
|
|
tools: buildAllTools(getAppState()),
|
|
commands: currentCommands,
|
|
mcpClients: [
|
|
...getAppState().mcp.clients,
|
|
...sdkClients,
|
|
...dynamicMcpState.clients,
|
|
],
|
|
messages: mutableMessages,
|
|
readFileState,
|
|
getAppState,
|
|
setAppState,
|
|
customSystemPrompt: options.systemPrompt,
|
|
appendSystemPrompt: options.appendSystemPrompt,
|
|
thinkingConfig: options.thinkingConfig,
|
|
agents: currentAgents,
|
|
})
|
|
const result = await runSideQuestion({
|
|
question,
|
|
cacheSafeParams,
|
|
})
|
|
sendControlResponseSuccess(message, { response: result.response })
|
|
} catch (e) {
|
|
sendControlResponseError(message, errorMessage(e))
|
|
}
|
|
})()
|
|
} else if (
|
|
(feature('PROACTIVE') || feature('KAIROS')) &&
|
|
(message.request as { subtype: string }).subtype === 'set_proactive'
|
|
) {
|
|
const req = message.request as unknown as {
|
|
subtype: string
|
|
enabled: boolean
|
|
}
|
|
if (req.enabled) {
|
|
if (!proactiveModule!.isProactiveActive()) {
|
|
proactiveModule!.activateProactive('command')
|
|
scheduleProactiveTick!()
|
|
}
|
|
} else {
|
|
proactiveModule!.deactivateProactive()
|
|
}
|
|
sendControlResponseSuccess(message)
|
|
} else if (message.request.subtype === 'remote_control') {
|
|
if (message.request.enabled) {
|
|
if (bridgeHandle) {
|
|
// Already connected
|
|
sendControlResponseSuccess(message, {
|
|
session_url: getRemoteSessionUrl(
|
|
bridgeHandle.bridgeSessionId,
|
|
bridgeHandle.sessionIngressUrl,
|
|
),
|
|
connect_url: buildBridgeConnectUrl(
|
|
bridgeHandle.environmentId,
|
|
bridgeHandle.sessionIngressUrl,
|
|
),
|
|
environment_id: bridgeHandle.environmentId,
|
|
})
|
|
} else {
|
|
// initReplBridge surfaces gate-failure reasons via
|
|
// onStateChange('failed', detail) before returning null.
|
|
// Capture so the control-response error is actionable
|
|
// ("/login", "disabled by your organization's policy", etc.)
|
|
// instead of a generic "initialization failed".
|
|
let bridgeFailureDetail: string | undefined
|
|
try {
|
|
const { initReplBridge } = await import(
|
|
'src/bridge/initReplBridge.js'
|
|
)
|
|
const handle = await initReplBridge({
|
|
onInboundMessage(msg) {
|
|
const fields = extractInboundMessageFields(msg)
|
|
if (!fields) return
|
|
const { content, uuid } = fields
|
|
enqueue({
|
|
value: content,
|
|
mode: 'prompt' as const,
|
|
uuid,
|
|
skipSlashCommands: true,
|
|
})
|
|
void run()
|
|
},
|
|
onPermissionResponse(response) {
|
|
// Forward bridge permission responses into the
|
|
// stdin processing loop so they resolve pending
|
|
// permission requests from the SDK consumer.
|
|
structuredIO.injectControlResponse(response)
|
|
},
|
|
onInterrupt() {
|
|
abortController?.abort()
|
|
},
|
|
onSetModel(model) {
|
|
const resolved =
|
|
model === 'default' ? getDefaultMainLoopModel() : model
|
|
activeUserSpecifiedModel = resolved
|
|
setMainLoopModelOverride(resolved)
|
|
},
|
|
onSetMaxThinkingTokens(maxTokens) {
|
|
if (maxTokens === null) {
|
|
options.thinkingConfig = undefined
|
|
} else if (maxTokens === 0) {
|
|
options.thinkingConfig = { type: 'disabled' }
|
|
} else {
|
|
options.thinkingConfig = {
|
|
type: 'enabled',
|
|
budgetTokens: maxTokens,
|
|
}
|
|
}
|
|
},
|
|
onStateChange(state, detail) {
|
|
if (state === 'failed') {
|
|
bridgeFailureDetail = detail
|
|
}
|
|
logForDebugging(
|
|
`[bridge:sdk] State change: ${state}${detail ? ` — ${detail}` : ''}`,
|
|
)
|
|
output.enqueue({
|
|
type: 'system' as StdoutMessage['type'],
|
|
subtype: 'bridge_state' as string,
|
|
state,
|
|
detail,
|
|
uuid: randomUUID(),
|
|
session_id: getSessionId(),
|
|
} as StdoutMessage)
|
|
},
|
|
initialMessages:
|
|
mutableMessages.length > 0 ? mutableMessages : undefined,
|
|
})
|
|
if (!handle) {
|
|
sendControlResponseError(
|
|
message,
|
|
bridgeFailureDetail ??
|
|
'Remote Control initialization failed',
|
|
)
|
|
} else {
|
|
bridgeHandle = handle
|
|
bridgeLastForwardedIndex = mutableMessages.length
|
|
// Forward permission requests to the bridge
|
|
structuredIO.setOnControlRequestSent(request => {
|
|
handle.sendControlRequest(request)
|
|
})
|
|
// Cancel stale bridge permission prompts when the SDK
|
|
// consumer resolves a can_use_tool request first.
|
|
structuredIO.setOnControlRequestResolved(requestId => {
|
|
handle.sendControlCancelRequest(requestId)
|
|
})
|
|
sendControlResponseSuccess(message, {
|
|
session_url: getRemoteSessionUrl(
|
|
handle.bridgeSessionId,
|
|
handle.sessionIngressUrl,
|
|
),
|
|
connect_url: buildBridgeConnectUrl(
|
|
handle.environmentId,
|
|
handle.sessionIngressUrl,
|
|
),
|
|
environment_id: handle.environmentId,
|
|
})
|
|
}
|
|
} catch (err) {
|
|
sendControlResponseError(message, errorMessage(err))
|
|
}
|
|
}
|
|
} else {
|
|
// Disable
|
|
if (bridgeHandle) {
|
|
structuredIO.setOnControlRequestSent(undefined)
|
|
structuredIO.setOnControlRequestResolved(undefined)
|
|
await bridgeHandle.teardown()
|
|
bridgeHandle = null
|
|
}
|
|
sendControlResponseSuccess(message)
|
|
}
|
|
} else {
|
|
// Unknown control request subtype — send an error response so
|
|
// the caller doesn't hang waiting for a reply that never comes.
|
|
sendControlResponseError(
|
|
message,
|
|
`Unsupported control request subtype: ${(message.request as { subtype: string }).subtype}`,
|
|
)
|
|
}
|
|
continue
|
|
} else if (message.type === 'control_response') {
|
|
// Replay control_response messages when replay mode is enabled
|
|
if (options.replayUserMessages) {
|
|
output.enqueue(message)
|
|
}
|
|
continue
|
|
} else if (message.type === 'keep_alive') {
|
|
// Silently ignore keep-alive messages
|
|
continue
|
|
} else if (message.type === 'update_environment_variables') {
|
|
// Handled in structuredIO.ts, but TypeScript needs the type guard
|
|
continue
|
|
} else if (message.type === 'assistant' || message.type === 'system') {
|
|
// History replay from bridge: inject into mutableMessages as
|
|
// conversation context so the model sees prior turns.
|
|
const internalMsgs = toInternalMessages([message])
|
|
mutableMessages.push(...internalMsgs)
|
|
// Echo assistant messages back so CCR displays them
|
|
if (message.type === 'assistant' && options.replayUserMessages) {
|
|
output.enqueue(message)
|
|
}
|
|
continue
|
|
}
|
|
// After handling control, keep-alive, env-var, assistant, and system
|
|
// messages above, only user messages should remain.
|
|
if (message.type !== 'user') {
|
|
continue
|
|
}
|
|
|
|
// First prompt message implicitly initializes if not already done.
|
|
initialized = true
|
|
|
|
// Check for duplicate user message - skip if already processed
|
|
if (message.uuid) {
|
|
const sessionId = getSessionId() as UUID
|
|
const existsInSession = await doesMessageExistInSession(
|
|
sessionId,
|
|
message.uuid,
|
|
)
|
|
|
|
// Check both historical duplicates (from file) and runtime duplicates (this session)
|
|
if (existsInSession || receivedMessageUuids.has(message.uuid)) {
|
|
logForDebugging(`Skipping duplicate user message: ${message.uuid}`)
|
|
// Send acknowledgment for duplicate message if replay mode is enabled
|
|
if (options.replayUserMessages) {
|
|
logForDebugging(
|
|
`Sending acknowledgment for duplicate user message: ${message.uuid}`,
|
|
)
|
|
output.enqueue({
|
|
type: 'user',
|
|
message: message.message,
|
|
session_id: sessionId,
|
|
parent_tool_use_id: null,
|
|
uuid: message.uuid,
|
|
timestamp: message.timestamp,
|
|
isReplay: true,
|
|
} as SDKUserMessageReplay)
|
|
}
|
|
// Historical dup = transcript already has this turn's output, so it
|
|
// ran but its lifecycle was never closed (interrupted before ack).
|
|
// Runtime dups don't need this — the original enqueue path closes them.
|
|
if (existsInSession) {
|
|
notifyCommandLifecycle(message.uuid, 'completed')
|
|
}
|
|
// Don't enqueue duplicate messages for execution
|
|
continue
|
|
}
|
|
|
|
// Track this UUID to prevent runtime duplicates
|
|
trackReceivedMessageUuid(message.uuid)
|
|
}
|
|
|
|
enqueue({
|
|
mode: 'prompt' as const,
|
|
// file_attachments rides the protobuf catchall from the web composer.
|
|
// Same-ref no-op when absent (no 'file_attachments' key).
|
|
value: await resolveAndPrepend(message, message.message.content),
|
|
uuid: message.uuid,
|
|
priority: message.priority,
|
|
})
|
|
// Increment prompt count for attribution tracking and save snapshot
|
|
// The snapshot persists promptCount so it survives compaction
|
|
if (feature('COMMIT_ATTRIBUTION')) {
|
|
setAppState(prev => ({
|
|
...prev,
|
|
attribution: incrementPromptCount(prev.attribution, snapshot => {
|
|
void recordAttributionSnapshot(snapshot).catch(error => {
|
|
logForDebugging(`Attribution: Failed to save snapshot: ${error}`)
|
|
})
|
|
}),
|
|
}))
|
|
}
|
|
void run()
|
|
}
|
|
inputClosed = true
|
|
cronScheduler?.stop()
|
|
if (!running) {
|
|
// If a push-suggestion is in-flight, wait for it to emit before closing
|
|
// the output stream (5 s safety timeout to prevent hanging).
|
|
if (suggestionState.inflightPromise) {
|
|
await Promise.race([suggestionState.inflightPromise, sleep(5000)])
|
|
}
|
|
suggestionState.abortController?.abort()
|
|
suggestionState.abortController = null
|
|
await finalizePendingAsyncHooks()
|
|
unsubscribeSkillChanges()
|
|
unsubscribeAuthStatus?.()
|
|
statusListeners.delete(rateLimitListener)
|
|
output.done()
|
|
}
|
|
})()
|
|
|
|
return output
|
|
}
|
|
|
|
/**
 * Creates a CanUseToolFn that incorporates a custom permission prompt tool.
 * This function converts the permissionPromptTool into a CanUseToolFn that can
 * be used in ask.tsx.
 *
 * Decision flow of the returned function:
 * 1. Use `forceDecision` when supplied, otherwise the result of
 *    hasPermissionsToUseTool.
 * 2. An 'allow' or 'deny' result is returned unchanged.
 * 3. Any other behavior is delegated to `permissionPromptTool.call`, raced
 *    against the tool-use abort signal. If the signal fires, a 'deny' decision
 *    with the message 'Permission prompt was aborted.' is returned.
 * 4. Otherwise the prompt tool's result is mapped to a tool-result block,
 *    parsed, and converted via permissionPromptToolResultToPermissionDecision.
 *
 * @param permissionPromptTool - Tool invoked to obtain the permission decision.
 * @returns A CanUseToolFn backed by `permissionPromptTool`.
 * @throws Error when the prompt tool's mapped result is not an array whose
 *   first element is a text block with a string `text`. Rejections from the
 *   prompt tool itself propagate to the caller.
 */
export function createCanUseToolWithPermissionPrompt(
  permissionPromptTool: PermissionPromptTool,
): CanUseToolFn {
  const canUseTool: CanUseToolFn = async (
    tool,
    input,
    toolUseContext,
    assistantMessage,
    toolUseId,
    forceDecision,
  ) => {
    const mainPermissionResult =
      forceDecision ??
      (await hasPermissionsToUseTool(
        tool,
        input,
        toolUseContext,
        assistantMessage,
        toolUseId,
      ))

    // If the tool is allowed or denied, return the result
    if (
      mainPermissionResult.behavior === 'allow' ||
      mainPermissionResult.behavior === 'deny'
    ) {
      return mainPermissionResult
    }

    // Single source for the decision returned on every abort path.
    // NOTE(review): permissionPromptToolName is populated with the name of the
    // tool being checked (tool.name), not permissionPromptTool.name. Preserved
    // as-is; confirm which one consumers expect.
    const abortedDecision = () => ({
      behavior: 'deny' as const,
      message: 'Permission prompt was aborted.',
      decisionReason: {
        type: 'permissionPromptTool' as const,
        permissionPromptToolName: tool.name,
        toolResult: undefined,
      },
    })

    // Race the permission prompt tool against the abort signal.
    //
    // Why we need this: The permission prompt tool may block indefinitely waiting
    // for user input (e.g., via stdin or a UI dialog). If the user triggers an
    // interrupt (Ctrl+C), we need to detect it even while the tool is blocked.
    // Without this race, the abort check would only run AFTER the tool completes,
    // which may never happen if the tool is waiting for input that will never come.
    //
    // The second check (combinedSignal.aborted) handles a race condition where
    // abort fires after Promise.race resolves but before we reach this check.
    const { signal: combinedSignal, cleanup: cleanupAbortListener } =
      createCombinedAbortSignal(toolUseContext.abortController.signal)

    // Check if already aborted before starting the race
    if (combinedSignal.aborted) {
      cleanupAbortListener()
      return abortedDecision()
    }

    const abortPromise = new Promise<'aborted'>(resolve => {
      combinedSignal.addEventListener('abort', () => resolve('aborted'), {
        once: true,
      })
    })

    // Wrapped in a closure so the invocation happens inside the try block
    // below: a synchronous throw or an async rejection from the prompt tool
    // must still release the combined-signal listener.
    const callPromptTool = () =>
      permissionPromptTool.call(
        {
          tool_name: tool.name,
          input,
          tool_use_id: toolUseId,
        },
        toolUseContext,
        canUseTool,
        assistantMessage,
      )
    type PromptToolResult = Awaited<ReturnType<typeof callPromptTool>>

    let raceResult: PromptToolResult | 'aborted'
    try {
      raceResult = await Promise.race([callPromptTool(), abortPromise])
    } finally {
      // Previously skipped when the prompt tool failed, leaving the listener
      // attached to the tool-use abort signal.
      cleanupAbortListener()
    }

    if (raceResult === 'aborted' || combinedSignal.aborted) {
      return abortedDecision()
    }

    // TypeScript narrowing: after the abort check, raceResult must be ToolResult
    const result = raceResult as PromptToolResult

    const permissionToolResultBlockParam =
      permissionPromptTool.mapToolResultToToolResultBlockParam(result.data, '1')
    if (
      !permissionToolResultBlockParam.content ||
      !Array.isArray(permissionToolResultBlockParam.content) ||
      !permissionToolResultBlockParam.content[0] ||
      permissionToolResultBlockParam.content[0].type !== 'text' ||
      typeof permissionToolResultBlockParam.content[0].text !== 'string'
    ) {
      throw new Error(
        'Permission prompt tool returned an invalid result. Expected a single text block param with type="text" and a string text value.',
      )
    }
    return permissionPromptToolResultToPermissionDecision(
      permissionToolOutputSchema().parse(
        safeParseJSON(permissionToolResultBlockParam.content[0].text),
      ),
      permissionPromptTool,
      input,
      toolUseContext,
    )
  }
  return canUseTool
}
|
|
|
|
/**
 * Selects the CanUseToolFn for the given --permission-prompt-tool value.
 *
 * - 'stdio': delegates to structuredIO.createCanUseTool(onPermissionPrompt).
 * - unset/empty: returns forceDecision when provided, otherwise the result of
 *   hasPermissionsToUseTool.
 * - any other name: looked up among getMcpTools() on the first invocation and
 *   wrapped with createCanUseToolWithPermissionPrompt; the wrapper is cached
 *   for later calls.
 *
 * Exported for testing — regression: this used to crash at construction when
 * getMcpTools() was empty (before per-server connects populated appState).
 */
export function getCanUseToolFn(
  permissionPromptToolName: string | undefined,
  structuredIO: StructuredIO,
  getMcpTools: () => Tool[],
  onPermissionPrompt?: (details: RequiresActionDetails) => void,
): CanUseToolFn {
  if (permissionPromptToolName === 'stdio') {
    return structuredIO.createCanUseTool(onPermissionPrompt)
  }

  if (!permissionPromptToolName) {
    const withoutPromptTool: CanUseToolFn = async (
      tool,
      input,
      toolUseContext,
      assistantMessage,
      toolUseId,
      forceDecision,
    ) => {
      if (forceDecision != null) {
        return forceDecision
      }
      return await hasPermissionsToUseTool(
        tool,
        input,
        toolUseContext,
        assistantMessage,
        toolUseId,
      )
    }
    return withoutPromptTool
  }

  // Writes the problem to stderr, requests shutdown with exit code 1, and
  // throws so the current call does not continue.
  function exitWithConfigError(detail: string): never {
    process.stderr.write(`${detail}\n`)
    gracefulShutdownSync(1)
    throw new Error(detail)
  }

  // Lazy lookup: MCP connects are per-server incremental in print mode, so
  // the tool may not be in appState yet at init time. Resolve on first call
  // (first permission prompt), by which point connects have had time to finish.
  let cachedCanUseTool: CanUseToolFn | null = null

  return async (
    tool,
    input,
    toolUseContext,
    assistantMessage,
    toolUseId,
    forceDecision,
  ) => {
    if (cachedCanUseTool === null) {
      const availableTools = getMcpTools()
      const promptTool = availableTools.find(candidate =>
        toolMatchesName(candidate, permissionPromptToolName),
      ) as PermissionPromptTool | undefined

      if (!promptTool) {
        exitWithConfigError(
          `Error: MCP tool ${permissionPromptToolName} (passed via --permission-prompt-tool) not found. Available MCP tools: ${availableTools.map(t => t.name).join(', ') || 'none'}`,
        )
      }
      if (!promptTool.inputJSONSchema) {
        exitWithConfigError(
          `Error: tool ${permissionPromptToolName} (passed via --permission-prompt-tool) must be an MCP tool`,
        )
      }

      cachedCanUseTool = createCanUseToolWithPermissionPrompt(promptTool)
    }

    return cachedCanUseTool(
      tool,
      input,
      toolUseContext,
      assistantMessage,
      toolUseId,
      forceDecision,
    )
  }
}
|
|
|
|
/**
 * Handles the SDK `initialize` control request.
 *
 * When `initialized` is already true, an 'Already initialized' error response
 * (including the pending permission requests) is enqueued and nothing else
 * happens. Otherwise the function:
 * - copies systemPrompt / appendSystemPrompt / promptSuggestions from the
 *   request onto `options` when they are defined,
 * - appends agents parsed from the request to `agents` (mutating the array),
 * - re-resolves the main-thread agent named by `options.agent`,
 * - registers hook callbacks and the JSON schema from the request,
 * - enqueues the success response, and
 * - when `enableAuthStatus` is set and a status exists, enqueues an
 *   `auth_status` message.
 *
 * @param request - The initialize request payload.
 * @param requestId - Id echoed back in the control response.
 * @param initialized - Whether initialization has already happened.
 * @param output - Stream that receives the response messages.
 * @param commands - Commands to advertise (those with userInvocable === false are omitted).
 * @param modelInfos - Model list returned verbatim in the response.
 * @param structuredIO - Used for hook callbacks, pending permission requests and prepending user messages.
 * @param enableAuthStatus - Whether to emit the initial auth status.
 * @param options - Mutable session options updated from the request.
 * @param agents - Mutable agent list; request-supplied agents are appended.
 * @param getAppState - Accessor for the current app state (read for fast mode).
 */
async function handleInitializeRequest(
  request: SDKControlInitializeRequest,
  requestId: string,
  initialized: boolean,
  output: Stream<StdoutMessage>,
  commands: Command[],
  modelInfos: ModelInfo[],
  structuredIO: StructuredIO,
  enableAuthStatus: boolean,
  options: {
    systemPrompt: string | undefined
    appendSystemPrompt: string | undefined
    agent?: string | undefined
    userSpecifiedModel?: string | undefined
    [key: string]: unknown
  },
  agents: AgentDefinition[],
  getAppState: () => AppState,
): Promise<void> {
  // A repeated initialize is answered with an error and otherwise ignored.
  if (initialized) {
    output.enqueue({
      type: 'control_response',
      response: {
        subtype: 'error',
        error: 'Already initialized',
        request_id: requestId,
        pending_permission_requests:
          structuredIO.getPendingPermissionRequests(),
      },
    })
    return
  }

  // Prompts (and prompt suggestions) may arrive over stdin to avoid ARG_MAX
  // limits; only defined values overwrite the existing options.
  const { systemPrompt, appendSystemPrompt, promptSuggestions } = request
  if (systemPrompt !== undefined) {
    options.systemPrompt = systemPrompt
  }
  if (appendSystemPrompt !== undefined) {
    options.appendSystemPrompt = appendSystemPrompt
  }
  if (promptSuggestions !== undefined) {
    options.promptSuggestions = promptSuggestions
  }

  // Agents are likewise accepted over stdin to avoid ARG_MAX limits.
  if (request.agents) {
    agents.push(...parseAgentsFromJson(request.agents, 'flagSettings'))
  }

  // Re-evaluate the main thread agent now that SDK agents are merged, so
  // --agent can reference an agent defined via the SDK.
  const requestedAgent = options.agent
  if (requestedAgent) {
    // If main.tsx already found this agent (filesystem-defined), it already
    // applied systemPrompt/model/initialPrompt. Skip to avoid double-apply.
    const resolvedEarlier = getMainThreadAgentType() === requestedAgent
    const agentDefinition = agents.find(
      candidate => candidate.agentType === requestedAgent,
    )

    if (agentDefinition) {
      if (!resolvedEarlier) {
        // Record the main thread agent type in bootstrap state.
        setMainThreadAgentType(agentDefinition.agentType)

        // Use the agent's system prompt unless the user supplied one.
        // SDK agents are always custom agents (not built-in), so
        // getSystemPrompt() takes no args.
        if (!options.systemPrompt && !isBuiltInAgent(agentDefinition)) {
          const promptFromAgent = agentDefinition.getSystemPrompt()
          if (promptFromAgent) {
            options.systemPrompt = promptFromAgent
          }
        }

        // Use the agent's model unless the user specified one or the agent
        // merely inherits.
        const agentModelSetting = agentDefinition.model
        if (
          !options.userSpecifiedModel &&
          agentModelSetting &&
          agentModelSetting !== 'inherit'
        ) {
          setMainLoopModelOverride(parseUserSpecifiedModel(agentModelSetting))
        }
      }

      // Prepend the initial prompt in both cases:
      // - SDK-defined agents arrive via init, so main.tsx's lookup missed them.
      // - Filesystem-defined agents: main.tsx handles initialPrompt for the
      //   string inputPrompt case, but when inputPrompt is an AsyncIterable
      //   (SDK stream-json) it can't concatenate, so it is prepended here.
      if (agentDefinition.initialPrompt) {
        structuredIO.prependUserMessage(agentDefinition.initialPrompt)
      }
    }
  }

  const outputStyle =
    getSettings_DEPRECATED()?.outputStyle || DEFAULT_OUTPUT_STYLE_NAME
  const availableOutputStyles = await getAllOutputStyles(getCwd())

  // Get account information
  const accountInfo = getAccountInformation()

  // Turn the request's hook matchers into registered callbacks.
  if (request.hooks) {
    const hooks: Partial<Record<HookEvent, HookCallbackMatcher[]>> = {}
    for (const [event, matchers] of Object.entries(request.hooks)) {
      hooks[event as HookEvent] = matchers.map(
        ({ matcher, hookCallbackIds, timeout }) => ({
          matcher,
          hooks: hookCallbackIds.map(callbackId =>
            structuredIO.createHookCallback(callbackId, timeout),
          ),
        }),
      )
    }
    registerHookCallbacks(hooks)
  }

  if (request.jsonSchema) {
    setInitJsonSchema(request.jsonSchema)
  }

  const initResponse: SDKControlInitializeResponse = {
    commands: commands
      .filter(command => command.userInvocable !== false)
      .map(command => ({
        name: getCommandName(command),
        description: formatDescriptionWithSource(command),
        argumentHint: command.argumentHint || '',
      })),
    agents: agents.map(definition => ({
      name: definition.agentType,
      description: definition.whenToUse,
      // 'inherit' is an internal sentinel; normalize to undefined for the public API
      model: definition.model === 'inherit' ? undefined : definition.model,
    })),
    output_style: outputStyle,
    available_output_styles: Object.keys(availableOutputStyles),
    models: modelInfos,
    account: {
      email: accountInfo?.email,
      organization: accountInfo?.organization,
      subscriptionType: accountInfo?.subscription,
      tokenSource: accountInfo?.tokenSource,
      apiKeySource: accountInfo?.apiKeySource,
      // getAccountInformation() returns undefined under 3P providers, so the
      // other fields are all absent. apiProvider disambiguates "not logged
      // in" (firstParty + tokenSource:none) from "3P, login not applicable".
      apiProvider: getAPIProvider(),
    },
    pid: process.pid,
  }

  if (isFastModeEnabled() && isFastModeAvailable()) {
    const { fastMode } = getAppState()
    initResponse.fast_mode_state = getFastModeState(
      options.userSpecifiedModel ?? null,
      fastMode,
    )
  }

  output.enqueue({
    type: 'control_response',
    response: {
      subtype: 'success',
      request_id: requestId,
      response: initResponse,
    },
  })

  // After the initialize response, send the current auth status. Later
  // changes are delivered via notifications, but the initial state is sent
  // here as well.
  if (!enableAuthStatus) {
    return
  }
  const status = AwsAuthStatusManager.getInstance().getStatus()
  if (!status) {
    return
  }
  output.enqueue({
    type: 'auth_status',
    isAuthenticating: status.isAuthenticating,
    output: status.output,
    error: status.error,
    uuid: randomUUID(),
    session_id: getSessionId(),
  })
}
|
|
|
|
/**
 * Rewinds tracked files to the checkpoint associated with a user message, or
 * reports what a rewind would change.
 *
 * @param userMessageId - Message whose file checkpoint should be restored.
 * @param appState - Current app state; its `fileHistory` is consulted.
 * @param setAppState - Applies file-history updates produced by the rewind.
 * @param dryRun - When true, nothing is rewound; diff stats are returned instead.
 * @returns `{ canRewind: false, error }` when file history is disabled, no
 *   checkpoint exists, or the rewind throws; `{ canRewind: true, ... }` with
 *   diff stats for a dry run; `{ canRewind: true }` after a successful rewind.
 */
async function handleRewindFiles(
  userMessageId: UUID,
  appState: AppState,
  setAppState: (updater: (prev: AppState) => AppState) => void,
  dryRun: boolean,
): Promise<RewindFilesResult> {
  if (!fileHistoryEnabled()) {
    return { canRewind: false, error: 'File rewinding is not enabled.' }
  }

  const { fileHistory } = appState
  if (!fileHistoryCanRestore(fileHistory, userMessageId)) {
    return {
      canRewind: false,
      error: 'No file checkpoint found for this message.',
    }
  }

  if (!dryRun) {
    // Real rewind: any failure is reported in the result rather than thrown.
    try {
      await fileHistoryRewind(
        updater =>
          setAppState(previous => ({
            ...previous,
            fileHistory: updater(previous.fileHistory),
          })),
        userMessageId,
      )
    } catch (error) {
      return {
        canRewind: false,
        error: `Failed to rewind: ${errorMessage(error)}`,
      }
    }
    return { canRewind: true }
  }

  // Dry run: only report what would change.
  const stats = await fileHistoryGetDiffStats(fileHistory, userMessageId)
  return {
    canRewind: true,
    filesChanged: stats?.filesChanged,
    insertions: stats?.insertions,
    deletions: stats?.deletions,
  }
}
|
|
|
|
/**
 * Handles a request to change the permission mode.
 *
 * Enqueues exactly one control response on `output`: an error when the
 * requested mode is refused, otherwise a success carrying the new mode.
 *
 * @param request - Contains the requested mode.
 * @param requestId - Id echoed back in the control response.
 * @param toolPermissionContext - Current permission context.
 * @param output - Stream that receives the control response.
 * @returns The unchanged `toolPermissionContext` when the request is refused;
 *   otherwise the result of transitionPermissionMode with `mode` set to the
 *   requested mode.
 */
function handleSetPermissionMode(
  request: { mode: InternalPermissionMode },
  requestId: string,
  toolPermissionContext: ToolPermissionContext,
  output: Stream<StdoutMessage>,
): ToolPermissionContext {
  const requestedMode = request.mode

  // Sends an error response and leaves the permission context untouched.
  const refuse = (error: string): ToolPermissionContext => {
    output.enqueue({
      type: 'control_response',
      response: {
        subtype: 'error',
        request_id: requestId,
        error,
      },
    })
    return toolPermissionContext
  }

  // bypassPermissions must be neither disabled nor unavailable for this
  // session.
  if (requestedMode === 'bypassPermissions') {
    if (isBypassPermissionsModeDisabled()) {
      return refuse(
        'Cannot set permission mode to bypassPermissions because it is disabled by settings or configuration',
      )
    }
    if (!toolPermissionContext.isBypassPermissionsModeAvailable) {
      return refuse(
        'Cannot set permission mode to bypassPermissions because the session was not launched with --dangerously-skip-permissions',
      )
    }
  }

  // auto mode requires the classifier gate to be enabled.
  if (
    feature('TRANSCRIPT_CLASSIFIER') &&
    requestedMode === 'auto' &&
    !isAutoModeGateEnabled()
  ) {
    const reason = getAutoModeUnavailableReason()
    return refuse(
      reason
        ? `Cannot set permission mode to auto: ${getAutoModeUnavailableNotification(reason)}`
        : 'Cannot set permission mode to auto',
    )
  }

  // Allow the mode switch: acknowledge first, then compute the new context.
  output.enqueue({
    type: 'control_response',
    response: {
      subtype: 'success',
      request_id: requestId,
      response: {
        mode: requestedMode,
      },
    },
  })

  const transitioned = transitionPermissionMode(
    toolPermissionContext.mode,
    requestedMode,
    toolPermissionContext,
  )
  return {
    ...transitioned,
    mode: requestedMode,
  }
}
|
|
|
|
/**
 * Enables channel notifications for an MCP server at the IDE's request.
 *
 * The ChannelEntry is built from the connection's own pluginSource — the IDE
 * only supplies the server name, so it cannot spoof kind/marketplace. The
 * entry is appended to the session's allowedChannels and the full gate is
 * run; if the gate refuses, the append is rolled back. On success a
 * notification handler is installed that enqueues channel messages at
 * priority 'next', which drainCommandQueue picks up between turns.
 *
 * The claude/channel/permission handler that useManageMCPConnections sets up
 * for interactive mode is intentionally NOT registered here. That handler
 * resolves a pending dialog inside handleInteractivePermission, which
 * print.ts never calls: when SDK permission lands on 'ask' it goes to the
 * consumer's canUseTool callback over stdio, so there is no CLI-side dialog
 * for a remote approval reply to resolve. An IDE that wants channel-relayed
 * tool approval has to build it against its own pending-map. (That path is
 * also gated separately by tengu_harbor_permissions and is not yet shipping
 * on interactive either.)
 *
 * @param requestId - Echoed back as `request_id` in the control_response.
 * @param serverName - Name of the MCP server to enable channels for.
 * @param connectionPool - Connections to search for a connected `serverName`.
 * @param output - Stream that receives the success/error control_response.
 */
function handleChannelEnable(
  requestId: string,
  serverName: string,
  connectionPool: readonly MCPServerConnection[],
  output: Stream<StdoutMessage>,
): void {
  const fail = (error: string) =>
    output.enqueue({
      type: 'control_response',
      response: { subtype: 'error', request_id: requestId, error },
    })

  if (!(feature('KAIROS') || feature('KAIROS_CHANNELS'))) {
    return fail('channels feature not available in this build')
  }

  // Only a 'connected' client has .capabilities and .client to register the
  // handler on. The pool spread at the call site matches mcp_status.
  const connection = connectionPool.find(
    candidate =>
      candidate.type === 'connected' && candidate.name === serverName,
  )
  if (!connection || connection.type !== 'connected') {
    return fail(`server ${serverName} is not connected`)
  }

  const { pluginSource } = connection.config
  const parsed = pluginSource ? parsePluginIdentifier(pluginSource) : undefined
  if (!parsed?.marketplace) {
    // Without a pluginSource, or with an @-less one, the {plugin,
    // marketplace}-keyed allowlist can never match. Fail early with the same
    // reason the gate would give.
    return fail(
      `server ${serverName} is not plugin-sourced; channel_enable requires a marketplace plugin`,
    )
  }

  const entry: ChannelEntry = {
    kind: 'plugin',
    name: parsed.name,
    marketplace: parsed.marketplace,
  }

  // Idempotency: a repeated enable must not append the entry twice.
  const previouslyAllowed = getAllowedChannels()
  const isNewEntry = !previouslyAllowed.some(
    existing =>
      existing.kind === 'plugin' &&
      existing.name === entry.name &&
      existing.marketplace === entry.marketplace,
  )
  if (isNewEntry) {
    setAllowedChannels([...previouslyAllowed, entry])
  }

  const gate = gateChannelServer(
    serverName,
    connection.capabilities,
    pluginSource,
  )
  if (gate.action === 'skip') {
    // Rollback — undo only the append made above.
    if (isNewEntry) {
      setAllowedChannels(previouslyAllowed)
    }
    return fail(gate.reason)
  }

  const pluginId =
    `${entry.name}@${entry.marketplace}` as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS
  logMCPDebug(serverName, 'Channel notifications registered')
  logEvent('tengu_mcp_channel_enable', { plugin: pluginId })

  // Same enqueue shape as the interactive register block in
  // useManageMCPConnections. Channel messages queue at priority 'next', are
  // processed by drainCommandQueue between turns, and reach the model on the
  // turn after they arrive.
  connection.client.setNotificationHandler(
    ChannelMessageNotificationSchema(),
    async notification => {
      const { content, meta } = notification.params
      logMCPDebug(
        serverName,
        `notifications/claude/channel: ${content.slice(0, 80)}`,
      )
      logEvent('tengu_mcp_channel_message', {
        content_length: content.length,
        meta_key_count: Object.keys(meta ?? {}).length,
        entry_kind:
          'plugin' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
        is_dev: false,
        plugin: pluginId,
      })
      enqueue({
        mode: 'prompt',
        value: wrapChannelMessage(serverName, content, meta),
        priority: 'next',
        isMeta: true,
        origin: { kind: 'channel', server: serverName },
        skipSlashCommands: true,
      })
    },
  )

  output.enqueue({
    type: 'control_response',
    response: {
      subtype: 'success',
      request_id: requestId,
      response: undefined,
    },
  })
}
|
|
|
|
/**
 * Installs the channel notification handler on a freshly created client
 * after mcp_reconnect / mcp_toggle.
 *
 * handleChannelEnable bound its handler to the OLD client object.
 * allowedChannels survives the reconnect, the handler binding does not, so
 * without this step channel messages would silently drop while the IDE still
 * believes the channel is live.
 *
 * Mirrors the interactive CLI's onConnectionAttempt in
 * useManageMCPConnections, which re-gates on every new connection. Paired
 * with registerElicitationHandlers at the same call sites.
 *
 * For a server that was never channel-enabled this is a no-op:
 * gateChannelServer calls findChannelEntry internally and returns
 * skip/session for an unlisted server, so reconnecting a non-channel MCP
 * server costs one feature-flag check.
 *
 * @param connection - The connection produced by the reconnect; anything
 *   other than a 'connected' connection is ignored.
 */
function reregisterChannelHandlerAfterReconnect(
  connection: MCPServerConnection,
): void {
  if (!(feature('KAIROS') || feature('KAIROS_CHANNELS'))) return
  if (connection.type !== 'connected') return

  const serverName = connection.name
  const { action } = gateChannelServer(
    serverName,
    connection.capabilities,
    connection.config.pluginSource,
  )
  if (action !== 'register') return

  // Plugin entries are reported to analytics as "name@marketplace".
  const entry = findChannelEntry(serverName, getAllowedChannels())
  let pluginId:
    | AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS
    | undefined
  if (entry?.kind === 'plugin') {
    pluginId =
      `${entry.name}@${entry.marketplace}` as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS
  }

  logMCPDebug(serverName, 'Channel notifications re-registered after reconnect')

  connection.client.setNotificationHandler(
    ChannelMessageNotificationSchema(),
    async notification => {
      const { content, meta } = notification.params
      logMCPDebug(
        serverName,
        `notifications/claude/channel: ${content.slice(0, 80)}`,
      )
      logEvent('tengu_mcp_channel_message', {
        content_length: content.length,
        meta_key_count: Object.keys(meta ?? {}).length,
        entry_kind:
          entry?.kind as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
        is_dev: entry?.dev ?? false,
        plugin: pluginId,
      })
      // Queue for the next turn, tagged with the originating channel.
      enqueue({
        mode: 'prompt',
        value: wrapChannelMessage(serverName, content, meta),
        priority: 'next',
        isMeta: true,
        origin: { kind: 'channel', server: serverName },
        skipSlashCommands: true,
      })
    },
  )
}
|
|
|
|
/**
 * Reports a session-load failure in the shape the chosen output format expects.
 *
 * With `stream-json` a single `result` message (subtype
 * `error_during_execution`, zeroed counters) is written to stdout as one JSON
 * line; for any other format the message goes to stderr as plain text.
 *
 * @param message - Human-readable error text.
 * @param outputFormat - The active output format, if any.
 */
function emitLoadError(
  message: string,
  outputFormat: string | undefined,
): void {
  if (outputFormat !== 'stream-json') {
    process.stderr.write(`${message}\n`)
    return
  }

  const payload = {
    type: 'result',
    subtype: 'error_during_execution',
    duration_ms: 0,
    duration_api_ms: 0,
    is_error: true,
    num_turns: 0,
    stop_reason: null,
    session_id: getSessionId(),
    total_cost_usd: 0,
    usage: EMPTY_USAGE,
    modelUsage: {},
    permission_denials: [],
    uuid: randomUUID(),
    errors: [message],
  }
  process.stdout.write(`${jsonStringify(payload)}\n`)
}
|
|
|
|
/**
 * Drops an interrupted user message, together with the entry right after it
 * (its synthetic assistant sentinel), from `messages` in place. Used during
 * gateway-triggered restarts to tidy the history before the interrupted
 * prompt is re-enqueued.
 *
 * The message is located by `uuid`; if it is not present the array is left
 * untouched.
 *
 * @internal Exported for testing
 */
export function removeInterruptedMessage(
  messages: Message[],
  interruptedUserMessage: NormalizedUserMessage,
): void {
  const targetUuid = interruptedUserMessage.uuid
  const position = messages.findIndex(candidate => candidate.uuid === targetUuid)
  if (position === -1) {
    return
  }
  // Two entries go: the user message and the sentinel following it. When the
  // user message is the last element, splice simply removes that one entry.
  messages.splice(position, 2)
}
|
|
|
|
/** What loadInitialMessages hands back to its caller. */
type LoadInitialMessagesResult = {
  /** Messages the session starts with (empty when loading failed). */
  messages: Message[]
  /** Interruption state carried over from a continued/resumed conversation. */
  turnInterruptionState?: TurnInterruptionState
  /** Agent setting carried over from a continued/resumed conversation. */
  agentSetting?: string
}
|
|
|
|
/** Conversation payload produced by a successful loadConversationForResume call. */
type PrintModeResumedConversation = NonNullable<
  Awaited<ReturnType<typeof loadConversationForResume>>
>

/**
 * Matches coordinator mode to the resumed session's mode and, when that
 * produces a warning, prints it and reloads the agent definitions into app
 * state.
 */
async function syncCoordinatorModeForResumedSession(
  sessionMode: PrintModeResumedConversation['mode'],
  setAppState: (f: (prev: AppState) => AppState) => void,
): Promise<void> {
  if (feature('COORDINATOR_MODE') && coordinatorModeModule) {
    const warning = coordinatorModeModule.matchSessionMode(sessionMode)
    if (warning) {
      process.stderr.write(warning + '\n')
      // Refresh agent definitions to reflect the mode switch
      const { getAgentDefinitionsWithOverrides, getActiveAgentsFromList } =
        // eslint-disable-next-line @typescript-eslint/no-require-imports
        require('../tools/AgentTool/loadAgentsDir.js') as typeof import('../tools/AgentTool/loadAgentsDir.js')
      getAgentDefinitionsWithOverrides.cache.clear?.()
      const freshAgentDefs = await getAgentDefinitionsWithOverrides(getCwd())

      setAppState(prev => ({
        ...prev,
        agentDefinitions: {
          ...freshAgentDefs,
          allAgents: freshAgentDefs.allAgents,
          activeAgents: getActiveAgentsFromList(freshAgentDefs.allAgents),
        },
      }))
    }
  }
}

/**
 * Makes a loaded conversation the active session (mode sync, session ID
 * reuse, state and metadata restore, mode entry) and builds the load result.
 * Shared by the --continue and --resume paths, which perform the same steps.
 */
async function adoptResumedConversationForPrint(
  result: PrintModeResumedConversation,
  forkSession: boolean | undefined,
  persistSession: boolean,
  setAppState: (f: (prev: AppState) => AppState) => void,
): Promise<LoadInitialMessagesResult> {
  // Match coordinator mode to the resumed session's mode
  await syncCoordinatorModeForResumedSession(result.mode, setAppState)

  // Reuse the resumed session's ID (unless the caller asked for a fork)
  if (!forkSession && result.sessionId) {
    switchSession(
      asSessionId(result.sessionId),
      result.fullPath ? dirname(result.fullPath) : null,
    )
    if (persistSession) {
      await resetSessionFilePointer()
    }
  }
  restoreSessionStateFromLog(result, setAppState)

  // Restore session metadata so it's re-appended on exit via
  // reAppendSessionMetadata. A fork does not inherit the worktree session.
  restoreSessionMetadata(
    forkSession ? { ...result, worktreeSession: undefined } : result,
  )

  // Write mode entry for the resumed session
  if (feature('COORDINATOR_MODE') && coordinatorModeModule) {
    saveMode(
      coordinatorModeModule.isCoordinatorMode() ? 'coordinator' : 'normal',
    )
  }

  return {
    messages: result.messages,
    turnInterruptionState: result.turnInterruptionState,
    agentSetting: result.agentSetting,
  }
}

/**
 * Produces the messages a print-mode session starts with.
 *
 * The options are checked in this order, and the first one that applies wins:
 *  1. `continue` — load the conversation returned by
 *     loadConversationForResume with no session ID; if none is found, fall
 *     through to the later steps.
 *  2. `teleport` — resume a remote code session (policy-gated) and check out
 *     its branch.
 *  3. `resume` — load the session identified by ID, JSONL file or URL,
 *     optionally hydrating the local transcript from remote first and
 *     optionally truncating at `resumeSessionAt`.
 *  4. Otherwise — the SessionStart hook output for a fresh 'startup'.
 *
 * On failure the error is logged (and, on the resume path, emitted via
 * emitLoadError), gracefulShutdownSync(1) is called and `{ messages: [] }`
 * is returned.
 *
 * @param setAppState - State setter used while restoring a resumed session.
 * @param options - Flags selecting the load path, plus the optionally
 *   pre-started SessionStart hooks promise and the restored worker state.
 */
async function loadInitialMessages(
  setAppState: (f: (prev: AppState) => AppState) => void,
  options: {
    continue: boolean | undefined
    teleport: string | true | null | undefined
    resume: string | boolean | undefined
    resumeSessionAt: string | undefined
    forkSession: boolean | undefined
    outputFormat: string | undefined
    sessionStartHooksPromise?: ReturnType<typeof processSessionStartHooks>
    restoredWorkerState: Promise<SessionExternalMetadata | null>
  },
): Promise<LoadInitialMessagesResult> {
  const persistSession = !isSessionPersistenceDisabled()

  // Handle continue in print mode
  if (options.continue) {
    try {
      logEvent('tengu_continue_print', {})

      const result = await loadConversationForResume(
        undefined /* sessionId */,
        undefined /* file path */,
      )
      if (result) {
        // `return await` keeps failures inside this try/catch.
        return await adoptResumedConversationForPrint(
          result,
          options.forkSession,
          persistSession,
          setAppState,
        )
      }
    } catch (error) {
      logError(error)
      gracefulShutdownSync(1)
      return { messages: [] }
    }
  }

  // Handle teleport in print mode
  if (options.teleport) {
    try {
      if (!isPolicyAllowed('allow_remote_sessions')) {
        throw new Error(
          "Remote sessions are disabled by your organization's policy.",
        )
      }

      logEvent('tengu_teleport_print', {})

      if (typeof options.teleport !== 'string') {
        throw new Error('No session ID provided for teleport')
      }

      const {
        checkOutTeleportedSessionBranch,
        processMessagesForTeleportResume,
        teleportResumeCodeSession,
        validateGitState,
      } = await import('src/utils/teleport.js')
      await validateGitState()
      const teleportResult = await teleportResumeCodeSession(options.teleport)
      const { branchError } = await checkOutTeleportedSessionBranch(
        teleportResult.branch,
      )
      return {
        messages: processMessagesForTeleportResume(
          teleportResult.log,
          branchError,
        ),
      }
    } catch (error) {
      logError(error)
      gracefulShutdownSync(1)
      return { messages: [] }
    }
  }

  // Handle resume in print mode (accepts session ID or URL)
  // URLs are [ANT-ONLY]
  if (options.resume) {
    try {
      logEvent('tengu_resume_print', {})

      // In print mode - we require a valid session ID, JSONL file or URL
      const parsedSessionId = parseSessionIdentifier(
        typeof options.resume === 'string' ? options.resume : '',
      )
      if (!parsedSessionId) {
        let errorMessage =
          'Error: --resume requires a valid session ID when used with --print. Usage: claude -p --resume <session-id>'
        if (typeof options.resume === 'string') {
          errorMessage += `. Session IDs must be in UUID format (e.g., 550e8400-e29b-41d4-a716-446655440000). Provided value "${options.resume}" is not a valid UUID`
        }
        emitLoadError(errorMessage, options.outputFormat)
        gracefulShutdownSync(1)
        return { messages: [] }
      }

      // Hydrate local transcript from remote before loading
      if (isEnvTruthy(process.env.CLAUDE_CODE_USE_CCR_V2)) {
        // Await restore alongside hydration so SSE catchup lands on
        // restored state, not a fresh default.
        const [, metadata] = await Promise.all([
          hydrateFromCCRv2InternalEvents(parsedSessionId.sessionId),
          options.restoredWorkerState,
        ])
        if (metadata) {
          setAppState(externalMetadataToAppState(metadata))
          if (typeof metadata.model === 'string') {
            setMainLoopModelOverride(metadata.model)
          }
        }
      } else if (
        parsedSessionId.isUrl &&
        parsedSessionId.ingressUrl &&
        isEnvTruthy(process.env.ENABLE_SESSION_PERSISTENCE)
      ) {
        // v1: fetch session logs from Session Ingress
        await hydrateRemoteSession(
          parsedSessionId.sessionId,
          parsedSessionId.ingressUrl,
        )
      }

      // Load the conversation with the specified session ID
      const result = await loadConversationForResume(
        parsedSessionId.sessionId,
        parsedSessionId.jsonlFile || undefined,
      )

      // hydrateFromCCRv2InternalEvents writes an empty transcript file for
      // fresh sessions (writeFile(sessionFile, '') with zero events), so
      // loadConversationForResume returns {messages: []} not null. Treat
      // empty the same as null so SessionStart still fires.
      if (!result || result.messages.length === 0) {
        // For URL-based or CCR v2 resume, start with empty session (it was hydrated but empty)
        if (
          parsedSessionId.isUrl ||
          isEnvTruthy(process.env.CLAUDE_CODE_USE_CCR_V2)
        ) {
          // Execute SessionStart hooks for startup since we're starting a new session
          return {
            messages: await (options.sessionStartHooksPromise ??
              processSessionStartHooks('startup')),
          }
        } else {
          emitLoadError(
            `No conversation found with session ID: ${parsedSessionId.sessionId}`,
            options.outputFormat,
          )
          gracefulShutdownSync(1)
          return { messages: [] }
        }
      }

      // Handle resumeSessionAt feature
      if (options.resumeSessionAt) {
        const index = result.messages.findIndex(
          m => m.uuid === options.resumeSessionAt,
        )
        if (index < 0) {
          emitLoadError(
            `No message found with message.uuid of: ${options.resumeSessionAt}`,
            options.outputFormat,
          )
          gracefulShutdownSync(1)
          return { messages: [] }
        }

        // index is known to be >= 0 here: keep everything up to and
        // including the requested message.
        result.messages = result.messages.slice(0, index + 1)
      }

      // `return await` keeps failures inside this try/catch.
      return await adoptResumedConversationForPrint(
        result,
        options.forkSession,
        persistSession,
        setAppState,
      )
    } catch (error) {
      logError(error)
      const errorMessage =
        error instanceof Error
          ? `Failed to resume session: ${error.message}`
          : 'Failed to resume session with --print mode'
      emitLoadError(errorMessage, options.outputFormat)
      gracefulShutdownSync(1)
      return { messages: [] }
    }
  }

  // Join the SessionStart hooks promise kicked in main.tsx (or run fresh if
  // it wasn't kicked — e.g. --continue with no prior session falls through
  // here with sessionStartHooksPromise undefined because main.tsx guards on continue)
  return {
    messages: await (options.sessionStartHooksPromise ??
      processSessionStartHooks('startup')),
  }
}
|
|
|
|
/**
 * Builds the structured IO channel for the session.
 *
 * A string prompt is normalized to a streaming input: a non-blank prompt
 * becomes a one-element stream holding the JSON-encoded user message, and a
 * blank prompt becomes an empty stream. An async iterable is used as-is.
 *
 * @param inputPrompt - Prompt text, or an already-streaming input.
 * @param options - `sdkUrl` selects RemoteIO when set; `replayUserMessages`
 *   is forwarded to whichever IO class is constructed.
 * @returns A RemoteIO when `sdkUrl` is provided, otherwise a StructuredIO.
 */
function getStructuredIO(
  inputPrompt: string | AsyncIterable<string>,
  options: {
    sdkUrl: string | undefined
    replayUserMessages?: boolean
  },
): StructuredIO {
  const streamFromText = (text: string): AsyncIterable<string> => {
    if (text.trim() === '') {
      // Empty string - create empty stream
      return fromArray([])
    }
    const userMessage = {
      type: 'user',
      session_id: '',
      message: {
        role: 'user',
        content: text,
      },
      parent_tool_use_id: null,
    } satisfies SDKUserMessage
    return fromArray([jsonStringify(userMessage)])
  }

  const inputStream: AsyncIterable<string> =
    typeof inputPrompt === 'string' ? streamFromText(inputPrompt) : inputPrompt

  // Use RemoteIO if sdkUrl is provided, otherwise use regular StructuredIO
  const { sdkUrl, replayUserMessages } = options
  if (sdkUrl) {
    return new RemoteIO(sdkUrl, inputStream, replayUserMessages)
  }
  return new StructuredIO(inputStream, replayUserMessages)
}
|
|
|
|
/**
 * Deals with a permission response nobody was waiting for: finds the matching
 * unresolved tool call in the transcript and enqueues it for execution.
 *
 * Only successful responses carrying a string `toolUseID` are considered.
 *
 * @param message - The unexpected control response.
 * @param setAppState - Accepted for interface compatibility; not used by the
 *   visible body.
 * @param onEnqueued - Invoked after the orphaned permission has been enqueued.
 * @param handledToolUseIds - IDs already processed; updated when one is enqueued.
 * @returns true if a permission was enqueued, false otherwise.
 */
export async function handleOrphanedPermissionResponse({
  message,
  setAppState,
  onEnqueued,
  handledToolUseIds,
}: {
  message: SDKControlResponse
  setAppState: (f: (prev: AppState) => AppState) => void
  onEnqueued?: () => void
  handledToolUseIds: Set<string>
}): Promise<boolean> {
  const envelope = message.response
  if (envelope.subtype !== 'success') {
    return false
  }
  const payload = envelope.response
  if (!payload?.toolUseID || typeof payload.toolUseID !== 'string') {
    return false
  }

  const permissionResult = payload as PermissionResult
  const { toolUseID } = permissionResult
  if (!toolUseID) {
    return false
  }

  logForDebugging(
    `handleOrphanedPermissionResponse: received orphaned control_response for toolUseID=${toolUseID} request_id=${envelope.request_id}`,
  )

  // Guard against handling the same orphaned tool_use twice. Duplicate
  // control_response deliveries (e.g. from WebSocket reconnect) would
  // otherwise run the same tool several times, yielding duplicate tool_use
  // IDs in the messages array and a 400 error from the API — and once
  // corrupted, every retry piles on more duplicates.
  if (handledToolUseIds.has(toolUseID)) {
    logForDebugging(
      `handleOrphanedPermissionResponse: skipping duplicate orphaned permission for toolUseID=${toolUseID} (already handled)`,
    )
    return false
  }

  const assistantMessage = await findUnresolvedToolUse(toolUseID)
  if (!assistantMessage) {
    logForDebugging(
      `handleOrphanedPermissionResponse: no unresolved tool_use found for toolUseID=${toolUseID} (already resolved in transcript)`,
    )
    return false
  }

  // Mark as handled only once we know there is something to enqueue.
  handledToolUseIds.add(toolUseID)
  logForDebugging(
    `handleOrphanedPermissionResponse: enqueuing orphaned permission for toolUseID=${toolUseID} messageID=${assistantMessage.message.id}`,
  )
  enqueue({
    mode: 'orphaned-permission' as const,
    value: [],
    orphanedPermission: {
      permissionResult,
      assistantMessage,
    },
  })

  onEnqueued?.()
  return true
}
|
|
|
|
/** State tracked for dynamically configured (process-based) MCP servers. */
export type DynamicMcpState = {
  /** Connections for the dynamic servers. */
  clients: MCPServerConnection[]
  /** Tools contributed by those servers. */
  tools: Tools
  /** Scoped config per server, keyed by server name. */
  configs: Record<string, ScopedMcpServerConfig>
}
|
|
|
|
/**
 * Turns a process-transport server config into a scoped config by tagging it
 * with the 'dynamic' scope. The input object is copied, not mutated.
 */
function toScopedConfig(
  config: McpServerConfigForProcessTransport,
): ScopedMcpServerConfig {
  // McpServerConfigForProcessTransport is a subset of McpServerConfig (it
  // leaves out IDE-specific types such as sse-ide and ws-ide), so adding the
  // scope field is all that is needed to form a ScopedMcpServerConfig.
  const scoped = { ...config, scope: 'dynamic' }
  return scoped as ScopedMcpServerConfig
}
|
|
|
|
/**
 * State for SDK MCP servers, i.e. servers that run inside the SDK process.
 */
export type SdkMcpState = {
  /** SDK server config per server, keyed by server name. */
  configs: Record<string, McpSdkServerConfig>
  /** Connections for the SDK servers. */
  clients: MCPServerConnection[]
  /** Tools contributed by those servers. */
  tools: Tools
}
|
|
|
|
/**
 * Outcome of handleMcpSetServers: the response to send back plus the updated
 * state for both server families.
 */
export type McpSetServersResult = {
  /** Added/removed server names and per-server errors. */
  response: SDKControlMcpSetServersResponse
  /** SDK server state after the request was applied. */
  newSdkState: SdkMcpState
  /** Process-based server state after the request was applied. */
  newDynamicState: DynamicMcpState
  /** True when at least one SDK server was added or removed. */
  sdkServersChanged: boolean
}
|
|
|
|
/**
 * Handles mcp_set_servers requests by processing both SDK and process-based servers.
 * SDK servers run in the SDK process; process-based servers are spawned by the CLI.
 *
 * Applies enterprise allowedMcpServers/deniedMcpServers policy — same filter as
 * --mcp-config (see filterMcpServersByPolicy call in main.tsx). Without this,
 * SDK V2 Query.setMcpServers() was a second policy bypass vector. Blocked servers
 * are reported in response.errors so the SDK consumer knows why they weren't added.
 *
 * @param servers - Desired full set of servers, keyed by name.
 * @param sdkState - Current SDK server state (not mutated; copies are returned).
 * @param dynamicState - Current process-based server state, handed to
 *   reconcileMcpServers.
 * @param setAppState - Forwarded to reconcileMcpServers.
 * @returns The combined response, both new states, and whether the SDK server
 *   set changed.
 */
export async function handleMcpSetServers(
  servers: Record<string, McpServerConfigForProcessTransport>,
  sdkState: SdkMcpState,
  dynamicState: DynamicMcpState,
  setAppState: (f: (prev: AppState) => AppState) => void,
): Promise<McpSetServersResult> {
  // Enforce enterprise MCP policy on process-based servers (stdio/http/sse).
  // Mirrors the --mcp-config filter in main.tsx — both user-controlled injection
  // paths must have the same gate. type:'sdk' servers are exempt (SDK-managed,
  // CLI never spawns/connects for them — see filterMcpServersByPolicy jsdoc).
  // Blocked servers go into response.errors so the SDK caller sees why.
  const { allowed: allowedServers, blocked } = filterMcpServersByPolicy(servers)
  const policyErrors: Record<string, string> = {}
  for (const name of blocked) {
    policyErrors[name] =
      'Blocked by enterprise policy (allowedMcpServers/deniedMcpServers)'
  }

  // Separate SDK servers from process-based servers
  const sdkServers: Record<string, McpSdkServerConfig> = {}
  const processServers: Record<string, McpServerConfigForProcessTransport> = {}

  for (const [name, config] of Object.entries(allowedServers)) {
    if (config.type === 'sdk') {
      sdkServers[name] = config
    } else {
      processServers[name] = config
    }
  }

  // Handle SDK servers
  const currentSdkNames = new Set(Object.keys(sdkState.configs))
  const newSdkNames = new Set(Object.keys(sdkServers))
  const sdkAdded: string[] = []
  const sdkRemoved: string[] = []

  const newSdkConfigs = { ...sdkState.configs }
  let newSdkClients = [...sdkState.clients]
  let newSdkTools = [...sdkState.tools]

  // Remove SDK servers no longer in desired state
  for (const name of currentSdkNames) {
    if (!newSdkNames.has(name)) {
      const client = newSdkClients.find(c => c.name === name)
      if (client && client.type === 'connected') {
        // Best-effort cleanup, same as reconcileMcpServers: a failing cleanup
        // must not abort the request and leave the remaining servers
        // unprocessed.
        try {
          await client.cleanup()
        } catch (e) {
          logError(e)
        }
      }
      newSdkClients = newSdkClients.filter(c => c.name !== name)
      const prefix = `mcp__${name}__`
      newSdkTools = newSdkTools.filter(t => !t.name.startsWith(prefix))
      delete newSdkConfigs[name]
      sdkRemoved.push(name)
    }
  }

  // Add new SDK servers as pending - they'll be upgraded to connected
  // when updateSdkMcp() runs on the next query
  for (const [name, config] of Object.entries(sdkServers)) {
    if (!currentSdkNames.has(name)) {
      newSdkConfigs[name] = config
      const pendingClient: MCPServerConnection = {
        type: 'pending',
        name,
        config: { ...config, scope: 'dynamic' as const },
      }
      newSdkClients = [...newSdkClients, pendingClient]
      sdkAdded.push(name)
    }
  }

  // Handle process-based servers
  const processResult = await reconcileMcpServers(
    processServers,
    dynamicState,
    setAppState,
  )

  return {
    response: {
      added: [...sdkAdded, ...processResult.response.added],
      removed: [...sdkRemoved, ...processResult.response.removed],
      // Reconcile errors win over policy errors on a name clash.
      errors: { ...policyErrors, ...processResult.response.errors },
    },
    newSdkState: {
      configs: newSdkConfigs,
      clients: newSdkClients,
      tools: newSdkTools,
    },
    newDynamicState: processResult.newState,
    sdkServersChanged: sdkAdded.length > 0 || sdkRemoved.length > 0,
  }
}
|
|
|
|
/**
 * Brings the set of dynamic MCP servers in line with a desired configuration.
 *
 * Servers missing from `desiredConfigs` are removed, new names are connected,
 * and servers whose config differs from the current one are torn down and
 * connected again. The resulting tools and clients are also written into
 * app state, replacing any previous entries for dynamic servers.
 *
 * @param desiredConfigs - Target configuration, keyed by server name.
 * @param currentState - Current dynamic state (not mutated; copies are returned).
 * @param setAppState - Used to publish the new tools and clients.
 * @returns The added/removed names and per-server errors, plus the new state.
 *   A replaced server is reported under `added` but not under `removed`.
 */
export async function reconcileMcpServers(
  desiredConfigs: Record<string, McpServerConfigForProcessTransport>,
  currentState: DynamicMcpState,
  setAppState: (f: (prev: AppState) => AppState) => void,
): Promise<{
  response: SDKControlMcpSetServersResponse
  newState: DynamicMcpState
}> {
  const existingNames = Object.keys(currentState.configs)
  const wantedNames = Object.keys(desiredConfigs)
  const existingSet = new Set(existingNames)
  const wantedSet = new Set(wantedNames)

  const toRemove = existingNames.filter(name => !wantedSet.has(name))
  const toAdd = wantedNames.filter(name => !existingSet.has(name))

  // Same name on both sides: replace only when the config actually differs.
  const toReplace = existingNames.filter(name => {
    if (!wantedSet.has(name)) return false
    const before = currentState.configs[name]
    const after = desiredConfigs[name]
    if (!before || !after) return true
    return !areMcpConfigsEqual(before, toScopedConfig(after))
  })

  const removed: string[] = []
  const added: string[] = []
  const errors: Record<string, string> = {}

  let newClients = [...currentState.clients]
  let newTools = [...currentState.tools]

  // Tear down servers that are going away, including those being replaced.
  const pureRemovals = new Set(toRemove)
  for (const name of toRemove.concat(toReplace)) {
    const existingClient = newClients.find(c => c.name === name)
    const existingConfig = currentState.configs[name]
    if (existingClient && existingConfig) {
      if (existingClient.type === 'connected') {
        try {
          await existingClient.cleanup()
        } catch (e) {
          logError(e)
        }
      }
      // Clear the memoization cache
      await clearServerCache(name, existingConfig)
    }

    // Drop this server's tools and its client entry.
    const toolPrefix = `mcp__${name}__`
    newTools = newTools.filter(t => !t.name.startsWith(toolPrefix))
    newClients = newClients.filter(c => c.name !== name)

    // Replacements are not reported as removals.
    if (pureRemovals.has(name)) {
      removed.push(name)
    }
  }

  // Bring up new servers, including the replacements torn down above.
  for (const name of toAdd.concat(toReplace)) {
    const rawConfig = desiredConfigs[name]
    if (!rawConfig) continue

    // SDK servers are managed by the SDK process, not the CLI: record them
    // without attempting a connection.
    if (rawConfig.type === 'sdk') {
      added.push(name)
      continue
    }

    try {
      const client = await connectToServer(name, toScopedConfig(rawConfig))
      newClients.push(client)

      switch (client.type) {
        case 'connected': {
          const serverTools = await fetchToolsForClient(client)
          newTools.push(...serverTools)
          break
        }
        case 'failed':
          errors[name] = client.error || 'Connection failed'
          break
      }

      added.push(name)
    } catch (e) {
      const err = toError(e)
      errors[name] = err.message
      logError(err)
    }
  }

  // The new config map mirrors the desired one, with scope attached.
  const newConfigs: Record<string, ScopedMcpServerConfig> = {}
  for (const [name, rawConfig] of Object.entries(desiredConfigs)) {
    if (rawConfig) {
      newConfigs[name] = toScopedConfig(rawConfig)
    }
  }

  const newState: DynamicMcpState = {
    clients: newClients,
    tools: newTools,
    configs: newConfigs,
  }

  // Publish to AppState: strip everything that belongs to a dynamic server
  // (old or new), then append the freshly computed tools and clients.
  setAppState(prev => {
    const dynamicNames = new Set([
      ...Object.keys(currentState.configs),
      ...Object.keys(newConfigs),
    ])
    const dynamicPrefixes = [...dynamicNames].map(
      serverName => `mcp__${serverName}__`,
    )

    const keptTools = prev.mcp.tools.filter(
      t => !dynamicPrefixes.some(prefix => t.name.startsWith(prefix)),
    )
    const keptClients = prev.mcp.clients.filter(c => !dynamicNames.has(c.name))

    return {
      ...prev,
      mcp: {
        ...prev.mcp,
        tools: [...keptTools, ...newTools],
        clients: [...keptClients, ...newClients],
      },
    }
  })

  return {
    response: { added, removed, errors },
    newState,
  }
}
|