fix(edge-validation): race condition on collaborative add

This commit is contained in:
Vikhyath Mondreti
2026-01-24 12:59:37 -08:00
parent 3bbf7f5d1d
commit bff4476cbe
4 changed files with 74 additions and 37 deletions

View File

@@ -24,7 +24,7 @@ import { useUndoRedoStore } from '@/stores/undo-redo'
import { useWorkflowDiffStore } from '@/stores/workflow-diff/store'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import { filterNewEdges, mergeSubblockState } from '@/stores/workflows/utils'
import { filterNewEdges, filterValidEdges, mergeSubblockState } from '@/stores/workflows/utils'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
import type { BlockState, Loop, Parallel, Position } from '@/stores/workflows/workflow/types'
@@ -1004,7 +1004,11 @@ export function useCollaborativeWorkflow() {
if (edges.length === 0) return false
const newEdges = filterNewEdges(edges, useWorkflowStore.getState().edges)
// Filter out invalid edges (e.g., edges targeting trigger blocks) and duplicates
const blocks = useWorkflowStore.getState().blocks
const currentEdges = useWorkflowStore.getState().edges
const validEdges = filterValidEdges(edges, blocks)
const newEdges = filterNewEdges(validEdges, currentEdges)
if (newEdges.length === 0) return false
const operationId = crypto.randomUUID()
@@ -1020,7 +1024,7 @@ export function useCollaborativeWorkflow() {
userId: session?.user?.id || 'unknown',
})
useWorkflowStore.getState().batchAddEdges(newEdges)
useWorkflowStore.getState().batchAddEdges(newEdges, { skipValidation: true })
if (!options?.skipUndoRedo) {
newEdges.forEach((edge) => undoRedo.recordAddEdge(edge.id))
@@ -1484,9 +1488,23 @@ export function useCollaborativeWorkflow() {
if (blocks.length === 0) return false
// Filter out invalid edges (e.g., edges targeting trigger blocks)
// Combine existing blocks with new blocks for validation
const existingBlocks = useWorkflowStore.getState().blocks
const newBlocksMap = blocks.reduce(
(acc, block) => {
acc[block.id] = block
return acc
},
{} as Record<string, BlockState>
)
const allBlocks = { ...existingBlocks, ...newBlocksMap }
const validEdges = filterValidEdges(edges, allBlocks)
logger.info('Batch adding blocks collaboratively', {
blockCount: blocks.length,
edgeCount: edges.length,
edgeCount: validEdges.length,
filteredEdges: edges.length - validEdges.length,
})
const operationId = crypto.randomUUID()
@@ -1496,16 +1514,18 @@ export function useCollaborativeWorkflow() {
operation: {
operation: BLOCKS_OPERATIONS.BATCH_ADD_BLOCKS,
target: OPERATION_TARGETS.BLOCKS,
payload: { blocks, edges, loops, parallels, subBlockValues },
payload: { blocks, edges: validEdges, loops, parallels, subBlockValues },
},
workflowId: activeWorkflowId || '',
userId: session?.user?.id || 'unknown',
})
useWorkflowStore.getState().batchAddBlocks(blocks, edges, subBlockValues)
useWorkflowStore.getState().batchAddBlocks(blocks, validEdges, subBlockValues, {
skipEdgeValidation: true,
})
if (!options?.skipUndoRedo) {
undoRedo.recordBatchAddBlocks(blocks, edges, subBlockValues)
undoRedo.recordBatchAddBlocks(blocks, validEdges, subBlockValues)
}
return true

View File

@@ -2,8 +2,9 @@ import type { Edge } from 'reactflow'
import { v4 as uuidv4 } from 'uuid'
import { getBlockOutputs } from '@/lib/workflows/blocks/block-outputs'
import { mergeSubBlockValues, mergeSubblockStateWithValues } from '@/lib/workflows/subblocks'
import { TriggerUtils } from '@/lib/workflows/triggers/triggers'
import { getBlock } from '@/blocks'
import { normalizeName } from '@/executor/constants'
import { isAnnotationOnlyBlock, normalizeName } from '@/executor/constants'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import type {
BlockState,
@@ -17,6 +18,32 @@ import { TRIGGER_RUNTIME_SUBBLOCK_IDS } from '@/triggers/constants'
const WEBHOOK_SUBBLOCK_FIELDS = ['webhookId', 'triggerPath']
/**
* Checks if an edge is valid (source and target exist, not annotation-only, target is not a trigger)
*/
function isValidEdge(
edge: Edge,
blocks: Record<string, { type: string; triggerMode?: boolean }>
): boolean {
const sourceBlock = blocks[edge.source]
const targetBlock = blocks[edge.target]
if (!sourceBlock || !targetBlock) return false
if (isAnnotationOnlyBlock(sourceBlock.type)) return false
if (isAnnotationOnlyBlock(targetBlock.type)) return false
if (TriggerUtils.isTriggerBlock(targetBlock)) return false
return true
}
/**
* Filters edges to only include valid ones (target exists and is not a trigger block)
*/
export function filterValidEdges(
edges: Edge[],
blocks: Record<string, { type: string; triggerMode?: boolean }>
): Edge[] {
return edges.filter((edge) => isValidEdge(edge, blocks))
}
export function filterNewEdges(edgesToAdd: Edge[], currentEdges: Edge[]): Edge[] {
return edgesToAdd.filter((edge) => {
if (edge.source === edge.target) return false

View File

@@ -4,13 +4,17 @@ import { create } from 'zustand'
import { devtools } from 'zustand/middleware'
import { DEFAULT_DUPLICATE_OFFSET } from '@/lib/workflows/autolayout/constants'
import { getBlockOutputs } from '@/lib/workflows/blocks/block-outputs'
import { TriggerUtils } from '@/lib/workflows/triggers/triggers'
import { getBlock } from '@/blocks'
import type { SubBlockConfig } from '@/blocks/types'
import { isAnnotationOnlyBlock, normalizeName, RESERVED_BLOCK_NAMES } from '@/executor/constants'
import { normalizeName, RESERVED_BLOCK_NAMES } from '@/executor/constants'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import { filterNewEdges, getUniqueBlockName, mergeSubblockState } from '@/stores/workflows/utils'
import {
filterNewEdges,
filterValidEdges,
getUniqueBlockName,
mergeSubblockState,
} from '@/stores/workflows/utils'
import type {
Position,
SubBlockState,
@@ -91,26 +95,6 @@ function resolveInitialSubblockValue(config: SubBlockConfig): unknown {
return null
}
function isValidEdge(
edge: Edge,
blocks: Record<string, { type: string; triggerMode?: boolean }>
): boolean {
const sourceBlock = blocks[edge.source]
const targetBlock = blocks[edge.target]
if (!sourceBlock || !targetBlock) return false
if (isAnnotationOnlyBlock(sourceBlock.type)) return false
if (isAnnotationOnlyBlock(targetBlock.type)) return false
if (TriggerUtils.isTriggerBlock(targetBlock)) return false
return true
}
function filterValidEdges(
edges: Edge[],
blocks: Record<string, { type: string; triggerMode?: boolean }>
): Edge[] {
return edges.filter((edge) => isValidEdge(edge, blocks))
}
const initialState = {
blocks: {},
edges: [],
@@ -356,7 +340,8 @@ export const useWorkflowStore = create<WorkflowStore>()(
data?: Record<string, any>
}>,
edges?: Edge[],
subBlockValues?: Record<string, Record<string, unknown>>
subBlockValues?: Record<string, Record<string, unknown>>,
options?: { skipEdgeValidation?: boolean }
) => {
const currentBlocks = get().blocks
const currentEdges = get().edges
@@ -381,7 +366,10 @@ export const useWorkflowStore = create<WorkflowStore>()(
}
if (edges && edges.length > 0) {
const validEdges = filterValidEdges(edges, newBlocks)
// Skip validation if already validated by caller (e.g., collaborative layer)
const validEdges = options?.skipEdgeValidation
? edges
: filterValidEdges(edges, newBlocks)
const existingEdgeIds = new Set(currentEdges.map((e) => e.id))
for (const edge of validEdges) {
if (!existingEdgeIds.has(edge.id)) {
@@ -516,11 +504,12 @@ export const useWorkflowStore = create<WorkflowStore>()(
get().updateLastSaved()
},
batchAddEdges: (edges: Edge[]) => {
batchAddEdges: (edges: Edge[], options?: { skipValidation?: boolean }) => {
const blocks = get().blocks
const currentEdges = get().edges
const validEdges = filterValidEdges(edges, blocks)
// Skip validation if already validated by caller (e.g., collaborative layer)
const validEdges = options?.skipValidation ? edges : filterValidEdges(edges, blocks)
const filtered = filterNewEdges(validEdges, currentEdges)
const newEdges = [...currentEdges]

View File

@@ -203,12 +203,13 @@ export interface WorkflowActions {
batchAddBlocks: (
blocks: BlockState[],
edges?: Edge[],
subBlockValues?: Record<string, Record<string, unknown>>
subBlockValues?: Record<string, Record<string, unknown>>,
options?: { skipEdgeValidation?: boolean }
) => void
batchRemoveBlocks: (ids: string[]) => void
batchToggleEnabled: (ids: string[]) => void
batchToggleHandles: (ids: string[]) => void
batchAddEdges: (edges: Edge[]) => void
batchAddEdges: (edges: Edge[], options?: { skipValidation?: boolean }) => void
batchRemoveEdges: (ids: string[]) => void
clear: () => Partial<WorkflowState>
updateLastSaved: () => void