Checkpoint

This commit is contained in:
Siddharth Ganesan
2025-08-29 18:32:06 -07:00
parent 3e6d454de3
commit cf5532c852
8 changed files with 224 additions and 7 deletions

View File

@@ -544,6 +544,7 @@ export async function executeWorkflowForChat(
workspaceId: '', // TODO: Get from workflow
variables: workflowVariables,
initialInput: { input, conversationId },
executionType: 'chat',
})
const stream = new ReadableStream({

View File

@@ -400,6 +400,8 @@ export async function GET() {
workspaceId: workflowRecord.workspaceId || '',
variables: variables || {},
initialInput: input,
startBlockId: schedule.blockId || undefined,
executionType: 'schedule',
})
const executor = new Executor({
@@ -479,6 +481,8 @@ export async function GET() {
workspaceId: workflowRecord.workspaceId || '',
variables: {},
initialInput: input,
startBlockId: schedule.blockId || undefined,
executionType: 'schedule',
})
await loggingSession.safeCompleteWithError({

View File

@@ -147,6 +147,7 @@ async function executeWorkflow(workflow: any, requestId: string, input?: any): P
workspaceId: workflow.workspaceId,
variables,
initialInput: processedInput || {},
executionType: 'api',
})
// Replace environment variables in the block states

View File

@@ -41,6 +41,8 @@ import { getTrigger, getTriggersByProvider } from '@/triggers'
import { useParams } from 'next/navigation'
import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from '@/components/ui/select'
import type { WorkflowState } from '@/stores/workflows/workflow/types'
import { Executor } from '@/executor'
import { Serializer } from '@/serializer'
// Token render cache (LRU-style)
const TOKEN_CACHE_MAX = 500
@@ -2055,7 +2057,10 @@ export function DebugPanel() {
console.log('[Debug] Final executionInput:', executionInput)
// Apply the execution input to the starter block's subblock values
if (executionInput) {
const startedFromBlockIdMeta = (executionData?.startedFromBlockId as string | undefined)
const executionTypeMeta = (executionData?.executionType as string | undefined) || (executionData?.trigger?.type as string | undefined)
const shouldApplyStarter = !startedFromBlockIdMeta || startedFromBlockIdMeta === stateStarterId || executionTypeMeta === 'chat' || executionTypeMeta === 'api' || executionTypeMeta === 'manual'
if (executionInput && shouldApplyStarter) {
if (!subblockValues[stateStarterId]) {
subblockValues[stateStarterId] = {}
}
@@ -2105,7 +2110,7 @@ export function DebugPanel() {
})
// Append any extra keys from executionInput that aren't in the format
if (executionInput && typeof executionInput === 'object') {
if (executionInput && typeof executionInput === 'object') {
const existingNames = new Set(updatedFormat.map((f: any) => f?.name).filter(Boolean))
Object.entries(executionInput as Record<string, any>).forEach(([key, value]) => {
if (
@@ -2150,6 +2155,8 @@ export function DebugPanel() {
// Store the full execution input for reference
subblockValues[stateStarterId]._executionInput = executionInput
console.log('[Debug] Full execution input stored:', executionInput)
} else if (executionInput && !shouldApplyStarter) {
console.log('[Debug] Skipping applying execution input to starter because execution started from a non-starter block')
}
}
@@ -2210,9 +2217,172 @@ export function DebugPanel() {
const blocks = Object.values(state.blocks || {}) as any[]
const starterBlock = blocks.find((b: any) => b?.type === 'starter' || b?.metadata?.id === 'starter') as any
const starterIdFromState = starterBlock?.id as string | undefined
console.log('[Debug] Setting starter as pending:', starterIdFromState)
execStore.setPendingBlocks(starterIdFromState ? [starterIdFromState] : [])
execStore.setPanelFocusedBlockId(starterIdFromState || null)
// Prefer webhook-triggered block if present
let initialBlockId: string | undefined = starterIdFromState
try {
const fromStartMeta = (executionData?.startedFromBlockId as string | undefined)
if (fromStartMeta && state.blocks?.[fromStartMeta]) {
initialBlockId = fromStartMeta
console.log('[Debug] Using startedFromBlockId as pending:', initialBlockId)
}
const maybeWebhookBlockId = (executionData?.trigger?.data as any)?.blockId as
| string
| undefined
if (maybeWebhookBlockId && state.blocks?.[maybeWebhookBlockId]) {
initialBlockId = maybeWebhookBlockId
console.log('[Debug] Using webhook-triggered block as pending:', initialBlockId)
} else {
// Fallback: find first span with a matching blockId in traceSpans
const spans = (executionData?.traceSpans as any[]) || []
const findFirstMatchingSpanBlockId = (list: any[]): string | undefined => {
for (const span of list) {
if (span?.blockId && state.blocks?.[span.blockId]) return span.blockId as string
if (span?.children && Array.isArray(span.children)) {
const nested = findFirstMatchingSpanBlockId(span.children)
if (nested) return nested
}
}
return undefined
}
const fromSpans = findFirstMatchingSpanBlockId(spans)
if (fromSpans) {
initialBlockId = fromSpans
console.log('[Debug] Using first trace span block as pending:', initialBlockId)
}
}
} catch {}
console.log('[Debug] Setting initial pending block:', initialBlockId)
execStore.setPendingBlocks(initialBlockId ? [initialBlockId] : [])
execStore.setPanelFocusedBlockId(initialBlockId || null)
// Build a minimal debug context so references can resolve from executed blocks
try {
const blockStates = new Map<string, any>()
const executedBlocks = new Set<string>()
const activeExecutionPath = new Set<string>()
// Helper: compute downstream path from a node using state.edges
const edges = (state.edges as any[]) || []
const forwardAdjLocal: Record<string, string[]> = {}
edges.forEach((e: any) => {
const s = e.source
const t = e.target
if (!forwardAdjLocal[s]) forwardAdjLocal[s] = []
forwardAdjLocal[s].push(t)
})
const addPathFrom = (id?: string) => {
if (!id) return
const q: string[] = [id]
const seen = new Set<string>()
while (q.length) {
const n = q.shift() as string
if (seen.has(n)) continue
seen.add(n)
activeExecutionPath.add(n)
const next = forwardAdjLocal[n] || []
for (const m of next) if (!seen.has(m)) q.push(m)
}
}
// Starter output from executionInput if available
let starterOutput: any = undefined
if (stateStarterId && subblockValues[stateStarterId]) {
const execInput = subblockValues[stateStarterId]._executionInput
if (execInput !== undefined) {
if (typeof execInput === 'string') {
starterOutput = { input: execInput }
} else if (execInput && typeof execInput === 'object') {
// Keep common keys at root if present
starterOutput = { ...execInput }
}
}
}
if (stateStarterId && starterOutput !== undefined) {
blockStates.set(stateStarterId, {
output: starterOutput,
executed: true,
executionTime: 0,
})
executedBlocks.add(stateStarterId)
addPathFrom(stateStarterId)
}
// Triggered block output from trace spans
const spans = (executionData?.traceSpans as any[]) || []
const findSpanByBlockId = (list: any[], id?: string): any | undefined => {
if (!id) return undefined
for (const span of list) {
if (span?.blockId === id) return span
if (span?.children && Array.isArray(span.children)) {
const nested = findSpanByBlockId(span.children, id)
if (nested) return nested
}
}
return undefined
}
const triggerSpan = findSpanByBlockId(spans, initialBlockId)
const triggerOutput = triggerSpan?.output || triggerSpan?.outputData || triggerSpan?.data
let effectiveTriggerOutput = triggerOutput
if (effectiveTriggerOutput === undefined && executionData?.initialInput !== undefined) {
effectiveTriggerOutput = executionData.initialInput
console.log('[Debug] Using initialInput as trigger output fallback')
}
if (initialBlockId && effectiveTriggerOutput !== undefined) {
blockStates.set(initialBlockId, {
output: effectiveTriggerOutput,
executed: true,
executionTime: 0,
})
executedBlocks.add(initialBlockId)
addPathFrom(initialBlockId)
}
const newDebugCtx: any = {
blockStates,
blockLogs: [],
executedBlocks,
activeExecutionPath,
environmentVariables: (executionData?.environment?.variables as any) || {},
workflowVariables: {},
parallelBlockMapping: new Map(),
// Execution metadata and scaffolding needed by the executor
metadata: { startTime: new Date().toISOString() },
decisions: { router: new Map(), condition: new Map() },
loopIterations: new Map(),
loopItems: new Map(),
completedLoops: new Set(),
parallelExecutions: new Map(),
selectedOutputIds: [],
}
// Attach serialized workflow onto context so handlers have structure available
const serializer = new Serializer()
const serialized = serializer.serializeWorkflow(
(state.blocks || {}) as any,
(state.edges || []) as any,
(state.loops || {}) as any,
(state.parallels || {}) as any,
true
)
newDebugCtx.workflow = serialized
execStore.setDebugContext(newDebugCtx)
// Initialize an executor so handleStepDebug can continue from the selected blocks
try {
const ex = new Executor({
workflow: serialized,
envVarValues: newDebugCtx.environmentVariables,
contextExtensions: {
edges: ((state.edges as any[]) || []).map((e: any) => ({ source: e.source, target: e.target })),
},
})
execStore.setExecutor(ex as any)
} catch (initErr) {
console.warn('[Debug] Executor init failed for frozen canvas stepping:', initErr)
}
} catch (ctxErr) {
console.warn('[Debug] Failed to synthesize debug context for frozen canvas:', ctxErr)
}
// Verify the canvas was activated and trigger a re-render
setTimeout(() => {

View File

@@ -100,6 +100,9 @@ export async function executeWebhookJob(payload: WebhookExecutionPayload) {
workspaceId: '', // TODO: Get from workflow if needed
variables: decryptedEnvVars,
initialInput: payload.body || {},
triggerData: { provider: payload.provider, blockId: payload.blockId },
startBlockId: payload.blockId,
executionType: 'webhook',
})
// Merge subblock states (matching workflow-execution pattern)

View File

@@ -37,11 +37,13 @@ export class ExecutionLogger implements IExecutionLoggerService {
environment: ExecutionEnvironment
workflowState: WorkflowState
initialInput?: Record<string, unknown>
startedFromBlockId?: string
executionType?: 'api' | 'webhook' | 'schedule' | 'manual' | 'chat'
}): Promise<{
workflowLog: WorkflowExecutionLog
snapshot: WorkflowExecutionSnapshot
}> {
const { workflowId, executionId, trigger, environment, workflowState, initialInput } = params
const { workflowId, executionId, trigger, environment, workflowState, initialInput, startedFromBlockId, executionType } = params
logger.debug(`Starting workflow execution ${executionId} for workflow ${workflowId}`)
@@ -68,6 +70,8 @@ export class ExecutionLogger implements IExecutionLoggerService {
environment,
trigger,
initialInput: initialInput || {},
startedFromBlockId: startedFromBlockId || undefined,
executionType: executionType || trigger.type,
},
})
.returning()
@@ -148,6 +152,30 @@ export class ExecutionLogger implements IExecutionLoggerService {
const existingExecutionData = (existing?.executionData as any) || {}
// Build simple block execution summaries from trace spans (flat list)
const blockExecutions: any[] = []
const collectBlocks = (spans?: any[]) => {
if (!Array.isArray(spans)) return
spans.forEach((span) => {
if (span?.blockId) {
blockExecutions.push({
id: span.id,
blockId: span.blockId,
blockName: span.name,
blockType: span.type,
startedAt: span.startTime,
endedAt: span.endTime,
durationMs: span.duration,
status: span.status || 'success',
inputData: span.input || {},
outputData: span.output || {},
})
}
if (span?.children && Array.isArray(span.children)) collectBlocks(span.children)
})
}
collectBlocks(traceSpans)
const [updatedLog] = await db
.update(workflowExecutionLogs)
.set({
@@ -159,6 +187,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
...existingExecutionData,
traceSpans,
finalOutput,
blockExecutions: blockExecutions,
tokenBreakdown: {
prompt: costSummary.totalPromptTokens,
completion: costSummary.totalCompletionTokens,

View File

@@ -22,6 +22,8 @@ export interface SessionStartParams {
variables?: Record<string, string>
triggerData?: Record<string, unknown>
initialInput?: Record<string, unknown>
startBlockId?: string
executionType?: 'api' | 'webhook' | 'schedule' | 'manual' | 'chat'
}
export interface SessionCompleteParams {
@@ -62,7 +64,7 @@ export class LoggingSession {
}
async start(params: SessionStartParams = {}): Promise<void> {
const { userId, workspaceId, variables, triggerData, initialInput } = params
const { userId, workspaceId, variables, triggerData, initialInput, startBlockId, executionType } = params
try {
this.trigger = createTriggerObject(this.triggerType, triggerData)
@@ -82,6 +84,8 @@ export class LoggingSession {
environment: this.environment,
workflowState: this.workflowState,
initialInput,
startedFromBlockId: startBlockId,
executionType: executionType || this.triggerType,
})
if (this.requestId) {

View File

@@ -111,6 +111,11 @@ export interface WorkflowExecutionLog {
}
// Newly added: persist the original triggering input (starter/chat/api/webhook)
initialInput?: Record<string, unknown>
// Newly added: where execution began and type metadata
startedFromBlockId?: string
executionType?: 'api' | 'webhook' | 'schedule' | 'manual' | 'chat'
// Optional precomputed block execution summaries
blockExecutions?: any[]
}
// Top-level cost information
cost?: {