fix(terminal): reconnect to running executions after page refresh (#3200)

* fix(terminal): reconnect to running executions after page refresh

* fix(terminal): use ExecutionEvent type instead of any in reconnection stream

* fix(execution): type event buffer with ExecutionEvent instead of Record<string, unknown>

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(execution): validate fromEventId query param in reconnection endpoint

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Fix some bugs

* fix(variables): fix tag dropdown and cursor alignment in variables block (#3199)

* feat(confluence): added list space labels, delete label, delete page prop (#3201)

* updated route

* ack comments

* fix(execution): reset execution state in reconnection cleanup to unblock re-entry

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(execution): restore running entries when reconnection is interrupted by navigation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* done

* remove cast in ioredis types

* ack PR comments

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Siddharth Ganesan <siddharthganesan@gmail.com>
This commit is contained in:
Waleed
2026-02-11 19:31:29 -08:00
committed by Waleed Latif
parent d068ed6d65
commit 85284eb7c4
12 changed files with 944 additions and 170 deletions

View File

@@ -29,7 +29,7 @@ const patchBodySchema = z
description: z
.string()
.trim()
.max(500, 'Description must be 500 characters or less')
.max(2000, 'Description must be 2000 characters or less')
.nullable()
.optional(),
isActive: z.literal(true).optional(), // Set to true to activate this version

View File

@@ -12,7 +12,7 @@ import {
import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { markExecutionCancelled } from '@/lib/execution/cancellation'
import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/event-buffer'
import { processInputFileFields } from '@/lib/execution/files'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
@@ -700,15 +700,27 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync)
let isStreamClosed = false
const eventWriter = createExecutionEventWriter(executionId)
setExecutionMeta(executionId, {
status: 'active',
userId: actorUserId,
workflowId,
}).catch(() => {})
const stream = new ReadableStream<Uint8Array>({
async start(controller) {
const sendEvent = (event: ExecutionEvent) => {
if (isStreamClosed) return
let finalMetaStatus: 'complete' | 'error' | 'cancelled' | null = null
try {
controller.enqueue(encodeSSEEvent(event))
} catch {
isStreamClosed = true
const sendEvent = (event: ExecutionEvent) => {
if (!isStreamClosed) {
try {
controller.enqueue(encodeSSEEvent(event))
} catch {
isStreamClosed = true
}
}
if (event.type !== 'stream:chunk' && event.type !== 'stream:done') {
eventWriter.write(event).catch(() => {})
}
}
@@ -829,14 +841,12 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const reader = streamingExec.stream.getReader()
const decoder = new TextDecoder()
let chunkCount = 0
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
chunkCount++
const chunk = decoder.decode(value, { stream: true })
sendEvent({
type: 'stream:chunk',
@@ -951,6 +961,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
duration: result.metadata?.duration || 0,
},
})
finalMetaStatus = 'error'
} else {
logger.info(`[${requestId}] Workflow execution was cancelled`)
@@ -963,6 +974,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
duration: result.metadata?.duration || 0,
},
})
finalMetaStatus = 'cancelled'
}
return
}
@@ -986,6 +998,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
endTime: result.metadata?.endTime || new Date().toISOString(),
},
})
finalMetaStatus = 'complete'
} catch (error: unknown) {
const isTimeout = isTimeoutError(error) || timeoutController.isTimedOut()
const errorMessage = isTimeout
@@ -1017,7 +1030,18 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
duration: executionResult?.metadata?.duration || 0,
},
})
finalMetaStatus = 'error'
} finally {
try {
await eventWriter.close()
} catch (closeError) {
logger.warn(`[${requestId}] Failed to close event writer`, {
error: closeError instanceof Error ? closeError.message : String(closeError),
})
}
if (finalMetaStatus) {
setExecutionMeta(executionId, { status: finalMetaStatus }).catch(() => {})
}
timeoutController.cleanup()
if (executionId) {
await cleanupExecutionBase64Cache(executionId)
@@ -1032,10 +1056,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
},
cancel() {
isStreamClosed = true
timeoutController.cleanup()
logger.info(`[${requestId}] Client aborted SSE stream, signalling cancellation`)
timeoutController.abort()
markExecutionCancelled(executionId).catch(() => {})
logger.info(`[${requestId}] Client disconnected from SSE stream`)
},
})

