From fc1ca1e36bf5dc7b7576c5630289736fe4784321 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Thu, 29 Jan 2026 16:50:38 -0800 Subject: [PATCH] improvment(sockets): migrate to redis --- apps/sim/app/api/auth/socket-token/route.ts | 14 +- .../workspace/providers/socket-provider.tsx | 145 +++--- apps/sim/hooks/use-collaborative-workflow.ts | 19 - apps/sim/package.json | 2 + apps/sim/socket/config/socket.ts | 108 ++++- apps/sim/socket/handlers/connection.ts | 34 +- apps/sim/socket/handlers/index.ts | 20 +- apps/sim/socket/handlers/operations.ts | 62 ++- apps/sim/socket/handlers/presence.ts | 81 ++-- apps/sim/socket/handlers/subblocks.ts | 109 +++-- apps/sim/socket/handlers/variables.ts | 99 ++-- apps/sim/socket/handlers/workflow.ts | 120 ++--- apps/sim/socket/index.test.ts | 125 ++++-- apps/sim/socket/index.ts | 192 ++++---- apps/sim/socket/middleware/permissions.ts | 2 +- apps/sim/socket/rooms/index.ts | 3 + apps/sim/socket/rooms/manager.ts | 291 ------------ apps/sim/socket/rooms/memory-manager.ts | 260 +++++++++++ apps/sim/socket/rooms/redis-manager.ts | 422 ++++++++++++++++++ apps/sim/socket/rooms/types.ts | 140 ++++++ apps/sim/socket/routes/http.ts | 25 +- apps/sim/socket/validation/schemas.ts | 2 - bun.lock | 22 + helm/sim/templates/deployment-realtime.yaml | 4 + 24 files changed, 1521 insertions(+), 780 deletions(-) create mode 100644 apps/sim/socket/rooms/index.ts delete mode 100644 apps/sim/socket/rooms/manager.ts create mode 100644 apps/sim/socket/rooms/memory-manager.ts create mode 100644 apps/sim/socket/rooms/redis-manager.ts create mode 100644 apps/sim/socket/rooms/types.ts diff --git a/apps/sim/app/api/auth/socket-token/route.ts b/apps/sim/app/api/auth/socket-token/route.ts index 15743b971..b44e8be25 100644 --- a/apps/sim/app/api/auth/socket-token/route.ts +++ b/apps/sim/app/api/auth/socket-token/route.ts @@ -14,12 +14,22 @@ export async function POST() { headers: hdrs, }) - if (!response) { - return NextResponse.json({ error: 'Failed to generate 
token' }, { status: 500 }) + if (!response?.token) { + // No token usually means invalid/expired session + return NextResponse.json({ error: 'Authentication required' }, { status: 401 }) } return NextResponse.json({ token: response.token }) } catch (error) { + // Check if it's an auth-related error + const errorMessage = error instanceof Error ? error.message : String(error) + if ( + errorMessage.includes('session') || + errorMessage.includes('unauthorized') || + errorMessage.includes('unauthenticated') + ) { + return NextResponse.json({ error: 'Authentication required' }, { status: 401 }) + } return NextResponse.json({ error: 'Failed to generate token' }, { status: 500 }) } } diff --git a/apps/sim/app/workspace/providers/socket-provider.tsx b/apps/sim/app/workspace/providers/socket-provider.tsx index 4e18d14b1..a5cfcbd26 100644 --- a/apps/sim/app/workspace/providers/socket-provider.tsx +++ b/apps/sim/app/workspace/providers/socket-provider.tsx @@ -17,6 +17,19 @@ import { getEnv } from '@/lib/core/config/env' const logger = createLogger('SocketContext') +const TAB_SESSION_ID_KEY = 'sim_tab_session_id' + +function getTabSessionId(): string { + if (typeof window === 'undefined') return '' + + let tabSessionId = sessionStorage.getItem(TAB_SESSION_ID_KEY) + if (!tabSessionId) { + tabSessionId = crypto.randomUUID() + sessionStorage.setItem(TAB_SESSION_ID_KEY, tabSessionId) + } + return tabSessionId +} + interface User { id: string name?: string @@ -36,11 +49,13 @@ interface SocketContextType { socket: Socket | null isConnected: boolean isConnecting: boolean + authFailed: boolean currentWorkflowId: string | null currentSocketId: string | null presenceUsers: PresenceUser[] joinWorkflow: (workflowId: string) => void leaveWorkflow: () => void + retryConnection: () => void emitWorkflowOperation: ( operation: string, target: string, @@ -63,8 +78,6 @@ interface SocketContextType { onCursorUpdate: (handler: (data: any) => void) => void onSelectionUpdate: (handler: (data: any) 
=> void) => void - onUserJoined: (handler: (data: any) => void) => void - onUserLeft: (handler: (data: any) => void) => void onWorkflowDeleted: (handler: (data: any) => void) => void onWorkflowReverted: (handler: (data: any) => void) => void onOperationConfirmed: (handler: (data: any) => void) => void @@ -75,11 +88,13 @@ const SocketContext = createContext({ socket: null, isConnected: false, isConnecting: false, + authFailed: false, currentWorkflowId: null, currentSocketId: null, presenceUsers: [], joinWorkflow: () => {}, leaveWorkflow: () => {}, + retryConnection: () => {}, emitWorkflowOperation: () => {}, emitSubblockUpdate: () => {}, emitVariableUpdate: () => {}, @@ -90,8 +105,6 @@ const SocketContext = createContext({ onVariableUpdate: () => {}, onCursorUpdate: () => {}, onSelectionUpdate: () => {}, - onUserJoined: () => {}, - onUserLeft: () => {}, onWorkflowDeleted: () => {}, onWorkflowReverted: () => {}, onOperationConfirmed: () => {}, @@ -112,33 +125,42 @@ export function SocketProvider({ children, user }: SocketProviderProps) { const [currentWorkflowId, setCurrentWorkflowId] = useState(null) const [currentSocketId, setCurrentSocketId] = useState(null) const [presenceUsers, setPresenceUsers] = useState([]) + const [authFailed, setAuthFailed] = useState(false) const initializedRef = useRef(false) const params = useParams() const urlWorkflowId = params?.workflowId as string | undefined + const urlWorkflowIdRef = useRef(urlWorkflowId) + urlWorkflowIdRef.current = urlWorkflowId const eventHandlers = useRef<{ workflowOperation?: (data: any) => void subblockUpdate?: (data: any) => void variableUpdate?: (data: any) => void - cursorUpdate?: (data: any) => void selectionUpdate?: (data: any) => void - userJoined?: (data: any) => void - userLeft?: (data: any) => void workflowDeleted?: (data: any) => void workflowReverted?: (data: any) => void operationConfirmed?: (data: any) => void operationFailed?: (data: any) => void }>({}) + const positionUpdateTimeouts = 
useRef>(new Map()) + const pendingPositionUpdates = useRef>(new Map()) + const generateSocketToken = async (): Promise => { const res = await fetch('/api/auth/socket-token', { method: 'POST', credentials: 'include', headers: { 'cache-control': 'no-store' }, }) - if (!res.ok) throw new Error('Failed to generate socket token') + if (!res.ok) { + // 401/403 indicates session expiry - don't keep retrying + if (res.status === 401 || res.status === 403) { + throw new Error('Authentication required') + } + throw new Error('Failed to generate socket token') + } const body = await res.json().catch(() => ({})) const token = body?.token if (!token || typeof token !== 'string') throw new Error('Invalid socket token') @@ -148,6 +170,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) { useEffect(() => { if (!user?.id) return + if (authFailed) { + logger.info('Socket initialization skipped - auth failed, waiting for retry') + return + } + if (initializedRef.current || socket || isConnecting) { logger.info('Socket already exists or is connecting, skipping initialization') return @@ -194,26 +221,19 @@ export function SocketProvider({ children, user }: SocketProviderProps) { connected: socketInstance.connected, transport: socketInstance.io.engine?.transport?.name, }) - - if (urlWorkflowId) { - logger.info(`Joining workflow room after connection: ${urlWorkflowId}`) - socketInstance.emit('join-workflow', { - workflowId: urlWorkflowId, - }) - setCurrentWorkflowId(urlWorkflowId) - } + // Note: join-workflow is handled by the useEffect watching isConnected }) socketInstance.on('disconnect', (reason) => { setIsConnected(false) setIsConnecting(false) setCurrentSocketId(null) + setCurrentWorkflowId(null) + setPresenceUsers([]) logger.info('Socket disconnected', { reason, }) - - setPresenceUsers([]) }) socketInstance.on('connect_error', (error: any) => { @@ -226,24 +246,34 @@ export function SocketProvider({ children, user }: SocketProviderProps) { transport: 
error.transport, }) - if ( + // Check if this is an authentication failure + const isAuthError = error.message?.includes('Token validation failed') || error.message?.includes('Authentication failed') || error.message?.includes('Authentication required') - ) { + + if (isAuthError) { logger.warn( - 'Authentication failed - this could indicate session expiry or token generation issues' + 'Authentication failed - stopping reconnection attempts. User may need to refresh/re-login.' ) + // Stop reconnection attempts to prevent infinite loop + socketInstance.disconnect() + // Reset state to allow re-initialization when session is restored + setSocket(null) + setAuthFailed(true) + initializedRef.current = false } }) socketInstance.on('reconnect', (attemptNumber) => { + setIsConnected(true) setCurrentSocketId(socketInstance.id ?? null) logger.info('Socket reconnected successfully', { attemptNumber, socketId: socketInstance.id, transport: socketInstance.io.engine?.transport?.name, }) + // Note: join-workflow is handled by the useEffect watching isConnected }) socketInstance.on('reconnect_attempt', (attemptNumber) => { @@ -284,6 +314,15 @@ export function SocketProvider({ children, user }: SocketProviderProps) { }) }) + // Handle join workflow success - confirms room membership with presence list + socketInstance.on('join-workflow-success', ({ workflowId, presenceUsers }) => { + setCurrentWorkflowId(workflowId) + setPresenceUsers(presenceUsers || []) + logger.info(`Successfully joined workflow room: ${workflowId}`, { + presenceCount: presenceUsers?.length || 0, + }) + }) + socketInstance.on('workflow-operation', (data) => { eventHandlers.current.workflowOperation?.(data) }) @@ -298,10 +337,13 @@ export function SocketProvider({ children, user }: SocketProviderProps) { socketInstance.on('workflow-deleted', (data) => { logger.warn(`Workflow ${data.workflowId} has been deleted`) - if (currentWorkflowId === data.workflowId) { - setCurrentWorkflowId(null) - setPresenceUsers([]) - } 
+ setCurrentWorkflowId((current) => { + if (current === data.workflowId) { + setPresenceUsers([]) + return null + } + return current + }) eventHandlers.current.workflowDeleted?.(data) }) @@ -446,10 +488,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) { logger.warn('Operation forbidden:', error) }) - socketInstance.on('operation-confirmed', (data) => { - logger.debug('Operation confirmed:', data) - }) - socketInstance.on('workflow-state', async (workflowData) => { logger.info('Received workflow state from server') @@ -478,11 +516,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) { positionUpdateTimeouts.current.clear() pendingPositionUpdates.current.clear() } - }, [user?.id]) + }, [user?.id, authFailed]) useEffect(() => { if (!socket || !isConnected || !urlWorkflowId) return + // Skip if already in the correct room if (currentWorkflowId === urlWorkflowId) return logger.info( @@ -497,19 +536,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) { logger.info(`Joining workflow room: ${urlWorkflowId}`) socket.emit('join-workflow', { workflowId: urlWorkflowId, + tabSessionId: getTabSessionId(), }) - setCurrentWorkflowId(urlWorkflowId) }, [socket, isConnected, urlWorkflowId, currentWorkflowId]) - useEffect(() => { - return () => { - if (socket) { - logger.info('Cleaning up socket connection on unmount') - socket.disconnect() - } - } - }, []) - const joinWorkflow = useCallback( (workflowId: string) => { if (!socket || !user?.id) { @@ -530,8 +560,9 @@ export function SocketProvider({ children, user }: SocketProviderProps) { logger.info(`Joining workflow: ${workflowId}`) socket.emit('join-workflow', { workflowId, + tabSessionId: getTabSessionId(), }) - setCurrentWorkflowId(workflowId) + // currentWorkflowId will be set by join-workflow-success handler }, [socket, user, currentWorkflowId] ) @@ -555,8 +586,20 @@ export function SocketProvider({ children, user }: SocketProviderProps) { } }, 
[socket, currentWorkflowId]) - const positionUpdateTimeouts = useRef>(new Map()) - const pendingPositionUpdates = useRef>(new Map()) + /** + * Retry socket connection after auth failure. + * Call this when user has re-authenticated (e.g., after login redirect). + */ + const retryConnection = useCallback(() => { + if (!authFailed) { + logger.info('retryConnection called but no auth failure - ignoring') + return + } + logger.info('Retrying socket connection after auth failure') + setAuthFailed(false) + // initializedRef.current was already reset in connect_error handler + // Effect will re-run and attempt connection + }, [authFailed]) const emitWorkflowOperation = useCallback( (operation: string, target: string, payload: any, operationId?: string) => { @@ -716,14 +759,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) { eventHandlers.current.selectionUpdate = handler }, []) - const onUserJoined = useCallback((handler: (data: any) => void) => { - eventHandlers.current.userJoined = handler - }, []) - - const onUserLeft = useCallback((handler: (data: any) => void) => { - eventHandlers.current.userLeft = handler - }, []) - const onWorkflowDeleted = useCallback((handler: (data: any) => void) => { eventHandlers.current.workflowDeleted = handler }, []) @@ -745,11 +780,13 @@ export function SocketProvider({ children, user }: SocketProviderProps) { socket, isConnected, isConnecting, + authFailed, currentWorkflowId, currentSocketId, presenceUsers, joinWorkflow, leaveWorkflow, + retryConnection, emitWorkflowOperation, emitSubblockUpdate, emitVariableUpdate, @@ -760,8 +797,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) { onVariableUpdate, onCursorUpdate, onSelectionUpdate, - onUserJoined, - onUserLeft, onWorkflowDeleted, onWorkflowReverted, onOperationConfirmed, @@ -771,11 +806,13 @@ export function SocketProvider({ children, user }: SocketProviderProps) { socket, isConnected, isConnecting, + authFailed, currentWorkflowId, 
currentSocketId, presenceUsers, joinWorkflow, leaveWorkflow, + retryConnection, emitWorkflowOperation, emitSubblockUpdate, emitVariableUpdate, @@ -786,8 +823,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) { onVariableUpdate, onCursorUpdate, onSelectionUpdate, - onUserJoined, - onUserLeft, onWorkflowDeleted, onWorkflowReverted, onOperationConfirmed, diff --git a/apps/sim/hooks/use-collaborative-workflow.ts b/apps/sim/hooks/use-collaborative-workflow.ts index 5f5721549..ea0600330 100644 --- a/apps/sim/hooks/use-collaborative-workflow.ts +++ b/apps/sim/hooks/use-collaborative-workflow.ts @@ -119,8 +119,6 @@ export function useCollaborativeWorkflow() { onWorkflowOperation, onSubblockUpdate, onVariableUpdate, - onUserJoined, - onUserLeft, onWorkflowDeleted, onWorkflowReverted, onOperationConfirmed, @@ -484,14 +482,6 @@ export function useCollaborativeWorkflow() { } } - const handleUserJoined = (data: any) => { - logger.info(`User joined: ${data.userName}`) - } - - const handleUserLeft = (data: any) => { - logger.info(`User left: ${data.userId}`) - } - const handleWorkflowDeleted = (data: any) => { const { workflowId } = data logger.warn(`Workflow ${workflowId} has been deleted`) @@ -600,26 +590,17 @@ export function useCollaborativeWorkflow() { failOperation(operationId, retryable) } - // Register event handlers onWorkflowOperation(handleWorkflowOperation) onSubblockUpdate(handleSubblockUpdate) onVariableUpdate(handleVariableUpdate) - onUserJoined(handleUserJoined) - onUserLeft(handleUserLeft) onWorkflowDeleted(handleWorkflowDeleted) onWorkflowReverted(handleWorkflowReverted) onOperationConfirmed(handleOperationConfirmed) onOperationFailed(handleOperationFailed) - - return () => { - // Cleanup handled by socket context - } }, [ onWorkflowOperation, onSubblockUpdate, onVariableUpdate, - onUserJoined, - onUserLeft, onWorkflowDeleted, onWorkflowReverted, onOperationConfirmed, diff --git a/apps/sim/package.json b/apps/sim/package.json index 
ae3035697..122dc84d0 100644 --- a/apps/sim/package.json +++ b/apps/sim/package.json @@ -74,6 +74,7 @@ "@react-email/components": "^0.0.34", "@react-email/render": "2.0.0", "@sim/logger": "workspace:*", + "@socket.io/redis-adapter": "8.3.0", "@t3-oss/env-nextjs": "0.13.4", "@tanstack/react-query": "5.90.8", "@tanstack/react-query-devtools": "5.90.2", @@ -144,6 +145,7 @@ "react-simple-code-editor": "^0.14.1", "react-window": "2.2.3", "reactflow": "^11.11.4", + "redis": "5.10.0", "rehype-autolink-headings": "^7.1.0", "rehype-slug": "^6.0.0", "remark-gfm": "4.0.1", diff --git a/apps/sim/socket/config/socket.ts b/apps/sim/socket/config/socket.ts index 71bfd3d19..c4272b198 100644 --- a/apps/sim/socket/config/socket.ts +++ b/apps/sim/socket/config/socket.ts @@ -1,5 +1,7 @@ import type { Server as HttpServer } from 'http' import { createLogger } from '@sim/logger' +import { createAdapter } from '@socket.io/redis-adapter' +import { createClient, type RedisClientType } from 'redis' import { Server } from 'socket.io' import { env } from '@/lib/core/config/env' import { isProd } from '@/lib/core/config/feature-flags' @@ -7,9 +9,9 @@ import { getBaseUrl } from '@/lib/core/utils/urls' const logger = createLogger('SocketIOConfig') -/** - * Get allowed origins for Socket.IO CORS configuration - */ +let adapterPubClient: RedisClientType | null = null +let adapterSubClient: RedisClientType | null = null + function getAllowedOrigins(): string[] { const allowedOrigins = [ getBaseUrl(), @@ -24,11 +26,10 @@ function getAllowedOrigins(): string[] { } /** - * Create and configure a Socket.IO server instance - * @param httpServer - The HTTP server instance to attach Socket.IO to - * @returns Configured Socket.IO server instance + * Create and configure a Socket.IO server instance. + * If REDIS_URL is configured, adds Redis adapter for cross-pod broadcasting. 
*/ -export function createSocketIOServer(httpServer: HttpServer): Server { +export async function createSocketIOServer(httpServer: HttpServer): Promise { const allowedOrigins = getAllowedOrigins() const io = new Server(httpServer, { @@ -36,22 +37,69 @@ export function createSocketIOServer(httpServer: HttpServer): Server { origin: allowedOrigins, methods: ['GET', 'POST', 'OPTIONS'], allowedHeaders: ['Content-Type', 'Authorization', 'Cookie', 'socket.io'], - credentials: true, // Enable credentials to accept cookies + credentials: true, }, - transports: ['websocket', 'polling'], // WebSocket first, polling as fallback - allowEIO3: true, // Keep legacy support for compatibility - pingTimeout: 60000, // Back to original conservative setting - pingInterval: 25000, // Back to original interval + transports: ['websocket', 'polling'], + allowEIO3: true, + pingTimeout: 60000, + pingInterval: 25000, maxHttpBufferSize: 1e6, cookie: { name: 'io', path: '/', httpOnly: true, - sameSite: 'none', // Required for cross-origin cookies - secure: isProd, // HTTPS in production + sameSite: 'none', + secure: isProd, }, }) + if (env.REDIS_URL) { + logger.info('Configuring Socket.IO Redis adapter...') + + const redisOptions = { + url: env.REDIS_URL, + socket: { + reconnectStrategy: (retries: number) => { + if (retries > 10) { + logger.error('Redis adapter reconnection failed after 10 attempts') + return new Error('Redis adapter reconnection failed') + } + const delay = Math.min(retries * 100, 3000) + logger.warn(`Redis adapter reconnecting in ${delay}ms (attempt ${retries})`) + return delay + }, + }, + } + + // Create separate clients for pub and sub (recommended for reliability) + adapterPubClient = createClient(redisOptions) + adapterSubClient = createClient(redisOptions) + + adapterPubClient.on('error', (err) => { + logger.error('Redis adapter pub client error:', err) + }) + + adapterSubClient.on('error', (err) => { + logger.error('Redis adapter sub client error:', err) + }) + + 
adapterPubClient.on('ready', () => { + logger.info('Redis adapter pub client ready') + }) + + adapterSubClient.on('ready', () => { + logger.info('Redis adapter sub client ready') + }) + + await Promise.all([adapterPubClient.connect(), adapterSubClient.connect()]) + + io.adapter(createAdapter(adapterPubClient, adapterSubClient)) + + logger.info('Socket.IO Redis adapter connected - cross-pod broadcasting enabled') + } else { + logger.warn('REDIS_URL not configured - running in single-pod mode') + } + logger.info('Socket.IO server configured with:', { allowedOrigins: allowedOrigins.length, transports: ['websocket', 'polling'], @@ -60,7 +108,39 @@ export function createSocketIOServer(httpServer: HttpServer): Server { maxHttpBufferSize: 1e6, cookieSecure: isProd, corsCredentials: true, + redisAdapter: !!env.REDIS_URL, }) return io } + +/** + * Clean up Redis adapter connections. + * Call this during graceful shutdown. + */ +export async function shutdownSocketIOAdapter(): Promise { + const closePromises: Promise[] = [] + + if (adapterPubClient) { + closePromises.push( + adapterPubClient.quit().then(() => { + logger.info('Redis adapter pub client closed') + adapterPubClient = null + }) + ) + } + + if (adapterSubClient) { + closePromises.push( + adapterSubClient.quit().then(() => { + logger.info('Redis adapter sub client closed') + adapterSubClient = null + }) + ) + } + + if (closePromises.length > 0) { + await Promise.all(closePromises) + logger.info('Socket.IO Redis adapter shutdown complete') + } +} diff --git a/apps/sim/socket/handlers/connection.ts b/apps/sim/socket/handlers/connection.ts index eac513ff6..5444c9a83 100644 --- a/apps/sim/socket/handlers/connection.ts +++ b/apps/sim/socket/handlers/connection.ts @@ -1,17 +1,12 @@ import { createLogger } from '@sim/logger' -import type { HandlerDependencies } from '@/socket/handlers/workflow' +import { cleanupPendingSubblocksForSocket } from '@/socket/handlers/subblocks' +import { cleanupPendingVariablesForSocket } from 
'@/socket/handlers/variables' import type { AuthenticatedSocket } from '@/socket/middleware/auth' -import type { RoomManager } from '@/socket/rooms/manager' +import type { IRoomManager } from '@/socket/rooms' const logger = createLogger('ConnectionHandlers') -export function setupConnectionHandlers( - socket: AuthenticatedSocket, - deps: HandlerDependencies | RoomManager -) { - const roomManager = - deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager) - +export function setupConnectionHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) { socket.on('error', (error) => { logger.error(`Socket ${socket.id} error:`, error) }) @@ -20,13 +15,22 @@ export function setupConnectionHandlers( logger.error(`Socket ${socket.id} connection error:`, error) }) - socket.on('disconnect', (reason) => { - const workflowId = roomManager.getWorkflowIdForSocket(socket.id) - const session = roomManager.getUserSession(socket.id) + socket.on('disconnect', async (reason) => { + try { + // Clean up pending debounce entries for this socket to prevent memory leaks + cleanupPendingSubblocksForSocket(socket.id) + cleanupPendingVariablesForSocket(socket.id) - if (workflowId && session) { - roomManager.cleanupUserFromRoom(socket.id, workflowId) - roomManager.broadcastPresenceUpdate(workflowId) + const workflowId = await roomManager.removeUserFromRoom(socket.id) + + if (workflowId) { + await roomManager.broadcastPresenceUpdate(workflowId) + logger.info( + `Socket ${socket.id} disconnected from workflow ${workflowId} (reason: ${reason})` + ) + } + } catch (error) { + logger.error(`Error handling disconnect for socket ${socket.id}:`, error) } }) } diff --git a/apps/sim/socket/handlers/index.ts b/apps/sim/socket/handlers/index.ts index 622a0b218..4afeed40e 100644 --- a/apps/sim/socket/handlers/index.ts +++ b/apps/sim/socket/handlers/index.ts @@ -5,16 +5,9 @@ import { setupSubblocksHandlers } from '@/socket/handlers/subblocks' import { 
setupVariablesHandlers } from '@/socket/handlers/variables' import { setupWorkflowHandlers } from '@/socket/handlers/workflow' import type { AuthenticatedSocket } from '@/socket/middleware/auth' -import type { RoomManager, UserPresence, WorkflowRoom } from '@/socket/rooms/manager' +import type { IRoomManager } from '@/socket/rooms' -export type { UserPresence, WorkflowRoom } - -/** - * Sets up all socket event handlers for an authenticated socket connection - * @param socket - The authenticated socket instance - * @param roomManager - Room manager instance for state management - */ -export function setupAllHandlers(socket: AuthenticatedSocket, roomManager: RoomManager) { +export function setupAllHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) { setupWorkflowHandlers(socket, roomManager) setupOperationsHandlers(socket, roomManager) setupSubblocksHandlers(socket, roomManager) @@ -22,12 +15,3 @@ export function setupAllHandlers(socket: AuthenticatedSocket, roomManager: RoomM setupPresenceHandlers(socket, roomManager) setupConnectionHandlers(socket, roomManager) } - -export { - setupWorkflowHandlers, - setupOperationsHandlers, - setupSubblocksHandlers, - setupVariablesHandlers, - setupPresenceHandlers, - setupConnectionHandlers, -} diff --git a/apps/sim/socket/handlers/operations.ts b/apps/sim/socket/handlers/operations.ts index 9b74293bb..5cbccee3a 100644 --- a/apps/sim/socket/handlers/operations.ts +++ b/apps/sim/socket/handlers/operations.ts @@ -10,23 +10,17 @@ import { WORKFLOW_OPERATIONS, } from '@/socket/constants' import { persistWorkflowOperation } from '@/socket/database/operations' -import type { HandlerDependencies } from '@/socket/handlers/workflow' import type { AuthenticatedSocket } from '@/socket/middleware/auth' import { checkRolePermission } from '@/socket/middleware/permissions' -import type { RoomManager } from '@/socket/rooms/manager' +import type { IRoomManager } from '@/socket/rooms' import { WorkflowOperationSchema } from 
'@/socket/validation/schemas' const logger = createLogger('OperationsHandlers') -export function setupOperationsHandlers( - socket: AuthenticatedSocket, - deps: HandlerDependencies | RoomManager -) { - const roomManager = - deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager) +export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) { socket.on('workflow-operation', async (data) => { - const workflowId = roomManager.getWorkflowIdForSocket(socket.id) - const session = roomManager.getUserSession(socket.id) + const workflowId = await roomManager.getWorkflowIdForSocket(socket.id) + const session = await roomManager.getUserSession(socket.id) if (!workflowId || !session) { socket.emit('error', { @@ -36,8 +30,8 @@ export function setupOperationsHandlers( return } - const room = roomManager.getWorkflowRoom(workflowId) - if (!room) { + const hasRoom = await roomManager.hasWorkflowRoom(workflowId) + if (!hasRoom) { socket.emit('error', { type: 'ROOM_NOT_FOUND', message: 'Workflow room not found', @@ -60,16 +54,18 @@ export function setupOperationsHandlers( isPositionUpdate && 'commit' in payload ? payload.commit === true : false const operationTimestamp = isPositionUpdate ? 
timestamp : Date.now() + // Get user presence for permission checking + const users = await roomManager.getWorkflowUsers(workflowId) + const userPresence = users.find((u) => u.socketId === socket.id) + // Skip permission checks for non-committed position updates (broadcasts only, no persistence) if (isPositionUpdate && !commitPositionUpdate) { // Update last activity - const userPresence = room.users.get(socket.id) if (userPresence) { - userPresence.lastActivity = Date.now() + await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() }) } } else { // Check permissions from cached role for all other operations - const userPresence = room.users.get(socket.id) if (!userPresence) { logger.warn(`User presence not found for socket ${socket.id}`) socket.emit('operation-forbidden', { @@ -81,7 +77,7 @@ export function setupOperationsHandlers( return } - userPresence.lastActivity = Date.now() + await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() }) // Check permissions using cached role (no DB query) const permissionCheck = checkRolePermission(userPresence.role, operation) @@ -132,7 +128,7 @@ export function setupOperationsHandlers( timestamp: operationTimestamp, userId: session.userId, }) - room.lastModified = Date.now() + await roomManager.updateRoomLastModified(workflowId) if (operationId) { socket.emit('operation-confirmed', { @@ -178,7 +174,7 @@ export function setupOperationsHandlers( timestamp: operationTimestamp, userId: session.userId, }) - room.lastModified = Date.now() + await roomManager.updateRoomLastModified(workflowId) if (operationId) { socket.emit('operation-confirmed', { operationId, serverTimestamp: Date.now() }) @@ -211,7 +207,7 @@ export function setupOperationsHandlers( userId: session.userId, }) - room.lastModified = Date.now() + await roomManager.updateRoomLastModified(workflowId) const broadcastData = { operation, @@ -251,7 +247,7 @@ export function setupOperationsHandlers( userId: 
session.userId, }) - room.lastModified = Date.now() + await roomManager.updateRoomLastModified(workflowId) const broadcastData = { operation, @@ -288,7 +284,7 @@ export function setupOperationsHandlers( userId: session.userId, }) - room.lastModified = Date.now() + await roomManager.updateRoomLastModified(workflowId) socket.to(workflowId).emit('workflow-operation', { operation, @@ -320,7 +316,7 @@ export function setupOperationsHandlers( userId: session.userId, }) - room.lastModified = Date.now() + await roomManager.updateRoomLastModified(workflowId) socket.to(workflowId).emit('workflow-operation', { operation, @@ -349,7 +345,7 @@ export function setupOperationsHandlers( userId: session.userId, }) - room.lastModified = Date.now() + await roomManager.updateRoomLastModified(workflowId) socket.to(workflowId).emit('workflow-operation', { operation, @@ -381,7 +377,7 @@ export function setupOperationsHandlers( userId: session.userId, }) - room.lastModified = Date.now() + await roomManager.updateRoomLastModified(workflowId) socket.to(workflowId).emit('workflow-operation', { operation, @@ -413,7 +409,7 @@ export function setupOperationsHandlers( userId: session.userId, }) - room.lastModified = Date.now() + await roomManager.updateRoomLastModified(workflowId) socket.to(workflowId).emit('workflow-operation', { operation, @@ -445,7 +441,7 @@ export function setupOperationsHandlers( userId: session.userId, }) - room.lastModified = Date.now() + await roomManager.updateRoomLastModified(workflowId) socket.to(workflowId).emit('workflow-operation', { operation, @@ -474,7 +470,7 @@ export function setupOperationsHandlers( userId: session.userId, }) - room.lastModified = Date.now() + await roomManager.updateRoomLastModified(workflowId) socket.to(workflowId).emit('workflow-operation', { operation, @@ -503,27 +499,24 @@ export function setupOperationsHandlers( userId: session.userId, }) - room.lastModified = Date.now() + await roomManager.updateRoomLastModified(workflowId) const 
broadcastData = { operation, target, payload, - timestamp: operationTimestamp, // Preserve client timestamp for position updates + timestamp: operationTimestamp, senderId: socket.id, userId: session.userId, userName: session.userName, - // Add operation metadata for better client handling metadata: { workflowId, operationId: crypto.randomUUID(), - isPositionUpdate, // Flag to help clients handle position updates specially }, } socket.to(workflowId).emit('workflow-operation', broadcastData) - // Emit confirmation if operationId is provided if (operationId) { socket.emit('operation-confirmed', { operationId, @@ -533,16 +526,14 @@ export function setupOperationsHandlers( } catch (error) { const errorMessage = error instanceof Error ? error.message : 'Unknown error occurred' - // Emit operation-failed for queue-tracked operations if (operationId) { socket.emit('operation-failed', { operationId, error: errorMessage, - retryable: !(error instanceof ZodError), // Don't retry validation errors + retryable: !(error instanceof ZodError), }) } - // Also emit legacy operation-error for backward compatibility if (error instanceof ZodError) { socket.emit('operation-error', { type: 'VALIDATION_ERROR', @@ -553,7 +544,6 @@ export function setupOperationsHandlers( }) logger.warn(`Validation error for operation from ${session.userId}:`, error.errors) } else if (error instanceof Error) { - // Handle specific database errors if (error.message.includes('not found')) { socket.emit('operation-error', { type: 'RESOURCE_NOT_FOUND', diff --git a/apps/sim/socket/handlers/presence.ts b/apps/sim/socket/handlers/presence.ts index 03b1e64cf..208183d2c 100644 --- a/apps/sim/socket/handlers/presence.ts +++ b/apps/sim/socket/handlers/presence.ts @@ -1,62 +1,53 @@ import { createLogger } from '@sim/logger' -import type { HandlerDependencies } from '@/socket/handlers/workflow' import type { AuthenticatedSocket } from '@/socket/middleware/auth' -import type { RoomManager } from '@/socket/rooms/manager' 
+import type { IRoomManager } from '@/socket/rooms' const logger = createLogger('PresenceHandlers') -export function setupPresenceHandlers( - socket: AuthenticatedSocket, - deps: HandlerDependencies | RoomManager -) { - const roomManager = - deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager) - socket.on('cursor-update', ({ cursor }) => { - const workflowId = roomManager.getWorkflowIdForSocket(socket.id) - const session = roomManager.getUserSession(socket.id) +export function setupPresenceHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) { + socket.on('cursor-update', async ({ cursor }) => { + try { + const workflowId = await roomManager.getWorkflowIdForSocket(socket.id) + const session = await roomManager.getUserSession(socket.id) - if (!workflowId || !session) return + if (!workflowId || !session) return - const room = roomManager.getWorkflowRoom(workflowId) - if (!room) return + // Update cursor in room state + await roomManager.updateUserActivity(workflowId, socket.id, { cursor }) - const userPresence = room.users.get(socket.id) - if (userPresence) { - userPresence.cursor = cursor - userPresence.lastActivity = Date.now() + // Broadcast to other users in the room + socket.to(workflowId).emit('cursor-update', { + socketId: socket.id, + userId: session.userId, + userName: session.userName, + avatarUrl: session.avatarUrl, + cursor, + }) + } catch (error) { + logger.error(`Error handling cursor update for socket ${socket.id}:`, error) } - - socket.to(workflowId).emit('cursor-update', { - socketId: socket.id, - userId: session.userId, - userName: session.userName, - avatarUrl: session.avatarUrl, - cursor, - }) }) - // Handle user selection (for showing what block/element a user has selected) - socket.on('selection-update', ({ selection }) => { - const workflowId = roomManager.getWorkflowIdForSocket(socket.id) - const session = roomManager.getUserSession(socket.id) + socket.on('selection-update', async ({ selection 
}) => { + try { + const workflowId = await roomManager.getWorkflowIdForSocket(socket.id) + const session = await roomManager.getUserSession(socket.id) - if (!workflowId || !session) return + if (!workflowId || !session) return - const room = roomManager.getWorkflowRoom(workflowId) - if (!room) return + // Update selection in room state + await roomManager.updateUserActivity(workflowId, socket.id, { selection }) - const userPresence = room.users.get(socket.id) - if (userPresence) { - userPresence.selection = selection - userPresence.lastActivity = Date.now() + // Broadcast to other users in the room + socket.to(workflowId).emit('selection-update', { + socketId: socket.id, + userId: session.userId, + userName: session.userName, + avatarUrl: session.avatarUrl, + selection, + }) + } catch (error) { + logger.error(`Error handling selection update for socket ${socket.id}:`, error) } - - socket.to(workflowId).emit('selection-update', { - socketId: socket.id, - userId: session.userId, - userName: session.userName, - avatarUrl: session.avatarUrl, - selection, - }) }) } diff --git a/apps/sim/socket/handlers/subblocks.ts b/apps/sim/socket/handlers/subblocks.ts index cfd0e1a1a..0a9c30b9f 100644 --- a/apps/sim/socket/handlers/subblocks.ts +++ b/apps/sim/socket/handlers/subblocks.ts @@ -2,9 +2,8 @@ import { db } from '@sim/db' import { workflow, workflowBlocks } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, eq } from 'drizzle-orm' -import type { HandlerDependencies } from '@/socket/handlers/workflow' import type { AuthenticatedSocket } from '@/socket/middleware/auth' -import type { RoomManager } from '@/socket/rooms/manager' +import type { IRoomManager } from '@/socket/rooms' const logger = createLogger('SubblocksHandlers') @@ -18,15 +17,27 @@ type PendingSubblock = { // Keyed by `${workflowId}:${blockId}:${subblockId}` const pendingSubblockUpdates = new Map() -export function setupSubblocksHandlers( - socket: AuthenticatedSocket, - deps: 
HandlerDependencies | RoomManager -) { - const roomManager = - deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager) +/** + * Cleans up pending updates for a disconnected socket. + * Removes the socket's operationIds from pending updates to prevent memory leaks. + */ +export function cleanupPendingSubblocksForSocket(socketId: string): void { + for (const [key, pending] of pendingSubblockUpdates.entries()) { + // Remove this socket's operation entries + for (const [opId, sid] of pending.opToSocket.entries()) { + if (sid === socketId) { + pending.opToSocket.delete(opId) + } + } + // If no more operations are waiting, the timeout will still fire and flush + // This is fine - the update will still persist, just no confirmation to send + } +} + +export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) { socket.on('subblock-update', async (data) => { - const workflowId = roomManager.getWorkflowIdForSocket(socket.id) - const session = roomManager.getUserSession(socket.id) + const workflowId = await roomManager.getWorkflowIdForSocket(socket.id) + const session = await roomManager.getUserSession(socket.id) if (!workflowId || !session) { logger.debug(`Ignoring subblock update: socket not connected to any workflow room`, { @@ -38,9 +49,9 @@ export function setupSubblocksHandlers( } const { blockId, subblockId, value, timestamp, operationId } = data - const room = roomManager.getWorkflowRoom(workflowId) - if (!room) { + const hasRoom = await roomManager.hasWorkflowRoom(workflowId) + if (!hasRoom) { logger.debug(`Ignoring subblock update: workflow room not found`, { socketId: socket.id, workflowId, @@ -51,10 +62,8 @@ export function setupSubblocksHandlers( } try { - const userPresence = room.users.get(socket.id) - if (userPresence) { - userPresence.lastActivity = Date.now() - } + // Update user activity + await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() }) // 
Server-side debounce/coalesce by workflowId+blockId+subblockId const debouncedKey = `${workflowId}:${blockId}:${subblockId}` @@ -88,7 +97,6 @@ export function setupSubblocksHandlers( const errorMessage = error instanceof Error ? error.message : 'Unknown error' - // Best-effort failure for the single operation if provided if (operationId) { socket.emit('operation-failed', { operationId, @@ -97,7 +105,6 @@ export function setupSubblocksHandlers( }) } - // Also emit legacy operation-error for backward compatibility socket.emit('operation-error', { type: 'SUBBLOCK_UPDATE_FAILED', message: `Failed to update subblock ${blockId}.${subblockId}: ${errorMessage}`, @@ -111,9 +118,11 @@ export function setupSubblocksHandlers( async function flushSubblockUpdate( workflowId: string, pending: PendingSubblock, - roomManager: RoomManager + roomManager: IRoomManager ) { const { blockId, subblockId, value, timestamp } = pending.latest + const io = roomManager.io + try { // Verify workflow still exists const workflowExists = await db @@ -124,7 +133,7 @@ async function flushSubblockUpdate( if (workflowExists.length === 0) { pending.opToSocket.forEach((socketId, opId) => { - const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId) + const sock = io.sockets.sockets.get(socketId) if (sock) { sock.emit('operation-failed', { operationId: opId, @@ -164,40 +173,48 @@ async function flushSubblockUpdate( }) if (updateSuccessful) { - // Broadcast to other clients (exclude senders to avoid overwriting their local state) - const senderSocketIds = new Set(pending.opToSocket.values()) - const io = (roomManager as any).io - if (io) { - // Get all sockets in the room - const roomSockets = io.sockets.adapter.rooms.get(workflowId) - if (roomSockets) { - roomSockets.forEach((socketId: string) => { - // Only emit to sockets that didn't send any of the coalesced ops - if (!senderSocketIds.has(socketId)) { - const sock = io.sockets.sockets.get(socketId) - if (sock) { - 
sock.emit('subblock-update', { - blockId, - subblockId, - value, - timestamp, - }) - } - } - }) - } + // Collect all sender socket IDs to exclude from broadcast + const senderSocketIds = [...pending.opToSocket.values()] + const firstSenderSocket = + senderSocketIds.length > 0 ? io.sockets.sockets.get(senderSocketIds[0]) : null + + if (firstSenderSocket) { + // socket.to(room).emit() excludes sender and broadcasts across all pods via Redis adapter + firstSenderSocket.to(workflowId).emit('subblock-update', { + blockId, + subblockId, + value, + timestamp, + }) + } else if (senderSocketIds.length > 0) { + // Senders disconnected but we should still exclude them in case they reconnected + // Use io.except() to exclude all sender socket IDs + io.to(workflowId).except(senderSocketIds).emit('subblock-update', { + blockId, + subblockId, + value, + timestamp, + }) + } else { + // No senders tracked (edge case) - broadcast to all + roomManager.emitToWorkflow(workflowId, 'subblock-update', { + blockId, + subblockId, + value, + timestamp, + }) } - // Confirm all coalesced operationIds + // Confirm all coalesced operationIds (only to sockets still connected on this pod) pending.opToSocket.forEach((socketId, opId) => { - const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId) + const sock = io.sockets.sockets.get(socketId) if (sock) { sock.emit('operation-confirmed', { operationId: opId, serverTimestamp: Date.now() }) } }) } else { pending.opToSocket.forEach((socketId, opId) => { - const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId) + const sock = io.sockets.sockets.get(socketId) if (sock) { sock.emit('operation-failed', { operationId: opId, @@ -210,7 +227,7 @@ async function flushSubblockUpdate( } catch (error) { logger.error('Error flushing subblock update:', error) pending.opToSocket.forEach((socketId, opId) => { - const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId) + const sock = io.sockets.sockets.get(socketId) if (sock) { 
sock.emit('operation-failed', { operationId: opId, diff --git a/apps/sim/socket/handlers/variables.ts b/apps/sim/socket/handlers/variables.ts index ec4a6ae61..d53b295ee 100644 --- a/apps/sim/socket/handlers/variables.ts +++ b/apps/sim/socket/handlers/variables.ts @@ -2,9 +2,8 @@ import { db } from '@sim/db' import { workflow } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { eq } from 'drizzle-orm' -import type { HandlerDependencies } from '@/socket/handlers/workflow' import type { AuthenticatedSocket } from '@/socket/middleware/auth' -import type { RoomManager } from '@/socket/rooms/manager' +import type { IRoomManager } from '@/socket/rooms' const logger = createLogger('VariablesHandlers') @@ -17,16 +16,24 @@ type PendingVariable = { // Keyed by `${workflowId}:${variableId}:${field}` const pendingVariableUpdates = new Map() -export function setupVariablesHandlers( - socket: AuthenticatedSocket, - deps: HandlerDependencies | RoomManager -) { - const roomManager = - deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager) +/** + * Cleans up pending updates for a disconnected socket. + * Removes the socket's operationIds from pending updates to prevent memory leaks. 
+ */ +export function cleanupPendingVariablesForSocket(socketId: string): void { + for (const [key, pending] of pendingVariableUpdates.entries()) { + for (const [opId, sid] of pending.opToSocket.entries()) { + if (sid === socketId) { + pending.opToSocket.delete(opId) + } + } + } +} +export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) { socket.on('variable-update', async (data) => { - const workflowId = roomManager.getWorkflowIdForSocket(socket.id) - const session = roomManager.getUserSession(socket.id) + const workflowId = await roomManager.getWorkflowIdForSocket(socket.id) + const session = await roomManager.getUserSession(socket.id) if (!workflowId || !session) { logger.debug(`Ignoring variable update: socket not connected to any workflow room`, { @@ -38,9 +45,9 @@ export function setupVariablesHandlers( } const { variableId, field, value, timestamp, operationId } = data - const room = roomManager.getWorkflowRoom(workflowId) - if (!room) { + const hasRoom = await roomManager.hasWorkflowRoom(workflowId) + if (!hasRoom) { logger.debug(`Ignoring variable update: workflow room not found`, { socketId: socket.id, workflowId, @@ -51,10 +58,8 @@ export function setupVariablesHandlers( } try { - const userPresence = room.users.get(socket.id) - if (userPresence) { - userPresence.lastActivity = Date.now() - } + // Update user activity + await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() }) const debouncedKey = `${workflowId}:${variableId}:${field}` const existing = pendingVariableUpdates.get(debouncedKey) @@ -108,9 +113,11 @@ export function setupVariablesHandlers( async function flushVariableUpdate( workflowId: string, pending: PendingVariable, - roomManager: RoomManager + roomManager: IRoomManager ) { const { variableId, field, value, timestamp } = pending.latest + const io = roomManager.io + try { const workflowExists = await db .select({ id: workflow.id }) @@ -120,7 +127,7 @@ async function 
flushVariableUpdate( if (workflowExists.length === 0) { pending.opToSocket.forEach((socketId, opId) => { - const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId) + const sock = io.sockets.sockets.get(socketId) if (sock) { sock.emit('operation-failed', { operationId: opId, @@ -163,30 +170,40 @@ async function flushVariableUpdate( }) if (updateSuccessful) { - // Broadcast to other clients (exclude senders to avoid overwriting their local state) - const senderSocketIds = new Set(pending.opToSocket.values()) - const io = (roomManager as any).io - if (io) { - const roomSockets = io.sockets.adapter.rooms.get(workflowId) - if (roomSockets) { - roomSockets.forEach((socketId: string) => { - if (!senderSocketIds.has(socketId)) { - const sock = io.sockets.sockets.get(socketId) - if (sock) { - sock.emit('variable-update', { - variableId, - field, - value, - timestamp, - }) - } - } - }) - } + // Collect all sender socket IDs to exclude from broadcast + const senderSocketIds = [...pending.opToSocket.values()] + const firstSenderSocket = + senderSocketIds.length > 0 ? 
io.sockets.sockets.get(senderSocketIds[0]) : null + + if (firstSenderSocket) { + // socket.to(room).emit() excludes sender and broadcasts across all pods via Redis adapter + firstSenderSocket.to(workflowId).emit('variable-update', { + variableId, + field, + value, + timestamp, + }) + } else if (senderSocketIds.length > 0) { + // Senders disconnected but we should still exclude them in case they reconnected + // Use io.except() to exclude all sender socket IDs + io.to(workflowId).except(senderSocketIds).emit('variable-update', { + variableId, + field, + value, + timestamp, + }) + } else { + // No senders tracked (edge case) - broadcast to all + roomManager.emitToWorkflow(workflowId, 'variable-update', { + variableId, + field, + value, + timestamp, + }) } pending.opToSocket.forEach((socketId, opId) => { - const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId) + const sock = io.sockets.sockets.get(socketId) if (sock) { sock.emit('operation-confirmed', { operationId: opId, serverTimestamp: Date.now() }) } @@ -195,7 +212,7 @@ async function flushVariableUpdate( logger.debug(`Flushed variable update ${workflowId}: ${variableId}.${field}`) } else { pending.opToSocket.forEach((socketId, opId) => { - const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId) + const sock = io.sockets.sockets.get(socketId) if (sock) { sock.emit('operation-failed', { operationId: opId, @@ -208,7 +225,7 @@ async function flushVariableUpdate( } catch (error) { logger.error('Error flushing variable update:', error) pending.opToSocket.forEach((socketId, opId) => { - const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId) + const sock = io.sockets.sockets.get(socketId) if (sock) { sock.emit('operation-failed', { operationId: opId, diff --git a/apps/sim/socket/handlers/workflow.ts b/apps/sim/socket/handlers/workflow.ts index 539bcc226..93b7faef9 100644 --- a/apps/sim/socket/handlers/workflow.ts +++ b/apps/sim/socket/handlers/workflow.ts @@ -4,38 +4,12 @@ 
import { eq } from 'drizzle-orm' import { getWorkflowState } from '@/socket/database/operations' import type { AuthenticatedSocket } from '@/socket/middleware/auth' import { verifyWorkflowAccess } from '@/socket/middleware/permissions' -import type { RoomManager, UserPresence, WorkflowRoom } from '@/socket/rooms/manager' +import type { IRoomManager, UserPresence } from '@/socket/rooms' const logger = createLogger('WorkflowHandlers') -export type { UserPresence, WorkflowRoom } - -export interface HandlerDependencies { - roomManager: RoomManager -} - -export const createWorkflowRoom = (workflowId: string): WorkflowRoom => ({ - workflowId, - users: new Map(), - lastModified: Date.now(), - activeConnections: 0, -}) - -export const cleanupUserFromRoom = ( - socketId: string, - workflowId: string, - roomManager: RoomManager -) => { - roomManager.cleanupUserFromRoom(socketId, workflowId) -} - -export function setupWorkflowHandlers( - socket: AuthenticatedSocket, - deps: HandlerDependencies | RoomManager -) { - const roomManager = - deps instanceof Object && 'roomManager' in deps ? 
deps.roomManager : (deps as RoomManager) - socket.on('join-workflow', async ({ workflowId }) => { +export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) { + socket.on('join-workflow', async ({ workflowId, tabSessionId }) => { try { const userId = socket.userId const userName = socket.userName @@ -48,6 +22,7 @@ export function setupWorkflowHandlers( logger.info(`Join workflow request from ${userId} (${userName}) for workflow ${workflowId}`) + // Verify workflow access let userRole: string try { const accessInfo = await verifyWorkflowAccess(userId, workflowId) @@ -63,23 +38,36 @@ export function setupWorkflowHandlers( return } - const currentWorkflowId = roomManager.getWorkflowIdForSocket(socket.id) + // Leave current room if in one + const currentWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id) if (currentWorkflowId) { socket.leave(currentWorkflowId) - roomManager.cleanupUserFromRoom(socket.id, currentWorkflowId) - - roomManager.broadcastPresenceUpdate(currentWorkflowId) + await roomManager.removeUserFromRoom(socket.id) + await roomManager.broadcastPresenceUpdate(currentWorkflowId) } + const STALE_THRESHOLD_MS = 60_000 + const now = Date.now() + const existingUsers = await roomManager.getWorkflowUsers(workflowId) + for (const existingUser of existingUsers) { + if (existingUser.userId === userId && existingUser.socketId !== socket.id) { + const isSameTab = tabSessionId && existingUser.tabSessionId === tabSessionId + const isStale = + now - (existingUser.lastActivity || existingUser.joinedAt || 0) > STALE_THRESHOLD_MS + + if (isSameTab || isStale) { + logger.info( + `Cleaning up socket ${existingUser.socketId} for user ${userId} (${isSameTab ? 
'same tab' : 'stale'})` + ) + await roomManager.removeUserFromRoom(existingUser.socketId) + } + } + } + + // Join the new room socket.join(workflowId) - if (!roomManager.hasWorkflowRoom(workflowId)) { - roomManager.setWorkflowRoom(workflowId, roomManager.createWorkflowRoom(workflowId)) - } - - const room = roomManager.getWorkflowRoom(workflowId)! - room.activeConnections++ - + // Get avatar URL let avatarUrl = socket.userImage || null if (!avatarUrl) { try { @@ -95,33 +83,44 @@ export function setupWorkflowHandlers( } } + // Create presence entry const userPresence: UserPresence = { userId, workflowId, userName, socketId: socket.id, + tabSessionId, joinedAt: Date.now(), lastActivity: Date.now(), role: userRole, avatarUrl, } - room.users.set(socket.id, userPresence) - roomManager.setWorkflowForSocket(socket.id, workflowId) - roomManager.setUserSession(socket.id, { - userId, - userName, - avatarUrl, + // Add user to room + await roomManager.addUserToRoom(workflowId, socket.id, userPresence) + + // Get current presence list for the join acknowledgment + const presenceUsers = await roomManager.getWorkflowUsers(workflowId) + + // Get workflow state + const workflowState = await getWorkflowState(workflowId) + + // Send join success with presence list (client waits for this to confirm join) + socket.emit('join-workflow-success', { + workflowId, + socketId: socket.id, + presenceUsers, }) - const workflowState = await getWorkflowState(workflowId) + // Send workflow state socket.emit('workflow-state', workflowState) - roomManager.broadcastPresenceUpdate(workflowId) + // Broadcast presence update to all users in the room + await roomManager.broadcastPresenceUpdate(workflowId) - const uniqueUserCount = roomManager.getUniqueUserCount(workflowId) + const uniqueUserCount = await roomManager.getUniqueUserCount(workflowId) logger.info( - `User ${userId} (${userName}) joined workflow ${workflowId}. 
Room now has ${uniqueUserCount} unique users (${room.activeConnections} connections).` + `User ${userId} (${userName}) joined workflow ${workflowId}. Room now has ${uniqueUserCount} unique users.` ) } catch (error) { logger.error('Error joining workflow:', error) @@ -132,17 +131,20 @@ export function setupWorkflowHandlers( } }) - socket.on('leave-workflow', () => { - const workflowId = roomManager.getWorkflowIdForSocket(socket.id) - const session = roomManager.getUserSession(socket.id) + socket.on('leave-workflow', async () => { + try { + const workflowId = await roomManager.getWorkflowIdForSocket(socket.id) + const session = await roomManager.getUserSession(socket.id) - if (workflowId && session) { - socket.leave(workflowId) - roomManager.cleanupUserFromRoom(socket.id, workflowId) + if (workflowId && session) { + socket.leave(workflowId) + await roomManager.removeUserFromRoom(socket.id) + await roomManager.broadcastPresenceUpdate(workflowId) - roomManager.broadcastPresenceUpdate(workflowId) - - logger.info(`User ${session.userId} (${session.userName}) left workflow ${workflowId}`) + logger.info(`User ${session.userId} (${session.userName}) left workflow ${workflowId}`) + } + } catch (error) { + logger.error('Error leaving workflow:', error) } }) } diff --git a/apps/sim/socket/index.test.ts b/apps/sim/socket/index.test.ts index 377edd41b..ee5489120 100644 --- a/apps/sim/socket/index.test.ts +++ b/apps/sim/socket/index.test.ts @@ -7,7 +7,7 @@ import { createServer, request as httpRequest } from 'http' import { createMockLogger, databaseMock } from '@sim/testing' import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest' import { createSocketIOServer } from '@/socket/config/socket' -import { RoomManager } from '@/socket/rooms/manager' +import { MemoryRoomManager } from '@/socket/rooms' import { createHttpHandler } from '@/socket/routes/http' vi.mock('@/lib/auth', () => ({ @@ -20,6 +20,30 @@ vi.mock('@/lib/auth', () => ({ vi.mock('@sim/db', () 
=> databaseMock) +// Mock redis package to prevent actual Redis connections +vi.mock('redis', () => ({ + createClient: vi.fn(() => ({ + on: vi.fn(), + connect: vi.fn().mockResolvedValue(undefined), + quit: vi.fn().mockResolvedValue(undefined), + duplicate: vi.fn().mockReturnThis(), + })), +})) + +// Mock env to not have REDIS_URL (use importOriginal to get helper functions) +vi.mock('@/lib/core/config/env', async (importOriginal) => { + const actual = await importOriginal() + return { + ...actual, + env: { + ...actual.env, + DATABASE_URL: 'postgres://localhost/test', + NODE_ENV: 'test', + REDIS_URL: undefined, + }, + } +}) + vi.mock('@/socket/middleware/auth', () => ({ authenticateSocket: vi.fn((socket, next) => { socket.userId = 'test-user-id' @@ -51,7 +75,7 @@ vi.mock('@/socket/database/operations', () => ({ describe('Socket Server Index Integration', () => { let httpServer: any let io: any - let roomManager: RoomManager + let roomManager: MemoryRoomManager let logger: ReturnType let PORT: number @@ -64,9 +88,10 @@ describe('Socket Server Index Integration', () => { httpServer = createServer() - io = createSocketIOServer(httpServer) + io = await createSocketIOServer(httpServer) - roomManager = new RoomManager(io) + roomManager = new MemoryRoomManager(io) + await roomManager.initialize() const httpHandler = createHttpHandler(roomManager, logger) httpServer.on('request', httpHandler) @@ -98,6 +123,9 @@ describe('Socket Server Index Integration', () => { }, 20000) afterEach(async () => { + if (roomManager) { + await roomManager.shutdown() + } if (io) { await new Promise((resolve) => { io.close(() => resolve()) @@ -177,43 +205,60 @@ describe('Socket Server Index Integration', () => { }) describe('Room Manager Integration', () => { - it('should create room manager successfully', () => { + it('should create room manager successfully', async () => { expect(roomManager).toBeDefined() - expect(roomManager.getTotalActiveConnections()).toBe(0) + expect(await 
roomManager.getTotalActiveConnections()).toBe(0) }) - it('should create workflow rooms', () => { + it('should add and get users from workflow rooms', async () => { const workflowId = 'test-workflow-123' - const room = roomManager.createWorkflowRoom(workflowId) - roomManager.setWorkflowRoom(workflowId, room) + const socketId = 'test-socket-123' - expect(roomManager.hasWorkflowRoom(workflowId)).toBe(true) - const retrievedRoom = roomManager.getWorkflowRoom(workflowId) - expect(retrievedRoom).toBeDefined() - expect(retrievedRoom?.workflowId).toBe(workflowId) + const presence = { + userId: 'user-123', + workflowId, + userName: 'Test User', + socketId, + joinedAt: Date.now(), + lastActivity: Date.now(), + role: 'admin', + } + + await roomManager.addUserToRoom(workflowId, socketId, presence) + + expect(await roomManager.hasWorkflowRoom(workflowId)).toBe(true) + const users = await roomManager.getWorkflowUsers(workflowId) + expect(users).toHaveLength(1) + expect(users[0].socketId).toBe(socketId) }) - it('should manage user sessions', () => { + it('should manage user sessions', async () => { const socketId = 'test-socket-123' const workflowId = 'test-workflow-456' - const session = { userId: 'user-123', userName: 'Test User' } - roomManager.setWorkflowForSocket(socketId, workflowId) - roomManager.setUserSession(socketId, session) + const presence = { + userId: 'user-123', + workflowId, + userName: 'Test User', + socketId, + joinedAt: Date.now(), + lastActivity: Date.now(), + role: 'admin', + } - expect(roomManager.getWorkflowIdForSocket(socketId)).toBe(workflowId) - expect(roomManager.getUserSession(socketId)).toEqual(session) + await roomManager.addUserToRoom(workflowId, socketId, presence) + + expect(await roomManager.getWorkflowIdForSocket(socketId)).toBe(workflowId) + const session = await roomManager.getUserSession(socketId) + expect(session).toBeDefined() + expect(session?.userId).toBe('user-123') }) - it('should clean up rooms properly', () => { + it('should clean 
up rooms properly', async () => { const workflowId = 'test-workflow-789' const socketId = 'test-socket-789' - const room = roomManager.createWorkflowRoom(workflowId) - roomManager.setWorkflowRoom(workflowId, room) - - // Add user to room - room.users.set(socketId, { + const presence = { userId: 'user-789', workflowId, userName: 'Test User', @@ -221,16 +266,18 @@ describe('Socket Server Index Integration', () => { joinedAt: Date.now(), lastActivity: Date.now(), role: 'admin', - }) - room.activeConnections = 1 + } - roomManager.setWorkflowForSocket(socketId, workflowId) + await roomManager.addUserToRoom(workflowId, socketId, presence) - // Clean up user - roomManager.cleanupUserFromRoom(socketId, workflowId) + expect(await roomManager.hasWorkflowRoom(workflowId)).toBe(true) - expect(roomManager.hasWorkflowRoom(workflowId)).toBe(false) - expect(roomManager.getWorkflowIdForSocket(socketId)).toBeUndefined() + // Remove user + await roomManager.removeUserFromRoom(socketId) + + // Room should be cleaned up since it's now empty + expect(await roomManager.hasWorkflowRoom(workflowId)).toBe(false) + expect(await roomManager.getWorkflowIdForSocket(socketId)).toBeNull() }) }) @@ -238,7 +285,7 @@ describe('Socket Server Index Integration', () => { it.concurrent('should properly import all extracted modules', async () => { const { createSocketIOServer } = await import('@/socket/config/socket') const { createHttpHandler } = await import('@/socket/routes/http') - const { RoomManager } = await import('@/socket/rooms/manager') + const { MemoryRoomManager, RedisRoomManager } = await import('@/socket/rooms') const { authenticateSocket } = await import('@/socket/middleware/auth') const { verifyWorkflowAccess } = await import('@/socket/middleware/permissions') const { getWorkflowState } = await import('@/socket/database/operations') @@ -246,22 +293,23 @@ describe('Socket Server Index Integration', () => { expect(createSocketIOServer).toBeTypeOf('function') 
expect(createHttpHandler).toBeTypeOf('function') - expect(RoomManager).toBeTypeOf('function') + expect(MemoryRoomManager).toBeTypeOf('function') + expect(RedisRoomManager).toBeTypeOf('function') expect(authenticateSocket).toBeTypeOf('function') expect(verifyWorkflowAccess).toBeTypeOf('function') expect(getWorkflowState).toBeTypeOf('function') expect(WorkflowOperationSchema).toBeDefined() }) - it.concurrent('should maintain all original functionality after refactoring', () => { + it.concurrent('should maintain all original functionality after refactoring', async () => { expect(httpServer).toBeDefined() expect(io).toBeDefined() expect(roomManager).toBeDefined() - expect(typeof roomManager.createWorkflowRoom).toBe('function') - expect(typeof roomManager.cleanupUserFromRoom).toBe('function') + expect(typeof roomManager.addUserToRoom).toBe('function') + expect(typeof roomManager.removeUserFromRoom).toBe('function') expect(typeof roomManager.handleWorkflowDeletion).toBe('function') - expect(typeof roomManager.validateWorkflowConsistency).toBe('function') + expect(typeof roomManager.broadcastPresenceUpdate).toBe('function') }) }) @@ -286,6 +334,7 @@ describe('Socket Server Index Integration', () => { it('should have shutdown capability', () => { expect(typeof httpServer.close).toBe('function') expect(typeof io.close).toBe('function') + expect(typeof roomManager.shutdown).toBe('function') }) }) diff --git a/apps/sim/socket/index.ts b/apps/sim/socket/index.ts index 3f7d46615..d2f1d4d5e 100644 --- a/apps/sim/socket/index.ts +++ b/apps/sim/socket/index.ts @@ -1,112 +1,132 @@ import { createServer } from 'http' import { createLogger } from '@sim/logger' import { env } from '@/lib/core/config/env' -import { createSocketIOServer } from '@/socket/config/socket' +import { createSocketIOServer, shutdownSocketIOAdapter } from '@/socket/config/socket' import { setupAllHandlers } from '@/socket/handlers' import { type AuthenticatedSocket, authenticateSocket } from 
'@/socket/middleware/auth' -import { RoomManager } from '@/socket/rooms/manager' +import { type IRoomManager, MemoryRoomManager, RedisRoomManager } from '@/socket/rooms' import { createHttpHandler } from '@/socket/routes/http' const logger = createLogger('CollaborativeSocketServer') -// Enhanced server configuration - HTTP server will be configured with handler after all dependencies are set up -const httpServer = createServer() +/** + * Create the appropriate RoomManager based on configuration + * If REDIS_URL is set, uses Redis for multi-pod support + * Otherwise, uses in-memory for single-pod mode + * + * IMPORTANT: If Redis is configured, we ONLY use Redis - no fallback + * This prevents split-brain scenarios + */ +async function createRoomManager(io: ReturnType): Promise { + // Type assertion needed because io parameter comes from socket.io Server + const socketIo = io as unknown as import('socket.io').Server -const io = createSocketIOServer(httpServer) + if (env.REDIS_URL) { + logger.info('Initializing Redis-backed RoomManager for multi-pod support') + const manager = new RedisRoomManager(socketIo, env.REDIS_URL) + await manager.initialize() + return manager + } -// Initialize room manager after io is created -const roomManager = new RoomManager(io) + logger.warn('No REDIS_URL configured - using in-memory RoomManager (single-pod only)') + const manager = new MemoryRoomManager(socketIo) + await manager.initialize() + return manager +} -io.use(authenticateSocket) +async function main() { + const httpServer = createServer() + const PORT = Number(env.PORT || env.SOCKET_PORT || 3002) -const httpHandler = createHttpHandler(roomManager, logger) -httpServer.on('request', httpHandler) - -process.on('uncaughtException', (error) => { - logger.error('Uncaught Exception:', error) - // Don't exit in production, just log -}) - -process.on('unhandledRejection', (reason, promise) => { - logger.error('Unhandled Rejection at:', promise, 'reason:', reason) -}) - 
-httpServer.on('error', (error) => { - logger.error('HTTP server error:', error) -}) - -io.engine.on('connection_error', (err) => { - logger.error('Socket.IO connection error:', { - req: err.req?.url, - code: err.code, - message: err.message, - context: err.context, + logger.info('Starting Socket.IO server...', { + port: PORT, + nodeEnv: env.NODE_ENV, + hasDatabase: !!env.DATABASE_URL, + hasAuth: !!env.BETTER_AUTH_SECRET, + hasRedis: !!env.REDIS_URL, }) -}) -io.on('connection', (socket: AuthenticatedSocket) => { - logger.info(`New socket connection: ${socket.id}`) + // Create Socket.IO server with Redis adapter if configured + const io = await createSocketIOServer(httpServer) - setupAllHandlers(socket, roomManager) -}) + // Initialize room manager (Redis or in-memory based on config) + const roomManager = await createRoomManager(io as unknown as ReturnType) -httpServer.on('request', (req, res) => { - logger.info(`🌐 HTTP Request: ${req.method} ${req.url}`, { - method: req.method, - url: req.url, - userAgent: req.headers['user-agent'], - origin: req.headers.origin, - host: req.headers.host, - timestamp: new Date().toISOString(), + // Set up authentication middleware + io.use(authenticateSocket) + + // Set up HTTP handler for health checks and internal APIs + const httpHandler = createHttpHandler(roomManager, logger) + httpServer.on('request', httpHandler) + + // Global error handlers + process.on('uncaughtException', (error) => { + logger.error('Uncaught Exception:', error) }) -}) -io.engine.on('connection_error', (err) => { - logger.error('❌ Engine.IO Connection error:', { - code: err.code, - message: err.message, - context: err.context, - req: err.req - ? 
{ - url: err.req.url, - method: err.req.method, - headers: err.req.headers, - } - : 'No request object', + process.on('unhandledRejection', (reason, promise) => { + logger.error('Unhandled Rejection at:', promise, 'reason:', reason) }) -}) -const PORT = Number(env.PORT || env.SOCKET_PORT || 3002) + httpServer.on('error', (error: NodeJS.ErrnoException) => { + logger.error('HTTP server error:', error) + if (error.code === 'EADDRINUSE' || error.code === 'EACCES') { + process.exit(1) + } + }) -logger.info('Starting Socket.IO server...', { - port: PORT, - nodeEnv: env.NODE_ENV, - hasDatabase: !!env.DATABASE_URL, - hasAuth: !!env.BETTER_AUTH_SECRET, -}) + io.engine.on('connection_error', (err) => { + logger.error('Socket.IO connection error:', { + req: err.req?.url, + code: err.code, + message: err.message, + context: err.context, + }) + }) -httpServer.listen(PORT, '0.0.0.0', () => { - logger.info(`Socket.IO server running on port ${PORT}`) - logger.info(`🏥 Health check available at: http://localhost:${PORT}/health`) -}) + io.on('connection', (socket: AuthenticatedSocket) => { + logger.info(`New socket connection: ${socket.id}`) + setupAllHandlers(socket, roomManager) + }) -httpServer.on('error', (error) => { - logger.error('❌ Server failed to start:', error) + httpServer.listen(PORT, '0.0.0.0', () => { + logger.info(`Socket.IO server running on port ${PORT}`) + logger.info(`Health check available at: http://localhost:${PORT}/health`) + }) + + let isShuttingDown = false + + /** + * Graceful shutdown for SIGINT/SIGTERM: shuts down the room manager, then the + * Socket.IO adapter, then closes the HTTP server and exits 0 once it has closed. + * A watchdog armed at the start forces exit code 1 if the sequence has not + * finished within 10 seconds of the signal. + */ + const shutdown = async () => { + // A repeated signal must not run the cleanup sequence a second time + if (isShuttingDown) return + isShuttingDown = true + + logger.info('Shutting down Socket.IO server...') + + // Arm the watchdog before awaiting anything, so a cleanup step that never + // settles cannot keep the process alive indefinitely + setTimeout(() => { + logger.error('Forced shutdown after timeout') + process.exit(1) + }, 10000) + + try { + await roomManager.shutdown() + logger.info('RoomManager shutdown complete') + } catch (error) { + logger.error('Error during RoomManager shutdown:', error) + } + + try { + await shutdownSocketIOAdapter() + } catch (error) { + logger.error('Error during Socket.IO adapter shutdown:', error) + } + + httpServer.close(() => { + logger.info('Socket.IO server closed') + process.exit(0) + }) + } +
+ process.on('SIGINT', shutdown) + process.on('SIGTERM', shutdown) +} + +// Start the server +main().catch((error) => { + logger.error('Failed to start server:', error) process.exit(1) }) - -process.on('SIGINT', () => { - logger.info('Shutting down Socket.IO server...') - httpServer.close(() => { - logger.info('Socket.IO server closed') - process.exit(0) - }) -}) - -process.on('SIGTERM', () => { - logger.info('Shutting down Socket.IO server...') - httpServer.close(() => { - logger.info('Socket.IO server closed') - process.exit(0) - }) -}) diff --git a/apps/sim/socket/middleware/permissions.ts b/apps/sim/socket/middleware/permissions.ts index 3ec37f158..f3c574d20 100644 --- a/apps/sim/socket/middleware/permissions.ts +++ b/apps/sim/socket/middleware/permissions.ts @@ -73,7 +73,7 @@ export function checkRolePermission( return { allowed: true } } -export async function verifyWorkspaceMembership( +async function verifyWorkspaceMembership( userId: string, workspaceId: string ): Promise { diff --git a/apps/sim/socket/rooms/index.ts b/apps/sim/socket/rooms/index.ts new file mode 100644 index 000000000..3cbdc4134 --- /dev/null +++ b/apps/sim/socket/rooms/index.ts @@ -0,0 +1,3 @@ +export { MemoryRoomManager } from '@/socket/rooms/memory-manager' +export { RedisRoomManager } from '@/socket/rooms/redis-manager' +export type { IRoomManager, UserPresence, UserSession, WorkflowRoom } from '@/socket/rooms/types' diff --git a/apps/sim/socket/rooms/manager.ts b/apps/sim/socket/rooms/manager.ts deleted file mode 100644 index 8273834ff..000000000 --- a/apps/sim/socket/rooms/manager.ts +++ /dev/null @@ -1,291 +0,0 @@ -import * as schema from '@sim/db/schema' -import { workflowBlocks, workflowEdges } from '@sim/db/schema' -import { createLogger } from '@sim/logger' -import { and, eq, isNull } from 'drizzle-orm' -import { drizzle } from 'drizzle-orm/postgres-js' -import postgres from 'postgres' -import type { Server } from 'socket.io' 
-import { env } from '@/lib/core/config/env' - -const connectionString = env.DATABASE_URL -const db = drizzle( - postgres(connectionString, { - prepare: false, - idle_timeout: 15, - connect_timeout: 20, - max: 3, - onnotice: () => {}, - }), - { schema } -) - -const logger = createLogger('RoomManager') - -export interface UserPresence { - userId: string - workflowId: string - userName: string - socketId: string - joinedAt: number - lastActivity: number - role: string - cursor?: { x: number; y: number } - selection?: { type: 'block' | 'edge' | 'none'; id?: string } - avatarUrl?: string | null -} - -export interface WorkflowRoom { - workflowId: string - users: Map // socketId -> UserPresence - lastModified: number - activeConnections: number -} - -export class RoomManager { - private workflowRooms = new Map() - private socketToWorkflow = new Map() - private userSessions = new Map< - string, - { userId: string; userName: string; avatarUrl?: string | null } - >() - private io: Server - - constructor(io: Server) { - this.io = io - } - - createWorkflowRoom(workflowId: string): WorkflowRoom { - return { - workflowId, - users: new Map(), - lastModified: Date.now(), - activeConnections: 0, - } - } - - cleanupUserFromRoom(socketId: string, workflowId: string) { - const room = this.workflowRooms.get(workflowId) - if (room) { - room.users.delete(socketId) - room.activeConnections = Math.max(0, room.activeConnections - 1) - - if (room.activeConnections === 0) { - this.workflowRooms.delete(workflowId) - logger.info(`Cleaned up empty workflow room: ${workflowId}`) - } - } - - this.socketToWorkflow.delete(socketId) - this.userSessions.delete(socketId) - } - - handleWorkflowDeletion(workflowId: string) { - logger.info(`Handling workflow deletion notification for ${workflowId}`) - - const room = this.workflowRooms.get(workflowId) - if (!room) { - logger.debug(`No active room found for deleted workflow ${workflowId}`) - return - } - - this.io.to(workflowId).emit('workflow-deleted', { 
- workflowId, - message: 'This workflow has been deleted', - timestamp: Date.now(), - }) - - const socketsToDisconnect: string[] = [] - room.users.forEach((_presence, socketId) => { - socketsToDisconnect.push(socketId) - }) - - socketsToDisconnect.forEach((socketId) => { - const socket = this.io.sockets.sockets.get(socketId) - if (socket) { - socket.leave(workflowId) - logger.debug(`Disconnected socket ${socketId} from deleted workflow ${workflowId}`) - } - this.cleanupUserFromRoom(socketId, workflowId) - }) - - this.workflowRooms.delete(workflowId) - logger.info( - `Cleaned up workflow room ${workflowId} after deletion (${socketsToDisconnect.length} users disconnected)` - ) - } - - handleWorkflowRevert(workflowId: string, timestamp: number) { - logger.info(`Handling workflow revert notification for ${workflowId}`) - - const room = this.workflowRooms.get(workflowId) - if (!room) { - logger.debug(`No active room found for reverted workflow ${workflowId}`) - return - } - - this.io.to(workflowId).emit('workflow-reverted', { - workflowId, - message: 'Workflow has been reverted to deployed state', - timestamp, - }) - - room.lastModified = timestamp - - logger.info(`Notified ${room.users.size} users about workflow revert: ${workflowId}`) - } - - handleWorkflowUpdate(workflowId: string) { - logger.info(`Handling workflow update notification for ${workflowId}`) - - const room = this.workflowRooms.get(workflowId) - if (!room) { - logger.debug(`No active room found for updated workflow ${workflowId}`) - return - } - - const timestamp = Date.now() - - // Notify all clients in the workflow room that the workflow has been updated - // This will trigger them to refresh their local state - this.io.to(workflowId).emit('workflow-updated', { - workflowId, - message: 'Workflow has been updated externally', - timestamp, - }) - - room.lastModified = timestamp - - logger.info(`Notified ${room.users.size} users about workflow update: ${workflowId}`) - } - - 
handleCopilotWorkflowEdit(workflowId: string, description?: string) { - logger.info(`Handling copilot workflow edit notification for ${workflowId}`) - - const room = this.workflowRooms.get(workflowId) - if (!room) { - logger.debug(`No active room found for copilot workflow edit ${workflowId}`) - return - } - - const timestamp = Date.now() - - // Emit special event for copilot edits that tells clients to rehydrate from database - this.io.to(workflowId).emit('copilot-workflow-edit', { - workflowId, - description, - message: 'Copilot has edited the workflow - rehydrating from database', - timestamp, - }) - - room.lastModified = timestamp - - logger.info(`Notified ${room.users.size} users about copilot workflow edit: ${workflowId}`) - } - - async validateWorkflowConsistency( - workflowId: string - ): Promise<{ valid: boolean; issues: string[] }> { - try { - const issues: string[] = [] - - const orphanedEdges = await db - .select({ - id: workflowEdges.id, - sourceBlockId: workflowEdges.sourceBlockId, - targetBlockId: workflowEdges.targetBlockId, - }) - .from(workflowEdges) - .leftJoin(workflowBlocks, eq(workflowEdges.sourceBlockId, workflowBlocks.id)) - .where(and(eq(workflowEdges.workflowId, workflowId), isNull(workflowBlocks.id))) - - if (orphanedEdges.length > 0) { - issues.push(`Found ${orphanedEdges.length} orphaned edges with missing source blocks`) - } - - return { valid: issues.length === 0, issues } - } catch (error) { - logger.error('Error validating workflow consistency:', error) - return { valid: false, issues: ['Consistency check failed'] } - } - } - - getWorkflowRooms(): ReadonlyMap { - return this.workflowRooms - } - - getSocketToWorkflow(): ReadonlyMap { - return this.socketToWorkflow - } - - getUserSessions(): ReadonlyMap { - return this.userSessions - } - - hasWorkflowRoom(workflowId: string): boolean { - return this.workflowRooms.has(workflowId) - } - - getWorkflowRoom(workflowId: string): WorkflowRoom | undefined { - return 
this.workflowRooms.get(workflowId) - } - - setWorkflowRoom(workflowId: string, room: WorkflowRoom): void { - this.workflowRooms.set(workflowId, room) - } - - getWorkflowIdForSocket(socketId: string): string | undefined { - return this.socketToWorkflow.get(socketId) - } - - setWorkflowForSocket(socketId: string, workflowId: string): void { - this.socketToWorkflow.set(socketId, workflowId) - } - - getUserSession( - socketId: string - ): { userId: string; userName: string; avatarUrl?: string | null } | undefined { - return this.userSessions.get(socketId) - } - - setUserSession( - socketId: string, - session: { userId: string; userName: string; avatarUrl?: string | null } - ): void { - this.userSessions.set(socketId, session) - } - - getTotalActiveConnections(): number { - return Array.from(this.workflowRooms.values()).reduce( - (total, room) => total + room.activeConnections, - 0 - ) - } - - broadcastPresenceUpdate(workflowId: string): void { - const room = this.workflowRooms.get(workflowId) - if (room) { - const roomPresence = Array.from(room.users.values()) - this.io.to(workflowId).emit('presence-update', roomPresence) - } - } - - emitToWorkflow(workflowId: string, event: string, payload: T): void { - this.io.to(workflowId).emit(event, payload) - } - - /** - * Get the number of unique users in a workflow room - * (not the number of socket connections) - */ - getUniqueUserCount(workflowId: string): number { - const room = this.workflowRooms.get(workflowId) - if (!room) return 0 - - const uniqueUsers = new Set() - room.users.forEach((presence) => { - uniqueUsers.add(presence.userId) - }) - - return uniqueUsers.size - } -} diff --git a/apps/sim/socket/rooms/memory-manager.ts b/apps/sim/socket/rooms/memory-manager.ts new file mode 100644 index 000000000..0aaf253bf --- /dev/null +++ b/apps/sim/socket/rooms/memory-manager.ts @@ -0,0 +1,260 @@ +import { createLogger } from '@sim/logger' +import type { Server } from 'socket.io' +import type { IRoomManager, UserPresence, 
UserSession, WorkflowRoom } from '@/socket/rooms/types' + +const logger = createLogger('MemoryRoomManager') + +/** + * In-memory room manager for single-pod deployments + * Used as fallback when REDIS_URL is not configured + */ +export class MemoryRoomManager implements IRoomManager { + private workflowRooms = new Map() + private socketToWorkflow = new Map() + private userSessions = new Map() + private _io: Server + + constructor(io: Server) { + this._io = io + } + + get io(): Server { + return this._io + } + + async initialize(): Promise { + logger.info('MemoryRoomManager initialized (single-pod mode)') + } + + async shutdown(): Promise { + this.workflowRooms.clear() + this.socketToWorkflow.clear() + this.userSessions.clear() + logger.info('MemoryRoomManager shutdown complete') + } + + async addUserToRoom(workflowId: string, socketId: string, presence: UserPresence): Promise { + // Create room if it doesn't exist + if (!this.workflowRooms.has(workflowId)) { + this.workflowRooms.set(workflowId, { + workflowId, + users: new Map(), + lastModified: Date.now(), + activeConnections: 0, + }) + } + + const room = this.workflowRooms.get(workflowId)! 
+ room.users.set(socketId, presence) + room.activeConnections++ + room.lastModified = Date.now() + + // Map socket to workflow + this.socketToWorkflow.set(socketId, workflowId) + + // Store session + this.userSessions.set(socketId, { + userId: presence.userId, + userName: presence.userName, + avatarUrl: presence.avatarUrl, + }) + + logger.debug(`Added user ${presence.userId} to workflow ${workflowId} (socket: ${socketId})`) + } + + async removeUserFromRoom(socketId: string): Promise { + const workflowId = this.socketToWorkflow.get(socketId) + + if (!workflowId) { + return null + } + + const room = this.workflowRooms.get(workflowId) + if (room) { + room.users.delete(socketId) + room.activeConnections = Math.max(0, room.activeConnections - 1) + + // Clean up empty rooms + if (room.activeConnections === 0) { + this.workflowRooms.delete(workflowId) + logger.info(`Cleaned up empty workflow room: ${workflowId}`) + } + } + + this.socketToWorkflow.delete(socketId) + this.userSessions.delete(socketId) + + logger.debug(`Removed socket ${socketId} from workflow ${workflowId}`) + return workflowId + } + + async getWorkflowIdForSocket(socketId: string): Promise { + return this.socketToWorkflow.get(socketId) ?? null + } + + async getUserSession(socketId: string): Promise { + return this.userSessions.get(socketId) ?? 
null + } + + async getWorkflowUsers(workflowId: string): Promise { + const room = this.workflowRooms.get(workflowId) + if (!room) return [] + return Array.from(room.users.values()) + } + + async hasWorkflowRoom(workflowId: string): Promise { + return this.workflowRooms.has(workflowId) + } + + async updateUserActivity( + workflowId: string, + socketId: string, + updates: Partial> + ): Promise { + const room = this.workflowRooms.get(workflowId) + if (!room) return + + const presence = room.users.get(socketId) + if (presence) { + if (updates.cursor !== undefined) presence.cursor = updates.cursor + if (updates.selection !== undefined) presence.selection = updates.selection + presence.lastActivity = updates.lastActivity ?? Date.now() + } + } + + async updateRoomLastModified(workflowId: string): Promise { + const room = this.workflowRooms.get(workflowId) + if (room) { + room.lastModified = Date.now() + } + } + + async broadcastPresenceUpdate(workflowId: string): Promise { + const users = await this.getWorkflowUsers(workflowId) + this._io.to(workflowId).emit('presence-update', users) + } + + emitToWorkflow(workflowId: string, event: string, payload: T): void { + this._io.to(workflowId).emit(event, payload) + } + + async getUniqueUserCount(workflowId: string): Promise { + const room = this.workflowRooms.get(workflowId) + if (!room) return 0 + + const uniqueUsers = new Set() + room.users.forEach((presence) => { + uniqueUsers.add(presence.userId) + }) + + return uniqueUsers.size + } + + async getTotalActiveConnections(): Promise { + let total = 0 + for (const room of this.workflowRooms.values()) { + total += room.activeConnections + } + return total + } + + async handleWorkflowDeletion(workflowId: string): Promise { + logger.info(`Handling workflow deletion notification for ${workflowId}`) + + const room = this.workflowRooms.get(workflowId) + if (!room) { + logger.debug(`No active room found for deleted workflow ${workflowId}`) + return + } + + 
this._io.to(workflowId).emit('workflow-deleted', { + workflowId, + message: 'This workflow has been deleted', + timestamp: Date.now(), + }) + + const socketsToDisconnect: string[] = [] + room.users.forEach((_presence, socketId) => { + socketsToDisconnect.push(socketId) + }) + + for (const socketId of socketsToDisconnect) { + const socket = this._io.sockets.sockets.get(socketId) + if (socket) { + socket.leave(workflowId) + logger.debug(`Disconnected socket ${socketId} from deleted workflow ${workflowId}`) + } + await this.removeUserFromRoom(socketId) + } + + this.workflowRooms.delete(workflowId) + logger.info( + `Cleaned up workflow room ${workflowId} after deletion (${socketsToDisconnect.length} users disconnected)` + ) + } + + async handleWorkflowRevert(workflowId: string, timestamp: number): Promise { + logger.info(`Handling workflow revert notification for ${workflowId}`) + + const room = this.workflowRooms.get(workflowId) + if (!room) { + logger.debug(`No active room found for reverted workflow ${workflowId}`) + return + } + + this._io.to(workflowId).emit('workflow-reverted', { + workflowId, + message: 'Workflow has been reverted to deployed state', + timestamp, + }) + + room.lastModified = timestamp + + logger.info(`Notified ${room.users.size} users about workflow revert: ${workflowId}`) + } + + async handleWorkflowUpdate(workflowId: string): Promise { + logger.info(`Handling workflow update notification for ${workflowId}`) + + const room = this.workflowRooms.get(workflowId) + if (!room) { + logger.debug(`No active room found for updated workflow ${workflowId}`) + return + } + + const timestamp = Date.now() + + this._io.to(workflowId).emit('workflow-updated', { + workflowId, + message: 'Workflow has been updated externally', + timestamp, + }) + + room.lastModified = timestamp + + logger.info(`Notified ${room.users.size} users about workflow update: ${workflowId}`) + } + + async handleCopilotWorkflowEdit(workflowId: string, description?: string): Promise { + 
logger.info(`Handling copilot workflow edit notification for ${workflowId}`) + + const room = this.workflowRooms.get(workflowId) + if (!room) { + logger.debug(`No active room found for copilot workflow edit ${workflowId}`) + return + } + + const timestamp = Date.now() + + this._io.to(workflowId).emit('copilot-workflow-edit', { + workflowId, + description, + message: 'Copilot has edited the workflow - rehydrating from database', + timestamp, + }) + + room.lastModified = timestamp + + logger.info(`Notified ${room.users.size} users about copilot workflow edit: ${workflowId}`) + } +} diff --git a/apps/sim/socket/rooms/redis-manager.ts b/apps/sim/socket/rooms/redis-manager.ts new file mode 100644 index 000000000..20e59444d --- /dev/null +++ b/apps/sim/socket/rooms/redis-manager.ts @@ -0,0 +1,422 @@ +import { createLogger } from '@sim/logger' +import { createClient, type RedisClientType } from 'redis' +import type { Server } from 'socket.io' +import type { IRoomManager, UserPresence, UserSession } from '@/socket/rooms/types' + +const logger = createLogger('RedisRoomManager') + +const KEYS = { + workflowUsers: (wfId: string) => `workflow:${wfId}:users`, + workflowMeta: (wfId: string) => `workflow:${wfId}:meta`, + socketWorkflow: (socketId: string) => `socket:${socketId}:workflow`, + socketSession: (socketId: string) => `socket:${socketId}:session`, +} as const + +const SOCKET_KEY_TTL = 3600 + +/** + * Lua script for atomic user removal from room. + * Returns workflowId if user was removed, null otherwise. + * Handles room cleanup atomically to prevent race conditions. + */ +const REMOVE_USER_SCRIPT = ` +local socketWorkflowKey = KEYS[1] +local socketSessionKey = KEYS[2] +local workflowUsersPrefix = ARGV[1] +local workflowMetaPrefix = ARGV[2] +local socketId = ARGV[3] + +local workflowId = redis.call('GET', socketWorkflowKey) +if not workflowId then + return nil +end + +local workflowUsersKey = workflowUsersPrefix .. workflowId .. 
':users' +local workflowMetaKey = workflowMetaPrefix .. workflowId .. ':meta' + +redis.call('HDEL', workflowUsersKey, socketId) +redis.call('DEL', socketWorkflowKey, socketSessionKey) + +local remaining = redis.call('HLEN', workflowUsersKey) +if remaining == 0 then + redis.call('DEL', workflowUsersKey, workflowMetaKey) +end + +return workflowId +` + +/** + * Lua script for atomic user activity update. + * Performs read-modify-write atomically to prevent lost updates. + * + * KEYS[1] is the workflow's users hash. ARGV is: socketId, cursor JSON ('' leaves the + * cursor unchanged), selection JSON ('' leaves the selection unchanged), lastActivity. + * Returns 0 when the socket has no presence entry in the hash, 1 after updating it. + * + * After a successful update the script also renews the TTL on the socket's two lookup + * keys. addUserToRoom creates them with SOCKET_KEY_TTL and nothing else in this file + * extends it; once they expire REMOVE_USER_SCRIPT can no longer resolve the socket's + * workflow and the presence entry is left behind in the users hash. The renewal only + * happens when activity is reported. The key names built below must stay in sync with + * KEYS.socketWorkflow / KEYS.socketSession; like REMOVE_USER_SCRIPT, the script touches + * keys that are not passed in KEYS. + */ +const UPDATE_ACTIVITY_SCRIPT = ` +local workflowUsersKey = KEYS[1] +local socketId = ARGV[1] +local cursorJson = ARGV[2] +local selectionJson = ARGV[3] +local lastActivity = ARGV[4] + +local existingJson = redis.call('HGET', workflowUsersKey, socketId) +if not existingJson then + return 0 +end + +local existing = cjson.decode(existingJson) + +if cursorJson ~= '' then + existing.cursor = cjson.decode(cursorJson) +end +if selectionJson ~= '' then + existing.selection = cjson.decode(selectionJson) +end +existing.lastActivity = tonumber(lastActivity) + +redis.call('HSET', workflowUsersKey, socketId, cjson.encode(existing)) + +redis.call('EXPIRE', 'socket:' .. socketId .. ':workflow', ${SOCKET_KEY_TTL}) +redis.call('EXPIRE', 'socket:' .. socketId .. ':session', ${SOCKET_KEY_TTL}) +return 1 +` + +/** + * Redis-backed room manager for multi-pod deployments. + * Uses Lua scripts for atomic operations to prevent race conditions. 
+ */ +export class RedisRoomManager implements IRoomManager { + private redis: RedisClientType + private _io: Server + private isConnected = false + private removeUserScriptSha: string | null = null + private updateActivityScriptSha: string | null = null + + constructor(io: Server, redisUrl: string) { + this._io = io + this.redis = createClient({ + url: redisUrl, + socket: { + reconnectStrategy: (retries) => { + if (retries > 10) { + logger.error('Redis reconnection failed after 10 attempts') + return new Error('Redis reconnection failed') + } + const delay = Math.min(retries * 100, 3000) + logger.warn(`Redis reconnecting in ${delay}ms (attempt ${retries})`) + return delay + }, + }, + }) + + this.redis.on('error', (err) => { + logger.error('Redis client error:', err) + }) + + this.redis.on('reconnecting', () => { + logger.warn('Redis client reconnecting...') + this.isConnected = false + }) + + this.redis.on('ready', () => { + logger.info('Redis client ready') + this.isConnected = true + }) + } + + get io(): Server { + return this._io + } + + async initialize(): Promise { + if (this.isConnected) return + + try { + await this.redis.connect() + this.isConnected = true + + // Pre-load Lua scripts for better performance + this.removeUserScriptSha = await this.redis.scriptLoad(REMOVE_USER_SCRIPT) + this.updateActivityScriptSha = await this.redis.scriptLoad(UPDATE_ACTIVITY_SCRIPT) + + logger.info('RedisRoomManager connected to Redis and scripts loaded') + } catch (error) { + logger.error('Failed to connect to Redis:', error) + throw error + } + } + + async shutdown(): Promise { + if (!this.isConnected) return + + try { + await this.redis.quit() + this.isConnected = false + logger.info('RedisRoomManager disconnected from Redis') + } catch (error) { + logger.error('Error during Redis shutdown:', error) + } + } + + async addUserToRoom(workflowId: string, socketId: string, presence: UserPresence): Promise { + try { + const pipeline = this.redis.multi() + + 
pipeline.hSet(KEYS.workflowUsers(workflowId), socketId, JSON.stringify(presence)) + pipeline.hSet(KEYS.workflowMeta(workflowId), 'lastModified', Date.now().toString()) + pipeline.set(KEYS.socketWorkflow(socketId), workflowId) + pipeline.expire(KEYS.socketWorkflow(socketId), SOCKET_KEY_TTL) + pipeline.hSet(KEYS.socketSession(socketId), { + userId: presence.userId, + userName: presence.userName, + avatarUrl: presence.avatarUrl || '', + }) + pipeline.expire(KEYS.socketSession(socketId), SOCKET_KEY_TTL) + + const results = await pipeline.exec() + + // Check if any command failed + const failed = results.some((result) => result instanceof Error) + if (failed) { + logger.error(`Pipeline partially failed when adding user to room`, { workflowId, socketId }) + } + + logger.debug(`Added user ${presence.userId} to workflow ${workflowId} (socket: ${socketId})`) + } catch (error) { + logger.error(`Failed to add user to room: ${socketId} -> ${workflowId}`, error) + throw error + } + } + + /** + * Remove a socket from the workflow room it is in by running REMOVE_USER_SCRIPT. + * + * Never rejects: every failure is logged and reported as null. + * + * @param socketId - Socket whose room membership and lookup keys are removed + * @param retried - Internal flag; true on the single retry made after a NOSCRIPT error + * @returns The workflowId the socket was in, or null when it was in no room, when + * initialize() has not loaded the script yet, or when Redis failed + */ + async removeUserFromRoom(socketId: string, retried = false): Promise { + if (!this.removeUserScriptSha) { + logger.error('removeUserFromRoom called before initialize()') + return null + } + + try { + const workflowId = await this.redis.evalSha(this.removeUserScriptSha, { + keys: [KEYS.socketWorkflow(socketId), KEYS.socketSession(socketId)], + arguments: ['workflow:', 'workflow:', socketId], + }) + + if (workflowId) { + logger.debug(`Removed socket ${socketId} from workflow ${workflowId}`) + } + return workflowId as string | null + } catch (error) { + if ((error as Error).message?.includes('NOSCRIPT') && !retried) { + logger.warn('Lua script not found, reloading...') + try { + this.removeUserScriptSha = await this.redis.scriptLoad(REMOVE_USER_SCRIPT) + } catch (reloadError) { + // The reload is another Redis call made from inside this catch block; if it + // rejects it must be handled here or the method would reject instead of returning null + logger.error(`Failed to remove user from room: ${socketId}`, reloadError) + return null + } + return this.removeUserFromRoom(socketId, true) + } + logger.error(`Failed to remove user from room: ${socketId}`, error) + return null + } + } + + async getWorkflowIdForSocket(socketId: string): Promise { + return 
this.redis.get(KEYS.socketWorkflow(socketId)) + } + + async getUserSession(socketId: string): Promise { + try { + const session = await this.redis.hGetAll(KEYS.socketSession(socketId)) + + if (!session.userId) { + return null + } + + return { + userId: session.userId, + userName: session.userName, + avatarUrl: session.avatarUrl || undefined, + } + } catch (error) { + logger.error(`Failed to get user session for ${socketId}:`, error) + return null + } + } + + async getWorkflowUsers(workflowId: string): Promise { + try { + const users = await this.redis.hGetAll(KEYS.workflowUsers(workflowId)) + return Object.entries(users) + .map(([socketId, json]) => { + try { + return JSON.parse(json) as UserPresence + } catch { + logger.warn(`Corrupted user data for socket ${socketId}, skipping`) + return null + } + }) + .filter((u): u is UserPresence => u !== null) + } catch (error) { + logger.error(`Failed to get workflow users for ${workflowId}:`, error) + return [] + } + } + + async hasWorkflowRoom(workflowId: string): Promise { + const exists = await this.redis.exists(KEYS.workflowUsers(workflowId)) + return exists > 0 + } + + async updateUserActivity( + workflowId: string, + socketId: string, + updates: Partial>, + retried = false + ): Promise { + if (!this.updateActivityScriptSha) { + logger.error('updateUserActivity called before initialize()') + return + } + + try { + await this.redis.evalSha(this.updateActivityScriptSha, { + keys: [KEYS.workflowUsers(workflowId)], + arguments: [ + socketId, + updates.cursor ? JSON.stringify(updates.cursor) : '', + updates.selection ? JSON.stringify(updates.selection) : '', + (updates.lastActivity ?? 
Date.now()).toString(), + ], + }) + } catch (error) { + if ((error as Error).message?.includes('NOSCRIPT') && !retried) { + logger.warn('Lua script not found, reloading...') + this.updateActivityScriptSha = await this.redis.scriptLoad(UPDATE_ACTIVITY_SCRIPT) + return this.updateUserActivity(workflowId, socketId, updates, true) + } + logger.error(`Failed to update user activity: ${socketId}`, error) + } + } + + async updateRoomLastModified(workflowId: string): Promise { + await this.redis.hSet(KEYS.workflowMeta(workflowId), 'lastModified', Date.now().toString()) + } + + async broadcastPresenceUpdate(workflowId: string): Promise { + const users = await this.getWorkflowUsers(workflowId) + // io.to() with Redis adapter broadcasts to all pods + this._io.to(workflowId).emit('presence-update', users) + } + + emitToWorkflow(workflowId: string, event: string, payload: T): void { + this._io.to(workflowId).emit(event, payload) + } + + async getUniqueUserCount(workflowId: string): Promise { + const users = await this.getWorkflowUsers(workflowId) + const uniqueUserIds = new Set(users.map((u) => u.userId)) + return uniqueUserIds.size + } + + async getTotalActiveConnections(): Promise { + // This is more complex with Redis - we'd need to scan all workflow:*:users keys + // For now, just count sockets in this server instance + // The true count would require aggregating across all pods + return this._io.sockets.sockets.size + } + + async handleWorkflowDeletion(workflowId: string): Promise { + logger.info(`Handling workflow deletion notification for ${workflowId}`) + + try { + const users = await this.getWorkflowUsers(workflowId) + if (users.length === 0) { + logger.debug(`No active users found for deleted workflow ${workflowId}`) + return + } + + // Notify all clients across all pods via Redis adapter + this._io.to(workflowId).emit('workflow-deleted', { + workflowId, + message: 'This workflow has been deleted', + timestamp: Date.now(), + }) + + // Use Socket.IO's cross-pod 
socketsLeave() to remove all sockets from the room + // This works across all pods when using the Redis adapter + await this._io.in(workflowId).socketsLeave(workflowId) + logger.debug(`All sockets left workflow room ${workflowId} via socketsLeave()`) + + // Remove all users from Redis state + for (const user of users) { + await this.removeUserFromRoom(user.socketId) + } + + // Clean up room data + await this.redis.del([KEYS.workflowUsers(workflowId), KEYS.workflowMeta(workflowId)]) + + logger.info( + `Cleaned up workflow room ${workflowId} after deletion (${users.length} users disconnected)` + ) + } catch (error) { + logger.error(`Failed to handle workflow deletion for ${workflowId}:`, error) + } + } + + async handleWorkflowRevert(workflowId: string, timestamp: number): Promise { + logger.info(`Handling workflow revert notification for ${workflowId}`) + + const hasRoom = await this.hasWorkflowRoom(workflowId) + if (!hasRoom) { + logger.debug(`No active room found for reverted workflow ${workflowId}`) + return + } + + this._io.to(workflowId).emit('workflow-reverted', { + workflowId, + message: 'Workflow has been reverted to deployed state', + timestamp, + }) + + await this.updateRoomLastModified(workflowId) + + const userCount = await this.getUniqueUserCount(workflowId) + logger.info(`Notified ${userCount} users about workflow revert: ${workflowId}`) + } + + async handleWorkflowUpdate(workflowId: string): Promise { + logger.info(`Handling workflow update notification for ${workflowId}`) + + const hasRoom = await this.hasWorkflowRoom(workflowId) + if (!hasRoom) { + logger.debug(`No active room found for updated workflow ${workflowId}`) + return + } + + const timestamp = Date.now() + + this._io.to(workflowId).emit('workflow-updated', { + workflowId, + message: 'Workflow has been updated externally', + timestamp, + }) + + await this.updateRoomLastModified(workflowId) + + const userCount = await this.getUniqueUserCount(workflowId) + logger.info(`Notified ${userCount} 
users about workflow update: ${workflowId}`)
+  }
+
+  async handleCopilotWorkflowEdit(workflowId: string, description?: string): Promise<void> {
+    logger.info(`Handling copilot workflow edit notification for ${workflowId}`)
+
+    const hasRoom = await this.hasWorkflowRoom(workflowId)
+    if (!hasRoom) {
+      logger.debug(`No active room found for copilot workflow edit ${workflowId}`)
+      return
+    }
+
+    const timestamp = Date.now()
+
+    this._io.to(workflowId).emit('copilot-workflow-edit', {
+      workflowId,
+      description,
+      message: 'Copilot has edited the workflow - rehydrating from database',
+      timestamp,
+    })
+
+    await this.updateRoomLastModified(workflowId)
+
+    const userCount = await this.getUniqueUserCount(workflowId)
+    logger.info(`Notified ${userCount} users about copilot workflow edit: ${workflowId}`)
+  }
+}
diff --git a/apps/sim/socket/rooms/types.ts b/apps/sim/socket/rooms/types.ts
new file mode 100644
index 000000000..6a5edb5e6
--- /dev/null
+++ b/apps/sim/socket/rooms/types.ts
@@ -0,0 +1,140 @@
+import type { Server } from 'socket.io'
+
+/**
+ * User presence data stored in room state
+ */
+export interface UserPresence {
+  userId: string
+  workflowId: string
+  userName: string
+  socketId: string
+  tabSessionId?: string
+  joinedAt: number
+  lastActivity: number
+  role: string
+  cursor?: { x: number; y: number }
+  selection?: { type: 'block' | 'edge' | 'none'; id?: string }
+  avatarUrl?: string | null
+}
+
+/**
+ * User session data (minimal info for quick lookups)
+ */
+export interface UserSession {
+  userId: string
+  userName: string
+  avatarUrl?: string | null
+}
+
+/**
+ * Workflow room state
+ */
+export interface WorkflowRoom {
+  workflowId: string
+  users: Map<string, UserPresence>
+  lastModified: number
+  activeConnections: number
+}
+
+/**
+ * Common interface for room managers (in-memory and Redis)
+ * All methods that access state are async to support Redis operations
+ */
+export interface IRoomManager {
+  readonly io: Server
+
+  /**
+   * Initialize the room manager (connect to 
Redis, etc.)
+   */
+  initialize(): Promise<void>
+
+  /**
+   * Clean shutdown
+   */
+  shutdown(): Promise<void>
+
+  /**
+   * Add a user to a workflow room
+   */
+  addUserToRoom(workflowId: string, socketId: string, presence: UserPresence): Promise<void>
+
+  /**
+   * Remove a user from their current room
+   * Returns the workflowId they were in, or null if not in any room
+   */
+  removeUserFromRoom(socketId: string): Promise<string | null>
+
+  /**
+   * Get the workflow ID for a socket
+   */
+  getWorkflowIdForSocket(socketId: string): Promise<string | null>
+
+  /**
+   * Get user session data for a socket
+   */
+  getUserSession(socketId: string): Promise<UserSession | null>
+
+  /**
+   * Get all users in a workflow room
+   */
+  getWorkflowUsers(workflowId: string): Promise<UserPresence[]>
+
+  /**
+   * Check if a workflow room exists
+   */
+  hasWorkflowRoom(workflowId: string): Promise<boolean>
+
+  /**
+   * Update user activity (cursor, selection, lastActivity)
+   */
+  updateUserActivity(
+    workflowId: string,
+    socketId: string,
+    updates: Partial<Pick<UserPresence, 'cursor' | 'selection' | 'lastActivity'>>
+  ): Promise<void>
+
+  /**
+   * Update room's lastModified timestamp
+   */
+  updateRoomLastModified(workflowId: string): Promise<void>
+
+  /**
+   * Broadcast presence update to all clients in a workflow room
+   */
+  broadcastPresenceUpdate(workflowId: string): Promise<void>
+
+  /**
+   * Emit an event to all clients in a workflow room
+   */
+  emitToWorkflow<T = unknown>(workflowId: string, event: string, payload: T): void
+
+  /**
+   * Get the number of unique users in a workflow room
+   */
+  getUniqueUserCount(workflowId: string): Promise<number>
+
+  /**
+   * Get total active connections across all rooms
+   */
+  getTotalActiveConnections(): Promise<number>
+
+  /**
+   * Handle workflow deletion - notify users and clean up room
+   */
+  handleWorkflowDeletion(workflowId: string): Promise<void>
+
+  /**
+   * Handle workflow revert - notify users
+   */
+  handleWorkflowRevert(workflowId: string, timestamp: number): Promise<void>
+
+  /**
+   * Handle workflow update - notify users
+   */
+  handleWorkflowUpdate(workflowId: string): Promise<void>
+
+  /**
+   * Handle copilot workflow edit - notify users to rehydrate
+   */
+  
handleCopilotWorkflowEdit(workflowId: string, description?: string): Promise<void>
+}
diff --git a/apps/sim/socket/routes/http.ts b/apps/sim/socket/routes/http.ts index 6fdbdbace..67753aa2f 100644 --- a/apps/sim/socket/routes/http.ts +++ b/apps/sim/socket/routes/http.ts @@ -1,5 +1,5 @@ import type { IncomingMessage, ServerResponse } from 'http' -import type { RoomManager } from '@/socket/rooms/manager' +import type { IRoomManager } from '@/socket/rooms' interface Logger { info: (message: string, ...args: any[]) => void @@ -14,15 +14,16 @@ interface Logger { * @param logger - Logger instance for logging requests and errors * @returns HTTP request handler function */ -export function createHttpHandler(roomManager: RoomManager, logger: Logger) { - return (req: IncomingMessage, res: ServerResponse) => { +export function createHttpHandler(roomManager: IRoomManager, logger: Logger) { + return async (req: IncomingMessage, res: ServerResponse) => { if (req.method === 'GET' && req.url === '/health') { + const connections = await roomManager.getTotalActiveConnections() res.writeHead(200, { 'Content-Type': 'application/json' }) res.end( JSON.stringify({ status: 'ok', timestamp: new Date().toISOString(), - connections: roomManager.getTotalActiveConnections(), + connections, }) ) return @@ -34,10 +35,10 @@ export function createHttpHandler(roomManager: RoomManager, logger: Logger) { req.on('data', (chunk) => { body += chunk.toString() }) - req.on('end', () => { + req.on('end', async () => { try { const { workflowId } = JSON.parse(body) - roomManager.handleWorkflowDeletion(workflowId) + await roomManager.handleWorkflowDeletion(workflowId) res.writeHead(200, { 'Content-Type': 'application/json' }) res.end(JSON.stringify({ success: true })) } catch (error) { @@ -55,10 +56,10 @@ export function createHttpHandler(roomManager: RoomManager, logger: Logger) { req.on('data', (chunk) => { body += chunk.toString() }) - req.on('end', () => { + req.on('end', async () => { try { const { workflowId 
} = JSON.parse(body) - roomManager.handleWorkflowUpdate(workflowId) + await roomManager.handleWorkflowUpdate(workflowId) res.writeHead(200, { 'Content-Type': 'application/json' }) res.end(JSON.stringify({ success: true })) } catch (error) { @@ -76,10 +77,10 @@ export function createHttpHandler(roomManager: RoomManager, logger: Logger) { req.on('data', (chunk) => { body += chunk.toString() }) - req.on('end', () => { + req.on('end', async () => { try { const { workflowId, description } = JSON.parse(body) - roomManager.handleCopilotWorkflowEdit(workflowId, description) + await roomManager.handleCopilotWorkflowEdit(workflowId, description) res.writeHead(200, { 'Content-Type': 'application/json' }) res.end(JSON.stringify({ success: true })) } catch (error) { @@ -97,10 +98,10 @@ export function createHttpHandler(roomManager: RoomManager, logger: Logger) { req.on('data', (chunk) => { body += chunk.toString() }) - req.on('end', () => { + req.on('end', async () => { try { const { workflowId, timestamp } = JSON.parse(body) - roomManager.handleWorkflowRevert(workflowId, timestamp) + await roomManager.handleWorkflowRevert(workflowId, timestamp) res.writeHead(200, { 'Content-Type': 'application/json' }) res.end(JSON.stringify({ success: true })) } catch (error) { diff --git a/apps/sim/socket/validation/schemas.ts b/apps/sim/socket/validation/schemas.ts index e0d1c6fa3..c6d14d4ef 100644 --- a/apps/sim/socket/validation/schemas.ts +++ b/apps/sim/socket/validation/schemas.ts @@ -239,5 +239,3 @@ export const WorkflowOperationSchema = z.union([ VariableOperationSchema, WorkflowStateOperationSchema, ]) - -export { PositionSchema, AutoConnectEdgeSchema } diff --git a/bun.lock b/bun.lock index c76b5b453..4fc600c2a 100644 --- a/bun.lock +++ b/bun.lock @@ -104,6 +104,7 @@ "@react-email/components": "^0.0.34", "@react-email/render": "2.0.0", "@sim/logger": "workspace:*", + "@socket.io/redis-adapter": "8.3.0", "@t3-oss/env-nextjs": "0.13.4", "@tanstack/react-query": "5.90.8", 
"@tanstack/react-query-devtools": "5.90.2", @@ -174,6 +175,7 @@ "react-simple-code-editor": "^0.14.1", "react-window": "2.2.3", "reactflow": "^11.11.4", + "redis": "5.10.0", "rehype-autolink-headings": "^7.1.0", "rehype-slug": "^6.0.0", "remark-gfm": "4.0.1", @@ -1146,6 +1148,16 @@ "@reactflow/node-toolbar": ["@reactflow/node-toolbar@1.3.14", "", { "dependencies": { "@reactflow/core": "11.11.4", "classcat": "^5.0.3", "zustand": "^4.4.1" }, "peerDependencies": { "react": ">=17", "react-dom": ">=17" } }, "sha512-rbynXQnH/xFNu4P9H+hVqlEUafDCkEoCy0Dg9mG22Sg+rY/0ck6KkrAQrYrTgXusd+cEJOMK0uOOFCK2/5rSGQ=="], + "@redis/bloom": ["@redis/bloom@5.10.0", "", { "peerDependencies": { "@redis/client": "^5.10.0" } }, "sha512-doIF37ob+l47n0rkpRNgU8n4iacBlKM9xLiP1LtTZTvz8TloJB8qx/MgvhMhKdYG+CvCY2aPBnN2706izFn/4A=="], + + "@redis/client": ["@redis/client@5.10.0", "", { "dependencies": { "cluster-key-slot": "1.1.2" } }, "sha512-JXmM4XCoso6C75Mr3lhKA3eNxSzkYi3nCzxDIKY+YOszYsJjuKbFgVtguVPbLMOttN4iu2fXoc2BGhdnYhIOxA=="], + + "@redis/json": ["@redis/json@5.10.0", "", { "peerDependencies": { "@redis/client": "^5.10.0" } }, "sha512-B2G8XlOmTPUuZtD44EMGbtoepQG34RCDXLZbjrtON1Djet0t5Ri7/YPXvL9aomXqP8lLTreaprtyLKF4tmXEEA=="], + + "@redis/search": ["@redis/search@5.10.0", "", { "peerDependencies": { "@redis/client": "^5.10.0" } }, "sha512-3SVcPswoSfp2HnmWbAGUzlbUPn7fOohVu2weUQ0S+EMiQi8jwjL+aN2p6V3TI65eNfVsJ8vyPvqWklm6H6esmg=="], + + "@redis/time-series": ["@redis/time-series@5.10.0", "", { "peerDependencies": { "@redis/client": "^5.10.0" } }, "sha512-cPkpddXH5kc/SdRhF0YG0qtjL+noqFT0AcHbQ6axhsPsO7iqPi1cjxgdkE9TNeKiBUUdCaU1DbqkR/LzbzPBhg=="], + "@resvg/resvg-wasm": ["@resvg/resvg-wasm@2.4.0", "", {}, "sha512-C7c51Nn4yTxXFKvgh2txJFNweaVcfUPQxwEUFw4aWsCmfiBDJsTSwviIF8EcwjQ6k8bPyMWCl1vw4BdxE569Cg=="], "@rolldown/pluginutils": ["@rolldown/pluginutils@1.0.0-beta.27", "", {}, "sha512-+d0F4MKMCbeVUJwG96uQ4SgAznZNSq93I3V+9NHA4OpvqG8mRCpGdKmK8l/dl02h2CCDHwW2FqilnTyDcAnqjA=="], @@ -1340,6 +1352,8 @@ 
"@socket.io/component-emitter": ["@socket.io/component-emitter@3.1.2", "", {}, "sha512-9BCxFwvbGg/RsZK9tjXd8s4UcwR0MWeFQ1XEKIQVVvAGJyINdrqKMcTRyLoK8Rse1GjzLV9cwjWV1olXRWEXVA=="], + "@socket.io/redis-adapter": ["@socket.io/redis-adapter@8.3.0", "", { "dependencies": { "debug": "~4.3.1", "notepack.io": "~3.0.1", "uid2": "1.0.0" }, "peerDependencies": { "socket.io-adapter": "^2.5.4" } }, "sha512-ly0cra+48hDmChxmIpnESKrc94LjRL80TEmZVscuQ/WWkRP81nNj8W8cCGMqbI4L6NCuAaPRSzZF1a9GlAxxnA=="], + "@standard-schema/spec": ["@standard-schema/spec@1.1.0", "", {}, "sha512-l2aFy5jALhniG5HgqrD6jXLi/rUWrKvqN/qJx6yoJsgKhblVd+iqqU4RCXavm/jPityDo5TCvKMnpjKnOriy0w=="], "@standard-schema/utils": ["@standard-schema/utils@0.3.0", "", {}, "sha512-e7Mew686owMaPJVNNLs55PUvgz371nKgwsc4vxE49zsODpJEnxgxRo2y/OKrqueavXgZNMDVj3DdHFlaSAeU8g=="], @@ -2802,6 +2816,8 @@ "normalize-range": ["normalize-range@0.1.2", "", {}, "sha512-bdok/XvKII3nUpklnV6P2hxtMNrCboOjAcyBuQnWEhO665FwrSNRxU+AqpsyvO6LgGYPspN+lu5CLtw4jPRKNA=="], + "notepack.io": ["notepack.io@3.0.1", "", {}, "sha512-TKC/8zH5pXIAMVQio2TvVDTtPRX+DJPHDqjRbxogtFiByHyzKmy96RA0JtCQJ+WouyyL4A10xomQzgbUT+1jCg=="], + "npm-run-path": ["npm-run-path@5.3.0", "", { "dependencies": { "path-key": "^4.0.0" } }, "sha512-ppwTtiJZq0O/ai0z7yfudtBpWIoxM8yE6nHi1X47eFR2EWORqfbu6CnPlNsjeN683eT0qG6H/Pyf9fCcvjnnnQ=="], "npm-to-yarn": ["npm-to-yarn@3.0.1", "", {}, "sha512-tt6PvKu4WyzPwWUzy/hvPFqn+uwXO0K1ZHka8az3NnrhWJDmSqI8ncWq0fkL0k/lmmi5tAC11FXwXuh0rFbt1A=="], @@ -3072,6 +3088,8 @@ "redent": ["redent@3.0.0", "", { "dependencies": { "indent-string": "^4.0.0", "strip-indent": "^3.0.0" } }, "sha512-6tDA8g98We0zd0GvVeMT9arEOnTw9qM03L9cJXaCjrip1OO764RDBLBfrB4cwzNGDj5OA5ioymC9GkizgWJDUg=="], + "redis": ["redis@5.10.0", "", { "dependencies": { "@redis/bloom": "5.10.0", "@redis/client": "5.10.0", "@redis/json": "5.10.0", "@redis/search": "5.10.0", "@redis/time-series": "5.10.0" } }, 
"sha512-0/Y+7IEiTgVGPrLFKy8oAEArSyEJkU0zvgV5xyi9NzNQ+SLZmyFbUsWIbgPcd4UdUh00opXGKlXJwMmsis5Byw=="], + "redis-errors": ["redis-errors@1.2.0", "", {}, "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w=="], "redis-parser": ["redis-parser@3.0.0", "", { "dependencies": { "redis-errors": "^1.0.0" } }, "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A=="], @@ -3434,6 +3452,8 @@ "ufo": ["ufo@1.6.3", "", {}, "sha512-yDJTmhydvl5lJzBmy/hyOAA0d+aqCBuwl818haVdYCRrWV84o7YyeVm4QlVHStqNrrJSTb6jKuFAVqAFsr+K3Q=="], + "uid2": ["uid2@1.0.0", "", {}, "sha512-+I6aJUv63YAcY9n4mQreLUt0d4lvwkkopDNmpomkAUz0fAkEMV9pRWxN0EjhW1YfRhcuyHg2v3mwddCDW1+LFQ=="], + "ulid": ["ulid@2.4.0", "", { "bin": { "ulid": "bin/cli.js" } }, "sha512-fIRiVTJNcSRmXKPZtGzFQv9WRrZ3M9eoptl/teFJvjOzmpU+/K/JH6HZ8deBfb5vMEpicJcLn7JmvdknlMq7Zg=="], "unbzip2-stream": ["unbzip2-stream@1.4.3", "", { "dependencies": { "buffer": "^5.2.1", "through": "^2.3.8" } }, "sha512-mlExGW4w71ebDJviH16lQLtZS32VKqsSfk80GCfUlwT/4/hNRFsoscrF/c++9xinkMzECL1uL9DDwXqFWkruPg=="], @@ -3852,6 +3872,8 @@ "@shuding/opentype.js/fflate": ["fflate@0.7.4", "", {}, "sha512-5u2V/CDW15QM1XbbgS+0DfPxVB+jUKhWEKuuFuHncbk3tEEqzmoXL+2KyOFuKGqOnmdIy0/davWF1CkuwtibCw=="], + "@socket.io/redis-adapter/debug": ["debug@4.3.7", "", { "dependencies": { "ms": "^2.1.3" } }, "sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ=="], + "@tailwindcss/node/jiti": ["jiti@2.6.1", "", { "bin": { "jiti": "lib/jiti-cli.mjs" } }, "sha512-ekilCSN1jwRvIbgeg/57YFh8qQDNbwDb9xT/qu2DAHbFFZUicIl4ygVaAvzveMhMVr3LnpSKTNnwt8PoOfmKhQ=="], "@tailwindcss/oxide-wasm32-wasi/@emnapi/core": ["@emnapi/core@1.8.1", "", { "dependencies": { "@emnapi/wasi-threads": "1.1.0", "tslib": "^2.4.0" }, "bundled": true }, "sha512-AvT9QFpxK0Zd8J0jopedNm+w/2fIzvtPKPjqyw9jwvBaReTTqPBk9Hixaz7KbjimP+QNz605/XnjFcDAL2pqBg=="], diff --git a/helm/sim/templates/deployment-realtime.yaml 
b/helm/sim/templates/deployment-realtime.yaml index 00054ecf3..c3f5826b6 100644 --- a/helm/sim/templates/deployment-realtime.yaml +++ b/helm/sim/templates/deployment-realtime.yaml @@ -44,6 +44,10 @@ spec: env: - name: DATABASE_URL value: {{ include "sim.databaseUrl" . | quote }} + {{- if .Values.app.env.REDIS_URL }} + - name: REDIS_URL + value: {{ .Values.app.env.REDIS_URL | quote }} + {{- end }} {{- range $key, $value := .Values.realtime.env }} - name: {{ $key }} value: {{ $value | quote }}