fix(loops) (#238)

* fix(loops): added resolve for string collections and added single block loop

* improvement(loop-input): error validation removed

* improvement(loops): fixed now for source/external blocks; doesn't work for multi-block loop (>2)

* fix(loops): complex looping functions fully work

* improvement: cleaned up debugging logs
This commit is contained in:
Emir Karabeg
2025-04-10 00:15:42 -07:00
committed by GitHub
parent 23abb29b88
commit 51dde8cbee
10 changed files with 590 additions and 227 deletions

View File

@@ -296,7 +296,7 @@ export function WorkflowBlock({ id, data }: NodeProps<WorkflowBlockProps>) {
data-handleid="target"
isConnectableStart={false}
isConnectableEnd={true}
isValidConnection={(connection) => connection.source !== id}
isValidConnection={(connection) => true}
/>
)}
@@ -493,7 +493,7 @@ export function WorkflowBlock({ id, data }: NodeProps<WorkflowBlockProps>) {
data-handleid="source"
isConnectableStart={true}
isConnectableEnd={false}
isValidConnection={(connection) => connection.target !== id}
isValidConnection={(connection) => true}
/>
{/* Error Handle - Don't show for starter blocks */}
@@ -533,7 +533,7 @@ export function WorkflowBlock({ id, data }: NodeProps<WorkflowBlockProps>) {
data-handleid="error"
isConnectableStart={true}
isConnectableEnd={false}
isValidConnection={(connection) => connection.target !== id}
isValidConnection={(connection) => true}
/>
)}
</>

View File

