Add tools

This commit is contained in:
Siddharth Ganesan
2026-02-06 11:44:27 -08:00
parent 13c8621513
commit 3d5321d9a1
14 changed files with 624 additions and 5 deletions

View File

@@ -4,6 +4,7 @@ import { BlockType } from '@/executor/constants'
import type { DAG } from '@/executor/dag/builder'
import type { EdgeManager } from '@/executor/execution/edge-manager'
import { serializePauseSnapshot } from '@/executor/execution/snapshot-serializer'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { NodeExecutionOrchestrator } from '@/executor/orchestrators/node'
import type {
ExecutionContext,
@@ -135,6 +136,7 @@ export class ExecutionEngine {
success: false,
output: this.finalOutput,
logs: this.context.blockLogs,
executionState: this.getSerializableExecutionState(),
metadata: this.context.metadata,
status: 'cancelled',
}
@@ -144,6 +146,7 @@ export class ExecutionEngine {
success: true,
output: this.finalOutput,
logs: this.context.blockLogs,
executionState: this.getSerializableExecutionState(),
metadata: this.context.metadata,
}
} catch (error) {
@@ -157,6 +160,7 @@ export class ExecutionEngine {
success: false,
output: this.finalOutput,
logs: this.context.blockLogs,
executionState: this.getSerializableExecutionState(),
metadata: this.context.metadata,
status: 'cancelled',
}
@@ -459,6 +463,7 @@ export class ExecutionEngine {
success: true,
output: this.collectPauseResponses(),
logs: this.context.blockLogs,
executionState: this.getSerializableExecutionState(snapshotSeed),
metadata: this.context.metadata,
status: 'paused',
pausePoints,
@@ -466,6 +471,24 @@ export class ExecutionEngine {
}
}
private getSerializableExecutionState(
snapshotSeed?: { snapshot: string }
): SerializableExecutionState | undefined {
try {
const serializedSnapshot =
snapshotSeed?.snapshot ?? serializePauseSnapshot(this.context, [], this.dag).snapshot
const parsedSnapshot = JSON.parse(serializedSnapshot) as {
state?: SerializableExecutionState
}
return parsedSnapshot.state
} catch (error) {
logger.warn('Failed to serialize execution state', {
error: error instanceof Error ? error.message : String(error),
})
return undefined
}
}
private collectPauseResponses(): NormalizedBlockOutput {
const responses = Array.from(this.pausedBlocks.values()).map((pause) => pause.response)

View File

@@ -1,6 +1,7 @@
import type { TraceSpan } from '@/lib/logs/types'
import type { PermissionGroupConfig } from '@/lib/permission-groups/types'
import type { BlockOutput } from '@/blocks/types'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { RunFromBlockContext } from '@/executor/utils/run-from-block'
import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
@@ -302,6 +303,7 @@ export interface ExecutionResult {
output: NormalizedBlockOutput
error?: string
logs?: BlockLog[]
executionState?: SerializableExecutionState
metadata?: ExecutionMetadata
status?: 'completed' | 'paused' | 'cancelled'
pausePoints?: PausePoint[]

View File

@@ -1,6 +1,9 @@
export const INTERRUPT_TOOL_NAMES = [
'set_global_workflow_variables',
'run_workflow',
'run_workflow_until_block',
'run_from_block',
'run_block',
'manage_mcp_tool',
'manage_custom_tool',
'deploy_mcp',
@@ -12,6 +15,7 @@ export const INTERRUPT_TOOL_NAMES = [
'oauth_request_access',
'navigate_ui',
'knowledge_base',
'generate_api_key',
] as const
export const INTERRUPT_TOOL_SET = new Set<string>(INTERRUPT_TOOL_NAMES)

View File

@@ -30,8 +30,10 @@ import type {
DeployApiParams,
DeployChatParams,
DeployMcpParams,
GenerateApiKeyParams,
GetBlockOutputsParams,
GetBlockUpstreamReferencesParams,
GetDeployedWorkflowStateParams,
GetUserWorkflowParams,
GetWorkflowDataParams,
GetWorkflowFromNameParams,
@@ -41,14 +43,19 @@ import type {
MoveFolderParams,
MoveWorkflowParams,
RenameWorkflowParams,
RunBlockParams,
RunFromBlockParams,
RunWorkflowParams,
RunWorkflowUntilBlockParams,
SetGlobalWorkflowVariablesParams,
} from './param-types'
import {
executeCreateFolder,
executeCreateWorkflow,
executeGenerateApiKey,
executeGetBlockOutputs,
executeGetBlockUpstreamReferences,
executeGetDeployedWorkflowState,
executeGetUserWorkflow,
executeGetWorkflowData,
executeGetWorkflowFromName,
@@ -58,7 +65,10 @@ import {
executeMoveFolder,
executeMoveWorkflow,
executeRenameWorkflow,
executeRunBlock,
executeRunFromBlock,
executeRunWorkflow,
executeRunWorkflowUntilBlock,
executeSetGlobalWorkflowVariables,
} from './workflow-tools'
@@ -99,6 +109,13 @@ const SIM_WORKFLOW_TOOL_HANDLERS: Record<
get_block_upstream_references: (p, c) =>
executeGetBlockUpstreamReferences(p as unknown as GetBlockUpstreamReferencesParams, c),
run_workflow: (p, c) => executeRunWorkflow(p as RunWorkflowParams, c),
run_workflow_until_block: (p, c) =>
executeRunWorkflowUntilBlock(p as unknown as RunWorkflowUntilBlockParams, c),
run_from_block: (p, c) => executeRunFromBlock(p as unknown as RunFromBlockParams, c),
run_block: (p, c) => executeRunBlock(p as unknown as RunBlockParams, c),
get_deployed_workflow_state: (p, c) =>
executeGetDeployedWorkflowState(p as GetDeployedWorkflowStateParams, c),
generate_api_key: (p, c) => executeGenerateApiKey(p as unknown as GenerateApiKeyParams, c),
set_global_workflow_variables: (p, c) =>
executeSetGlobalWorkflowVariables(p as SetGlobalWorkflowVariablesParams, c),
deploy_api: (p, c) => executeDeployApi(p as DeployApiParams, c),

View File

@@ -57,6 +57,49 @@ export interface RunWorkflowParams {
workflowId?: string
workflow_input?: unknown
input?: unknown
/** When true, runs the deployed version instead of the draft. Default: false (draft). */
useDeployedState?: boolean
}
export interface RunWorkflowUntilBlockParams {
workflowId?: string
workflow_input?: unknown
input?: unknown
/** The block ID to stop after. Execution halts once this block completes. */
stopAfterBlockId: string
/** When true, runs the deployed version instead of the draft. Default: false (draft). */
useDeployedState?: boolean
}
export interface RunFromBlockParams {
workflowId?: string
/** The block ID to start execution from. */
startBlockId: string
/** Optional execution ID to load the snapshot from. If omitted, uses the latest execution. */
executionId?: string
workflow_input?: unknown
input?: unknown
useDeployedState?: boolean
}
export interface RunBlockParams {
workflowId?: string
/** The block ID to run. Only this block executes using cached upstream outputs. */
blockId: string
/** Optional execution ID to load the snapshot from. If omitted, uses the latest execution. */
executionId?: string
workflow_input?: unknown
input?: unknown
useDeployedState?: boolean
}
export interface GetDeployedWorkflowStateParams {
workflowId?: string
}
export interface GenerateApiKeyParams {
name: string
workspaceId?: string
}
export interface VariableOperation {

View File

@@ -1,21 +1,31 @@
import crypto from 'crypto'
import { nanoid } from 'nanoid'
import { createLogger } from '@sim/logger'
import { db } from '@sim/db'
import { workflow, workflowFolder } from '@sim/db/schema'
import { apiKey, workflow, workflowFolder } from '@sim/db/schema'
import { and, eq, isNull, max } from 'drizzle-orm'
import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/orchestrator/types'
import { createApiKey } from '@/lib/api-key/auth'
import { generateRequestId } from '@/lib/core/utils/request'
import { buildDefaultWorkflowArtifacts } from '@/lib/workflows/defaults'
import { executeWorkflow } from '@/lib/workflows/executor/execute-workflow'
import { saveWorkflowToNormalizedTables } from '@/lib/workflows/persistence/utils'
import { ensureWorkflowAccess, ensureWorkspaceAccess, getDefaultWorkspaceId } from '../access'
import {
getExecutionState,
getLatestExecutionState,
} from '@/lib/workflows/executor/execution-state'
import type {
CreateFolderParams,
CreateWorkflowParams,
GenerateApiKeyParams,
MoveFolderParams,
MoveWorkflowParams,
RenameWorkflowParams,
RunBlockParams,
RunFromBlockParams,
RunWorkflowParams,
RunWorkflowUntilBlockParams,
SetGlobalWorkflowVariablesParams,
VariableOperation,
} from '../param-types'
@@ -150,6 +160,8 @@ export async function executeRunWorkflow(
const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId)
const useDraftState = !params.useDeployedState
const result = await executeWorkflow(
{
id: workflowRecord.id,
@@ -160,7 +172,7 @@ export async function executeRunWorkflow(
generateRequestId(),
params.workflow_input || params.input || undefined,
context.userId,
{ enabled: true, useDraftState: true }
{ enabled: true, useDraftState }
)
return {
@@ -368,3 +380,245 @@ export async function executeMoveFolder(
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}
export async function executeRunWorkflowUntilBlock(
params: RunWorkflowUntilBlockParams,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const workflowId = params.workflowId || context.workflowId
if (!workflowId) {
return { success: false, error: 'workflowId is required' }
}
if (!params.stopAfterBlockId) {
return { success: false, error: 'stopAfterBlockId is required' }
}
const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId)
const useDraftState = !params.useDeployedState
const result = await executeWorkflow(
{
id: workflowRecord.id,
userId: workflowRecord.userId,
workspaceId: workflowRecord.workspaceId,
variables: workflowRecord.variables || {},
},
generateRequestId(),
params.workflow_input || params.input || undefined,
context.userId,
{ enabled: true, useDraftState, stopAfterBlockId: params.stopAfterBlockId }
)
return {
success: result.success,
output: {
executionId: result.metadata?.executionId,
success: result.success,
stoppedAfterBlockId: params.stopAfterBlockId,
output: result.output,
logs: result.logs,
},
error: result.success ? undefined : result.error || 'Workflow execution failed',
}
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}
export async function executeGenerateApiKey(
params: GenerateApiKeyParams,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const name = typeof params.name === 'string' ? params.name.trim() : ''
if (!name) {
return { success: false, error: 'name is required' }
}
if (name.length > 200) {
return { success: false, error: 'API key name must be 200 characters or less' }
}
const workspaceId = params.workspaceId || (await getDefaultWorkspaceId(context.userId))
await ensureWorkspaceAccess(workspaceId, context.userId, true)
const existingKey = await db
.select({ id: apiKey.id })
.from(apiKey)
.where(
and(
eq(apiKey.workspaceId, workspaceId),
eq(apiKey.name, name),
eq(apiKey.type, 'workspace')
)
)
.limit(1)
if (existingKey.length > 0) {
return {
success: false,
error: `A workspace API key named "${name}" already exists. Choose a different name.`,
}
}
const { key: plainKey, encryptedKey } = await createApiKey(true)
if (!encryptedKey) {
return { success: false, error: 'Failed to encrypt API key for storage' }
}
const [newKey] = await db
.insert(apiKey)
.values({
id: nanoid(),
workspaceId,
userId: context.userId,
createdBy: context.userId,
name,
key: encryptedKey,
type: 'workspace',
createdAt: new Date(),
updatedAt: new Date(),
})
.returning({ id: apiKey.id, name: apiKey.name, createdAt: apiKey.createdAt })
return {
success: true,
output: {
id: newKey.id,
name: newKey.name,
key: plainKey,
workspaceId,
message:
'API key created successfully. Copy this key now — it will not be shown again. Use this key in the x-api-key header when calling workflow API endpoints.',
},
}
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}
export async function executeRunFromBlock(
params: RunFromBlockParams,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const workflowId = params.workflowId || context.workflowId
if (!workflowId) {
return { success: false, error: 'workflowId is required' }
}
if (!params.startBlockId) {
return { success: false, error: 'startBlockId is required' }
}
const snapshot = params.executionId
? await getExecutionState(params.executionId)
: await getLatestExecutionState(workflowId)
if (!snapshot) {
return {
success: false,
error: params.executionId
? `No execution state found for execution ${params.executionId}. Run the full workflow first.`
: `No execution state found for workflow ${workflowId}. Run the full workflow first to create a snapshot.`,
}
}
const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId)
const useDraftState = !params.useDeployedState
const result = await executeWorkflow(
{
id: workflowRecord.id,
userId: workflowRecord.userId,
workspaceId: workflowRecord.workspaceId,
variables: workflowRecord.variables || {},
},
generateRequestId(),
params.workflow_input || params.input || undefined,
context.userId,
{
enabled: true,
useDraftState,
runFromBlock: { startBlockId: params.startBlockId, sourceSnapshot: snapshot },
}
)
return {
success: result.success,
output: {
executionId: result.metadata?.executionId,
success: result.success,
startBlockId: params.startBlockId,
output: result.output,
logs: result.logs,
},
error: result.success ? undefined : result.error || 'Workflow execution failed',
}
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}
export async function executeRunBlock(
params: RunBlockParams,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const workflowId = params.workflowId || context.workflowId
if (!workflowId) {
return { success: false, error: 'workflowId is required' }
}
if (!params.blockId) {
return { success: false, error: 'blockId is required' }
}
const snapshot = params.executionId
? await getExecutionState(params.executionId)
: await getLatestExecutionState(workflowId)
if (!snapshot) {
return {
success: false,
error: params.executionId
? `No execution state found for execution ${params.executionId}. Run the full workflow first.`
: `No execution state found for workflow ${workflowId}. Run the full workflow first to create a snapshot.`,
}
}
const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId)
const useDraftState = !params.useDeployedState
const result = await executeWorkflow(
{
id: workflowRecord.id,
userId: workflowRecord.userId,
workspaceId: workflowRecord.workspaceId,
variables: workflowRecord.variables || {},
},
generateRequestId(),
params.workflow_input || params.input || undefined,
context.userId,
{
enabled: true,
useDraftState,
runFromBlock: { startBlockId: params.blockId, sourceSnapshot: snapshot },
stopAfterBlockId: params.blockId,
}
)
return {
success: result.success,
output: {
executionId: result.metadata?.executionId,
success: result.success,
blockId: params.blockId,
output: result.output,
logs: result.logs,
},
error: result.success ? undefined : result.error || 'Workflow execution failed',
}
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}

View File

@@ -10,7 +10,10 @@ import { mcpService } from '@/lib/mcp/service'
import { listWorkspaceFiles } from '@/lib/uploads/contexts/workspace'
import { getBlockOutputPaths } from '@/lib/workflows/blocks/block-outputs'
import { BlockPathCalculator } from '@/lib/workflows/blocks/block-path-calculator'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
import {
loadDeployedWorkflowState,
loadWorkflowFromNormalizedTables,
} from '@/lib/workflows/persistence/utils'
import { isInputDefinitionTrigger } from '@/lib/workflows/triggers/input-definition-triggers'
import type { Loop, Parallel } from '@/stores/workflows/workflow/types'
import { normalizeName } from '@/executor/constants'
@@ -23,6 +26,7 @@ import {
import type {
GetBlockOutputsParams,
GetBlockUpstreamReferencesParams,
GetDeployedWorkflowStateParams,
GetUserWorkflowParams,
GetWorkflowDataParams,
GetWorkflowFromNameParams,
@@ -562,3 +566,50 @@ function formatOutputsWithPrefix(paths: string[], blockName: string): string[] {
const normalizedName = normalizeName(blockName)
return paths.map((path) => `${normalizedName}.${path}`)
}
export async function executeGetDeployedWorkflowState(
params: GetDeployedWorkflowStateParams,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const workflowId = params.workflowId || context.workflowId
if (!workflowId) {
return { success: false, error: 'workflowId is required' }
}
const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId)
try {
const deployedState = await loadDeployedWorkflowState(workflowId)
const formatted = formatNormalizedWorkflowForCopilot({
blocks: deployedState.blocks,
edges: deployedState.edges,
loops: deployedState.loops as Record<string, Loop>,
parallels: deployedState.parallels as Record<string, Parallel>,
})
return {
success: true,
output: {
workflowId,
workflowName: workflowRecord.name || '',
isDeployed: true,
deploymentVersionId: deployedState.deploymentVersionId,
deployedState: formatted,
},
}
} catch {
return {
success: true,
output: {
workflowId,
workflowName: workflowRecord.name || '',
isDeployed: false,
message: 'Workflow has not been deployed yet.',
},
}
}
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}

View File

@@ -189,6 +189,158 @@ export const DIRECT_TOOL_DEFS: DirectToolDef[] = [
required: ['folderId'],
},
},
{
name: 'run_workflow',
toolId: 'run_workflow',
description:
'Run a workflow and return its output. Works on both draft and deployed states. By default runs the draft (live) state.',
inputSchema: {
type: 'object',
properties: {
workflowId: {
type: 'string',
description: 'REQUIRED. The workflow ID to run.',
},
workflow_input: {
type: 'object',
description: 'JSON object with input values. Keys should match the workflow start block input field names.',
},
useDeployedState: {
type: 'boolean',
description: 'When true, runs the deployed version instead of the draft. Default: false.',
},
},
required: ['workflowId'],
},
},
{
name: 'run_workflow_until_block',
toolId: 'run_workflow_until_block',
description:
'Run a workflow and stop after a specific block completes. Useful for testing partial execution or debugging specific blocks.',
inputSchema: {
type: 'object',
properties: {
workflowId: {
type: 'string',
description: 'REQUIRED. The workflow ID to run.',
},
stopAfterBlockId: {
type: 'string',
description: 'REQUIRED. The block ID to stop after. Execution halts once this block completes.',
},
workflow_input: {
type: 'object',
description: 'JSON object with input values for the workflow.',
},
useDeployedState: {
type: 'boolean',
description: 'When true, runs the deployed version instead of the draft. Default: false.',
},
},
required: ['workflowId', 'stopAfterBlockId'],
},
},
{
name: 'run_from_block',
toolId: 'run_from_block',
description:
'Run a workflow starting from a specific block, using cached outputs from a prior execution for upstream blocks. The workflow must have been run at least once first.',
inputSchema: {
type: 'object',
properties: {
workflowId: {
type: 'string',
description: 'REQUIRED. The workflow ID to run.',
},
startBlockId: {
type: 'string',
description: 'REQUIRED. The block ID to start execution from.',
},
executionId: {
type: 'string',
description: 'Optional. Specific execution ID to load the snapshot from. Uses latest if omitted.',
},
workflow_input: {
type: 'object',
description: 'Optional input values for the workflow.',
},
useDeployedState: {
type: 'boolean',
description: 'When true, runs the deployed version instead of the draft. Default: false.',
},
},
required: ['workflowId', 'startBlockId'],
},
},
{
name: 'run_block',
toolId: 'run_block',
description:
'Run a single block in isolation using cached outputs from a prior execution. Only the specified block executes — nothing upstream or downstream. The workflow must have been run at least once first.',
inputSchema: {
type: 'object',
properties: {
workflowId: {
type: 'string',
description: 'REQUIRED. The workflow ID.',
},
blockId: {
type: 'string',
description: 'REQUIRED. The block ID to run in isolation.',
},
executionId: {
type: 'string',
description: 'Optional. Specific execution ID to load the snapshot from. Uses latest if omitted.',
},
workflow_input: {
type: 'object',
description: 'Optional input values for the workflow.',
},
useDeployedState: {
type: 'boolean',
description: 'When true, runs the deployed version instead of the draft. Default: false.',
},
},
required: ['workflowId', 'blockId'],
},
},
{
name: 'get_deployed_workflow_state',
toolId: 'get_deployed_workflow_state',
description:
'Get the deployed (production) state of a workflow. Returns the full workflow definition as deployed, or indicates if the workflow is not yet deployed.',
inputSchema: {
type: 'object',
properties: {
workflowId: {
type: 'string',
description: 'REQUIRED. The workflow ID to get the deployed state for.',
},
},
required: ['workflowId'],
},
},
{
name: 'generate_api_key',
toolId: 'generate_api_key',
description:
'Generate a new workspace API key for calling workflow API endpoints. The key is only shown once — tell the user to save it immediately.',
inputSchema: {
type: 'object',
properties: {
name: {
type: 'string',
description: 'A descriptive name for the API key (e.g., "production-key", "dev-testing").',
},
workspaceId: {
type: 'string',
description: 'Optional workspace ID. Defaults to user\'s default workspace.',
},
},
required: ['name'],
},
},
]
export const SUBAGENT_TOOL_DEFS: SubagentToolDef[] = [

View File

@@ -21,6 +21,7 @@ import { checkAndBillOverageThreshold } from '@/lib/billing/threshold-billing'
import { isBillingEnabled } from '@/lib/core/config/feature-flags'
import { redactApiKeys } from '@/lib/core/security/redaction'
import { filterForDisplay } from '@/lib/core/utils/display-filters'
import type { SerializableExecutionState } from '@/executor/execution/types'
import { emitWorkflowExecutionCompleted } from '@/lib/logs/events'
import { snapshotService } from '@/lib/logs/execution/snapshot/service'
import type {
@@ -188,6 +189,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
finalOutput: BlockOutputData
traceSpans?: TraceSpan[]
workflowInput?: any
executionState?: SerializableExecutionState
isResume?: boolean
level?: 'info' | 'error'
status?: 'completed' | 'failed' | 'cancelled' | 'pending'
@@ -200,6 +202,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
finalOutput,
traceSpans,
workflowInput,
executionState,
isResume,
level: levelOverride,
status: statusOverride,
@@ -287,6 +290,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
total: executionCost.tokens.total,
},
models: executionCost.models,
...(executionState ? { executionState } : {}),
},
cost: executionCost,
})

View File

@@ -3,6 +3,7 @@ import { workflowExecutionLogs } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq, sql } from 'drizzle-orm'
import { BASE_EXECUTION_CHARGE } from '@/lib/billing/constants'
import type { SerializableExecutionState } from '@/executor/execution/types'
import { executionLogger } from '@/lib/logs/execution/logger'
import {
calculateCostSummary,
@@ -35,6 +36,7 @@ export interface SessionCompleteParams {
finalOutput?: any
traceSpans?: TraceSpan[]
workflowInput?: any
executionState?: SerializableExecutionState
}
export interface SessionErrorCompleteParams {
@@ -269,7 +271,8 @@ export class LoggingSession {
return
}
const { endedAt, totalDurationMs, finalOutput, traceSpans, workflowInput } = params
const { endedAt, totalDurationMs, finalOutput, traceSpans, workflowInput, executionState } =
params
try {
const costSummary = calculateCostSummary(traceSpans || [])
@@ -284,6 +287,7 @@ export class LoggingSession {
finalOutput: finalOutput || {},
traceSpans: traceSpans || [],
workflowInput,
executionState,
isResume: this.isResume,
})

View File

@@ -1,5 +1,6 @@
import type { Edge } from 'reactflow'
import type { BlockLog, NormalizedBlockOutput } from '@/executor/types'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { DeploymentStatus } from '@/stores/workflows/registry/types'
import type { Loop, Parallel, WorkflowState } from '@/stores/workflows/workflow/types'
@@ -111,6 +112,7 @@ export interface WorkflowExecutionLog {
tokens?: { input?: number; output?: number; total?: number }
}
>
executionState?: SerializableExecutionState
finalOutput?: any
errorDetails?: {
blockId: string

View File

@@ -4,7 +4,7 @@ import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types'
import type { ExecutionResult, StreamingExecution } from '@/executor/types'
const logger = createLogger('WorkflowExecution')
@@ -22,6 +22,13 @@ export interface ExecuteWorkflowOptions {
abortSignal?: AbortSignal
/** Use the live/draft workflow state instead of the deployed state. Used by copilot. */
useDraftState?: boolean
/** Stop execution after this block completes. Used for "run until block" feature. */
stopAfterBlockId?: string
/** Run-from-block configuration using a prior execution snapshot. */
runFromBlock?: {
startBlockId: string
sourceSnapshot: SerializableExecutionState
}
}
export interface WorkflowInfo {
@@ -86,6 +93,8 @@ export async function executeWorkflow(
includeFileBase64: streamConfig?.includeFileBase64,
base64MaxBytes: streamConfig?.base64MaxBytes,
abortSignal: streamConfig?.abortSignal,
stopAfterBlockId: streamConfig?.stopAfterBlockId,
runFromBlock: streamConfig?.runFromBlock,
})
if (result.status === 'paused') {

View File

@@ -400,6 +400,7 @@ export async function executeWorkflowCore(
finalOutput: result.output || {},
traceSpans: traceSpans || [],
workflowInput: processedInput,
executionState: result.executionState,
})
await clearExecutionCancellation(executionId)

View File

@@ -0,0 +1,53 @@
import { db } from '@sim/db'
import { workflowExecutionLogs } from '@sim/db/schema'
import { and, desc, eq, sql } from 'drizzle-orm'
import type { SerializableExecutionState } from '@/executor/execution/types'
function isSerializableExecutionState(value: unknown): value is SerializableExecutionState {
if (!value || typeof value !== 'object') return false
const state = value as Record<string, unknown>
return (
typeof state.blockStates === 'object' &&
Array.isArray(state.executedBlocks) &&
Array.isArray(state.blockLogs) &&
typeof state.decisions === 'object' &&
Array.isArray(state.completedLoops) &&
Array.isArray(state.activeExecutionPath)
)
}
function extractExecutionState(executionData: unknown): SerializableExecutionState | null {
if (!executionData || typeof executionData !== 'object') return null
const state = (executionData as Record<string, unknown>).executionState
return isSerializableExecutionState(state) ? state : null
}
export async function getExecutionState(
executionId: string
): Promise<SerializableExecutionState | null> {
const [row] = await db
.select({ executionData: workflowExecutionLogs.executionData })
.from(workflowExecutionLogs)
.where(eq(workflowExecutionLogs.executionId, executionId))
.limit(1)
return extractExecutionState(row?.executionData)
}
export async function getLatestExecutionState(
workflowId: string
): Promise<SerializableExecutionState | null> {
const [row] = await db
.select({ executionData: workflowExecutionLogs.executionData })
.from(workflowExecutionLogs)
.where(
and(
eq(workflowExecutionLogs.workflowId, workflowId),
sql`${workflowExecutionLogs.executionData} -> 'executionState' IS NOT NULL`
)
)
.orderBy(desc(workflowExecutionLogs.startedAt))
.limit(1)
return extractExecutionState(row?.executionData)
}