From 742d59f54df5ba237f0da240fb3c6adfb7a721b6 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan <33737564+Sg312@users.noreply.github.com> Date: Thu, 6 Nov 2025 15:59:28 -0800 Subject: [PATCH] feat(hitl): add human in the loop block (#1832) * fix(billing): should allow restoring subscription (#1728) * fix(already-cancelled-sub): UI should allow restoring subscription * restore functionality fixed * fix * Add pause resume block * Add db schema * Initial test passes * Tests pass * Execution pauses * Snapshot serializer * Ui checkpoint * Works 1 * Pause resume simple v1 * Hitl block works in parallel branches without timing overlap * Pending status to logs * Pause resume ui link * Big context consolidation * HITL works in loops * Fix parallels * Reference blocks properly * Fix tag dropdown and start block resolution * Filter console logs for hitl block * Fix notifs * Fix logs page * Fix logs page again * Fix * Checkpoint * Cleanup v1 * Refactor v2 * Refactor v3 * Refactor v4 * Refactor v5 * Resume page * Fix variables in loops * Fix var res bugs * Ui changes * Approval block * Hitl works e2e v1 * Fix tets * Row level lock --------- Co-authored-by: Waleed Co-authored-by: Vikhyath Mondreti --- apps/sim/app/api/logs/route.ts | 27 +- .../[executionId]/[contextId]/route.ts | 116 + .../[workflowId]/[executionId]/route.ts | 48 + .../app/api/workflows/[id]/execute/route.ts | 37 + .../[id]/paused/[executionId]/route.ts | 34 + .../app/api/workflows/[id]/paused/route.ts | 31 + .../[executionId]/[contextId]/page.tsx | 15 + .../[workflowId]/[executionId]/page.tsx | 40 + .../[executionId]/resume-page-client.tsx | 1463 ++++ .../components/dashboard/workflow-details.tsx | 39 +- .../logs/components/sidebar/sidebar.tsx | 21 +- .../app/workspace/[workspaceId]/logs/logs.tsx | 37 +- .../app/workspace/[workspaceId]/logs/utils.ts | 3 + apps/sim/background/schedule-execution.ts | 19 + apps/sim/background/webhook-execution.ts | 37 + apps/sim/background/workflow-execution.ts | 19 + apps/sim/blocks/blocks/pause_resume.ts | 169 + apps/sim/blocks/registry.ts | 2 + apps/sim/components/ui/tag-dropdown.tsx | 33 +- .../executor/__test-utils__/executor-mocks.ts | 12 +- apps/sim/executor/consts.ts | 89 +- .../executor/dag/builder.pause-resume.test.ts | 68 + apps/sim/executor/dag/builder.ts | 50 +- apps/sim/executor/dag/construction/edges.ts | 148 +- apps/sim/executor/dag/construction/loops.ts | 19 +- apps/sim/executor/dag/construction/nodes.ts | 87 +- apps/sim/executor/dag/construction/paths.ts | 100 +- apps/sim/executor/dag/types.ts | 5 +- apps/sim/executor/execution/block-executor.ts | 225 +- apps/sim/executor/execution/edge-manager.ts | 74 +- apps/sim/executor/execution/engine.ts | 170 +- apps/sim/executor/execution/executor.ts | 179 +- .../executor/execution/snapshot-serializer.ts | 129 + apps/sim/executor/execution/snapshot.ts | 22 +- apps/sim/executor/execution/state.ts | 107 +- apps/sim/executor/execution/types.ts | 26 +- .../handlers/agent/agent-handler.test.ts | 3 +- .../executor/handlers/agent/agent-handler.ts | 28 - .../executor/handlers/api/api-handler.test.ts | 3 +- apps/sim/executor/handlers/api/api-handler.ts | 14 +- .../condition/condition-handler.test.ts | 20 +- .../handlers/condition/condition-handler.ts | 24 +- .../evaluator/evaluator-handler.test.ts | 3 +- .../handlers/evaluator/evaluator-handler.ts | 27 +- .../function/function-handler.test.ts | 3 +- .../handlers/generic/generic-handler.test.ts | 3 +- .../handlers/generic/generic-handler.ts | 14 - apps/sim/executor/handlers/index.ts | 2 + .../pause-resume/pause-resume-handler.ts | 668 ++ apps/sim/executor/handlers/registry.ts | 28 +- .../handlers/response/response-handler.ts | 36 +- .../handlers/router/router-handler.test.ts | 3 +- .../handlers/router/router-handler.ts | 8 +- .../handlers/trigger/trigger-handler.test.ts | 3 +- .../handlers/trigger/trigger-handler.ts | 19 - .../handlers/variables/variables-handler.ts | 21 - .../executor/handlers/wait/wait-handler.ts | 22 +- .../workflow/workflow-handler.test.ts | 3 +- .../handlers/workflow/workflow-handler.ts | 3 - apps/sim/executor/index.ts | 2 +- apps/sim/executor/orchestrators/loop.ts | 122 +- apps/sim/executor/orchestrators/node.ts | 142 +- apps/sim/executor/orchestrators/parallel.ts | 52 +- apps/sim/executor/pause-resume/utils.ts | 73 + apps/sim/executor/types.ts | 346 +- apps/sim/executor/utils/block-data.ts | 6 +- apps/sim/executor/utils/connections.ts | 4 +- .../sim/executor/utils/file-tool-processor.ts | 39 +- apps/sim/executor/utils/json.ts | 3 - apps/sim/executor/variables/resolver.ts | 45 +- .../sim/executor/variables/resolvers/block.ts | 26 +- apps/sim/executor/variables/resolvers/env.ts | 3 +- apps/sim/executor/variables/resolvers/loop.ts | 5 +- .../executor/variables/resolvers/parallel.ts | 4 +- .../executor/variables/resolvers/reference.ts | 2 +- .../executor/variables/resolvers/workflow.ts | 27 +- apps/sim/hooks/use-collaborative-workflow.ts | 5 +- .../tools/server/workflow/edit-workflow.ts | 5 +- apps/sim/lib/logs/execution/logger.ts | 146 +- .../sim/lib/logs/execution/logging-session.ts | 33 +- apps/sim/lib/workflows/block-outputs.ts | 23 + .../lib/workflows/executor/execution-core.ts | 38 +- .../executor/pause-resume-manager.ts | 1155 +++ apps/sim/stores/logs/filters/types.ts | 1 + apps/sim/stores/workflows/workflow/store.ts | 5 +- apps/sim/tsconfig.json | 4 +- .../0106_bitter_captain_midlands.sql | 37 + .../db/migrations/meta/0106_snapshot.json | 7556 +++++++++++++++++ packages/db/migrations/meta/_journal.json | 7 + packages/db/schema.ts | 52 + 90 files changed, 13498 insertions(+), 1128 deletions(-) create mode 100644 apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts create mode 100644 apps/sim/app/api/resume/[workflowId]/[executionId]/route.ts create mode 100644 apps/sim/app/api/workflows/[id]/paused/[executionId]/route.ts create mode 100644 apps/sim/app/api/workflows/[id]/paused/route.ts create mode 100644 apps/sim/app/resume/[workflowId]/[executionId]/[contextId]/page.tsx create mode 100644 apps/sim/app/resume/[workflowId]/[executionId]/page.tsx create mode 100644 apps/sim/app/resume/[workflowId]/[executionId]/resume-page-client.tsx create mode 100644 apps/sim/blocks/blocks/pause_resume.ts create mode 100644 apps/sim/executor/dag/builder.pause-resume.test.ts create mode 100644 apps/sim/executor/execution/snapshot-serializer.ts create mode 100644 apps/sim/executor/handlers/pause-resume/pause-resume-handler.ts create mode 100644 apps/sim/executor/pause-resume/utils.ts create mode 100644 apps/sim/lib/workflows/executor/pause-resume-manager.ts create mode 100644 packages/db/migrations/0106_bitter_captain_midlands.sql create mode 100644 packages/db/migrations/meta/0106_snapshot.json diff --git a/apps/sim/app/api/logs/route.ts b/apps/sim/app/api/logs/route.ts index 557030c63..b55e2d132 100644 --- a/apps/sim/app/api/logs/route.ts +++ b/apps/sim/app/api/logs/route.ts @@ -1,5 +1,5 @@ import { db } from '@sim/db' -import { permissions, workflow, workflowExecutionLogs } from '@sim/db/schema' +import { pausedExecutions, permissions, workflow, workflowExecutionLogs } from '@sim/db/schema' import { and, desc, eq, gte, inArray, lte, type SQL, sql } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' @@ -68,6 +68,9 @@ export async function GET(request: NextRequest) { workflowWorkspaceId: workflow.workspaceId, workflowCreatedAt: workflow.createdAt, workflowUpdatedAt: workflow.updatedAt, + pausedStatus: pausedExecutions.status, + pausedTotalPauseCount: pausedExecutions.totalPauseCount, + pausedResumedCount: pausedExecutions.resumedCount, } : { // Basic mode - exclude large fields for better performance @@ -92,11 +95,18 @@ export async function GET(request: NextRequest) { workflowWorkspaceId: workflow.workspaceId, workflowCreatedAt: workflow.createdAt, workflowUpdatedAt: workflow.updatedAt, + pausedStatus: pausedExecutions.status, + pausedTotalPauseCount: pausedExecutions.totalPauseCount, + pausedResumedCount: pausedExecutions.resumedCount, } const baseQuery = db .select(selectColumns) .from(workflowExecutionLogs) + .leftJoin( + pausedExecutions, + eq(pausedExecutions.executionId, workflowExecutionLogs.executionId) + ) .innerJoin( workflow, and( @@ -186,6 +196,10 @@ export async function GET(request: NextRequest) { const countQuery = db .select({ count: sql`count(*)` }) .from(workflowExecutionLogs) + .leftJoin( + pausedExecutions, + eq(pausedExecutions.executionId, workflowExecutionLogs.executionId) + ) .innerJoin( workflow, and( @@ -340,13 +354,18 @@ export async function GET(request: NextRequest) { return { id: log.id, workflowId: log.workflowId, - executionId: params.details === 'full' ? log.executionId : undefined, + executionId: log.executionId, level: log.level, duration: log.totalDurationMs ? `${log.totalDurationMs}ms` : null, trigger: log.trigger, createdAt: log.startedAt.toISOString(), files: params.details === 'full' ? log.files || undefined : undefined, workflow: workflowSummary, + pauseSummary: { + status: log.pausedStatus ?? null, + total: log.pausedTotalPauseCount ?? 0, + resumed: log.pausedResumedCount ?? 0, + }, executionData: params.details === 'full' ? { @@ -361,6 +380,10 @@ export async function GET(request: NextRequest) { params.details === 'full' ? (costSummary as any) : { total: (costSummary as any)?.total || 0 }, + hasPendingPause: + (Number(log.pausedTotalPauseCount ?? 0) > 0 && + Number(log.pausedResumedCount ?? 0) < Number(log.pausedTotalPauseCount ?? 0)) || + (log.pausedStatus && log.pausedStatus !== 'fully_resumed'), } }) return NextResponse.json( diff --git a/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts b/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts new file mode 100644 index 000000000..c551f26dd --- /dev/null +++ b/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts @@ -0,0 +1,116 @@ +import { type NextRequest, NextResponse } from 'next/server' +import { createLogger } from '@/lib/logs/console/logger' +import { PauseResumeManager } from '@/lib/workflows/executor/pause-resume-manager' +import { validateWorkflowAccess } from '@/app/api/workflows/middleware' + +const logger = createLogger('WorkflowResumeAPI') + +export const runtime = 'nodejs' +export const dynamic = 'force-dynamic' + +export async function POST( + request: NextRequest, + { + params, + }: { + params: Promise<{ workflowId: string; executionId: string; contextId: string }> + } +) { + const { workflowId, executionId, contextId } = await params + + const access = await validateWorkflowAccess(request, workflowId, false) + if (access.error) { + return NextResponse.json({ error: access.error.message }, { status: access.error.status }) + } + + const workflow = access.workflow! + + let payload: any = {} + try { + payload = await request.json() + } catch { + payload = {} + } + + const resumeInput = payload?.input ?? payload ?? {} + const userId = workflow.userId ?? '' + + try { + const enqueueResult = await PauseResumeManager.enqueueOrStartResume({ + executionId, + contextId, + resumeInput, + userId, + }) + + if (enqueueResult.status === 'queued') { + return NextResponse.json({ + status: 'queued', + executionId: enqueueResult.resumeExecutionId, + queuePosition: enqueueResult.queuePosition, + message: 'Resume queued. It will run after current resumes finish.', + }) + } + + PauseResumeManager.startResumeExecution({ + resumeEntryId: enqueueResult.resumeEntryId, + resumeExecutionId: enqueueResult.resumeExecutionId, + pausedExecution: enqueueResult.pausedExecution, + contextId: enqueueResult.contextId, + resumeInput: enqueueResult.resumeInput, + userId: enqueueResult.userId, + }).catch((error) => { + logger.error('Failed to start resume execution', { + workflowId, + parentExecutionId: executionId, + resumeExecutionId: enqueueResult.resumeExecutionId, + error, + }) + }) + + return NextResponse.json({ + status: 'started', + executionId: enqueueResult.resumeExecutionId, + message: 'Resume execution started.', + }) + } catch (error: any) { + logger.error('Resume request failed', { + workflowId, + executionId, + contextId, + error, + }) + return NextResponse.json( + { error: error.message || 'Failed to queue resume request' }, + { status: 400 } + ) + } +} + +export async function GET( + request: NextRequest, + { + params, + }: { + params: Promise<{ workflowId: string; executionId: string; contextId: string }> + } +) { + const { workflowId, executionId, contextId } = await params + + const access = await validateWorkflowAccess(request, workflowId, false) + if (access.error) { + return NextResponse.json({ error: access.error.message }, { status: access.error.status }) + } + + const detail = await PauseResumeManager.getPauseContextDetail({ + workflowId, + executionId, + contextId, + }) + + if (!detail) { + return NextResponse.json({ error: 'Pause context not found' }, { status: 404 }) + } + + return NextResponse.json(detail) +} diff --git a/apps/sim/app/api/resume/[workflowId]/[executionId]/route.ts b/apps/sim/app/api/resume/[workflowId]/[executionId]/route.ts new file mode 100644 index 000000000..8aa517775 --- /dev/null +++ b/apps/sim/app/api/resume/[workflowId]/[executionId]/route.ts @@ -0,0 +1,48 @@ +import { type NextRequest, NextResponse } from 'next/server' +import { createLogger } from '@/lib/logs/console/logger' +import { PauseResumeManager } from '@/lib/workflows/executor/pause-resume-manager' +import { validateWorkflowAccess } from '@/app/api/workflows/middleware' + +const logger = createLogger('WorkflowResumeExecutionAPI') + +export const runtime = 'nodejs' +export const dynamic = 'force-dynamic' + +export async function GET( + request: NextRequest, + { + params, + }: { + params: Promise<{ workflowId: string; executionId: string }> + } +) { + const { workflowId, executionId } = await params + + const access = await validateWorkflowAccess(request, workflowId, false) + if (access.error) { + return NextResponse.json({ error: access.error.message }, { status: access.error.status }) + } + + try { + const detail = await PauseResumeManager.getPausedExecutionDetail({ + workflowId, + executionId, + }) + + if (!detail) { + return NextResponse.json({ error: 'Paused execution not found' }, { status: 404 }) + } + + return NextResponse.json(detail) + } catch (error: any) { + logger.error('Failed to load paused execution detail', { + workflowId, + executionId, + error, + }) + return NextResponse.json( + { error: error?.message || 'Failed to load paused execution detail' }, + { status: 500 } + ) + } +} diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index aa7da7b37..3d960bae0 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -12,6 +12,7 @@ import { } from '@/lib/workflows/db-helpers' import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events' +import { PauseResumeManager } from '@/lib/workflows/executor/pause-resume-manager' import { validateWorkflowAccess } from '@/app/api/workflows/middleware' import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot' import type { StreamingExecution } from '@/executor/types' @@ -135,6 +136,24 @@ export async function executeWorkflow( loggingSession, }) + if (result.status === 'paused') { + if (!result.snapshotSeed) { + logger.error(`[${requestId}] Missing snapshot seed for paused execution`, { + executionId, + }) + } else { + await PauseResumeManager.persistPauseResult({ + workflowId, + executionId, + pausePoints: result.pausePoints || [], + snapshotSeed: result.snapshotSeed, + executorUserId: result.metadata?.userId, + }) + } + } else { + await PauseResumeManager.processQueuedResumes(executionId) + } + if (streamConfig?.skipLoggingComplete) { return { ...result, @@ -605,6 +624,24 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: loggingSession, }) + if (result.status === 'paused') { + if (!result.snapshotSeed) { + logger.error(`[${requestId}] Missing snapshot seed for paused execution`, { + executionId, + }) + } else { + await PauseResumeManager.persistPauseResult({ + workflowId, + executionId, + pausePoints: result.pausePoints || [], + snapshotSeed: result.snapshotSeed, + executorUserId: result.metadata?.userId, + }) + } + } else { + await PauseResumeManager.processQueuedResumes(executionId) + } + if (result.error === 'Workflow execution was cancelled') { logger.info(`[${requestId}] Workflow execution was cancelled`) sendEvent({ diff --git a/apps/sim/app/api/workflows/[id]/paused/[executionId]/route.ts b/apps/sim/app/api/workflows/[id]/paused/[executionId]/route.ts new file mode 100644 index 000000000..a648abc3e --- /dev/null +++ b/apps/sim/app/api/workflows/[id]/paused/[executionId]/route.ts @@ -0,0 +1,34 @@ +import { type NextRequest, NextResponse } from 'next/server' +import { PauseResumeManager } from '@/lib/workflows/executor/pause-resume-manager' +import { validateWorkflowAccess } from '@/app/api/workflows/middleware' + +export const runtime = 'nodejs' +export const dynamic = 'force-dynamic' + +export async function GET( + request: NextRequest, + { + params, + }: { + params: { id: string; executionId: string } + } +) { + const workflowId = params.id + const executionId = params.executionId + + const access = await validateWorkflowAccess(request, workflowId, false) + if (access.error) { + return NextResponse.json({ error: access.error.message }, { status: access.error.status }) + } + + const detail = await PauseResumeManager.getPausedExecutionDetail({ + workflowId, + executionId, + }) + + if (!detail) { + return NextResponse.json({ error: 'Paused execution not found' }, { status: 404 }) + } + + return NextResponse.json(detail) +} diff --git a/apps/sim/app/api/workflows/[id]/paused/route.ts b/apps/sim/app/api/workflows/[id]/paused/route.ts new file mode 100644 index 000000000..d4162f2f5 --- /dev/null +++ b/apps/sim/app/api/workflows/[id]/paused/route.ts @@ -0,0 +1,31 @@ +import { type NextRequest, NextResponse } from 'next/server' +import { PauseResumeManager } from '@/lib/workflows/executor/pause-resume-manager' +import { validateWorkflowAccess } from '@/app/api/workflows/middleware' + +export const runtime = 'nodejs' +export const dynamic = 'force-dynamic' + +export async function GET( + request: NextRequest, + { + params, + }: { + params: { id: string } + } +) { + const workflowId = params.id + + const access = await validateWorkflowAccess(request, workflowId, false) + if (access.error) { + return NextResponse.json({ error: access.error.message }, { status: access.error.status }) + } + + const statusFilter = request.nextUrl.searchParams.get('status') || undefined + + const pausedExecutions = await PauseResumeManager.listPausedExecutions({ + workflowId, + status: statusFilter, + }) + + return NextResponse.json({ pausedExecutions }) +} diff --git a/apps/sim/app/resume/[workflowId]/[executionId]/[contextId]/page.tsx b/apps/sim/app/resume/[workflowId]/[executionId]/[contextId]/page.tsx new file mode 100644 index 000000000..9102a2d46 --- /dev/null +++ b/apps/sim/app/resume/[workflowId]/[executionId]/[contextId]/page.tsx @@ -0,0 +1,15 @@ +import { redirect } from 'next/navigation' + +export const runtime = 'nodejs' +export const dynamic = 'force-dynamic' + +interface PageParams { + workflowId: string + executionId: string + contextId: string +} + +export default async function ResumePage({ params }: { params: Promise }) { + const { workflowId, executionId, contextId } = await params + redirect(`/resume/${workflowId}/${executionId}?contextId=${contextId}`) +} diff --git a/apps/sim/app/resume/[workflowId]/[executionId]/page.tsx b/apps/sim/app/resume/[workflowId]/[executionId]/page.tsx new file mode 100644 index 000000000..c09ddc51d --- /dev/null +++ b/apps/sim/app/resume/[workflowId]/[executionId]/page.tsx @@ -0,0 +1,40 @@ +import { PauseResumeManager } from '@/lib/workflows/executor/pause-resume-manager' +import ResumeExecutionPage from './resume-page-client' + +export const runtime = 'nodejs' +export const dynamic = 'force-dynamic' + +interface PageParams { + workflowId: string + executionId: string +} + +export default async function ResumeExecutionPageWrapper({ + params, + searchParams, +}: { + params: Promise + searchParams: Promise> +}) { + const resolvedParams = await params + const resolvedSearchParams = await searchParams + + const { workflowId, executionId } = resolvedParams + const initialContextIdParam = resolvedSearchParams?.contextId + const initialContextId = Array.isArray(initialContextIdParam) + ? initialContextIdParam[0] + : initialContextIdParam + + const detail = await PauseResumeManager.getPausedExecutionDetail({ + workflowId, + executionId, + }) + + return ( + + ) +} diff --git a/apps/sim/app/resume/[workflowId]/[executionId]/resume-page-client.tsx b/apps/sim/app/resume/[workflowId]/[executionId]/resume-page-client.tsx new file mode 100644 index 000000000..2ef922146 --- /dev/null +++ b/apps/sim/app/resume/[workflowId]/[executionId]/resume-page-client.tsx @@ -0,0 +1,1463 @@ +'use client' + +import { useCallback, useEffect, useMemo, useState } from 'react' +import { + AlertCircle, + Calendar, + CheckCircle2, + Clock, + FileText, + Play, + RefreshCw, + XCircle, +} from 'lucide-react' +import { useRouter } from 'next/navigation' +import { Badge } from '@/components/ui/badge' +import { Button } from '@/components/ui/button' +import { Card, CardContent, CardDescription, CardHeader, CardTitle } from '@/components/ui/card' +import { Input } from '@/components/ui/input' +import { Label } from '@/components/ui/label' +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from '@/components/ui/select' +import { Separator } from '@/components/ui/separator' +import { Textarea } from '@/components/ui/textarea' +import { useBrandConfig } from '@/lib/branding/branding' +import Nav from '@/app/(landing)/components/nav/nav' +import { inter } from '@/app/fonts/inter' +import { soehne } from '@/app/fonts/soehne/soehne' +import type { ResumeStatus } from '@/executor/types' + +interface ResumeLinks { + apiUrl: string + uiUrl: string + contextId: string + executionId: string + workflowId: string +} + +interface NormalizedInputField { + id: string + name: string + label: string + type: string + description?: string + placeholder?: string + value?: any + required?: boolean + options?: any[] + rows?: number +} + +interface ResponseStructureRow { + id: string + name: string + type: string + value: any +} + +interface ResumeQueueEntrySummary { + id: string + contextId: string + status: string + queuedAt: string | null + claimedAt: string | null + completedAt: string | null + failureReason: string | null + newExecutionId: string + resumeInput: any +} + +interface PausePointWithQueue { + contextId: string + triggerBlockId: string + response: any + registeredAt: string + resumeStatus: ResumeStatus + snapshotReady: boolean + resumeLinks?: ResumeLinks + queuePosition?: number | null + latestResumeEntry?: ResumeQueueEntrySummary | null + parallelScope?: any + loopScope?: any +} + +interface PausedExecutionSummary { + id: string + workflowId: string + executionId: string + status: string + totalPauseCount: number + resumedCount: number + pausedAt: string | null + updatedAt: string | null + expiresAt: string | null + metadata: Record | null + triggerIds: string[] + pausePoints: PausePointWithQueue[] +} + +interface PauseContextDetail { + execution: PausedExecutionSummary + pausePoint: PausePointWithQueue + queue: ResumeQueueEntrySummary[] + activeResumeEntry?: ResumeQueueEntrySummary | null +} + +interface PausedExecutionDetail extends PausedExecutionSummary { + executionSnapshot: any + queue: ResumeQueueEntrySummary[] +} + +interface ResumeExecutionPageProps { + params: { workflowId: string; executionId: string } + initialExecutionDetail: PausedExecutionDetail | null + initialContextId?: string | null +} + +const RESUME_STATUS_STYLES: Record = { + paused: { + style: 'border-amber-200 bg-amber-50 text-amber-700', + icon: , + }, + queued: { + style: 'border-blue-200 bg-blue-50 text-blue-700', + icon: , + }, + resuming: { + style: 'border-indigo-200 bg-indigo-50 text-indigo-700', + icon: , + }, + resumed: { + style: 'border-emerald-200 bg-emerald-50 text-emerald-700', + icon: , + }, + failed: { + style: 'border-red-200 bg-red-50 text-red-700', + icon: , + }, +} + +function formatDate(value: string | null): string { + if (!value) return '—' + try { + return new Date(value).toLocaleString() + } catch { + return value + } +} + +function getStatusLabel(status: string): string { + if (!status) return 'Unknown' + return status.charAt(0).toUpperCase() + status.slice(1) +} + +function ResumeStatusBadge({ status }: { status: string }) { + const config = RESUME_STATUS_STYLES[status] ?? { + style: 'border-slate-200 bg-slate-100 text-slate-700', + icon: , + } + return ( + + {config.icon} + {getStatusLabel(status)} + + ) +} + +export default function ResumeExecutionPage({ + params, + initialExecutionDetail, + initialContextId, +}: ResumeExecutionPageProps) { + const { workflowId, executionId } = params + const router = useRouter() + const brandConfig = useBrandConfig() + + const [executionDetail, setExecutionDetail] = useState( + initialExecutionDetail + ) + const totalPauses = executionDetail?.totalPauseCount ?? 0 + const resumedCount = executionDetail?.resumedCount ?? 0 + const pendingCount = Math.max(0, totalPauses - resumedCount) + const pausePoints = executionDetail?.pausePoints ?? [] + + const defaultContextId = useMemo(() => { + if (initialContextId) return initialContextId + return ( + pausePoints.find((point) => point.resumeStatus === 'paused')?.contextId ?? + pausePoints[0]?.contextId + ) + }, [initialContextId, pausePoints]) + const actionablePausePoints = useMemo( + () => pausePoints.filter((point) => point.resumeStatus === 'paused'), + [pausePoints] + ) + + const groupedPausePoints = useMemo(() => { + const activeStatuses = new Set(['paused', 'queued', 'resuming']) + const resolvedStatuses = new Set(['resumed', 'failed']) + + return { + active: pausePoints.filter((point) => activeStatuses.has(point.resumeStatus)), + resolved: pausePoints.filter((point) => resolvedStatuses.has(point.resumeStatus)), + } + }, [pausePoints]) + + const [selectedContextId, setSelectedContextId] = useState( + defaultContextId ?? null + ) + const [selectedDetail, setSelectedDetail] = useState(null) + const [selectedStatus, setSelectedStatus] = + useState('paused') + const [queuePosition, setQueuePosition] = useState(undefined) + const [resumeInputs, setResumeInputs] = useState>({}) + const [resumeInput, setResumeInput] = useState('') + const [formValuesByContext, setFormValuesByContext] = useState< + Record> + >({}) + const [formValues, setFormValues] = useState>({}) + const [formErrors, setFormErrors] = useState>({}) + const [loadingDetail, setLoadingDetail] = useState(false) + const [loadingAction, setLoadingAction] = useState(false) + const [refreshingExecution, setRefreshingExecution] = useState(false) + const [error, setError] = useState(null) + const [message, setMessage] = useState(null) + + const normalizeInputFormatFields = useCallback((raw: any): NormalizedInputField[] => { + if (!Array.isArray(raw)) return [] + + return raw + .map((field: any, index: number) => { + if (!field || typeof field !== 'object') return null + + const name = typeof field.name === 'string' ? field.name.trim() : '' + if (!name) return null + + const id = typeof field.id === 'string' && field.id.length > 0 ? field.id : `field_${index}` + const label = + typeof field.label === 'string' && field.label.trim().length > 0 + ? field.label.trim() + : name + const type = + typeof field.type === 'string' && field.type.trim().length > 0 ? field.type : 'string' + const description = + typeof field.description === 'string' && field.description.trim().length > 0 + ? field.description.trim() + : undefined + const placeholder = + typeof field.placeholder === 'string' && field.placeholder.trim().length > 0 + ? field.placeholder.trim() + : undefined + const required = field.required === true + const options = Array.isArray(field.options) ? field.options : undefined + const rows = typeof field.rows === 'number' ? field.rows : undefined + + return { + id, + name, + label, + type, + description, + placeholder, + value: field.value, + required, + options, + rows, + } as NormalizedInputField + }) + .filter((field): field is NormalizedInputField => field !== null) + }, []) + + const formatValueForInputField = useCallback( + (field: NormalizedInputField, value: any): string => { + if (value === undefined || value === null) { + return '' + } + + switch (field.type) { + case 'boolean': + if (typeof value === 'boolean') { + return value ? 'true' : 'false' + } + if (typeof value === 'string') { + const normalized = value.trim().toLowerCase() + if (normalized === 'true' || normalized === 'false') { + return normalized + } + } + return '' + case 'number': + if (typeof value === 'number') { + return Number.isFinite(value) ? String(value) : '' + } + if (typeof value === 'string') { + return value + } + return '' + case 'array': + case 'object': + case 'files': + if (typeof value === 'string') { + return value + } + try { + return JSON.stringify(value, null, 2) + } catch { + return '' + } + default: + return typeof value === 'string' ? value : JSON.stringify(value) + } + }, + [] + ) + + const buildInitialFormValues = useCallback( + (fields: NormalizedInputField[], submission?: Record) => { + const initial: Record = {} + + for (const field of fields) { + const candidate = + submission && Object.hasOwn(submission, field.name) ? submission[field.name] : field.value + + initial[field.name] = formatValueForInputField(field, candidate) + } + + return initial + }, + [formatValueForInputField] + ) + + const formatStructureValue = useCallback((value: any): string => { + if (value === null || value === undefined) return '—' + if (typeof value === 'string') return value + if (typeof value === 'number' || typeof value === 'boolean') return String(value) + try { + return JSON.stringify(value, null, 2) + } catch { + return String(value) + } + }, []) + + const parseFormValue = useCallback( + (field: NormalizedInputField, rawValue: string): { value: any; error?: string } => { + const value = rawValue ?? '' + + switch (field.type) { + case 'number': { + if (!value.trim()) { + return { value: null } + } + const numericValue = Number(value) + if (Number.isNaN(numericValue)) { + return { value: null, error: 'Enter a valid number.' } + } + return { value: numericValue } + } + case 'boolean': { + if (value === 'true') return { value: true } + if (value === 'false') return { value: false } + if (!value) return { value: null } + return { value: null, error: 'Select true or false.' } + } + case 'array': + case 'object': + case 'files': { + if (!value.trim()) { + if (field.type === 'array') return { value: [] } + return { value: {} } + } + try { + return { value: JSON.parse(value) } + } catch { + return { value: null, error: 'Enter valid JSON.' } + } + } + default: + return { value } + } + }, + [] + ) + + const handleFormFieldChange = useCallback( + (fieldName: string, newValue: string) => { + if (!selectedContextId) return + + setFormValues((prev) => { + const updated = { ...prev, [fieldName]: newValue } + setFormValuesByContext((map) => ({ ...map, [selectedContextId]: updated })) + return updated + }) + + setFormErrors((prev) => { + if (!prev[fieldName]) { + return prev + } + const { [fieldName]: _, ...rest } = prev + return rest + }) + }, + [selectedContextId] + ) + + const renderFieldInput = useCallback( + (field: NormalizedInputField) => { + const value = formValues[field.name] ?? '' + + switch (field.type) { + case 'boolean': { + const selectValue = value === 'true' || value === 'false' ? value : '__unset__' + return ( + + ) + } + case 'number': + return ( + handleFormFieldChange(field.name, event.target.value)} + placeholder={field.placeholder ?? 'Enter a number...'} + className='rounded-[12px] border-slate-200' + /> + ) + case 'array': + case 'object': + case 'files': + return ( +