From 3d0b810a8e02738e868938354d9c16533f020ee3 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Mon, 26 Jan 2026 16:19:41 -0800 Subject: [PATCH] Run from block --- apps/sim/app/api/copilot/user-models/route.ts | 1 + .../[id]/execute-from-block/route.ts | 377 ++++++++++++++++++ .../components/action-bar/action-bar.tsx | 56 ++- .../components/user-input/constants.ts | 1 + .../hooks/use-workflow-execution.ts | 260 +++++++++++- apps/sim/executor/execution/engine.ts | 11 + apps/sim/executor/execution/executor.ts | 106 ++++- apps/sim/executor/execution/types.ts | 9 + apps/sim/executor/orchestrators/node.ts | 15 +- apps/sim/executor/types.ts | 9 + .../sim/executor/utils/run-from-block.test.ts | 272 +++++++++++++ apps/sim/executor/utils/run-from-block.ts | 110 +++++ apps/sim/hooks/use-execution-stream.ts | 143 +++++++ apps/sim/lib/copilot/models.ts | 1 + apps/sim/stores/execution/store.ts | 19 + apps/sim/stores/execution/types.ts | 19 + 16 files changed, 1401 insertions(+), 8 deletions(-) create mode 100644 apps/sim/app/api/workflows/[id]/execute-from-block/route.ts create mode 100644 apps/sim/executor/utils/run-from-block.test.ts create mode 100644 apps/sim/executor/utils/run-from-block.ts diff --git a/apps/sim/app/api/copilot/user-models/route.ts b/apps/sim/app/api/copilot/user-models/route.ts index ead14a5e9..b88e12b8a 100644 --- a/apps/sim/app/api/copilot/user-models/route.ts +++ b/apps/sim/app/api/copilot/user-models/route.ts @@ -31,6 +31,7 @@ const DEFAULT_ENABLED_MODELS: Record = { 'claude-4.5-opus': true, 'claude-4.1-opus': false, 'gemini-3-pro': true, + 'auto': true, } // GET - Fetch user's enabled models diff --git a/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts b/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts new file mode 100644 index 000000000..14cc81248 --- /dev/null +++ b/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts @@ -0,0 +1,377 @@ +import { db, workflow as workflowTable } from '@sim/db' +import { createLogger } from '@sim/logger' +import { eq } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { v4 as uuidv4 } from 'uuid' +import { z } from 'zod' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' +import { SSE_HEADERS } from '@/lib/core/utils/sse' +import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils' +import { markExecutionCancelled } from '@/lib/execution/cancellation' +import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils' +import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events' +import { DAGExecutor } from '@/executor/execution/executor' +import type { IterationContext, SerializableExecutionState } from '@/executor/execution/types' +import type { NormalizedBlockOutput } from '@/executor/types' +import { hasExecutionResult } from '@/executor/utils/errors' +import { Serializer } from '@/serializer' +import { mergeSubblockState } from '@/stores/workflows/server-utils' + +const logger = createLogger('ExecuteFromBlockAPI') + +const ExecuteFromBlockSchema = z.object({ + startBlockId: z.string().min(1, 'Start block ID is required'), + sourceSnapshot: z.object({ + blockStates: z.record(z.any()), + executedBlocks: z.array(z.string()), + blockLogs: z.array(z.any()), + decisions: z.object({ + router: z.record(z.string()), + condition: z.record(z.string()), + }), + completedLoops: z.array(z.string()), + loopExecutions: z.record(z.any()).optional(), + parallelExecutions: z.record(z.any()).optional(), + parallelBlockMapping: z.record(z.any()).optional(), + activeExecutionPath: z.array(z.string()), + }), +}) + +export const runtime = 'nodejs' +export const dynamic = 'force-dynamic' + +/** + * POST /api/workflows/[id]/execute-from-block + * + * Executes a workflow starting from a specific block using cached outputs + * for upstream/unaffected blocks from the source snapshot. + */ +export async function POST(req: NextRequest, { params }: { params: Promise<{ id: string }> }) { + const requestId = generateRequestId() + const { id: workflowId } = await params + + try { + const auth = await checkHybridAuth(req, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 }) + } + const userId = auth.userId + + let body: unknown + try { + body = await req.json() + } catch { + return NextResponse.json({ error: 'Invalid JSON body' }, { status: 400 }) + } + + const validation = ExecuteFromBlockSchema.safeParse(body) + if (!validation.success) { + logger.warn(`[${requestId}] Invalid request body:`, validation.error.errors) + return NextResponse.json( + { + error: 'Invalid request body', + details: validation.error.errors.map((e) => ({ + path: e.path.join('.'), + message: e.message, + })), + }, + { status: 400 } + ) + } + + const { startBlockId, sourceSnapshot } = validation.data + + logger.info(`[${requestId}] Starting run-from-block execution`, { + workflowId, + userId, + startBlockId, + executedBlocksCount: sourceSnapshot.executedBlocks.length, + }) + + const executionId = uuidv4() + + // Load workflow record to get workspaceId + const [workflowRecord] = await db + .select({ workspaceId: workflowTable.workspaceId }) + .from(workflowTable) + .where(eq(workflowTable.id, workflowId)) + .limit(1) + + if (!workflowRecord?.workspaceId) { + return NextResponse.json({ error: 'Workflow not found or has no workspace' }, { status: 404 }) + } + + const workspaceId = workflowRecord.workspaceId + + // Load workflow state + const workflowData = await loadWorkflowFromNormalizedTables(workflowId) + if (!workflowData) { + return NextResponse.json({ error: 'Workflow state not found' }, { status: 404 }) + } + + const { blocks, edges, loops, parallels } = workflowData + + // Merge block states + const mergedStates = mergeSubblockState(blocks) + + // Get environment variables + const { personalDecrypted, workspaceDecrypted } = await getPersonalAndWorkspaceEnv( + userId, + workspaceId + ) + const decryptedEnvVars: Record = { ...personalDecrypted, ...workspaceDecrypted } + + // Serialize workflow + const serializedWorkflow = new Serializer().serializeWorkflow( + mergedStates, + edges, + loops, + parallels, + true + ) + + const encoder = new TextEncoder() + const abortController = new AbortController() + let isStreamClosed = false + + const stream = new ReadableStream({ + async start(controller) { + const sendEvent = (event: ExecutionEvent) => { + if (isStreamClosed) return + + try { + controller.enqueue(encodeSSEEvent(event)) + } catch { + isStreamClosed = true + } + } + + try { + const startTime = new Date() + + sendEvent({ + type: 'execution:started', + timestamp: startTime.toISOString(), + executionId, + workflowId, + data: { + startTime: startTime.toISOString(), + }, + }) + + const onBlockStart = async ( + blockId: string, + blockName: string, + blockType: string, + iterationContext?: IterationContext + ) => { + sendEvent({ + type: 'block:started', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + blockId, + blockName, + blockType, + ...(iterationContext && { + iterationCurrent: iterationContext.iterationCurrent, + iterationTotal: iterationContext.iterationTotal, + iterationType: iterationContext.iterationType, + }), + }, + }) + } + + const onBlockComplete = async ( + blockId: string, + blockName: string, + blockType: string, + callbackData: { input?: unknown; output: NormalizedBlockOutput; executionTime: number }, + iterationContext?: IterationContext + ) => { + const hasError = (callbackData.output as any)?.error + + if (hasError) { + sendEvent({ + type: 'block:error', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + blockId, + blockName, + blockType, + input: callbackData.input, + error: (callbackData.output as any).error, + durationMs: callbackData.executionTime || 0, + ...(iterationContext && { + iterationCurrent: iterationContext.iterationCurrent, + iterationTotal: iterationContext.iterationTotal, + iterationType: iterationContext.iterationType, + }), + }, + }) + } else { + sendEvent({ + type: 'block:completed', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + blockId, + blockName, + blockType, + input: callbackData.input, + output: callbackData.output, + durationMs: callbackData.executionTime || 0, + ...(iterationContext && { + iterationCurrent: iterationContext.iterationCurrent, + iterationTotal: iterationContext.iterationTotal, + iterationType: iterationContext.iterationType, + }), + }, + }) + } + } + + const onStream = async (streamingExecution: unknown) => { + const streamingExec = streamingExecution as { stream: ReadableStream; execution: any } + const blockId = streamingExec.execution?.blockId + + const reader = streamingExec.stream.getReader() + const decoder = new TextDecoder() + + try { + while (true) { + const { done, value } = await reader.read() + if (done) break + + const chunk = decoder.decode(value, { stream: true }) + sendEvent({ + type: 'stream:chunk', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { blockId, chunk }, + }) + } + + sendEvent({ + type: 'stream:done', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { blockId }, + }) + } catch (error) { + logger.error(`[${requestId}] Error streaming block content:`, error) + } finally { + try { + reader.releaseLock() + } catch {} + } + } + + // Create executor and run from block + const executor = new DAGExecutor({ + workflow: serializedWorkflow, + envVarValues: decryptedEnvVars, + workflowInput: {}, + workflowVariables: {}, + contextExtensions: { + stream: true, + executionId, + workspaceId, + userId, + isDeployedContext: false, + onBlockStart, + onBlockComplete, + onStream, + abortSignal: abortController.signal, + }, + }) + + const result = await executor.executeFromBlock( + workflowId, + startBlockId, + sourceSnapshot as SerializableExecutionState + ) + + if (result.status === 'cancelled') { + sendEvent({ + type: 'execution:cancelled', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + duration: result.metadata?.duration || 0, + }, + }) + return + } + + sendEvent({ + type: 'execution:completed', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + success: result.success, + output: result.output, + duration: result.metadata?.duration || 0, + startTime: result.metadata?.startTime || startTime.toISOString(), + endTime: result.metadata?.endTime || new Date().toISOString(), + }, + }) + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : 'Unknown error' + logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`) + + const executionResult = hasExecutionResult(error) ? error.executionResult : undefined + + sendEvent({ + type: 'execution:error', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + error: executionResult?.error || errorMessage, + duration: executionResult?.metadata?.duration || 0, + }, + }) + } finally { + if (!isStreamClosed) { + try { + controller.enqueue(encoder.encode('data: [DONE]\n\n')) + controller.close() + } catch { + // Stream already closed + } + } + } + }, + cancel() { + isStreamClosed = true + logger.info(`[${requestId}] Client aborted SSE stream, signalling cancellation`) + abortController.abort() + markExecutionCancelled(executionId).catch(() => {}) + }, + }) + + return new NextResponse(stream, { + headers: { + ...SSE_HEADERS, + 'X-Execution-Id': executionId, + }, + }) + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : 'Unknown error' + logger.error(`[${requestId}] Failed to start run-from-block execution:`, error) + return NextResponse.json( + { error: errorMessage || 'Failed to start execution' }, + { status: 500 } + ) + } +} diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/action-bar/action-bar.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/action-bar/action-bar.tsx index 42d2c3e84..f8a816a32 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/action-bar/action-bar.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/action-bar/action-bar.tsx @@ -1,11 +1,12 @@ import { memo, useCallback } from 'react' -import { ArrowLeftRight, ArrowUpDown, Circle, CircleOff, LogOut } from 'lucide-react' +import { ArrowLeftRight, ArrowUpDown, Circle, CircleOff, LogOut, Play } from 'lucide-react' import { Button, Copy, Tooltip, Trash2 } from '@/components/emcn' import { cn } from '@/lib/core/utils/cn' import { isInputDefinitionTrigger } from '@/lib/workflows/triggers/input-definition-triggers' import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/providers/workspace-permissions-provider' import { validateTriggerPaste } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils' import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow' +import { useExecutionStore } from '@/stores/execution' import { useNotificationStore } from '@/stores/notifications' import { useWorkflowRegistry } from '@/stores/workflows/registry/store' import { useWorkflowStore } from '@/stores/workflows/workflow/store' @@ -97,12 +98,42 @@ export const ActionBar = memo( ) ) + const { activeWorkflowId } = useWorkflowRegistry() + const { isExecuting, getLastExecutionSnapshot } = useExecutionStore() const userPermissions = useUserPermissionsContext() const isStartBlock = isInputDefinitionTrigger(blockType) const isResponseBlock = blockType === 'response' const isNoteBlock = blockType === 'note' const isSubflowBlock = blockType === 'loop' || blockType === 'parallel' + const isInsideSubflow = parentId && (parentType === 'loop' || parentType === 'parallel') + + // Check if run-from-block is available + const hasExecutionSnapshot = activeWorkflowId + ? !!getLastExecutionSnapshot(activeWorkflowId) + : false + const wasExecuted = activeWorkflowId + ? getLastExecutionSnapshot(activeWorkflowId)?.executedBlocks.includes(blockId) ?? false + : false + const canRunFromBlock = + hasExecutionSnapshot && + wasExecuted && + !isStartBlock && + !isNoteBlock && + !isSubflowBlock && + !isInsideSubflow && + !isExecuting + + const handleRunFromBlock = useCallback(() => { + if (!activeWorkflowId || !canRunFromBlock) return + + // Dispatch a custom event to trigger run-from-block execution + window.dispatchEvent( + new CustomEvent('run-from-block', { + detail: { blockId, workflowId: activeWorkflowId }, + }) + ) + }, [blockId, activeWorkflowId, canRunFromBlock]) /** * Get appropriate tooltip message based on disabled state @@ -174,6 +205,29 @@ export const ActionBar = memo( )} + {canRunFromBlock && ( + + + + + + {isExecuting ? 'Execution in progress' : getTooltipMessage('Run from this block')} + + + )} + {!isStartBlock && !isResponseBlock && ( diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/components/user-input/constants.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/components/user-input/constants.ts index b98af5dd2..118915239 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/components/user-input/constants.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/components/user-input/constants.ts @@ -246,6 +246,7 @@ export function getCommandDisplayLabel(commandId: string): string { * Model configuration options */ export const MODEL_OPTIONS = [ + { value: 'auto', label: 'Auto' }, { value: 'claude-4.5-opus', label: 'Claude 4.5 Opus' }, { value: 'claude-4.5-sonnet', label: 'Claude 4.5 Sonnet' }, { value: 'claude-4.5-haiku', label: 'Claude 4.5 Haiku' }, diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts index 1c0cbcc7a..393bd9a00 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts @@ -1,4 +1,4 @@ -import { useCallback, useRef, useState } from 'react' +import { useCallback, useEffect, useRef, useState } from 'react' import { createLogger } from '@sim/logger' import { useQueryClient } from '@tanstack/react-query' import { v4 as uuidv4 } from 'uuid' @@ -15,7 +15,8 @@ import { TriggerUtils, } from '@/lib/workflows/triggers/triggers' import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow' -import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types' +import type { SerializableExecutionState } from '@/executor/execution/types' +import type { BlockLog, BlockState, ExecutionResult, StreamingExecution } from '@/executor/types' import { hasExecutionResult } from '@/executor/utils/errors' import { coerceValue } from '@/executor/utils/start-block' import { subscriptionKeys } from '@/hooks/queries/subscription' @@ -32,6 +33,9 @@ import { useWorkflowStore } from '@/stores/workflows/workflow/store' const logger = createLogger('useWorkflowExecution') +// Module-level guard to prevent duplicate run-from-block executions across hook instances +let runFromBlockGlobalLock = false + // Debug state validation result interface DebugValidationResult { isValid: boolean @@ -98,6 +102,8 @@ export function useWorkflowExecution() { setActiveBlocks, setBlockRunStatus, setEdgeRunStatus, + setLastExecutionSnapshot, + getLastExecutionSnapshot, } = useExecutionStore() const [executionResult, setExecutionResult] = useState(null) const executionStream = useExecutionStream() @@ -876,6 +882,8 @@ export function useWorkflowExecution() { const activeBlocksSet = new Set() const streamedContent = new Map() const accumulatedBlockLogs: BlockLog[] = [] + const accumulatedBlockStates = new Map() + const executedBlockIds = new Set() // Execute the workflow try { @@ -922,6 +930,14 @@ export function useWorkflowExecution() { // Track successful block execution in run path setBlockRunStatus(data.blockId, 'success') + // Track block state for run-from-block snapshot + executedBlockIds.add(data.blockId) + accumulatedBlockStates.set(data.blockId, { + output: data.output, + executed: true, + executionTime: data.durationMs, + }) + // Edges already tracked in onBlockStarted, no need to track again const startedAt = new Date(Date.now() - data.durationMs).toISOString() @@ -1056,6 +1072,23 @@ export function useWorkflowExecution() { }, logs: accumulatedBlockLogs, } + + // Store execution snapshot for run-from-block + if (data.success && activeWorkflowId) { + const snapshot: SerializableExecutionState = { + blockStates: Object.fromEntries(accumulatedBlockStates), + executedBlocks: Array.from(executedBlockIds), + blockLogs: accumulatedBlockLogs, + decisions: { router: {}, condition: {} }, + completedLoops: [], + activeExecutionPath: Array.from(executedBlockIds), + } + setLastExecutionSnapshot(activeWorkflowId, snapshot) + logger.info('Stored execution snapshot for run-from-block', { + workflowId: activeWorkflowId, + executedBlocksCount: executedBlockIds.size, + }) + } }, onExecutionError: (data) => { @@ -1376,6 +1409,228 @@ export function useWorkflowExecution() { setActiveBlocks, ]) + /** + * Handles running workflow from a specific block using cached outputs + */ + const handleRunFromBlock = useCallback( + async (blockId: string, workflowId: string) => { + // Prevent duplicate executions across multiple hook instances (panel.tsx and chat.tsx) + if (runFromBlockGlobalLock) { + logger.debug('Run-from-block already in progress (global lock), ignoring duplicate request', { + workflowId, + blockId, + }) + return + } + runFromBlockGlobalLock = true + + const snapshot = getLastExecutionSnapshot(workflowId) + if (!snapshot) { + logger.error('No execution snapshot available for run-from-block', { workflowId, blockId }) + runFromBlockGlobalLock = false + return + } + + if (!snapshot.executedBlocks.includes(blockId)) { + logger.error('Block was not executed in the source run', { workflowId, blockId }) + runFromBlockGlobalLock = false + return + } + + logger.info('Starting run-from-block execution', { + workflowId, + startBlockId: blockId, + snapshotExecutedBlocks: snapshot.executedBlocks.length, + }) + + setIsExecuting(true) + + const workflowEdges = useWorkflowStore.getState().edges + const executionId = uuidv4() + const accumulatedBlockLogs: BlockLog[] = [] + const accumulatedBlockStates = new Map() + const executedBlockIds = new Set() + const activeBlocksSet = new Set() + + try { + await executionStream.executeFromBlock({ + workflowId, + startBlockId: blockId, + sourceSnapshot: snapshot, + callbacks: { + onExecutionStarted: (data) => { + logger.info('Run-from-block execution started:', data) + }, + + onBlockStarted: (data) => { + activeBlocksSet.add(data.blockId) + setActiveBlocks(new Set(activeBlocksSet)) + + const incomingEdges = workflowEdges.filter((edge) => edge.target === data.blockId) + incomingEdges.forEach((edge) => { + setEdgeRunStatus(edge.id, 'success') + }) + }, + + onBlockCompleted: (data) => { + activeBlocksSet.delete(data.blockId) + setActiveBlocks(new Set(activeBlocksSet)) + + setBlockRunStatus(data.blockId, 'success') + + executedBlockIds.add(data.blockId) + accumulatedBlockStates.set(data.blockId, { + output: data.output, + executed: true, + executionTime: data.durationMs, + }) + + const startedAt = new Date(Date.now() - data.durationMs).toISOString() + const endedAt = new Date().toISOString() + + accumulatedBlockLogs.push({ + blockId: data.blockId, + blockName: data.blockName || 'Unknown Block', + blockType: data.blockType || 'unknown', + input: data.input || {}, + output: data.output, + success: true, + durationMs: data.durationMs, + startedAt, + endedAt, + }) + + addConsole({ + input: data.input || {}, + output: data.output, + success: true, + durationMs: data.durationMs, + startedAt, + endedAt, + workflowId, + blockId: data.blockId, + executionId, + blockName: data.blockName || 'Unknown Block', + blockType: data.blockType || 'unknown', + iterationCurrent: data.iterationCurrent, + iterationTotal: data.iterationTotal, + iterationType: data.iterationType, + }) + }, + + onBlockError: (data) => { + activeBlocksSet.delete(data.blockId) + setActiveBlocks(new Set(activeBlocksSet)) + + setBlockRunStatus(data.blockId, 'error') + + const startedAt = new Date(Date.now() - data.durationMs).toISOString() + const endedAt = new Date().toISOString() + + accumulatedBlockLogs.push({ + blockId: data.blockId, + blockName: data.blockName || 'Unknown Block', + blockType: data.blockType || 'unknown', + input: data.input || {}, + output: {}, + success: false, + error: data.error, + durationMs: data.durationMs, + startedAt, + endedAt, + }) + + addConsole({ + input: data.input || {}, + output: {}, + success: false, + error: data.error, + durationMs: data.durationMs, + startedAt, + endedAt, + workflowId, + blockId: data.blockId, + executionId, + blockName: data.blockName, + blockType: data.blockType, + iterationCurrent: data.iterationCurrent, + iterationTotal: data.iterationTotal, + iterationType: data.iterationType, + }) + }, + + onExecutionCompleted: (data) => { + if (data.success) { + // Merge new states with snapshot states for updated snapshot + const mergedBlockStates: Record = { ...snapshot.blockStates } + for (const [bId, state] of accumulatedBlockStates) { + mergedBlockStates[bId] = state + } + + const mergedExecutedBlocks = new Set([ + ...snapshot.executedBlocks, + ...executedBlockIds, + ]) + + const updatedSnapshot: SerializableExecutionState = { + ...snapshot, + blockStates: mergedBlockStates, + executedBlocks: Array.from(mergedExecutedBlocks), + blockLogs: [...snapshot.blockLogs, ...accumulatedBlockLogs], + activeExecutionPath: Array.from(mergedExecutedBlocks), + } + setLastExecutionSnapshot(workflowId, updatedSnapshot) + logger.info('Updated execution snapshot after run-from-block', { + workflowId, + newBlocksExecuted: executedBlockIds.size, + }) + } + }, + + onExecutionError: (data) => { + logger.error('Run-from-block execution error:', data.error) + }, + + onExecutionCancelled: () => { + logger.info('Run-from-block execution cancelled') + }, + }, + }) + } catch (error) { + if ((error as Error).name !== 'AbortError') { + logger.error('Run-from-block execution failed:', error) + } + } finally { + setIsExecuting(false) + setActiveBlocks(new Set()) + runFromBlockGlobalLock = false + } + }, + [ + getLastExecutionSnapshot, + setLastExecutionSnapshot, + setIsExecuting, + setActiveBlocks, + setBlockRunStatus, + setEdgeRunStatus, + addConsole, + executionStream, + ] + ) + + // Listen for run-from-block events from the action bar + useEffect(() => { + const handleRunFromBlockEvent = (event: CustomEvent<{ blockId: string; workflowId: string }>) => { + const { blockId, workflowId } = event.detail + handleRunFromBlock(blockId, workflowId) + } + + window.addEventListener('run-from-block', handleRunFromBlockEvent as EventListener) + return () => { + window.removeEventListener('run-from-block', handleRunFromBlockEvent as EventListener) + } + }, [handleRunFromBlock]) + return { isExecuting, isDebugging, @@ -1386,5 +1641,6 @@ export function useWorkflowExecution() { handleResumeDebug, handleCancelDebug, handleCancelExecution, + handleRunFromBlock, } } diff --git a/apps/sim/executor/execution/engine.ts b/apps/sim/executor/execution/engine.ts index 05e7e0484..9cea32218 100644 --- a/apps/sim/executor/execution/engine.ts +++ b/apps/sim/executor/execution/engine.ts @@ -259,6 +259,17 @@ export class ExecutionEngine { } private initializeQueue(triggerBlockId?: string): void { + // Run-from-block mode: start directly from specified block + if (this.context.runFromBlockContext) { + const { startBlockId } = this.context.runFromBlockContext + logger.info('Initializing queue for run-from-block mode', { + startBlockId, + dirtySetSize: this.context.runFromBlockContext.dirtySet.size, + }) + this.addToQueue(startBlockId) + return + } + const pendingBlocks = this.context.metadata.pendingBlocks const remainingEdges = (this.context.metadata as any).remainingEdges diff --git a/apps/sim/executor/execution/executor.ts b/apps/sim/executor/execution/executor.ts index b4e1f55f8..3351aaff8 100644 --- a/apps/sim/executor/execution/executor.ts +++ b/apps/sim/executor/execution/executor.ts @@ -5,12 +5,17 @@ import { BlockExecutor } from '@/executor/execution/block-executor' import { EdgeManager } from '@/executor/execution/edge-manager' import { ExecutionEngine } from '@/executor/execution/engine' import { ExecutionState } from '@/executor/execution/state' -import type { ContextExtensions, WorkflowInput } from '@/executor/execution/types' +import type { + ContextExtensions, + SerializableExecutionState, + WorkflowInput, +} from '@/executor/execution/types' import { createBlockHandlers } from '@/executor/handlers/registry' import { LoopOrchestrator } from '@/executor/orchestrators/loop' import { NodeExecutionOrchestrator } from '@/executor/orchestrators/node' import { ParallelOrchestrator } from '@/executor/orchestrators/parallel' import type { BlockState, ExecutionContext, ExecutionResult } from '@/executor/types' +import { computeDirtySet, validateRunFromBlock } from '@/executor/utils/run-from-block' import { buildResolutionFromBlock, buildStartBlockOutput, @@ -89,17 +94,103 @@ export class DAGExecutor { } } + /** + * Execute workflow starting from a specific block, using cached outputs + * for all upstream/unaffected blocks from the source snapshot. + * + * This implements Jupyter notebook-style execution where: + * - The start block and all downstream blocks are re-executed + * - Upstream blocks retain their cached outputs from the source snapshot + * - The result is a merged execution state + * + * @param workflowId - The workflow ID + * @param startBlockId - The block to start execution from + * @param sourceSnapshot - The execution state from a previous run + * @returns Merged execution result with cached + fresh outputs + */ + async executeFromBlock( + workflowId: string, + startBlockId: string, + sourceSnapshot: SerializableExecutionState + ): Promise { + // Build full DAG (no trigger constraint - we need all blocks for validation) + const dag = this.dagBuilder.build(this.workflow) + + // Validate the start block + const executedBlocks = new Set(sourceSnapshot.executedBlocks) + const validation = validateRunFromBlock(startBlockId, dag, executedBlocks) + if (!validation.valid) { + throw new Error(validation.error) + } + + // Compute dirty set (blocks that will be re-executed) + const dirtySet = computeDirtySet(dag, startBlockId) + + logger.info('Executing from block', { + workflowId, + startBlockId, + dirtySetSize: dirtySet.size, + totalBlocks: dag.nodes.size, + dirtyBlocks: Array.from(dirtySet), + }) + + // Create context with snapshot state + runFromBlockContext + const runFromBlockContext = { startBlockId, dirtySet } + const { context, state } = this.createExecutionContext(workflowId, undefined, { + snapshotState: sourceSnapshot, + runFromBlockContext, + }) + + // Setup orchestrators and engine (same as execute()) + const resolver = new VariableResolver(this.workflow, this.workflowVariables, state) + const loopOrchestrator = new LoopOrchestrator(dag, state, resolver) + loopOrchestrator.setContextExtensions(this.contextExtensions) + const parallelOrchestrator = new ParallelOrchestrator(dag, state) + parallelOrchestrator.setResolver(resolver) + parallelOrchestrator.setContextExtensions(this.contextExtensions) + const allHandlers = createBlockHandlers() + const blockExecutor = new BlockExecutor(allHandlers, resolver, this.contextExtensions, state) + const edgeManager = new EdgeManager(dag) + loopOrchestrator.setEdgeManager(edgeManager) + const nodeOrchestrator = new NodeExecutionOrchestrator( + dag, + state, + blockExecutor, + loopOrchestrator, + parallelOrchestrator + ) + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + + // Run and return result + return await engine.run() + } + private createExecutionContext( workflowId: string, - triggerBlockId?: string + triggerBlockId?: string, + overrides?: { + snapshotState?: SerializableExecutionState + runFromBlockContext?: { startBlockId: string; dirtySet: Set } + } ): { context: ExecutionContext; state: ExecutionState } { - const snapshotState = this.contextExtensions.snapshotState + const snapshotState = overrides?.snapshotState ?? this.contextExtensions.snapshotState const blockStates = snapshotState?.blockStates ? new Map(Object.entries(snapshotState.blockStates)) : new Map() - const executedBlocks = snapshotState?.executedBlocks + let executedBlocks = snapshotState?.executedBlocks ? new Set(snapshotState.executedBlocks) : new Set() + + // In run-from-block mode, clear the executed status for dirty blocks so they can be re-executed + if (overrides?.runFromBlockContext) { + const { dirtySet } = overrides.runFromBlockContext + executedBlocks = new Set([...executedBlocks].filter((id) => !dirtySet.has(id))) + logger.info('Cleared executed status for dirty blocks', { + dirtySetSize: dirtySet.size, + remainingExecutedBlocks: executedBlocks.size, + }) + } + const state = new ExecutionState(blockStates, executedBlocks) const context: ExecutionContext = { @@ -169,6 +260,7 @@ export class DAGExecutor { abortSignal: this.contextExtensions.abortSignal, includeFileBase64: this.contextExtensions.includeFileBase64, base64MaxBytes: this.contextExtensions.base64MaxBytes, + runFromBlockContext: overrides?.runFromBlockContext, } if (this.contextExtensions.resumeFromSnapshot) { @@ -193,6 +285,12 @@ export class DAGExecutor { pendingBlocks: context.metadata.pendingBlocks, skipStarterBlockInit: true, }) + } else if (overrides?.runFromBlockContext) { + // In run-from-block mode, skip starter block initialization + // All block states come from the snapshot + logger.info('Run-from-block mode: skipping starter block initialization', { + startBlockId: overrides.runFromBlockContext.startBlockId, + }) } else { this.initializeStarterBlock(context, state, triggerBlockId) } diff --git a/apps/sim/executor/execution/types.ts b/apps/sim/executor/execution/types.ts index 701f5de35..73e6e11ba 100644 --- a/apps/sim/executor/execution/types.ts +++ b/apps/sim/executor/execution/types.ts @@ -105,6 +105,15 @@ export interface ContextExtensions { output: { input?: any; output: NormalizedBlockOutput; executionTime: number }, iterationContext?: IterationContext ) => Promise + + /** + * Run-from-block configuration. When provided, executor runs in partial + * execution mode starting from the specified block. + */ + runFromBlockContext?: { + startBlockId: string + dirtySet: Set + } } export interface WorkflowInput { diff --git a/apps/sim/executor/orchestrators/node.ts b/apps/sim/executor/orchestrators/node.ts index e5d7bc1a1..244b54abd 100644 --- a/apps/sim/executor/orchestrators/node.ts +++ b/apps/sim/executor/orchestrators/node.ts @@ -31,7 +31,20 @@ export class NodeExecutionOrchestrator { throw new Error(`Node not found in DAG: ${nodeId}`) } - if (this.state.hasExecuted(nodeId)) { + // In run-from-block mode, skip execution for non-dirty blocks and return cached output + if (ctx.runFromBlockContext && !ctx.runFromBlockContext.dirtySet.has(nodeId)) { + const cachedOutput = this.state.getBlockOutput(nodeId) || {} + logger.debug('Skipping non-dirty block in run-from-block mode', { nodeId }) + return { + nodeId, + output: cachedOutput, + isFinalOutput: false, + } + } + + // Skip hasExecuted check for dirty blocks in run-from-block mode - they need to be re-executed + const isDirtyBlock = ctx.runFromBlockContext?.dirtySet.has(nodeId) ?? false + if (!isDirtyBlock && this.state.hasExecuted(nodeId)) { const output = this.state.getBlockOutput(nodeId) || {} return { nodeId, diff --git a/apps/sim/executor/types.ts b/apps/sim/executor/types.ts index 27eaa0c2b..aa4e05523 100644 --- a/apps/sim/executor/types.ts +++ b/apps/sim/executor/types.ts @@ -250,6 +250,15 @@ export interface ExecutionContext { * will not have their base64 content fetched. */ base64MaxBytes?: number + + /** + * Context for "run from block" mode. When present, only blocks in dirtySet + * will be executed; others return cached outputs from the source snapshot. + */ + runFromBlockContext?: { + startBlockId: string + dirtySet: Set + } } export interface ExecutionResult { diff --git a/apps/sim/executor/utils/run-from-block.test.ts b/apps/sim/executor/utils/run-from-block.test.ts new file mode 100644 index 000000000..2e66d9fdf --- /dev/null +++ b/apps/sim/executor/utils/run-from-block.test.ts @@ -0,0 +1,272 @@ +import { describe, expect, it } from 'vitest' +import type { DAG, DAGNode } from '@/executor/dag/builder' +import type { DAGEdge, NodeMetadata } from '@/executor/dag/types' +import type { SerializedLoop, SerializedParallel } from '@/serializer/types' +import { computeDirtySet, validateRunFromBlock } from '@/executor/utils/run-from-block' + +/** + * Helper to create a DAG node for testing + */ +function createNode( + id: string, + outgoingEdges: Array<{ target: string; sourceHandle?: string }> = [], + metadata: Partial = {} +): DAGNode { + const edges = new Map() + for (const edge of outgoingEdges) { + edges.set(edge.target, { target: edge.target, sourceHandle: edge.sourceHandle }) + } + + return { + id, + block: { + id, + position: { x: 0, y: 0 }, + config: { tool: 'test', params: {} }, + inputs: {}, + outputs: {}, + metadata: { id: 'test', name: `block-${id}`, category: 'tools' }, + enabled: true, + }, + incomingEdges: new Set(), + outgoingEdges: edges, + metadata: { + isParallelBranch: false, + isLoopNode: false, + isSentinel: false, + ...metadata, + }, + } +} + +/** + * Helper to create a DAG for testing + */ +function createDAG(nodes: DAGNode[]): DAG { + const nodeMap = new Map() + for (const node of nodes) { + nodeMap.set(node.id, node) + } + + // Set up incoming edges based on outgoing edges + for (const node of nodes) { + for (const [, edge] of node.outgoingEdges) { + const targetNode = nodeMap.get(edge.target) + if (targetNode) { + targetNode.incomingEdges.add(node.id) + } + } + } + + return { + nodes: nodeMap, + loopConfigs: new Map(), + parallelConfigs: new Map(), + } +} + +describe('computeDirtySet', () => { + it.concurrent('includes start block in dirty set', () => { + const dag = createDAG([createNode('A'), createNode('B'), createNode('C')]) + + const dirtySet = computeDirtySet(dag, 'B') + + expect(dirtySet.has('B')).toBe(true) + }) + + it.concurrent('includes all downstream blocks in linear workflow', () => { + // A → B → C → D + const dag = createDAG([ + createNode('A', [{ target: 'B' }]), + createNode('B', [{ target: 'C' }]), + createNode('C', [{ target: 'D' }]), + createNode('D'), + ]) + + const dirtySet = computeDirtySet(dag, 'B') + + expect(dirtySet.has('A')).toBe(false) + expect(dirtySet.has('B')).toBe(true) + expect(dirtySet.has('C')).toBe(true) + expect(dirtySet.has('D')).toBe(true) + expect(dirtySet.size).toBe(3) + }) + + it.concurrent('handles branching paths', () => { + // A → B → C + // ↓ + // D → E + const dag = createDAG([ + createNode('A', [{ target: 'B' }]), + createNode('B', [{ target: 'C' }, { target: 'D' }]), + createNode('C'), + createNode('D', [{ target: 'E' }]), + createNode('E'), + ]) + + const dirtySet = computeDirtySet(dag, 'B') + + expect(dirtySet.has('A')).toBe(false) + expect(dirtySet.has('B')).toBe(true) + expect(dirtySet.has('C')).toBe(true) + expect(dirtySet.has('D')).toBe(true) + expect(dirtySet.has('E')).toBe(true) + expect(dirtySet.size).toBe(4) + }) + + it.concurrent('handles convergence points', () => { + // A → C + // B → C → D + const dag = createDAG([ + createNode('A', [{ target: 'C' }]), + createNode('B', [{ target: 'C' }]), + createNode('C', [{ target: 'D' }]), + createNode('D'), + ]) + + // Run from A: should include A, C, D (but not B) + const dirtySet = computeDirtySet(dag, 'A') + + expect(dirtySet.has('A')).toBe(true) + expect(dirtySet.has('B')).toBe(false) + expect(dirtySet.has('C')).toBe(true) + expect(dirtySet.has('D')).toBe(true) + expect(dirtySet.size).toBe(3) + }) + + it.concurrent('handles diamond pattern', () => { + // B + // ↗ ↘ + // A D + // ↘ ↗ + // C + const dag = createDAG([ + createNode('A', [{ target: 'B' }, { target: 'C' }]), + createNode('B', [{ target: 'D' }]), + createNode('C', [{ target: 'D' }]), + createNode('D'), + ]) + + const dirtySet = computeDirtySet(dag, 'A') + + expect(dirtySet.has('A')).toBe(true) + expect(dirtySet.has('B')).toBe(true) + expect(dirtySet.has('C')).toBe(true) + expect(dirtySet.has('D')).toBe(true) + expect(dirtySet.size).toBe(4) + }) + + it.concurrent('stops at graph boundaries', () => { + // A → B C → D (disconnected) + const dag = createDAG([ + createNode('A', [{ target: 'B' }]), + createNode('B'), + createNode('C', [{ target: 'D' }]), + createNode('D'), + ]) + + const dirtySet = computeDirtySet(dag, 'A') + + expect(dirtySet.has('A')).toBe(true) + expect(dirtySet.has('B')).toBe(true) + expect(dirtySet.has('C')).toBe(false) + expect(dirtySet.has('D')).toBe(false) + expect(dirtySet.size).toBe(2) + }) + + it.concurrent('handles single node workflow', () => { + const dag = createDAG([createNode('A')]) + + const dirtySet = computeDirtySet(dag, 'A') + + expect(dirtySet.has('A')).toBe(true) + expect(dirtySet.size).toBe(1) + }) + + it.concurrent('handles node not in DAG gracefully', () => { + const dag = createDAG([createNode('A'), createNode('B')]) + + const dirtySet = computeDirtySet(dag, 'nonexistent') + + // Should just contain the start block ID even if not found + expect(dirtySet.has('nonexistent')).toBe(true) + expect(dirtySet.size).toBe(1) + }) +}) + +describe('validateRunFromBlock', () => { + it.concurrent('accepts valid block', () => { + const dag = createDAG([createNode('A'), createNode('B')]) + const executedBlocks = new Set(['A', 'B']) + + const result = validateRunFromBlock('A', dag, executedBlocks) + + expect(result.valid).toBe(true) + expect(result.error).toBeUndefined() + }) + + it.concurrent('rejects block not found in DAG', () => { + const dag = createDAG([createNode('A')]) + const executedBlocks = new Set(['A', 'B']) + + const result = validateRunFromBlock('B', dag, executedBlocks) + + expect(result.valid).toBe(false) + expect(result.error).toContain('Block not found') + }) + + it.concurrent('rejects blocks inside loops', () => { + const dag = createDAG([createNode('A', [], { isLoopNode: true, loopId: 'loop-1' })]) + const executedBlocks = new Set(['A']) + + const result = validateRunFromBlock('A', dag, executedBlocks) + + expect(result.valid).toBe(false) + expect(result.error).toContain('inside loop') + expect(result.error).toContain('loop-1') + }) + + it.concurrent('rejects blocks inside parallels', () => { + const dag = createDAG([createNode('A', [], { isParallelBranch: true, parallelId: 'parallel-1' })]) + const executedBlocks = new Set(['A']) + + const result = validateRunFromBlock('A', dag, executedBlocks) + + expect(result.valid).toBe(false) + expect(result.error).toContain('inside parallel') + expect(result.error).toContain('parallel-1') + }) + + it.concurrent('rejects sentinel nodes', () => { + const dag = createDAG([createNode('A', [], { isSentinel: true, sentinelType: 'start' })]) + const executedBlocks = new Set(['A']) + + const result = validateRunFromBlock('A', dag, executedBlocks) + + expect(result.valid).toBe(false) + expect(result.error).toContain('sentinel') + }) + + it.concurrent('rejects unexecuted blocks', () => { + const dag = createDAG([createNode('A'), createNode('B')]) + const executedBlocks = new Set(['A']) // B was not executed + + const result = validateRunFromBlock('B', dag, executedBlocks) + + expect(result.valid).toBe(false) + expect(result.error).toContain('was not executed') + }) + + it.concurrent('accepts regular executed block', () => { + const dag = createDAG([ + createNode('trigger', [{ target: 'A' }]), + createNode('A', [{ target: 'B' }]), + createNode('B'), + ]) + const executedBlocks = new Set(['trigger', 'A', 'B']) + + const result = validateRunFromBlock('A', dag, executedBlocks) + + expect(result.valid).toBe(true) + }) +}) diff --git a/apps/sim/executor/utils/run-from-block.ts b/apps/sim/executor/utils/run-from-block.ts new file mode 100644 index 000000000..57e1e81e8 --- /dev/null +++ b/apps/sim/executor/utils/run-from-block.ts @@ -0,0 +1,110 @@ +import { createLogger } from '@sim/logger' +import type { DAG } from '@/executor/dag/builder' + +const logger = createLogger('run-from-block') + +/** + * Result of validating a block for run-from-block execution. + */ +export interface RunFromBlockValidation { + valid: boolean + error?: string +} + +/** + * Context for run-from-block execution mode. + */ +export interface RunFromBlockContext { + /** The block ID to start execution from */ + startBlockId: string + /** Set of block IDs that need re-execution (start block + all downstream) */ + dirtySet: Set +} + +/** + * Computes all blocks that need re-execution when running from a specific block. + * Uses BFS to find all downstream blocks reachable via outgoing edges. + * + * @param dag - The workflow DAG + * @param startBlockId - The block to start execution from + * @returns Set of block IDs that are "dirty" and need re-execution + */ +export function computeDirtySet(dag: DAG, startBlockId: string): Set { + const dirty = new Set([startBlockId]) + const queue = [startBlockId] + + while (queue.length > 0) { + const nodeId = queue.shift()! + const node = dag.nodes.get(nodeId) + if (!node) continue + + for (const [, edge] of node.outgoingEdges) { + if (!dirty.has(edge.target)) { + dirty.add(edge.target) + queue.push(edge.target) + } + } + } + + logger.debug('Computed dirty set', { + startBlockId, + dirtySetSize: dirty.size, + dirtyBlocks: Array.from(dirty), + }) + + return dirty +} + +/** + * Validates that a block can be used as a run-from-block starting point. + * + * Validation rules: + * - Block must exist in the DAG + * - Block cannot be inside a loop + * - Block cannot be inside a parallel + * - Block cannot be a sentinel node + * - Block must have been executed in the source run + * + * @param blockId - The block ID to validate + * @param dag - The workflow DAG + * @param executedBlocks - Set of blocks that were executed in the source run + * @returns Validation result with error message if invalid + */ +export function validateRunFromBlock( + blockId: string, + dag: DAG, + executedBlocks: Set +): RunFromBlockValidation { + const node = dag.nodes.get(blockId) + + if (!node) { + return { valid: false, error: `Block not found in workflow: ${blockId}` } + } + + if (node.metadata.isLoopNode) { + return { + valid: false, + error: `Cannot run from block inside loop: ${node.metadata.loopId}`, + } + } + + if (node.metadata.isParallelBranch) { + return { + valid: false, + error: `Cannot run from block inside parallel: ${node.metadata.parallelId}`, + } + } + + if (node.metadata.isSentinel) { + return { valid: false, error: 'Cannot run from sentinel node' } + } + + if (!executedBlocks.has(blockId)) { + return { + valid: false, + error: `Block was not executed in source run: ${blockId}`, + } + } + + return { valid: true } +} diff --git a/apps/sim/hooks/use-execution-stream.ts b/apps/sim/hooks/use-execution-stream.ts index b9d1cc685..ae5ab2d04 100644 --- a/apps/sim/hooks/use-execution-stream.ts +++ b/apps/sim/hooks/use-execution-stream.ts @@ -1,6 +1,7 @@ import { useCallback, useRef } from 'react' import { createLogger } from '@sim/logger' import type { ExecutionEvent } from '@/lib/workflows/executor/execution-events' +import type { SerializableExecutionState } from '@/executor/execution/types' import type { SubflowType } from '@/stores/workflows/workflow/types' const logger = createLogger('useExecutionStream') @@ -71,6 +72,13 @@ export interface ExecuteStreamOptions { callbacks?: ExecutionStreamCallbacks } +export interface ExecuteFromBlockOptions { + workflowId: string + startBlockId: string + sourceSnapshot: SerializableExecutionState + callbacks?: ExecutionStreamCallbacks +} + /** * Hook for executing workflows via server-side SSE streaming */ @@ -222,6 +230,140 @@ export function useExecutionStream() { } }, []) + const executeFromBlock = useCallback(async (options: ExecuteFromBlockOptions) => { + const { workflowId, startBlockId, sourceSnapshot, callbacks = {} } = options + + if (abortControllerRef.current) { + abortControllerRef.current.abort() + } + + const abortController = new AbortController() + abortControllerRef.current = abortController + currentExecutionRef.current = null + + try { + const response = await fetch(`/api/workflows/${workflowId}/execute-from-block`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ startBlockId, sourceSnapshot }), + signal: abortController.signal, + }) + + if (!response.ok) { + const errorResponse = await response.json() + const error = new Error(errorResponse.error || 'Failed to start execution') + if (errorResponse && typeof errorResponse === 'object') { + Object.assign(error, { executionResult: errorResponse }) + } + throw error + } + + if (!response.body) { + throw new Error('No response body') + } + + const executionId = response.headers.get('X-Execution-Id') + if (executionId) { + currentExecutionRef.current = { workflowId, executionId } + } + + const reader = response.body.getReader() + const decoder = new TextDecoder() + let buffer = '' + + try { + while (true) { + const { done, value } = await reader.read() + + if (done) { + break + } + + buffer += decoder.decode(value, { stream: true }) + + const lines = buffer.split('\n\n') + + buffer = lines.pop() || '' + + for (const line of lines) { + if (!line.trim() || !line.startsWith('data: ')) { + continue + } + + const data = line.substring(6).trim() + + if (data === '[DONE]') { + logger.info('Run-from-block stream completed') + continue + } + + try { + const event = JSON.parse(data) as ExecutionEvent + + logger.info('📡 Run-from-block SSE Event:', { + type: event.type, + executionId: event.executionId, + }) + + switch (event.type) { + case 'execution:started': + callbacks.onExecutionStarted?.(event.data) + break + case 'execution:completed': + callbacks.onExecutionCompleted?.(event.data) + break + case 'execution:error': + callbacks.onExecutionError?.(event.data) + break + case 'execution:cancelled': + callbacks.onExecutionCancelled?.(event.data) + break + case 'block:started': + callbacks.onBlockStarted?.(event.data) + break + case 'block:completed': + callbacks.onBlockCompleted?.(event.data) + break + case 'block:error': + callbacks.onBlockError?.(event.data) + break + case 'stream:chunk': + callbacks.onStreamChunk?.(event.data) + break + case 'stream:done': + callbacks.onStreamDone?.(event.data) + break + default: + logger.warn('Unknown event type:', (event as any).type) + } + } catch (error) { + logger.error('Failed to parse SSE event:', error, { data }) + } + } + } + } finally { + reader.releaseLock() + } + } catch (error: any) { + if (error.name === 'AbortError') { + logger.info('Run-from-block execution cancelled') + callbacks.onExecutionCancelled?.({ duration: 0 }) + } else { + logger.error('Run-from-block execution error:', error) + callbacks.onExecutionError?.({ + error: error.message || 'Unknown error', + duration: 0, + }) + } + throw error + } finally { + abortControllerRef.current = null + currentExecutionRef.current = null + } + }, []) + const cancel = useCallback(() => { const execution = currentExecutionRef.current if (execution) { @@ -239,6 +381,7 @@ export function useExecutionStream() { return { execute, + executeFromBlock, cancel, } } diff --git a/apps/sim/lib/copilot/models.ts b/apps/sim/lib/copilot/models.ts index 83a90169b..3dec2ef88 100644 --- a/apps/sim/lib/copilot/models.ts +++ b/apps/sim/lib/copilot/models.ts @@ -21,6 +21,7 @@ export const COPILOT_MODEL_IDS = [ 'claude-4.5-opus', 'claude-4.1-opus', 'gemini-3-pro', + 'auto', ] as const export type CopilotModelId = (typeof COPILOT_MODEL_IDS)[number] diff --git a/apps/sim/stores/execution/store.ts b/apps/sim/stores/execution/store.ts index 87fa444ec..912579d4c 100644 --- a/apps/sim/stores/execution/store.ts +++ b/apps/sim/stores/execution/store.ts @@ -35,4 +35,23 @@ export const useExecutionStore = create()((se }, clearRunPath: () => set({ lastRunPath: new Map(), lastRunEdges: new Map() }), reset: () => set(initialState), + + setLastExecutionSnapshot: (workflowId, snapshot) => { + const { lastExecutionSnapshots } = get() + const newSnapshots = new Map(lastExecutionSnapshots) + newSnapshots.set(workflowId, snapshot) + set({ lastExecutionSnapshots: newSnapshots }) + }, + + getLastExecutionSnapshot: (workflowId) => { + const { lastExecutionSnapshots } = get() + return lastExecutionSnapshots.get(workflowId) + }, + + clearLastExecutionSnapshot: (workflowId) => { + const { lastExecutionSnapshots } = get() + const newSnapshots = new Map(lastExecutionSnapshots) + newSnapshots.delete(workflowId) + set({ lastExecutionSnapshots: newSnapshots }) + }, })) diff --git a/apps/sim/stores/execution/types.ts b/apps/sim/stores/execution/types.ts index 3994f4aab..27e3f79d3 100644 --- a/apps/sim/stores/execution/types.ts +++ b/apps/sim/stores/execution/types.ts @@ -1,4 +1,5 @@ import type { Executor } from '@/executor' +import type { SerializableExecutionState } from '@/executor/execution/types' import type { ExecutionContext } from '@/executor/types' /** @@ -28,6 +29,11 @@ export interface ExecutionState { * Cleared when a new run starts. Used to show run path indicators on edges. */ lastRunEdges: Map + /** + * Stores the last successful execution snapshot per workflow. + * Used for run-from-block functionality. + */ + lastExecutionSnapshots: Map } export interface ExecutionActions { @@ -41,6 +47,18 @@ export interface ExecutionActions { setEdgeRunStatus: (edgeId: string, status: EdgeRunStatus) => void clearRunPath: () => void reset: () => void + /** + * Store the execution snapshot for a workflow after successful execution. + */ + setLastExecutionSnapshot: (workflowId: string, snapshot: SerializableExecutionState) => void + /** + * Get the last execution snapshot for a workflow. + */ + getLastExecutionSnapshot: (workflowId: string) => SerializableExecutionState | undefined + /** + * Clear the execution snapshot for a workflow. + */ + clearLastExecutionSnapshot: (workflowId: string) => void } export const initialState: ExecutionState = { @@ -52,4 +70,5 @@ export const initialState: ExecutionState = { debugContext: null, lastRunPath: new Map(), lastRunEdges: new Map(), + lastExecutionSnapshots: new Map(), }