fix(snapshot): changed insert to upsert when concurrent identical child workflows are running (#3259)

* fix(snapshot): changed insert to upsert when concurrent identical child workflows are running

* fixed ci tests failing
This commit is contained in:
Waleed
2026-02-19 13:58:35 -08:00
committed by GitHub
parent e24c824c9a
commit 2b5e436a2a
2 changed files with 234 additions and 39 deletions

View File

@@ -1,8 +1,59 @@
import { describe, expect, it } from 'vitest'
/**
* @vitest-environment node
*/
import { databaseMock, drizzleOrmMock, loggerMock } from '@sim/testing'
import { beforeEach, describe, expect, it, vi } from 'vitest'
const { mockSchemaExports } = vi.hoisted(() => ({
mockSchemaExports: {
workflowExecutionSnapshots: {
id: 'id',
workflowId: 'workflow_id',
stateHash: 'state_hash',
stateData: 'state_data',
createdAt: 'created_at',
},
workflowExecutionLogs: {
id: 'id',
stateSnapshotId: 'state_snapshot_id',
},
},
}))
vi.mock('@sim/db', () => databaseMock)
vi.mock('@sim/db/schema', () => mockSchemaExports)
vi.mock('@sim/logger', () => loggerMock)
vi.mock('drizzle-orm', () => drizzleOrmMock)
vi.mock('uuid', () => ({ v4: vi.fn(() => 'generated-uuid-1') }))
import { SnapshotService } from '@/lib/logs/execution/snapshot/service'
import type { WorkflowState } from '@/lib/logs/types'
const mockState: WorkflowState = {
blocks: {
block1: {
id: 'block1',
name: 'Test Agent',
type: 'agent',
position: { x: 100, y: 200 },
subBlocks: {},
outputs: {},
enabled: true,
horizontalHandles: true,
advancedMode: false,
height: 0,
},
},
edges: [{ id: 'edge1', source: 'block1', target: 'block2' }],
loops: {},
parallels: {},
}
describe('SnapshotService', () => {
beforeEach(() => {
vi.clearAllMocks()
})
describe('computeStateHash', () => {
it.concurrent('should generate consistent hashes for identical states', () => {
const service = new SnapshotService()
@@ -62,7 +113,7 @@ describe('SnapshotService', () => {
blocks: {
block1: {
...baseState.blocks.block1,
position: { x: 500, y: 600 }, // Different position
position: { x: 500, y: 600 },
},
},
}
@@ -140,7 +191,7 @@ describe('SnapshotService', () => {
const state2: WorkflowState = {
blocks: {},
edges: [
{ id: 'edge2', source: 'b', target: 'c' }, // Different order
{ id: 'edge2', source: 'b', target: 'c' },
{ id: 'edge1', source: 'a', target: 'b' },
],
loops: {},
@@ -219,7 +270,6 @@ describe('SnapshotService', () => {
const hash = service.computeStateHash(complexState)
expect(hash).toHaveLength(64)
// Should be consistent
const hash2 = service.computeStateHash(complexState)
expect(hash).toBe(hash2)
})
@@ -335,4 +385,166 @@ describe('SnapshotService', () => {
expect(hash1).toHaveLength(64)
})
})
describe('createSnapshotWithDeduplication', () => {
it('should use upsert to insert a new snapshot', async () => {
const service = new SnapshotService()
const workflowId = 'wf-123'
const mockReturning = vi.fn().mockResolvedValue([
{
id: 'generated-uuid-1',
workflowId,
stateHash: 'abc123',
stateData: mockState,
createdAt: new Date('2026-02-19T00:00:00Z'),
},
])
const mockOnConflictDoUpdate = vi.fn().mockReturnValue({ returning: mockReturning })
const mockValues = vi.fn().mockReturnValue({ onConflictDoUpdate: mockOnConflictDoUpdate })
const mockInsert = vi.fn().mockReturnValue({ values: mockValues })
databaseMock.db.insert = mockInsert
const result = await service.createSnapshotWithDeduplication(workflowId, mockState)
expect(mockInsert).toHaveBeenCalled()
expect(mockValues).toHaveBeenCalledWith(
expect.objectContaining({
id: 'generated-uuid-1',
workflowId,
stateData: mockState,
})
)
expect(mockOnConflictDoUpdate).toHaveBeenCalledWith(
expect.objectContaining({
set: expect.any(Object),
})
)
expect(result.snapshot.id).toBe('generated-uuid-1')
expect(result.isNew).toBe(true)
})
it('should detect reused snapshot when returned id differs from generated id', async () => {
const service = new SnapshotService()
const workflowId = 'wf-123'
const mockReturning = vi.fn().mockResolvedValue([
{
id: 'existing-snapshot-id',
workflowId,
stateHash: 'abc123',
stateData: mockState,
createdAt: new Date('2026-02-19T00:00:00Z'),
},
])
const mockOnConflictDoUpdate = vi.fn().mockReturnValue({ returning: mockReturning })
const mockValues = vi.fn().mockReturnValue({ onConflictDoUpdate: mockOnConflictDoUpdate })
const mockInsert = vi.fn().mockReturnValue({ values: mockValues })
databaseMock.db.insert = mockInsert
const result = await service.createSnapshotWithDeduplication(workflowId, mockState)
expect(result.snapshot.id).toBe('existing-snapshot-id')
expect(result.isNew).toBe(false)
})
it('should not throw on concurrent inserts with the same hash', async () => {
const service = new SnapshotService()
const workflowId = 'wf-123'
const mockReturningNew = vi.fn().mockResolvedValue([
{
id: 'generated-uuid-1',
workflowId,
stateHash: 'abc123',
stateData: mockState,
createdAt: new Date('2026-02-19T00:00:00Z'),
},
])
const mockReturningExisting = vi.fn().mockResolvedValue([
{
id: 'existing-snapshot-id',
workflowId,
stateHash: 'abc123',
stateData: mockState,
createdAt: new Date('2026-02-19T00:00:00Z'),
},
])
let callCount = 0
databaseMock.db.insert = vi.fn().mockImplementation(() => ({
values: vi.fn().mockImplementation(() => ({
onConflictDoUpdate: vi.fn().mockImplementation(() => ({
returning: callCount++ === 0 ? mockReturningNew : mockReturningExisting,
})),
})),
}))
const [result1, result2] = await Promise.all([
service.createSnapshotWithDeduplication(workflowId, mockState),
service.createSnapshotWithDeduplication(workflowId, mockState),
])
expect(result1.snapshot.id).toBe('generated-uuid-1')
expect(result1.isNew).toBe(true)
expect(result2.snapshot.id).toBe('existing-snapshot-id')
expect(result2.isNew).toBe(false)
})
it('should pass state_data in the ON CONFLICT SET clause', async () => {
const service = new SnapshotService()
const workflowId = 'wf-123'
let capturedConflictConfig: Record<string, unknown> | undefined
const mockReturning = vi.fn().mockResolvedValue([
{
id: 'generated-uuid-1',
workflowId,
stateHash: 'abc123',
stateData: mockState,
createdAt: new Date('2026-02-19T00:00:00Z'),
},
])
databaseMock.db.insert = vi.fn().mockReturnValue({
values: vi.fn().mockReturnValue({
onConflictDoUpdate: vi.fn().mockImplementation((config: Record<string, unknown>) => {
capturedConflictConfig = config
return { returning: mockReturning }
}),
}),
})
await service.createSnapshotWithDeduplication(workflowId, mockState)
expect(capturedConflictConfig).toBeDefined()
expect(capturedConflictConfig!.target).toBeDefined()
expect(capturedConflictConfig!.set).toBeDefined()
expect(capturedConflictConfig!.set).toHaveProperty('stateData')
})
it('should always call insert, never a separate select for deduplication', async () => {
const service = new SnapshotService()
const workflowId = 'wf-123'
const mockReturning = vi.fn().mockResolvedValue([
{
id: 'generated-uuid-1',
workflowId,
stateHash: 'abc123',
stateData: mockState,
createdAt: new Date('2026-02-19T00:00:00Z'),
},
])
const mockOnConflictDoUpdate = vi.fn().mockReturnValue({ returning: mockReturning })
const mockValues = vi.fn().mockReturnValue({ onConflictDoUpdate: mockOnConflictDoUpdate })
databaseMock.db.insert = vi.fn().mockReturnValue({ values: mockValues })
databaseMock.db.select = vi.fn()
await service.createSnapshotWithDeduplication(workflowId, mockState)
expect(databaseMock.db.insert).toHaveBeenCalledTimes(1)
expect(databaseMock.db.select).not.toHaveBeenCalled()
})
})
})

View File

@@ -2,7 +2,7 @@ import { createHash } from 'crypto'
import { db } from '@sim/db'
import { workflowExecutionLogs, workflowExecutionSnapshots } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, lt, notExists } from 'drizzle-orm'
import { and, eq, lt, notExists, sql } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import type {
SnapshotService as ISnapshotService,
@@ -28,36 +28,8 @@ export class SnapshotService implements ISnapshotService {
workflowId: string,
state: WorkflowState
): Promise<SnapshotCreationResult> {
// Hash the position-less state for deduplication (functional equivalence)
const stateHash = this.computeStateHash(state)
const existingSnapshot = await this.getSnapshotByHash(workflowId, stateHash)
if (existingSnapshot) {
let refreshedState: WorkflowState = existingSnapshot.stateData
try {
await db
.update(workflowExecutionSnapshots)
.set({ stateData: state })
.where(eq(workflowExecutionSnapshots.id, existingSnapshot.id))
refreshedState = state
} catch (error) {
logger.warn(
`Failed to refresh snapshot stateData for ${existingSnapshot.id}, continuing with existing data`,
error
)
}
logger.info(
`Reusing existing snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}...)`
)
return {
snapshot: { ...existingSnapshot, stateData: refreshedState },
isNew: false,
}
}
// Store the FULL state (including positions) so we can recreate the exact workflow
// Even though we hash without positions, we want to preserve the complete state
const snapshotData: WorkflowExecutionSnapshotInsert = {
id: uuidv4(),
workflowId,
@@ -65,21 +37,32 @@ export class SnapshotService implements ISnapshotService {
stateData: state,
}
const [newSnapshot] = await db
const [upsertedSnapshot] = await db
.insert(workflowExecutionSnapshots)
.values(snapshotData)
.onConflictDoUpdate({
target: [workflowExecutionSnapshots.workflowId, workflowExecutionSnapshots.stateHash],
set: {
stateData: sql`excluded.state_data`,
},
})
.returning()
const isNew = upsertedSnapshot.id === snapshotData.id
logger.info(
`Created new snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}..., blocks: ${Object.keys(state.blocks || {}).length})`
isNew
? `Created new snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}..., blocks: ${Object.keys(state.blocks || {}).length})`
: `Reusing existing snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}...)`
)
return {
snapshot: {
...newSnapshot,
stateData: newSnapshot.stateData as WorkflowState,
createdAt: newSnapshot.createdAt.toISOString(),
...upsertedSnapshot,
stateData: upsertedSnapshot.stateData as WorkflowState,
createdAt: upsertedSnapshot.createdAt.toISOString(),
},
isNew: true,
isNew,
}
}