fix(executor): workflow abort has to send abort signal to route for correct state update (#2571)

This commit is contained in:
Vikhyath Mondreti
2025-12-24 02:50:58 -08:00
committed by GitHub
parent 1145f5c043
commit b1cd8d151d
9 changed files with 54 additions and 62 deletions

View File

@@ -496,7 +496,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
}
const encoder = new TextEncoder()
let executorInstance: any = null
const abortController = new AbortController()
let isStreamClosed = false
const stream = new ReadableStream<Uint8Array>({
@@ -688,11 +688,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
onBlockStart,
onBlockComplete,
onStream,
onExecutorCreated: (executor) => {
executorInstance = executor
},
},
loggingSession,
abortSignal: abortController.signal,
})
if (result.status === 'paused') {
@@ -769,11 +767,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
},
cancel() {
isStreamClosed = true
logger.info(`[${requestId}] Client aborted SSE stream, cancelling executor`)
if (executorInstance && typeof executorInstance.cancel === 'function') {
executorInstance.cancel()
}
logger.info(
`[${requestId}] Client aborted SSE stream, signalling cancellation via AbortController`
)
abortController.abort()
},
})

View File

@@ -39,7 +39,7 @@ export class ExecutionEngine {
this.initializeQueue(triggerBlockId)
while (this.hasWork()) {
if (this.context.isCancelled && this.executing.size === 0) {
if (this.context.abortSignal?.aborted && this.executing.size === 0) {
break
}
await this.processQueue()
@@ -54,7 +54,7 @@ export class ExecutionEngine {
this.context.metadata.endTime = new Date(endTime).toISOString()
this.context.metadata.duration = endTime - startTime
if (this.context.isCancelled) {
if (this.context.abortSignal?.aborted) {
return {
success: false,
output: this.finalOutput,
@@ -75,7 +75,7 @@ export class ExecutionEngine {
this.context.metadata.endTime = new Date(endTime).toISOString()
this.context.metadata.duration = endTime - startTime
if (this.context.isCancelled) {
if (this.context.abortSignal?.aborted) {
return {
success: false,
output: this.finalOutput,
@@ -234,7 +234,7 @@ export class ExecutionEngine {
private async processQueue(): Promise<void> {
while (this.readyQueue.length > 0) {
if (this.context.isCancelled) {
if (this.context.abortSignal?.aborted) {
break
}
const nodeId = this.dequeue()

View File

@@ -37,7 +37,6 @@ export class DAGExecutor {
private workflowInput: WorkflowInput
private workflowVariables: Record<string, unknown>
private contextExtensions: ContextExtensions
private isCancelled = false
private dagBuilder: DAGBuilder
constructor(options: DAGExecutorOptions) {
@@ -54,13 +53,6 @@ export class DAGExecutor {
const dag = this.dagBuilder.build(this.workflow, triggerBlockId, savedIncomingEdges)
const { context, state } = this.createExecutionContext(workflowId, triggerBlockId)
// Link cancellation flag to context
Object.defineProperty(context, 'isCancelled', {
get: () => this.isCancelled,
enumerable: true,
configurable: true,
})
const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
const loopOrchestrator = new LoopOrchestrator(dag, state, resolver)
loopOrchestrator.setContextExtensions(this.contextExtensions)
@@ -82,10 +74,6 @@ export class DAGExecutor {
return await engine.run(triggerBlockId)
}
cancel(): void {
this.isCancelled = true
}
async continueExecution(
_pendingBlocks: string[],
context: ExecutionContext
@@ -180,6 +168,7 @@ export class DAGExecutor {
onStream: this.contextExtensions.onStream,
onBlockStart: this.contextExtensions.onBlockStart,
onBlockComplete: this.contextExtensions.onBlockComplete,
abortSignal: this.contextExtensions.abortSignal,
}
if (this.contextExtensions.resumeFromSnapshot) {

View File

@@ -34,7 +34,6 @@ export interface ExecutionCallbacks {
blockType: string,
output: any
) => Promise<void>
onExecutorCreated?: (executor: any) => void
}
export interface SerializableExecutionState {

View File

@@ -22,6 +22,11 @@ export interface ContextExtensions {
dagIncomingEdges?: Record<string, string[]>
snapshotState?: SerializableExecutionState
metadata?: ExecutionMetadata
/**
* AbortSignal for cancellation support.
* When aborted, the execution should stop gracefully.
*/
abortSignal?: AbortSignal
onStream?: (streamingExecution: unknown) => Promise<void>
onBlockStart?: (
blockId: string,

View File

@@ -1,37 +1,37 @@
import { createLogger } from '@/lib/logs/console/logger'
import { BlockType } from '@/executor/constants'
import type { BlockHandler, ExecutionContext } from '@/executor/types'
import type { SerializedBlock } from '@/serializer/types'
const logger = createLogger('WaitBlockHandler')
/**
* Helper function to sleep for a specified number of milliseconds
* On client-side: checks for cancellation every 100ms (non-blocking for UI)
* On server-side: simple sleep without polling (server execution can't be cancelled mid-flight)
* Helper function to sleep for a specified number of milliseconds with AbortSignal support.
* The sleep will be cancelled immediately when the AbortSignal is aborted.
*/
const sleep = async (ms: number, checkCancelled?: () => boolean): Promise<boolean> => {
const isClientSide = typeof window !== 'undefined'
if (!isClientSide) {
await new Promise((resolve) => setTimeout(resolve, ms))
return true
const sleep = async (ms: number, signal?: AbortSignal): Promise<boolean> => {
if (signal?.aborted) {
return false
}
const chunkMs = 100
let elapsed = 0
return new Promise((resolve) => {
let timeoutId: NodeJS.Timeout | undefined
while (elapsed < ms) {
if (checkCancelled?.()) {
return false
const onAbort = () => {
if (timeoutId) {
clearTimeout(timeoutId)
}
resolve(false)
}
const sleepTime = Math.min(chunkMs, ms - elapsed)
await new Promise((resolve) => setTimeout(resolve, sleepTime))
elapsed += sleepTime
}
if (signal) {
signal.addEventListener('abort', onAbort, { once: true })
}
return true
timeoutId = setTimeout(() => {
if (signal) {
signal.removeEventListener('abort', onAbort)
}
resolve(true)
}, ms)
})
}
/**
@@ -65,11 +65,7 @@ export class WaitBlockHandler implements BlockHandler {
throw new Error(`Wait time exceeds maximum of ${maxDisplay}`)
}
const checkCancelled = () => {
return (ctx as any).isCancelled === true
}
const completed = await sleep(waitMs, checkCancelled)
const completed = await sleep(waitMs, ctx.abortSignal)
if (!completed) {
return {

View File

@@ -229,7 +229,7 @@ export class LoopOrchestrator {
}
}
if (ctx.isCancelled) {
if (ctx.abortSignal?.aborted) {
logger.info('Loop execution cancelled', { loopId, iteration: scope.iteration })
return this.createExitResult(ctx, loopId, scope)
}

View File

@@ -222,8 +222,12 @@ export interface ExecutionContext {
output: any
) => Promise<void>
// Cancellation support
isCancelled?: boolean
/**
* AbortSignal for cancellation support.
* When the signal is aborted, execution should stop gracefully.
* This is triggered when the SSE client disconnects.
*/
abortSignal?: AbortSignal
// Dynamically added nodes that need to be scheduled (e.g., from parallel expansion)
pendingDynamicNodes?: string[]

View File

@@ -32,6 +32,11 @@ export interface ExecuteWorkflowCoreOptions {
callbacks: ExecutionCallbacks
loggingSession: LoggingSession
skipLogCreation?: boolean // For resume executions - reuse existing log entry
/**
* AbortSignal for cancellation support.
* When aborted (e.g., client disconnects from SSE), execution stops gracefully.
*/
abortSignal?: AbortSignal
}
function parseVariableValueByType(value: any, type: string): any {
@@ -98,11 +103,11 @@ function parseVariableValueByType(value: any, type: string): any {
export async function executeWorkflowCore(
options: ExecuteWorkflowCoreOptions
): Promise<ExecutionResult> {
const { snapshot, callbacks, loggingSession, skipLogCreation } = options
const { snapshot, callbacks, loggingSession, skipLogCreation, abortSignal } = options
const { metadata, workflow, input, workflowVariables, selectedOutputs } = snapshot
const { requestId, workflowId, userId, triggerType, executionId, triggerBlockId, useDraftState } =
metadata
const { onBlockStart, onBlockComplete, onStream, onExecutorCreated } = callbacks
const { onBlockStart, onBlockComplete, onStream } = callbacks
const providedWorkspaceId = metadata.workspaceId
if (!providedWorkspaceId) {
@@ -326,6 +331,7 @@ export async function executeWorkflowCore(
dagIncomingEdges: snapshot.state?.dagIncomingEdges,
snapshotState: snapshot.state,
metadata,
abortSignal,
}
const executorInstance = new Executor({
@@ -349,10 +355,6 @@ export async function executeWorkflowCore(
}
}
if (onExecutorCreated) {
onExecutorCreated(executorInstance)
}
const result = (await executorInstance.execute(
workflowId,
resolvedTriggerBlockId