Files
sim/apps/sim/executor/dag/construction/paths.ts
Siddharth Ganesan 655fe4f3b7 feat(executor): run from/until block (#3029)
* Run from block

* Fixes

* Fix

* Fix

* Minor improvements

* Fix

* Fix trace spans

* Fix loop l ogs

* Change ordering

* Run u ntil block

* Lint

* Clean up

* Fix

* Allow run from block for triggers

* Consolidation

* Fix lint

* Fix

* Fix mock payload

* Fix

* Fix trigger clear snapshot

* Fix loops and parallels

* Fix

* Cleanup

* Fix test

* Fix bugs

* Catch error

* Fix

* Fix

* I think it works??

* Fix

* Fix

* Add tests

* Fix lint

---------

Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
2026-01-28 12:53:23 -08:00

185 lines
5.3 KiB
TypeScript

import { createLogger } from '@sim/logger'
import { isMetadataOnlyBlockType, isTriggerBlockType } from '@/executor/constants'
import { extractBaseBlockId } from '@/executor/utils/subflow-utils'
import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
const logger = createLogger('PathConstructor')
export class PathConstructor {
execute(
workflow: SerializedWorkflow,
triggerBlockId?: string,
includeAllBlocks?: boolean
): Set<string> {
// For run-from-block mode, include all enabled blocks regardless of trigger reachability
if (includeAllBlocks) {
return this.getAllEnabledBlocks(workflow)
}
const resolvedTriggerId = this.findTriggerBlock(workflow, triggerBlockId)
if (!resolvedTriggerId) {
logger.warn('No trigger block found, including all enabled blocks as fallback')
return this.getAllEnabledBlocks(workflow)
}
const adjacency = this.buildAdjacencyMap(workflow)
const reachable = this.performBFS(resolvedTriggerId, adjacency)
return reachable
}
private findTriggerBlock(
workflow: SerializedWorkflow,
triggerBlockId?: string
): string | undefined {
if (triggerBlockId) {
const block = workflow.blocks.find((b) => b.id === triggerBlockId)
if (block) {
if (!block.enabled) {
logger.error('Provided triggerBlockId is disabled, finding alternative', {
triggerBlockId,
blockEnabled: block.enabled,
})
// Try to find an alternative enabled trigger instead of failing
const alternativeTrigger = this.findExplicitTrigger(workflow)
if (alternativeTrigger) {
logger.info('Using alternative enabled trigger', {
disabledTriggerId: triggerBlockId,
alternativeTriggerId: alternativeTrigger,
})
return alternativeTrigger
}
throw new Error(
`Trigger block ${triggerBlockId} is disabled and no alternative enabled trigger found`
)
}
return triggerBlockId
}
const fallbackTriggerId = this.resolveResumeTriggerFallback(triggerBlockId, workflow)
if (fallbackTriggerId) {
return fallbackTriggerId
}
logger.error('Provided triggerBlockId not found in workflow', {
triggerBlockId,
availableBlocks: workflow.blocks.map((b) => ({ id: b.id, type: b.metadata?.id })),
})
throw new Error(`Trigger block not found: ${triggerBlockId}`)
}
const explicitTrigger = this.findExplicitTrigger(workflow)
if (explicitTrigger) {
return explicitTrigger
}
const rootBlock = this.findRootBlock(workflow)
if (rootBlock) {
return rootBlock
}
return undefined
}
private findExplicitTrigger(workflow: SerializedWorkflow): string | undefined {
for (const block of workflow.blocks) {
if (block.enabled && this.isTriggerBlock(block)) {
return block.id
}
}
return undefined
}
private findRootBlock(workflow: SerializedWorkflow): string | undefined {
const hasIncoming = new Set(workflow.connections.map((c) => c.target))
for (const block of workflow.blocks) {
if (
!hasIncoming.has(block.id) &&
block.enabled &&
!isMetadataOnlyBlockType(block.metadata?.id)
) {
return block.id
}
}
return undefined
}
private isTriggerBlock(block: SerializedBlock): boolean {
return isTriggerBlockType(block.metadata?.id)
}
private getAllEnabledBlocks(workflow: SerializedWorkflow): Set<string> {
return new Set(workflow.blocks.filter((b) => b.enabled).map((b) => b.id))
}
private buildAdjacencyMap(workflow: SerializedWorkflow): Map<string, string[]> {
const adjacency = new Map<string, string[]>()
const enabledBlocks = new Set(workflow.blocks.filter((b) => b.enabled).map((b) => b.id))
for (const connection of workflow.connections) {
if (!enabledBlocks.has(connection.source) || !enabledBlocks.has(connection.target)) {
continue
}
const neighbors = adjacency.get(connection.source) ?? []
neighbors.push(connection.target)
adjacency.set(connection.source, neighbors)
}
return adjacency
}
private performBFS(triggerBlockId: string, adjacency: Map<string, string[]>): Set<string> {
const reachable = new Set<string>([triggerBlockId])
const queue = [triggerBlockId]
while (queue.length > 0) {
const currentBlockId = queue.shift()
if (!currentBlockId) break
const neighbors = adjacency.get(currentBlockId) ?? []
for (const neighborId of neighbors) {
if (!reachable.has(neighborId)) {
reachable.add(neighborId)
queue.push(neighborId)
}
}
}
return reachable
}
private resolveResumeTriggerFallback(
triggerBlockId: string,
workflow: SerializedWorkflow
): string | undefined {
if (!triggerBlockId.endsWith('__trigger')) {
return undefined
}
const baseId = triggerBlockId.replace(/__trigger$/, '')
const normalizedBaseId = extractBaseBlockId(baseId)
const candidates = baseId === normalizedBaseId ? [baseId] : [baseId, normalizedBaseId]
for (const candidate of candidates) {
const block = workflow.blocks.find((b) => b.id === candidate)
if (block) {
return candidate
}
}
return undefined
}
}