fix(cancel-workflow-exec): move cancellation tracking for multi-task envs to redis (#2573)

* fix(cancel-workflow-exec): move cancellation tracking for multi-task envs to redis

* clean up cancellation keys after execution
This commit is contained in:
Vikhyath Mondreti
2025-12-24 11:51:09 -08:00
committed by GitHub
parent cb8b9c547a
commit 77521a3a57
8 changed files with 234 additions and 23 deletions

View File

@@ -7,6 +7,7 @@ import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { markExecutionCancelled } from '@/lib/execution/cancellation'
import { processInputFileFields } from '@/lib/execution/files'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { createLogger } from '@/lib/logs/console/logger'
@@ -767,10 +768,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
},
cancel() {
isStreamClosed = true
logger.info(
`[${requestId}] Client aborted SSE stream, signalling cancellation via AbortController`
)
logger.info(`[${requestId}] Client aborted SSE stream, signalling cancellation`)
abortController.abort()
markExecutionCancelled(executionId).catch(() => {})
},
})

View File

@@ -0,0 +1,47 @@
import { type NextRequest, NextResponse } from 'next/server'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { markExecutionCancelled } from '@/lib/execution/cancellation'
import { createLogger } from '@/lib/logs/console/logger'
const logger = createLogger('CancelExecutionAPI')
export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'
/**
 * POST handler that requests cancellation of a workflow execution.
 *
 * Authenticates the caller with `checkHybridAuth`, then asks
 * `markExecutionCancelled` to flag the `executionId` route parameter.
 *
 * Responses:
 * - 200 `{ success: true, executionId, redisAvailable }`, where `redisAvailable`
 *   mirrors the boolean returned by `markExecutionCancelled`.
 * - 401 when authentication fails or yields no user id.
 * - 500 when anything inside the handler throws.
 *
 * NOTE(review): `workflowId` is only used for logging here; nothing in this
 * handler checks that the authenticated user may act on that workflow or that
 * the execution belongs to it. Confirm whether a permission check is required.
 */
export async function POST(
  req: NextRequest,
  { params }: { params: Promise<{ id: string; executionId: string }> }
) {
  const { id: workflowId, executionId } = await params

  try {
    const auth = await checkHybridAuth(req, { requireWorkflowId: false })
    if (!auth.success || !auth.userId) {
      return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 })
    }

    logger.info('Cancel execution requested', { workflowId, executionId, userId: auth.userId })

    const marked = await markExecutionCancelled(executionId)

    if (marked) {
      logger.info('Execution marked as cancelled in Redis', { executionId })
    } else {
      logger.info('Redis not available, cancellation will rely on connection close', {
        executionId,
      })
    }

    return NextResponse.json({
      success: true,
      executionId,
      redisAvailable: marked,
    })
  } catch (error: unknown) {
    // Narrow before reading `.message`: a thrown value is not guaranteed to be
    // an Error, and dereferencing null/undefined here would throw again.
    const message = error instanceof Error ? error.message : undefined
    logger.error('Failed to cancel execution', {
      workflowId,
      executionId,
      error: message ?? error,
    })
    return NextResponse.json(
      { error: message || 'Failed to cancel execution' },
      { status: 500 }
    )
  }
}

View File

