From 0cb671449658e0e9a62e49ae3d4512ebd12f27fa Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Sat, 7 Feb 2026 12:18:07 -0800 Subject: [PATCH] fix(rooms): cleanup edge case for 1hr ttl (#3163) * fix(rooms): cleanup edge case for 1hr ttl * revert feature flags * address comments * remove console log --- apps/sim/serializer/index.ts | 1 - apps/sim/socket/handlers/connection.ts | 3 +- apps/sim/socket/handlers/workflow.ts | 70 +++++++++++++++++++------ apps/sim/socket/rooms/memory-manager.ts | 2 +- apps/sim/socket/rooms/redis-manager.ts | 52 ++++++++++++++---- apps/sim/socket/rooms/types.ts | 5 +- 6 files changed, 104 insertions(+), 29 deletions(-) diff --git a/apps/sim/serializer/index.ts b/apps/sim/serializer/index.ts index 66f4568a4..622667d9f 100644 --- a/apps/sim/serializer/index.ts +++ b/apps/sim/serializer/index.ts @@ -70,7 +70,6 @@ function shouldSerializeSubBlock( : group.basicId === subBlockConfig.id return matchesMode && evaluateSubBlockCondition(subBlockConfig.condition, values) } - console.log('[FUCK] subBlockConfig.condition', subBlockConfig.condition, values) return evaluateSubBlockCondition(subBlockConfig.condition, values) } diff --git a/apps/sim/socket/handlers/connection.ts b/apps/sim/socket/handlers/connection.ts index 5444c9a83..ee7a9a774 100644 --- a/apps/sim/socket/handlers/connection.ts +++ b/apps/sim/socket/handlers/connection.ts @@ -21,7 +21,8 @@ export function setupConnectionHandlers(socket: AuthenticatedSocket, roomManager cleanupPendingSubblocksForSocket(socket.id) cleanupPendingVariablesForSocket(socket.id) - const workflowId = await roomManager.removeUserFromRoom(socket.id) + const workflowIdHint = [...socket.rooms].find((roomId) => roomId !== socket.id) + const workflowId = await roomManager.removeUserFromRoom(socket.id, workflowIdHint) if (workflowId) { await roomManager.broadcastPresenceUpdate(workflowId) diff --git a/apps/sim/socket/handlers/workflow.ts b/apps/sim/socket/handlers/workflow.ts index c59316d1e..8353f0a38 100644 --- a/apps/sim/socket/handlers/workflow.ts +++ b/apps/sim/socket/handlers/workflow.ts @@ -51,26 +51,66 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager: const currentWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id) if (currentWorkflowId) { socket.leave(currentWorkflowId) - await roomManager.removeUserFromRoom(socket.id) + await roomManager.removeUserFromRoom(socket.id, currentWorkflowId) await roomManager.broadcastPresenceUpdate(currentWorkflowId) } - const STALE_THRESHOLD_MS = 60_000 + // Keep this above Redis socket key TTL (1h) so a normal idle user is not evicted too aggressively. + const STALE_THRESHOLD_MS = 75 * 60 * 1000 const now = Date.now() const existingUsers = await roomManager.getWorkflowUsers(workflowId) - for (const existingUser of existingUsers) { - if (existingUser.userId === userId && existingUser.socketId !== socket.id) { - const isSameTab = tabSessionId && existingUser.tabSessionId === tabSessionId - const isStale = - now - (existingUser.lastActivity || existingUser.joinedAt || 0) > STALE_THRESHOLD_MS + let liveSocketIds = new Set() + let canCheckLiveness = false - if (isSameTab || isStale) { - logger.info( - `Cleaning up socket ${existingUser.socketId} for user ${userId} (${isSameTab ? 'same tab' : 'stale'})` - ) - await roomManager.removeUserFromRoom(existingUser.socketId) - roomManager.io.in(existingUser.socketId).socketsLeave(workflowId) + try { + const liveSockets = await roomManager.io.in(workflowId).fetchSockets() + liveSocketIds = new Set(liveSockets.map((liveSocket) => liveSocket.id)) + canCheckLiveness = true + } catch (error) { + logger.warn( + `Skipping stale cleanup for ${workflowId} due to live socket lookup failure`, + error + ) + } + + for (const existingUser of existingUsers) { + try { + if (existingUser.socketId === socket.id) { + continue } + + const isSameTab = Boolean( + existingUser.userId === userId && + tabSessionId && + existingUser.tabSessionId === tabSessionId + ) + + if (isSameTab) { + logger.info( + `Cleaning up socket ${existingUser.socketId} for user ${existingUser.userId} (same tab)` + ) + await roomManager.removeUserFromRoom(existingUser.socketId, workflowId) + await roomManager.io.in(existingUser.socketId).socketsLeave(workflowId) + continue + } + + if (!canCheckLiveness || liveSocketIds.has(existingUser.socketId)) { + continue + } + + const isStaleByActivity = + now - (existingUser.lastActivity || existingUser.joinedAt || 0) > STALE_THRESHOLD_MS + if (!isStaleByActivity) { + continue + } + + logger.info( + `Cleaning up socket ${existingUser.socketId} for user ${existingUser.userId} (stale activity)` + ) + await roomManager.removeUserFromRoom(existingUser.socketId, workflowId) + await roomManager.io.in(existingUser.socketId).socketsLeave(workflowId) + } catch (error) { + logger.warn(`Best-effort cleanup failed for socket ${existingUser.socketId}`, error) } } @@ -136,7 +176,7 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager: logger.error('Error joining workflow:', error) // Undo socket.join and room manager entry if any operation failed socket.leave(workflowId) - await roomManager.removeUserFromRoom(socket.id) + await roomManager.removeUserFromRoom(socket.id, workflowId) const isReady = roomManager.isReady() socket.emit('join-workflow-error', { error: isReady ? 'Failed to join workflow' : 'Realtime unavailable', @@ -156,7 +196,7 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager: if (workflowId && session) { socket.leave(workflowId) - await roomManager.removeUserFromRoom(socket.id) + await roomManager.removeUserFromRoom(socket.id, workflowId) await roomManager.broadcastPresenceUpdate(workflowId) logger.info(`User ${session.userId} (${session.userName}) left workflow ${workflowId}`) diff --git a/apps/sim/socket/rooms/memory-manager.ts b/apps/sim/socket/rooms/memory-manager.ts index 908ee13f7..fa631ff68 100644 --- a/apps/sim/socket/rooms/memory-manager.ts +++ b/apps/sim/socket/rooms/memory-manager.ts @@ -66,7 +66,7 @@ export class MemoryRoomManager implements IRoomManager { logger.debug(`Added user ${presence.userId} to workflow ${workflowId} (socket: ${socketId})`) } - async removeUserFromRoom(socketId: string): Promise { + async removeUserFromRoom(socketId: string, _workflowIdHint?: string): Promise { const workflowId = this.socketToWorkflow.get(socketId) if (!workflowId) { diff --git a/apps/sim/socket/rooms/redis-manager.ts b/apps/sim/socket/rooms/redis-manager.ts index 9288a4762..fb0d0d104 100644 --- a/apps/sim/socket/rooms/redis-manager.ts +++ b/apps/sim/socket/rooms/redis-manager.ts @@ -10,9 +10,11 @@ const KEYS = { workflowMeta: (wfId: string) => `workflow:${wfId}:meta`, socketWorkflow: (socketId: string) => `socket:${socketId}:workflow`, socketSession: (socketId: string) => `socket:${socketId}:session`, + socketPresenceWorkflow: (socketId: string) => `socket:${socketId}:presence-workflow`, } as const const SOCKET_KEY_TTL = 3600 +const SOCKET_PRESENCE_WORKFLOW_KEY_TTL = 24 * 60 * 60 /** * Lua script for atomic user removal from room. @@ -22,11 +24,21 @@ const SOCKET_KEY_TTL = 3600 const REMOVE_USER_SCRIPT = ` local socketWorkflowKey = KEYS[1] local socketSessionKey = KEYS[2] +local socketPresenceWorkflowKey = KEYS[3] local workflowUsersPrefix = ARGV[1] local workflowMetaPrefix = ARGV[2] local socketId = ARGV[3] +local workflowIdHint = ARGV[4] local workflowId = redis.call('GET', socketWorkflowKey) +if not workflowId then + workflowId = redis.call('GET', socketPresenceWorkflowKey) +end + +if not workflowId and workflowIdHint ~= '' then + workflowId = workflowIdHint +end + if not workflowId then return nil end @@ -35,7 +47,7 @@ local workflowUsersKey = workflowUsersPrefix .. workflowId .. ':users' local workflowMetaKey = workflowMetaPrefix .. workflowId .. ':meta' redis.call('HDEL', workflowUsersKey, socketId) -redis.call('DEL', socketWorkflowKey, socketSessionKey) +redis.call('DEL', socketWorkflowKey, socketSessionKey, socketPresenceWorkflowKey) local remaining = redis.call('HLEN', workflowUsersKey) if remaining == 0 then @@ -54,11 +66,13 @@ const UPDATE_ACTIVITY_SCRIPT = ` local workflowUsersKey = KEYS[1] local socketWorkflowKey = KEYS[2] local socketSessionKey = KEYS[3] +local socketPresenceWorkflowKey = KEYS[4] local socketId = ARGV[1] local cursorJson = ARGV[2] local selectionJson = ARGV[3] local lastActivity = ARGV[4] local ttl = tonumber(ARGV[5]) +local presenceWorkflowTtl = tonumber(ARGV[6]) local existingJson = redis.call('HGET', workflowUsersKey, socketId) if not existingJson then @@ -78,6 +92,7 @@ existing.lastActivity = tonumber(lastActivity) redis.call('HSET', workflowUsersKey, socketId, cjson.encode(existing)) redis.call('EXPIRE', socketWorkflowKey, ttl) redis.call('EXPIRE', socketSessionKey, ttl) +redis.call('EXPIRE', socketPresenceWorkflowKey, presenceWorkflowTtl) return 1 ` @@ -164,6 +179,8 @@ export class RedisRoomManager implements IRoomManager { pipeline.hSet(KEYS.workflowMeta(workflowId), 'lastModified', Date.now().toString()) pipeline.set(KEYS.socketWorkflow(socketId), workflowId) pipeline.expire(KEYS.socketWorkflow(socketId), SOCKET_KEY_TTL) + pipeline.set(KEYS.socketPresenceWorkflow(socketId), workflowId) + pipeline.expire(KEYS.socketPresenceWorkflow(socketId), SOCKET_PRESENCE_WORKFLOW_KEY_TTL) pipeline.hSet(KEYS.socketSession(socketId), { userId: presence.userId, userName: presence.userName, @@ -187,7 +204,11 @@ export class RedisRoomManager implements IRoomManager { } } - async removeUserFromRoom(socketId: string, retried = false): Promise { + async removeUserFromRoom( + socketId: string, + workflowIdHint?: string, + retried = false + ): Promise { if (!this.removeUserScriptSha) { logger.error('removeUserFromRoom called before initialize()') return null @@ -195,19 +216,25 @@ export class RedisRoomManager implements IRoomManager { try { const workflowId = await this.redis.evalSha(this.removeUserScriptSha, { - keys: [KEYS.socketWorkflow(socketId), KEYS.socketSession(socketId)], - arguments: ['workflow:', 'workflow:', socketId], + keys: [ + KEYS.socketWorkflow(socketId), + KEYS.socketSession(socketId), + KEYS.socketPresenceWorkflow(socketId), + ], + arguments: ['workflow:', 'workflow:', socketId, workflowIdHint ?? ''], }) - if (workflowId) { + if (typeof workflowId === 'string' && workflowId.length > 0) { logger.debug(`Removed socket ${socketId} from workflow ${workflowId}`) + return workflowId } - return workflowId as string | null + + return null } catch (error) { if ((error as Error).message?.includes('NOSCRIPT') && !retried) { logger.warn('Lua script not found, reloading...') this.removeUserScriptSha = await this.redis.scriptLoad(REMOVE_USER_SCRIPT) - return this.removeUserFromRoom(socketId, true) + return this.removeUserFromRoom(socketId, workflowIdHint, true) } logger.error(`Failed to remove user from room: ${socketId}`, error) return null @@ -215,7 +242,12 @@ export class RedisRoomManager implements IRoomManager { } async getWorkflowIdForSocket(socketId: string): Promise { - return this.redis.get(KEYS.socketWorkflow(socketId)) + const workflowId = await this.redis.get(KEYS.socketWorkflow(socketId)) + if (workflowId) { + return workflowId + } + + return this.redis.get(KEYS.socketPresenceWorkflow(socketId)) } async getUserSession(socketId: string): Promise { @@ -278,6 +310,7 @@ export class RedisRoomManager implements IRoomManager { KEYS.workflowUsers(workflowId), KEYS.socketWorkflow(socketId), KEYS.socketSession(socketId), + KEYS.socketPresenceWorkflow(socketId), ], arguments: [ socketId, @@ -285,6 +318,7 @@ export class RedisRoomManager implements IRoomManager { updates.selection !== undefined ? JSON.stringify(updates.selection) : '', (updates.lastActivity ?? Date.now()).toString(), SOCKET_KEY_TTL.toString(), + SOCKET_PRESENCE_WORKFLOW_KEY_TTL.toString(), ], }) } catch (error) { @@ -348,7 +382,7 @@ export class RedisRoomManager implements IRoomManager { // Remove all users from Redis state for (const user of users) { - await this.removeUserFromRoom(user.socketId) + await this.removeUserFromRoom(user.socketId, workflowId) } // Clean up room data diff --git a/apps/sim/socket/rooms/types.ts b/apps/sim/socket/rooms/types.ts index b294646f6..5c755a739 100644 --- a/apps/sim/socket/rooms/types.ts +++ b/apps/sim/socket/rooms/types.ts @@ -65,9 +65,10 @@ export interface IRoomManager { /** * Remove a user from their current room - * Returns the workflowId they were in, or null if not in any room + * Optional workflowIdHint is used when socket mapping keys are missing/expired. + * Returns the workflowId they were in, or null if not in any room. */ - removeUserFromRoom(socketId: string): Promise + removeUserFromRoom(socketId: string, workflowIdHint?: string): Promise /** * Get the workflow ID for a socket