Compare commits

..

1 Commits

Author SHA1 Message Date
Cursor Agent
4b79f1fa59 feat(note-block): enable body dragging to match workflow block
Co-authored-by: emir <emir@simstudio.ai>
2026-01-30 02:45:05 +00:00
30 changed files with 971 additions and 1801 deletions

View File

@@ -4,22 +4,22 @@ import { auth } from '@/lib/auth'
import { isAuthDisabled } from '@/lib/core/config/feature-flags'
export async function POST() {
if (isAuthDisabled) {
return NextResponse.json({ token: 'anonymous-socket-token' })
}
try {
if (isAuthDisabled) {
return NextResponse.json({ token: 'anonymous-socket-token' })
}
const hdrs = await headers()
const response = await auth.api.generateOneTimeToken({
headers: hdrs,
})
if (!response?.token) {
return NextResponse.json({ error: 'Authentication required' }, { status: 401 })
if (!response) {
return NextResponse.json({ error: 'Failed to generate token' }, { status: 500 })
}
return NextResponse.json({ token: response.token })
} catch {
} catch (error) {
return NextResponse.json({ error: 'Failed to generate token' }, { status: 500 })
}
}

View File

@@ -97,10 +97,7 @@ export async function POST(
const socketServerUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002'
await fetch(`${socketServerUrl}/api/workflow-reverted`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-api-key': env.INTERNAL_API_SECRET,
},
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ workflowId: id, timestamp: Date.now() }),
})
} catch (e) {

View File

@@ -361,10 +361,7 @@ export async function DELETE(
const socketUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002'
const socketResponse = await fetch(`${socketUrl}/api/workflow-deleted`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-api-key': env.INTERNAL_API_SECRET,
},
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ workflowId }),
})

View File

@@ -254,10 +254,7 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
const socketUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002'
const notifyResponse = await fetch(`${socketUrl}/api/workflow-updated`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-api-key': env.INTERNAL_API_SECRET,
},
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ workflowId }),
})

View File

