fix(subflow-validation): validate subflow fields correctly + surface serialization errors in the logs correctly (#1299)

* fix(subflow-validation): validate subflow fields correctly + surface serialiazation errors in the logs correctly

* remove comments
This commit is contained in:
Vikhyath Mondreti
2025-09-09 18:02:30 -07:00
committed by GitHub
parent 3d4b9f0665
commit 8841e9bd6b
4 changed files with 221 additions and 29 deletions

View File

@@ -24,42 +24,48 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
const body = await request.json()
const { logs, executionId, result } = body
// If result is provided, use logging system for full tool call extraction
if (result) {
logger.info(`[${requestId}] Persisting execution result for workflow: ${id}`, {
executionId,
success: result.success,
})
// Check if this execution is from chat using only the explicit source flag
const isChatExecution = result.metadata?.source === 'chat'
// Also log to logging system
const triggerType = isChatExecution ? 'chat' : 'manual'
const loggingSession = new LoggingSession(id, executionId, triggerType, requestId)
const userId = validation.workflow.userId
const workspaceId = validation.workflow.workspaceId || ''
await loggingSession.safeStart({
userId: '', // TODO: Get from session
workspaceId: '', // TODO: Get from workflow
userId,
workspaceId,
variables: {},
})
// Build trace spans from execution logs
const { traceSpans } = buildTraceSpans(result)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: result.metadata?.duration || 0,
finalOutput: result.output || {},
traceSpans,
})
if (result.success === false) {
const message = result.error || 'Workflow execution failed'
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: result.metadata?.duration || 0,
error: { message },
})
} else {
const { traceSpans } = buildTraceSpans(result)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: result.metadata?.duration || 0,
finalOutput: result.output || {},
traceSpans,
})
}
return createSuccessResponse({
message: 'Execution logs persisted successfully',
})
}
// Fall back to the original log format if 'result' isn't provided
if (!logs || !Array.isArray(logs) || logs.length === 0) {
logger.warn(`[${requestId}] No logs provided for workflow: ${id}`)
return createErrorResponse('No logs provided', 400)

View File

@@ -7,7 +7,7 @@ import { getBlock } from '@/blocks'
import type { BlockOutput } from '@/blocks/types'
import { Executor } from '@/executor'
import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
import { Serializer } from '@/serializer'
import { Serializer, WorkflowValidationError } from '@/serializer'
import type { SerializedWorkflow } from '@/serializer/types'
import { useExecutionStore } from '@/stores/execution/store'
import { useConsoleStore } from '@/stores/panel/console/store'
@@ -16,6 +16,7 @@ import { useEnvironmentStore } from '@/stores/settings/environment/store'
import { useGeneralStore } from '@/stores/settings/general/store'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { mergeSubblockState } from '@/stores/workflows/utils'
import { generateLoopBlocks, generateParallelBlocks } from '@/stores/workflows/workflow/utils'
import { useCurrentWorkflow } from './use-current-workflow'
const logger = createLogger('useWorkflowExecution')
@@ -488,7 +489,7 @@ export function useWorkflowExecution() {
}
return result
} catch (error: any) {
const errorResult = handleExecutionError(error)
const errorResult = handleExecutionError(error, { executionId })
persistLogs(executionId, errorResult).catch((err) => {
logger.error('Error persisting logs:', { error: err })
})
@@ -518,12 +519,7 @@ export function useWorkflowExecution() {
executionId?: string
): Promise<ExecutionResult | StreamingExecution> => {
// Use currentWorkflow but check if we're in diff mode
const {
blocks: workflowBlocks,
edges: workflowEdges,
loops: workflowLoops,
parallels: workflowParallels,
} = currentWorkflow
const { blocks: workflowBlocks, edges: workflowEdges } = currentWorkflow
// Filter out blocks without type (these are layout-only blocks)
const validBlocks = Object.entries(workflowBlocks).reduce(
@@ -646,12 +642,17 @@ export function useWorkflowExecution() {
(edge) => !triggerBlockIds.includes(edge.source) && !triggerBlockIds.includes(edge.target)
)
// Create serialized workflow with filtered blocks and edges
// Derive subflows from the current filtered graph to avoid stale state
const runtimeLoops = generateLoopBlocks(filteredStates)
const runtimeParallels = generateParallelBlocks(filteredStates)
// Create serialized workflow with validation enabled
const workflow = new Serializer().serializeWorkflow(
filteredStates,
filteredEdges,
workflowLoops,
workflowParallels
runtimeLoops,
runtimeParallels,
true
)
// If this is a chat execution, get the selected outputs
@@ -690,7 +691,7 @@ export function useWorkflowExecution() {
return newExecutor.execute(activeWorkflowId || '')
}
const handleExecutionError = (error: any) => {
const handleExecutionError = (error: any, options?: { executionId?: string }) => {
let errorMessage = 'Unknown error'
if (error instanceof Error) {
errorMessage = error.message || `Error: ${String(error)}`
@@ -723,6 +724,36 @@ export function useWorkflowExecution() {
errorMessage = 'API request failed - no specific error details available'
}
// If we failed before creating an executor (e.g., serializer validation), add a console entry
if (!executor) {
try {
// Prefer attributing to specific subflow if we have a structured error
let blockId = 'serialization'
let blockName = 'Serialization'
let blockType = 'serializer'
if (error instanceof WorkflowValidationError) {
blockId = error.blockId || blockId
blockName = error.blockName || blockName
blockType = error.blockType || blockType
}
useConsoleStore.getState().addConsole({
input: {},
output: {},
success: false,
error: errorMessage,
durationMs: 0,
startedAt: new Date().toISOString(),
endedAt: new Date().toISOString(),
workflowId: activeWorkflowId || '',
blockId,
executionId: options?.executionId,
blockName,
blockType,
})
} catch {}
}
const errorResult: ExecutionResult = {
success: false,
output: {},

View File

@@ -190,9 +190,51 @@ export class LoggingSession {
return true
} catch (error) {
if (this.requestId) {
logger.error(`[${this.requestId}] Logging start failed:`, error)
logger.warn(
`[${this.requestId}] Logging start failed - falling back to minimal session:`,
error
)
}
// Fallback: create a minimal logging session without full workflow state
try {
const { userId, workspaceId, variables, triggerData } = params
this.trigger = createTriggerObject(this.triggerType, triggerData)
this.environment = createEnvironmentObject(
this.workflowId,
this.executionId,
userId,
workspaceId,
variables
)
// Minimal workflow state when normalized data is unavailable
this.workflowState = {
blocks: {},
edges: [],
loops: {},
parallels: {},
} as unknown as WorkflowState
await executionLogger.startWorkflowExecution({
workflowId: this.workflowId,
executionId: this.executionId,
trigger: this.trigger,
environment: this.environment,
workflowState: this.workflowState,
})
if (this.requestId) {
logger.debug(
`[${this.requestId}] Started minimal logging for execution ${this.executionId}`
)
}
return true
} catch (fallbackError) {
if (this.requestId) {
logger.error(`[${this.requestId}] Minimal logging start also failed:`, fallbackError)
}
return false
}
return false
}
}

View File

@@ -8,6 +8,21 @@ import { getTool } from '@/tools/utils'
const logger = createLogger('Serializer')
/**
* Structured validation error for pre-execution workflow validation
*/
export class WorkflowValidationError extends Error {
constructor(
message: string,
public blockId?: string,
public blockType?: string,
public blockName?: string
) {
super(message)
this.name = 'WorkflowValidationError'
}
}
/**
* Helper function to check if a subblock should be included in serialization based on current mode
*/
@@ -29,6 +44,11 @@ export class Serializer {
parallels?: Record<string, Parallel>,
validateRequired = false
): SerializedWorkflow {
// Validate subflow requirements (loops/parallels) before serialization if requested
if (validateRequired) {
this.validateSubflowsBeforeExecution(blocks, loops || {}, parallels || {})
}
return {
version: '1.0',
blocks: Object.values(blocks).map((block) => this.serializeBlock(block, validateRequired)),
@@ -43,6 +63,99 @@ export class Serializer {
}
}
/**
* Validate loop and parallel subflows for required inputs when running in "each/collection" modes
*/
private validateSubflowsBeforeExecution(
blocks: Record<string, BlockState>,
loops: Record<string, Loop>,
parallels: Record<string, Parallel>
): void {
// Validate loops in forEach mode
Object.values(loops || {}).forEach((loop) => {
if (!loop) return
if (loop.loopType === 'forEach') {
const items = (loop as any).forEachItems
const hasNonEmptyCollection = (() => {
if (items === undefined || items === null) return false
if (Array.isArray(items)) return items.length > 0
if (typeof items === 'object') return Object.keys(items).length > 0
if (typeof items === 'string') {
const trimmed = items.trim()
if (trimmed.length === 0) return false
// If it looks like JSON, parse to confirm non-empty [] / {}
if (trimmed.startsWith('[') || trimmed.startsWith('{')) {
try {
const parsed = JSON.parse(trimmed)
if (Array.isArray(parsed)) return parsed.length > 0
if (parsed && typeof parsed === 'object') return Object.keys(parsed).length > 0
} catch {
// Non-JSON or invalid JSON string allow non-empty string (could be a reference like <start.items>)
return true
}
}
// Non-JSON string allow (may be a variable reference/expression)
return true
}
return false
})()
if (!hasNonEmptyCollection) {
const blockName = blocks[loop.id]?.name || 'Loop'
const error = new WorkflowValidationError(
`${blockName} requires a collection for forEach mode. Provide a non-empty array/object or a variable reference.`,
loop.id,
'loop',
blockName
)
throw error
}
}
})
// Validate parallels in collection mode
Object.values(parallels || {}).forEach((parallel) => {
if (!parallel) return
if ((parallel as any).parallelType === 'collection') {
const distribution = (parallel as any).distribution
const hasNonEmptyDistribution = (() => {
if (distribution === undefined || distribution === null) return false
if (Array.isArray(distribution)) return distribution.length > 0
if (typeof distribution === 'object') return Object.keys(distribution).length > 0
if (typeof distribution === 'string') {
const trimmed = distribution.trim()
if (trimmed.length === 0) return false
// If it looks like JSON, parse to confirm non-empty [] / {}
if (trimmed.startsWith('[') || trimmed.startsWith('{')) {
try {
const parsed = JSON.parse(trimmed)
if (Array.isArray(parsed)) return parsed.length > 0
if (parsed && typeof parsed === 'object') return Object.keys(parsed).length > 0
} catch {
return true
}
}
return true
}
return false
})()
if (!hasNonEmptyDistribution) {
const blockName = blocks[parallel.id]?.name || 'Parallel'
const error = new WorkflowValidationError(
`${blockName} requires a collection for collection mode. Provide a non-empty array/object or a variable reference.`,
parallel.id,
'parallel',
blockName
)
throw error
}
}
})
}
private serializeBlock(block: BlockState, validateRequired = false): SerializedBlock {
// Special handling for subflow blocks (loops, parallels, etc.)
if (block.type === 'loop' || block.type === 'parallel') {