fix(execution): scope execution state per workflow to prevent cross-workflow bleed (#3183)

* fix(execution): scope execution state per workflow to prevent cross-workflow bleed

* fix(execution): use validated workflowId param instead of non-null assertion in handleRunUntilBlock

* improvement(execution): use individual selectors to avoid unnecessary re-renders from unselectored store hook

* improvement(execution): use useShallow selector in workflow.tsx to avoid re-renders from lastRunPath/lastRunEdges changes
This commit is contained in:
Waleed
2026-02-10 18:17:50 -08:00
committed by GitHub
parent 6d16f216c8
commit 78fef22d0e
16 changed files with 934 additions and 182 deletions

View File

@@ -7,7 +7,7 @@ import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/provide
import { useWorkflowExecution } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks'
import { validateTriggerPaste } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils'
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
import { useExecutionStore } from '@/stores/execution'
import { useCurrentWorkflowExecution, useExecutionStore } from '@/stores/execution'
import { useNotificationStore } from '@/stores/notifications'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
@@ -114,7 +114,8 @@ export const ActionBar = memo(
)
const { activeWorkflowId } = useWorkflowRegistry()
const { isExecuting, getLastExecutionSnapshot } = useExecutionStore()
const { isExecuting } = useCurrentWorkflowExecution()
const getLastExecutionSnapshot = useExecutionStore((s) => s.getLastExecutionSnapshot)
const userPermissions = useUserPermissionsContext()
const edges = useWorkflowStore((state) => state.edges)

View File

@@ -51,7 +51,7 @@ import { useWorkflowExecution } from '@/app/workspace/[workspaceId]/w/[workflowI
import type { BlockLog, ExecutionResult } from '@/executor/types'
import { useChatStore } from '@/stores/chat/store'
import { getChatPosition } from '@/stores/chat/utils'
import { useExecutionStore } from '@/stores/execution'
import { useCurrentWorkflowExecution } from '@/stores/execution'
import { useOperationQueue } from '@/stores/operation-queue/store'
import { useTerminalConsoleStore } from '@/stores/terminal'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
@@ -256,7 +256,7 @@ export function Chat() {
const hasConsoleHydrated = useTerminalConsoleStore((state) => state._hasHydrated)
const entriesFromStore = useTerminalConsoleStore((state) => state.entries)
const entries = hasConsoleHydrated ? entriesFromStore : []
const { isExecuting } = useExecutionStore()
const { isExecuting } = useCurrentWorkflowExecution()
const { handleRunWorkflow, handleCancelExecution } = useWorkflowExecution()
const { data: session } = useSession()
const { addToQueue } = useOperationQueue()

View File

@@ -1,7 +1,7 @@
import { useCallback } from 'react'
import type { DiffStatus } from '@/lib/workflows/diff/types'
import { hasDiffStatus } from '@/lib/workflows/diff/types'
import { useExecutionStore } from '@/stores/execution'
import { useIsBlockActive } from '@/stores/execution'
import { useWorkflowDiffStore } from '@/stores/workflow-diff'
import type { CurrentWorkflow } from '../../../hooks/use-current-workflow'
import type { WorkflowBlockProps } from '../types'
@@ -67,7 +67,7 @@ export function useBlockState(
const isDeletedBlock = !isShowingDiff && diffAnalysis?.deleted_blocks?.includes(blockId)
// Execution state
const isActiveBlock = useExecutionStore((state) => state.activeBlockIds.has(blockId))
const isActiveBlock = useIsBlockActive(blockId)
const isActive = data.isActive || isActiveBlock
return {

View File

@@ -3,7 +3,7 @@ import { X } from 'lucide-react'
import { BaseEdge, EdgeLabelRenderer, type EdgeProps, getSmoothStepPath } from 'reactflow'
import { useShallow } from 'zustand/react/shallow'
import type { EdgeDiffStatus } from '@/lib/workflows/diff/types'
import { useExecutionStore } from '@/stores/execution'
import { useLastRunEdges } from '@/stores/execution'
import { useWorkflowDiffStore } from '@/stores/workflow-diff'
/** Extended edge props with optional handle identifiers */
@@ -49,7 +49,7 @@ const WorkflowEdgeComponent = ({
isDiffReady: state.isDiffReady,
}))
)
const lastRunEdges = useExecutionStore((state) => state.lastRunEdges)
const lastRunEdges = useLastRunEdges()
const dataSourceHandle = (data as { sourceHandle?: string } | undefined)?.sourceHandle
const isErrorEdge = (sourceHandle ?? dataSourceHandle) === 'error'

View File

@@ -3,7 +3,7 @@ import { useBlockState } from '@/app/workspace/[workspaceId]/w/[workflowId]/comp
import type { WorkflowBlockProps } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/types'
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
import { getBlockRingStyles } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/block-ring-utils'
import { useExecutionStore } from '@/stores/execution'
import { useLastRunPath } from '@/stores/execution'
import { usePanelEditorStore, usePanelStore } from '@/stores/panel'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
@@ -64,7 +64,7 @@ export function useBlockVisual({
)
const isEditorOpen = !isPreview && isThisBlockInEditor && activeTabIsEditor
const lastRunPath = useExecutionStore((state) => state.lastRunPath)
const lastRunPath = useLastRunPath()
const runPathStatus = isPreview ? undefined : lastRunPath.get(blockId)
const setCurrentBlockId = usePanelEditorStore((state) => state.setCurrentBlockId)

View File

@@ -34,7 +34,7 @@ import { coerceValue } from '@/executor/utils/start-block'
import { subscriptionKeys } from '@/hooks/queries/subscription'
import { useExecutionStream } from '@/hooks/use-execution-stream'
import { WorkflowValidationError } from '@/serializer'
import { useExecutionStore } from '@/stores/execution'
import { useCurrentWorkflowExecution, useExecutionStore } from '@/stores/execution'
import { useNotificationStore } from '@/stores/notifications'
import { useVariablesStore } from '@/stores/panel'
import { useEnvironmentStore } from '@/stores/settings/environment'
@@ -112,24 +112,19 @@ export function useWorkflowExecution() {
useTerminalConsoleStore()
const { getAllVariables } = useEnvironmentStore()
const { getVariablesByWorkflowId, variables } = useVariablesStore()
const {
isExecuting,
isDebugging,
pendingBlocks,
executor,
debugContext,
setIsExecuting,
setIsDebugging,
setPendingBlocks,
setExecutor,
setDebugContext,
setActiveBlocks,
setBlockRunStatus,
setEdgeRunStatus,
setLastExecutionSnapshot,
getLastExecutionSnapshot,
clearLastExecutionSnapshot,
} = useExecutionStore()
const { isExecuting, isDebugging, pendingBlocks, executor, debugContext } =
useCurrentWorkflowExecution()
const setIsExecuting = useExecutionStore((s) => s.setIsExecuting)
const setIsDebugging = useExecutionStore((s) => s.setIsDebugging)
const setPendingBlocks = useExecutionStore((s) => s.setPendingBlocks)
const setExecutor = useExecutionStore((s) => s.setExecutor)
const setDebugContext = useExecutionStore((s) => s.setDebugContext)
const setActiveBlocks = useExecutionStore((s) => s.setActiveBlocks)
const setBlockRunStatus = useExecutionStore((s) => s.setBlockRunStatus)
const setEdgeRunStatus = useExecutionStore((s) => s.setEdgeRunStatus)
const setLastExecutionSnapshot = useExecutionStore((s) => s.setLastExecutionSnapshot)
const getLastExecutionSnapshot = useExecutionStore((s) => s.getLastExecutionSnapshot)
const clearLastExecutionSnapshot = useExecutionStore((s) => s.clearLastExecutionSnapshot)
const [executionResult, setExecutionResult] = useState<ExecutionResult | null>(null)
const executionStream = useExecutionStream()
const currentChatExecutionIdRef = useRef<string | null>(null)
@@ -158,13 +153,15 @@ export function useWorkflowExecution() {
* Resets all debug-related state
*/
const resetDebugState = useCallback(() => {
setIsExecuting(false)
setIsDebugging(false)
setDebugContext(null)
setExecutor(null)
setPendingBlocks([])
setActiveBlocks(new Set())
if (!activeWorkflowId) return
setIsExecuting(activeWorkflowId, false)
setIsDebugging(activeWorkflowId, false)
setDebugContext(activeWorkflowId, null)
setExecutor(activeWorkflowId, null)
setPendingBlocks(activeWorkflowId, [])
setActiveBlocks(activeWorkflowId, new Set())
}, [
activeWorkflowId,
setIsExecuting,
setIsDebugging,
setDebugContext,
@@ -312,18 +309,20 @@ export function useWorkflowExecution() {
} = config
const updateActiveBlocks = (blockId: string, isActive: boolean) => {
if (!workflowId) return
if (isActive) {
activeBlocksSet.add(blockId)
} else {
activeBlocksSet.delete(blockId)
}
setActiveBlocks(new Set(activeBlocksSet))
setActiveBlocks(workflowId, new Set(activeBlocksSet))
}
const markIncomingEdges = (blockId: string) => {
if (!workflowId) return
const incomingEdges = workflowEdges.filter((edge) => edge.target === blockId)
incomingEdges.forEach((edge) => {
setEdgeRunStatus(edge.id, 'success')
setEdgeRunStatus(workflowId, edge.id, 'success')
})
}
@@ -459,7 +458,7 @@ export function useWorkflowExecution() {
const onBlockCompleted = (data: BlockCompletedData) => {
updateActiveBlocks(data.blockId, false)
setBlockRunStatus(data.blockId, 'success')
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'success')
executedBlockIds.add(data.blockId)
accumulatedBlockStates.set(data.blockId, {
@@ -489,7 +488,7 @@ export function useWorkflowExecution() {
const onBlockError = (data: BlockErrorData) => {
updateActiveBlocks(data.blockId, false)
setBlockRunStatus(data.blockId, 'error')
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'error')
executedBlockIds.add(data.blockId)
accumulatedBlockStates.set(data.blockId, {
@@ -547,19 +546,20 @@ export function useWorkflowExecution() {
*/
const handleDebugSessionContinuation = useCallback(
(result: ExecutionResult) => {
if (!activeWorkflowId) return
logger.info('Debug step completed, next blocks pending', {
nextPendingBlocks: result.metadata?.pendingBlocks?.length || 0,
})
// Update debug context and pending blocks
if (result.metadata?.context) {
setDebugContext(result.metadata.context)
setDebugContext(activeWorkflowId, result.metadata.context)
}
if (result.metadata?.pendingBlocks) {
setPendingBlocks(result.metadata.pendingBlocks)
setPendingBlocks(activeWorkflowId, result.metadata.pendingBlocks)
}
},
[setDebugContext, setPendingBlocks]
[activeWorkflowId, setDebugContext, setPendingBlocks]
)
/**
@@ -663,11 +663,11 @@ export function useWorkflowExecution() {
// Reset execution result and set execution state
setExecutionResult(null)
setIsExecuting(true)
setIsExecuting(activeWorkflowId, true)
// Set debug mode only if explicitly requested
if (enableDebug) {
setIsDebugging(true)
setIsDebugging(activeWorkflowId, true)
}
// Determine if this is a chat execution
@@ -965,9 +965,9 @@ export function useWorkflowExecution() {
controller.close()
}
if (currentChatExecutionIdRef.current === executionId) {
setIsExecuting(false)
setIsDebugging(false)
setActiveBlocks(new Set())
setIsExecuting(activeWorkflowId, false)
setIsDebugging(activeWorkflowId, false)
setActiveBlocks(activeWorkflowId, new Set())
}
}
},
@@ -989,16 +989,16 @@ export function useWorkflowExecution() {
'manual'
)
if (result && 'metadata' in result && result.metadata?.isDebugSession) {
setDebugContext(result.metadata.context || null)
setDebugContext(activeWorkflowId, result.metadata.context || null)
if (result.metadata.pendingBlocks) {
setPendingBlocks(result.metadata.pendingBlocks)
setPendingBlocks(activeWorkflowId, result.metadata.pendingBlocks)
}
} else if (result && 'success' in result) {
setExecutionResult(result)
// Reset execution state after successful non-debug execution
setIsExecuting(false)
setIsDebugging(false)
setActiveBlocks(new Set())
setIsExecuting(activeWorkflowId, false)
setIsDebugging(activeWorkflowId, false)
setActiveBlocks(activeWorkflowId, new Set())
if (isChatExecution) {
if (!result.metadata) {
@@ -1179,7 +1179,7 @@ export function useWorkflowExecution() {
logger.error('No trigger blocks found for manual run', {
allBlockTypes: Object.values(filteredStates).map((b) => b.type),
})
setIsExecuting(false)
if (activeWorkflowId) setIsExecuting(activeWorkflowId, false)
throw error
}
@@ -1195,7 +1195,7 @@ export function useWorkflowExecution() {
'Workflow Validation'
)
logger.error('Multiple API triggers found')
setIsExecuting(false)
if (activeWorkflowId) setIsExecuting(activeWorkflowId, false)
throw error
}
@@ -1220,7 +1220,7 @@ export function useWorkflowExecution() {
'Workflow Validation'
)
logger.error('Trigger has no outgoing connections', { triggerName, startBlockId })
setIsExecuting(false)
if (activeWorkflowId) setIsExecuting(activeWorkflowId, false)
throw error
}
}
@@ -1251,7 +1251,7 @@ export function useWorkflowExecution() {
'Workflow Validation'
)
logger.error('No startBlockId found after trigger search')
setIsExecuting(false)
if (activeWorkflowId) setIsExecuting(activeWorkflowId, false)
throw error
}
@@ -1457,8 +1457,10 @@ export function useWorkflowExecution() {
logger.info('Execution aborted by user')
// Reset execution state
setIsExecuting(false)
setActiveBlocks(new Set())
if (activeWorkflowId) {
setIsExecuting(activeWorkflowId, false)
setActiveBlocks(activeWorkflowId, new Set())
}
// Return gracefully without error
return {
@@ -1533,9 +1535,11 @@ export function useWorkflowExecution() {
}
setExecutionResult(errorResult)
setIsExecuting(false)
setIsDebugging(false)
setActiveBlocks(new Set())
if (activeWorkflowId) {
setIsExecuting(activeWorkflowId, false)
setIsDebugging(activeWorkflowId, false)
setActiveBlocks(activeWorkflowId, new Set())
}
let notificationMessage = WORKFLOW_EXECUTION_FAILURE_MESSAGE
if (isRecord(error) && isRecord(error.request) && sanitizeMessage(error.request.url)) {
@@ -1706,8 +1710,8 @@ export function useWorkflowExecution() {
const handleCancelExecution = useCallback(() => {
logger.info('Workflow execution cancellation requested')
// Cancel the execution stream (server-side)
executionStream.cancel()
// Cancel the execution stream for this workflow (server-side)
executionStream.cancel(activeWorkflowId ?? undefined)
// Mark current chat execution as superseded so its cleanup won't affect new executions
currentChatExecutionIdRef.current = null
@@ -1715,12 +1719,12 @@ export function useWorkflowExecution() {
// Mark all running entries as canceled in the terminal
if (activeWorkflowId) {
cancelRunningEntries(activeWorkflowId)
}
// Reset execution state - this triggers chat stream cleanup via useEffect in chat.tsx
setIsExecuting(false)
setIsDebugging(false)
setActiveBlocks(new Set())
// Reset execution state - this triggers chat stream cleanup via useEffect in chat.tsx
setIsExecuting(activeWorkflowId, false)
setIsDebugging(activeWorkflowId, false)
setActiveBlocks(activeWorkflowId, new Set())
}
// If in debug mode, also reset debug state
if (isDebugging) {
@@ -1833,7 +1837,7 @@ export function useWorkflowExecution() {
}
}
setIsExecuting(true)
setIsExecuting(workflowId, true)
const executionId = uuidv4()
const accumulatedBlockLogs: BlockLog[] = []
const accumulatedBlockStates = new Map<string, BlockState>()
@@ -1929,8 +1933,8 @@ export function useWorkflowExecution() {
logger.error('Run-from-block failed:', error)
}
} finally {
setIsExecuting(false)
setActiveBlocks(new Set())
setIsExecuting(workflowId, false)
setActiveBlocks(workflowId, new Set())
}
},
[
@@ -1962,7 +1966,7 @@ export function useWorkflowExecution() {
logger.info('Starting run-until-block execution', { workflowId, stopAfterBlockId: blockId })
setExecutionResult(null)
setIsExecuting(true)
setIsExecuting(workflowId, true)
const executionId = uuidv4()
try {
@@ -1981,9 +1985,9 @@ export function useWorkflowExecution() {
const errorResult = handleExecutionError(error, { executionId })
return errorResult
} finally {
setIsExecuting(false)
setIsDebugging(false)
setActiveBlocks(new Set())
setIsExecuting(workflowId, false)
setIsDebugging(workflowId, false)
setActiveBlocks(workflowId, new Set())
}
},
[activeWorkflowId, setExecutionResult, setIsExecuting, setIsDebugging, setActiveBlocks]

View File

@@ -35,6 +35,7 @@ export async function executeWorkflowWithFullLogging(
const executionId = options.executionId || uuidv4()
const { addConsole } = useTerminalConsoleStore.getState()
const { setActiveBlocks, setBlockRunStatus, setEdgeRunStatus } = useExecutionStore.getState()
const wfId = activeWorkflowId
const workflowEdges = useWorkflowStore.getState().edges
const activeBlocksSet = new Set<string>()
@@ -103,22 +104,22 @@ export async function executeWorkflowWithFullLogging(
switch (event.type) {
case 'block:started': {
activeBlocksSet.add(event.data.blockId)
setActiveBlocks(new Set(activeBlocksSet))
setActiveBlocks(wfId, new Set(activeBlocksSet))
const incomingEdges = workflowEdges.filter(
(edge) => edge.target === event.data.blockId
)
incomingEdges.forEach((edge) => {
setEdgeRunStatus(edge.id, 'success')
setEdgeRunStatus(wfId, edge.id, 'success')
})
break
}
case 'block:completed':
activeBlocksSet.delete(event.data.blockId)
setActiveBlocks(new Set(activeBlocksSet))
setActiveBlocks(wfId, new Set(activeBlocksSet))
setBlockRunStatus(event.data.blockId, 'success')
setBlockRunStatus(wfId, event.data.blockId, 'success')
addConsole({
input: event.data.input || {},
@@ -145,9 +146,9 @@ export async function executeWorkflowWithFullLogging(
case 'block:error':
activeBlocksSet.delete(event.data.blockId)
setActiveBlocks(new Set(activeBlocksSet))
setActiveBlocks(wfId, new Set(activeBlocksSet))
setBlockRunStatus(event.data.blockId, 'error')
setBlockRunStatus(wfId, event.data.blockId, 'error')
addConsole({
input: event.data.input || {},
@@ -192,7 +193,7 @@ export async function executeWorkflowWithFullLogging(
}
} finally {
reader.releaseLock()
setActiveBlocks(new Set())
setActiveBlocks(wfId, new Set())
}
return executionResult

View File

@@ -74,7 +74,7 @@ import { useStreamCleanup } from '@/hooks/use-stream-cleanup'
import { useCanvasModeStore } from '@/stores/canvas-mode'
import { useChatStore } from '@/stores/chat/store'
import { useCopilotTrainingStore } from '@/stores/copilot-training/store'
import { useExecutionStore } from '@/stores/execution'
import { defaultWorkflowExecutionState, useExecutionStore } from '@/stores/execution'
import { useSearchModalStore } from '@/stores/modals/search/store'
import { useNotificationStore } from '@/stores/notifications'
import { useCopilotStore, usePanelEditorStore } from '@/stores/panel'
@@ -740,16 +740,18 @@ const WorkflowContent = React.memo(() => {
[collaborativeBatchAddBlocks, setSelectedEdges, setPendingSelection]
)
const { activeBlockIds, pendingBlocks, isDebugging, isExecuting, getLastExecutionSnapshot } =
useExecutionStore(
useShallow((state) => ({
activeBlockIds: state.activeBlockIds,
pendingBlocks: state.pendingBlocks,
isDebugging: state.isDebugging,
isExecuting: state.isExecuting,
getLastExecutionSnapshot: state.getLastExecutionSnapshot,
}))
)
const { activeBlockIds, pendingBlocks, isDebugging, isExecuting } = useExecutionStore(
useShallow((state) => {
const wf = activeWorkflowId ? state.workflowExecutions.get(activeWorkflowId) : undefined
return {
activeBlockIds: wf?.activeBlockIds ?? defaultWorkflowExecutionState.activeBlockIds,
pendingBlocks: wf?.pendingBlocks ?? defaultWorkflowExecutionState.pendingBlocks,
isDebugging: wf?.isDebugging ?? false,
isExecuting: wf?.isExecuting ?? false,
}
})
)
const getLastExecutionSnapshot = useExecutionStore((s) => s.getLastExecutionSnapshot)
const [dragStartParentId, setDragStartParentId] = useState<string | null>(null)

View File

@@ -133,22 +133,26 @@ export interface ExecuteFromBlockOptions {
}
/**
* Hook for executing workflows via server-side SSE streaming
* Hook for executing workflows via server-side SSE streaming.
* Supports concurrent executions via per-workflow AbortController maps.
*/
export function useExecutionStream() {
const abortControllerRef = useRef<AbortController | null>(null)
const currentExecutionRef = useRef<{ workflowId: string; executionId: string } | null>(null)
const abortControllersRef = useRef<Map<string, AbortController>>(new Map())
const currentExecutionsRef = useRef<Map<string, { workflowId: string; executionId: string }>>(
new Map()
)
const execute = useCallback(async (options: ExecuteStreamOptions) => {
const { workflowId, callbacks = {}, ...payload } = options
if (abortControllerRef.current) {
abortControllerRef.current.abort()
const existing = abortControllersRef.current.get(workflowId)
if (existing) {
existing.abort()
}
const abortController = new AbortController()
abortControllerRef.current = abortController
currentExecutionRef.current = null
abortControllersRef.current.set(workflowId, abortController)
currentExecutionsRef.current.delete(workflowId)
try {
const response = await fetch(`/api/workflows/${workflowId}/execute`, {
@@ -163,7 +167,6 @@ export function useExecutionStream() {
if (!response.ok) {
const errorResponse = await response.json()
const error = new Error(errorResponse.error || 'Failed to start execution')
// Attach the execution result from server response for error handling
if (errorResponse && typeof errorResponse === 'object') {
Object.assign(error, { executionResult: errorResponse })
}
@@ -176,7 +179,7 @@ export function useExecutionStream() {
const executionId = response.headers.get('X-Execution-Id')
if (executionId) {
currentExecutionRef.current = { workflowId, executionId }
currentExecutionsRef.current.set(workflowId, { workflowId, executionId })
}
const reader = response.body.getReader()
@@ -194,21 +197,22 @@ export function useExecutionStream() {
}
throw error
} finally {
abortControllerRef.current = null
currentExecutionRef.current = null
abortControllersRef.current.delete(workflowId)
currentExecutionsRef.current.delete(workflowId)
}
}, [])
const executeFromBlock = useCallback(async (options: ExecuteFromBlockOptions) => {
const { workflowId, startBlockId, sourceSnapshot, input, callbacks = {} } = options
if (abortControllerRef.current) {
abortControllerRef.current.abort()
const existing = abortControllersRef.current.get(workflowId)
if (existing) {
existing.abort()
}
const abortController = new AbortController()
abortControllerRef.current = abortController
currentExecutionRef.current = null
abortControllersRef.current.set(workflowId, abortController)
currentExecutionsRef.current.delete(workflowId)
try {
const response = await fetch(`/api/workflows/${workflowId}/execute`, {
@@ -244,7 +248,7 @@ export function useExecutionStream() {
const executionId = response.headers.get('X-Execution-Id')
if (executionId) {
currentExecutionRef.current = { workflowId, executionId }
currentExecutionsRef.current.set(workflowId, { workflowId, executionId })
}
const reader = response.body.getReader()
@@ -262,24 +266,39 @@ export function useExecutionStream() {
}
throw error
} finally {
abortControllerRef.current = null
currentExecutionRef.current = null
abortControllersRef.current.delete(workflowId)
currentExecutionsRef.current.delete(workflowId)
}
}, [])
const cancel = useCallback(() => {
const execution = currentExecutionRef.current
if (execution) {
fetch(`/api/workflows/${execution.workflowId}/executions/${execution.executionId}/cancel`, {
method: 'POST',
}).catch(() => {})
}
const cancel = useCallback((workflowId?: string) => {
if (workflowId) {
const execution = currentExecutionsRef.current.get(workflowId)
if (execution) {
fetch(`/api/workflows/${execution.workflowId}/executions/${execution.executionId}/cancel`, {
method: 'POST',
}).catch(() => {})
}
if (abortControllerRef.current) {
abortControllerRef.current.abort()
abortControllerRef.current = null
const controller = abortControllersRef.current.get(workflowId)
if (controller) {
controller.abort()
abortControllersRef.current.delete(workflowId)
}
currentExecutionsRef.current.delete(workflowId)
} else {
for (const [, execution] of currentExecutionsRef.current) {
fetch(`/api/workflows/${execution.workflowId}/executions/${execution.executionId}/cancel`, {
method: 'POST',
}).catch(() => {})
}
for (const [, controller] of abortControllersRef.current) {
controller.abort()
}
abortControllersRef.current.clear()
currentExecutionsRef.current.clear()
}
currentExecutionRef.current = null
}, [])
return {

View File

@@ -50,7 +50,17 @@ async function doExecuteRunTool(
toolName: string,
params: Record<string, unknown>
): Promise<void> {
const { isExecuting, setIsExecuting } = useExecutionStore.getState()
const { activeWorkflowId } = useWorkflowRegistry.getState()
if (!activeWorkflowId) {
logger.warn('[RunTool] Execution prevented: no active workflow', { toolCallId, toolName })
setToolState(toolCallId, ClientToolCallState.error)
await reportCompletion(toolCallId, false, 'No active workflow found')
return
}
const { getWorkflowExecution, setIsExecuting } = useExecutionStore.getState()
const { isExecuting } = getWorkflowExecution(activeWorkflowId)
if (isExecuting) {
logger.warn('[RunTool] Execution prevented: already executing', { toolCallId, toolName })
@@ -59,14 +69,6 @@ async function doExecuteRunTool(
return
}
const { activeWorkflowId } = useWorkflowRegistry.getState()
if (!activeWorkflowId) {
logger.warn('[RunTool] Execution prevented: no active workflow', { toolCallId, toolName })
setToolState(toolCallId, ClientToolCallState.error)
await reportCompletion(toolCallId, false, 'No active workflow found')
return
}
// Extract params for all tool types
const workflowInput = (params.workflow_input || params.input || undefined) as
| Record<string, unknown>
@@ -95,7 +97,7 @@ async function doExecuteRunTool(
return undefined
})()
setIsExecuting(true)
setIsExecuting(activeWorkflowId, true)
const executionId = uuidv4()
const executionStartTime = new Date().toISOString()
@@ -160,7 +162,7 @@ async function doExecuteRunTool(
setToolState(toolCallId, ClientToolCallState.error)
await reportCompletion(toolCallId, false, msg)
} finally {
setIsExecuting(false)
setIsExecuting(activeWorkflowId, false)
}
}

View File

@@ -1,7 +1,15 @@
export { useExecutionStore } from './store'
export {
useCurrentWorkflowExecution,
useExecutionStore,
useIsBlockActive,
useLastRunEdges,
useLastRunPath,
} from './store'
export type {
BlockRunStatus,
EdgeRunStatus,
ExecutionActions,
ExecutionState,
WorkflowExecutionState,
} from './types'
export { defaultWorkflowExecutionState } from './types'

View File

@@ -0,0 +1,472 @@
/**
* Tests for the per-workflow execution store.
*
* These tests cover:
* - Default state for unknown workflows
* - Per-workflow state isolation
* - Execution lifecycle (start/stop clears run path)
* - Block and edge run status tracking
* - Active block management
* - Debug state management
* - Execution snapshot management
* - Store reset
* - Immutability guarantees
*
* @remarks
* Most tests use `it.concurrent` with unique workflow IDs per test.
* Because the store isolates state by workflow ID, concurrent tests
* do not interfere with each other. The `reset` and `immutability`
* groups run sequentially since they affect or read global store state.
*/
import { beforeEach, describe, expect, it, vi } from 'vitest'
vi.unmock('@/stores/execution/store')
vi.unmock('@/stores/execution/types')
import { useExecutionStore } from '@/stores/execution/store'
import { defaultWorkflowExecutionState, initialState } from '@/stores/execution/types'
describe('useExecutionStore', () => {
describe('getWorkflowExecution', () => {
it.concurrent('should return default state for an unknown workflow', () => {
const state = useExecutionStore.getState().getWorkflowExecution('wf-get-default')
expect(state.isExecuting).toBe(false)
expect(state.isDebugging).toBe(false)
expect(state.activeBlockIds.size).toBe(0)
expect(state.pendingBlocks).toEqual([])
expect(state.executor).toBeNull()
expect(state.debugContext).toBeNull()
expect(state.lastRunPath.size).toBe(0)
expect(state.lastRunEdges.size).toBe(0)
})
it.concurrent(
'should return fresh collections for unknown workflows, not shared references',
() => {
const stateA = useExecutionStore.getState().getWorkflowExecution('wf-fresh-a')
const stateB = useExecutionStore.getState().getWorkflowExecution('wf-fresh-b')
expect(stateA.activeBlockIds).not.toBe(stateB.activeBlockIds)
expect(stateA.lastRunPath).not.toBe(stateB.lastRunPath)
expect(stateA.lastRunEdges).not.toBe(stateB.lastRunEdges)
expect(stateA.activeBlockIds).not.toBe(defaultWorkflowExecutionState.activeBlockIds)
}
)
it.concurrent('should return the stored state after a mutation', () => {
useExecutionStore.getState().setIsExecuting('wf-get-stored', true)
const state = useExecutionStore.getState().getWorkflowExecution('wf-get-stored')
expect(state.isExecuting).toBe(true)
})
})
describe('setIsExecuting', () => {
it.concurrent('should set isExecuting to true', () => {
useExecutionStore.getState().setIsExecuting('wf-exec-true', true)
expect(useExecutionStore.getState().getWorkflowExecution('wf-exec-true').isExecuting).toBe(
true
)
})
it.concurrent('should set isExecuting to false', () => {
useExecutionStore.getState().setIsExecuting('wf-exec-false', true)
useExecutionStore.getState().setIsExecuting('wf-exec-false', false)
expect(useExecutionStore.getState().getWorkflowExecution('wf-exec-false').isExecuting).toBe(
false
)
})
it.concurrent('should clear lastRunPath and lastRunEdges when starting execution', () => {
const wf = 'wf-exec-clears-run'
useExecutionStore.getState().setBlockRunStatus(wf, 'block-1', 'success')
useExecutionStore.getState().setEdgeRunStatus(wf, 'edge-1', 'success')
expect(useExecutionStore.getState().getWorkflowExecution(wf).lastRunPath.size).toBe(1)
expect(useExecutionStore.getState().getWorkflowExecution(wf).lastRunEdges.size).toBe(1)
useExecutionStore.getState().setIsExecuting(wf, true)
const state = useExecutionStore.getState().getWorkflowExecution(wf)
expect(state.lastRunPath.size).toBe(0)
expect(state.lastRunEdges.size).toBe(0)
expect(state.isExecuting).toBe(true)
})
it.concurrent('should NOT clear lastRunPath when stopping execution', () => {
const wf = 'wf-exec-stop-keeps-path'
useExecutionStore.getState().setIsExecuting(wf, true)
useExecutionStore.getState().setBlockRunStatus(wf, 'block-1', 'success')
useExecutionStore.getState().setIsExecuting(wf, false)
const state = useExecutionStore.getState().getWorkflowExecution(wf)
expect(state.isExecuting).toBe(false)
expect(state.lastRunPath.get('block-1')).toBe('success')
})
})
describe('setActiveBlocks', () => {
it.concurrent('should set the active block IDs', () => {
const wf = 'wf-active-set'
useExecutionStore.getState().setActiveBlocks(wf, new Set(['block-1', 'block-2']))
const state = useExecutionStore.getState().getWorkflowExecution(wf)
expect(state.activeBlockIds.has('block-1')).toBe(true)
expect(state.activeBlockIds.has('block-2')).toBe(true)
expect(state.activeBlockIds.size).toBe(2)
})
it.concurrent('should replace the previous set', () => {
const wf = 'wf-active-replace'
useExecutionStore.getState().setActiveBlocks(wf, new Set(['block-1']))
useExecutionStore.getState().setActiveBlocks(wf, new Set(['block-2']))
const state = useExecutionStore.getState().getWorkflowExecution(wf)
expect(state.activeBlockIds.has('block-1')).toBe(false)
expect(state.activeBlockIds.has('block-2')).toBe(true)
})
it.concurrent('should clear active blocks with an empty set', () => {
const wf = 'wf-active-clear'
useExecutionStore.getState().setActiveBlocks(wf, new Set(['block-1']))
useExecutionStore.getState().setActiveBlocks(wf, new Set())
expect(useExecutionStore.getState().getWorkflowExecution(wf).activeBlockIds.size).toBe(0)
})
})
describe('setPendingBlocks', () => {
it.concurrent('should set pending block IDs', () => {
const wf = 'wf-pending'
useExecutionStore.getState().setPendingBlocks(wf, ['block-1', 'block-2'])
expect(useExecutionStore.getState().getWorkflowExecution(wf).pendingBlocks).toEqual([
'block-1',
'block-2',
])
})
})
describe('setIsDebugging', () => {
it.concurrent('should toggle debug mode', () => {
const wf = 'wf-debug-toggle'
useExecutionStore.getState().setIsDebugging(wf, true)
expect(useExecutionStore.getState().getWorkflowExecution(wf).isDebugging).toBe(true)
useExecutionStore.getState().setIsDebugging(wf, false)
expect(useExecutionStore.getState().getWorkflowExecution(wf).isDebugging).toBe(false)
})
})
describe('setExecutor', () => {
it.concurrent('should store and clear executor', () => {
const wf = 'wf-executor'
const mockExecutor = { run: () => {} } as any
useExecutionStore.getState().setExecutor(wf, mockExecutor)
expect(useExecutionStore.getState().getWorkflowExecution(wf).executor).toBe(mockExecutor)
useExecutionStore.getState().setExecutor(wf, null)
expect(useExecutionStore.getState().getWorkflowExecution(wf).executor).toBeNull()
})
})
describe('setDebugContext', () => {
it.concurrent('should store and clear debug context', () => {
const wf = 'wf-debug-ctx'
const mockContext = { blockId: 'block-1' } as any
useExecutionStore.getState().setDebugContext(wf, mockContext)
expect(useExecutionStore.getState().getWorkflowExecution(wf).debugContext).toBe(mockContext)
useExecutionStore.getState().setDebugContext(wf, null)
expect(useExecutionStore.getState().getWorkflowExecution(wf).debugContext).toBeNull()
})
})
describe('setBlockRunStatus', () => {
it.concurrent('should record a success status for a block', () => {
const wf = 'wf-block-success'
useExecutionStore.getState().setBlockRunStatus(wf, 'block-1', 'success')
expect(useExecutionStore.getState().getWorkflowExecution(wf).lastRunPath.get('block-1')).toBe(
'success'
)
})
it.concurrent('should record an error status for a block', () => {
const wf = 'wf-block-error'
useExecutionStore.getState().setBlockRunStatus(wf, 'block-1', 'error')
expect(useExecutionStore.getState().getWorkflowExecution(wf).lastRunPath.get('block-1')).toBe(
'error'
)
})
it.concurrent('should accumulate statuses for multiple blocks', () => {
const wf = 'wf-block-accum'
useExecutionStore.getState().setBlockRunStatus(wf, 'block-1', 'success')
useExecutionStore.getState().setBlockRunStatus(wf, 'block-2', 'error')
const runPath = useExecutionStore.getState().getWorkflowExecution(wf).lastRunPath
expect(runPath.get('block-1')).toBe('success')
expect(runPath.get('block-2')).toBe('error')
expect(runPath.size).toBe(2)
})
it.concurrent('should overwrite a previous status for the same block', () => {
const wf = 'wf-block-overwrite'
useExecutionStore.getState().setBlockRunStatus(wf, 'block-1', 'error')
useExecutionStore.getState().setBlockRunStatus(wf, 'block-1', 'success')
expect(useExecutionStore.getState().getWorkflowExecution(wf).lastRunPath.get('block-1')).toBe(
'success'
)
})
})
describe('setEdgeRunStatus', () => {
it.concurrent('should record a success status for an edge', () => {
const wf = 'wf-edge-success'
useExecutionStore.getState().setEdgeRunStatus(wf, 'edge-1', 'success')
expect(useExecutionStore.getState().getWorkflowExecution(wf).lastRunEdges.get('edge-1')).toBe(
'success'
)
})
it.concurrent('should accumulate statuses for multiple edges', () => {
const wf = 'wf-edge-accum'
useExecutionStore.getState().setEdgeRunStatus(wf, 'edge-1', 'success')
useExecutionStore.getState().setEdgeRunStatus(wf, 'edge-2', 'error')
const runEdges = useExecutionStore.getState().getWorkflowExecution(wf).lastRunEdges
expect(runEdges.get('edge-1')).toBe('success')
expect(runEdges.get('edge-2')).toBe('error')
expect(runEdges.size).toBe(2)
})
})
describe('clearRunPath', () => {
it.concurrent('should clear both lastRunPath and lastRunEdges', () => {
const wf = 'wf-clear-both'
useExecutionStore.getState().setBlockRunStatus(wf, 'block-1', 'success')
useExecutionStore.getState().setEdgeRunStatus(wf, 'edge-1', 'success')
useExecutionStore.getState().clearRunPath(wf)
const state = useExecutionStore.getState().getWorkflowExecution(wf)
expect(state.lastRunPath.size).toBe(0)
expect(state.lastRunEdges.size).toBe(0)
})
it.concurrent('should not affect other workflow state', () => {
const wf = 'wf-clear-other'
useExecutionStore.getState().setIsExecuting(wf, true)
useExecutionStore.getState().setBlockRunStatus(wf, 'block-1', 'success')
useExecutionStore.getState().clearRunPath(wf)
const state = useExecutionStore.getState().getWorkflowExecution(wf)
expect(state.isExecuting).toBe(true)
expect(state.lastRunPath.size).toBe(0)
})
})
describe('per-workflow isolation', () => {
it.concurrent('should keep execution state independent between workflows', () => {
const wfA = 'wf-iso-exec-a'
const wfB = 'wf-iso-exec-b'
useExecutionStore.getState().setIsExecuting(wfA, true)
useExecutionStore.getState().setActiveBlocks(wfA, new Set(['block-a1']))
useExecutionStore.getState().setIsExecuting(wfB, false)
useExecutionStore.getState().setActiveBlocks(wfB, new Set(['block-b1', 'block-b2']))
const stateA = useExecutionStore.getState().getWorkflowExecution(wfA)
const stateB = useExecutionStore.getState().getWorkflowExecution(wfB)
expect(stateA.isExecuting).toBe(true)
expect(stateA.activeBlockIds.size).toBe(1)
expect(stateA.activeBlockIds.has('block-a1')).toBe(true)
expect(stateB.isExecuting).toBe(false)
expect(stateB.activeBlockIds.size).toBe(2)
expect(stateB.activeBlockIds.has('block-b1')).toBe(true)
})
it.concurrent('should keep run path independent between workflows', () => {
const wfA = 'wf-iso-path-a'
const wfB = 'wf-iso-path-b'
useExecutionStore.getState().setBlockRunStatus(wfA, 'block-1', 'success')
useExecutionStore.getState().setEdgeRunStatus(wfA, 'edge-1', 'success')
useExecutionStore.getState().setBlockRunStatus(wfB, 'block-1', 'error')
useExecutionStore.getState().setEdgeRunStatus(wfB, 'edge-1', 'error')
const stateA = useExecutionStore.getState().getWorkflowExecution(wfA)
const stateB = useExecutionStore.getState().getWorkflowExecution(wfB)
expect(stateA.lastRunPath.get('block-1')).toBe('success')
expect(stateA.lastRunEdges.get('edge-1')).toBe('success')
expect(stateB.lastRunPath.get('block-1')).toBe('error')
expect(stateB.lastRunEdges.get('edge-1')).toBe('error')
})
it.concurrent('should not affect workflow B when starting execution on workflow A', () => {
const wfA = 'wf-iso-start-a'
const wfB = 'wf-iso-start-b'
useExecutionStore.getState().setBlockRunStatus(wfA, 'block-1', 'success')
useExecutionStore.getState().setBlockRunStatus(wfB, 'block-1', 'success')
useExecutionStore.getState().setIsExecuting(wfA, true)
const stateA = useExecutionStore.getState().getWorkflowExecution(wfA)
const stateB = useExecutionStore.getState().getWorkflowExecution(wfB)
expect(stateA.lastRunPath.size).toBe(0)
expect(stateB.lastRunPath.get('block-1')).toBe('success')
})
it.concurrent('should not affect workflow B when clearing run path on workflow A', () => {
const wfA = 'wf-iso-clear-a'
const wfB = 'wf-iso-clear-b'
useExecutionStore.getState().setBlockRunStatus(wfA, 'block-1', 'success')
useExecutionStore.getState().setBlockRunStatus(wfB, 'block-2', 'error')
useExecutionStore.getState().clearRunPath(wfA)
expect(useExecutionStore.getState().getWorkflowExecution(wfA).lastRunPath.size).toBe(0)
expect(
useExecutionStore.getState().getWorkflowExecution(wfB).lastRunPath.get('block-2')
).toBe('error')
})
})
describe('execution snapshots', () => {
const mockSnapshot = {
blockStates: {},
blockLogs: [],
executionOrder: [],
} as any
it.concurrent('should store a snapshot', () => {
const wf = 'wf-snap-store'
useExecutionStore.getState().setLastExecutionSnapshot(wf, mockSnapshot)
expect(useExecutionStore.getState().getLastExecutionSnapshot(wf)).toBe(mockSnapshot)
})
it.concurrent('should return undefined for unknown workflows', () => {
expect(
useExecutionStore.getState().getLastExecutionSnapshot('wf-snap-unknown')
).toBeUndefined()
})
it.concurrent('should clear a snapshot', () => {
const wf = 'wf-snap-clear'
useExecutionStore.getState().setLastExecutionSnapshot(wf, mockSnapshot)
useExecutionStore.getState().clearLastExecutionSnapshot(wf)
expect(useExecutionStore.getState().getLastExecutionSnapshot(wf)).toBeUndefined()
})
it.concurrent('should keep snapshots independent between workflows', () => {
const wfA = 'wf-snap-iso-a'
const wfB = 'wf-snap-iso-b'
const snapshotB = { blockStates: { x: 1 } } as any
useExecutionStore.getState().setLastExecutionSnapshot(wfA, mockSnapshot)
useExecutionStore.getState().setLastExecutionSnapshot(wfB, snapshotB)
expect(useExecutionStore.getState().getLastExecutionSnapshot(wfA)).toBe(mockSnapshot)
expect(useExecutionStore.getState().getLastExecutionSnapshot(wfB)).toBe(snapshotB)
})
})
describe('reset', () => {
beforeEach(() => {
useExecutionStore.setState(initialState)
})
it('should clear all workflow execution state', () => {
useExecutionStore.getState().setIsExecuting('wf-reset-a', true)
useExecutionStore.getState().setBlockRunStatus('wf-reset-a', 'block-1', 'success')
useExecutionStore.getState().setLastExecutionSnapshot('wf-reset-a', {} as any)
useExecutionStore.getState().reset()
const state = useExecutionStore.getState()
expect(state.workflowExecutions.size).toBe(0)
expect(state.lastExecutionSnapshots.size).toBe(0)
})
it('should return defaults for all workflows after reset', () => {
useExecutionStore.getState().setIsExecuting('wf-reset-b', true)
useExecutionStore.getState().setIsExecuting('wf-reset-c', true)
useExecutionStore.getState().reset()
expect(useExecutionStore.getState().getWorkflowExecution('wf-reset-b').isExecuting).toBe(
false
)
expect(useExecutionStore.getState().getWorkflowExecution('wf-reset-c').isExecuting).toBe(
false
)
})
})
describe('immutability', () => {
beforeEach(() => {
useExecutionStore.setState(initialState)
})
it('should create a new workflowExecutions map on each mutation', () => {
const mapBefore = useExecutionStore.getState().workflowExecutions
useExecutionStore.getState().setIsExecuting('wf-immut-map', true)
const mapAfter = useExecutionStore.getState().workflowExecutions
expect(mapBefore).not.toBe(mapAfter)
})
it('should create a new lastRunPath map when adding block status', () => {
const wf = 'wf-immut-path'
useExecutionStore.getState().setBlockRunStatus(wf, 'block-1', 'success')
const pathBefore = useExecutionStore.getState().getWorkflowExecution(wf).lastRunPath
useExecutionStore.getState().setBlockRunStatus(wf, 'block-2', 'error')
const pathAfter = useExecutionStore.getState().getWorkflowExecution(wf).lastRunPath
expect(pathBefore).not.toBe(pathAfter)
expect(pathBefore.size).toBe(1)
expect(pathAfter.size).toBe(2)
})
it('should create a new lastRunEdges map when adding edge status', () => {
const wf = 'wf-immut-edges'
useExecutionStore.getState().setEdgeRunStatus(wf, 'edge-1', 'success')
const edgesBefore = useExecutionStore.getState().getWorkflowExecution(wf).lastRunEdges
useExecutionStore.getState().setEdgeRunStatus(wf, 'edge-2', 'error')
const edgesAfter = useExecutionStore.getState().getWorkflowExecution(wf).lastRunEdges
expect(edgesBefore).not.toBe(edgesAfter)
expect(edgesBefore.size).toBe(1)
expect(edgesAfter.size).toBe(2)
})
it.concurrent('should not mutate the default state constant', () => {
useExecutionStore.getState().setBlockRunStatus('wf-immut-const', 'block-1', 'success')
expect(defaultWorkflowExecutionState.lastRunPath.size).toBe(0)
expect(defaultWorkflowExecutionState.lastRunEdges.size).toBe(0)
expect(defaultWorkflowExecutionState.activeBlockIds.size).toBe(0)
})
})
})

View File

@@ -1,57 +1,214 @@
import { create } from 'zustand'
import { type ExecutionActions, type ExecutionState, initialState } from './types'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import {
type BlockRunStatus,
defaultWorkflowExecutionState,
type EdgeRunStatus,
type ExecutionActions,
type ExecutionState,
initialState,
type WorkflowExecutionState,
} from './types'
/**
* Returns the execution state for a workflow, creating a fresh default if absent.
*
* @remarks
* When the workflow has no entry in the map, fresh `Set` and `Map` instances
* are created so that callers never share mutable collections with
* {@link defaultWorkflowExecutionState}.
*/
function getOrCreate(
map: Map<string, WorkflowExecutionState>,
workflowId: string
): WorkflowExecutionState {
return (
map.get(workflowId) ?? {
...defaultWorkflowExecutionState,
activeBlockIds: new Set<string>(),
lastRunPath: new Map<string, BlockRunStatus>(),
lastRunEdges: new Map<string, EdgeRunStatus>(),
}
)
}
/**
* Immutably updates a single workflow's execution state within the map.
*
* Creates a shallow copy of the outer map, merges the patch into the
* target workflow's entry, and returns the new map. This ensures Zustand
* detects the top-level reference change and notifies subscribers.
*/
function updatedMap(
map: Map<string, WorkflowExecutionState>,
workflowId: string,
patch: Partial<WorkflowExecutionState>
): Map<string, WorkflowExecutionState> {
const next = new Map(map)
const current = getOrCreate(map, workflowId)
next.set(workflowId, { ...current, ...patch })
return next
}
/**
* Global Zustand store for per-workflow execution state.
*
* All execution state (running, debugging, block/edge highlights) is keyed
* by workflow ID so users can run multiple workflows concurrently, each
* with independent visual feedback.
*/
export const useExecutionStore = create<ExecutionState & ExecutionActions>()((set, get) => ({
...initialState,
setActiveBlocks: (blockIds) => {
set({ activeBlockIds: new Set(blockIds) })
getWorkflowExecution: (workflowId) => {
return getOrCreate(get().workflowExecutions, workflowId)
},
setPendingBlocks: (pendingBlocks) => {
set({ pendingBlocks })
setActiveBlocks: (workflowId, blockIds) => {
set({
workflowExecutions: updatedMap(get().workflowExecutions, workflowId, {
activeBlockIds: new Set(blockIds),
}),
})
},
setIsExecuting: (isExecuting) => {
set({ isExecuting })
setPendingBlocks: (workflowId, pendingBlocks) => {
set({
workflowExecutions: updatedMap(get().workflowExecutions, workflowId, { pendingBlocks }),
})
},
setIsExecuting: (workflowId, isExecuting) => {
const patch: Partial<WorkflowExecutionState> = { isExecuting }
if (isExecuting) {
set({ lastRunPath: new Map(), lastRunEdges: new Map() })
patch.lastRunPath = new Map()
patch.lastRunEdges = new Map()
}
set({
workflowExecutions: updatedMap(get().workflowExecutions, workflowId, patch),
})
},
setIsDebugging: (isDebugging) => set({ isDebugging }),
setExecutor: (executor) => set({ executor }),
setDebugContext: (debugContext) => set({ debugContext }),
setBlockRunStatus: (blockId, status) => {
const { lastRunPath } = get()
const newRunPath = new Map(lastRunPath)
setIsDebugging: (workflowId, isDebugging) => {
set({
workflowExecutions: updatedMap(get().workflowExecutions, workflowId, { isDebugging }),
})
},
setExecutor: (workflowId, executor) => {
set({
workflowExecutions: updatedMap(get().workflowExecutions, workflowId, { executor }),
})
},
setDebugContext: (workflowId, debugContext) => {
set({
workflowExecutions: updatedMap(get().workflowExecutions, workflowId, { debugContext }),
})
},
setBlockRunStatus: (workflowId, blockId, status) => {
const current = getOrCreate(get().workflowExecutions, workflowId)
const newRunPath = new Map(current.lastRunPath)
newRunPath.set(blockId, status)
set({ lastRunPath: newRunPath })
set({
workflowExecutions: updatedMap(get().workflowExecutions, workflowId, {
lastRunPath: newRunPath,
}),
})
},
setEdgeRunStatus: (edgeId, status) => {
const { lastRunEdges } = get()
const newRunEdges = new Map(lastRunEdges)
setEdgeRunStatus: (workflowId, edgeId, status) => {
const current = getOrCreate(get().workflowExecutions, workflowId)
const newRunEdges = new Map(current.lastRunEdges)
newRunEdges.set(edgeId, status)
set({ lastRunEdges: newRunEdges })
set({
workflowExecutions: updatedMap(get().workflowExecutions, workflowId, {
lastRunEdges: newRunEdges,
}),
})
},
clearRunPath: () => set({ lastRunPath: new Map(), lastRunEdges: new Map() }),
clearRunPath: (workflowId) => {
set({
workflowExecutions: updatedMap(get().workflowExecutions, workflowId, {
lastRunPath: new Map(),
lastRunEdges: new Map(),
}),
})
},
reset: () => set(initialState),
setLastExecutionSnapshot: (workflowId, snapshot) => {
const { lastExecutionSnapshots } = get()
const newSnapshots = new Map(lastExecutionSnapshots)
const newSnapshots = new Map(get().lastExecutionSnapshots)
newSnapshots.set(workflowId, snapshot)
set({ lastExecutionSnapshots: newSnapshots })
},
getLastExecutionSnapshot: (workflowId) => {
const { lastExecutionSnapshots } = get()
return lastExecutionSnapshots.get(workflowId)
return get().lastExecutionSnapshots.get(workflowId)
},
clearLastExecutionSnapshot: (workflowId) => {
const { lastExecutionSnapshots } = get()
const newSnapshots = new Map(lastExecutionSnapshots)
const newSnapshots = new Map(get().lastExecutionSnapshots)
newSnapshots.delete(workflowId)
set({ lastExecutionSnapshots: newSnapshots })
},
}))
/**
* Convenience hook that returns the execution state for the currently active workflow.
*/
export function useCurrentWorkflowExecution(): WorkflowExecutionState {
const activeWorkflowId = useWorkflowRegistry((s) => s.activeWorkflowId)
return useExecutionStore((state) => {
if (!activeWorkflowId) return defaultWorkflowExecutionState
return state.workflowExecutions.get(activeWorkflowId) ?? defaultWorkflowExecutionState
})
}
/**
* Returns whether a specific block is currently active (executing) in the current workflow.
* More granular than useCurrentWorkflowExecution — only re-renders when
* the boolean result changes for this specific block.
*/
export function useIsBlockActive(blockId: string): boolean {
const activeWorkflowId = useWorkflowRegistry((s) => s.activeWorkflowId)
return useExecutionStore((state) => {
if (!activeWorkflowId) return false
return state.workflowExecutions.get(activeWorkflowId)?.activeBlockIds.has(blockId) ?? false
})
}
/**
* Returns the last run path (block statuses) for the current workflow.
* More granular than useCurrentWorkflowExecution — only re-renders when
* the lastRunPath map reference changes.
*/
export function useLastRunPath(): Map<string, BlockRunStatus> {
const activeWorkflowId = useWorkflowRegistry((s) => s.activeWorkflowId)
return useExecutionStore((state) => {
if (!activeWorkflowId) return defaultWorkflowExecutionState.lastRunPath
return (
state.workflowExecutions.get(activeWorkflowId)?.lastRunPath ??
defaultWorkflowExecutionState.lastRunPath
)
})
}
/**
* Returns the last run edges (edge statuses) for the current workflow.
* More granular than useCurrentWorkflowExecution — only re-renders when
* the lastRunEdges map reference changes.
*/
export function useLastRunEdges(): Map<string, EdgeRunStatus> {
const activeWorkflowId = useWorkflowRegistry((s) => s.activeWorkflowId)
return useExecutionStore((state) => {
if (!activeWorkflowId) return defaultWorkflowExecutionState.lastRunEdges
return (
state.workflowExecutions.get(activeWorkflowId)?.lastRunEdges ??
defaultWorkflowExecutionState.lastRunEdges
)
})
}

View File

@@ -12,42 +12,102 @@ export type BlockRunStatus = 'success' | 'error'
*/
export type EdgeRunStatus = 'success' | 'error'
export interface ExecutionState {
activeBlockIds: Set<string>
/**
* Execution state scoped to a single workflow.
*
* Each workflow has its own independent instance so concurrent executions
* do not interfere with one another.
*/
export interface WorkflowExecutionState {
/** Whether this workflow is currently executing */
isExecuting: boolean
/** Whether this workflow is in step-by-step debug mode */
isDebugging: boolean
/** Block IDs that are currently running (pulsing in the UI) */
activeBlockIds: Set<string>
/** Block IDs queued to execute next (used during debug stepping) */
pendingBlocks: string[]
/** The executor instance when running client-side */
executor: Executor | null
/** Debug execution context preserved across steps */
debugContext: ExecutionContext | null
/** Maps block IDs to their run result from the last execution */
lastRunPath: Map<string, BlockRunStatus>
/** Maps edge IDs to their run result from the last execution */
lastRunEdges: Map<string, EdgeRunStatus>
lastExecutionSnapshots: Map<string, SerializableExecutionState>
}
export interface ExecutionActions {
setActiveBlocks: (blockIds: Set<string>) => void
setIsExecuting: (isExecuting: boolean) => void
setIsDebugging: (isDebugging: boolean) => void
setPendingBlocks: (blockIds: string[]) => void
setExecutor: (executor: Executor | null) => void
setDebugContext: (context: ExecutionContext | null) => void
setBlockRunStatus: (blockId: string, status: BlockRunStatus) => void
setEdgeRunStatus: (edgeId: string, status: EdgeRunStatus) => void
clearRunPath: () => void
reset: () => void
setLastExecutionSnapshot: (workflowId: string, snapshot: SerializableExecutionState) => void
getLastExecutionSnapshot: (workflowId: string) => SerializableExecutionState | undefined
clearLastExecutionSnapshot: (workflowId: string) => void
}
export const initialState: ExecutionState = {
activeBlockIds: new Set(),
/**
* Default values for a workflow that has never been executed.
*
* @remarks
* This constant is used as the fallback in selectors when no per-workflow
* entry exists. Its reference identity is stable, which prevents unnecessary
* re-renders in Zustand selectors that use `Object.is` equality.
*/
export const defaultWorkflowExecutionState: WorkflowExecutionState = {
isExecuting: false,
isDebugging: false,
activeBlockIds: new Set(),
pendingBlocks: [],
executor: null,
debugContext: null,
lastRunPath: new Map(),
lastRunEdges: new Map(),
}
/**
* Root state shape for the execution store.
*
* All execution state is keyed by workflow ID so multiple workflows
* can be executed concurrently with independent visual feedback.
*/
export interface ExecutionState {
/** Per-workflow execution state keyed by workflow ID */
workflowExecutions: Map<string, WorkflowExecutionState>
/** Serializable snapshots of the last successful execution per workflow */
lastExecutionSnapshots: Map<string, SerializableExecutionState>
}
/**
* Actions available on the execution store.
*
* Every setter takes a `workflowId` as its first argument so mutations
* are scoped to a single workflow.
*/
export interface ExecutionActions {
/** Returns the execution state for a workflow, falling back to defaults */
getWorkflowExecution: (workflowId: string) => WorkflowExecutionState
/** Replaces the set of currently-executing block IDs for a workflow */
setActiveBlocks: (workflowId: string, blockIds: Set<string>) => void
/** Marks a workflow as executing or idle. Starting clears the run path */
setIsExecuting: (workflowId: string, isExecuting: boolean) => void
/** Toggles debug mode for a workflow */
setIsDebugging: (workflowId: string, isDebugging: boolean) => void
/** Sets the list of blocks pending execution during debug stepping */
setPendingBlocks: (workflowId: string, blockIds: string[]) => void
/** Stores the executor instance for a workflow */
setExecutor: (workflowId: string, executor: Executor | null) => void
/** Stores the debug execution context for a workflow */
setDebugContext: (workflowId: string, context: ExecutionContext | null) => void
/** Records a block's run result (success/error) in the run path */
setBlockRunStatus: (workflowId: string, blockId: string, status: BlockRunStatus) => void
/** Records an edge's run result (success/error) in the run edges */
setEdgeRunStatus: (workflowId: string, edgeId: string, status: EdgeRunStatus) => void
/** Clears the run path and run edges for a workflow */
clearRunPath: (workflowId: string) => void
/** Resets the entire store to its initial empty state */
reset: () => void
/** Stores a serializable execution snapshot for a workflow */
setLastExecutionSnapshot: (workflowId: string, snapshot: SerializableExecutionState) => void
/** Returns the stored execution snapshot for a workflow, if any */
getLastExecutionSnapshot: (workflowId: string) => SerializableExecutionState | undefined
/** Removes the stored execution snapshot for a workflow */
clearLastExecutionSnapshot: (workflowId: string) => void
}
/** Empty initial state used by the store and by {@link ExecutionActions.reset} */
export const initialState: ExecutionState = {
workflowExecutions: new Map(),
lastExecutionSnapshots: new Map(),
}

View File

@@ -208,7 +208,7 @@ export const useTerminalConsoleStore = create<ConsoleStore>()(
set((state) => ({
entries: state.entries.filter((entry) => entry.workflowId !== workflowId),
}))
useExecutionStore.getState().clearRunPath()
useExecutionStore.getState().clearRunPath(workflowId)
},
exportConsoleCSV: (workflowId: string) => {

View File

@@ -33,13 +33,39 @@ vi.mock('@/stores/terminal', () => ({
vi.mock('@/stores/execution/store', () => ({
useExecutionStore: {
getState: vi.fn().mockReturnValue({
getWorkflowExecution: vi.fn().mockReturnValue({
isExecuting: false,
isDebugging: false,
activeBlockIds: new Set(),
pendingBlocks: [],
executor: null,
debugContext: null,
lastRunPath: new Map(),
lastRunEdges: new Map(),
}),
setIsExecuting: vi.fn(),
setIsDebugging: vi.fn(),
setPendingBlocks: vi.fn(),
reset: vi.fn(),
setActiveBlocks: vi.fn(),
setBlockRunStatus: vi.fn(),
setEdgeRunStatus: vi.fn(),
clearRunPath: vi.fn(),
}),
},
useCurrentWorkflowExecution: vi.fn().mockReturnValue({
isExecuting: false,
isDebugging: false,
activeBlockIds: new Set(),
pendingBlocks: [],
executor: null,
debugContext: null,
lastRunPath: new Map(),
lastRunEdges: new Map(),
}),
useIsBlockActive: vi.fn().mockReturnValue(false),
useLastRunPath: vi.fn().mockReturnValue(new Map()),
useLastRunEdges: vi.fn().mockReturnValue(new Map()),
}))
vi.mock('@/blocks/registry', () => ({