From 424b6e6a6107d1d268aeb210ccfdde89cfb80e66 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Tue, 3 Feb 2026 18:33:16 -0800 Subject: [PATCH] add hitl and workflow handler --- .../handlers/workflow/workflow-handler.ts | 1 + .../executor/human-in-the-loop-manager.ts | 46 +++++++++++++++---- 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/apps/sim/executor/handlers/workflow/workflow-handler.ts b/apps/sim/executor/handlers/workflow/workflow-handler.ts index 4d0c4d143..8f23bbbb6 100644 --- a/apps/sim/executor/handlers/workflow/workflow-handler.ts +++ b/apps/sim/executor/handlers/workflow/workflow-handler.ts @@ -126,6 +126,7 @@ export class WorkflowBlockHandler implements BlockHandler { workspaceId: ctx.workspaceId, userId: ctx.userId, executionId: ctx.executionId, + abortSignal: ctx.abortSignal, }, }) diff --git a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts index 479ead99a..2a3113fbe 100644 --- a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts +++ b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts @@ -4,6 +4,7 @@ import { pausedExecutions, resumeQueue, workflowExecutionLogs } from '@sim/db/sc import { createLogger } from '@sim/logger' import { and, asc, desc, eq, inArray, lt, type SQL, sql } from 'drizzle-orm' import type { Edge } from 'reactflow' +import { getTimeoutErrorMessage } from '@/lib/core/execution-limits' import { preprocessExecution } from '@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' @@ -771,14 +772,43 @@ export class PauseResumeManager { actorUserId: metadata.userId, }) - return await executeWorkflowCore({ - snapshot: resumeSnapshot, - callbacks: {}, - loggingSession, - skipLogCreation: true, // Reuse existing log entry - includeFileBase64: true, // Enable base64 hydration - base64MaxBytes: undefined, // Use default limit - }) + const asyncTimeout = preprocessingResult.executionTimeout?.async + const abortController = new AbortController() + let isTimedOut = false + let timeoutId: NodeJS.Timeout | undefined + + if (asyncTimeout) { + timeoutId = setTimeout(() => { + isTimedOut = true + abortController.abort() + }, asyncTimeout) + } + + let result: ExecutionResult + try { + result = await executeWorkflowCore({ + snapshot: resumeSnapshot, + callbacks: {}, + loggingSession, + skipLogCreation: true, // Reuse existing log entry + includeFileBase64: true, // Enable base64 hydration + base64MaxBytes: undefined, // Use default limit + abortSignal: abortController.signal, + }) + } finally { + if (timeoutId) clearTimeout(timeoutId) + } + + if (result.status === 'cancelled' && isTimedOut && asyncTimeout) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, asyncTimeout) + logger.info('Resume execution timed out', { + resumeExecutionId, + timeoutMs: asyncTimeout, + }) + await loggingSession.markAsFailed(timeoutErrorMessage) + } + + return result } private static async markResumeCompleted(args: {