From c1200efaa5bf17f83b620f3dbf487142f4377fce Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Tue, 20 Jan 2026 13:56:13 -0800 Subject: [PATCH] improvement(execution): update execution for passing base64 strings --- .../app/api/workflows/[id]/execute/route.ts | 16 +++---- .../components/tag-dropdown/tag-dropdown.tsx | 38 ++++------------ apps/sim/background/schedule-execution.ts | 2 + apps/sim/background/webhook-execution.ts | 3 ++ apps/sim/background/workflow-execution.ts | 2 + apps/sim/executor/execution/block-executor.ts | 12 +++++ apps/sim/executor/execution/executor.ts | 2 + apps/sim/executor/execution/types.ts | 2 + apps/sim/executor/types.ts | 13 ++++++ .../sim/executor/variables/resolvers/block.ts | 44 ++++++++++++++++++- .../sim/lib/workflows/blocks/block-outputs.ts | 21 ++++++++- .../workflows/executor/execute-workflow.ts | 4 ++ .../lib/workflows/executor/execution-core.ts | 20 ++++++--- .../executor/human-in-the-loop-manager.ts | 2 + apps/sim/lib/workflows/streaming/streaming.ts | 2 + 15 files changed, 132 insertions(+), 51 deletions(-) diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index fa8636d21..a850c7ac9 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -14,7 +14,6 @@ import { preprocessExecution } from '@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { cleanupExecutionBase64Cache, - containsUserFileWithMetadata, hydrateUserFilesWithBase64, } from '@/lib/uploads/utils/user-file-base64.server' import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' @@ -438,6 +437,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: snapshot, callbacks: {}, loggingSession, + includeFileBase64, + base64MaxBytes, }) const outputWithBase64 = includeFileBase64 @@ -596,15 +597,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: iterationContext?: IterationContext ) => { const hasError = callbackData.output?.error - const shouldHydrate = - includeFileBase64 && !hasError && containsUserFileWithMetadata(callbackData.output) - const outputWithBase64 = shouldHydrate - ? await hydrateUserFilesWithBase64(callbackData.output, { - requestId, - executionId, - maxBytes: base64MaxBytes, - }) - : callbackData.output if (hasError) { logger.info(`[${requestId}] ✗ onBlockComplete (error) called:`, { @@ -648,7 +640,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: blockName, blockType, input: callbackData.input, - output: outputWithBase64, + output: callbackData.output, durationMs: callbackData.executionTime || 0, ...(iterationContext && { iterationCurrent: iterationContext.iterationCurrent, @@ -733,6 +725,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: }, loggingSession, abortSignal: abortController.signal, + includeFileBase64, + base64MaxBytes, }) if (result.status === 'paused') { diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/tag-dropdown/tag-dropdown.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/tag-dropdown/tag-dropdown.tsx index d5fde3119..32491d54e 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/tag-dropdown/tag-dropdown.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/tag-dropdown/tag-dropdown.tsx @@ -214,40 +214,18 @@ const getOutputTypeForPath = ( outputPath: string, mergedSubBlocksOverride?: Record ): string => { - if (block?.triggerMode && blockConfig?.triggers?.enabled) { - return getBlockOutputType(block.type, outputPath, mergedSubBlocksOverride, true) - } - if (block?.type === 'starter') { - const startWorkflowValue = - mergedSubBlocksOverride?.startWorkflow?.value ?? getSubBlockValue(blockId, 'startWorkflow') + const subBlocks = + mergedSubBlocksOverride ?? useWorkflowStore.getState().blocks[blockId]?.subBlocks + const triggerMode = block?.triggerMode && blockConfig?.triggers?.enabled - if (startWorkflowValue === 'chat') { - const chatModeTypes: Record = { - input: 'string', - conversationId: 'string', - files: 'files', - } - return chatModeTypes[outputPath] || 'any' - } - const inputFormatValue = - mergedSubBlocksOverride?.inputFormat?.value ?? getSubBlockValue(blockId, 'inputFormat') - if (inputFormatValue && Array.isArray(inputFormatValue)) { - const field = inputFormatValue.find( - (f: { name?: string; type?: string }) => f.name === outputPath - ) - if (field?.type) return field.type - } - } else if (blockConfig?.category === 'triggers') { - const blockState = useWorkflowStore.getState().blocks[blockId] - const subBlocks = mergedSubBlocksOverride ?? (blockState?.subBlocks || {}) - return getBlockOutputType(block.type, outputPath, subBlocks) - } else { + if (blockConfig?.tools?.config?.tool) { const operationValue = getSubBlockValue(blockId, 'operation') - if (blockConfig && operationValue) { + if (operationValue) { return getToolOutputType(blockConfig, operationValue, outputPath) } } - return 'any' + + return getBlockOutputType(block?.type ?? '', outputPath, subBlocks, triggerMode) } /** @@ -1789,7 +1767,7 @@ export const TagDropdown: React.FC = ({ mergedSubBlocks ) - if (fieldType === 'files' || fieldType === 'array') { + if (fieldType === 'files' || fieldType === 'file[]' || fieldType === 'array') { const blockName = parts[0] const remainingPath = parts.slice(2).join('.') processedTag = `${blockName}.${arrayFieldName}[0].${remainingPath}` diff --git a/apps/sim/background/schedule-execution.ts b/apps/sim/background/schedule-execution.ts index f4fc2a443..7d19dc060 100644 --- a/apps/sim/background/schedule-execution.ts +++ b/apps/sim/background/schedule-execution.ts @@ -208,6 +208,8 @@ async function runWorkflowExecution({ snapshot, callbacks: {}, loggingSession, + includeFileBase64: true, + base64MaxBytes: undefined, }) if (executionResult.status === 'paused') { diff --git a/apps/sim/background/webhook-execution.ts b/apps/sim/background/webhook-execution.ts index fbe0f0883..c34b5497b 100644 --- a/apps/sim/background/webhook-execution.ts +++ b/apps/sim/background/webhook-execution.ts @@ -240,6 +240,8 @@ async function executeWebhookJobInternal( snapshot, callbacks: {}, loggingSession, + includeFileBase64: true, // Enable base64 hydration + base64MaxBytes: undefined, // Use default limit }) if (executionResult.status === 'paused') { @@ -493,6 +495,7 @@ async function executeWebhookJobInternal( snapshot, callbacks: {}, loggingSession, + includeFileBase64: true, }) if (executionResult.status === 'paused') { diff --git a/apps/sim/background/workflow-execution.ts b/apps/sim/background/workflow-execution.ts index 491c9863b..6a8cca8b1 100644 --- a/apps/sim/background/workflow-execution.ts +++ b/apps/sim/background/workflow-execution.ts @@ -109,6 +109,8 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) { snapshot, callbacks: {}, loggingSession, + includeFileBase64: true, + base64MaxBytes: undefined, }) if (result.status === 'paused') { diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts index 116056d35..2f60c96ef 100644 --- a/apps/sim/executor/execution/block-executor.ts +++ b/apps/sim/executor/execution/block-executor.ts @@ -3,6 +3,10 @@ import { mcpServers } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, eq, inArray, isNull } from 'drizzle-orm' import { getBaseUrl } from '@/lib/core/utils/urls' +import { + containsUserFileWithMetadata, + hydrateUserFilesWithBase64, +} from '@/lib/uploads/utils/user-file-base64.server' import { BlockType, buildResumeApiUrl, @@ -135,6 +139,14 @@ export class BlockExecutor { normalizedOutput = this.normalizeOutput(output) } + if (ctx.includeFileBase64 && containsUserFileWithMetadata(normalizedOutput)) { + normalizedOutput = (await hydrateUserFilesWithBase64(normalizedOutput, { + requestId: ctx.metadata.requestId, + executionId: ctx.executionId, + maxBytes: ctx.base64MaxBytes, + })) as NormalizedBlockOutput + } + const duration = Date.now() - startTime if (blockLog) { diff --git a/apps/sim/executor/execution/executor.ts b/apps/sim/executor/execution/executor.ts index cf085b334..c8da45234 100644 --- a/apps/sim/executor/execution/executor.ts +++ b/apps/sim/executor/execution/executor.ts @@ -169,6 +169,8 @@ export class DAGExecutor { onBlockStart: this.contextExtensions.onBlockStart, onBlockComplete: this.contextExtensions.onBlockComplete, abortSignal: this.contextExtensions.abortSignal, + includeFileBase64: this.contextExtensions.includeFileBase64, + base64MaxBytes: this.contextExtensions.base64MaxBytes, } if (this.contextExtensions.resumeFromSnapshot) { diff --git a/apps/sim/executor/execution/types.ts b/apps/sim/executor/execution/types.ts index 38d403f04..701f5de35 100644 --- a/apps/sim/executor/execution/types.ts +++ b/apps/sim/executor/execution/types.ts @@ -89,6 +89,8 @@ export interface ContextExtensions { * When aborted, the execution should stop gracefully. */ abortSignal?: AbortSignal + includeFileBase64?: boolean + base64MaxBytes?: number onStream?: (streamingExecution: unknown) => Promise onBlockStart?: ( blockId: string, diff --git a/apps/sim/executor/types.ts b/apps/sim/executor/types.ts index ce12bb6e0..27eaa0c2b 100644 --- a/apps/sim/executor/types.ts +++ b/apps/sim/executor/types.ts @@ -237,6 +237,19 @@ export interface ExecutionContext { // Dynamically added nodes that need to be scheduled (e.g., from parallel expansion) pendingDynamicNodes?: string[] + + /** + * When true, UserFile objects in block outputs will be hydrated with base64 content + * before being stored in execution state. This ensures base64 is available for + * variable resolution in downstream blocks. + */ + includeFileBase64?: boolean + + /** + * Maximum file size in bytes for base64 hydration. Files larger than this limit + * will not have their base64 content fetched. + */ + base64MaxBytes?: number } export interface ExecutionResult { diff --git a/apps/sim/executor/variables/resolvers/block.ts b/apps/sim/executor/variables/resolvers/block.ts index 536b30de1..0a4d46f3d 100644 --- a/apps/sim/executor/variables/resolvers/block.ts +++ b/apps/sim/executor/variables/resolvers/block.ts @@ -21,14 +21,54 @@ function isPathInOutputSchema( return true } + const isFileArrayType = (value: any): boolean => + value?.type === 'file[]' || value?.type === 'files' + let current: any = outputs for (let i = 0; i < pathParts.length; i++) { const part = pathParts[i] + const arrayMatch = part.match(/^([^[]+)\[(\d+)\]$/) + if (arrayMatch) { + const [, prop] = arrayMatch + let fieldDef: any + + if (prop in current) { + fieldDef = current[prop] + } else if (current.properties && prop in current.properties) { + fieldDef = current.properties[prop] + } else if (current.type === 'array' && current.items) { + if (current.items.properties && prop in current.items.properties) { + fieldDef = current.items.properties[prop] + } else if (prop in current.items) { + fieldDef = current.items[prop] + } + } + + if (!fieldDef) { + return false + } + + if (isFileArrayType(fieldDef)) { + if (i + 1 < pathParts.length) { + return USER_FILE_ACCESSIBLE_PROPERTIES.includes(pathParts[i + 1] as any) + } + return true + } + + if (fieldDef.type === 'array' && fieldDef.items) { + current = fieldDef.items + continue + } + + current = fieldDef + continue + } + // Handle array index access (e.g., [0]) if (/^\d+$/.test(part)) { // If current is file[] type, next part should be a file property - if (current?.type === 'file[]') { + if (isFileArrayType(current)) { // Check if next part is a valid file property if (i + 1 < pathParts.length) { const nextPart = pathParts[i + 1] @@ -78,7 +118,7 @@ function isPathInOutputSchema( } // Handle file[] type - allow access to file properties after array index - if (current?.type === 'file[]' && USER_FILE_ACCESSIBLE_PROPERTIES.includes(part as any)) { + if (isFileArrayType(current) && USER_FILE_ACCESSIBLE_PROPERTIES.includes(part as any)) { // Valid file property access return true } diff --git a/apps/sim/lib/workflows/blocks/block-outputs.ts b/apps/sim/lib/workflows/blocks/block-outputs.ts index c914227ab..dd58a2ff5 100644 --- a/apps/sim/lib/workflows/blocks/block-outputs.ts +++ b/apps/sim/lib/workflows/blocks/block-outputs.ts @@ -351,7 +351,7 @@ function collectOutputPaths( if (value && typeof value === 'object' && 'type' in value) { const typedValue = value as { type: unknown } - if (typedValue.type === 'files') { + if (typedValue.type === 'files' || typedValue.type === 'file[]') { paths.push(...expandFileTypeProperties(path)) } else { paths.push(path) @@ -393,7 +393,8 @@ function getFilePropertyType(outputs: OutputDefinition, pathParts: string[]): st current && typeof current === 'object' && 'type' in current && - (current as { type: unknown }).type === 'files' + ((current as { type: unknown }).type === 'files' || + (current as { type: unknown }).type === 'file[]') ) { return USER_FILE_PROPERTY_TYPES[lastPart as keyof typeof USER_FILE_PROPERTY_TYPES] } @@ -462,6 +463,11 @@ function generateOutputPaths(outputs: Record, prefix = ''): string[ paths.push(currentPath) } else if (typeof value === 'object' && value !== null) { if ('type' in value && typeof value.type === 'string') { + if (value.type === 'files' || value.type === 'file[]') { + paths.push(...expandFileTypeProperties(currentPath)) + continue + } + const hasNestedProperties = ((value.type === 'object' || value.type === 'json') && value.properties) || (value.type === 'array' && value.items?.properties) || @@ -518,6 +524,17 @@ function generateOutputPathsWithTypes( paths.push({ path: currentPath, type: value }) } else if (typeof value === 'object' && value !== null) { if ('type' in value && typeof value.type === 'string') { + if (value.type === 'files' || value.type === 'file[]') { + paths.push({ path: currentPath, type: value.type }) + for (const prop of USER_FILE_ACCESSIBLE_PROPERTIES) { + paths.push({ + path: `${currentPath}.${prop}`, + type: USER_FILE_PROPERTY_TYPES[prop as keyof typeof USER_FILE_PROPERTY_TYPES], + }) + } + continue + } + if (value.type === 'array' && value.items?.properties) { paths.push({ path: currentPath, type: 'array' }) const subPaths = generateOutputPathsWithTypes(value.items.properties, currentPath) diff --git a/apps/sim/lib/workflows/executor/execute-workflow.ts b/apps/sim/lib/workflows/executor/execute-workflow.ts index ce6f4c2c0..1ed65c119 100644 --- a/apps/sim/lib/workflows/executor/execute-workflow.ts +++ b/apps/sim/lib/workflows/executor/execute-workflow.ts @@ -17,6 +17,8 @@ export interface ExecuteWorkflowOptions { onStream?: (streamingExec: StreamingExecution) => Promise onBlockComplete?: (blockId: string, output: unknown) => Promise skipLoggingComplete?: boolean + includeFileBase64?: boolean + base64MaxBytes?: number } export interface WorkflowInfo { @@ -78,6 +80,8 @@ export async function executeWorkflow( : undefined, }, loggingSession, + includeFileBase64: streamConfig?.includeFileBase64, + base64MaxBytes: streamConfig?.base64MaxBytes, }) if (result.status === 'paused') { diff --git a/apps/sim/lib/workflows/executor/execution-core.ts b/apps/sim/lib/workflows/executor/execution-core.ts index a98aa3227..8cac4fcdc 100644 --- a/apps/sim/lib/workflows/executor/execution-core.ts +++ b/apps/sim/lib/workflows/executor/execution-core.ts @@ -37,12 +37,10 @@ export interface ExecuteWorkflowCoreOptions { snapshot: ExecutionSnapshot callbacks: ExecutionCallbacks loggingSession: LoggingSession - skipLogCreation?: boolean // For resume executions - reuse existing log entry - /** - * AbortSignal for cancellation support. - * When aborted (e.g., client disconnects from SSE), execution stops gracefully. - */ + skipLogCreation?: boolean abortSignal?: AbortSignal + includeFileBase64?: boolean + base64MaxBytes?: number } function parseVariableValueByType(value: unknown, type: string): unknown { @@ -109,7 +107,15 @@ function parseVariableValueByType(value: unknown, type: string): unknown { export async function executeWorkflowCore( options: ExecuteWorkflowCoreOptions ): Promise { - const { snapshot, callbacks, loggingSession, skipLogCreation, abortSignal } = options + const { + snapshot, + callbacks, + loggingSession, + skipLogCreation, + abortSignal, + includeFileBase64, + base64MaxBytes, + } = options const { metadata, workflow, input, workflowVariables, selectedOutputs } = snapshot const { requestId, workflowId, userId, triggerType, executionId, triggerBlockId, useDraftState } = metadata @@ -334,6 +340,8 @@ export async function executeWorkflowCore( snapshotState: snapshot.state, metadata, abortSignal, + includeFileBase64, + base64MaxBytes, } const executorInstance = new Executor({ diff --git a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts index f695e8dc6..936f7cd29 100644 --- a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts +++ b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts @@ -751,6 +751,8 @@ export class PauseResumeManager { callbacks: {}, loggingSession, skipLogCreation: true, // Reuse existing log entry + includeFileBase64: true, // Enable base64 hydration + base64MaxBytes: undefined, // Use default limit }) } diff --git a/apps/sim/lib/workflows/streaming/streaming.ts b/apps/sim/lib/workflows/streaming/streaming.ts index 725c30cf2..88e7a584d 100644 --- a/apps/sim/lib/workflows/streaming/streaming.ts +++ b/apps/sim/lib/workflows/streaming/streaming.ts @@ -282,6 +282,8 @@ export async function createStreamingResponse( onStream: onStreamCallback, onBlockComplete: onBlockCompleteCallback, skipLoggingComplete: true, + includeFileBase64: streamConfig.includeFileBase64, + base64MaxBytes: streamConfig.base64MaxBytes, }, executionId )