View File

@@ -0,0 +1,170 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import {
type ExecutionStreamStatus,
getExecutionMeta,
readExecutionEvents,
} from '@/lib/execution/event-buffer'
import { formatSSEEvent } from '@/lib/workflows/executor/execution-events'
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
const logger = createLogger('ExecutionStreamReconnectAPI')
const POLL_INTERVAL_MS = 500
const MAX_POLL_DURATION_MS = 10 * 60 * 1000 // 10 minutes
function isTerminalStatus(status: ExecutionStreamStatus): boolean {
return status === 'complete' || status === 'error' || status === 'cancelled'
}
export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'
export async function GET(
req: NextRequest,
{ params }: { params: Promise<{ id: string; executionId: string }> }
) {
const { id: workflowId, executionId } = await params
try {
const auth = await checkHybridAuth(req, { requireWorkflowId: false })
if (!auth.success || !auth.userId) {
return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 })
}
const workflowAuthorization = await authorizeWorkflowByWorkspacePermission({
workflowId,
userId: auth.userId,
action: 'read',
})
if (!workflowAuthorization.allowed) {
return NextResponse.json(
{ error: workflowAuthorization.message || 'Access denied' },
{ status: workflowAuthorization.status }
)
}
const meta = await getExecutionMeta(executionId)
if (!meta) {
return NextResponse.json({ error: 'Execution buffer not found or expired' }, { status: 404 })
}
if (meta.workflowId && meta.workflowId !== workflowId) {
return NextResponse.json(
{ error: 'Execution does not belong to this workflow' },
{ status: 403 }
)
}
const fromParam = req.nextUrl.searchParams.get('from')
const parsed = fromParam ? Number.parseInt(fromParam, 10) : 0
const fromEventId = Number.isFinite(parsed) && parsed >= 0 ? parsed : 0
logger.info('Reconnection stream requested', {
workflowId,
executionId,
fromEventId,
metaStatus: meta.status,
})
const encoder = new TextEncoder()
let closed = false
const stream = new ReadableStream<Uint8Array>({
async start(controller) {
let lastEventId = fromEventId
const pollDeadline = Date.now() + MAX_POLL_DURATION_MS
const enqueue = (text: string) => {
if (closed) return
try {
controller.enqueue(encoder.encode(text))
} catch {
closed = true
}
}
try {
const events = await readExecutionEvents(executionId, lastEventId)
for (const entry of events) {
if (closed) return
enqueue(formatSSEEvent(entry.event))
lastEventId = entry.eventId
}
const currentMeta = await getExecutionMeta(executionId)
if (!currentMeta || isTerminalStatus(currentMeta.status)) {
enqueue('data: [DONE]\n\n')
if (!closed) controller.close()
return
}
while (!closed && Date.now() < pollDeadline) {
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS))
if (closed) return
const newEvents = await readExecutionEvents(executionId, lastEventId)
for (const entry of newEvents) {
if (closed) return
enqueue(formatSSEEvent(entry.event))
lastEventId = entry.eventId
}
const polledMeta = await getExecutionMeta(executionId)
if (!polledMeta || isTerminalStatus(polledMeta.status)) {
const finalEvents = await readExecutionEvents(executionId, lastEventId)
for (const entry of finalEvents) {
if (closed) return
enqueue(formatSSEEvent(entry.event))
lastEventId = entry.eventId
}
enqueue('data: [DONE]\n\n')
if (!closed) controller.close()
return
}
}
if (!closed) {
logger.warn('Reconnection stream poll deadline reached', { executionId })
enqueue('data: [DONE]\n\n')
controller.close()
}
} catch (error) {
logger.error('Error in reconnection stream', {
executionId,
error: error instanceof Error ? error.message : String(error),
})
if (!closed) {
try {
controller.close()
} catch {}
}
}
},
cancel() {
closed = true
logger.info('Client disconnected from reconnection stream', { executionId })
},
})
return new NextResponse(stream, {
headers: {
...SSE_HEADERS,
'X-Execution-Id': executionId,
},
})
} catch (error: any) {
logger.error('Failed to start reconnection stream', {
workflowId,
executionId,
error: error.message,
})
return NextResponse.json(
{ error: error.message || 'Failed to start reconnection stream' },
{ status: 500 }
)
}
}

