fix(rehydration): consolidate store rehydration code (#1249)

* fix(rehydration): consolidate store rehydration code

* fix stale closure
This commit is contained in:
Vikhyath Mondreti
2025-09-04 20:00:51 -07:00
committed by GitHub
parent 37dcde2afc
commit 47da5eb6e8
2 changed files with 95 additions and 147 deletions

View File

@@ -125,6 +125,12 @@ export function useSubBlockValue<T = any>(
return
}
const currentActiveWorkflowId = useWorkflowRegistry.getState().activeWorkflowId
if (!currentActiveWorkflowId) {
logger.warn('No active workflow ID when setting value', { blockId, subBlockId })
return
}
// Use deep comparison to avoid unnecessary updates for complex objects
if (!isEqual(valueRef.current, newValue)) {
valueRef.current = newValue
@@ -147,10 +153,10 @@ export function useSubBlockValue<T = any>(
useSubBlockStore.setState((state) => ({
workflowValues: {
...state.workflowValues,
[activeWorkflowId || '']: {
...state.workflowValues[activeWorkflowId || ''],
[currentActiveWorkflowId]: {
...state.workflowValues[currentActiveWorkflowId],
[blockId]: {
...state.workflowValues[activeWorkflowId || '']?.[blockId],
...state.workflowValues[currentActiveWorkflowId]?.[blockId],
[subBlockId]: newValue,
},
},
@@ -194,7 +200,6 @@ export function useSubBlockValue<T = any>(
isStreaming,
emitValue,
isShowingDiff,
activeWorkflowId,
]
)

View File

@@ -327,91 +327,97 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
}
})
// Shared function to rehydrate workflow stores
const rehydrateWorkflowStores = async (
workflowId: string,
workflowState: any,
source: 'copilot' | 'workflow-state'
) => {
// Import stores dynamically
const [
{ useOperationQueueStore },
{ useWorkflowRegistry },
{ useWorkflowStore },
{ useSubBlockStore },
] = await Promise.all([
import('@/stores/operation-queue/store'),
import('@/stores/workflows/registry/store'),
import('@/stores/workflows/workflow/store'),
import('@/stores/workflows/subblock/store'),
])
// Only proceed if this is the active workflow
const { activeWorkflowId } = useWorkflowRegistry.getState()
if (activeWorkflowId !== workflowId) {
logger.info(`Skipping rehydration - workflow ${workflowId} is not active`)
return false
}
// Check for pending operations
const hasPending = useOperationQueueStore
.getState()
.operations.some((op: any) => op.workflowId === workflowId && op.status !== 'confirmed')
if (hasPending) {
logger.info(`Skipping ${source} rehydration due to pending operations in queue`)
return false
}
// Extract subblock values from blocks
const subblockValues: Record<string, Record<string, any>> = {}
Object.entries(workflowState.blocks || {}).forEach(([blockId, block]) => {
const blockState = block as any
subblockValues[blockId] = {}
Object.entries(blockState.subBlocks || {}).forEach(([subblockId, subblock]) => {
subblockValues[blockId][subblockId] = (subblock as any).value
})
})
// Replace local workflow store with authoritative server state
useWorkflowStore.setState({
blocks: workflowState.blocks || {},
edges: workflowState.edges || [],
loops: workflowState.loops || {},
parallels: workflowState.parallels || {},
lastSaved: workflowState.lastSaved || Date.now(),
isDeployed: workflowState.isDeployed ?? false,
deployedAt: workflowState.deployedAt,
deploymentStatuses: workflowState.deploymentStatuses || {},
hasActiveWebhook: workflowState.hasActiveWebhook ?? false,
})
// Replace subblock store values for this workflow
useSubBlockStore.setState((state: any) => ({
workflowValues: {
...state.workflowValues,
[workflowId]: subblockValues,
},
}))
logger.info(`Successfully rehydrated stores from ${source}`)
return true
}
// Copilot workflow edit events (database has been updated, rehydrate stores)
socketInstance.on('copilot-workflow-edit', async (data) => {
logger.info(
`Copilot edited workflow ${data.workflowId} - rehydrating stores from database`
)
if (data.workflowId === urlWorkflowId) {
try {
const { useOperationQueueStore } = require('@/stores/operation-queue/store')
const hasPending = useOperationQueueStore
.getState()
.operations.some(
(op: any) => op.workflowId === data.workflowId && op.status !== 'confirmed'
)
if (hasPending) {
logger.info('Skipping copilot rehydration due to pending operations in queue')
return
try {
// Fetch fresh workflow state directly from API
const response = await fetch(`/api/workflows/${data.workflowId}`)
if (response.ok) {
const responseData = await response.json()
const workflowData = responseData.data
if (workflowData?.state) {
await rehydrateWorkflowStores(data.workflowId, workflowData.state, 'copilot')
}
} catch {}
try {
// Fetch fresh workflow state directly from API
const response = await fetch(`/api/workflows/${data.workflowId}`)
if (response.ok) {
const responseData = await response.json()
const workflowData = responseData.data
if (workflowData?.state) {
logger.info('Rehydrating stores with fresh workflow state from database')
// Import stores dynamically to avoid import issues
Promise.all([
import('@/stores/workflows/workflow/store'),
import('@/stores/workflows/subblock/store'),
])
.then(([{ useWorkflowStore }, { useSubBlockStore }]) => {
const workflowState = workflowData.state
// Extract subblock values from blocks
const subblockValues: Record<string, Record<string, any>> = {}
Object.entries(workflowState.blocks || {}).forEach(([blockId, block]) => {
const blockState = block as any
subblockValues[blockId] = {}
Object.entries(blockState.subBlocks || {}).forEach(
([subblockId, subblock]) => {
subblockValues[blockId][subblockId] = (subblock as any).value
}
)
})
// Replace local workflow store with authoritative server state
useWorkflowStore.setState({
blocks: workflowState.blocks || {},
edges: workflowState.edges || [],
loops: workflowState.loops || {},
parallels: workflowState.parallels || {},
lastSaved: workflowState.lastSaved || Date.now(),
isDeployed: workflowState.isDeployed ?? false,
deployedAt: workflowState.deployedAt,
deploymentStatuses: workflowState.deploymentStatuses || {},
hasActiveWebhook: workflowState.hasActiveWebhook ?? false,
})
// Replace subblock store values for this workflow
useSubBlockStore.setState((state: any) => ({
workflowValues: {
...state.workflowValues,
[data.workflowId]: subblockValues,
},
}))
// Note: Auto-layout is already handled by the copilot backend before saving
// No need to trigger additional auto-layout here to avoid ID conflicts
logger.info('Successfully rehydrated stores from database after copilot edit')
})
.catch((error) => {
logger.error('Failed to import stores for copilot rehydration:', error)
})
}
} else {
logger.error('Failed to fetch fresh workflow state:', response.statusText)
}
} catch (error) {
logger.error('Failed to rehydrate stores after copilot edit:', error)
} else {
logger.error('Failed to fetch fresh workflow state:', response.statusText)
}
} catch (error) {
logger.error('Failed to rehydrate stores after copilot edit:', error)
}
})
@@ -465,74 +471,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
logger.debug('Operation confirmed:', data)
})
socketInstance.on('workflow-state', (workflowData) => {
socketInstance.on('workflow-state', async (workflowData) => {
logger.info('Received workflow state from server')
// Update local stores with the fresh workflow state (same logic as YAML editor)
if (workflowData?.state && workflowData.id === urlWorkflowId) {
try {
const { useOperationQueueStore } = require('@/stores/operation-queue/store')
const hasPending = useOperationQueueStore
.getState()
.operations.some(
(op: any) => op.workflowId === workflowData.id && op.status !== 'confirmed'
)
if (hasPending) {
logger.info(
'Skipping workflow-state rehydration due to pending operations in queue'
)
return
}
} catch {}
logger.info('Updating local stores with fresh workflow state from server')
try {
Promise.all([
import('@/stores/workflows/workflow/store'),
import('@/stores/workflows/subblock/store'),
import('@/stores/workflows/registry/store'),
])
.then(([{ useWorkflowStore }, { useSubBlockStore }]) => {
const workflowState = workflowData.state
const subblockValues: Record<string, Record<string, any>> = {}
Object.entries(workflowState.blocks || {}).forEach(([blockId, block]) => {
const blockState = block as any
subblockValues[blockId] = {}
Object.entries(blockState.subBlocks || {}).forEach(([subblockId, subblock]) => {
subblockValues[blockId][subblockId] = (subblock as any).value
})
})
// Replace local workflow store with authoritative server state
useWorkflowStore.setState({
blocks: workflowState.blocks || {},
edges: workflowState.edges || [],
loops: workflowState.loops || {},
parallels: workflowState.parallels || {},
lastSaved: workflowState.lastSaved || Date.now(),
isDeployed: workflowState.isDeployed ?? false,
deployedAt: workflowState.deployedAt,
deploymentStatuses: workflowState.deploymentStatuses || {},
hasActiveWebhook: workflowState.hasActiveWebhook ?? false,
})
// Replace subblock store values for this workflow
useSubBlockStore.setState((state: any) => ({
workflowValues: {
...state.workflowValues,
[workflowData.id]: subblockValues,
},
}))
logger.info('Merged fresh workflow state with local state')
})
.catch((error) => {
logger.error('Failed to import stores for workflow state update:', error)
})
} catch (error) {
logger.error('Failed to update local stores with workflow state:', error)
}
if (workflowData?.state) {
await rehydrateWorkflowStores(workflowData.id, workflowData.state, 'workflow-state')
}
})