@@ -530,18 +530,13 @@ export const NoteBlock = memo(function NoteBlock({
<div className='group relative'>
<div
className={cn(
'relative z-[20] w-[250px] cursor-default select-none rounded-[8px] border border-[var(--border)] bg-[var(--surface-2)]'
'note-drag-handle relative z-[20] w-[250px] cursor-grab select-none rounded-[8px] border border-[var(--border)] bg-[var(--surface-2)] [&:active]:cursor-grabbing'
)}
onClick={handleClick}
>
<ActionBar blockId={id} blockType={type} disabled={!userPermissions.canEdit} />
<div
className='note-drag-handle flex cursor-grab items-center justify-between border-[var(--divider)] border-b p-[8px] [&:active]:cursor-grabbing'
onMouseDown={(event) => {
event.stopPropagation()
}}
>
<div className='flex items-center justify-between border-[var(--divider)] border-b p-[8px]'>
<div className='flex min-w-0 flex-1 items-center'>
<span
className={cn(

View File

@@ -17,19 +17,6 @@ import { getEnv } from '@/lib/core/config/env'
const logger = createLogger('SocketContext')
const TAB_SESSION_ID_KEY = 'sim_tab_session_id'
function getTabSessionId(): string {
if (typeof window === 'undefined') return ''
let tabSessionId = sessionStorage.getItem(TAB_SESSION_ID_KEY)
if (!tabSessionId) {
tabSessionId = crypto.randomUUID()
sessionStorage.setItem(TAB_SESSION_ID_KEY, tabSessionId)
}
return tabSessionId
}
interface User {
id: string
name?: string
@@ -49,13 +36,11 @@ interface SocketContextType {
socket: Socket | null
isConnected: boolean
isConnecting: boolean
authFailed: boolean
currentWorkflowId: string | null
currentSocketId: string | null
presenceUsers: PresenceUser[]
joinWorkflow: (workflowId: string) => void
leaveWorkflow: () => void
retryConnection: () => void
emitWorkflowOperation: (
operation: string,
target: string,
@@ -78,6 +63,8 @@ interface SocketContextType {
onCursorUpdate: (handler: (data: any) => void) => void
onSelectionUpdate: (handler: (data: any) => void) => void
onUserJoined: (handler: (data: any) => void) => void
onUserLeft: (handler: (data: any) => void) => void
onWorkflowDeleted: (handler: (data: any) => void) => void
onWorkflowReverted: (handler: (data: any) => void) => void
onOperationConfirmed: (handler: (data: any) => void) => void
@@ -88,13 +75,11 @@ const SocketContext = createContext<SocketContextType>({
socket: null,
isConnected: false,
isConnecting: false,
authFailed: false,
currentWorkflowId: null,
currentSocketId: null,
presenceUsers: [],
joinWorkflow: () => {},
leaveWorkflow: () => {},
retryConnection: () => {},
emitWorkflowOperation: () => {},
emitSubblockUpdate: () => {},
emitVariableUpdate: () => {},
@@ -105,6 +90,8 @@ const SocketContext = createContext<SocketContextType>({
onVariableUpdate: () => {},
onCursorUpdate: () => {},
onSelectionUpdate: () => {},
onUserJoined: () => {},
onUserLeft: () => {},
onWorkflowDeleted: () => {},
onWorkflowReverted: () => {},
onOperationConfirmed: () => {},
@@ -125,43 +112,33 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
const [currentWorkflowId, setCurrentWorkflowId] = useState<string | null>(null)
const [currentSocketId, setCurrentSocketId] = useState<string | null>(null)
const [presenceUsers, setPresenceUsers] = useState<PresenceUser[]>([])
const [authFailed, setAuthFailed] = useState(false)
const initializedRef = useRef(false)
const socketRef = useRef<Socket | null>(null)
const params = useParams()
const urlWorkflowId = params?.workflowId as string | undefined
const urlWorkflowIdRef = useRef(urlWorkflowId)
urlWorkflowIdRef.current = urlWorkflowId
const eventHandlers = useRef<{
workflowOperation?: (data: any) => void
subblockUpdate?: (data: any) => void
variableUpdate?: (data: any) => void
cursorUpdate?: (data: any) => void
selectionUpdate?: (data: any) => void
userJoined?: (data: any) => void
userLeft?: (data: any) => void
workflowDeleted?: (data: any) => void
workflowReverted?: (data: any) => void
operationConfirmed?: (data: any) => void
operationFailed?: (data: any) => void
}>({})
const positionUpdateTimeouts = useRef<Map<string, number>>(new Map())
const isRejoiningRef = useRef<boolean>(false)
const pendingPositionUpdates = useRef<Map<string, any>>(new Map())
const generateSocketToken = async (): Promise<string> => {
const res = await fetch('/api/auth/socket-token', {
method: 'POST',
credentials: 'include',
headers: { 'cache-control': 'no-store' },
})
if (!res.ok) {
if (res.status === 401) {
throw new Error('Authentication required')
}
throw new Error('Failed to generate socket token')
}
if (!res.ok) throw new Error('Failed to generate socket token')
const body = await res.json().catch(() => ({}))
const token = body?.token
if (!token || typeof token !== 'string') throw new Error('Invalid socket token')
@@ -171,11 +148,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
useEffect(() => {
if (!user?.id) return
if (authFailed) {
logger.info('Socket initialization skipped - auth failed, waiting for retry')
return
}
if (initializedRef.current || socket || isConnecting) {
logger.info('Socket already exists or is connecting, skipping initialization')
return
@@ -208,11 +180,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
cb({ token: freshToken })
} catch (error) {
logger.error('Failed to generate fresh token for connection:', error)
if (error instanceof Error && error.message === 'Authentication required') {
// True auth failure - pass null token, server will reject with "Authentication required"
cb({ token: null })
}
// For server errors, don't call cb - connection will timeout and Socket.IO will retry
cb({ token: null })
}
},
})
@@ -226,19 +194,26 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
connected: socketInstance.connected,
transport: socketInstance.io.engine?.transport?.name,
})
// Note: join-workflow is handled by the useEffect watching isConnected
if (urlWorkflowId) {
logger.info(`Joining workflow room after connection: ${urlWorkflowId}`)
socketInstance.emit('join-workflow', {
workflowId: urlWorkflowId,
})
setCurrentWorkflowId(urlWorkflowId)
}
})
socketInstance.on('disconnect', (reason) => {
setIsConnected(false)
setIsConnecting(false)
setCurrentSocketId(null)
setCurrentWorkflowId(null)
setPresenceUsers([])
logger.info('Socket disconnected', {
reason,
})
setPresenceUsers([])
})
socketInstance.on('connect_error', (error: any) => {
@@ -251,34 +226,24 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
transport: error.transport,
})
// Check if this is an authentication failure
const isAuthError =
if (
error.message?.includes('Token validation failed') ||
error.message?.includes('Authentication failed') ||
error.message?.includes('Authentication required')
if (isAuthError) {
) {
logger.warn(
'Authentication failed - stopping reconnection attempts. User may need to refresh/re-login.'
'Authentication failed - this could indicate session expiry or token generation issues'
)
// Stop reconnection attempts to prevent infinite loop
socketInstance.disconnect()
// Reset state to allow re-initialization when session is restored
setSocket(null)
setAuthFailed(true)
initializedRef.current = false
}
})
socketInstance.on('reconnect', (attemptNumber) => {
setIsConnected(true)
setCurrentSocketId(socketInstance.id ?? null)
logger.info('Socket reconnected successfully', {
attemptNumber,
socketId: socketInstance.id,
transport: socketInstance.io.engine?.transport?.name,
})
// Note: join-workflow is handled by the useEffect watching isConnected
})
socketInstance.on('reconnect_attempt', (attemptNumber) => {
@@ -319,26 +284,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
})
})
// Handle join workflow success - confirms room membership with presence list
socketInstance.on('join-workflow-success', ({ workflowId, presenceUsers }) => {
isRejoiningRef.current = false
// Ignore stale success responses from previous navigation
if (workflowId !== urlWorkflowIdRef.current) {
logger.debug(`Ignoring stale join-workflow-success for ${workflowId}`)
return
}
setCurrentWorkflowId(workflowId)
setPresenceUsers(presenceUsers || [])
logger.info(`Successfully joined workflow room: ${workflowId}`, {
presenceCount: presenceUsers?.length || 0,
})
})
socketInstance.on('join-workflow-error', ({ error }) => {
isRejoiningRef.current = false
logger.error('Failed to join workflow:', error)
})
socketInstance.on('workflow-operation', (data) => {
eventHandlers.current.workflowOperation?.(data)
})
@@ -353,13 +298,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
socketInstance.on('workflow-deleted', (data) => {
logger.warn(`Workflow ${data.workflowId} has been deleted`)
setCurrentWorkflowId((current) => {
if (current === data.workflowId) {
setPresenceUsers([])
return null
}
return current
})
if (currentWorkflowId === data.workflowId) {
setCurrentWorkflowId(null)
setPresenceUsers([])
}
eventHandlers.current.workflowDeleted?.(data)
})
@@ -502,35 +444,25 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
socketInstance.on('operation-forbidden', (error) => {
logger.warn('Operation forbidden:', error)
})
if (error?.type === 'SESSION_ERROR') {
const workflowId = urlWorkflowIdRef.current
if (workflowId && !isRejoiningRef.current) {
isRejoiningRef.current = true
logger.info(`Session expired, rejoining workflow: ${workflowId}`)
socketInstance.emit('join-workflow', {
workflowId,
tabSessionId: getTabSessionId(),
})
}
}
socketInstance.on('operation-confirmed', (data) => {
logger.debug('Operation confirmed:', data)
})
socketInstance.on('workflow-state', async (workflowData) => {
logger.info('Received workflow state from server')
if (workflowData?.state) {
try {
await rehydrateWorkflowStores(workflowData.id, workflowData.state, 'workflow-state')
} catch (error) {
logger.error('Error rehydrating workflow state:', error)
}
await rehydrateWorkflowStores(workflowData.id, workflowData.state, 'workflow-state')
}
})
socketRef.current = socketInstance
setSocket(socketInstance)
return () => {
socketInstance.close()
}
} catch (error) {
logger.error('Failed to initialize socket with token:', error)
setIsConnecting(false)
@@ -545,20 +477,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
})
positionUpdateTimeouts.current.clear()
pendingPositionUpdates.current.clear()
// Close socket on unmount
if (socketRef.current) {
logger.info('Closing socket connection on unmount')
socketRef.current.close()
socketRef.current = null
}
}
}, [user?.id, authFailed])
}, [user?.id])
useEffect(() => {
if (!socket || !isConnected || !urlWorkflowId) return
// Skip if already in the correct room
if (currentWorkflowId === urlWorkflowId) return
logger.info(
@@ -573,10 +497,19 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
logger.info(`Joining workflow room: ${urlWorkflowId}`)
socket.emit('join-workflow', {
workflowId: urlWorkflowId,
tabSessionId: getTabSessionId(),
})
setCurrentWorkflowId(urlWorkflowId)
}, [socket, isConnected, urlWorkflowId, currentWorkflowId])
useEffect(() => {
return () => {
if (socket) {
logger.info('Cleaning up socket connection on unmount')
socket.disconnect()
}
}
}, [])
const joinWorkflow = useCallback(
(workflowId: string) => {
if (!socket || !user?.id) {
@@ -597,9 +530,8 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
logger.info(`Joining workflow: ${workflowId}`)
socket.emit('join-workflow', {
workflowId,
tabSessionId: getTabSessionId(),
})
// currentWorkflowId will be set by join-workflow-success handler
setCurrentWorkflowId(workflowId)
},
[socket, user, currentWorkflowId]
)
@@ -607,13 +539,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
const leaveWorkflow = useCallback(() => {
if (socket && currentWorkflowId) {
logger.info(`Leaving workflow: ${currentWorkflowId}`)
import('@/stores/operation-queue/store')
.then(({ useOperationQueueStore }) => {
useOperationQueueStore.getState().cancelOperationsForWorkflow(currentWorkflowId)
})
.catch((error) => {
logger.warn('Failed to cancel operations for workflow:', error)
})
try {
const { useOperationQueueStore } = require('@/stores/operation-queue/store')
useOperationQueueStore.getState().cancelOperationsForWorkflow(currentWorkflowId)
} catch {}
socket.emit('leave-workflow')
setCurrentWorkflowId(null)
setPresenceUsers([])
@@ -626,20 +555,8 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
}
}, [socket, currentWorkflowId])
/**
* Retry socket connection after auth failure.
* Call this when user has re-authenticated (e.g., after login redirect).
*/
const retryConnection = useCallback(() => {
if (!authFailed) {
logger.info('retryConnection called but no auth failure - ignoring')
return
}
logger.info('Retrying socket connection after auth failure')
setAuthFailed(false)
// initializedRef.current was already reset in connect_error handler
// Effect will re-run and attempt connection
}, [authFailed])
const positionUpdateTimeouts = useRef<Map<string, number>>(new Map())
const pendingPositionUpdates = useRef<Map<string, any>>(new Map())
const emitWorkflowOperation = useCallback(
(operation: string, target: string, payload: any, operationId?: string) => {
@@ -799,6 +716,14 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
eventHandlers.current.selectionUpdate = handler
}, [])
const onUserJoined = useCallback((handler: (data: any) => void) => {
eventHandlers.current.userJoined = handler
}, [])
const onUserLeft = useCallback((handler: (data: any) => void) => {
eventHandlers.current.userLeft = handler
}, [])
const onWorkflowDeleted = useCallback((handler: (data: any) => void) => {
eventHandlers.current.workflowDeleted = handler
}, [])
@@ -820,13 +745,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
socket,
isConnected,
isConnecting,
authFailed,
currentWorkflowId,
currentSocketId,
presenceUsers,
joinWorkflow,
leaveWorkflow,
retryConnection,
emitWorkflowOperation,
emitSubblockUpdate,
emitVariableUpdate,
@@ -837,6 +760,8 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
onVariableUpdate,
onCursorUpdate,
onSelectionUpdate,
onUserJoined,
onUserLeft,
onWorkflowDeleted,
onWorkflowReverted,
onOperationConfirmed,
@@ -846,13 +771,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
socket,
isConnected,
isConnecting,
authFailed,
currentWorkflowId,
currentSocketId,
presenceUsers,
joinWorkflow,
leaveWorkflow,
retryConnection,
emitWorkflowOperation,
emitSubblockUpdate,
emitVariableUpdate,
@@ -863,6 +786,8 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
onVariableUpdate,
onCursorUpdate,
onSelectionUpdate,
onUserJoined,
onUserLeft,
onWorkflowDeleted,
onWorkflowReverted,
onOperationConfirmed,

View File

@@ -119,6 +119,8 @@ export function useCollaborativeWorkflow() {
onWorkflowOperation,
onSubblockUpdate,
onVariableUpdate,
onUserJoined,
onUserLeft,
onWorkflowDeleted,
onWorkflowReverted,
onOperationConfirmed,
@@ -482,6 +484,14 @@ export function useCollaborativeWorkflow() {
}
}
const handleUserJoined = (data: any) => {
logger.info(`User joined: ${data.userName}`)
}
const handleUserLeft = (data: any) => {
logger.info(`User left: ${data.userId}`)
}
const handleWorkflowDeleted = (data: any) => {
const { workflowId } = data
logger.warn(`Workflow ${workflowId} has been deleted`)
@@ -590,17 +600,26 @@ export function useCollaborativeWorkflow() {
failOperation(operationId, retryable)
}
// Register event handlers
onWorkflowOperation(handleWorkflowOperation)
onSubblockUpdate(handleSubblockUpdate)
onVariableUpdate(handleVariableUpdate)
onUserJoined(handleUserJoined)
onUserLeft(handleUserLeft)
onWorkflowDeleted(handleWorkflowDeleted)
onWorkflowReverted(handleWorkflowReverted)
onOperationConfirmed(handleOperationConfirmed)
onOperationFailed(handleOperationFailed)
return () => {
// Cleanup handled by socket context
}
}, [
onWorkflowOperation,
onSubblockUpdate,
onVariableUpdate,
onUserJoined,
onUserLeft,
onWorkflowDeleted,
onWorkflowReverted,
onOperationConfirmed,

View File

@@ -74,7 +74,6 @@
"@react-email/components": "^0.0.34",
"@react-email/render": "2.0.0",
"@sim/logger": "workspace:*",
"@socket.io/redis-adapter": "8.3.0",
"@t3-oss/env-nextjs": "0.13.4",
"@tanstack/react-query": "5.90.8",
"@tanstack/react-query-devtools": "5.90.2",
@@ -145,7 +144,6 @@
"react-simple-code-editor": "^0.14.1",
"react-window": "2.2.3",
"reactflow": "^11.11.4",
"redis": "5.10.0",
"rehype-autolink-headings": "^7.1.0",
"rehype-slug": "^6.0.0",
"remark-gfm": "4.0.1",

View File

@@ -1,7 +1,5 @@
import type { Server as HttpServer } from 'http'
import { createLogger } from '@sim/logger'
import { createAdapter } from '@socket.io/redis-adapter'
import { createClient, type RedisClientType } from 'redis'
import { Server } from 'socket.io'
import { env } from '@/lib/core/config/env'
import { isProd } from '@/lib/core/config/feature-flags'
@@ -9,16 +7,9 @@ import { getBaseUrl } from '@/lib/core/utils/urls'
const logger = createLogger('SocketIOConfig')
/** Socket.IO ping timeout - how long to wait for pong before considering connection dead */
const PING_TIMEOUT_MS = 60000
/** Socket.IO ping interval - how often to send ping packets */
const PING_INTERVAL_MS = 25000
/** Maximum HTTP buffer size for Socket.IO messages */
const MAX_HTTP_BUFFER_SIZE = 1e6
let adapterPubClient: RedisClientType | null = null
let adapterSubClient: RedisClientType | null = null
/**
* Get allowed origins for Socket.IO CORS configuration
*/
function getAllowedOrigins(): string[] {
const allowedOrigins = [
getBaseUrl(),
@@ -33,10 +24,11 @@ function getAllowedOrigins(): string[] {
}
/**
* Create and configure a Socket.IO server instance.
* If REDIS_URL is configured, adds Redis adapter for cross-pod broadcasting.
* Create and configure a Socket.IO server instance
* @param httpServer - The HTTP server instance to attach Socket.IO to
* @returns Configured Socket.IO server instance
*/
export async function createSocketIOServer(httpServer: HttpServer): Promise<Server> {
export function createSocketIOServer(httpServer: HttpServer): Server {
const allowedOrigins = getAllowedOrigins()
const io = new Server(httpServer, {
@@ -44,110 +36,31 @@ export async function createSocketIOServer(httpServer: HttpServer): Promise<Serv
origin: allowedOrigins,
methods: ['GET', 'POST', 'OPTIONS'],
allowedHeaders: ['Content-Type', 'Authorization', 'Cookie', 'socket.io'],
credentials: true,
credentials: true, // Enable credentials to accept cookies
},
transports: ['websocket', 'polling'],
allowEIO3: true,
pingTimeout: PING_TIMEOUT_MS,
pingInterval: PING_INTERVAL_MS,
maxHttpBufferSize: MAX_HTTP_BUFFER_SIZE,
transports: ['websocket', 'polling'], // WebSocket first, polling as fallback
allowEIO3: true, // Keep legacy support for compatibility
pingTimeout: 60000, // Back to original conservative setting
pingInterval: 25000, // Back to original interval
maxHttpBufferSize: 1e6,
cookie: {
name: 'io',
path: '/',
httpOnly: true,
sameSite: 'none',
secure: isProd,
sameSite: 'none', // Required for cross-origin cookies
secure: isProd, // HTTPS in production
},
})
if (env.REDIS_URL) {
logger.info('Configuring Socket.IO Redis adapter...')
const redisOptions = {
url: env.REDIS_URL,
socket: {
reconnectStrategy: (retries: number) => {
if (retries > 10) {
logger.error('Redis adapter reconnection failed after 10 attempts')
return new Error('Redis adapter reconnection failed')
}
const delay = Math.min(retries * 100, 3000)
logger.warn(`Redis adapter reconnecting in ${delay}ms (attempt ${retries})`)
return delay
},
},
}
// Create separate clients for pub and sub (recommended for reliability)
adapterPubClient = createClient(redisOptions)
adapterSubClient = createClient(redisOptions)
adapterPubClient.on('error', (err) => {
logger.error('Redis adapter pub client error:', err)
})
adapterSubClient.on('error', (err) => {
logger.error('Redis adapter sub client error:', err)
})
adapterPubClient.on('ready', () => {
logger.info('Redis adapter pub client ready')
})
adapterSubClient.on('ready', () => {
logger.info('Redis adapter sub client ready')
})
await Promise.all([adapterPubClient.connect(), adapterSubClient.connect()])
io.adapter(createAdapter(adapterPubClient, adapterSubClient))
logger.info('Socket.IO Redis adapter connected - cross-pod broadcasting enabled')
} else {
logger.warn('REDIS_URL not configured - running in single-pod mode')
}
logger.info('Socket.IO server configured with:', {
allowedOrigins: allowedOrigins.length,
transports: ['websocket', 'polling'],
pingTimeout: PING_TIMEOUT_MS,
pingInterval: PING_INTERVAL_MS,
maxHttpBufferSize: MAX_HTTP_BUFFER_SIZE,
pingTimeout: 60000,
pingInterval: 25000,
maxHttpBufferSize: 1e6,
cookieSecure: isProd,
corsCredentials: true,
redisAdapter: !!env.REDIS_URL,
})
return io
}
/**
* Clean up Redis adapter connections.
* Call this during graceful shutdown.
*/
export async function shutdownSocketIOAdapter(): Promise<void> {
const closePromises: Promise<void>[] = []
if (adapterPubClient) {
closePromises.push(
adapterPubClient.quit().then(() => {
logger.info('Redis adapter pub client closed')
adapterPubClient = null
})
)
}
if (adapterSubClient) {
closePromises.push(
adapterSubClient.quit().then(() => {
logger.info('Redis adapter sub client closed')
adapterSubClient = null
})
)
}
if (closePromises.length > 0) {
await Promise.all(closePromises)
logger.info('Socket.IO Redis adapter shutdown complete')
}
}

View File

@@ -1,12 +1,17 @@
import { createLogger } from '@sim/logger'
import { cleanupPendingSubblocksForSocket } from '@/socket/handlers/subblocks'
import { cleanupPendingVariablesForSocket } from '@/socket/handlers/variables'
import type { HandlerDependencies } from '@/socket/handlers/workflow'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
import type { IRoomManager } from '@/socket/rooms'
import type { RoomManager } from '@/socket/rooms/manager'
const logger = createLogger('ConnectionHandlers')
export function setupConnectionHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
export function setupConnectionHandlers(
socket: AuthenticatedSocket,
deps: HandlerDependencies | RoomManager
) {
const roomManager =
deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager)
socket.on('error', (error) => {
logger.error(`Socket ${socket.id} error:`, error)
})
@@ -15,22 +20,13 @@ export function setupConnectionHandlers(socket: AuthenticatedSocket, roomManager
logger.error(`Socket ${socket.id} connection error:`, error)
})
socket.on('disconnect', async (reason) => {
try {
// Clean up pending debounce entries for this socket to prevent memory leaks
cleanupPendingSubblocksForSocket(socket.id)
cleanupPendingVariablesForSocket(socket.id)
socket.on('disconnect', (reason) => {
const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
const session = roomManager.getUserSession(socket.id)
const workflowId = await roomManager.removeUserFromRoom(socket.id)
if (workflowId) {
await roomManager.broadcastPresenceUpdate(workflowId)
logger.info(
`Socket ${socket.id} disconnected from workflow ${workflowId} (reason: ${reason})`
)
}
} catch (error) {
logger.error(`Error handling disconnect for socket ${socket.id}:`, error)
if (workflowId && session) {
roomManager.cleanupUserFromRoom(socket.id, workflowId)
roomManager.broadcastPresenceUpdate(workflowId)
}
})
}

View File

@@ -5,9 +5,16 @@ import { setupSubblocksHandlers } from '@/socket/handlers/subblocks'
import { setupVariablesHandlers } from '@/socket/handlers/variables'
import { setupWorkflowHandlers } from '@/socket/handlers/workflow'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
import type { IRoomManager } from '@/socket/rooms'
import type { RoomManager, UserPresence, WorkflowRoom } from '@/socket/rooms/manager'
export function setupAllHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
export type { UserPresence, WorkflowRoom }
/**
* Sets up all socket event handlers for an authenticated socket connection
* @param socket - The authenticated socket instance
* @param roomManager - Room manager instance for state management
*/
export function setupAllHandlers(socket: AuthenticatedSocket, roomManager: RoomManager) {
setupWorkflowHandlers(socket, roomManager)
setupOperationsHandlers(socket, roomManager)
setupSubblocksHandlers(socket, roomManager)
@@ -15,3 +22,12 @@ export function setupAllHandlers(socket: AuthenticatedSocket, roomManager: IRoom
setupPresenceHandlers(socket, roomManager)
setupConnectionHandlers(socket, roomManager)
}
export {
setupWorkflowHandlers,
setupOperationsHandlers,
setupSubblocksHandlers,
setupVariablesHandlers,
setupPresenceHandlers,
setupConnectionHandlers,
}

View File

@@ -10,41 +10,38 @@ import {
WORKFLOW_OPERATIONS,
} from '@/socket/constants'
import { persistWorkflowOperation } from '@/socket/database/operations'
import type { HandlerDependencies } from '@/socket/handlers/workflow'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
import { checkRolePermission } from '@/socket/middleware/permissions'
import type { IRoomManager } from '@/socket/rooms'
import type { RoomManager } from '@/socket/rooms/manager'
import { WorkflowOperationSchema } from '@/socket/validation/schemas'
const logger = createLogger('OperationsHandlers')
export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
export function setupOperationsHandlers(
socket: AuthenticatedSocket,
deps: HandlerDependencies | RoomManager
) {
const roomManager =
deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager)
socket.on('workflow-operation', async (data) => {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
const session = roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
socket.emit('operation-forbidden', {
type: 'SESSION_ERROR',
message: 'Session expired, please rejoin workflow',
socket.emit('error', {
type: 'NOT_JOINED',
message: 'Not joined to any workflow',
})
if (data?.operationId) {
socket.emit('operation-failed', { operationId: data.operationId, error: 'Session expired' })
}
return
}
const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
if (!hasRoom) {
socket.emit('operation-forbidden', {
const room = roomManager.getWorkflowRoom(workflowId)
if (!room) {
socket.emit('error', {
type: 'ROOM_NOT_FOUND',
message: 'Workflow room not found',
})
if (data?.operationId) {
socket.emit('operation-failed', {
operationId: data.operationId,
error: 'Workflow room not found',
})
}
return
}
@@ -63,18 +60,16 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
isPositionUpdate && 'commit' in payload ? payload.commit === true : false
const operationTimestamp = isPositionUpdate ? timestamp : Date.now()
// Get user presence for permission checking
const users = await roomManager.getWorkflowUsers(workflowId)
const userPresence = users.find((u) => u.socketId === socket.id)
// Skip permission checks for non-committed position updates (broadcasts only, no persistence)
if (isPositionUpdate && !commitPositionUpdate) {
// Update last activity
const userPresence = room.users.get(socket.id)
if (userPresence) {
await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() })
userPresence.lastActivity = Date.now()
}
} else {
// Check permissions from cached role for all other operations
const userPresence = room.users.get(socket.id)
if (!userPresence) {
logger.warn(`User presence not found for socket ${socket.id}`)
socket.emit('operation-forbidden', {
@@ -83,13 +78,10 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
operation,
target,
})
if (operationId) {
socket.emit('operation-failed', { operationId, error: 'User session not found' })
}
return
}
await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() })
userPresence.lastActivity = Date.now()
// Check permissions using cached role (no DB query)
const permissionCheck = checkRolePermission(userPresence.role, operation)
@@ -140,7 +132,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
timestamp: operationTimestamp,
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
if (operationId) {
socket.emit('operation-confirmed', {
@@ -186,7 +178,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
timestamp: operationTimestamp,
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
if (operationId) {
socket.emit('operation-confirmed', { operationId, serverTimestamp: Date.now() })
@@ -219,7 +211,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
const broadcastData = {
operation,
@@ -259,7 +251,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
const broadcastData = {
operation,
@@ -296,7 +288,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
socket.to(workflowId).emit('workflow-operation', {
operation,
@@ -328,7 +320,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
socket.to(workflowId).emit('workflow-operation', {
operation,
@@ -357,7 +349,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
socket.to(workflowId).emit('workflow-operation', {
operation,
@@ -389,7 +381,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
socket.to(workflowId).emit('workflow-operation', {
operation,
@@ -421,7 +413,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
socket.to(workflowId).emit('workflow-operation', {
operation,
@@ -453,7 +445,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
socket.to(workflowId).emit('workflow-operation', {
operation,
@@ -482,7 +474,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
socket.to(workflowId).emit('workflow-operation', {
operation,
@@ -511,24 +503,27 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
const broadcastData = {
operation,
target,
payload,
timestamp: operationTimestamp,
timestamp: operationTimestamp, // Preserve client timestamp for position updates
senderId: socket.id,
userId: session.userId,
userName: session.userName,
// Add operation metadata for better client handling
metadata: {
workflowId,
operationId: crypto.randomUUID(),
isPositionUpdate, // Flag to help clients handle position updates specially
},
}
socket.to(workflowId).emit('workflow-operation', broadcastData)
// Emit confirmation if operationId is provided
if (operationId) {
socket.emit('operation-confirmed', {
operationId,
@@ -538,14 +533,16 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error occurred'
// Emit operation-failed for queue-tracked operations
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: errorMessage,
retryable: !(error instanceof ZodError),
retryable: !(error instanceof ZodError), // Don't retry validation errors
})
}
// Also emit legacy operation-error for backward compatibility
if (error instanceof ZodError) {
socket.emit('operation-error', {
type: 'VALIDATION_ERROR',
@@ -556,6 +553,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
})
logger.warn(`Validation error for operation from ${session.userId}:`, error.errors)
} else if (error instanceof Error) {
// Handle specific database errors
if (error.message.includes('not found')) {
socket.emit('operation-error', {
type: 'RESOURCE_NOT_FOUND',

View File

@@ -1,53 +1,62 @@
import { createLogger } from '@sim/logger'
import type { HandlerDependencies } from '@/socket/handlers/workflow'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
import type { IRoomManager } from '@/socket/rooms'
import type { RoomManager } from '@/socket/rooms/manager'
const logger = createLogger('PresenceHandlers')
export function setupPresenceHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
socket.on('cursor-update', async ({ cursor }) => {
try {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
export function setupPresenceHandlers(
socket: AuthenticatedSocket,
deps: HandlerDependencies | RoomManager
) {
const roomManager =
deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager)
socket.on('cursor-update', ({ cursor }) => {
const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
const session = roomManager.getUserSession(socket.id)
if (!workflowId || !session) return
if (!workflowId || !session) return
// Update cursor in room state
await roomManager.updateUserActivity(workflowId, socket.id, { cursor })
const room = roomManager.getWorkflowRoom(workflowId)
if (!room) return
// Broadcast to other users in the room
socket.to(workflowId).emit('cursor-update', {
socketId: socket.id,
userId: session.userId,
userName: session.userName,
avatarUrl: session.avatarUrl,
cursor,
})
} catch (error) {
logger.error(`Error handling cursor update for socket ${socket.id}:`, error)
const userPresence = room.users.get(socket.id)
if (userPresence) {
userPresence.cursor = cursor
userPresence.lastActivity = Date.now()
}
socket.to(workflowId).emit('cursor-update', {
socketId: socket.id,
userId: session.userId,
userName: session.userName,
avatarUrl: session.avatarUrl,
cursor,
})
})
socket.on('selection-update', async ({ selection }) => {
try {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
// Handle user selection (for showing what block/element a user has selected)
socket.on('selection-update', ({ selection }) => {
const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
const session = roomManager.getUserSession(socket.id)
if (!workflowId || !session) return
if (!workflowId || !session) return
// Update selection in room state
await roomManager.updateUserActivity(workflowId, socket.id, { selection })
const room = roomManager.getWorkflowRoom(workflowId)
if (!room) return
// Broadcast to other users in the room
socket.to(workflowId).emit('selection-update', {
socketId: socket.id,
userId: session.userId,
userName: session.userName,
avatarUrl: session.avatarUrl,
selection,
})
} catch (error) {
logger.error(`Error handling selection update for socket ${socket.id}:`, error)
const userPresence = room.users.get(socket.id)
if (userPresence) {
userPresence.selection = selection
userPresence.lastActivity = Date.now()
}
socket.to(workflowId).emit('selection-update', {
socketId: socket.id,
userId: session.userId,
userName: session.userName,
avatarUrl: session.avatarUrl,
selection,
})
})
}

View File

@@ -2,14 +2,12 @@ import { db } from '@sim/db'
import { workflow, workflowBlocks } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import type { HandlerDependencies } from '@/socket/handlers/workflow'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
import type { IRoomManager } from '@/socket/rooms'
import type { RoomManager } from '@/socket/rooms/manager'
const logger = createLogger('SubblocksHandlers')
/** Debounce interval for coalescing rapid subblock updates before persisting */
const DEBOUNCE_INTERVAL_MS = 25
type PendingSubblock = {
latest: { blockId: string; subblockId: string; value: any; timestamp: number }
timeout: NodeJS.Timeout
@@ -20,61 +18,44 @@ type PendingSubblock = {
// Keyed by `${workflowId}:${blockId}:${subblockId}`
const pendingSubblockUpdates = new Map<string, PendingSubblock>()
/**
* Cleans up pending updates for a disconnected socket.
* Removes the socket's operationIds from pending updates to prevent memory leaks.
*/
export function cleanupPendingSubblocksForSocket(socketId: string): void {
for (const [, pending] of pendingSubblockUpdates.entries()) {
// Remove this socket's operation entries
for (const [opId, sid] of pending.opToSocket.entries()) {
if (sid === socketId) {
pending.opToSocket.delete(opId)
}
}
// If no more operations are waiting, the timeout will still fire and flush
// This is fine - the update will still persist, just no confirmation to send
}
}
export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
export function setupSubblocksHandlers(
socket: AuthenticatedSocket,
deps: HandlerDependencies | RoomManager
) {
const roomManager =
deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager)
socket.on('subblock-update', async (data) => {
const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
const session = roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
logger.debug(`Ignoring subblock update: socket not connected to any workflow room`, {
socketId: socket.id,
hasWorkflowId: !!workflowId,
hasSession: !!session,
})
return
}
const { blockId, subblockId, value, timestamp, operationId } = data
const room = roomManager.getWorkflowRoom(workflowId)
if (!room) {
logger.debug(`Ignoring subblock update: workflow room not found`, {
socketId: socket.id,
workflowId,
blockId,
subblockId,
})
return
}
try {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
logger.debug(`Ignoring subblock update: socket not connected to any workflow room`, {
socketId: socket.id,
hasWorkflowId: !!workflowId,
hasSession: !!session,
})
socket.emit('operation-forbidden', {
type: 'SESSION_ERROR',
message: 'Session expired, please rejoin workflow',
})
if (operationId) {
socket.emit('operation-failed', { operationId, error: 'Session expired' })
}
return
const userPresence = room.users.get(socket.id)
if (userPresence) {
userPresence.lastActivity = Date.now()
}
const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
if (!hasRoom) {
logger.debug(`Ignoring subblock update: workflow room not found`, {
socketId: socket.id,
workflowId,
blockId,
subblockId,
})
return
}
// Update user activity
await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() })
// Server-side debounce/coalesce by workflowId+blockId+subblockId
const debouncedKey = `${workflowId}:${blockId}:${subblockId}`
const existing = pendingSubblockUpdates.get(debouncedKey)
@@ -85,7 +66,7 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
existing.timeout = setTimeout(async () => {
await flushSubblockUpdate(workflowId, existing, roomManager)
pendingSubblockUpdates.delete(debouncedKey)
}, DEBOUNCE_INTERVAL_MS)
}, 25)
} else {
const opToSocket = new Map<string, string>()
if (operationId) opToSocket.set(operationId, socket.id)
@@ -95,7 +76,7 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
await flushSubblockUpdate(workflowId, pending, roomManager)
pendingSubblockUpdates.delete(debouncedKey)
}
}, DEBOUNCE_INTERVAL_MS)
}, 25)
pendingSubblockUpdates.set(debouncedKey, {
latest: { blockId, subblockId, value, timestamp },
timeout,
@@ -107,6 +88,7 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
// Best-effort failure for the single operation if provided
if (operationId) {
socket.emit('operation-failed', {
operationId,
@@ -115,6 +97,7 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
})
}
// Also emit legacy operation-error for backward compatibility
socket.emit('operation-error', {
type: 'SUBBLOCK_UPDATE_FAILED',
message: `Failed to update subblock ${blockId}.${subblockId}: ${errorMessage}`,
@@ -128,11 +111,9 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
async function flushSubblockUpdate(
workflowId: string,
pending: PendingSubblock,
roomManager: IRoomManager
roomManager: RoomManager
) {
const { blockId, subblockId, value, timestamp } = pending.latest
const io = roomManager.io
try {
// Verify workflow still exists
const workflowExists = await db
@@ -143,11 +124,14 @@ async function flushSubblockUpdate(
if (workflowExists.length === 0) {
pending.opToSocket.forEach((socketId, opId) => {
io.to(socketId).emit('operation-failed', {
operationId: opId,
error: 'Workflow not found',
retryable: false,
})
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-failed', {
operationId: opId,
error: 'Workflow not found',
retryable: false,
})
}
})
return
}
@@ -180,48 +164,60 @@ async function flushSubblockUpdate(
})
if (updateSuccessful) {
// Broadcast to room excluding all senders (works cross-pod via Redis adapter)
const senderSocketIds = [...pending.opToSocket.values()]
if (senderSocketIds.length > 0) {
io.to(workflowId).except(senderSocketIds).emit('subblock-update', {
blockId,
subblockId,
value,
timestamp,
})
} else {
io.to(workflowId).emit('subblock-update', {
blockId,
subblockId,
value,
timestamp,
})
// Broadcast to other clients (exclude senders to avoid overwriting their local state)
const senderSocketIds = new Set(pending.opToSocket.values())
const io = (roomManager as any).io
if (io) {
// Get all sockets in the room
const roomSockets = io.sockets.adapter.rooms.get(workflowId)
if (roomSockets) {
roomSockets.forEach((socketId: string) => {
// Only emit to sockets that didn't send any of the coalesced ops
if (!senderSocketIds.has(socketId)) {
const sock = io.sockets.sockets.get(socketId)
if (sock) {
sock.emit('subblock-update', {
blockId,
subblockId,
value,
timestamp,
})
}
}
})
}
}
// Confirm all coalesced operationIds (io.to(socketId) works cross-pod)
// Confirm all coalesced operationIds
pending.opToSocket.forEach((socketId, opId) => {
io.to(socketId).emit('operation-confirmed', {
operationId: opId,
serverTimestamp: Date.now(),
})
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-confirmed', { operationId: opId, serverTimestamp: Date.now() })
}
})
} else {
pending.opToSocket.forEach((socketId, opId) => {
io.to(socketId).emit('operation-failed', {
operationId: opId,
error: 'Block no longer exists',
retryable: false,
})
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-failed', {
operationId: opId,
error: 'Block no longer exists',
retryable: false,
})
}
})
}
} catch (error) {
logger.error('Error flushing subblock update:', error)
pending.opToSocket.forEach((socketId, opId) => {
io.to(socketId).emit('operation-failed', {
operationId: opId,
error: error instanceof Error ? error.message : 'Unknown error',
retryable: true,
})
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-failed', {
operationId: opId,
error: error instanceof Error ? error.message : 'Unknown error',
retryable: true,
})
}
})
}
}

View File

@@ -2,14 +2,12 @@ import { db } from '@sim/db'
import { workflow } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import type { HandlerDependencies } from '@/socket/handlers/workflow'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
import type { IRoomManager } from '@/socket/rooms'
import type { RoomManager } from '@/socket/rooms/manager'
const logger = createLogger('VariablesHandlers')
/** Debounce interval for coalescing rapid variable updates before persisting */
const DEBOUNCE_INTERVAL_MS = 25
type PendingVariable = {
latest: { variableId: string; field: string; value: any; timestamp: number }
timeout: NodeJS.Timeout
@@ -19,58 +17,45 @@ type PendingVariable = {
// Keyed by `${workflowId}:${variableId}:${field}`
const pendingVariableUpdates = new Map<string, PendingVariable>()
/**
* Cleans up pending updates for a disconnected socket.
* Removes the socket's operationIds from pending updates to prevent memory leaks.
*/
export function cleanupPendingVariablesForSocket(socketId: string): void {
for (const [, pending] of pendingVariableUpdates.entries()) {
for (const [opId, sid] of pending.opToSocket.entries()) {
if (sid === socketId) {
pending.opToSocket.delete(opId)
}
}
}
}
export function setupVariablesHandlers(
socket: AuthenticatedSocket,
deps: HandlerDependencies | RoomManager
) {
const roomManager =
deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager)
export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
socket.on('variable-update', async (data) => {
const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
const session = roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
logger.debug(`Ignoring variable update: socket not connected to any workflow room`, {
socketId: socket.id,
hasWorkflowId: !!workflowId,
hasSession: !!session,
})
return
}
const { variableId, field, value, timestamp, operationId } = data
const room = roomManager.getWorkflowRoom(workflowId)
if (!room) {
logger.debug(`Ignoring variable update: workflow room not found`, {
socketId: socket.id,
workflowId,
variableId,
field,
})
return
}
try {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
logger.debug(`Ignoring variable update: socket not connected to any workflow room`, {
socketId: socket.id,
hasWorkflowId: !!workflowId,
hasSession: !!session,
})
socket.emit('operation-forbidden', {
type: 'SESSION_ERROR',
message: 'Session expired, please rejoin workflow',
})
if (operationId) {
socket.emit('operation-failed', { operationId, error: 'Session expired' })
}
return
const userPresence = room.users.get(socket.id)
if (userPresence) {
userPresence.lastActivity = Date.now()
}
const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
if (!hasRoom) {
logger.debug(`Ignoring variable update: workflow room not found`, {
socketId: socket.id,
workflowId,
variableId,
field,
})
return
}
// Update user activity
await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() })
const debouncedKey = `${workflowId}:${variableId}:${field}`
const existing = pendingVariableUpdates.get(debouncedKey)
if (existing) {
@@ -80,7 +65,7 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
existing.timeout = setTimeout(async () => {
await flushVariableUpdate(workflowId, existing, roomManager)
pendingVariableUpdates.delete(debouncedKey)
}, DEBOUNCE_INTERVAL_MS)
}, 25)
} else {
const opToSocket = new Map<string, string>()
if (operationId) opToSocket.set(operationId, socket.id)
@@ -90,7 +75,7 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
await flushVariableUpdate(workflowId, pending, roomManager)
pendingVariableUpdates.delete(debouncedKey)
}
}, DEBOUNCE_INTERVAL_MS)
}, 25)
pendingVariableUpdates.set(debouncedKey, {
latest: { variableId, field, value, timestamp },
timeout,
@@ -123,11 +108,9 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
async function flushVariableUpdate(
workflowId: string,
pending: PendingVariable,
roomManager: IRoomManager
roomManager: RoomManager
) {
const { variableId, field, value, timestamp } = pending.latest
const io = roomManager.io
try {
const workflowExists = await db
.select({ id: workflow.id })
@@ -137,11 +120,14 @@ async function flushVariableUpdate(
if (workflowExists.length === 0) {
pending.opToSocket.forEach((socketId, opId) => {
io.to(socketId).emit('operation-failed', {
operationId: opId,
error: 'Workflow not found',
retryable: false,
})
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-failed', {
operationId: opId,
error: 'Workflow not found',
retryable: false,
})
}
})
return
}
@@ -177,50 +163,59 @@ async function flushVariableUpdate(
})
if (updateSuccessful) {
// Broadcast to room excluding all senders (works cross-pod via Redis adapter)
const senderSocketIds = [...pending.opToSocket.values()]
if (senderSocketIds.length > 0) {
io.to(workflowId).except(senderSocketIds).emit('variable-update', {
variableId,
field,
value,
timestamp,
})
} else {
io.to(workflowId).emit('variable-update', {
variableId,
field,
value,
timestamp,
})
// Broadcast to other clients (exclude senders to avoid overwriting their local state)
const senderSocketIds = new Set(pending.opToSocket.values())
const io = (roomManager as any).io
if (io) {
const roomSockets = io.sockets.adapter.rooms.get(workflowId)
if (roomSockets) {
roomSockets.forEach((socketId: string) => {
if (!senderSocketIds.has(socketId)) {
const sock = io.sockets.sockets.get(socketId)
if (sock) {
sock.emit('variable-update', {
variableId,
field,
value,
timestamp,
})
}
}
})
}
}
// Confirm all coalesced operationIds (io.to(socketId) works cross-pod)
pending.opToSocket.forEach((socketId, opId) => {
io.to(socketId).emit('operation-confirmed', {
operationId: opId,
serverTimestamp: Date.now(),
})
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-confirmed', { operationId: opId, serverTimestamp: Date.now() })
}
})
logger.debug(`Flushed variable update ${workflowId}: ${variableId}.${field}`)
} else {
pending.opToSocket.forEach((socketId, opId) => {
io.to(socketId).emit('operation-failed', {
operationId: opId,
error: 'Variable no longer exists',
retryable: false,
})
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-failed', {
operationId: opId,
error: 'Variable no longer exists',
retryable: false,
})
}
})
}
} catch (error) {
logger.error('Error flushing variable update:', error)
pending.opToSocket.forEach((socketId, opId) => {
io.to(socketId).emit('operation-failed', {
operationId: opId,
error: error instanceof Error ? error.message : 'Unknown error',
retryable: true,
})
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-failed', {
operationId: opId,
error: error instanceof Error ? error.message : 'Unknown error',
retryable: true,
})
}
})
}
}

View File

@@ -4,12 +4,38 @@ import { eq } from 'drizzle-orm'
import { getWorkflowState } from '@/socket/database/operations'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
import { verifyWorkflowAccess } from '@/socket/middleware/permissions'
import type { IRoomManager, UserPresence } from '@/socket/rooms'
import type { RoomManager, UserPresence, WorkflowRoom } from '@/socket/rooms/manager'
const logger = createLogger('WorkflowHandlers')
export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
socket.on('join-workflow', async ({ workflowId, tabSessionId }) => {
export type { UserPresence, WorkflowRoom }
export interface HandlerDependencies {
roomManager: RoomManager
}
export const createWorkflowRoom = (workflowId: string): WorkflowRoom => ({
workflowId,
users: new Map(),
lastModified: Date.now(),
activeConnections: 0,
})
export const cleanupUserFromRoom = (
socketId: string,
workflowId: string,
roomManager: RoomManager
) => {
roomManager.cleanupUserFromRoom(socketId, workflowId)
}
export function setupWorkflowHandlers(
socket: AuthenticatedSocket,
deps: HandlerDependencies | RoomManager
) {
const roomManager =
deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager)
socket.on('join-workflow', async ({ workflowId }) => {
try {
const userId = socket.userId
const userName = socket.userName
@@ -22,7 +48,6 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
logger.info(`Join workflow request from ${userId} (${userName}) for workflow ${workflowId}`)
// Verify workflow access
let userRole: string
try {
const accessInfo = await verifyWorkflowAccess(userId, workflowId)
@@ -38,37 +63,23 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
return
}
// Leave current room if in one
const currentWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const currentWorkflowId = roomManager.getWorkflowIdForSocket(socket.id)
if (currentWorkflowId) {
socket.leave(currentWorkflowId)
await roomManager.removeUserFromRoom(socket.id)
await roomManager.broadcastPresenceUpdate(currentWorkflowId)
roomManager.cleanupUserFromRoom(socket.id, currentWorkflowId)
roomManager.broadcastPresenceUpdate(currentWorkflowId)
}
const STALE_THRESHOLD_MS = 60_000
const now = Date.now()
const existingUsers = await roomManager.getWorkflowUsers(workflowId)
for (const existingUser of existingUsers) {
if (existingUser.userId === userId && existingUser.socketId !== socket.id) {
const isSameTab = tabSessionId && existingUser.tabSessionId === tabSessionId
const isStale =
now - (existingUser.lastActivity || existingUser.joinedAt || 0) > STALE_THRESHOLD_MS
if (isSameTab || isStale) {
logger.info(
`Cleaning up socket ${existingUser.socketId} for user ${userId} (${isSameTab ? 'same tab' : 'stale'})`
)
await roomManager.removeUserFromRoom(existingUser.socketId)
roomManager.io.in(existingUser.socketId).socketsLeave(workflowId)
}
}
}
// Join the new room
socket.join(workflowId)
// Get avatar URL
if (!roomManager.hasWorkflowRoom(workflowId)) {
roomManager.setWorkflowRoom(workflowId, roomManager.createWorkflowRoom(workflowId))
}
const room = roomManager.getWorkflowRoom(workflowId)!
room.activeConnections++
let avatarUrl = socket.userImage || null
if (!avatarUrl) {
try {
@@ -84,68 +95,54 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
}
}
// Create presence entry
const userPresence: UserPresence = {
userId,
workflowId,
userName,
socketId: socket.id,
tabSessionId,
joinedAt: Date.now(),
lastActivity: Date.now(),
role: userRole,
avatarUrl,
}
// Add user to room
await roomManager.addUserToRoom(workflowId, socket.id, userPresence)
// Get current presence list for the join acknowledgment
const presenceUsers = await roomManager.getWorkflowUsers(workflowId)
// Get workflow state
const workflowState = await getWorkflowState(workflowId)
// Send join success with presence list (client waits for this to confirm join)
socket.emit('join-workflow-success', {
workflowId,
socketId: socket.id,
presenceUsers,
room.users.set(socket.id, userPresence)
roomManager.setWorkflowForSocket(socket.id, workflowId)
roomManager.setUserSession(socket.id, {
userId,
userName,
avatarUrl,
})
// Send workflow state
const workflowState = await getWorkflowState(workflowId)
socket.emit('workflow-state', workflowState)
// Broadcast presence update to all users in the room
await roomManager.broadcastPresenceUpdate(workflowId)
roomManager.broadcastPresenceUpdate(workflowId)
const uniqueUserCount = await roomManager.getUniqueUserCount(workflowId)
const uniqueUserCount = roomManager.getUniqueUserCount(workflowId)
logger.info(
`User ${userId} (${userName}) joined workflow ${workflowId}. Room now has ${uniqueUserCount} unique users.`
`User ${userId} (${userName}) joined workflow ${workflowId}. Room now has ${uniqueUserCount} unique users (${room.activeConnections} connections).`
)
} catch (error) {
logger.error('Error joining workflow:', error)
// Undo socket.join and room manager entry if any operation failed
socket.leave(workflowId)
await roomManager.removeUserFromRoom(socket.id)
socket.emit('join-workflow-error', { error: 'Failed to join workflow' })
socket.emit('error', {
type: 'JOIN_ERROR',
message: 'Failed to join workflow',
})
}
})
socket.on('leave-workflow', async () => {
try {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
socket.on('leave-workflow', () => {
const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
const session = roomManager.getUserSession(socket.id)
if (workflowId && session) {
socket.leave(workflowId)
await roomManager.removeUserFromRoom(socket.id)
await roomManager.broadcastPresenceUpdate(workflowId)
if (workflowId && session) {
socket.leave(workflowId)
roomManager.cleanupUserFromRoom(socket.id, workflowId)
logger.info(`User ${session.userId} (${session.userName}) left workflow ${workflowId}`)
}
} catch (error) {
logger.error('Error leaving workflow:', error)
roomManager.broadcastPresenceUpdate(workflowId)
logger.info(`User ${session.userId} (${session.userName}) left workflow ${workflowId}`)
}
})
}

View File

@@ -7,7 +7,7 @@ import { createServer, request as httpRequest } from 'http'
import { createMockLogger, databaseMock } from '@sim/testing'
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'
import { createSocketIOServer } from '@/socket/config/socket'
import { MemoryRoomManager } from '@/socket/rooms'
import { RoomManager } from '@/socket/rooms/manager'
import { createHttpHandler } from '@/socket/routes/http'
vi.mock('@/lib/auth', () => ({
@@ -20,30 +20,6 @@ vi.mock('@/lib/auth', () => ({
vi.mock('@sim/db', () => databaseMock)
// Mock redis package to prevent actual Redis connections
vi.mock('redis', () => ({
createClient: vi.fn(() => ({
on: vi.fn(),
connect: vi.fn().mockResolvedValue(undefined),
quit: vi.fn().mockResolvedValue(undefined),
duplicate: vi.fn().mockReturnThis(),
})),
}))
// Mock env to not have REDIS_URL (use importOriginal to get helper functions)
vi.mock('@/lib/core/config/env', async (importOriginal) => {
const actual = await importOriginal<typeof import('@/lib/core/config/env')>()
return {
...actual,
env: {
...actual.env,
DATABASE_URL: 'postgres://localhost/test',
NODE_ENV: 'test',
REDIS_URL: undefined,
},
}
})
vi.mock('@/socket/middleware/auth', () => ({
authenticateSocket: vi.fn((socket, next) => {
socket.userId = 'test-user-id'
@@ -75,7 +51,7 @@ vi.mock('@/socket/database/operations', () => ({
describe('Socket Server Index Integration', () => {
let httpServer: any
let io: any
let roomManager: MemoryRoomManager
let roomManager: RoomManager
let logger: ReturnType<typeof createMockLogger>
let PORT: number
@@ -88,10 +64,9 @@ describe('Socket Server Index Integration', () => {
httpServer = createServer()
io = await createSocketIOServer(httpServer)
io = createSocketIOServer(httpServer)
roomManager = new MemoryRoomManager(io)
await roomManager.initialize()
roomManager = new RoomManager(io)
const httpHandler = createHttpHandler(roomManager, logger)
httpServer.on('request', httpHandler)
@@ -123,9 +98,6 @@ describe('Socket Server Index Integration', () => {
}, 20000)
afterEach(async () => {
if (roomManager) {
await roomManager.shutdown()
}
if (io) {
await new Promise<void>((resolve) => {
io.close(() => resolve())
@@ -205,60 +177,43 @@ describe('Socket Server Index Integration', () => {
})
describe('Room Manager Integration', () => {
it('should create room manager successfully', async () => {
it('should create room manager successfully', () => {
expect(roomManager).toBeDefined()
expect(await roomManager.getTotalActiveConnections()).toBe(0)
expect(roomManager.getTotalActiveConnections()).toBe(0)
})
it('should add and get users from workflow rooms', async () => {
it('should create workflow rooms', () => {
const workflowId = 'test-workflow-123'
const socketId = 'test-socket-123'
const room = roomManager.createWorkflowRoom(workflowId)
roomManager.setWorkflowRoom(workflowId, room)
const presence = {
userId: 'user-123',
workflowId,
userName: 'Test User',
socketId,
joinedAt: Date.now(),
lastActivity: Date.now(),
role: 'admin',
}
await roomManager.addUserToRoom(workflowId, socketId, presence)
expect(await roomManager.hasWorkflowRoom(workflowId)).toBe(true)
const users = await roomManager.getWorkflowUsers(workflowId)
expect(users).toHaveLength(1)
expect(users[0].socketId).toBe(socketId)
expect(roomManager.hasWorkflowRoom(workflowId)).toBe(true)
const retrievedRoom = roomManager.getWorkflowRoom(workflowId)
expect(retrievedRoom).toBeDefined()
expect(retrievedRoom?.workflowId).toBe(workflowId)
})
it('should manage user sessions', async () => {
it('should manage user sessions', () => {
const socketId = 'test-socket-123'
const workflowId = 'test-workflow-456'
const session = { userId: 'user-123', userName: 'Test User' }
const presence = {
userId: 'user-123',
workflowId,
userName: 'Test User',
socketId,
joinedAt: Date.now(),
lastActivity: Date.now(),
role: 'admin',
}
roomManager.setWorkflowForSocket(socketId, workflowId)
roomManager.setUserSession(socketId, session)
await roomManager.addUserToRoom(workflowId, socketId, presence)
expect(await roomManager.getWorkflowIdForSocket(socketId)).toBe(workflowId)
const session = await roomManager.getUserSession(socketId)
expect(session).toBeDefined()
expect(session?.userId).toBe('user-123')
expect(roomManager.getWorkflowIdForSocket(socketId)).toBe(workflowId)
expect(roomManager.getUserSession(socketId)).toEqual(session)
})
it('should clean up rooms properly', async () => {
it('should clean up rooms properly', () => {
const workflowId = 'test-workflow-789'
const socketId = 'test-socket-789'
const presence = {
const room = roomManager.createWorkflowRoom(workflowId)
roomManager.setWorkflowRoom(workflowId, room)
// Add user to room
room.users.set(socketId, {
userId: 'user-789',
workflowId,
userName: 'Test User',
@@ -266,18 +221,16 @@ describe('Socket Server Index Integration', () => {
joinedAt: Date.now(),
lastActivity: Date.now(),
role: 'admin',
}
})
room.activeConnections = 1
await roomManager.addUserToRoom(workflowId, socketId, presence)
roomManager.setWorkflowForSocket(socketId, workflowId)
expect(await roomManager.hasWorkflowRoom(workflowId)).toBe(true)
// Clean up user
roomManager.cleanupUserFromRoom(socketId, workflowId)
// Remove user
await roomManager.removeUserFromRoom(socketId)
// Room should be cleaned up since it's now empty
expect(await roomManager.hasWorkflowRoom(workflowId)).toBe(false)
expect(await roomManager.getWorkflowIdForSocket(socketId)).toBeNull()
expect(roomManager.hasWorkflowRoom(workflowId)).toBe(false)
expect(roomManager.getWorkflowIdForSocket(socketId)).toBeUndefined()
})
})
@@ -285,7 +238,7 @@ describe('Socket Server Index Integration', () => {
it.concurrent('should properly import all extracted modules', async () => {
const { createSocketIOServer } = await import('@/socket/config/socket')
const { createHttpHandler } = await import('@/socket/routes/http')
const { MemoryRoomManager, RedisRoomManager } = await import('@/socket/rooms')
const { RoomManager } = await import('@/socket/rooms/manager')
const { authenticateSocket } = await import('@/socket/middleware/auth')
const { verifyWorkflowAccess } = await import('@/socket/middleware/permissions')
const { getWorkflowState } = await import('@/socket/database/operations')
@@ -293,23 +246,22 @@ describe('Socket Server Index Integration', () => {
expect(createSocketIOServer).toBeTypeOf('function')
expect(createHttpHandler).toBeTypeOf('function')
expect(MemoryRoomManager).toBeTypeOf('function')
expect(RedisRoomManager).toBeTypeOf('function')
expect(RoomManager).toBeTypeOf('function')
expect(authenticateSocket).toBeTypeOf('function')
expect(verifyWorkflowAccess).toBeTypeOf('function')
expect(getWorkflowState).toBeTypeOf('function')
expect(WorkflowOperationSchema).toBeDefined()
})
it.concurrent('should maintain all original functionality after refactoring', async () => {
it.concurrent('should maintain all original functionality after refactoring', () => {
expect(httpServer).toBeDefined()
expect(io).toBeDefined()
expect(roomManager).toBeDefined()
expect(typeof roomManager.addUserToRoom).toBe('function')
expect(typeof roomManager.removeUserFromRoom).toBe('function')
expect(typeof roomManager.createWorkflowRoom).toBe('function')
expect(typeof roomManager.cleanupUserFromRoom).toBe('function')
expect(typeof roomManager.handleWorkflowDeletion).toBe('function')
expect(typeof roomManager.broadcastPresenceUpdate).toBe('function')
expect(typeof roomManager.validateWorkflowConsistency).toBe('function')
})
})
@@ -334,7 +286,6 @@ describe('Socket Server Index Integration', () => {
it('should have shutdown capability', () => {
expect(typeof httpServer.close).toBe('function')
expect(typeof io.close).toBe('function')
expect(typeof roomManager.shutdown).toBe('function')
})
})

View File

@@ -1,125 +1,112 @@
import { createServer } from 'http'
import { createLogger } from '@sim/logger'
import type { Server as SocketIOServer } from 'socket.io'
import { env } from '@/lib/core/config/env'
import { createSocketIOServer, shutdownSocketIOAdapter } from '@/socket/config/socket'
import { createSocketIOServer } from '@/socket/config/socket'
import { setupAllHandlers } from '@/socket/handlers'
import { type AuthenticatedSocket, authenticateSocket } from '@/socket/middleware/auth'
import { type IRoomManager, MemoryRoomManager, RedisRoomManager } from '@/socket/rooms'
import { RoomManager } from '@/socket/rooms/manager'
import { createHttpHandler } from '@/socket/routes/http'
const logger = createLogger('CollaborativeSocketServer')
/** Maximum time to wait for graceful shutdown before forcing exit */
const SHUTDOWN_TIMEOUT_MS = 10000
// Enhanced server configuration - HTTP server will be configured with handler after all dependencies are set up
const httpServer = createServer()
async function createRoomManager(io: SocketIOServer): Promise<IRoomManager> {
if (env.REDIS_URL) {
logger.info('Initializing Redis-backed RoomManager for multi-pod support')
const manager = new RedisRoomManager(io, env.REDIS_URL)
await manager.initialize()
return manager
}
const io = createSocketIOServer(httpServer)
logger.warn('No REDIS_URL configured - using in-memory RoomManager (single-pod only)')
const manager = new MemoryRoomManager(io)
await manager.initialize()
return manager
}
// Initialize room manager after io is created
const roomManager = new RoomManager(io)
async function main() {
const httpServer = createServer()
const PORT = Number(env.PORT || env.SOCKET_PORT || 3002)
io.use(authenticateSocket)
logger.info('Starting Socket.IO server...', {
port: PORT,
nodeEnv: env.NODE_ENV,
hasDatabase: !!env.DATABASE_URL,
hasAuth: !!env.BETTER_AUTH_SECRET,
hasRedis: !!env.REDIS_URL,
const httpHandler = createHttpHandler(roomManager, logger)
httpServer.on('request', httpHandler)
process.on('uncaughtException', (error) => {
logger.error('Uncaught Exception:', error)
// Don't exit in production, just log
})
process.on('unhandledRejection', (reason, promise) => {
logger.error('Unhandled Rejection at:', promise, 'reason:', reason)
})
httpServer.on('error', (error) => {
logger.error('HTTP server error:', error)
})
io.engine.on('connection_error', (err) => {
logger.error('Socket.IO connection error:', {
req: err.req?.url,
code: err.code,
message: err.message,
context: err.context,
})
})
// Create Socket.IO server with Redis adapter if configured
const io = await createSocketIOServer(httpServer)
io.on('connection', (socket: AuthenticatedSocket) => {
logger.info(`New socket connection: ${socket.id}`)
// Initialize room manager (Redis or in-memory based on config)
const roomManager = await createRoomManager(io)
setupAllHandlers(socket, roomManager)
})
// Set up authentication middleware
io.use(authenticateSocket)
// Set up HTTP handler for health checks and internal APIs
const httpHandler = createHttpHandler(roomManager, logger)
httpServer.on('request', httpHandler)
// Global error handlers
process.on('uncaughtException', (error) => {
logger.error('Uncaught Exception:', error)
httpServer.on('request', (req, res) => {
logger.info(`🌐 HTTP Request: ${req.method} ${req.url}`, {
method: req.method,
url: req.url,
userAgent: req.headers['user-agent'],
origin: req.headers.origin,
host: req.headers.host,
timestamp: new Date().toISOString(),
})
})
process.on('unhandledRejection', (reason, promise) => {
logger.error('Unhandled Rejection at:', promise, 'reason:', reason)
io.engine.on('connection_error', (err) => {
logger.error('❌ Engine.IO Connection error:', {
code: err.code,
message: err.message,
context: err.context,
req: err.req
? {
url: err.req.url,
method: err.req.method,
headers: err.req.headers,
}
: 'No request object',
})
})
httpServer.on('error', (error: NodeJS.ErrnoException) => {
logger.error('HTTP server error:', error)
if (error.code === 'EADDRINUSE' || error.code === 'EACCES') {
process.exit(1)
}
})
const PORT = Number(env.PORT || env.SOCKET_PORT || 3002)
io.engine.on('connection_error', (err) => {
logger.error('Socket.IO connection error:', {
req: err.req?.url,
code: err.code,
message: err.message,
context: err.context,
})
})
logger.info('Starting Socket.IO server...', {
port: PORT,
nodeEnv: env.NODE_ENV,
hasDatabase: !!env.DATABASE_URL,
hasAuth: !!env.BETTER_AUTH_SECRET,
})
io.on('connection', (socket: AuthenticatedSocket) => {
logger.info(`New socket connection: ${socket.id}`)
setupAllHandlers(socket, roomManager)
})
httpServer.listen(PORT, '0.0.0.0', () => {
logger.info(`Socket.IO server running on port ${PORT}`)
logger.info(`🏥 Health check available at: http://localhost:${PORT}/health`)
})
httpServer.listen(PORT, '0.0.0.0', () => {
logger.info(`Socket.IO server running on port ${PORT}`)
logger.info(`Health check available at: http://localhost:${PORT}/health`)
})
const shutdown = async () => {
logger.info('Shutting down Socket.IO server...')
try {
await roomManager.shutdown()
logger.info('RoomManager shutdown complete')
} catch (error) {
logger.error('Error during RoomManager shutdown:', error)
}
try {
await shutdownSocketIOAdapter()
} catch (error) {
logger.error('Error during Socket.IO adapter shutdown:', error)
}
httpServer.close(() => {
logger.info('Socket.IO server closed')
process.exit(0)
})
setTimeout(() => {
logger.error('Forced shutdown after timeout')
process.exit(1)
}, SHUTDOWN_TIMEOUT_MS)
}
process.on('SIGINT', shutdown)
process.on('SIGTERM', shutdown)
}
// Start the server
main().catch((error) => {
logger.error('Failed to start server:', error)
httpServer.on('error', (error) => {
logger.error('❌ Server failed to start:', error)
process.exit(1)
})
process.on('SIGINT', () => {
logger.info('Shutting down Socket.IO server...')
httpServer.close(() => {
logger.info('Socket.IO server closed')
process.exit(0)
})
})
process.on('SIGTERM', () => {
logger.info('Shutting down Socket.IO server...')
httpServer.close(() => {
logger.info('Socket.IO server closed')
process.exit(0)
})
})

View File

@@ -21,7 +21,7 @@ export interface AuthenticatedSocket extends Socket {
* Socket.IO authentication middleware.
* Handles both anonymous mode (DISABLE_AUTH=true) and normal token-based auth.
*/
export async function authenticateSocket(socket: AuthenticatedSocket, next: (err?: Error) => void) {
export async function authenticateSocket(socket: AuthenticatedSocket, next: any) {
try {
if (isAuthDisabled) {
socket.userId = ANONYMOUS_USER_ID

View File

@@ -73,7 +73,7 @@ export function checkRolePermission(
return { allowed: true }
}
async function verifyWorkspaceMembership(
export async function verifyWorkspaceMembership(
userId: string,
workspaceId: string
): Promise<string | null> {

View File

@@ -1,3 +0,0 @@
export { MemoryRoomManager } from '@/socket/rooms/memory-manager'
export { RedisRoomManager } from '@/socket/rooms/redis-manager'
export type { IRoomManager, UserPresence, UserSession, WorkflowRoom } from '@/socket/rooms/types'

View File

@@ -0,0 +1,291 @@
import * as schema from '@sim/db/schema'
import { workflowBlocks, workflowEdges } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, isNull } from 'drizzle-orm'
import { drizzle } from 'drizzle-orm/postgres-js'
import postgres from 'postgres'
import type { Server } from 'socket.io'
import { env } from '@/lib/core/config/env'
const connectionString = env.DATABASE_URL
const db = drizzle(
postgres(connectionString, {
prepare: false,
idle_timeout: 15,
connect_timeout: 20,
max: 3,
onnotice: () => {},
}),
{ schema }
)
const logger = createLogger('RoomManager')
export interface UserPresence {
userId: string
workflowId: string
userName: string
socketId: string
joinedAt: number
lastActivity: number
role: string
cursor?: { x: number; y: number }
selection?: { type: 'block' | 'edge' | 'none'; id?: string }
avatarUrl?: string | null
}
export interface WorkflowRoom {
workflowId: string
users: Map<string, UserPresence> // socketId -> UserPresence
lastModified: number
activeConnections: number
}
export class RoomManager {
private workflowRooms = new Map<string, WorkflowRoom>()
private socketToWorkflow = new Map<string, string>()
private userSessions = new Map<
string,
{ userId: string; userName: string; avatarUrl?: string | null }
>()
private io: Server
constructor(io: Server) {
this.io = io
}
createWorkflowRoom(workflowId: string): WorkflowRoom {
return {
workflowId,
users: new Map(),
lastModified: Date.now(),
activeConnections: 0,
}
}
cleanupUserFromRoom(socketId: string, workflowId: string) {
const room = this.workflowRooms.get(workflowId)
if (room) {
room.users.delete(socketId)
room.activeConnections = Math.max(0, room.activeConnections - 1)
if (room.activeConnections === 0) {
this.workflowRooms.delete(workflowId)
logger.info(`Cleaned up empty workflow room: ${workflowId}`)
}
}
this.socketToWorkflow.delete(socketId)
this.userSessions.delete(socketId)
}
handleWorkflowDeletion(workflowId: string) {
logger.info(`Handling workflow deletion notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for deleted workflow ${workflowId}`)
return
}
this.io.to(workflowId).emit('workflow-deleted', {
workflowId,
message: 'This workflow has been deleted',
timestamp: Date.now(),
})
const socketsToDisconnect: string[] = []
room.users.forEach((_presence, socketId) => {
socketsToDisconnect.push(socketId)
})
socketsToDisconnect.forEach((socketId) => {
const socket = this.io.sockets.sockets.get(socketId)
if (socket) {
socket.leave(workflowId)
logger.debug(`Disconnected socket ${socketId} from deleted workflow ${workflowId}`)
}
this.cleanupUserFromRoom(socketId, workflowId)
})
this.workflowRooms.delete(workflowId)
logger.info(
`Cleaned up workflow room ${workflowId} after deletion (${socketsToDisconnect.length} users disconnected)`
)
}
handleWorkflowRevert(workflowId: string, timestamp: number) {
logger.info(`Handling workflow revert notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for reverted workflow ${workflowId}`)
return
}
this.io.to(workflowId).emit('workflow-reverted', {
workflowId,
message: 'Workflow has been reverted to deployed state',
timestamp,
})
room.lastModified = timestamp
logger.info(`Notified ${room.users.size} users about workflow revert: ${workflowId}`)
}
handleWorkflowUpdate(workflowId: string) {
logger.info(`Handling workflow update notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for updated workflow ${workflowId}`)
return
}
const timestamp = Date.now()
// Notify all clients in the workflow room that the workflow has been updated
// This will trigger them to refresh their local state
this.io.to(workflowId).emit('workflow-updated', {
workflowId,
message: 'Workflow has been updated externally',
timestamp,
})
room.lastModified = timestamp
logger.info(`Notified ${room.users.size} users about workflow update: ${workflowId}`)
}
handleCopilotWorkflowEdit(workflowId: string, description?: string) {
logger.info(`Handling copilot workflow edit notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for copilot workflow edit ${workflowId}`)
return
}
const timestamp = Date.now()
// Emit special event for copilot edits that tells clients to rehydrate from database
this.io.to(workflowId).emit('copilot-workflow-edit', {
workflowId,
description,
message: 'Copilot has edited the workflow - rehydrating from database',
timestamp,
})
room.lastModified = timestamp
logger.info(`Notified ${room.users.size} users about copilot workflow edit: ${workflowId}`)
}
async validateWorkflowConsistency(
workflowId: string
): Promise<{ valid: boolean; issues: string[] }> {
try {
const issues: string[] = []
const orphanedEdges = await db
.select({
id: workflowEdges.id,
sourceBlockId: workflowEdges.sourceBlockId,
targetBlockId: workflowEdges.targetBlockId,
})
.from(workflowEdges)
.leftJoin(workflowBlocks, eq(workflowEdges.sourceBlockId, workflowBlocks.id))
.where(and(eq(workflowEdges.workflowId, workflowId), isNull(workflowBlocks.id)))
if (orphanedEdges.length > 0) {
issues.push(`Found ${orphanedEdges.length} orphaned edges with missing source blocks`)
}
return { valid: issues.length === 0, issues }
} catch (error) {
logger.error('Error validating workflow consistency:', error)
return { valid: false, issues: ['Consistency check failed'] }
}
}
getWorkflowRooms(): ReadonlyMap<string, WorkflowRoom> {
return this.workflowRooms
}
getSocketToWorkflow(): ReadonlyMap<string, string> {
return this.socketToWorkflow
}
getUserSessions(): ReadonlyMap<string, { userId: string; userName: string }> {
return this.userSessions
}
hasWorkflowRoom(workflowId: string): boolean {
return this.workflowRooms.has(workflowId)
}
getWorkflowRoom(workflowId: string): WorkflowRoom | undefined {
return this.workflowRooms.get(workflowId)
}
setWorkflowRoom(workflowId: string, room: WorkflowRoom): void {
this.workflowRooms.set(workflowId, room)
}
getWorkflowIdForSocket(socketId: string): string | undefined {
return this.socketToWorkflow.get(socketId)
}
setWorkflowForSocket(socketId: string, workflowId: string): void {
this.socketToWorkflow.set(socketId, workflowId)
}
getUserSession(
socketId: string
): { userId: string; userName: string; avatarUrl?: string | null } | undefined {
return this.userSessions.get(socketId)
}
setUserSession(
socketId: string,
session: { userId: string; userName: string; avatarUrl?: string | null }
): void {
this.userSessions.set(socketId, session)
}
getTotalActiveConnections(): number {
return Array.from(this.workflowRooms.values()).reduce(
(total, room) => total + room.activeConnections,
0
)
}
broadcastPresenceUpdate(workflowId: string): void {
const room = this.workflowRooms.get(workflowId)
if (room) {
const roomPresence = Array.from(room.users.values())
this.io.to(workflowId).emit('presence-update', roomPresence)
}
}
emitToWorkflow<T = unknown>(workflowId: string, event: string, payload: T): void {
this.io.to(workflowId).emit(event, payload)
}
/**
* Get the number of unique users in a workflow room
* (not the number of socket connections)
*/
getUniqueUserCount(workflowId: string): number {
const room = this.workflowRooms.get(workflowId)
if (!room) return 0
const uniqueUsers = new Set<string>()
room.users.forEach((presence) => {
uniqueUsers.add(presence.userId)
})
return uniqueUsers.size
}
}

View File

@@ -1,260 +0,0 @@
import { createLogger } from '@sim/logger'
import type { Server } from 'socket.io'
import type { IRoomManager, UserPresence, UserSession, WorkflowRoom } from '@/socket/rooms/types'
const logger = createLogger('MemoryRoomManager')
/**
* In-memory room manager for single-pod deployments
* Used as fallback when REDIS_URL is not configured
*/
export class MemoryRoomManager implements IRoomManager {
private workflowRooms = new Map<string, WorkflowRoom>()
private socketToWorkflow = new Map<string, string>()
private userSessions = new Map<string, UserSession>()
private _io: Server
constructor(io: Server) {
this._io = io
}
get io(): Server {
return this._io
}
async initialize(): Promise<void> {
logger.info('MemoryRoomManager initialized (single-pod mode)')
}
async shutdown(): Promise<void> {
this.workflowRooms.clear()
this.socketToWorkflow.clear()
this.userSessions.clear()
logger.info('MemoryRoomManager shutdown complete')
}
async addUserToRoom(workflowId: string, socketId: string, presence: UserPresence): Promise<void> {
// Create room if it doesn't exist
if (!this.workflowRooms.has(workflowId)) {
this.workflowRooms.set(workflowId, {
workflowId,
users: new Map(),
lastModified: Date.now(),
activeConnections: 0,
})
}
const room = this.workflowRooms.get(workflowId)!
room.users.set(socketId, presence)
room.activeConnections++
room.lastModified = Date.now()
// Map socket to workflow
this.socketToWorkflow.set(socketId, workflowId)
// Store session
this.userSessions.set(socketId, {
userId: presence.userId,
userName: presence.userName,
avatarUrl: presence.avatarUrl,
})
logger.debug(`Added user ${presence.userId} to workflow ${workflowId} (socket: ${socketId})`)
}
async removeUserFromRoom(socketId: string): Promise<string | null> {
const workflowId = this.socketToWorkflow.get(socketId)
if (!workflowId) {
return null
}
const room = this.workflowRooms.get(workflowId)
if (room) {
room.users.delete(socketId)
room.activeConnections = Math.max(0, room.activeConnections - 1)
// Clean up empty rooms
if (room.activeConnections === 0) {
this.workflowRooms.delete(workflowId)
logger.info(`Cleaned up empty workflow room: ${workflowId}`)
}
}
this.socketToWorkflow.delete(socketId)
this.userSessions.delete(socketId)
logger.debug(`Removed socket ${socketId} from workflow ${workflowId}`)
return workflowId
}
async getWorkflowIdForSocket(socketId: string): Promise<string | null> {
return this.socketToWorkflow.get(socketId) ?? null
}
async getUserSession(socketId: string): Promise<UserSession | null> {
return this.userSessions.get(socketId) ?? null
}
async getWorkflowUsers(workflowId: string): Promise<UserPresence[]> {
const room = this.workflowRooms.get(workflowId)
if (!room) return []
return Array.from(room.users.values())
}
async hasWorkflowRoom(workflowId: string): Promise<boolean> {
return this.workflowRooms.has(workflowId)
}
async updateUserActivity(
workflowId: string,
socketId: string,
updates: Partial<Pick<UserPresence, 'cursor' | 'selection' | 'lastActivity'>>
): Promise<void> {
const room = this.workflowRooms.get(workflowId)
if (!room) return
const presence = room.users.get(socketId)
if (presence) {
if (updates.cursor !== undefined) presence.cursor = updates.cursor
if (updates.selection !== undefined) presence.selection = updates.selection
presence.lastActivity = updates.lastActivity ?? Date.now()
}
}
async updateRoomLastModified(workflowId: string): Promise<void> {
const room = this.workflowRooms.get(workflowId)
if (room) {
room.lastModified = Date.now()
}
}
async broadcastPresenceUpdate(workflowId: string): Promise<void> {
const users = await this.getWorkflowUsers(workflowId)
this._io.to(workflowId).emit('presence-update', users)
}
emitToWorkflow<T = unknown>(workflowId: string, event: string, payload: T): void {
this._io.to(workflowId).emit(event, payload)
}
async getUniqueUserCount(workflowId: string): Promise<number> {
const room = this.workflowRooms.get(workflowId)
if (!room) return 0
const uniqueUsers = new Set<string>()
room.users.forEach((presence) => {
uniqueUsers.add(presence.userId)
})
return uniqueUsers.size
}
async getTotalActiveConnections(): Promise<number> {
let total = 0
for (const room of this.workflowRooms.values()) {
total += room.activeConnections
}
return total
}
async handleWorkflowDeletion(workflowId: string): Promise<void> {
logger.info(`Handling workflow deletion notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for deleted workflow ${workflowId}`)
return
}
this._io.to(workflowId).emit('workflow-deleted', {
workflowId,
message: 'This workflow has been deleted',
timestamp: Date.now(),
})
const socketsToDisconnect: string[] = []
room.users.forEach((_presence, socketId) => {
socketsToDisconnect.push(socketId)
})
for (const socketId of socketsToDisconnect) {
const socket = this._io.sockets.sockets.get(socketId)
if (socket) {
socket.leave(workflowId)
logger.debug(`Disconnected socket ${socketId} from deleted workflow ${workflowId}`)
}
await this.removeUserFromRoom(socketId)
}
this.workflowRooms.delete(workflowId)
logger.info(
`Cleaned up workflow room ${workflowId} after deletion (${socketsToDisconnect.length} users disconnected)`
)
}
async handleWorkflowRevert(workflowId: string, timestamp: number): Promise<void> {
logger.info(`Handling workflow revert notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for reverted workflow ${workflowId}`)
return
}
this._io.to(workflowId).emit('workflow-reverted', {
workflowId,
message: 'Workflow has been reverted to deployed state',
timestamp,
})
room.lastModified = timestamp
logger.info(`Notified ${room.users.size} users about workflow revert: ${workflowId}`)
}
async handleWorkflowUpdate(workflowId: string): Promise<void> {
logger.info(`Handling workflow update notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for updated workflow ${workflowId}`)
return
}
const timestamp = Date.now()
this._io.to(workflowId).emit('workflow-updated', {
workflowId,
message: 'Workflow has been updated externally',
timestamp,
})
room.lastModified = timestamp
logger.info(`Notified ${room.users.size} users about workflow update: ${workflowId}`)
}
async handleCopilotWorkflowEdit(workflowId: string, description?: string): Promise<void> {
logger.info(`Handling copilot workflow edit notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for copilot workflow edit ${workflowId}`)
return
}
const timestamp = Date.now()
this._io.to(workflowId).emit('copilot-workflow-edit', {
workflowId,
description,
message: 'Copilot has edited the workflow - rehydrating from database',
timestamp,
})
room.lastModified = timestamp
logger.info(`Notified ${room.users.size} users about copilot workflow edit: ${workflowId}`)
}
}

View File

@@ -1,434 +0,0 @@
import { createLogger } from '@sim/logger'
import { createClient, type RedisClientType } from 'redis'
import type { Server } from 'socket.io'
import type { IRoomManager, UserPresence, UserSession } from '@/socket/rooms/types'
const logger = createLogger('RedisRoomManager')
const KEYS = {
workflowUsers: (wfId: string) => `workflow:${wfId}:users`,
workflowMeta: (wfId: string) => `workflow:${wfId}:meta`,
socketWorkflow: (socketId: string) => `socket:${socketId}:workflow`,
socketSession: (socketId: string) => `socket:${socketId}:session`,
} as const
const SOCKET_KEY_TTL = 3600
/**
* Lua script for atomic user removal from room.
* Returns workflowId if user was removed, null otherwise.
* Handles room cleanup atomically to prevent race conditions.
*/
const REMOVE_USER_SCRIPT = `
local socketWorkflowKey = KEYS[1]
local socketSessionKey = KEYS[2]
local workflowUsersPrefix = ARGV[1]
local workflowMetaPrefix = ARGV[2]
local socketId = ARGV[3]
local workflowId = redis.call('GET', socketWorkflowKey)
if not workflowId then
return nil
end
local workflowUsersKey = workflowUsersPrefix .. workflowId .. ':users'
local workflowMetaKey = workflowMetaPrefix .. workflowId .. ':meta'
redis.call('HDEL', workflowUsersKey, socketId)
redis.call('DEL', socketWorkflowKey, socketSessionKey)
local remaining = redis.call('HLEN', workflowUsersKey)
if remaining == 0 then
redis.call('DEL', workflowUsersKey, workflowMetaKey)
end
return workflowId
`
/**
* Lua script for atomic user activity update.
* Performs read-modify-write atomically to prevent lost updates.
* Also refreshes TTL on socket keys to prevent expiry during long sessions.
*/
const UPDATE_ACTIVITY_SCRIPT = `
local workflowUsersKey = KEYS[1]
local socketWorkflowKey = KEYS[2]
local socketSessionKey = KEYS[3]
local socketId = ARGV[1]
local cursorJson = ARGV[2]
local selectionJson = ARGV[3]
local lastActivity = ARGV[4]
local ttl = tonumber(ARGV[5])
local existingJson = redis.call('HGET', workflowUsersKey, socketId)
if not existingJson then
return 0
end
local existing = cjson.decode(existingJson)
if cursorJson ~= '' then
existing.cursor = cjson.decode(cursorJson)
end
if selectionJson ~= '' then
existing.selection = cjson.decode(selectionJson)
end
existing.lastActivity = tonumber(lastActivity)
redis.call('HSET', workflowUsersKey, socketId, cjson.encode(existing))
redis.call('EXPIRE', socketWorkflowKey, ttl)
redis.call('EXPIRE', socketSessionKey, ttl)
return 1
`
/**
* Redis-backed room manager for multi-pod deployments.
* Uses Lua scripts for atomic operations to prevent race conditions.
*/
export class RedisRoomManager implements IRoomManager {
private redis: RedisClientType
private _io: Server
private isConnected = false
private removeUserScriptSha: string | null = null
private updateActivityScriptSha: string | null = null
constructor(io: Server, redisUrl: string) {
this._io = io
this.redis = createClient({
url: redisUrl,
socket: {
reconnectStrategy: (retries) => {
if (retries > 10) {
logger.error('Redis reconnection failed after 10 attempts')
return new Error('Redis reconnection failed')
}
const delay = Math.min(retries * 100, 3000)
logger.warn(`Redis reconnecting in ${delay}ms (attempt ${retries})`)
return delay
},
},
})
this.redis.on('error', (err) => {
logger.error('Redis client error:', err)
})
this.redis.on('reconnecting', () => {
logger.warn('Redis client reconnecting...')
this.isConnected = false
})
this.redis.on('ready', () => {
logger.info('Redis client ready')
this.isConnected = true
})
}
get io(): Server {
return this._io
}
async initialize(): Promise<void> {
if (this.isConnected) return
try {
await this.redis.connect()
this.isConnected = true
// Pre-load Lua scripts for better performance
this.removeUserScriptSha = await this.redis.scriptLoad(REMOVE_USER_SCRIPT)
this.updateActivityScriptSha = await this.redis.scriptLoad(UPDATE_ACTIVITY_SCRIPT)
logger.info('RedisRoomManager connected to Redis and scripts loaded')
} catch (error) {
logger.error('Failed to connect to Redis:', error)
throw error
}
}
async shutdown(): Promise<void> {
if (!this.isConnected) return
try {
await this.redis.quit()
this.isConnected = false
logger.info('RedisRoomManager disconnected from Redis')
} catch (error) {
logger.error('Error during Redis shutdown:', error)
}
}
async addUserToRoom(workflowId: string, socketId: string, presence: UserPresence): Promise<void> {
try {
const pipeline = this.redis.multi()
pipeline.hSet(KEYS.workflowUsers(workflowId), socketId, JSON.stringify(presence))
pipeline.hSet(KEYS.workflowMeta(workflowId), 'lastModified', Date.now().toString())
pipeline.set(KEYS.socketWorkflow(socketId), workflowId)
pipeline.expire(KEYS.socketWorkflow(socketId), SOCKET_KEY_TTL)
pipeline.hSet(KEYS.socketSession(socketId), {
userId: presence.userId,
userName: presence.userName,
avatarUrl: presence.avatarUrl || '',
})
pipeline.expire(KEYS.socketSession(socketId), SOCKET_KEY_TTL)
const results = await pipeline.exec()
// Check if any command failed
const failed = results.some((result) => result instanceof Error)
if (failed) {
logger.error(`Pipeline partially failed when adding user to room`, { workflowId, socketId })
throw new Error('Failed to store user session data in Redis')
}
logger.debug(`Added user ${presence.userId} to workflow ${workflowId} (socket: ${socketId})`)
} catch (error) {
logger.error(`Failed to add user to room: ${socketId} -> ${workflowId}`, error)
throw error
}
}
async removeUserFromRoom(socketId: string, retried = false): Promise<string | null> {
if (!this.removeUserScriptSha) {
logger.error('removeUserFromRoom called before initialize()')
return null
}
try {
const workflowId = await this.redis.evalSha(this.removeUserScriptSha, {
keys: [KEYS.socketWorkflow(socketId), KEYS.socketSession(socketId)],
arguments: ['workflow:', 'workflow:', socketId],
})
if (workflowId) {
logger.debug(`Removed socket ${socketId} from workflow ${workflowId}`)
}
return workflowId as string | null
} catch (error) {
if ((error as Error).message?.includes('NOSCRIPT') && !retried) {
logger.warn('Lua script not found, reloading...')
this.removeUserScriptSha = await this.redis.scriptLoad(REMOVE_USER_SCRIPT)
return this.removeUserFromRoom(socketId, true)
}
logger.error(`Failed to remove user from room: ${socketId}`, error)
return null
}
}
async getWorkflowIdForSocket(socketId: string): Promise<string | null> {
return this.redis.get(KEYS.socketWorkflow(socketId))
}
async getUserSession(socketId: string): Promise<UserSession | null> {
try {
const session = await this.redis.hGetAll(KEYS.socketSession(socketId))
if (!session.userId) {
return null
}
return {
userId: session.userId,
userName: session.userName,
avatarUrl: session.avatarUrl || undefined,
}
} catch (error) {
logger.error(`Failed to get user session for ${socketId}:`, error)
return null
}
}
async getWorkflowUsers(workflowId: string): Promise<UserPresence[]> {
try {
const users = await this.redis.hGetAll(KEYS.workflowUsers(workflowId))
return Object.entries(users)
.map(([socketId, json]) => {
try {
return JSON.parse(json) as UserPresence
} catch {
logger.warn(`Corrupted user data for socket ${socketId}, skipping`)
return null
}
})
.filter((u): u is UserPresence => u !== null)
} catch (error) {
logger.error(`Failed to get workflow users for ${workflowId}:`, error)
return []
}
}
async hasWorkflowRoom(workflowId: string): Promise<boolean> {
const exists = await this.redis.exists(KEYS.workflowUsers(workflowId))
return exists > 0
}
async updateUserActivity(
workflowId: string,
socketId: string,
updates: Partial<Pick<UserPresence, 'cursor' | 'selection' | 'lastActivity'>>,
retried = false
): Promise<void> {
if (!this.updateActivityScriptSha) {
logger.error('updateUserActivity called before initialize()')
return
}
try {
await this.redis.evalSha(this.updateActivityScriptSha, {
keys: [
KEYS.workflowUsers(workflowId),
KEYS.socketWorkflow(socketId),
KEYS.socketSession(socketId),
],
arguments: [
socketId,
updates.cursor !== undefined ? JSON.stringify(updates.cursor) : '',
updates.selection !== undefined ? JSON.stringify(updates.selection) : '',
(updates.lastActivity ?? Date.now()).toString(),
SOCKET_KEY_TTL.toString(),
],
})
} catch (error) {
if ((error as Error).message?.includes('NOSCRIPT') && !retried) {
logger.warn('Lua script not found, reloading...')
this.updateActivityScriptSha = await this.redis.scriptLoad(UPDATE_ACTIVITY_SCRIPT)
return this.updateUserActivity(workflowId, socketId, updates, true)
}
logger.error(`Failed to update user activity: ${socketId}`, error)
}
}
async updateRoomLastModified(workflowId: string): Promise<void> {
await this.redis.hSet(KEYS.workflowMeta(workflowId), 'lastModified', Date.now().toString())
}
async broadcastPresenceUpdate(workflowId: string): Promise<void> {
const users = await this.getWorkflowUsers(workflowId)
// io.to() with Redis adapter broadcasts to all pods
this._io.to(workflowId).emit('presence-update', users)
}
emitToWorkflow<T = unknown>(workflowId: string, event: string, payload: T): void {
this._io.to(workflowId).emit(event, payload)
}
async getUniqueUserCount(workflowId: string): Promise<number> {
const users = await this.getWorkflowUsers(workflowId)
const uniqueUserIds = new Set(users.map((u) => u.userId))
return uniqueUserIds.size
}
async getTotalActiveConnections(): Promise<number> {
// This is more complex with Redis - we'd need to scan all workflow:*:users keys
// For now, just count sockets in this server instance
// The true count would require aggregating across all pods
return this._io.sockets.sockets.size
}
async handleWorkflowDeletion(workflowId: string): Promise<void> {
logger.info(`Handling workflow deletion notification for ${workflowId}`)
try {
const users = await this.getWorkflowUsers(workflowId)
if (users.length === 0) {
logger.debug(`No active users found for deleted workflow ${workflowId}`)
return
}
// Notify all clients across all pods via Redis adapter
this._io.to(workflowId).emit('workflow-deleted', {
workflowId,
message: 'This workflow has been deleted',
timestamp: Date.now(),
})
// Use Socket.IO's cross-pod socketsLeave() to remove all sockets from the room
// This works across all pods when using the Redis adapter
await this._io.in(workflowId).socketsLeave(workflowId)
logger.debug(`All sockets left workflow room ${workflowId} via socketsLeave()`)
// Remove all users from Redis state
for (const user of users) {
await this.removeUserFromRoom(user.socketId)
}
// Clean up room data
await this.redis.del([KEYS.workflowUsers(workflowId), KEYS.workflowMeta(workflowId)])
logger.info(
`Cleaned up workflow room ${workflowId} after deletion (${users.length} users disconnected)`
)
} catch (error) {
logger.error(`Failed to handle workflow deletion for ${workflowId}:`, error)
}
}
async handleWorkflowRevert(workflowId: string, timestamp: number): Promise<void> {
logger.info(`Handling workflow revert notification for ${workflowId}`)
const hasRoom = await this.hasWorkflowRoom(workflowId)
if (!hasRoom) {
logger.debug(`No active room found for reverted workflow ${workflowId}`)
return
}
this._io.to(workflowId).emit('workflow-reverted', {
workflowId,
message: 'Workflow has been reverted to deployed state',
timestamp,
})
await this.updateRoomLastModified(workflowId)
const userCount = await this.getUniqueUserCount(workflowId)
logger.info(`Notified ${userCount} users about workflow revert: ${workflowId}`)
}
async handleWorkflowUpdate(workflowId: string): Promise<void> {
logger.info(`Handling workflow update notification for ${workflowId}`)
const hasRoom = await this.hasWorkflowRoom(workflowId)
if (!hasRoom) {
logger.debug(`No active room found for updated workflow ${workflowId}`)
return
}
const timestamp = Date.now()
this._io.to(workflowId).emit('workflow-updated', {
workflowId,
message: 'Workflow has been updated externally',
timestamp,
})
await this.updateRoomLastModified(workflowId)
const userCount = await this.getUniqueUserCount(workflowId)
logger.info(`Notified ${userCount} users about workflow update: ${workflowId}`)
}
async handleCopilotWorkflowEdit(workflowId: string, description?: string): Promise<void> {
logger.info(`Handling copilot workflow edit notification for ${workflowId}`)
const hasRoom = await this.hasWorkflowRoom(workflowId)
if (!hasRoom) {
logger.debug(`No active room found for copilot workflow edit ${workflowId}`)
return
}
const timestamp = Date.now()
this._io.to(workflowId).emit('copilot-workflow-edit', {
workflowId,
description,
message: 'Copilot has edited the workflow - rehydrating from database',
timestamp,
})
await this.updateRoomLastModified(workflowId)
const userCount = await this.getUniqueUserCount(workflowId)
logger.info(`Notified ${userCount} users about copilot workflow edit: ${workflowId}`)
}
}

View File

@@ -1,140 +0,0 @@
import type { Server } from 'socket.io'
/**
* User presence data stored in room state
*/
export interface UserPresence {
userId: string
workflowId: string
userName: string
socketId: string
tabSessionId?: string
joinedAt: number
lastActivity: number
role: string
cursor?: { x: number; y: number }
selection?: { type: 'block' | 'edge' | 'none'; id?: string }
avatarUrl?: string | null
}
/**
* User session data (minimal info for quick lookups)
*/
export interface UserSession {
userId: string
userName: string
avatarUrl?: string | null
}
/**
* Workflow room state
*/
export interface WorkflowRoom {
workflowId: string
users: Map<string, UserPresence>
lastModified: number
activeConnections: number
}
/**
* Common interface for room managers (in-memory and Redis)
* All methods that access state are async to support Redis operations
*/
export interface IRoomManager {
readonly io: Server
/**
* Initialize the room manager (connect to Redis, etc.)
*/
initialize(): Promise<void>
/**
* Clean shutdown
*/
shutdown(): Promise<void>
/**
* Add a user to a workflow room
*/
addUserToRoom(workflowId: string, socketId: string, presence: UserPresence): Promise<void>
/**
* Remove a user from their current room
* Returns the workflowId they were in, or null if not in any room
*/
removeUserFromRoom(socketId: string): Promise<string | null>
/**
* Get the workflow ID for a socket
*/
getWorkflowIdForSocket(socketId: string): Promise<string | null>
/**
* Get user session data for a socket
*/
getUserSession(socketId: string): Promise<UserSession | null>
/**
* Get all users in a workflow room
*/
getWorkflowUsers(workflowId: string): Promise<UserPresence[]>
/**
* Check if a workflow room exists
*/
hasWorkflowRoom(workflowId: string): Promise<boolean>
/**
* Update user activity (cursor, selection, lastActivity)
*/
updateUserActivity(
workflowId: string,
socketId: string,
updates: Partial<Pick<UserPresence, 'cursor' | 'selection' | 'lastActivity'>>
): Promise<void>
/**
* Update room's lastModified timestamp
*/
updateRoomLastModified(workflowId: string): Promise<void>
/**
* Broadcast presence update to all clients in a workflow room
*/
broadcastPresenceUpdate(workflowId: string): Promise<void>
/**
* Emit an event to all clients in a workflow room
*/
emitToWorkflow<T = unknown>(workflowId: string, event: string, payload: T): void
/**
* Get the number of unique users in a workflow room
*/
getUniqueUserCount(workflowId: string): Promise<number>
/**
* Get total active connections across all rooms
*/
getTotalActiveConnections(): Promise<number>
/**
* Handle workflow deletion - notify users and clean up room
*/
handleWorkflowDeletion(workflowId: string): Promise<void>
/**
* Handle workflow revert - notify users
*/
handleWorkflowRevert(workflowId: string, timestamp: number): Promise<void>
/**
* Handle workflow update - notify users
*/
handleWorkflowUpdate(workflowId: string): Promise<void>
/**
* Handle copilot workflow edit - notify users to rehydrate
*/
handleCopilotWorkflowEdit(workflowId: string, description?: string): Promise<void>
}

View File

@@ -1,52 +1,11 @@
import type { IncomingMessage, ServerResponse } from 'http'
import { env } from '@/lib/core/config/env'
import type { IRoomManager } from '@/socket/rooms'
import type { RoomManager } from '@/socket/rooms/manager'
interface Logger {
info: (message: string, ...args: unknown[]) => void
error: (message: string, ...args: unknown[]) => void
debug: (message: string, ...args: unknown[]) => void
warn: (message: string, ...args: unknown[]) => void
}
function checkInternalApiKey(req: IncomingMessage): { success: boolean; error?: string } {
const apiKey = req.headers['x-api-key']
const expectedApiKey = env.INTERNAL_API_SECRET
if (!expectedApiKey) {
return { success: false, error: 'Internal API key not configured' }
}
if (!apiKey) {
return { success: false, error: 'API key required' }
}
if (apiKey !== expectedApiKey) {
return { success: false, error: 'Invalid API key' }
}
return { success: true }
}
function readRequestBody(req: IncomingMessage): Promise<string> {
return new Promise((resolve, reject) => {
let body = ''
req.on('data', (chunk) => {
body += chunk.toString()
})
req.on('end', () => resolve(body))
req.on('error', reject)
})
}
function sendSuccess(res: ServerResponse): void {
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ success: true }))
}
function sendError(res: ServerResponse, message: string, status = 500): void {
res.writeHead(status, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: message }))
info: (message: string, ...args: any[]) => void
error: (message: string, ...args: any[]) => void
debug: (message: string, ...args: any[]) => void
warn: (message: string, ...args: any[]) => void
}
/**
@@ -55,91 +14,101 @@ function sendError(res: ServerResponse, message: string, status = 500): void {
* @param logger - Logger instance for logging requests and errors
* @returns HTTP request handler function
*/
export function createHttpHandler(roomManager: IRoomManager, logger: Logger) {
return async (req: IncomingMessage, res: ServerResponse) => {
// Health check doesn't require auth
export function createHttpHandler(roomManager: RoomManager, logger: Logger) {
return (req: IncomingMessage, res: ServerResponse) => {
if (req.method === 'GET' && req.url === '/health') {
try {
const connections = await roomManager.getTotalActiveConnections()
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(
JSON.stringify({
status: 'ok',
timestamp: new Date().toISOString(),
connections,
})
)
} catch (error) {
logger.error('Error in health check:', error)
res.writeHead(503, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ status: 'error', message: 'Health check failed' }))
}
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(
JSON.stringify({
status: 'ok',
timestamp: new Date().toISOString(),
connections: roomManager.getTotalActiveConnections(),
})
)
return
}
// All POST endpoints require internal API key authentication
if (req.method === 'POST') {
const authResult = checkInternalApiKey(req)
if (!authResult.success) {
res.writeHead(401, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: authResult.error }))
return
}
}
// Handle workflow deletion notifications from the main API
if (req.method === 'POST' && req.url === '/api/workflow-deleted') {
try {
const body = await readRequestBody(req)
const { workflowId } = JSON.parse(body)
await roomManager.handleWorkflowDeletion(workflowId)
sendSuccess(res)
} catch (error) {
logger.error('Error handling workflow deletion notification:', error)
sendError(res, 'Failed to process deletion notification')
}
let body = ''
req.on('data', (chunk) => {
body += chunk.toString()
})
req.on('end', () => {
try {
const { workflowId } = JSON.parse(body)
roomManager.handleWorkflowDeletion(workflowId)
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ success: true }))
} catch (error) {
logger.error('Error handling workflow deletion notification:', error)
res.writeHead(500, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: 'Failed to process deletion notification' }))
}
})
return
}
// Handle workflow update notifications from the main API
if (req.method === 'POST' && req.url === '/api/workflow-updated') {
try {
const body = await readRequestBody(req)
const { workflowId } = JSON.parse(body)
await roomManager.handleWorkflowUpdate(workflowId)
sendSuccess(res)
} catch (error) {
logger.error('Error handling workflow update notification:', error)
sendError(res, 'Failed to process update notification')
}
let body = ''
req.on('data', (chunk) => {
body += chunk.toString()
})
req.on('end', () => {
try {
const { workflowId } = JSON.parse(body)
roomManager.handleWorkflowUpdate(workflowId)
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ success: true }))
} catch (error) {
logger.error('Error handling workflow update notification:', error)
res.writeHead(500, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: 'Failed to process update notification' }))
}
})
return
}
// Handle copilot workflow edit notifications from the main API
if (req.method === 'POST' && req.url === '/api/copilot-workflow-edit') {
try {
const body = await readRequestBody(req)
const { workflowId, description } = JSON.parse(body)
await roomManager.handleCopilotWorkflowEdit(workflowId, description)
sendSuccess(res)
} catch (error) {
logger.error('Error handling copilot workflow edit notification:', error)
sendError(res, 'Failed to process copilot edit notification')
}
let body = ''
req.on('data', (chunk) => {
body += chunk.toString()
})
req.on('end', () => {
try {
const { workflowId, description } = JSON.parse(body)
roomManager.handleCopilotWorkflowEdit(workflowId, description)
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ success: true }))
} catch (error) {
logger.error('Error handling copilot workflow edit notification:', error)
res.writeHead(500, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: 'Failed to process copilot edit notification' }))
}
})
return
}
// Handle workflow revert notifications from the main API
if (req.method === 'POST' && req.url === '/api/workflow-reverted') {
try {
const body = await readRequestBody(req)
const { workflowId, timestamp } = JSON.parse(body)
await roomManager.handleWorkflowRevert(workflowId, timestamp)
sendSuccess(res)
} catch (error) {
logger.error('Error handling workflow revert notification:', error)
sendError(res, 'Failed to process revert notification')
}
let body = ''
req.on('data', (chunk) => {
body += chunk.toString()
})
req.on('end', () => {
try {
const { workflowId, timestamp } = JSON.parse(body)
roomManager.handleWorkflowRevert(workflowId, timestamp)
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ success: true }))
} catch (error) {
logger.error('Error handling workflow revert notification:', error)
res.writeHead(500, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: 'Failed to process revert notification' }))
}
})
return
}

View File

@@ -239,3 +239,5 @@ export const WorkflowOperationSchema = z.union([
VariableOperationSchema,
WorkflowStateOperationSchema,
])
export { PositionSchema, AutoConnectEdgeSchema }

View File

@@ -4,19 +4,6 @@ import type { OperationQueueState, QueuedOperation } from './types'
const logger = createLogger('OperationQueue')
/** Timeout for subblock/variable operations before considering them failed */
const SUBBLOCK_VARIABLE_TIMEOUT_MS = 15000
/** Timeout for structural operations before considering them failed */
const STRUCTURAL_TIMEOUT_MS = 5000
/** Maximum retry attempts for subblock/variable operations */
const SUBBLOCK_VARIABLE_MAX_RETRIES = 5
/** Maximum retry attempts for structural operations */
const STRUCTURAL_MAX_RETRIES = 3
/** Maximum retry delay cap for subblock/variable operations */
const SUBBLOCK_VARIABLE_MAX_RETRY_DELAY_MS = 3000
/** Base retry delay multiplier (1s, 2s, 3s for linear) */
const RETRY_DELAY_BASE_MS = 1000
const retryTimeouts = new Map<string, NodeJS.Timeout>()
const operationTimeouts = new Map<string, NodeJS.Timeout>()
@@ -213,14 +200,14 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
(operation.operation.operation === 'variable-update' &&
operation.operation.target === 'variable')
const maxRetries = isSubblockOrVariable ? SUBBLOCK_VARIABLE_MAX_RETRIES : STRUCTURAL_MAX_RETRIES
const maxRetries = isSubblockOrVariable ? 5 : 3 // 5 retries for text, 3 for structural
if (operation.retryCount < maxRetries) {
const newRetryCount = operation.retryCount + 1
// Faster retries for subblock/variable, exponential for structural
const delay = isSubblockOrVariable
? Math.min(RETRY_DELAY_BASE_MS * newRetryCount, SUBBLOCK_VARIABLE_MAX_RETRY_DELAY_MS)
: 2 ** newRetryCount * RETRY_DELAY_BASE_MS
? Math.min(1000 * newRetryCount, 3000) // 1s, 2s, 3s, 3s, 3s (cap at 3s)
: 2 ** newRetryCount * 1000 // 2s, 4s, 8s (exponential for structural)
logger.warn(
`Operation failed, retrying in ${delay}ms (attempt ${newRetryCount}/${maxRetries})`,
@@ -322,9 +309,7 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
nextOperation.operation.target === 'subblock') ||
(nextOperation.operation.operation === 'variable-update' &&
nextOperation.operation.target === 'variable')
const timeoutDuration = isSubblockOrVariable
? SUBBLOCK_VARIABLE_TIMEOUT_MS
: STRUCTURAL_TIMEOUT_MS
const timeoutDuration = isSubblockOrVariable ? 15000 : 5000 // 15s for text edits, 5s for structural ops
const timeoutId = setTimeout(() => {
logger.warn(`Operation timeout - no server response after ${timeoutDuration}ms`, {

View File

@@ -104,7 +104,6 @@
"@react-email/components": "^0.0.34",
"@react-email/render": "2.0.0",
"@sim/logger": "workspace:*",
"@socket.io/redis-adapter": "8.3.0",
"@t3-oss/env-nextjs": "0.13.4",
"@tanstack/react-query": "5.90.8",
"@tanstack/react-query-devtools": "5.90.2",
@@ -175,7 +174,6 @@
"react-simple-code-editor": "^0.14.1",
"react-window": "2.2.3",
"reactflow": "^11.11.4",
"redis": "5.10.0",
"rehype-autolink-headings": "^7.1.0",
"rehype-slug": "^6.0.0",
"remark-gfm": "4.0.1",
@@ -1148,16 +1146,6 @@
"@reactflow/node-toolbar": ["@reactflow/node-toolbar@1.3.14", "", { "dependencies": { "@reactflow/core": "11.11.4", "classcat": "^5.0.3", "zustand": "^4.4.1" }, "peerDependencies": { "react": ">=17", "react-dom": ">=17" } }, "sha512-rbynXQnH/xFNu4P9H+hVqlEUafDCkEoCy0Dg9mG22Sg+rY/0ck6KkrAQrYrTgXusd+cEJOMK0uOOFCK2/5rSGQ=="],
"@redis/bloom": ["@redis/bloom@5.10.0", "", { "peerDependencies": { "@redis/client": "^5.10.0" } }, "sha512-doIF37ob+l47n0rkpRNgU8n4iacBlKM9xLiP1LtTZTvz8TloJB8qx/MgvhMhKdYG+CvCY2aPBnN2706izFn/4A=="],
"@redis/client": ["@redis/client@5.10.0", "", { "dependencies": { "cluster-key-slot": "1.1.2" } }, "sha512-JXmM4XCoso6C75Mr3lhKA3eNxSzkYi3nCzxDIKY+YOszYsJjuKbFgVtguVPbLMOttN4iu2fXoc2BGhdnYhIOxA=="],
"@redis/json": ["@redis/json@5.10.0", "", { "peerDependencies": { "@redis/client": "^5.10.0" } }, "sha512-B2G8XlOmTPUuZtD44EMGbtoepQG34RCDXLZbjrtON1Djet0t5Ri7/YPXvL9aomXqP8lLTreaprtyLKF4tmXEEA=="],
"@redis/search": ["@redis/search@5.10.0", "", { "peerDependencies": { "@redis/client": "^5.10.0" } }, "sha512-3SVcPswoSfp2HnmWbAGUzlbUPn7fOohVu2weUQ0S+EMiQi8jwjL+aN2p6V3TI65eNfVsJ8vyPvqWklm6H6esmg=="],
"@redis/time-series": ["@redis/time-series@5.10.0", "", { "peerDependencies": { "@redis/client": "^5.10.0" } }, "sha512-cPkpddXH5kc/SdRhF0YG0qtjL+noqFT0AcHbQ6axhsPsO7iqPi1cjxgdkE9TNeKiBUUdCaU1DbqkR/LzbzPBhg=="],
"@resvg/resvg-wasm": ["@resvg/resvg-wasm@2.4.0", "", {}, "sha512-C7c51Nn4yTxXFKvgh2txJFNweaVcfUPQxwEUFw4aWsCmfiBDJsTSwviIF8EcwjQ6k8bPyMWCl1vw4BdxE569Cg=="],
"@rolldown/pluginutils": ["@rolldown/pluginutils@1.0.0-beta.27", "", {}, "sha512-+d0F4MKMCbeVUJwG96uQ4SgAznZNSq93I3V+9NHA4OpvqG8mRCpGdKmK8l/dl02h2CCDHwW2FqilnTyDcAnqjA=="],
@@ -1352,8 +1340,6 @@
"@socket.io/component-emitter": ["@socket.io/component-emitter@3.1.2", "", {}, "sha512-9BCxFwvbGg/RsZK9tjXd8s4UcwR0MWeFQ1XEKIQVVvAGJyINdrqKMcTRyLoK8Rse1GjzLV9cwjWV1olXRWEXVA=="],
"@socket.io/redis-adapter": ["@socket.io/redis-adapter@8.3.0", "", { "dependencies": { "debug": "~4.3.1", "notepack.io": "~3.0.1", "uid2": "1.0.0" }, "peerDependencies": { "socket.io-adapter": "^2.5.4" } }, "sha512-ly0cra+48hDmChxmIpnESKrc94LjRL80TEmZVscuQ/WWkRP81nNj8W8cCGMqbI4L6NCuAaPRSzZF1a9GlAxxnA=="],
"@standard-schema/spec": ["@standard-schema/spec@1.1.0", "", {}, "sha512-l2aFy5jALhniG5HgqrD6jXLi/rUWrKvqN/qJx6yoJsgKhblVd+iqqU4RCXavm/jPityDo5TCvKMnpjKnOriy0w=="],
"@standard-schema/utils": ["@standard-schema/utils@0.3.0", "", {}, "sha512-e7Mew686owMaPJVNNLs55PUvgz371nKgwsc4vxE49zsODpJEnxgxRo2y/OKrqueavXgZNMDVj3DdHFlaSAeU8g=="],
@@ -2816,8 +2802,6 @@
"normalize-range": ["normalize-range@0.1.2", "", {}, "sha512-bdok/XvKII3nUpklnV6P2hxtMNrCboOjAcyBuQnWEhO665FwrSNRxU+AqpsyvO6LgGYPspN+lu5CLtw4jPRKNA=="],
"notepack.io": ["notepack.io@3.0.1", "", {}, "sha512-TKC/8zH5pXIAMVQio2TvVDTtPRX+DJPHDqjRbxogtFiByHyzKmy96RA0JtCQJ+WouyyL4A10xomQzgbUT+1jCg=="],
"npm-run-path": ["npm-run-path@5.3.0", "", { "dependencies": { "path-key": "^4.0.0" } }, "sha512-ppwTtiJZq0O/ai0z7yfudtBpWIoxM8yE6nHi1X47eFR2EWORqfbu6CnPlNsjeN683eT0qG6H/Pyf9fCcvjnnnQ=="],
"npm-to-yarn": ["npm-to-yarn@3.0.1", "", {}, "sha512-tt6PvKu4WyzPwWUzy/hvPFqn+uwXO0K1ZHka8az3NnrhWJDmSqI8ncWq0fkL0k/lmmi5tAC11FXwXuh0rFbt1A=="],
@@ -3088,8 +3072,6 @@
"redent": ["redent@3.0.0", "", { "dependencies": { "indent-string": "^4.0.0", "strip-indent": "^3.0.0" } }, "sha512-6tDA8g98We0zd0GvVeMT9arEOnTw9qM03L9cJXaCjrip1OO764RDBLBfrB4cwzNGDj5OA5ioymC9GkizgWJDUg=="],
"redis": ["redis@5.10.0", "", { "dependencies": { "@redis/bloom": "5.10.0", "@redis/client": "5.10.0", "@redis/json": "5.10.0", "@redis/search": "5.10.0", "@redis/time-series": "5.10.0" } }, "sha512-0/Y+7IEiTgVGPrLFKy8oAEArSyEJkU0zvgV5xyi9NzNQ+SLZmyFbUsWIbgPcd4UdUh00opXGKlXJwMmsis5Byw=="],
"redis-errors": ["redis-errors@1.2.0", "", {}, "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w=="],
"redis-parser": ["redis-parser@3.0.0", "", { "dependencies": { "redis-errors": "^1.0.0" } }, "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A=="],
@@ -3452,8 +3434,6 @@
"ufo": ["ufo@1.6.3", "", {}, "sha512-yDJTmhydvl5lJzBmy/hyOAA0d+aqCBuwl818haVdYCRrWV84o7YyeVm4QlVHStqNrrJSTb6jKuFAVqAFsr+K3Q=="],
"uid2": ["uid2@1.0.0", "", {}, "sha512-+I6aJUv63YAcY9n4mQreLUt0d4lvwkkopDNmpomkAUz0fAkEMV9pRWxN0EjhW1YfRhcuyHg2v3mwddCDW1+LFQ=="],
"ulid": ["ulid@2.4.0", "", { "bin": { "ulid": "bin/cli.js" } }, "sha512-fIRiVTJNcSRmXKPZtGzFQv9WRrZ3M9eoptl/teFJvjOzmpU+/K/JH6HZ8deBfb5vMEpicJcLn7JmvdknlMq7Zg=="],
"unbzip2-stream": ["unbzip2-stream@1.4.3", "", { "dependencies": { "buffer": "^5.2.1", "through": "^2.3.8" } }, "sha512-mlExGW4w71ebDJviH16lQLtZS32VKqsSfk80GCfUlwT/4/hNRFsoscrF/c++9xinkMzECL1uL9DDwXqFWkruPg=="],
@@ -3872,8 +3852,6 @@
"@shuding/opentype.js/fflate": ["fflate@0.7.4", "", {}, "sha512-5u2V/CDW15QM1XbbgS+0DfPxVB+jUKhWEKuuFuHncbk3tEEqzmoXL+2KyOFuKGqOnmdIy0/davWF1CkuwtibCw=="],
"@socket.io/redis-adapter/debug": ["debug@4.3.7", "", { "dependencies": { "ms": "^2.1.3" } }, "sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ=="],
"@tailwindcss/node/jiti": ["jiti@2.6.1", "", { "bin": { "jiti": "lib/jiti-cli.mjs" } }, "sha512-ekilCSN1jwRvIbgeg/57YFh8qQDNbwDb9xT/qu2DAHbFFZUicIl4ygVaAvzveMhMVr3LnpSKTNnwt8PoOfmKhQ=="],
"@tailwindcss/oxide-wasm32-wasi/@emnapi/core": ["@emnapi/core@1.8.1", "", { "dependencies": { "@emnapi/wasi-threads": "1.1.0", "tslib": "^2.4.0" }, "bundled": true }, "sha512-AvT9QFpxK0Zd8J0jopedNm+w/2fIzvtPKPjqyw9jwvBaReTTqPBk9Hixaz7KbjimP+QNz605/XnjFcDAL2pqBg=="],

View File

@@ -44,10 +44,6 @@ spec:
env:
- name: DATABASE_URL
value: {{ include "sim.databaseUrl" . | quote }}
{{- if .Values.app.env.REDIS_URL }}
- name: REDIS_URL
value: {{ .Values.app.env.REDIS_URL | quote }}
{{- end }}
{{- range $key, $value := .Values.realtime.env }}
- name: {{ $key }}
value: {{ $value | quote }}