improvement(logs): state machine of workflow execution (#2560)

* improvement(logs): state machine of workflow execution

* cleanup more code

* fallback consistency

* fix labels

* backfill in migration correctly

* make streaming stop in chat window correctly
This commit is contained in:
Vikhyath Mondreti
2025-12-23 18:27:19 -08:00
committed by GitHub
parent 169dd4a503
commit 8c89507247
22 changed files with 8872 additions and 117 deletions

View File

@@ -49,6 +49,7 @@ export async function GET(request: NextRequest) {
stateSnapshotId: workflowExecutionLogs.stateSnapshotId,
deploymentVersionId: workflowExecutionLogs.deploymentVersionId,
level: workflowExecutionLogs.level,
status: workflowExecutionLogs.status,
trigger: workflowExecutionLogs.trigger,
startedAt: workflowExecutionLogs.startedAt,
endedAt: workflowExecutionLogs.endedAt,
@@ -78,6 +79,7 @@ export async function GET(request: NextRequest) {
stateSnapshotId: workflowExecutionLogs.stateSnapshotId,
deploymentVersionId: workflowExecutionLogs.deploymentVersionId,
level: workflowExecutionLogs.level,
status: workflowExecutionLogs.status,
trigger: workflowExecutionLogs.trigger,
startedAt: workflowExecutionLogs.startedAt,
endedAt: workflowExecutionLogs.endedAt,
@@ -332,6 +334,7 @@ export async function GET(request: NextRequest) {
deploymentVersion: log.deploymentVersion ?? null,
deploymentVersionName: log.deploymentVersionName ?? null,
level: log.level,
status: log.status,
duration: log.totalDurationMs ? `${log.totalDurationMs}ms` : null,
trigger: log.trigger,
createdAt: log.startedAt.toISOString(),

View File

@@ -713,7 +713,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
await PauseResumeManager.processQueuedResumes(executionId)
}
if (result.error === 'Workflow execution was cancelled') {
if (result.status === 'cancelled') {
logger.info(`[${requestId}] Workflow execution was cancelled`)
sendEvent({
type: 'execution:cancelled',

View File

@@ -7,8 +7,12 @@ import { ScrollArea } from '@/components/ui/scroll-area'
import { BASE_EXECUTION_CHARGE } from '@/lib/billing/constants'
import { FileCards, FrozenCanvas, TraceSpans } from '@/app/workspace/[workspaceId]/logs/components'
import { useLogDetailsResize } from '@/app/workspace/[workspaceId]/logs/hooks'
import type { LogStatus } from '@/app/workspace/[workspaceId]/logs/utils'
import { formatDate, StatusBadge, TriggerBadge } from '@/app/workspace/[workspaceId]/logs/utils'
import {
formatDate,
getDisplayStatus,
StatusBadge,
TriggerBadge,
} from '@/app/workspace/[workspaceId]/logs/utils'
import { formatCost } from '@/providers/utils'
import type { WorkflowLog } from '@/stores/logs/filters/types'
import { useLogDetailsUIStore } from '@/stores/logs/store'
@@ -100,14 +104,7 @@ export const LogDetails = memo(function LogDetails({
[log?.createdAt]
)
const logStatus: LogStatus = useMemo(() => {
if (!log) return 'info'
const baseLevel = (log.level || 'info').toLowerCase()
const isError = baseLevel === 'error'
const isPending = !isError && log.hasPendingPause === true
const isRunning = !isError && !isPending && log.duration === null
return isError ? 'error' : isPending ? 'pending' : isRunning ? 'running' : 'info'
}, [log])
const logStatus = useMemo(() => getDisplayStatus(log?.status), [log?.status])
return (
<>

View File

@@ -6,8 +6,14 @@ import Link from 'next/link'
import { List, type RowComponentProps, useListRef } from 'react-window'
import { Badge, buttonVariants } from '@/components/emcn'
import { cn } from '@/lib/core/utils/cn'
import {
formatDate,
formatDuration,
getDisplayStatus,
StatusBadge,
TriggerBadge,
} from '@/app/workspace/[workspaceId]/logs/utils'
import type { WorkflowLog } from '@/stores/logs/filters/types'
import { formatDate, formatDuration, StatusBadge, TriggerBadge } from '../../utils'
const LOG_ROW_HEIGHT = 44 as const
@@ -25,10 +31,6 @@ interface LogRowProps {
const LogRow = memo(
function LogRow({ log, isSelected, onClick, selectedRowRef }: LogRowProps) {
const formattedDate = useMemo(() => formatDate(log.createdAt), [log.createdAt])
const baseLevel = (log.level || 'info').toLowerCase()
const isError = baseLevel === 'error'
const isPending = !isError && log.hasPendingPause === true
const isRunning = !isError && !isPending && log.duration === null
const handleClick = useCallback(() => onClick(log), [onClick, log])
@@ -54,9 +56,7 @@ const LogRow = memo(
{/* Status */}
<div className='w-[12%] min-w-[100px]'>
<StatusBadge
status={isError ? 'error' : isPending ? 'pending' : isRunning ? 'running' : 'info'}
/>
<StatusBadge status={getDisplayStatus(log.status)} />
</div>
{/* Workflow */}
@@ -93,7 +93,7 @@ const LogRow = memo(
</div>
{/* Resume Link */}
{isPending && log.executionId && (log.workflow?.id || log.workflowId) && (
{log.status === 'pending' && log.executionId && (log.workflow?.id || log.workflowId) && (
<Link
href={`/resume/${log.workflow?.id || log.workflowId}/${log.executionId}`}
target='_blank'
@@ -115,8 +115,7 @@ const LogRow = memo(
return (
prevProps.log.id === nextProps.log.id &&
prevProps.log.duration === nextProps.log.duration &&
prevProps.log.level === nextProps.log.level &&
prevProps.log.hasPendingPause === nextProps.log.hasPendingPause &&
prevProps.log.status === nextProps.log.status &&
prevProps.isSelected === nextProps.isSelected
)
}

View File

@@ -137,9 +137,7 @@ export default function Logs() {
const hasStatusChange =
prevLog?.id === updatedLog.id &&
(updatedLog.duration !== prevLog.duration ||
updatedLog.level !== prevLog.level ||
updatedLog.hasPendingPause !== prevLog.hasPendingPause)
(updatedLog.duration !== prevLog.duration || updatedLog.status !== prevLog.status)
if (updatedLog !== selectedLog) {
setSelectedLog(updatedLog)

View File

@@ -7,8 +7,22 @@ import { getBlock } from '@/blocks/registry'
const CORE_TRIGGER_TYPES = ['manual', 'api', 'schedule', 'chat', 'webhook'] as const
const RUNNING_COLOR = '#22c55e' as const
const PENDING_COLOR = '#f59e0b' as const
export type LogStatus = 'error' | 'pending' | 'running' | 'info' | 'cancelled'
export type LogStatus = 'error' | 'pending' | 'running' | 'info'
/**
 * Translates a stored execution status into the status rendered by `StatusBadge`.
 *
 * - `'running'`, `'pending'` and `'cancelled'` pass through unchanged.
 * - `'failed'` is displayed as `'error'`.
 * - Every other value (including `null` and `undefined`) is displayed as `'info'`.
 *
 * @param status - Raw status value from a log record; may be missing.
 * @returns The display status for the badge.
 */
export function getDisplayStatus(status: string | null | undefined): LogStatus {
  if (status === 'failed') {
    return 'error'
  }
  if (status === 'running' || status === 'pending' || status === 'cancelled') {
    return status
  }
  return 'info'
}
/**
* Checks if a hex color is gray/neutral (low saturation) or too light/dark
@@ -77,6 +91,11 @@ export const StatusBadge = React.memo(({ status }: StatusBadgeProps) => {
color: lightenColor(RUNNING_COLOR, 65),
label: 'Running',
},
cancelled: {
bg: 'var(--terminal-status-info-bg)',
color: 'var(--terminal-status-info-color)',
label: 'Cancelled',
},
info: {
bg: 'var(--terminal-status-info-bg)',
color: 'var(--terminal-status-info-color)',
@@ -271,6 +290,7 @@ export interface ExecutionLog {
executionId: string
startedAt: string
level: string
status: string
trigger: string
triggerUserId: string | null
triggerInputs?: unknown
@@ -291,6 +311,7 @@ interface RawLogResponse extends LogWithDuration, LogWithExecutionData {
endedAt?: string
createdAt?: string
level?: string
status?: string
trigger?: string
triggerUserId?: string | null
error?: string
@@ -331,6 +352,7 @@ export function mapToExecutionLog(log: RawLogResponse): ExecutionLog {
executionId: log.executionId,
startedAt,
level: log.level || 'info',
status: log.status || 'completed',
trigger: log.trigger || 'manual',
triggerUserId: log.triggerUserId || null,
triggerInputs: undefined,
@@ -365,6 +387,7 @@ export function mapToExecutionLogAlt(log: RawLogResponse): ExecutionLog {
executionId: log.executionId,
startedAt: log.createdAt || log.startedAt || new Date().toISOString(),
level: log.level || 'info',
status: log.status || 'completed',
trigger: log.trigger || 'manual',
triggerUserId: log.triggerUserId || null,
triggerInputs: undefined,

View File

@@ -1,7 +1,15 @@
'use client'
import { type KeyboardEvent, useCallback, useEffect, useMemo, useRef, useState } from 'react'
import { AlertCircle, ArrowDownToLine, ArrowUp, MoreVertical, Paperclip, X } from 'lucide-react'
import {
AlertCircle,
ArrowDownToLine,
ArrowUp,
MoreVertical,
Paperclip,
Square,
X,
} from 'lucide-react'
import {
Badge,
Button,
@@ -211,7 +219,7 @@ export function Chat() {
const { entries } = useTerminalConsoleStore()
const { isExecuting } = useExecutionStore()
const { handleRunWorkflow } = useWorkflowExecution()
const { handleRunWorkflow, handleCancelExecution } = useWorkflowExecution()
const { data: session } = useSession()
const { addToQueue } = useOperationQueue()
@@ -224,7 +232,7 @@ export function Chat() {
// Refs
const inputRef = useRef<HTMLInputElement>(null)
const timeoutRef = useRef<NodeJS.Timeout | null>(null)
const abortControllerRef = useRef<AbortController | null>(null)
const streamReaderRef = useRef<ReadableStreamDefaultReader<Uint8Array> | null>(null)
// File upload hook
const {
@@ -436,10 +444,28 @@ export function Chat() {
useEffect(() => {
return () => {
timeoutRef.current && clearTimeout(timeoutRef.current)
abortControllerRef.current?.abort()
streamReaderRef.current?.cancel()
}
}, [])
// React to execution cancellation from run button.
// When nothing is executing any more but the chat still considers itself streaming,
// and the last message is still flagged as streaming: cancel the active stream
// reader, drop the ref, and finalize that message so it stops rendering as in-progress.
useEffect(() => {
if (!isExecuting && isStreaming) {
const lastMessage = workflowMessages[workflowMessages.length - 1]
if (lastMessage?.isStreaming) {
streamReaderRef.current?.cancel()
streamReaderRef.current = null
finalizeMessageStream(lastMessage.id)
}
}
}, [isExecuting, isStreaming, workflowMessages, finalizeMessageStream])
// Click handler for the stop button rendered while streaming: cancels the active
// stream reader, clears the ref, then cancels the workflow execution itself.
const handleStopStreaming = useCallback(() => {
streamReaderRef.current?.cancel()
streamReaderRef.current = null
handleCancelExecution()
}, [handleCancelExecution])
/**
* Processes streaming response from workflow execution
* Reads the stream chunk by chunk and updates the message content in real-time
@@ -449,6 +475,7 @@ export function Chat() {
const processStreamingResponse = useCallback(
async (stream: ReadableStream, responseMessageId: string) => {
const reader = stream.getReader()
streamReaderRef.current = reader
const decoder = new TextDecoder()
let accumulatedContent = ''
let buffer = ''
@@ -509,8 +536,15 @@ export function Chat() {
}
}
} catch (error) {
logger.error('Error processing stream:', error)
if ((error as Error)?.name !== 'AbortError') {
logger.error('Error processing stream:', error)
}
finalizeMessageStream(responseMessageId)
} finally {
// Only clear ref if it's still our reader (prevents clobbering a new stream)
if (streamReaderRef.current === reader) {
streamReaderRef.current = null
}
focusInput(100)
}
},
@@ -590,10 +624,6 @@ export function Chat() {
}
setHistoryIndex(-1)
// Reset abort controller
abortControllerRef.current?.abort()
abortControllerRef.current = new AbortController()
const conversationId = getConversationId(activeWorkflowId)
try {
@@ -1022,22 +1052,31 @@ export function Chat() {
<Paperclip className='!h-3.5 !w-3.5' />
</Badge>
<Button
onClick={handleSendMessage}
disabled={
(!chatMessage.trim() && chatFiles.length === 0) ||
!activeWorkflowId ||
isExecuting
}
className={cn(
'h-[22px] w-[22px] rounded-full p-0 transition-colors',
chatMessage.trim() || chatFiles.length > 0
? '!bg-[var(--c-C0C0C0)] hover:!bg-[var(--c-D0D0D0)]'
: '!bg-[var(--c-C0C0C0)]'
)}
>
<ArrowUp className='h-3.5 w-3.5 text-black' strokeWidth={2.25} />
</Button>
{isStreaming ? (
<Button
onClick={handleStopStreaming}
className='h-[22px] w-[22px] rounded-full p-0 transition-colors !bg-[var(--c-C0C0C0)] hover:!bg-[var(--c-D0D0D0)]'
>
<Square className='h-2.5 w-2.5 fill-black text-black' />
</Button>
) : (
<Button
onClick={handleSendMessage}
disabled={
(!chatMessage.trim() && chatFiles.length === 0) ||
!activeWorkflowId ||
isExecuting
}
className={cn(
'h-[22px] w-[22px] rounded-full p-0 transition-colors',
chatMessage.trim() || chatFiles.length > 0
? '!bg-[var(--c-C0C0C0)] hover:!bg-[var(--c-D0D0D0)]'
: '!bg-[var(--c-C0C0C0)]'
)}
>
<ArrowUp className='h-3.5 w-3.5 text-black' strokeWidth={2.25} />
</Button>
)}
</div>
</div>

View File

@@ -1,4 +1,4 @@
import { useCallback, useState } from 'react'
import { useCallback, useRef, useState } from 'react'
import { useQueryClient } from '@tanstack/react-query'
import { v4 as uuidv4 } from 'uuid'
import { createLogger } from '@/lib/logs/console/logger'
@@ -111,6 +111,7 @@ export function useWorkflowExecution() {
} = useExecutionStore()
const [executionResult, setExecutionResult] = useState<ExecutionResult | null>(null)
const executionStream = useExecutionStream()
const currentChatExecutionIdRef = useRef<string | null>(null)
const isViewingDiff = useWorkflowDiffStore((state) => state.isShowingDiff)
/**
@@ -312,13 +313,25 @@ export function useWorkflowExecution() {
// For chat executions, we'll use a streaming approach
if (isChatExecution) {
let isCancelled = false
const executionId = uuidv4()
currentChatExecutionIdRef.current = executionId
const stream = new ReadableStream({
async start(controller) {
const { encodeSSE } = await import('@/lib/core/utils/sse')
const executionId = uuidv4()
const streamedContent = new Map<string, string>()
const streamReadingPromises: Promise<void>[] = []
// Enqueue a chunk only while the stream has not been cancelled.
// If enqueue itself throws, flag the stream as cancelled so that every later
// write is skipped instead of throwing again.
const safeEnqueue = (data: Uint8Array) => {
if (!isCancelled) {
try {
controller.enqueue(data)
} catch {
isCancelled = true
}
}
}
// Handle file uploads if present
const uploadedFiles: any[] = []
interface UploadErrorCapableInput {
@@ -432,7 +445,7 @@ export function useWorkflowExecution() {
}
}
controller.enqueue(encodeSSE({ blockId, chunk: chunkToSend }))
safeEnqueue(encodeSSE({ blockId, chunk: chunkToSend }))
}
} catch (error) {
logger.error('Error reading from stream:', error)
@@ -485,7 +498,7 @@ export function useWorkflowExecution() {
const separator = streamedContent.size > 0 ? '\n\n' : ''
// Send the non-streaming block output as a chunk
controller.enqueue(encodeSSE({ blockId, chunk: separator + formattedOutput }))
safeEnqueue(encodeSSE({ blockId, chunk: separator + formattedOutput }))
// Track that we've sent output for this block
streamedContent.set(blockId, formattedOutput)
@@ -503,13 +516,8 @@ export function useWorkflowExecution() {
)
// Check if execution was cancelled
if (
result &&
'success' in result &&
!result.success &&
result.error === 'Workflow execution was cancelled'
) {
controller.enqueue(encodeSSE({ event: 'cancelled', data: result }))
if (result && 'status' in result && result.status === 'cancelled') {
safeEnqueue(encodeSSE({ event: 'cancelled', data: result }))
return
}
@@ -568,8 +576,7 @@ export function useWorkflowExecution() {
queryClient.invalidateQueries({ queryKey: subscriptionKeys.user() })
}, 1000)
const { encodeSSE } = await import('@/lib/core/utils/sse')
controller.enqueue(encodeSSE({ event: 'final', data: result }))
safeEnqueue(encodeSSE({ event: 'final', data: result }))
// Note: Logs are already persisted server-side via execution-core.ts
}
} catch (error: any) {
@@ -587,17 +594,23 @@ export function useWorkflowExecution() {
}
// Send the error as final event so downstream handlers can treat it uniformly
const { encodeSSE } = await import('@/lib/core/utils/sse')
controller.enqueue(encodeSSE({ event: 'final', data: errorResult }))
safeEnqueue(encodeSSE({ event: 'final', data: errorResult }))
// Do not error the controller to allow consumers to process the final event
} finally {
controller.close()
setIsExecuting(false)
setIsDebugging(false)
setActiveBlocks(new Set())
if (!isCancelled) {
controller.close()
}
if (currentChatExecutionIdRef.current === executionId) {
setIsExecuting(false)
setIsDebugging(false)
setActiveBlocks(new Set())
}
}
},
cancel() {
isCancelled = true
},
})
return { success: true, stream }
}
@@ -1317,7 +1330,10 @@ export function useWorkflowExecution() {
// Cancel the execution stream (server-side)
executionStream.cancel()
// Reset execution state
// Mark current chat execution as superseded so its cleanup won't affect new executions
currentChatExecutionIdRef.current = null
// Reset execution state - this triggers chat stream cleanup via useEffect in chat.tsx
setIsExecuting(false)
setIsDebugging(false)
setActiveBlocks(new Set())

View File

@@ -309,30 +309,22 @@ async function runWorkflowExecution({
}
return { status: 'failure', blocks, executionResult }
} catch (earlyError) {
logger.error(
`[${requestId}] Early failure in scheduled workflow ${payload.workflowId}`,
earlyError
)
} catch (error: unknown) {
logger.error(`[${requestId}] Early failure in scheduled workflow ${payload.workflowId}`, error)
try {
const executionResult = (earlyError as any)?.executionResult as ExecutionResult | undefined
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
const errorWithResult = error as { executionResult?: ExecutionResult }
const executionResult = errorWithResult?.executionResult
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
await loggingSession.safeCompleteWithError({
error: {
message: `Schedule execution failed: ${
earlyError instanceof Error ? earlyError.message : String(earlyError)
}`,
stackTrace: earlyError instanceof Error ? earlyError.stack : undefined,
},
traceSpans,
})
} catch (loggingError) {
logger.error(`[${requestId}] Failed to complete log entry for schedule failure`, loggingError)
}
await loggingSession.safeCompleteWithError({
error: {
message: error instanceof Error ? error.message : String(error),
stackTrace: error instanceof Error ? error.stack : undefined,
},
traceSpans,
})
throw earlyError
throw error
}
}
@@ -606,8 +598,10 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
`Error updating schedule ${payload.scheduleId} after failure`,
`Updated schedule ${payload.scheduleId} after failure`
)
} catch (error: any) {
if (error?.message?.includes('Service overloaded')) {
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error)
if (errorMessage.includes('Service overloaded')) {
logger.warn(`[${requestId}] Service overloaded, retrying schedule in 5 minutes`)
const retryDelay = 5 * 60 * 1000
@@ -652,7 +646,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
`Updated schedule ${payload.scheduleId} after execution error`
)
}
} catch (error: any) {
} catch (error: unknown) {
logger.error(`[${requestId}] Error processing schedule ${payload.scheduleId}`, error)
}
}

View File

@@ -536,10 +536,13 @@ async function executeWebhookJobInternal(
executedAt: new Date().toISOString(),
provider: payload.provider,
}
} catch (error: any) {
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error)
const errorStack = error instanceof Error ? error.stack : undefined
logger.error(`[${requestId}] Webhook execution failed`, {
error: error.message,
stack: error.stack,
error: errorMessage,
stack: errorStack,
workflowId: payload.workflowId,
provider: payload.provider,
})
@@ -567,10 +570,11 @@ async function executeWebhookJobInternal(
isTest: payload.testMode === true,
executionTarget: payload.executionTarget || 'deployed',
},
deploymentVersionId, // Pass if available (undefined for early errors)
deploymentVersionId,
})
const executionResult = (error?.executionResult as ExecutionResult | undefined) || {
const errorWithResult = error as { executionResult?: ExecutionResult }
const executionResult = errorWithResult?.executionResult || {
success: false,
output: {},
logs: [],
@@ -581,8 +585,8 @@ async function executeWebhookJobInternal(
endedAt: new Date().toISOString(),
totalDurationMs: 0,
error: {
message: error.message || 'Webhook execution failed',
stackTrace: error.stack,
message: errorMessage || 'Webhook execution failed',
stackTrace: errorStack,
},
traceSpans,
})

View File

@@ -3,10 +3,12 @@ import { v4 as uuidv4 } from 'uuid'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import { getWorkflowById } from '@/lib/workflows/utils'
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionResult } from '@/executor/types'
const logger = createLogger('TriggerWorkflowExecution')
@@ -66,6 +68,12 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
logger.info(`[${requestId}] Preprocessing passed. Using actor: ${actorUserId}`)
await loggingSession.safeStart({
userId: actorUserId,
workspaceId,
variables: {},
})
const workflow = await getWorkflowById(workflowId)
if (!workflow) {
throw new Error(`Workflow ${workflowId} not found after preprocessing`)
@@ -131,11 +139,24 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
executedAt: new Date().toISOString(),
metadata: payload.metadata,
}
} catch (error: any) {
} catch (error: unknown) {
logger.error(`[${requestId}] Workflow execution failed: ${workflowId}`, {
error: error.message,
error: error instanceof Error ? error.message : String(error),
executionId,
})
const errorWithResult = error as { executionResult?: ExecutionResult }
const executionResult = errorWithResult?.executionResult
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
await loggingSession.safeCompleteWithError({
error: {
message: error instanceof Error ? error.message : String(error),
stackTrace: error instanceof Error ? error.stack : undefined,
},
traceSpans,
})
throw error
}
}

View File

@@ -39,6 +39,9 @@ export class ExecutionEngine {
this.initializeQueue(triggerBlockId)
while (this.hasWork()) {
if (this.context.isCancelled && this.executing.size === 0) {
break
}
await this.processQueue()
}
await this.waitForAllExecutions()
@@ -51,6 +54,16 @@ export class ExecutionEngine {
this.context.metadata.endTime = new Date(endTime).toISOString()
this.context.metadata.duration = endTime - startTime
if (this.context.isCancelled) {
return {
success: false,
output: this.finalOutput,
logs: this.context.blockLogs,
metadata: this.context.metadata,
status: 'cancelled',
}
}
return {
success: true,
output: this.finalOutput,
@@ -62,6 +75,16 @@ export class ExecutionEngine {
this.context.metadata.endTime = new Date(endTime).toISOString()
this.context.metadata.duration = endTime - startTime
if (this.context.isCancelled) {
return {
success: false,
output: this.finalOutput,
logs: this.context.blockLogs,
metadata: this.context.metadata,
status: 'cancelled',
}
}
const errorMessage = normalizeError(error)
logger.error('Execution failed', { error: errorMessage })
@@ -73,8 +96,6 @@ export class ExecutionEngine {
metadata: this.context.metadata,
}
// Attach executionResult to the original error instead of creating a new one
// This preserves block error metadata (blockId, blockName, blockType, etc.)
if (error && typeof error === 'object') {
;(error as any).executionResult = executionResult
}
@@ -213,6 +234,9 @@ export class ExecutionEngine {
private async processQueue(): Promise<void> {
while (this.readyQueue.length > 0) {
if (this.context.isCancelled) {
break
}
const nodeId = this.dequeue()
if (!nodeId) continue
const promise = this.executeNodeAsync(nodeId)
@@ -227,8 +251,6 @@ export class ExecutionEngine {
private async executeNodeAsync(nodeId: string): Promise<void> {
try {
const wasAlreadyExecuted = this.context.executedBlocks.has(nodeId)
const node = this.dag.nodes.get(nodeId)
const result = await this.nodeOrchestrator.executeNode(this.context, nodeId)
if (!wasAlreadyExecuted) {

View File

@@ -235,7 +235,7 @@ export interface ExecutionResult {
error?: string
logs?: BlockLog[]
metadata?: ExecutionMetadata
status?: 'completed' | 'paused'
status?: 'completed' | 'paused' | 'cancelled'
pausePoints?: PausePoint[]
snapshotSeed?: SerializedSnapshot
_streamingMetadata?: {

View File

@@ -149,6 +149,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
stateSnapshotId: snapshotResult.snapshot.id,
deploymentVersionId: deploymentVersionId ?? null,
level: 'info',
status: 'running',
trigger: trigger.type,
startedAt: startTime,
endedAt: null,
@@ -206,8 +207,9 @@ export class ExecutionLogger implements IExecutionLoggerService {
finalOutput: BlockOutputData
traceSpans?: TraceSpan[]
workflowInput?: any
isResume?: boolean // If true, merge with existing data instead of replacing
level?: 'info' | 'error' // Optional override for log level (used in cost-only fallback)
isResume?: boolean
level?: 'info' | 'error'
status?: 'completed' | 'failed' | 'cancelled'
}): Promise<WorkflowExecutionLog> {
const {
executionId,
@@ -219,6 +221,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
workflowInput,
isResume,
level: levelOverride,
status: statusOverride,
} = params
logger.debug(`Completing workflow execution ${executionId}`, { isResume })
@@ -248,6 +251,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
})
const level = levelOverride ?? (hasErrors ? 'error' : 'info')
const status = statusOverride ?? (hasErrors ? 'failed' : 'completed')
// Extract files from trace spans, final output, and workflow input
const executionFiles = this.extractFilesFromExecution(traceSpans, finalOutput, workflowInput)
@@ -309,6 +313,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
.update(workflowExecutionLogs)
.set({
level,
status,
endedAt: new Date(endedAt),
totalDurationMs: actualTotalDuration,
files: mergedFiles.length > 0 ? mergedFiles : null,

View File

@@ -44,6 +44,12 @@ export interface SessionErrorCompleteParams {
traceSpans?: TraceSpan[]
}
/** Optional details supplied when a logging session is completed as cancelled. */
export interface SessionCancelledParams {
/** End timestamp string; `completeWithCancellation` uses the current time when it is omitted. */
endedAt?: string
/** Total run duration in milliseconds; `completeWithCancellation` treats non-numeric values as 0 and records at least 1. */
totalDurationMs?: number
/** Trace spans collected before cancellation; when missing or empty, `completeWithCancellation` records only the base execution charge. */
traceSpans?: TraceSpan[]
}
export class LoggingSession {
private workflowId: string
private executionId: string
@@ -52,7 +58,8 @@ export class LoggingSession {
private trigger?: ExecutionTrigger
private environment?: ExecutionEnvironment
private workflowState?: WorkflowState
private isResume = false // Track if this is a resume execution
private isResume = false
private completed = false
constructor(
workflowId: string,
@@ -127,6 +134,10 @@ export class LoggingSession {
}
async complete(params: SessionCompleteParams = {}): Promise<void> {
if (this.completed) {
return
}
const { endedAt, totalDurationMs, finalOutput, traceSpans, workflowInput } = params
try {
@@ -145,6 +156,8 @@ export class LoggingSession {
isResume: this.isResume,
})
this.completed = true
// Track workflow execution outcome
if (traceSpans && traceSpans.length > 0) {
try {
@@ -194,6 +207,10 @@ export class LoggingSession {
}
async completeWithError(params: SessionErrorCompleteParams = {}): Promise<void> {
if (this.completed) {
return
}
try {
const { endedAt, totalDurationMs, error, traceSpans } = params
@@ -242,6 +259,8 @@ export class LoggingSession {
traceSpans: spans,
})
this.completed = true
// Track workflow execution error outcome
try {
const { trackPlatformEvent } = await import('@/lib/core/telemetry')
@@ -277,6 +296,74 @@ export class LoggingSession {
}
}
/**
 * Finalizes this session's execution log with status `'cancelled'`.
 *
 * Returns immediately when the session has already been completed. Otherwise it
 * writes the completion record through `executionLogger.completeWorkflowExecution`,
 * marks the session as completed, and then emits a best-effort telemetry event.
 *
 * @param params - Optional end time, duration and trace spans of the cancelled run.
 * @throws Rethrows (after logging) any error raised while building or writing the
 *   completion record. Telemetry failures are swallowed.
 */
async completeWithCancellation(params: SessionCancelledParams = {}): Promise<void> {
  if (this.completed) return

  try {
    const { endedAt, totalDurationMs, traceSpans: spans } = params

    const finishedAt = endedAt ? new Date(endedAt) : new Date()
    // Non-numeric durations count as 0, and the recorded duration is never below 1ms.
    const recordedDurationMs = Math.max(
      1,
      typeof totalDurationMs === 'number' ? totalDurationMs : 0
    )

    // With no spans there is nothing to price beyond the base execution charge.
    const baseChargeOnly = {
      totalCost: BASE_EXECUTION_CHARGE,
      totalInputCost: 0,
      totalOutputCost: 0,
      totalTokens: 0,
      totalPromptTokens: 0,
      totalCompletionTokens: 0,
      baseExecutionCharge: BASE_EXECUTION_CHARGE,
      modelCost: 0,
      models: {},
    }
    const costSummary = spans && spans.length > 0 ? calculateCostSummary(spans) : baseChargeOnly

    await executionLogger.completeWorkflowExecution({
      executionId: this.executionId,
      endedAt: finishedAt.toISOString(),
      totalDurationMs: recordedDurationMs,
      costSummary,
      finalOutput: { cancelled: true },
      traceSpans: spans || [],
      status: 'cancelled',
    })

    // Only flag the session as done once the log write has succeeded.
    this.completed = true

    try {
      const { trackPlatformEvent } = await import('@/lib/core/telemetry')
      trackPlatformEvent('platform.workflow.executed', {
        'workflow.id': this.workflowId,
        'execution.duration_ms': recordedDurationMs,
        'execution.status': 'cancelled',
        'execution.trigger': this.triggerType,
        'execution.blocks_executed': spans?.length || 0,
        'execution.has_errors': false,
      })
    } catch (_e) {
      // Silently fail
    }

    if (this.requestId) {
      logger.debug(
        `[${this.requestId}] Completed cancelled logging for execution ${this.executionId}`
      )
    }
  } catch (err) {
    const isErrorObject = err instanceof Error
    logger.error(`Failed to complete cancelled logging for execution ${this.executionId}:`, {
      requestId: this.requestId,
      workflowId: this.workflowId,
      executionId: this.executionId,
      error: isErrorObject ? err.message : String(err),
      stack: isErrorObject ? err.stack : undefined,
    })
    throw err
  }
}
async safeStart(params: SessionStartParams): Promise<boolean> {
try {
await this.start(params)
@@ -368,6 +455,27 @@ export class LoggingSession {
errorMessage:
params?.error?.message || `Execution failed to store trace spans: ${errorMsg}`,
isError: true,
status: 'failed',
})
}
}
/**
 * Completes the session as cancelled, falling back to a cost-only log entry when
 * the full completion fails.
 *
 * A failure of `completeWithCancellation` is logged as a warning and then retried
 * through `completeWithCostOnlyLog` with status `'cancelled'`.
 *
 * @param params - Optional end time, duration and trace spans of the cancelled run.
 */
async safeCompleteWithCancellation(params?: SessionCancelledParams): Promise<void> {
  try {
    await this.completeWithCancellation(params)
    return
  } catch (primaryFailure) {
    const reason =
      primaryFailure instanceof Error ? primaryFailure.message : String(primaryFailure)
    logger.warn(
      `[${this.requestId || 'unknown'}] CompleteWithCancellation failed for execution ${this.executionId}, attempting fallback`,
      { error: reason }
    )
  }

  // Reached only when the primary completion threw.
  await this.completeWithCostOnlyLog({
    traceSpans: params?.traceSpans,
    endedAt: params?.endedAt,
    totalDurationMs: params?.totalDurationMs,
    errorMessage: 'Execution was cancelled',
    isError: false,
    status: 'cancelled',
  })
}
@@ -378,7 +486,12 @@ export class LoggingSession {
totalDurationMs?: number
errorMessage: string
isError: boolean
status?: 'completed' | 'failed' | 'cancelled'
}): Promise<void> {
if (this.completed) {
return
}
logger.warn(
`[${this.requestId || 'unknown'}] Logging completion failed for execution ${this.executionId} - attempting cost-only fallback`
)
@@ -407,8 +520,11 @@ export class LoggingSession {
traceSpans: [],
isResume: this.isResume,
level: params.isError ? 'error' : 'info',
status: params.status,
})
this.completed = true
logger.info(
`[${this.requestId || 'unknown'}] Cost-only fallback succeeded for execution ${this.executionId}`
)

View File

@@ -366,7 +366,28 @@ export async function executeWorkflowCore(
await updateWorkflowRunCounts(workflowId)
}
// Complete logging session
if (result.status === 'cancelled') {
await loggingSession.safeCompleteWithCancellation({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
traceSpans: traceSpans || [],
})
logger.info(`[${requestId}] Workflow execution cancelled`, {
duration: result.metadata?.duration,
})
return result
}
if (result.status === 'paused') {
logger.info(`[${requestId}] Workflow execution paused`, {
duration: result.metadata?.duration,
})
return result
}
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,

View File

@@ -1,6 +1,6 @@
import { randomUUID } from 'crypto'
import { db } from '@sim/db'
import { pausedExecutions, resumeQueue } from '@sim/db/schema'
import { pausedExecutions, resumeQueue, workflowExecutionLogs } from '@sim/db/schema'
import { and, asc, desc, eq, inArray, lt, sql } from 'drizzle-orm'
import type { Edge } from 'reactflow'
import { preprocessExecution } from '@/lib/execution/preprocessing'
@@ -155,6 +155,11 @@ export class PauseResumeManager {
},
})
await db
.update(workflowExecutionLogs)
.set({ status: 'pending' })
.where(eq(workflowExecutionLogs.executionId, executionId))
await PauseResumeManager.processQueuedResumes(executionId)
}
@@ -330,6 +335,7 @@ export class PauseResumeManager {
await PauseResumeManager.markResumeFailed({
resumeEntryId,
pausedExecutionId: pausedExecution.id,
parentExecutionId: pausedExecution.executionId,
contextId,
failureReason: (error as Error).message,
})
@@ -352,6 +358,12 @@ export class PauseResumeManager {
userId: string
}): Promise<ExecutionResult> {
const { resumeExecutionId, pausedExecution, contextId, resumeInput, userId } = args
const parentExecutionId = pausedExecution.executionId
await db
.update(workflowExecutionLogs)
.set({ status: 'running' })
.where(eq(workflowExecutionLogs.executionId, parentExecutionId))
logger.info('Starting resume execution', {
resumeExecutionId,
@@ -667,7 +679,7 @@ export class PauseResumeManager {
'manual'
const loggingSession = new LoggingSession(
metadata.workflowId,
resumeExecutionId,
parentExecutionId,
triggerType,
metadata.requestId
)
@@ -765,6 +777,11 @@ export class PauseResumeManager {
.update(pausedExecutions)
.set({ status: 'fully_resumed', updatedAt: now })
.where(eq(pausedExecutions.executionId, parentExecutionId))
} else {
await tx
.update(workflowExecutionLogs)
.set({ status: 'pending' })
.where(eq(workflowExecutionLogs.executionId, parentExecutionId))
}
})
}
@@ -772,6 +789,7 @@ export class PauseResumeManager {
private static async markResumeFailed(args: {
resumeEntryId: string
pausedExecutionId: string
parentExecutionId: string
contextId: string
failureReason: string
}): Promise<void> {
@@ -789,6 +807,11 @@ export class PauseResumeManager {
pausePoints: sql`jsonb_set(pause_points, ARRAY[${args.contextId}, 'resumeStatus'], '"failed"'::jsonb)`,
})
.where(eq(pausedExecutions.id, args.pausedExecutionId))
await tx
.update(workflowExecutionLogs)
.set({ status: 'failed' })
.where(eq(workflowExecutionLogs.executionId, args.parentExecutionId))
})
}

View File

@@ -107,6 +107,7 @@ export interface WorkflowLog {
deploymentVersion?: number | null
deploymentVersionName?: string | null
level: string
status?: string | null
duration: string | null
trigger: string | null
createdAt: string

View File

@@ -0,0 +1,7 @@
-- Add the execution status column; rows default to 'running'.
ALTER TABLE "workflow_execution_logs" ADD COLUMN "status" text DEFAULT 'running' NOT NULL;--> statement-breakpoint
-- Backfill existing rows: error-level logs become 'failed', logs that have an end
-- time become 'completed', and all remaining rows stay 'running'.
-- NOTE(review): this backfill never produces 'pending' or 'cancelled'; confirm that
-- no pre-existing rows need either of those values.
UPDATE "workflow_execution_logs"
SET "status" = CASE
WHEN "level" = 'error' THEN 'failed'
WHEN "ended_at" IS NOT NULL THEN 'completed'
ELSE 'running'
END;

File diff suppressed because it is too large Load Diff

View File

@@ -918,6 +918,13 @@
"when": 1766460889694,
"tag": "0131_illegal_nova",
"breakpoints": true
},
{
"idx": 132,
"version": "7",
"when": 1766529613309,
"tag": "0132_dazzling_leech",
"breakpoints": true
}
]
}

View File

@@ -304,8 +304,9 @@ export const workflowExecutionLogs = pgTable(
{ onDelete: 'set null' }
),
level: text('level').notNull(), // 'info', 'error'
trigger: text('trigger').notNull(), // 'api', 'webhook', 'schedule', 'manual', 'chat'
level: text('level').notNull(), // 'info' | 'error'
status: text('status').notNull().default('running'), // 'running' | 'pending' | 'completed' | 'failed' | 'cancelled'
trigger: text('trigger').notNull(), // 'api' | 'webhook' | 'schedule' | 'manual' | 'chat'
startedAt: timestamp('started_at').notNull(),
endedAt: timestamp('ended_at'),