improvement(execution): update execution for passing base64 strings

This commit is contained in:
Vikhyath Mondreti
2026-01-20 13:56:13 -08:00
parent 05c4538bb6
commit c1200efaa5
15 changed files with 132 additions and 51 deletions

View File

@@ -14,7 +14,6 @@ import { preprocessExecution } from '@/lib/execution/preprocessing'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import {
cleanupExecutionBase64Cache,
containsUserFileWithMetadata,
hydrateUserFilesWithBase64,
} from '@/lib/uploads/utils/user-file-base64.server'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
@@ -438,6 +437,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
snapshot,
callbacks: {},
loggingSession,
includeFileBase64,
base64MaxBytes,
})
const outputWithBase64 = includeFileBase64
@@ -596,15 +597,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
iterationContext?: IterationContext
) => {
const hasError = callbackData.output?.error
const shouldHydrate =
includeFileBase64 && !hasError && containsUserFileWithMetadata(callbackData.output)
const outputWithBase64 = shouldHydrate
? await hydrateUserFilesWithBase64(callbackData.output, {
requestId,
executionId,
maxBytes: base64MaxBytes,
})
: callbackData.output
if (hasError) {
logger.info(`[${requestId}] ✗ onBlockComplete (error) called:`, {
@@ -648,7 +640,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
blockName,
blockType,
input: callbackData.input,
output: outputWithBase64,
output: callbackData.output,
durationMs: callbackData.executionTime || 0,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
@@ -733,6 +725,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
},
loggingSession,
abortSignal: abortController.signal,
includeFileBase64,
base64MaxBytes,
})
if (result.status === 'paused') {

View File

@@ -214,40 +214,18 @@ const getOutputTypeForPath = (
outputPath: string,
mergedSubBlocksOverride?: Record<string, any>
): string => {
if (block?.triggerMode && blockConfig?.triggers?.enabled) {
return getBlockOutputType(block.type, outputPath, mergedSubBlocksOverride, true)
}
if (block?.type === 'starter') {
const startWorkflowValue =
mergedSubBlocksOverride?.startWorkflow?.value ?? getSubBlockValue(blockId, 'startWorkflow')
const subBlocks =
mergedSubBlocksOverride ?? useWorkflowStore.getState().blocks[blockId]?.subBlocks
const triggerMode = block?.triggerMode && blockConfig?.triggers?.enabled
if (startWorkflowValue === 'chat') {
const chatModeTypes: Record<string, string> = {
input: 'string',
conversationId: 'string',
files: 'files',
}
return chatModeTypes[outputPath] || 'any'
}
const inputFormatValue =
mergedSubBlocksOverride?.inputFormat?.value ?? getSubBlockValue(blockId, 'inputFormat')
if (inputFormatValue && Array.isArray(inputFormatValue)) {
const field = inputFormatValue.find(
(f: { name?: string; type?: string }) => f.name === outputPath
)
if (field?.type) return field.type
}
} else if (blockConfig?.category === 'triggers') {
const blockState = useWorkflowStore.getState().blocks[blockId]
const subBlocks = mergedSubBlocksOverride ?? (blockState?.subBlocks || {})
return getBlockOutputType(block.type, outputPath, subBlocks)
} else {
if (blockConfig?.tools?.config?.tool) {
const operationValue = getSubBlockValue(blockId, 'operation')
if (blockConfig && operationValue) {
if (operationValue) {
return getToolOutputType(blockConfig, operationValue, outputPath)
}
}
return 'any'
return getBlockOutputType(block?.type ?? '', outputPath, subBlocks, triggerMode)
}
/**
@@ -1789,7 +1767,7 @@ export const TagDropdown: React.FC<TagDropdownProps> = ({
mergedSubBlocks
)
if (fieldType === 'files' || fieldType === 'array') {
if (fieldType === 'files' || fieldType === 'file[]' || fieldType === 'array') {
const blockName = parts[0]
const remainingPath = parts.slice(2).join('.')
processedTag = `${blockName}.${arrayFieldName}[0].${remainingPath}`

View File

@@ -208,6 +208,8 @@ async function runWorkflowExecution({
snapshot,
callbacks: {},
loggingSession,
includeFileBase64: true,
base64MaxBytes: undefined,
})
if (executionResult.status === 'paused') {

View File

@@ -240,6 +240,8 @@ async function executeWebhookJobInternal(
snapshot,
callbacks: {},
loggingSession,
includeFileBase64: true, // Enable base64 hydration
base64MaxBytes: undefined, // Use default limit
})
if (executionResult.status === 'paused') {
@@ -493,6 +495,7 @@ async function executeWebhookJobInternal(
snapshot,
callbacks: {},
loggingSession,
includeFileBase64: true,
})
if (executionResult.status === 'paused') {

View File

@@ -109,6 +109,8 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
snapshot,
callbacks: {},
loggingSession,
includeFileBase64: true,
base64MaxBytes: undefined,
})
if (result.status === 'paused') {

View File

@@ -3,6 +3,10 @@ import { mcpServers } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, inArray, isNull } from 'drizzle-orm'
import { getBaseUrl } from '@/lib/core/utils/urls'
import {
containsUserFileWithMetadata,
hydrateUserFilesWithBase64,
} from '@/lib/uploads/utils/user-file-base64.server'
import {
BlockType,
buildResumeApiUrl,
@@ -135,6 +139,14 @@ export class BlockExecutor {
normalizedOutput = this.normalizeOutput(output)
}
if (ctx.includeFileBase64 && containsUserFileWithMetadata(normalizedOutput)) {
normalizedOutput = (await hydrateUserFilesWithBase64(normalizedOutput, {
requestId: ctx.metadata.requestId,
executionId: ctx.executionId,
maxBytes: ctx.base64MaxBytes,
})) as NormalizedBlockOutput
}
const duration = Date.now() - startTime
if (blockLog) {

View File

@@ -169,6 +169,8 @@ export class DAGExecutor {
onBlockStart: this.contextExtensions.onBlockStart,
onBlockComplete: this.contextExtensions.onBlockComplete,
abortSignal: this.contextExtensions.abortSignal,
includeFileBase64: this.contextExtensions.includeFileBase64,
base64MaxBytes: this.contextExtensions.base64MaxBytes,
}
if (this.contextExtensions.resumeFromSnapshot) {

View File

@@ -89,6 +89,8 @@ export interface ContextExtensions {
* When aborted, the execution should stop gracefully.
*/
abortSignal?: AbortSignal
includeFileBase64?: boolean
base64MaxBytes?: number
onStream?: (streamingExecution: unknown) => Promise<void>
onBlockStart?: (
blockId: string,

View File

@@ -237,6 +237,19 @@ export interface ExecutionContext {
// Dynamically added nodes that need to be scheduled (e.g., from parallel expansion)
pendingDynamicNodes?: string[]
/**
* When true, UserFile objects in block outputs will be hydrated with base64 content
* before being stored in execution state. This ensures base64 is available for
* variable resolution in downstream blocks.
*/
includeFileBase64?: boolean
/**
* Maximum file size in bytes for base64 hydration. Files larger than this limit
* will not have their base64 content fetched.
*/
base64MaxBytes?: number
}
export interface ExecutionResult {

View File

@@ -21,14 +21,54 @@ function isPathInOutputSchema(
return true
}
const isFileArrayType = (value: any): boolean =>
value?.type === 'file[]' || value?.type === 'files'
let current: any = outputs
for (let i = 0; i < pathParts.length; i++) {
const part = pathParts[i]
const arrayMatch = part.match(/^([^[]+)\[(\d+)\]$/)
if (arrayMatch) {
const [, prop] = arrayMatch
let fieldDef: any
if (prop in current) {
fieldDef = current[prop]
} else if (current.properties && prop in current.properties) {
fieldDef = current.properties[prop]
} else if (current.type === 'array' && current.items) {
if (current.items.properties && prop in current.items.properties) {
fieldDef = current.items.properties[prop]
} else if (prop in current.items) {
fieldDef = current.items[prop]
}
}
if (!fieldDef) {
return false
}
if (isFileArrayType(fieldDef)) {
if (i + 1 < pathParts.length) {
return USER_FILE_ACCESSIBLE_PROPERTIES.includes(pathParts[i + 1] as any)
}
return true
}
if (fieldDef.type === 'array' && fieldDef.items) {
current = fieldDef.items
continue
}
current = fieldDef
continue
}
// Handle array index access (e.g., [0])
if (/^\d+$/.test(part)) {
// If current is file[] type, next part should be a file property
if (current?.type === 'file[]') {
if (isFileArrayType(current)) {
// Check if next part is a valid file property
if (i + 1 < pathParts.length) {
const nextPart = pathParts[i + 1]
@@ -78,7 +118,7 @@ function isPathInOutputSchema(
}
// Handle file[] type - allow access to file properties after array index
if (current?.type === 'file[]' && USER_FILE_ACCESSIBLE_PROPERTIES.includes(part as any)) {
if (isFileArrayType(current) && USER_FILE_ACCESSIBLE_PROPERTIES.includes(part as any)) {
// Valid file property access
return true
}

View File

@@ -351,7 +351,7 @@ function collectOutputPaths(
if (value && typeof value === 'object' && 'type' in value) {
const typedValue = value as { type: unknown }
if (typedValue.type === 'files') {
if (typedValue.type === 'files' || typedValue.type === 'file[]') {
paths.push(...expandFileTypeProperties(path))
} else {
paths.push(path)
@@ -393,7 +393,8 @@ function getFilePropertyType(outputs: OutputDefinition, pathParts: string[]): st
current &&
typeof current === 'object' &&
'type' in current &&
(current as { type: unknown }).type === 'files'
((current as { type: unknown }).type === 'files' ||
(current as { type: unknown }).type === 'file[]')
) {
return USER_FILE_PROPERTY_TYPES[lastPart as keyof typeof USER_FILE_PROPERTY_TYPES]
}
@@ -462,6 +463,11 @@ function generateOutputPaths(outputs: Record<string, any>, prefix = ''): string[
paths.push(currentPath)
} else if (typeof value === 'object' && value !== null) {
if ('type' in value && typeof value.type === 'string') {
if (value.type === 'files' || value.type === 'file[]') {
paths.push(...expandFileTypeProperties(currentPath))
continue
}
const hasNestedProperties =
((value.type === 'object' || value.type === 'json') && value.properties) ||
(value.type === 'array' && value.items?.properties) ||
@@ -518,6 +524,17 @@ function generateOutputPathsWithTypes(
paths.push({ path: currentPath, type: value })
} else if (typeof value === 'object' && value !== null) {
if ('type' in value && typeof value.type === 'string') {
if (value.type === 'files' || value.type === 'file[]') {
paths.push({ path: currentPath, type: value.type })
for (const prop of USER_FILE_ACCESSIBLE_PROPERTIES) {
paths.push({
path: `${currentPath}.${prop}`,
type: USER_FILE_PROPERTY_TYPES[prop as keyof typeof USER_FILE_PROPERTY_TYPES],
})
}
continue
}
if (value.type === 'array' && value.items?.properties) {
paths.push({ path: currentPath, type: 'array' })
const subPaths = generateOutputPathsWithTypes(value.items.properties, currentPath)

View File

@@ -17,6 +17,8 @@ export interface ExecuteWorkflowOptions {
onStream?: (streamingExec: StreamingExecution) => Promise<void>
onBlockComplete?: (blockId: string, output: unknown) => Promise<void>
skipLoggingComplete?: boolean
includeFileBase64?: boolean
base64MaxBytes?: number
}
export interface WorkflowInfo {
@@ -78,6 +80,8 @@ export async function executeWorkflow(
: undefined,
},
loggingSession,
includeFileBase64: streamConfig?.includeFileBase64,
base64MaxBytes: streamConfig?.base64MaxBytes,
})
if (result.status === 'paused') {

View File

@@ -37,12 +37,10 @@ export interface ExecuteWorkflowCoreOptions {
snapshot: ExecutionSnapshot
callbacks: ExecutionCallbacks
loggingSession: LoggingSession
skipLogCreation?: boolean // For resume executions - reuse existing log entry
/**
* AbortSignal for cancellation support.
* When aborted (e.g., client disconnects from SSE), execution stops gracefully.
*/
skipLogCreation?: boolean
abortSignal?: AbortSignal
includeFileBase64?: boolean
base64MaxBytes?: number
}
function parseVariableValueByType(value: unknown, type: string): unknown {
@@ -109,7 +107,15 @@ function parseVariableValueByType(value: unknown, type: string): unknown {
export async function executeWorkflowCore(
options: ExecuteWorkflowCoreOptions
): Promise<ExecutionResult> {
const { snapshot, callbacks, loggingSession, skipLogCreation, abortSignal } = options
const {
snapshot,
callbacks,
loggingSession,
skipLogCreation,
abortSignal,
includeFileBase64,
base64MaxBytes,
} = options
const { metadata, workflow, input, workflowVariables, selectedOutputs } = snapshot
const { requestId, workflowId, userId, triggerType, executionId, triggerBlockId, useDraftState } =
metadata
@@ -334,6 +340,8 @@ export async function executeWorkflowCore(
snapshotState: snapshot.state,
metadata,
abortSignal,
includeFileBase64,
base64MaxBytes,
}
const executorInstance = new Executor({

View File

@@ -751,6 +751,8 @@ export class PauseResumeManager {
callbacks: {},
loggingSession,
skipLogCreation: true, // Reuse existing log entry
includeFileBase64: true, // Enable base64 hydration
base64MaxBytes: undefined, // Use default limit
})
}

View File

@@ -282,6 +282,8 @@ export async function createStreamingResponse(
onStream: onStreamCallback,
onBlockComplete: onBlockCompleteCallback,
skipLoggingComplete: true,
includeFileBase64: streamConfig.includeFileBase64,
base64MaxBytes: streamConfig.base64MaxBytes,
},
executionId
)