diff --git a/apps/sim/app/workspace/providers/socket-provider.tsx b/apps/sim/app/workspace/providers/socket-provider.tsx index 425c64882..3cebcaa57 100644 --- a/apps/sim/app/workspace/providers/socket-provider.tsx +++ b/apps/sim/app/workspace/providers/socket-provider.tsx @@ -14,6 +14,7 @@ import { createLogger } from '@sim/logger' import { useParams } from 'next/navigation' import { io, type Socket } from 'socket.io-client' import { getEnv } from '@/lib/core/config/env' +import { useOperationQueueStore } from '@/stores/operation-queue/store' const logger = createLogger('SocketContext') @@ -138,6 +139,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { const [authFailed, setAuthFailed] = useState(false) const initializedRef = useRef(false) const socketRef = useRef(null) + const triggerOfflineMode = useOperationQueueStore((state) => state.triggerOfflineMode) const params = useParams() const urlWorkflowId = params?.workflowId as string | undefined @@ -341,9 +343,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) { }) }) - socketInstance.on('join-workflow-error', ({ error }) => { + socketInstance.on('join-workflow-error', ({ error, code }) => { isRejoiningRef.current = false - logger.error('Failed to join workflow:', error) + logger.error('Failed to join workflow:', { error, code }) + if (code === 'ROOM_MANAGER_UNAVAILABLE') { + triggerOfflineMode() + } }) socketInstance.on('workflow-operation', (data) => { diff --git a/apps/sim/socket/handlers/operations.ts b/apps/sim/socket/handlers/operations.ts index c8c538a33..4852d79d3 100644 --- a/apps/sim/socket/handlers/operations.ts +++ b/apps/sim/socket/handlers/operations.ts @@ -12,39 +12,70 @@ import { import { persistWorkflowOperation } from '@/socket/database/operations' import type { AuthenticatedSocket } from '@/socket/middleware/auth' import { checkRolePermission } from '@/socket/middleware/permissions' -import type { IRoomManager } from '@/socket/rooms' +import type { IRoomManager, UserSession } from '@/socket/rooms' import { WorkflowOperationSchema } from '@/socket/validation/schemas' const logger = createLogger('OperationsHandlers') export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) { socket.on('workflow-operation', async (data) => { - const workflowId = await roomManager.getWorkflowIdForSocket(socket.id) - const session = await roomManager.getUserSession(socket.id) - - if (!workflowId || !session) { - socket.emit('operation-forbidden', { - type: 'SESSION_ERROR', - message: 'Session expired, please rejoin workflow', - }) - if (data?.operationId) { - socket.emit('operation-failed', { operationId: data.operationId, error: 'Session expired' }) + const emitOperationError = ( + forbidden: { type: string; message: string; operation?: string; target?: string }, + failed?: { error: string; retryable?: boolean } + ) => { + socket.emit('operation-forbidden', forbidden) + if (failed && data?.operationId) { + socket.emit('operation-failed', { operationId: data.operationId, ...failed }) } + } + + if (!roomManager.isReady()) { + emitOperationError( + { type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' }, + { error: 'Realtime unavailable', retryable: true } + ) return } - const hasRoom = await roomManager.hasWorkflowRoom(workflowId) + let workflowId: string | null = null + let session: UserSession | null = null + + try { + workflowId = await roomManager.getWorkflowIdForSocket(socket.id) + session = await roomManager.getUserSession(socket.id) + } catch (error) { + logger.error('Error loading session for workflow operation:', error) + emitOperationError( + { type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' }, + { error: 'Realtime unavailable', retryable: true } + ) + return + } + + if (!workflowId || !session) { + emitOperationError( + { type: 'SESSION_ERROR', message: 'Session expired, please rejoin workflow' }, + { error: 'Session expired' } + ) + return + } + + let hasRoom = false + try { + hasRoom = await roomManager.hasWorkflowRoom(workflowId) + } catch (error) { + logger.error('Error checking workflow room:', error) + emitOperationError( + { type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' }, + { error: 'Realtime unavailable', retryable: true } + ) + return + } if (!hasRoom) { - socket.emit('operation-forbidden', { - type: 'ROOM_NOT_FOUND', - message: 'Workflow room not found', - }) - if (data?.operationId) { - socket.emit('operation-failed', { - operationId: data.operationId, - error: 'Workflow room not found', - }) - } + emitOperationError( + { type: 'ROOM_NOT_FOUND', message: 'Workflow room not found' }, + { error: 'Workflow room not found' } + ) return } @@ -77,15 +108,15 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager // Check permissions from cached role for all other operations if (!userPresence) { logger.warn(`User presence not found for socket ${socket.id}`) - socket.emit('operation-forbidden', { - type: 'SESSION_ERROR', - message: 'User session not found', - operation, - target, - }) - if (operationId) { - socket.emit('operation-failed', { operationId, error: 'User session not found' }) - } + emitOperationError( + { + type: 'SESSION_ERROR', + message: 'User session not found', + operation, + target, + }, + { error: 'User session not found' } + ) return } @@ -97,7 +128,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager logger.warn( `User ${session.userId} (role: ${userPresence.role}) forbidden from ${operation} on ${target}` ) - socket.emit('operation-forbidden', { + emitOperationError({ type: 'INSUFFICIENT_PERMISSIONS', message: `${permissionCheck.reason} on '${target}'`, operation, diff --git a/apps/sim/socket/handlers/subblocks.ts b/apps/sim/socket/handlers/subblocks.ts index df2cac862..3ff5657c5 100644 --- a/apps/sim/socket/handlers/subblocks.ts +++ b/apps/sim/socket/handlers/subblocks.ts @@ -48,6 +48,21 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager: operationId, } = data + if (!roomManager.isReady()) { + socket.emit('operation-forbidden', { + type: 'ROOM_MANAGER_UNAVAILABLE', + message: 'Realtime unavailable', + }) + if (operationId) { + socket.emit('operation-failed', { + operationId, + error: 'Realtime unavailable', + retryable: true, + }) + } + return + } + try { const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id) const session = await roomManager.getUserSession(socket.id) diff --git a/apps/sim/socket/handlers/variables.ts b/apps/sim/socket/handlers/variables.ts index 5b36873a8..22e3aeed5 100644 --- a/apps/sim/socket/handlers/variables.ts +++ b/apps/sim/socket/handlers/variables.ts @@ -37,6 +37,21 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager: socket.on('variable-update', async (data) => { const { workflowId: payloadWorkflowId, variableId, field, value, timestamp, operationId } = data + if (!roomManager.isReady()) { + socket.emit('operation-forbidden', { + type: 'ROOM_MANAGER_UNAVAILABLE', + message: 'Realtime unavailable', + }) + if (operationId) { + socket.emit('operation-failed', { + operationId, + error: 'Realtime unavailable', + retryable: true, + }) + } + return + } + try { const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id) const session = await roomManager.getUserSession(socket.id) diff --git a/apps/sim/socket/handlers/workflow.ts b/apps/sim/socket/handlers/workflow.ts index 243c8264f..c59316d1e 100644 --- a/apps/sim/socket/handlers/workflow.ts +++ b/apps/sim/socket/handlers/workflow.ts @@ -20,6 +20,15 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager: return } + if (!roomManager.isReady()) { + logger.warn(`Join workflow rejected: Room manager unavailable`) + socket.emit('join-workflow-error', { + error: 'Realtime unavailable', + code: 'ROOM_MANAGER_UNAVAILABLE', + }) + return + } + logger.info(`Join workflow request from ${userId} (${userName}) for workflow ${workflowId}`) // Verify workflow access @@ -128,12 +137,20 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager: // Undo socket.join and room manager entry if any operation failed socket.leave(workflowId) await roomManager.removeUserFromRoom(socket.id) - socket.emit('join-workflow-error', { error: 'Failed to join workflow' }) + const isReady = roomManager.isReady() + socket.emit('join-workflow-error', { + error: isReady ? 'Failed to join workflow' : 'Realtime unavailable', + code: isReady ? undefined : 'ROOM_MANAGER_UNAVAILABLE', + }) } }) socket.on('leave-workflow', async () => { try { + if (!roomManager.isReady()) { + return + } + const workflowId = await roomManager.getWorkflowIdForSocket(socket.id) const session = await roomManager.getUserSession(socket.id) diff --git a/apps/sim/socket/rooms/memory-manager.ts b/apps/sim/socket/rooms/memory-manager.ts index 4633bc775..908ee13f7 100644 --- a/apps/sim/socket/rooms/memory-manager.ts +++ b/apps/sim/socket/rooms/memory-manager.ts @@ -26,6 +26,10 @@ export class MemoryRoomManager implements IRoomManager { logger.info('MemoryRoomManager initialized (single-pod mode)') } + isReady(): boolean { + return true + } + async shutdown(): Promise { this.workflowRooms.clear() this.socketToWorkflow.clear() diff --git a/apps/sim/socket/rooms/redis-manager.ts b/apps/sim/socket/rooms/redis-manager.ts index 38dde216f..9288a4762 100644 --- a/apps/sim/socket/rooms/redis-manager.ts +++ b/apps/sim/socket/rooms/redis-manager.ts @@ -96,17 +96,6 @@ export class RedisRoomManager implements IRoomManager { this._io = io this.redis = createClient({ url: redisUrl, - socket: { - reconnectStrategy: (retries) => { - if (retries > 10) { - logger.error('Redis reconnection failed after 10 attempts') - return new Error('Redis reconnection failed') - } - const delay = Math.min(retries * 100, 3000) - logger.warn(`Redis reconnecting in ${delay}ms (attempt ${retries})`) - return delay - }, - }, }) this.redis.on('error', (err) => { @@ -122,12 +111,21 @@ export class RedisRoomManager implements IRoomManager { logger.info('Redis client ready') this.isConnected = true }) + + this.redis.on('end', () => { + logger.warn('Redis client connection closed') + this.isConnected = false + }) } get io(): Server { return this._io } + isReady(): boolean { + return this.isConnected + } + async initialize(): Promise { if (this.isConnected) return diff --git a/apps/sim/socket/rooms/types.ts b/apps/sim/socket/rooms/types.ts index 4e3fc56b9..b294646f6 100644 --- a/apps/sim/socket/rooms/types.ts +++ b/apps/sim/socket/rooms/types.ts @@ -48,6 +48,11 @@ export interface IRoomManager { */ initialize(): Promise + /** + * Whether the room manager is ready to serve requests + */ + isReady(): boolean + /** * Clean shutdown */ diff --git a/apps/sim/socket/routes/http.ts b/apps/sim/socket/routes/http.ts index afd5946c5..248c34fde 100644 --- a/apps/sim/socket/routes/http.ts +++ b/apps/sim/socket/routes/http.ts @@ -85,6 +85,11 @@ export function createHttpHandler(roomManager: IRoomManager, logger: Logger) { res.end(JSON.stringify({ error: authResult.error })) return } + + if (!roomManager.isReady()) { + sendError(res, 'Room manager unavailable', 503) + return + } } // Handle workflow deletion notifications from the main API