From a213ad98cf769728fc49a07cd8dd82275f2a1140 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Mon, 2 Feb 2026 09:59:22 -0800 Subject: [PATCH] improvement(rooms): redis client closed should fail fast --- .../workspace/providers/socket-provider.tsx | 9 ++++- apps/sim/socket/handlers/operations.ts | 40 +++++++++++++++++-- apps/sim/socket/handlers/subblocks.ts | 15 +++++++ apps/sim/socket/handlers/variables.ts | 15 +++++++ apps/sim/socket/handlers/workflow.ts | 18 ++++++++- apps/sim/socket/rooms/memory-manager.ts | 4 ++ apps/sim/socket/rooms/redis-manager.ts | 20 +++++----- apps/sim/socket/rooms/types.ts | 5 +++ apps/sim/socket/routes/http.ts | 5 +++ 9 files changed, 114 insertions(+), 17 deletions(-) diff --git a/apps/sim/app/workspace/providers/socket-provider.tsx b/apps/sim/app/workspace/providers/socket-provider.tsx index 425c64882..3cebcaa57 100644 --- a/apps/sim/app/workspace/providers/socket-provider.tsx +++ b/apps/sim/app/workspace/providers/socket-provider.tsx @@ -14,6 +14,7 @@ import { createLogger } from '@sim/logger' import { useParams } from 'next/navigation' import { io, type Socket } from 'socket.io-client' import { getEnv } from '@/lib/core/config/env' +import { useOperationQueueStore } from '@/stores/operation-queue/store' const logger = createLogger('SocketContext') @@ -138,6 +139,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { const [authFailed, setAuthFailed] = useState(false) const initializedRef = useRef(false) const socketRef = useRef(null) + const triggerOfflineMode = useOperationQueueStore((state) => state.triggerOfflineMode) const params = useParams() const urlWorkflowId = params?.workflowId as string | undefined @@ -341,9 +343,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) { }) }) - socketInstance.on('join-workflow-error', ({ error }) => { + socketInstance.on('join-workflow-error', ({ error, code }) => { isRejoiningRef.current = false - logger.error('Failed to join workflow:', error) + logger.error('Failed to join workflow:', { error, code }) + if (code === 'ROOM_MANAGER_UNAVAILABLE') { + triggerOfflineMode() + } }) socketInstance.on('workflow-operation', (data) => { diff --git a/apps/sim/socket/handlers/operations.ts b/apps/sim/socket/handlers/operations.ts index c8c538a33..04b08c5db 100644 --- a/apps/sim/socket/handlers/operations.ts +++ b/apps/sim/socket/handlers/operations.ts @@ -12,15 +12,49 @@ import { import { persistWorkflowOperation } from '@/socket/database/operations' import type { AuthenticatedSocket } from '@/socket/middleware/auth' import { checkRolePermission } from '@/socket/middleware/permissions' -import type { IRoomManager } from '@/socket/rooms' +import type { IRoomManager, UserSession } from '@/socket/rooms' import { WorkflowOperationSchema } from '@/socket/validation/schemas' const logger = createLogger('OperationsHandlers') export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) { socket.on('workflow-operation', async (data) => { - const workflowId = await roomManager.getWorkflowIdForSocket(socket.id) - const session = await roomManager.getUserSession(socket.id) + if (!roomManager.isReady()) { + socket.emit('operation-forbidden', { + type: 'ROOM_MANAGER_UNAVAILABLE', + message: 'Realtime unavailable', + }) + if (data?.operationId) { + socket.emit('operation-failed', { + operationId: data.operationId, + error: 'Realtime unavailable', + retryable: true, + }) + } + return + } + + let workflowId: string | null = null + let session: UserSession | null = null + + try { + workflowId = await roomManager.getWorkflowIdForSocket(socket.id) + session = await roomManager.getUserSession(socket.id) + } catch (error) { + logger.error('Error loading session for workflow operation:', error) + socket.emit('operation-forbidden', { + type: 'ROOM_MANAGER_UNAVAILABLE', + message: 'Realtime unavailable', + }) + if (data?.operationId) { + socket.emit('operation-failed', { + operationId: data.operationId, + error: 'Realtime unavailable', + retryable: true, + }) + } + return + } if (!workflowId || !session) { socket.emit('operation-forbidden', { diff --git a/apps/sim/socket/handlers/subblocks.ts b/apps/sim/socket/handlers/subblocks.ts index 23896fed3..cb50aca4a 100644 --- a/apps/sim/socket/handlers/subblocks.ts +++ b/apps/sim/socket/handlers/subblocks.ts @@ -48,6 +48,21 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager: operationId, } = data + if (!roomManager.isReady()) { + socket.emit('operation-forbidden', { + type: 'ROOM_MANAGER_UNAVAILABLE', + message: 'Realtime unavailable', + }) + if (operationId) { + socket.emit('operation-failed', { + operationId, + error: 'Realtime unavailable', + retryable: true, + }) + } + return + } + try { const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id) const session = await roomManager.getUserSession(socket.id) diff --git a/apps/sim/socket/handlers/variables.ts b/apps/sim/socket/handlers/variables.ts index 5b36873a8..22e3aeed5 100644 --- a/apps/sim/socket/handlers/variables.ts +++ b/apps/sim/socket/handlers/variables.ts @@ -37,6 +37,21 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager: socket.on('variable-update', async (data) => { const { workflowId: payloadWorkflowId, variableId, field, value, timestamp, operationId } = data + if (!roomManager.isReady()) { + socket.emit('operation-forbidden', { + type: 'ROOM_MANAGER_UNAVAILABLE', + message: 'Realtime unavailable', + }) + if (operationId) { + socket.emit('operation-failed', { + operationId, + error: 'Realtime unavailable', + retryable: true, + }) + } + return + } + try { const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id) const session = await roomManager.getUserSession(socket.id) diff --git a/apps/sim/socket/handlers/workflow.ts b/apps/sim/socket/handlers/workflow.ts index 243c8264f..72b012eb5 100644 --- a/apps/sim/socket/handlers/workflow.ts +++ b/apps/sim/socket/handlers/workflow.ts @@ -20,6 +20,15 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager: return } + if (!roomManager.isReady()) { + logger.warn(`Join workflow rejected: Room manager unavailable`) + socket.emit('join-workflow-error', { + error: 'Realtime unavailable', + code: 'ROOM_MANAGER_UNAVAILABLE', + }) + return + } + logger.info(`Join workflow request from ${userId} (${userName}) for workflow ${workflowId}`) // Verify workflow access @@ -128,12 +137,19 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager: // Undo socket.join and room manager entry if any operation failed socket.leave(workflowId) await roomManager.removeUserFromRoom(socket.id) - socket.emit('join-workflow-error', { error: 'Failed to join workflow' }) + socket.emit('join-workflow-error', { + error: roomManager.isReady() ? 'Failed to join workflow' : 'Realtime unavailable', + code: roomManager.isReady() ? undefined : 'ROOM_MANAGER_UNAVAILABLE', + }) } }) socket.on('leave-workflow', async () => { try { + if (!roomManager.isReady()) { + return + } + const workflowId = await roomManager.getWorkflowIdForSocket(socket.id) const session = await roomManager.getUserSession(socket.id) diff --git a/apps/sim/socket/rooms/memory-manager.ts b/apps/sim/socket/rooms/memory-manager.ts index 4633bc775..908ee13f7 100644 --- a/apps/sim/socket/rooms/memory-manager.ts +++ b/apps/sim/socket/rooms/memory-manager.ts @@ -26,6 +26,10 @@ export class MemoryRoomManager implements IRoomManager { logger.info('MemoryRoomManager initialized (single-pod mode)') } + isReady(): boolean { + return true + } + async shutdown(): Promise { this.workflowRooms.clear() this.socketToWorkflow.clear() diff --git a/apps/sim/socket/rooms/redis-manager.ts b/apps/sim/socket/rooms/redis-manager.ts index 38dde216f..9288a4762 100644 --- a/apps/sim/socket/rooms/redis-manager.ts +++ b/apps/sim/socket/rooms/redis-manager.ts @@ -96,17 +96,6 @@ export class RedisRoomManager implements IRoomManager { this._io = io this.redis = createClient({ url: redisUrl, - socket: { - reconnectStrategy: (retries) => { - if (retries > 10) { - logger.error('Redis reconnection failed after 10 attempts') - return new Error('Redis reconnection failed') - } - const delay = Math.min(retries * 100, 3000) - logger.warn(`Redis reconnecting in ${delay}ms (attempt ${retries})`) - return delay - }, - }, }) this.redis.on('error', (err) => { @@ -122,12 +111,21 @@ export class RedisRoomManager implements IRoomManager { logger.info('Redis client ready') this.isConnected = true }) + + this.redis.on('end', () => { + logger.warn('Redis client connection closed') + this.isConnected = false + }) } get io(): Server { return this._io } + isReady(): boolean { + return this.isConnected + } + async initialize(): Promise { if (this.isConnected) return diff --git a/apps/sim/socket/rooms/types.ts b/apps/sim/socket/rooms/types.ts index 4e3fc56b9..b294646f6 100644 --- a/apps/sim/socket/rooms/types.ts +++ b/apps/sim/socket/rooms/types.ts @@ -48,6 +48,11 @@ export interface IRoomManager { */ initialize(): Promise + /** + * Whether the room manager is ready to serve requests + */ + isReady(): boolean + /** * Clean shutdown */ diff --git a/apps/sim/socket/routes/http.ts b/apps/sim/socket/routes/http.ts index afd5946c5..248c34fde 100644 --- a/apps/sim/socket/routes/http.ts +++ b/apps/sim/socket/routes/http.ts @@ -85,6 +85,11 @@ export function createHttpHandler(roomManager: IRoomManager, logger: Logger) { res.end(JSON.stringify({ error: authResult.error })) return } + + if (!roomManager.isReady()) { + sendError(res, 'Room manager unavailable', 503) + return + } } // Handle workflow deletion notifications from the main API