From 1fbe3029f4add491332870cab8e2b0856b35b7a5 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Thu, 29 Jan 2026 17:30:19 -0800 Subject: [PATCH] fix bugbot comments --- .../workspace/providers/socket-provider.tsx | 33 ++++++++++-- apps/sim/socket/handlers/subblocks.ts | 50 ++++++++++--------- apps/sim/socket/handlers/variables.ts | 50 ++++++++++--------- apps/sim/socket/rooms/redis-manager.ts | 13 ++++- 4 files changed, 95 insertions(+), 51 deletions(-) diff --git a/apps/sim/app/workspace/providers/socket-provider.tsx b/apps/sim/app/workspace/providers/socket-provider.tsx index 0c985f4e5..bf091974b 100644 --- a/apps/sim/app/workspace/providers/socket-provider.tsx +++ b/apps/sim/app/workspace/providers/socket-provider.tsx @@ -127,6 +127,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { const [presenceUsers, setPresenceUsers] = useState([]) const [authFailed, setAuthFailed] = useState(false) const initializedRef = useRef(false) + const socketRef = useRef(null) const params = useParams() const urlWorkflowId = params?.workflowId as string | undefined @@ -146,6 +147,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { }>({}) const positionUpdateTimeouts = useRef>(new Map()) + const isRejoiningRef = useRef(false) const pendingPositionUpdates = useRef>(new Map()) const generateSocketToken = async (): Promise => { @@ -315,6 +317,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { // Handle join workflow success - confirms room membership with presence list socketInstance.on('join-workflow-success', ({ workflowId, presenceUsers }) => { + isRejoiningRef.current = false setCurrentWorkflowId(workflowId) setPresenceUsers(presenceUsers || []) logger.info(`Successfully joined workflow room: ${workflowId}`, { @@ -322,6 +325,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) { }) }) + socketInstance.on('join-workflow-error', ({ error }) => { + isRejoiningRef.current = false + logger.error('Failed to join workflow:', error) + }) + socketInstance.on('workflow-operation', (data) => { eventHandlers.current.workflowOperation?.(data) }) @@ -485,6 +493,19 @@ export function SocketProvider({ children, user }: SocketProviderProps) { socketInstance.on('operation-forbidden', (error) => { logger.warn('Operation forbidden:', error) + + if (error?.type === 'SESSION_ERROR') { + const workflowId = urlWorkflowIdRef.current + + if (workflowId && !isRejoiningRef.current) { + isRejoiningRef.current = true + logger.info(`Session expired, rejoining workflow: ${workflowId}`) + socketInstance.emit('join-workflow', { + workflowId, + tabSessionId: getTabSessionId(), + }) + } + } }) socketInstance.on('workflow-state', async (workflowData) => { @@ -495,11 +516,8 @@ export function SocketProvider({ children, user }: SocketProviderProps) { } }) + socketRef.current = socketInstance setSocket(socketInstance) - - return () => { - socketInstance.close() - } } catch (error) { logger.error('Failed to initialize socket with token:', error) setIsConnecting(false) @@ -514,6 +532,13 @@ export function SocketProvider({ children, user }: SocketProviderProps) { }) positionUpdateTimeouts.current.clear() pendingPositionUpdates.current.clear() + + // Close socket on unmount + if (socketRef.current) { + logger.info('Closing socket connection on unmount') + socketRef.current.close() + socketRef.current = null + } } }, [user?.id, authFailed]) diff --git a/apps/sim/socket/handlers/subblocks.ts b/apps/sim/socket/handlers/subblocks.ts index 15c5cd54a..bad69352f 100644 --- a/apps/sim/socket/handlers/subblocks.ts +++ b/apps/sim/socket/handlers/subblocks.ts @@ -36,32 +36,36 @@ export function cleanupPendingSubblocksForSocket(socketId: string): void { export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) { socket.on('subblock-update', async (data) => { - const workflowId = await roomManager.getWorkflowIdForSocket(socket.id) - const session = await roomManager.getUserSession(socket.id) - - if (!workflowId || !session) { - logger.debug(`Ignoring subblock update: socket not connected to any workflow room`, { - socketId: socket.id, - hasWorkflowId: !!workflowId, - hasSession: !!session, - }) - return - } - const { blockId, subblockId, value, timestamp, operationId } = data - const hasRoom = await roomManager.hasWorkflowRoom(workflowId) - if (!hasRoom) { - logger.debug(`Ignoring subblock update: workflow room not found`, { - socketId: socket.id, - workflowId, - blockId, - subblockId, - }) - return - } - try { + const workflowId = await roomManager.getWorkflowIdForSocket(socket.id) + const session = await roomManager.getUserSession(socket.id) + + if (!workflowId || !session) { + logger.debug(`Ignoring subblock update: socket not connected to any workflow room`, { + socketId: socket.id, + hasWorkflowId: !!workflowId, + hasSession: !!session, + }) + socket.emit('operation-forbidden', { + type: 'SESSION_ERROR', + message: 'Session expired, please rejoin workflow', + }) + return + } + + const hasRoom = await roomManager.hasWorkflowRoom(workflowId) + if (!hasRoom) { + logger.debug(`Ignoring subblock update: workflow room not found`, { + socketId: socket.id, + workflowId, + blockId, + subblockId, + }) + return + } + // Update user activity await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() }) diff --git a/apps/sim/socket/handlers/variables.ts b/apps/sim/socket/handlers/variables.ts index 8ac4bcb81..a34b60126 100644 --- a/apps/sim/socket/handlers/variables.ts +++ b/apps/sim/socket/handlers/variables.ts @@ -32,32 +32,36 @@ export function cleanupPendingVariablesForSocket(socketId: string): void { export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) { socket.on('variable-update', async (data) => { - const workflowId = await roomManager.getWorkflowIdForSocket(socket.id) - const session = await roomManager.getUserSession(socket.id) - - if (!workflowId || !session) { - logger.debug(`Ignoring variable update: socket not connected to any workflow room`, { - socketId: socket.id, - hasWorkflowId: !!workflowId, - hasSession: !!session, - }) - return - } - const { variableId, field, value, timestamp, operationId } = data - const hasRoom = await roomManager.hasWorkflowRoom(workflowId) - if (!hasRoom) { - logger.debug(`Ignoring variable update: workflow room not found`, { - socketId: socket.id, - workflowId, - variableId, - field, - }) - return - } - try { + const workflowId = await roomManager.getWorkflowIdForSocket(socket.id) + const session = await roomManager.getUserSession(socket.id) + + if (!workflowId || !session) { + logger.debug(`Ignoring variable update: socket not connected to any workflow room`, { + socketId: socket.id, + hasWorkflowId: !!workflowId, + hasSession: !!session, + }) + socket.emit('operation-forbidden', { + type: 'SESSION_ERROR', + message: 'Session expired, please rejoin workflow', + }) + return + } + + const hasRoom = await roomManager.hasWorkflowRoom(workflowId) + if (!hasRoom) { + logger.debug(`Ignoring variable update: workflow room not found`, { + socketId: socket.id, + workflowId, + variableId, + field, + }) + return + } + // Update user activity await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() }) diff --git a/apps/sim/socket/rooms/redis-manager.ts b/apps/sim/socket/rooms/redis-manager.ts index 20e59444d..c710dba54 100644 --- a/apps/sim/socket/rooms/redis-manager.ts +++ b/apps/sim/socket/rooms/redis-manager.ts @@ -48,13 +48,17 @@ return workflowId /** * Lua script for atomic user activity update. * Performs read-modify-write atomically to prevent lost updates. + * Also refreshes TTL on socket keys to prevent expiry during long sessions. */ const UPDATE_ACTIVITY_SCRIPT = ` local workflowUsersKey = KEYS[1] +local socketWorkflowKey = KEYS[2] +local socketSessionKey = KEYS[3] local socketId = ARGV[1] local cursorJson = ARGV[2] local selectionJson = ARGV[3] local lastActivity = ARGV[4] +local ttl = tonumber(ARGV[5]) local existingJson = redis.call('HGET', workflowUsersKey, socketId) if not existingJson then @@ -72,6 +76,8 @@ end existing.lastActivity = tonumber(lastActivity) redis.call('HSET', workflowUsersKey, socketId, cjson.encode(existing)) +redis.call('EXPIRE', socketWorkflowKey, ttl) +redis.call('EXPIRE', socketSessionKey, ttl) return 1 ` @@ -269,12 +275,17 @@ export class RedisRoomManager implements IRoomManager { try { await this.redis.evalSha(this.updateActivityScriptSha, { - keys: [KEYS.workflowUsers(workflowId)], + keys: [ + KEYS.workflowUsers(workflowId), + KEYS.socketWorkflow(socketId), + KEYS.socketSession(socketId), + ], arguments: [ socketId, updates.cursor ? JSON.stringify(updates.cursor) : '', updates.selection ? JSON.stringify(updates.selection) : '', (updates.lastActivity ?? Date.now()).toString(), + SOCKET_KEY_TTL.toString(), ], }) } catch (error) {