@@ -127,13 +127,15 @@ export function LoopInput({ id }: NodeProps) {
</Badge>
</PopoverTrigger>
<PopoverContent
className={cn('p-3', loopType !== 'for' ? 'w-64' : 'w-48')}
className={cn('p-3', loopType !== 'for' ? 'w-72' : 'w-48')}
align="start"
onClick={(e) => e.stopPropagation()}
>
<div className="space-y-2">
<div className="text-xs font-medium text-muted-foreground">
{loopType === 'for' ? 'Loop Iterations' : 'Collection Items'}
<div className="flex items-center justify-between">
<div className="text-xs font-medium text-muted-foreground">
{loopType === 'for' ? 'Loop Iterations' : 'Collection Items'}
</div>
</div>
{loopType === 'for' ? (
@@ -177,7 +179,7 @@ export function LoopInput({ id }: NodeProps) {
<div className="text-[10px] text-muted-foreground">
{loopType === 'for'
? 'Enter a number between 1 and 50'
: 'Define the collection to iterate over'}
: 'Array or object to iterate over'}
</div>
</div>
</PopoverContent>

View File

@@ -19,7 +19,7 @@ type AirtableResponse =
export const AirtableBlock: BlockConfig<AirtableResponse> = {
type: 'airtable',
name: 'Airtable',
description: 'Read, create, and update Airtable records',
description: 'Read, create, and update Airtable',
longDescription:
'Integrate Airtable functionality to manage table records. List, get, create, ' +
'update single, or update multiple records using OAuth authentication. ' +

View File

@@ -141,9 +141,13 @@ export class Executor {
finalOutput = outputs[outputs.length - 1]
}
const hasLoopReachedMaxIterations =
await this.loopManager.processLoopIterations(context)
if (hasLoopReachedMaxIterations) {
// Process loop iterations - this will activate external paths when loops complete
await this.loopManager.processLoopIterations(context)
// Continue execution for any newly activated paths
// Only stop execution if there are no more blocks to execute
const updatedNextLayer = this.getNextExecutionLayer(context)
if (updatedNextLayer.length === 0) {
hasMoreLayers = false
}
}
@@ -199,7 +203,6 @@ export class Executor {
if (outputs.length > 0) {
finalOutput = outputs[outputs.length - 1]
}
await this.loopManager.processLoopIterations(context)
const nextLayer = this.getNextExecutionLayer(context)
setPendingBlocks(nextLayer)
@@ -239,7 +242,7 @@ export class Executor {
logs: context.blockLogs,
}
} catch (error: any) {
console.error('Debug step execution failed:', this.sanitizeError(error))
logger.error('Debug step execution failed:', this.sanitizeError(error))
return {
success: false,
@@ -293,10 +296,6 @@ export class Executor {
}
}
if (loop.nodes.length < 2) {
throw new Error(`Loop ${loopId} must contain at least 2 blocks`)
}
if (loop.iterations <= 0) {
throw new Error(`Loop ${loopId} must have a positive iterations value`)
}
@@ -326,6 +325,7 @@ export class Executor {
},
loopIterations: new Map(),
loopItems: new Map(),
completedLoops: new Set(),
executedBlocks: new Set(),
activeExecutionPath: new Set(),
workflow: this.workflow,
@@ -483,7 +483,7 @@ export class Executor {
// Fallback to raw input with both paths accessible
// Ensure we handle both input formats
const inputData =
this.workflowInput.input !== undefined
this.workflowInput?.input !== undefined
? this.workflowInput.input // Use nested input if available
: this.workflowInput // Fallback to direct input
@@ -545,11 +545,32 @@ export class Executor {
(conn) => conn.target === block.id
)
const isInLoop = Object.values(this.workflow.loops || {}).some((loop) =>
// Find all loops that this block is a part of
const containingLoops = Object.values(this.workflow.loops || {}).filter(loop =>
loop.nodes.includes(block.id)
)
const isInLoop = containingLoops.length > 0
if (isInLoop) {
// Check if this block is part of a self-loop (single-node loop)
const isInSelfLoop = containingLoops.some(loop =>
loop.nodes.length === 1 && loop.nodes[0] === block.id
)
// Check if there's a direct self-connection
const hasSelfConnection = this.workflow.connections.some(
conn => conn.source === block.id && conn.target === block.id
)
if (isInSelfLoop || hasSelfConnection) {
// For self-loops, we only need the node to be in the active execution path
// It will be reset after each iteration by the loop manager
pendingBlocks.add(block.id)
continue
}
// For regular multi-node loops
const hasValidPath = incomingConnections.some((conn) => {
return executedBlocks.has(conn.source)
})
@@ -558,6 +579,7 @@ export class Executor {
pendingBlocks.add(block.id)
}
} else {
// Regular non-loop block handling (unchanged)
const allDependenciesMet = incomingConnections.every((conn) => {
const sourceExecuted = executedBlocks.has(conn.source)
const sourceBlock = this.workflow.blocks.find((b) => b.id === conn.source)
@@ -765,21 +787,14 @@ export class Executor {
blockLog.durationMs =
new Date(blockLog.endedAt).getTime() - new Date(blockLog.startedAt).getTime()
// Log the error even if we'll continue execution through error path
context.blockLogs.push(blockLog)
addConsole({
output: {},
error:
error.message ||
`Error executing ${block.metadata?.id || 'unknown'} block: ${String(error)}`,
durationMs: blockLog.durationMs,
startedAt: blockLog.startedAt,
endedAt: blockLog.endedAt,
workflowId: context.workflowId,
timestamp: blockLog.startedAt,
blockName: block.metadata?.name || 'Unnamed Block',
blockType: block.metadata?.id || 'unknown',
})
// Check for error connections and follow them if they exist
const hasErrorPath = this.activateErrorPath(blockId, context)
// Log the error for visibility
logger.error(
`Error executing block ${block.metadata?.name || blockId}:`,
this.sanitizeError(error)
)
// Create error output with appropriate structure
const errorOutput: NormalizedBlockOutput = {
@@ -797,15 +812,6 @@ export class Executor {
executionTime: blockLog.durationMs,
})
// Check for error connections and follow them if they exist
const hasErrorPath = this.activateErrorPath(blockId, context)
// Console.error the error for visibility
logger.error(
`Error executing block ${block.metadata?.name || blockId}:`,
this.sanitizeError(error)
)
// If there are error paths to follow, return error output instead of throwing
if (hasErrorPath) {
// Return the error output to allow execution to continue along error path

View File

@@ -1,5 +1,8 @@
import { SerializedBlock, SerializedConnection, SerializedLoop } from '@/serializer/types'
import { ExecutionContext } from './types'
import { createLogger } from '@/lib/logs/console-logger'
const logger = createLogger('LoopManager')
/**
* Manages loop detection, iteration limits, and state resets.
@@ -27,6 +30,7 @@ export class LoopManager {
for (const [loopId, loop] of Object.entries(this.loops)) {
// Get the loop type (default to 'for')
const loopType = loop.loopType || 'for'
const currentIteration = context.loopIterations.get(loopId) || 0
// Handle forEach loop
if (loopType === 'forEach') {
@@ -48,19 +52,21 @@ export class LoopManager {
}
}
// Get current iteration count
const currentIteration = context.loopIterations.get(loopId) || 0
// For forEach, convert to array if it's an object
const items = Array.isArray(loop.forEachItems)
? loop.forEachItems
: Object.entries(loop.forEachItems as Record<string, any>)
// If we've processed all items or hit max iterations, skip this loop
// If we've processed all items or hit max iterations, mark loop as completed
if (currentIteration >= items.length || currentIteration >= loop.iterations) {
if (currentIteration >= items.length) {
hasLoopReachedMaxIterations = true
} else {
hasLoopReachedMaxIterations = true
}
// Now that the loop is complete, activate only external paths
this.activateExternalPaths(loopId, loop, context)
continue
}
@@ -82,22 +88,32 @@ export class LoopManager {
// Check if we've now reached iterations limit after incrementing
if (currentIteration + 1 >= items.length || currentIteration + 1 >= loop.iterations) {
hasLoopReachedMaxIterations = true
}
// IMPORTANT: If we've completed all iterations, activate the external paths
// This is different from the previous approach - we activate external paths
// at the end of the last iteration
this.activateExternalPaths(loopId, loop, context)
} else {
// We have more iterations to go, reset the blocks
// Reset ALL blocks in the loop for the next iteration
for (const nodeId of loop.nodes) {
// Remove from executed blocks
context.executedBlocks.delete(nodeId)
// Reset ALL blocks in the loop for the next iteration
for (const nodeId of loop.nodes) {
// Remove from executed blocks
context.executedBlocks.delete(nodeId)
// Make sure it's in the active execution path
context.activeExecutionPath.add(nodeId)
}
// Make sure it's in the active execution path
context.activeExecutionPath.add(nodeId)
}
// Make sure the first block in the loop is marked as executable
const entryBlock = this.findEntryBlock(loop.nodes, context)
if (loop.nodes.length > 0 && entryBlock) {
context.activeExecutionPath.add(entryBlock)
// Make sure the first block in the loop is marked as executable
const entryBlock = this.findEntryBlock(loop.nodes, context)
if (loop.nodes.length > 0 && entryBlock) {
context.activeExecutionPath.add(entryBlock)
}
}
} else {
// Not all blocks in the loop have been executed yet
// We need to activate the next block(s) in the loop sequence
this.activateNextBlocksInLoop(loopId, loop, context)
}
} else {
// Original logic for 'for' loops
@@ -107,6 +123,9 @@ export class LoopManager {
// If we've hit the iterations count, skip this loop and mark flag
if (currentIteration >= loop.iterations) {
hasLoopReachedMaxIterations = true
// Activate external paths from loop blocks when the loop is completed
this.activateExternalPaths(loopId, loop, context)
continue
}
@@ -121,23 +140,32 @@ export class LoopManager {
// Check if we've now reached iterations limit after incrementing
if (currentIteration + 1 >= loop.iterations) {
hasLoopReachedMaxIterations = true
}
// IMPORTANT: If we've completed all iterations, activate the external paths
// This is different from the previous approach - we activate external paths
// at the end of the last iteration
this.activateExternalPaths(loopId, loop, context)
} else {
// Reset ALL blocks in the loop, not just blocks after the entry
for (const nodeId of loop.nodes) {
// Remove from executed blocks
context.executedBlocks.delete(nodeId)
// Reset ALL blocks in the loop, not just blocks after the entry
for (const nodeId of loop.nodes) {
// Remove from executed blocks
context.executedBlocks.delete(nodeId)
// Make sure it's in the active execution path
context.activeExecutionPath.add(nodeId)
}
// Make sure it's in the active execution path
context.activeExecutionPath.add(nodeId)
}
// Important: Make sure the first block in the loop is marked as executable
const entryBlock = this.findEntryBlock(loop.nodes, context)
if (loop.nodes.length > 0 && entryBlock) {
// Make sure it's in the active path
context.activeExecutionPath.add(entryBlock)
// Important: Make sure the first block in the loop is marked as executable
const entryBlock = this.findEntryBlock(loop.nodes, context)
if (loop.nodes.length > 0 && entryBlock) {
// Make sure it's in the active path
context.activeExecutionPath.add(entryBlock)
}
}
} else {
// Not all blocks in the loop have been executed yet
// We need to activate the next block(s) in the loop sequence
this.activateNextBlocksInLoop(loopId, loop, context)
}
}
}
@@ -145,6 +173,98 @@ export class LoopManager {
return hasLoopReachedMaxIterations
}
/**
* Activates only paths that go from loop nodes to nodes outside the loop.
* This is called when a loop completes all iterations.
*
* @param loopId - ID of the loop
* @param loop - The loop configuration
* @param context - Current execution context
*/
private activateExternalPaths(
loopId: string,
loop: SerializedLoop,
context: ExecutionContext
): void {
if (!context.workflow) return;
// Mark this loop as completed
if (!context.completedLoops) {
context.completedLoops = new Set<string>();
}
context.completedLoops.add(loopId);
// First, identify all paths that lead to outside the loop
// We need to ensure only these are activated on loop completion
const externalTargets = new Set<string>();
// Build a map of outgoing connections from each node in the loop
for (const nodeId of loop.nodes) {
// Get all outgoing connections from this node
const outgoingConnections = context.workflow.connections.filter(conn =>
conn.source === nodeId
);
// For each outgoing connection, check if it leads outside the loop
for (const conn of outgoingConnections) {
// If target is not in the loop, it's an external path
if (!loop.nodes.includes(conn.target)) {
externalTargets.add(conn.target);
}
}
}
// Now, only activate the identified external targets
for (const target of externalTargets) {
// Find all connections leading to this target from nodes in the loop
const incomingConnections = context.workflow.connections.filter(conn =>
loop.nodes.includes(conn.source) && conn.target === target
);
for (const conn of incomingConnections) {
const sourceBlockId = conn.source;
const blockState = context.blockStates.get(sourceBlockId);
const hasError =
blockState?.output?.error !== undefined ||
blockState?.output?.response?.error !== undefined;
// Apply connection type rules, but only activate for the final iteration
if (conn.sourceHandle === 'error') {
// Only activate error paths if there was an error
if (hasError) {
context.activeExecutionPath.add(target);
}
} else if (conn.sourceHandle === 'source' || !conn.sourceHandle) {
// Only activate regular paths if there was no error
if (!hasError) {
context.activeExecutionPath.add(target);
}
} else if (conn.sourceHandle?.startsWith('condition-')) {
// For condition connections, check if this was the selected condition
const conditionId = conn.sourceHandle.replace('condition-', '');
const selectedCondition = context.decisions.condition.get(sourceBlockId);
if (conditionId === selectedCondition) {
context.activeExecutionPath.add(target);
}
} else if (sourceBlockId === conn.source) {
// For router blocks, check if this was the selected target
const sourceBlock = context.workflow.blocks.find(b => b.id === sourceBlockId);
if (sourceBlock?.metadata?.id === 'router') {
const selectedTarget = context.decisions.router.get(sourceBlockId);
if (selectedTarget === target) {
context.activeExecutionPath.add(target);
}
} else {
// For any other connection type
context.activeExecutionPath.add(target);
}
}
}
}
}
/**
* Gets the correct loop index based on the current block being executed.
* Accounts for position within the loop cycle to provide accurate index.
@@ -167,57 +287,6 @@ export class LoopManager {
return iterationCounter
}
/**
* Determines the execution order of blocks in a loop based on the connections.
* This is needed to figure out which blocks should be assigned which iteration.
*
* @param nodeIds - IDs of nodes in the loop
* @param context - Current execution context
* @returns Array of block IDs in execution order
*/
private determineBlockExecutionOrder(nodeIds: string[], context: ExecutionContext): string[] {
// Start with the entry block
const entryBlock = this.findEntryBlock(nodeIds, context)
if (!entryBlock) return nodeIds
const result: string[] = [entryBlock]
const visited = new Set<string>([entryBlock])
// Perform a depth-first traversal to determine execution order
const traverse = (nodeId: string) => {
// Find all outgoing connections from this node
const connections =
context.workflow?.connections.filter(
(conn) =>
conn.source === nodeId && nodeIds.includes(conn.target) && conn.sourceHandle !== 'error'
) || []
// Sort by target node to ensure deterministic order
connections.sort((a, b) => a.target.localeCompare(b.target))
// Visit each target node
for (const conn of connections) {
if (!visited.has(conn.target)) {
visited.add(conn.target)
result.push(conn.target)
traverse(conn.target)
}
}
}
// Start traversal from the entry block
traverse(entryBlock)
// If there are nodes we didn't visit, add them at the end
for (const nodeId of nodeIds) {
if (!visited.has(nodeId)) {
result.push(nodeId)
}
}
return result
}
/**
* Evaluates the forEach items string or retrieves items for a forEach loop.
*
@@ -257,9 +326,16 @@ export class LoopManager {
if (trimmedExpression.startsWith('[') || trimmedExpression.startsWith('{')) {
try {
// Try to parse as JSON first
return JSON.parse(trimmedExpression)
// Handle both JSON format (double quotes) and JS format (single quotes)
const normalizedExpression = trimmedExpression
.replace(/'/g, '"') // Replace all single quotes with double quotes
.replace(/(\w+):/g, '"$1":') // Convert property names to double-quoted strings
.replace(/,\s*]/g, ']') // Remove trailing commas before closing brackets
.replace(/,\s*}/g, '}') // Remove trailing commas before closing braces
return JSON.parse(normalizedExpression)
} catch (jsonError) {
console.error(`Error parsing JSON for loop ${loopId}:`, jsonError)
logger.debug(`Error parsing JSON for loop ${loopId}:`, jsonError)
// If JSON parsing fails, continue with expression evaluation
}
}
@@ -279,7 +355,7 @@ export class LoopManager {
return []
} catch (e) {
console.error(`Error evaluating forEach items for loop ${loopId}:`, e)
logger.error(`Error evaluating forEach items for loop ${loopId}:`, e)
return []
}
}
@@ -304,27 +380,93 @@ export class LoopManager {
/**
* Finds the entry block for a loop (the one that should be executed first).
* Typically the block with the fewest incoming connections.
* Typically the block with incoming connections from outside the loop.
*
* @param nodeIds - IDs of nodes in the loop
* @param context - Current execution context
* @returns ID of the entry block
*/
private findEntryBlock(nodeIds: string[], context: ExecutionContext): string | undefined {
const blockConnectionCounts = new Map<string, number>()
for (const nodeId of nodeIds) {
const incomingCount = context.workflow!.connections.filter(
(conn) => conn.target === nodeId
).length
blockConnectionCounts.set(nodeId, incomingCount)
// If there's only one node in the loop, it's the entry block
if (nodeIds.length === 1) {
return nodeIds[0];
}
const sortedBlocks = [...nodeIds].sort(
(a, b) => (blockConnectionCounts.get(a) || 0) - (blockConnectionCounts.get(b) || 0)
)
return sortedBlocks[0]
// Check which blocks have connections from outside the loop
const blocksWithExternalConnections = new Map<string, {
incomingExternal: number,
outgoingExternal: number,
incomingInternal: number,
}>();
for (const nodeId of nodeIds) {
// Count connections coming from outside the loop
const externalIncomingCount = context.workflow!.connections.filter(
conn => conn.target === nodeId && !nodeIds.includes(conn.source)
).length;
// Count connections going to outside the loop
const externalOutgoingCount = context.workflow!.connections.filter(
conn => conn.source === nodeId && !nodeIds.includes(conn.target)
).length;
// Count internal incoming connections that aren't self-connections
const internalIncomingCount = context.workflow!.connections.filter(
conn => conn.target === nodeId && conn.source !== nodeId && nodeIds.includes(conn.source)
).length;
blocksWithExternalConnections.set(nodeId, {
incomingExternal: externalIncomingCount,
outgoingExternal: externalOutgoingCount,
incomingInternal: internalIncomingCount
});
}
// First priority: blocks with incoming connections from outside the loop
const blocksWithExternalIncoming = nodeIds.filter(id =>
blocksWithExternalConnections.get(id)?.incomingExternal! > 0
);
if (blocksWithExternalIncoming.length > 0) {
// If multiple blocks have external incoming connections,
// prioritize the one with the most external incoming connections
return blocksWithExternalIncoming.sort((a, b) =>
blocksWithExternalConnections.get(b)?.incomingExternal! -
blocksWithExternalConnections.get(a)?.incomingExternal!
)[0];
}
// Second priority: look for the likely first node in a self-contained loop
// This is often the node that has the fewest internal incoming connections
// but might have outgoing connections to outside the loop
// Sort blocks by fewest internal incoming connections
const sortedByInternalIncoming = [...nodeIds].sort(
(a, b) =>
(blocksWithExternalConnections.get(a)?.incomingInternal || 0) -
(blocksWithExternalConnections.get(b)?.incomingInternal || 0)
);
// Among those with the fewest internal incoming connections,
// prioritize those with external outgoing connections
const candidatesWithFewestIncoming = sortedByInternalIncoming.filter(id =>
blocksWithExternalConnections.get(id)?.incomingInternal ===
blocksWithExternalConnections.get(sortedByInternalIncoming[0])?.incomingInternal
);
if (candidatesWithFewestIncoming.length > 1) {
// Among these candidates, prioritize those with external outgoing connections
const withExternalOutgoing = candidatesWithFewestIncoming.filter(id =>
blocksWithExternalConnections.get(id)?.outgoingExternal! > 0
);
if (withExternalOutgoing.length > 0) {
return withExternalOutgoing[0];
}
}
// If no better criteria found, return the first node with fewest internal incoming connections
return sortedByInternalIncoming[0];
}
/**
@@ -335,7 +477,13 @@ export class LoopManager {
* @returns Whether all blocks have been executed
*/
private allBlocksExecuted(nodeIds: string[], context: ExecutionContext): boolean {
return nodeIds.every((nodeId) => context.executedBlocks.has(nodeId))
// For single-node loops, ensure the node has been executed at least once
if (nodeIds.length === 1 && context.executedBlocks.has(nodeIds[0])) {
return true;
}
// For multi-node loops, ensure all nodes have been executed
return nodeIds.every((nodeId) => context.executedBlocks.has(nodeId));
}
/**
@@ -347,8 +495,18 @@ export class LoopManager {
* @returns Whether the connection forms a feedback path
*/
isFeedbackPath(connection: SerializedConnection, blocks: SerializedBlock[]): boolean {
// Self-loops are always feedback paths
if (connection.source === connection.target) {
return true;
}
for (const [loopId, loop] of Object.entries(this.loops)) {
if (loop.nodes.includes(connection.source) && loop.nodes.includes(connection.target)) {
// For single-node loops, any connection to itself is a feedback path
if (loop.nodes.length === 1 && loop.nodes[0] === connection.source && connection.source === connection.target) {
return true;
}
const sourceIndex = loop.nodes.indexOf(connection.source)
const targetIndex = loop.nodes.indexOf(connection.target)
@@ -384,4 +542,59 @@ export class LoopManager {
getCurrentItem(loopId: string, context: ExecutionContext): any {
return context.loopItems.get(loopId)
}
/**
* Activates the next blocks in the loop sequence when not all blocks have been executed.
* This ensures proper flow through the loop when PathTracker is prevented from activating within-loop paths.
*
* @param loopId - ID of the loop
* @param loop - The loop configuration
* @param context - Current execution context
*/
private activateNextBlocksInLoop(
loopId: string,
loop: SerializedLoop,
context: ExecutionContext
): void {
if (!context.workflow) return;
// Find which blocks in the loop have been executed
const executedLoopBlocks = new Set(
loop.nodes.filter(nodeId => context.executedBlocks.has(nodeId))
);
if (executedLoopBlocks.size === 0) {
// If no blocks have been executed yet, activate the entry block
const entryBlock = this.findEntryBlock(loop.nodes, context);
if (entryBlock) {
context.activeExecutionPath.add(entryBlock);
}
return;
}
// For each executed block, find and activate its next blocks in the loop
for (const executedBlockId of executedLoopBlocks) {
// Get outgoing connections from this block to other blocks in the loop
const outgoingConnections = context.workflow.connections.filter(conn =>
conn.source === executedBlockId &&
loop.nodes.includes(conn.target) &&
!executedLoopBlocks.has(conn.target)
);
// Activate each target that hasn't been executed yet
for (const conn of outgoingConnections) {
// Skip error connections unless there was an error
if (conn.sourceHandle === 'error') {
const blockState = context.blockStates.get(executedBlockId);
const hasError =
blockState?.output?.error !== undefined ||
blockState?.output?.response?.error !== undefined;
if (!hasError) continue;
}
context.activeExecutionPath.add(conn.target);
}
}
}
}

View File

@@ -1,5 +1,8 @@
import { SerializedWorkflow } from '@/serializer/types'
import { ExecutionContext } from './types'
import { createLogger } from '@/lib/logs/console-logger'
const logger = createLogger('PathTracker')
/**
* Manages the active execution paths in the workflow.
@@ -66,63 +69,104 @@ export class PathTracker {
* @param context - Current execution context
*/
updateExecutionPaths(executedBlockIds: string[], context: ExecutionContext): void {
logger.info(`Updating paths for blocks: ${executedBlockIds.join(', ')}`);
for (const blockId of executedBlockIds) {
const block = this.workflow.blocks.find((b) => b.id === blockId)
const block = this.workflow.blocks.find((b) => b.id === blockId);
if (block?.metadata?.id === 'router') {
const routerOutput = context.blockStates.get(blockId)?.output
const selectedPath = routerOutput?.response?.selectedPath?.blockId
const routerOutput = context.blockStates.get(blockId)?.output;
const selectedPath = routerOutput?.response?.selectedPath?.blockId;
if (selectedPath) {
// Record the decision but don't deactivate other paths
context.decisions.router.set(blockId, selectedPath)
context.activeExecutionPath.add(selectedPath)
context.decisions.router.set(blockId, selectedPath);
context.activeExecutionPath.add(selectedPath);
logger.info(`Router ${blockId} selected path: ${selectedPath}`);
}
} else if (block?.metadata?.id === 'condition') {
const conditionOutput = context.blockStates.get(blockId)?.output
const selectedConditionId = conditionOutput?.response?.selectedConditionId
const conditionOutput = context.blockStates.get(blockId)?.output;
const selectedConditionId = conditionOutput?.response?.selectedConditionId;
if (selectedConditionId) {
// Record the decision but don't deactivate other paths
context.decisions.condition.set(blockId, selectedConditionId)
context.decisions.condition.set(blockId, selectedConditionId);
const targetConnection = this.workflow.connections.find(
(conn) =>
conn.source === blockId && conn.sourceHandle === `condition-${selectedConditionId}`
)
);
if (targetConnection) {
context.activeExecutionPath.add(targetConnection.target)
context.activeExecutionPath.add(targetConnection.target);
logger.debug(`Condition ${blockId} selected: ${selectedConditionId}`);
}
}
} else {
// For regular blocks, activate all outgoing connections based on success or error status
const blockState = context.blockStates.get(blockId)
const blockState = context.blockStates.get(blockId);
const hasError =
blockState?.output?.error !== undefined ||
blockState?.output?.response?.error !== undefined
blockState?.output?.response?.error !== undefined;
// Get all outgoing connections
const outgoingConnections = this.workflow.connections.filter(
(conn) => conn.source === blockId
)
);
// Find out which loops this block belongs to
const blockLoops = Object.entries(context.workflow?.loops || {})
.filter(([_, loop]) => loop.nodes.includes(blockId))
.map(([id, loop]) => ({ id, loop }));
// Check if the block is part of any loops
const isPartOfLoop = blockLoops.length > 0;
// Process each outgoing connection
for (const conn of outgoingConnections) {
// Check if this connection is internal to any loop the source block belongs to
const isInternalLoopConnection = blockLoops.some(({ loop }) =>
// Target is also in the same loop as the source
loop.nodes.includes(conn.target)
);
// Check if this is a connection to a block outside any loop that contains the source
const isExternalLoopConnection = isPartOfLoop && !isInternalLoopConnection;
// Let the LoopManager handle all connections within loops
if (isInternalLoopConnection) {
continue;
}
// Check if all loops this block belongs to are completed
const allLoopsCompleted = blockLoops.every(({ id }) =>
context.completedLoops && context.completedLoops.has(id)
);
// Skip external connections from loop blocks UNLESS all loops are completed
if (isExternalLoopConnection && !allLoopsCompleted) {
continue;
}
// Now we can activate the path if:
// 1. It's not a loop connection, or
// 2. It's an external connection and all loops are completed
// For error connections, only activate them on error
if (conn.sourceHandle === 'error') {
if (hasError) {
context.activeExecutionPath.add(conn.target)
context.activeExecutionPath.add(conn.target);
}
}
// For regular (source) connections, only activate them if there's no error
else if (conn.sourceHandle === 'source' || !conn.sourceHandle) {
if (!hasError) {
context.activeExecutionPath.add(conn.target)
context.activeExecutionPath.add(conn.target);
}
}
// All other types of connections (e.g., from condition blocks) follow their own rules
else {
context.activeExecutionPath.add(conn.target)
context.activeExecutionPath.add(conn.target);
}
}
}

View File

@@ -301,6 +301,11 @@ export class InputResolver {
if (!blockMatches) return value
let resolvedValue = value
// Check if we're in a template literal for function blocks
const isInTemplateLiteral =
currentBlock.metadata?.id === 'function' &&
(/\${[^}]*</.test(value) || /<[^>]*}\$/.test(value))
for (const match of blockMatches) {
// Skip variables - they've already been processed
@@ -418,8 +423,11 @@ export class InputResolver {
// Format the value based on type
if (currentItem !== undefined) {
if (typeof currentItem !== 'object' || currentItem === null) {
// For primitives, convert to string
resolvedValue = resolvedValue.replace(match, String(currentItem))
// Format primitive values properly for code contexts
resolvedValue = resolvedValue.replace(
match,
this.formatValueForCodeContext(currentItem, currentBlock, isInTemplateLiteral)
)
} else if (
Array.isArray(currentItem) &&
currentItem.length === 2 &&
@@ -428,14 +436,15 @@ export class InputResolver {
// Handle [key, value] pair from Object.entries()
if (pathParts.length > 1) {
if (pathParts[1] === 'key') {
resolvedValue = resolvedValue.replace(match, String(currentItem[0]))
resolvedValue = resolvedValue.replace(
match,
this.formatValueForCodeContext(currentItem[0], currentBlock, isInTemplateLiteral)
)
} else if (pathParts[1] === 'value') {
const itemValue = currentItem[1]
const formattedValue =
typeof itemValue === 'object' && itemValue !== null
? JSON.stringify(itemValue)
: String(itemValue)
resolvedValue = resolvedValue.replace(match, formattedValue)
resolvedValue = resolvedValue.replace(
match,
this.formatValueForCodeContext(currentItem[1], currentBlock, isInTemplateLiteral)
)
}
} else {
// Default to stringifying the whole item
@@ -457,12 +466,11 @@ export class InputResolver {
}
}
const formattedValue =
typeof itemValue === 'object' && itemValue !== null
? JSON.stringify(itemValue)
: String(itemValue)
resolvedValue = resolvedValue.replace(match, formattedValue)
// Use the formatter helper method
resolvedValue = resolvedValue.replace(
match,
this.formatValueForCodeContext(itemValue, currentBlock, isInTemplateLiteral)
)
} else {
// Return the whole item as JSON
resolvedValue = resolvedValue.replace(match, JSON.stringify(currentItem))
@@ -477,11 +485,11 @@ export class InputResolver {
const items = this.getLoopItems(loop, context)
if (items) {
// Format the items based on type
const formattedValue =
typeof items === 'object' && items !== null ? JSON.stringify(items) : String(items)
resolvedValue = resolvedValue.replace(match, formattedValue)
// Format the items using our helper
resolvedValue = resolvedValue.replace(
match,
this.formatValueForCodeContext(items, currentBlock, isInTemplateLiteral)
)
continue
}
} else if (pathParts[0] === 'index') {
@@ -490,7 +498,11 @@ export class InputResolver {
? this.loopManager.getLoopIndex(containingLoopId, currentBlock.id, context)
: context.loopIterations.get(containingLoopId) || 0
resolvedValue = resolvedValue.replace(match, String(index))
// For function blocks, we don't need to quote numbers, but use the formatter for consistency
resolvedValue = resolvedValue.replace(
match,
this.formatValueForCodeContext(index, currentBlock, isInTemplateLiteral)
)
continue
}
}
@@ -576,8 +588,13 @@ export class InputResolver {
typeof replacementValue === 'string' &&
this.needsCodeStringLiteral(currentBlock, value)
) {
// For code blocks, quote string values properly for the given language
formattedValue = JSON.stringify(replacementValue)
// Check if we're in a template literal
const isInTemplateLiteral =
currentBlock.metadata?.id === 'function' &&
(/\${[^}]*</.test(value) || /<[^>]*}\$/.test(value))
// For code blocks, use our formatter
formattedValue = this.formatValueForCodeContext(replacementValue, currentBlock, isInTemplateLiteral)
} else {
formattedValue =
typeof replacementValue === 'object'
@@ -828,7 +845,14 @@ export class InputResolver {
if (trimmedExpression.startsWith('[') || trimmedExpression.startsWith('{')) {
try {
// Try to parse as JSON first
return JSON.parse(trimmedExpression)
// Handle both JSON format (double quotes) and JS format (single quotes)
const normalizedExpression = trimmedExpression
.replace(/'/g, '"') // Replace all single quotes with double quotes
.replace(/(\w+):/g, '"$1":') // Convert property names to double-quoted strings
.replace(/,\s*]/g, ']') // Remove trailing commas before closing brackets
.replace(/,\s*}/g, '}') // Remove trailing commas before closing braces
return JSON.parse(normalizedExpression)
} catch (jsonError) {
console.error(`Error parsing JSON for loop:`, jsonError)
// If JSON parsing fails, continue with expression evaluation
@@ -866,4 +890,54 @@ export class InputResolver {
// Default to empty array if no valid items found
return []
}
/**
* Formats a value for safe use in a code context (like function blocks).
* Ensures strings are properly quoted in JavaScript.
*
* @param value - The value to format
* @param block - The block that will use this value
* @param isInTemplateLiteral - Whether this value is inside a template literal
* @returns Properly formatted value for code insertion
*/
private formatValueForCodeContext(
value: any,
block: SerializedBlock,
isInTemplateLiteral: boolean = false
): string {
// For function blocks, properly format values to avoid syntax errors
if (block.metadata?.id === 'function') {
// Special case for values in template literals (like `Hello ${<loop.currentItem>}`)
if (isInTemplateLiteral) {
if (typeof value === 'string') {
return value // Don't quote strings in template literals
} else if (typeof value === 'object' && value !== null) {
return JSON.stringify(value) // But do stringify objects
} else {
return String(value)
}
}
// Regular (non-template) contexts
if (typeof value === 'string') {
// Quote strings for JavaScript
return JSON.stringify(value)
} else if (typeof value === 'object' && value !== null) {
// Stringify objects and arrays
return JSON.stringify(value)
} else if (value === undefined) {
return 'undefined'
} else if (value === null) {
return 'null'
} else {
// Numbers, booleans can be inserted as is
return String(value)
}
}
// For non-code blocks, use normal string conversion
return typeof value === 'object' && value !== null
? JSON.stringify(value)
: String(value)
}
}

View File

@@ -88,6 +88,7 @@ export interface ExecutionContext {
loopIterations: Map<string, number> // Tracks current iteration count for each loop
loopItems: Map<string, any> // Tracks current item for forEach loops
completedLoops: Set<string> // Tracks which loops have completed all iterations
// Execution tracking
executedBlocks: Set<string> // Set of block IDs that have been executed

View File

@@ -135,7 +135,8 @@ export const useWorkflowStore = create<WorkflowStoreWithHistory>()(
// Clean up loops
Object.entries(newState.loops).forEach(([loopId, loop]) => {
if (loop.nodes.includes(id)) {
if (loop.nodes.length <= 2) {
// If removing this node would leave the loop empty, delete the loop
if (loop.nodes.length <= 1) {
delete newState.loops[loopId]
} else {
newState.loops[loopId] = {
@@ -183,6 +184,7 @@ export const useWorkflowStore = create<WorkflowStoreWithHistory>()(
// Recalculate all loops after adding the edge
const newLoops: Record<string, Loop> = {}
const processedPaths = new Set<string>()
const existingLoops = get().loops
// Check for cycles from each node
const nodes = new Set(newEdges.map((e) => e.source))
@@ -192,15 +194,34 @@ export const useWorkflowStore = create<WorkflowStoreWithHistory>()(
// Create a canonical path representation for deduplication
const canonicalPath = [...path].sort().join(',')
if (!processedPaths.has(canonicalPath)) {
const loopId = crypto.randomUUID()
newLoops[loopId] = {
id: loopId,
nodes: path,
iterations: 5,
loopType: 'for', // Default to 'for' loop
forEachItems: '',
}
processedPaths.add(canonicalPath)
// Check if this path matches an existing loop
let existingLoop: Loop | undefined
Object.values(existingLoops).forEach((loop) => {
const loopCanonicalPath = [...loop.nodes].sort().join(',')
if (loopCanonicalPath === canonicalPath) {
existingLoop = loop
}
})
if (existingLoop) {
// Preserve the existing loop's properties
newLoops[existingLoop.id] = {
...existingLoop,
nodes: path // Update nodes in case order changed
}
} else {
// Create a new loop with default settings
const loopId = crypto.randomUUID()
newLoops[loopId] = {
id: loopId,
nodes: path,
iterations: 5,
loopType: 'for',
forEachItems: '',
}
}
}
})
})
@@ -223,6 +244,7 @@ export const useWorkflowStore = create<WorkflowStoreWithHistory>()(
// Recalculate all loops after edge removal
const newLoops: Record<string, Loop> = {}
const processedPaths = new Set<string>()
const existingLoops = get().loops
// Check for cycles from each node
const nodes = new Set(newEdges.map((e) => e.source))
@@ -232,15 +254,34 @@ export const useWorkflowStore = create<WorkflowStoreWithHistory>()(
// Create a canonical path representation for deduplication
const canonicalPath = [...path].sort().join(',')
if (!processedPaths.has(canonicalPath)) {
const loopId = crypto.randomUUID()
newLoops[loopId] = {
id: loopId,
nodes: path,
iterations: 5,
loopType: 'for', // Default to 'for' loop
forEachItems: '',
}
processedPaths.add(canonicalPath)
// Check if this path matches an existing loop
let existingLoop: Loop | undefined
Object.values(existingLoops).forEach((loop) => {
const loopCanonicalPath = [...loop.nodes].sort().join(',')
if (loopCanonicalPath === canonicalPath) {
existingLoop = loop
}
})
if (existingLoop) {
// Preserve the existing loop's properties
newLoops[existingLoop.id] = {
...existingLoop,
nodes: path // Update nodes in case order changed
}
} else {
// Create a new loop with default settings
const loopId = crypto.randomUUID()
newLoops[loopId] = {
id: loopId,
nodes: path,
iterations: 5,
loopType: 'for',
forEachItems: '',
}
}
}
})
})
@@ -568,28 +609,6 @@ export const useWorkflowStore = create<WorkflowStoreWithHistory>()(
},
updateLoopForEachItems: (loopId: string, items: string) => {
let parsedItems: any = items
// Try to parse the string as JSON if it looks like JSON
if (
typeof items === 'string' &&
((items.trim().startsWith('[') && items.trim().endsWith(']')) ||
(items.trim().startsWith('{') && items.trim().endsWith('}')))
) {
try {
// First try to parse to validate it's valid JSON
const parsed = JSON.parse(items)
// If parsing succeeds, store the original string to preserve formatting
// This way we keep the user's exact formatting (spacing, line breaks, etc.)
parsedItems = items
} catch (e) {
// If parsing fails, keep it as a string expression
console.error('Invalid JSON format for forEach items:', e)
parsedItems = items
}
}
const newState = {
blocks: { ...get().blocks },
edges: [...get().edges],
@@ -597,7 +616,7 @@ export const useWorkflowStore = create<WorkflowStoreWithHistory>()(
...get().loops,
[loopId]: {
...get().loops[loopId],
forEachItems: parsedItems,
forEachItems: items,
},
},
}

View File

@@ -24,6 +24,12 @@ export function detectCycle(
const neighbors = edges.filter((edge) => edge.source === node).map((edge) => edge.target)
for (const neighbor of neighbors) {
// Check for self-loops (node connecting to itself)
if (neighbor === node) {
allCycles.push([node])
continue
}
if (!recursionStack.has(neighbor)) {
if (!visited.has(neighbor)) {
dfs(neighbor)
@@ -33,10 +39,8 @@ export function detectCycle(
const cycleStartIndex = currentPath.indexOf(neighbor)
if (cycleStartIndex !== -1) {
const cycle = currentPath.slice(cycleStartIndex)
// Only add cycles with length > 1
if (cycle.length > 1) {
allCycles.push([...cycle])
}
// Include all cycles, even single-node ones
allCycles.push([...cycle])
}
}
}