fix(variables): fix variable resolution error and start block outputs (#1847)

* Fix var resolution if block is not upstream

* Filter convo id from start block outputs if not set

* Lint

* Start block outputs
This commit is contained in:
Siddharth Ganesan
2025-11-07 15:03:43 -08:00
committed by GitHub
parent 9fd2156e9e
commit b6c2c7456e
4 changed files with 112 additions and 62 deletions

View File

@@ -71,9 +71,21 @@ export class BlockExecutor {
try {
resolvedInputs = this.resolver.resolveInputs(ctx, node.id, block.config.params, block)
} finally {
} catch (error) {
cleanupSelfReference?.()
return this.handleBlockError(
error,
ctx,
node,
block,
startTime,
blockLog,
resolvedInputs,
isSentinel,
'input_resolution'
)
}
cleanupSelfReference?.()
try {
const output = handler.executeWithNode
@@ -120,58 +132,17 @@ export class BlockExecutor {
return normalizedOutput
} catch (error) {
const duration = Date.now() - startTime
const errorMessage = normalizeError(error)
if (blockLog) {
blockLog.endedAt = new Date().toISOString()
blockLog.durationMs = duration
blockLog.success = false
blockLog.error = errorMessage
}
const errorOutput: NormalizedBlockOutput = {
error: errorMessage,
}
this.state.setBlockOutput(node.id, errorOutput, duration)
logger.error('Block execution failed', {
blockId: node.id,
blockType: block.metadata?.id,
error: errorMessage,
})
if (!isSentinel) {
this.callOnBlockComplete(ctx, node, block, resolvedInputs, errorOutput, duration)
}
const hasErrorPort = this.hasErrorPortEdge(node)
if (hasErrorPort) {
logger.info('Block has error port - returning error output instead of throwing', {
blockId: node.id,
error: errorMessage,
})
return errorOutput
}
let errorToThrow: Error | string
if (error instanceof Error) {
errorToThrow = error
} else {
errorToThrow = errorMessage
}
throw buildBlockExecutionError({
return this.handleBlockError(
error,
ctx,
node,
block,
error: errorToThrow,
context: ctx,
additionalInfo: {
nodeId: node.id,
executionTime: duration,
},
})
startTime,
blockLog,
resolvedInputs,
isSentinel,
'execution'
)
}
}
@@ -196,6 +167,68 @@ export class BlockExecutor {
return this.blockHandlers.find((h) => h.canHandle(block))
}
private handleBlockError(
error: unknown,
ctx: ExecutionContext,
node: DAGNode,
block: SerializedBlock,
startTime: number,
blockLog: BlockLog | undefined,
resolvedInputs: Record<string, any>,
isSentinel: boolean,
phase: 'input_resolution' | 'execution'
): NormalizedBlockOutput {
const duration = Date.now() - startTime
const errorMessage = normalizeError(error)
if (blockLog) {
blockLog.endedAt = new Date().toISOString()
blockLog.durationMs = duration
blockLog.success = false
blockLog.error = errorMessage
}
const errorOutput: NormalizedBlockOutput = {
error: errorMessage,
}
this.state.setBlockOutput(node.id, errorOutput, duration)
logger.error(
phase === 'input_resolution' ? 'Failed to resolve block inputs' : 'Block execution failed',
{
blockId: node.id,
blockType: block.metadata?.id,
error: errorMessage,
}
)
if (!isSentinel) {
this.callOnBlockComplete(ctx, node, block, resolvedInputs, errorOutput, duration)
}
const hasErrorPort = this.hasErrorPortEdge(node)
if (hasErrorPort) {
logger.info('Block has error port - returning error output instead of throwing', {
blockId: node.id,
error: errorMessage,
})
return errorOutput
}
const errorToThrow = error instanceof Error ? error : new Error(errorMessage)
throw buildBlockExecutionError({
block,
error: errorToThrow,
context: ctx,
additionalInfo: {
nodeId: node.id,
executionTime: duration,
},
})
}
private hasErrorPortEdge(node: DAGNode): boolean {
for (const [_, edge] of node.outgoingEdges) {
if (edge.sourceHandle === EDGE.ERROR) {

View File

@@ -71,8 +71,8 @@ describe('start-block utilities', () => {
})
expect(output.payload).toBe('value')
expect(output.input).toBe('')
expect(output.conversationId).toBe('')
expect(output.input).toBeUndefined()
expect(output.conversationId).toBeUndefined()
})
it('buildStartBlockOutput uses trigger schema for API triggers', () => {
@@ -101,8 +101,6 @@ describe('start-block utilities', () => {
size: 42,
type: 'text/plain',
key: 'file-key',
uploadedAt: new Date().toISOString(),
expiresAt: new Date(Date.now() + 1000).toISOString(),
},
]

View File

@@ -266,10 +266,25 @@ function buildUnifiedStartOutput(workflowInput: unknown): NormalizedBlockOutput
}
if (!Object.hasOwn(output, 'input')) {
output.input = ''
const fallbackInput =
isPlainObject(workflowInput) && typeof workflowInput.input !== 'undefined'
? ensureString(workflowInput.input)
: ''
output.input = fallbackInput ? fallbackInput : undefined
} else if (typeof output.input === 'string' && output.input.length === 0) {
output.input = undefined
}
if (!Object.hasOwn(output, 'conversationId')) {
output.conversationId = ''
const conversationId =
isPlainObject(workflowInput) && workflowInput.conversationId
? ensureString(workflowInput.conversationId)
: undefined
if (conversationId) {
output.conversationId = conversationId
}
} else if (typeof output.conversationId === 'string' && output.conversationId.length === 0) {
output.conversationId = undefined
}
return mergeFilesIntoOutput(output, workflowInput)
@@ -293,7 +308,11 @@ function buildChatOutput(workflowInput: unknown): NormalizedBlockOutput {
const output: NormalizedBlockOutput = {
input: ensureString(source?.input),
conversationId: ensureString(source?.conversationId),
}
const conversationId = ensureString(source?.conversationId)
if (conversationId) {
output.conversationId = conversationId
}
return mergeFilesIntoOutput(output, workflowInput)
@@ -319,7 +338,7 @@ function buildLegacyStarterOutput(
}
const conversationId = isPlainObject(workflowInput) ? workflowInput.conversationId : undefined
if (conversationId !== undefined) {
if (conversationId) {
output.conversationId = ensureString(conversationId)
}

View File

@@ -43,8 +43,8 @@ export class BlockResolver implements Resolver {
const output = this.getBlockOutput(blockId, context)
if (!output) {
throw new Error(`No state found for block "${blockName}"`)
if (output === undefined) {
return undefined
}
if (pathParts.length === 0) {
return output