From ee06ee34f68ef9e5d4d0097747b7d08203b5649d Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Tue, 3 Feb 2026 18:51:57 -0800 Subject: [PATCH] consolidate --- .../app/api/workflows/[id]/execute/route.ts | 68 +++++++++---------- apps/sim/background/schedule-execution.ts | 27 +++----- apps/sim/background/webhook-execution.ts | 45 ++++++------ apps/sim/background/workflow-execution.ts | 30 ++++---- apps/sim/lib/core/execution-limits/types.ts | 40 +++++++++++ .../executor/human-in-the-loop-manager.ts | 30 ++++---- apps/sim/lib/workflows/streaming/streaming.ts | 30 ++++---- 7 files changed, 146 insertions(+), 124 deletions(-) diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 3d103d061..68b2fa842 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -5,7 +5,11 @@ import { validate as uuidValidate, v4 as uuidv4 } from 'uuid' import { z } from 'zod' import { checkHybridAuth } from '@/lib/auth/hybrid' import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags' -import { getTimeoutErrorMessage, isTimeoutError } from '@/lib/core/execution-limits' +import { + createTimeoutAbortController, + getTimeoutErrorMessage, + isTimeoutError, +} from '@/lib/core/execution-limits' import { generateRequestId } from '@/lib/core/utils/request' import { SSE_HEADERS } from '@/lib/core/utils/sse' import { getBaseUrl } from '@/lib/core/utils/urls' @@ -402,17 +406,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: if (!enableSSE) { logger.info(`[${requestId}] Using non-SSE execution (direct JSON response)`) - const syncTimeout = preprocessResult.executionTimeout?.sync - const abortController = new AbortController() - let isTimedOut = false - let timeoutId: NodeJS.Timeout | undefined - - if (syncTimeout) { - timeoutId = setTimeout(() => { - isTimedOut = true - abortController.abort() - }, syncTimeout) - } + const timeoutController = createTimeoutAbortController( + preprocessResult.executionTimeout?.sync + ) try { const metadata: ExecutionMetadata = { @@ -447,12 +443,18 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: includeFileBase64, base64MaxBytes, stopAfterBlockId, - abortSignal: abortController.signal, + abortSignal: timeoutController.signal, }) - if (result.status === 'cancelled' && isTimedOut && syncTimeout) { - const timeoutErrorMessage = getTimeoutErrorMessage(null, syncTimeout) - logger.info(`[${requestId}] Non-SSE execution timed out`, { timeoutMs: syncTimeout }) + if ( + result.status === 'cancelled' && + timeoutController.isTimedOut() && + timeoutController.timeoutMs + ) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs) + logger.info(`[${requestId}] Non-SSE execution timed out`, { + timeoutMs: timeoutController.timeoutMs, + }) await loggingSession.markAsFailed(timeoutErrorMessage) await cleanupExecutionBase64Cache(executionId) @@ -535,7 +537,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: { status: 500 } ) } finally { - if (timeoutId) clearTimeout(timeoutId) + timeoutController.cleanup() } } @@ -578,18 +580,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: } const encoder = new TextEncoder() - const abortController = new AbortController() + const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync) let isStreamClosed = false - let isTimedOut = false - - const syncTimeout = preprocessResult.executionTimeout?.sync - let timeoutId: NodeJS.Timeout | undefined - if (syncTimeout) { - timeoutId = setTimeout(() => { - isTimedOut = true - abortController.abort() - }, syncTimeout) - } const stream = new ReadableStream({ async start(controller) { @@ -780,7 +772,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: onStream, }, loggingSession, - abortSignal: abortController.signal, + abortSignal: timeoutController.signal, includeFileBase64, base64MaxBytes, stopAfterBlockId, @@ -816,9 +808,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: } if (result.status === 'cancelled') { - if (isTimedOut && syncTimeout) { - const timeoutErrorMessage = getTimeoutErrorMessage(null, syncTimeout) - logger.info(`[${requestId}] Workflow execution timed out`, { timeoutMs: syncTimeout }) + if (timeoutController.isTimedOut() && timeoutController.timeoutMs) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs) + logger.info(`[${requestId}] Workflow execution timed out`, { + timeoutMs: timeoutController.timeoutMs, + }) await loggingSession.markAsFailed(timeoutErrorMessage) @@ -871,9 +865,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: // Cleanup base64 cache for this execution await cleanupExecutionBase64Cache(executionId) } catch (error: unknown) { - const isTimeout = isTimeoutError(error) || isTimedOut + const isTimeout = isTimeoutError(error) || timeoutController.isTimedOut() const errorMessage = isTimeout - ? getTimeoutErrorMessage(error, syncTimeout) + ? getTimeoutErrorMessage(error, timeoutController.timeoutMs) : error instanceof Error ? error.message : 'Unknown error' @@ -899,7 +893,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: }, }) } finally { - if (timeoutId) clearTimeout(timeoutId) + timeoutController.cleanup() if (!isStreamClosed) { try { controller.enqueue(encoder.encode('data: [DONE]\n\n')) @@ -910,9 +904,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: }, cancel() { isStreamClosed = true - if (timeoutId) clearTimeout(timeoutId) + timeoutController.cleanup() logger.info(`[${requestId}] Client aborted SSE stream, signalling cancellation`) - abortController.abort() + timeoutController.abort() markExecutionCancelled(executionId).catch(() => {}) }, }) diff --git a/apps/sim/background/schedule-execution.ts b/apps/sim/background/schedule-execution.ts index c43bb6f53..54fb95420 100644 --- a/apps/sim/background/schedule-execution.ts +++ b/apps/sim/background/schedule-execution.ts @@ -4,7 +4,7 @@ import { task } from '@trigger.dev/sdk' import { Cron } from 'croner' import { eq } from 'drizzle-orm' import { v4 as uuidv4 } from 'uuid' -import { getTimeoutErrorMessage } from '@/lib/core/execution-limits' +import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits' import { preprocessExecution } from '@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' @@ -184,16 +184,7 @@ async function runWorkflowExecution({ [] ) - const abortController = new AbortController() - let isTimedOut = false - let timeoutId: NodeJS.Timeout | undefined - - if (asyncTimeout) { - timeoutId = setTimeout(() => { - isTimedOut = true - abortController.abort() - }, asyncTimeout) - } + const timeoutController = createTimeoutAbortController(asyncTimeout) let executionResult try { @@ -203,16 +194,20 @@ async function runWorkflowExecution({ loggingSession, includeFileBase64: true, base64MaxBytes: undefined, - abortSignal: abortController.signal, + abortSignal: timeoutController.signal, }) } finally { - if (timeoutId) clearTimeout(timeoutId) + timeoutController.cleanup() } - if (executionResult.status === 'cancelled' && isTimedOut && asyncTimeout) { - const timeoutErrorMessage = getTimeoutErrorMessage(null, asyncTimeout) + if ( + executionResult.status === 'cancelled' && + timeoutController.isTimedOut() && + timeoutController.timeoutMs + ) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs) logger.info(`[${requestId}] Scheduled workflow execution timed out`, { - timeoutMs: asyncTimeout, + timeoutMs: timeoutController.timeoutMs, }) await loggingSession.markAsFailed(timeoutErrorMessage) } else if (executionResult.status === 'paused') { diff --git a/apps/sim/background/webhook-execution.ts b/apps/sim/background/webhook-execution.ts index 30403a3d8..4c4b2e4d8 100644 --- a/apps/sim/background/webhook-execution.ts +++ b/apps/sim/background/webhook-execution.ts @@ -5,7 +5,11 @@ import { task } from '@trigger.dev/sdk' import { eq } from 'drizzle-orm' import { v4 as uuidv4 } from 'uuid' import { getHighestPrioritySubscription } from '@/lib/billing' -import { getExecutionTimeout, getTimeoutErrorMessage } from '@/lib/core/execution-limits' +import { + createTimeoutAbortController, + getExecutionTimeout, + getTimeoutErrorMessage, +} from '@/lib/core/execution-limits' import { IdempotencyService, webhookIdempotency } from '@/lib/core/idempotency' import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types' import { processExecutionFiles } from '@/lib/execution/files' @@ -142,16 +146,7 @@ async function executeWebhookJobInternal( userSubscription?.plan as SubscriptionPlan | undefined, 'async' ) - const abortController = new AbortController() - let isTimedOut = false - let timeoutId: NodeJS.Timeout | undefined - - if (asyncTimeout) { - timeoutId = setTimeout(() => { - isTimedOut = true - abortController.abort() - }, asyncTimeout) - } + const timeoutController = createTimeoutAbortController(asyncTimeout) let deploymentVersionId: string | undefined @@ -261,13 +256,17 @@ async function executeWebhookJobInternal( loggingSession, includeFileBase64: true, base64MaxBytes: undefined, - abortSignal: abortController.signal, + abortSignal: timeoutController.signal, }) - if (executionResult.status === 'cancelled' && isTimedOut && asyncTimeout) { - const timeoutErrorMessage = getTimeoutErrorMessage(null, asyncTimeout) + if ( + executionResult.status === 'cancelled' && + timeoutController.isTimedOut() && + timeoutController.timeoutMs + ) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs) logger.info(`[${requestId}] Airtable webhook execution timed out`, { - timeoutMs: asyncTimeout, + timeoutMs: timeoutController.timeoutMs, }) await loggingSession.markAsFailed(timeoutErrorMessage) } else if (executionResult.status === 'paused') { @@ -522,12 +521,18 @@ async function executeWebhookJobInternal( callbacks: {}, loggingSession, includeFileBase64: true, - abortSignal: abortController.signal, + abortSignal: timeoutController.signal, }) - if (executionResult.status === 'cancelled' && isTimedOut && asyncTimeout) { - const timeoutErrorMessage = getTimeoutErrorMessage(null, asyncTimeout) - logger.info(`[${requestId}] Webhook execution timed out`, { timeoutMs: asyncTimeout }) + if ( + executionResult.status === 'cancelled' && + timeoutController.isTimedOut() && + timeoutController.timeoutMs + ) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs) + logger.info(`[${requestId}] Webhook execution timed out`, { + timeoutMs: timeoutController.timeoutMs, + }) await loggingSession.markAsFailed(timeoutErrorMessage) } else if (executionResult.status === 'paused') { if (!executionResult.snapshotSeed) { @@ -632,7 +637,7 @@ async function executeWebhookJobInternal( throw error } finally { - if (timeoutId) clearTimeout(timeoutId) + timeoutController.cleanup() } } diff --git a/apps/sim/background/workflow-execution.ts b/apps/sim/background/workflow-execution.ts index 47d2fcb64..818480fb6 100644 --- a/apps/sim/background/workflow-execution.ts +++ b/apps/sim/background/workflow-execution.ts @@ -1,7 +1,7 @@ import { createLogger } from '@sim/logger' import { task } from '@trigger.dev/sdk' import { v4 as uuidv4 } from 'uuid' -import { getTimeoutErrorMessage } from '@/lib/core/execution-limits' +import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits' import { preprocessExecution } from '@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' @@ -104,17 +104,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) { [] ) - const asyncTimeout = preprocessResult.executionTimeout?.async - const abortController = new AbortController() - let isTimedOut = false - let timeoutId: NodeJS.Timeout | undefined - - if (asyncTimeout) { - timeoutId = setTimeout(() => { - isTimedOut = true - abortController.abort() - }, asyncTimeout) - } + const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.async) let result try { @@ -124,15 +114,21 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) { loggingSession, includeFileBase64: true, base64MaxBytes: undefined, - abortSignal: abortController.signal, + abortSignal: timeoutController.signal, }) } finally { - if (timeoutId) clearTimeout(timeoutId) + timeoutController.cleanup() } - if (result.status === 'cancelled' && isTimedOut && asyncTimeout) { - const timeoutErrorMessage = getTimeoutErrorMessage(null, asyncTimeout) - logger.info(`[${requestId}] Workflow execution timed out`, { timeoutMs: asyncTimeout }) + if ( + result.status === 'cancelled' && + timeoutController.isTimedOut() && + timeoutController.timeoutMs + ) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs) + logger.info(`[${requestId}] Workflow execution timed out`, { + timeoutMs: timeoutController.timeoutMs, + }) await loggingSession.markAsFailed(timeoutErrorMessage) } else if (result.status === 'paused') { if (!result.snapshotSeed) { diff --git a/apps/sim/lib/core/execution-limits/types.ts b/apps/sim/lib/core/execution-limits/types.ts index ea4b31b86..06df5966f 100644 --- a/apps/sim/lib/core/execution-limits/types.ts +++ b/apps/sim/lib/core/execution-limits/types.ts @@ -92,3 +92,43 @@ export function getTimeoutErrorMessage(error: unknown, timeoutMs?: number): stri return 'Execution timed out' } + +/** + * Helper to create an AbortController with timeout handling. + * Centralizes the timeout abort pattern used across execution paths. + */ +export interface TimeoutAbortController { + /** The AbortSignal to pass to execution functions */ + signal: AbortSignal + /** Returns true if the abort was triggered by timeout (not user cancellation) */ + isTimedOut: () => boolean + /** Cleanup function - call in finally block to clear the timeout */ + cleanup: () => void + /** Manually abort the execution (for user cancellation) */ + abort: () => void + /** The timeout duration in milliseconds (undefined if no timeout) */ + timeoutMs: number | undefined +} + +export function createTimeoutAbortController(timeoutMs?: number): TimeoutAbortController { + const abortController = new AbortController() + let isTimedOut = false + let timeoutId: NodeJS.Timeout | undefined + + if (timeoutMs) { + timeoutId = setTimeout(() => { + isTimedOut = true + abortController.abort() + }, timeoutMs) + } + + return { + signal: abortController.signal, + isTimedOut: () => isTimedOut, + cleanup: () => { + if (timeoutId) clearTimeout(timeoutId) + }, + abort: () => abortController.abort(), + timeoutMs, + } +} diff --git a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts index 2a3113fbe..ee176ad9b 100644 --- a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts +++ b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts @@ -4,7 +4,7 @@ import { pausedExecutions, resumeQueue, workflowExecutionLogs } from '@sim/db/sc import { createLogger } from '@sim/logger' import { and, asc, desc, eq, inArray, lt, type SQL, sql } from 'drizzle-orm' import type { Edge } from 'reactflow' -import { getTimeoutErrorMessage } from '@/lib/core/execution-limits' +import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits' import { preprocessExecution } from '@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' @@ -772,17 +772,9 @@ export class PauseResumeManager { actorUserId: metadata.userId, }) - const asyncTimeout = preprocessingResult.executionTimeout?.async - const abortController = new AbortController() - let isTimedOut = false - let timeoutId: NodeJS.Timeout | undefined - - if (asyncTimeout) { - timeoutId = setTimeout(() => { - isTimedOut = true - abortController.abort() - }, asyncTimeout) - } + const timeoutController = createTimeoutAbortController( + preprocessingResult.executionTimeout?.async + ) let result: ExecutionResult try { @@ -793,17 +785,21 @@ export class PauseResumeManager { skipLogCreation: true, // Reuse existing log entry includeFileBase64: true, // Enable base64 hydration base64MaxBytes: undefined, // Use default limit - abortSignal: abortController.signal, + abortSignal: timeoutController.signal, }) } finally { - if (timeoutId) clearTimeout(timeoutId) + timeoutController.cleanup() } - if (result.status === 'cancelled' && isTimedOut && asyncTimeout) { - const timeoutErrorMessage = getTimeoutErrorMessage(null, asyncTimeout) + if ( + result.status === 'cancelled' && + timeoutController.isTimedOut() && + timeoutController.timeoutMs + ) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs) logger.info('Resume execution timed out', { resumeExecutionId, - timeoutMs: asyncTimeout, + timeoutMs: timeoutController.timeoutMs, }) await loggingSession.markAsFailed(timeoutErrorMessage) } diff --git a/apps/sim/lib/workflows/streaming/streaming.ts b/apps/sim/lib/workflows/streaming/streaming.ts index 65b0e67ac..f290d3e99 100644 --- a/apps/sim/lib/workflows/streaming/streaming.ts +++ b/apps/sim/lib/workflows/streaming/streaming.ts @@ -1,5 +1,5 @@ import { createLogger } from '@sim/logger' -import { getTimeoutErrorMessage } from '@/lib/core/execution-limits' +import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits' import { extractBlockIdFromOutputId, extractPathFromOutputId, @@ -270,17 +270,7 @@ export async function createStreamingResponse( } } - const timeoutMs = streamConfig.timeoutMs - const abortController = new AbortController() - let isTimedOut = false - let timeoutId: NodeJS.Timeout | undefined - - if (timeoutMs) { - timeoutId = setTimeout(() => { - isTimedOut = true - abortController.abort() - }, timeoutMs) - } + const timeoutController = createTimeoutAbortController(streamConfig.timeoutMs) try { const result = await executeWorkflow( @@ -298,7 +288,7 @@ export async function createStreamingResponse( skipLoggingComplete: true, includeFileBase64: streamConfig.includeFileBase64, base64MaxBytes: streamConfig.base64MaxBytes, - abortSignal: abortController.signal, + abortSignal: timeoutController.signal, }, executionId ) @@ -308,9 +298,15 @@ export async function createStreamingResponse( processStreamingBlockLogs(result.logs, state.streamedContent) } - if (result.status === 'cancelled' && isTimedOut && timeoutMs) { - const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutMs) - logger.info(`[${requestId}] Streaming execution timed out`, { timeoutMs }) + if ( + result.status === 'cancelled' && + timeoutController.isTimedOut() && + timeoutController.timeoutMs + ) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs) + logger.info(`[${requestId}] Streaming execution timed out`, { + timeoutMs: timeoutController.timeoutMs, + }) if (result._streamingMetadata?.loggingSession) { await result._streamingMetadata.loggingSession.markAsFailed(timeoutErrorMessage) } @@ -349,7 +345,7 @@ export async function createStreamingResponse( controller.close() } finally { - if (timeoutId) clearTimeout(timeoutId) + timeoutController.cleanup() } }, })