View File

@@ -113,7 +113,7 @@ export function VersionDescriptionModal({
className='min-h-[120px] resize-none'
value={description}
onChange={(e) => setDescription(e.target.value)}
maxLength={500}
maxLength={2000}
disabled={isGenerating}
/>
<div className='flex items-center justify-between'>
@@ -123,7 +123,7 @@ export function VersionDescriptionModal({
</p>
)}
{!updateMutation.error && !generateMutation.error && <div />}
<p className='text-[11px] text-[var(--text-tertiary)]'>{description.length}/500</p>
<p className='text-[11px] text-[var(--text-tertiary)]'>{description.length}/2000</p>
</div>
</ModalBody>
<ModalFooter>

View File

@@ -1,4 +1,4 @@
import { useCallback, useRef, useState } from 'react'
import { useCallback, useEffect, useRef, useState } from 'react'
import { createLogger } from '@sim/logger'
import { useQueryClient } from '@tanstack/react-query'
import { v4 as uuidv4 } from 'uuid'
@@ -46,7 +46,13 @@ import { useWorkflowStore } from '@/stores/workflows/workflow/store'
const logger = createLogger('useWorkflowExecution')
// Debug state validation result
/**
* Module-level Set tracking which workflows have an active reconnection effect.
* Prevents multiple hook instances (from different components) from starting
* concurrent reconnection streams for the same workflow during the same mount cycle.
*/
const activeReconnections = new Set<string>()
interface DebugValidationResult {
isValid: boolean
error?: string
@@ -54,7 +60,7 @@ interface DebugValidationResult {
interface BlockEventHandlerConfig {
workflowId?: string
executionId?: string
executionIdRef: { current: string }
workflowEdges: Array<{ id: string; target: string; sourceHandle?: string | null }>
activeBlocksSet: Set<string>
accumulatedBlockLogs: BlockLog[]
@@ -108,12 +114,15 @@ export function useWorkflowExecution() {
const queryClient = useQueryClient()
const currentWorkflow = useCurrentWorkflow()
const { activeWorkflowId, workflows } = useWorkflowRegistry()
const { toggleConsole, addConsole, updateConsole, cancelRunningEntries } =
const { toggleConsole, addConsole, updateConsole, cancelRunningEntries, clearExecutionEntries } =
useTerminalConsoleStore()
const hasHydrated = useTerminalConsoleStore((s) => s._hasHydrated)
const { getAllVariables } = useEnvironmentStore()
const { getVariablesByWorkflowId, variables } = useVariablesStore()
const { isExecuting, isDebugging, pendingBlocks, executor, debugContext } =
useCurrentWorkflowExecution()
const setCurrentExecutionId = useExecutionStore((s) => s.setCurrentExecutionId)
const getCurrentExecutionId = useExecutionStore((s) => s.getCurrentExecutionId)
const setIsExecuting = useExecutionStore((s) => s.setIsExecuting)
const setIsDebugging = useExecutionStore((s) => s.setIsDebugging)
const setPendingBlocks = useExecutionStore((s) => s.setPendingBlocks)
@@ -297,7 +306,7 @@ export function useWorkflowExecution() {
(config: BlockEventHandlerConfig) => {
const {
workflowId,
executionId,
executionIdRef,
workflowEdges,
activeBlocksSet,
accumulatedBlockLogs,
@@ -308,6 +317,14 @@ export function useWorkflowExecution() {
onBlockCompleteCallback,
} = config
/** Returns true if this execution was cancelled or superseded by another run. */
const isStaleExecution = () =>
!!(
workflowId &&
executionIdRef.current &&
useExecutionStore.getState().getCurrentExecutionId(workflowId) !== executionIdRef.current
)
const updateActiveBlocks = (blockId: string, isActive: boolean) => {
if (!workflowId) return
if (isActive) {
@@ -360,7 +377,7 @@ export function useWorkflowExecution() {
endedAt: data.endedAt,
workflowId,
blockId: data.blockId,
executionId,
executionId: executionIdRef.current,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
iterationCurrent: data.iterationCurrent,
@@ -383,7 +400,7 @@ export function useWorkflowExecution() {
endedAt: data.endedAt,
workflowId,
blockId: data.blockId,
executionId,
executionId: executionIdRef.current,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
iterationCurrent: data.iterationCurrent,
@@ -410,7 +427,7 @@ export function useWorkflowExecution() {
iterationType: data.iterationType,
iterationContainerId: data.iterationContainerId,
},
executionId
executionIdRef.current
)
}
@@ -432,11 +449,12 @@ export function useWorkflowExecution() {
iterationType: data.iterationType,
iterationContainerId: data.iterationContainerId,
},
executionId
executionIdRef.current
)
}
const onBlockStarted = (data: BlockStartedData) => {
if (isStaleExecution()) return
updateActiveBlocks(data.blockId, true)
markIncomingEdges(data.blockId)
@@ -453,7 +471,7 @@ export function useWorkflowExecution() {
endedAt: undefined,
workflowId,
blockId: data.blockId,
executionId,
executionId: executionIdRef.current,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
isRunning: true,
@@ -465,6 +483,7 @@ export function useWorkflowExecution() {
}
const onBlockCompleted = (data: BlockCompletedData) => {
if (isStaleExecution()) return
updateActiveBlocks(data.blockId, false)
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'success')
@@ -495,6 +514,7 @@ export function useWorkflowExecution() {
}
const onBlockError = (data: BlockErrorData) => {
if (isStaleExecution()) return
updateActiveBlocks(data.blockId, false)
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'error')
@@ -902,10 +922,6 @@ export function useWorkflowExecution() {
// Update block logs with actual stream completion times
if (result.logs && streamCompletionTimes.size > 0) {
const streamCompletionEndTime = new Date(
Math.max(...Array.from(streamCompletionTimes.values()))
).toISOString()
result.logs.forEach((log: BlockLog) => {
if (streamCompletionTimes.has(log.blockId)) {
const completionTime = streamCompletionTimes.get(log.blockId)!
@@ -987,7 +1003,6 @@ export function useWorkflowExecution() {
return { success: true, stream }
}
// For manual (non-chat) execution
const manualExecutionId = uuidv4()
try {
const result = await executeWorkflow(
@@ -1002,29 +1017,10 @@ export function useWorkflowExecution() {
if (result.metadata.pendingBlocks) {
setPendingBlocks(activeWorkflowId, result.metadata.pendingBlocks)
}
} else if (result && 'success' in result) {
setExecutionResult(result)
// Reset execution state after successful non-debug execution
setIsExecuting(activeWorkflowId, false)
setIsDebugging(activeWorkflowId, false)
setActiveBlocks(activeWorkflowId, new Set())
if (isChatExecution) {
if (!result.metadata) {
result.metadata = { duration: 0, startTime: new Date().toISOString() }
}
;(result.metadata as any).source = 'chat'
}
// Invalidate subscription queries to update usage
setTimeout(() => {
queryClient.invalidateQueries({ queryKey: subscriptionKeys.all })
}, 1000)
}
return result
} catch (error: any) {
const errorResult = handleExecutionError(error, { executionId: manualExecutionId })
// Note: Error logs are already persisted server-side via execution-core.ts
return errorResult
}
},
@@ -1275,7 +1271,7 @@ export function useWorkflowExecution() {
if (activeWorkflowId) {
logger.info('Using server-side executor')
const executionId = uuidv4()
const executionIdRef = { current: '' }
let executionResult: ExecutionResult = {
success: false,
@@ -1293,7 +1289,7 @@ export function useWorkflowExecution() {
try {
const blockHandlers = buildBlockEventHandlers({
workflowId: activeWorkflowId,
executionId,
executionIdRef,
workflowEdges,
activeBlocksSet,
accumulatedBlockLogs,
@@ -1326,6 +1322,10 @@ export function useWorkflowExecution() {
loops: clientWorkflowState.loops,
parallels: clientWorkflowState.parallels,
},
onExecutionId: (id) => {
executionIdRef.current = id
setCurrentExecutionId(activeWorkflowId, id)
},
callbacks: {
onExecutionStarted: (data) => {
logger.info('Server execution started:', data)
@@ -1368,6 +1368,18 @@ export function useWorkflowExecution() {
},
onExecutionCompleted: (data) => {
if (
activeWorkflowId &&
executionIdRef.current &&
useExecutionStore.getState().getCurrentExecutionId(activeWorkflowId) !==
executionIdRef.current
)
return
if (activeWorkflowId) {
setCurrentExecutionId(activeWorkflowId, null)
}
executionResult = {
success: data.success,
output: data.output,
@@ -1425,9 +1437,33 @@ export function useWorkflowExecution() {
})
}
}
const workflowExecState = activeWorkflowId
? useExecutionStore.getState().getWorkflowExecution(activeWorkflowId)
: null
if (activeWorkflowId && !workflowExecState?.isDebugging) {
setExecutionResult(executionResult)
setIsExecuting(activeWorkflowId, false)
setActiveBlocks(activeWorkflowId, new Set())
setTimeout(() => {
queryClient.invalidateQueries({ queryKey: subscriptionKeys.all })
}, 1000)
}
},
onExecutionError: (data) => {
if (
activeWorkflowId &&
executionIdRef.current &&
useExecutionStore.getState().getCurrentExecutionId(activeWorkflowId) !==
executionIdRef.current
)
return
if (activeWorkflowId) {
setCurrentExecutionId(activeWorkflowId, null)
}
executionResult = {
success: false,
output: {},
@@ -1441,43 +1477,53 @@ export function useWorkflowExecution() {
const isPreExecutionError = accumulatedBlockLogs.length === 0
handleExecutionErrorConsole({
workflowId: activeWorkflowId,
executionId,
executionId: executionIdRef.current,
error: data.error,
durationMs: data.duration,
blockLogs: accumulatedBlockLogs,
isPreExecutionError,
})
if (activeWorkflowId) {
setIsExecuting(activeWorkflowId, false)
setIsDebugging(activeWorkflowId, false)
setActiveBlocks(activeWorkflowId, new Set())
}
},
onExecutionCancelled: (data) => {
if (
activeWorkflowId &&
executionIdRef.current &&
useExecutionStore.getState().getCurrentExecutionId(activeWorkflowId) !==
executionIdRef.current
)
return
if (activeWorkflowId) {
setCurrentExecutionId(activeWorkflowId, null)
}
handleExecutionCancelledConsole({
workflowId: activeWorkflowId,
executionId,
executionId: executionIdRef.current,
durationMs: data?.duration,
})
if (activeWorkflowId) {
setIsExecuting(activeWorkflowId, false)
setIsDebugging(activeWorkflowId, false)
setActiveBlocks(activeWorkflowId, new Set())
}
},
},
})
return executionResult
} catch (error: any) {
// Don't log abort errors - they're intentional user actions
if (error.name === 'AbortError' || error.message?.includes('aborted')) {
logger.info('Execution aborted by user')
// Reset execution state
if (activeWorkflowId) {
setIsExecuting(activeWorkflowId, false)
setActiveBlocks(activeWorkflowId, new Set())
}
// Return gracefully without error
return {
success: false,
output: {},
metadata: { duration: 0 },
logs: [],
}
return executionResult
}
logger.error('Server-side execution failed:', error)
@@ -1485,7 +1531,6 @@ export function useWorkflowExecution() {
}
}
// Fallback: should never reach here
throw new Error('Server-side execution is required')
}
@@ -1717,25 +1762,28 @@ export function useWorkflowExecution() {
* Handles cancelling the current workflow execution
*/
const handleCancelExecution = useCallback(() => {
if (!activeWorkflowId) return
logger.info('Workflow execution cancellation requested')
// Cancel the execution stream for this workflow (server-side)
executionStream.cancel(activeWorkflowId ?? undefined)
const storedExecutionId = getCurrentExecutionId(activeWorkflowId)
// Mark current chat execution as superseded so its cleanup won't affect new executions
currentChatExecutionIdRef.current = null
// Mark all running entries as canceled in the terminal
if (activeWorkflowId) {
cancelRunningEntries(activeWorkflowId)
// Reset execution state - this triggers chat stream cleanup via useEffect in chat.tsx
setIsExecuting(activeWorkflowId, false)
setIsDebugging(activeWorkflowId, false)
setActiveBlocks(activeWorkflowId, new Set())
if (storedExecutionId) {
setCurrentExecutionId(activeWorkflowId, null)
fetch(`/api/workflows/${activeWorkflowId}/executions/${storedExecutionId}/cancel`, {
method: 'POST',
}).catch(() => {})
handleExecutionCancelledConsole({
workflowId: activeWorkflowId,
executionId: storedExecutionId,
})
}
// If in debug mode, also reset debug state
executionStream.cancel(activeWorkflowId)
currentChatExecutionIdRef.current = null
setIsExecuting(activeWorkflowId, false)
setIsDebugging(activeWorkflowId, false)
setActiveBlocks(activeWorkflowId, new Set())
if (isDebugging) {
resetDebugState()
}
@@ -1747,7 +1795,9 @@ export function useWorkflowExecution() {
setIsDebugging,
setActiveBlocks,
activeWorkflowId,
cancelRunningEntries,
getCurrentExecutionId,
setCurrentExecutionId,
handleExecutionCancelledConsole,
])
/**
@@ -1847,7 +1897,7 @@ export function useWorkflowExecution() {
}
setIsExecuting(workflowId, true)
const executionId = uuidv4()
const executionIdRef = { current: '' }
const accumulatedBlockLogs: BlockLog[] = []
const accumulatedBlockStates = new Map<string, BlockState>()
const executedBlockIds = new Set<string>()
@@ -1856,7 +1906,7 @@ export function useWorkflowExecution() {
try {
const blockHandlers = buildBlockEventHandlers({
workflowId,
executionId,
executionIdRef,
workflowEdges,
activeBlocksSet,
accumulatedBlockLogs,
@@ -1871,6 +1921,10 @@ export function useWorkflowExecution() {
startBlockId: blockId,
sourceSnapshot: effectiveSnapshot,
input: workflowInput,
onExecutionId: (id) => {
executionIdRef.current = id
setCurrentExecutionId(workflowId, id)
},
callbacks: {
onBlockStarted: blockHandlers.onBlockStarted,
onBlockCompleted: blockHandlers.onBlockCompleted,
@@ -1878,7 +1932,6 @@ export function useWorkflowExecution() {
onExecutionCompleted: (data) => {
if (data.success) {
// Add the start block (trigger) to executed blocks
executedBlockIds.add(blockId)
const mergedBlockStates: Record<string, BlockState> = {
@@ -1902,6 +1955,10 @@ export function useWorkflowExecution() {
}
setLastExecutionSnapshot(workflowId, updatedSnapshot)
}
setCurrentExecutionId(workflowId, null)
setIsExecuting(workflowId, false)
setActiveBlocks(workflowId, new Set())
},
onExecutionError: (data) => {
@@ -1921,19 +1978,27 @@ export function useWorkflowExecution() {
handleExecutionErrorConsole({
workflowId,
executionId,
executionId: executionIdRef.current,
error: data.error,
durationMs: data.duration,
blockLogs: accumulatedBlockLogs,
})
setCurrentExecutionId(workflowId, null)
setIsExecuting(workflowId, false)
setActiveBlocks(workflowId, new Set())
},
onExecutionCancelled: (data) => {
handleExecutionCancelledConsole({
workflowId,
executionId,
executionId: executionIdRef.current,
durationMs: data?.duration,
})
setCurrentExecutionId(workflowId, null)
setIsExecuting(workflowId, false)
setActiveBlocks(workflowId, new Set())
},
},
})
@@ -1942,14 +2007,20 @@ export function useWorkflowExecution() {
logger.error('Run-from-block failed:', error)
}
} finally {
setIsExecuting(workflowId, false)
setActiveBlocks(workflowId, new Set())
const currentId = getCurrentExecutionId(workflowId)
if (currentId === null || currentId === executionIdRef.current) {
setCurrentExecutionId(workflowId, null)
setIsExecuting(workflowId, false)
setActiveBlocks(workflowId, new Set())
}
}
},
[
getLastExecutionSnapshot,
setLastExecutionSnapshot,
clearLastExecutionSnapshot,
getCurrentExecutionId,
setCurrentExecutionId,
setIsExecuting,
setActiveBlocks,
setBlockRunStatus,
@@ -1979,29 +2050,213 @@ export function useWorkflowExecution() {
const executionId = uuidv4()
try {
const result = await executeWorkflow(
undefined,
undefined,
executionId,
undefined,
'manual',
blockId
)
if (result && 'success' in result) {
setExecutionResult(result)
}
await executeWorkflow(undefined, undefined, executionId, undefined, 'manual', blockId)
} catch (error) {
const errorResult = handleExecutionError(error, { executionId })
return errorResult
} finally {
setCurrentExecutionId(workflowId, null)
setIsExecuting(workflowId, false)
setIsDebugging(workflowId, false)
setActiveBlocks(workflowId, new Set())
}
},
[activeWorkflowId, setExecutionResult, setIsExecuting, setIsDebugging, setActiveBlocks]
[
activeWorkflowId,
setCurrentExecutionId,
setExecutionResult,
setIsExecuting,
setIsDebugging,
setActiveBlocks,
]
)
useEffect(() => {
if (!activeWorkflowId || !hasHydrated) return
const entries = useTerminalConsoleStore.getState().entries
const runningEntries = entries.filter(
(e) => e.isRunning && e.workflowId === activeWorkflowId && e.executionId
)
if (runningEntries.length === 0) return
if (activeReconnections.has(activeWorkflowId)) return
activeReconnections.add(activeWorkflowId)
executionStream.cancel(activeWorkflowId)
const sorted = [...runningEntries].sort((a, b) => {
const aTime = a.startedAt ? new Date(a.startedAt).getTime() : 0
const bTime = b.startedAt ? new Date(b.startedAt).getTime() : 0
return bTime - aTime
})
const executionId = sorted[0].executionId!
const otherExecutionIds = new Set(
sorted.filter((e) => e.executionId !== executionId).map((e) => e.executionId!)
)
if (otherExecutionIds.size > 0) {
cancelRunningEntries(activeWorkflowId)
}
setCurrentExecutionId(activeWorkflowId, executionId)
setIsExecuting(activeWorkflowId, true)
const workflowEdges = useWorkflowStore.getState().edges
const activeBlocksSet = new Set<string>()
const accumulatedBlockLogs: BlockLog[] = []
const accumulatedBlockStates = new Map<string, BlockState>()
const executedBlockIds = new Set<string>()
const executionIdRef = { current: executionId }
const handlers = buildBlockEventHandlers({
workflowId: activeWorkflowId,
executionIdRef,
workflowEdges,
activeBlocksSet,
accumulatedBlockLogs,
accumulatedBlockStates,
executedBlockIds,
consoleMode: 'update',
includeStartConsoleEntry: true,
})
const originalEntries = entries
.filter((e) => e.executionId === executionId)
.map((e) => ({ ...e }))
let cleared = false
let reconnectionComplete = false
let cleanupRan = false
const clearOnce = () => {
if (!cleared) {
cleared = true
clearExecutionEntries(executionId)
}
}
const reconnectWorkflowId = activeWorkflowId
executionStream
.reconnect({
workflowId: reconnectWorkflowId,
executionId,
callbacks: {
onBlockStarted: (data) => {
clearOnce()
handlers.onBlockStarted(data)
},
onBlockCompleted: (data) => {
clearOnce()
handlers.onBlockCompleted(data)
},
onBlockError: (data) => {
clearOnce()
handlers.onBlockError(data)
},
onExecutionCompleted: () => {
const currentId = useExecutionStore
.getState()
.getCurrentExecutionId(reconnectWorkflowId)
if (currentId !== executionId) {
reconnectionComplete = true
activeReconnections.delete(reconnectWorkflowId)
return
}
clearOnce()
reconnectionComplete = true
activeReconnections.delete(reconnectWorkflowId)
setCurrentExecutionId(reconnectWorkflowId, null)
setIsExecuting(reconnectWorkflowId, false)
setActiveBlocks(reconnectWorkflowId, new Set())
},
onExecutionError: (data) => {
const currentId = useExecutionStore
.getState()
.getCurrentExecutionId(reconnectWorkflowId)
if (currentId !== executionId) {
reconnectionComplete = true
activeReconnections.delete(reconnectWorkflowId)
return
}
clearOnce()
reconnectionComplete = true
activeReconnections.delete(reconnectWorkflowId)
setCurrentExecutionId(reconnectWorkflowId, null)
setIsExecuting(reconnectWorkflowId, false)
setActiveBlocks(reconnectWorkflowId, new Set())
handleExecutionErrorConsole({
workflowId: reconnectWorkflowId,
executionId,
error: data.error,
blockLogs: accumulatedBlockLogs,
})
},
onExecutionCancelled: () => {
const currentId = useExecutionStore
.getState()
.getCurrentExecutionId(reconnectWorkflowId)
if (currentId !== executionId) {
reconnectionComplete = true
activeReconnections.delete(reconnectWorkflowId)
return
}
clearOnce()
reconnectionComplete = true
activeReconnections.delete(reconnectWorkflowId)
setCurrentExecutionId(reconnectWorkflowId, null)
setIsExecuting(reconnectWorkflowId, false)
setActiveBlocks(reconnectWorkflowId, new Set())
handleExecutionCancelledConsole({
workflowId: reconnectWorkflowId,
executionId,
})
},
},
})
.catch((error) => {
logger.warn('Execution reconnection failed', { executionId, error })
})
.finally(() => {
if (reconnectionComplete || cleanupRan) return
const currentId = useExecutionStore.getState().getCurrentExecutionId(reconnectWorkflowId)
if (currentId !== executionId) return
reconnectionComplete = true
activeReconnections.delete(reconnectWorkflowId)
clearExecutionEntries(executionId)
for (const entry of originalEntries) {
addConsole({
workflowId: entry.workflowId,
blockId: entry.blockId,
blockName: entry.blockName,
blockType: entry.blockType,
executionId: entry.executionId,
executionOrder: entry.executionOrder,
isRunning: false,
warning: 'Execution result unavailable — check the logs page',
})
}
setCurrentExecutionId(reconnectWorkflowId, null)
setIsExecuting(reconnectWorkflowId, false)
setActiveBlocks(reconnectWorkflowId, new Set())
})
return () => {
cleanupRan = true
executionStream.cancel(reconnectWorkflowId)
activeReconnections.delete(reconnectWorkflowId)
if (cleared && !reconnectionComplete) {
clearExecutionEntries(executionId)
for (const entry of originalEntries) {
addConsole(entry)
}
}
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [activeWorkflowId, hasHydrated])
return {
isExecuting,
isDebugging,