This commit is contained in:
Vikhyath Mondreti
2025-07-12 18:27:56 -07:00
parent db4ad80a4c
commit f16d759d8d
2 changed files with 49 additions and 85 deletions

View File

@@ -44,14 +44,8 @@ export function useCollaborativeWorkflow() {
const lastPositionTimestamps = useRef<Map<string, number>>(new Map())
// Operation queue
const {
queue,
hasOperationError,
addToQueue,
confirmOperation,
failOperation,
handleSocketReconnection,
} = useOperationQueue()
const { queue, hasOperationError, addToQueue, confirmOperation, failOperation } =
useOperationQueue()
// Clear position timestamps when switching workflows
// Note: Workflow joining is now handled automatically by socket connect event based on URL
@@ -74,30 +68,6 @@ export function useCollaborativeWorkflow() {
registerEmitFunctions(emitWorkflowOperation, emitSubblockUpdate, currentWorkflowId)
}, [emitWorkflowOperation, emitSubblockUpdate, currentWorkflowId])
// Log connection status changes and handle reconnection
useEffect(() => {
logger.info('Collaborative workflow connection status changed', {
isConnected,
currentWorkflowId,
activeWorkflowId,
presenceUsers: presenceUsers.length,
})
// Clear operation queue when socket reconnects AND has joined workflow
// We need both isConnected=true AND currentWorkflowId to match activeWorkflowId
// This ensures the socket has actually joined the workflow room before we allow retries
if (isConnected && currentWorkflowId && currentWorkflowId === activeWorkflowId) {
logger.info('Socket reconnected and joined workflow - clearing operation queue')
handleSocketReconnection()
}
}, [
isConnected,
currentWorkflowId,
activeWorkflowId,
presenceUsers.length,
handleSocketReconnection,
])
useEffect(() => {
const handleWorkflowOperation = (data: any) => {
const { operation, target, payload, userId } = data
@@ -416,10 +386,8 @@ export function useCollaborativeWorkflow() {
})
localAction()
emitWorkflowOperation(operation, target, payload, operationId)
},
[addToQueue, emitWorkflowOperation, session?.user?.id]
[addToQueue, session?.user?.id]
)
const executeQueuedDebouncedOperation = useCallback(
@@ -496,8 +464,6 @@ export function useCollaborativeWorkflow() {
workflowStore.addEdge(autoConnectEdge)
}
// Emit to server with operation ID for tracking
emitWorkflowOperation('add', 'block', completeBlockData, operationId)
return
}
@@ -562,9 +528,6 @@ export function useCollaborativeWorkflow() {
if (autoConnectEdge) {
workflowStore.addEdge(autoConnectEdge)
}
// Emit to server with operation ID
emitWorkflowOperation('add', 'block', completeBlockData, operationId)
},
[workflowStore, emitWorkflowOperation, addToQueue, session?.user?.id]
)
@@ -594,17 +557,37 @@ export function useCollaborativeWorkflow() {
const globalWindow = window as any
const pendingUpdates = globalWindow.__pendingSubblockUpdates
if (pendingUpdates && Array.isArray(pendingUpdates)) {
// Emit collaborative subblock updates for each changed subblock
// Queue each subblock update individually
for (const update of pendingUpdates) {
const { blockId, subBlockId, newValue } = update
emitSubblockUpdate(blockId, subBlockId, newValue)
const operationId = crypto.randomUUID()
addToQueue({
id: operationId,
operation: {
operation: 'subblock-update',
target: 'subblock',
payload: { blockId, subblockId: subBlockId, value: newValue },
},
workflowId: activeWorkflowId || '',
userId: session?.user?.id || 'unknown',
})
subBlockStore.setValue(blockId, subBlockId, newValue)
}
// Clear the pending updates
globalWindow.__pendingSubblockUpdates = undefined
}
})
},
[executeQueuedOperation, workflowStore, emitSubblockUpdate]
[
executeQueuedOperation,
workflowStore,
addToQueue,
subBlockStore,
activeWorkflowId,
session?.user?.id,
]
)
const collaborativeToggleBlockEnabled = useCallback(
@@ -802,9 +785,6 @@ export function useCollaborativeWorkflow() {
// Apply locally first (immediate UI feedback)
subBlockStore.setValue(blockId, subblockId, value)
// Emit to server with operation ID for tracking
emitSubblockUpdate(blockId, subblockId, value, operationId)
},
[
subBlockStore,

View File

@@ -26,7 +26,6 @@ interface OperationQueueState {
confirmOperation: (operationId: string) => void
failOperation: (operationId: string) => void
handleOperationTimeout: (operationId: string) => void
handleSocketReconnection: () => void
processNextOperation: () => void
triggerOfflineMode: () => void
@@ -65,24 +64,43 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
// Check for duplicate operation ID
const existingOp = state.operations.find((op) => op.id === operation.id)
if (existingOp) {
logger.debug('Skipping duplicate operation', { operationId: operation.id })
logger.debug('Skipping duplicate operation ID', {
operationId: operation.id,
existingStatus: existingOp.status,
})
return
}
// Check for duplicate operation content (same operation on same target with same payload)
// Enhanced duplicate content check - especially important for block operations
const duplicateContent = state.operations.find(
(op) =>
op.operation.operation === operation.operation.operation &&
op.operation.target === operation.operation.target &&
JSON.stringify(op.operation.payload) === JSON.stringify(operation.operation.payload) &&
op.workflowId === operation.workflowId
op.workflowId === operation.workflowId &&
// For block operations, check the block ID specifically
((operation.operation.target === 'block' &&
op.operation.payload?.id === operation.operation.payload?.id) ||
// For subblock operations, check blockId and subblockId
(operation.operation.target === 'subblock' &&
op.operation.payload?.blockId === operation.operation.payload?.blockId &&
op.operation.payload?.subblockId === operation.operation.payload?.subblockId) ||
// For other operations, fall back to full payload comparison
(operation.operation.target !== 'block' &&
operation.operation.target !== 'subblock' &&
JSON.stringify(op.operation.payload) === JSON.stringify(operation.operation.payload)))
)
if (duplicateContent) {
logger.debug('Skipping duplicate operation content', {
operationId: operation.id,
existingOperationId: duplicateContent.id,
operation: operation.operation.operation,
target: operation.operation.target,
existingStatus: duplicateContent.status,
payload:
operation.operation.target === 'block'
? { id: operation.operation.payload?.id }
: operation.operation.payload,
})
return
}
@@ -262,39 +280,6 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
operationTimeouts.set(nextOperation.id, timeoutId)
},
handleSocketReconnection: () => {
// Clear all timeouts since they're for the old socket
retryTimeouts.forEach((timeout) => clearTimeout(timeout))
retryTimeouts.clear()
operationTimeouts.forEach((timeout) => clearTimeout(timeout))
operationTimeouts.clear()
const state = get()
const resetOperations = state.operations.map((op) => ({
...op,
retryCount: 0, // Reset retry count for fresh attempts
status: 'pending' as const,
}))
set({
operations: resetOperations,
isProcessing: false,
hasOperationError: false,
})
resetOperations.forEach((operation) => {
const timeoutId = setTimeout(() => {
logger.warn('Operation timeout after reconnection - no server response after 5 seconds', {
operationId: operation.id,
})
operationTimeouts.delete(operation.id)
get().handleOperationTimeout(operation.id)
}, 5000)
operationTimeouts.set(operation.id, timeoutId)
})
},
triggerOfflineMode: () => {
logger.error('Operation failed after retries - triggering offline mode')
@@ -325,7 +310,6 @@ export function useOperationQueue() {
addToQueue: store.addToQueue,
confirmOperation: store.confirmOperation,
failOperation: store.failOperation,
handleSocketReconnection: store.handleSocketReconnection,
processNextOperation: store.processNextOperation,
triggerOfflineMode: store.triggerOfflineMode,
clearError: store.clearError,