fix(execution): ensure background tasks await post-execution DB status updates (#3466)

The fire-and-forget IIFE in execution-core.ts for post-execution logging could be abandoned when trigger.dev tasks exit, leaving executions permanently stuck in "running" status. Store the promise on LoggingSession so background tasks can optionally await it before returning.
This commit is contained in:
Waleed
2026-03-07 21:09:31 -08:00
committed by GitHub
parent 4b7a9b20c4
commit 0b42e26f10
5 changed files with 86 additions and 61 deletions

View File

@@ -238,6 +238,8 @@ async function runWorkflowExecution({
await PauseResumeManager.processQueuedResumes(executionId)
}
await loggingSession.waitForPostExecution()
logger.info(`[${requestId}] Workflow execution completed: ${payload.workflowId}`, {
success: executionResult.success,
executionTime: executionResult.metadata?.duration,

View File

@@ -312,6 +312,8 @@ async function executeWebhookJobInternal(
await PauseResumeManager.processQueuedResumes(executionId)
}
await loggingSession.waitForPostExecution()
logger.info(`[${requestId}] Airtable webhook execution completed`, {
success: executionResult.success,
workflowId: payload.workflowId,
@@ -585,6 +587,8 @@ async function executeWebhookJobInternal(
await PauseResumeManager.processQueuedResumes(executionId)
}
await loggingSession.waitForPostExecution()
logger.info(`[${requestId}] Webhook execution completed`, {
success: executionResult.success,
workflowId: payload.workflowId,

View File

@@ -158,6 +158,8 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
await PauseResumeManager.processQueuedResumes(executionId)
}
await loggingSession.waitForPostExecution()
logger.info(`[${requestId}] Workflow execution completed: ${workflowId}`, {
success: result.success,
executionTime: result.metadata?.duration,

View File

@@ -101,6 +101,7 @@ export class LoggingSession {
models: {},
}
private costFlushed = false
private postExecutionPromise: Promise<void> | null = null
constructor(
workflowId: string,
@@ -688,6 +689,20 @@ export class LoggingSession {
}
}
setPostExecutionPromise(promise: Promise<void>): void {
this.postExecutionPromise = promise
}
async waitForPostExecution(): Promise<void> {
if (this.postExecutionPromise) {
try {
await this.postExecutionPromise
} catch {
/* already handled inside the IIFE */
}
}
}
async safeComplete(params: SessionCompleteParams = {}): Promise<void> {
if (this.completionPromise) return this.completionPromise
this.completionPromise = this._safeCompleteImpl(params)

View File

@@ -360,48 +360,49 @@ export async function executeWorkflowCore(
)) as ExecutionResult)
: ((await executorInstance.execute(workflowId, resolvedTriggerBlockId)) as ExecutionResult)
// Fire-and-forget: post-execution logging, billing, and cleanup
void (async () => {
try {
const { traceSpans, totalDuration } = buildTraceSpans(result)
loggingSession.setPostExecutionPromise(
(async () => {
try {
const { traceSpans, totalDuration } = buildTraceSpans(result)
if (result.success && result.status !== 'paused') {
try {
await updateWorkflowRunCounts(workflowId)
} catch (runCountError) {
logger.error(`[${requestId}] Failed to update run counts`, { error: runCountError })
if (result.success && result.status !== 'paused') {
try {
await updateWorkflowRunCounts(workflowId)
} catch (runCountError) {
logger.error(`[${requestId}] Failed to update run counts`, { error: runCountError })
}
}
}
if (result.status === 'cancelled') {
await loggingSession.safeCompleteWithCancellation({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
traceSpans: traceSpans || [],
})
} else if (result.status === 'paused') {
await loggingSession.safeCompleteWithPause({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
traceSpans: traceSpans || [],
workflowInput: processedInput,
})
} else {
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
finalOutput: result.output || {},
traceSpans: traceSpans || [],
workflowInput: processedInput,
executionState: result.executionState,
})
}
if (result.status === 'cancelled') {
await loggingSession.safeCompleteWithCancellation({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
traceSpans: traceSpans || [],
})
} else if (result.status === 'paused') {
await loggingSession.safeCompleteWithPause({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
traceSpans: traceSpans || [],
workflowInput: processedInput,
})
} else {
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
finalOutput: result.output || {},
traceSpans: traceSpans || [],
workflowInput: processedInput,
executionState: result.executionState,
})
}
await clearExecutionCancellation(executionId)
} catch (postExecError) {
logger.error(`[${requestId}] Post-execution logging failed`, { error: postExecError })
}
})()
await clearExecutionCancellation(executionId)
} catch (postExecError) {
logger.error(`[${requestId}] Post-execution logging failed`, { error: postExecError })
}
})()
)
logger.info(`[${requestId}] Workflow execution completed`, {
success: result.success,
@@ -413,31 +414,32 @@ export async function executeWorkflowCore(
} catch (error: unknown) {
logger.error(`[${requestId}] Execution failed:`, error)
// Fire-and-forget: error logging and cleanup
void (async () => {
try {
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
const { traceSpans } = executionResult
? buildTraceSpans(executionResult)
: { traceSpans: [] }
loggingSession.setPostExecutionPromise(
(async () => {
try {
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
const { traceSpans } = executionResult
? buildTraceSpans(executionResult)
: { traceSpans: [] }
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: executionResult?.metadata?.duration || 0,
error: {
message: error instanceof Error ? error.message : 'Execution failed',
stackTrace: error instanceof Error ? error.stack : undefined,
},
traceSpans,
})
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: executionResult?.metadata?.duration || 0,
error: {
message: error instanceof Error ? error.message : 'Execution failed',
stackTrace: error instanceof Error ? error.stack : undefined,
},
traceSpans,
})
await clearExecutionCancellation(executionId)
} catch (postExecError) {
logger.error(`[${requestId}] Post-execution error logging failed`, {
error: postExecError,
})
}
})()
await clearExecutionCancellation(executionId)
} catch (postExecError) {
logger.error(`[${requestId}] Post-execution error logging failed`, {
error: postExecError,
})
}
})()
)
throw error
}