Checkpoint

This commit is contained in:
Siddharth Ganesan
2026-02-12 11:14:33 -08:00
parent 76bd405293
commit f733b8dd88
7 changed files with 177 additions and 363 deletions

View File

@@ -11,13 +11,9 @@ import {
} from '@/lib/copilot/store-utils'
import { ClientToolCallState } from '@/lib/copilot/tools/client/tool-display-registry'
import type { CopilotStore, CopilotStreamInfo, CopilotToolCall } from '@/stores/panel/copilot/types'
import { useVariablesStore } from '@/stores/panel/variables/store'
import { useEnvironmentStore } from '@/stores/settings/environment/store'
import { useWorkflowDiffStore } from '@/stores/workflow-diff/store'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import type { WorkflowState } from '@/stores/workflows/workflow/types'
import { appendTextBlock, beginThinkingBlock, finalizeThinkingBlock } from './content-blocks'
import { CLIENT_EXECUTABLE_RUN_TOOLS, executeRunToolOnClient } from './run-tool-execution'
import { applyToolEffects } from './tool-effects'
import type { ClientContentBlock, ClientStreamingContext } from './types'
const logger = createLogger('CopilotClientSseHandlers')
@@ -27,14 +23,6 @@ const MAX_BATCH_INTERVAL = 50
const MIN_BATCH_INTERVAL = 16
const MAX_QUEUE_SIZE = 5
function isWorkflowEditToolCall(toolName?: string, params?: Record<string, unknown>): boolean {
if (toolName !== 'workflow_change') return false
const mode = typeof params?.mode === 'string' ? params.mode.toLowerCase() : ''
if (mode === 'apply') return true
return typeof params?.proposalId === 'string' && params.proposalId.length > 0
}
function isClientRunCapability(toolCall: CopilotToolCall): boolean {
if (toolCall.execution?.target === 'sim_client_capability') {
return toolCall.execution.capabilityId === 'workflow.run' || !toolCall.execution.capabilityId
@@ -155,19 +143,6 @@ function isWorkflowChangeApplyCall(toolName?: string, params?: Record<string, un
return typeof params?.proposalId === 'string' && params.proposalId.length > 0
}
function extractWorkflowStateFromResultPayload(
resultPayload: Record<string, unknown>
): WorkflowState | null {
const directState = asRecord(resultPayload.workflowState)
if (directState) return directState as unknown as WorkflowState
const editResult = asRecord(resultPayload.editResult)
const nestedState = asRecord(editResult?.workflowState)
if (nestedState) return nestedState as unknown as WorkflowState
return null
}
function extractOperationListFromResultPayload(
resultPayload: Record<string, unknown>
): Array<Record<string, unknown>> | undefined {
@@ -480,158 +455,12 @@ export const sseHandlers: Record<string, SSEHandler> = {
}
}
if (
targetState === ClientToolCallState.success &&
isWorkflowEditToolCall(current.name, paramsForCurrentToolCall)
) {
try {
const workflowState = resultPayload
? extractWorkflowStateFromResultPayload(resultPayload)
: null
const hasWorkflowState = !!workflowState
logger.info('[SSE] workflow edit result received', {
toolName: current.name,
hasWorkflowState,
blockCount: hasWorkflowState
? Object.keys((workflowState as any).blocks ?? {}).length
: 0,
edgeCount:
hasWorkflowState && Array.isArray((workflowState as any).edges)
? (workflowState as any).edges.length
: 0,
})
if (workflowState) {
const diffStore = useWorkflowDiffStore.getState()
diffStore.setProposedChanges(workflowState).catch((err) => {
logger.error('[SSE] Failed to apply workflow edit diff', {
error: err instanceof Error ? err.message : String(err),
toolName: current.name,
})
})
}
} catch (err) {
logger.error('[SSE] workflow edit result handling failed', {
error: err instanceof Error ? err.message : String(err),
toolName: current.name,
})
}
}
// Deploy tools: update deployment status in workflow registry
if (
targetState === ClientToolCallState.success &&
(current.name === 'deploy_api' ||
current.name === 'deploy_chat' ||
current.name === 'deploy_mcp' ||
current.name === 'redeploy' ||
current.name === 'workflow_deploy')
) {
try {
const resultPayload = asRecord(
data?.result || eventData.result || eventData.data || data?.data
)
const input = asRecord(current.params)
const workflowId =
(resultPayload?.workflowId as string) ||
(input?.workflowId as string) ||
useWorkflowRegistry.getState().activeWorkflowId
const deployMode = String(input?.mode || '')
const action = String(input?.action || 'deploy')
const isDeployed = (() => {
if (typeof resultPayload?.isDeployed === 'boolean') return resultPayload.isDeployed
if (current.name !== 'workflow_deploy') return true
if (deployMode === 'status') {
const statusIsDeployed = resultPayload?.isDeployed
return typeof statusIsDeployed === 'boolean' ? statusIsDeployed : true
}
if (deployMode === 'api' || deployMode === 'chat') return action !== 'undeploy'
if (deployMode === 'redeploy' || deployMode === 'mcp') return true
return true
})()
if (workflowId) {
useWorkflowRegistry
.getState()
.setDeploymentStatus(workflowId, isDeployed, isDeployed ? new Date() : undefined)
logger.info('[SSE] Updated deployment status from tool result', {
toolName: current.name,
workflowId,
isDeployed,
})
}
} catch (err) {
logger.warn('[SSE] Failed to hydrate deployment status', {
error: err instanceof Error ? err.message : String(err),
})
}
}
// Environment variables: reload store after successful set
if (
targetState === ClientToolCallState.success &&
current.name === 'set_environment_variables'
) {
try {
useEnvironmentStore.getState().loadEnvironmentVariables()
logger.info('[SSE] Triggered environment variables reload')
} catch (err) {
logger.warn('[SSE] Failed to reload environment variables', {
error: err instanceof Error ? err.message : String(err),
})
}
}
// Workflow variables: reload store after successful set
if (
targetState === ClientToolCallState.success &&
current.name === 'set_global_workflow_variables'
) {
try {
const input = asRecord(current.params)
const workflowId =
(input?.workflowId as string) || useWorkflowRegistry.getState().activeWorkflowId
if (workflowId) {
useVariablesStore.getState().loadForWorkflow(workflowId)
logger.info('[SSE] Triggered workflow variables reload', { workflowId })
}
} catch (err) {
logger.warn('[SSE] Failed to reload workflow variables', {
error: err instanceof Error ? err.message : String(err),
})
}
}
// Generate API key: update deployment status with the new key
if (targetState === ClientToolCallState.success && current.name === 'generate_api_key') {
try {
const resultPayload = asRecord(
data?.result || eventData.result || eventData.data || data?.data
)
const input = asRecord(current.params)
const workflowId =
(input?.workflowId as string) || useWorkflowRegistry.getState().activeWorkflowId
const apiKey = (resultPayload?.apiKey || resultPayload?.key) as string | undefined
if (workflowId) {
const existingStatus = useWorkflowRegistry
.getState()
.getWorkflowDeploymentStatus(workflowId)
useWorkflowRegistry
.getState()
.setDeploymentStatus(
workflowId,
existingStatus?.isDeployed ?? false,
existingStatus?.deployedAt,
apiKey
)
logger.info('[SSE] Updated deployment status with API key', {
workflowId,
hasKey: !!apiKey,
})
}
} catch (err) {
logger.warn('[SSE] Failed to hydrate API key status', {
error: err instanceof Error ? err.message : String(err),
})
}
if (targetState === ClientToolCallState.success) {
applyToolEffects({
effectsRaw: eventData.effects,
toolCall: updatedMap[toolCallId],
resultPayload,
})
}
}

View File

@@ -16,10 +16,6 @@ const logger = createLogger('CopilotRunToolExecution')
*/
export const CLIENT_EXECUTABLE_RUN_TOOLS = new Set([
'workflow_run',
'run_workflow',
'run_workflow_until_block',
'run_from_block',
'run_block',
])
/**
@@ -76,9 +72,7 @@ async function doExecuteRunTool(
| undefined
const runMode =
toolName === 'workflow_run'
? ((params.mode as string | undefined) || 'full').toLowerCase()
: undefined
toolName === 'workflow_run' ? ((params.mode as string | undefined) || 'full').toLowerCase() : undefined
if (
toolName === 'workflow_run' &&
@@ -101,9 +95,6 @@ async function doExecuteRunTool(
if (toolName === 'workflow_run' && runMode === 'until_block') {
return params.stopAfterBlockId as string | undefined
}
if (toolName === 'run_workflow_until_block')
return params.stopAfterBlockId as string | undefined
if (toolName === 'run_block') return params.blockId as string | undefined
if (toolName === 'workflow_run' && runMode === 'block') {
return params.blockId as string | undefined
}
@@ -123,18 +114,6 @@ async function doExecuteRunTool(
executionId: (params.executionId as string | undefined) || 'latest',
}
}
if (toolName === 'run_from_block' && params.startBlockId) {
return {
startBlockId: params.startBlockId as string,
executionId: (params.executionId as string | undefined) || 'latest',
}
}
if (toolName === 'run_block' && params.blockId) {
return {
startBlockId: params.blockId as string,
executionId: (params.executionId as string | undefined) || 'latest',
}
}
return undefined
})()

View File

@@ -9,14 +9,13 @@ import type { SSEEvent } from '@/lib/copilot/orchestrator/types'
import { humanizedFallback, resolveToolDisplay } from '@/lib/copilot/store-utils'
import { ClientToolCallState } from '@/lib/copilot/tools/client/tool-display-registry'
import type { CopilotStore, CopilotToolCall } from '@/stores/panel/copilot/types'
import { useWorkflowDiffStore } from '@/stores/workflow-diff/store'
import type { WorkflowState } from '@/stores/workflows/workflow/types'
import {
type SSEHandler,
sseHandlers,
updateStreamingMessage,
} from './handlers'
import { CLIENT_EXECUTABLE_RUN_TOOLS, executeRunToolOnClient } from './run-tool-execution'
import { applyToolEffects } from './tool-effects'
import type { ClientStreamingContext } from './types'
const logger = createLogger('CopilotClientSubagentHandlers')
@@ -146,19 +145,6 @@ function isWorkflowChangeApplyCall(toolCall: CopilotToolCall): boolean {
return typeof params.proposalId === 'string' && params.proposalId.length > 0
}
function extractWorkflowStateFromResultPayload(
resultPayload: Record<string, unknown>
): WorkflowState | null {
const directState = asRecord(resultPayload.workflowState)
if (directState) return directState as unknown as WorkflowState
const editResult = asRecord(resultPayload.editResult)
const nestedState = asRecord(editResult?.workflowState)
if (nestedState) return nestedState as unknown as WorkflowState
return null
}
function extractOperationListFromResultPayload(
resultPayload: Record<string, unknown>
): Array<Record<string, unknown>> | undefined {
@@ -491,21 +477,12 @@ export const subAgentSSEHandlers: Record<string, SSEHandler> = {
})
}
if (
targetState === ClientToolCallState.success &&
resultPayload &&
isWorkflowChangeApplyCall(updatedSubAgentToolCall)
) {
const workflowState = extractWorkflowStateFromResultPayload(resultPayload)
if (workflowState) {
const diffStore = useWorkflowDiffStore.getState()
diffStore.setProposedChanges(workflowState).catch((error) => {
logger.error('[SubAgent] Failed to apply workflow_change diff', {
error: error instanceof Error ? error.message : String(error),
toolCallId,
})
})
}
if (targetState === ClientToolCallState.success) {
applyToolEffects({
effectsRaw: resultData.effects,
toolCall: updatedSubAgentToolCall,
resultPayload,
})
}
}

View File

@@ -0,0 +1,162 @@
import { createLogger } from '@sim/logger'
import { asRecord } from '@/lib/copilot/orchestrator/sse-utils'
import type { CopilotToolCall } from '@/stores/panel/copilot/types'
import { useVariablesStore } from '@/stores/panel/variables/store'
import { useEnvironmentStore } from '@/stores/settings/environment/store'
import { useWorkflowDiffStore } from '@/stores/workflow-diff/store'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import type { WorkflowState } from '@/stores/workflows/workflow/types'
const logger = createLogger('CopilotToolEffects')
type ParsedToolEffect = {
kind: string
payload: Record<string, unknown>
}
function parseToolEffects(raw: unknown): ParsedToolEffect[] {
if (!Array.isArray(raw)) return []
const effects: ParsedToolEffect[] = []
for (const item of raw) {
const effect = asRecord(item)
const kind = typeof effect.kind === 'string' ? effect.kind : ''
if (!kind) continue
effects.push({
kind,
payload: asRecord(effect.payload) || {},
})
}
return effects
}
function resolveWorkflowId(
payload: Record<string, unknown>,
toolCall?: CopilotToolCall
): string | undefined {
const payloadWorkflowId = typeof payload.workflowId === 'string' ? payload.workflowId : undefined
if (payloadWorkflowId) return payloadWorkflowId
const params = asRecord(toolCall?.params)
const paramWorkflowId = typeof params?.workflowId === 'string' ? params.workflowId : undefined
if (paramWorkflowId) return paramWorkflowId
return useWorkflowRegistry.getState().activeWorkflowId || undefined
}
function resolveWorkflowState(
payload: Record<string, unknown>,
resultPayload?: Record<string, unknown>
): WorkflowState | null {
const payloadState = asRecord(payload.workflowState)
if (payloadState) return payloadState as unknown as WorkflowState
if (resultPayload) {
const directState = asRecord(resultPayload.workflowState)
if (directState) return directState as unknown as WorkflowState
const editResult = asRecord(resultPayload.editResult)
const nestedState = asRecord(editResult?.workflowState)
if (nestedState) return nestedState as unknown as WorkflowState
}
return null
}
function applyDeploymentSyncEffect(payload: Record<string, unknown>, toolCall?: CopilotToolCall): void {
const workflowId = resolveWorkflowId(payload, toolCall)
if (!workflowId) return
const registry = useWorkflowRegistry.getState()
const existingStatus = registry.getWorkflowDeploymentStatus(workflowId)
const isDeployed =
typeof payload.isDeployed === 'boolean'
? payload.isDeployed
: (existingStatus?.isDeployed ?? true)
const deployedAt = (() => {
if (typeof payload.deployedAt === 'string' && payload.deployedAt) {
const parsed = new Date(payload.deployedAt)
if (!Number.isNaN(parsed.getTime())) return parsed
}
return existingStatus?.deployedAt
})()
const apiKey =
typeof payload.apiKey === 'string' && payload.apiKey.length > 0
? payload.apiKey
: existingStatus?.apiKey
registry.setDeploymentStatus(workflowId, isDeployed, deployedAt, apiKey)
}
function applyApiKeySyncEffect(payload: Record<string, unknown>, toolCall?: CopilotToolCall): void {
const workflowId = resolveWorkflowId(payload, toolCall)
if (!workflowId) return
const apiKey = typeof payload.apiKey === 'string' ? payload.apiKey : undefined
const registry = useWorkflowRegistry.getState()
const existingStatus = registry.getWorkflowDeploymentStatus(workflowId)
registry.setDeploymentStatus(
workflowId,
existingStatus?.isDeployed ?? false,
existingStatus?.deployedAt,
apiKey || existingStatus?.apiKey
)
}
function applyWorkflowVariablesReload(
payload: Record<string, unknown>,
toolCall?: CopilotToolCall
): void {
const workflowId = resolveWorkflowId(payload, toolCall)
if (!workflowId) return
useVariablesStore.getState().loadForWorkflow(workflowId)
}
export function applyToolEffects(params: {
effectsRaw: unknown
toolCall?: CopilotToolCall
resultPayload?: Record<string, unknown>
}): void {
const effects = parseToolEffects(params.effectsRaw)
if (effects.length === 0) return
for (const effect of effects) {
switch (effect.kind) {
case 'workflow.diff.proposed': {
const workflowState = resolveWorkflowState(effect.payload, params.resultPayload)
if (!workflowState) break
useWorkflowDiffStore
.getState()
.setProposedChanges(workflowState)
.catch((error) => {
logger.error('Failed to apply workflow diff effect', {
error: error instanceof Error ? error.message : String(error),
})
})
break
}
case 'workflow.deployment.sync':
applyDeploymentSyncEffect(effect.payload, params.toolCall)
break
case 'workflow.api_key.sync':
applyApiKeySyncEffect(effect.payload, params.toolCall)
break
case 'environment.variables.reload':
useEnvironmentStore.getState().loadEnvironmentVariables()
break
case 'workflow.variables.reload':
applyWorkflowVariablesReload(effect.payload, params.toolCall)
break
default:
logger.debug('Ignoring unknown tool effect', { kind: effect.kind })
break
}
}
}

View File

@@ -30,10 +30,6 @@ const logger = createLogger('CopilotSseHandlers')
*/
const CLIENT_EXECUTABLE_RUN_TOOLS = new Set([
'workflow_run',
'run_workflow',
'run_workflow_until_block',
'run_from_block',
'run_block',
])
function mapServerStateToToolStatus(state: unknown): ToolCallState['status'] {

View File

@@ -433,11 +433,6 @@ const SIM_WORKFLOW_TOOL_HANDLERS: Record<
get_block_upstream_references: (p, c) =>
executeGetBlockUpstreamReferences(p as unknown as GetBlockUpstreamReferencesParams, c),
workflow_run: (p, c) => executeWorkflowRunUnified(p, c),
run_workflow: (p, c) => executeRunWorkflow(p as RunWorkflowParams, c),
run_workflow_until_block: (p, c) =>
executeRunWorkflowUntilBlock(p as unknown as RunWorkflowUntilBlockParams, c),
run_from_block: (p, c) => executeRunFromBlock(p as unknown as RunFromBlockParams, c),
run_block: (p, c) => executeRunBlock(p as unknown as RunBlockParams, c),
get_deployed_workflow_state: (p, c) =>
executeGetDeployedWorkflowState(p as GetDeployedWorkflowStateParams, c),
generate_api_key: (p, c) => executeGenerateApiKey(p as unknown as GenerateApiKeyParams, c),
@@ -449,10 +444,6 @@ const SIM_WORKFLOW_TOOL_HANDLERS: Record<
set_global_workflow_variables: (p, c) =>
executeSetGlobalWorkflowVariables(p as SetGlobalWorkflowVariablesParams, c),
workflow_deploy: (p, c) => executeWorkflowDeployUnified(p, c),
deploy_api: (p, c) => executeDeployApi(p as DeployApiParams, c),
deploy_chat: (p, c) => executeDeployChat(p as DeployChatParams, c),
deploy_mcp: (p, c) => executeDeployMcp(p as DeployMcpParams, c),
redeploy: (_p, c) => executeRedeploy(c),
check_deployment_status: (p, c) =>
executeCheckDeploymentStatus(p as CheckDeploymentStatusParams, c),
list_workspace_mcp_servers: (p, c) =>

View File

@@ -236,126 +236,6 @@ export const DIRECT_TOOL_DEFS: DirectToolDef[] = [
required: ['workflowId'],
},
},
{
name: 'run_workflow',
toolId: 'run_workflow',
description:
'Run a workflow and return its output. Works on both draft and deployed states. By default runs the draft (live) state.',
inputSchema: {
type: 'object',
properties: {
workflowId: {
type: 'string',
description: 'REQUIRED. The workflow ID to run.',
},
workflow_input: {
type: 'object',
description:
'JSON object with input values. Keys should match the workflow start block input field names.',
},
useDeployedState: {
type: 'boolean',
description: 'When true, runs the deployed version instead of the draft. Default: false.',
},
},
required: ['workflowId'],
},
},
{
name: 'run_workflow_until_block',
toolId: 'run_workflow_until_block',
description:
'Run a workflow and stop after a specific block completes. Useful for testing partial execution or debugging specific blocks.',
inputSchema: {
type: 'object',
properties: {
workflowId: {
type: 'string',
description: 'REQUIRED. The workflow ID to run.',
},
stopAfterBlockId: {
type: 'string',
description:
'REQUIRED. The block ID to stop after. Execution halts once this block completes.',
},
workflow_input: {
type: 'object',
description: 'JSON object with input values for the workflow.',
},
useDeployedState: {
type: 'boolean',
description: 'When true, runs the deployed version instead of the draft. Default: false.',
},
},
required: ['workflowId', 'stopAfterBlockId'],
},
},
{
name: 'run_from_block',
toolId: 'run_from_block',
description:
'Run a workflow starting from a specific block, using cached outputs from a prior execution for upstream blocks. The workflow must have been run at least once first.',
inputSchema: {
type: 'object',
properties: {
workflowId: {
type: 'string',
description: 'REQUIRED. The workflow ID to run.',
},
startBlockId: {
type: 'string',
description: 'REQUIRED. The block ID to start execution from.',
},
executionId: {
type: 'string',
description:
'Optional. Specific execution ID to load the snapshot from. Uses latest if omitted.',
},
workflow_input: {
type: 'object',
description: 'Optional input values for the workflow.',
},
useDeployedState: {
type: 'boolean',
description: 'When true, runs the deployed version instead of the draft. Default: false.',
},
},
required: ['workflowId', 'startBlockId'],
},
},
{
name: 'run_block',
toolId: 'run_block',
description:
'Run a single block in isolation using cached outputs from a prior execution. Only the specified block executes — nothing upstream or downstream. The workflow must have been run at least once first.',
inputSchema: {
type: 'object',
properties: {
workflowId: {
type: 'string',
description: 'REQUIRED. The workflow ID.',
},
blockId: {
type: 'string',
description: 'REQUIRED. The block ID to run in isolation.',
},
executionId: {
type: 'string',
description:
'Optional. Specific execution ID to load the snapshot from. Uses latest if omitted.',
},
workflow_input: {
type: 'object',
description: 'Optional input values for the workflow.',
},
useDeployedState: {
type: 'boolean',
description: 'When true, runs the deployed version instead of the draft. Default: false.',
},
},
required: ['workflowId', 'blockId'],
},
},
{
name: 'get_deployed_workflow_state',
toolId: 'get_deployed_workflow_state',