mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-28 00:08:21 -05:00
I think it works??
This commit is contained in:
@@ -114,9 +114,17 @@ export const ActionBar = memo(
|
||||
const snapshot = activeWorkflowId ? getLastExecutionSnapshot(activeWorkflowId) : null
|
||||
const incomingEdges = edges.filter((edge) => edge.target === blockId)
|
||||
const isTriggerBlock = incomingEdges.length === 0
|
||||
|
||||
// Check if each source block is either executed OR is a trigger block (triggers don't need prior execution)
|
||||
const isSourceSatisfied = (sourceId: string) => {
|
||||
if (snapshot?.executedBlocks.includes(sourceId)) return true
|
||||
// Check if source is a trigger (has no incoming edges itself)
|
||||
const sourceIncomingEdges = edges.filter((edge) => edge.target === sourceId)
|
||||
return sourceIncomingEdges.length === 0
|
||||
}
|
||||
|
||||
const dependenciesSatisfied =
|
||||
isTriggerBlock ||
|
||||
(snapshot && incomingEdges.every((edge) => snapshot.executedBlocks.includes(edge.source)))
|
||||
isTriggerBlock || incomingEdges.every((edge) => isSourceSatisfied(edge.source))
|
||||
const canRunFromBlock =
|
||||
dependenciesSatisfied && !isNoteBlock && !isInsideSubflow && !isExecuting
|
||||
|
||||
@@ -149,7 +157,7 @@ export const ActionBar = memo(
|
||||
'dark:border-transparent dark:bg-[var(--surface-4)]'
|
||||
)}
|
||||
>
|
||||
{!isNoteBlock && (
|
||||
{!isNoteBlock && !isInsideSubflow && (
|
||||
<Tooltip.Root>
|
||||
<Tooltip.Trigger asChild>
|
||||
<Button
|
||||
@@ -170,7 +178,6 @@ export const ActionBar = memo(
|
||||
{(() => {
|
||||
if (disabled) return getTooltipMessage('Run from block')
|
||||
if (isExecuting) return 'Execution in progress'
|
||||
if (isInsideSubflow) return 'Cannot run from inside subflow'
|
||||
if (!dependenciesSatisfied) return 'Run upstream blocks first'
|
||||
return 'Run from block'
|
||||
})()}
|
||||
|
||||
@@ -48,6 +48,8 @@ export interface BlockMenuProps {
|
||||
canRunFromBlock?: boolean
|
||||
disableEdit?: boolean
|
||||
isExecuting?: boolean
|
||||
/** Whether the selected block is a trigger (has no incoming edges) */
|
||||
isPositionalTrigger?: boolean
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -77,6 +79,7 @@ export function BlockMenu({
|
||||
canRunFromBlock = false,
|
||||
disableEdit = false,
|
||||
isExecuting = false,
|
||||
isPositionalTrigger = false,
|
||||
}: BlockMenuProps) {
|
||||
const isSingleBlock = selectedBlocks.length === 1
|
||||
|
||||
@@ -87,7 +90,9 @@ export function BlockMenu({
|
||||
(b) =>
|
||||
TriggerUtils.requiresSingleInstance(b.type) || TriggerUtils.isSingleInstanceBlockType(b.type)
|
||||
)
|
||||
const hasTriggerBlock = selectedBlocks.some((b) => TriggerUtils.isTriggerBlock(b))
|
||||
// A block is a trigger if it's explicitly a trigger type OR has no incoming edges (positional trigger)
|
||||
const hasTriggerBlock =
|
||||
selectedBlocks.some((b) => TriggerUtils.isTriggerBlock(b)) || isPositionalTrigger
|
||||
const allNoteBlocks = selectedBlocks.every((b) => b.type === 'note')
|
||||
const isSubflow =
|
||||
isSingleBlock && (selectedBlocks[0]?.type === 'loop' || selectedBlocks[0]?.type === 'parallel')
|
||||
|
||||
@@ -1072,6 +1072,11 @@ export function useWorkflowExecution() {
|
||||
logs: accumulatedBlockLogs,
|
||||
}
|
||||
|
||||
// Add trigger block to executed blocks so downstream blocks can use run-from-block
|
||||
if (data.success && startBlockId) {
|
||||
executedBlockIds.add(startBlockId)
|
||||
}
|
||||
|
||||
if (data.success && activeWorkflowId) {
|
||||
if (stopAfterBlockId) {
|
||||
const existingSnapshot = getLastExecutionSnapshot(activeWorkflowId)
|
||||
@@ -1443,14 +1448,25 @@ export function useWorkflowExecution() {
|
||||
const incomingEdges = workflowEdges.filter((edge) => edge.target === blockId)
|
||||
const isTriggerBlock = incomingEdges.length === 0
|
||||
|
||||
if (!snapshot && !isTriggerBlock) {
|
||||
// Check if each source block is either executed OR is a trigger block (triggers don't need prior execution)
|
||||
const isSourceSatisfied = (sourceId: string) => {
|
||||
if (snapshot?.executedBlocks.includes(sourceId)) return true
|
||||
// Check if source is a trigger (has no incoming edges itself)
|
||||
const sourceIncomingEdges = workflowEdges.filter((edge) => edge.target === sourceId)
|
||||
return sourceIncomingEdges.length === 0
|
||||
}
|
||||
|
||||
if (
|
||||
!snapshot &&
|
||||
!isTriggerBlock &&
|
||||
!incomingEdges.every((edge) => isSourceSatisfied(edge.source))
|
||||
) {
|
||||
logger.error('No execution snapshot available for run-from-block', { workflowId, blockId })
|
||||
return
|
||||
}
|
||||
|
||||
const dependenciesSatisfied =
|
||||
isTriggerBlock ||
|
||||
(snapshot && incomingEdges.every((edge) => snapshot.executedBlocks.includes(edge.source)))
|
||||
isTriggerBlock || incomingEdges.every((edge) => isSourceSatisfied(edge.source))
|
||||
|
||||
if (!dependenciesSatisfied) {
|
||||
logger.error('Upstream dependencies not satisfied for run-from-block', {
|
||||
@@ -1637,6 +1653,9 @@ export function useWorkflowExecution() {
|
||||
|
||||
onExecutionCompleted: (data) => {
|
||||
if (data.success) {
|
||||
// Add the start block (trigger) to executed blocks
|
||||
executedBlockIds.add(blockId)
|
||||
|
||||
const mergedBlockStates: Record<string, BlockState> = {
|
||||
...effectiveSnapshot.blockStates,
|
||||
}
|
||||
|
||||
@@ -1128,23 +1128,17 @@ const WorkflowContent = React.memo(() => {
|
||||
const snapshot = getLastExecutionSnapshot(workflowIdParam)
|
||||
const incomingEdges = edges.filter((edge) => edge.target === block.id)
|
||||
const isTriggerBlock = incomingEdges.length === 0
|
||||
const isSubflow = block.type === 'loop' || block.type === 'parallel'
|
||||
|
||||
// For subflows, check if the sentinel-end was executed (meaning the subflow completed at least once)
|
||||
// Sentinel IDs follow the pattern: loop-{id}-sentinel-end or parallel-{id}-sentinel-end
|
||||
const subflowWasExecuted =
|
||||
isSubflow &&
|
||||
snapshot &&
|
||||
snapshot.executedBlocks.some(
|
||||
(executedId) =>
|
||||
executedId === `loop-${block.id}-sentinel-end` ||
|
||||
executedId === `parallel-${block.id}-sentinel-end`
|
||||
)
|
||||
// Check if each source block is either executed OR is a trigger block (triggers don't need prior execution)
|
||||
const isSourceSatisfied = (sourceId: string) => {
|
||||
if (snapshot?.executedBlocks.includes(sourceId)) return true
|
||||
// Check if source is a trigger (has no incoming edges itself)
|
||||
const sourceIncomingEdges = edges.filter((edge) => edge.target === sourceId)
|
||||
return sourceIncomingEdges.length === 0
|
||||
}
|
||||
|
||||
const dependenciesSatisfied =
|
||||
isTriggerBlock ||
|
||||
subflowWasExecuted ||
|
||||
(snapshot && incomingEdges.every((edge) => snapshot.executedBlocks.includes(edge.source)))
|
||||
isTriggerBlock || incomingEdges.every((edge) => isSourceSatisfied(edge.source))
|
||||
const isNoteBlock = block.type === 'note'
|
||||
const isInsideSubflow =
|
||||
block.parentId && (block.parentType === 'loop' || block.parentType === 'parallel')
|
||||
@@ -3482,6 +3476,10 @@ const WorkflowContent = React.memo(() => {
|
||||
canRunFromBlock={runFromBlockState.canRun}
|
||||
disableEdit={!effectivePermissions.canEdit}
|
||||
isExecuting={isExecuting}
|
||||
isPositionalTrigger={
|
||||
contextMenuBlocks.length === 1 &&
|
||||
edges.filter((e) => e.target === contextMenuBlocks[0]?.id).length === 0
|
||||
}
|
||||
/>
|
||||
|
||||
<CanvasMenu
|
||||
|
||||
@@ -33,6 +33,15 @@ export interface DAG {
|
||||
parallelConfigs: Map<string, SerializedParallel>
|
||||
}
|
||||
|
||||
export interface DAGBuildOptions {
|
||||
/** Trigger block ID to start path construction from */
|
||||
triggerBlockId?: string
|
||||
/** Saved incoming edges from snapshot for resumption */
|
||||
savedIncomingEdges?: Record<string, string[]>
|
||||
/** Include all enabled blocks instead of only those reachable from trigger */
|
||||
includeAllBlocks?: boolean
|
||||
}
|
||||
|
||||
export class DAGBuilder {
|
||||
private pathConstructor = new PathConstructor()
|
||||
private loopConstructor = new LoopConstructor()
|
||||
@@ -40,11 +49,9 @@ export class DAGBuilder {
|
||||
private nodeConstructor = new NodeConstructor()
|
||||
private edgeConstructor = new EdgeConstructor()
|
||||
|
||||
build(
|
||||
workflow: SerializedWorkflow,
|
||||
triggerBlockId?: string,
|
||||
savedIncomingEdges?: Record<string, string[]>
|
||||
): DAG {
|
||||
build(workflow: SerializedWorkflow, options: DAGBuildOptions = {}): DAG {
|
||||
const { triggerBlockId, savedIncomingEdges, includeAllBlocks } = options
|
||||
|
||||
const dag: DAG = {
|
||||
nodes: new Map(),
|
||||
loopConfigs: new Map(),
|
||||
@@ -53,7 +60,7 @@ export class DAGBuilder {
|
||||
|
||||
this.initializeConfigs(workflow, dag)
|
||||
|
||||
const reachableBlocks = this.pathConstructor.execute(workflow, triggerBlockId)
|
||||
const reachableBlocks = this.pathConstructor.execute(workflow, triggerBlockId, includeAllBlocks)
|
||||
|
||||
this.loopConstructor.execute(dag, reachableBlocks)
|
||||
this.parallelConstructor.execute(dag, reachableBlocks)
|
||||
|
||||
@@ -6,7 +6,16 @@ import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
|
||||
const logger = createLogger('PathConstructor')
|
||||
|
||||
export class PathConstructor {
|
||||
execute(workflow: SerializedWorkflow, triggerBlockId?: string): Set<string> {
|
||||
execute(
|
||||
workflow: SerializedWorkflow,
|
||||
triggerBlockId?: string,
|
||||
includeAllBlocks?: boolean
|
||||
): Set<string> {
|
||||
// For run-from-block mode, include all enabled blocks regardless of trigger reachability
|
||||
if (includeAllBlocks) {
|
||||
return this.getAllEnabledBlocks(workflow)
|
||||
}
|
||||
|
||||
const resolvedTriggerId = this.findTriggerBlock(workflow, triggerBlockId)
|
||||
|
||||
if (!resolvedTriggerId) {
|
||||
|
||||
@@ -62,7 +62,10 @@ export class DAGExecutor {
|
||||
|
||||
async execute(workflowId: string, triggerBlockId?: string): Promise<ExecutionResult> {
|
||||
const savedIncomingEdges = this.contextExtensions.dagIncomingEdges
|
||||
const dag = this.dagBuilder.build(this.workflow, triggerBlockId, savedIncomingEdges)
|
||||
const dag = this.dagBuilder.build(this.workflow, {
|
||||
triggerBlockId,
|
||||
savedIncomingEdges,
|
||||
})
|
||||
const { context, state } = this.createExecutionContext(workflowId, triggerBlockId)
|
||||
|
||||
const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
|
||||
@@ -111,8 +114,9 @@ export class DAGExecutor {
|
||||
startBlockId: string,
|
||||
sourceSnapshot: SerializableExecutionState
|
||||
): Promise<ExecutionResult> {
|
||||
// Build full DAG to compute upstream set for snapshot filtering
|
||||
const dag = this.dagBuilder.build(this.workflow)
|
||||
// Build full DAG with all blocks to compute upstream set for snapshot filtering
|
||||
// includeAllBlocks is needed because the startBlockId might be a trigger not reachable from the main trigger
|
||||
const dag = this.dagBuilder.build(this.workflow, { includeAllBlocks: true })
|
||||
|
||||
const executedBlocks = new Set(sourceSnapshot.executedBlocks)
|
||||
const validation = validateRunFromBlock(startBlockId, dag, executedBlocks)
|
||||
|
||||
@@ -176,6 +176,10 @@ export function validateRunFromBlock(
|
||||
// Skip sentinel nodes - they're internal and not in executedBlocks
|
||||
if (sourceNode?.metadata.isSentinel) continue
|
||||
|
||||
// Skip trigger nodes - they're entry points and don't need prior execution
|
||||
// A trigger node has no incoming edges
|
||||
if (sourceNode && sourceNode.incomingEdges.size === 0) continue
|
||||
|
||||
if (!executedBlocks.has(sourceId)) {
|
||||
return {
|
||||
valid: false,
|
||||
|
||||
Reference in New Issue
Block a user