@@ -1,3 +1,4 @@
import { isExecutionCancelled, isRedisCancellationEnabled } from '@/lib/execution/cancellation'
import { createLogger } from '@/lib/logs/console/logger'
import { BlockType } from '@/executor/constants'
import type { DAG } from '@/executor/dag/builder'
@@ -23,6 +24,10 @@ export class ExecutionEngine {
private finalOutput: NormalizedBlockOutput = {}
private pausedBlocks: Map<string, PauseMetadata> = new Map()
private allowResumeTriggers: boolean
private cancelledFlag = false
private lastCancellationCheck = 0
private readonly useRedisCancellation: boolean
private readonly CANCELLATION_CHECK_INTERVAL_MS = 500
constructor(
private context: ExecutionContext,
@@ -31,6 +36,35 @@ export class ExecutionEngine {
private nodeOrchestrator: NodeExecutionOrchestrator
) {
this.allowResumeTriggers = this.context.metadata.resumeFromSnapshot === true
this.useRedisCancellation = isRedisCancellationEnabled() && !!this.context.executionId
}
/**
 * Reports whether this execution should stop. Once a cancellation has been
 * observed it is latched in `cancelledFlag` and returned on every later call.
 *
 * When Redis cancellation is in use, the Redis flag is the only source
 * consulted, and it is queried at most once per
 * `CANCELLATION_CHECK_INTERVAL_MS`; calls inside that window return `false`
 * without querying. Otherwise the local abort signal decides.
 */
private async checkCancellation(): Promise<boolean> {
  if (this.cancelledFlag) return true

  if (!this.useRedisCancellation) {
    // Local mode: the abort signal on the context is the source of truth.
    const aborted = this.context.abortSignal?.aborted
    if (aborted) {
      this.cancelledFlag = true
      return true
    }
    return false
  }

  // Redis mode: throttle lookups so tight scheduling loops do not hammer Redis.
  const now = Date.now()
  const sinceLastCheck = now - this.lastCancellationCheck
  if (sinceLastCheck < this.CANCELLATION_CHECK_INTERVAL_MS) return false
  this.lastCancellationCheck = now

  const cancelled = await isExecutionCancelled(this.context.executionId!)
  if (cancelled) {
    this.cancelledFlag = true
    logger.info('Execution cancelled via Redis', { executionId: this.context.executionId })
  }
  return cancelled
}
async run(triggerBlockId?: string): Promise<ExecutionResult> {
@@ -39,7 +73,7 @@ export class ExecutionEngine {
this.initializeQueue(triggerBlockId)
while (this.hasWork()) {
if (this.context.abortSignal?.aborted && this.executing.size === 0) {
if ((await this.checkCancellation()) && this.executing.size === 0) {
break
}
await this.processQueue()
@@ -54,7 +88,7 @@ export class ExecutionEngine {
this.context.metadata.endTime = new Date(endTime).toISOString()
this.context.metadata.duration = endTime - startTime
if (this.context.abortSignal?.aborted) {
if (this.cancelledFlag) {
return {
success: false,
output: this.finalOutput,
@@ -75,7 +109,7 @@ export class ExecutionEngine {
this.context.metadata.endTime = new Date(endTime).toISOString()
this.context.metadata.duration = endTime - startTime
if (this.context.abortSignal?.aborted) {
if (this.cancelledFlag) {
return {
success: false,
output: this.finalOutput,
@@ -234,7 +268,7 @@ export class ExecutionEngine {
private async processQueue(): Promise<void> {
while (this.readyQueue.length > 0) {
if (this.context.abortSignal?.aborted) {
if (await this.checkCancellation()) {
break
}
const nodeId = this.dequeue()

View File

@@ -1,32 +1,61 @@
import { isExecutionCancelled, isRedisCancellationEnabled } from '@/lib/execution/cancellation'
import { BlockType } from '@/executor/constants'
import type { BlockHandler, ExecutionContext } from '@/executor/types'
import type { SerializedBlock } from '@/serializer/types'
/**
* Helper function to sleep for a specified number of milliseconds with AbortSignal support.
* The sleep will be cancelled immediately when the AbortSignal is aborted.
*/
const sleep = async (ms: number, signal?: AbortSignal): Promise<boolean> => {
if (signal?.aborted) {
const CANCELLATION_CHECK_INTERVAL_MS = 500
interface SleepOptions {
signal?: AbortSignal
executionId?: string
}
const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> => {
const { signal, executionId } = options
const useRedis = isRedisCancellationEnabled() && !!executionId
if (!useRedis && signal?.aborted) {
return false
}
return new Promise((resolve) => {
let timeoutId: NodeJS.Timeout | undefined
let mainTimeoutId: NodeJS.Timeout | undefined
let checkIntervalId: NodeJS.Timeout | undefined
let resolved = false
const cleanup = () => {
if (mainTimeoutId) clearTimeout(mainTimeoutId)
if (checkIntervalId) clearInterval(checkIntervalId)
if (!useRedis && signal) signal.removeEventListener('abort', onAbort)
}
const onAbort = () => {
if (timeoutId) clearTimeout(timeoutId)
if (resolved) return
resolved = true
cleanup()
resolve(false)
}
if (signal) {
if (useRedis) {
checkIntervalId = setInterval(async () => {
if (resolved) return
try {
const cancelled = await isExecutionCancelled(executionId!)
if (cancelled) {
resolved = true
cleanup()
resolve(false)
}
} catch {}
}, CANCELLATION_CHECK_INTERVAL_MS)
} else if (signal) {
signal.addEventListener('abort', onAbort, { once: true })
}
timeoutId = setTimeout(() => {
if (signal) {
signal.removeEventListener('abort', onAbort)
}
mainTimeoutId = setTimeout(() => {
if (resolved) return
resolved = true
cleanup()
resolve(true)
}, ms)
})
@@ -63,7 +92,10 @@ export class WaitBlockHandler implements BlockHandler {
throw new Error(`Wait time exceeds maximum of ${maxDisplay}`)
}
const completed = await sleep(waitMs, ctx.abortSignal)
const completed = await sleep(waitMs, {
signal: ctx.abortSignal,
executionId: ctx.executionId,
})
if (!completed) {
return {

View File

@@ -1,4 +1,5 @@
import { generateRequestId } from '@/lib/core/utils/request'
import { isExecutionCancelled, isRedisCancellationEnabled } from '@/lib/execution/cancellation'
import { executeInIsolatedVM } from '@/lib/execution/isolated-vm'
import { createLogger } from '@/lib/logs/console/logger'
import { buildLoopIndexCondition, DEFAULTS, EDGE } from '@/executor/constants'
@@ -229,7 +230,14 @@ export class LoopOrchestrator {
}
}
if (ctx.abortSignal?.aborted) {
const useRedis = isRedisCancellationEnabled() && !!ctx.executionId
let isCancelled = false
if (useRedis) {
isCancelled = await isExecutionCancelled(ctx.executionId!)
} else {
isCancelled = ctx.abortSignal?.aborted ?? false
}
if (isCancelled) {
logger.info('Loop execution cancelled', { loopId, iteration: scope.iteration })
return this.createExitResult(ctx, loopId, scope)
}

View File

@@ -76,6 +76,7 @@ export interface ExecuteStreamOptions {
*/
export function useExecutionStream() {
const abortControllerRef = useRef<AbortController | null>(null)
const currentExecutionRef = useRef<{ workflowId: string; executionId: string } | null>(null)
const execute = useCallback(async (options: ExecuteStreamOptions) => {
const { workflowId, callbacks = {}, ...payload } = options
@@ -88,6 +89,7 @@ export function useExecutionStream() {
// Create new abort controller
const abortController = new AbortController()
abortControllerRef.current = abortController
currentExecutionRef.current = null
try {
const response = await fetch(`/api/workflows/${workflowId}/execute`, {
@@ -108,6 +110,11 @@ export function useExecutionStream() {
throw new Error('No response body')
}
const executionId = response.headers.get('X-Execution-Id')
if (executionId) {
currentExecutionRef.current = { workflowId, executionId }
}
// Read SSE stream
const reader = response.body.getReader()
const decoder = new TextDecoder()
@@ -215,14 +222,23 @@ export function useExecutionStream() {
throw error
} finally {
abortControllerRef.current = null
currentExecutionRef.current = null
}
}, [])
// Cancels the in-flight execution, if any: fires a best-effort POST to the
// cancel endpoint for the tracked execution, aborts the local stream request,
// and clears both refs.
const cancel = useCallback(() => {
  const active = currentExecutionRef.current
  if (active) {
    const { workflowId, executionId } = active
    const cancelUrl = `/api/workflows/${workflowId}/executions/${executionId}/cancel`
    // Fire-and-forget: a failed cancel request is deliberately ignored.
    fetch(cancelUrl, { method: 'POST' }).catch(() => {})
  }

  const controller = abortControllerRef.current
  if (controller) {
    controller.abort()
    abortControllerRef.current = null
  }

  currentExecutionRef.current = null
}, [])
return {

View File

@@ -0,0 +1,66 @@
import { getRedisClient } from '@/lib/core/config/redis'
import { createLogger } from '@/lib/logs/console/logger'
const logger = createLogger('ExecutionCancellation')
const EXECUTION_CANCEL_PREFIX = 'execution:cancel:'
const EXECUTION_CANCEL_EXPIRY = 60 * 60
/**
 * Reports whether Redis-backed cancellation tracking can be used, i.e. whether
 * `getRedisClient()` yields something other than `null`.
 */
export function isRedisCancellationEnabled(): boolean {
  const client = getRedisClient()
  return client !== null
}
/**
 * Flags an execution as cancelled by writing an expiring key to Redis.
 *
 * The key is `EXECUTION_CANCEL_PREFIX` followed by the execution id and is set
 * with an `EX` expiry of `EXECUTION_CANCEL_EXPIRY`.
 *
 * @param executionId - Identifier of the execution to flag.
 * @returns `true` once the key has been written; `false` when no Redis client
 * is available or the write throws (the error is logged, not rethrown).
 */
export async function markExecutionCancelled(executionId: string): Promise<boolean> {
  const client = getRedisClient()
  if (client) {
    try {
      const key = `${EXECUTION_CANCEL_PREFIX}${executionId}`
      await client.set(key, '1', 'EX', EXECUTION_CANCEL_EXPIRY)
      logger.info('Marked execution as cancelled', { executionId })
      return true
    } catch (error) {
      logger.error('Failed to mark execution as cancelled', { executionId, error })
    }
  }
  return false
}
/**
 * Looks up whether a cancellation key exists in Redis for the execution.
 *
 * @param executionId - Identifier of the execution to look up.
 * @returns `true` only when Redis reports exactly one matching key; `false`
 * when no Redis client is available or the lookup throws (the error is logged,
 * not rethrown).
 */
export async function isExecutionCancelled(executionId: string): Promise<boolean> {
  const client = getRedisClient()
  if (!client) return false

  try {
    const key = `${EXECUTION_CANCEL_PREFIX}${executionId}`
    const matches = await client.exists(key)
    return matches === 1
  } catch (error) {
    logger.error('Failed to check execution cancellation', { executionId, error })
    return false
  }
}
/**
 * Deletes the Redis cancellation key for an execution.
 *
 * Does nothing when no Redis client is available. A failing delete is logged
 * and swallowed, so this never rejects because of Redis.
 *
 * @param executionId - Identifier of the execution whose flag should be removed.
 */
export async function clearExecutionCancellation(executionId: string): Promise<void> {
  const client = getRedisClient()
  if (client) {
    try {
      await client.del(`${EXECUTION_CANCEL_PREFIX}${executionId}`)
    } catch (error) {
      logger.error('Failed to clear execution cancellation', { executionId, error })
    }
  }
}

View File

@@ -6,6 +6,7 @@
import type { Edge } from 'reactflow'
import { z } from 'zod'
import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
import { clearExecutionCancellation } from '@/lib/execution/cancellation'
import { createLogger } from '@/lib/logs/console/logger'
import type { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
@@ -375,6 +376,8 @@ export async function executeWorkflowCore(
traceSpans: traceSpans || [],
})
await clearExecutionCancellation(executionId)
logger.info(`[${requestId}] Workflow execution cancelled`, {
duration: result.metadata?.duration,
})
@@ -383,6 +386,8 @@ export async function executeWorkflowCore(
}
if (result.status === 'paused') {
await clearExecutionCancellation(executionId)
logger.info(`[${requestId}] Workflow execution paused`, {
duration: result.metadata?.duration,
})
@@ -398,6 +403,8 @@ export async function executeWorkflowCore(
workflowInput: processedInput,
})
await clearExecutionCancellation(executionId)
logger.info(`[${requestId}] Workflow execution completed`, {
success: result.success,
duration: result.metadata?.duration,
@@ -407,7 +414,6 @@ export async function executeWorkflowCore(
} catch (error: any) {
logger.error(`[${requestId}] Execution failed:`, error)
// Extract execution result from error if available
const executionResult = (error as any)?.executionResult
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
@@ -421,6 +427,8 @@ export async function executeWorkflowCore(
traceSpans,
})
await clearExecutionCancellation(executionId)
throw error
}
}