mirror of
https://github.com/simstudioai/sim.git
synced 2026-02-17 01:42:43 -05:00
feat(while, vars, wait): add while subflow, variables block, wait block (#1754)
* Add variables block * Add wait block * While loop v1 * While loop v1 * Do while loops * Copilot user input rerender fix * Fix while and dowhile * Vars block dropdown * While loop docs * Remove vars block coloring * Fix lint * Link docs to wait * Fix build fail
This commit is contained in:
committed by
GitHub
parent
ef5b6999ab
commit
aace3066aa
@@ -48,6 +48,8 @@ export const setupHandlerMocks = () => {
|
||||
LoopBlockHandler: createMockHandler('loop'),
|
||||
ParallelBlockHandler: createMockHandler('parallel'),
|
||||
WorkflowBlockHandler: createMockHandler('workflow'),
|
||||
VariablesBlockHandler: createMockHandler('variables'),
|
||||
WaitBlockHandler: createMockHandler('wait'),
|
||||
GenericBlockHandler: createMockHandler('generic'),
|
||||
ResponseBlockHandler: createMockHandler('response'),
|
||||
}))
|
||||
|
||||
@@ -15,6 +15,8 @@ export enum BlockType {
|
||||
WORKFLOW = 'workflow', // Deprecated - kept for backwards compatibility
|
||||
WORKFLOW_INPUT = 'workflow_input', // Current workflow block type
|
||||
STARTER = 'starter',
|
||||
VARIABLES = 'variables',
|
||||
WAIT = 'wait',
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -332,7 +332,7 @@ describe('ConditionBlockHandler', () => {
|
||||
mockResolver.resolveEnvVariables.mockReturnValue('context.nonExistentProperty.doSomething()')
|
||||
|
||||
await expect(handler.execute(mockBlock, inputs, mockContext)).rejects.toThrow(
|
||||
/^Evaluation error in condition "if": Cannot read properties of undefined \(reading 'doSomething'\)\. \(Resolved: context\.nonExistentProperty\.doSomething\(\)\)$/
|
||||
/^Evaluation error in condition "if": Evaluation error in condition: Cannot read properties of undefined \(reading 'doSomething'\)\. \(Resolved: context\.nonExistentProperty\.doSomething\(\)\)$/
|
||||
)
|
||||
})
|
||||
|
||||
|
||||
@@ -8,6 +8,61 @@ import type { SerializedBlock } from '@/serializer/types'
|
||||
|
||||
const logger = createLogger('ConditionBlockHandler')
|
||||
|
||||
/**
|
||||
* Evaluates a single condition expression with variable/block reference resolution
|
||||
* Returns true if condition is met, false otherwise
|
||||
*/
|
||||
export async function evaluateConditionExpression(
|
||||
conditionExpression: string,
|
||||
context: ExecutionContext,
|
||||
block: SerializedBlock,
|
||||
resolver: InputResolver,
|
||||
providedEvalContext?: Record<string, any>
|
||||
): Promise<boolean> {
|
||||
// Build evaluation context - use provided context or just loop context
|
||||
const evalContext = providedEvalContext || {
|
||||
// Add loop context if applicable
|
||||
...(context.loopItems.get(block.id) || {}),
|
||||
}
|
||||
|
||||
let resolvedConditionValue = conditionExpression
|
||||
try {
|
||||
// Use full resolution pipeline: variables -> block references -> env vars
|
||||
const resolvedVars = resolver.resolveVariableReferences(conditionExpression, block)
|
||||
const resolvedRefs = resolver.resolveBlockReferences(resolvedVars, context, block)
|
||||
resolvedConditionValue = resolver.resolveEnvVariables(resolvedRefs)
|
||||
logger.info(`Resolved condition: from "${conditionExpression}" to "${resolvedConditionValue}"`)
|
||||
} catch (resolveError: any) {
|
||||
logger.error(`Failed to resolve references in condition: ${resolveError.message}`, {
|
||||
conditionExpression,
|
||||
resolveError,
|
||||
})
|
||||
throw new Error(`Failed to resolve references in condition: ${resolveError.message}`)
|
||||
}
|
||||
|
||||
// Evaluate the RESOLVED condition string
|
||||
try {
|
||||
logger.info(`Evaluating resolved condition: "${resolvedConditionValue}"`, { evalContext })
|
||||
// IMPORTANT: The resolved value (e.g., "some string".length > 0) IS the code to run
|
||||
const conditionMet = new Function(
|
||||
'context',
|
||||
`with(context) { return ${resolvedConditionValue} }`
|
||||
)(evalContext)
|
||||
logger.info(`Condition evaluated to: ${conditionMet}`)
|
||||
return Boolean(conditionMet)
|
||||
} catch (evalError: any) {
|
||||
logger.error(`Failed to evaluate condition: ${evalError.message}`, {
|
||||
originalCondition: conditionExpression,
|
||||
resolvedCondition: resolvedConditionValue,
|
||||
evalContext,
|
||||
evalError,
|
||||
})
|
||||
throw new Error(
|
||||
`Evaluation error in condition: ${evalError.message}. (Resolved: ${resolvedConditionValue})`
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handler for Condition blocks that evaluate expressions to determine execution paths.
|
||||
*/
|
||||
@@ -102,35 +157,16 @@ export class ConditionBlockHandler implements BlockHandler {
|
||||
continue // Should ideally not happen if 'else' exists and has a connection
|
||||
}
|
||||
|
||||
// 2. Resolve references WITHIN the specific condition's value string
|
||||
// 2. Evaluate the condition using the shared evaluation function
|
||||
const conditionValueString = String(condition.value || '')
|
||||
let resolvedConditionValue = conditionValueString
|
||||
try {
|
||||
// Use full resolution pipeline: variables -> block references -> env vars
|
||||
const resolvedVars = this.resolver.resolveVariableReferences(conditionValueString, block)
|
||||
const resolvedRefs = this.resolver.resolveBlockReferences(resolvedVars, context, block)
|
||||
resolvedConditionValue = this.resolver.resolveEnvVariables(resolvedRefs)
|
||||
logger.info(
|
||||
`Resolved condition "${condition.title}" (${condition.id}): from "${conditionValueString}" to "${resolvedConditionValue}"`
|
||||
const conditionMet = await evaluateConditionExpression(
|
||||
conditionValueString,
|
||||
context,
|
||||
block,
|
||||
this.resolver,
|
||||
evalContext
|
||||
)
|
||||
} catch (resolveError: any) {
|
||||
logger.error(`Failed to resolve references in condition: ${resolveError.message}`, {
|
||||
condition,
|
||||
resolveError,
|
||||
})
|
||||
throw new Error(`Failed to resolve references in condition: ${resolveError.message}`)
|
||||
}
|
||||
|
||||
// 3. Evaluate the RESOLVED condition string
|
||||
try {
|
||||
logger.info(`Evaluating resolved condition: "${resolvedConditionValue}"`, {
|
||||
evalContext, // Log the context being used for evaluation
|
||||
})
|
||||
// IMPORTANT: The resolved value (e.g., "some string".length > 0) IS the code to run
|
||||
const conditionMet = new Function(
|
||||
'context',
|
||||
`with(context) { return ${resolvedConditionValue} }`
|
||||
)(evalContext)
|
||||
logger.info(`Condition "${condition.title}" (${condition.id}) met: ${conditionMet}`)
|
||||
|
||||
// Find connection for this condition
|
||||
@@ -143,17 +179,9 @@ export class ConditionBlockHandler implements BlockHandler {
|
||||
selectedCondition = condition
|
||||
break // Found the first matching condition
|
||||
}
|
||||
} catch (evalError: any) {
|
||||
logger.error(`Failed to evaluate condition: ${evalError.message}`, {
|
||||
originalCondition: condition.value,
|
||||
resolvedCondition: resolvedConditionValue,
|
||||
evalContext,
|
||||
evalError,
|
||||
})
|
||||
// Construct a more informative error message
|
||||
throw new Error(
|
||||
`Evaluation error in condition "${condition.title}": ${evalError.message}. (Resolved: ${resolvedConditionValue})`
|
||||
)
|
||||
} catch (error: any) {
|
||||
logger.error(`Failed to evaluate condition "${condition.title}": ${error.message}`)
|
||||
throw new Error(`Evaluation error in condition "${condition.title}": ${error.message}`)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,8 @@ import { ParallelBlockHandler } from '@/executor/handlers/parallel/parallel-hand
|
||||
import { ResponseBlockHandler } from '@/executor/handlers/response/response-handler'
|
||||
import { RouterBlockHandler } from '@/executor/handlers/router/router-handler'
|
||||
import { TriggerBlockHandler } from '@/executor/handlers/trigger/trigger-handler'
|
||||
import { VariablesBlockHandler } from '@/executor/handlers/variables/variables-handler'
|
||||
import { WaitBlockHandler } from '@/executor/handlers/wait/wait-handler'
|
||||
import { WorkflowBlockHandler } from '@/executor/handlers/workflow/workflow-handler'
|
||||
|
||||
export {
|
||||
@@ -23,5 +25,7 @@ export {
|
||||
ResponseBlockHandler,
|
||||
RouterBlockHandler,
|
||||
TriggerBlockHandler,
|
||||
VariablesBlockHandler,
|
||||
WaitBlockHandler,
|
||||
WorkflowBlockHandler,
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { createLogger } from '@/lib/logs/console/logger'
|
||||
import type { BlockOutput } from '@/blocks/types'
|
||||
import { BlockType } from '@/executor/consts'
|
||||
import { evaluateConditionExpression } from '@/executor/handlers/condition/condition-handler'
|
||||
import type { PathTracker } from '@/executor/path/path'
|
||||
import type { InputResolver } from '@/executor/resolver/resolver'
|
||||
import { Routing } from '@/executor/routing/routing'
|
||||
@@ -50,6 +51,8 @@ export class LoopBlockHandler implements BlockHandler {
|
||||
const currentIteration = context.loopIterations.get(block.id) || 1
|
||||
let maxIterations: number
|
||||
let forEachItems: any[] | Record<string, any> | null = null
|
||||
let shouldContinueLoop = true
|
||||
|
||||
if (loop.loopType === 'forEach') {
|
||||
if (
|
||||
!loop.forEachItems ||
|
||||
@@ -82,16 +85,96 @@ export class LoopBlockHandler implements BlockHandler {
|
||||
logger.info(
|
||||
`forEach loop ${block.id} - Items: ${itemsLength}, Max iterations: ${maxIterations}`
|
||||
)
|
||||
} else if (loop.loopType === 'while' || loop.loopType === 'doWhile') {
|
||||
// For while and doWhile loops, set loop context BEFORE evaluating condition
|
||||
// This makes variables like index, currentIteration available in the condition
|
||||
const loopContext = {
|
||||
index: currentIteration - 1, // 0-based index
|
||||
currentIteration, // 1-based iteration number
|
||||
}
|
||||
context.loopItems.set(block.id, loopContext)
|
||||
|
||||
// Evaluate the condition to determine if we should continue
|
||||
if (!loop.whileCondition || loop.whileCondition.trim() === '') {
|
||||
throw new Error(
|
||||
`${loop.loopType} loop "${block.id}" requires a condition expression. Please provide a valid JavaScript expression.`
|
||||
)
|
||||
}
|
||||
|
||||
// For doWhile loops, skip condition evaluation on the first iteration
|
||||
// For while loops, always evaluate the condition
|
||||
if (loop.loopType === 'doWhile' && currentIteration === 1) {
|
||||
shouldContinueLoop = true
|
||||
} else {
|
||||
// Evaluate the condition at the start of each iteration
|
||||
try {
|
||||
if (!this.resolver) {
|
||||
throw new Error('Resolver is required for while/doWhile loop condition evaluation')
|
||||
}
|
||||
shouldContinueLoop = await evaluateConditionExpression(
|
||||
loop.whileCondition,
|
||||
context,
|
||||
block,
|
||||
this.resolver
|
||||
)
|
||||
} catch (error: any) {
|
||||
throw new Error(
|
||||
`Failed to evaluate ${loop.loopType} loop condition for "${block.id}": ${error.message}`
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// No max iterations for while/doWhile - rely on condition and workflow timeout
|
||||
maxIterations = Number.MAX_SAFE_INTEGER
|
||||
} else {
|
||||
maxIterations = loop.iterations || DEFAULT_MAX_ITERATIONS
|
||||
logger.info(`For loop ${block.id} - Max iterations: ${maxIterations}`)
|
||||
}
|
||||
|
||||
logger.info(
|
||||
`Loop ${block.id} - Current iteration: ${currentIteration}, Max iterations: ${maxIterations}`
|
||||
`Loop ${block.id} - Current iteration: ${currentIteration}, Max iterations: ${maxIterations}, Should continue: ${shouldContinueLoop}`
|
||||
)
|
||||
|
||||
if (currentIteration > maxIterations) {
|
||||
// For while and doWhile loops, check if the condition is false
|
||||
if ((loop.loopType === 'while' || loop.loopType === 'doWhile') && !shouldContinueLoop) {
|
||||
// Mark the loop as completed
|
||||
context.completedLoops.add(block.id)
|
||||
|
||||
// Remove any activated loop-start paths since we're not continuing
|
||||
const loopStartConnections =
|
||||
context.workflow?.connections.filter(
|
||||
(conn) => conn.source === block.id && conn.sourceHandle === 'loop-start-source'
|
||||
) || []
|
||||
|
||||
for (const conn of loopStartConnections) {
|
||||
context.activeExecutionPath.delete(conn.target)
|
||||
}
|
||||
|
||||
// Activate the loop-end connections (blocks after the loop)
|
||||
const loopEndConnections =
|
||||
context.workflow?.connections.filter(
|
||||
(conn) => conn.source === block.id && conn.sourceHandle === 'loop-end-source'
|
||||
) || []
|
||||
|
||||
for (const conn of loopEndConnections) {
|
||||
context.activeExecutionPath.add(conn.target)
|
||||
}
|
||||
|
||||
return {
|
||||
loopId: block.id,
|
||||
currentIteration,
|
||||
maxIterations,
|
||||
loopType: loop.loopType,
|
||||
completed: true,
|
||||
message: `${loop.loopType === 'doWhile' ? 'Do-While' : 'While'} loop completed after ${currentIteration} iterations (condition became false)`,
|
||||
} as Record<string, any>
|
||||
}
|
||||
|
||||
// Only check max iterations for for/forEach loops (while/doWhile have no limit)
|
||||
if (
|
||||
(loop.loopType === 'for' || loop.loopType === 'forEach') &&
|
||||
currentIteration > maxIterations
|
||||
) {
|
||||
logger.info(`Loop ${block.id} has reached maximum iterations (${maxIterations})`)
|
||||
|
||||
return {
|
||||
@@ -142,7 +225,23 @@ export class LoopBlockHandler implements BlockHandler {
|
||||
this.activateChildNodes(block, context, currentIteration)
|
||||
}
|
||||
|
||||
context.loopIterations.set(block.id, currentIteration)
|
||||
// For while/doWhile loops, now that condition is confirmed true, reset child blocks and increment counter
|
||||
if (loop.loopType === 'while' || loop.loopType === 'doWhile') {
|
||||
// Reset all child blocks for this iteration
|
||||
for (const nodeId of loop.nodes || []) {
|
||||
context.executedBlocks.delete(nodeId)
|
||||
context.blockStates.delete(nodeId)
|
||||
context.activeExecutionPath.delete(nodeId)
|
||||
context.decisions.router.delete(nodeId)
|
||||
context.decisions.condition.delete(nodeId)
|
||||
}
|
||||
|
||||
// Increment the counter for the next iteration
|
||||
context.loopIterations.set(block.id, currentIteration + 1)
|
||||
} else {
|
||||
// For for/forEach loops, keep the counter value - it will be managed by the loop manager
|
||||
context.loopIterations.set(block.id, currentIteration)
|
||||
}
|
||||
|
||||
return {
|
||||
loopId: block.id,
|
||||
|
||||
163
apps/sim/executor/handlers/variables/variables-handler.ts
Normal file
163
apps/sim/executor/handlers/variables/variables-handler.ts
Normal file
@@ -0,0 +1,163 @@
|
||||
import { createLogger } from '@/lib/logs/console/logger'
|
||||
import type { BlockOutput } from '@/blocks/types'
|
||||
import { BlockType } from '@/executor/consts'
|
||||
import type { BlockHandler, ExecutionContext } from '@/executor/types'
|
||||
import type { SerializedBlock } from '@/serializer/types'
|
||||
|
||||
const logger = createLogger('VariablesBlockHandler')
|
||||
|
||||
export class VariablesBlockHandler implements BlockHandler {
|
||||
canHandle(block: SerializedBlock): boolean {
|
||||
const canHandle = block.metadata?.id === BlockType.VARIABLES
|
||||
logger.info(`VariablesBlockHandler.canHandle: ${canHandle}`, {
|
||||
blockId: block.id,
|
||||
metadataId: block.metadata?.id,
|
||||
expectedType: BlockType.VARIABLES,
|
||||
})
|
||||
return canHandle
|
||||
}
|
||||
|
||||
async execute(
|
||||
block: SerializedBlock,
|
||||
inputs: Record<string, any>,
|
||||
context: ExecutionContext
|
||||
): Promise<BlockOutput> {
|
||||
logger.info(`Executing variables block: ${block.id}`, {
|
||||
blockName: block.metadata?.name,
|
||||
inputsKeys: Object.keys(inputs),
|
||||
variablesInput: inputs.variables,
|
||||
})
|
||||
|
||||
try {
|
||||
// Initialize workflowVariables if not present
|
||||
if (!context.workflowVariables) {
|
||||
context.workflowVariables = {}
|
||||
}
|
||||
|
||||
// Parse variable assignments from the custom input
|
||||
const assignments = this.parseAssignments(inputs.variables)
|
||||
|
||||
// Update context.workflowVariables with new values
|
||||
for (const assignment of assignments) {
|
||||
// Find the variable by ID or name
|
||||
const existingEntry = assignment.variableId
|
||||
? [assignment.variableId, context.workflowVariables[assignment.variableId]]
|
||||
: Object.entries(context.workflowVariables).find(
|
||||
([_, v]) => v.name === assignment.variableName
|
||||
)
|
||||
|
||||
if (existingEntry?.[1]) {
|
||||
// Update existing variable value
|
||||
const [id, variable] = existingEntry
|
||||
context.workflowVariables[id] = {
|
||||
...variable,
|
||||
value: assignment.value,
|
||||
}
|
||||
} else {
|
||||
logger.warn(`Variable "${assignment.variableName}" not found in workflow variables`)
|
||||
}
|
||||
}
|
||||
|
||||
logger.info('Variables updated', {
|
||||
updatedVariables: assignments.map((a) => a.variableName),
|
||||
allVariables: Object.values(context.workflowVariables).map((v: any) => v.name),
|
||||
updatedValues: Object.entries(context.workflowVariables).map(([id, v]: [string, any]) => ({
|
||||
id,
|
||||
name: v.name,
|
||||
value: v.value,
|
||||
})),
|
||||
})
|
||||
|
||||
// Return assignments as a JSON object mapping variable names to values
|
||||
const assignmentsOutput: Record<string, any> = {}
|
||||
for (const assignment of assignments) {
|
||||
assignmentsOutput[assignment.variableName] = assignment.value
|
||||
}
|
||||
|
||||
return {
|
||||
assignments: assignmentsOutput,
|
||||
}
|
||||
} catch (error: any) {
|
||||
logger.error('Variables block execution failed:', error)
|
||||
throw new Error(`Variables block execution failed: ${error.message}`)
|
||||
}
|
||||
}
|
||||
|
||||
private parseAssignments(
|
||||
assignmentsInput: any
|
||||
): Array<{ variableId?: string; variableName: string; type: string; value: any }> {
|
||||
const result: Array<{ variableId?: string; variableName: string; type: string; value: any }> =
|
||||
[]
|
||||
|
||||
if (!assignmentsInput || !Array.isArray(assignmentsInput)) {
|
||||
return result
|
||||
}
|
||||
|
||||
for (const assignment of assignmentsInput) {
|
||||
if (assignment?.variableName?.trim()) {
|
||||
const name = assignment.variableName.trim()
|
||||
const type = assignment.type || 'string'
|
||||
const value = this.parseValueByType(assignment.value, type)
|
||||
|
||||
result.push({
|
||||
variableId: assignment.variableId,
|
||||
variableName: name,
|
||||
type,
|
||||
value,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
private parseValueByType(value: any, type: string): any {
|
||||
// Handle null/undefined early
|
||||
if (value === null || value === undefined) {
|
||||
if (type === 'number') return 0
|
||||
if (type === 'boolean') return false
|
||||
if (type === 'array') return []
|
||||
if (type === 'object') return {}
|
||||
return ''
|
||||
}
|
||||
|
||||
// Handle plain and string types (plain is for backward compatibility)
|
||||
if (type === 'string' || type === 'plain') {
|
||||
return typeof value === 'string' ? value : String(value)
|
||||
}
|
||||
|
||||
if (type === 'number') {
|
||||
if (typeof value === 'number') return value
|
||||
if (typeof value === 'string') {
|
||||
const num = Number(value)
|
||||
return Number.isNaN(num) ? 0 : num
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
if (type === 'boolean') {
|
||||
if (typeof value === 'boolean') return value
|
||||
if (typeof value === 'string') {
|
||||
return value.toLowerCase() === 'true'
|
||||
}
|
||||
return Boolean(value)
|
||||
}
|
||||
|
||||
if (type === 'object' || type === 'array') {
|
||||
if (typeof value === 'object' && value !== null) {
|
||||
return value
|
||||
}
|
||||
if (typeof value === 'string' && value.trim()) {
|
||||
try {
|
||||
return JSON.parse(value)
|
||||
} catch {
|
||||
return type === 'array' ? [] : {}
|
||||
}
|
||||
}
|
||||
return type === 'array' ? [] : {}
|
||||
}
|
||||
|
||||
// Default: return value as-is
|
||||
return value
|
||||
}
|
||||
}
|
||||
103
apps/sim/executor/handlers/wait/wait-handler.ts
Normal file
103
apps/sim/executor/handlers/wait/wait-handler.ts
Normal file
@@ -0,0 +1,103 @@
|
||||
import { createLogger } from '@/lib/logs/console/logger'
|
||||
import { BlockType } from '@/executor/consts'
|
||||
import type { BlockHandler, ExecutionContext } from '@/executor/types'
|
||||
import type { SerializedBlock } from '@/serializer/types'
|
||||
|
||||
const logger = createLogger('WaitBlockHandler')
|
||||
|
||||
/**
|
||||
* Helper function to sleep for a specified number of milliseconds
|
||||
* On client-side: checks for cancellation every 100ms (non-blocking for UI)
|
||||
* On server-side: simple sleep without polling (server execution can't be cancelled mid-flight)
|
||||
*/
|
||||
const sleep = async (ms: number, checkCancelled?: () => boolean): Promise<boolean> => {
|
||||
const isClientSide = typeof window !== 'undefined'
|
||||
|
||||
// Server-side: simple sleep without polling
|
||||
if (!isClientSide) {
|
||||
await new Promise((resolve) => setTimeout(resolve, ms))
|
||||
return true
|
||||
}
|
||||
|
||||
// Client-side: check for cancellation every 100ms
|
||||
const chunkMs = 100
|
||||
let elapsed = 0
|
||||
|
||||
while (elapsed < ms) {
|
||||
// Check if execution was cancelled
|
||||
if (checkCancelled?.()) {
|
||||
return false // Sleep was interrupted
|
||||
}
|
||||
|
||||
// Sleep for a chunk or remaining time, whichever is smaller
|
||||
const sleepTime = Math.min(chunkMs, ms - elapsed)
|
||||
await new Promise((resolve) => setTimeout(resolve, sleepTime))
|
||||
elapsed += sleepTime
|
||||
}
|
||||
|
||||
return true // Sleep completed normally
|
||||
}
|
||||
|
||||
/**
|
||||
* Handler for Wait blocks that pause workflow execution for a time delay
|
||||
*/
|
||||
export class WaitBlockHandler implements BlockHandler {
|
||||
canHandle(block: SerializedBlock): boolean {
|
||||
return block.metadata?.id === BlockType.WAIT
|
||||
}
|
||||
|
||||
async execute(
|
||||
block: SerializedBlock,
|
||||
inputs: Record<string, any>,
|
||||
context: ExecutionContext
|
||||
): Promise<any> {
|
||||
logger.info(`Executing Wait block: ${block.id}`, { inputs })
|
||||
|
||||
// Parse the wait duration
|
||||
const timeValue = Number.parseInt(inputs.timeValue || '10', 10)
|
||||
const timeUnit = inputs.timeUnit || 'seconds'
|
||||
|
||||
// Validate time value
|
||||
if (Number.isNaN(timeValue) || timeValue <= 0) {
|
||||
throw new Error('Wait amount must be a positive number')
|
||||
}
|
||||
|
||||
// Calculate wait time in milliseconds
|
||||
let waitMs = timeValue * 1000 // Default to seconds
|
||||
if (timeUnit === 'minutes') {
|
||||
waitMs = timeValue * 60 * 1000
|
||||
}
|
||||
|
||||
// Enforce 10-minute maximum (600,000 ms)
|
||||
const maxWaitMs = 10 * 60 * 1000
|
||||
if (waitMs > maxWaitMs) {
|
||||
const maxDisplay = timeUnit === 'minutes' ? '10 minutes' : '600 seconds'
|
||||
throw new Error(`Wait time exceeds maximum of ${maxDisplay}`)
|
||||
}
|
||||
|
||||
logger.info(`Waiting for ${waitMs}ms (${timeValue} ${timeUnit})`)
|
||||
|
||||
// Actually sleep for the specified duration
|
||||
// The executor updates context.isCancelled when cancel() is called
|
||||
const checkCancelled = () => {
|
||||
// Check if execution was marked as cancelled in the context
|
||||
return (context as any).isCancelled === true
|
||||
}
|
||||
|
||||
const completed = await sleep(waitMs, checkCancelled)
|
||||
|
||||
if (!completed) {
|
||||
logger.info('Wait was interrupted by cancellation')
|
||||
return {
|
||||
waitDuration: waitMs,
|
||||
status: 'cancelled',
|
||||
}
|
||||
}
|
||||
|
||||
logger.info('Wait completed successfully')
|
||||
return {
|
||||
waitDuration: waitMs,
|
||||
status: 'completed',
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -16,6 +16,8 @@ import {
|
||||
ResponseBlockHandler,
|
||||
RouterBlockHandler,
|
||||
TriggerBlockHandler,
|
||||
VariablesBlockHandler,
|
||||
WaitBlockHandler,
|
||||
WorkflowBlockHandler,
|
||||
} from '@/executor/handlers'
|
||||
import { LoopManager } from '@/executor/loops/loops'
|
||||
@@ -213,6 +215,8 @@ export class Executor {
|
||||
new ParallelBlockHandler(this.resolver, this.pathTracker),
|
||||
new ResponseBlockHandler(),
|
||||
new WorkflowBlockHandler(),
|
||||
new VariablesBlockHandler(),
|
||||
new WaitBlockHandler(),
|
||||
new GenericBlockHandler(),
|
||||
]
|
||||
|
||||
@@ -1972,10 +1976,11 @@ export class Executor {
|
||||
? forEachItems.length
|
||||
: Object.keys(forEachItems).length
|
||||
}
|
||||
} else {
|
||||
// For regular loops, use the iterations count
|
||||
} else if (loop.loopType === 'for') {
|
||||
// For 'for' loops, use the iterations count
|
||||
iterationTotal = loop.iterations || 5
|
||||
}
|
||||
// For while/doWhile loops, don't set iterationTotal (no max)
|
||||
iterationType = 'loop'
|
||||
}
|
||||
}
|
||||
@@ -2099,10 +2104,11 @@ export class Executor {
|
||||
? forEachItems.length
|
||||
: Object.keys(forEachItems).length
|
||||
}
|
||||
} else {
|
||||
// For regular loops, use the iterations count
|
||||
} else if (loop.loopType === 'for') {
|
||||
// For 'for' loops, use the iterations count
|
||||
iterationTotal = loop.iterations || 5
|
||||
}
|
||||
// For while/doWhile loops, don't set iterationTotal (no max)
|
||||
iterationType = 'loop'
|
||||
}
|
||||
}
|
||||
@@ -2226,10 +2232,11 @@ export class Executor {
|
||||
? forEachItems.length
|
||||
: Object.keys(forEachItems).length
|
||||
}
|
||||
} else {
|
||||
// For regular loops, use the iterations count
|
||||
} else if (loop.loopType === 'for') {
|
||||
// For 'for' loops, use the iterations count
|
||||
iterationTotal = loop.iterations || 5
|
||||
}
|
||||
// For while/doWhile loops, don't set iterationTotal (no max)
|
||||
iterationType = 'loop'
|
||||
}
|
||||
}
|
||||
|
||||
@@ -81,12 +81,19 @@ export class LoopManager {
|
||||
)
|
||||
}
|
||||
}
|
||||
} else if (loop.loopType === 'while' || loop.loopType === 'doWhile') {
|
||||
// For while and doWhile loops, no max iteration limit
|
||||
// They rely on the condition to stop (and workflow timeout as safety)
|
||||
maxIterations = Number.MAX_SAFE_INTEGER
|
||||
}
|
||||
|
||||
logger.info(`Loop ${loopId} - Current: ${currentIteration}, Max: ${maxIterations}`)
|
||||
|
||||
// Check if we've completed all iterations
|
||||
if (currentIteration >= maxIterations) {
|
||||
// Check if we've completed all iterations (only for for/forEach loops)
|
||||
if (
|
||||
currentIteration >= maxIterations &&
|
||||
(loop.loopType === 'for' || loop.loopType === 'forEach')
|
||||
) {
|
||||
hasLoopReachedMaxIterations = true
|
||||
logger.info(`Loop ${loopId} has completed all ${maxIterations} iterations`)
|
||||
|
||||
@@ -131,15 +138,21 @@ export class LoopManager {
|
||||
|
||||
logger.info(`Loop ${loopId} - Completed and activated end connections`)
|
||||
} else {
|
||||
context.loopIterations.set(loopId, currentIteration + 1)
|
||||
logger.info(`Loop ${loopId} - Incremented counter to ${currentIteration + 1}`)
|
||||
// For while/doWhile loops, DON'T reset yet - let the loop handler check the condition first
|
||||
// The loop handler will decide whether to continue or exit based on the condition
|
||||
if (loop.loopType === 'while' || loop.loopType === 'doWhile') {
|
||||
// Just reset the loop block itself so it can re-evaluate the condition
|
||||
context.executedBlocks.delete(loopId)
|
||||
context.blockStates.delete(loopId)
|
||||
} else {
|
||||
// For for/forEach loops, increment and reset everything as usual
|
||||
context.loopIterations.set(loopId, currentIteration + 1)
|
||||
|
||||
this.resetLoopBlocks(loopId, loop, context)
|
||||
this.resetLoopBlocks(loopId, loop, context)
|
||||
|
||||
context.executedBlocks.delete(loopId)
|
||||
context.blockStates.delete(loopId)
|
||||
|
||||
logger.info(`Loop ${loopId} - Reset for iteration ${currentIteration + 1}`)
|
||||
context.executedBlocks.delete(loopId)
|
||||
context.blockStates.delete(loopId)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -229,12 +229,20 @@ describe('PathTracker', () => {
|
||||
})
|
||||
|
||||
describe('loop blocks', () => {
|
||||
it('should only activate loop-start connections', () => {
|
||||
it('should activate loop-start connections when loop is not completed', () => {
|
||||
pathTracker.updateExecutionPaths(['loop1'], mockContext)
|
||||
|
||||
expect(mockContext.activeExecutionPath.has('block1')).toBe(true)
|
||||
expect(mockContext.activeExecutionPath.has('block2')).toBe(false)
|
||||
})
|
||||
|
||||
it('should not activate loop-start connections when loop is completed', () => {
|
||||
mockContext.completedLoops.add('loop1')
|
||||
pathTracker.updateExecutionPaths(['loop1'], mockContext)
|
||||
|
||||
expect(mockContext.activeExecutionPath.has('block1')).toBe(false)
|
||||
expect(mockContext.activeExecutionPath.has('block2')).toBe(false)
|
||||
})
|
||||
})
|
||||
|
||||
describe('regular blocks', () => {
|
||||
|
||||
@@ -286,13 +286,18 @@ export class PathTracker {
|
||||
* Update paths for loop blocks
|
||||
*/
|
||||
private updateLoopPaths(block: SerializedBlock, context: ExecutionContext): void {
|
||||
// Don't activate loop-start connections if the loop has completed
|
||||
// (e.g., while loop condition is false)
|
||||
if (context.completedLoops.has(block.id)) {
|
||||
return
|
||||
}
|
||||
|
||||
const outgoingConnections = this.getOutgoingConnections(block.id)
|
||||
|
||||
for (const conn of outgoingConnections) {
|
||||
// Only activate loop-start connections
|
||||
if (conn.sourceHandle === 'loop-start-source') {
|
||||
context.activeExecutionPath.add(conn.target)
|
||||
logger.info(`Loop ${block.id} activated start path to: ${conn.target}`)
|
||||
}
|
||||
// loop-end-source connections will be activated by the loop manager
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user