consolidate

This commit is contained in:
Vikhyath Mondreti
2026-02-03 18:51:57 -08:00
parent 39d75892a3
commit ee06ee34f6
7 changed files with 146 additions and 124 deletions

View File

@@ -5,7 +5,11 @@ import { validate as uuidValidate, v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import { getTimeoutErrorMessage, isTimeoutError } from '@/lib/core/execution-limits'
import {
createTimeoutAbortController,
getTimeoutErrorMessage,
isTimeoutError,
} from '@/lib/core/execution-limits'
import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { getBaseUrl } from '@/lib/core/utils/urls'
@@ -402,17 +406,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
if (!enableSSE) {
logger.info(`[${requestId}] Using non-SSE execution (direct JSON response)`)
const syncTimeout = preprocessResult.executionTimeout?.sync
const abortController = new AbortController()
let isTimedOut = false
let timeoutId: NodeJS.Timeout | undefined
if (syncTimeout) {
timeoutId = setTimeout(() => {
isTimedOut = true
abortController.abort()
}, syncTimeout)
}
const timeoutController = createTimeoutAbortController(
preprocessResult.executionTimeout?.sync
)
try {
const metadata: ExecutionMetadata = {
@@ -447,12 +443,18 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
includeFileBase64,
base64MaxBytes,
stopAfterBlockId,
abortSignal: abortController.signal,
abortSignal: timeoutController.signal,
})
if (result.status === 'cancelled' && isTimedOut && syncTimeout) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, syncTimeout)
logger.info(`[${requestId}] Non-SSE execution timed out`, { timeoutMs: syncTimeout })
if (
result.status === 'cancelled' &&
timeoutController.isTimedOut() &&
timeoutController.timeoutMs
) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
logger.info(`[${requestId}] Non-SSE execution timed out`, {
timeoutMs: timeoutController.timeoutMs,
})
await loggingSession.markAsFailed(timeoutErrorMessage)
await cleanupExecutionBase64Cache(executionId)
@@ -535,7 +537,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
{ status: 500 }
)
} finally {
if (timeoutId) clearTimeout(timeoutId)
timeoutController.cleanup()
}
}
@@ -578,18 +580,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
}
const encoder = new TextEncoder()
const abortController = new AbortController()
const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync)
let isStreamClosed = false
let isTimedOut = false
const syncTimeout = preprocessResult.executionTimeout?.sync
let timeoutId: NodeJS.Timeout | undefined
if (syncTimeout) {
timeoutId = setTimeout(() => {
isTimedOut = true
abortController.abort()
}, syncTimeout)
}
const stream = new ReadableStream<Uint8Array>({
async start(controller) {
@@ -780,7 +772,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
onStream,
},
loggingSession,
abortSignal: abortController.signal,
abortSignal: timeoutController.signal,
includeFileBase64,
base64MaxBytes,
stopAfterBlockId,
@@ -816,9 +808,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
}
if (result.status === 'cancelled') {
if (isTimedOut && syncTimeout) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, syncTimeout)
logger.info(`[${requestId}] Workflow execution timed out`, { timeoutMs: syncTimeout })
if (timeoutController.isTimedOut() && timeoutController.timeoutMs) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
logger.info(`[${requestId}] Workflow execution timed out`, {
timeoutMs: timeoutController.timeoutMs,
})
await loggingSession.markAsFailed(timeoutErrorMessage)
@@ -871,9 +865,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
// Cleanup base64 cache for this execution
await cleanupExecutionBase64Cache(executionId)
} catch (error: unknown) {
const isTimeout = isTimeoutError(error) || isTimedOut
const isTimeout = isTimeoutError(error) || timeoutController.isTimedOut()
const errorMessage = isTimeout
? getTimeoutErrorMessage(error, syncTimeout)
? getTimeoutErrorMessage(error, timeoutController.timeoutMs)
: error instanceof Error
? error.message
: 'Unknown error'
@@ -899,7 +893,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
},
})
} finally {
if (timeoutId) clearTimeout(timeoutId)
timeoutController.cleanup()
if (!isStreamClosed) {
try {
controller.enqueue(encoder.encode('data: [DONE]\n\n'))
@@ -910,9 +904,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
},
cancel() {
isStreamClosed = true
if (timeoutId) clearTimeout(timeoutId)
timeoutController.cleanup()
logger.info(`[${requestId}] Client aborted SSE stream, signalling cancellation`)
abortController.abort()
timeoutController.abort()
markExecutionCancelled(executionId).catch(() => {})
},
})

