Merge pull request #671 from simstudioai/fix/queuing

improvement(queuing): queuing with retries for sockets ops
This commit is contained in:
Vikhyath Mondreti
2025-07-13 01:18:30 -07:00
committed by GitHub
12 changed files with 758 additions and 344 deletions

View File

@@ -7,17 +7,21 @@ import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/w/compo
interface ConnectionStatusProps {
isConnected: boolean
hasOperationError?: boolean
}
export function ConnectionStatus({ isConnected }: ConnectionStatusProps) {
export function ConnectionStatus({ isConnected, hasOperationError }: ConnectionStatusProps) {
const userPermissions = useUserPermissionsContext()
const handleRefresh = () => {
window.location.reload()
}
// Don't render anything if not in offline mode
if (!userPermissions.isOfflineMode) {
// Show error if either offline mode OR operation error
const shouldShowError = userPermissions.isOfflineMode || hasOperationError
// Don't render anything if no errors
if (!shouldShowError) {
return null
}
@@ -32,10 +36,14 @@ export function ConnectionStatus({ isConnected }: ConnectionStatusProps) {
</div>
<div className='flex flex-col'>
<span className='font-medium text-xs leading-tight'>
{isConnected ? 'Reconnected' : 'Connection lost - please refresh'}
{hasOperationError
? 'Workflow Edit Failed'
: isConnected
? 'Reconnected'
: 'Connection lost - please refresh'}
</span>
<span className='text-red-600 text-xs leading-tight'>
{isConnected ? 'Refresh to continue editing' : 'Read-only mode active'}
Please refresh to continue editing
</span>
</div>
</div>

View File

@@ -1,6 +1,7 @@
'use client'
import { useMemo } from 'react'
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
import { usePresence } from '../../../../hooks/use-presence'
import { ConnectionStatus } from './components/connection-status/connection-status'
import { UserAvatar } from './components/user-avatar/user-avatar'
@@ -29,6 +30,9 @@ export function UserAvatarStack({
const { users: presenceUsers, isConnected } = usePresence()
const users = propUsers || presenceUsers
// Get operation error state from collaborative workflow
const { hasOperationError } = useCollaborativeWorkflow()
// Memoize the processed users to avoid unnecessary re-renders
const { visibleUsers, overflowCount } = useMemo(() => {
if (users.length === 0) {
@@ -53,8 +57,8 @@ export function UserAvatarStack({
return (
<div className={`flex items-center gap-3 ${className}`}>
{/* Connection status - always check, shows when offline */}
<ConnectionStatus isConnected={isConnected} />
{/* Connection status - always check, shows when offline or operation errors */}
<ConnectionStatus isConnected={isConnected} hasOperationError={hasOperationError} />
{/* Only show avatar stack when there are multiple users (>1) */}
{users.length > 1 && (

View File

@@ -260,7 +260,7 @@ export function LoopBadges({ nodeId, data }: LoopBadgesProps) {
<div className='relative min-h-[80px] rounded-md border border-input bg-background px-3 pt-2 pb-3 font-mono text-sm'>
{editorValue === '' && (
<div className='pointer-events-none absolute top-[8.5px] left-3 select-none text-muted-foreground/50'>
["item1", "item2", "item3"]
['item1', 'item2', 'item3']
</div>
)}
<Editor

View File

@@ -269,7 +269,8 @@ export function useSubBlockValue<T = any>(
if (!isEqual(valueRef.current, newValue)) {
valueRef.current = newValue
// Always update local store immediately for UI responsiveness
// Update local store immediately for UI responsiveness
// The collaborative function will also update it, but that's okay for idempotency
useSubBlockStore.setState((state) => ({
workflowValues: {
...state.workflowValues,

View File

@@ -22,7 +22,6 @@ import { SkeletonLoading } from '@/app/workspace/[workspaceId]/w/[workflowId]/co
import { Toolbar } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/toolbar/toolbar'
import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/w/components/providers/workspace-permissions-provider'
import { getBlock } from '@/blocks'
import { useSocket } from '@/contexts/socket-context'
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
import { useWorkspacePermissions } from '@/hooks/use-workspace-permissions'
import { useExecutionStore } from '@/stores/execution/store'
@@ -120,10 +119,9 @@ const WorkflowContent = React.memo(() => {
collaborativeRemoveEdge: removeEdge,
collaborativeUpdateBlockPosition,
collaborativeUpdateParentId: updateParentId,
isConnected,
currentWorkflowId,
collaborativeSetSubblockValue,
} = useCollaborativeWorkflow()
const { emitSubblockUpdate } = useSocket()
const { markAllAsRead } = useNotificationStore()
const { resetLoaded: resetVariablesLoaded } = useVariablesStore()
@@ -1484,11 +1482,9 @@ const WorkflowContent = React.memo(() => {
const handleSubBlockValueUpdate = (event: CustomEvent) => {
const { blockId, subBlockId, value } = event.detail
if (blockId && subBlockId) {
// Only emit the socket update, don't update the store again
// The store was already updated in the setValue function
if (isConnected && currentWorkflowId && activeWorkflowId === currentWorkflowId) {
emitSubblockUpdate(blockId, subBlockId, value)
}
// Use collaborative function to go through queue system
// This ensures 5-second timeout and error detection work
collaborativeSetSubblockValue(blockId, subBlockId, value)
}
}
@@ -1500,7 +1496,7 @@ const WorkflowContent = React.memo(() => {
handleSubBlockValueUpdate as EventListener
)
}
}, [emitSubblockUpdate, isConnected, currentWorkflowId, activeWorkflowId])
}, [collaborativeSetSubblockValue])
// Show skeleton UI while loading, then smoothly transition to real content
const showSkeletonUI = !isWorkflowReady

View File

@@ -4,12 +4,12 @@ import type React from 'react'
import { createContext, useContext, useEffect, useMemo, useState } from 'react'
import { useParams } from 'next/navigation'
import { createLogger } from '@/lib/logs/console-logger'
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
import { useUserPermissions, type WorkspaceUserPermissions } from '@/hooks/use-user-permissions'
import {
useWorkspacePermissions,
type WorkspacePermissions,
} from '@/hooks/use-workspace-permissions'
import { usePresence } from '../../[workflowId]/hooks/use-presence'
const logger = createLogger('WorkspacePermissionsProvider')
@@ -57,7 +57,16 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP
// Manage offline mode state locally
const [isOfflineMode, setIsOfflineMode] = useState(false)
const [hasBeenConnected, setHasBeenConnected] = useState(false)
// Get operation error state from collaborative workflow
const { hasOperationError } = useCollaborativeWorkflow()
// Set offline mode when there are operation errors
useEffect(() => {
if (hasOperationError) {
setIsOfflineMode(true)
}
}, [hasOperationError])
// Fetch workspace permissions and loading state
const {
@@ -74,26 +83,8 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP
permissionsError
)
// Get connection status and update offline mode accordingly
const { isConnected } = usePresence()
useEffect(() => {
if (isConnected) {
// Mark that we've been connected at least once
setHasBeenConnected(true)
// On initial connection, allow going online
if (!hasBeenConnected) {
setIsOfflineMode(false)
}
// If we were previously connected and this is a reconnection, stay offline (user must refresh)
} else if (hasBeenConnected) {
const timeoutId = setTimeout(() => {
setIsOfflineMode(true)
}, 6000)
return () => clearTimeout(timeoutId)
}
// If not connected and never been connected, stay in initial state (not offline mode)
}, [isConnected, hasBeenConnected])
// Note: Connection-based error detection removed - only rely on operation timeouts
// The 5-second operation timeout system will handle all error cases
// Create connection-aware permissions that override user permissions when offline
const userPermissions = useMemo((): WorkspaceUserPermissions & { isOfflineMode?: boolean } => {

View File

@@ -38,8 +38,18 @@ interface SocketContextType {
presenceUsers: PresenceUser[]
joinWorkflow: (workflowId: string) => void
leaveWorkflow: () => void
emitWorkflowOperation: (operation: string, target: string, payload: any) => void
emitSubblockUpdate: (blockId: string, subblockId: string, value: any) => void
emitWorkflowOperation: (
operation: string,
target: string,
payload: any,
operationId?: string
) => void
emitSubblockUpdate: (
blockId: string,
subblockId: string,
value: any,
operationId?: string
) => void
emitCursorUpdate: (cursor: { x: number; y: number }) => void
emitSelectionUpdate: (selection: { type: 'block' | 'edge' | 'none'; id?: string }) => void
// Event handlers for receiving real-time updates
@@ -51,6 +61,8 @@ interface SocketContextType {
onUserLeft: (handler: (data: any) => void) => void
onWorkflowDeleted: (handler: (data: any) => void) => void
onWorkflowReverted: (handler: (data: any) => void) => void
onOperationConfirmed: (handler: (data: any) => void) => void
onOperationFailed: (handler: (data: any) => void) => void
}
const SocketContext = createContext<SocketContextType>({
@@ -73,6 +85,8 @@ const SocketContext = createContext<SocketContextType>({
onUserLeft: () => {},
onWorkflowDeleted: () => {},
onWorkflowReverted: () => {},
onOperationConfirmed: () => {},
onOperationFailed: () => {},
})
export const useSocket = () => useContext(SocketContext)
@@ -103,6 +117,8 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
userLeft?: (data: any) => void
workflowDeleted?: (data: any) => void
workflowReverted?: (data: any) => void
operationConfirmed?: (data: any) => void
operationFailed?: (data: any) => void
}>({})
// Helper function to generate a fresh socket token
@@ -290,6 +306,18 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
eventHandlers.current.workflowReverted?.(data)
})
// Operation confirmation events
socketInstance.on('operation-confirmed', (data) => {
logger.debug('Operation confirmed', { operationId: data.operationId })
eventHandlers.current.operationConfirmed?.(data)
})
// Operation failure events
socketInstance.on('operation-failed', (data) => {
logger.warn('Operation failed', { operationId: data.operationId, error: data.error })
eventHandlers.current.operationFailed?.(data)
})
// Cursor update events
socketInstance.on('cursor-update', (data) => {
setPresenceUsers((prev) =>
@@ -444,8 +472,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
// Emit workflow operations (blocks, edges, subflows)
const emitWorkflowOperation = useCallback(
(operation: string, target: string, payload: any) => {
if (!socket || !currentWorkflowId) return
(operation: string, target: string, payload: any, operationId?: string) => {
if (!socket || !currentWorkflowId) {
return
}
// Apply light throttling only to position updates for smooth collaborative experience
const isPositionUpdate = operation === 'update-position' && target === 'block'
@@ -459,6 +489,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
target,
payload,
timestamp: Date.now(),
operationId, // Include operation ID for queue tracking
})
// Check if we already have a pending timeout for this block
@@ -482,6 +513,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
target,
payload,
timestamp: Date.now(),
operationId, // Include operation ID for queue tracking
})
}
},
@@ -490,7 +522,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
// Emit subblock value updates
const emitSubblockUpdate = useCallback(
(blockId: string, subblockId: string, value: any) => {
(blockId: string, subblockId: string, value: any, operationId?: string) => {
// Only emit if socket is connected and we're in a valid workflow room
if (socket && currentWorkflowId) {
socket.emit('subblock-update', {
@@ -498,6 +530,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
subblockId,
value,
timestamp: Date.now(),
operationId, // Include operation ID for queue tracking
})
} else {
logger.warn('Cannot emit subblock update: no socket connection or workflow room', {
@@ -570,6 +603,14 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
eventHandlers.current.workflowReverted = handler
}, [])
const onOperationConfirmed = useCallback((handler: (data: any) => void) => {
eventHandlers.current.operationConfirmed = handler
}, [])
const onOperationFailed = useCallback((handler: (data: any) => void) => {
eventHandlers.current.operationFailed = handler
}, [])
return (
<SocketContext.Provider
value={{
@@ -592,6 +633,8 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
onUserLeft,
onWorkflowDeleted,
onWorkflowReverted,
onOperationConfirmed,
onOperationFailed,
}}
>
{children}

View File

@@ -1,9 +1,11 @@
import { useCallback, useEffect, useRef } from 'react'
import type { Edge } from 'reactflow'
import { useSession } from '@/lib/auth-client'
import { createLogger } from '@/lib/logs/console-logger'
import { getBlock } from '@/blocks'
import { resolveOutputType } from '@/blocks/utils'
import { useSocket } from '@/contexts/socket-context'
import { registerEmitFunctions, useOperationQueue } from '@/stores/operation-queue/store'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
@@ -26,11 +28,14 @@ export function useCollaborativeWorkflow() {
onUserLeft,
onWorkflowDeleted,
onWorkflowReverted,
onOperationConfirmed,
onOperationFailed,
} = useSocket()
const { activeWorkflowId } = useWorkflowRegistry()
const workflowStore = useWorkflowStore()
const subBlockStore = useSubBlockStore()
const { data: session } = useSession()
// Track if we're applying remote changes to avoid infinite loops
const isApplyingRemoteChange = useRef(false)
@@ -38,6 +43,10 @@ export function useCollaborativeWorkflow() {
// Track last applied position timestamps to prevent out-of-order updates
const lastPositionTimestamps = useRef<Map<string, number>>(new Map())
// Operation queue
const { queue, hasOperationError, addToQueue, confirmOperation, failOperation } =
useOperationQueue()
// Clear position timestamps when switching workflows
// Note: Workflow joining is now handled automatically by socket connect event based on URL
useEffect(() => {
@@ -54,22 +63,15 @@ export function useCollaborativeWorkflow() {
}
}, [activeWorkflowId, isConnected, currentWorkflowId])
// Log connection status changes
// Register emit functions with operation queue store
useEffect(() => {
logger.info('Collaborative workflow connection status changed', {
isConnected,
currentWorkflowId,
activeWorkflowId,
presenceUsers: presenceUsers.length,
})
}, [isConnected, currentWorkflowId, activeWorkflowId, presenceUsers.length])
registerEmitFunctions(emitWorkflowOperation, emitSubblockUpdate, currentWorkflowId)
}, [emitWorkflowOperation, emitSubblockUpdate, currentWorkflowId])
// Handle incoming workflow operations from other users
useEffect(() => {
const handleWorkflowOperation = (data: any) => {
const { operation, target, payload, userId } = data
// Don't apply our own operations
if (isApplyingRemoteChange.current) return
logger.info(`Received ${operation} on ${target} from user ${userId}`)
@@ -81,8 +83,6 @@ export function useCollaborativeWorkflow() {
if (target === 'block') {
switch (operation) {
case 'add':
// Use normal addBlock - the collaborative system now sends complete data
// and the validation schema preserves outputs and subBlocks
workflowStore.addBlock(
payload.id,
payload.type,
@@ -92,17 +92,13 @@ export function useCollaborativeWorkflow() {
payload.parentId,
payload.extent
)
// Handle auto-connect edge if present
if (payload.autoConnectEdge) {
workflowStore.addEdge(payload.autoConnectEdge)
}
break
case 'update-position': {
// Apply position update only if it's newer than the last applied timestamp
// This prevents jagged movement from out-of-order position updates
const blockId = payload.id
// Server should always provide timestamp - if missing, skip ordering check
if (!data.timestamp) {
logger.warn('Position update missing timestamp, applying without ordering check', {
blockId,
@@ -146,12 +142,9 @@ export function useCollaborativeWorkflow() {
workflowStore.setBlockWide(payload.id, payload.isWide)
break
case 'update-advanced-mode':
// Note: toggleBlockAdvancedMode doesn't take a parameter, it just toggles
// For now, we'll use the existing toggle method
workflowStore.toggleBlockAdvancedMode(payload.id)
break
case 'toggle-handles': {
// Apply the handles toggle - we need to set the specific value to ensure consistency
const currentBlock = workflowStore.blocks[payload.id]
if (currentBlock && currentBlock.horizontalHandles !== payload.horizontalHandles) {
workflowStore.toggleBlockHandles(payload.id)
@@ -159,7 +152,6 @@ export function useCollaborativeWorkflow() {
break
}
case 'duplicate':
// Apply the duplicate operation by adding the new block
workflowStore.addBlock(
payload.id,
payload.type,
@@ -330,6 +322,19 @@ export function useCollaborativeWorkflow() {
}
}
const handleOperationConfirmed = (data: any) => {
const { operationId } = data
logger.debug('Operation confirmed', { operationId })
confirmOperation(operationId)
}
const handleOperationFailed = (data: any) => {
const { operationId, error, retryable } = data
logger.warn('Operation failed', { operationId, error, retryable })
failOperation(operationId)
}
// Register event handlers
onWorkflowOperation(handleWorkflowOperation)
onSubblockUpdate(handleSubblockUpdate)
@@ -337,6 +342,8 @@ export function useCollaborativeWorkflow() {
onUserLeft(handleUserLeft)
onWorkflowDeleted(handleWorkflowDeleted)
onWorkflowReverted(handleWorkflowReverted)
onOperationConfirmed(handleOperationConfirmed)
onOperationFailed(handleOperationFailed)
return () => {
// Cleanup handled by socket context
@@ -348,12 +355,52 @@ export function useCollaborativeWorkflow() {
onUserLeft,
onWorkflowDeleted,
onWorkflowReverted,
onOperationConfirmed,
onOperationFailed,
workflowStore,
subBlockStore,
activeWorkflowId,
confirmOperation,
failOperation,
emitWorkflowOperation,
queue,
])
// Collaborative workflow operations
const executeQueuedOperation = useCallback(
(operation: string, target: string, payload: any, localAction: () => void) => {
if (isApplyingRemoteChange.current) {
return
}
const operationId = crypto.randomUUID()
addToQueue({
id: operationId,
operation: {
operation,
target,
payload,
},
workflowId: activeWorkflowId || '',
userId: session?.user?.id || 'unknown',
})
localAction()
},
[addToQueue, session?.user?.id]
)
const executeQueuedDebouncedOperation = useCallback(
(operation: string, target: string, payload: any, localAction: () => void) => {
if (isApplyingRemoteChange.current) return
localAction()
emitWorkflowOperation(operation, target, payload)
},
[emitWorkflowOperation]
)
const collaborativeAddBlock = useCallback(
(
id: string,
@@ -365,7 +412,6 @@ export function useCollaborativeWorkflow() {
extent?: 'parent',
autoConnectEdge?: Edge
) => {
// Create complete block data upfront using the same logic as the store
const blockConfig = getBlock(type)
// Handle loop/parallel blocks that don't use BlockConfig
@@ -388,16 +434,36 @@ export function useCollaborativeWorkflow() {
autoConnectEdge, // Include edge data for atomic operation
}
// Apply locally first
// Skip if applying remote changes
if (isApplyingRemoteChange.current) {
workflowStore.addBlock(id, type, name, position, data, parentId, extent)
if (autoConnectEdge) {
workflowStore.addEdge(autoConnectEdge)
}
return
}
// Generate operation ID for queue tracking
const operationId = crypto.randomUUID()
// Add to queue for retry mechanism
addToQueue({
id: operationId,
operation: {
operation: 'add',
target: 'block',
payload: completeBlockData,
},
workflowId: activeWorkflowId || '',
userId: session?.user?.id || 'unknown',
})
// Apply locally first (immediate UI feedback)
workflowStore.addBlock(id, type, name, position, data, parentId, extent)
if (autoConnectEdge) {
workflowStore.addEdge(autoConnectEdge)
}
// Then broadcast to other clients with complete block data
if (!isApplyingRemoteChange.current) {
emitWorkflowOperation('add', 'block', completeBlockData)
}
return
}
@@ -420,7 +486,6 @@ export function useCollaborativeWorkflow() {
})
}
// Generate outputs using the same logic as the store
const outputs = resolveOutputType(blockConfig.outputs)
const completeBlockData = {
@@ -440,96 +505,107 @@ export function useCollaborativeWorkflow() {
autoConnectEdge, // Include edge data for atomic operation
}
// Apply locally first
// Skip if applying remote changes
if (isApplyingRemoteChange.current) return
// Generate operation ID
const operationId = crypto.randomUUID()
// Add to queue
addToQueue({
id: operationId,
operation: {
operation: 'add',
target: 'block',
payload: completeBlockData,
},
workflowId: activeWorkflowId || '',
userId: session?.user?.id || 'unknown',
})
// Apply locally
workflowStore.addBlock(id, type, name, position, data, parentId, extent)
if (autoConnectEdge) {
workflowStore.addEdge(autoConnectEdge)
}
// Then broadcast to other clients with complete block data
if (!isApplyingRemoteChange.current) {
emitWorkflowOperation('add', 'block', completeBlockData)
}
},
[workflowStore, emitWorkflowOperation]
[workflowStore, emitWorkflowOperation, addToQueue, session?.user?.id]
)
const collaborativeRemoveBlock = useCallback(
(id: string) => {
// Apply locally first
workflowStore.removeBlock(id)
// Then broadcast to other clients
if (!isApplyingRemoteChange.current) {
emitWorkflowOperation('remove', 'block', { id })
}
executeQueuedOperation('remove', 'block', { id }, () => workflowStore.removeBlock(id))
},
[workflowStore, emitWorkflowOperation]
[executeQueuedOperation, workflowStore]
)
const collaborativeUpdateBlockPosition = useCallback(
(id: string, position: Position) => {
// Apply locally first
workflowStore.updateBlockPosition(id, position)
// Then broadcast to other clients
if (!isApplyingRemoteChange.current) {
emitWorkflowOperation('update-position', 'block', { id, position })
}
executeQueuedDebouncedOperation('update-position', 'block', { id, position }, () =>
workflowStore.updateBlockPosition(id, position)
)
},
[workflowStore, emitWorkflowOperation]
[executeQueuedDebouncedOperation, workflowStore]
)
const collaborativeUpdateBlockName = useCallback(
(id: string, name: string) => {
// Apply locally first
workflowStore.updateBlockName(id, name)
executeQueuedOperation('update-name', 'block', { id, name }, () => {
workflowStore.updateBlockName(id, name)
// Then broadcast to other clients
if (!isApplyingRemoteChange.current) {
emitWorkflowOperation('update-name', 'block', { id, name })
// Check for pending subblock updates from the store
// Handle pending subblock updates
const globalWindow = window as any
const pendingUpdates = globalWindow.__pendingSubblockUpdates
if (pendingUpdates && Array.isArray(pendingUpdates)) {
// Emit collaborative subblock updates for each changed subblock
// Queue each subblock update individually
for (const update of pendingUpdates) {
const { blockId, subBlockId, newValue } = update
emitSubblockUpdate(blockId, subBlockId, newValue)
const operationId = crypto.randomUUID()
addToQueue({
id: operationId,
operation: {
operation: 'subblock-update',
target: 'subblock',
payload: { blockId, subblockId: subBlockId, value: newValue },
},
workflowId: activeWorkflowId || '',
userId: session?.user?.id || 'unknown',
})
subBlockStore.setValue(blockId, subBlockId, newValue)
}
// Clear the pending updates
globalWindow.__pendingSubblockUpdates = undefined
}
}
})
},
[workflowStore, emitWorkflowOperation, emitSubblockUpdate]
[
executeQueuedOperation,
workflowStore,
addToQueue,
subBlockStore,
activeWorkflowId,
session?.user?.id,
]
)
const collaborativeToggleBlockEnabled = useCallback(
(id: string) => {
// Apply locally first
workflowStore.toggleBlockEnabled(id)
// Then broadcast to other clients
if (!isApplyingRemoteChange.current) {
emitWorkflowOperation('toggle-enabled', 'block', { id })
}
executeQueuedOperation('toggle-enabled', 'block', { id }, () =>
workflowStore.toggleBlockEnabled(id)
)
},
[workflowStore, emitWorkflowOperation]
[executeQueuedOperation, workflowStore]
)
const collaborativeUpdateParentId = useCallback(
(id: string, parentId: string, extent: 'parent') => {
// Apply locally first
workflowStore.updateParentId(id, parentId, extent)
// Then broadcast to other clients
if (!isApplyingRemoteChange.current) {
emitWorkflowOperation('update-parent', 'block', { id, parentId, extent })
}
executeQueuedOperation('update-parent', 'block', { id, parentId, extent }, () =>
workflowStore.updateParentId(id, parentId, extent)
)
},
[workflowStore, emitWorkflowOperation]
[executeQueuedOperation, workflowStore]
)
const collaborativeToggleBlockWide = useCallback(
@@ -541,61 +617,45 @@ export function useCollaborativeWorkflow() {
// Calculate the new isWide value
const newIsWide = !currentBlock.isWide
// Apply locally first
workflowStore.toggleBlockWide(id)
// Emit with the calculated new value (don't rely on async state update)
if (!isApplyingRemoteChange.current) {
emitWorkflowOperation('update-wide', 'block', { id, isWide: newIsWide })
}
executeQueuedOperation('update-wide', 'block', { id, isWide: newIsWide }, () =>
workflowStore.toggleBlockWide(id)
)
},
[workflowStore, emitWorkflowOperation]
[executeQueuedOperation, workflowStore]
)
const collaborativeToggleBlockAdvancedMode = useCallback(
(id: string) => {
// Get the current state before toggling
const currentBlock = workflowStore.blocks[id]
if (!currentBlock) return
// Calculate the new advancedMode value
const newAdvancedMode = !currentBlock.advancedMode
// Apply locally first
workflowStore.toggleBlockAdvancedMode(id)
// Emit with the calculated new value (don't rely on async state update)
if (!isApplyingRemoteChange.current) {
emitWorkflowOperation('update-advanced-mode', 'block', {
id,
advancedMode: newAdvancedMode,
})
}
executeQueuedOperation(
'update-advanced-mode',
'block',
{ id, advancedMode: newAdvancedMode },
() => workflowStore.toggleBlockAdvancedMode(id)
)
},
[workflowStore, emitWorkflowOperation]
[executeQueuedOperation, workflowStore]
)
const collaborativeToggleBlockHandles = useCallback(
(id: string) => {
// Get the current state before toggling
const currentBlock = workflowStore.blocks[id]
if (!currentBlock) return
// Calculate the new horizontalHandles value
const newHorizontalHandles = !currentBlock.horizontalHandles
// Apply locally first
workflowStore.toggleBlockHandles(id)
// Emit with the calculated new value (don't rely on async state update)
if (!isApplyingRemoteChange.current) {
emitWorkflowOperation('toggle-handles', 'block', {
id,
horizontalHandles: newHorizontalHandles,
})
}
executeQueuedOperation(
'toggle-handles',
'block',
{ id, horizontalHandles: newHorizontalHandles },
() => workflowStore.toggleBlockHandles(id)
)
},
[workflowStore, emitWorkflowOperation]
[executeQueuedOperation, workflowStore]
)
const collaborativeDuplicateBlock = useCallback(
@@ -610,7 +670,6 @@ export function useCollaborativeWorkflow() {
y: sourceBlock.position.y + 20,
}
// Generate new name with numbering
const match = sourceBlock.name.match(/(.*?)(\d+)?$/)
const newName = match?.[2]
? `${match[1]}${Number.parseInt(match[2]) + 1}`
@@ -634,7 +693,6 @@ export function useCollaborativeWorkflow() {
height: sourceBlock.height || 0,
}
// Apply locally first using addBlock to ensure consistent IDs
workflowStore.addBlock(
newId,
sourceBlock.type,
@@ -645,7 +703,6 @@ export function useCollaborativeWorkflow() {
sourceBlock.data?.extent
)
// Copy subblock values to the new block
const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId
if (activeWorkflowId) {
const subBlockValues =
@@ -661,67 +718,84 @@ export function useCollaborativeWorkflow() {
}))
}
// Then broadcast to other clients
if (!isApplyingRemoteChange.current) {
emitWorkflowOperation('duplicate', 'block', duplicatedBlockData)
}
executeQueuedOperation('duplicate', 'block', duplicatedBlockData, () => {
workflowStore.addBlock(
newId,
sourceBlock.type,
newName,
offsetPosition,
sourceBlock.data ? JSON.parse(JSON.stringify(sourceBlock.data)) : {}
)
const subBlockValues = subBlockStore.workflowValues[activeWorkflowId || '']?.[sourceId]
if (subBlockValues && activeWorkflowId) {
Object.entries(subBlockValues).forEach(([subblockId, value]) => {
subBlockStore.setValue(newId, subblockId, value)
})
}
})
},
[workflowStore, emitWorkflowOperation]
[executeQueuedOperation, workflowStore, subBlockStore, activeWorkflowId]
)
const collaborativeAddEdge = useCallback(
(edge: Edge) => {
// Apply locally first
workflowStore.addEdge(edge)
// Then broadcast to other clients
if (!isApplyingRemoteChange.current) {
emitWorkflowOperation('add', 'edge', edge)
}
executeQueuedOperation('add', 'edge', edge, () => workflowStore.addEdge(edge))
},
[workflowStore, emitWorkflowOperation]
[executeQueuedOperation, workflowStore]
)
const collaborativeRemoveEdge = useCallback(
(edgeId: string) => {
// Apply locally first
workflowStore.removeEdge(edgeId)
// Then broadcast to other clients
if (!isApplyingRemoteChange.current) {
emitWorkflowOperation('remove', 'edge', { id: edgeId })
}
executeQueuedOperation('remove', 'edge', { id: edgeId }, () =>
workflowStore.removeEdge(edgeId)
)
},
[workflowStore, emitWorkflowOperation]
[executeQueuedOperation, workflowStore]
)
const collaborativeSetSubblockValue = useCallback(
(blockId: string, subblockId: string, value: any) => {
// Apply locally first - the store automatically uses the active workflow ID
subBlockStore.setValue(blockId, subblockId, value)
if (isApplyingRemoteChange.current) return
// Then broadcast to other clients, but only if we have a valid workflow connection
if (
!isApplyingRemoteChange.current &&
isConnected &&
currentWorkflowId &&
activeWorkflowId === currentWorkflowId
) {
emitSubblockUpdate(blockId, subblockId, value)
} else if (!isConnected || !currentWorkflowId || activeWorkflowId !== currentWorkflowId) {
logger.debug('Skipping subblock update broadcast', {
isConnected,
if (!currentWorkflowId || activeWorkflowId !== currentWorkflowId) {
logger.debug('Skipping subblock update - not in active workflow', {
currentWorkflowId,
activeWorkflowId,
blockId,
subblockId,
})
return
}
// Generate operation ID for queue tracking
const operationId = crypto.randomUUID()
// Add to queue for retry mechanism
addToQueue({
id: operationId,
operation: {
operation: 'subblock-update',
target: 'subblock',
payload: { blockId, subblockId, value },
},
workflowId: activeWorkflowId || '',
userId: session?.user?.id || 'unknown',
})
// Apply locally first (immediate UI feedback)
subBlockStore.setValue(blockId, subblockId, value)
},
[subBlockStore, emitSubblockUpdate, isConnected, currentWorkflowId, activeWorkflowId]
[
subBlockStore,
emitSubblockUpdate,
currentWorkflowId,
activeWorkflowId,
addToQueue,
session?.user?.id,
]
)
// Collaborative loop/parallel configuration updates
const collaborativeUpdateLoopCount = useCallback(
(loopId: string, count: number) => {
// Get current state BEFORE making changes
@@ -737,228 +811,174 @@ export function useCollaborativeWorkflow() {
const currentLoopType = currentBlock.data?.loopType || 'for'
const currentCollection = currentBlock.data?.collection || ''
// Apply local change
workflowStore.updateLoopCount(loopId, count)
// Emit subflow update operation with calculated values
if (!isApplyingRemoteChange.current) {
const config = {
id: loopId,
nodes: childNodes,
iterations: count,
loopType: currentLoopType,
forEachItems: currentCollection,
}
emitWorkflowOperation('update', 'subflow', {
id: loopId,
type: 'loop',
config,
})
const config = {
id: loopId,
nodes: childNodes,
iterations: count,
loopType: currentLoopType,
forEachItems: currentCollection,
}
executeQueuedOperation('update', 'subflow', { id: loopId, type: 'loop', config }, () =>
workflowStore.updateLoopCount(loopId, count)
)
},
[workflowStore, emitWorkflowOperation]
[executeQueuedOperation, workflowStore]
)
const collaborativeUpdateLoopType = useCallback(
(loopId: string, loopType: 'for' | 'forEach') => {
// Get current state BEFORE making changes
const currentBlock = workflowStore.blocks[loopId]
if (!currentBlock || currentBlock.type !== 'loop') return
// Find child nodes before state changes
const childNodes = Object.values(workflowStore.blocks)
.filter((b) => b.data?.parentId === loopId)
.map((b) => b.id)
// Get current values to preserve them
const currentIterations = currentBlock.data?.count || 5
const currentCollection = currentBlock.data?.collection || ''
// Apply local change
workflowStore.updateLoopType(loopId, loopType)
// Emit subflow update operation with calculated values
if (!isApplyingRemoteChange.current) {
const config = {
id: loopId,
nodes: childNodes,
iterations: currentIterations,
loopType,
forEachItems: currentCollection,
}
emitWorkflowOperation('update', 'subflow', {
id: loopId,
type: 'loop',
config,
})
const config = {
id: loopId,
nodes: childNodes,
iterations: currentIterations,
loopType,
forEachItems: currentCollection,
}
executeQueuedOperation('update', 'subflow', { id: loopId, type: 'loop', config }, () =>
workflowStore.updateLoopType(loopId, loopType)
)
},
[workflowStore, emitWorkflowOperation]
[executeQueuedOperation, workflowStore]
)
const collaborativeUpdateLoopCollection = useCallback(
(loopId: string, collection: string) => {
// Get current state BEFORE making changes
const currentBlock = workflowStore.blocks[loopId]
if (!currentBlock || currentBlock.type !== 'loop') return
// Find child nodes before state changes
const childNodes = Object.values(workflowStore.blocks)
.filter((b) => b.data?.parentId === loopId)
.map((b) => b.id)
// Get current values to preserve them
const currentIterations = currentBlock.data?.count || 5
const currentLoopType = currentBlock.data?.loopType || 'for'
// Apply local change
workflowStore.updateLoopCollection(loopId, collection)
// Emit subflow update operation with calculated values
if (!isApplyingRemoteChange.current) {
const config = {
id: loopId,
nodes: childNodes,
iterations: currentIterations,
loopType: currentLoopType,
forEachItems: collection,
}
emitWorkflowOperation('update', 'subflow', {
id: loopId,
type: 'loop',
config,
})
const config = {
id: loopId,
nodes: childNodes,
iterations: currentIterations,
loopType: currentLoopType,
forEachItems: collection,
}
executeQueuedOperation('update', 'subflow', { id: loopId, type: 'loop', config }, () =>
workflowStore.updateLoopCollection(loopId, collection)
)
},
[workflowStore, emitWorkflowOperation]
[executeQueuedOperation, workflowStore]
)
const collaborativeUpdateParallelCount = useCallback(
(parallelId: string, count: number) => {
// Get current state BEFORE making changes
const currentBlock = workflowStore.blocks[parallelId]
if (!currentBlock || currentBlock.type !== 'parallel') return
// Find child nodes before state changes
const childNodes = Object.values(workflowStore.blocks)
.filter((b) => b.data?.parentId === parallelId)
.map((b) => b.id)
// Get current values to preserve them
const currentDistribution = currentBlock.data?.collection || ''
const currentParallelType = currentBlock.data?.parallelType || 'collection'
// Apply local change
workflowStore.updateParallelCount(parallelId, count)
// Emit subflow update operation with calculated values
if (!isApplyingRemoteChange.current) {
const config = {
id: parallelId,
nodes: childNodes,
count: Math.max(1, Math.min(20, count)), // Clamp between 1-20
distribution: currentDistribution,
parallelType: currentParallelType,
}
emitWorkflowOperation('update', 'subflow', {
id: parallelId,
type: 'parallel',
config,
})
const config = {
id: parallelId,
nodes: childNodes,
count: Math.max(1, Math.min(20, count)), // Clamp between 1-20
distribution: currentDistribution,
parallelType: currentParallelType,
}
executeQueuedOperation(
'update',
'subflow',
{ id: parallelId, type: 'parallel', config },
() => workflowStore.updateParallelCount(parallelId, count)
)
},
[workflowStore, emitWorkflowOperation]
[executeQueuedOperation, workflowStore]
)
const collaborativeUpdateParallelCollection = useCallback(
(parallelId: string, collection: string) => {
// Get current state BEFORE making changes
const currentBlock = workflowStore.blocks[parallelId]
if (!currentBlock || currentBlock.type !== 'parallel') return
// Find child nodes before state changes
const childNodes = Object.values(workflowStore.blocks)
.filter((b) => b.data?.parentId === parallelId)
.map((b) => b.id)
// Get current values to preserve them
const currentCount = currentBlock.data?.count || 5
const currentParallelType = currentBlock.data?.parallelType || 'collection'
// Apply local change
workflowStore.updateParallelCollection(parallelId, collection)
// Emit subflow update operation with calculated values
if (!isApplyingRemoteChange.current) {
const config = {
id: parallelId,
nodes: childNodes,
count: currentCount,
distribution: collection,
parallelType: currentParallelType,
}
emitWorkflowOperation('update', 'subflow', {
id: parallelId,
type: 'parallel',
config,
})
const config = {
id: parallelId,
nodes: childNodes,
count: currentCount,
distribution: collection,
parallelType: currentParallelType,
}
executeQueuedOperation(
'update',
'subflow',
{ id: parallelId, type: 'parallel', config },
() => workflowStore.updateParallelCollection(parallelId, collection)
)
},
[workflowStore, emitWorkflowOperation]
[executeQueuedOperation, workflowStore]
)
const collaborativeUpdateParallelType = useCallback(
(parallelId: string, parallelType: 'count' | 'collection') => {
// Get current state BEFORE making changes
const currentBlock = workflowStore.blocks[parallelId]
if (!currentBlock || currentBlock.type !== 'parallel') return
// Find child nodes before state changes
const childNodes = Object.values(workflowStore.blocks)
.filter((b) => b.data?.parentId === parallelId)
.map((b) => b.id)
// Calculate new values based on type change
let newCount = currentBlock.data?.count || 5
let newDistribution = currentBlock.data?.collection || ''
// Reset values based on type (same logic as the UI)
if (parallelType === 'count') {
newDistribution = ''
// Keep existing count
} else {
newCount = 1
newDistribution = newDistribution || ''
}
// Apply all changes locally first
workflowStore.updateParallelType(parallelId, parallelType)
workflowStore.updateParallelCount(parallelId, newCount)
workflowStore.updateParallelCollection(parallelId, newDistribution)
// Emit single subflow update with all changes
if (!isApplyingRemoteChange.current) {
const config = {
id: parallelId,
nodes: childNodes,
count: newCount,
distribution: newDistribution,
parallelType,
}
emitWorkflowOperation('update', 'subflow', {
id: parallelId,
type: 'parallel',
config,
})
const config = {
id: parallelId,
nodes: childNodes,
count: newCount,
distribution: newDistribution,
parallelType,
}
executeQueuedOperation(
'update',
'subflow',
{ id: parallelId, type: 'parallel', config },
() => {
workflowStore.updateParallelType(parallelId, parallelType)
workflowStore.updateParallelCount(parallelId, newCount)
workflowStore.updateParallelCollection(parallelId, newDistribution)
}
)
},
[workflowStore, emitWorkflowOperation]
[executeQueuedOperation, workflowStore]
)
return {
@@ -966,6 +986,7 @@ export function useCollaborativeWorkflow() {
isConnected,
currentWorkflowId,
presenceUsers,
hasOperationError,
// Workflow management
joinWorkflow,

View File

@@ -36,8 +36,11 @@ export function setupOperationsHandlers(
return
}
let operationId: string | undefined
try {
const validatedOperation = WorkflowOperationSchema.parse(data)
operationId = validatedOperation.operationId
const { operation, target, payload, timestamp } = validatedOperation
// Check operation permissions
@@ -100,16 +103,25 @@ export function setupOperationsHandlers(
userId: session.userId,
}).catch((error) => {
logger.error('Failed to persist position update:', error)
// Emit failure for position updates if operationId is provided
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: error instanceof Error ? error.message : 'Database persistence failed',
retryable: true,
})
}
})
room.lastModified = Date.now()
socket.emit('operation-confirmed', {
operation,
target,
operationId: broadcastData.metadata.operationId,
serverTimestamp: Date.now(),
})
// Emit confirmation if operationId is provided
if (operationId) {
socket.emit('operation-confirmed', {
operationId,
serverTimestamp: Date.now(),
})
}
return // Early return for position updates
}
@@ -143,13 +155,26 @@ export function setupOperationsHandlers(
socket.to(workflowId).emit('workflow-operation', broadcastData)
socket.emit('operation-confirmed', {
operation,
target,
operationId: broadcastData.metadata.operationId,
serverTimestamp: Date.now(),
})
// Emit confirmation if operationId is provided
if (operationId) {
socket.emit('operation-confirmed', {
operationId,
serverTimestamp: Date.now(),
})
}
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error occurred'
// Emit operation-failed for queue-tracked operations
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: errorMessage,
retryable: !(error instanceof ZodError), // Don't retry validation errors
})
}
// Also emit legacy operation-error for backward compatibility
if (error instanceof ZodError) {
socket.emit('operation-error', {
type: 'VALIDATION_ERROR',

View File

@@ -27,7 +27,7 @@ export function setupSubblocksHandlers(
return
}
const { blockId, subblockId, value, timestamp } = data
const { blockId, subblockId, value, timestamp, operationId } = data
const room = roomManager.getWorkflowRoom(workflowId)
if (!room) {
@@ -117,15 +117,35 @@ export function setupSubblocksHandlers(
senderId: socket.id,
userId: session.userId,
})
// Emit confirmation if operationId is provided
if (operationId) {
socket.emit('operation-confirmed', {
operationId,
serverTimestamp: Date.now(),
})
}
}
logger.debug(`Subblock update in workflow ${workflowId}: ${blockId}.${subblockId}`)
} catch (error) {
logger.error('Error handling subblock update:', error)
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
// Emit operation-failed for queue-tracked operations
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: errorMessage,
retryable: true, // Subblock updates are generally retryable
})
}
// Also emit legacy operation-error for backward compatibility
socket.emit('operation-error', {
type: 'SUBBLOCK_UPDATE_FAILED',
message: `Failed to update subblock ${blockId}.${subblockId}: ${error instanceof Error ? error.message : 'Unknown error'}`,
message: `Failed to update subblock ${blockId}.${subblockId}: ${errorMessage}`,
operation: 'subblock-update',
target: 'subblock',
})

View File

@@ -48,6 +48,7 @@ export const BlockOperationSchema = z.object({
autoConnectEdge: AutoConnectEdgeSchema.optional(), // Add support for auto-connect edges
}),
timestamp: z.number(),
operationId: z.string().optional(),
})
export const EdgeOperationSchema = z.object({
@@ -61,6 +62,7 @@ export const EdgeOperationSchema = z.object({
targetHandle: z.string().nullable().optional(),
}),
timestamp: z.number(),
operationId: z.string().optional(),
})
export const SubflowOperationSchema = z.object({
@@ -72,6 +74,7 @@ export const SubflowOperationSchema = z.object({
config: z.record(z.any()).optional(),
}),
timestamp: z.number(),
operationId: z.string().optional(),
})
export const WorkflowOperationSchema = z.union([

View File

@@ -0,0 +1,302 @@
import { create } from 'zustand'
import { createLogger } from '@/lib/logs/console-logger'
const logger = createLogger('OperationQueue')
export interface QueuedOperation {
id: string
operation: {
operation: string
target: string
payload: any
}
workflowId: string
timestamp: number
retryCount: number
status: 'pending' | 'processing' | 'confirmed' | 'failed'
userId: string
}
interface OperationQueueState {
operations: QueuedOperation[]
isProcessing: boolean
hasOperationError: boolean
addToQueue: (operation: Omit<QueuedOperation, 'timestamp' | 'retryCount' | 'status'>) => void
confirmOperation: (operationId: string) => void
failOperation: (operationId: string) => void
handleOperationTimeout: (operationId: string) => void
processNextOperation: () => void
triggerOfflineMode: () => void
clearError: () => void
}
const retryTimeouts = new Map<string, NodeJS.Timeout>()
const operationTimeouts = new Map<string, NodeJS.Timeout>()
let emitWorkflowOperation:
| ((operation: string, target: string, payload: any, operationId?: string) => void)
| null = null
let emitSubblockUpdate:
| ((blockId: string, subblockId: string, value: any, operationId?: string) => void)
| null = null
let currentWorkflowId: string | null = null
export function registerEmitFunctions(
workflowEmit: (operation: string, target: string, payload: any, operationId?: string) => void,
subblockEmit: (blockId: string, subblockId: string, value: any, operationId?: string) => void,
workflowId: string | null
) {
emitWorkflowOperation = workflowEmit
emitSubblockUpdate = subblockEmit
currentWorkflowId = workflowId
}
export const useOperationQueueStore = create<OperationQueueState>((set, get) => ({
operations: [],
isProcessing: false,
hasOperationError: false,
addToQueue: (operation) => {
const state = get()
// Check for duplicate operation ID
const existingOp = state.operations.find((op) => op.id === operation.id)
if (existingOp) {
logger.debug('Skipping duplicate operation ID', {
operationId: operation.id,
existingStatus: existingOp.status,
})
return
}
// Enhanced duplicate content check - especially important for block operations
const duplicateContent = state.operations.find(
(op) =>
op.operation.operation === operation.operation.operation &&
op.operation.target === operation.operation.target &&
op.workflowId === operation.workflowId &&
// For block operations, check the block ID specifically
((operation.operation.target === 'block' &&
op.operation.payload?.id === operation.operation.payload?.id) ||
// For subblock operations, check blockId and subblockId
(operation.operation.target === 'subblock' &&
op.operation.payload?.blockId === operation.operation.payload?.blockId &&
op.operation.payload?.subblockId === operation.operation.payload?.subblockId) ||
// For other operations, fall back to full payload comparison
(operation.operation.target !== 'block' &&
operation.operation.target !== 'subblock' &&
JSON.stringify(op.operation.payload) === JSON.stringify(operation.operation.payload)))
)
if (duplicateContent) {
logger.debug('Skipping duplicate operation content', {
operationId: operation.id,
existingOperationId: duplicateContent.id,
operation: operation.operation.operation,
target: operation.operation.target,
existingStatus: duplicateContent.status,
payload:
operation.operation.target === 'block'
? { id: operation.operation.payload?.id }
: operation.operation.payload,
})
return
}
const queuedOp: QueuedOperation = {
...operation,
timestamp: Date.now(),
retryCount: 0,
status: 'pending',
}
logger.debug('Adding operation to queue', {
operationId: queuedOp.id,
operation: queuedOp.operation,
})
set((state) => ({
operations: [...state.operations, queuedOp],
}))
// Start processing if not already processing
get().processNextOperation()
},
confirmOperation: (operationId) => {
const state = get()
const newOperations = state.operations.filter((op) => op.id !== operationId)
const retryTimeout = retryTimeouts.get(operationId)
if (retryTimeout) {
clearTimeout(retryTimeout)
retryTimeouts.delete(operationId)
}
const operationTimeout = operationTimeouts.get(operationId)
if (operationTimeout) {
clearTimeout(operationTimeout)
operationTimeouts.delete(operationId)
}
logger.debug('Removing operation from queue', {
operationId,
remainingOps: newOperations.length,
})
set({ operations: newOperations, isProcessing: false })
// Process next operation in queue
get().processNextOperation()
},
failOperation: (operationId: string) => {
const state = get()
const operation = state.operations.find((op) => op.id === operationId)
if (!operation) {
logger.warn('Attempted to fail operation that does not exist in queue', { operationId })
return
}
const operationTimeout = operationTimeouts.get(operationId)
if (operationTimeout) {
clearTimeout(operationTimeout)
operationTimeouts.delete(operationId)
}
if (operation.retryCount < 3) {
const newRetryCount = operation.retryCount + 1
const delay = 2 ** newRetryCount * 1000 // 2s, 4s, 8s
logger.warn(`Operation failed, retrying in ${delay}ms (attempt ${newRetryCount}/3)`, {
operationId,
retryCount: newRetryCount,
})
// Update retry count and mark as pending for retry
set((state) => ({
operations: state.operations.map((op) =>
op.id === operationId
? { ...op, retryCount: newRetryCount, status: 'pending' as const }
: op
),
isProcessing: false, // Allow processing to continue
}))
// Schedule retry
const timeout = setTimeout(() => {
retryTimeouts.delete(operationId)
get().processNextOperation()
}, delay)
retryTimeouts.set(operationId, timeout)
} else {
logger.error('Operation failed after max retries, triggering offline mode', { operationId })
get().triggerOfflineMode()
}
},
handleOperationTimeout: (operationId: string) => {
const state = get()
const operation = state.operations.find((op) => op.id === operationId)
if (!operation) {
logger.debug('Ignoring timeout for operation not in queue', { operationId })
return
}
logger.warn('Operation timeout detected - treating as failure to trigger retries', {
operationId,
})
get().failOperation(operationId)
},
processNextOperation: () => {
const state = get()
// Don't process if already processing
if (state.isProcessing) {
return
}
// Find the first pending operation (FIFO - first in, first out)
const nextOperation = state.operations.find((op) => op.status === 'pending')
if (!nextOperation) {
return // No pending operations
}
// Mark as processing
set((state) => ({
operations: state.operations.map((op) =>
op.id === nextOperation.id ? { ...op, status: 'processing' as const } : op
),
isProcessing: true,
}))
logger.debug('Processing operation sequentially', {
operationId: nextOperation.id,
operation: nextOperation.operation,
retryCount: nextOperation.retryCount,
})
// Emit the operation
const { operation: op, target, payload } = nextOperation.operation
if (op === 'subblock-update' && target === 'subblock') {
if (emitSubblockUpdate) {
emitSubblockUpdate(payload.blockId, payload.subblockId, payload.value, nextOperation.id)
}
} else {
if (emitWorkflowOperation) {
emitWorkflowOperation(op, target, payload, nextOperation.id)
}
}
// Create operation timeout
const timeoutId = setTimeout(() => {
logger.warn('Operation timeout - no server response after 5 seconds', {
operationId: nextOperation.id,
})
operationTimeouts.delete(nextOperation.id)
get().handleOperationTimeout(nextOperation.id)
}, 5000)
operationTimeouts.set(nextOperation.id, timeoutId)
},
triggerOfflineMode: () => {
logger.error('Operation failed after retries - triggering offline mode')
retryTimeouts.forEach((timeout) => clearTimeout(timeout))
retryTimeouts.clear()
operationTimeouts.forEach((timeout) => clearTimeout(timeout))
operationTimeouts.clear()
set({
operations: [],
isProcessing: false,
hasOperationError: true,
})
},
clearError: () => {
set({ hasOperationError: false })
},
}))
export function useOperationQueue() {
const store = useOperationQueueStore()
return {
queue: store.operations,
isProcessing: store.isProcessing,
hasOperationError: store.hasOperationError,
addToQueue: store.addToQueue,
confirmOperation: store.confirmOperation,
failOperation: store.failOperation,
processNextOperation: store.processNextOperation,
triggerOfflineMode: store.triggerOfflineMode,
clearError: store.clearError,
}
}