cleanup code

This commit is contained in:
Vikhyath Mondreti
2026-01-29 19:56:36 -08:00
parent 175b72899c
commit 49a6197cd2
12 changed files with 189 additions and 98 deletions

View File

@@ -97,7 +97,10 @@ export async function POST(
const socketServerUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002'
await fetch(`${socketServerUrl}/api/workflow-reverted`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
headers: {
'Content-Type': 'application/json',
'x-api-key': env.INTERNAL_API_SECRET,
},
body: JSON.stringify({ workflowId: id, timestamp: Date.now() }),
})
} catch (e) {

View File

@@ -361,7 +361,10 @@ export async function DELETE(
const socketUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002'
const socketResponse = await fetch(`${socketUrl}/api/workflow-deleted`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
headers: {
'Content-Type': 'application/json',
'x-api-key': env.INTERNAL_API_SECRET,
},
body: JSON.stringify({ workflowId }),
})

View File

@@ -254,7 +254,10 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
const socketUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002'
const notifyResponse = await fetch(`${socketUrl}/api/workflow-updated`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
headers: {
'Content-Type': 'application/json',
'x-api-key': env.INTERNAL_API_SECRET,
},
body: JSON.stringify({ workflowId }),
})

View File

@@ -322,6 +322,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
// Handle join workflow success - confirms room membership with presence list
socketInstance.on('join-workflow-success', ({ workflowId, presenceUsers }) => {
isRejoiningRef.current = false
// Ignore stale success responses from previous navigation
if (workflowId !== urlWorkflowIdRef.current) {
logger.debug(`Ignoring stale join-workflow-success for ${workflowId}`)
return
}
setCurrentWorkflowId(workflowId)
setPresenceUsers(presenceUsers || [])
logger.info(`Successfully joined workflow room: ${workflowId}`, {
@@ -516,7 +521,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
logger.info('Received workflow state from server')
if (workflowData?.state) {
await rehydrateWorkflowStores(workflowData.id, workflowData.state, 'workflow-state')
try {
await rehydrateWorkflowStores(workflowData.id, workflowData.state, 'workflow-state')
} catch (error) {
logger.error('Error rehydrating workflow state:', error)
}
}
})
@@ -598,10 +607,13 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
const leaveWorkflow = useCallback(() => {
if (socket && currentWorkflowId) {
logger.info(`Leaving workflow: ${currentWorkflowId}`)
try {
const { useOperationQueueStore } = require('@/stores/operation-queue/store')
useOperationQueueStore.getState().cancelOperationsForWorkflow(currentWorkflowId)
} catch {}
import('@/stores/operation-queue/store')
.then(({ useOperationQueueStore }) => {
useOperationQueueStore.getState().cancelOperationsForWorkflow(currentWorkflowId)
})
.catch((error) => {
logger.warn('Failed to cancel operations for workflow:', error)
})
socket.emit('leave-workflow')
setCurrentWorkflowId(null)
setPresenceUsers([])

View File

@@ -9,6 +9,13 @@ import { getBaseUrl } from '@/lib/core/utils/urls'
const logger = createLogger('SocketIOConfig')
/** Socket.IO ping timeout - how long to wait for pong before considering connection dead */
const PING_TIMEOUT_MS = 60000
/** Socket.IO ping interval - how often to send ping packets */
const PING_INTERVAL_MS = 25000
/** Maximum HTTP buffer size for Socket.IO messages */
const MAX_HTTP_BUFFER_SIZE = 1e6
let adapterPubClient: RedisClientType | null = null
let adapterSubClient: RedisClientType | null = null
@@ -41,9 +48,9 @@ export async function createSocketIOServer(httpServer: HttpServer): Promise<Serv
},
transports: ['websocket', 'polling'],
allowEIO3: true,
pingTimeout: 60000,
pingInterval: 25000,
maxHttpBufferSize: 1e6,
pingTimeout: PING_TIMEOUT_MS,
pingInterval: PING_INTERVAL_MS,
maxHttpBufferSize: MAX_HTTP_BUFFER_SIZE,
cookie: {
name: 'io',
path: '/',
@@ -103,9 +110,9 @@ export async function createSocketIOServer(httpServer: HttpServer): Promise<Serv
logger.info('Socket.IO server configured with:', {
allowedOrigins: allowedOrigins.length,
transports: ['websocket', 'polling'],
pingTimeout: 60000,
pingInterval: 25000,
maxHttpBufferSize: 1e6,
pingTimeout: PING_TIMEOUT_MS,
pingInterval: PING_INTERVAL_MS,
maxHttpBufferSize: MAX_HTTP_BUFFER_SIZE,
cookieSecure: isProd,
corsCredentials: true,
redisAdapter: !!env.REDIS_URL,

View File

@@ -23,19 +23,28 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
const session = await roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
socket.emit('error', {
type: 'NOT_JOINED',
message: 'Not joined to any workflow',
socket.emit('operation-forbidden', {
type: 'SESSION_ERROR',
message: 'Session expired, please rejoin workflow',
})
if (data?.operationId) {
socket.emit('operation-failed', { operationId: data.operationId, error: 'Session expired' })
}
return
}
const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
if (!hasRoom) {
socket.emit('error', {
socket.emit('operation-forbidden', {
type: 'ROOM_NOT_FOUND',
message: 'Workflow room not found',
})
if (data?.operationId) {
socket.emit('operation-failed', {
operationId: data.operationId,
error: 'Workflow room not found',
})
}
return
}

View File

@@ -7,6 +7,9 @@ import type { IRoomManager } from '@/socket/rooms'
const logger = createLogger('SubblocksHandlers')
/** Debounce interval for coalescing rapid subblock updates before persisting */
const DEBOUNCE_INTERVAL_MS = 25
type PendingSubblock = {
latest: { blockId: string; subblockId: string; value: any; timestamp: number }
timeout: NodeJS.Timeout
@@ -22,7 +25,7 @@ const pendingSubblockUpdates = new Map<string, PendingSubblock>()
* Removes the socket's operationIds from pending updates to prevent memory leaks.
*/
export function cleanupPendingSubblocksForSocket(socketId: string): void {
for (const [key, pending] of pendingSubblockUpdates.entries()) {
for (const [, pending] of pendingSubblockUpdates.entries()) {
// Remove this socket's operation entries
for (const [opId, sid] of pending.opToSocket.entries()) {
if (sid === socketId) {
@@ -52,6 +55,9 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
type: 'SESSION_ERROR',
message: 'Session expired, please rejoin workflow',
})
if (operationId) {
socket.emit('operation-failed', { operationId, error: 'Session expired' })
}
return
}
@@ -79,7 +85,7 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
existing.timeout = setTimeout(async () => {
await flushSubblockUpdate(workflowId, existing, roomManager)
pendingSubblockUpdates.delete(debouncedKey)
}, 25)
}, DEBOUNCE_INTERVAL_MS)
} else {
const opToSocket = new Map<string, string>()
if (operationId) opToSocket.set(operationId, socket.id)
@@ -89,7 +95,7 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
await flushSubblockUpdate(workflowId, pending, roomManager)
pendingSubblockUpdates.delete(debouncedKey)
}
}, 25)
}, DEBOUNCE_INTERVAL_MS)
pendingSubblockUpdates.set(debouncedKey, {
latest: { blockId, subblockId, value, timestamp },
timeout,

View File

@@ -7,6 +7,9 @@ import type { IRoomManager } from '@/socket/rooms'
const logger = createLogger('VariablesHandlers')
/** Debounce interval for coalescing rapid variable updates before persisting */
const DEBOUNCE_INTERVAL_MS = 25
type PendingVariable = {
latest: { variableId: string; field: string; value: any; timestamp: number }
timeout: NodeJS.Timeout
@@ -21,7 +24,7 @@ const pendingVariableUpdates = new Map<string, PendingVariable>()
* Removes the socket's operationIds from pending updates to prevent memory leaks.
*/
export function cleanupPendingVariablesForSocket(socketId: string): void {
for (const [key, pending] of pendingVariableUpdates.entries()) {
for (const [, pending] of pendingVariableUpdates.entries()) {
for (const [opId, sid] of pending.opToSocket.entries()) {
if (sid === socketId) {
pending.opToSocket.delete(opId)
@@ -48,6 +51,9 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
type: 'SESSION_ERROR',
message: 'Session expired, please rejoin workflow',
})
if (operationId) {
socket.emit('operation-failed', { operationId, error: 'Session expired' })
}
return
}
@@ -74,7 +80,7 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
existing.timeout = setTimeout(async () => {
await flushVariableUpdate(workflowId, existing, roomManager)
pendingVariableUpdates.delete(debouncedKey)
}, 25)
}, DEBOUNCE_INTERVAL_MS)
} else {
const opToSocket = new Map<string, string>()
if (operationId) opToSocket.set(operationId, socket.id)
@@ -84,7 +90,7 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
await flushVariableUpdate(workflowId, pending, roomManager)
pendingVariableUpdates.delete(debouncedKey)
}
}, 25)
}, DEBOUNCE_INTERVAL_MS)
pendingVariableUpdates.set(debouncedKey, {
latest: { variableId, field, value, timestamp },
timeout,

View File

@@ -10,6 +10,9 @@ import { createHttpHandler } from '@/socket/routes/http'
const logger = createLogger('CollaborativeSocketServer')
/** Maximum time to wait for graceful shutdown before forcing exit */
const SHUTDOWN_TIMEOUT_MS = 10000
async function createRoomManager(io: SocketIOServer): Promise<IRoomManager> {
if (env.REDIS_URL) {
logger.info('Initializing Redis-backed RoomManager for multi-pod support')
@@ -108,7 +111,7 @@ async function main() {
setTimeout(() => {
logger.error('Forced shutdown after timeout')
process.exit(1)
}, 10000)
}, SHUTDOWN_TIMEOUT_MS)
}
process.on('SIGINT', shutdown)

View File

@@ -21,7 +21,7 @@ export interface AuthenticatedSocket extends Socket {
* Socket.IO authentication middleware.
* Handles both anonymous mode (DISABLE_AUTH=true) and normal token-based auth.
*/
export async function authenticateSocket(socket: AuthenticatedSocket, next: any) {
export async function authenticateSocket(socket: AuthenticatedSocket, next: (err?: Error) => void) {
try {
if (isAuthDisabled) {
socket.userId = ANONYMOUS_USER_ID

View File

@@ -1,11 +1,52 @@
import type { IncomingMessage, ServerResponse } from 'http'
import { env } from '@/lib/core/config/env'
import type { IRoomManager } from '@/socket/rooms'
interface Logger {
info: (message: string, ...args: any[]) => void
error: (message: string, ...args: any[]) => void
debug: (message: string, ...args: any[]) => void
warn: (message: string, ...args: any[]) => void
info: (message: string, ...args: unknown[]) => void
error: (message: string, ...args: unknown[]) => void
debug: (message: string, ...args: unknown[]) => void
warn: (message: string, ...args: unknown[]) => void
}
function checkInternalApiKey(req: IncomingMessage): { success: boolean; error?: string } {
const apiKey = req.headers['x-api-key']
const expectedApiKey = env.INTERNAL_API_SECRET
if (!expectedApiKey) {
return { success: false, error: 'Internal API key not configured' }
}
if (!apiKey) {
return { success: false, error: 'API key required' }
}
if (apiKey !== expectedApiKey) {
return { success: false, error: 'Invalid API key' }
}
return { success: true }
}
function readRequestBody(req: IncomingMessage): Promise<string> {
return new Promise((resolve, reject) => {
let body = ''
req.on('data', (chunk) => {
body += chunk.toString()
})
req.on('end', () => resolve(body))
req.on('error', reject)
})
}
function sendSuccess(res: ServerResponse): void {
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ success: true }))
}
function sendError(res: ServerResponse, message: string, status = 500): void {
res.writeHead(status, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: message }))
}
/**
@@ -16,6 +57,7 @@ interface Logger {
*/
export function createHttpHandler(roomManager: IRoomManager, logger: Logger) {
return async (req: IncomingMessage, res: ServerResponse) => {
// Health check doesn't require auth
if (req.method === 'GET' && req.url === '/health') {
try {
const connections = await roomManager.getTotalActiveConnections()
@@ -35,87 +77,69 @@ export function createHttpHandler(roomManager: IRoomManager, logger: Logger) {
return
}
// All POST endpoints require internal API key authentication
if (req.method === 'POST') {
const authResult = checkInternalApiKey(req)
if (!authResult.success) {
res.writeHead(401, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: authResult.error }))
return
}
}
// Handle workflow deletion notifications from the main API
if (req.method === 'POST' && req.url === '/api/workflow-deleted') {
let body = ''
req.on('data', (chunk) => {
body += chunk.toString()
})
req.on('end', async () => {
try {
const { workflowId } = JSON.parse(body)
await roomManager.handleWorkflowDeletion(workflowId)
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ success: true }))
} catch (error) {
logger.error('Error handling workflow deletion notification:', error)
res.writeHead(500, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: 'Failed to process deletion notification' }))
}
})
try {
const body = await readRequestBody(req)
const { workflowId } = JSON.parse(body)
await roomManager.handleWorkflowDeletion(workflowId)
sendSuccess(res)
} catch (error) {
logger.error('Error handling workflow deletion notification:', error)
sendError(res, 'Failed to process deletion notification')
}
return
}
// Handle workflow update notifications from the main API
if (req.method === 'POST' && req.url === '/api/workflow-updated') {
let body = ''
req.on('data', (chunk) => {
body += chunk.toString()
})
req.on('end', async () => {
try {
const { workflowId } = JSON.parse(body)
await roomManager.handleWorkflowUpdate(workflowId)
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ success: true }))
} catch (error) {
logger.error('Error handling workflow update notification:', error)
res.writeHead(500, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: 'Failed to process update notification' }))
}
})
try {
const body = await readRequestBody(req)
const { workflowId } = JSON.parse(body)
await roomManager.handleWorkflowUpdate(workflowId)
sendSuccess(res)
} catch (error) {
logger.error('Error handling workflow update notification:', error)
sendError(res, 'Failed to process update notification')
}
return
}
// Handle copilot workflow edit notifications from the main API
if (req.method === 'POST' && req.url === '/api/copilot-workflow-edit') {
let body = ''
req.on('data', (chunk) => {
body += chunk.toString()
})
req.on('end', async () => {
try {
const { workflowId, description } = JSON.parse(body)
await roomManager.handleCopilotWorkflowEdit(workflowId, description)
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ success: true }))
} catch (error) {
logger.error('Error handling copilot workflow edit notification:', error)
res.writeHead(500, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: 'Failed to process copilot edit notification' }))
}
})
try {
const body = await readRequestBody(req)
const { workflowId, description } = JSON.parse(body)
await roomManager.handleCopilotWorkflowEdit(workflowId, description)
sendSuccess(res)
} catch (error) {
logger.error('Error handling copilot workflow edit notification:', error)
sendError(res, 'Failed to process copilot edit notification')
}
return
}
// Handle workflow revert notifications from the main API
if (req.method === 'POST' && req.url === '/api/workflow-reverted') {
let body = ''
req.on('data', (chunk) => {
body += chunk.toString()
})
req.on('end', async () => {
try {
const { workflowId, timestamp } = JSON.parse(body)
await roomManager.handleWorkflowRevert(workflowId, timestamp)
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ success: true }))
} catch (error) {
logger.error('Error handling workflow revert notification:', error)
res.writeHead(500, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: 'Failed to process revert notification' }))
}
})
try {
const body = await readRequestBody(req)
const { workflowId, timestamp } = JSON.parse(body)
await roomManager.handleWorkflowRevert(workflowId, timestamp)
sendSuccess(res)
} catch (error) {
logger.error('Error handling workflow revert notification:', error)
sendError(res, 'Failed to process revert notification')
}
return
}