View File

@@ -4,7 +4,7 @@ import { task } from '@trigger.dev/sdk'
import { Cron } from 'croner'
import { eq } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { getTimeoutErrorMessage } from '@/lib/core/execution-limits'
import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
@@ -184,16 +184,7 @@ async function runWorkflowExecution({
[]
)
const abortController = new AbortController()
let isTimedOut = false
let timeoutId: NodeJS.Timeout | undefined
if (asyncTimeout) {
timeoutId = setTimeout(() => {
isTimedOut = true
abortController.abort()
}, asyncTimeout)
}
const timeoutController = createTimeoutAbortController(asyncTimeout)
let executionResult
try {
@@ -203,16 +194,20 @@ async function runWorkflowExecution({
loggingSession,
includeFileBase64: true,
base64MaxBytes: undefined,
abortSignal: abortController.signal,
abortSignal: timeoutController.signal,
})
} finally {
if (timeoutId) clearTimeout(timeoutId)
timeoutController.cleanup()
}
if (executionResult.status === 'cancelled' && isTimedOut && asyncTimeout) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, asyncTimeout)
if (
executionResult.status === 'cancelled' &&
timeoutController.isTimedOut() &&
timeoutController.timeoutMs
) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
logger.info(`[${requestId}] Scheduled workflow execution timed out`, {
timeoutMs: asyncTimeout,
timeoutMs: timeoutController.timeoutMs,
})
await loggingSession.markAsFailed(timeoutErrorMessage)
} else if (executionResult.status === 'paused') {

View File

@@ -5,7 +5,11 @@ import { task } from '@trigger.dev/sdk'
import { eq } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { getHighestPrioritySubscription } from '@/lib/billing'
import { getExecutionTimeout, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
import {
createTimeoutAbortController,
getExecutionTimeout,
getTimeoutErrorMessage,
} from '@/lib/core/execution-limits'
import { IdempotencyService, webhookIdempotency } from '@/lib/core/idempotency'
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
import { processExecutionFiles } from '@/lib/execution/files'
@@ -142,16 +146,7 @@ async function executeWebhookJobInternal(
userSubscription?.plan as SubscriptionPlan | undefined,
'async'
)
const abortController = new AbortController()
let isTimedOut = false
let timeoutId: NodeJS.Timeout | undefined
if (asyncTimeout) {
timeoutId = setTimeout(() => {
isTimedOut = true
abortController.abort()
}, asyncTimeout)
}
const timeoutController = createTimeoutAbortController(asyncTimeout)
let deploymentVersionId: string | undefined
@@ -261,13 +256,17 @@ async function executeWebhookJobInternal(
loggingSession,
includeFileBase64: true,
base64MaxBytes: undefined,
abortSignal: abortController.signal,
abortSignal: timeoutController.signal,
})
if (executionResult.status === 'cancelled' && isTimedOut && asyncTimeout) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, asyncTimeout)
if (
executionResult.status === 'cancelled' &&
timeoutController.isTimedOut() &&
timeoutController.timeoutMs
) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
logger.info(`[${requestId}] Airtable webhook execution timed out`, {
timeoutMs: asyncTimeout,
timeoutMs: timeoutController.timeoutMs,
})
await loggingSession.markAsFailed(timeoutErrorMessage)
} else if (executionResult.status === 'paused') {
@@ -522,12 +521,18 @@ async function executeWebhookJobInternal(
callbacks: {},
loggingSession,
includeFileBase64: true,
abortSignal: abortController.signal,
abortSignal: timeoutController.signal,
})
if (executionResult.status === 'cancelled' && isTimedOut && asyncTimeout) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, asyncTimeout)
logger.info(`[${requestId}] Webhook execution timed out`, { timeoutMs: asyncTimeout })
if (
executionResult.status === 'cancelled' &&
timeoutController.isTimedOut() &&
timeoutController.timeoutMs
) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
logger.info(`[${requestId}] Webhook execution timed out`, {
timeoutMs: timeoutController.timeoutMs,
})
await loggingSession.markAsFailed(timeoutErrorMessage)
} else if (executionResult.status === 'paused') {
if (!executionResult.snapshotSeed) {
@@ -632,7 +637,7 @@ async function executeWebhookJobInternal(
throw error
} finally {
if (timeoutId) clearTimeout(timeoutId)
timeoutController.cleanup()
}
}

View File

@@ -1,7 +1,7 @@
import { createLogger } from '@sim/logger'
import { task } from '@trigger.dev/sdk'
import { v4 as uuidv4 } from 'uuid'
import { getTimeoutErrorMessage } from '@/lib/core/execution-limits'
import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
@@ -104,17 +104,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
[]
)
const asyncTimeout = preprocessResult.executionTimeout?.async
const abortController = new AbortController()
let isTimedOut = false
let timeoutId: NodeJS.Timeout | undefined
if (asyncTimeout) {
timeoutId = setTimeout(() => {
isTimedOut = true
abortController.abort()
}, asyncTimeout)
}
const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.async)
let result
try {
@@ -124,15 +114,21 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
loggingSession,
includeFileBase64: true,
base64MaxBytes: undefined,
abortSignal: abortController.signal,
abortSignal: timeoutController.signal,
})
} finally {
if (timeoutId) clearTimeout(timeoutId)
timeoutController.cleanup()
}
if (result.status === 'cancelled' && isTimedOut && asyncTimeout) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, asyncTimeout)
logger.info(`[${requestId}] Workflow execution timed out`, { timeoutMs: asyncTimeout })
if (
result.status === 'cancelled' &&
timeoutController.isTimedOut() &&
timeoutController.timeoutMs
) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
logger.info(`[${requestId}] Workflow execution timed out`, {
timeoutMs: timeoutController.timeoutMs,
})
await loggingSession.markAsFailed(timeoutErrorMessage)
} else if (result.status === 'paused') {
if (!result.snapshotSeed) {

View File

@@ -92,3 +92,43 @@ export function getTimeoutErrorMessage(error: unknown, timeoutMs?: number): stri
return 'Execution timed out'
}
/**
* Helper to create an AbortController with timeout handling.
* Centralizes the timeout abort pattern used across execution paths.
*/
export interface TimeoutAbortController {
/** The AbortSignal to pass to execution functions */
signal: AbortSignal
/** Returns true if the abort was triggered by timeout (not user cancellation) */
isTimedOut: () => boolean
/** Cleanup function - call in finally block to clear the timeout */
cleanup: () => void
/** Manually abort the execution (for user cancellation) */
abort: () => void
/** The timeout duration in milliseconds (undefined if no timeout) */
timeoutMs: number | undefined
}
export function createTimeoutAbortController(timeoutMs?: number): TimeoutAbortController {
const abortController = new AbortController()
let isTimedOut = false
let timeoutId: NodeJS.Timeout | undefined
if (timeoutMs) {
timeoutId = setTimeout(() => {
isTimedOut = true
abortController.abort()
}, timeoutMs)
}
return {
signal: abortController.signal,
isTimedOut: () => isTimedOut,
cleanup: () => {
if (timeoutId) clearTimeout(timeoutId)
},
abort: () => abortController.abort(),
timeoutMs,
}
}

View File

@@ -4,7 +4,7 @@ import { pausedExecutions, resumeQueue, workflowExecutionLogs } from '@sim/db/sc
import { createLogger } from '@sim/logger'
import { and, asc, desc, eq, inArray, lt, type SQL, sql } from 'drizzle-orm'
import type { Edge } from 'reactflow'
import { getTimeoutErrorMessage } from '@/lib/core/execution-limits'
import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
@@ -772,17 +772,9 @@ export class PauseResumeManager {
actorUserId: metadata.userId,
})
const asyncTimeout = preprocessingResult.executionTimeout?.async
const abortController = new AbortController()
let isTimedOut = false
let timeoutId: NodeJS.Timeout | undefined
if (asyncTimeout) {
timeoutId = setTimeout(() => {
isTimedOut = true
abortController.abort()
}, asyncTimeout)
}
const timeoutController = createTimeoutAbortController(
preprocessingResult.executionTimeout?.async
)
let result: ExecutionResult
try {
@@ -793,17 +785,21 @@ export class PauseResumeManager {
skipLogCreation: true, // Reuse existing log entry
includeFileBase64: true, // Enable base64 hydration
base64MaxBytes: undefined, // Use default limit
abortSignal: abortController.signal,
abortSignal: timeoutController.signal,
})
} finally {
if (timeoutId) clearTimeout(timeoutId)
timeoutController.cleanup()
}
if (result.status === 'cancelled' && isTimedOut && asyncTimeout) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, asyncTimeout)
if (
result.status === 'cancelled' &&
timeoutController.isTimedOut() &&
timeoutController.timeoutMs
) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
logger.info('Resume execution timed out', {
resumeExecutionId,
timeoutMs: asyncTimeout,
timeoutMs: timeoutController.timeoutMs,
})
await loggingSession.markAsFailed(timeoutErrorMessage)
}

