improvement(sockets): duplicate op should let addBlock take subblock values instead of separate looped op (#836)

* improvement(sockets): addBlock can accept subblock values

* cleanup unused code
This commit is contained in:
Vikhyath Mondreti
2025-07-31 19:36:32 -07:00
committed by GitHub
parent fb6f5553bb
commit 914f1cdd47
4 changed files with 35 additions and 301 deletions

View File

@@ -50,17 +50,13 @@ interface SocketContextType {
value: any,
operationId?: string
) => void
emitBatchSubblockUpdate: (
blockId: string,
subblockValues: Record<string, any>,
operationId?: string
) => void
emitCursorUpdate: (cursor: { x: number; y: number }) => void
emitSelectionUpdate: (selection: { type: 'block' | 'edge' | 'none'; id?: string }) => void
// Event handlers for receiving real-time updates
onWorkflowOperation: (handler: (data: any) => void) => void
onSubblockUpdate: (handler: (data: any) => void) => void
onBatchSubblockUpdate: (handler: (data: any) => void) => void
onCursorUpdate: (handler: (data: any) => void) => void
onSelectionUpdate: (handler: (data: any) => void) => void
onUserJoined: (handler: (data: any) => void) => void
@@ -81,12 +77,10 @@ const SocketContext = createContext<SocketContextType>({
leaveWorkflow: () => {},
emitWorkflowOperation: () => {},
emitSubblockUpdate: () => {},
emitBatchSubblockUpdate: () => {},
emitCursorUpdate: () => {},
emitSelectionUpdate: () => {},
onWorkflowOperation: () => {},
onSubblockUpdate: () => {},
onBatchSubblockUpdate: () => {},
onCursorUpdate: () => {},
onSelectionUpdate: () => {},
onUserJoined: () => {},
@@ -119,7 +113,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
const eventHandlers = useRef<{
workflowOperation?: (data: any) => void
subblockUpdate?: (data: any) => void
batchSubblockUpdate?: (data: any) => void
cursorUpdate?: (data: any) => void
selectionUpdate?: (data: any) => void
userJoined?: (data: any) => void
@@ -298,11 +292,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
eventHandlers.current.subblockUpdate?.(data)
})
// Batch subblock update events
socketInstance.on('batch-subblock-update', (data) => {
eventHandlers.current.batchSubblockUpdate?.(data)
})
// Workflow deletion events
socketInstance.on('workflow-deleted', (data) => {
logger.warn(`Workflow ${data.workflowId} has been deleted`)
@@ -708,29 +697,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
[socket, currentWorkflowId]
)
// Emit batch subblock value updates
const emitBatchSubblockUpdate = useCallback(
(blockId: string, subblockValues: Record<string, any>, operationId?: string) => {
// Only emit if socket is connected and we're in a valid workflow room
if (socket && currentWorkflowId) {
socket.emit('batch-subblock-update', {
blockId,
subblockValues,
timestamp: Date.now(),
operationId, // Include operation ID for queue tracking
})
} else {
logger.warn('Cannot emit batch subblock update: no socket connection or workflow room', {
hasSocket: !!socket,
currentWorkflowId,
blockId,
subblockCount: Object.keys(subblockValues).length,
})
}
},
[socket, currentWorkflowId]
)
// Cursor throttling optimized for database connection health
const lastCursorEmit = useRef(0)
const emitCursorUpdate = useCallback(
@@ -766,10 +732,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
eventHandlers.current.subblockUpdate = handler
}, [])
const onBatchSubblockUpdate = useCallback((handler: (data: any) => void) => {
eventHandlers.current.batchSubblockUpdate = handler
}, [])
const onCursorUpdate = useCallback((handler: (data: any) => void) => {
eventHandlers.current.cursorUpdate = handler
}, [])
@@ -814,12 +776,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
leaveWorkflow,
emitWorkflowOperation,
emitSubblockUpdate,
emitBatchSubblockUpdate,
emitCursorUpdate,
emitSelectionUpdate,
onWorkflowOperation,
onSubblockUpdate,
onBatchSubblockUpdate,
onCursorUpdate,
onSelectionUpdate,
onUserJoined,

View File

@@ -22,10 +22,8 @@ export function useCollaborativeWorkflow() {
leaveWorkflow,
emitWorkflowOperation,
emitSubblockUpdate,
emitBatchSubblockUpdate,
onWorkflowOperation,
onSubblockUpdate,
onBatchSubblockUpdate,
onUserJoined,
onUserLeft,
onWorkflowDeleted,
@@ -73,13 +71,8 @@ export function useCollaborativeWorkflow() {
// Register emit functions with operation queue store
useEffect(() => {
registerEmitFunctions(
emitWorkflowOperation,
emitSubblockUpdate,
emitBatchSubblockUpdate,
currentWorkflowId
)
}, [emitWorkflowOperation, emitSubblockUpdate, emitBatchSubblockUpdate, currentWorkflowId])
registerEmitFunctions(emitWorkflowOperation, emitSubblockUpdate, currentWorkflowId)
}, [emitWorkflowOperation, emitSubblockUpdate, currentWorkflowId])
useEffect(() => {
const handleWorkflowOperation = (data: any) => {
@@ -245,29 +238,6 @@ export function useCollaborativeWorkflow() {
}
}
const handleBatchSubblockUpdate = (data: any) => {
const { blockId, subblockValues, userId } = data
if (isApplyingRemoteChange.current) return
logger.info(
`Received batch subblock update from user ${userId}: ${blockId} (${Object.keys(subblockValues).length} subblocks)`
)
isApplyingRemoteChange.current = true
try {
// Apply all subblock values in batch
Object.entries(subblockValues).forEach(([subblockId, value]) => {
subBlockStore.setValue(blockId, subblockId, value)
})
} catch (error) {
logger.error('Error applying remote batch subblock update:', error)
} finally {
isApplyingRemoteChange.current = false
}
}
const handleUserJoined = (data: any) => {
logger.info(`User joined: ${data.userName}`)
}
@@ -373,7 +343,6 @@ export function useCollaborativeWorkflow() {
// Register event handlers
onWorkflowOperation(handleWorkflowOperation)
onSubblockUpdate(handleSubblockUpdate)
onBatchSubblockUpdate(handleBatchSubblockUpdate)
onUserJoined(handleUserJoined)
onUserLeft(handleUserLeft)
onWorkflowDeleted(handleWorkflowDeleted)
@@ -387,7 +356,6 @@ export function useCollaborativeWorkflow() {
}, [
onWorkflowOperation,
onSubblockUpdate,
onBatchSubblockUpdate,
onUserJoined,
onUserLeft,
onWorkflowDeleted,
@@ -755,43 +723,6 @@ export function useCollaborativeWorkflow() {
]
)
const collaborativeBatchSetSubblockValues = useCallback(
(blockId: string, subblockValues: Record<string, any>) => {
if (isApplyingRemoteChange.current) return
if (!currentWorkflowId || activeWorkflowId !== currentWorkflowId) {
logger.debug('Skipping batch subblock update - not in active workflow', {
currentWorkflowId,
activeWorkflowId,
blockId,
subblockCount: Object.keys(subblockValues).length,
})
return
}
// Generate operation ID for queue tracking
const operationId = crypto.randomUUID()
// Add to queue for retry mechanism
addToQueue({
id: operationId,
operation: {
operation: 'batch-subblock-update',
target: 'block',
payload: { blockId, subblockValues },
},
workflowId: activeWorkflowId || '',
userId: session?.user?.id || 'unknown',
})
// Apply locally first (immediate UI feedback)
Object.entries(subblockValues).forEach(([subblockId, value]) => {
subBlockStore.setValue(blockId, subblockId, value)
})
},
[subBlockStore, currentWorkflowId, activeWorkflowId, addToQueue, session?.user?.id]
)
const collaborativeDuplicateBlock = useCallback(
(sourceId: string) => {
const sourceBlock = workflowStore.blocks[sourceId]
@@ -809,6 +740,26 @@ export function useCollaborativeWorkflow() {
? `${match[1]}${Number.parseInt(match[2]) + 1}`
: `${sourceBlock.name} 1`
// Get subblock values from the store
const subBlockValues = subBlockStore.workflowValues[activeWorkflowId || '']?.[sourceId] || {}
// Merge subblock structure with actual values
const mergedSubBlocks = sourceBlock.subBlocks
? JSON.parse(JSON.stringify(sourceBlock.subBlocks))
: {}
Object.entries(subBlockValues).forEach(([subblockId, value]) => {
if (mergedSubBlocks[subblockId]) {
mergedSubBlocks[subblockId].value = value
} else {
// Create subblock if it doesn't exist in structure
mergedSubBlocks[subblockId] = {
id: subblockId,
type: 'unknown',
value: value,
}
}
})
// Create the complete block data for the socket operation
const duplicatedBlockData = {
sourceId,
@@ -817,7 +768,7 @@ export function useCollaborativeWorkflow() {
name: newName,
position: offsetPosition,
data: sourceBlock.data ? JSON.parse(JSON.stringify(sourceBlock.data)) : {},
subBlocks: sourceBlock.subBlocks ? JSON.parse(JSON.stringify(sourceBlock.subBlocks)) : {},
subBlocks: mergedSubBlocks,
outputs: sourceBlock.outputs ? JSON.parse(JSON.stringify(sourceBlock.outputs)) : {},
parentId: sourceBlock.data?.parentId || null,
extent: sourceBlock.data?.extent || null,
@@ -837,21 +788,6 @@ export function useCollaborativeWorkflow() {
sourceBlock.data?.extent
)
const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId
if (activeWorkflowId) {
const subBlockValues =
useSubBlockStore.getState().workflowValues[activeWorkflowId]?.[sourceId] || {}
useSubBlockStore.setState((state) => ({
workflowValues: {
...state.workflowValues,
[activeWorkflowId]: {
...state.workflowValues[activeWorkflowId],
[newId]: JSON.parse(JSON.stringify(subBlockValues)),
},
},
}))
}
executeQueuedOperation('duplicate', 'block', duplicatedBlockData, () => {
workflowStore.addBlock(
newId,
@@ -861,19 +797,16 @@ export function useCollaborativeWorkflow() {
sourceBlock.data ? JSON.parse(JSON.stringify(sourceBlock.data)) : {}
)
const subBlockValues = subBlockStore.workflowValues[activeWorkflowId || '']?.[sourceId]
if (subBlockValues && activeWorkflowId) {
collaborativeBatchSetSubblockValues(newId, subBlockValues)
// Apply subblock values locally for immediate UI feedback
// The server will persist these values as part of the block creation
if (activeWorkflowId && Object.keys(subBlockValues).length > 0) {
Object.entries(subBlockValues).forEach(([subblockId, value]) => {
subBlockStore.setValue(newId, subblockId, value)
})
}
})
},
[
executeQueuedOperation,
workflowStore,
subBlockStore,
activeWorkflowId,
collaborativeBatchSetSubblockValues,
]
[executeQueuedOperation, workflowStore, subBlockStore, activeWorkflowId]
)
const collaborativeUpdateLoopCount = useCallback(

View File

@@ -158,152 +158,4 @@ export function setupSubblocksHandlers(
})
}
})
socket.on('batch-subblock-update', async (data) => {
const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
const session = roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
logger.debug(`Ignoring batch subblock update: socket not connected to any workflow room`, {
socketId: socket.id,
hasWorkflowId: !!workflowId,
hasSession: !!session,
})
return
}
const { blockId, subblockValues, timestamp, operationId } = data
const room = roomManager.getWorkflowRoom(workflowId)
if (!room) {
logger.debug(`Ignoring batch subblock update: workflow room not found`, {
socketId: socket.id,
workflowId,
blockId,
subblockCount: Object.keys(subblockValues).length,
})
return
}
try {
const userPresence = room.users.get(socket.id)
if (userPresence) {
userPresence.lastActivity = Date.now()
}
// First, verify that the workflow still exists in the database
const workflowExists = await db
.select({ id: workflow.id })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
if (workflowExists.length === 0) {
logger.warn(`Ignoring batch subblock update: workflow ${workflowId} no longer exists`, {
socketId: socket.id,
blockId,
subblockCount: Object.keys(subblockValues).length,
})
roomManager.cleanupUserFromRoom(socket.id, workflowId)
return
}
let updateSuccessful = false
await db.transaction(async (tx) => {
// Get the current block
const [block] = await tx
.select({ subBlocks: workflowBlocks.subBlocks })
.from(workflowBlocks)
.where(and(eq(workflowBlocks.id, blockId), eq(workflowBlocks.workflowId, workflowId)))
.limit(1)
if (!block) {
logger.debug(`Block ${blockId} not found in workflow ${workflowId}`)
return
}
const subBlocks = (block.subBlocks as any) || {}
// Update all subblock values in batch
for (const [subblockId, value] of Object.entries(subblockValues)) {
if (!subBlocks[subblockId]) {
// Create new subblock with minimal structure
subBlocks[subblockId] = {
id: subblockId,
type: 'unknown', // Will be corrected by next collaborative update
value: value,
}
} else {
// Preserve existing id and type, only update value
subBlocks[subblockId] = {
...subBlocks[subblockId],
value: value,
}
}
}
await tx
.update(workflowBlocks)
.set({
subBlocks: subBlocks,
updatedAt: new Date(),
})
.where(and(eq(workflowBlocks.id, blockId), eq(workflowBlocks.workflowId, workflowId)))
updateSuccessful = true
})
// Only broadcast to other clients if the update was successful
if (updateSuccessful) {
socket.to(workflowId).emit('batch-subblock-update', {
blockId,
subblockValues,
timestamp,
senderId: socket.id,
userId: session.userId,
})
// Emit confirmation if operationId is provided
if (operationId) {
socket.emit('operation-confirmed', {
operationId,
serverTimestamp: Date.now(),
})
}
logger.debug(
`Batch subblock update in workflow ${workflowId}: ${blockId} (${Object.keys(subblockValues).length} subblocks)`
)
} else if (operationId) {
// Block was deleted - notify client that operation completed (but didn't update anything)
socket.emit('operation-failed', {
operationId,
error: 'Block no longer exists',
retryable: false, // No point retrying for deleted blocks
})
}
} catch (error) {
logger.error('Error handling batch subblock update:', error)
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
// Emit operation-failed for queue-tracked operations
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: errorMessage,
retryable: true, // Batch subblock updates are generally retryable
})
}
// Also emit legacy operation-error for backward compatibility
socket.emit('operation-error', {
type: 'BATCH_SUBBLOCK_UPDATE_FAILED',
message: `Failed to update batch subblocks for ${blockId}: ${errorMessage}`,
operation: 'batch-subblock-update',
target: 'block',
})
}
})
}

View File

@@ -43,23 +43,14 @@ let emitWorkflowOperation:
let emitSubblockUpdate:
| ((blockId: string, subblockId: string, value: any, operationId?: string) => void)
| null = null
let emitBatchSubblockUpdate:
| ((blockId: string, subblockValues: Record<string, any>, operationId?: string) => void)
| null = null
export function registerEmitFunctions(
workflowEmit: (operation: string, target: string, payload: any, operationId?: string) => void,
subblockEmit: (blockId: string, subblockId: string, value: any, operationId?: string) => void,
batchSubblockEmit: (
blockId: string,
subblockValues: Record<string, any>,
operationId?: string
) => void,
workflowId: string | null
) {
emitWorkflowOperation = workflowEmit
emitSubblockUpdate = subblockEmit
emitBatchSubblockUpdate = batchSubblockEmit
}
export const useOperationQueueStore = create<OperationQueueState>((set, get) => ({
@@ -340,10 +331,6 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
if (emitSubblockUpdate) {
emitSubblockUpdate(payload.blockId, payload.subblockId, payload.value, nextOperation.id)
}
} else if (op === 'batch-subblock-update' && target === 'block') {
if (emitBatchSubblockUpdate) {
emitBatchSubblockUpdate(payload.blockId, payload.subblockValues, nextOperation.id)
}
} else {
if (emitWorkflowOperation) {
emitWorkflowOperation(op, target, payload, nextOperation.id)