From 49a6197cd24d66deac57c4ccd8e52a1966c93f26 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Thu, 29 Jan 2026 19:56:36 -0800 Subject: [PATCH] cleanup code --- .../deployments/[version]/revert/route.ts | 5 +- apps/sim/app/api/workflows/[id]/route.ts | 5 +- .../sim/app/api/workflows/[id]/state/route.ts | 5 +- .../workspace/providers/socket-provider.tsx | 22 ++- apps/sim/socket/config/socket.ts | 19 ++- apps/sim/socket/handlers/operations.ts | 17 +- apps/sim/socket/handlers/subblocks.ts | 12 +- apps/sim/socket/handlers/variables.ts | 12 +- apps/sim/socket/index.ts | 5 +- apps/sim/socket/middleware/auth.ts | 2 +- apps/sim/socket/routes/http.ts | 160 ++++++++++-------- apps/sim/stores/operation-queue/store.ts | 23 ++- 12 files changed, 189 insertions(+), 98 deletions(-) diff --git a/apps/sim/app/api/workflows/[id]/deployments/[version]/revert/route.ts b/apps/sim/app/api/workflows/[id]/deployments/[version]/revert/route.ts index 80eb62fc9..b7e31a0e3 100644 --- a/apps/sim/app/api/workflows/[id]/deployments/[version]/revert/route.ts +++ b/apps/sim/app/api/workflows/[id]/deployments/[version]/revert/route.ts @@ -97,7 +97,10 @@ export async function POST( const socketServerUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002' await fetch(`${socketServerUrl}/api/workflow-reverted`, { method: 'POST', - headers: { 'Content-Type': 'application/json' }, + headers: { + 'Content-Type': 'application/json', + 'x-api-key': env.INTERNAL_API_SECRET, + }, body: JSON.stringify({ workflowId: id, timestamp: Date.now() }), }) } catch (e) { diff --git a/apps/sim/app/api/workflows/[id]/route.ts b/apps/sim/app/api/workflows/[id]/route.ts index 802e4e447..2ceee222d 100644 --- a/apps/sim/app/api/workflows/[id]/route.ts +++ b/apps/sim/app/api/workflows/[id]/route.ts @@ -361,7 +361,10 @@ export async function DELETE( const socketUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002' const socketResponse = await fetch(`${socketUrl}/api/workflow-deleted`, { method: 'POST', - headers: { 'Content-Type': 'application/json' }, + headers: { + 'Content-Type': 'application/json', + 'x-api-key': env.INTERNAL_API_SECRET, + }, body: JSON.stringify({ workflowId }), }) diff --git a/apps/sim/app/api/workflows/[id]/state/route.ts b/apps/sim/app/api/workflows/[id]/state/route.ts index 0c977a56c..f99f47c27 100644 --- a/apps/sim/app/api/workflows/[id]/state/route.ts +++ b/apps/sim/app/api/workflows/[id]/state/route.ts @@ -254,7 +254,10 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{ const socketUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002' const notifyResponse = await fetch(`${socketUrl}/api/workflow-updated`, { method: 'POST', - headers: { 'Content-Type': 'application/json' }, + headers: { + 'Content-Type': 'application/json', + 'x-api-key': env.INTERNAL_API_SECRET, + }, body: JSON.stringify({ workflowId }), }) diff --git a/apps/sim/app/workspace/providers/socket-provider.tsx b/apps/sim/app/workspace/providers/socket-provider.tsx index 494c31036..a637fa1b2 100644 --- a/apps/sim/app/workspace/providers/socket-provider.tsx +++ b/apps/sim/app/workspace/providers/socket-provider.tsx @@ -322,6 +322,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) { // Handle join workflow success - confirms room membership with presence list socketInstance.on('join-workflow-success', ({ workflowId, presenceUsers }) => { isRejoiningRef.current = false + // Ignore stale success responses from previous navigation + if (workflowId !== urlWorkflowIdRef.current) { + logger.debug(`Ignoring stale join-workflow-success for ${workflowId}`) + return + } setCurrentWorkflowId(workflowId) setPresenceUsers(presenceUsers || []) logger.info(`Successfully joined workflow room: ${workflowId}`, { @@ -516,7 +521,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) { logger.info('Received workflow state from server') if (workflowData?.state) { - await rehydrateWorkflowStores(workflowData.id, workflowData.state, 'workflow-state') + try { + await rehydrateWorkflowStores(workflowData.id, workflowData.state, 'workflow-state') + } catch (error) { + logger.error('Error rehydrating workflow state:', error) + } } }) @@ -598,10 +607,13 @@ export function SocketProvider({ children, user }: SocketProviderProps) { const leaveWorkflow = useCallback(() => { if (socket && currentWorkflowId) { logger.info(`Leaving workflow: ${currentWorkflowId}`) - try { - const { useOperationQueueStore } = require('@/stores/operation-queue/store') - useOperationQueueStore.getState().cancelOperationsForWorkflow(currentWorkflowId) - } catch {} + import('@/stores/operation-queue/store') + .then(({ useOperationQueueStore }) => { + useOperationQueueStore.getState().cancelOperationsForWorkflow(currentWorkflowId) + }) + .catch((error) => { + logger.warn('Failed to cancel operations for workflow:', error) + }) socket.emit('leave-workflow') setCurrentWorkflowId(null) setPresenceUsers([]) diff --git a/apps/sim/socket/config/socket.ts b/apps/sim/socket/config/socket.ts index c4272b198..366558536 100644 --- a/apps/sim/socket/config/socket.ts +++ b/apps/sim/socket/config/socket.ts @@ -9,6 +9,13 @@ import { getBaseUrl } from '@/lib/core/utils/urls' const logger = createLogger('SocketIOConfig') +/** Socket.IO ping timeout - how long to wait for pong before considering connection dead */ +const PING_TIMEOUT_MS = 60000 +/** Socket.IO ping interval - how often to send ping packets */ +const PING_INTERVAL_MS = 25000 +/** Maximum HTTP buffer size for Socket.IO messages */ +const MAX_HTTP_BUFFER_SIZE = 1e6 + let adapterPubClient: RedisClientType | null = null let adapterSubClient: RedisClientType | null = null @@ -41,9 +48,9 @@ export async function createSocketIOServer(httpServer: HttpServer): Promise() * Removes the socket's operationIds from pending updates to prevent memory leaks. */ export function cleanupPendingSubblocksForSocket(socketId: string): void { - for (const [key, pending] of pendingSubblockUpdates.entries()) { + for (const [, pending] of pendingSubblockUpdates.entries()) { // Remove this socket's operation entries for (const [opId, sid] of pending.opToSocket.entries()) { if (sid === socketId) { @@ -52,6 +55,9 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager: type: 'SESSION_ERROR', message: 'Session expired, please rejoin workflow', }) + if (operationId) { + socket.emit('operation-failed', { operationId, error: 'Session expired' }) + } return } @@ -79,7 +85,7 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager: existing.timeout = setTimeout(async () => { await flushSubblockUpdate(workflowId, existing, roomManager) pendingSubblockUpdates.delete(debouncedKey) - }, 25) + }, DEBOUNCE_INTERVAL_MS) } else { const opToSocket = new Map() if (operationId) opToSocket.set(operationId, socket.id) @@ -89,7 +95,7 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager: await flushSubblockUpdate(workflowId, pending, roomManager) pendingSubblockUpdates.delete(debouncedKey) } - }, 25) + }, DEBOUNCE_INTERVAL_MS) pendingSubblockUpdates.set(debouncedKey, { latest: { blockId, subblockId, value, timestamp }, timeout, diff --git a/apps/sim/socket/handlers/variables.ts b/apps/sim/socket/handlers/variables.ts index a34b60126..0421b4e60 100644 --- a/apps/sim/socket/handlers/variables.ts +++ b/apps/sim/socket/handlers/variables.ts @@ -7,6 +7,9 @@ import type { IRoomManager } from '@/socket/rooms' const logger = createLogger('VariablesHandlers') +/** Debounce interval for coalescing rapid variable updates before persisting */ +const DEBOUNCE_INTERVAL_MS = 25 + type PendingVariable = { latest: { variableId: string; field: string; value: any; timestamp: number } timeout: NodeJS.Timeout @@ -21,7 +24,7 @@ const pendingVariableUpdates = new Map() * Removes the socket's operationIds from pending updates to prevent memory leaks. */ export function cleanupPendingVariablesForSocket(socketId: string): void { - for (const [key, pending] of pendingVariableUpdates.entries()) { + for (const [, pending] of pendingVariableUpdates.entries()) { for (const [opId, sid] of pending.opToSocket.entries()) { if (sid === socketId) { pending.opToSocket.delete(opId) @@ -48,6 +51,9 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager: type: 'SESSION_ERROR', message: 'Session expired, please rejoin workflow', }) + if (operationId) { + socket.emit('operation-failed', { operationId, error: 'Session expired' }) + } return } @@ -74,7 +80,7 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager: existing.timeout = setTimeout(async () => { await flushVariableUpdate(workflowId, existing, roomManager) pendingVariableUpdates.delete(debouncedKey) - }, 25) + }, DEBOUNCE_INTERVAL_MS) } else { const opToSocket = new Map() if (operationId) opToSocket.set(operationId, socket.id) @@ -84,7 +90,7 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager: await flushVariableUpdate(workflowId, pending, roomManager) pendingVariableUpdates.delete(debouncedKey) } - }, 25) + }, DEBOUNCE_INTERVAL_MS) pendingVariableUpdates.set(debouncedKey, { latest: { variableId, field, value, timestamp }, timeout, diff --git a/apps/sim/socket/index.ts b/apps/sim/socket/index.ts index b325cb096..08d94294b 100644 --- a/apps/sim/socket/index.ts +++ b/apps/sim/socket/index.ts @@ -10,6 +10,9 @@ import { createHttpHandler } from '@/socket/routes/http' const logger = createLogger('CollaborativeSocketServer') +/** Maximum time to wait for graceful shutdown before forcing exit */ +const SHUTDOWN_TIMEOUT_MS = 10000 + async function createRoomManager(io: SocketIOServer): Promise { if (env.REDIS_URL) { logger.info('Initializing Redis-backed RoomManager for multi-pod support') @@ -108,7 +111,7 @@ async function main() { setTimeout(() => { logger.error('Forced shutdown after timeout') process.exit(1) - }, 10000) + }, SHUTDOWN_TIMEOUT_MS) } process.on('SIGINT', shutdown) diff --git a/apps/sim/socket/middleware/auth.ts b/apps/sim/socket/middleware/auth.ts index d6c26e63a..b334902d8 100644 --- a/apps/sim/socket/middleware/auth.ts +++ b/apps/sim/socket/middleware/auth.ts @@ -21,7 +21,7 @@ export interface AuthenticatedSocket extends Socket { * Socket.IO authentication middleware. * Handles both anonymous mode (DISABLE_AUTH=true) and normal token-based auth. */ -export async function authenticateSocket(socket: AuthenticatedSocket, next: any) { +export async function authenticateSocket(socket: AuthenticatedSocket, next: (err?: Error) => void) { try { if (isAuthDisabled) { socket.userId = ANONYMOUS_USER_ID diff --git a/apps/sim/socket/routes/http.ts b/apps/sim/socket/routes/http.ts index 0d57ff29f..b068b94ab 100644 --- a/apps/sim/socket/routes/http.ts +++ b/apps/sim/socket/routes/http.ts @@ -1,11 +1,52 @@ import type { IncomingMessage, ServerResponse } from 'http' +import { env } from '@/lib/core/config/env' import type { IRoomManager } from '@/socket/rooms' interface Logger { - info: (message: string, ...args: any[]) => void - error: (message: string, ...args: any[]) => void - debug: (message: string, ...args: any[]) => void - warn: (message: string, ...args: any[]) => void + info: (message: string, ...args: unknown[]) => void + error: (message: string, ...args: unknown[]) => void + debug: (message: string, ...args: unknown[]) => void + warn: (message: string, ...args: unknown[]) => void +} + +function checkInternalApiKey(req: IncomingMessage): { success: boolean; error?: string } { + const apiKey = req.headers['x-api-key'] + const expectedApiKey = env.INTERNAL_API_SECRET + + if (!expectedApiKey) { + return { success: false, error: 'Internal API key not configured' } + } + + if (!apiKey) { + return { success: false, error: 'API key required' } + } + + if (apiKey !== expectedApiKey) { + return { success: false, error: 'Invalid API key' } + } + + return { success: true } +} + +function readRequestBody(req: IncomingMessage): Promise { + return new Promise((resolve, reject) => { + let body = '' + req.on('data', (chunk) => { + body += chunk.toString() + }) + req.on('end', () => resolve(body)) + req.on('error', reject) + }) +} + +function sendSuccess(res: ServerResponse): void { + res.writeHead(200, { 'Content-Type': 'application/json' }) + res.end(JSON.stringify({ success: true })) +} + +function sendError(res: ServerResponse, message: string, status = 500): void { + res.writeHead(status, { 'Content-Type': 'application/json' }) + res.end(JSON.stringify({ error: message })) } /** @@ -16,6 +57,7 @@ interface Logger { */ export function createHttpHandler(roomManager: IRoomManager, logger: Logger) { return async (req: IncomingMessage, res: ServerResponse) => { + // Health check doesn't require auth if (req.method === 'GET' && req.url === '/health') { try { const connections = await roomManager.getTotalActiveConnections() @@ -35,87 +77,69 @@ export function createHttpHandler(roomManager: IRoomManager, logger: Logger) { return } + // All POST endpoints require internal API key authentication + if (req.method === 'POST') { + const authResult = checkInternalApiKey(req) + if (!authResult.success) { + res.writeHead(401, { 'Content-Type': 'application/json' }) + res.end(JSON.stringify({ error: authResult.error })) + return + } + } + // Handle workflow deletion notifications from the main API if (req.method === 'POST' && req.url === '/api/workflow-deleted') { - let body = '' - req.on('data', (chunk) => { - body += chunk.toString() - }) - req.on('end', async () => { - try { - const { workflowId } = JSON.parse(body) - await roomManager.handleWorkflowDeletion(workflowId) - res.writeHead(200, { 'Content-Type': 'application/json' }) - res.end(JSON.stringify({ success: true })) - } catch (error) { - logger.error('Error handling workflow deletion notification:', error) - res.writeHead(500, { 'Content-Type': 'application/json' }) - res.end(JSON.stringify({ error: 'Failed to process deletion notification' })) - } - }) + try { + const body = await readRequestBody(req) + const { workflowId } = JSON.parse(body) + await roomManager.handleWorkflowDeletion(workflowId) + sendSuccess(res) + } catch (error) { + logger.error('Error handling workflow deletion notification:', error) + sendError(res, 'Failed to process deletion notification') + } return } // Handle workflow update notifications from the main API if (req.method === 'POST' && req.url === '/api/workflow-updated') { - let body = '' - req.on('data', (chunk) => { - body += chunk.toString() - }) - req.on('end', async () => { - try { - const { workflowId } = JSON.parse(body) - await roomManager.handleWorkflowUpdate(workflowId) - res.writeHead(200, { 'Content-Type': 'application/json' }) - res.end(JSON.stringify({ success: true })) - } catch (error) { - logger.error('Error handling workflow update notification:', error) - res.writeHead(500, { 'Content-Type': 'application/json' }) - res.end(JSON.stringify({ error: 'Failed to process update notification' })) - } - }) + try { + const body = await readRequestBody(req) + const { workflowId } = JSON.parse(body) + await roomManager.handleWorkflowUpdate(workflowId) + sendSuccess(res) + } catch (error) { + logger.error('Error handling workflow update notification:', error) + sendError(res, 'Failed to process update notification') + } return } // Handle copilot workflow edit notifications from the main API if (req.method === 'POST' && req.url === '/api/copilot-workflow-edit') { - let body = '' - req.on('data', (chunk) => { - body += chunk.toString() - }) - req.on('end', async () => { - try { - const { workflowId, description } = JSON.parse(body) - await roomManager.handleCopilotWorkflowEdit(workflowId, description) - res.writeHead(200, { 'Content-Type': 'application/json' }) - res.end(JSON.stringify({ success: true })) - } catch (error) { - logger.error('Error handling copilot workflow edit notification:', error) - res.writeHead(500, { 'Content-Type': 'application/json' }) - res.end(JSON.stringify({ error: 'Failed to process copilot edit notification' })) - } - }) + try { + const body = await readRequestBody(req) + const { workflowId, description } = JSON.parse(body) + await roomManager.handleCopilotWorkflowEdit(workflowId, description) + sendSuccess(res) + } catch (error) { + logger.error('Error handling copilot workflow edit notification:', error) + sendError(res, 'Failed to process copilot edit notification') + } return } // Handle workflow revert notifications from the main API if (req.method === 'POST' && req.url === '/api/workflow-reverted') { - let body = '' - req.on('data', (chunk) => { - body += chunk.toString() - }) - req.on('end', async () => { - try { - const { workflowId, timestamp } = JSON.parse(body) - await roomManager.handleWorkflowRevert(workflowId, timestamp) - res.writeHead(200, { 'Content-Type': 'application/json' }) - res.end(JSON.stringify({ success: true })) - } catch (error) { - logger.error('Error handling workflow revert notification:', error) - res.writeHead(500, { 'Content-Type': 'application/json' }) - res.end(JSON.stringify({ error: 'Failed to process revert notification' })) - } - }) + try { + const body = await readRequestBody(req) + const { workflowId, timestamp } = JSON.parse(body) + await roomManager.handleWorkflowRevert(workflowId, timestamp) + sendSuccess(res) + } catch (error) { + logger.error('Error handling workflow revert notification:', error) + sendError(res, 'Failed to process revert notification') + } return } diff --git a/apps/sim/stores/operation-queue/store.ts b/apps/sim/stores/operation-queue/store.ts index b5a23d8d4..e9f3deedc 100644 --- a/apps/sim/stores/operation-queue/store.ts +++ b/apps/sim/stores/operation-queue/store.ts @@ -4,6 +4,19 @@ import type { OperationQueueState, QueuedOperation } from './types' const logger = createLogger('OperationQueue') +/** Timeout for subblock/variable operations before considering them failed */ +const SUBBLOCK_VARIABLE_TIMEOUT_MS = 15000 +/** Timeout for structural operations before considering them failed */ +const STRUCTURAL_TIMEOUT_MS = 5000 +/** Maximum retry attempts for subblock/variable operations */ +const SUBBLOCK_VARIABLE_MAX_RETRIES = 5 +/** Maximum retry attempts for structural operations */ +const STRUCTURAL_MAX_RETRIES = 3 +/** Maximum retry delay cap for subblock/variable operations */ +const SUBBLOCK_VARIABLE_MAX_RETRY_DELAY_MS = 3000 +/** Base retry delay multiplier (1s, 2s, 3s for linear) */ +const RETRY_DELAY_BASE_MS = 1000 + const retryTimeouts = new Map() const operationTimeouts = new Map() @@ -200,14 +213,14 @@ export const useOperationQueueStore = create((set, get) => (operation.operation.operation === 'variable-update' && operation.operation.target === 'variable') - const maxRetries = isSubblockOrVariable ? 5 : 3 // 5 retries for text, 3 for structural + const maxRetries = isSubblockOrVariable ? SUBBLOCK_VARIABLE_MAX_RETRIES : STRUCTURAL_MAX_RETRIES if (operation.retryCount < maxRetries) { const newRetryCount = operation.retryCount + 1 // Faster retries for subblock/variable, exponential for structural const delay = isSubblockOrVariable - ? Math.min(1000 * newRetryCount, 3000) // 1s, 2s, 3s, 3s, 3s (cap at 3s) - : 2 ** newRetryCount * 1000 // 2s, 4s, 8s (exponential for structural) + ? Math.min(RETRY_DELAY_BASE_MS * newRetryCount, SUBBLOCK_VARIABLE_MAX_RETRY_DELAY_MS) + : 2 ** newRetryCount * RETRY_DELAY_BASE_MS logger.warn( `Operation failed, retrying in ${delay}ms (attempt ${newRetryCount}/${maxRetries})`, @@ -309,7 +322,9 @@ export const useOperationQueueStore = create((set, get) => nextOperation.operation.target === 'subblock') || (nextOperation.operation.operation === 'variable-update' && nextOperation.operation.target === 'variable') - const timeoutDuration = isSubblockOrVariable ? 15000 : 5000 // 15s for text edits, 5s for structural ops + const timeoutDuration = isSubblockOrVariable + ? SUBBLOCK_VARIABLE_TIMEOUT_MS + : STRUCTURAL_TIMEOUT_MS const timeoutId = setTimeout(() => { logger.warn(`Operation timeout - no server response after ${timeoutDuration}ms`, {