View File

@@ -1,5 +1,5 @@
import { createLogger } from '@sim/logger'
import { getTimeoutErrorMessage } from '@/lib/core/execution-limits'
import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
import {
extractBlockIdFromOutputId,
extractPathFromOutputId,
@@ -270,17 +270,7 @@ export async function createStreamingResponse(
}
}
const timeoutMs = streamConfig.timeoutMs
const abortController = new AbortController()
let isTimedOut = false
let timeoutId: NodeJS.Timeout | undefined
if (timeoutMs) {
timeoutId = setTimeout(() => {
isTimedOut = true
abortController.abort()
}, timeoutMs)
}
const timeoutController = createTimeoutAbortController(streamConfig.timeoutMs)
try {
const result = await executeWorkflow(
@@ -298,7 +288,7 @@ export async function createStreamingResponse(
skipLoggingComplete: true,
includeFileBase64: streamConfig.includeFileBase64,
base64MaxBytes: streamConfig.base64MaxBytes,
abortSignal: abortController.signal,
abortSignal: timeoutController.signal,
},
executionId
)
@@ -308,9 +298,15 @@ export async function createStreamingResponse(
processStreamingBlockLogs(result.logs, state.streamedContent)
}
if (result.status === 'cancelled' && isTimedOut && timeoutMs) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutMs)
logger.info(`[${requestId}] Streaming execution timed out`, { timeoutMs })
if (
result.status === 'cancelled' &&
timeoutController.isTimedOut() &&
timeoutController.timeoutMs
) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
logger.info(`[${requestId}] Streaming execution timed out`, {
timeoutMs: timeoutController.timeoutMs,
})
if (result._streamingMetadata?.loggingSession) {
await result._streamingMetadata.loggingSession.markAsFailed(timeoutErrorMessage)
}
@@ -349,7 +345,7 @@ export async function createStreamingResponse(
controller.close()
} finally {
if (timeoutId) clearTimeout(timeoutId)
timeoutController.cleanup()
}
},
})