From 85284eb7c43b1ff45cfef29a4a3a2f2fa02417f1 Mon Sep 17 00:00:00 2001 From: Waleed Date: Wed, 11 Feb 2026 19:31:29 -0800 Subject: [PATCH] fix(terminal): reconnect to running executions after page refresh (#3200) * fix(terminal): reconnect to running executions after page refresh * fix(terminal): use ExecutionEvent type instead of any in reconnection stream * fix(execution): type event buffer with ExecutionEvent instead of Record Co-Authored-By: Claude Opus 4.6 * fix(execution): validate fromEventId query param in reconnection endpoint Co-Authored-By: Claude Opus 4.6 * Fix some bugs * fix(variables): fix tag dropdown and cursor alignment in variables block (#3199) * feat(confluence): added list space labels, delete label, delete page prop (#3201) * updated route * ack comments * fix(execution): reset execution state in reconnection cleanup to unblock re-entry Co-Authored-By: Claude Opus 4.6 * fix(execution): restore running entries when reconnection is interrupted by navigation Co-Authored-By: Claude Opus 4.6 * done * remove cast in ioredis types * ack PR comments --------- Co-authored-by: Claude Opus 4.6 Co-authored-by: Siddharth Ganesan --- .../[id]/deployments/[version]/route.ts | 2 +- .../app/api/workflows/[id]/execute/route.ts | 47 +- .../executions/[executionId]/stream/route.ts | 170 +++++++ .../components/version-description-modal.tsx | 4 +- .../hooks/use-workflow-execution.ts | 431 ++++++++++++++---- apps/sim/hooks/queries/deployments.ts | 2 +- apps/sim/hooks/use-execution-stream.ts | 167 ++++--- apps/sim/lib/execution/event-buffer.ts | 246 ++++++++++ apps/sim/stores/execution/store.ts | 12 + apps/sim/stores/execution/types.ts | 7 + apps/sim/stores/terminal/console/store.ts | 25 +- apps/sim/stores/terminal/console/types.ts | 1 + 12 files changed, 944 insertions(+), 170 deletions(-) create mode 100644 apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts create mode 100644 apps/sim/lib/execution/event-buffer.ts diff --git a/apps/sim/app/api/workflows/[id]/deployments/[version]/route.ts b/apps/sim/app/api/workflows/[id]/deployments/[version]/route.ts index 74194eba6..3af21e758 100644 --- a/apps/sim/app/api/workflows/[id]/deployments/[version]/route.ts +++ b/apps/sim/app/api/workflows/[id]/deployments/[version]/route.ts @@ -29,7 +29,7 @@ const patchBodySchema = z description: z .string() .trim() - .max(500, 'Description must be 500 characters or less') + .max(2000, 'Description must be 2000 characters or less') .nullable() .optional(), isActive: z.literal(true).optional(), // Set to true to activate this version diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 13fc0ff41..b6ed6bd8b 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -12,7 +12,7 @@ import { import { generateRequestId } from '@/lib/core/utils/request' import { SSE_HEADERS } from '@/lib/core/utils/sse' import { getBaseUrl } from '@/lib/core/utils/urls' -import { markExecutionCancelled } from '@/lib/execution/cancellation' +import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/event-buffer' import { processInputFileFields } from '@/lib/execution/files' import { preprocessExecution } from '@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' @@ -700,15 +700,27 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync) let isStreamClosed = false + const eventWriter = createExecutionEventWriter(executionId) + setExecutionMeta(executionId, { + status: 'active', + userId: actorUserId, + workflowId, + }).catch(() => {}) + const stream = new ReadableStream({ async start(controller) { - const sendEvent = (event: ExecutionEvent) => { - if (isStreamClosed) return + let finalMetaStatus: 'complete' | 'error' | 'cancelled' | null = null - try { - controller.enqueue(encodeSSEEvent(event)) - } catch { - isStreamClosed = true + const sendEvent = (event: ExecutionEvent) => { + if (!isStreamClosed) { + try { + controller.enqueue(encodeSSEEvent(event)) + } catch { + isStreamClosed = true + } + } + if (event.type !== 'stream:chunk' && event.type !== 'stream:done') { + eventWriter.write(event).catch(() => {}) } } @@ -829,14 +841,12 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: const reader = streamingExec.stream.getReader() const decoder = new TextDecoder() - let chunkCount = 0 try { while (true) { const { done, value } = await reader.read() if (done) break - chunkCount++ const chunk = decoder.decode(value, { stream: true }) sendEvent({ type: 'stream:chunk', @@ -951,6 +961,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: duration: result.metadata?.duration || 0, }, }) + finalMetaStatus = 'error' } else { logger.info(`[${requestId}] Workflow execution was cancelled`) @@ -963,6 +974,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: duration: result.metadata?.duration || 0, }, }) + finalMetaStatus = 'cancelled' } return } @@ -986,6 +998,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: endTime: result.metadata?.endTime || new Date().toISOString(), }, }) + finalMetaStatus = 'complete' } catch (error: unknown) { const isTimeout = isTimeoutError(error) || timeoutController.isTimedOut() const errorMessage = isTimeout @@ -1017,7 +1030,18 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: duration: executionResult?.metadata?.duration || 0, }, }) + finalMetaStatus = 'error' } finally { + try { + await eventWriter.close() + } catch (closeError) { + logger.warn(`[${requestId}] Failed to close event writer`, { + error: closeError instanceof Error ? closeError.message : String(closeError), + }) + } + if (finalMetaStatus) { + setExecutionMeta(executionId, { status: finalMetaStatus }).catch(() => {}) + } timeoutController.cleanup() if (executionId) { await cleanupExecutionBase64Cache(executionId) @@ -1032,10 +1056,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: }, cancel() { isStreamClosed = true - timeoutController.cleanup() - logger.info(`[${requestId}] Client aborted SSE stream, signalling cancellation`) - timeoutController.abort() - markExecutionCancelled(executionId).catch(() => {}) + logger.info(`[${requestId}] Client disconnected from SSE stream`) }, }) diff --git a/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts b/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts new file mode 100644 index 000000000..1f77ff391 --- /dev/null +++ b/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts @@ -0,0 +1,170 @@ +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { SSE_HEADERS } from '@/lib/core/utils/sse' +import { + type ExecutionStreamStatus, + getExecutionMeta, + readExecutionEvents, +} from '@/lib/execution/event-buffer' +import { formatSSEEvent } from '@/lib/workflows/executor/execution-events' +import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils' + +const logger = createLogger('ExecutionStreamReconnectAPI') + +const POLL_INTERVAL_MS = 500 +const MAX_POLL_DURATION_MS = 10 * 60 * 1000 // 10 minutes + +function isTerminalStatus(status: ExecutionStreamStatus): boolean { + return status === 'complete' || status === 'error' || status === 'cancelled' +} + +export const runtime = 'nodejs' +export const dynamic = 'force-dynamic' + +export async function GET( + req: NextRequest, + { params }: { params: Promise<{ id: string; executionId: string }> } +) { + const { id: workflowId, executionId } = await params + + try { + const auth = await checkHybridAuth(req, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 }) + } + + const workflowAuthorization = await authorizeWorkflowByWorkspacePermission({ + workflowId, + userId: auth.userId, + action: 'read', + }) + if (!workflowAuthorization.allowed) { + return NextResponse.json( + { error: workflowAuthorization.message || 'Access denied' }, + { status: workflowAuthorization.status } + ) + } + + const meta = await getExecutionMeta(executionId) + if (!meta) { + return NextResponse.json({ error: 'Execution buffer not found or expired' }, { status: 404 }) + } + + if (meta.workflowId && meta.workflowId !== workflowId) { + return NextResponse.json( + { error: 'Execution does not belong to this workflow' }, + { status: 403 } + ) + } + + const fromParam = req.nextUrl.searchParams.get('from') + const parsed = fromParam ? Number.parseInt(fromParam, 10) : 0 + const fromEventId = Number.isFinite(parsed) && parsed >= 0 ? parsed : 0 + + logger.info('Reconnection stream requested', { + workflowId, + executionId, + fromEventId, + metaStatus: meta.status, + }) + + const encoder = new TextEncoder() + + let closed = false + + const stream = new ReadableStream({ + async start(controller) { + let lastEventId = fromEventId + const pollDeadline = Date.now() + MAX_POLL_DURATION_MS + + const enqueue = (text: string) => { + if (closed) return + try { + controller.enqueue(encoder.encode(text)) + } catch { + closed = true + } + } + + try { + const events = await readExecutionEvents(executionId, lastEventId) + for (const entry of events) { + if (closed) return + enqueue(formatSSEEvent(entry.event)) + lastEventId = entry.eventId + } + + const currentMeta = await getExecutionMeta(executionId) + if (!currentMeta || isTerminalStatus(currentMeta.status)) { + enqueue('data: [DONE]\n\n') + if (!closed) controller.close() + return + } + + while (!closed && Date.now() < pollDeadline) { + await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS)) + if (closed) return + + const newEvents = await readExecutionEvents(executionId, lastEventId) + for (const entry of newEvents) { + if (closed) return + enqueue(formatSSEEvent(entry.event)) + lastEventId = entry.eventId + } + + const polledMeta = await getExecutionMeta(executionId) + if (!polledMeta || isTerminalStatus(polledMeta.status)) { + const finalEvents = await readExecutionEvents(executionId, lastEventId) + for (const entry of finalEvents) { + if (closed) return + enqueue(formatSSEEvent(entry.event)) + lastEventId = entry.eventId + } + enqueue('data: [DONE]\n\n') + if (!closed) controller.close() + return + } + } + + if (!closed) { + logger.warn('Reconnection stream poll deadline reached', { executionId }) + enqueue('data: [DONE]\n\n') + controller.close() + } + } catch (error) { + logger.error('Error in reconnection stream', { + executionId, + error: error instanceof Error ? error.message : String(error), + }) + if (!closed) { + try { + controller.close() + } catch {} + } + } + }, + cancel() { + closed = true + logger.info('Client disconnected from reconnection stream', { executionId }) + }, + }) + + return new NextResponse(stream, { + headers: { + ...SSE_HEADERS, + 'X-Execution-Id': executionId, + }, + }) + } catch (error: any) { + logger.error('Failed to start reconnection stream', { + workflowId, + executionId, + error: error.message, + }) + return NextResponse.json( + { error: error.message || 'Failed to start reconnection stream' }, + { status: 500 } + ) + } +} diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/components/general/components/version-description-modal.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/components/general/components/version-description-modal.tsx index 3cf5106ea..63606c56a 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/components/general/components/version-description-modal.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/components/general/components/version-description-modal.tsx @@ -113,7 +113,7 @@ export function VersionDescriptionModal({ className='min-h-[120px] resize-none' value={description} onChange={(e) => setDescription(e.target.value)} - maxLength={500} + maxLength={2000} disabled={isGenerating} />
@@ -123,7 +123,7 @@ export function VersionDescriptionModal({

)} {!updateMutation.error && !generateMutation.error &&
} -

{description.length}/500

+

{description.length}/2000

diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts index 16c0e81f1..1088f8c87 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts @@ -1,4 +1,4 @@ -import { useCallback, useRef, useState } from 'react' +import { useCallback, useEffect, useRef, useState } from 'react' import { createLogger } from '@sim/logger' import { useQueryClient } from '@tanstack/react-query' import { v4 as uuidv4 } from 'uuid' @@ -46,7 +46,13 @@ import { useWorkflowStore } from '@/stores/workflows/workflow/store' const logger = createLogger('useWorkflowExecution') -// Debug state validation result +/** + * Module-level Set tracking which workflows have an active reconnection effect. + * Prevents multiple hook instances (from different components) from starting + * concurrent reconnection streams for the same workflow during the same mount cycle. + */ +const activeReconnections = new Set() + interface DebugValidationResult { isValid: boolean error?: string @@ -54,7 +60,7 @@ interface DebugValidationResult { interface BlockEventHandlerConfig { workflowId?: string - executionId?: string + executionIdRef: { current: string } workflowEdges: Array<{ id: string; target: string; sourceHandle?: string | null }> activeBlocksSet: Set accumulatedBlockLogs: BlockLog[] @@ -108,12 +114,15 @@ export function useWorkflowExecution() { const queryClient = useQueryClient() const currentWorkflow = useCurrentWorkflow() const { activeWorkflowId, workflows } = useWorkflowRegistry() - const { toggleConsole, addConsole, updateConsole, cancelRunningEntries } = + const { toggleConsole, addConsole, updateConsole, cancelRunningEntries, clearExecutionEntries } = useTerminalConsoleStore() + const hasHydrated = useTerminalConsoleStore((s) => s._hasHydrated) const { getAllVariables } = useEnvironmentStore() const { getVariablesByWorkflowId, variables } = useVariablesStore() const { isExecuting, isDebugging, pendingBlocks, executor, debugContext } = useCurrentWorkflowExecution() + const setCurrentExecutionId = useExecutionStore((s) => s.setCurrentExecutionId) + const getCurrentExecutionId = useExecutionStore((s) => s.getCurrentExecutionId) const setIsExecuting = useExecutionStore((s) => s.setIsExecuting) const setIsDebugging = useExecutionStore((s) => s.setIsDebugging) const setPendingBlocks = useExecutionStore((s) => s.setPendingBlocks) @@ -297,7 +306,7 @@ export function useWorkflowExecution() { (config: BlockEventHandlerConfig) => { const { workflowId, - executionId, + executionIdRef, workflowEdges, activeBlocksSet, accumulatedBlockLogs, @@ -308,6 +317,14 @@ export function useWorkflowExecution() { onBlockCompleteCallback, } = config + /** Returns true if this execution was cancelled or superseded by another run. */ + const isStaleExecution = () => + !!( + workflowId && + executionIdRef.current && + useExecutionStore.getState().getCurrentExecutionId(workflowId) !== executionIdRef.current + ) + const updateActiveBlocks = (blockId: string, isActive: boolean) => { if (!workflowId) return if (isActive) { @@ -360,7 +377,7 @@ export function useWorkflowExecution() { endedAt: data.endedAt, workflowId, blockId: data.blockId, - executionId, + executionId: executionIdRef.current, blockName: data.blockName || 'Unknown Block', blockType: data.blockType || 'unknown', iterationCurrent: data.iterationCurrent, @@ -383,7 +400,7 @@ export function useWorkflowExecution() { endedAt: data.endedAt, workflowId, blockId: data.blockId, - executionId, + executionId: executionIdRef.current, blockName: data.blockName || 'Unknown Block', blockType: data.blockType || 'unknown', iterationCurrent: data.iterationCurrent, @@ -410,7 +427,7 @@ export function useWorkflowExecution() { iterationType: data.iterationType, iterationContainerId: data.iterationContainerId, }, - executionId + executionIdRef.current ) } @@ -432,11 +449,12 @@ export function useWorkflowExecution() { iterationType: data.iterationType, iterationContainerId: data.iterationContainerId, }, - executionId + executionIdRef.current ) } const onBlockStarted = (data: BlockStartedData) => { + if (isStaleExecution()) return updateActiveBlocks(data.blockId, true) markIncomingEdges(data.blockId) @@ -453,7 +471,7 @@ export function useWorkflowExecution() { endedAt: undefined, workflowId, blockId: data.blockId, - executionId, + executionId: executionIdRef.current, blockName: data.blockName || 'Unknown Block', blockType: data.blockType || 'unknown', isRunning: true, @@ -465,6 +483,7 @@ export function useWorkflowExecution() { } const onBlockCompleted = (data: BlockCompletedData) => { + if (isStaleExecution()) return updateActiveBlocks(data.blockId, false) if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'success') @@ -495,6 +514,7 @@ export function useWorkflowExecution() { } const onBlockError = (data: BlockErrorData) => { + if (isStaleExecution()) return updateActiveBlocks(data.blockId, false) if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'error') @@ -902,10 +922,6 @@ export function useWorkflowExecution() { // Update block logs with actual stream completion times if (result.logs && streamCompletionTimes.size > 0) { - const streamCompletionEndTime = new Date( - Math.max(...Array.from(streamCompletionTimes.values())) - ).toISOString() - result.logs.forEach((log: BlockLog) => { if (streamCompletionTimes.has(log.blockId)) { const completionTime = streamCompletionTimes.get(log.blockId)! @@ -987,7 +1003,6 @@ export function useWorkflowExecution() { return { success: true, stream } } - // For manual (non-chat) execution const manualExecutionId = uuidv4() try { const result = await executeWorkflow( @@ -1002,29 +1017,10 @@ export function useWorkflowExecution() { if (result.metadata.pendingBlocks) { setPendingBlocks(activeWorkflowId, result.metadata.pendingBlocks) } - } else if (result && 'success' in result) { - setExecutionResult(result) - // Reset execution state after successful non-debug execution - setIsExecuting(activeWorkflowId, false) - setIsDebugging(activeWorkflowId, false) - setActiveBlocks(activeWorkflowId, new Set()) - - if (isChatExecution) { - if (!result.metadata) { - result.metadata = { duration: 0, startTime: new Date().toISOString() } - } - ;(result.metadata as any).source = 'chat' - } - - // Invalidate subscription queries to update usage - setTimeout(() => { - queryClient.invalidateQueries({ queryKey: subscriptionKeys.all }) - }, 1000) } return result } catch (error: any) { const errorResult = handleExecutionError(error, { executionId: manualExecutionId }) - // Note: Error logs are already persisted server-side via execution-core.ts return errorResult } }, @@ -1275,7 +1271,7 @@ export function useWorkflowExecution() { if (activeWorkflowId) { logger.info('Using server-side executor') - const executionId = uuidv4() + const executionIdRef = { current: '' } let executionResult: ExecutionResult = { success: false, @@ -1293,7 +1289,7 @@ export function useWorkflowExecution() { try { const blockHandlers = buildBlockEventHandlers({ workflowId: activeWorkflowId, - executionId, + executionIdRef, workflowEdges, activeBlocksSet, accumulatedBlockLogs, @@ -1326,6 +1322,10 @@ export function useWorkflowExecution() { loops: clientWorkflowState.loops, parallels: clientWorkflowState.parallels, }, + onExecutionId: (id) => { + executionIdRef.current = id + setCurrentExecutionId(activeWorkflowId, id) + }, callbacks: { onExecutionStarted: (data) => { logger.info('Server execution started:', data) @@ -1368,6 +1368,18 @@ export function useWorkflowExecution() { }, onExecutionCompleted: (data) => { + if ( + activeWorkflowId && + executionIdRef.current && + useExecutionStore.getState().getCurrentExecutionId(activeWorkflowId) !== + executionIdRef.current + ) + return + + if (activeWorkflowId) { + setCurrentExecutionId(activeWorkflowId, null) + } + executionResult = { success: data.success, output: data.output, @@ -1425,9 +1437,33 @@ export function useWorkflowExecution() { }) } } + + const workflowExecState = activeWorkflowId + ? useExecutionStore.getState().getWorkflowExecution(activeWorkflowId) + : null + if (activeWorkflowId && !workflowExecState?.isDebugging) { + setExecutionResult(executionResult) + setIsExecuting(activeWorkflowId, false) + setActiveBlocks(activeWorkflowId, new Set()) + setTimeout(() => { + queryClient.invalidateQueries({ queryKey: subscriptionKeys.all }) + }, 1000) + } }, onExecutionError: (data) => { + if ( + activeWorkflowId && + executionIdRef.current && + useExecutionStore.getState().getCurrentExecutionId(activeWorkflowId) !== + executionIdRef.current + ) + return + + if (activeWorkflowId) { + setCurrentExecutionId(activeWorkflowId, null) + } + executionResult = { success: false, output: {}, @@ -1441,43 +1477,53 @@ export function useWorkflowExecution() { const isPreExecutionError = accumulatedBlockLogs.length === 0 handleExecutionErrorConsole({ workflowId: activeWorkflowId, - executionId, + executionId: executionIdRef.current, error: data.error, durationMs: data.duration, blockLogs: accumulatedBlockLogs, isPreExecutionError, }) + + if (activeWorkflowId) { + setIsExecuting(activeWorkflowId, false) + setIsDebugging(activeWorkflowId, false) + setActiveBlocks(activeWorkflowId, new Set()) + } }, onExecutionCancelled: (data) => { + if ( + activeWorkflowId && + executionIdRef.current && + useExecutionStore.getState().getCurrentExecutionId(activeWorkflowId) !== + executionIdRef.current + ) + return + + if (activeWorkflowId) { + setCurrentExecutionId(activeWorkflowId, null) + } + handleExecutionCancelledConsole({ workflowId: activeWorkflowId, - executionId, + executionId: executionIdRef.current, durationMs: data?.duration, }) + + if (activeWorkflowId) { + setIsExecuting(activeWorkflowId, false) + setIsDebugging(activeWorkflowId, false) + setActiveBlocks(activeWorkflowId, new Set()) + } }, }, }) return executionResult } catch (error: any) { - // Don't log abort errors - they're intentional user actions if (error.name === 'AbortError' || error.message?.includes('aborted')) { logger.info('Execution aborted by user') - - // Reset execution state - if (activeWorkflowId) { - setIsExecuting(activeWorkflowId, false) - setActiveBlocks(activeWorkflowId, new Set()) - } - - // Return gracefully without error - return { - success: false, - output: {}, - metadata: { duration: 0 }, - logs: [], - } + return executionResult } logger.error('Server-side execution failed:', error) @@ -1485,7 +1531,6 @@ export function useWorkflowExecution() { } } - // Fallback: should never reach here throw new Error('Server-side execution is required') } @@ -1717,25 +1762,28 @@ export function useWorkflowExecution() { * Handles cancelling the current workflow execution */ const handleCancelExecution = useCallback(() => { + if (!activeWorkflowId) return logger.info('Workflow execution cancellation requested') - // Cancel the execution stream for this workflow (server-side) - executionStream.cancel(activeWorkflowId ?? undefined) + const storedExecutionId = getCurrentExecutionId(activeWorkflowId) - // Mark current chat execution as superseded so its cleanup won't affect new executions - currentChatExecutionIdRef.current = null - - // Mark all running entries as canceled in the terminal - if (activeWorkflowId) { - cancelRunningEntries(activeWorkflowId) - - // Reset execution state - this triggers chat stream cleanup via useEffect in chat.tsx - setIsExecuting(activeWorkflowId, false) - setIsDebugging(activeWorkflowId, false) - setActiveBlocks(activeWorkflowId, new Set()) + if (storedExecutionId) { + setCurrentExecutionId(activeWorkflowId, null) + fetch(`/api/workflows/${activeWorkflowId}/executions/${storedExecutionId}/cancel`, { + method: 'POST', + }).catch(() => {}) + handleExecutionCancelledConsole({ + workflowId: activeWorkflowId, + executionId: storedExecutionId, + }) } - // If in debug mode, also reset debug state + executionStream.cancel(activeWorkflowId) + currentChatExecutionIdRef.current = null + setIsExecuting(activeWorkflowId, false) + setIsDebugging(activeWorkflowId, false) + setActiveBlocks(activeWorkflowId, new Set()) + if (isDebugging) { resetDebugState() } @@ -1747,7 +1795,9 @@ export function useWorkflowExecution() { setIsDebugging, setActiveBlocks, activeWorkflowId, - cancelRunningEntries, + getCurrentExecutionId, + setCurrentExecutionId, + handleExecutionCancelledConsole, ]) /** @@ -1847,7 +1897,7 @@ export function useWorkflowExecution() { } setIsExecuting(workflowId, true) - const executionId = uuidv4() + const executionIdRef = { current: '' } const accumulatedBlockLogs: BlockLog[] = [] const accumulatedBlockStates = new Map() const executedBlockIds = new Set() @@ -1856,7 +1906,7 @@ export function useWorkflowExecution() { try { const blockHandlers = buildBlockEventHandlers({ workflowId, - executionId, + executionIdRef, workflowEdges, activeBlocksSet, accumulatedBlockLogs, @@ -1871,6 +1921,10 @@ export function useWorkflowExecution() { startBlockId: blockId, sourceSnapshot: effectiveSnapshot, input: workflowInput, + onExecutionId: (id) => { + executionIdRef.current = id + setCurrentExecutionId(workflowId, id) + }, callbacks: { onBlockStarted: blockHandlers.onBlockStarted, onBlockCompleted: blockHandlers.onBlockCompleted, @@ -1878,7 +1932,6 @@ export function useWorkflowExecution() { onExecutionCompleted: (data) => { if (data.success) { - // Add the start block (trigger) to executed blocks executedBlockIds.add(blockId) const mergedBlockStates: Record = { @@ -1902,6 +1955,10 @@ export function useWorkflowExecution() { } setLastExecutionSnapshot(workflowId, updatedSnapshot) } + + setCurrentExecutionId(workflowId, null) + setIsExecuting(workflowId, false) + setActiveBlocks(workflowId, new Set()) }, onExecutionError: (data) => { @@ -1921,19 +1978,27 @@ export function useWorkflowExecution() { handleExecutionErrorConsole({ workflowId, - executionId, + executionId: executionIdRef.current, error: data.error, durationMs: data.duration, blockLogs: accumulatedBlockLogs, }) + + setCurrentExecutionId(workflowId, null) + setIsExecuting(workflowId, false) + setActiveBlocks(workflowId, new Set()) }, onExecutionCancelled: (data) => { handleExecutionCancelledConsole({ workflowId, - executionId, + executionId: executionIdRef.current, durationMs: data?.duration, }) + + setCurrentExecutionId(workflowId, null) + setIsExecuting(workflowId, false) + setActiveBlocks(workflowId, new Set()) }, }, }) @@ -1942,14 +2007,20 @@ export function useWorkflowExecution() { logger.error('Run-from-block failed:', error) } } finally { - setIsExecuting(workflowId, false) - setActiveBlocks(workflowId, new Set()) + const currentId = getCurrentExecutionId(workflowId) + if (currentId === null || currentId === executionIdRef.current) { + setCurrentExecutionId(workflowId, null) + setIsExecuting(workflowId, false) + setActiveBlocks(workflowId, new Set()) + } } }, [ getLastExecutionSnapshot, setLastExecutionSnapshot, clearLastExecutionSnapshot, + getCurrentExecutionId, + setCurrentExecutionId, setIsExecuting, setActiveBlocks, setBlockRunStatus, @@ -1979,29 +2050,213 @@ export function useWorkflowExecution() { const executionId = uuidv4() try { - const result = await executeWorkflow( - undefined, - undefined, - executionId, - undefined, - 'manual', - blockId - ) - if (result && 'success' in result) { - setExecutionResult(result) - } + await executeWorkflow(undefined, undefined, executionId, undefined, 'manual', blockId) } catch (error) { const errorResult = handleExecutionError(error, { executionId }) return errorResult } finally { + setCurrentExecutionId(workflowId, null) setIsExecuting(workflowId, false) setIsDebugging(workflowId, false) setActiveBlocks(workflowId, new Set()) } }, - [activeWorkflowId, setExecutionResult, setIsExecuting, setIsDebugging, setActiveBlocks] + [ + activeWorkflowId, + setCurrentExecutionId, + setExecutionResult, + setIsExecuting, + setIsDebugging, + setActiveBlocks, + ] ) + useEffect(() => { + if (!activeWorkflowId || !hasHydrated) return + + const entries = useTerminalConsoleStore.getState().entries + const runningEntries = entries.filter( + (e) => e.isRunning && e.workflowId === activeWorkflowId && e.executionId + ) + if (runningEntries.length === 0) return + + if (activeReconnections.has(activeWorkflowId)) return + activeReconnections.add(activeWorkflowId) + + executionStream.cancel(activeWorkflowId) + + const sorted = [...runningEntries].sort((a, b) => { + const aTime = a.startedAt ? new Date(a.startedAt).getTime() : 0 + const bTime = b.startedAt ? new Date(b.startedAt).getTime() : 0 + return bTime - aTime + }) + const executionId = sorted[0].executionId! + + const otherExecutionIds = new Set( + sorted.filter((e) => e.executionId !== executionId).map((e) => e.executionId!) + ) + if (otherExecutionIds.size > 0) { + cancelRunningEntries(activeWorkflowId) + } + + setCurrentExecutionId(activeWorkflowId, executionId) + setIsExecuting(activeWorkflowId, true) + + const workflowEdges = useWorkflowStore.getState().edges + const activeBlocksSet = new Set() + const accumulatedBlockLogs: BlockLog[] = [] + const accumulatedBlockStates = new Map() + const executedBlockIds = new Set() + + const executionIdRef = { current: executionId } + + const handlers = buildBlockEventHandlers({ + workflowId: activeWorkflowId, + executionIdRef, + workflowEdges, + activeBlocksSet, + accumulatedBlockLogs, + accumulatedBlockStates, + executedBlockIds, + consoleMode: 'update', + includeStartConsoleEntry: true, + }) + + const originalEntries = entries + .filter((e) => e.executionId === executionId) + .map((e) => ({ ...e })) + + let cleared = false + let reconnectionComplete = false + let cleanupRan = false + const clearOnce = () => { + if (!cleared) { + cleared = true + clearExecutionEntries(executionId) + } + } + + const reconnectWorkflowId = activeWorkflowId + + executionStream + .reconnect({ + workflowId: reconnectWorkflowId, + executionId, + callbacks: { + onBlockStarted: (data) => { + clearOnce() + handlers.onBlockStarted(data) + }, + onBlockCompleted: (data) => { + clearOnce() + handlers.onBlockCompleted(data) + }, + onBlockError: (data) => { + clearOnce() + handlers.onBlockError(data) + }, + onExecutionCompleted: () => { + const currentId = useExecutionStore + .getState() + .getCurrentExecutionId(reconnectWorkflowId) + if (currentId !== executionId) { + reconnectionComplete = true + activeReconnections.delete(reconnectWorkflowId) + return + } + clearOnce() + reconnectionComplete = true + activeReconnections.delete(reconnectWorkflowId) + setCurrentExecutionId(reconnectWorkflowId, null) + setIsExecuting(reconnectWorkflowId, false) + setActiveBlocks(reconnectWorkflowId, new Set()) + }, + onExecutionError: (data) => { + const currentId = useExecutionStore + .getState() + .getCurrentExecutionId(reconnectWorkflowId) + if (currentId !== executionId) { + reconnectionComplete = true + activeReconnections.delete(reconnectWorkflowId) + return + } + clearOnce() + reconnectionComplete = true + activeReconnections.delete(reconnectWorkflowId) + setCurrentExecutionId(reconnectWorkflowId, null) + setIsExecuting(reconnectWorkflowId, false) + setActiveBlocks(reconnectWorkflowId, new Set()) + handleExecutionErrorConsole({ + workflowId: reconnectWorkflowId, + executionId, + error: data.error, + blockLogs: accumulatedBlockLogs, + }) + }, + onExecutionCancelled: () => { + const currentId = useExecutionStore + .getState() + .getCurrentExecutionId(reconnectWorkflowId) + if (currentId !== executionId) { + reconnectionComplete = true + activeReconnections.delete(reconnectWorkflowId) + return + } + clearOnce() + reconnectionComplete = true + activeReconnections.delete(reconnectWorkflowId) + setCurrentExecutionId(reconnectWorkflowId, null) + setIsExecuting(reconnectWorkflowId, false) + setActiveBlocks(reconnectWorkflowId, new Set()) + handleExecutionCancelledConsole({ + workflowId: reconnectWorkflowId, + executionId, + }) + }, + }, + }) + .catch((error) => { + logger.warn('Execution reconnection failed', { executionId, error }) + }) + .finally(() => { + if (reconnectionComplete || cleanupRan) return + const currentId = useExecutionStore.getState().getCurrentExecutionId(reconnectWorkflowId) + if (currentId !== executionId) return + reconnectionComplete = true + activeReconnections.delete(reconnectWorkflowId) + clearExecutionEntries(executionId) + for (const entry of originalEntries) { + addConsole({ + workflowId: entry.workflowId, + blockId: entry.blockId, + blockName: entry.blockName, + blockType: entry.blockType, + executionId: entry.executionId, + executionOrder: entry.executionOrder, + isRunning: false, + warning: 'Execution result unavailable — check the logs page', + }) + } + setCurrentExecutionId(reconnectWorkflowId, null) + setIsExecuting(reconnectWorkflowId, false) + setActiveBlocks(reconnectWorkflowId, new Set()) + }) + + return () => { + cleanupRan = true + executionStream.cancel(reconnectWorkflowId) + activeReconnections.delete(reconnectWorkflowId) + + if (cleared && !reconnectionComplete) { + clearExecutionEntries(executionId) + for (const entry of originalEntries) { + addConsole(entry) + } + } + } + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [activeWorkflowId, hasHydrated]) + return { isExecuting, isDebugging, diff --git a/apps/sim/hooks/queries/deployments.ts b/apps/sim/hooks/queries/deployments.ts index 894e1152c..e2f5b5ffe 100644 --- a/apps/sim/hooks/queries/deployments.ts +++ b/apps/sim/hooks/queries/deployments.ts @@ -423,7 +423,7 @@ interface GenerateVersionDescriptionVariables { const VERSION_DESCRIPTION_SYSTEM_PROMPT = `You are writing deployment version descriptions for a workflow automation platform. -Write a brief, factual description (1-3 sentences, under 400 characters) that states what changed between versions. +Write a brief, factual description (1-3 sentences, under 2000 characters) that states what changed between versions. Guidelines: - Use the specific values provided (credential names, channel names, model names) diff --git a/apps/sim/hooks/use-execution-stream.ts b/apps/sim/hooks/use-execution-stream.ts index e664788b5..2ab98059f 100644 --- a/apps/sim/hooks/use-execution-stream.ts +++ b/apps/sim/hooks/use-execution-stream.ts @@ -1,4 +1,4 @@ -import { useCallback, useRef } from 'react' +import { useCallback } from 'react' import { createLogger } from '@sim/logger' import type { BlockCompletedData, @@ -16,6 +16,18 @@ import type { SerializableExecutionState } from '@/executor/execution/types' const logger = createLogger('useExecutionStream') +/** + * Detects errors caused by the browser killing a fetch (page refresh, navigation, tab close). + * These should be treated as clean disconnects, not execution errors. + */ +function isClientDisconnectError(error: any): boolean { + if (error.name === 'AbortError') return true + const msg = (error.message ?? '').toLowerCase() + return ( + msg.includes('network error') || msg.includes('failed to fetch') || msg.includes('load failed') + ) +} + /** * Processes SSE events from a response body and invokes appropriate callbacks. */ @@ -121,6 +133,7 @@ export interface ExecuteStreamOptions { parallels?: Record } stopAfterBlockId?: string + onExecutionId?: (executionId: string) => void callbacks?: ExecutionStreamCallbacks } @@ -129,30 +142,40 @@ export interface ExecuteFromBlockOptions { startBlockId: string sourceSnapshot: SerializableExecutionState input?: any + onExecutionId?: (executionId: string) => void callbacks?: ExecutionStreamCallbacks } +export interface ReconnectStreamOptions { + workflowId: string + executionId: string + fromEventId?: number + callbacks?: ExecutionStreamCallbacks +} + +/** + * Module-level map shared across all hook instances. + * Ensures ANY instance can cancel streams started by ANY other instance, + * which is critical for SPA navigation where the original hook instance unmounts + * but the SSE stream must be cancellable from the new instance. + */ +const sharedAbortControllers = new Map() + /** * Hook for executing workflows via server-side SSE streaming. * Supports concurrent executions via per-workflow AbortController maps. */ export function useExecutionStream() { - const abortControllersRef = useRef>(new Map()) - const currentExecutionsRef = useRef>( - new Map() - ) - const execute = useCallback(async (options: ExecuteStreamOptions) => { - const { workflowId, callbacks = {}, ...payload } = options + const { workflowId, callbacks = {}, onExecutionId, ...payload } = options - const existing = abortControllersRef.current.get(workflowId) + const existing = sharedAbortControllers.get(workflowId) if (existing) { existing.abort() } const abortController = new AbortController() - abortControllersRef.current.set(workflowId, abortController) - currentExecutionsRef.current.delete(workflowId) + sharedAbortControllers.set(workflowId, abortController) try { const response = await fetch(`/api/workflows/${workflowId}/execute`, { @@ -177,42 +200,48 @@ export function useExecutionStream() { throw new Error('No response body') } - const executionId = response.headers.get('X-Execution-Id') - if (executionId) { - currentExecutionsRef.current.set(workflowId, { workflowId, executionId }) + const serverExecutionId = response.headers.get('X-Execution-Id') + if (serverExecutionId) { + onExecutionId?.(serverExecutionId) } const reader = response.body.getReader() await processSSEStream(reader, callbacks, 'Execution') } catch (error: any) { - if (error.name === 'AbortError') { - logger.info('Execution stream cancelled') - callbacks.onExecutionCancelled?.({ duration: 0 }) - } else { - logger.error('Execution stream error:', error) - callbacks.onExecutionError?.({ - error: error.message || 'Unknown error', - duration: 0, - }) + if (isClientDisconnectError(error)) { + logger.info('Execution stream disconnected (page unload or abort)') + return } + logger.error('Execution stream error:', error) + callbacks.onExecutionError?.({ + error: error.message || 'Unknown error', + duration: 0, + }) throw error } finally { - abortControllersRef.current.delete(workflowId) - currentExecutionsRef.current.delete(workflowId) + if (sharedAbortControllers.get(workflowId) === abortController) { + sharedAbortControllers.delete(workflowId) + } } }, []) const executeFromBlock = useCallback(async (options: ExecuteFromBlockOptions) => { - const { workflowId, startBlockId, sourceSnapshot, input, callbacks = {} } = options + const { + workflowId, + startBlockId, + sourceSnapshot, + input, + onExecutionId, + callbacks = {}, + } = options - const existing = abortControllersRef.current.get(workflowId) + const existing = sharedAbortControllers.get(workflowId) if (existing) { existing.abort() } const abortController = new AbortController() - abortControllersRef.current.set(workflowId, abortController) - currentExecutionsRef.current.delete(workflowId) + sharedAbortControllers.set(workflowId, abortController) try { const response = await fetch(`/api/workflows/${workflowId}/execute`, { @@ -246,64 +275,80 @@ export function useExecutionStream() { throw new Error('No response body') } - const executionId = response.headers.get('X-Execution-Id') - if (executionId) { - currentExecutionsRef.current.set(workflowId, { workflowId, executionId }) + const serverExecutionId = response.headers.get('X-Execution-Id') + if (serverExecutionId) { + onExecutionId?.(serverExecutionId) } const reader = response.body.getReader() await processSSEStream(reader, callbacks, 'Run-from-block') } catch (error: any) { - if (error.name === 'AbortError') { - logger.info('Run-from-block execution cancelled') - callbacks.onExecutionCancelled?.({ duration: 0 }) - } else { - logger.error('Run-from-block execution error:', error) - callbacks.onExecutionError?.({ - error: error.message || 'Unknown error', - duration: 0, - }) + if (isClientDisconnectError(error)) { + logger.info('Run-from-block stream disconnected (page unload or abort)') + return } + logger.error('Run-from-block execution error:', error) + callbacks.onExecutionError?.({ + error: error.message || 'Unknown error', + duration: 0, + }) throw error } finally { - abortControllersRef.current.delete(workflowId) - currentExecutionsRef.current.delete(workflowId) + if (sharedAbortControllers.get(workflowId) === abortController) { + sharedAbortControllers.delete(workflowId) + } + } + }, []) + + const reconnect = useCallback(async (options: ReconnectStreamOptions) => { + const { workflowId, executionId, fromEventId = 0, callbacks = {} } = options + + const existing = sharedAbortControllers.get(workflowId) + if (existing) { + existing.abort() + } + + const abortController = new AbortController() + sharedAbortControllers.set(workflowId, abortController) + try { + const response = await fetch( + `/api/workflows/${workflowId}/executions/${executionId}/stream?from=${fromEventId}`, + { signal: abortController.signal } + ) + if (!response.ok) throw new Error(`Reconnect failed (${response.status})`) + if (!response.body) throw new Error('No response body') + + await processSSEStream(response.body.getReader(), callbacks, 'Reconnect') + } catch (error: any) { + if (isClientDisconnectError(error)) return + logger.error('Reconnection stream error:', error) + throw error + } finally { + if (sharedAbortControllers.get(workflowId) === abortController) { + sharedAbortControllers.delete(workflowId) + } } }, []) const cancel = useCallback((workflowId?: string) => { if (workflowId) { - const execution = currentExecutionsRef.current.get(workflowId) - if (execution) { - fetch(`/api/workflows/${execution.workflowId}/executions/${execution.executionId}/cancel`, { - method: 'POST', - }).catch(() => {}) - } - - const controller = abortControllersRef.current.get(workflowId) + const controller = sharedAbortControllers.get(workflowId) if (controller) { controller.abort() - abortControllersRef.current.delete(workflowId) + sharedAbortControllers.delete(workflowId) } - currentExecutionsRef.current.delete(workflowId) } else { - for (const [, execution] of currentExecutionsRef.current) { - fetch(`/api/workflows/${execution.workflowId}/executions/${execution.executionId}/cancel`, { - method: 'POST', - }).catch(() => {}) - } - - for (const [, controller] of abortControllersRef.current) { + for (const [, controller] of sharedAbortControllers) { controller.abort() } - abortControllersRef.current.clear() - currentExecutionsRef.current.clear() + sharedAbortControllers.clear() } }, []) return { execute, executeFromBlock, + reconnect, cancel, } } diff --git a/apps/sim/lib/execution/event-buffer.ts b/apps/sim/lib/execution/event-buffer.ts new file mode 100644 index 000000000..4473a922f --- /dev/null +++ b/apps/sim/lib/execution/event-buffer.ts @@ -0,0 +1,246 @@ +import { createLogger } from '@sim/logger' +import { getRedisClient } from '@/lib/core/config/redis' +import type { ExecutionEvent } from '@/lib/workflows/executor/execution-events' + +const logger = createLogger('ExecutionEventBuffer') + +const REDIS_PREFIX = 'execution:stream:' +const TTL_SECONDS = 60 * 60 // 1 hour +const EVENT_LIMIT = 1000 +const RESERVE_BATCH = 100 +const FLUSH_INTERVAL_MS = 15 +const FLUSH_MAX_BATCH = 200 + +function getEventsKey(executionId: string) { + return `${REDIS_PREFIX}${executionId}:events` +} + +function getSeqKey(executionId: string) { + return `${REDIS_PREFIX}${executionId}:seq` +} + +function getMetaKey(executionId: string) { + return `${REDIS_PREFIX}${executionId}:meta` +} + +export type ExecutionStreamStatus = 'active' | 'complete' | 'error' | 'cancelled' + +export interface ExecutionStreamMeta { + status: ExecutionStreamStatus + userId?: string + workflowId?: string + updatedAt?: string +} + +export interface ExecutionEventEntry { + eventId: number + executionId: string + event: ExecutionEvent +} + +export interface ExecutionEventWriter { + write: (event: ExecutionEvent) => Promise + flush: () => Promise + close: () => Promise +} + +export async function setExecutionMeta( + executionId: string, + meta: Partial +): Promise { + const redis = getRedisClient() + if (!redis) { + logger.warn('setExecutionMeta: Redis client unavailable', { executionId }) + return + } + try { + const key = getMetaKey(executionId) + const payload: Record = { + updatedAt: new Date().toISOString(), + } + if (meta.status) payload.status = meta.status + if (meta.userId) payload.userId = meta.userId + if (meta.workflowId) payload.workflowId = meta.workflowId + await redis.hset(key, payload) + await redis.expire(key, TTL_SECONDS) + } catch (error) { + logger.warn('Failed to update execution meta', { + executionId, + error: error instanceof Error ? error.message : String(error), + }) + } +} + +export async function getExecutionMeta(executionId: string): Promise { + const redis = getRedisClient() + if (!redis) { + logger.warn('getExecutionMeta: Redis client unavailable', { executionId }) + return null + } + try { + const key = getMetaKey(executionId) + const meta = await redis.hgetall(key) + if (!meta || Object.keys(meta).length === 0) return null + return meta as unknown as ExecutionStreamMeta + } catch (error) { + logger.warn('Failed to read execution meta', { + executionId, + error: error instanceof Error ? error.message : String(error), + }) + return null + } +} + +export async function readExecutionEvents( + executionId: string, + afterEventId: number +): Promise { + const redis = getRedisClient() + if (!redis) return [] + try { + const raw = await redis.zrangebyscore(getEventsKey(executionId), afterEventId + 1, '+inf') + return raw + .map((entry) => { + try { + return JSON.parse(entry) as ExecutionEventEntry + } catch { + return null + } + }) + .filter((entry): entry is ExecutionEventEntry => Boolean(entry)) + } catch (error) { + logger.warn('Failed to read execution events', { + executionId, + error: error instanceof Error ? error.message : String(error), + }) + return [] + } +} + +export function createExecutionEventWriter(executionId: string): ExecutionEventWriter { + const redis = getRedisClient() + if (!redis) { + logger.warn( + 'createExecutionEventWriter: Redis client unavailable, events will not be buffered', + { + executionId, + } + ) + return { + write: async (event) => ({ eventId: 0, executionId, event }), + flush: async () => {}, + close: async () => {}, + } + } + + let pending: ExecutionEventEntry[] = [] + let nextEventId = 0 + let maxReservedId = 0 + let flushTimer: ReturnType | null = null + + const scheduleFlush = () => { + if (flushTimer) return + flushTimer = setTimeout(() => { + flushTimer = null + void flush() + }, FLUSH_INTERVAL_MS) + } + + const reserveIds = async (minCount: number) => { + const reserveCount = Math.max(RESERVE_BATCH, minCount) + const newMax = await redis.incrby(getSeqKey(executionId), reserveCount) + const startId = newMax - reserveCount + 1 + if (nextEventId === 0 || nextEventId > maxReservedId) { + nextEventId = startId + maxReservedId = newMax + } + } + + let flushPromise: Promise | null = null + let closed = false + const inflightWrites = new Set>() + + const doFlush = async () => { + if (pending.length === 0) return + const batch = pending + pending = [] + try { + const key = getEventsKey(executionId) + const zaddArgs: (string | number)[] = [] + for (const entry of batch) { + zaddArgs.push(entry.eventId, JSON.stringify(entry)) + } + const pipeline = redis.pipeline() + pipeline.zadd(key, ...zaddArgs) + pipeline.expire(key, TTL_SECONDS) + pipeline.expire(getSeqKey(executionId), TTL_SECONDS) + pipeline.zremrangebyrank(key, 0, -EVENT_LIMIT - 1) + await pipeline.exec() + } catch (error) { + logger.warn('Failed to flush execution events', { + executionId, + batchSize: batch.length, + error: error instanceof Error ? error.message : String(error), + stack: error instanceof Error ? error.stack : undefined, + }) + pending = batch.concat(pending) + } + } + + const flush = async () => { + if (flushPromise) { + await flushPromise + return + } + flushPromise = doFlush() + try { + await flushPromise + } finally { + flushPromise = null + if (pending.length > 0) scheduleFlush() + } + } + + const writeCore = async (event: ExecutionEvent): Promise => { + if (closed) return { eventId: 0, executionId, event } + if (nextEventId === 0 || nextEventId > maxReservedId) { + await reserveIds(1) + } + const eventId = nextEventId++ + const entry: ExecutionEventEntry = { eventId, executionId, event } + pending.push(entry) + if (pending.length >= FLUSH_MAX_BATCH) { + await flush() + } else { + scheduleFlush() + } + return entry + } + + const write = (event: ExecutionEvent): Promise => { + const p = writeCore(event) + inflightWrites.add(p) + const remove = () => inflightWrites.delete(p) + p.then(remove, remove) + return p + } + + const close = async () => { + closed = true + if (flushTimer) { + clearTimeout(flushTimer) + flushTimer = null + } + if (inflightWrites.size > 0) { + await Promise.allSettled(inflightWrites) + } + if (flushPromise) { + await flushPromise + } + if (pending.length > 0) { + await doFlush() + } + } + + return { write, flush, close } +} diff --git a/apps/sim/stores/execution/store.ts b/apps/sim/stores/execution/store.ts index 6983ddcda..b82d4a3c5 100644 --- a/apps/sim/stores/execution/store.ts +++ b/apps/sim/stores/execution/store.ts @@ -129,6 +129,18 @@ export const useExecutionStore = create()((se }) }, + setCurrentExecutionId: (workflowId, executionId) => { + set({ + workflowExecutions: updatedMap(get().workflowExecutions, workflowId, { + currentExecutionId: executionId, + }), + }) + }, + + getCurrentExecutionId: (workflowId) => { + return getOrCreate(get().workflowExecutions, workflowId).currentExecutionId + }, + clearRunPath: (workflowId) => { set({ workflowExecutions: updatedMap(get().workflowExecutions, workflowId, { diff --git a/apps/sim/stores/execution/types.ts b/apps/sim/stores/execution/types.ts index 55d873b49..b36ea43a1 100644 --- a/apps/sim/stores/execution/types.ts +++ b/apps/sim/stores/execution/types.ts @@ -35,6 +35,8 @@ export interface WorkflowExecutionState { lastRunPath: Map /** Maps edge IDs to their run result from the last execution */ lastRunEdges: Map + /** The execution ID of the currently running execution */ + currentExecutionId: string | null } /** @@ -54,6 +56,7 @@ export const defaultWorkflowExecutionState: WorkflowExecutionState = { debugContext: null, lastRunPath: new Map(), lastRunEdges: new Map(), + currentExecutionId: null, } /** @@ -96,6 +99,10 @@ export interface ExecutionActions { setEdgeRunStatus: (workflowId: string, edgeId: string, status: EdgeRunStatus) => void /** Clears the run path and run edges for a workflow */ clearRunPath: (workflowId: string) => void + /** Stores the current execution ID for a workflow */ + setCurrentExecutionId: (workflowId: string, executionId: string | null) => void + /** Returns the current execution ID for a workflow */ + getCurrentExecutionId: (workflowId: string) => string | null /** Resets the entire store to its initial empty state */ reset: () => void /** Stores a serializable execution snapshot for a workflow */ diff --git a/apps/sim/stores/terminal/console/store.ts b/apps/sim/stores/terminal/console/store.ts index 55b59b135..9fddbf3ef 100644 --- a/apps/sim/stores/terminal/console/store.ts +++ b/apps/sim/stores/terminal/console/store.ts @@ -224,7 +224,7 @@ export const useTerminalConsoleStore = create()( const newEntry = get().entries[0] - if (newEntry?.error) { + if (newEntry?.error && newEntry.blockType !== 'cancelled') { notifyBlockError({ error: newEntry.error, blockName: newEntry.blockName || 'Unknown Block', @@ -243,6 +243,11 @@ export const useTerminalConsoleStore = create()( useExecutionStore.getState().clearRunPath(workflowId) }, + clearExecutionEntries: (executionId: string) => + set((state) => ({ + entries: state.entries.filter((e) => e.executionId !== executionId), + })), + exportConsoleCSV: (workflowId: string) => { const entries = get().entries.filter((entry) => entry.workflowId === workflowId) @@ -470,12 +475,24 @@ export const useTerminalConsoleStore = create()( }, merge: (persistedState, currentState) => { const persisted = persistedState as Partial | undefined - const entries = (persisted?.entries ?? currentState.entries).map((entry, index) => { + const rawEntries = persisted?.entries ?? currentState.entries + const oneHourAgo = Date.now() - 60 * 60 * 1000 + + const entries = rawEntries.map((entry, index) => { + let updated = entry if (entry.executionOrder === undefined) { - return { ...entry, executionOrder: index + 1 } + updated = { ...updated, executionOrder: index + 1 } } - return entry + if ( + entry.isRunning && + entry.startedAt && + new Date(entry.startedAt).getTime() < oneHourAgo + ) { + updated = { ...updated, isRunning: false } + } + return updated }) + return { ...currentState, entries, diff --git a/apps/sim/stores/terminal/console/types.ts b/apps/sim/stores/terminal/console/types.ts index f15f36377..e057854d8 100644 --- a/apps/sim/stores/terminal/console/types.ts +++ b/apps/sim/stores/terminal/console/types.ts @@ -51,6 +51,7 @@ export interface ConsoleStore { isOpen: boolean addConsole: (entry: Omit) => ConsoleEntry clearWorkflowConsole: (workflowId: string) => void + clearExecutionEntries: (executionId: string) => void exportConsoleCSV: (workflowId: string) => void getWorkflowEntries: (workflowId: string) => ConsoleEntry[] toggleConsole: () => void