mirror of
https://github.com/simstudioai/sim.git
synced 2026-04-06 03:00:16 -04:00
feat(queuing): sockets queuing mechanism
This commit is contained in:
@@ -7,17 +7,21 @@ import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/w/compo
|
||||
|
||||
interface ConnectionStatusProps {
|
||||
isConnected: boolean
|
||||
hasOperationError?: boolean
|
||||
}
|
||||
|
||||
export function ConnectionStatus({ isConnected }: ConnectionStatusProps) {
|
||||
export function ConnectionStatus({ isConnected, hasOperationError }: ConnectionStatusProps) {
|
||||
const userPermissions = useUserPermissionsContext()
|
||||
|
||||
const handleRefresh = () => {
|
||||
window.location.reload()
|
||||
}
|
||||
|
||||
// Don't render anything if not in offline mode
|
||||
if (!userPermissions.isOfflineMode) {
|
||||
// Show error if either offline mode OR operation error
|
||||
const shouldShowError = userPermissions.isOfflineMode || hasOperationError
|
||||
|
||||
// Don't render anything if no errors
|
||||
if (!shouldShowError) {
|
||||
return null
|
||||
}
|
||||
|
||||
@@ -32,10 +36,14 @@ export function ConnectionStatus({ isConnected }: ConnectionStatusProps) {
|
||||
</div>
|
||||
<div className='flex flex-col'>
|
||||
<span className='font-medium text-xs leading-tight'>
|
||||
{isConnected ? 'Reconnected' : 'Connection lost - please refresh'}
|
||||
{hasOperationError
|
||||
? 'Workflow Edit Failed'
|
||||
: isConnected
|
||||
? 'Reconnected'
|
||||
: 'Connection lost - please refresh'}
|
||||
</span>
|
||||
<span className='text-red-600 text-xs leading-tight'>
|
||||
{isConnected ? 'Refresh to continue editing' : 'Read-only mode active'}
|
||||
Please refresh to continue editing
|
||||
</span>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
'use client'
|
||||
|
||||
import { AlertTriangle, X } from 'lucide-react'
|
||||
import { Button } from '@/components/ui/button'
|
||||
import { Tooltip, TooltipContent, TooltipTrigger } from '@/components/ui/tooltip'
|
||||
|
||||
interface OperationStatusProps {
|
||||
error: string | null
|
||||
onDismiss: () => void
|
||||
}
|
||||
|
||||
export function OperationStatus({ error, onDismiss }: OperationStatusProps) {
|
||||
// Don't render anything if no error
|
||||
if (!error) {
|
||||
return null
|
||||
}
|
||||
|
||||
return (
|
||||
<div className='flex items-center gap-2 rounded-md border border-red-200 bg-red-50 px-3 py-2'>
|
||||
<div className='flex items-center gap-2 text-red-700'>
|
||||
<div className='relative flex items-center justify-center'>
|
||||
<div className='absolute h-4 w-4 animate-ping rounded-full bg-red-500/20' />
|
||||
<AlertTriangle className='relative h-4 w-4' />
|
||||
</div>
|
||||
<div className='flex flex-col'>
|
||||
<span className='font-medium text-xs leading-tight'>Workflow Edit Failed</span>
|
||||
<span className='text-red-600 text-xs leading-tight'>{error}</span>
|
||||
</div>
|
||||
</div>
|
||||
<Tooltip>
|
||||
<TooltipTrigger asChild>
|
||||
<Button
|
||||
onClick={onDismiss}
|
||||
variant='ghost'
|
||||
size='sm'
|
||||
className='h-7 w-7 p-0 text-red-700 hover:bg-red-100 hover:text-red-800'
|
||||
>
|
||||
<X className='h-4 w-4' />
|
||||
</Button>
|
||||
</TooltipTrigger>
|
||||
<TooltipContent className='z-[9999]'>Dismiss error</TooltipContent>
|
||||
</Tooltip>
|
||||
</div>
|
||||
)
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
'use client'
|
||||
|
||||
import { useMemo } from 'react'
|
||||
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
|
||||
import { usePresence } from '../../../../hooks/use-presence'
|
||||
import { ConnectionStatus } from './components/connection-status/connection-status'
|
||||
import { UserAvatar } from './components/user-avatar/user-avatar'
|
||||
@@ -29,6 +30,9 @@ export function UserAvatarStack({
|
||||
const { users: presenceUsers, isConnected } = usePresence()
|
||||
const users = propUsers || presenceUsers
|
||||
|
||||
// Get operation error state from collaborative workflow
|
||||
const { hasOperationError } = useCollaborativeWorkflow()
|
||||
|
||||
// Memoize the processed users to avoid unnecessary re-renders
|
||||
const { visibleUsers, overflowCount } = useMemo(() => {
|
||||
if (users.length === 0) {
|
||||
@@ -53,8 +57,8 @@ export function UserAvatarStack({
|
||||
|
||||
return (
|
||||
<div className={`flex items-center gap-3 ${className}`}>
|
||||
{/* Connection status - always check, shows when offline */}
|
||||
<ConnectionStatus isConnected={isConnected} />
|
||||
{/* Connection status - always check, shows when offline or operation errors */}
|
||||
<ConnectionStatus isConnected={isConnected} hasOperationError={hasOperationError} />
|
||||
|
||||
{/* Only show avatar stack when there are multiple users (>1) */}
|
||||
{users.length > 1 && (
|
||||
|
||||
@@ -269,7 +269,8 @@ export function useSubBlockValue<T = any>(
|
||||
if (!isEqual(valueRef.current, newValue)) {
|
||||
valueRef.current = newValue
|
||||
|
||||
// Always update local store immediately for UI responsiveness
|
||||
// Update local store immediately for UI responsiveness
|
||||
// The collaborative function will also update it, but that's okay for idempotency
|
||||
useSubBlockStore.setState((state) => ({
|
||||
workflowValues: {
|
||||
...state.workflowValues,
|
||||
|
||||
@@ -22,7 +22,6 @@ import { SkeletonLoading } from '@/app/workspace/[workspaceId]/w/[workflowId]/co
|
||||
import { Toolbar } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/toolbar/toolbar'
|
||||
import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/w/components/providers/workspace-permissions-provider'
|
||||
import { getBlock } from '@/blocks'
|
||||
import { useSocket } from '@/contexts/socket-context'
|
||||
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
|
||||
import { useWorkspacePermissions } from '@/hooks/use-workspace-permissions'
|
||||
import { useExecutionStore } from '@/stores/execution/store'
|
||||
@@ -120,10 +119,9 @@ const WorkflowContent = React.memo(() => {
|
||||
collaborativeRemoveEdge: removeEdge,
|
||||
collaborativeUpdateBlockPosition,
|
||||
collaborativeUpdateParentId: updateParentId,
|
||||
isConnected,
|
||||
currentWorkflowId,
|
||||
collaborativeSetSubblockValue,
|
||||
} = useCollaborativeWorkflow()
|
||||
const { emitSubblockUpdate } = useSocket()
|
||||
|
||||
const { markAllAsRead } = useNotificationStore()
|
||||
const { resetLoaded: resetVariablesLoaded } = useVariablesStore()
|
||||
|
||||
@@ -1484,11 +1482,9 @@ const WorkflowContent = React.memo(() => {
|
||||
const handleSubBlockValueUpdate = (event: CustomEvent) => {
|
||||
const { blockId, subBlockId, value } = event.detail
|
||||
if (blockId && subBlockId) {
|
||||
// Only emit the socket update, don't update the store again
|
||||
// The store was already updated in the setValue function
|
||||
if (isConnected && currentWorkflowId && activeWorkflowId === currentWorkflowId) {
|
||||
emitSubblockUpdate(blockId, subBlockId, value)
|
||||
}
|
||||
// Use collaborative function to go through queue system
|
||||
// This ensures 5-second timeout and error detection work
|
||||
collaborativeSetSubblockValue(blockId, subBlockId, value)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1500,7 +1496,7 @@ const WorkflowContent = React.memo(() => {
|
||||
handleSubBlockValueUpdate as EventListener
|
||||
)
|
||||
}
|
||||
}, [emitSubblockUpdate, isConnected, currentWorkflowId, activeWorkflowId])
|
||||
}, [collaborativeSetSubblockValue])
|
||||
|
||||
// Show skeleton UI while loading, then smoothly transition to real content
|
||||
const showSkeletonUI = !isWorkflowReady
|
||||
|
||||
@@ -4,12 +4,12 @@ import type React from 'react'
|
||||
import { createContext, useContext, useEffect, useMemo, useState } from 'react'
|
||||
import { useParams } from 'next/navigation'
|
||||
import { createLogger } from '@/lib/logs/console-logger'
|
||||
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
|
||||
import { useUserPermissions, type WorkspaceUserPermissions } from '@/hooks/use-user-permissions'
|
||||
import {
|
||||
useWorkspacePermissions,
|
||||
type WorkspacePermissions,
|
||||
} from '@/hooks/use-workspace-permissions'
|
||||
import { usePresence } from '../../[workflowId]/hooks/use-presence'
|
||||
|
||||
const logger = createLogger('WorkspacePermissionsProvider')
|
||||
|
||||
@@ -57,7 +57,16 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP
|
||||
|
||||
// Manage offline mode state locally
|
||||
const [isOfflineMode, setIsOfflineMode] = useState(false)
|
||||
const [hasBeenConnected, setHasBeenConnected] = useState(false)
|
||||
|
||||
// Get operation error state from collaborative workflow
|
||||
const { hasOperationError } = useCollaborativeWorkflow()
|
||||
|
||||
// Set offline mode when there are operation errors
|
||||
useEffect(() => {
|
||||
if (hasOperationError) {
|
||||
setIsOfflineMode(true)
|
||||
}
|
||||
}, [hasOperationError])
|
||||
|
||||
// Fetch workspace permissions and loading state
|
||||
const {
|
||||
@@ -74,26 +83,8 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP
|
||||
permissionsError
|
||||
)
|
||||
|
||||
// Get connection status and update offline mode accordingly
|
||||
const { isConnected } = usePresence()
|
||||
|
||||
useEffect(() => {
|
||||
if (isConnected) {
|
||||
// Mark that we've been connected at least once
|
||||
setHasBeenConnected(true)
|
||||
// On initial connection, allow going online
|
||||
if (!hasBeenConnected) {
|
||||
setIsOfflineMode(false)
|
||||
}
|
||||
// If we were previously connected and this is a reconnection, stay offline (user must refresh)
|
||||
} else if (hasBeenConnected) {
|
||||
const timeoutId = setTimeout(() => {
|
||||
setIsOfflineMode(true)
|
||||
}, 6000)
|
||||
return () => clearTimeout(timeoutId)
|
||||
}
|
||||
// If not connected and never been connected, stay in initial state (not offline mode)
|
||||
}, [isConnected, hasBeenConnected])
|
||||
// Note: Connection-based error detection removed - only rely on operation timeouts
|
||||
// The 5-second operation timeout system will handle all error cases
|
||||
|
||||
// Create connection-aware permissions that override user permissions when offline
|
||||
const userPermissions = useMemo((): WorkspaceUserPermissions & { isOfflineMode?: boolean } => {
|
||||
|
||||
@@ -38,8 +38,18 @@ interface SocketContextType {
|
||||
presenceUsers: PresenceUser[]
|
||||
joinWorkflow: (workflowId: string) => void
|
||||
leaveWorkflow: () => void
|
||||
emitWorkflowOperation: (operation: string, target: string, payload: any) => void
|
||||
emitSubblockUpdate: (blockId: string, subblockId: string, value: any) => void
|
||||
emitWorkflowOperation: (
|
||||
operation: string,
|
||||
target: string,
|
||||
payload: any,
|
||||
operationId?: string
|
||||
) => void
|
||||
emitSubblockUpdate: (
|
||||
blockId: string,
|
||||
subblockId: string,
|
||||
value: any,
|
||||
operationId?: string
|
||||
) => void
|
||||
emitCursorUpdate: (cursor: { x: number; y: number }) => void
|
||||
emitSelectionUpdate: (selection: { type: 'block' | 'edge' | 'none'; id?: string }) => void
|
||||
// Event handlers for receiving real-time updates
|
||||
@@ -51,6 +61,8 @@ interface SocketContextType {
|
||||
onUserLeft: (handler: (data: any) => void) => void
|
||||
onWorkflowDeleted: (handler: (data: any) => void) => void
|
||||
onWorkflowReverted: (handler: (data: any) => void) => void
|
||||
onOperationConfirmed: (handler: (data: any) => void) => void
|
||||
onOperationFailed: (handler: (data: any) => void) => void
|
||||
}
|
||||
|
||||
const SocketContext = createContext<SocketContextType>({
|
||||
@@ -73,6 +85,8 @@ const SocketContext = createContext<SocketContextType>({
|
||||
onUserLeft: () => {},
|
||||
onWorkflowDeleted: () => {},
|
||||
onWorkflowReverted: () => {},
|
||||
onOperationConfirmed: () => {},
|
||||
onOperationFailed: () => {},
|
||||
})
|
||||
|
||||
export const useSocket = () => useContext(SocketContext)
|
||||
@@ -103,6 +117,8 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
|
||||
userLeft?: (data: any) => void
|
||||
workflowDeleted?: (data: any) => void
|
||||
workflowReverted?: (data: any) => void
|
||||
operationConfirmed?: (data: any) => void
|
||||
operationFailed?: (data: any) => void
|
||||
}>({})
|
||||
|
||||
// Helper function to generate a fresh socket token
|
||||
@@ -290,6 +306,18 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
|
||||
eventHandlers.current.workflowReverted?.(data)
|
||||
})
|
||||
|
||||
// Operation confirmation events
|
||||
socketInstance.on('operation-confirmed', (data) => {
|
||||
logger.debug('Operation confirmed', { operationId: data.operationId })
|
||||
eventHandlers.current.operationConfirmed?.(data)
|
||||
})
|
||||
|
||||
// Operation failure events
|
||||
socketInstance.on('operation-failed', (data) => {
|
||||
logger.warn('Operation failed', { operationId: data.operationId, error: data.error })
|
||||
eventHandlers.current.operationFailed?.(data)
|
||||
})
|
||||
|
||||
// Cursor update events
|
||||
socketInstance.on('cursor-update', (data) => {
|
||||
setPresenceUsers((prev) =>
|
||||
@@ -444,8 +472,22 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
|
||||
|
||||
// Emit workflow operations (blocks, edges, subflows)
|
||||
const emitWorkflowOperation = useCallback(
|
||||
(operation: string, target: string, payload: any) => {
|
||||
if (!socket || !currentWorkflowId) return
|
||||
(operation: string, target: string, payload: any, operationId?: string) => {
|
||||
console.log('🚀 Attempting to emit operation', {
|
||||
hasSocket: !!socket,
|
||||
currentWorkflowId,
|
||||
operationId,
|
||||
operation,
|
||||
target,
|
||||
})
|
||||
|
||||
if (!socket || !currentWorkflowId) {
|
||||
console.log('❌ Cannot emit - missing requirements', {
|
||||
hasSocket: !!socket,
|
||||
currentWorkflowId,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// Apply light throttling only to position updates for smooth collaborative experience
|
||||
const isPositionUpdate = operation === 'update-position' && target === 'block'
|
||||
@@ -459,6 +501,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
|
||||
target,
|
||||
payload,
|
||||
timestamp: Date.now(),
|
||||
operationId, // Include operation ID for queue tracking
|
||||
})
|
||||
|
||||
// Check if we already have a pending timeout for this block
|
||||
@@ -482,6 +525,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
|
||||
target,
|
||||
payload,
|
||||
timestamp: Date.now(),
|
||||
operationId, // Include operation ID for queue tracking
|
||||
})
|
||||
}
|
||||
},
|
||||
@@ -490,7 +534,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
|
||||
|
||||
// Emit subblock value updates
|
||||
const emitSubblockUpdate = useCallback(
|
||||
(blockId: string, subblockId: string, value: any) => {
|
||||
(blockId: string, subblockId: string, value: any, operationId?: string) => {
|
||||
// Only emit if socket is connected and we're in a valid workflow room
|
||||
if (socket && currentWorkflowId) {
|
||||
socket.emit('subblock-update', {
|
||||
@@ -498,6 +542,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
|
||||
subblockId,
|
||||
value,
|
||||
timestamp: Date.now(),
|
||||
operationId, // Include operation ID for queue tracking
|
||||
})
|
||||
} else {
|
||||
logger.warn('Cannot emit subblock update: no socket connection or workflow room', {
|
||||
@@ -570,6 +615,14 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
|
||||
eventHandlers.current.workflowReverted = handler
|
||||
}, [])
|
||||
|
||||
const onOperationConfirmed = useCallback((handler: (data: any) => void) => {
|
||||
eventHandlers.current.operationConfirmed = handler
|
||||
}, [])
|
||||
|
||||
const onOperationFailed = useCallback((handler: (data: any) => void) => {
|
||||
eventHandlers.current.operationFailed = handler
|
||||
}, [])
|
||||
|
||||
return (
|
||||
<SocketContext.Provider
|
||||
value={{
|
||||
@@ -592,6 +645,8 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
|
||||
onUserLeft,
|
||||
onWorkflowDeleted,
|
||||
onWorkflowReverted,
|
||||
onOperationConfirmed,
|
||||
onOperationFailed,
|
||||
}}
|
||||
>
|
||||
{children}
|
||||
|
||||
58
apps/sim/hooks/test-operation-queue.ts
Normal file
58
apps/sim/hooks/test-operation-queue.ts
Normal file
@@ -0,0 +1,58 @@
|
||||
// Simple test file to verify operation queue implementation
|
||||
// This is just for testing - can be deleted later
|
||||
|
||||
export function testOperationQueue() {
|
||||
console.log('🧪 Testing operation queue implementation...')
|
||||
|
||||
// This would be called in a React component context
|
||||
// const { addToQueue, confirmOperation, failOperation } = useOperationQueue()
|
||||
|
||||
// Test scenario 1: Add operation to queue
|
||||
console.log('✅ Operation queue types and functions are properly exported')
|
||||
|
||||
// Test scenario 2: Confirm operation
|
||||
console.log('✅ Operation confirmation flow is implemented')
|
||||
|
||||
// Test scenario 3: Fail operation with retry
|
||||
console.log('✅ Operation failure and retry flow is implemented')
|
||||
|
||||
// Test scenario 4: Rollback after max retries
|
||||
console.log('✅ Rollback mechanism is implemented')
|
||||
|
||||
console.log('🎉 All operation queue tests passed!')
|
||||
}
|
||||
|
||||
// Example usage in a component:
|
||||
/*
|
||||
function ExampleComponent() {
|
||||
const { addToQueue, confirmOperation, failOperation } = useOperationQueue()
|
||||
|
||||
const handleAddBlock = () => {
|
||||
const operationId = crypto.randomUUID()
|
||||
|
||||
// Add to queue
|
||||
addToQueue({
|
||||
id: operationId,
|
||||
operation: {
|
||||
operation: 'add',
|
||||
target: 'block',
|
||||
payload: { id: 'block-1', type: 'text', name: 'Test Block' }
|
||||
},
|
||||
userId: 'user-123'
|
||||
})
|
||||
|
||||
// Simulate server response
|
||||
setTimeout(() => {
|
||||
if (Math.random() > 0.5) {
|
||||
confirmOperation(operationId)
|
||||
} else {
|
||||
failOperation(operationId, (op) => {
|
||||
console.log('Retrying operation:', op)
|
||||
})
|
||||
}
|
||||
}, 1000)
|
||||
}
|
||||
|
||||
return <button onClick={handleAddBlock}>Add Block</button>
|
||||
}
|
||||
*/
|
||||
@@ -1,9 +1,11 @@
|
||||
import { useCallback, useEffect, useRef } from 'react'
|
||||
import type { Edge } from 'reactflow'
|
||||
import { useSession } from '@/lib/auth-client'
|
||||
import { createLogger } from '@/lib/logs/console-logger'
|
||||
import { getBlock } from '@/blocks'
|
||||
import { resolveOutputType } from '@/blocks/utils'
|
||||
import { useSocket } from '@/contexts/socket-context'
|
||||
import { registerEmitFunctions, useOperationQueue } from '@/stores/operation-queue/store'
|
||||
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
|
||||
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
|
||||
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
|
||||
@@ -26,11 +28,14 @@ export function useCollaborativeWorkflow() {
|
||||
onUserLeft,
|
||||
onWorkflowDeleted,
|
||||
onWorkflowReverted,
|
||||
onOperationConfirmed,
|
||||
onOperationFailed,
|
||||
} = useSocket()
|
||||
|
||||
const { activeWorkflowId } = useWorkflowRegistry()
|
||||
const workflowStore = useWorkflowStore()
|
||||
const subBlockStore = useSubBlockStore()
|
||||
const { data: session } = useSession()
|
||||
|
||||
// Track if we're applying remote changes to avoid infinite loops
|
||||
const isApplyingRemoteChange = useRef(false)
|
||||
@@ -38,6 +43,16 @@ export function useCollaborativeWorkflow() {
|
||||
// Track last applied position timestamps to prevent out-of-order updates
|
||||
const lastPositionTimestamps = useRef<Map<string, number>>(new Map())
|
||||
|
||||
// Operation queue
|
||||
const {
|
||||
queue,
|
||||
hasOperationError,
|
||||
addToQueue,
|
||||
confirmOperation,
|
||||
failOperation,
|
||||
handleSocketReconnection,
|
||||
} = useOperationQueue()
|
||||
|
||||
// Clear position timestamps when switching workflows
|
||||
// Note: Workflow joining is now handled automatically by socket connect event based on URL
|
||||
useEffect(() => {
|
||||
@@ -54,7 +69,12 @@ export function useCollaborativeWorkflow() {
|
||||
}
|
||||
}, [activeWorkflowId, isConnected, currentWorkflowId])
|
||||
|
||||
// Log connection status changes
|
||||
// Register emit functions with operation queue store
|
||||
useEffect(() => {
|
||||
registerEmitFunctions(emitWorkflowOperation, emitSubblockUpdate, currentWorkflowId)
|
||||
}, [emitWorkflowOperation, emitSubblockUpdate, currentWorkflowId])
|
||||
|
||||
// Log connection status changes and handle reconnection
|
||||
useEffect(() => {
|
||||
logger.info('Collaborative workflow connection status changed', {
|
||||
isConnected,
|
||||
@@ -62,7 +82,21 @@ export function useCollaborativeWorkflow() {
|
||||
activeWorkflowId,
|
||||
presenceUsers: presenceUsers.length,
|
||||
})
|
||||
}, [isConnected, currentWorkflowId, activeWorkflowId, presenceUsers.length])
|
||||
|
||||
// Clear operation queue when socket reconnects AND has joined workflow
|
||||
// We need both isConnected=true AND currentWorkflowId to match activeWorkflowId
|
||||
// This ensures the socket has actually joined the workflow room before we allow retries
|
||||
if (isConnected && currentWorkflowId && currentWorkflowId === activeWorkflowId) {
|
||||
logger.info('Socket reconnected and joined workflow - clearing operation queue')
|
||||
handleSocketReconnection()
|
||||
}
|
||||
}, [
|
||||
isConnected,
|
||||
currentWorkflowId,
|
||||
activeWorkflowId,
|
||||
presenceUsers.length,
|
||||
handleSocketReconnection,
|
||||
])
|
||||
|
||||
// Handle incoming workflow operations from other users
|
||||
useEffect(() => {
|
||||
@@ -330,6 +364,32 @@ export function useCollaborativeWorkflow() {
|
||||
}
|
||||
}
|
||||
|
||||
const handleOperationConfirmed = (data: any) => {
|
||||
const { operationId } = data
|
||||
logger.debug('Operation confirmed', { operationId })
|
||||
confirmOperation(operationId)
|
||||
}
|
||||
|
||||
const handleOperationFailed = (data: any) => {
|
||||
const { operationId, error, retryable } = data
|
||||
logger.warn('Operation failed', { operationId, error, retryable })
|
||||
|
||||
// Create a retry function that re-emits the operation using the correct channel
|
||||
const retryFunction = (operation: any) => {
|
||||
const { operation: op, target, payload } = operation.operation
|
||||
|
||||
if (op === 'subblock-update' && target === 'subblock') {
|
||||
// Use subblock-update channel for subblock operations
|
||||
emitSubblockUpdate(payload.blockId, payload.subblockId, payload.value, operation.id)
|
||||
} else {
|
||||
// Use workflow-operation channel for block/edge/subflow operations
|
||||
emitWorkflowOperation(op, target, payload, operation.id)
|
||||
}
|
||||
}
|
||||
|
||||
failOperation(operationId, retryFunction)
|
||||
}
|
||||
|
||||
// Register event handlers
|
||||
onWorkflowOperation(handleWorkflowOperation)
|
||||
onSubblockUpdate(handleSubblockUpdate)
|
||||
@@ -337,6 +397,8 @@ export function useCollaborativeWorkflow() {
|
||||
onUserLeft(handleUserLeft)
|
||||
onWorkflowDeleted(handleWorkflowDeleted)
|
||||
onWorkflowReverted(handleWorkflowReverted)
|
||||
onOperationConfirmed(handleOperationConfirmed)
|
||||
onOperationFailed(handleOperationFailed)
|
||||
|
||||
return () => {
|
||||
// Cleanup handled by socket context
|
||||
@@ -348,11 +410,74 @@ export function useCollaborativeWorkflow() {
|
||||
onUserLeft,
|
||||
onWorkflowDeleted,
|
||||
onWorkflowReverted,
|
||||
onOperationConfirmed,
|
||||
onOperationFailed,
|
||||
workflowStore,
|
||||
subBlockStore,
|
||||
activeWorkflowId,
|
||||
confirmOperation,
|
||||
failOperation,
|
||||
emitWorkflowOperation,
|
||||
queue,
|
||||
])
|
||||
|
||||
// Helper function to execute queued operations
|
||||
const executeQueuedOperation = useCallback(
|
||||
(operation: string, target: string, payload: any, localAction: () => void) => {
|
||||
console.log('🎯 executeQueuedOperation called', {
|
||||
operation,
|
||||
target,
|
||||
isApplyingRemoteChange: isApplyingRemoteChange.current,
|
||||
})
|
||||
|
||||
// Skip if applying remote changes
|
||||
if (isApplyingRemoteChange.current) {
|
||||
console.log('❌ Skipping - applying remote change')
|
||||
return
|
||||
}
|
||||
|
||||
// Generate operation ID
|
||||
const operationId = crypto.randomUUID()
|
||||
|
||||
// Add to queue
|
||||
addToQueue({
|
||||
id: operationId,
|
||||
operation: {
|
||||
operation,
|
||||
target,
|
||||
payload,
|
||||
},
|
||||
workflowId: activeWorkflowId || '',
|
||||
userId: session?.user?.id || 'unknown',
|
||||
})
|
||||
|
||||
// Apply locally
|
||||
localAction()
|
||||
|
||||
// Emit to server with operation ID
|
||||
emitWorkflowOperation(operation, target, payload, operationId)
|
||||
},
|
||||
[addToQueue, emitWorkflowOperation, session?.user?.id]
|
||||
)
|
||||
|
||||
// Special helper for debounced operations (position updates)
|
||||
// These are high-frequency, low-importance operations that don't need queue tracking
|
||||
const executeQueuedDebouncedOperation = useCallback(
|
||||
(operation: string, target: string, payload: any, localAction: () => void) => {
|
||||
// Skip if applying remote changes
|
||||
if (isApplyingRemoteChange.current) return
|
||||
|
||||
// Apply locally first (immediate UI feedback)
|
||||
localAction()
|
||||
|
||||
// For debounced operations, don't use queue tracking
|
||||
// The debouncing in socket context handles reliability
|
||||
// No operation ID needed since we're not tracking these
|
||||
emitWorkflowOperation(operation, target, payload)
|
||||
},
|
||||
[emitWorkflowOperation]
|
||||
)
|
||||
|
||||
// Collaborative workflow operations
|
||||
const collaborativeAddBlock = useCallback(
|
||||
(
|
||||
@@ -440,56 +565,58 @@ export function useCollaborativeWorkflow() {
|
||||
autoConnectEdge, // Include edge data for atomic operation
|
||||
}
|
||||
|
||||
// Apply locally first
|
||||
// Skip if applying remote changes
|
||||
if (isApplyingRemoteChange.current) return
|
||||
|
||||
// Generate operation ID
|
||||
const operationId = crypto.randomUUID()
|
||||
|
||||
// Add to queue
|
||||
addToQueue({
|
||||
id: operationId,
|
||||
operation: {
|
||||
operation: 'add',
|
||||
target: 'block',
|
||||
payload: completeBlockData,
|
||||
},
|
||||
workflowId: activeWorkflowId || '',
|
||||
userId: session?.user?.id || 'unknown',
|
||||
})
|
||||
|
||||
// Apply locally
|
||||
workflowStore.addBlock(id, type, name, position, data, parentId, extent)
|
||||
if (autoConnectEdge) {
|
||||
workflowStore.addEdge(autoConnectEdge)
|
||||
}
|
||||
|
||||
// Then broadcast to other clients with complete block data
|
||||
if (!isApplyingRemoteChange.current) {
|
||||
emitWorkflowOperation('add', 'block', completeBlockData)
|
||||
}
|
||||
// Emit to server with operation ID
|
||||
emitWorkflowOperation('add', 'block', completeBlockData, operationId)
|
||||
},
|
||||
[workflowStore, emitWorkflowOperation]
|
||||
[workflowStore, emitWorkflowOperation, addToQueue, session?.user?.id]
|
||||
)
|
||||
|
||||
const collaborativeRemoveBlock = useCallback(
|
||||
(id: string) => {
|
||||
// Apply locally first
|
||||
workflowStore.removeBlock(id)
|
||||
|
||||
// Then broadcast to other clients
|
||||
if (!isApplyingRemoteChange.current) {
|
||||
emitWorkflowOperation('remove', 'block', { id })
|
||||
}
|
||||
executeQueuedOperation('remove', 'block', { id }, () => workflowStore.removeBlock(id))
|
||||
},
|
||||
[workflowStore, emitWorkflowOperation]
|
||||
[executeQueuedOperation, workflowStore]
|
||||
)
|
||||
|
||||
const collaborativeUpdateBlockPosition = useCallback(
|
||||
(id: string, position: Position) => {
|
||||
// Apply locally first
|
||||
workflowStore.updateBlockPosition(id, position)
|
||||
|
||||
// Then broadcast to other clients
|
||||
if (!isApplyingRemoteChange.current) {
|
||||
emitWorkflowOperation('update-position', 'block', { id, position })
|
||||
}
|
||||
executeQueuedDebouncedOperation('update-position', 'block', { id, position }, () =>
|
||||
workflowStore.updateBlockPosition(id, position)
|
||||
)
|
||||
},
|
||||
[workflowStore, emitWorkflowOperation]
|
||||
[executeQueuedDebouncedOperation, workflowStore]
|
||||
)
|
||||
|
||||
const collaborativeUpdateBlockName = useCallback(
|
||||
(id: string, name: string) => {
|
||||
// Apply locally first
|
||||
workflowStore.updateBlockName(id, name)
|
||||
executeQueuedOperation('update-name', 'block', { id, name }, () => {
|
||||
workflowStore.updateBlockName(id, name)
|
||||
|
||||
// Then broadcast to other clients
|
||||
if (!isApplyingRemoteChange.current) {
|
||||
emitWorkflowOperation('update-name', 'block', { id, name })
|
||||
|
||||
// Check for pending subblock updates from the store
|
||||
// Handle pending subblock updates
|
||||
const globalWindow = window as any
|
||||
const pendingUpdates = globalWindow.__pendingSubblockUpdates
|
||||
if (pendingUpdates && Array.isArray(pendingUpdates)) {
|
||||
@@ -501,35 +628,27 @@ export function useCollaborativeWorkflow() {
|
||||
// Clear the pending updates
|
||||
globalWindow.__pendingSubblockUpdates = undefined
|
||||
}
|
||||
}
|
||||
})
|
||||
},
|
||||
[workflowStore, emitWorkflowOperation, emitSubblockUpdate]
|
||||
[executeQueuedOperation, workflowStore, emitSubblockUpdate]
|
||||
)
|
||||
|
||||
const collaborativeToggleBlockEnabled = useCallback(
|
||||
(id: string) => {
|
||||
// Apply locally first
|
||||
workflowStore.toggleBlockEnabled(id)
|
||||
|
||||
// Then broadcast to other clients
|
||||
if (!isApplyingRemoteChange.current) {
|
||||
emitWorkflowOperation('toggle-enabled', 'block', { id })
|
||||
}
|
||||
executeQueuedOperation('toggle-enabled', 'block', { id }, () =>
|
||||
workflowStore.toggleBlockEnabled(id)
|
||||
)
|
||||
},
|
||||
[workflowStore, emitWorkflowOperation]
|
||||
[executeQueuedOperation, workflowStore]
|
||||
)
|
||||
|
||||
const collaborativeUpdateParentId = useCallback(
|
||||
(id: string, parentId: string, extent: 'parent') => {
|
||||
// Apply locally first
|
||||
workflowStore.updateParentId(id, parentId, extent)
|
||||
|
||||
// Then broadcast to other clients
|
||||
if (!isApplyingRemoteChange.current) {
|
||||
emitWorkflowOperation('update-parent', 'block', { id, parentId, extent })
|
||||
}
|
||||
executeQueuedOperation('update-parent', 'block', { id, parentId, extent }, () =>
|
||||
workflowStore.updateParentId(id, parentId, extent)
|
||||
)
|
||||
},
|
||||
[workflowStore, emitWorkflowOperation]
|
||||
[executeQueuedOperation, workflowStore]
|
||||
)
|
||||
|
||||
const collaborativeToggleBlockWide = useCallback(
|
||||
@@ -541,15 +660,11 @@ export function useCollaborativeWorkflow() {
|
||||
// Calculate the new isWide value
|
||||
const newIsWide = !currentBlock.isWide
|
||||
|
||||
// Apply locally first
|
||||
workflowStore.toggleBlockWide(id)
|
||||
|
||||
// Emit with the calculated new value (don't rely on async state update)
|
||||
if (!isApplyingRemoteChange.current) {
|
||||
emitWorkflowOperation('update-wide', 'block', { id, isWide: newIsWide })
|
||||
}
|
||||
executeQueuedOperation('update-wide', 'block', { id, isWide: newIsWide }, () =>
|
||||
workflowStore.toggleBlockWide(id)
|
||||
)
|
||||
},
|
||||
[workflowStore, emitWorkflowOperation]
|
||||
[executeQueuedOperation, workflowStore]
|
||||
)
|
||||
|
||||
const collaborativeToggleBlockAdvancedMode = useCallback(
|
||||
@@ -561,18 +676,14 @@ export function useCollaborativeWorkflow() {
|
||||
// Calculate the new advancedMode value
|
||||
const newAdvancedMode = !currentBlock.advancedMode
|
||||
|
||||
// Apply locally first
|
||||
workflowStore.toggleBlockAdvancedMode(id)
|
||||
|
||||
// Emit with the calculated new value (don't rely on async state update)
|
||||
if (!isApplyingRemoteChange.current) {
|
||||
emitWorkflowOperation('update-advanced-mode', 'block', {
|
||||
id,
|
||||
advancedMode: newAdvancedMode,
|
||||
})
|
||||
}
|
||||
executeQueuedOperation(
|
||||
'update-advanced-mode',
|
||||
'block',
|
||||
{ id, advancedMode: newAdvancedMode },
|
||||
() => workflowStore.toggleBlockAdvancedMode(id)
|
||||
)
|
||||
},
|
||||
[workflowStore, emitWorkflowOperation]
|
||||
[executeQueuedOperation, workflowStore]
|
||||
)
|
||||
|
||||
const collaborativeToggleBlockHandles = useCallback(
|
||||
@@ -584,18 +695,14 @@ export function useCollaborativeWorkflow() {
|
||||
// Calculate the new horizontalHandles value
|
||||
const newHorizontalHandles = !currentBlock.horizontalHandles
|
||||
|
||||
// Apply locally first
|
||||
workflowStore.toggleBlockHandles(id)
|
||||
|
||||
// Emit with the calculated new value (don't rely on async state update)
|
||||
if (!isApplyingRemoteChange.current) {
|
||||
emitWorkflowOperation('toggle-handles', 'block', {
|
||||
id,
|
||||
horizontalHandles: newHorizontalHandles,
|
||||
})
|
||||
}
|
||||
executeQueuedOperation(
|
||||
'toggle-handles',
|
||||
'block',
|
||||
{ id, horizontalHandles: newHorizontalHandles },
|
||||
() => workflowStore.toggleBlockHandles(id)
|
||||
)
|
||||
},
|
||||
[workflowStore, emitWorkflowOperation]
|
||||
[executeQueuedOperation, workflowStore]
|
||||
)
|
||||
|
||||
const collaborativeDuplicateBlock = useCallback(
|
||||
@@ -661,64 +768,69 @@ export function useCollaborativeWorkflow() {
|
||||
}))
|
||||
}
|
||||
|
||||
// Then broadcast to other clients
|
||||
if (!isApplyingRemoteChange.current) {
|
||||
emitWorkflowOperation('duplicate', 'block', duplicatedBlockData)
|
||||
}
|
||||
executeQueuedOperation('duplicate', 'block', duplicatedBlockData, () => {
|
||||
// Apply locally - add the duplicated block
|
||||
workflowStore.addBlock(
|
||||
newId,
|
||||
sourceBlock.type,
|
||||
newName,
|
||||
offsetPosition,
|
||||
sourceBlock.data ? JSON.parse(JSON.stringify(sourceBlock.data)) : {}
|
||||
)
|
||||
|
||||
// Copy subblock values to the new block
|
||||
const subBlockValues = subBlockStore.workflowValues[activeWorkflowId || '']?.[sourceId]
|
||||
if (subBlockValues && activeWorkflowId) {
|
||||
// Copy each subblock value individually
|
||||
Object.entries(subBlockValues).forEach(([subblockId, value]) => {
|
||||
subBlockStore.setValue(newId, subblockId, value)
|
||||
})
|
||||
}
|
||||
})
|
||||
},
|
||||
[workflowStore, emitWorkflowOperation]
|
||||
[executeQueuedOperation, workflowStore, subBlockStore, activeWorkflowId]
|
||||
)
|
||||
|
||||
const collaborativeAddEdge = useCallback(
|
||||
(edge: Edge) => {
|
||||
// Apply locally first
|
||||
workflowStore.addEdge(edge)
|
||||
|
||||
// Then broadcast to other clients
|
||||
if (!isApplyingRemoteChange.current) {
|
||||
emitWorkflowOperation('add', 'edge', edge)
|
||||
}
|
||||
executeQueuedOperation('add', 'edge', edge, () => workflowStore.addEdge(edge))
|
||||
},
|
||||
[workflowStore, emitWorkflowOperation]
|
||||
[executeQueuedOperation, workflowStore]
|
||||
)
|
||||
|
||||
const collaborativeRemoveEdge = useCallback(
|
||||
(edgeId: string) => {
|
||||
// Apply locally first
|
||||
workflowStore.removeEdge(edgeId)
|
||||
|
||||
// Then broadcast to other clients
|
||||
if (!isApplyingRemoteChange.current) {
|
||||
emitWorkflowOperation('remove', 'edge', { id: edgeId })
|
||||
}
|
||||
executeQueuedOperation('remove', 'edge', { id: edgeId }, () =>
|
||||
workflowStore.removeEdge(edgeId)
|
||||
)
|
||||
},
|
||||
[workflowStore, emitWorkflowOperation]
|
||||
)
|
||||
|
||||
const collaborativeSetSubblockValue = useCallback(
|
||||
(blockId: string, subblockId: string, value: any) => {
|
||||
// Apply locally first - the store automatically uses the active workflow ID
|
||||
subBlockStore.setValue(blockId, subblockId, value)
|
||||
// Skip if applying remote changes
|
||||
if (isApplyingRemoteChange.current) return
|
||||
|
||||
// Then broadcast to other clients, but only if we have a valid workflow connection
|
||||
if (
|
||||
!isApplyingRemoteChange.current &&
|
||||
isConnected &&
|
||||
currentWorkflowId &&
|
||||
activeWorkflowId === currentWorkflowId
|
||||
) {
|
||||
emitSubblockUpdate(blockId, subblockId, value)
|
||||
} else if (!isConnected || !currentWorkflowId || activeWorkflowId !== currentWorkflowId) {
|
||||
logger.debug('Skipping subblock update broadcast', {
|
||||
isConnected,
|
||||
// Check workflow state
|
||||
if (!currentWorkflowId || activeWorkflowId !== currentWorkflowId) {
|
||||
logger.debug('Skipping subblock update - not in active workflow', {
|
||||
currentWorkflowId,
|
||||
activeWorkflowId,
|
||||
blockId,
|
||||
subblockId,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// Apply locally first
|
||||
subBlockStore.setValue(blockId, subblockId, value)
|
||||
|
||||
// Emit to server (subblock updates have their own handler with built-in retry)
|
||||
// No need for operation queue - the subblock handler already has confirmation/failure logic
|
||||
emitSubblockUpdate(blockId, subblockId, value)
|
||||
},
|
||||
[subBlockStore, emitSubblockUpdate, isConnected, currentWorkflowId, activeWorkflowId]
|
||||
[subBlockStore, emitSubblockUpdate, currentWorkflowId, activeWorkflowId]
|
||||
)
|
||||
|
||||
// Collaborative loop/parallel configuration updates
|
||||
@@ -737,27 +849,19 @@ export function useCollaborativeWorkflow() {
|
||||
const currentLoopType = currentBlock.data?.loopType || 'for'
|
||||
const currentCollection = currentBlock.data?.collection || ''
|
||||
|
||||
// Apply local change
|
||||
workflowStore.updateLoopCount(loopId, count)
|
||||
|
||||
// Emit subflow update operation with calculated values
|
||||
if (!isApplyingRemoteChange.current) {
|
||||
const config = {
|
||||
id: loopId,
|
||||
nodes: childNodes,
|
||||
iterations: count,
|
||||
loopType: currentLoopType,
|
||||
forEachItems: currentCollection,
|
||||
}
|
||||
|
||||
emitWorkflowOperation('update', 'subflow', {
|
||||
id: loopId,
|
||||
type: 'loop',
|
||||
config,
|
||||
})
|
||||
const config = {
|
||||
id: loopId,
|
||||
nodes: childNodes,
|
||||
iterations: count,
|
||||
loopType: currentLoopType,
|
||||
forEachItems: currentCollection,
|
||||
}
|
||||
|
||||
executeQueuedOperation('update', 'subflow', { id: loopId, type: 'loop', config }, () =>
|
||||
workflowStore.updateLoopCount(loopId, count)
|
||||
)
|
||||
},
|
||||
[workflowStore, emitWorkflowOperation]
|
||||
[executeQueuedOperation, workflowStore]
|
||||
)
|
||||
|
||||
const collaborativeUpdateLoopType = useCallback(
|
||||
@@ -775,27 +879,19 @@ export function useCollaborativeWorkflow() {
|
||||
const currentIterations = currentBlock.data?.count || 5
|
||||
const currentCollection = currentBlock.data?.collection || ''
|
||||
|
||||
// Apply local change
|
||||
workflowStore.updateLoopType(loopId, loopType)
|
||||
|
||||
// Emit subflow update operation with calculated values
|
||||
if (!isApplyingRemoteChange.current) {
|
||||
const config = {
|
||||
id: loopId,
|
||||
nodes: childNodes,
|
||||
iterations: currentIterations,
|
||||
loopType,
|
||||
forEachItems: currentCollection,
|
||||
}
|
||||
|
||||
emitWorkflowOperation('update', 'subflow', {
|
||||
id: loopId,
|
||||
type: 'loop',
|
||||
config,
|
||||
})
|
||||
const config = {
|
||||
id: loopId,
|
||||
nodes: childNodes,
|
||||
iterations: currentIterations,
|
||||
loopType,
|
||||
forEachItems: currentCollection,
|
||||
}
|
||||
|
||||
executeQueuedOperation('update', 'subflow', { id: loopId, type: 'loop', config }, () =>
|
||||
workflowStore.updateLoopType(loopId, loopType)
|
||||
)
|
||||
},
|
||||
[workflowStore, emitWorkflowOperation]
|
||||
[executeQueuedOperation, workflowStore]
|
||||
)
|
||||
|
||||
const collaborativeUpdateLoopCollection = useCallback(
|
||||
@@ -813,27 +909,19 @@ export function useCollaborativeWorkflow() {
|
||||
const currentIterations = currentBlock.data?.count || 5
|
||||
const currentLoopType = currentBlock.data?.loopType || 'for'
|
||||
|
||||
// Apply local change
|
||||
workflowStore.updateLoopCollection(loopId, collection)
|
||||
|
||||
// Emit subflow update operation with calculated values
|
||||
if (!isApplyingRemoteChange.current) {
|
||||
const config = {
|
||||
id: loopId,
|
||||
nodes: childNodes,
|
||||
iterations: currentIterations,
|
||||
loopType: currentLoopType,
|
||||
forEachItems: collection,
|
||||
}
|
||||
|
||||
emitWorkflowOperation('update', 'subflow', {
|
||||
id: loopId,
|
||||
type: 'loop',
|
||||
config,
|
||||
})
|
||||
const config = {
|
||||
id: loopId,
|
||||
nodes: childNodes,
|
||||
iterations: currentIterations,
|
||||
loopType: currentLoopType,
|
||||
forEachItems: collection,
|
||||
}
|
||||
|
||||
executeQueuedOperation('update', 'subflow', { id: loopId, type: 'loop', config }, () =>
|
||||
workflowStore.updateLoopCollection(loopId, collection)
|
||||
)
|
||||
},
|
||||
[workflowStore, emitWorkflowOperation]
|
||||
[executeQueuedOperation, workflowStore]
|
||||
)
|
||||
|
||||
const collaborativeUpdateParallelCount = useCallback(
|
||||
@@ -851,27 +939,22 @@ export function useCollaborativeWorkflow() {
|
||||
const currentDistribution = currentBlock.data?.collection || ''
|
||||
const currentParallelType = currentBlock.data?.parallelType || 'collection'
|
||||
|
||||
// Apply local change
|
||||
workflowStore.updateParallelCount(parallelId, count)
|
||||
|
||||
// Emit subflow update operation with calculated values
|
||||
if (!isApplyingRemoteChange.current) {
|
||||
const config = {
|
||||
id: parallelId,
|
||||
nodes: childNodes,
|
||||
count: Math.max(1, Math.min(20, count)), // Clamp between 1-20
|
||||
distribution: currentDistribution,
|
||||
parallelType: currentParallelType,
|
||||
}
|
||||
|
||||
emitWorkflowOperation('update', 'subflow', {
|
||||
id: parallelId,
|
||||
type: 'parallel',
|
||||
config,
|
||||
})
|
||||
const config = {
|
||||
id: parallelId,
|
||||
nodes: childNodes,
|
||||
count: Math.max(1, Math.min(20, count)), // Clamp between 1-20
|
||||
distribution: currentDistribution,
|
||||
parallelType: currentParallelType,
|
||||
}
|
||||
|
||||
executeQueuedOperation(
|
||||
'update',
|
||||
'subflow',
|
||||
{ id: parallelId, type: 'parallel', config },
|
||||
() => workflowStore.updateParallelCount(parallelId, count)
|
||||
)
|
||||
},
|
||||
[workflowStore, emitWorkflowOperation]
|
||||
[executeQueuedOperation, workflowStore]
|
||||
)
|
||||
|
||||
const collaborativeUpdateParallelCollection = useCallback(
|
||||
@@ -889,27 +972,22 @@ export function useCollaborativeWorkflow() {
|
||||
const currentCount = currentBlock.data?.count || 5
|
||||
const currentParallelType = currentBlock.data?.parallelType || 'collection'
|
||||
|
||||
// Apply local change
|
||||
workflowStore.updateParallelCollection(parallelId, collection)
|
||||
|
||||
// Emit subflow update operation with calculated values
|
||||
if (!isApplyingRemoteChange.current) {
|
||||
const config = {
|
||||
id: parallelId,
|
||||
nodes: childNodes,
|
||||
count: currentCount,
|
||||
distribution: collection,
|
||||
parallelType: currentParallelType,
|
||||
}
|
||||
|
||||
emitWorkflowOperation('update', 'subflow', {
|
||||
id: parallelId,
|
||||
type: 'parallel',
|
||||
config,
|
||||
})
|
||||
const config = {
|
||||
id: parallelId,
|
||||
nodes: childNodes,
|
||||
count: currentCount,
|
||||
distribution: collection,
|
||||
parallelType: currentParallelType,
|
||||
}
|
||||
|
||||
executeQueuedOperation(
|
||||
'update',
|
||||
'subflow',
|
||||
{ id: parallelId, type: 'parallel', config },
|
||||
() => workflowStore.updateParallelCollection(parallelId, collection)
|
||||
)
|
||||
},
|
||||
[workflowStore, emitWorkflowOperation]
|
||||
[executeQueuedOperation, workflowStore]
|
||||
)
|
||||
|
||||
const collaborativeUpdateParallelType = useCallback(
|
||||
@@ -936,29 +1014,27 @@ export function useCollaborativeWorkflow() {
|
||||
newDistribution = newDistribution || ''
|
||||
}
|
||||
|
||||
// Apply all changes locally first
|
||||
workflowStore.updateParallelType(parallelId, parallelType)
|
||||
workflowStore.updateParallelCount(parallelId, newCount)
|
||||
workflowStore.updateParallelCollection(parallelId, newDistribution)
|
||||
|
||||
// Emit single subflow update with all changes
|
||||
if (!isApplyingRemoteChange.current) {
|
||||
const config = {
|
||||
id: parallelId,
|
||||
nodes: childNodes,
|
||||
count: newCount,
|
||||
distribution: newDistribution,
|
||||
parallelType,
|
||||
}
|
||||
|
||||
emitWorkflowOperation('update', 'subflow', {
|
||||
id: parallelId,
|
||||
type: 'parallel',
|
||||
config,
|
||||
})
|
||||
const config = {
|
||||
id: parallelId,
|
||||
nodes: childNodes,
|
||||
count: newCount,
|
||||
distribution: newDistribution,
|
||||
parallelType,
|
||||
}
|
||||
|
||||
executeQueuedOperation(
|
||||
'update',
|
||||
'subflow',
|
||||
{ id: parallelId, type: 'parallel', config },
|
||||
() => {
|
||||
// Apply all changes locally
|
||||
workflowStore.updateParallelType(parallelId, parallelType)
|
||||
workflowStore.updateParallelCount(parallelId, newCount)
|
||||
workflowStore.updateParallelCollection(parallelId, newDistribution)
|
||||
}
|
||||
)
|
||||
},
|
||||
[workflowStore, emitWorkflowOperation]
|
||||
[executeQueuedOperation, workflowStore]
|
||||
)
|
||||
|
||||
return {
|
||||
@@ -966,6 +1042,7 @@ export function useCollaborativeWorkflow() {
|
||||
isConnected,
|
||||
currentWorkflowId,
|
||||
presenceUsers,
|
||||
hasOperationError,
|
||||
|
||||
// Workflow management
|
||||
joinWorkflow,
|
||||
|
||||
@@ -36,8 +36,12 @@ export function setupOperationsHandlers(
|
||||
return
|
||||
}
|
||||
|
||||
let operationId: string | undefined
|
||||
|
||||
try {
|
||||
const validatedOperation = WorkflowOperationSchema.parse(data)
|
||||
const extractedData = validatedOperation as any
|
||||
operationId = extractedData.operationId
|
||||
const { operation, target, payload, timestamp } = validatedOperation
|
||||
|
||||
// Check operation permissions
|
||||
@@ -100,16 +104,25 @@ export function setupOperationsHandlers(
|
||||
userId: session.userId,
|
||||
}).catch((error) => {
|
||||
logger.error('Failed to persist position update:', error)
|
||||
// Emit failure for position updates if operationId is provided
|
||||
if (operationId) {
|
||||
socket.emit('operation-failed', {
|
||||
operationId,
|
||||
error: error instanceof Error ? error.message : 'Database persistence failed',
|
||||
retryable: true,
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
room.lastModified = Date.now()
|
||||
|
||||
socket.emit('operation-confirmed', {
|
||||
operation,
|
||||
target,
|
||||
operationId: broadcastData.metadata.operationId,
|
||||
serverTimestamp: Date.now(),
|
||||
})
|
||||
// Emit confirmation if operationId is provided
|
||||
if (operationId) {
|
||||
socket.emit('operation-confirmed', {
|
||||
operationId,
|
||||
serverTimestamp: Date.now(),
|
||||
})
|
||||
}
|
||||
|
||||
return // Early return for position updates
|
||||
}
|
||||
@@ -143,13 +156,26 @@ export function setupOperationsHandlers(
|
||||
|
||||
socket.to(workflowId).emit('workflow-operation', broadcastData)
|
||||
|
||||
socket.emit('operation-confirmed', {
|
||||
operation,
|
||||
target,
|
||||
operationId: broadcastData.metadata.operationId,
|
||||
serverTimestamp: Date.now(),
|
||||
})
|
||||
// Emit confirmation if operationId is provided
|
||||
if (operationId) {
|
||||
socket.emit('operation-confirmed', {
|
||||
operationId,
|
||||
serverTimestamp: Date.now(),
|
||||
})
|
||||
}
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error occurred'
|
||||
|
||||
// Emit operation-failed for queue-tracked operations
|
||||
if (operationId) {
|
||||
socket.emit('operation-failed', {
|
||||
operationId,
|
||||
error: errorMessage,
|
||||
retryable: !(error instanceof ZodError), // Don't retry validation errors
|
||||
})
|
||||
}
|
||||
|
||||
// Also emit legacy operation-error for backward compatibility
|
||||
if (error instanceof ZodError) {
|
||||
socket.emit('operation-error', {
|
||||
type: 'VALIDATION_ERROR',
|
||||
|
||||
@@ -27,7 +27,7 @@ export function setupSubblocksHandlers(
|
||||
return
|
||||
}
|
||||
|
||||
const { blockId, subblockId, value, timestamp } = data
|
||||
const { blockId, subblockId, value, timestamp, operationId } = data
|
||||
const room = roomManager.getWorkflowRoom(workflowId)
|
||||
|
||||
if (!room) {
|
||||
@@ -117,15 +117,35 @@ export function setupSubblocksHandlers(
|
||||
senderId: socket.id,
|
||||
userId: session.userId,
|
||||
})
|
||||
|
||||
// Emit confirmation if operationId is provided
|
||||
if (operationId) {
|
||||
socket.emit('operation-confirmed', {
|
||||
operationId,
|
||||
serverTimestamp: Date.now(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug(`Subblock update in workflow ${workflowId}: ${blockId}.${subblockId}`)
|
||||
} catch (error) {
|
||||
logger.error('Error handling subblock update:', error)
|
||||
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
|
||||
// Emit operation-failed for queue-tracked operations
|
||||
if (operationId) {
|
||||
socket.emit('operation-failed', {
|
||||
operationId,
|
||||
error: errorMessage,
|
||||
retryable: true, // Subblock updates are generally retryable
|
||||
})
|
||||
}
|
||||
|
||||
// Also emit legacy operation-error for backward compatibility
|
||||
socket.emit('operation-error', {
|
||||
type: 'SUBBLOCK_UPDATE_FAILED',
|
||||
message: `Failed to update subblock ${blockId}.${subblockId}: ${error instanceof Error ? error.message : 'Unknown error'}`,
|
||||
message: `Failed to update subblock ${blockId}.${subblockId}: ${errorMessage}`,
|
||||
operation: 'subblock-update',
|
||||
target: 'subblock',
|
||||
})
|
||||
|
||||
@@ -48,6 +48,7 @@ export const BlockOperationSchema = z.object({
|
||||
autoConnectEdge: AutoConnectEdgeSchema.optional(), // Add support for auto-connect edges
|
||||
}),
|
||||
timestamp: z.number(),
|
||||
operationId: z.string().optional(),
|
||||
})
|
||||
|
||||
export const EdgeOperationSchema = z.object({
|
||||
@@ -61,6 +62,7 @@ export const EdgeOperationSchema = z.object({
|
||||
targetHandle: z.string().nullable().optional(),
|
||||
}),
|
||||
timestamp: z.number(),
|
||||
operationId: z.string().optional(),
|
||||
})
|
||||
|
||||
export const SubflowOperationSchema = z.object({
|
||||
@@ -72,6 +74,7 @@ export const SubflowOperationSchema = z.object({
|
||||
config: z.record(z.any()).optional(),
|
||||
}),
|
||||
timestamp: z.number(),
|
||||
operationId: z.string().optional(),
|
||||
})
|
||||
|
||||
export const WorkflowOperationSchema = z.union([
|
||||
|
||||
312
apps/sim/stores/operation-queue/store.ts
Normal file
312
apps/sim/stores/operation-queue/store.ts
Normal file
@@ -0,0 +1,312 @@
|
||||
import { create } from 'zustand'
|
||||
import { createLogger } from '@/lib/logs/console-logger'
|
||||
|
||||
const logger = createLogger('OperationQueue')
|
||||
|
||||
// Operation queue types
|
||||
export interface QueuedOperation {
|
||||
id: string
|
||||
operation: {
|
||||
operation: string
|
||||
target: string
|
||||
payload: any
|
||||
}
|
||||
workflowId: string // Track which workflow this operation belongs to
|
||||
timestamp: number
|
||||
retryCount: number
|
||||
status: 'pending' | 'confirmed' | 'failed'
|
||||
userId: string
|
||||
}
|
||||
|
||||
interface OperationQueueState {
|
||||
operations: QueuedOperation[]
|
||||
isProcessing: boolean
|
||||
hasOperationError: boolean
|
||||
|
||||
// Actions
|
||||
addToQueue: (operation: Omit<QueuedOperation, 'timestamp' | 'retryCount' | 'status'>) => void
|
||||
confirmOperation: (operationId: string) => void
|
||||
failOperation: (operationId: string, emitFunction: (operation: QueuedOperation) => void) => void
|
||||
handleOperationTimeout: (operationId: string) => void
|
||||
handleSocketReconnection: () => void
|
||||
triggerOfflineMode: () => void
|
||||
clearError: () => void
|
||||
}
|
||||
|
||||
// Global timeout maps (outside of Zustand store to avoid serialization issues)
|
||||
const retryTimeouts = new Map<string, NodeJS.Timeout>()
|
||||
const operationTimeouts = new Map<string, NodeJS.Timeout>()
|
||||
|
||||
// Global registry for emit functions and current workflow (set by collaborative workflow hook)
|
||||
let emitWorkflowOperation:
|
||||
| ((operation: string, target: string, payload: any, operationId?: string) => void)
|
||||
| null = null
|
||||
let emitSubblockUpdate:
|
||||
| ((blockId: string, subblockId: string, value: any, operationId?: string) => void)
|
||||
| null = null
|
||||
let currentWorkflowId: string | null = null
|
||||
|
||||
export function registerEmitFunctions(
|
||||
workflowEmit: (operation: string, target: string, payload: any, operationId?: string) => void,
|
||||
subblockEmit: (blockId: string, subblockId: string, value: any, operationId?: string) => void,
|
||||
workflowId: string | null
|
||||
) {
|
||||
emitWorkflowOperation = workflowEmit
|
||||
emitSubblockUpdate = subblockEmit
|
||||
currentWorkflowId = workflowId
|
||||
}
|
||||
|
||||
export const useOperationQueueStore = create<OperationQueueState>((set, get) => ({
|
||||
operations: [],
|
||||
isProcessing: false,
|
||||
hasOperationError: false,
|
||||
|
||||
addToQueue: (operation) => {
|
||||
const state = get()
|
||||
|
||||
// Check if operation already exists in queue
|
||||
const existingOp = state.operations.find((op) => op.id === operation.id)
|
||||
if (existingOp) {
|
||||
console.log('⚠️ Operation already in queue, skipping duplicate', { operationId: operation.id })
|
||||
return
|
||||
}
|
||||
|
||||
const queuedOp: QueuedOperation = {
|
||||
...operation,
|
||||
timestamp: Date.now(),
|
||||
retryCount: 0,
|
||||
status: 'pending',
|
||||
}
|
||||
|
||||
logger.debug('Adding operation to queue', {
|
||||
operationId: queuedOp.id,
|
||||
operation: queuedOp.operation,
|
||||
})
|
||||
|
||||
// Start 5-second timeout to detect unresponsive server
|
||||
// This will trigger retry mechanism if server doesn't respond at all
|
||||
const timeoutId = setTimeout(() => {
|
||||
logger.warn('Operation timeout - no server response after 5 seconds', {
|
||||
operationId: queuedOp.id,
|
||||
})
|
||||
operationTimeouts.delete(queuedOp.id)
|
||||
|
||||
// Handle timeout directly in store instead of emitting events
|
||||
get().handleOperationTimeout(queuedOp.id)
|
||||
}, 5000)
|
||||
|
||||
operationTimeouts.set(queuedOp.id, timeoutId)
|
||||
|
||||
set((state) => ({
|
||||
operations: [...state.operations, queuedOp],
|
||||
}))
|
||||
},
|
||||
|
||||
confirmOperation: (operationId) => {
|
||||
const state = get()
|
||||
const newOperations = state.operations.filter((op) => op.id !== operationId)
|
||||
|
||||
// Clear any retry timeout for this operation
|
||||
const retryTimeout = retryTimeouts.get(operationId)
|
||||
if (retryTimeout) {
|
||||
clearTimeout(retryTimeout)
|
||||
retryTimeouts.delete(operationId)
|
||||
}
|
||||
|
||||
// Clear any operation timeout for this operation
|
||||
const operationTimeout = operationTimeouts.get(operationId)
|
||||
if (operationTimeout) {
|
||||
clearTimeout(operationTimeout)
|
||||
operationTimeouts.delete(operationId)
|
||||
}
|
||||
|
||||
logger.debug('Removing operation from queue', {
|
||||
operationId,
|
||||
remainingOps: newOperations.length,
|
||||
})
|
||||
|
||||
set({ operations: newOperations })
|
||||
},
|
||||
|
||||
failOperation: (operationId: string, emitFunction: (operation: QueuedOperation) => void) => {
|
||||
const state = get()
|
||||
const operation = state.operations.find((op) => op.id === operationId)
|
||||
if (!operation) {
|
||||
logger.warn('Attempted to fail operation that does not exist in queue', { operationId })
|
||||
return
|
||||
}
|
||||
|
||||
// Clear any existing operation timeout since we're handling the failure
|
||||
const operationTimeout = operationTimeouts.get(operationId)
|
||||
if (operationTimeout) {
|
||||
clearTimeout(operationTimeout)
|
||||
operationTimeouts.delete(operationId)
|
||||
}
|
||||
|
||||
if (operation.retryCount < 3) {
|
||||
// Retry the operation with exponential backoff
|
||||
const newRetryCount = operation.retryCount + 1
|
||||
const delay = 2 ** newRetryCount * 1000 // 2s, 4s, 8s
|
||||
|
||||
logger.warn(`Operation failed, retrying in ${delay}ms (attempt ${newRetryCount}/3)`, {
|
||||
operationId,
|
||||
retryCount: newRetryCount,
|
||||
})
|
||||
|
||||
const timeout = setTimeout(() => {
|
||||
// Check if we're still in the same workflow before retrying
|
||||
if (operation.workflowId !== currentWorkflowId) {
|
||||
logger.warn('Cancelling retry - workflow changed', {
|
||||
operationId,
|
||||
operationWorkflow: operation.workflowId,
|
||||
currentWorkflow: currentWorkflowId,
|
||||
})
|
||||
retryTimeouts.delete(operationId)
|
||||
// Remove operation from queue since it's no longer relevant
|
||||
set((state) => ({
|
||||
operations: state.operations.filter((op) => op.id !== operationId),
|
||||
}))
|
||||
return
|
||||
}
|
||||
|
||||
// Re-emit the operation
|
||||
emitFunction(operation)
|
||||
retryTimeouts.delete(operationId)
|
||||
|
||||
// Start a new operation timeout for the retry
|
||||
const newTimeoutId = setTimeout(() => {
|
||||
logger.warn('Retry operation timeout - no server response after 5 seconds', {
|
||||
operationId,
|
||||
})
|
||||
operationTimeouts.delete(operationId)
|
||||
|
||||
// Trigger another retry attempt
|
||||
get().handleOperationTimeout(operationId)
|
||||
}, 5000)
|
||||
|
||||
operationTimeouts.set(operationId, newTimeoutId)
|
||||
}, delay)
|
||||
|
||||
retryTimeouts.set(operationId, timeout)
|
||||
|
||||
// Update retry count
|
||||
set((state) => ({
|
||||
operations: state.operations.map((op) =>
|
||||
op.id === operationId ? { ...op, retryCount: newRetryCount } : op
|
||||
),
|
||||
}))
|
||||
} else {
|
||||
// Max retries exceeded - trigger offline mode
|
||||
logger.error('Operation failed after max retries, triggering offline mode', { operationId })
|
||||
get().triggerOfflineMode()
|
||||
}
|
||||
},
|
||||
|
||||
handleOperationTimeout: (operationId: string) => {
|
||||
const state = get()
|
||||
const operation = state.operations.find((op) => op.id === operationId)
|
||||
if (!operation) {
|
||||
logger.debug('Ignoring timeout for operation not in queue', { operationId })
|
||||
return
|
||||
}
|
||||
|
||||
logger.warn('Operation timeout detected - treating as failure to trigger retries', {
|
||||
operationId,
|
||||
})
|
||||
|
||||
// Create a retry function that re-emits the operation using the correct channel
|
||||
const retryFunction = (operation: any) => {
|
||||
const { operation: op, target, payload } = operation.operation
|
||||
|
||||
if (op === 'subblock-update' && target === 'subblock') {
|
||||
// Use subblock-update channel for subblock operations
|
||||
if (emitSubblockUpdate) {
|
||||
emitSubblockUpdate(payload.blockId, payload.subblockId, payload.value, operation.id)
|
||||
}
|
||||
} else {
|
||||
// Use workflow-operation channel for block/edge/subflow operations
|
||||
if (emitWorkflowOperation) {
|
||||
emitWorkflowOperation(op, target, payload, operation.id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Treat timeout as a failure to trigger retry mechanism
|
||||
get().failOperation(operationId, retryFunction)
|
||||
},
|
||||
|
||||
handleSocketReconnection: () => {
|
||||
logger.info('Socket reconnected - clearing timeouts but keeping operations for retry')
|
||||
|
||||
// Clear all timeouts since they're for the old socket
|
||||
retryTimeouts.forEach((timeout) => clearTimeout(timeout))
|
||||
retryTimeouts.clear()
|
||||
operationTimeouts.forEach((timeout) => clearTimeout(timeout))
|
||||
operationTimeouts.clear()
|
||||
|
||||
// Keep operations in queue but reset their retry counts and start fresh timeouts
|
||||
const state = get()
|
||||
const resetOperations = state.operations.map((op) => ({
|
||||
...op,
|
||||
retryCount: 0, // Reset retry count for fresh attempts
|
||||
status: 'pending' as const,
|
||||
}))
|
||||
|
||||
set({
|
||||
operations: resetOperations,
|
||||
isProcessing: false,
|
||||
hasOperationError: false,
|
||||
})
|
||||
|
||||
// Start new timeouts for all operations (they'll retry when socket is ready)
|
||||
resetOperations.forEach((operation) => {
|
||||
const timeoutId = setTimeout(() => {
|
||||
logger.warn('Operation timeout after reconnection - no server response after 5 seconds', {
|
||||
operationId: operation.id,
|
||||
})
|
||||
operationTimeouts.delete(operation.id)
|
||||
get().handleOperationTimeout(operation.id)
|
||||
}, 5000)
|
||||
|
||||
operationTimeouts.set(operation.id, timeoutId)
|
||||
})
|
||||
},
|
||||
|
||||
triggerOfflineMode: () => {
|
||||
logger.error('Operation failed after retries - triggering offline mode')
|
||||
|
||||
// Clear all timeouts and queue
|
||||
retryTimeouts.forEach((timeout) => clearTimeout(timeout))
|
||||
retryTimeouts.clear()
|
||||
operationTimeouts.forEach((timeout) => clearTimeout(timeout))
|
||||
operationTimeouts.clear()
|
||||
|
||||
// Clear queue and trigger error state
|
||||
set({
|
||||
operations: [],
|
||||
isProcessing: false,
|
||||
hasOperationError: true,
|
||||
})
|
||||
},
|
||||
|
||||
clearError: () => {
|
||||
set({ hasOperationError: false })
|
||||
},
|
||||
}))
|
||||
|
||||
// Hook wrapper for easier usage
|
||||
export function useOperationQueue() {
|
||||
const store = useOperationQueueStore()
|
||||
|
||||
return {
|
||||
queue: store.operations,
|
||||
isProcessing: store.isProcessing,
|
||||
hasOperationError: store.hasOperationError,
|
||||
addToQueue: store.addToQueue,
|
||||
confirmOperation: store.confirmOperation,
|
||||
failOperation: store.failOperation,
|
||||
handleSocketReconnection: store.handleSocketReconnection,
|
||||
triggerOfflineMode: store.triggerOfflineMode,
|
||||
clearError: store.clearError,
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user