fix(triggers): incoming edges should be filtered from execution and UI graph (#1777)

This commit is contained in:
Vikhyath Mondreti
2025-10-30 20:20:32 -07:00
committed by GitHub
parent eac358bc7c
commit 0b16fa4dd0
6 changed files with 88 additions and 16 deletions

View File

@@ -22,6 +22,7 @@ import {
} from '@/lib/workflows/utils'
import { validateWorkflowAccess } from '@/app/api/workflows/middleware'
import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils'
import { filterEdgesFromTriggerBlocks } from '@/app/workspace/[workspaceId]/w/[workflowId]/lib/workflow-execution-utils'
import { Executor } from '@/executor'
import type { ExecutionResult } from '@/executor/types'
import { Serializer } from '@/serializer'
@@ -292,10 +293,13 @@ export async function executeWorkflow(
logger.debug(`[${requestId}] No workflow variables found for: ${workflowId}`)
}
// Filter out edges between trigger blocks - triggers are independent entry points
const filteredEdges = filterEdgesFromTriggerBlocks(mergedStates, edges)
logger.debug(`[${requestId}] Serializing workflow: ${workflowId}`)
const serializedWorkflow = new Serializer().serializeWorkflow(
mergedStates,
edges,
filteredEdges,
loops,
parallels,
true
@@ -335,7 +339,7 @@ export async function executeWorkflow(
if (streamConfig?.enabled) {
contextExtensions.stream = true
contextExtensions.selectedOutputs = streamConfig.selectedOutputs || []
contextExtensions.edges = edges.map((e: any) => ({
contextExtensions.edges = filteredEdges.map((e: any) => ({
source: e.source,
target: e.target,
}))

View File

@@ -2,6 +2,7 @@ import { shallow } from 'zustand/shallow'
import { BlockPathCalculator } from '@/lib/block-path-calculator'
import { createLogger } from '@/lib/logs/console/logger'
import { getBlockOutputs } from '@/lib/workflows/block-outputs'
import { TriggerUtils } from '@/lib/workflows/triggers'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
@@ -110,8 +111,34 @@ export function useBlockConnections(blockId: string) {
return merged
}
// Find all blocks along paths leading to this block
const allPathNodeIds = BlockPathCalculator.findAllPathNodes(edges, blockId)
// Filter out edges between trigger blocks - triggers are independent entry points
// This ensures UI tags only show blocks that are actually connected in execution
const filteredEdges = edges.filter((edge) => {
const sourceBlock = blocks[edge.source]
const targetBlock = blocks[edge.target]
// If either block not found, keep the edge (might be in a different state structure)
if (!sourceBlock || !targetBlock) {
return true
}
const sourceIsTrigger = TriggerUtils.isTriggerBlock({
type: sourceBlock.type,
triggerMode: sourceBlock.triggerMode,
})
const targetIsTrigger = TriggerUtils.isTriggerBlock({
type: targetBlock.type,
triggerMode: targetBlock.triggerMode,
})
// Filter out edges where source is trigger AND target is trigger
// Keep edges from triggers to regular blocks
return !(sourceIsTrigger && targetIsTrigger)
})
// Find all blocks along paths leading to this block (using filtered edges)
const allPathNodeIds = BlockPathCalculator.findAllPathNodes(filteredEdges, blockId)
// Map each path node to a ConnectedBlock structure
const allPathConnections = allPathNodeIds
@@ -161,8 +188,8 @@ export function useBlockConnections(blockId: string) {
})
.filter(Boolean) as ConnectedBlock[]
// Keep the original incoming connections for compatibility
const directIncomingConnections = edges
// Keep the original incoming connections for compatibility (using filtered edges)
const directIncomingConnections = filteredEdges
.filter((edge) => edge.target === blockId)
.map((edge) => {
const sourceBlock = blocks[edge.source]

View File

@@ -16,6 +16,7 @@ import { useEnvironmentStore } from '@/stores/settings/environment/store'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { mergeSubblockState } from '@/stores/workflows/utils'
import { generateLoopBlocks, generateParallelBlocks } from '@/stores/workflows/workflow/utils'
import { filterEdgesFromTriggerBlocks } from '../lib/workflow-execution-utils'
import { useCurrentWorkflow } from './use-current-workflow'
const logger = createLogger('useWorkflowExecution')
@@ -773,8 +774,8 @@ export function useWorkflowExecution() {
{} as Record<string, any>
)
// Keep edges intact to allow execution starting from trigger blocks
const filteredEdges = workflowEdges
// Filter out edges between trigger blocks - triggers are independent entry points
const filteredEdges = filterEdgesFromTriggerBlocks(filteredStates, workflowEdges)
// Derive subflows from the current filtered graph to avoid stale state
const runtimeLoops = generateLoopBlocks(filteredStates)

View File

@@ -3,9 +3,11 @@
* This allows workflow execution with proper logging from both React hooks and tools
*/
import type { Edge } from 'reactflow'
import { v4 as uuidv4 } from 'uuid'
import { createLogger } from '@/lib/logs/console/logger'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { TriggerUtils } from '@/lib/workflows/triggers'
import type { BlockOutput } from '@/blocks/types'
import { Executor } from '@/executor'
import type { ExecutionResult, StreamingExecution } from '@/executor/types'
@@ -81,6 +83,37 @@ export function getWorkflowExecutionContext(): WorkflowExecutionContext {
}
}
/**
* Filter out edges between trigger blocks.
* Trigger blocks are independent entry points and should not have edges to other trigger blocks.
* However, trigger blocks can have edges to regular blocks.
*/
export function filterEdgesFromTriggerBlocks(blocks: Record<string, any>, edges: Edge[]): Edge[] {
return edges.filter((edge) => {
const sourceBlock = blocks[edge.source]
const targetBlock = blocks[edge.target]
// If either block not found, keep the edge (might be in a different state structure)
if (!sourceBlock || !targetBlock) {
return true
}
const sourceIsTrigger = TriggerUtils.isTriggerBlock({
type: sourceBlock.type,
triggerMode: sourceBlock.triggerMode,
})
const targetIsTrigger = TriggerUtils.isTriggerBlock({
type: targetBlock.type,
triggerMode: targetBlock.triggerMode,
})
// Filter out edges where source is trigger AND target is trigger
// Keep edges from triggers to regular blocks
return !(sourceIsTrigger && targetIsTrigger)
})
}
/**
* Execute a workflow with proper state management and logging
* This is the core execution logic extracted from useWorkflowExecution
@@ -168,9 +201,9 @@ export async function executeWorkflowWithLogging(
{} as Record<string, any>
)
// Don't filter edges - let all connections remain intact
// The executor's routing system will handle execution paths properly
const filteredEdges = workflowEdges
// Filter out edges from trigger blocks - triggers are independent entry points
// and should not have edges to other trigger blocks
const filteredEdges = filterEdgesFromTriggerBlocks(validBlocks, workflowEdges)
// Create serialized workflow with filtered blocks and edges
const workflow = new Serializer().serializeWorkflow(

View File

@@ -32,6 +32,7 @@ import { WorkflowBlock } from '@/app/workspace/[workspaceId]/w/[workflowId]/comp
import { WorkflowEdge } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-edge/workflow-edge'
import { CollaboratorCursorLayer } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-presence/collaborator-cursor-layer'
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks'
import { filterEdgesFromTriggerBlocks } from '@/app/workspace/[workspaceId]/w/[workflowId]/lib/workflow-execution-utils'
import {
getNodeAbsolutePosition,
getNodeDepth,
@@ -151,8 +152,10 @@ const WorkflowContent = React.memo(() => {
// Get diff analysis for edge reconstruction
const { diffAnalysis, isShowingDiff, isDiffReady } = useWorkflowDiffStore()
// Reconstruct deleted edges when viewing original workflow
// Reconstruct deleted edges when viewing original workflow and filter trigger edges
const edgesForDisplay = useMemo(() => {
let edgesToFilter = edges
// If we're not in diff mode and we have diff analysis with deleted edges,
// we need to reconstruct those deleted edges and add them to the display
// Only do this if diff is ready to prevent race conditions
@@ -214,11 +217,11 @@ const WorkflowContent = React.memo(() => {
})
// Combine existing edges with reconstructed deleted edges
return [...edges, ...reconstructedEdges]
edgesToFilter = [...edges, ...reconstructedEdges]
}
// Otherwise, just use the edges as-is
return edges
// Filter out edges between trigger blocks for consistent UI and execution behavior
return filterEdgesFromTriggerBlocks(blocks, edgesToFilter)
}, [edges, isShowingDiff, isDiffReady, diffAnalysis, blocks])
// User permissions - get current user's specific permissions from context

View File

@@ -11,6 +11,7 @@ import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { decryptSecret } from '@/lib/utils'
import { loadDeployedWorkflowState } from '@/lib/workflows/db-helpers'
import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
import { filterEdgesFromTriggerBlocks } from '@/app/workspace/[workspaceId]/w/[workflowId]/lib/workflow-execution-utils'
import { Executor } from '@/executor'
import { Serializer } from '@/serializer'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
@@ -107,11 +108,14 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
variables: decryptedEnvVars,
})
// Filter out edges between trigger blocks - triggers are independent entry points
const filteredEdges = filterEdgesFromTriggerBlocks(mergedStates, edges)
// Create serialized workflow
const serializer = new Serializer()
const serializedWorkflow = serializer.serializeWorkflow(
mergedStates,
edges,
filteredEdges,
loops || {},
parallels || {},
true // Enable validation during execution