fix(snapshot): consolidate to use hasWorkflowChanges check

This commit is contained in:
Vikhyath Mondreti
2026-01-28 13:23:57 -08:00
parent 78410eef84
commit e2e86a7b19
5 changed files with 271 additions and 311 deletions

View File

@@ -86,7 +86,13 @@ describe('SnapshotService', () => {
type: 'agent',
position: { x: 100, y: 200 },
subBlocks: {},
subBlocks: {
prompt: {
id: 'prompt',
type: 'short-input',
value: 'Hello world',
},
},
outputs: {},
enabled: true,
horizontalHandles: true,
@@ -104,8 +110,14 @@ describe('SnapshotService', () => {
blocks: {
block1: {
...baseState.blocks.block1,
// Different block state - we can change outputs to make it different
outputs: { response: { type: 'string', description: 'different result' } },
// Different subBlock value - this is a meaningful change
subBlocks: {
prompt: {
id: 'prompt',
type: 'short-input',
value: 'Different prompt',
},
},
},
},
}

View File

@@ -11,12 +11,7 @@ import type {
WorkflowExecutionSnapshotInsert,
WorkflowState,
} from '@/lib/logs/types'
import {
normalizedStringify,
normalizeEdge,
normalizeValue,
sortEdges,
} from '@/lib/workflows/comparison'
import { normalizedStringify, normalizeWorkflowState } from '@/lib/workflows/comparison'
const logger = createLogger('SnapshotService')
@@ -38,7 +33,9 @@ export class SnapshotService implements ISnapshotService {
const existingSnapshot = await this.getSnapshotByHash(workflowId, stateHash)
if (existingSnapshot) {
logger.debug(`Reusing existing snapshot for workflow ${workflowId} with hash ${stateHash}`)
logger.info(
`Reusing existing snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}...)`
)
return {
snapshot: existingSnapshot,
isNew: false,
@@ -59,8 +56,9 @@ export class SnapshotService implements ISnapshotService {
.values(snapshotData)
.returning()
logger.debug(`Created new snapshot for workflow ${workflowId} with hash ${stateHash}`)
logger.debug(`Stored full state with ${Object.keys(state.blocks || {}).length} blocks`)
logger.info(
`Created new snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}..., blocks: ${Object.keys(state.blocks || {}).length})`
)
return {
snapshot: {
...newSnapshot,
@@ -112,7 +110,7 @@ export class SnapshotService implements ISnapshotService {
}
computeStateHash(state: WorkflowState): string {
const normalizedState = this.normalizeStateForHashing(state)
const normalizedState = normalizeWorkflowState(state)
const stateString = normalizedStringify(normalizedState)
return createHash('sha256').update(stateString).digest('hex')
}
@@ -130,69 +128,6 @@ export class SnapshotService implements ISnapshotService {
logger.info(`Cleaned up ${deletedCount} orphaned snapshots older than ${olderThanDays} days`)
return deletedCount
}
private normalizeStateForHashing(state: WorkflowState): any {
// 1. Normalize and sort edges
const normalizedEdges = sortEdges((state.edges || []).map(normalizeEdge))
// 2. Normalize blocks
const normalizedBlocks: Record<string, any> = {}
for (const [blockId, block] of Object.entries(state.blocks || {})) {
const { position, layout, height, ...blockWithoutLayoutFields } = block
// Also exclude width/height from data object (container dimensions from autolayout)
const {
width: _dataWidth,
height: _dataHeight,
...dataRest
} = blockWithoutLayoutFields.data || {}
// Normalize subBlocks
const subBlocks = blockWithoutLayoutFields.subBlocks || {}
const normalizedSubBlocks: Record<string, any> = {}
for (const [subBlockId, subBlock] of Object.entries(subBlocks)) {
const value = subBlock.value ?? null
normalizedSubBlocks[subBlockId] = {
type: subBlock.type,
value: normalizeValue(value),
...Object.fromEntries(
Object.entries(subBlock).filter(([key]) => key !== 'value' && key !== 'type')
),
}
}
normalizedBlocks[blockId] = {
...blockWithoutLayoutFields,
data: dataRest,
subBlocks: normalizedSubBlocks,
}
}
// 3. Normalize loops and parallels
const normalizedLoops: Record<string, any> = {}
for (const [loopId, loop] of Object.entries(state.loops || {})) {
normalizedLoops[loopId] = normalizeValue(loop)
}
const normalizedParallels: Record<string, any> = {}
for (const [parallelId, parallel] of Object.entries(state.parallels || {})) {
normalizedParallels[parallelId] = normalizeValue(parallel)
}
// 4. Normalize variables (if present)
const normalizedVariables = state.variables ? normalizeValue(state.variables) : undefined
return {
blocks: normalizedBlocks,
edges: normalizedEdges,
loops: normalizedLoops,
parallels: normalizedParallels,
...(normalizedVariables !== undefined && { variables: normalizedVariables }),
}
}
}
export const snapshotService = new SnapshotService()

View File

@@ -1,34 +1,10 @@
import type { BlockState, WorkflowState } from '@/stores/workflows/workflow/types'
import { SYSTEM_SUBBLOCK_IDS, TRIGGER_RUNTIME_SUBBLOCK_IDS } from '@/triggers/constants'
import {
normalizedStringify,
normalizeEdge,
normalizeLoop,
normalizeParallel,
normalizeValue,
normalizeVariables,
sanitizeInputFormat,
sanitizeTools,
sanitizeVariable,
sortEdges,
} from './normalize'
/** Block with optional diff markers added by copilot */
type BlockWithDiffMarkers = BlockState & {
is_diff?: string
field_diffs?: Record<string, unknown>
}
/** SubBlock with optional diff marker */
type SubBlockWithDiffMarker = {
id: string
type: string
value: unknown
is_diff?: string
}
import type { WorkflowState } from '@/stores/workflows/workflow/types'
import { normalizedStringify, normalizeWorkflowState } from './normalize'
/**
* Compare the current workflow state with the deployed state to detect meaningful changes
* Compare the current workflow state with the deployed state to detect meaningful changes.
* Uses the shared normalizeWorkflowState function to ensure consistency with snapshot hashing.
*
* @param currentState - The current workflow state
* @param deployedState - The deployed workflow state
* @returns True if there are meaningful changes, false if only position changes or no changes
@@ -40,236 +16,106 @@ export function hasWorkflowChanged(
// If no deployed state exists, then the workflow has changed
if (!deployedState) return true
// 1. Compare edges (connections between blocks)
const currentEdges = currentState.edges || []
const deployedEdges = deployedState.edges || []
const normalizedCurrent = normalizeWorkflowState(currentState)
const normalizedDeployed = normalizeWorkflowState(deployedState)
const normalizedCurrentEdges = sortEdges(currentEdges.map(normalizeEdge))
const normalizedDeployedEdges = sortEdges(deployedEdges.map(normalizeEdge))
const currentStr = normalizedStringify(normalizedCurrent)
const deployedStr = normalizedStringify(normalizedDeployed)
if (
normalizedStringify(normalizedCurrentEdges) !== normalizedStringify(normalizedDeployedEdges)
) {
return true
}
if (currentStr !== deployedStr) {
// Debug: Find what's different
console.log('[hasWorkflowChanged] Detected differences:')
// 2. Compare blocks and their configurations
const currentBlockIds = Object.keys(currentState.blocks || {}).sort()
const deployedBlockIds = Object.keys(deployedState.blocks || {}).sort()
if (
currentBlockIds.length !== deployedBlockIds.length ||
normalizedStringify(currentBlockIds) !== normalizedStringify(deployedBlockIds)
) {
return true
}
// 3. Build normalized representations of blocks for comparison
const normalizedCurrentBlocks: Record<string, unknown> = {}
const normalizedDeployedBlocks: Record<string, unknown> = {}
for (const blockId of currentBlockIds) {
const currentBlock = currentState.blocks[blockId]
const deployedBlock = deployedState.blocks[blockId]
// Destructure and exclude non-functional fields:
// - position: visual positioning only
// - subBlocks: handled separately below
// - layout: contains measuredWidth/measuredHeight from autolayout
// - height: block height measurement from autolayout
// - outputs: derived from subBlocks (e.g., inputFormat), already compared via subBlocks
// - is_diff, field_diffs: diff markers from copilot edits
const currentBlockWithDiff = currentBlock as BlockWithDiffMarkers
const deployedBlockWithDiff = deployedBlock as BlockWithDiffMarkers
const {
position: _currentPos,
subBlocks: currentSubBlocks = {},
layout: _currentLayout,
height: _currentHeight,
outputs: _currentOutputs,
is_diff: _currentIsDiff,
field_diffs: _currentFieldDiffs,
...currentRest
} = currentBlockWithDiff
const {
position: _deployedPos,
subBlocks: deployedSubBlocks = {},
layout: _deployedLayout,
height: _deployedHeight,
outputs: _deployedOutputs,
is_diff: _deployedIsDiff,
field_diffs: _deployedFieldDiffs,
...deployedRest
} = deployedBlockWithDiff
// Also exclude width/height from data object (container dimensions from autolayout)
const {
width: _currentDataWidth,
height: _currentDataHeight,
...currentDataRest
} = currentRest.data || {}
const {
width: _deployedDataWidth,
height: _deployedDataHeight,
...deployedDataRest
} = deployedRest.data || {}
normalizedCurrentBlocks[blockId] = {
...currentRest,
data: currentDataRest,
subBlocks: undefined,
// Compare edges
if (
normalizedStringify(normalizedCurrent.edges) !== normalizedStringify(normalizedDeployed.edges)
) {
console.log(' - Edges differ')
console.log(' Current:', JSON.stringify(normalizedCurrent.edges, null, 2))
console.log(' Deployed:', JSON.stringify(normalizedDeployed.edges, null, 2))
}
normalizedDeployedBlocks[blockId] = {
...deployedRest,
data: deployedDataRest,
subBlocks: undefined,
}
// Compare blocks
const currentBlockIds = Object.keys(normalizedCurrent.blocks).sort()
const deployedBlockIds = Object.keys(normalizedDeployed.blocks).sort()
// Get all subBlock IDs from both states, excluding runtime metadata and UI-only elements
const allSubBlockIds = [
...new Set([...Object.keys(currentSubBlocks), ...Object.keys(deployedSubBlocks)]),
]
.filter(
(id) => !TRIGGER_RUNTIME_SUBBLOCK_IDS.includes(id) && !SYSTEM_SUBBLOCK_IDS.includes(id)
)
.sort()
if (normalizedStringify(currentBlockIds) !== normalizedStringify(deployedBlockIds)) {
console.log(' - Block IDs differ')
console.log(' Current:', currentBlockIds)
console.log(' Deployed:', deployedBlockIds)
} else {
for (const blockId of currentBlockIds) {
const currentBlock = normalizedCurrent.blocks[blockId]
const deployedBlock = normalizedDeployed.blocks[blockId]
// Normalize and compare each subBlock
for (const subBlockId of allSubBlockIds) {
// If the subBlock doesn't exist in either state, there's a difference
if (!currentSubBlocks[subBlockId] || !deployedSubBlocks[subBlockId]) {
return true
}
if (normalizedStringify(currentBlock) !== normalizedStringify(deployedBlock)) {
console.log(` - Block "${blockId}" differs:`)
// Get values with special handling for null/undefined
// Using unknown type since sanitization functions return different types
let currentValue: unknown = currentSubBlocks[subBlockId].value ?? null
let deployedValue: unknown = deployedSubBlocks[subBlockId].value ?? null
// Compare subBlocks
const currentSubBlockIds = Object.keys(currentBlock.subBlocks || {}).sort()
const deployedSubBlockIds = Object.keys(deployedBlock.subBlocks || {}).sort()
if (subBlockId === 'tools' && Array.isArray(currentValue) && Array.isArray(deployedValue)) {
currentValue = sanitizeTools(currentValue)
deployedValue = sanitizeTools(deployedValue)
}
if (
normalizedStringify(currentSubBlockIds) !== normalizedStringify(deployedSubBlockIds)
) {
console.log(' SubBlock IDs differ:')
console.log(' Current:', currentSubBlockIds)
console.log(' Deployed:', deployedSubBlockIds)
} else {
for (const subBlockId of currentSubBlockIds) {
const currentSub = currentBlock.subBlocks[subBlockId]
const deployedSub = deployedBlock.subBlocks[subBlockId]
if (
subBlockId === 'inputFormat' &&
Array.isArray(currentValue) &&
Array.isArray(deployedValue)
) {
currentValue = sanitizeInputFormat(currentValue)
deployedValue = sanitizeInputFormat(deployedValue)
}
if (normalizedStringify(currentSub) !== normalizedStringify(deployedSub)) {
console.log(` SubBlock "${subBlockId}" differs:`)
console.log(' Current:', JSON.stringify(currentSub, null, 2))
console.log(' Deployed:', JSON.stringify(deployedSub, null, 2))
}
}
}
// For string values, compare directly to catch even small text changes
if (typeof currentValue === 'string' && typeof deployedValue === 'string') {
if (currentValue !== deployedValue) {
return true
}
} else {
// For other types, use normalized comparison
const normalizedCurrentValue = normalizeValue(currentValue)
const normalizedDeployedValue = normalizeValue(deployedValue)
// Compare block properties (excluding subBlocks)
const { subBlocks: _cs, ...currentBlockRest } = currentBlock
const { subBlocks: _ds, ...deployedBlockRest } = deployedBlock
if (
normalizedStringify(normalizedCurrentValue) !==
normalizedStringify(normalizedDeployedValue)
) {
return true
if (normalizedStringify(currentBlockRest) !== normalizedStringify(deployedBlockRest)) {
console.log(' Block properties differ:')
console.log(' Current:', JSON.stringify(currentBlockRest, null, 2))
console.log(' Deployed:', JSON.stringify(deployedBlockRest, null, 2))
}
}
}
// Compare type and other properties (excluding diff markers and value)
const currentSubBlockWithDiff = currentSubBlocks[subBlockId] as SubBlockWithDiffMarker
const deployedSubBlockWithDiff = deployedSubBlocks[subBlockId] as SubBlockWithDiffMarker
const { value: _cv, is_diff: _cd, ...currentSubBlockRest } = currentSubBlockWithDiff
const { value: _dv, is_diff: _dd, ...deployedSubBlockRest } = deployedSubBlockWithDiff
if (normalizedStringify(currentSubBlockRest) !== normalizedStringify(deployedSubBlockRest)) {
return true
}
}
const blocksEqual =
normalizedStringify(normalizedCurrentBlocks[blockId]) ===
normalizedStringify(normalizedDeployedBlocks[blockId])
if (!blocksEqual) {
return true
}
}
// 4. Compare loops
const currentLoops = currentState.loops || {}
const deployedLoops = deployedState.loops || {}
const currentLoopIds = Object.keys(currentLoops).sort()
const deployedLoopIds = Object.keys(deployedLoops).sort()
if (
currentLoopIds.length !== deployedLoopIds.length ||
normalizedStringify(currentLoopIds) !== normalizedStringify(deployedLoopIds)
) {
return true
}
for (const loopId of currentLoopIds) {
const normalizedCurrentLoop = normalizeValue(normalizeLoop(currentLoops[loopId]))
const normalizedDeployedLoop = normalizeValue(normalizeLoop(deployedLoops[loopId]))
// Compare loops
if (
normalizedStringify(normalizedCurrentLoop) !== normalizedStringify(normalizedDeployedLoop)
normalizedStringify(normalizedCurrent.loops) !== normalizedStringify(normalizedDeployed.loops)
) {
return true
console.log(' - Loops differ')
console.log(' Current:', JSON.stringify(normalizedCurrent.loops, null, 2))
console.log(' Deployed:', JSON.stringify(normalizedDeployed.loops, null, 2))
}
}
// 5. Compare parallels
const currentParallels = currentState.parallels || {}
const deployedParallels = deployedState.parallels || {}
const currentParallelIds = Object.keys(currentParallels).sort()
const deployedParallelIds = Object.keys(deployedParallels).sort()
if (
currentParallelIds.length !== deployedParallelIds.length ||
normalizedStringify(currentParallelIds) !== normalizedStringify(deployedParallelIds)
) {
return true
}
for (const parallelId of currentParallelIds) {
const normalizedCurrentParallel = normalizeValue(
normalizeParallel(currentParallels[parallelId])
)
const normalizedDeployedParallel = normalizeValue(
normalizeParallel(deployedParallels[parallelId])
)
// Compare parallels
if (
normalizedStringify(normalizedCurrentParallel) !==
normalizedStringify(normalizedDeployedParallel)
normalizedStringify(normalizedCurrent.parallels) !==
normalizedStringify(normalizedDeployed.parallels)
) {
return true
console.log(' - Parallels differ')
console.log(' Current:', JSON.stringify(normalizedCurrent.parallels, null, 2))
console.log(' Deployed:', JSON.stringify(normalizedDeployed.parallels, null, 2))
}
}
// 6. Compare variables
const currentVariables = normalizeVariables(currentState.variables)
const deployedVariables = normalizeVariables(deployedState.variables)
// Compare variables
if (
normalizedStringify(normalizedCurrent.variables) !==
normalizedStringify(normalizedDeployed.variables)
) {
console.log(' - Variables differ')
console.log(' Current:', JSON.stringify(normalizedCurrent.variables, null, 2))
console.log(' Deployed:', JSON.stringify(normalizedDeployed.variables, null, 2))
}
const normalizedCurrentVars = normalizeValue(
Object.fromEntries(Object.entries(currentVariables).map(([id, v]) => [id, sanitizeVariable(v)]))
)
const normalizedDeployedVars = normalizeValue(
Object.fromEntries(
Object.entries(deployedVariables).map(([id, v]) => [id, sanitizeVariable(v)])
)
)
if (normalizedStringify(normalizedCurrentVars) !== normalizedStringify(normalizedDeployedVars)) {
return true
}

View File

@@ -1,7 +1,15 @@
export { hasWorkflowChanged } from './compare'
export type { NormalizedWorkflowState } from './normalize'
export {
normalizedStringify,
normalizeEdge,
normalizeLoop,
normalizeParallel,
normalizeValue,
normalizeVariables,
normalizeWorkflowState,
sanitizeInputFormat,
sanitizeTools,
sanitizeVariable,
sortEdges,
} from './normalize'

View File

@@ -4,7 +4,14 @@
*/
import type { Edge } from 'reactflow'
import type { Loop, Parallel, Variable } from '@/stores/workflows/workflow/types'
import type {
BlockState,
Loop,
Parallel,
Variable,
WorkflowState,
} from '@/stores/workflows/workflow/types'
import { SYSTEM_SUBBLOCK_IDS, TRIGGER_RUNTIME_SUBBLOCK_IDS } from '@/triggers/constants'
/**
* Normalizes a value for consistent comparison by sorting object keys recursively
@@ -220,3 +227,155 @@ export function sortEdges(
)
)
}
/** Block with optional diff markers added by copilot */
type BlockWithDiffMarkers = BlockState & {
is_diff?: string
field_diffs?: Record<string, unknown>
}
/** SubBlock with optional diff marker */
type SubBlockWithDiffMarker = {
id: string
type: string
value: unknown
is_diff?: string
}
/** Normalized block structure for comparison */
interface NormalizedBlock {
[key: string]: unknown
data: Record<string, unknown>
subBlocks: Record<string, NormalizedSubBlock>
}
/** Normalized subBlock structure */
interface NormalizedSubBlock {
[key: string]: unknown
value: unknown
}
/** Normalized workflow state structure */
export interface NormalizedWorkflowState {
blocks: Record<string, NormalizedBlock>
edges: Array<{
source: string
sourceHandle?: string | null
target: string
targetHandle?: string | null
}>
loops: Record<string, unknown>
parallels: Record<string, unknown>
variables: unknown
}
/**
* Normalizes a workflow state for comparison or hashing.
* Excludes non-functional fields (position, layout, height, outputs, diff markers)
* and system/trigger runtime subBlocks.
*
* @param state - The workflow state to normalize
* @returns A normalized workflow state suitable for comparison or hashing
*/
export function normalizeWorkflowState(state: WorkflowState): NormalizedWorkflowState {
// 1. Normalize and sort edges (connection-relevant fields only)
const normalizedEdges = sortEdges((state.edges || []).map(normalizeEdge))
// 2. Normalize blocks
const normalizedBlocks: Record<string, NormalizedBlock> = {}
for (const [blockId, block] of Object.entries(state.blocks || {})) {
const blockWithDiff = block as BlockWithDiffMarkers
// Exclude non-functional fields:
// - position: visual positioning only
// - layout: contains measuredWidth/measuredHeight from autolayout
// - height: block height measurement from autolayout
// - outputs: derived from subBlocks, already compared via subBlocks
// - is_diff, field_diffs: diff markers from copilot edits
// - subBlocks: handled separately
const {
position: _position,
subBlocks: blockSubBlocks = {},
layout: _layout,
height: _height,
outputs: _outputs,
is_diff: _isDiff,
field_diffs: _fieldDiffs,
...blockRest
} = blockWithDiff
// Exclude from data object:
// - width/height: container dimensions from autolayout
// - nodes: subflow node membership (derived/runtime for parallel/loop blocks)
// - distribution: parallel distribution (derived/runtime)
const {
width: _dataWidth,
height: _dataHeight,
nodes: _dataNodes,
distribution: _dataDistribution,
...dataRest
} = (blockRest.data || {}) as Record<string, unknown>
// Filter and normalize subBlocks (exclude system/trigger runtime subBlocks)
const normalizedSubBlocks: Record<string, NormalizedSubBlock> = {}
const subBlockIds = Object.keys(blockSubBlocks)
.filter(
(id) => !SYSTEM_SUBBLOCK_IDS.includes(id) && !TRIGGER_RUNTIME_SUBBLOCK_IDS.includes(id)
)
.sort()
for (const subBlockId of subBlockIds) {
const subBlock = blockSubBlocks[subBlockId] as SubBlockWithDiffMarker
let value: unknown = subBlock.value ?? null
// Sanitize UI-only fields from tools and inputFormat
if (subBlockId === 'tools' && Array.isArray(value)) {
value = sanitizeTools(value)
}
if (subBlockId === 'inputFormat' && Array.isArray(value)) {
value = sanitizeInputFormat(value)
}
// Exclude diff markers from subBlock
const { value: _v, is_diff: _sd, ...subBlockRest } = subBlock
normalizedSubBlocks[subBlockId] = {
...subBlockRest,
value: normalizeValue(value),
}
}
normalizedBlocks[blockId] = {
...blockRest,
data: dataRest,
subBlocks: normalizedSubBlocks,
}
}
// 3. Normalize loops using specialized normalizeLoop (extracts only type-relevant fields)
const normalizedLoops: Record<string, unknown> = {}
for (const [loopId, loop] of Object.entries(state.loops || {})) {
normalizedLoops[loopId] = normalizeValue(normalizeLoop(loop))
}
// 4. Normalize parallels using specialized normalizeParallel
const normalizedParallels: Record<string, unknown> = {}
for (const [parallelId, parallel] of Object.entries(state.parallels || {})) {
normalizedParallels[parallelId] = normalizeValue(normalizeParallel(parallel))
}
// 5. Normalize variables (remove UI-only validationError field)
const variables = normalizeVariables(state.variables)
const normalizedVariablesObj = normalizeValue(
Object.fromEntries(Object.entries(variables).map(([id, v]) => [id, sanitizeVariable(v)]))
)
return {
blocks: normalizedBlocks,
edges: normalizedEdges,
loops: normalizedLoops,
parallels: normalizedParallels,
variables: normalizedVariablesObj,
}
}