fix(change-detection): move change detection logic to client-side to prevent unnecessary API calls, consolidate utils (#2576)

* fix(change-detection): move change detection logic to client-side to prevent unnecessary API calls, consolidate utils

* added tests

* ack PR comments

* added isPublished to API response
This commit is contained in:
Waleed
2025-12-24 17:16:35 -08:00
committed by GitHub
parent 92b2e34d25
commit da7eca9590
11 changed files with 3088 additions and 469 deletions

View File

@@ -66,7 +66,7 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
loops: normalizedData.loops,
parallels: normalizedData.parallels,
}
const { hasWorkflowChanged } = await import('@/lib/workflows/utils')
const { hasWorkflowChanged } = await import('@/lib/workflows/comparison')
needsRedeployment = hasWorkflowChanged(currentState as any, active.state as any)
}
}

View File

@@ -3,8 +3,8 @@ import { and, desc, eq } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { generateRequestId } from '@/lib/core/utils/request'
import { createLogger } from '@/lib/logs/console/logger'
import { hasWorkflowChanged } from '@/lib/workflows/comparison'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
import { hasWorkflowChanged } from '@/lib/workflows/utils'
import { validateWorkflowAccess } from '@/app/api/workflows/middleware'
import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils'
@@ -69,6 +69,7 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
return createSuccessResponse({
isDeployed: validation.workflow.isDeployed,
deployedAt: validation.workflow.deployedAt,
isPublished: validation.workflow.isPublished,
needsRedeployment,
})
} catch (error) {

View File

@@ -1,13 +1,10 @@
import { useEffect, useMemo, useState } from 'react'
import { createLogger } from '@/lib/logs/console/logger'
import { useMemo } from 'react'
import { hasWorkflowChanged } from '@/lib/workflows/comparison'
import { useDebounce } from '@/hooks/use-debounce'
import { useOperationQueueStore } from '@/stores/operation-queue/store'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
import type { WorkflowState } from '@/stores/workflows/workflow/types'
const logger = createLogger('useChangeDetection')
interface UseChangeDetectionProps {
workflowId: string | null
deployedState: WorkflowState | null
@@ -15,97 +12,76 @@ interface UseChangeDetectionProps {
}
/**
* Hook to detect changes between current workflow state and deployed state
* Uses API-based change detection for accuracy
* Detects meaningful changes between current workflow state and deployed state.
* Performs comparison entirely on the client - no API calls needed.
*/
export function useChangeDetection({
workflowId,
deployedState,
isLoadingDeployedState,
}: UseChangeDetectionProps) {
const [changeDetected, setChangeDetected] = useState(false)
const [blockStructureVersion, setBlockStructureVersion] = useState(0)
const [edgeStructureVersion, setEdgeStructureVersion] = useState(0)
const [subBlockStructureVersion, setSubBlockStructureVersion] = useState(0)
// Get current store state for change detection
const currentBlocks = useWorkflowStore((state) => state.blocks)
const currentEdges = useWorkflowStore((state) => state.edges)
const lastSaved = useWorkflowStore((state) => state.lastSaved)
const blocks = useWorkflowStore((state) => state.blocks)
const edges = useWorkflowStore((state) => state.edges)
const loops = useWorkflowStore((state) => state.loops)
const parallels = useWorkflowStore((state) => state.parallels)
const subBlockValues = useSubBlockStore((state) =>
workflowId ? state.workflowValues[workflowId] : null
)
// Track structure changes
useEffect(() => {
setBlockStructureVersion((version) => version + 1)
}, [currentBlocks])
// Build current state with subblock values merged into blocks
const currentState = useMemo((): WorkflowState | null => {
if (!workflowId) return null
useEffect(() => {
setEdgeStructureVersion((version) => version + 1)
}, [currentEdges])
const blocksWithSubBlocks: WorkflowState['blocks'] = {}
for (const [blockId, block] of Object.entries(blocks)) {
const blockSubValues = subBlockValues?.[blockId] || {}
const subBlocks: Record<string, any> = {}
useEffect(() => {
setSubBlockStructureVersion((version) => version + 1)
}, [subBlockValues])
// Merge subblock values into the block's subBlocks structure
for (const [subId, value] of Object.entries(blockSubValues)) {
subBlocks[subId] = { value }
}
// Reset version counters when workflow changes
useEffect(() => {
setBlockStructureVersion(0)
setEdgeStructureVersion(0)
setSubBlockStructureVersion(0)
}, [workflowId])
// Create trigger for status check
const statusCheckTrigger = useMemo(() => {
return JSON.stringify({
lastSaved: lastSaved ?? 0,
blockVersion: blockStructureVersion,
edgeVersion: edgeStructureVersion,
subBlockVersion: subBlockStructureVersion,
})
}, [lastSaved, blockStructureVersion, edgeStructureVersion, subBlockStructureVersion])
const debouncedStatusCheckTrigger = useDebounce(statusCheckTrigger, 500)
useEffect(() => {
// Avoid off-by-one false positives: wait until operation queue is idle
const { operations, isProcessing } = useOperationQueueStore.getState()
const hasPendingOps =
isProcessing || operations.some((op) => op.status === 'pending' || op.status === 'processing')
if (!workflowId || !deployedState) {
setChangeDetected(false)
return
}
if (isLoadingDeployedState || hasPendingOps) {
return
}
// Use the workflow status API to get accurate change detection
// This uses the same logic as the deployment API (reading from normalized tables)
const checkForChanges = async () => {
try {
const response = await fetch(`/api/workflows/${workflowId}/status`)
if (response.ok) {
const data = await response.json()
setChangeDetected(data.needsRedeployment || false)
} else {
logger.error('Failed to fetch workflow status:', response.status, response.statusText)
setChangeDetected(false)
// Also include existing subBlocks from the block itself
if (block.subBlocks) {
for (const [subId, subBlock] of Object.entries(block.subBlocks)) {
if (!subBlocks[subId]) {
subBlocks[subId] = subBlock
} else {
subBlocks[subId] = { ...subBlock, value: subBlocks[subId].value }
}
}
} catch (error) {
logger.error('Error fetching workflow status:', error)
setChangeDetected(false)
}
blocksWithSubBlocks[blockId] = {
...block,
subBlocks,
}
}
checkForChanges()
}, [workflowId, deployedState, debouncedStatusCheckTrigger, isLoadingDeployedState])
return {
blocks: blocksWithSubBlocks,
edges,
loops,
parallels,
}
}, [workflowId, blocks, edges, loops, parallels, subBlockValues])
return {
changeDetected,
setChangeDetected,
// Compute change detection with debouncing for performance
const rawChangeDetected = useMemo(() => {
if (!currentState || !deployedState || isLoadingDeployedState) {
return false
}
return hasWorkflowChanged(currentState, deployedState)
}, [currentState, deployedState, isLoadingDeployedState])
// Debounce to avoid UI flicker during rapid edits
const changeDetected = useDebounce(rawChangeDetected, 300)
const setChangeDetected = () => {
// No-op: change detection is now computed, not stateful
// Kept for API compatibility
}
return { changeDetected, setChangeDetected }
}

View File

@@ -885,7 +885,8 @@ export function ToolInput({
block.type === 'knowledge' ||
block.type === 'function') &&
block.type !== 'evaluator' &&
block.type !== 'mcp'
block.type !== 'mcp' &&
block.type !== 'file'
)
const value = isPreview ? previewValue : storeValue

View File

@@ -11,6 +11,12 @@ import type {
WorkflowExecutionSnapshotInsert,
WorkflowState,
} from '@/lib/logs/types'
import {
normalizedStringify,
normalizeEdge,
normalizeValue,
sortEdges,
} from '@/lib/workflows/comparison'
const logger = createLogger('SnapshotService')
@@ -45,7 +51,7 @@ export class SnapshotService implements ISnapshotService {
id: uuidv4(),
workflowId,
stateHash,
stateData: state, // Full state with positions, subblock values, etc.
stateData: state,
}
const [newSnapshot] = await db
@@ -107,7 +113,7 @@ export class SnapshotService implements ISnapshotService {
computeStateHash(state: WorkflowState): string {
const normalizedState = this.normalizeStateForHashing(state)
const stateString = this.normalizedStringify(normalizedState)
const stateString = normalizedStringify(normalizedState)
return createHash('sha256').update(stateString).digest('hex')
}
@@ -126,23 +132,10 @@ export class SnapshotService implements ISnapshotService {
}
private normalizeStateForHashing(state: WorkflowState): any {
// Use the same normalization logic as hasWorkflowChanged for consistency
// 1. Normalize and sort edges
const normalizedEdges = sortEdges((state.edges || []).map(normalizeEdge))
// 1. Normalize edges (same as hasWorkflowChanged)
const normalizedEdges = (state.edges || [])
.map((edge) => ({
source: edge.source,
sourceHandle: edge.sourceHandle,
target: edge.target,
targetHandle: edge.targetHandle,
}))
.sort((a, b) =>
`${a.source}-${a.sourceHandle}-${a.target}-${a.targetHandle}`.localeCompare(
`${b.source}-${b.sourceHandle}-${b.target}-${b.targetHandle}`
)
)
// 2. Normalize blocks (same as hasWorkflowChanged)
// 2. Normalize blocks
const normalizedBlocks: Record<string, any> = {}
for (const [blockId, block] of Object.entries(state.blocks || {})) {
@@ -155,18 +148,16 @@ export class SnapshotService implements ISnapshotService {
...dataRest
} = blockWithoutLayoutFields.data || {}
// Handle subBlocks with detailed comparison (same as hasWorkflowChanged)
// Normalize subBlocks
const subBlocks = blockWithoutLayoutFields.subBlocks || {}
const normalizedSubBlocks: Record<string, any> = {}
for (const [subBlockId, subBlock] of Object.entries(subBlocks)) {
// Normalize value with special handling for null/undefined
const value = subBlock.value ?? null
normalizedSubBlocks[subBlockId] = {
type: subBlock.type,
value: this.normalizeValue(value),
// Include other properties except value
value: normalizeValue(value),
...Object.fromEntries(
Object.entries(subBlock).filter(([key]) => key !== 'value' && key !== 'type')
),
@@ -183,12 +174,12 @@ export class SnapshotService implements ISnapshotService {
// 3. Normalize loops and parallels
const normalizedLoops: Record<string, any> = {}
for (const [loopId, loop] of Object.entries(state.loops || {})) {
normalizedLoops[loopId] = this.normalizeValue(loop)
normalizedLoops[loopId] = normalizeValue(loop)
}
const normalizedParallels: Record<string, any> = {}
for (const [parallelId, parallel] of Object.entries(state.parallels || {})) {
normalizedParallels[parallelId] = this.normalizeValue(parallel)
normalizedParallels[parallelId] = normalizeValue(parallel)
}
return {
@@ -198,46 +189,6 @@ export class SnapshotService implements ISnapshotService {
parallels: normalizedParallels,
}
}
private normalizeValue(value: any): any {
// Handle null/undefined consistently
if (value === null || value === undefined) return null
// Handle arrays
if (Array.isArray(value)) {
return value.map((item) => this.normalizeValue(item))
}
// Handle objects
if (typeof value === 'object') {
const normalized: Record<string, any> = {}
for (const [key, val] of Object.entries(value)) {
normalized[key] = this.normalizeValue(val)
}
return normalized
}
// Handle primitives
return value
}
private normalizedStringify(obj: any): string {
if (obj === null || obj === undefined) return 'null'
if (typeof obj === 'string') return `"${obj}"`
if (typeof obj === 'number' || typeof obj === 'boolean') return String(obj)
if (Array.isArray(obj)) {
return `[${obj.map((item) => this.normalizedStringify(item)).join(',')}]`
}
if (typeof obj === 'object') {
const keys = Object.keys(obj).sort()
const pairs = keys.map((key) => `"${key}":${this.normalizedStringify(obj[key])}`)
return `{${pairs.join(',')}}`
}
return String(obj)
}
}
export const snapshotService = new SnapshotService()

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,228 @@
import type { WorkflowState } from '@/stores/workflows/workflow/types'
import {
normalizedStringify,
normalizeEdge,
normalizeLoop,
normalizeParallel,
normalizeValue,
sanitizeInputFormat,
sanitizeTools,
sortEdges,
} from './normalize'
/**
* Compare the current workflow state with the deployed state to detect meaningful changes
* @param currentState - The current workflow state
* @param deployedState - The deployed workflow state
* @returns True if there are meaningful changes, false if only position changes or no changes
*/
export function hasWorkflowChanged(
currentState: WorkflowState,
deployedState: WorkflowState | null
): boolean {
// If no deployed state exists, then the workflow has changed
if (!deployedState) return true
// 1. Compare edges (connections between blocks)
const currentEdges = currentState.edges || []
const deployedEdges = deployedState.edges || []
const normalizedCurrentEdges = sortEdges(currentEdges.map(normalizeEdge))
const normalizedDeployedEdges = sortEdges(deployedEdges.map(normalizeEdge))
if (
normalizedStringify(normalizedCurrentEdges) !== normalizedStringify(normalizedDeployedEdges)
) {
return true
}
// 2. Compare blocks and their configurations
const currentBlockIds = Object.keys(currentState.blocks || {}).sort()
const deployedBlockIds = Object.keys(deployedState.blocks || {}).sort()
if (
currentBlockIds.length !== deployedBlockIds.length ||
normalizedStringify(currentBlockIds) !== normalizedStringify(deployedBlockIds)
) {
return true
}
// 3. Build normalized representations of blocks for comparison
const normalizedCurrentBlocks: Record<string, any> = {}
const normalizedDeployedBlocks: Record<string, any> = {}
for (const blockId of currentBlockIds) {
const currentBlock = currentState.blocks[blockId]
const deployedBlock = deployedState.blocks[blockId]
// Destructure and exclude non-functional fields:
// - position: visual positioning only
// - subBlocks: handled separately below
// - layout: contains measuredWidth/measuredHeight from autolayout
// - height: block height measurement from autolayout
const {
position: _currentPos,
subBlocks: currentSubBlocks = {},
layout: _currentLayout,
height: _currentHeight,
...currentRest
} = currentBlock
const {
position: _deployedPos,
subBlocks: deployedSubBlocks = {},
layout: _deployedLayout,
height: _deployedHeight,
...deployedRest
} = deployedBlock
// Also exclude width/height from data object (container dimensions from autolayout)
const {
width: _currentDataWidth,
height: _currentDataHeight,
...currentDataRest
} = currentRest.data || {}
const {
width: _deployedDataWidth,
height: _deployedDataHeight,
...deployedDataRest
} = deployedRest.data || {}
normalizedCurrentBlocks[blockId] = {
...currentRest,
data: currentDataRest,
subBlocks: undefined,
}
normalizedDeployedBlocks[blockId] = {
...deployedRest,
data: deployedDataRest,
subBlocks: undefined,
}
// Get all subBlock IDs from both states
const allSubBlockIds = [
...new Set([...Object.keys(currentSubBlocks), ...Object.keys(deployedSubBlocks)]),
].sort()
// Normalize and compare each subBlock
for (const subBlockId of allSubBlockIds) {
// If the subBlock doesn't exist in either state, there's a difference
if (!currentSubBlocks[subBlockId] || !deployedSubBlocks[subBlockId]) {
return true
}
// Get values with special handling for null/undefined
let currentValue = currentSubBlocks[subBlockId].value ?? null
let deployedValue = deployedSubBlocks[subBlockId].value ?? null
if (subBlockId === 'tools' && Array.isArray(currentValue) && Array.isArray(deployedValue)) {
currentValue = sanitizeTools(currentValue)
deployedValue = sanitizeTools(deployedValue)
}
if (
subBlockId === 'inputFormat' &&
Array.isArray(currentValue) &&
Array.isArray(deployedValue)
) {
currentValue = sanitizeInputFormat(currentValue)
deployedValue = sanitizeInputFormat(deployedValue)
}
// For string values, compare directly to catch even small text changes
if (typeof currentValue === 'string' && typeof deployedValue === 'string') {
if (currentValue !== deployedValue) {
return true
}
} else {
// For other types, use normalized comparison
const normalizedCurrentValue = normalizeValue(currentValue)
const normalizedDeployedValue = normalizeValue(deployedValue)
if (
normalizedStringify(normalizedCurrentValue) !==
normalizedStringify(normalizedDeployedValue)
) {
return true
}
}
// Compare type and other properties
const currentSubBlockWithoutValue = { ...currentSubBlocks[subBlockId], value: undefined }
const deployedSubBlockWithoutValue = { ...deployedSubBlocks[subBlockId], value: undefined }
if (
normalizedStringify(currentSubBlockWithoutValue) !==
normalizedStringify(deployedSubBlockWithoutValue)
) {
return true
}
}
const blocksEqual =
normalizedStringify(normalizedCurrentBlocks[blockId]) ===
normalizedStringify(normalizedDeployedBlocks[blockId])
if (!blocksEqual) {
return true
}
}
// 4. Compare loops
const currentLoops = currentState.loops || {}
const deployedLoops = deployedState.loops || {}
const currentLoopIds = Object.keys(currentLoops).sort()
const deployedLoopIds = Object.keys(deployedLoops).sort()
if (
currentLoopIds.length !== deployedLoopIds.length ||
normalizedStringify(currentLoopIds) !== normalizedStringify(deployedLoopIds)
) {
return true
}
for (const loopId of currentLoopIds) {
const normalizedCurrentLoop = normalizeValue(normalizeLoop(currentLoops[loopId]))
const normalizedDeployedLoop = normalizeValue(normalizeLoop(deployedLoops[loopId]))
if (
normalizedStringify(normalizedCurrentLoop) !== normalizedStringify(normalizedDeployedLoop)
) {
return true
}
}
// 5. Compare parallels
const currentParallels = currentState.parallels || {}
const deployedParallels = deployedState.parallels || {}
const currentParallelIds = Object.keys(currentParallels).sort()
const deployedParallelIds = Object.keys(deployedParallels).sort()
if (
currentParallelIds.length !== deployedParallelIds.length ||
normalizedStringify(currentParallelIds) !== normalizedStringify(deployedParallelIds)
) {
return true
}
for (const parallelId of currentParallelIds) {
const normalizedCurrentParallel = normalizeValue(
normalizeParallel(currentParallels[parallelId])
)
const normalizedDeployedParallel = normalizeValue(
normalizeParallel(deployedParallels[parallelId])
)
if (
normalizedStringify(normalizedCurrentParallel) !==
normalizedStringify(normalizedDeployedParallel)
) {
return true
}
}
return false
}

View File

@@ -0,0 +1,7 @@
export { hasWorkflowChanged } from './compare'
export {
normalizedStringify,
normalizeEdge,
normalizeValue,
sortEdges,
} from './normalize'

View File

@@ -0,0 +1,567 @@
/**
* Tests for workflow normalization utilities
*/
import { describe, expect, it } from 'vitest'
import {
normalizedStringify,
normalizeEdge,
normalizeLoop,
normalizeParallel,
normalizeValue,
sanitizeInputFormat,
sanitizeTools,
sortEdges,
} from './normalize'
describe('Workflow Normalization Utilities', () => {
describe('normalizeValue', () => {
it.concurrent('should return primitives unchanged', () => {
expect(normalizeValue(42)).toBe(42)
expect(normalizeValue('hello')).toBe('hello')
expect(normalizeValue(true)).toBe(true)
expect(normalizeValue(false)).toBe(false)
expect(normalizeValue(null)).toBe(null)
expect(normalizeValue(undefined)).toBe(undefined)
})
it.concurrent('should handle arrays by normalizing each element', () => {
const input = [
{ b: 2, a: 1 },
{ d: 4, c: 3 },
]
const result = normalizeValue(input)
expect(result).toEqual([
{ a: 1, b: 2 },
{ c: 3, d: 4 },
])
})
it.concurrent('should sort object keys alphabetically', () => {
const input = { zebra: 1, apple: 2, mango: 3 }
const result = normalizeValue(input)
expect(Object.keys(result)).toEqual(['apple', 'mango', 'zebra'])
})
it.concurrent('should recursively normalize nested objects', () => {
const input = {
outer: {
z: 1,
a: {
y: 2,
b: 3,
},
},
first: 'value',
}
const result = normalizeValue(input)
expect(Object.keys(result)).toEqual(['first', 'outer'])
expect(Object.keys(result.outer)).toEqual(['a', 'z'])
expect(Object.keys(result.outer.a)).toEqual(['b', 'y'])
})
it.concurrent('should handle empty objects', () => {
expect(normalizeValue({})).toEqual({})
})
it.concurrent('should handle empty arrays', () => {
expect(normalizeValue([])).toEqual([])
})
it.concurrent('should handle arrays with mixed types', () => {
const input = [1, 'string', { b: 2, a: 1 }, null, [3, 2, 1]]
const result = normalizeValue(input)
expect(result[0]).toBe(1)
expect(result[1]).toBe('string')
expect(Object.keys(result[2])).toEqual(['a', 'b'])
expect(result[3]).toBe(null)
expect(result[4]).toEqual([3, 2, 1]) // Array order preserved
})
it.concurrent('should handle deeply nested structures', () => {
const input = {
level1: {
level2: {
level3: {
level4: {
z: 'deep',
a: 'value',
},
},
},
},
}
const result = normalizeValue(input)
expect(Object.keys(result.level1.level2.level3.level4)).toEqual(['a', 'z'])
})
})
describe('normalizedStringify', () => {
it.concurrent('should produce identical strings for objects with different key orders', () => {
const obj1 = { b: 2, a: 1, c: 3 }
const obj2 = { a: 1, c: 3, b: 2 }
const obj3 = { c: 3, b: 2, a: 1 }
const str1 = normalizedStringify(obj1)
const str2 = normalizedStringify(obj2)
const str3 = normalizedStringify(obj3)
expect(str1).toBe(str2)
expect(str2).toBe(str3)
})
it.concurrent('should produce valid JSON', () => {
const obj = { nested: { value: [1, 2, 3] }, name: 'test' }
const str = normalizedStringify(obj)
expect(() => JSON.parse(str)).not.toThrow()
})
it.concurrent('should handle primitive values', () => {
expect(normalizedStringify(42)).toBe('42')
expect(normalizedStringify('hello')).toBe('"hello"')
expect(normalizedStringify(true)).toBe('true')
expect(normalizedStringify(null)).toBe('null')
})
it.concurrent('should produce different strings for different values', () => {
const obj1 = { a: 1, b: 2 }
const obj2 = { a: 1, b: 3 }
expect(normalizedStringify(obj1)).not.toBe(normalizedStringify(obj2))
})
})
describe('normalizeLoop', () => {
it.concurrent('should return null/undefined as-is', () => {
expect(normalizeLoop(null)).toBe(null)
expect(normalizeLoop(undefined)).toBe(undefined)
})
it.concurrent('should normalize "for" loop type', () => {
const loop = {
id: 'loop1',
nodes: ['block1', 'block2'],
loopType: 'for',
iterations: 10,
forEachItems: 'should-be-excluded',
whileCondition: 'should-be-excluded',
doWhileCondition: 'should-be-excluded',
extraField: 'should-be-excluded',
}
const result = normalizeLoop(loop)
expect(result).toEqual({
id: 'loop1',
nodes: ['block1', 'block2'],
loopType: 'for',
iterations: 10,
})
})
it.concurrent('should normalize "forEach" loop type', () => {
const loop = {
id: 'loop2',
nodes: ['block1'],
loopType: 'forEach',
iterations: 5,
forEachItems: '<block.items>',
whileCondition: 'should-be-excluded',
}
const result = normalizeLoop(loop)
expect(result).toEqual({
id: 'loop2',
nodes: ['block1'],
loopType: 'forEach',
forEachItems: '<block.items>',
})
})
it.concurrent('should normalize "while" loop type', () => {
const loop = {
id: 'loop3',
nodes: ['block1', 'block2', 'block3'],
loopType: 'while',
whileCondition: '<block.condition> === true',
doWhileCondition: 'should-be-excluded',
}
const result = normalizeLoop(loop)
expect(result).toEqual({
id: 'loop3',
nodes: ['block1', 'block2', 'block3'],
loopType: 'while',
whileCondition: '<block.condition> === true',
})
})
it.concurrent('should normalize "doWhile" loop type', () => {
const loop = {
id: 'loop4',
nodes: ['block1'],
loopType: 'doWhile',
doWhileCondition: '<counter.value> < 100',
whileCondition: 'should-be-excluded',
}
const result = normalizeLoop(loop)
expect(result).toEqual({
id: 'loop4',
nodes: ['block1'],
loopType: 'doWhile',
doWhileCondition: '<counter.value> < 100',
})
})
it.concurrent('should handle unknown loop type with base fields only', () => {
const loop = {
id: 'loop5',
nodes: ['block1'],
loopType: 'unknown',
iterations: 5,
forEachItems: 'items',
}
const result = normalizeLoop(loop)
expect(result).toEqual({
id: 'loop5',
nodes: ['block1'],
loopType: 'unknown',
})
})
})
describe('normalizeParallel', () => {
it.concurrent('should return null/undefined as-is', () => {
expect(normalizeParallel(null)).toBe(null)
expect(normalizeParallel(undefined)).toBe(undefined)
})
it.concurrent('should normalize "count" parallel type', () => {
const parallel = {
id: 'parallel1',
nodes: ['block1', 'block2'],
parallelType: 'count',
count: 5,
distribution: 'should-be-excluded',
extraField: 'should-be-excluded',
}
const result = normalizeParallel(parallel)
expect(result).toEqual({
id: 'parallel1',
nodes: ['block1', 'block2'],
parallelType: 'count',
count: 5,
})
})
it.concurrent('should normalize "collection" parallel type', () => {
const parallel = {
id: 'parallel2',
nodes: ['block1'],
parallelType: 'collection',
count: 10,
distribution: '<block.items>',
}
const result = normalizeParallel(parallel)
expect(result).toEqual({
id: 'parallel2',
nodes: ['block1'],
parallelType: 'collection',
distribution: '<block.items>',
})
})
it.concurrent('should handle unknown parallel type with base fields only', () => {
const parallel = {
id: 'parallel3',
nodes: ['block1'],
parallelType: 'unknown',
count: 5,
distribution: 'items',
}
const result = normalizeParallel(parallel)
expect(result).toEqual({
id: 'parallel3',
nodes: ['block1'],
parallelType: 'unknown',
})
})
})
describe('sanitizeTools', () => {
it.concurrent('should return empty array for undefined', () => {
expect(sanitizeTools(undefined)).toEqual([])
})
it.concurrent('should return empty array for non-array input', () => {
expect(sanitizeTools(null as any)).toEqual([])
expect(sanitizeTools('not-an-array' as any)).toEqual([])
expect(sanitizeTools({} as any)).toEqual([])
})
it.concurrent('should remove isExpanded field from tools', () => {
const tools = [
{ id: 'tool1', name: 'Search', isExpanded: true },
{ id: 'tool2', name: 'Calculator', isExpanded: false },
{ id: 'tool3', name: 'Weather' }, // No isExpanded field
]
const result = sanitizeTools(tools)
expect(result).toEqual([
{ id: 'tool1', name: 'Search' },
{ id: 'tool2', name: 'Calculator' },
{ id: 'tool3', name: 'Weather' },
])
})
it.concurrent('should preserve all other fields', () => {
const tools = [
{
id: 'tool1',
name: 'Complex Tool',
isExpanded: true,
schema: { type: 'function', name: 'search' },
params: { query: 'test' },
nested: { deep: { value: 123 } },
},
]
const result = sanitizeTools(tools)
expect(result[0]).toEqual({
id: 'tool1',
name: 'Complex Tool',
schema: { type: 'function', name: 'search' },
params: { query: 'test' },
nested: { deep: { value: 123 } },
})
})
it.concurrent('should handle empty array', () => {
expect(sanitizeTools([])).toEqual([])
})
})
describe('sanitizeInputFormat', () => {
it.concurrent('should return empty array for undefined', () => {
expect(sanitizeInputFormat(undefined)).toEqual([])
})
it.concurrent('should return empty array for non-array input', () => {
expect(sanitizeInputFormat(null as any)).toEqual([])
expect(sanitizeInputFormat('not-an-array' as any)).toEqual([])
expect(sanitizeInputFormat({} as any)).toEqual([])
})
it.concurrent('should remove value and collapsed fields', () => {
const inputFormat = [
{ id: 'input1', name: 'Name', value: 'John', collapsed: true },
{ id: 'input2', name: 'Age', value: 25, collapsed: false },
{ id: 'input3', name: 'Email' }, // No value or collapsed
]
const result = sanitizeInputFormat(inputFormat)
expect(result).toEqual([
{ id: 'input1', name: 'Name' },
{ id: 'input2', name: 'Age' },
{ id: 'input3', name: 'Email' },
])
})
it.concurrent('should preserve all other fields', () => {
const inputFormat = [
{
id: 'input1',
name: 'Complex Input',
value: 'test-value',
collapsed: true,
type: 'string',
required: true,
validation: { min: 0, max: 100 },
},
]
const result = sanitizeInputFormat(inputFormat)
expect(result[0]).toEqual({
id: 'input1',
name: 'Complex Input',
type: 'string',
required: true,
validation: { min: 0, max: 100 },
})
})
it.concurrent('should handle empty array', () => {
expect(sanitizeInputFormat([])).toEqual([])
})
})
describe('normalizeEdge', () => {
it.concurrent('should extract only connection-relevant fields', () => {
const edge = {
id: 'edge1',
source: 'block1',
sourceHandle: 'output',
target: 'block2',
targetHandle: 'input',
type: 'smoothstep',
animated: true,
style: { stroke: 'red' },
data: { label: 'connection' },
}
const result = normalizeEdge(edge)
expect(result).toEqual({
source: 'block1',
sourceHandle: 'output',
target: 'block2',
targetHandle: 'input',
})
})
it.concurrent('should handle edges without handles', () => {
const edge = {
id: 'edge1',
source: 'block1',
target: 'block2',
}
const result = normalizeEdge(edge)
expect(result).toEqual({
source: 'block1',
sourceHandle: undefined,
target: 'block2',
targetHandle: undefined,
})
})
it.concurrent('should handle edges with only source handle', () => {
const edge = {
id: 'edge1',
source: 'block1',
sourceHandle: 'output',
target: 'block2',
}
const result = normalizeEdge(edge)
expect(result).toEqual({
source: 'block1',
sourceHandle: 'output',
target: 'block2',
targetHandle: undefined,
})
})
})
describe('sortEdges', () => {
it.concurrent('should sort edges consistently', () => {
const edges = [
{ source: 'c', target: 'd' },
{ source: 'a', target: 'b' },
{ source: 'b', target: 'c' },
]
const result = sortEdges(edges)
expect(result[0].source).toBe('a')
expect(result[1].source).toBe('b')
expect(result[2].source).toBe('c')
})
it.concurrent(
'should sort by source, then sourceHandle, then target, then targetHandle',
() => {
const edges = [
{ source: 'a', sourceHandle: 'out2', target: 'b', targetHandle: 'in1' },
{ source: 'a', sourceHandle: 'out1', target: 'b', targetHandle: 'in1' },
{ source: 'a', sourceHandle: 'out1', target: 'b', targetHandle: 'in2' },
{ source: 'a', sourceHandle: 'out1', target: 'c', targetHandle: 'in1' },
]
const result = sortEdges(edges)
expect(result[0]).toEqual({
source: 'a',
sourceHandle: 'out1',
target: 'b',
targetHandle: 'in1',
})
expect(result[1]).toEqual({
source: 'a',
sourceHandle: 'out1',
target: 'b',
targetHandle: 'in2',
})
expect(result[2]).toEqual({
source: 'a',
sourceHandle: 'out1',
target: 'c',
targetHandle: 'in1',
})
expect(result[3]).toEqual({
source: 'a',
sourceHandle: 'out2',
target: 'b',
targetHandle: 'in1',
})
}
)
it.concurrent('should not mutate the original array', () => {
const edges = [
{ source: 'c', target: 'd' },
{ source: 'a', target: 'b' },
]
const originalFirst = edges[0]
sortEdges(edges)
expect(edges[0]).toBe(originalFirst)
})
it.concurrent('should handle empty array', () => {
expect(sortEdges([])).toEqual([])
})
it.concurrent('should handle edges with undefined handles', () => {
const edges = [
{ source: 'b', target: 'c' },
{ source: 'a', target: 'b', sourceHandle: 'out' },
]
const result = sortEdges(edges)
expect(result[0].source).toBe('a')
expect(result[1].source).toBe('b')
})
it.concurrent('should produce identical results regardless of input order', () => {
const edges1 = [
{ source: 'c', sourceHandle: 'x', target: 'd', targetHandle: 'y' },
{ source: 'a', sourceHandle: 'x', target: 'b', targetHandle: 'y' },
{ source: 'b', sourceHandle: 'x', target: 'c', targetHandle: 'y' },
]
const edges2 = [
{ source: 'a', sourceHandle: 'x', target: 'b', targetHandle: 'y' },
{ source: 'b', sourceHandle: 'x', target: 'c', targetHandle: 'y' },
{ source: 'c', sourceHandle: 'x', target: 'd', targetHandle: 'y' },
]
const edges3 = [
{ source: 'b', sourceHandle: 'x', target: 'c', targetHandle: 'y' },
{ source: 'c', sourceHandle: 'x', target: 'd', targetHandle: 'y' },
{ source: 'a', sourceHandle: 'x', target: 'b', targetHandle: 'y' },
]
const result1 = normalizedStringify(sortEdges(edges1))
const result2 = normalizedStringify(sortEdges(edges2))
const result3 = normalizedStringify(sortEdges(edges3))
expect(result1).toBe(result2)
expect(result2).toBe(result3)
})
})
})

View File

@@ -0,0 +1,133 @@
/**
* Shared normalization utilities for workflow change detection.
* Used by both client-side signature computation and server-side comparison.
*/
/**
* Normalizes a value for consistent comparison by sorting object keys recursively
* @param value - The value to normalize
* @returns A normalized version of the value with sorted keys
*/
export function normalizeValue(value: any): any {
if (value === null || value === undefined || typeof value !== 'object') {
return value
}
if (Array.isArray(value)) {
return value.map(normalizeValue)
}
const sorted: Record<string, any> = {}
for (const key of Object.keys(value).sort()) {
sorted[key] = normalizeValue(value[key])
}
return sorted
}
/**
* Generates a normalized JSON string for comparison
* @param value - The value to normalize and stringify
* @returns A normalized JSON string
*/
export function normalizedStringify(value: any): string {
return JSON.stringify(normalizeValue(value))
}
/**
* Normalizes a loop configuration by extracting only the relevant fields for the loop type
* @param loop - The loop configuration object
* @returns Normalized loop with only relevant fields
*/
export function normalizeLoop(loop: any): any {
if (!loop) return loop
const { id, nodes, loopType, iterations, forEachItems, whileCondition, doWhileCondition } = loop
const base: any = { id, nodes, loopType }
switch (loopType) {
case 'for':
return { ...base, iterations }
case 'forEach':
return { ...base, forEachItems }
case 'while':
return { ...base, whileCondition }
case 'doWhile':
return { ...base, doWhileCondition }
default:
return base
}
}
/**
* Normalizes a parallel configuration by extracting only the relevant fields for the parallel type
* @param parallel - The parallel configuration object
* @returns Normalized parallel with only relevant fields
*/
export function normalizeParallel(parallel: any): any {
if (!parallel) return parallel
const { id, nodes, parallelType, count, distribution } = parallel
const base: any = { id, nodes, parallelType }
switch (parallelType) {
case 'count':
return { ...base, count }
case 'collection':
return { ...base, distribution }
default:
return base
}
}
/**
* Sanitizes tools array by removing UI-only fields like isExpanded
* @param tools - Array of tool configurations
* @returns Sanitized tools array
*/
export function sanitizeTools(tools: any[] | undefined): any[] {
if (!Array.isArray(tools)) return []
return tools.map(({ isExpanded, ...rest }) => rest)
}
/**
* Sanitizes inputFormat array by removing UI-only fields like value and collapsed
* @param inputFormat - Array of input format configurations
* @returns Sanitized input format array
*/
export function sanitizeInputFormat(inputFormat: any[] | undefined): any[] {
if (!Array.isArray(inputFormat)) return []
return inputFormat.map(({ value, collapsed, ...rest }) => rest)
}
/**
* Normalizes an edge by extracting only the connection-relevant fields
* @param edge - The edge object
* @returns Normalized edge with only connection fields
*/
export function normalizeEdge(edge: any): {
source: string
sourceHandle?: string
target: string
targetHandle?: string
} {
return {
source: edge.source,
sourceHandle: edge.sourceHandle,
target: edge.target,
targetHandle: edge.targetHandle,
}
}
/**
* Sorts edges for consistent comparison
* @param edges - Array of edges to sort
* @returns Sorted array of normalized edges
*/
export function sortEdges(
edges: Array<{ source: string; sourceHandle?: string; target: string; targetHandle?: string }>
): Array<{ source: string; sourceHandle?: string; target: string; targetHandle?: string }> {
return [...edges].sort((a, b) =>
`${a.source}-${a.sourceHandle}-${a.target}-${a.targetHandle}`.localeCompare(
`${b.source}-${b.sourceHandle}-${b.target}-${b.targetHandle}`
)
)
}

View File

@@ -7,7 +7,6 @@ import { getSession } from '@/lib/auth'
import { createLogger } from '@/lib/logs/console/logger'
import type { PermissionType } from '@/lib/workspaces/permissions/utils'
import type { ExecutionResult } from '@/executor/types'
import type { WorkflowState } from '@/stores/workflows/workflow/types'
const logger = createLogger('WorkflowUtils')
@@ -136,328 +135,6 @@ export async function updateWorkflowRunCounts(workflowId: string, runs = 1) {
}
}
/**
* Sanitize tools array by removing UI-only fields
* @param tools - The tools array to sanitize
* @returns A sanitized tools array
*/
function sanitizeToolsForComparison(tools: any[] | undefined): any[] {
if (!Array.isArray(tools)) {
return []
}
return tools.map((tool) => {
const { isExpanded, ...cleanTool } = tool
return cleanTool
})
}
/**
* Sanitize inputFormat array by removing test-only value fields
* @param inputFormat - The inputFormat array to sanitize
* @returns A sanitized inputFormat array without test values
*/
function sanitizeInputFormatForComparison(inputFormat: any[] | undefined): any[] {
if (!Array.isArray(inputFormat)) {
return []
}
return inputFormat.map((field) => {
const { value, collapsed, ...cleanField } = field
return cleanField
})
}
/**
* Normalize a value for consistent comparison by sorting object keys
* @param value - The value to normalize
* @returns A normalized version of the value
*/
function normalizeValue(value: any): any {
// If not an object or array, return as is
if (value === null || value === undefined || typeof value !== 'object') {
return value
}
// Handle arrays by normalizing each element
if (Array.isArray(value)) {
return value.map(normalizeValue)
}
// For objects, sort keys and normalize each value
const sortedObj: Record<string, any> = {}
// Get all keys and sort them
const sortedKeys = Object.keys(value).sort()
// Reconstruct object with sorted keys and normalized values
for (const key of sortedKeys) {
sortedObj[key] = normalizeValue(value[key])
}
return sortedObj
}
/**
* Generate a normalized JSON string for comparison
* @param value - The value to normalize and stringify
* @returns A normalized JSON string
*/
function normalizedStringify(value: any): string {
return JSON.stringify(normalizeValue(value))
}
/**
* Compare the current workflow state with the deployed state to detect meaningful changes
* @param currentState - The current workflow state
* @param deployedState - The deployed workflow state
* @returns True if there are meaningful changes, false if only position changes or no changes
*/
export function hasWorkflowChanged(
currentState: WorkflowState,
deployedState: WorkflowState | null
): boolean {
// If no deployed state exists, then the workflow has changed
if (!deployedState) return true
// 1. Compare edges (connections between blocks)
// First check length
const currentEdges = currentState.edges || []
const deployedEdges = deployedState.edges || []
// Create sorted, normalized representations of the edges for more reliable comparison
const normalizedCurrentEdges = currentEdges
.map((edge) => ({
source: edge.source,
sourceHandle: edge.sourceHandle,
target: edge.target,
targetHandle: edge.targetHandle,
}))
.sort((a, b) =>
`${a.source}-${a.sourceHandle}-${a.target}-${a.targetHandle}`.localeCompare(
`${b.source}-${b.sourceHandle}-${b.target}-${b.targetHandle}`
)
)
const normalizedDeployedEdges = deployedEdges
.map((edge) => ({
source: edge.source,
sourceHandle: edge.sourceHandle,
target: edge.target,
targetHandle: edge.targetHandle,
}))
.sort((a, b) =>
`${a.source}-${a.sourceHandle}-${a.target}-${a.targetHandle}`.localeCompare(
`${b.source}-${b.sourceHandle}-${b.target}-${b.targetHandle}`
)
)
// Compare the normalized edge arrays
if (
normalizedStringify(normalizedCurrentEdges) !== normalizedStringify(normalizedDeployedEdges)
) {
return true
}
// 2. Compare blocks and their configurations
const currentBlockIds = Object.keys(currentState.blocks || {}).sort()
const deployedBlockIds = Object.keys(deployedState.blocks || {}).sort()
// Check if the block IDs are different
if (
currentBlockIds.length !== deployedBlockIds.length ||
normalizedStringify(currentBlockIds) !== normalizedStringify(deployedBlockIds)
) {
return true
}
// 3. Build normalized representations of blocks for comparison
const normalizedCurrentBlocks: Record<string, any> = {}
const normalizedDeployedBlocks: Record<string, any> = {}
for (const blockId of currentBlockIds) {
const currentBlock = currentState.blocks[blockId]
const deployedBlock = deployedState.blocks[blockId]
// Destructure and exclude non-functional fields:
// - position: visual positioning only
// - subBlocks: handled separately below
// - layout: contains measuredWidth/measuredHeight from autolayout
// - height: block height measurement from autolayout
const {
position: _currentPos,
subBlocks: currentSubBlocks = {},
layout: _currentLayout,
height: _currentHeight,
...currentRest
} = currentBlock
const {
position: _deployedPos,
subBlocks: deployedSubBlocks = {},
layout: _deployedLayout,
height: _deployedHeight,
...deployedRest
} = deployedBlock
// Also exclude width/height from data object (container dimensions from autolayout)
const {
width: _currentDataWidth,
height: _currentDataHeight,
...currentDataRest
} = currentRest.data || {}
const {
width: _deployedDataWidth,
height: _deployedDataHeight,
...deployedDataRest
} = deployedRest.data || {}
normalizedCurrentBlocks[blockId] = {
...currentRest,
data: currentDataRest,
subBlocks: undefined,
}
normalizedDeployedBlocks[blockId] = {
...deployedRest,
data: deployedDataRest,
subBlocks: undefined,
}
// Get all subBlock IDs from both states
const allSubBlockIds = [
...new Set([...Object.keys(currentSubBlocks), ...Object.keys(deployedSubBlocks)]),
].sort()
// Check if any subBlocks are missing in either state
if (Object.keys(currentSubBlocks).length !== Object.keys(deployedSubBlocks).length) {
return true
}
// Normalize and compare each subBlock
for (const subBlockId of allSubBlockIds) {
// If the subBlock doesn't exist in either state, there's a difference
if (!currentSubBlocks[subBlockId] || !deployedSubBlocks[subBlockId]) {
return true
}
// Get values with special handling for null/undefined
let currentValue = currentSubBlocks[subBlockId].value ?? null
let deployedValue = deployedSubBlocks[subBlockId].value ?? null
// Special handling for 'tools' subBlock - sanitize UI-only fields
if (subBlockId === 'tools' && Array.isArray(currentValue) && Array.isArray(deployedValue)) {
currentValue = sanitizeToolsForComparison(currentValue)
deployedValue = sanitizeToolsForComparison(deployedValue)
}
// Special handling for 'inputFormat' subBlock - sanitize UI-only fields (collapsed state)
if (
subBlockId === 'inputFormat' &&
Array.isArray(currentValue) &&
Array.isArray(deployedValue)
) {
currentValue = sanitizeInputFormatForComparison(currentValue)
deployedValue = sanitizeInputFormatForComparison(deployedValue)
}
// For string values, compare directly to catch even small text changes
if (typeof currentValue === 'string' && typeof deployedValue === 'string') {
if (currentValue !== deployedValue) {
return true
}
} else {
// For other types, use normalized comparison
const normalizedCurrentValue = normalizeValue(currentValue)
const normalizedDeployedValue = normalizeValue(deployedValue)
if (
normalizedStringify(normalizedCurrentValue) !==
normalizedStringify(normalizedDeployedValue)
) {
return true
}
}
// Compare type and other properties
const currentSubBlockWithoutValue = { ...currentSubBlocks[subBlockId], value: undefined }
const deployedSubBlockWithoutValue = { ...deployedSubBlocks[subBlockId], value: undefined }
if (
normalizedStringify(currentSubBlockWithoutValue) !==
normalizedStringify(deployedSubBlockWithoutValue)
) {
return true
}
}
// Skip the normalization of subBlocks since we've already done detailed comparison above
const blocksEqual =
normalizedStringify(normalizedCurrentBlocks[blockId]) ===
normalizedStringify(normalizedDeployedBlocks[blockId])
// We've already compared subBlocks in detail
if (!blocksEqual) {
return true
}
}
// 4. Compare loops
const currentLoops = currentState.loops || {}
const deployedLoops = deployedState.loops || {}
const currentLoopIds = Object.keys(currentLoops).sort()
const deployedLoopIds = Object.keys(deployedLoops).sort()
if (
currentLoopIds.length !== deployedLoopIds.length ||
normalizedStringify(currentLoopIds) !== normalizedStringify(deployedLoopIds)
) {
return true
}
// Compare each loop with normalized values
for (const loopId of currentLoopIds) {
const normalizedCurrentLoop = normalizeValue(currentLoops[loopId])
const normalizedDeployedLoop = normalizeValue(deployedLoops[loopId])
if (
normalizedStringify(normalizedCurrentLoop) !== normalizedStringify(normalizedDeployedLoop)
) {
return true
}
}
// 5. Compare parallels
const currentParallels = currentState.parallels || {}
const deployedParallels = deployedState.parallels || {}
const currentParallelIds = Object.keys(currentParallels).sort()
const deployedParallelIds = Object.keys(deployedParallels).sort()
if (
currentParallelIds.length !== deployedParallelIds.length ||
normalizedStringify(currentParallelIds) !== normalizedStringify(deployedParallelIds)
) {
return true
}
// Compare each parallel with normalized values
for (const parallelId of currentParallelIds) {
const normalizedCurrentParallel = normalizeValue(currentParallels[parallelId])
const normalizedDeployedParallel = normalizeValue(deployedParallels[parallelId])
if (
normalizedStringify(normalizedCurrentParallel) !==
normalizedStringify(normalizedDeployedParallel)
) {
return true
}
}
return false
}
export const workflowHasResponseBlock = (executionResult: ExecutionResult): boolean => {
if (
!executionResult?.logs ||