mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-19 20:08:04 -05:00
Compare commits
2 Commits
main
...
fix/termin
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8ae55c6bd9 | ||
|
|
fa54d9989d |
599
apps/sim/executor/execution/engine.test.ts
Normal file
599
apps/sim/executor/execution/engine.test.ts
Normal file
@@ -0,0 +1,599 @@
|
|||||||
|
/**
|
||||||
|
* @vitest-environment node
|
||||||
|
*/
|
||||||
|
import { loggerMock } from '@sim/testing'
|
||||||
|
import { afterEach, beforeEach, describe, expect, it, type Mock, vi } from 'vitest'
|
||||||
|
|
||||||
|
vi.mock('@sim/logger', () => loggerMock)
|
||||||
|
|
||||||
|
vi.mock('@/lib/execution/cancellation', () => ({
|
||||||
|
isExecutionCancelled: vi.fn(),
|
||||||
|
isRedisCancellationEnabled: vi.fn(),
|
||||||
|
}))
|
||||||
|
|
||||||
|
import { isExecutionCancelled, isRedisCancellationEnabled } from '@/lib/execution/cancellation'
|
||||||
|
import type { DAG, DAGNode } from '@/executor/dag/builder'
|
||||||
|
import type { EdgeManager } from '@/executor/execution/edge-manager'
|
||||||
|
import type { NodeExecutionOrchestrator } from '@/executor/orchestrators/node'
|
||||||
|
import type { ExecutionContext } from '@/executor/types'
|
||||||
|
import type { SerializedBlock } from '@/serializer/types'
|
||||||
|
import { ExecutionEngine } from './engine'
|
||||||
|
|
||||||
|
function createMockBlock(id: string): SerializedBlock {
|
||||||
|
return {
|
||||||
|
id,
|
||||||
|
metadata: { id: 'test', name: 'Test Block' },
|
||||||
|
position: { x: 0, y: 0 },
|
||||||
|
config: { tool: '', params: {} },
|
||||||
|
inputs: {},
|
||||||
|
outputs: {},
|
||||||
|
enabled: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function createMockNode(id: string, blockType = 'test'): DAGNode {
|
||||||
|
return {
|
||||||
|
id,
|
||||||
|
block: {
|
||||||
|
...createMockBlock(id),
|
||||||
|
metadata: { id: blockType, name: `Block ${id}` },
|
||||||
|
},
|
||||||
|
outgoingEdges: new Map(),
|
||||||
|
incomingEdges: new Set(),
|
||||||
|
metadata: {},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function createMockContext(overrides: Partial<ExecutionContext> = {}): ExecutionContext {
|
||||||
|
return {
|
||||||
|
workflowId: 'test-workflow',
|
||||||
|
workspaceId: 'test-workspace',
|
||||||
|
executionId: 'test-execution',
|
||||||
|
userId: 'test-user',
|
||||||
|
blockStates: new Map(),
|
||||||
|
executedBlocks: new Set(),
|
||||||
|
blockLogs: [],
|
||||||
|
loopExecutions: new Map(),
|
||||||
|
parallelExecutions: new Map(),
|
||||||
|
completedLoops: new Set(),
|
||||||
|
activeExecutionPath: new Set(),
|
||||||
|
metadata: {
|
||||||
|
executionId: 'test-execution',
|
||||||
|
startTime: new Date().toISOString(),
|
||||||
|
pendingBlocks: [],
|
||||||
|
},
|
||||||
|
envVars: {},
|
||||||
|
...overrides,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function createMockDAG(nodes: DAGNode[]): DAG {
|
||||||
|
const nodeMap = new Map<string, DAGNode>()
|
||||||
|
nodes.forEach((node) => nodeMap.set(node.id, node))
|
||||||
|
return {
|
||||||
|
nodes: nodeMap,
|
||||||
|
loopConfigs: new Map(),
|
||||||
|
parallelConfigs: new Map(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
interface MockEdgeManager extends EdgeManager {
|
||||||
|
processOutgoingEdges: ReturnType<typeof vi.fn>
|
||||||
|
}
|
||||||
|
|
||||||
|
function createMockEdgeManager(
|
||||||
|
processOutgoingEdgesImpl?: (node: DAGNode) => string[]
|
||||||
|
): MockEdgeManager {
|
||||||
|
const mockFn = vi.fn().mockImplementation(processOutgoingEdgesImpl || (() => []))
|
||||||
|
return {
|
||||||
|
processOutgoingEdges: mockFn,
|
||||||
|
isNodeReady: vi.fn().mockReturnValue(true),
|
||||||
|
deactivateEdgeAndDescendants: vi.fn(),
|
||||||
|
restoreIncomingEdge: vi.fn(),
|
||||||
|
clearDeactivatedEdges: vi.fn(),
|
||||||
|
clearDeactivatedEdgesForNodes: vi.fn(),
|
||||||
|
} as unknown as MockEdgeManager
|
||||||
|
}
|
||||||
|
|
||||||
|
interface MockNodeOrchestrator extends NodeExecutionOrchestrator {
|
||||||
|
executionCount: number
|
||||||
|
}
|
||||||
|
|
||||||
|
function createMockNodeOrchestrator(executeDelay = 0): MockNodeOrchestrator {
|
||||||
|
const mock = {
|
||||||
|
executionCount: 0,
|
||||||
|
executeNode: vi.fn().mockImplementation(async () => {
|
||||||
|
mock.executionCount++
|
||||||
|
if (executeDelay > 0) {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, executeDelay))
|
||||||
|
}
|
||||||
|
return { nodeId: 'test', output: {}, isFinalOutput: false }
|
||||||
|
}),
|
||||||
|
handleNodeCompletion: vi.fn(),
|
||||||
|
}
|
||||||
|
return mock as unknown as MockNodeOrchestrator
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('ExecutionEngine', () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
vi.clearAllMocks()
|
||||||
|
;(isExecutionCancelled as Mock).mockResolvedValue(false)
|
||||||
|
;(isRedisCancellationEnabled as Mock).mockReturnValue(false)
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
vi.useRealTimers()
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('Normal execution', () => {
|
||||||
|
it('should execute a simple linear workflow', async () => {
|
||||||
|
const startNode = createMockNode('start', 'starter')
|
||||||
|
const endNode = createMockNode('end', 'function')
|
||||||
|
startNode.outgoingEdges.set('edge1', { target: 'end' })
|
||||||
|
endNode.incomingEdges.add('start')
|
||||||
|
|
||||||
|
const dag = createMockDAG([startNode, endNode])
|
||||||
|
const context = createMockContext()
|
||||||
|
const edgeManager = createMockEdgeManager((node) => {
|
||||||
|
if (node.id === 'start') return ['end']
|
||||||
|
return []
|
||||||
|
})
|
||||||
|
const nodeOrchestrator = createMockNodeOrchestrator()
|
||||||
|
|
||||||
|
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||||
|
const result = await engine.run('start')
|
||||||
|
|
||||||
|
expect(result.success).toBe(true)
|
||||||
|
expect(nodeOrchestrator.executionCount).toBe(2)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should mark execution as successful when completed without cancellation', async () => {
|
||||||
|
const startNode = createMockNode('start', 'starter')
|
||||||
|
const dag = createMockDAG([startNode])
|
||||||
|
const context = createMockContext()
|
||||||
|
const edgeManager = createMockEdgeManager()
|
||||||
|
const nodeOrchestrator = createMockNodeOrchestrator()
|
||||||
|
|
||||||
|
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||||
|
const result = await engine.run('start')
|
||||||
|
|
||||||
|
expect(result.success).toBe(true)
|
||||||
|
expect(result.status).toBeUndefined()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should execute all nodes in a multi-node workflow', async () => {
|
||||||
|
const nodes = [
|
||||||
|
createMockNode('start', 'starter'),
|
||||||
|
createMockNode('middle1', 'function'),
|
||||||
|
createMockNode('middle2', 'function'),
|
||||||
|
createMockNode('end', 'function'),
|
||||||
|
]
|
||||||
|
|
||||||
|
nodes[0].outgoingEdges.set('e1', { target: 'middle1' })
|
||||||
|
nodes[1].outgoingEdges.set('e2', { target: 'middle2' })
|
||||||
|
nodes[2].outgoingEdges.set('e3', { target: 'end' })
|
||||||
|
|
||||||
|
const dag = createMockDAG(nodes)
|
||||||
|
const context = createMockContext()
|
||||||
|
const edgeManager = createMockEdgeManager((node) => {
|
||||||
|
if (node.id === 'start') return ['middle1']
|
||||||
|
if (node.id === 'middle1') return ['middle2']
|
||||||
|
if (node.id === 'middle2') return ['end']
|
||||||
|
return []
|
||||||
|
})
|
||||||
|
const nodeOrchestrator = createMockNodeOrchestrator()
|
||||||
|
|
||||||
|
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||||
|
const result = await engine.run('start')
|
||||||
|
|
||||||
|
expect(result.success).toBe(true)
|
||||||
|
expect(nodeOrchestrator.executionCount).toBe(4)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('Cancellation via AbortSignal', () => {
|
||||||
|
it('should stop execution immediately when aborted before start', async () => {
|
||||||
|
const abortController = new AbortController()
|
||||||
|
abortController.abort()
|
||||||
|
|
||||||
|
const startNode = createMockNode('start', 'starter')
|
||||||
|
const dag = createMockDAG([startNode])
|
||||||
|
const context = createMockContext({ abortSignal: abortController.signal })
|
||||||
|
const edgeManager = createMockEdgeManager()
|
||||||
|
const nodeOrchestrator = createMockNodeOrchestrator()
|
||||||
|
|
||||||
|
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||||
|
const result = await engine.run('start')
|
||||||
|
|
||||||
|
expect(result.status).toBe('cancelled')
|
||||||
|
expect(nodeOrchestrator.executionCount).toBe(0)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should stop execution when aborted mid-workflow', async () => {
|
||||||
|
const abortController = new AbortController()
|
||||||
|
|
||||||
|
const nodes = Array.from({ length: 5 }, (_, i) => createMockNode(`node${i}`, 'function'))
|
||||||
|
for (let i = 0; i < nodes.length - 1; i++) {
|
||||||
|
nodes[i].outgoingEdges.set(`e${i}`, { target: `node${i + 1}` })
|
||||||
|
}
|
||||||
|
|
||||||
|
const dag = createMockDAG(nodes)
|
||||||
|
const context = createMockContext({ abortSignal: abortController.signal })
|
||||||
|
|
||||||
|
let callCount = 0
|
||||||
|
const edgeManager = createMockEdgeManager((node) => {
|
||||||
|
callCount++
|
||||||
|
if (callCount === 2) abortController.abort()
|
||||||
|
const idx = Number.parseInt(node.id.replace('node', ''))
|
||||||
|
if (idx < 4) return [`node${idx + 1}`]
|
||||||
|
return []
|
||||||
|
})
|
||||||
|
const nodeOrchestrator = createMockNodeOrchestrator()
|
||||||
|
|
||||||
|
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||||
|
const result = await engine.run('node0')
|
||||||
|
|
||||||
|
expect(result.success).toBe(false)
|
||||||
|
expect(result.status).toBe('cancelled')
|
||||||
|
expect(nodeOrchestrator.executionCount).toBeLessThan(5)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should not wait for slow executions when cancelled', async () => {
|
||||||
|
const abortController = new AbortController()
|
||||||
|
|
||||||
|
const startNode = createMockNode('start', 'starter')
|
||||||
|
const slowNode = createMockNode('slow', 'function')
|
||||||
|
startNode.outgoingEdges.set('edge1', { target: 'slow' })
|
||||||
|
|
||||||
|
const dag = createMockDAG([startNode, slowNode])
|
||||||
|
const context = createMockContext({ abortSignal: abortController.signal })
|
||||||
|
const edgeManager = createMockEdgeManager((node) => {
|
||||||
|
if (node.id === 'start') return ['slow']
|
||||||
|
return []
|
||||||
|
})
|
||||||
|
const nodeOrchestrator = createMockNodeOrchestrator(500)
|
||||||
|
|
||||||
|
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||||
|
|
||||||
|
const executionPromise = engine.run('start')
|
||||||
|
setTimeout(() => abortController.abort(), 50)
|
||||||
|
|
||||||
|
const startTime = Date.now()
|
||||||
|
const result = await executionPromise
|
||||||
|
const duration = Date.now() - startTime
|
||||||
|
|
||||||
|
expect(result.status).toBe('cancelled')
|
||||||
|
expect(duration).toBeLessThan(400)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should return cancelled status even if error thrown during cancellation', async () => {
|
||||||
|
const abortController = new AbortController()
|
||||||
|
abortController.abort()
|
||||||
|
|
||||||
|
const startNode = createMockNode('start', 'starter')
|
||||||
|
const dag = createMockDAG([startNode])
|
||||||
|
const context = createMockContext({ abortSignal: abortController.signal })
|
||||||
|
const edgeManager = createMockEdgeManager()
|
||||||
|
const nodeOrchestrator = createMockNodeOrchestrator()
|
||||||
|
|
||||||
|
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||||
|
const result = await engine.run('start')
|
||||||
|
|
||||||
|
expect(result.status).toBe('cancelled')
|
||||||
|
expect(result.success).toBe(false)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('Cancellation via Redis', () => {
|
||||||
|
it('should check Redis for cancellation when enabled', async () => {
|
||||||
|
;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
|
||||||
|
;(isExecutionCancelled as Mock).mockResolvedValue(false)
|
||||||
|
|
||||||
|
const startNode = createMockNode('start', 'starter')
|
||||||
|
const dag = createMockDAG([startNode])
|
||||||
|
const context = createMockContext()
|
||||||
|
const edgeManager = createMockEdgeManager()
|
||||||
|
const nodeOrchestrator = createMockNodeOrchestrator()
|
||||||
|
|
||||||
|
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||||
|
await engine.run('start')
|
||||||
|
|
||||||
|
expect(isExecutionCancelled as Mock).toHaveBeenCalled()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should stop execution when Redis reports cancellation', async () => {
|
||||||
|
;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
|
||||||
|
|
||||||
|
let checkCount = 0
|
||||||
|
;(isExecutionCancelled as Mock).mockImplementation(async () => {
|
||||||
|
checkCount++
|
||||||
|
return checkCount > 1
|
||||||
|
})
|
||||||
|
|
||||||
|
const nodes = Array.from({ length: 5 }, (_, i) => createMockNode(`node${i}`, 'function'))
|
||||||
|
for (let i = 0; i < nodes.length - 1; i++) {
|
||||||
|
nodes[i].outgoingEdges.set(`e${i}`, { target: `node${i + 1}` })
|
||||||
|
}
|
||||||
|
|
||||||
|
const dag = createMockDAG(nodes)
|
||||||
|
const context = createMockContext()
|
||||||
|
const edgeManager = createMockEdgeManager((node) => {
|
||||||
|
const idx = Number.parseInt(node.id.replace('node', ''))
|
||||||
|
if (idx < 4) return [`node${idx + 1}`]
|
||||||
|
return []
|
||||||
|
})
|
||||||
|
const nodeOrchestrator = createMockNodeOrchestrator(150)
|
||||||
|
|
||||||
|
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||||
|
const result = await engine.run('node0')
|
||||||
|
|
||||||
|
expect(result.success).toBe(false)
|
||||||
|
expect(result.status).toBe('cancelled')
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should respect cancellation check interval', async () => {
|
||||||
|
;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
|
||||||
|
;(isExecutionCancelled as Mock).mockResolvedValue(false)
|
||||||
|
|
||||||
|
const startNode = createMockNode('start', 'starter')
|
||||||
|
const dag = createMockDAG([startNode])
|
||||||
|
const context = createMockContext()
|
||||||
|
const edgeManager = createMockEdgeManager()
|
||||||
|
const nodeOrchestrator = createMockNodeOrchestrator()
|
||||||
|
|
||||||
|
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||||
|
await engine.run('start')
|
||||||
|
|
||||||
|
expect((isExecutionCancelled as Mock).mock.calls.length).toBeGreaterThanOrEqual(1)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('Loop execution with cancellation', () => {
|
||||||
|
it('should break out of loop when cancelled mid-iteration', async () => {
|
||||||
|
const abortController = new AbortController()
|
||||||
|
|
||||||
|
const loopStartNode = createMockNode('loop-start', 'loop_sentinel')
|
||||||
|
loopStartNode.metadata = { isSentinel: true, sentinelType: 'start', loopId: 'loop1' }
|
||||||
|
|
||||||
|
const loopBodyNode = createMockNode('loop-body', 'function')
|
||||||
|
loopBodyNode.metadata = { isLoopNode: true, loopId: 'loop1' }
|
||||||
|
|
||||||
|
const loopEndNode = createMockNode('loop-end', 'loop_sentinel')
|
||||||
|
loopEndNode.metadata = { isSentinel: true, sentinelType: 'end', loopId: 'loop1' }
|
||||||
|
|
||||||
|
loopStartNode.outgoingEdges.set('edge1', { target: 'loop-body' })
|
||||||
|
loopBodyNode.outgoingEdges.set('edge2', { target: 'loop-end' })
|
||||||
|
loopEndNode.outgoingEdges.set('loop_continue', {
|
||||||
|
target: 'loop-start',
|
||||||
|
sourceHandle: 'loop_continue',
|
||||||
|
})
|
||||||
|
|
||||||
|
const dag = createMockDAG([loopStartNode, loopBodyNode, loopEndNode])
|
||||||
|
const context = createMockContext({ abortSignal: abortController.signal })
|
||||||
|
|
||||||
|
let iterationCount = 0
|
||||||
|
const edgeManager = createMockEdgeManager((node) => {
|
||||||
|
if (node.id === 'loop-start') return ['loop-body']
|
||||||
|
if (node.id === 'loop-body') return ['loop-end']
|
||||||
|
if (node.id === 'loop-end') {
|
||||||
|
iterationCount++
|
||||||
|
if (iterationCount === 3) abortController.abort()
|
||||||
|
return ['loop-start']
|
||||||
|
}
|
||||||
|
return []
|
||||||
|
})
|
||||||
|
const nodeOrchestrator = createMockNodeOrchestrator(5)
|
||||||
|
|
||||||
|
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||||
|
const result = await engine.run('loop-start')
|
||||||
|
|
||||||
|
expect(result.status).toBe('cancelled')
|
||||||
|
expect(iterationCount).toBeLessThan(100)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('Parallel execution with cancellation', () => {
|
||||||
|
it('should stop queueing parallel branches when cancelled', async () => {
|
||||||
|
const abortController = new AbortController()
|
||||||
|
|
||||||
|
const startNode = createMockNode('start', 'starter')
|
||||||
|
const parallelNodes = Array.from({ length: 10 }, (_, i) =>
|
||||||
|
createMockNode(`parallel${i}`, 'function')
|
||||||
|
)
|
||||||
|
|
||||||
|
parallelNodes.forEach((_, i) => {
|
||||||
|
startNode.outgoingEdges.set(`edge${i}`, { target: `parallel${i}` })
|
||||||
|
})
|
||||||
|
|
||||||
|
const dag = createMockDAG([startNode, ...parallelNodes])
|
||||||
|
const context = createMockContext({ abortSignal: abortController.signal })
|
||||||
|
const edgeManager = createMockEdgeManager((node) => {
|
||||||
|
if (node.id === 'start') {
|
||||||
|
return parallelNodes.map((_, i) => `parallel${i}`)
|
||||||
|
}
|
||||||
|
return []
|
||||||
|
})
|
||||||
|
const nodeOrchestrator = createMockNodeOrchestrator(50)
|
||||||
|
|
||||||
|
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||||
|
|
||||||
|
const executionPromise = engine.run('start')
|
||||||
|
setTimeout(() => abortController.abort(), 30)
|
||||||
|
|
||||||
|
const result = await executionPromise
|
||||||
|
|
||||||
|
expect(result.status).toBe('cancelled')
|
||||||
|
expect(nodeOrchestrator.executionCount).toBeLessThan(11)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should not wait for all parallel branches when cancelled', async () => {
|
||||||
|
const abortController = new AbortController()
|
||||||
|
|
||||||
|
const startNode = createMockNode('start', 'starter')
|
||||||
|
const slowNodes = Array.from({ length: 5 }, (_, i) => createMockNode(`slow${i}`, 'function'))
|
||||||
|
|
||||||
|
slowNodes.forEach((_, i) => {
|
||||||
|
startNode.outgoingEdges.set(`edge${i}`, { target: `slow${i}` })
|
||||||
|
})
|
||||||
|
|
||||||
|
const dag = createMockDAG([startNode, ...slowNodes])
|
||||||
|
const context = createMockContext({ abortSignal: abortController.signal })
|
||||||
|
const edgeManager = createMockEdgeManager((node) => {
|
||||||
|
if (node.id === 'start') return slowNodes.map((_, i) => `slow${i}`)
|
||||||
|
return []
|
||||||
|
})
|
||||||
|
const nodeOrchestrator = createMockNodeOrchestrator(200)
|
||||||
|
|
||||||
|
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||||
|
|
||||||
|
const executionPromise = engine.run('start')
|
||||||
|
setTimeout(() => abortController.abort(), 50)
|
||||||
|
|
||||||
|
const startTime = Date.now()
|
||||||
|
const result = await executionPromise
|
||||||
|
const duration = Date.now() - startTime
|
||||||
|
|
||||||
|
expect(result.status).toBe('cancelled')
|
||||||
|
expect(duration).toBeLessThan(500)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('Edge cases', () => {
|
||||||
|
it('should handle empty DAG gracefully', async () => {
|
||||||
|
const dag = createMockDAG([])
|
||||||
|
const context = createMockContext()
|
||||||
|
const edgeManager = createMockEdgeManager()
|
||||||
|
const nodeOrchestrator = createMockNodeOrchestrator()
|
||||||
|
|
||||||
|
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||||
|
const result = await engine.run()
|
||||||
|
|
||||||
|
expect(result.success).toBe(true)
|
||||||
|
expect(nodeOrchestrator.executionCount).toBe(0)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should preserve partial output when cancelled', async () => {
|
||||||
|
const abortController = new AbortController()
|
||||||
|
|
||||||
|
const startNode = createMockNode('start', 'starter')
|
||||||
|
const endNode = createMockNode('end', 'function')
|
||||||
|
endNode.outgoingEdges = new Map()
|
||||||
|
|
||||||
|
startNode.outgoingEdges.set('edge1', { target: 'end' })
|
||||||
|
|
||||||
|
const dag = createMockDAG([startNode, endNode])
|
||||||
|
const context = createMockContext({ abortSignal: abortController.signal })
|
||||||
|
const edgeManager = createMockEdgeManager((node) => {
|
||||||
|
if (node.id === 'start') return ['end']
|
||||||
|
return []
|
||||||
|
})
|
||||||
|
|
||||||
|
const nodeOrchestrator = {
|
||||||
|
executionCount: 0,
|
||||||
|
executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => {
|
||||||
|
if (nodeId === 'start') {
|
||||||
|
return { nodeId: 'start', output: { startData: 'value' }, isFinalOutput: false }
|
||||||
|
}
|
||||||
|
abortController.abort()
|
||||||
|
return { nodeId: 'end', output: { endData: 'value' }, isFinalOutput: true }
|
||||||
|
}),
|
||||||
|
handleNodeCompletion: vi.fn(),
|
||||||
|
} as unknown as MockNodeOrchestrator
|
||||||
|
|
||||||
|
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||||
|
const result = await engine.run('start')
|
||||||
|
|
||||||
|
expect(result.status).toBe('cancelled')
|
||||||
|
expect(result.output).toBeDefined()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should populate metadata on cancellation', async () => {
|
||||||
|
const abortController = new AbortController()
|
||||||
|
abortController.abort()
|
||||||
|
|
||||||
|
const startNode = createMockNode('start', 'starter')
|
||||||
|
const dag = createMockDAG([startNode])
|
||||||
|
const context = createMockContext({ abortSignal: abortController.signal })
|
||||||
|
const edgeManager = createMockEdgeManager()
|
||||||
|
const nodeOrchestrator = createMockNodeOrchestrator()
|
||||||
|
|
||||||
|
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||||
|
const result = await engine.run('start')
|
||||||
|
|
||||||
|
expect(result.metadata).toBeDefined()
|
||||||
|
expect(result.metadata.endTime).toBeDefined()
|
||||||
|
expect(result.metadata.duration).toBeDefined()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should return logs even when cancelled', async () => {
|
||||||
|
const abortController = new AbortController()
|
||||||
|
|
||||||
|
const startNode = createMockNode('start', 'starter')
|
||||||
|
const dag = createMockDAG([startNode])
|
||||||
|
const context = createMockContext({ abortSignal: abortController.signal })
|
||||||
|
context.blockLogs.push({
|
||||||
|
blockId: 'test',
|
||||||
|
blockName: 'Test',
|
||||||
|
blockType: 'test',
|
||||||
|
startedAt: '',
|
||||||
|
endedAt: '',
|
||||||
|
durationMs: 0,
|
||||||
|
success: true,
|
||||||
|
})
|
||||||
|
|
||||||
|
const edgeManager = createMockEdgeManager()
|
||||||
|
const nodeOrchestrator = createMockNodeOrchestrator()
|
||||||
|
|
||||||
|
abortController.abort()
|
||||||
|
|
||||||
|
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||||
|
const result = await engine.run('start')
|
||||||
|
|
||||||
|
expect(result.logs).toBeDefined()
|
||||||
|
expect(result.logs.length).toBeGreaterThan(0)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('Cancellation flag behavior', () => {
|
||||||
|
it('should set cancelledFlag when abort signal fires', async () => {
|
||||||
|
const abortController = new AbortController()
|
||||||
|
|
||||||
|
const nodes = Array.from({ length: 3 }, (_, i) => createMockNode(`node${i}`, 'function'))
|
||||||
|
for (let i = 0; i < nodes.length - 1; i++) {
|
||||||
|
nodes[i].outgoingEdges.set(`e${i}`, { target: `node${i + 1}` })
|
||||||
|
}
|
||||||
|
|
||||||
|
const dag = createMockDAG(nodes)
|
||||||
|
const context = createMockContext({ abortSignal: abortController.signal })
|
||||||
|
const edgeManager = createMockEdgeManager((node) => {
|
||||||
|
if (node.id === 'node0') {
|
||||||
|
abortController.abort()
|
||||||
|
return ['node1']
|
||||||
|
}
|
||||||
|
return node.id === 'node1' ? ['node2'] : []
|
||||||
|
})
|
||||||
|
const nodeOrchestrator = createMockNodeOrchestrator()
|
||||||
|
|
||||||
|
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||||
|
const result = await engine.run('node0')
|
||||||
|
|
||||||
|
expect(result.status).toBe('cancelled')
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should cache Redis cancellation result', async () => {
|
||||||
|
;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
|
||||||
|
;(isExecutionCancelled as Mock).mockResolvedValue(true)
|
||||||
|
|
||||||
|
const nodes = Array.from({ length: 5 }, (_, i) => createMockNode(`node${i}`, 'function'))
|
||||||
|
const dag = createMockDAG(nodes)
|
||||||
|
const context = createMockContext()
|
||||||
|
const edgeManager = createMockEdgeManager()
|
||||||
|
const nodeOrchestrator = createMockNodeOrchestrator()
|
||||||
|
|
||||||
|
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||||
|
await engine.run('node0')
|
||||||
|
|
||||||
|
expect((isExecutionCancelled as Mock).mock.calls.length).toBeLessThanOrEqual(3)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
@@ -28,6 +28,8 @@ export class ExecutionEngine {
|
|||||||
private lastCancellationCheck = 0
|
private lastCancellationCheck = 0
|
||||||
private readonly useRedisCancellation: boolean
|
private readonly useRedisCancellation: boolean
|
||||||
private readonly CANCELLATION_CHECK_INTERVAL_MS = 500
|
private readonly CANCELLATION_CHECK_INTERVAL_MS = 500
|
||||||
|
private abortPromise: Promise<void> | null = null
|
||||||
|
private abortResolve: (() => void) | null = null
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private context: ExecutionContext,
|
private context: ExecutionContext,
|
||||||
@@ -37,6 +39,34 @@ export class ExecutionEngine {
|
|||||||
) {
|
) {
|
||||||
this.allowResumeTriggers = this.context.metadata.resumeFromSnapshot === true
|
this.allowResumeTriggers = this.context.metadata.resumeFromSnapshot === true
|
||||||
this.useRedisCancellation = isRedisCancellationEnabled() && !!this.context.executionId
|
this.useRedisCancellation = isRedisCancellationEnabled() && !!this.context.executionId
|
||||||
|
this.initializeAbortHandler()
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets up a single abort promise that can be reused throughout execution.
|
||||||
|
* This avoids creating multiple event listeners and potential memory leaks.
|
||||||
|
*/
|
||||||
|
private initializeAbortHandler(): void {
|
||||||
|
if (!this.context.abortSignal) return
|
||||||
|
|
||||||
|
if (this.context.abortSignal.aborted) {
|
||||||
|
this.cancelledFlag = true
|
||||||
|
this.abortPromise = Promise.resolve()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
this.abortPromise = new Promise<void>((resolve) => {
|
||||||
|
this.abortResolve = resolve
|
||||||
|
})
|
||||||
|
|
||||||
|
this.context.abortSignal.addEventListener(
|
||||||
|
'abort',
|
||||||
|
() => {
|
||||||
|
this.cancelledFlag = true
|
||||||
|
this.abortResolve?.()
|
||||||
|
},
|
||||||
|
{ once: true }
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
private async checkCancellation(): Promise<boolean> {
|
private async checkCancellation(): Promise<boolean> {
|
||||||
@@ -73,12 +103,15 @@ export class ExecutionEngine {
|
|||||||
this.initializeQueue(triggerBlockId)
|
this.initializeQueue(triggerBlockId)
|
||||||
|
|
||||||
while (this.hasWork()) {
|
while (this.hasWork()) {
|
||||||
if ((await this.checkCancellation()) && this.executing.size === 0) {
|
if (await this.checkCancellation()) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
await this.processQueue()
|
await this.processQueue()
|
||||||
}
|
}
|
||||||
await this.waitForAllExecutions()
|
|
||||||
|
if (!this.cancelledFlag) {
|
||||||
|
await this.waitForAllExecutions()
|
||||||
|
}
|
||||||
|
|
||||||
if (this.pausedBlocks.size > 0) {
|
if (this.pausedBlocks.size > 0) {
|
||||||
return this.buildPausedResult(startTime)
|
return this.buildPausedResult(startTime)
|
||||||
@@ -164,11 +197,7 @@ export class ExecutionEngine {
|
|||||||
|
|
||||||
private trackExecution(promise: Promise<void>): void {
|
private trackExecution(promise: Promise<void>): void {
|
||||||
this.executing.add(promise)
|
this.executing.add(promise)
|
||||||
// Attach error handler to prevent unhandled rejection warnings
|
promise.catch(() => {})
|
||||||
// The actual error handling happens in waitForAllExecutions/waitForAnyExecution
|
|
||||||
promise.catch(() => {
|
|
||||||
// Error will be properly handled by Promise.all/Promise.race in wait methods
|
|
||||||
})
|
|
||||||
promise.finally(() => {
|
promise.finally(() => {
|
||||||
this.executing.delete(promise)
|
this.executing.delete(promise)
|
||||||
})
|
})
|
||||||
@@ -176,12 +205,30 @@ export class ExecutionEngine {
|
|||||||
|
|
||||||
private async waitForAnyExecution(): Promise<void> {
|
private async waitForAnyExecution(): Promise<void> {
|
||||||
if (this.executing.size > 0) {
|
if (this.executing.size > 0) {
|
||||||
await Promise.race(this.executing)
|
const abortPromise = this.getAbortPromise()
|
||||||
|
if (abortPromise) {
|
||||||
|
await Promise.race([...this.executing, abortPromise])
|
||||||
|
} else {
|
||||||
|
await Promise.race(this.executing)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async waitForAllExecutions(): Promise<void> {
|
private async waitForAllExecutions(): Promise<void> {
|
||||||
await Promise.all(Array.from(this.executing))
|
const abortPromise = this.getAbortPromise()
|
||||||
|
if (abortPromise) {
|
||||||
|
await Promise.race([Promise.all(this.executing), abortPromise])
|
||||||
|
} else {
|
||||||
|
await Promise.all(this.executing)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the cached abort promise. This is safe to call multiple times
|
||||||
|
* as it reuses the same promise instance created during initialization.
|
||||||
|
*/
|
||||||
|
private getAbortPromise(): Promise<void> | null {
|
||||||
|
return this.abortPromise
|
||||||
}
|
}
|
||||||
|
|
||||||
private async withQueueLock<T>(fn: () => Promise<T> | T): Promise<T> {
|
private async withQueueLock<T>(fn: () => Promise<T> | T): Promise<T> {
|
||||||
@@ -277,7 +324,7 @@ export class ExecutionEngine {
|
|||||||
this.trackExecution(promise)
|
this.trackExecution(promise)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.executing.size > 0) {
|
if (this.executing.size > 0 && !this.cancelledFlag) {
|
||||||
await this.waitForAnyExecution()
|
await this.waitForAnyExecution()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -336,7 +383,6 @@ export class ExecutionEngine {
|
|||||||
|
|
||||||
this.addMultipleToQueue(readyNodes)
|
this.addMultipleToQueue(readyNodes)
|
||||||
|
|
||||||
// Check for dynamically added nodes (e.g., from parallel expansion)
|
|
||||||
if (this.context.pendingDynamicNodes && this.context.pendingDynamicNodes.length > 0) {
|
if (this.context.pendingDynamicNodes && this.context.pendingDynamicNodes.length > 0) {
|
||||||
const dynamicNodes = this.context.pendingDynamicNodes
|
const dynamicNodes = this.context.pendingDynamicNodes
|
||||||
this.context.pendingDynamicNodes = []
|
this.context.pendingDynamicNodes = []
|
||||||
|
|||||||
Reference in New Issue
Block a user