View File

@@ -4,6 +4,19 @@ import type { OperationQueueState, QueuedOperation } from './types'
const logger = createLogger('OperationQueue')
/** Timeout for subblock/variable operations before considering them failed */
const SUBBLOCK_VARIABLE_TIMEOUT_MS = 15000
/** Timeout for structural operations before considering them failed */
const STRUCTURAL_TIMEOUT_MS = 5000
/** Maximum retry attempts for subblock/variable operations */
const SUBBLOCK_VARIABLE_MAX_RETRIES = 5
/** Maximum retry attempts for structural operations */
const STRUCTURAL_MAX_RETRIES = 3
/** Maximum retry delay cap for subblock/variable operations */
const SUBBLOCK_VARIABLE_MAX_RETRY_DELAY_MS = 3000
/** Base retry delay multiplier (1s, 2s, 3s for linear) */
const RETRY_DELAY_BASE_MS = 1000
const retryTimeouts = new Map<string, NodeJS.Timeout>()
const operationTimeouts = new Map<string, NodeJS.Timeout>()
@@ -200,14 +213,14 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
(operation.operation.operation === 'variable-update' &&
operation.operation.target === 'variable')
const maxRetries = isSubblockOrVariable ? 5 : 3 // 5 retries for text, 3 for structural
const maxRetries = isSubblockOrVariable ? SUBBLOCK_VARIABLE_MAX_RETRIES : STRUCTURAL_MAX_RETRIES
if (operation.retryCount < maxRetries) {
const newRetryCount = operation.retryCount + 1
// Faster retries for subblock/variable, exponential for structural
const delay = isSubblockOrVariable
? Math.min(1000 * newRetryCount, 3000) // 1s, 2s, 3s, 3s, 3s (cap at 3s)
: 2 ** newRetryCount * 1000 // 2s, 4s, 8s (exponential for structural)
? Math.min(RETRY_DELAY_BASE_MS * newRetryCount, SUBBLOCK_VARIABLE_MAX_RETRY_DELAY_MS)
: 2 ** newRetryCount * RETRY_DELAY_BASE_MS
logger.warn(
`Operation failed, retrying in ${delay}ms (attempt ${newRetryCount}/${maxRetries})`,
@@ -309,7 +322,9 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
nextOperation.operation.target === 'subblock') ||
(nextOperation.operation.operation === 'variable-update' &&
nextOperation.operation.target === 'variable')
const timeoutDuration = isSubblockOrVariable ? 15000 : 5000 // 15s for text edits, 5s for structural ops
const timeoutDuration = isSubblockOrVariable
? SUBBLOCK_VARIABLE_TIMEOUT_MS
: STRUCTURAL_TIMEOUT_MS
const timeoutId = setTimeout(() => {
logger.warn(`Operation timeout - no server response after ${timeoutDuration}ms`, {