fix(hitl): fix stream endpoint, pause persistence, and resume page (#3995)

* Fix hitl stream

* fix hitl pause persistence

* Fix /stream endpoint allowing api key usage

* resume page cleanup

* fix type

* make resume sync

* fix types

* address bugbot comments

---------

Co-authored-by: Siddharth Ganesan <siddharthganesan@gmail.com>
Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
Co-authored-by: Theodore Li <teddy@zenobiapay.com>
This commit is contained in:
Waleed
2026-04-06 16:19:42 -07:00
committed by GitHub
parent 7e0794c9a0
commit 58571fe73d
15 changed files with 552 additions and 228 deletions

View File

@@ -93,17 +93,36 @@ Access resume data in downstream blocks using `<blockId.resumeInput.fieldName>`.
<Tab>
### REST API
Programmatically resume workflows:
Programmatically resume workflows using the resume endpoint. The `contextId` is available from the block's `resumeEndpoint` output or from the paused execution detail.
```bash
POST /api/workflows/{workflowId}/executions/{executionId}/resume/{blockId}
POST /api/resume/{workflowId}/{executionId}/{contextId}
Content-Type: application/json
{
"approved": true,
"comments": "Looks good to proceed"
"input": {
"approved": true,
"comments": "Looks good to proceed"
}
}
```
The response includes a new `executionId` for the resumed execution:
```json
{
"status": "started",
"executionId": "<resumeExecutionId>",
"message": "Resume execution started."
}
```
To poll execution progress after resuming, connect to the SSE stream:
```bash
GET /api/workflows/{workflowId}/executions/{resumeExecutionId}/stream
```
Build custom approval UIs or integrate with existing systems.
</Tab>
<Tab>

View File

@@ -3,6 +3,7 @@ import { type NextRequest, NextResponse } from 'next/server'
import { AuthType } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { generateId } from '@/lib/core/utils/uuid'
import { setExecutionMeta } from '@/lib/execution/event-buffer'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
@@ -125,14 +126,43 @@ export async function POST(
})
}
PauseResumeManager.startResumeExecution({
await setExecutionMeta(enqueueResult.resumeExecutionId, {
status: 'active',
userId,
workflowId,
})
const resumeArgs = {
resumeEntryId: enqueueResult.resumeEntryId,
resumeExecutionId: enqueueResult.resumeExecutionId,
pausedExecution: enqueueResult.pausedExecution,
contextId: enqueueResult.contextId,
resumeInput: enqueueResult.resumeInput,
userId: enqueueResult.userId,
}).catch((error) => {
}
const isApiCaller = access.auth?.authType === AuthType.API_KEY
if (isApiCaller) {
const result = await PauseResumeManager.startResumeExecution(resumeArgs)
return NextResponse.json({
success: result.success,
status: result.status ?? (result.success ? 'completed' : 'failed'),
executionId: enqueueResult.resumeExecutionId,
output: result.output,
error: result.error,
metadata: result.metadata
? {
duration: result.metadata.duration,
startTime: result.metadata.startTime,
endTime: result.metadata.endTime,
}
: undefined,
})
}
PauseResumeManager.startResumeExecution(resumeArgs).catch((error) => {
logger.error('Failed to start resume execution', {
workflowId,
parentExecutionId: executionId,

View File

@@ -41,7 +41,7 @@ import {
} from '@/lib/uploads/utils/user-file-base64.server'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import { handlePostExecutionPauseState } from '@/lib/workflows/executor/pause-persistence'
import {
DIRECT_WORKFLOW_JOB_NAME,
type QueuedWorkflowExecutionPayload,
@@ -903,6 +903,8 @@ async function handleExecutePost(
abortSignal: timeoutController.signal,
})
await handlePostExecutionPauseState({ result, workflowId, executionId, loggingSession })
if (
result.status === 'cancelled' &&
timeoutController.isTimedOut() &&
@@ -1359,31 +1361,7 @@ async function handleExecutePost(
runFromBlock: resolvedRunFromBlock,
})
if (result.status === 'paused') {
if (!result.snapshotSeed) {
reqLogger.error('Missing snapshot seed for paused execution')
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
} else {
try {
await PauseResumeManager.persistPauseResult({
workflowId,
executionId,
pausePoints: result.pausePoints || [],
snapshotSeed: result.snapshotSeed,
executorUserId: result.metadata?.userId,
})
} catch (pauseError) {
reqLogger.error('Failed to persist pause result', {
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
})
await loggingSession.markAsFailed(
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
)
}
}
} else {
await PauseResumeManager.processQueuedResumes(executionId)
}
await handlePostExecutionPauseState({ result, workflowId, executionId, loggingSession })
if (result.status === 'cancelled') {
if (timeoutController.isTimedOut() && timeoutController.timeoutMs) {
@@ -1422,25 +1400,42 @@ async function handleExecutePost(
return
}
sendEvent({
type: 'execution:completed',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
success: result.success,
output: includeFileBase64
? await hydrateUserFilesWithBase64(result.output, {
requestId,
executionId,
maxBytes: base64MaxBytes,
})
: result.output,
duration: result.metadata?.duration || 0,
startTime: result.metadata?.startTime || startTime.toISOString(),
endTime: result.metadata?.endTime || new Date().toISOString(),
},
})
const sseOutput = includeFileBase64
? await hydrateUserFilesWithBase64(result.output, {
requestId,
executionId,
maxBytes: base64MaxBytes,
})
: result.output
if (result.status === 'paused') {
sendEvent({
type: 'execution:paused',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
output: sseOutput,
duration: result.metadata?.duration || 0,
startTime: result.metadata?.startTime || startTime.toISOString(),
endTime: result.metadata?.endTime || new Date().toISOString(),
},
})
} else {
sendEvent({
type: 'execution:completed',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
success: result.success,
output: sseOutput,
duration: result.metadata?.duration || 0,
startTime: result.metadata?.startTime || startTime.toISOString(),
endTime: result.metadata?.endTime || new Date().toISOString(),
},
})
}
finalMetaStatus = 'complete'
} catch (error: unknown) {
const isTimeout = isTimeoutError(error) || timeoutController.isTimedOut()

View File

@@ -1,6 +1,6 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { getSession } from '@/lib/auth'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import {
type ExecutionStreamStatus,
@@ -29,14 +29,14 @@ export async function GET(
const { id: workflowId, executionId } = await params
try {
const auth = await checkHybridAuth(req, { requireWorkflowId: false })
if (!auth.success || !auth.userId) {
return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 })
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const workflowAuthorization = await authorizeWorkflowByWorkspacePermission({
workflowId,
userId: auth.userId,
userId: session.user.id,
action: 'read',
})
if (!workflowAuthorization.allowed) {
@@ -46,16 +46,6 @@ export async function GET(
)
}
if (
auth.apiKeyType === 'workspace' &&
workflowAuthorization.workflow?.workspaceId !== auth.workspaceId
) {
return NextResponse.json(
{ error: 'API key is not authorized for this workspace' },
{ status: 403 }
)
}
const meta = await getExecutionMeta(executionId)
if (!meta) {
return NextResponse.json({ error: 'Execution buffer not found or expired' }, { status: 404 })

View File

@@ -6,6 +6,7 @@ import { useRouter } from 'next/navigation'
import {
Badge,
Button,
Code,
Input,
Label,
Table,
@@ -155,7 +156,6 @@ function getBlockNameFromSnapshot(
const parsed = JSON.parse(executionSnapshot.snapshot)
const workflowState = parsed?.workflow
if (!workflowState?.blocks || !Array.isArray(workflowState.blocks)) return null
// Blocks are stored as an array of serialized blocks with id and metadata.name
const block = workflowState.blocks.find((b: { id: string }) => b.id === blockId)
return block?.metadata?.name || null
} catch {
@@ -163,6 +163,47 @@ function getBlockNameFromSnapshot(
}
}
/**
 * Renders a read-only preview cell for a structured resume-input value in the
 * paused-execution detail table.
 *
 * Rendering rules (by runtime type of `value`):
 * - `null` / `undefined`: an empty muted span is returned, so the cell renders
 *   no text. NOTE(review): the span has styling but no children — possibly a
 *   dropped "—" placeholder; confirm the intended empty-state output.
 * - objects (including arrays): pretty-printed JSON (2-space indent) inside a
 *   scrollable `Code.Viewer`, height-capped at 220px.
 * - primitives (string/number/boolean/etc.): coerced via `String(value)` and
 *   shown in a bordered monospace chip that wraps long content.
 */
function renderStructuredValuePreview(value: unknown) {
  // Nullish: render an empty styled span (no textual placeholder).
  if (value === null || value === undefined) {
    return <span style={{ fontSize: '12px', color: 'var(--text-muted)' }}></span>
  }
  // Objects/arrays: JSON viewer. minWidth keeps narrow table columns readable.
  if (typeof value === 'object') {
    return (
      <div style={{ minWidth: '220px' }}>
        <Code.Viewer
          code={JSON.stringify(value, null, 2)}
          language='json'
          wrapText
          className='max-h-[220px]'
        />
      </div>
    )
  }
  // Primitives: stringify and display in an inline monospace chip.
  const stringValue = String(value)
  return (
    <div
      style={{
        display: 'inline-flex',
        maxWidth: '100%',
        borderRadius: '6px',
        border: '1px solid var(--border)',
        background: 'var(--surface-5)',
        padding: '4px 8px',
        // pre-wrap + break-word: preserve embedded newlines yet avoid overflow.
        whiteSpace: 'pre-wrap',
        wordBreak: 'break-word',
        fontFamily: 'var(--font-mono, monospace)',
        fontSize: '12px',
        lineHeight: '16px',
        color: 'var(--text-primary)',
      }}
    >
      {stringValue}
    </div>
  )
}
export default function ResumeExecutionPage({
params,
initialExecutionDetail,
@@ -874,8 +915,11 @@ export default function ResumeExecutionPage({
<Tooltip.Trigger asChild>
<Button
variant='outline'
size='sm'
onClick={refreshExecutionDetail}
disabled={refreshingExecution}
className='gap-1.5 px-2.5'
aria-label='Refresh execution details'
>
<RefreshCw
style={{
@@ -884,6 +928,7 @@ export default function ResumeExecutionPage({
animation: refreshingExecution ? 'spin 1s linear infinite' : undefined,
}}
/>
Refresh
</Button>
</Tooltip.Trigger>
<Tooltip.Content>Refresh</Tooltip.Content>
@@ -1123,11 +1168,7 @@ export default function ResumeExecutionPage({
<TableRow key={row.id}>
<TableCell>{row.name}</TableCell>
<TableCell>{row.type}</TableCell>
<TableCell>
<code style={{ fontSize: '12px' }}>
{formatStructureValue(row.value)}
</code>
</TableCell>
<TableCell>{renderStructuredValuePreview(row.value)}</TableCell>
</TableRow>
))}
</TableBody>
@@ -1243,6 +1284,8 @@ export default function ResumeExecutionPage({
}}
placeholder='{"example": "value"}'
rows={6}
spellCheck={false}
className='min-h-[180px] border-[var(--border-1)] bg-[var(--surface-3)] font-mono text-[12px] leading-5'
/>
</div>
</div>
@@ -1267,10 +1310,10 @@ export default function ResumeExecutionPage({
{/* Footer */}
<div
style={{
marginTop: '32px',
padding: '16px',
maxWidth: '1200px',
margin: '24px auto 0',
padding: '0 24px 24px',
textAlign: 'center',
borderTop: '1px solid var(--border)',
fontSize: '13px',
color: 'var(--text-muted)',
}}

View File

@@ -13,7 +13,7 @@ import {
executeWorkflowCore,
wasExecutionFinalizedByCore,
} from '@/lib/workflows/executor/execution-core'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import { handlePostExecutionPauseState } from '@/lib/workflows/executor/pause-persistence'
import {
blockExistsInDeployment,
loadDeployedWorkflowState,
@@ -237,33 +237,13 @@ async function runWorkflowExecution({
timeoutMs: timeoutController.timeoutMs,
})
await loggingSession.markAsFailed(timeoutErrorMessage)
} else if (executionResult.status === 'paused') {
if (!executionResult.snapshotSeed) {
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
executionId,
})
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
} else {
try {
await PauseResumeManager.persistPauseResult({
workflowId: payload.workflowId,
executionId,
pausePoints: executionResult.pausePoints || [],
snapshotSeed: executionResult.snapshotSeed,
executorUserId: executionResult.metadata?.userId,
})
} catch (pauseError) {
logger.error(`[${requestId}] Failed to persist pause result`, {
executionId,
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
})
await loggingSession.markAsFailed(
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
)
}
}
} else {
await PauseResumeManager.processQueuedResumes(executionId)
await handlePostExecutionPauseState({
result: executionResult,
workflowId: payload.workflowId,
executionId,
loggingSession,
})
}
await loggingSession.waitForPostExecution()

View File

@@ -16,7 +16,7 @@ import {
executeWorkflowCore,
wasExecutionFinalizedByCore,
} from '@/lib/workflows/executor/execution-core'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import { handlePostExecutionPauseState } from '@/lib/workflows/executor/pause-persistence'
import { loadDeployedWorkflowState } from '@/lib/workflows/persistence/utils'
import { resolveOAuthAccountId } from '@/app/api/auth/oauth/utils'
import { getBlock } from '@/blocks'
@@ -205,33 +205,13 @@ async function handleExecutionResult(
timeoutMs: ctx.timeoutController.timeoutMs,
})
await ctx.loggingSession.markAsFailed(timeoutErrorMessage)
} else if (executionResult.status === 'paused') {
if (!executionResult.snapshotSeed) {
logger.error(`[${ctx.requestId}] Missing snapshot seed for paused execution`, {
executionId: ctx.executionId,
})
await ctx.loggingSession.markAsFailed('Missing snapshot seed for paused execution')
} else {
try {
await PauseResumeManager.persistPauseResult({
workflowId: ctx.workflowId,
executionId: ctx.executionId,
pausePoints: executionResult.pausePoints || [],
snapshotSeed: executionResult.snapshotSeed,
executorUserId: executionResult.metadata?.userId,
})
} catch (pauseError) {
logger.error(`[${ctx.requestId}] Failed to persist pause result`, {
executionId: ctx.executionId,
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
})
await ctx.loggingSession.markAsFailed(
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
)
}
}
} else {
await PauseResumeManager.processQueuedResumes(ctx.executionId)
await handlePostExecutionPauseState({
result: executionResult,
workflowId: ctx.workflowId,
executionId: ctx.executionId,
loggingSession: ctx.loggingSession,
})
}
await ctx.loggingSession.waitForPostExecution()

View File

@@ -10,7 +10,7 @@ import {
executeWorkflowCore,
wasExecutionFinalizedByCore,
} from '@/lib/workflows/executor/execution-core'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import { handlePostExecutionPauseState } from '@/lib/workflows/executor/pause-persistence'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
import { hasExecutionResult } from '@/executor/utils/errors'
@@ -148,33 +148,8 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
timeoutMs: timeoutController.timeoutMs,
})
await loggingSession.markAsFailed(timeoutErrorMessage)
} else if (result.status === 'paused') {
if (!result.snapshotSeed) {
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
executionId,
})
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
} else {
try {
await PauseResumeManager.persistPauseResult({
workflowId,
executionId,
pausePoints: result.pausePoints || [],
snapshotSeed: result.snapshotSeed,
executorUserId: result.metadata?.userId,
})
} catch (pauseError) {
logger.error(`[${requestId}] Failed to persist pause result`, {
executionId,
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
})
await loggingSession.markAsFailed(
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
)
}
}
} else {
await PauseResumeManager.processQueuedResumes(executionId)
await handlePostExecutionPauseState({ result, workflowId, executionId, loggingSession })
}
await loggingSession.waitForPostExecution()

View File

@@ -9,6 +9,7 @@ import type {
ExecutionCompletedData,
ExecutionErrorData,
ExecutionEvent,
ExecutionPausedData,
ExecutionStartedData,
StreamChunkData,
StreamDoneData,
@@ -74,6 +75,9 @@ export async function processSSEStream(
case 'execution:completed':
callbacks.onExecutionCompleted?.(event.data)
break
case 'execution:paused':
callbacks.onExecutionPaused?.(event.data)
break
case 'execution:error':
callbacks.onExecutionError?.(event.data)
break
@@ -114,6 +118,7 @@ export async function processSSEStream(
export interface ExecutionStreamCallbacks {
onExecutionStarted?: (data: ExecutionStartedData) => void
onExecutionCompleted?: (data: ExecutionCompletedData) => void
onExecutionPaused?: (data: ExecutionPausedData) => void
onExecutionError?: (data: ExecutionErrorData) => void
onExecutionCancelled?: (data: ExecutionCancelledData) => void
onBlockStarted?: (data: BlockStartedData) => void

View File

@@ -264,7 +264,7 @@ export interface ParsedSSEChunk {
/** Final success flag if this chunk contains the final event */
finalSuccess?: boolean
/** Terminal task state if known */
terminalState?: 'completed' | 'failed' | 'canceled'
terminalState?: 'completed' | 'failed' | 'canceled' | 'input-required'
/** Final artifacts if present on terminal event */
finalArtifacts?: Artifact[]
/** Whether this chunk indicates the stream is done */
@@ -326,6 +326,15 @@ export function parseWorkflowSSEChunk(chunk: string): ParsedSSEChunk {
result.finalSuccess = parsed.data?.success !== false
result.terminalState = result.finalSuccess ? 'completed' : 'failed'
result.isDone = true
} else if (parsed.type === 'execution:paused') {
if (parsed.data?.output?.content) {
result.finalContent = parsed.data.output.content
} else if (parsed.data?.output) {
result.finalContent = JSON.stringify(parsed.data.output)
}
result.finalSuccess = true
result.terminalState = 'input-required'
result.isDone = true
} else if (parsed.type === 'execution:cancelled') {
result.finalSuccess = false
result.terminalState = 'canceled'

View File

@@ -3,7 +3,7 @@ import { generateId } from '@/lib/core/utils/uuid'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { captureServerEvent } from '@/lib/posthog/server'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import { handlePostExecutionPauseState } from '@/lib/workflows/executor/pause-persistence'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types'
import type { ExecutionResult, StreamingExecution } from '@/executor/types'
@@ -127,41 +127,7 @@ export async function executeWorkflow(
)
}
if (result.status === 'paused') {
if (!result.snapshotSeed) {
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
executionId,
})
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
} else {
try {
await PauseResumeManager.persistPauseResult({
workflowId,
executionId,
pausePoints: result.pausePoints || [],
snapshotSeed: result.snapshotSeed,
executorUserId: result.metadata?.userId,
})
} catch (pauseError) {
logger.error(`[${requestId}] Failed to persist pause result`, {
executionId,
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
})
await loggingSession.markAsFailed(
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
)
}
}
} else {
try {
await PauseResumeManager.processQueuedResumes(executionId)
} catch (resumeError) {
logger.error(`[${requestId}] Failed to process queued resumes`, {
executionId,
error: resumeError instanceof Error ? resumeError.message : String(resumeError),
})
}
}
await handlePostExecutionPauseState({ result, workflowId, executionId, loggingSession })
if (streamConfig?.skipLoggingComplete) {
return {

View File

@@ -8,6 +8,7 @@ import type { SubflowType } from '@/stores/workflows/workflow/types'
export type ExecutionEventType =
| 'execution:started'
| 'execution:completed'
| 'execution:paused'
| 'execution:error'
| 'execution:cancelled'
| 'block:started'
@@ -53,6 +54,20 @@ export interface ExecutionCompletedEvent extends BaseExecutionEvent {
}
}
/**
* Execution paused event (HITL block waiting for human input)
*/
export interface ExecutionPausedEvent extends BaseExecutionEvent {
type: 'execution:paused'
workflowId: string
data: {
output: any
duration: number
startTime: string
endTime: string
}
}
/**
* Execution error event
*/
@@ -196,6 +211,7 @@ export interface StreamDoneEvent extends BaseExecutionEvent {
export type ExecutionEvent =
| ExecutionStartedEvent
| ExecutionCompletedEvent
| ExecutionPausedEvent
| ExecutionErrorEvent
| ExecutionCancelledEvent
| BlockStartedEvent
@@ -207,6 +223,7 @@ export type ExecutionEvent =
export type ExecutionStartedData = ExecutionStartedEvent['data']
export type ExecutionCompletedData = ExecutionCompletedEvent['data']
export type ExecutionPausedData = ExecutionPausedEvent['data']
export type ExecutionErrorData = ExecutionErrorEvent['data']
export type ExecutionCancelledData = ExecutionCancelledEvent['data']
export type BlockStartedData = BlockStartedEvent['data']

View File

@@ -5,11 +5,23 @@ import { and, asc, desc, eq, inArray, lt, type SQL, sql } from 'drizzle-orm'
import type { Edge } from 'reactflow'
import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
import { generateId } from '@/lib/core/utils/uuid'
import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/event-buffer'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import type { ExecutionEvent } from '@/lib/workflows/executor/execution-events'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionResult, PausePoint, SerializedSnapshot } from '@/executor/types'
import type {
ChildWorkflowContext,
ExecutionCallbacks,
IterationContext,
} from '@/executor/execution/types'
import type {
ExecutionResult,
PausePoint,
SerializedSnapshot,
StreamingExecution,
} from '@/executor/types'
import { filterOutputForLog } from '@/executor/utils/output-filter'
import type { SerializedConnection } from '@/serializer/types'
@@ -280,7 +292,7 @@ export class PauseResumeManager {
})
}
static async startResumeExecution(args: StartResumeExecutionArgs): Promise<void> {
static async startResumeExecution(args: StartResumeExecutionArgs): Promise<ExecutionResult> {
const { resumeEntryId, resumeExecutionId, pausedExecution, contextId, resumeInput, userId } =
args
@@ -345,6 +357,8 @@ export class PauseResumeManager {
})
await PauseResumeManager.processQueuedResumes(pausedExecution.executionId)
return result
} catch (error) {
await PauseResumeManager.markResumeFailed({
resumeEntryId,
@@ -771,36 +785,273 @@ export class PauseResumeManager {
actorUserId: metadata.userId,
})
const workflowId = pausedExecution.workflowId
const eventWriter = createExecutionEventWriter(resumeExecutionId)
await setExecutionMeta(resumeExecutionId, {
status: 'active',
userId: metadata.userId,
workflowId,
})
let localEventSeq = 0
const writeBufferedEvent = (event: ExecutionEvent) => {
localEventSeq++
event.eventId = localEventSeq
eventWriter.write(event).catch(() => {})
}
writeBufferedEvent({
type: 'execution:started',
timestamp: new Date().toISOString(),
executionId: resumeExecutionId,
workflowId,
data: { startTime: new Date().toISOString() },
} as ExecutionEvent)
const callbacks: ExecutionCallbacks = {
onBlockStart: async (
blockId: string,
blockName: string,
blockType: string,
executionOrder: number,
iterationContext?: IterationContext,
childWorkflowContext?: ChildWorkflowContext
) => {
writeBufferedEvent({
type: 'block:started',
timestamp: new Date().toISOString(),
executionId: resumeExecutionId,
workflowId,
data: {
blockId,
blockName,
blockType,
executionOrder,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType,
iterationContainerId: iterationContext.iterationContainerId,
...(iterationContext.parentIterations?.length && {
parentIterations: iterationContext.parentIterations,
}),
}),
...(childWorkflowContext && {
childWorkflowBlockId: childWorkflowContext.parentBlockId,
childWorkflowName: childWorkflowContext.workflowName,
}),
},
} as ExecutionEvent)
},
onBlockComplete: async (
blockId: string,
blockName: string,
blockType: string,
callbackData: Record<string, unknown>,
iterationContext?: IterationContext,
childWorkflowContext?: ChildWorkflowContext
) => {
const output = callbackData.output as Record<string, unknown> | undefined
const hasError = output?.error
const sharedData = {
blockId,
blockName,
blockType,
input: callbackData.input,
durationMs: (callbackData.executionTime as number) || 0,
startedAt: callbackData.startedAt,
executionOrder: callbackData.executionOrder,
endedAt: callbackData.endedAt,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType,
iterationContainerId: iterationContext.iterationContainerId,
...(iterationContext.parentIterations?.length && {
parentIterations: iterationContext.parentIterations,
}),
}),
...(childWorkflowContext && {
childWorkflowBlockId: childWorkflowContext.parentBlockId,
childWorkflowName: childWorkflowContext.workflowName,
}),
...(callbackData.childWorkflowInstanceId
? { childWorkflowInstanceId: callbackData.childWorkflowInstanceId }
: {}),
}
writeBufferedEvent({
type: hasError ? 'block:error' : 'block:completed',
timestamp: new Date().toISOString(),
executionId: resumeExecutionId,
workflowId,
data: hasError ? { ...sharedData, error: output?.error } : { ...sharedData, output },
} as ExecutionEvent)
},
onChildWorkflowInstanceReady: (
blockId: string,
childWorkflowInstanceId: string,
iterationContext?: IterationContext,
executionOrder?: number
) => {
writeBufferedEvent({
type: 'block:childWorkflowStarted',
timestamp: new Date().toISOString(),
executionId: resumeExecutionId,
workflowId,
data: {
blockId,
childWorkflowInstanceId,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationContainerId: iterationContext.iterationContainerId,
}),
...(executionOrder !== undefined && { executionOrder }),
},
} as ExecutionEvent)
},
onStream: async (streamingExec: StreamingExecution) => {
const blockId = (streamingExec.execution as unknown as Record<string, unknown>)
.blockId as string
const reader = streamingExec.stream.getReader()
const decoder = new TextDecoder()
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
const chunk = decoder.decode(value, { stream: true })
writeBufferedEvent({
type: 'stream:chunk',
timestamp: new Date().toISOString(),
executionId: resumeExecutionId,
workflowId,
data: { blockId, chunk },
} as ExecutionEvent)
}
writeBufferedEvent({
type: 'stream:done',
timestamp: new Date().toISOString(),
executionId: resumeExecutionId,
workflowId,
data: { blockId },
} as ExecutionEvent)
} catch (streamError) {
logger.error('Error streaming block content during resume', {
resumeExecutionId,
blockId,
error: streamError instanceof Error ? streamError.message : String(streamError),
})
} finally {
try {
await reader.cancel().catch(() => {})
} catch {}
}
},
}
const timeoutController = createTimeoutAbortController(
preprocessingResult.executionTimeout?.async
)
let result: ExecutionResult
let finalMetaStatus: 'complete' | 'error' | 'cancelled' = 'complete'
try {
result = await executeWorkflowCore({
snapshot: resumeSnapshot,
callbacks: {},
callbacks,
loggingSession,
skipLogCreation: true, // Reuse existing log entry
includeFileBase64: true, // Enable base64 hydration
base64MaxBytes: undefined, // Use default limit
skipLogCreation: true,
includeFileBase64: true,
base64MaxBytes: undefined,
abortSignal: timeoutController.signal,
})
if (
result.status === 'cancelled' &&
timeoutController.isTimedOut() &&
timeoutController.timeoutMs
) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
logger.info('Resume execution timed out', {
resumeExecutionId,
timeoutMs: timeoutController.timeoutMs,
})
await loggingSession.markAsFailed(timeoutErrorMessage)
writeBufferedEvent({
type: 'execution:error',
timestamp: new Date().toISOString(),
executionId: resumeExecutionId,
workflowId,
data: {
error: timeoutErrorMessage,
duration: result.metadata?.duration || 0,
},
} as ExecutionEvent)
finalMetaStatus = 'error'
} else if (result.status === 'cancelled') {
writeBufferedEvent({
type: 'execution:cancelled',
timestamp: new Date().toISOString(),
executionId: resumeExecutionId,
workflowId,
data: { duration: result.metadata?.duration || 0 },
} as ExecutionEvent)
finalMetaStatus = 'cancelled'
} else if (result.status === 'paused') {
writeBufferedEvent({
type: 'execution:paused',
timestamp: new Date().toISOString(),
executionId: resumeExecutionId,
workflowId,
data: {
output: result.output,
duration: result.metadata?.duration || 0,
startTime: result.metadata?.startTime || new Date().toISOString(),
endTime: result.metadata?.endTime || new Date().toISOString(),
},
} as ExecutionEvent)
finalMetaStatus = 'complete'
} else {
writeBufferedEvent({
type: 'execution:completed',
timestamp: new Date().toISOString(),
executionId: resumeExecutionId,
workflowId,
data: {
success: result.success,
output: result.output,
duration: result.metadata?.duration || 0,
startTime: result.metadata?.startTime || new Date().toISOString(),
endTime: result.metadata?.endTime || new Date().toISOString(),
},
} as ExecutionEvent)
finalMetaStatus = 'complete'
}
} catch (execError) {
writeBufferedEvent({
type: 'execution:error',
timestamp: new Date().toISOString(),
executionId: resumeExecutionId,
workflowId,
data: {
error: execError instanceof Error ? execError.message : String(execError),
duration: 0,
},
} as ExecutionEvent)
finalMetaStatus = 'error'
throw execError
} finally {
timeoutController.cleanup()
}
if (
result.status === 'cancelled' &&
timeoutController.isTimedOut() &&
timeoutController.timeoutMs
) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
logger.info('Resume execution timed out', {
resumeExecutionId,
timeoutMs: timeoutController.timeoutMs,
})
await loggingSession.markAsFailed(timeoutErrorMessage)
try {
await eventWriter.close()
} catch (closeError) {
logger.warn('Failed to close event writer for resume', {
resumeExecutionId,
error: closeError instanceof Error ? closeError.message : String(closeError),
})
}
setExecutionMeta(resumeExecutionId, { status: finalMetaStatus }).catch(() => {})
}
return result

View File

@@ -0,0 +1,64 @@
import { createLogger } from '@sim/logger'
import type { LoggingSession } from '@/lib/logs/execution/logging-session'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import type { ExecutionResult } from '@/executor/types'

const logger = createLogger('PausePersistence')

interface HandlePostExecutionPauseStateArgs {
  result: ExecutionResult
  workflowId: string
  executionId: string
  loggingSession: LoggingSession
}

/**
 * Post-execution hook for HITL pause handling, shared by every caller of
 * `executeWorkflowCore`.
 *
 * Behavior by execution outcome:
 * - not paused: drain any queued resume entries (errors are logged, not thrown)
 * - paused without a snapshot seed: mark the execution as failed
 * - paused with a snapshot seed: persist the pause to the `paused_executions`
 *   table; on persistence failure, log and mark the execution as failed
 */
export async function handlePostExecutionPauseState({
  result,
  workflowId,
  executionId,
  loggingSession,
}: HandlePostExecutionPauseStateArgs): Promise<void> {
  // Non-paused outcome: only responsibility is draining queued resumes.
  if (result.status !== 'paused') {
    try {
      await PauseResumeManager.processQueuedResumes(executionId)
    } catch (resumeError) {
      const message =
        resumeError instanceof Error ? resumeError.message : String(resumeError)
      logger.error('Failed to process queued resumes', {
        executionId,
        error: message,
      })
    }
    return
  }

  // Paused but unpersistable: without a snapshot seed the execution can never
  // be resumed, so surface it as a failure.
  if (!result.snapshotSeed) {
    logger.error('Missing snapshot seed for paused execution', { executionId })
    await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
    return
  }

  // Paused with a valid snapshot: persist so a later resume can rehydrate it.
  try {
    await PauseResumeManager.persistPauseResult({
      workflowId,
      executionId,
      pausePoints: result.pausePoints || [],
      snapshotSeed: result.snapshotSeed,
      executorUserId: result.metadata?.userId,
    })
  } catch (pauseError) {
    const message =
      pauseError instanceof Error ? pauseError.message : String(pauseError)
    logger.error('Failed to persist pause result', {
      executionId,
      error: message,
    })
    await loggingSession.markAsFailed(`Failed to persist pause state: ${message}`)
  }
}

View File

@@ -15,7 +15,7 @@ import {
createExecutionCallbacks,
type ExecutionEvent,
} from '@/lib/workflows/executor/execution-events'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import { handlePostExecutionPauseState } from '@/lib/workflows/executor/pause-persistence'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types'
import type { BlockLog, NormalizedBlockOutput } from '@/executor/types'
@@ -194,21 +194,7 @@ export async function executeQueuedWorkflowJob(
)
}
if (result.status === 'paused') {
if (!result.snapshotSeed) {
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
} else {
await PauseResumeManager.persistPauseResult({
workflowId,
executionId,
pausePoints: result.pausePoints || [],
snapshotSeed: result.snapshotSeed,
executorUserId: result.metadata?.userId,
})
}
} else {
await PauseResumeManager.processQueuedResumes(executionId)
}
await handlePostExecutionPauseState({ result, workflowId, executionId, loggingSession })
const outputWithBase64 = payload.includeFileBase64
? await hydrateUserFilesWithBase64(result.output, {
@@ -230,6 +216,20 @@ export async function executeQueuedWorkflowJob(
},
})
await setExecutionMeta(executionId, { status: 'cancelled' })
} else if (result.status === 'paused') {
await eventWriter.write({
type: 'execution:paused',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
output: outputWithBase64,
duration: result.metadata?.duration || 0,
startTime: result.metadata?.startTime || metadata.startTime,
endTime: result.metadata?.endTime || new Date().toISOString(),
},
})
await setExecutionMeta(executionId, { status: 'complete' })
} else {
await eventWriter.write({
type: 'execution:completed',