From e5d9b989091b28d0951724ad1af1aede0a2b5b78 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Thu, 29 Jan 2026 17:08:44 -0800 Subject: [PATCH] use native api --- apps/sim/socket/handlers/subblocks.ts | 69 +++++++++------------------ apps/sim/socket/handlers/variables.ts | 68 +++++++++----------------- 2 files changed, 46 insertions(+), 91 deletions(-) diff --git a/apps/sim/socket/handlers/subblocks.ts b/apps/sim/socket/handlers/subblocks.ts index 0a9c30b9f..15c5cd54a 100644 --- a/apps/sim/socket/handlers/subblocks.ts +++ b/apps/sim/socket/handlers/subblocks.ts @@ -133,14 +133,11 @@ async function flushSubblockUpdate( if (workflowExists.length === 0) { pending.opToSocket.forEach((socketId, opId) => { - const sock = io.sockets.sockets.get(socketId) - if (sock) { - sock.emit('operation-failed', { - operationId: opId, - error: 'Workflow not found', - retryable: false, - }) - } + io.to(socketId).emit('operation-failed', { + operationId: opId, + error: 'Workflow not found', + retryable: false, + }) }) return } @@ -173,22 +170,9 @@ async function flushSubblockUpdate( }) if (updateSuccessful) { - // Collect all sender socket IDs to exclude from broadcast + // Broadcast to room excluding all senders (works cross-pod via Redis adapter) const senderSocketIds = [...pending.opToSocket.values()] - const firstSenderSocket = - senderSocketIds.length > 0 ? io.sockets.sockets.get(senderSocketIds[0]) : null - - if (firstSenderSocket) { - // socket.to(room).emit() excludes sender and broadcasts across all pods via Redis adapter - firstSenderSocket.to(workflowId).emit('subblock-update', { - blockId, - subblockId, - value, - timestamp, - }) - } else if (senderSocketIds.length > 0) { - // Senders disconnected but we should still exclude them in case they reconnected - // Use io.except() to exclude all sender socket IDs + if (senderSocketIds.length > 0) { io.to(workflowId).except(senderSocketIds).emit('subblock-update', { blockId, subblockId, @@ -196,8 +180,7 @@ async function flushSubblockUpdate( timestamp, }) } else { - // No senders tracked (edge case) - broadcast to all - roomManager.emitToWorkflow(workflowId, 'subblock-update', { + io.to(workflowId).emit('subblock-update', { blockId, subblockId, value, @@ -205,36 +188,30 @@ async function flushSubblockUpdate( }) } - // Confirm all coalesced operationIds (only to sockets still connected on this pod) + // Confirm all coalesced operationIds (io.to(socketId) works cross-pod) pending.opToSocket.forEach((socketId, opId) => { - const sock = io.sockets.sockets.get(socketId) - if (sock) { - sock.emit('operation-confirmed', { operationId: opId, serverTimestamp: Date.now() }) - } + io.to(socketId).emit('operation-confirmed', { + operationId: opId, + serverTimestamp: Date.now(), + }) }) } else { pending.opToSocket.forEach((socketId, opId) => { - const sock = io.sockets.sockets.get(socketId) - if (sock) { - sock.emit('operation-failed', { - operationId: opId, - error: 'Block no longer exists', - retryable: false, - }) - } + io.to(socketId).emit('operation-failed', { + operationId: opId, + error: 'Block no longer exists', + retryable: false, + }) }) } } catch (error) { logger.error('Error flushing subblock update:', error) pending.opToSocket.forEach((socketId, opId) => { - const sock = io.sockets.sockets.get(socketId) - if (sock) { - sock.emit('operation-failed', { - operationId: opId, - error: error instanceof Error ? error.message : 'Unknown error', - retryable: true, - }) - } + io.to(socketId).emit('operation-failed', { + operationId: opId, + error: error instanceof Error ? error.message : 'Unknown error', + retryable: true, + }) }) } } diff --git a/apps/sim/socket/handlers/variables.ts b/apps/sim/socket/handlers/variables.ts index d53b295ee..8ac4bcb81 100644 --- a/apps/sim/socket/handlers/variables.ts +++ b/apps/sim/socket/handlers/variables.ts @@ -127,14 +127,11 @@ async function flushVariableUpdate( if (workflowExists.length === 0) { pending.opToSocket.forEach((socketId, opId) => { - const sock = io.sockets.sockets.get(socketId) - if (sock) { - sock.emit('operation-failed', { - operationId: opId, - error: 'Workflow not found', - retryable: false, - }) - } + io.to(socketId).emit('operation-failed', { + operationId: opId, + error: 'Workflow not found', + retryable: false, + }) }) return } @@ -170,22 +167,9 @@ async function flushVariableUpdate( }) if (updateSuccessful) { - // Collect all sender socket IDs to exclude from broadcast + // Broadcast to room excluding all senders (works cross-pod via Redis adapter) const senderSocketIds = [...pending.opToSocket.values()] - const firstSenderSocket = - senderSocketIds.length > 0 ? io.sockets.sockets.get(senderSocketIds[0]) : null - - if (firstSenderSocket) { - // socket.to(room).emit() excludes sender and broadcasts across all pods via Redis adapter - firstSenderSocket.to(workflowId).emit('variable-update', { - variableId, - field, - value, - timestamp, - }) - } else if (senderSocketIds.length > 0) { - // Senders disconnected but we should still exclude them in case they reconnected - // Use io.except() to exclude all sender socket IDs + if (senderSocketIds.length > 0) { io.to(workflowId).except(senderSocketIds).emit('variable-update', { variableId, field, @@ -193,8 +177,7 @@ async function flushVariableUpdate( timestamp, }) } else { - // No senders tracked (edge case) - broadcast to all - roomManager.emitToWorkflow(workflowId, 'variable-update', { + io.to(workflowId).emit('variable-update', { variableId, field, value, @@ -202,37 +185,32 @@ async function flushVariableUpdate( }) } + // Confirm all coalesced operationIds (io.to(socketId) works cross-pod) pending.opToSocket.forEach((socketId, opId) => { - const sock = io.sockets.sockets.get(socketId) - if (sock) { - sock.emit('operation-confirmed', { operationId: opId, serverTimestamp: Date.now() }) - } + io.to(socketId).emit('operation-confirmed', { + operationId: opId, + serverTimestamp: Date.now(), + }) }) logger.debug(`Flushed variable update ${workflowId}: ${variableId}.${field}`) } else { pending.opToSocket.forEach((socketId, opId) => { - const sock = io.sockets.sockets.get(socketId) - if (sock) { - sock.emit('operation-failed', { - operationId: opId, - error: 'Variable no longer exists', - retryable: false, - }) - } + io.to(socketId).emit('operation-failed', { + operationId: opId, + error: 'Variable no longer exists', + retryable: false, + }) }) } } catch (error) { logger.error('Error flushing variable update:', error) pending.opToSocket.forEach((socketId, opId) => { - const sock = io.sockets.sockets.get(socketId) - if (sock) { - sock.emit('operation-failed', { - operationId: opId, - error: error instanceof Error ? error.message : 'Unknown error', - retryable: true, - }) - } + io.to(socketId).emit('operation-failed', { + operationId: opId, + error: error instanceof Error ? error.message : 'Unknown error', + retryable: true, + }) }) } }