Compare commits

..

1 Commit

Author SHA1 Message Date
Vikhyath Mondreti
4df319c321 fix(cleanup-cron): stale execution cleanup integer overflow 2026-02-02 01:18:45 -08:00
10 changed files with 48 additions and 141 deletions

View File

@@ -8,6 +8,7 @@ import { verifyCronAuth } from '@/lib/auth/internal'
const logger = createLogger('CleanupStaleExecutions') const logger = createLogger('CleanupStaleExecutions')
const STALE_THRESHOLD_MINUTES = 30 const STALE_THRESHOLD_MINUTES = 30
const MAX_INT32 = 2_147_483_647
export async function GET(request: NextRequest) { export async function GET(request: NextRequest) {
try { try {
@@ -45,13 +46,14 @@ export async function GET(request: NextRequest) {
try { try {
const staleDurationMs = Date.now() - new Date(execution.startedAt).getTime() const staleDurationMs = Date.now() - new Date(execution.startedAt).getTime()
const staleDurationMinutes = Math.round(staleDurationMs / 60000) const staleDurationMinutes = Math.round(staleDurationMs / 60000)
const totalDurationMs = Math.min(staleDurationMs, MAX_INT32)
await db await db
.update(workflowExecutionLogs) .update(workflowExecutionLogs)
.set({ .set({
status: 'failed', status: 'failed',
endedAt: new Date(), endedAt: new Date(),
totalDurationMs: staleDurationMs, totalDurationMs,
executionData: sql`jsonb_set( executionData: sql`jsonb_set(
COALESCE(execution_data, '{}'::jsonb), COALESCE(execution_data, '{}'::jsonb),
ARRAY['error'], ARRAY['error'],

View File

@@ -14,7 +14,6 @@ import { createLogger } from '@sim/logger'
import { useParams } from 'next/navigation' import { useParams } from 'next/navigation'
import { io, type Socket } from 'socket.io-client' import { io, type Socket } from 'socket.io-client'
import { getEnv } from '@/lib/core/config/env' import { getEnv } from '@/lib/core/config/env'
import { useOperationQueueStore } from '@/stores/operation-queue/store'
const logger = createLogger('SocketContext') const logger = createLogger('SocketContext')
@@ -139,7 +138,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
const [authFailed, setAuthFailed] = useState(false) const [authFailed, setAuthFailed] = useState(false)
const initializedRef = useRef(false) const initializedRef = useRef(false)
const socketRef = useRef<Socket | null>(null) const socketRef = useRef<Socket | null>(null)
const triggerOfflineMode = useOperationQueueStore((state) => state.triggerOfflineMode)
const params = useParams() const params = useParams()
const urlWorkflowId = params?.workflowId as string | undefined const urlWorkflowId = params?.workflowId as string | undefined
@@ -343,12 +341,9 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
}) })
}) })
socketInstance.on('join-workflow-error', ({ error, code }) => { socketInstance.on('join-workflow-error', ({ error }) => {
isRejoiningRef.current = false isRejoiningRef.current = false
logger.error('Failed to join workflow:', { error, code }) logger.error('Failed to join workflow:', error)
if (code === 'ROOM_MANAGER_UNAVAILABLE') {
triggerOfflineMode()
}
}) })
socketInstance.on('workflow-operation', (data) => { socketInstance.on('workflow-operation', (data) => {

View File

@@ -12,70 +12,39 @@ import {
import { persistWorkflowOperation } from '@/socket/database/operations' import { persistWorkflowOperation } from '@/socket/database/operations'
import type { AuthenticatedSocket } from '@/socket/middleware/auth' import type { AuthenticatedSocket } from '@/socket/middleware/auth'
import { checkRolePermission } from '@/socket/middleware/permissions' import { checkRolePermission } from '@/socket/middleware/permissions'
import type { IRoomManager, UserSession } from '@/socket/rooms' import type { IRoomManager } from '@/socket/rooms'
import { WorkflowOperationSchema } from '@/socket/validation/schemas' import { WorkflowOperationSchema } from '@/socket/validation/schemas'
const logger = createLogger('OperationsHandlers') const logger = createLogger('OperationsHandlers')
export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) { export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
socket.on('workflow-operation', async (data) => { socket.on('workflow-operation', async (data) => {
const emitOperationError = ( const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
forbidden: { type: string; message: string; operation?: string; target?: string }, const session = await roomManager.getUserSession(socket.id)
failed?: { error: string; retryable?: boolean }
) => {
socket.emit('operation-forbidden', forbidden)
if (failed && data?.operationId) {
socket.emit('operation-failed', { operationId: data.operationId, ...failed })
}
}
if (!roomManager.isReady()) {
emitOperationError(
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
{ error: 'Realtime unavailable', retryable: true }
)
return
}
let workflowId: string | null = null
let session: UserSession | null = null
try {
workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
session = await roomManager.getUserSession(socket.id)
} catch (error) {
logger.error('Error loading session for workflow operation:', error)
emitOperationError(
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
{ error: 'Realtime unavailable', retryable: true }
)
return
}
if (!workflowId || !session) { if (!workflowId || !session) {
emitOperationError( socket.emit('operation-forbidden', {
{ type: 'SESSION_ERROR', message: 'Session expired, please rejoin workflow' }, type: 'SESSION_ERROR',
{ error: 'Session expired' } message: 'Session expired, please rejoin workflow',
) })
if (data?.operationId) {
socket.emit('operation-failed', { operationId: data.operationId, error: 'Session expired' })
}
return return
} }
let hasRoom = false const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
try {
hasRoom = await roomManager.hasWorkflowRoom(workflowId)
} catch (error) {
logger.error('Error checking workflow room:', error)
emitOperationError(
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
{ error: 'Realtime unavailable', retryable: true }
)
return
}
if (!hasRoom) { if (!hasRoom) {
emitOperationError( socket.emit('operation-forbidden', {
{ type: 'ROOM_NOT_FOUND', message: 'Workflow room not found' }, type: 'ROOM_NOT_FOUND',
{ error: 'Workflow room not found' } message: 'Workflow room not found',
) })
if (data?.operationId) {
socket.emit('operation-failed', {
operationId: data.operationId,
error: 'Workflow room not found',
})
}
return return
} }
@@ -108,15 +77,15 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
// Check permissions from cached role for all other operations // Check permissions from cached role for all other operations
if (!userPresence) { if (!userPresence) {
logger.warn(`User presence not found for socket ${socket.id}`) logger.warn(`User presence not found for socket ${socket.id}`)
emitOperationError( socket.emit('operation-forbidden', {
{ type: 'SESSION_ERROR',
type: 'SESSION_ERROR', message: 'User session not found',
message: 'User session not found', operation,
operation, target,
target, })
}, if (operationId) {
{ error: 'User session not found' } socket.emit('operation-failed', { operationId, error: 'User session not found' })
) }
return return
} }
@@ -128,7 +97,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
logger.warn( logger.warn(
`User ${session.userId} (role: ${userPresence.role}) forbidden from ${operation} on ${target}` `User ${session.userId} (role: ${userPresence.role}) forbidden from ${operation} on ${target}`
) )
emitOperationError({ socket.emit('operation-forbidden', {
type: 'INSUFFICIENT_PERMISSIONS', type: 'INSUFFICIENT_PERMISSIONS',
message: `${permissionCheck.reason} on '${target}'`, message: `${permissionCheck.reason} on '${target}'`,
operation, operation,

View File

@@ -48,21 +48,6 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
operationId, operationId,
} = data } = data
if (!roomManager.isReady()) {
socket.emit('operation-forbidden', {
type: 'ROOM_MANAGER_UNAVAILABLE',
message: 'Realtime unavailable',
})
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: 'Realtime unavailable',
retryable: true,
})
}
return
}
try { try {
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id) const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id) const session = await roomManager.getUserSession(socket.id)

View File

@@ -37,21 +37,6 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
socket.on('variable-update', async (data) => { socket.on('variable-update', async (data) => {
const { workflowId: payloadWorkflowId, variableId, field, value, timestamp, operationId } = data const { workflowId: payloadWorkflowId, variableId, field, value, timestamp, operationId } = data
if (!roomManager.isReady()) {
socket.emit('operation-forbidden', {
type: 'ROOM_MANAGER_UNAVAILABLE',
message: 'Realtime unavailable',
})
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: 'Realtime unavailable',
retryable: true,
})
}
return
}
try { try {
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id) const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id) const session = await roomManager.getUserSession(socket.id)

View File

@@ -20,15 +20,6 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
return return
} }
if (!roomManager.isReady()) {
logger.warn(`Join workflow rejected: Room manager unavailable`)
socket.emit('join-workflow-error', {
error: 'Realtime unavailable',
code: 'ROOM_MANAGER_UNAVAILABLE',
})
return
}
logger.info(`Join workflow request from ${userId} (${userName}) for workflow ${workflowId}`) logger.info(`Join workflow request from ${userId} (${userName}) for workflow ${workflowId}`)
// Verify workflow access // Verify workflow access
@@ -137,20 +128,12 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
// Undo socket.join and room manager entry if any operation failed // Undo socket.join and room manager entry if any operation failed
socket.leave(workflowId) socket.leave(workflowId)
await roomManager.removeUserFromRoom(socket.id) await roomManager.removeUserFromRoom(socket.id)
const isReady = roomManager.isReady() socket.emit('join-workflow-error', { error: 'Failed to join workflow' })
socket.emit('join-workflow-error', {
error: isReady ? 'Failed to join workflow' : 'Realtime unavailable',
code: isReady ? undefined : 'ROOM_MANAGER_UNAVAILABLE',
})
} }
}) })
socket.on('leave-workflow', async () => { socket.on('leave-workflow', async () => {
try { try {
if (!roomManager.isReady()) {
return
}
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id) const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id) const session = await roomManager.getUserSession(socket.id)

View File

@@ -26,10 +26,6 @@ export class MemoryRoomManager implements IRoomManager {
logger.info('MemoryRoomManager initialized (single-pod mode)') logger.info('MemoryRoomManager initialized (single-pod mode)')
} }
isReady(): boolean {
return true
}
async shutdown(): Promise<void> { async shutdown(): Promise<void> {
this.workflowRooms.clear() this.workflowRooms.clear()
this.socketToWorkflow.clear() this.socketToWorkflow.clear()

View File

@@ -96,6 +96,17 @@ export class RedisRoomManager implements IRoomManager {
this._io = io this._io = io
this.redis = createClient({ this.redis = createClient({
url: redisUrl, url: redisUrl,
socket: {
reconnectStrategy: (retries) => {
if (retries > 10) {
logger.error('Redis reconnection failed after 10 attempts')
return new Error('Redis reconnection failed')
}
const delay = Math.min(retries * 100, 3000)
logger.warn(`Redis reconnecting in ${delay}ms (attempt ${retries})`)
return delay
},
},
}) })
this.redis.on('error', (err) => { this.redis.on('error', (err) => {
@@ -111,21 +122,12 @@ export class RedisRoomManager implements IRoomManager {
logger.info('Redis client ready') logger.info('Redis client ready')
this.isConnected = true this.isConnected = true
}) })
this.redis.on('end', () => {
logger.warn('Redis client connection closed')
this.isConnected = false
})
} }
get io(): Server { get io(): Server {
return this._io return this._io
} }
isReady(): boolean {
return this.isConnected
}
async initialize(): Promise<void> { async initialize(): Promise<void> {
if (this.isConnected) return if (this.isConnected) return

View File

@@ -48,11 +48,6 @@ export interface IRoomManager {
*/ */
initialize(): Promise<void> initialize(): Promise<void>
/**
* Whether the room manager is ready to serve requests
*/
isReady(): boolean
/** /**
* Clean shutdown * Clean shutdown
*/ */

View File

@@ -85,11 +85,6 @@ export function createHttpHandler(roomManager: IRoomManager, logger: Logger) {
res.end(JSON.stringify({ error: authResult.error })) res.end(JSON.stringify({ error: authResult.error }))
return return
} }
if (!roomManager.isReady()) {
sendError(res, 'Room manager unavailable', 503)
return
}
} }
// Handle workflow deletion notifications from the main API // Handle workflow deletion notifications from the main API