From 433552019ea623f3d9a3bd1548d85ccf894bf9a2 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Thu, 12 Feb 2026 11:51:34 -0800 Subject: [PATCH] Checkpoint --- apps/sim/lib/copilot/client-sse/handlers.ts | 141 +-------- .../copilot/client-sse/subagent-handlers.ts | 137 +------- .../copilot/client-sse/tool-call-helpers.ts | 134 ++++++++ .../orchestrator/sse-handlers/handlers.ts | 294 +++++++----------- .../tools/server/workflow/change-store.ts | 91 +++++- .../tools/server/workflow/workflow-change.ts | 9 +- .../tools/server/workflow/workflow-context.ts | 4 +- 7 files changed, 355 insertions(+), 455 deletions(-) create mode 100644 apps/sim/lib/copilot/client-sse/tool-call-helpers.ts diff --git a/apps/sim/lib/copilot/client-sse/handlers.ts b/apps/sim/lib/copilot/client-sse/handlers.ts index af9cc6338..cd462bd8b 100644 --- a/apps/sim/lib/copilot/client-sse/handlers.ts +++ b/apps/sim/lib/copilot/client-sse/handlers.ts @@ -2,17 +2,19 @@ import { createLogger } from '@sim/logger' import { STREAM_STORAGE_KEY } from '@/lib/copilot/constants' import { asRecord } from '@/lib/copilot/orchestrator/sse-utils' import type { SSEEvent } from '@/lib/copilot/orchestrator/types' -import { - humanizedFallback, - isBackgroundState, - isRejectedState, - isReviewState, - resolveToolDisplay, -} from '@/lib/copilot/store-utils' +import { isBackgroundState, isRejectedState, isReviewState } from '@/lib/copilot/store-utils' import { ClientToolCallState } from '@/lib/copilot/tools/client/tool-display-registry' import type { CopilotStore, CopilotStreamInfo, CopilotToolCall } from '@/stores/panel/copilot/types' import { appendTextBlock, beginThinkingBlock, finalizeThinkingBlock } from './content-blocks' import { CLIENT_EXECUTABLE_RUN_TOOLS, executeRunToolOnClient } from './run-tool-execution' +import { + extractOperationListFromResultPayload, + extractToolExecutionMetadata, + extractToolUiMetadata, + isWorkflowChangeApplyCall, + mapServerStateToClientState, + resolveDisplayFromServerUi, +} from './tool-call-helpers' import { applyToolEffects } from './tool-effects' import type { ClientContentBlock, ClientStreamingContext } from './types' @@ -30,131 +32,6 @@ function isClientRunCapability(toolCall: CopilotToolCall): boolean { return CLIENT_EXECUTABLE_RUN_TOOLS.has(toolCall.name) } -function mapServerStateToClientState(state: unknown): ClientToolCallState { - switch (String(state || '')) { - case 'generating': - return ClientToolCallState.generating - case 'pending': - case 'awaiting_approval': - return ClientToolCallState.pending - case 'executing': - return ClientToolCallState.executing - case 'success': - return ClientToolCallState.success - case 'rejected': - case 'skipped': - return ClientToolCallState.rejected - case 'aborted': - return ClientToolCallState.aborted - case 'error': - case 'failed': - return ClientToolCallState.error - default: - return ClientToolCallState.pending - } -} - -function extractToolUiMetadata(data: Record): CopilotToolCall['ui'] | undefined { - const ui = asRecord(data.ui) - if (!ui || Object.keys(ui).length === 0) return undefined - const autoAllowedFromUi = ui.autoAllowed === true - const autoAllowedFromData = data.autoAllowed === true - return { - title: typeof ui.title === 'string' ? ui.title : undefined, - phaseLabel: typeof ui.phaseLabel === 'string' ? ui.phaseLabel : undefined, - icon: typeof ui.icon === 'string' ? ui.icon : undefined, - showInterrupt: ui.showInterrupt === true, - showRemember: ui.showRemember === true, - autoAllowed: autoAllowedFromUi || autoAllowedFromData, - actions: Array.isArray(ui.actions) - ? ui.actions - .map((action) => { - const a = asRecord(action) - const id = typeof a.id === 'string' ? a.id : undefined - const label = typeof a.label === 'string' ? a.label : undefined - const kind: 'accept' | 'reject' = a.kind === 'reject' ? 'reject' : 'accept' - if (!id || !label) return null - return { - id, - label, - kind, - remember: a.remember === true, - } - }) - .filter((a): a is NonNullable => !!a) - : undefined, - } -} - -function extractToolExecutionMetadata( - data: Record -): CopilotToolCall['execution'] | undefined { - const execution = asRecord(data.execution) - if (!execution || Object.keys(execution).length === 0) return undefined - return { - target: typeof execution.target === 'string' ? execution.target : undefined, - capabilityId: typeof execution.capabilityId === 'string' ? execution.capabilityId : undefined, - } -} - -function displayVerb(state: ClientToolCallState): string { - switch (state) { - case ClientToolCallState.success: - return 'Completed' - case ClientToolCallState.error: - return 'Failed' - case ClientToolCallState.rejected: - return 'Skipped' - case ClientToolCallState.aborted: - return 'Aborted' - case ClientToolCallState.generating: - return 'Preparing' - case ClientToolCallState.pending: - return 'Waiting' - default: - return 'Running' - } -} - -function resolveDisplayFromServerUi( - toolName: string, - state: ClientToolCallState, - toolCallId: string, - params: Record | undefined, - ui?: CopilotToolCall['ui'] -) { - const fallback = - resolveToolDisplay(toolName, state, toolCallId, params) || - humanizedFallback(toolName, state) - if (!fallback) return undefined - if (ui?.phaseLabel) { - return { text: ui.phaseLabel, icon: fallback.icon } - } - if (ui?.title) { - return { text: `${displayVerb(state)} ${ui.title}`, icon: fallback.icon } - } - return fallback -} - -function isWorkflowChangeApplyCall(toolName?: string, params?: Record): boolean { - if (toolName !== 'workflow_change') return false - const mode = typeof params?.mode === 'string' ? params.mode.toLowerCase() : '' - if (mode === 'apply') return true - return typeof params?.proposalId === 'string' && params.proposalId.length > 0 -} - -function extractOperationListFromResultPayload( - resultPayload: Record -): Array> | undefined { - const operations = resultPayload.operations - if (Array.isArray(operations)) return operations as Array> - - const compiled = resultPayload.compiledOperations - if (Array.isArray(compiled)) return compiled as Array> - - return undefined -} - function writeActiveStreamToStorage(info: CopilotStreamInfo | null): void { if (typeof window === 'undefined') return try { diff --git a/apps/sim/lib/copilot/client-sse/subagent-handlers.ts b/apps/sim/lib/copilot/client-sse/subagent-handlers.ts index 8570846df..5a215c75d 100644 --- a/apps/sim/lib/copilot/client-sse/subagent-handlers.ts +++ b/apps/sim/lib/copilot/client-sse/subagent-handlers.ts @@ -6,7 +6,6 @@ import { shouldSkipToolResultEvent, } from '@/lib/copilot/orchestrator/sse-utils' import type { SSEEvent } from '@/lib/copilot/orchestrator/types' -import { humanizedFallback, resolveToolDisplay } from '@/lib/copilot/store-utils' import { ClientToolCallState } from '@/lib/copilot/tools/client/tool-display-registry' import type { CopilotStore, CopilotToolCall } from '@/stores/panel/copilot/types' import { @@ -15,6 +14,14 @@ import { updateStreamingMessage, } from './handlers' import { CLIENT_EXECUTABLE_RUN_TOOLS, executeRunToolOnClient } from './run-tool-execution' +import { + extractOperationListFromResultPayload, + extractToolExecutionMetadata, + extractToolUiMetadata, + isWorkflowChangeApplyCall, + mapServerStateToClientState, + resolveDisplayFromServerUi, +} from './tool-call-helpers' import { applyToolEffects } from './tool-effects' import type { ClientStreamingContext } from './types' @@ -24,112 +31,6 @@ type StoreSet = ( partial: Partial | ((state: CopilotStore) => Partial) ) => void -function mapServerStateToClientState(state: unknown): ClientToolCallState { - switch (String(state || '')) { - case 'generating': - return ClientToolCallState.generating - case 'pending': - case 'awaiting_approval': - return ClientToolCallState.pending - case 'executing': - return ClientToolCallState.executing - case 'success': - return ClientToolCallState.success - case 'rejected': - case 'skipped': - return ClientToolCallState.rejected - case 'aborted': - return ClientToolCallState.aborted - case 'error': - case 'failed': - return ClientToolCallState.error - default: - return ClientToolCallState.pending - } -} - -function extractToolUiMetadata(data: Record): CopilotToolCall['ui'] | undefined { - const ui = asRecord(data.ui) - if (!ui || Object.keys(ui).length === 0) return undefined - const autoAllowedFromUi = ui.autoAllowed === true - const autoAllowedFromData = data.autoAllowed === true - return { - title: typeof ui.title === 'string' ? ui.title : undefined, - phaseLabel: typeof ui.phaseLabel === 'string' ? ui.phaseLabel : undefined, - icon: typeof ui.icon === 'string' ? ui.icon : undefined, - showInterrupt: ui.showInterrupt === true, - showRemember: ui.showRemember === true, - autoAllowed: autoAllowedFromUi || autoAllowedFromData, - actions: Array.isArray(ui.actions) - ? ui.actions - .map((action) => { - const a = asRecord(action) - const id = typeof a.id === 'string' ? a.id : undefined - const label = typeof a.label === 'string' ? a.label : undefined - const kind: 'accept' | 'reject' = a.kind === 'reject' ? 'reject' : 'accept' - if (!id || !label) return null - return { - id, - label, - kind, - remember: a.remember === true, - } - }) - .filter((a): a is NonNullable => !!a) - : undefined, - } -} - -function extractToolExecutionMetadata( - data: Record -): CopilotToolCall['execution'] | undefined { - const execution = asRecord(data.execution) - if (!execution || Object.keys(execution).length === 0) return undefined - return { - target: typeof execution.target === 'string' ? execution.target : undefined, - capabilityId: typeof execution.capabilityId === 'string' ? execution.capabilityId : undefined, - } -} - -function displayVerb(state: ClientToolCallState): string { - switch (state) { - case ClientToolCallState.success: - return 'Completed' - case ClientToolCallState.error: - return 'Failed' - case ClientToolCallState.rejected: - return 'Skipped' - case ClientToolCallState.aborted: - return 'Aborted' - case ClientToolCallState.generating: - return 'Preparing' - case ClientToolCallState.pending: - return 'Waiting' - default: - return 'Running' - } -} - -function resolveDisplayFromServerUi( - toolName: string, - state: ClientToolCallState, - toolCallId: string, - params: Record | undefined, - ui?: CopilotToolCall['ui'] -) { - const fallback = - resolveToolDisplay(toolName, state, toolCallId, params) || - humanizedFallback(toolName, state) - if (!fallback) return undefined - if (ui?.phaseLabel) { - return { text: ui.phaseLabel, icon: fallback.icon } - } - if (ui?.title) { - return { text: `${displayVerb(state)} ${ui.title}`, icon: fallback.icon } - } - return fallback -} - function isClientRunCapability(toolCall: CopilotToolCall): boolean { if (toolCall.execution?.target === 'sim_client_capability') { return toolCall.execution.capabilityId === 'workflow.run' || !toolCall.execution.capabilityId @@ -137,26 +38,6 @@ function isClientRunCapability(toolCall: CopilotToolCall): boolean { return CLIENT_EXECUTABLE_RUN_TOOLS.has(toolCall.name) } -function isWorkflowChangeApplyCall(toolCall: CopilotToolCall): boolean { - if (toolCall.name !== 'workflow_change') return false - const params = (toolCall.params || {}) as Record - const mode = typeof params.mode === 'string' ? params.mode.toLowerCase() : '' - if (mode === 'apply') return true - return typeof params.proposalId === 'string' && params.proposalId.length > 0 -} - -function extractOperationListFromResultPayload( - resultPayload: Record -): Array> | undefined { - const operations = resultPayload.operations - if (Array.isArray(operations)) return operations as Array> - - const compiled = resultPayload.compiledOperations - if (Array.isArray(compiled)) return compiled as Array> - - return undefined -} - export function appendSubAgentContent( context: ClientStreamingContext, parentToolCallId: string, @@ -428,7 +309,7 @@ export const subAgentSSEHandlers: Record = { ) if ( targetState === ClientToolCallState.success && - isWorkflowChangeApplyCall(existing) && + isWorkflowChangeApplyCall(existing.name, existing.params as Record) && resultPayload ) { const operations = extractOperationListFromResultPayload(resultPayload) diff --git a/apps/sim/lib/copilot/client-sse/tool-call-helpers.ts b/apps/sim/lib/copilot/client-sse/tool-call-helpers.ts new file mode 100644 index 000000000..79ecf0b49 --- /dev/null +++ b/apps/sim/lib/copilot/client-sse/tool-call-helpers.ts @@ -0,0 +1,134 @@ +import { asRecord } from '@/lib/copilot/orchestrator/sse-utils' +import { humanizedFallback, resolveToolDisplay } from '@/lib/copilot/store-utils' +import { ClientToolCallState } from '@/lib/copilot/tools/client/tool-display-registry' +import type { CopilotToolCall } from '@/stores/panel/copilot/types' + +export function mapServerStateToClientState(state: unknown): ClientToolCallState { + switch (String(state || '')) { + case 'generating': + return ClientToolCallState.generating + case 'pending': + case 'awaiting_approval': + return ClientToolCallState.pending + case 'executing': + return ClientToolCallState.executing + case 'success': + return ClientToolCallState.success + case 'rejected': + case 'skipped': + return ClientToolCallState.rejected + case 'aborted': + return ClientToolCallState.aborted + case 'error': + case 'failed': + return ClientToolCallState.error + default: + return ClientToolCallState.pending + } +} + +export function extractToolUiMetadata( + data: Record +): CopilotToolCall['ui'] | undefined { + const ui = asRecord(data.ui) + if (!ui || Object.keys(ui).length === 0) return undefined + const autoAllowedFromUi = ui.autoAllowed === true + const autoAllowedFromData = data.autoAllowed === true + return { + title: typeof ui.title === 'string' ? ui.title : undefined, + phaseLabel: typeof ui.phaseLabel === 'string' ? ui.phaseLabel : undefined, + icon: typeof ui.icon === 'string' ? ui.icon : undefined, + showInterrupt: ui.showInterrupt === true, + showRemember: ui.showRemember === true, + autoAllowed: autoAllowedFromUi || autoAllowedFromData, + actions: Array.isArray(ui.actions) + ? ui.actions + .map((action) => { + const a = asRecord(action) + const id = typeof a.id === 'string' ? a.id : undefined + const label = typeof a.label === 'string' ? a.label : undefined + const kind: 'accept' | 'reject' = a.kind === 'reject' ? 'reject' : 'accept' + if (!id || !label) return null + return { + id, + label, + kind, + remember: a.remember === true, + } + }) + .filter((a): a is NonNullable => !!a) + : undefined, + } +} + +export function extractToolExecutionMetadata( + data: Record +): CopilotToolCall['execution'] | undefined { + const execution = asRecord(data.execution) + if (!execution || Object.keys(execution).length === 0) return undefined + return { + target: typeof execution.target === 'string' ? execution.target : undefined, + capabilityId: typeof execution.capabilityId === 'string' ? execution.capabilityId : undefined, + } +} + +function displayVerb(state: ClientToolCallState): string { + switch (state) { + case ClientToolCallState.success: + return 'Completed' + case ClientToolCallState.error: + return 'Failed' + case ClientToolCallState.rejected: + return 'Skipped' + case ClientToolCallState.aborted: + return 'Aborted' + case ClientToolCallState.generating: + return 'Preparing' + case ClientToolCallState.pending: + return 'Waiting' + default: + return 'Running' + } +} + +export function resolveDisplayFromServerUi( + toolName: string, + state: ClientToolCallState, + toolCallId: string, + params: Record | undefined, + ui?: CopilotToolCall['ui'] +) { + const fallback = + resolveToolDisplay(toolName, state, toolCallId, params) || + humanizedFallback(toolName, state) + if (!fallback) return undefined + if (ui?.phaseLabel) { + return { text: ui.phaseLabel, icon: fallback.icon } + } + if (ui?.title) { + return { text: `${displayVerb(state)} ${ui.title}`, icon: fallback.icon } + } + return fallback +} + +export function isWorkflowChangeApplyCall( + toolName?: string, + params?: Record +): boolean { + if (toolName !== 'workflow_change') return false + const mode = typeof params?.mode === 'string' ? params.mode.toLowerCase() : '' + if (mode === 'apply') return true + return typeof params?.proposalId === 'string' && params.proposalId.length > 0 +} + +export function extractOperationListFromResultPayload( + resultPayload: Record +): Array> | undefined { + const operations = resultPayload.operations + if (Array.isArray(operations)) return operations as Array> + + const compiled = resultPayload.compiledOperations + if (Array.isArray(compiled)) return compiled as Array> + + return undefined +} diff --git a/apps/sim/lib/copilot/orchestrator/sse-handlers/handlers.ts b/apps/sim/lib/copilot/orchestrator/sse-handlers/handlers.ts index da658b61f..5ff335e50 100644 --- a/apps/sim/lib/copilot/orchestrator/sse-handlers/handlers.ts +++ b/apps/sim/lib/copilot/orchestrator/sse-handlers/handlers.ts @@ -139,6 +139,98 @@ async function waitForClientCapabilityAndReport( markToolResultSeen(toolCall.id) } +function markToolCallAndNotify( + toolCall: ToolCallState, + statusCode: number, + message: string, + data: Record | undefined, + logScope: string +): void { + markToolComplete(toolCall.id, toolCall.name, statusCode, message, data).catch((err) => { + logger.error(`markToolComplete fire-and-forget failed (${logScope})`, { + toolCallId: toolCall.id, + error: err instanceof Error ? err.message : String(err), + }) + }) + markToolResultSeen(toolCall.id) +} + +async function executeToolCallWithPolicy( + toolCall: ToolCallState, + toolName: string, + toolData: Record, + context: StreamingContext, + execContext: ExecutionContext, + options: OrchestratorOptions, + logScope: string +): Promise { + const execution = getExecutionTarget(toolData, toolName) + const isInteractive = options.interactive === true + const requiresApproval = isInteractive && needsApproval(toolData) + + if (toolData.state) { + toolCall.status = mapServerStateToToolStatus(toolData.state) + } + + if (requiresApproval) { + const decision = await waitForToolDecision( + toolCall.id, + options.timeout || STREAM_TIMEOUT_MS, + options.abortSignal + ) + + if (decision?.status === 'accepted' || decision?.status === 'success') { + // Continue below into normal execution path. + } else if (decision?.status === 'rejected' || decision?.status === 'error') { + toolCall.status = 'rejected' + toolCall.endTime = Date.now() + markToolCallAndNotify( + toolCall, + 400, + decision.message || 'Tool execution rejected', + { skipped: true, reason: 'user_rejected' }, + `${logScope} rejected` + ) + return + } else if (decision?.status === 'background') { + toolCall.status = 'skipped' + toolCall.endTime = Date.now() + markToolCallAndNotify( + toolCall, + 202, + decision.message || 'Tool execution moved to background', + { background: true }, + `${logScope} background` + ) + return + } else { + // Decision was null (timeout/abort). + toolCall.status = 'rejected' + toolCall.endTime = Date.now() + markToolCallAndNotify( + toolCall, + 408, + 'Tool approval timed out', + { skipped: true, reason: 'timeout' }, + `${logScope} timeout` + ) + return + } + } + + if (execution.target === 'sim_client_capability' && isInteractive) { + await waitForClientCapabilityAndReport(toolCall, options, logScope) + return + } + + if ( + (execution.target === 'sim_server' || execution.target === 'sim_client_capability') && + options.autoExecuteTools !== false + ) { + await executeToolAndReport(toolCall.id, context, execContext, options) + } +} + // Normalization + dedupe helpers live in sse-utils to keep server/client in sync. function inferToolSuccess(data: Record | undefined): { @@ -272,101 +364,15 @@ export const sseHandlers: Record = { const toolCall = context.toolCalls.get(toolCallId) if (!toolCall) return - const execution = getExecutionTarget(toolData, toolName) - const isInteractive = options.interactive === true - const requiresApproval = isInteractive && needsApproval(toolData) - if (toolData.state) { - toolCall.status = mapServerStateToToolStatus(toolData.state) - } - - if (requiresApproval) { - const decision = await waitForToolDecision( - toolCallId, - options.timeout || STREAM_TIMEOUT_MS, - options.abortSignal - ) - if (decision?.status === 'accepted' || decision?.status === 'success') { - if (execution.target === 'sim_client_capability' && isInteractive) { - await waitForClientCapabilityAndReport(toolCall, options, 'run tool') - return - } - if (execution.target === 'sim_server' || execution.target === 'sim_client_capability') { - if (options.autoExecuteTools !== false) { - await executeToolAndReport(toolCallId, context, execContext, options) - } - } - return - } - - if (decision?.status === 'rejected' || decision?.status === 'error') { - toolCall.status = 'rejected' - toolCall.endTime = Date.now() - // Fire-and-forget: must NOT await — see deadlock note in executeToolAndReport - markToolComplete( - toolCall.id, - toolCall.name, - 400, - decision.message || 'Tool execution rejected', - { skipped: true, reason: 'user_rejected' } - ).catch((err) => { - logger.error('markToolComplete fire-and-forget failed (rejected)', { - toolCallId: toolCall.id, - error: err instanceof Error ? err.message : String(err), - }) - }) - markToolResultSeen(toolCall.id) - return - } - - if (decision?.status === 'background') { - toolCall.status = 'skipped' - toolCall.endTime = Date.now() - // Fire-and-forget: must NOT await — see deadlock note in executeToolAndReport - markToolComplete( - toolCall.id, - toolCall.name, - 202, - decision.message || 'Tool execution moved to background', - { background: true } - ).catch((err) => { - logger.error('markToolComplete fire-and-forget failed (background)', { - toolCallId: toolCall.id, - error: err instanceof Error ? err.message : String(err), - }) - }) - markToolResultSeen(toolCall.id) - return - } - - // Decision was null — timed out or aborted. - // Do NOT fall through to auto-execute. Mark the tool as timed out - // and notify Go so it can unblock waitForExternalTool. - toolCall.status = 'rejected' - toolCall.endTime = Date.now() - markToolComplete(toolCall.id, toolCall.name, 408, 'Tool approval timed out', { - skipped: true, - reason: 'timeout', - }).catch((err) => { - logger.error('markToolComplete fire-and-forget failed (timeout)', { - toolCallId: toolCall.id, - error: err instanceof Error ? err.message : String(err), - }) - }) - markToolResultSeen(toolCall.id) - return - } - - if (execution.target === 'sim_client_capability' && isInteractive) { - await waitForClientCapabilityAndReport(toolCall, options, 'run tool') - return - } - - if ( - (execution.target === 'sim_server' || execution.target === 'sim_client_capability') && - options.autoExecuteTools !== false - ) { - await executeToolAndReport(toolCallId, context, execContext, options) - } + await executeToolCallWithPolicy( + toolCall, + toolName, + toolData, + context, + execContext, + options, + 'run tool' + ) }, reasoning: (event, context) => { const d = asRecord(event.data) @@ -484,95 +490,15 @@ export const subAgentHandlers: Record = { if (isPartial) return - const execution = getExecutionTarget(toolData, toolName) - const isInteractive = options.interactive === true - const requiresApproval = isInteractive && needsApproval(toolData) - - if (requiresApproval) { - const decision = await waitForToolDecision( - toolCallId, - options.timeout || STREAM_TIMEOUT_MS, - options.abortSignal - ) - if (decision?.status === 'accepted' || decision?.status === 'success') { - if (execution.target === 'sim_client_capability' && isInteractive) { - await waitForClientCapabilityAndReport(toolCall, options, 'subagent run tool') - return - } - if (execution.target === 'sim_server' || execution.target === 'sim_client_capability') { - if (options.autoExecuteTools !== false) { - await executeToolAndReport(toolCallId, context, execContext, options) - } - } - return - } - if (decision?.status === 'rejected' || decision?.status === 'error') { - toolCall.status = 'rejected' - toolCall.endTime = Date.now() - // Fire-and-forget: must NOT await — see deadlock note in executeToolAndReport - markToolComplete( - toolCall.id, - toolCall.name, - 400, - decision.message || 'Tool execution rejected', - { skipped: true, reason: 'user_rejected' } - ).catch((err) => { - logger.error('markToolComplete fire-and-forget failed (subagent rejected)', { - toolCallId: toolCall.id, - error: err instanceof Error ? err.message : String(err), - }) - }) - markToolResultSeen(toolCall.id) - return - } - if (decision?.status === 'background') { - toolCall.status = 'skipped' - toolCall.endTime = Date.now() - // Fire-and-forget: must NOT await — see deadlock note in executeToolAndReport - markToolComplete( - toolCall.id, - toolCall.name, - 202, - decision.message || 'Tool execution moved to background', - { background: true } - ).catch((err) => { - logger.error('markToolComplete fire-and-forget failed (subagent background)', { - toolCallId: toolCall.id, - error: err instanceof Error ? err.message : String(err), - }) - }) - markToolResultSeen(toolCall.id) - return - } - - // Decision was null — timed out or aborted. - // Do NOT fall through to auto-execute. - toolCall.status = 'rejected' - toolCall.endTime = Date.now() - markToolComplete(toolCall.id, toolCall.name, 408, 'Tool approval timed out', { - skipped: true, - reason: 'timeout', - }).catch((err) => { - logger.error('markToolComplete fire-and-forget failed (subagent timeout)', { - toolCallId: toolCall.id, - error: err instanceof Error ? err.message : String(err), - }) - }) - markToolResultSeen(toolCall.id) - return - } - - if (execution.target === 'sim_client_capability' && isInteractive) { - await waitForClientCapabilityAndReport(toolCall, options, 'subagent run tool') - return - } - - if ( - (execution.target === 'sim_server' || execution.target === 'sim_client_capability') && - options.autoExecuteTools !== false - ) { - await executeToolAndReport(toolCallId, context, execContext, options) - } + await executeToolCallWithPolicy( + toolCall, + toolName, + toolData, + context, + execContext, + options, + 'subagent run tool' + ) }, tool_result: (event, context) => { const parentToolCallId = context.subAgentParentToolCallId diff --git a/apps/sim/lib/copilot/tools/server/workflow/change-store.ts b/apps/sim/lib/copilot/tools/server/workflow/change-store.ts index dcb316f11..ab0bea9a4 100644 --- a/apps/sim/lib/copilot/tools/server/workflow/change-store.ts +++ b/apps/sim/lib/copilot/tools/server/workflow/change-store.ts @@ -1,4 +1,6 @@ import crypto from 'crypto' +import { createLogger } from '@sim/logger' +import { getRedisClient } from '@/lib/core/config/redis' type StoreEntry = { value: T @@ -7,6 +9,11 @@ type StoreEntry = { const DEFAULT_TTL_MS = 30 * 60 * 1000 const MAX_ENTRIES = 500 +const DEFAULT_TTL_SECONDS = Math.floor(DEFAULT_TTL_MS / 1000) +const CONTEXT_PREFIX = 'copilot:workflow_change:context' +const PROPOSAL_PREFIX = 'copilot:workflow_change:proposal' + +const logger = createLogger('WorkflowChangeStore') class TTLStore { private readonly data = new Map>() @@ -89,18 +96,90 @@ export type WorkflowChangeProposal = { const contextPackStore = new TTLStore() const proposalStore = new TTLStore() -export function saveContextPack(pack: WorkflowContextPack): string { - return contextPackStore.set(pack) +function getContextRedisKey(id: string): string { + return `${CONTEXT_PREFIX}:${id}` } -export function getContextPack(id: string): WorkflowContextPack | null { +function getProposalRedisKey(id: string): string { + return `${PROPOSAL_PREFIX}:${id}` +} + +async function writeRedisJson(key: string, value: unknown): Promise { + const redis = getRedisClient()! + await redis.set(key, JSON.stringify(value), 'EX', DEFAULT_TTL_SECONDS) +} + +async function readRedisJson(key: string): Promise { + const redis = getRedisClient()! + + const raw = await redis.get(key) + if (!raw) { + return null + } + + try { + return JSON.parse(raw) as T + } catch (error) { + logger.warn('Failed parsing workflow change store JSON payload', { key, error }) + await redis.del(key).catch(() => {}) + return null + } +} + +export async function saveContextPack(pack: WorkflowContextPack): Promise { + if (!getRedisClient()) { + return contextPackStore.set(pack) + } + const id = crypto.randomUUID() + try { + await writeRedisJson(getContextRedisKey(id), pack) + return id + } catch (error) { + logger.warn('Redis write failed for workflow context pack, using memory fallback', { error }) + return contextPackStore.set(pack) + } +} + +export async function getContextPack(id: string): Promise { + if (!getRedisClient()) { + return contextPackStore.get(id) + } + try { + const redisPayload = await readRedisJson(getContextRedisKey(id)) + if (redisPayload) { + return redisPayload + } + } catch (error) { + logger.warn('Redis read failed for workflow context pack, using memory fallback', { error }) + } return contextPackStore.get(id) } -export function saveProposal(proposal: WorkflowChangeProposal): string { - return proposalStore.set(proposal) +export async function saveProposal(proposal: WorkflowChangeProposal): Promise { + if (!getRedisClient()) { + return proposalStore.set(proposal) + } + const id = crypto.randomUUID() + try { + await writeRedisJson(getProposalRedisKey(id), proposal) + return id + } catch (error) { + logger.warn('Redis write failed for workflow proposal, using memory fallback', { error }) + return proposalStore.set(proposal) + } } -export function getProposal(id: string): WorkflowChangeProposal | null { +export async function getProposal(id: string): Promise { + if (!getRedisClient()) { + return proposalStore.get(id) + } + try { + const redisPayload = await readRedisJson(getProposalRedisKey(id)) + if (redisPayload) { + return redisPayload + } + } catch (error) { + logger.warn('Redis read failed for workflow proposal, using memory fallback', { error }) + } return proposalStore.get(id) } diff --git a/apps/sim/lib/copilot/tools/server/workflow/workflow-change.ts b/apps/sim/lib/copilot/tools/server/workflow/workflow-change.ts index f27dbf92d..1ce5de146 100644 --- a/apps/sim/lib/copilot/tools/server/workflow/workflow-change.ts +++ b/apps/sim/lib/copilot/tools/server/workflow/workflow-change.ts @@ -1162,7 +1162,10 @@ export const workflowChangeServerTool: BaseServerTool } if (params.mode === 'dry_run') { - const workflowId = params.workflowId || getContextPack(params.contextPackId || '')?.workflowId + const contextPack = params.contextPackId + ? await getContextPack(params.contextPackId) + : null + const workflowId = params.workflowId || contextPack?.workflowId if (!workflowId) { throw new Error('workflowId is required for dry_run') } @@ -1231,7 +1234,7 @@ export const workflowChangeServerTool: BaseServerTool unresolvedRisks: params.changeSpec.unresolvedRisks, }, } - const proposalId = saveProposal(proposal) + const proposalId = await saveProposal(proposal) logger.info('Compiled workflow_change dry run', { workflowId, @@ -1265,7 +1268,7 @@ export const workflowChangeServerTool: BaseServerTool throw new Error('proposalId is required for apply') } - const proposal = getProposal(proposalId) + const proposal = await getProposal(proposalId) if (!proposal) { throw new Error(`Proposal not found or expired: ${proposalId}`) } diff --git a/apps/sim/lib/copilot/tools/server/workflow/workflow-context.ts b/apps/sim/lib/copilot/tools/server/workflow/workflow-context.ts index ce44d9024..47e171842 100644 --- a/apps/sim/lib/copilot/tools/server/workflow/workflow-context.ts +++ b/apps/sim/lib/copilot/tools/server/workflow/workflow-context.ts @@ -84,7 +84,7 @@ export const workflowContextGetServerTool: BaseServerTool