mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-19 20:08:04 -05:00
Compare commits
30 Commits
fix/termin
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b09f683072 | ||
|
|
a8bb0db660 | ||
|
|
af82820a28 | ||
|
|
4372841797 | ||
|
|
5e8c843241 | ||
|
|
7bf3d73ee6 | ||
|
|
7ffc11a738 | ||
|
|
be578e2ed7 | ||
|
|
f415e5edc4 | ||
|
|
13a6e6c3fa | ||
|
|
f5ab7f21ae | ||
|
|
bfb6fffe38 | ||
|
|
4fbec0a43f | ||
|
|
585f5e365b | ||
|
|
3792bdd252 | ||
|
|
eb5d1f3e5b | ||
|
|
54ab82c8dd | ||
|
|
f895bf469b | ||
|
|
dd3209af06 | ||
|
|
b6ba3b50a7 | ||
|
|
b304233062 | ||
|
|
57e4b49bd6 | ||
|
|
e12dd204ed | ||
|
|
3d9d9cbc54 | ||
|
|
0f4ec962ad | ||
|
|
4827866f9a | ||
|
|
3e697d9ed9 | ||
|
|
4431a1a484 | ||
|
|
4d1a9a3f22 | ||
|
|
eb07a080fb |
@@ -1,599 +0,0 @@
|
|||||||
/**
|
|
||||||
* @vitest-environment node
|
|
||||||
*/
|
|
||||||
import { loggerMock } from '@sim/testing'
|
|
||||||
import { afterEach, beforeEach, describe, expect, it, type Mock, vi } from 'vitest'
|
|
||||||
|
|
||||||
vi.mock('@sim/logger', () => loggerMock)
|
|
||||||
|
|
||||||
vi.mock('@/lib/execution/cancellation', () => ({
|
|
||||||
isExecutionCancelled: vi.fn(),
|
|
||||||
isRedisCancellationEnabled: vi.fn(),
|
|
||||||
}))
|
|
||||||
|
|
||||||
import { isExecutionCancelled, isRedisCancellationEnabled } from '@/lib/execution/cancellation'
|
|
||||||
import type { DAG, DAGNode } from '@/executor/dag/builder'
|
|
||||||
import type { EdgeManager } from '@/executor/execution/edge-manager'
|
|
||||||
import type { NodeExecutionOrchestrator } from '@/executor/orchestrators/node'
|
|
||||||
import type { ExecutionContext } from '@/executor/types'
|
|
||||||
import type { SerializedBlock } from '@/serializer/types'
|
|
||||||
import { ExecutionEngine } from './engine'
|
|
||||||
|
|
||||||
function createMockBlock(id: string): SerializedBlock {
|
|
||||||
return {
|
|
||||||
id,
|
|
||||||
metadata: { id: 'test', name: 'Test Block' },
|
|
||||||
position: { x: 0, y: 0 },
|
|
||||||
config: { tool: '', params: {} },
|
|
||||||
inputs: {},
|
|
||||||
outputs: {},
|
|
||||||
enabled: true,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function createMockNode(id: string, blockType = 'test'): DAGNode {
|
|
||||||
return {
|
|
||||||
id,
|
|
||||||
block: {
|
|
||||||
...createMockBlock(id),
|
|
||||||
metadata: { id: blockType, name: `Block ${id}` },
|
|
||||||
},
|
|
||||||
outgoingEdges: new Map(),
|
|
||||||
incomingEdges: new Set(),
|
|
||||||
metadata: {},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function createMockContext(overrides: Partial<ExecutionContext> = {}): ExecutionContext {
|
|
||||||
return {
|
|
||||||
workflowId: 'test-workflow',
|
|
||||||
workspaceId: 'test-workspace',
|
|
||||||
executionId: 'test-execution',
|
|
||||||
userId: 'test-user',
|
|
||||||
blockStates: new Map(),
|
|
||||||
executedBlocks: new Set(),
|
|
||||||
blockLogs: [],
|
|
||||||
loopExecutions: new Map(),
|
|
||||||
parallelExecutions: new Map(),
|
|
||||||
completedLoops: new Set(),
|
|
||||||
activeExecutionPath: new Set(),
|
|
||||||
metadata: {
|
|
||||||
executionId: 'test-execution',
|
|
||||||
startTime: new Date().toISOString(),
|
|
||||||
pendingBlocks: [],
|
|
||||||
},
|
|
||||||
envVars: {},
|
|
||||||
...overrides,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function createMockDAG(nodes: DAGNode[]): DAG {
|
|
||||||
const nodeMap = new Map<string, DAGNode>()
|
|
||||||
nodes.forEach((node) => nodeMap.set(node.id, node))
|
|
||||||
return {
|
|
||||||
nodes: nodeMap,
|
|
||||||
loopConfigs: new Map(),
|
|
||||||
parallelConfigs: new Map(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
interface MockEdgeManager extends EdgeManager {
|
|
||||||
processOutgoingEdges: ReturnType<typeof vi.fn>
|
|
||||||
}
|
|
||||||
|
|
||||||
function createMockEdgeManager(
|
|
||||||
processOutgoingEdgesImpl?: (node: DAGNode) => string[]
|
|
||||||
): MockEdgeManager {
|
|
||||||
const mockFn = vi.fn().mockImplementation(processOutgoingEdgesImpl || (() => []))
|
|
||||||
return {
|
|
||||||
processOutgoingEdges: mockFn,
|
|
||||||
isNodeReady: vi.fn().mockReturnValue(true),
|
|
||||||
deactivateEdgeAndDescendants: vi.fn(),
|
|
||||||
restoreIncomingEdge: vi.fn(),
|
|
||||||
clearDeactivatedEdges: vi.fn(),
|
|
||||||
clearDeactivatedEdgesForNodes: vi.fn(),
|
|
||||||
} as unknown as MockEdgeManager
|
|
||||||
}
|
|
||||||
|
|
||||||
interface MockNodeOrchestrator extends NodeExecutionOrchestrator {
|
|
||||||
executionCount: number
|
|
||||||
}
|
|
||||||
|
|
||||||
function createMockNodeOrchestrator(executeDelay = 0): MockNodeOrchestrator {
|
|
||||||
const mock = {
|
|
||||||
executionCount: 0,
|
|
||||||
executeNode: vi.fn().mockImplementation(async () => {
|
|
||||||
mock.executionCount++
|
|
||||||
if (executeDelay > 0) {
|
|
||||||
await new Promise((resolve) => setTimeout(resolve, executeDelay))
|
|
||||||
}
|
|
||||||
return { nodeId: 'test', output: {}, isFinalOutput: false }
|
|
||||||
}),
|
|
||||||
handleNodeCompletion: vi.fn(),
|
|
||||||
}
|
|
||||||
return mock as unknown as MockNodeOrchestrator
|
|
||||||
}
|
|
||||||
|
|
||||||
describe('ExecutionEngine', () => {
|
|
||||||
beforeEach(() => {
|
|
||||||
vi.clearAllMocks()
|
|
||||||
;(isExecutionCancelled as Mock).mockResolvedValue(false)
|
|
||||||
;(isRedisCancellationEnabled as Mock).mockReturnValue(false)
|
|
||||||
})
|
|
||||||
|
|
||||||
afterEach(() => {
|
|
||||||
vi.useRealTimers()
|
|
||||||
})
|
|
||||||
|
|
||||||
describe('Normal execution', () => {
|
|
||||||
it('should execute a simple linear workflow', async () => {
|
|
||||||
const startNode = createMockNode('start', 'starter')
|
|
||||||
const endNode = createMockNode('end', 'function')
|
|
||||||
startNode.outgoingEdges.set('edge1', { target: 'end' })
|
|
||||||
endNode.incomingEdges.add('start')
|
|
||||||
|
|
||||||
const dag = createMockDAG([startNode, endNode])
|
|
||||||
const context = createMockContext()
|
|
||||||
const edgeManager = createMockEdgeManager((node) => {
|
|
||||||
if (node.id === 'start') return ['end']
|
|
||||||
return []
|
|
||||||
})
|
|
||||||
const nodeOrchestrator = createMockNodeOrchestrator()
|
|
||||||
|
|
||||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
|
||||||
const result = await engine.run('start')
|
|
||||||
|
|
||||||
expect(result.success).toBe(true)
|
|
||||||
expect(nodeOrchestrator.executionCount).toBe(2)
|
|
||||||
})
|
|
||||||
|
|
||||||
it('should mark execution as successful when completed without cancellation', async () => {
|
|
||||||
const startNode = createMockNode('start', 'starter')
|
|
||||||
const dag = createMockDAG([startNode])
|
|
||||||
const context = createMockContext()
|
|
||||||
const edgeManager = createMockEdgeManager()
|
|
||||||
const nodeOrchestrator = createMockNodeOrchestrator()
|
|
||||||
|
|
||||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
|
||||||
const result = await engine.run('start')
|
|
||||||
|
|
||||||
expect(result.success).toBe(true)
|
|
||||||
expect(result.status).toBeUndefined()
|
|
||||||
})
|
|
||||||
|
|
||||||
it('should execute all nodes in a multi-node workflow', async () => {
|
|
||||||
const nodes = [
|
|
||||||
createMockNode('start', 'starter'),
|
|
||||||
createMockNode('middle1', 'function'),
|
|
||||||
createMockNode('middle2', 'function'),
|
|
||||||
createMockNode('end', 'function'),
|
|
||||||
]
|
|
||||||
|
|
||||||
nodes[0].outgoingEdges.set('e1', { target: 'middle1' })
|
|
||||||
nodes[1].outgoingEdges.set('e2', { target: 'middle2' })
|
|
||||||
nodes[2].outgoingEdges.set('e3', { target: 'end' })
|
|
||||||
|
|
||||||
const dag = createMockDAG(nodes)
|
|
||||||
const context = createMockContext()
|
|
||||||
const edgeManager = createMockEdgeManager((node) => {
|
|
||||||
if (node.id === 'start') return ['middle1']
|
|
||||||
if (node.id === 'middle1') return ['middle2']
|
|
||||||
if (node.id === 'middle2') return ['end']
|
|
||||||
return []
|
|
||||||
})
|
|
||||||
const nodeOrchestrator = createMockNodeOrchestrator()
|
|
||||||
|
|
||||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
|
||||||
const result = await engine.run('start')
|
|
||||||
|
|
||||||
expect(result.success).toBe(true)
|
|
||||||
expect(nodeOrchestrator.executionCount).toBe(4)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
describe('Cancellation via AbortSignal', () => {
|
|
||||||
it('should stop execution immediately when aborted before start', async () => {
|
|
||||||
const abortController = new AbortController()
|
|
||||||
abortController.abort()
|
|
||||||
|
|
||||||
const startNode = createMockNode('start', 'starter')
|
|
||||||
const dag = createMockDAG([startNode])
|
|
||||||
const context = createMockContext({ abortSignal: abortController.signal })
|
|
||||||
const edgeManager = createMockEdgeManager()
|
|
||||||
const nodeOrchestrator = createMockNodeOrchestrator()
|
|
||||||
|
|
||||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
|
||||||
const result = await engine.run('start')
|
|
||||||
|
|
||||||
expect(result.status).toBe('cancelled')
|
|
||||||
expect(nodeOrchestrator.executionCount).toBe(0)
|
|
||||||
})
|
|
||||||
|
|
||||||
it('should stop execution when aborted mid-workflow', async () => {
|
|
||||||
const abortController = new AbortController()
|
|
||||||
|
|
||||||
const nodes = Array.from({ length: 5 }, (_, i) => createMockNode(`node${i}`, 'function'))
|
|
||||||
for (let i = 0; i < nodes.length - 1; i++) {
|
|
||||||
nodes[i].outgoingEdges.set(`e${i}`, { target: `node${i + 1}` })
|
|
||||||
}
|
|
||||||
|
|
||||||
const dag = createMockDAG(nodes)
|
|
||||||
const context = createMockContext({ abortSignal: abortController.signal })
|
|
||||||
|
|
||||||
let callCount = 0
|
|
||||||
const edgeManager = createMockEdgeManager((node) => {
|
|
||||||
callCount++
|
|
||||||
if (callCount === 2) abortController.abort()
|
|
||||||
const idx = Number.parseInt(node.id.replace('node', ''))
|
|
||||||
if (idx < 4) return [`node${idx + 1}`]
|
|
||||||
return []
|
|
||||||
})
|
|
||||||
const nodeOrchestrator = createMockNodeOrchestrator()
|
|
||||||
|
|
||||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
|
||||||
const result = await engine.run('node0')
|
|
||||||
|
|
||||||
expect(result.success).toBe(false)
|
|
||||||
expect(result.status).toBe('cancelled')
|
|
||||||
expect(nodeOrchestrator.executionCount).toBeLessThan(5)
|
|
||||||
})
|
|
||||||
|
|
||||||
it('should not wait for slow executions when cancelled', async () => {
|
|
||||||
const abortController = new AbortController()
|
|
||||||
|
|
||||||
const startNode = createMockNode('start', 'starter')
|
|
||||||
const slowNode = createMockNode('slow', 'function')
|
|
||||||
startNode.outgoingEdges.set('edge1', { target: 'slow' })
|
|
||||||
|
|
||||||
const dag = createMockDAG([startNode, slowNode])
|
|
||||||
const context = createMockContext({ abortSignal: abortController.signal })
|
|
||||||
const edgeManager = createMockEdgeManager((node) => {
|
|
||||||
if (node.id === 'start') return ['slow']
|
|
||||||
return []
|
|
||||||
})
|
|
||||||
const nodeOrchestrator = createMockNodeOrchestrator(500)
|
|
||||||
|
|
||||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
|
||||||
|
|
||||||
const executionPromise = engine.run('start')
|
|
||||||
setTimeout(() => abortController.abort(), 50)
|
|
||||||
|
|
||||||
const startTime = Date.now()
|
|
||||||
const result = await executionPromise
|
|
||||||
const duration = Date.now() - startTime
|
|
||||||
|
|
||||||
expect(result.status).toBe('cancelled')
|
|
||||||
expect(duration).toBeLessThan(400)
|
|
||||||
})
|
|
||||||
|
|
||||||
it('should return cancelled status even if error thrown during cancellation', async () => {
|
|
||||||
const abortController = new AbortController()
|
|
||||||
abortController.abort()
|
|
||||||
|
|
||||||
const startNode = createMockNode('start', 'starter')
|
|
||||||
const dag = createMockDAG([startNode])
|
|
||||||
const context = createMockContext({ abortSignal: abortController.signal })
|
|
||||||
const edgeManager = createMockEdgeManager()
|
|
||||||
const nodeOrchestrator = createMockNodeOrchestrator()
|
|
||||||
|
|
||||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
|
||||||
const result = await engine.run('start')
|
|
||||||
|
|
||||||
expect(result.status).toBe('cancelled')
|
|
||||||
expect(result.success).toBe(false)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
describe('Cancellation via Redis', () => {
|
|
||||||
it('should check Redis for cancellation when enabled', async () => {
|
|
||||||
;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
|
|
||||||
;(isExecutionCancelled as Mock).mockResolvedValue(false)
|
|
||||||
|
|
||||||
const startNode = createMockNode('start', 'starter')
|
|
||||||
const dag = createMockDAG([startNode])
|
|
||||||
const context = createMockContext()
|
|
||||||
const edgeManager = createMockEdgeManager()
|
|
||||||
const nodeOrchestrator = createMockNodeOrchestrator()
|
|
||||||
|
|
||||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
|
||||||
await engine.run('start')
|
|
||||||
|
|
||||||
expect(isExecutionCancelled as Mock).toHaveBeenCalled()
|
|
||||||
})
|
|
||||||
|
|
||||||
it('should stop execution when Redis reports cancellation', async () => {
|
|
||||||
;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
|
|
||||||
|
|
||||||
let checkCount = 0
|
|
||||||
;(isExecutionCancelled as Mock).mockImplementation(async () => {
|
|
||||||
checkCount++
|
|
||||||
return checkCount > 1
|
|
||||||
})
|
|
||||||
|
|
||||||
const nodes = Array.from({ length: 5 }, (_, i) => createMockNode(`node${i}`, 'function'))
|
|
||||||
for (let i = 0; i < nodes.length - 1; i++) {
|
|
||||||
nodes[i].outgoingEdges.set(`e${i}`, { target: `node${i + 1}` })
|
|
||||||
}
|
|
||||||
|
|
||||||
const dag = createMockDAG(nodes)
|
|
||||||
const context = createMockContext()
|
|
||||||
const edgeManager = createMockEdgeManager((node) => {
|
|
||||||
const idx = Number.parseInt(node.id.replace('node', ''))
|
|
||||||
if (idx < 4) return [`node${idx + 1}`]
|
|
||||||
return []
|
|
||||||
})
|
|
||||||
const nodeOrchestrator = createMockNodeOrchestrator(150)
|
|
||||||
|
|
||||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
|
||||||
const result = await engine.run('node0')
|
|
||||||
|
|
||||||
expect(result.success).toBe(false)
|
|
||||||
expect(result.status).toBe('cancelled')
|
|
||||||
})
|
|
||||||
|
|
||||||
it('should respect cancellation check interval', async () => {
|
|
||||||
;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
|
|
||||||
;(isExecutionCancelled as Mock).mockResolvedValue(false)
|
|
||||||
|
|
||||||
const startNode = createMockNode('start', 'starter')
|
|
||||||
const dag = createMockDAG([startNode])
|
|
||||||
const context = createMockContext()
|
|
||||||
const edgeManager = createMockEdgeManager()
|
|
||||||
const nodeOrchestrator = createMockNodeOrchestrator()
|
|
||||||
|
|
||||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
|
||||||
await engine.run('start')
|
|
||||||
|
|
||||||
expect((isExecutionCancelled as Mock).mock.calls.length).toBeGreaterThanOrEqual(1)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
describe('Loop execution with cancellation', () => {
|
|
||||||
it('should break out of loop when cancelled mid-iteration', async () => {
|
|
||||||
const abortController = new AbortController()
|
|
||||||
|
|
||||||
const loopStartNode = createMockNode('loop-start', 'loop_sentinel')
|
|
||||||
loopStartNode.metadata = { isSentinel: true, sentinelType: 'start', loopId: 'loop1' }
|
|
||||||
|
|
||||||
const loopBodyNode = createMockNode('loop-body', 'function')
|
|
||||||
loopBodyNode.metadata = { isLoopNode: true, loopId: 'loop1' }
|
|
||||||
|
|
||||||
const loopEndNode = createMockNode('loop-end', 'loop_sentinel')
|
|
||||||
loopEndNode.metadata = { isSentinel: true, sentinelType: 'end', loopId: 'loop1' }
|
|
||||||
|
|
||||||
loopStartNode.outgoingEdges.set('edge1', { target: 'loop-body' })
|
|
||||||
loopBodyNode.outgoingEdges.set('edge2', { target: 'loop-end' })
|
|
||||||
loopEndNode.outgoingEdges.set('loop_continue', {
|
|
||||||
target: 'loop-start',
|
|
||||||
sourceHandle: 'loop_continue',
|
|
||||||
})
|
|
||||||
|
|
||||||
const dag = createMockDAG([loopStartNode, loopBodyNode, loopEndNode])
|
|
||||||
const context = createMockContext({ abortSignal: abortController.signal })
|
|
||||||
|
|
||||||
let iterationCount = 0
|
|
||||||
const edgeManager = createMockEdgeManager((node) => {
|
|
||||||
if (node.id === 'loop-start') return ['loop-body']
|
|
||||||
if (node.id === 'loop-body') return ['loop-end']
|
|
||||||
if (node.id === 'loop-end') {
|
|
||||||
iterationCount++
|
|
||||||
if (iterationCount === 3) abortController.abort()
|
|
||||||
return ['loop-start']
|
|
||||||
}
|
|
||||||
return []
|
|
||||||
})
|
|
||||||
const nodeOrchestrator = createMockNodeOrchestrator(5)
|
|
||||||
|
|
||||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
|
||||||
const result = await engine.run('loop-start')
|
|
||||||
|
|
||||||
expect(result.status).toBe('cancelled')
|
|
||||||
expect(iterationCount).toBeLessThan(100)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
describe('Parallel execution with cancellation', () => {
|
|
||||||
it('should stop queueing parallel branches when cancelled', async () => {
|
|
||||||
const abortController = new AbortController()
|
|
||||||
|
|
||||||
const startNode = createMockNode('start', 'starter')
|
|
||||||
const parallelNodes = Array.from({ length: 10 }, (_, i) =>
|
|
||||||
createMockNode(`parallel${i}`, 'function')
|
|
||||||
)
|
|
||||||
|
|
||||||
parallelNodes.forEach((_, i) => {
|
|
||||||
startNode.outgoingEdges.set(`edge${i}`, { target: `parallel${i}` })
|
|
||||||
})
|
|
||||||
|
|
||||||
const dag = createMockDAG([startNode, ...parallelNodes])
|
|
||||||
const context = createMockContext({ abortSignal: abortController.signal })
|
|
||||||
const edgeManager = createMockEdgeManager((node) => {
|
|
||||||
if (node.id === 'start') {
|
|
||||||
return parallelNodes.map((_, i) => `parallel${i}`)
|
|
||||||
}
|
|
||||||
return []
|
|
||||||
})
|
|
||||||
const nodeOrchestrator = createMockNodeOrchestrator(50)
|
|
||||||
|
|
||||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
|
||||||
|
|
||||||
const executionPromise = engine.run('start')
|
|
||||||
setTimeout(() => abortController.abort(), 30)
|
|
||||||
|
|
||||||
const result = await executionPromise
|
|
||||||
|
|
||||||
expect(result.status).toBe('cancelled')
|
|
||||||
expect(nodeOrchestrator.executionCount).toBeLessThan(11)
|
|
||||||
})
|
|
||||||
|
|
||||||
it('should not wait for all parallel branches when cancelled', async () => {
|
|
||||||
const abortController = new AbortController()
|
|
||||||
|
|
||||||
const startNode = createMockNode('start', 'starter')
|
|
||||||
const slowNodes = Array.from({ length: 5 }, (_, i) => createMockNode(`slow${i}`, 'function'))
|
|
||||||
|
|
||||||
slowNodes.forEach((_, i) => {
|
|
||||||
startNode.outgoingEdges.set(`edge${i}`, { target: `slow${i}` })
|
|
||||||
})
|
|
||||||
|
|
||||||
const dag = createMockDAG([startNode, ...slowNodes])
|
|
||||||
const context = createMockContext({ abortSignal: abortController.signal })
|
|
||||||
const edgeManager = createMockEdgeManager((node) => {
|
|
||||||
if (node.id === 'start') return slowNodes.map((_, i) => `slow${i}`)
|
|
||||||
return []
|
|
||||||
})
|
|
||||||
const nodeOrchestrator = createMockNodeOrchestrator(200)
|
|
||||||
|
|
||||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
|
||||||
|
|
||||||
const executionPromise = engine.run('start')
|
|
||||||
setTimeout(() => abortController.abort(), 50)
|
|
||||||
|
|
||||||
const startTime = Date.now()
|
|
||||||
const result = await executionPromise
|
|
||||||
const duration = Date.now() - startTime
|
|
||||||
|
|
||||||
expect(result.status).toBe('cancelled')
|
|
||||||
expect(duration).toBeLessThan(500)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
describe('Edge cases', () => {
|
|
||||||
it('should handle empty DAG gracefully', async () => {
|
|
||||||
const dag = createMockDAG([])
|
|
||||||
const context = createMockContext()
|
|
||||||
const edgeManager = createMockEdgeManager()
|
|
||||||
const nodeOrchestrator = createMockNodeOrchestrator()
|
|
||||||
|
|
||||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
|
||||||
const result = await engine.run()
|
|
||||||
|
|
||||||
expect(result.success).toBe(true)
|
|
||||||
expect(nodeOrchestrator.executionCount).toBe(0)
|
|
||||||
})
|
|
||||||
|
|
||||||
it('should preserve partial output when cancelled', async () => {
|
|
||||||
const abortController = new AbortController()
|
|
||||||
|
|
||||||
const startNode = createMockNode('start', 'starter')
|
|
||||||
const endNode = createMockNode('end', 'function')
|
|
||||||
endNode.outgoingEdges = new Map()
|
|
||||||
|
|
||||||
startNode.outgoingEdges.set('edge1', { target: 'end' })
|
|
||||||
|
|
||||||
const dag = createMockDAG([startNode, endNode])
|
|
||||||
const context = createMockContext({ abortSignal: abortController.signal })
|
|
||||||
const edgeManager = createMockEdgeManager((node) => {
|
|
||||||
if (node.id === 'start') return ['end']
|
|
||||||
return []
|
|
||||||
})
|
|
||||||
|
|
||||||
const nodeOrchestrator = {
|
|
||||||
executionCount: 0,
|
|
||||||
executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => {
|
|
||||||
if (nodeId === 'start') {
|
|
||||||
return { nodeId: 'start', output: { startData: 'value' }, isFinalOutput: false }
|
|
||||||
}
|
|
||||||
abortController.abort()
|
|
||||||
return { nodeId: 'end', output: { endData: 'value' }, isFinalOutput: true }
|
|
||||||
}),
|
|
||||||
handleNodeCompletion: vi.fn(),
|
|
||||||
} as unknown as MockNodeOrchestrator
|
|
||||||
|
|
||||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
|
||||||
const result = await engine.run('start')
|
|
||||||
|
|
||||||
expect(result.status).toBe('cancelled')
|
|
||||||
expect(result.output).toBeDefined()
|
|
||||||
})
|
|
||||||
|
|
||||||
it('should populate metadata on cancellation', async () => {
|
|
||||||
const abortController = new AbortController()
|
|
||||||
abortController.abort()
|
|
||||||
|
|
||||||
const startNode = createMockNode('start', 'starter')
|
|
||||||
const dag = createMockDAG([startNode])
|
|
||||||
const context = createMockContext({ abortSignal: abortController.signal })
|
|
||||||
const edgeManager = createMockEdgeManager()
|
|
||||||
const nodeOrchestrator = createMockNodeOrchestrator()
|
|
||||||
|
|
||||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
|
||||||
const result = await engine.run('start')
|
|
||||||
|
|
||||||
expect(result.metadata).toBeDefined()
|
|
||||||
expect(result.metadata.endTime).toBeDefined()
|
|
||||||
expect(result.metadata.duration).toBeDefined()
|
|
||||||
})
|
|
||||||
|
|
||||||
it('should return logs even when cancelled', async () => {
|
|
||||||
const abortController = new AbortController()
|
|
||||||
|
|
||||||
const startNode = createMockNode('start', 'starter')
|
|
||||||
const dag = createMockDAG([startNode])
|
|
||||||
const context = createMockContext({ abortSignal: abortController.signal })
|
|
||||||
context.blockLogs.push({
|
|
||||||
blockId: 'test',
|
|
||||||
blockName: 'Test',
|
|
||||||
blockType: 'test',
|
|
||||||
startedAt: '',
|
|
||||||
endedAt: '',
|
|
||||||
durationMs: 0,
|
|
||||||
success: true,
|
|
||||||
})
|
|
||||||
|
|
||||||
const edgeManager = createMockEdgeManager()
|
|
||||||
const nodeOrchestrator = createMockNodeOrchestrator()
|
|
||||||
|
|
||||||
abortController.abort()
|
|
||||||
|
|
||||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
|
||||||
const result = await engine.run('start')
|
|
||||||
|
|
||||||
expect(result.logs).toBeDefined()
|
|
||||||
expect(result.logs.length).toBeGreaterThan(0)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
describe('Cancellation flag behavior', () => {
|
|
||||||
it('should set cancelledFlag when abort signal fires', async () => {
|
|
||||||
const abortController = new AbortController()
|
|
||||||
|
|
||||||
const nodes = Array.from({ length: 3 }, (_, i) => createMockNode(`node${i}`, 'function'))
|
|
||||||
for (let i = 0; i < nodes.length - 1; i++) {
|
|
||||||
nodes[i].outgoingEdges.set(`e${i}`, { target: `node${i + 1}` })
|
|
||||||
}
|
|
||||||
|
|
||||||
const dag = createMockDAG(nodes)
|
|
||||||
const context = createMockContext({ abortSignal: abortController.signal })
|
|
||||||
const edgeManager = createMockEdgeManager((node) => {
|
|
||||||
if (node.id === 'node0') {
|
|
||||||
abortController.abort()
|
|
||||||
return ['node1']
|
|
||||||
}
|
|
||||||
return node.id === 'node1' ? ['node2'] : []
|
|
||||||
})
|
|
||||||
const nodeOrchestrator = createMockNodeOrchestrator()
|
|
||||||
|
|
||||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
|
||||||
const result = await engine.run('node0')
|
|
||||||
|
|
||||||
expect(result.status).toBe('cancelled')
|
|
||||||
})
|
|
||||||
|
|
||||||
it('should cache Redis cancellation result', async () => {
|
|
||||||
;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
|
|
||||||
;(isExecutionCancelled as Mock).mockResolvedValue(true)
|
|
||||||
|
|
||||||
const nodes = Array.from({ length: 5 }, (_, i) => createMockNode(`node${i}`, 'function'))
|
|
||||||
const dag = createMockDAG(nodes)
|
|
||||||
const context = createMockContext()
|
|
||||||
const edgeManager = createMockEdgeManager()
|
|
||||||
const nodeOrchestrator = createMockNodeOrchestrator()
|
|
||||||
|
|
||||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
|
||||||
await engine.run('node0')
|
|
||||||
|
|
||||||
expect((isExecutionCancelled as Mock).mock.calls.length).toBeLessThanOrEqual(3)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
})
|
|
||||||
@@ -28,8 +28,6 @@ export class ExecutionEngine {
|
|||||||
private lastCancellationCheck = 0
|
private lastCancellationCheck = 0
|
||||||
private readonly useRedisCancellation: boolean
|
private readonly useRedisCancellation: boolean
|
||||||
private readonly CANCELLATION_CHECK_INTERVAL_MS = 500
|
private readonly CANCELLATION_CHECK_INTERVAL_MS = 500
|
||||||
private abortPromise: Promise<void> | null = null
|
|
||||||
private abortResolve: (() => void) | null = null
|
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private context: ExecutionContext,
|
private context: ExecutionContext,
|
||||||
@@ -39,34 +37,6 @@ export class ExecutionEngine {
|
|||||||
) {
|
) {
|
||||||
this.allowResumeTriggers = this.context.metadata.resumeFromSnapshot === true
|
this.allowResumeTriggers = this.context.metadata.resumeFromSnapshot === true
|
||||||
this.useRedisCancellation = isRedisCancellationEnabled() && !!this.context.executionId
|
this.useRedisCancellation = isRedisCancellationEnabled() && !!this.context.executionId
|
||||||
this.initializeAbortHandler()
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sets up a single abort promise that can be reused throughout execution.
|
|
||||||
* This avoids creating multiple event listeners and potential memory leaks.
|
|
||||||
*/
|
|
||||||
private initializeAbortHandler(): void {
|
|
||||||
if (!this.context.abortSignal) return
|
|
||||||
|
|
||||||
if (this.context.abortSignal.aborted) {
|
|
||||||
this.cancelledFlag = true
|
|
||||||
this.abortPromise = Promise.resolve()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
this.abortPromise = new Promise<void>((resolve) => {
|
|
||||||
this.abortResolve = resolve
|
|
||||||
})
|
|
||||||
|
|
||||||
this.context.abortSignal.addEventListener(
|
|
||||||
'abort',
|
|
||||||
() => {
|
|
||||||
this.cancelledFlag = true
|
|
||||||
this.abortResolve?.()
|
|
||||||
},
|
|
||||||
{ once: true }
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private async checkCancellation(): Promise<boolean> {
|
private async checkCancellation(): Promise<boolean> {
|
||||||
@@ -103,15 +73,12 @@ export class ExecutionEngine {
|
|||||||
this.initializeQueue(triggerBlockId)
|
this.initializeQueue(triggerBlockId)
|
||||||
|
|
||||||
while (this.hasWork()) {
|
while (this.hasWork()) {
|
||||||
if (await this.checkCancellation()) {
|
if ((await this.checkCancellation()) && this.executing.size === 0) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
await this.processQueue()
|
await this.processQueue()
|
||||||
}
|
}
|
||||||
|
await this.waitForAllExecutions()
|
||||||
if (!this.cancelledFlag) {
|
|
||||||
await this.waitForAllExecutions()
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this.pausedBlocks.size > 0) {
|
if (this.pausedBlocks.size > 0) {
|
||||||
return this.buildPausedResult(startTime)
|
return this.buildPausedResult(startTime)
|
||||||
@@ -197,7 +164,11 @@ export class ExecutionEngine {
|
|||||||
|
|
||||||
private trackExecution(promise: Promise<void>): void {
|
private trackExecution(promise: Promise<void>): void {
|
||||||
this.executing.add(promise)
|
this.executing.add(promise)
|
||||||
promise.catch(() => {})
|
// Attach error handler to prevent unhandled rejection warnings
|
||||||
|
// The actual error handling happens in waitForAllExecutions/waitForAnyExecution
|
||||||
|
promise.catch(() => {
|
||||||
|
// Error will be properly handled by Promise.all/Promise.race in wait methods
|
||||||
|
})
|
||||||
promise.finally(() => {
|
promise.finally(() => {
|
||||||
this.executing.delete(promise)
|
this.executing.delete(promise)
|
||||||
})
|
})
|
||||||
@@ -205,30 +176,12 @@ export class ExecutionEngine {
|
|||||||
|
|
||||||
private async waitForAnyExecution(): Promise<void> {
|
private async waitForAnyExecution(): Promise<void> {
|
||||||
if (this.executing.size > 0) {
|
if (this.executing.size > 0) {
|
||||||
const abortPromise = this.getAbortPromise()
|
await Promise.race(this.executing)
|
||||||
if (abortPromise) {
|
|
||||||
await Promise.race([...this.executing, abortPromise])
|
|
||||||
} else {
|
|
||||||
await Promise.race(this.executing)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async waitForAllExecutions(): Promise<void> {
|
private async waitForAllExecutions(): Promise<void> {
|
||||||
const abortPromise = this.getAbortPromise()
|
await Promise.all(Array.from(this.executing))
|
||||||
if (abortPromise) {
|
|
||||||
await Promise.race([Promise.all(this.executing), abortPromise])
|
|
||||||
} else {
|
|
||||||
await Promise.all(this.executing)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the cached abort promise. This is safe to call multiple times
|
|
||||||
* as it reuses the same promise instance created during initialization.
|
|
||||||
*/
|
|
||||||
private getAbortPromise(): Promise<void> | null {
|
|
||||||
return this.abortPromise
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private async withQueueLock<T>(fn: () => Promise<T> | T): Promise<T> {
|
private async withQueueLock<T>(fn: () => Promise<T> | T): Promise<T> {
|
||||||
@@ -324,7 +277,7 @@ export class ExecutionEngine {
|
|||||||
this.trackExecution(promise)
|
this.trackExecution(promise)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.executing.size > 0 && !this.cancelledFlag) {
|
if (this.executing.size > 0) {
|
||||||
await this.waitForAnyExecution()
|
await this.waitForAnyExecution()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -383,6 +336,7 @@ export class ExecutionEngine {
|
|||||||
|
|
||||||
this.addMultipleToQueue(readyNodes)
|
this.addMultipleToQueue(readyNodes)
|
||||||
|
|
||||||
|
// Check for dynamically added nodes (e.g., from parallel expansion)
|
||||||
if (this.context.pendingDynamicNodes && this.context.pendingDynamicNodes.length > 0) {
|
if (this.context.pendingDynamicNodes && this.context.pendingDynamicNodes.length > 0) {
|
||||||
const dynamicNodes = this.context.pendingDynamicNodes
|
const dynamicNodes = this.context.pendingDynamicNodes
|
||||||
this.context.pendingDynamicNodes = []
|
this.context.pendingDynamicNodes = []
|
||||||
|
|||||||
Reference in New Issue
Block a user