From 3d5321d9a1434c1fc8dc57d4bbeb77e7d00bc6ec Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Fri, 6 Feb 2026 11:44:27 -0800 Subject: [PATCH] Add tools --- apps/sim/executor/execution/engine.ts | 23 ++ apps/sim/executor/types.ts | 2 + apps/sim/lib/copilot/orchestrator/config.ts | 4 + .../orchestrator/tool-executor/index.ts | 17 ++ .../orchestrator/tool-executor/param-types.ts | 43 +++ .../tool-executor/workflow-tools/mutations.ts | 258 +++++++++++++++++- .../tool-executor/workflow-tools/queries.ts | 53 +++- apps/sim/lib/copilot/tools/mcp/definitions.ts | 152 +++++++++++ apps/sim/lib/logs/execution/logger.ts | 4 + .../sim/lib/logs/execution/logging-session.ts | 6 +- apps/sim/lib/logs/types.ts | 2 + .../workflows/executor/execute-workflow.ts | 11 +- .../lib/workflows/executor/execution-core.ts | 1 + .../lib/workflows/executor/execution-state.ts | 53 ++++ 14 files changed, 624 insertions(+), 5 deletions(-) create mode 100644 apps/sim/lib/workflows/executor/execution-state.ts diff --git a/apps/sim/executor/execution/engine.ts b/apps/sim/executor/execution/engine.ts index 47afd8b03..c0d015d5e 100644 --- a/apps/sim/executor/execution/engine.ts +++ b/apps/sim/executor/execution/engine.ts @@ -4,6 +4,7 @@ import { BlockType } from '@/executor/constants' import type { DAG } from '@/executor/dag/builder' import type { EdgeManager } from '@/executor/execution/edge-manager' import { serializePauseSnapshot } from '@/executor/execution/snapshot-serializer' +import type { SerializableExecutionState } from '@/executor/execution/types' import type { NodeExecutionOrchestrator } from '@/executor/orchestrators/node' import type { ExecutionContext, @@ -135,6 +136,7 @@ export class ExecutionEngine { success: false, output: this.finalOutput, logs: this.context.blockLogs, + executionState: this.getSerializableExecutionState(), metadata: this.context.metadata, status: 'cancelled', } @@ -144,6 +146,7 @@ export class ExecutionEngine { success: true, output: this.finalOutput, logs: this.context.blockLogs, + executionState: this.getSerializableExecutionState(), metadata: this.context.metadata, } } catch (error) { @@ -157,6 +160,7 @@ export class ExecutionEngine { success: false, output: this.finalOutput, logs: this.context.blockLogs, + executionState: this.getSerializableExecutionState(), metadata: this.context.metadata, status: 'cancelled', } @@ -459,6 +463,7 @@ export class ExecutionEngine { success: true, output: this.collectPauseResponses(), logs: this.context.blockLogs, + executionState: this.getSerializableExecutionState(snapshotSeed), metadata: this.context.metadata, status: 'paused', pausePoints, @@ -466,6 +471,24 @@ export class ExecutionEngine { } } + private getSerializableExecutionState( + snapshotSeed?: { snapshot: string } + ): SerializableExecutionState | undefined { + try { + const serializedSnapshot = + snapshotSeed?.snapshot ?? serializePauseSnapshot(this.context, [], this.dag).snapshot + const parsedSnapshot = JSON.parse(serializedSnapshot) as { + state?: SerializableExecutionState + } + return parsedSnapshot.state + } catch (error) { + logger.warn('Failed to serialize execution state', { + error: error instanceof Error ? error.message : String(error), + }) + return undefined + } + } + private collectPauseResponses(): NormalizedBlockOutput { const responses = Array.from(this.pausedBlocks.values()).map((pause) => pause.response) diff --git a/apps/sim/executor/types.ts b/apps/sim/executor/types.ts index 10c1996b3..b8bcb70f1 100644 --- a/apps/sim/executor/types.ts +++ b/apps/sim/executor/types.ts @@ -1,6 +1,7 @@ import type { TraceSpan } from '@/lib/logs/types' import type { PermissionGroupConfig } from '@/lib/permission-groups/types' import type { BlockOutput } from '@/blocks/types' +import type { SerializableExecutionState } from '@/executor/execution/types' import type { RunFromBlockContext } from '@/executor/utils/run-from-block' import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types' @@ -302,6 +303,7 @@ export interface ExecutionResult { output: NormalizedBlockOutput error?: string logs?: BlockLog[] + executionState?: SerializableExecutionState metadata?: ExecutionMetadata status?: 'completed' | 'paused' | 'cancelled' pausePoints?: PausePoint[] diff --git a/apps/sim/lib/copilot/orchestrator/config.ts b/apps/sim/lib/copilot/orchestrator/config.ts index 76bf83bd4..9e3dc1a28 100644 --- a/apps/sim/lib/copilot/orchestrator/config.ts +++ b/apps/sim/lib/copilot/orchestrator/config.ts @@ -1,6 +1,9 @@ export const INTERRUPT_TOOL_NAMES = [ 'set_global_workflow_variables', 'run_workflow', + 'run_workflow_until_block', + 'run_from_block', + 'run_block', 'manage_mcp_tool', 'manage_custom_tool', 'deploy_mcp', @@ -12,6 +15,7 @@ export const INTERRUPT_TOOL_NAMES = [ 'oauth_request_access', 'navigate_ui', 'knowledge_base', + 'generate_api_key', ] as const export const INTERRUPT_TOOL_SET = new Set(INTERRUPT_TOOL_NAMES) diff --git a/apps/sim/lib/copilot/orchestrator/tool-executor/index.ts b/apps/sim/lib/copilot/orchestrator/tool-executor/index.ts index 9ca5c9763..cb175f27e 100644 --- a/apps/sim/lib/copilot/orchestrator/tool-executor/index.ts +++ b/apps/sim/lib/copilot/orchestrator/tool-executor/index.ts @@ -30,8 +30,10 @@ import type { DeployApiParams, DeployChatParams, DeployMcpParams, + GenerateApiKeyParams, GetBlockOutputsParams, GetBlockUpstreamReferencesParams, + GetDeployedWorkflowStateParams, GetUserWorkflowParams, GetWorkflowDataParams, GetWorkflowFromNameParams, @@ -41,14 +43,19 @@ import type { MoveFolderParams, MoveWorkflowParams, RenameWorkflowParams, + RunBlockParams, + RunFromBlockParams, RunWorkflowParams, + RunWorkflowUntilBlockParams, SetGlobalWorkflowVariablesParams, } from './param-types' import { executeCreateFolder, executeCreateWorkflow, + executeGenerateApiKey, executeGetBlockOutputs, executeGetBlockUpstreamReferences, + executeGetDeployedWorkflowState, executeGetUserWorkflow, executeGetWorkflowData, executeGetWorkflowFromName, @@ -58,7 +65,10 @@ import { executeMoveFolder, executeMoveWorkflow, executeRenameWorkflow, + executeRunBlock, + executeRunFromBlock, executeRunWorkflow, + executeRunWorkflowUntilBlock, executeSetGlobalWorkflowVariables, } from './workflow-tools' @@ -99,6 +109,13 @@ const SIM_WORKFLOW_TOOL_HANDLERS: Record< get_block_upstream_references: (p, c) => executeGetBlockUpstreamReferences(p as unknown as GetBlockUpstreamReferencesParams, c), run_workflow: (p, c) => executeRunWorkflow(p as RunWorkflowParams, c), + run_workflow_until_block: (p, c) => + executeRunWorkflowUntilBlock(p as unknown as RunWorkflowUntilBlockParams, c), + run_from_block: (p, c) => executeRunFromBlock(p as unknown as RunFromBlockParams, c), + run_block: (p, c) => executeRunBlock(p as unknown as RunBlockParams, c), + get_deployed_workflow_state: (p, c) => + executeGetDeployedWorkflowState(p as GetDeployedWorkflowStateParams, c), + generate_api_key: (p, c) => executeGenerateApiKey(p as unknown as GenerateApiKeyParams, c), set_global_workflow_variables: (p, c) => executeSetGlobalWorkflowVariables(p as SetGlobalWorkflowVariablesParams, c), deploy_api: (p, c) => executeDeployApi(p as DeployApiParams, c), diff --git a/apps/sim/lib/copilot/orchestrator/tool-executor/param-types.ts b/apps/sim/lib/copilot/orchestrator/tool-executor/param-types.ts index 87c78ffbf..1f49ab616 100644 --- a/apps/sim/lib/copilot/orchestrator/tool-executor/param-types.ts +++ b/apps/sim/lib/copilot/orchestrator/tool-executor/param-types.ts @@ -57,6 +57,49 @@ export interface RunWorkflowParams { workflowId?: string workflow_input?: unknown input?: unknown + /** When true, runs the deployed version instead of the draft. Default: false (draft). */ + useDeployedState?: boolean +} + +export interface RunWorkflowUntilBlockParams { + workflowId?: string + workflow_input?: unknown + input?: unknown + /** The block ID to stop after. Execution halts once this block completes. */ + stopAfterBlockId: string + /** When true, runs the deployed version instead of the draft. Default: false (draft). */ + useDeployedState?: boolean +} + +export interface RunFromBlockParams { + workflowId?: string + /** The block ID to start execution from. */ + startBlockId: string + /** Optional execution ID to load the snapshot from. If omitted, uses the latest execution. */ + executionId?: string + workflow_input?: unknown + input?: unknown + useDeployedState?: boolean +} + +export interface RunBlockParams { + workflowId?: string + /** The block ID to run. Only this block executes using cached upstream outputs. */ + blockId: string + /** Optional execution ID to load the snapshot from. If omitted, uses the latest execution. */ + executionId?: string + workflow_input?: unknown + input?: unknown + useDeployedState?: boolean +} + +export interface GetDeployedWorkflowStateParams { + workflowId?: string +} + +export interface GenerateApiKeyParams { + name: string + workspaceId?: string } export interface VariableOperation { diff --git a/apps/sim/lib/copilot/orchestrator/tool-executor/workflow-tools/mutations.ts b/apps/sim/lib/copilot/orchestrator/tool-executor/workflow-tools/mutations.ts index a3c3c0efc..80e1c0a23 100644 --- a/apps/sim/lib/copilot/orchestrator/tool-executor/workflow-tools/mutations.ts +++ b/apps/sim/lib/copilot/orchestrator/tool-executor/workflow-tools/mutations.ts @@ -1,21 +1,31 @@ import crypto from 'crypto' +import { nanoid } from 'nanoid' import { createLogger } from '@sim/logger' import { db } from '@sim/db' -import { workflow, workflowFolder } from '@sim/db/schema' +import { apiKey, workflow, workflowFolder } from '@sim/db/schema' import { and, eq, isNull, max } from 'drizzle-orm' import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/orchestrator/types' +import { createApiKey } from '@/lib/api-key/auth' import { generateRequestId } from '@/lib/core/utils/request' import { buildDefaultWorkflowArtifacts } from '@/lib/workflows/defaults' import { executeWorkflow } from '@/lib/workflows/executor/execute-workflow' import { saveWorkflowToNormalizedTables } from '@/lib/workflows/persistence/utils' import { ensureWorkflowAccess, ensureWorkspaceAccess, getDefaultWorkspaceId } from '../access' +import { + getExecutionState, + getLatestExecutionState, +} from '@/lib/workflows/executor/execution-state' import type { CreateFolderParams, CreateWorkflowParams, + GenerateApiKeyParams, MoveFolderParams, MoveWorkflowParams, RenameWorkflowParams, + RunBlockParams, + RunFromBlockParams, RunWorkflowParams, + RunWorkflowUntilBlockParams, SetGlobalWorkflowVariablesParams, VariableOperation, } from '../param-types' @@ -150,6 +160,8 @@ export async function executeRunWorkflow( const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId) + const useDraftState = !params.useDeployedState + const result = await executeWorkflow( { id: workflowRecord.id, @@ -160,7 +172,7 @@ export async function executeRunWorkflow( generateRequestId(), params.workflow_input || params.input || undefined, context.userId, - { enabled: true, useDraftState: true } + { enabled: true, useDraftState } ) return { @@ -368,3 +380,245 @@ export async function executeMoveFolder( return { success: false, error: error instanceof Error ? error.message : String(error) } } } + +export async function executeRunWorkflowUntilBlock( + params: RunWorkflowUntilBlockParams, + context: ExecutionContext +): Promise { + try { + const workflowId = params.workflowId || context.workflowId + if (!workflowId) { + return { success: false, error: 'workflowId is required' } + } + if (!params.stopAfterBlockId) { + return { success: false, error: 'stopAfterBlockId is required' } + } + + const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId) + + const useDraftState = !params.useDeployedState + + const result = await executeWorkflow( + { + id: workflowRecord.id, + userId: workflowRecord.userId, + workspaceId: workflowRecord.workspaceId, + variables: workflowRecord.variables || {}, + }, + generateRequestId(), + params.workflow_input || params.input || undefined, + context.userId, + { enabled: true, useDraftState, stopAfterBlockId: params.stopAfterBlockId } + ) + + return { + success: result.success, + output: { + executionId: result.metadata?.executionId, + success: result.success, + stoppedAfterBlockId: params.stopAfterBlockId, + output: result.output, + logs: result.logs, + }, + error: result.success ? undefined : result.error || 'Workflow execution failed', + } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +export async function executeGenerateApiKey( + params: GenerateApiKeyParams, + context: ExecutionContext +): Promise { + try { + const name = typeof params.name === 'string' ? params.name.trim() : '' + if (!name) { + return { success: false, error: 'name is required' } + } + if (name.length > 200) { + return { success: false, error: 'API key name must be 200 characters or less' } + } + + const workspaceId = params.workspaceId || (await getDefaultWorkspaceId(context.userId)) + await ensureWorkspaceAccess(workspaceId, context.userId, true) + + const existingKey = await db + .select({ id: apiKey.id }) + .from(apiKey) + .where( + and( + eq(apiKey.workspaceId, workspaceId), + eq(apiKey.name, name), + eq(apiKey.type, 'workspace') + ) + ) + .limit(1) + + if (existingKey.length > 0) { + return { + success: false, + error: `A workspace API key named "${name}" already exists. Choose a different name.`, + } + } + + const { key: plainKey, encryptedKey } = await createApiKey(true) + if (!encryptedKey) { + return { success: false, error: 'Failed to encrypt API key for storage' } + } + + const [newKey] = await db + .insert(apiKey) + .values({ + id: nanoid(), + workspaceId, + userId: context.userId, + createdBy: context.userId, + name, + key: encryptedKey, + type: 'workspace', + createdAt: new Date(), + updatedAt: new Date(), + }) + .returning({ id: apiKey.id, name: apiKey.name, createdAt: apiKey.createdAt }) + + return { + success: true, + output: { + id: newKey.id, + name: newKey.name, + key: plainKey, + workspaceId, + message: + 'API key created successfully. Copy this key now — it will not be shown again. Use this key in the x-api-key header when calling workflow API endpoints.', + }, + } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +export async function executeRunFromBlock( + params: RunFromBlockParams, + context: ExecutionContext +): Promise { + try { + const workflowId = params.workflowId || context.workflowId + if (!workflowId) { + return { success: false, error: 'workflowId is required' } + } + if (!params.startBlockId) { + return { success: false, error: 'startBlockId is required' } + } + + const snapshot = params.executionId + ? await getExecutionState(params.executionId) + : await getLatestExecutionState(workflowId) + + if (!snapshot) { + return { + success: false, + error: params.executionId + ? `No execution state found for execution ${params.executionId}. Run the full workflow first.` + : `No execution state found for workflow ${workflowId}. Run the full workflow first to create a snapshot.`, + } + } + + const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId) + const useDraftState = !params.useDeployedState + + const result = await executeWorkflow( + { + id: workflowRecord.id, + userId: workflowRecord.userId, + workspaceId: workflowRecord.workspaceId, + variables: workflowRecord.variables || {}, + }, + generateRequestId(), + params.workflow_input || params.input || undefined, + context.userId, + { + enabled: true, + useDraftState, + runFromBlock: { startBlockId: params.startBlockId, sourceSnapshot: snapshot }, + } + ) + + return { + success: result.success, + output: { + executionId: result.metadata?.executionId, + success: result.success, + startBlockId: params.startBlockId, + output: result.output, + logs: result.logs, + }, + error: result.success ? undefined : result.error || 'Workflow execution failed', + } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +export async function executeRunBlock( + params: RunBlockParams, + context: ExecutionContext +): Promise { + try { + const workflowId = params.workflowId || context.workflowId + if (!workflowId) { + return { success: false, error: 'workflowId is required' } + } + if (!params.blockId) { + return { success: false, error: 'blockId is required' } + } + + const snapshot = params.executionId + ? await getExecutionState(params.executionId) + : await getLatestExecutionState(workflowId) + + if (!snapshot) { + return { + success: false, + error: params.executionId + ? `No execution state found for execution ${params.executionId}. Run the full workflow first.` + : `No execution state found for workflow ${workflowId}. Run the full workflow first to create a snapshot.`, + } + } + + const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId) + const useDraftState = !params.useDeployedState + + const result = await executeWorkflow( + { + id: workflowRecord.id, + userId: workflowRecord.userId, + workspaceId: workflowRecord.workspaceId, + variables: workflowRecord.variables || {}, + }, + generateRequestId(), + params.workflow_input || params.input || undefined, + context.userId, + { + enabled: true, + useDraftState, + runFromBlock: { startBlockId: params.blockId, sourceSnapshot: snapshot }, + stopAfterBlockId: params.blockId, + } + ) + + return { + success: result.success, + output: { + executionId: result.metadata?.executionId, + success: result.success, + blockId: params.blockId, + output: result.output, + logs: result.logs, + }, + error: result.success ? undefined : result.error || 'Workflow execution failed', + } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} diff --git a/apps/sim/lib/copilot/orchestrator/tool-executor/workflow-tools/queries.ts b/apps/sim/lib/copilot/orchestrator/tool-executor/workflow-tools/queries.ts index cc8a724f0..645b08301 100644 --- a/apps/sim/lib/copilot/orchestrator/tool-executor/workflow-tools/queries.ts +++ b/apps/sim/lib/copilot/orchestrator/tool-executor/workflow-tools/queries.ts @@ -10,7 +10,10 @@ import { mcpService } from '@/lib/mcp/service' import { listWorkspaceFiles } from '@/lib/uploads/contexts/workspace' import { getBlockOutputPaths } from '@/lib/workflows/blocks/block-outputs' import { BlockPathCalculator } from '@/lib/workflows/blocks/block-path-calculator' -import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils' +import { + loadDeployedWorkflowState, + loadWorkflowFromNormalizedTables, +} from '@/lib/workflows/persistence/utils' import { isInputDefinitionTrigger } from '@/lib/workflows/triggers/input-definition-triggers' import type { Loop, Parallel } from '@/stores/workflows/workflow/types' import { normalizeName } from '@/executor/constants' @@ -23,6 +26,7 @@ import { import type { GetBlockOutputsParams, GetBlockUpstreamReferencesParams, + GetDeployedWorkflowStateParams, GetUserWorkflowParams, GetWorkflowDataParams, GetWorkflowFromNameParams, @@ -562,3 +566,50 @@ function formatOutputsWithPrefix(paths: string[], blockName: string): string[] { const normalizedName = normalizeName(blockName) return paths.map((path) => `${normalizedName}.${path}`) } + +export async function executeGetDeployedWorkflowState( + params: GetDeployedWorkflowStateParams, + context: ExecutionContext +): Promise { + try { + const workflowId = params.workflowId || context.workflowId + if (!workflowId) { + return { success: false, error: 'workflowId is required' } + } + + const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId) + + try { + const deployedState = await loadDeployedWorkflowState(workflowId) + const formatted = formatNormalizedWorkflowForCopilot({ + blocks: deployedState.blocks, + edges: deployedState.edges, + loops: deployedState.loops as Record, + parallels: deployedState.parallels as Record, + }) + + return { + success: true, + output: { + workflowId, + workflowName: workflowRecord.name || '', + isDeployed: true, + deploymentVersionId: deployedState.deploymentVersionId, + deployedState: formatted, + }, + } + } catch { + return { + success: true, + output: { + workflowId, + workflowName: workflowRecord.name || '', + isDeployed: false, + message: 'Workflow has not been deployed yet.', + }, + } + } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} diff --git a/apps/sim/lib/copilot/tools/mcp/definitions.ts b/apps/sim/lib/copilot/tools/mcp/definitions.ts index 3ad8a1e44..08fbe5b8f 100644 --- a/apps/sim/lib/copilot/tools/mcp/definitions.ts +++ b/apps/sim/lib/copilot/tools/mcp/definitions.ts @@ -189,6 +189,158 @@ export const DIRECT_TOOL_DEFS: DirectToolDef[] = [ required: ['folderId'], }, }, + { + name: 'run_workflow', + toolId: 'run_workflow', + description: + 'Run a workflow and return its output. Works on both draft and deployed states. By default runs the draft (live) state.', + inputSchema: { + type: 'object', + properties: { + workflowId: { + type: 'string', + description: 'REQUIRED. The workflow ID to run.', + }, + workflow_input: { + type: 'object', + description: 'JSON object with input values. Keys should match the workflow start block input field names.', + }, + useDeployedState: { + type: 'boolean', + description: 'When true, runs the deployed version instead of the draft. Default: false.', + }, + }, + required: ['workflowId'], + }, + }, + { + name: 'run_workflow_until_block', + toolId: 'run_workflow_until_block', + description: + 'Run a workflow and stop after a specific block completes. Useful for testing partial execution or debugging specific blocks.', + inputSchema: { + type: 'object', + properties: { + workflowId: { + type: 'string', + description: 'REQUIRED. The workflow ID to run.', + }, + stopAfterBlockId: { + type: 'string', + description: 'REQUIRED. The block ID to stop after. Execution halts once this block completes.', + }, + workflow_input: { + type: 'object', + description: 'JSON object with input values for the workflow.', + }, + useDeployedState: { + type: 'boolean', + description: 'When true, runs the deployed version instead of the draft. Default: false.', + }, + }, + required: ['workflowId', 'stopAfterBlockId'], + }, + }, + { + name: 'run_from_block', + toolId: 'run_from_block', + description: + 'Run a workflow starting from a specific block, using cached outputs from a prior execution for upstream blocks. The workflow must have been run at least once first.', + inputSchema: { + type: 'object', + properties: { + workflowId: { + type: 'string', + description: 'REQUIRED. The workflow ID to run.', + }, + startBlockId: { + type: 'string', + description: 'REQUIRED. The block ID to start execution from.', + }, + executionId: { + type: 'string', + description: 'Optional. Specific execution ID to load the snapshot from. Uses latest if omitted.', + }, + workflow_input: { + type: 'object', + description: 'Optional input values for the workflow.', + }, + useDeployedState: { + type: 'boolean', + description: 'When true, runs the deployed version instead of the draft. Default: false.', + }, + }, + required: ['workflowId', 'startBlockId'], + }, + }, + { + name: 'run_block', + toolId: 'run_block', + description: + 'Run a single block in isolation using cached outputs from a prior execution. Only the specified block executes — nothing upstream or downstream. The workflow must have been run at least once first.', + inputSchema: { + type: 'object', + properties: { + workflowId: { + type: 'string', + description: 'REQUIRED. The workflow ID.', + }, + blockId: { + type: 'string', + description: 'REQUIRED. The block ID to run in isolation.', + }, + executionId: { + type: 'string', + description: 'Optional. Specific execution ID to load the snapshot from. Uses latest if omitted.', + }, + workflow_input: { + type: 'object', + description: 'Optional input values for the workflow.', + }, + useDeployedState: { + type: 'boolean', + description: 'When true, runs the deployed version instead of the draft. Default: false.', + }, + }, + required: ['workflowId', 'blockId'], + }, + }, + { + name: 'get_deployed_workflow_state', + toolId: 'get_deployed_workflow_state', + description: + 'Get the deployed (production) state of a workflow. Returns the full workflow definition as deployed, or indicates if the workflow is not yet deployed.', + inputSchema: { + type: 'object', + properties: { + workflowId: { + type: 'string', + description: 'REQUIRED. The workflow ID to get the deployed state for.', + }, + }, + required: ['workflowId'], + }, + }, + { + name: 'generate_api_key', + toolId: 'generate_api_key', + description: + 'Generate a new workspace API key for calling workflow API endpoints. The key is only shown once — tell the user to save it immediately.', + inputSchema: { + type: 'object', + properties: { + name: { + type: 'string', + description: 'A descriptive name for the API key (e.g., "production-key", "dev-testing").', + }, + workspaceId: { + type: 'string', + description: 'Optional workspace ID. Defaults to user\'s default workspace.', + }, + }, + required: ['name'], + }, + }, ] export const SUBAGENT_TOOL_DEFS: SubagentToolDef[] = [ diff --git a/apps/sim/lib/logs/execution/logger.ts b/apps/sim/lib/logs/execution/logger.ts index 0fc47fa73..033f3dc8e 100644 --- a/apps/sim/lib/logs/execution/logger.ts +++ b/apps/sim/lib/logs/execution/logger.ts @@ -21,6 +21,7 @@ import { checkAndBillOverageThreshold } from '@/lib/billing/threshold-billing' import { isBillingEnabled } from '@/lib/core/config/feature-flags' import { redactApiKeys } from '@/lib/core/security/redaction' import { filterForDisplay } from '@/lib/core/utils/display-filters' +import type { SerializableExecutionState } from '@/executor/execution/types' import { emitWorkflowExecutionCompleted } from '@/lib/logs/events' import { snapshotService } from '@/lib/logs/execution/snapshot/service' import type { @@ -188,6 +189,7 @@ export class ExecutionLogger implements IExecutionLoggerService { finalOutput: BlockOutputData traceSpans?: TraceSpan[] workflowInput?: any + executionState?: SerializableExecutionState isResume?: boolean level?: 'info' | 'error' status?: 'completed' | 'failed' | 'cancelled' | 'pending' @@ -200,6 +202,7 @@ export class ExecutionLogger implements IExecutionLoggerService { finalOutput, traceSpans, workflowInput, + executionState, isResume, level: levelOverride, status: statusOverride, @@ -287,6 +290,7 @@ export class ExecutionLogger implements IExecutionLoggerService { total: executionCost.tokens.total, }, models: executionCost.models, + ...(executionState ? { executionState } : {}), }, cost: executionCost, }) diff --git a/apps/sim/lib/logs/execution/logging-session.ts b/apps/sim/lib/logs/execution/logging-session.ts index be1515686..4245af6c1 100644 --- a/apps/sim/lib/logs/execution/logging-session.ts +++ b/apps/sim/lib/logs/execution/logging-session.ts @@ -3,6 +3,7 @@ import { workflowExecutionLogs } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { eq, sql } from 'drizzle-orm' import { BASE_EXECUTION_CHARGE } from '@/lib/billing/constants' +import type { SerializableExecutionState } from '@/executor/execution/types' import { executionLogger } from '@/lib/logs/execution/logger' import { calculateCostSummary, @@ -35,6 +36,7 @@ export interface SessionCompleteParams { finalOutput?: any traceSpans?: TraceSpan[] workflowInput?: any + executionState?: SerializableExecutionState } export interface SessionErrorCompleteParams { @@ -269,7 +271,8 @@ export class LoggingSession { return } - const { endedAt, totalDurationMs, finalOutput, traceSpans, workflowInput } = params + const { endedAt, totalDurationMs, finalOutput, traceSpans, workflowInput, executionState } = + params try { const costSummary = calculateCostSummary(traceSpans || []) @@ -284,6 +287,7 @@ export class LoggingSession { finalOutput: finalOutput || {}, traceSpans: traceSpans || [], workflowInput, + executionState, isResume: this.isResume, }) diff --git a/apps/sim/lib/logs/types.ts b/apps/sim/lib/logs/types.ts index 8d65fbd55..9d160fd4a 100644 --- a/apps/sim/lib/logs/types.ts +++ b/apps/sim/lib/logs/types.ts @@ -1,5 +1,6 @@ import type { Edge } from 'reactflow' import type { BlockLog, NormalizedBlockOutput } from '@/executor/types' +import type { SerializableExecutionState } from '@/executor/execution/types' import type { DeploymentStatus } from '@/stores/workflows/registry/types' import type { Loop, Parallel, WorkflowState } from '@/stores/workflows/workflow/types' @@ -111,6 +112,7 @@ export interface WorkflowExecutionLog { tokens?: { input?: number; output?: number; total?: number } } > + executionState?: SerializableExecutionState finalOutput?: any errorDetails?: { blockId: string diff --git a/apps/sim/lib/workflows/executor/execute-workflow.ts b/apps/sim/lib/workflows/executor/execute-workflow.ts index 3313128ff..82813ce76 100644 --- a/apps/sim/lib/workflows/executor/execute-workflow.ts +++ b/apps/sim/lib/workflows/executor/execute-workflow.ts @@ -4,7 +4,7 @@ import { LoggingSession } from '@/lib/logs/execution/logging-session' import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' import { ExecutionSnapshot } from '@/executor/execution/snapshot' -import type { ExecutionMetadata } from '@/executor/execution/types' +import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types' import type { ExecutionResult, StreamingExecution } from '@/executor/types' const logger = createLogger('WorkflowExecution') @@ -22,6 +22,13 @@ export interface ExecuteWorkflowOptions { abortSignal?: AbortSignal /** Use the live/draft workflow state instead of the deployed state. Used by copilot. */ useDraftState?: boolean + /** Stop execution after this block completes. Used for "run until block" feature. */ + stopAfterBlockId?: string + /** Run-from-block configuration using a prior execution snapshot. */ + runFromBlock?: { + startBlockId: string + sourceSnapshot: SerializableExecutionState + } } export interface WorkflowInfo { @@ -86,6 +93,8 @@ export async function executeWorkflow( includeFileBase64: streamConfig?.includeFileBase64, base64MaxBytes: streamConfig?.base64MaxBytes, abortSignal: streamConfig?.abortSignal, + stopAfterBlockId: streamConfig?.stopAfterBlockId, + runFromBlock: streamConfig?.runFromBlock, }) if (result.status === 'paused') { diff --git a/apps/sim/lib/workflows/executor/execution-core.ts b/apps/sim/lib/workflows/executor/execution-core.ts index 60998d934..56926d627 100644 --- a/apps/sim/lib/workflows/executor/execution-core.ts +++ b/apps/sim/lib/workflows/executor/execution-core.ts @@ -400,6 +400,7 @@ export async function executeWorkflowCore( finalOutput: result.output || {}, traceSpans: traceSpans || [], workflowInput: processedInput, + executionState: result.executionState, }) await clearExecutionCancellation(executionId) diff --git a/apps/sim/lib/workflows/executor/execution-state.ts b/apps/sim/lib/workflows/executor/execution-state.ts new file mode 100644 index 000000000..490895a89 --- /dev/null +++ b/apps/sim/lib/workflows/executor/execution-state.ts @@ -0,0 +1,53 @@ +import { db } from '@sim/db' +import { workflowExecutionLogs } from '@sim/db/schema' +import { and, desc, eq, sql } from 'drizzle-orm' +import type { SerializableExecutionState } from '@/executor/execution/types' + +function isSerializableExecutionState(value: unknown): value is SerializableExecutionState { + if (!value || typeof value !== 'object') return false + const state = value as Record + return ( + typeof state.blockStates === 'object' && + Array.isArray(state.executedBlocks) && + Array.isArray(state.blockLogs) && + typeof state.decisions === 'object' && + Array.isArray(state.completedLoops) && + Array.isArray(state.activeExecutionPath) + ) +} + +function extractExecutionState(executionData: unknown): SerializableExecutionState | null { + if (!executionData || typeof executionData !== 'object') return null + const state = (executionData as Record).executionState + return isSerializableExecutionState(state) ? state : null +} + +export async function getExecutionState( + executionId: string +): Promise { + const [row] = await db + .select({ executionData: workflowExecutionLogs.executionData }) + .from(workflowExecutionLogs) + .where(eq(workflowExecutionLogs.executionId, executionId)) + .limit(1) + + return extractExecutionState(row?.executionData) +} + +export async function getLatestExecutionState( + workflowId: string +): Promise { + const [row] = await db + .select({ executionData: workflowExecutionLogs.executionData }) + .from(workflowExecutionLogs) + .where( + and( + eq(workflowExecutionLogs.workflowId, workflowId), + sql`${workflowExecutionLogs.executionData} -> 'executionState' IS NOT NULL` + ) + ) + .orderBy(desc(workflowExecutionLogs.startedAt)) + .limit(1) + + return extractExecutionState(row?.executionData) +}