mirror of
https://github.com/simstudioai/sim.git
synced 2026-02-03 03:04:57 -05:00
Compare commits
1 Commits
improvemen
...
fix/stale-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4df319c321 |
@@ -8,6 +8,7 @@ import { verifyCronAuth } from '@/lib/auth/internal'
|
|||||||
const logger = createLogger('CleanupStaleExecutions')
|
const logger = createLogger('CleanupStaleExecutions')
|
||||||
|
|
||||||
const STALE_THRESHOLD_MINUTES = 30
|
const STALE_THRESHOLD_MINUTES = 30
|
||||||
|
const MAX_INT32 = 2_147_483_647
|
||||||
|
|
||||||
export async function GET(request: NextRequest) {
|
export async function GET(request: NextRequest) {
|
||||||
try {
|
try {
|
||||||
@@ -45,13 +46,14 @@ export async function GET(request: NextRequest) {
|
|||||||
try {
|
try {
|
||||||
const staleDurationMs = Date.now() - new Date(execution.startedAt).getTime()
|
const staleDurationMs = Date.now() - new Date(execution.startedAt).getTime()
|
||||||
const staleDurationMinutes = Math.round(staleDurationMs / 60000)
|
const staleDurationMinutes = Math.round(staleDurationMs / 60000)
|
||||||
|
const totalDurationMs = Math.min(staleDurationMs, MAX_INT32)
|
||||||
|
|
||||||
await db
|
await db
|
||||||
.update(workflowExecutionLogs)
|
.update(workflowExecutionLogs)
|
||||||
.set({
|
.set({
|
||||||
status: 'failed',
|
status: 'failed',
|
||||||
endedAt: new Date(),
|
endedAt: new Date(),
|
||||||
totalDurationMs: staleDurationMs,
|
totalDurationMs,
|
||||||
executionData: sql`jsonb_set(
|
executionData: sql`jsonb_set(
|
||||||
COALESCE(execution_data, '{}'::jsonb),
|
COALESCE(execution_data, '{}'::jsonb),
|
||||||
ARRAY['error'],
|
ARRAY['error'],
|
||||||
|
|||||||
@@ -14,7 +14,6 @@ import { createLogger } from '@sim/logger'
|
|||||||
import { useParams } from 'next/navigation'
|
import { useParams } from 'next/navigation'
|
||||||
import { io, type Socket } from 'socket.io-client'
|
import { io, type Socket } from 'socket.io-client'
|
||||||
import { getEnv } from '@/lib/core/config/env'
|
import { getEnv } from '@/lib/core/config/env'
|
||||||
import { useOperationQueueStore } from '@/stores/operation-queue/store'
|
|
||||||
|
|
||||||
const logger = createLogger('SocketContext')
|
const logger = createLogger('SocketContext')
|
||||||
|
|
||||||
@@ -139,7 +138,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
|
|||||||
const [authFailed, setAuthFailed] = useState(false)
|
const [authFailed, setAuthFailed] = useState(false)
|
||||||
const initializedRef = useRef(false)
|
const initializedRef = useRef(false)
|
||||||
const socketRef = useRef<Socket | null>(null)
|
const socketRef = useRef<Socket | null>(null)
|
||||||
const triggerOfflineMode = useOperationQueueStore((state) => state.triggerOfflineMode)
|
|
||||||
|
|
||||||
const params = useParams()
|
const params = useParams()
|
||||||
const urlWorkflowId = params?.workflowId as string | undefined
|
const urlWorkflowId = params?.workflowId as string | undefined
|
||||||
@@ -343,12 +341,9 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
socketInstance.on('join-workflow-error', ({ error, code }) => {
|
socketInstance.on('join-workflow-error', ({ error }) => {
|
||||||
isRejoiningRef.current = false
|
isRejoiningRef.current = false
|
||||||
logger.error('Failed to join workflow:', { error, code })
|
logger.error('Failed to join workflow:', error)
|
||||||
if (code === 'ROOM_MANAGER_UNAVAILABLE') {
|
|
||||||
triggerOfflineMode()
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
|
|
||||||
socketInstance.on('workflow-operation', (data) => {
|
socketInstance.on('workflow-operation', (data) => {
|
||||||
|
|||||||
@@ -12,70 +12,39 @@ import {
|
|||||||
import { persistWorkflowOperation } from '@/socket/database/operations'
|
import { persistWorkflowOperation } from '@/socket/database/operations'
|
||||||
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
|
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
|
||||||
import { checkRolePermission } from '@/socket/middleware/permissions'
|
import { checkRolePermission } from '@/socket/middleware/permissions'
|
||||||
import type { IRoomManager, UserSession } from '@/socket/rooms'
|
import type { IRoomManager } from '@/socket/rooms'
|
||||||
import { WorkflowOperationSchema } from '@/socket/validation/schemas'
|
import { WorkflowOperationSchema } from '@/socket/validation/schemas'
|
||||||
|
|
||||||
const logger = createLogger('OperationsHandlers')
|
const logger = createLogger('OperationsHandlers')
|
||||||
|
|
||||||
export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
|
export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
|
||||||
socket.on('workflow-operation', async (data) => {
|
socket.on('workflow-operation', async (data) => {
|
||||||
const emitOperationError = (
|
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
|
||||||
forbidden: { type: string; message: string; operation?: string; target?: string },
|
const session = await roomManager.getUserSession(socket.id)
|
||||||
failed?: { error: string; retryable?: boolean }
|
|
||||||
) => {
|
|
||||||
socket.emit('operation-forbidden', forbidden)
|
|
||||||
if (failed && data?.operationId) {
|
|
||||||
socket.emit('operation-failed', { operationId: data.operationId, ...failed })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!roomManager.isReady()) {
|
|
||||||
emitOperationError(
|
|
||||||
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
|
|
||||||
{ error: 'Realtime unavailable', retryable: true }
|
|
||||||
)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
let workflowId: string | null = null
|
|
||||||
let session: UserSession | null = null
|
|
||||||
|
|
||||||
try {
|
|
||||||
workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
|
|
||||||
session = await roomManager.getUserSession(socket.id)
|
|
||||||
} catch (error) {
|
|
||||||
logger.error('Error loading session for workflow operation:', error)
|
|
||||||
emitOperationError(
|
|
||||||
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
|
|
||||||
{ error: 'Realtime unavailable', retryable: true }
|
|
||||||
)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!workflowId || !session) {
|
if (!workflowId || !session) {
|
||||||
emitOperationError(
|
socket.emit('operation-forbidden', {
|
||||||
{ type: 'SESSION_ERROR', message: 'Session expired, please rejoin workflow' },
|
type: 'SESSION_ERROR',
|
||||||
{ error: 'Session expired' }
|
message: 'Session expired, please rejoin workflow',
|
||||||
)
|
})
|
||||||
|
if (data?.operationId) {
|
||||||
|
socket.emit('operation-failed', { operationId: data.operationId, error: 'Session expired' })
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
let hasRoom = false
|
const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
|
||||||
try {
|
|
||||||
hasRoom = await roomManager.hasWorkflowRoom(workflowId)
|
|
||||||
} catch (error) {
|
|
||||||
logger.error('Error checking workflow room:', error)
|
|
||||||
emitOperationError(
|
|
||||||
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
|
|
||||||
{ error: 'Realtime unavailable', retryable: true }
|
|
||||||
)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if (!hasRoom) {
|
if (!hasRoom) {
|
||||||
emitOperationError(
|
socket.emit('operation-forbidden', {
|
||||||
{ type: 'ROOM_NOT_FOUND', message: 'Workflow room not found' },
|
type: 'ROOM_NOT_FOUND',
|
||||||
{ error: 'Workflow room not found' }
|
message: 'Workflow room not found',
|
||||||
)
|
})
|
||||||
|
if (data?.operationId) {
|
||||||
|
socket.emit('operation-failed', {
|
||||||
|
operationId: data.operationId,
|
||||||
|
error: 'Workflow room not found',
|
||||||
|
})
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -108,15 +77,15 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
|
|||||||
// Check permissions from cached role for all other operations
|
// Check permissions from cached role for all other operations
|
||||||
if (!userPresence) {
|
if (!userPresence) {
|
||||||
logger.warn(`User presence not found for socket ${socket.id}`)
|
logger.warn(`User presence not found for socket ${socket.id}`)
|
||||||
emitOperationError(
|
socket.emit('operation-forbidden', {
|
||||||
{
|
type: 'SESSION_ERROR',
|
||||||
type: 'SESSION_ERROR',
|
message: 'User session not found',
|
||||||
message: 'User session not found',
|
operation,
|
||||||
operation,
|
target,
|
||||||
target,
|
})
|
||||||
},
|
if (operationId) {
|
||||||
{ error: 'User session not found' }
|
socket.emit('operation-failed', { operationId, error: 'User session not found' })
|
||||||
)
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -128,7 +97,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
|
|||||||
logger.warn(
|
logger.warn(
|
||||||
`User ${session.userId} (role: ${userPresence.role}) forbidden from ${operation} on ${target}`
|
`User ${session.userId} (role: ${userPresence.role}) forbidden from ${operation} on ${target}`
|
||||||
)
|
)
|
||||||
emitOperationError({
|
socket.emit('operation-forbidden', {
|
||||||
type: 'INSUFFICIENT_PERMISSIONS',
|
type: 'INSUFFICIENT_PERMISSIONS',
|
||||||
message: `${permissionCheck.reason} on '${target}'`,
|
message: `${permissionCheck.reason} on '${target}'`,
|
||||||
operation,
|
operation,
|
||||||
|
|||||||
@@ -48,21 +48,6 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
|
|||||||
operationId,
|
operationId,
|
||||||
} = data
|
} = data
|
||||||
|
|
||||||
if (!roomManager.isReady()) {
|
|
||||||
socket.emit('operation-forbidden', {
|
|
||||||
type: 'ROOM_MANAGER_UNAVAILABLE',
|
|
||||||
message: 'Realtime unavailable',
|
|
||||||
})
|
|
||||||
if (operationId) {
|
|
||||||
socket.emit('operation-failed', {
|
|
||||||
operationId,
|
|
||||||
error: 'Realtime unavailable',
|
|
||||||
retryable: true,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
|
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
|
||||||
const session = await roomManager.getUserSession(socket.id)
|
const session = await roomManager.getUserSession(socket.id)
|
||||||
|
|||||||
@@ -37,21 +37,6 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
|
|||||||
socket.on('variable-update', async (data) => {
|
socket.on('variable-update', async (data) => {
|
||||||
const { workflowId: payloadWorkflowId, variableId, field, value, timestamp, operationId } = data
|
const { workflowId: payloadWorkflowId, variableId, field, value, timestamp, operationId } = data
|
||||||
|
|
||||||
if (!roomManager.isReady()) {
|
|
||||||
socket.emit('operation-forbidden', {
|
|
||||||
type: 'ROOM_MANAGER_UNAVAILABLE',
|
|
||||||
message: 'Realtime unavailable',
|
|
||||||
})
|
|
||||||
if (operationId) {
|
|
||||||
socket.emit('operation-failed', {
|
|
||||||
operationId,
|
|
||||||
error: 'Realtime unavailable',
|
|
||||||
retryable: true,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
|
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
|
||||||
const session = await roomManager.getUserSession(socket.id)
|
const session = await roomManager.getUserSession(socket.id)
|
||||||
|
|||||||
@@ -20,15 +20,6 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!roomManager.isReady()) {
|
|
||||||
logger.warn(`Join workflow rejected: Room manager unavailable`)
|
|
||||||
socket.emit('join-workflow-error', {
|
|
||||||
error: 'Realtime unavailable',
|
|
||||||
code: 'ROOM_MANAGER_UNAVAILABLE',
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info(`Join workflow request from ${userId} (${userName}) for workflow ${workflowId}`)
|
logger.info(`Join workflow request from ${userId} (${userName}) for workflow ${workflowId}`)
|
||||||
|
|
||||||
// Verify workflow access
|
// Verify workflow access
|
||||||
@@ -137,20 +128,12 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
|
|||||||
// Undo socket.join and room manager entry if any operation failed
|
// Undo socket.join and room manager entry if any operation failed
|
||||||
socket.leave(workflowId)
|
socket.leave(workflowId)
|
||||||
await roomManager.removeUserFromRoom(socket.id)
|
await roomManager.removeUserFromRoom(socket.id)
|
||||||
const isReady = roomManager.isReady()
|
socket.emit('join-workflow-error', { error: 'Failed to join workflow' })
|
||||||
socket.emit('join-workflow-error', {
|
|
||||||
error: isReady ? 'Failed to join workflow' : 'Realtime unavailable',
|
|
||||||
code: isReady ? undefined : 'ROOM_MANAGER_UNAVAILABLE',
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
socket.on('leave-workflow', async () => {
|
socket.on('leave-workflow', async () => {
|
||||||
try {
|
try {
|
||||||
if (!roomManager.isReady()) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
|
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
|
||||||
const session = await roomManager.getUserSession(socket.id)
|
const session = await roomManager.getUserSession(socket.id)
|
||||||
|
|
||||||
|
|||||||
@@ -26,10 +26,6 @@ export class MemoryRoomManager implements IRoomManager {
|
|||||||
logger.info('MemoryRoomManager initialized (single-pod mode)')
|
logger.info('MemoryRoomManager initialized (single-pod mode)')
|
||||||
}
|
}
|
||||||
|
|
||||||
isReady(): boolean {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
async shutdown(): Promise<void> {
|
async shutdown(): Promise<void> {
|
||||||
this.workflowRooms.clear()
|
this.workflowRooms.clear()
|
||||||
this.socketToWorkflow.clear()
|
this.socketToWorkflow.clear()
|
||||||
|
|||||||
@@ -96,6 +96,17 @@ export class RedisRoomManager implements IRoomManager {
|
|||||||
this._io = io
|
this._io = io
|
||||||
this.redis = createClient({
|
this.redis = createClient({
|
||||||
url: redisUrl,
|
url: redisUrl,
|
||||||
|
socket: {
|
||||||
|
reconnectStrategy: (retries) => {
|
||||||
|
if (retries > 10) {
|
||||||
|
logger.error('Redis reconnection failed after 10 attempts')
|
||||||
|
return new Error('Redis reconnection failed')
|
||||||
|
}
|
||||||
|
const delay = Math.min(retries * 100, 3000)
|
||||||
|
logger.warn(`Redis reconnecting in ${delay}ms (attempt ${retries})`)
|
||||||
|
return delay
|
||||||
|
},
|
||||||
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
this.redis.on('error', (err) => {
|
this.redis.on('error', (err) => {
|
||||||
@@ -111,21 +122,12 @@ export class RedisRoomManager implements IRoomManager {
|
|||||||
logger.info('Redis client ready')
|
logger.info('Redis client ready')
|
||||||
this.isConnected = true
|
this.isConnected = true
|
||||||
})
|
})
|
||||||
|
|
||||||
this.redis.on('end', () => {
|
|
||||||
logger.warn('Redis client connection closed')
|
|
||||||
this.isConnected = false
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
get io(): Server {
|
get io(): Server {
|
||||||
return this._io
|
return this._io
|
||||||
}
|
}
|
||||||
|
|
||||||
isReady(): boolean {
|
|
||||||
return this.isConnected
|
|
||||||
}
|
|
||||||
|
|
||||||
async initialize(): Promise<void> {
|
async initialize(): Promise<void> {
|
||||||
if (this.isConnected) return
|
if (this.isConnected) return
|
||||||
|
|
||||||
|
|||||||
@@ -48,11 +48,6 @@ export interface IRoomManager {
|
|||||||
*/
|
*/
|
||||||
initialize(): Promise<void>
|
initialize(): Promise<void>
|
||||||
|
|
||||||
/**
|
|
||||||
* Whether the room manager is ready to serve requests
|
|
||||||
*/
|
|
||||||
isReady(): boolean
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Clean shutdown
|
* Clean shutdown
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -85,11 +85,6 @@ export function createHttpHandler(roomManager: IRoomManager, logger: Logger) {
|
|||||||
res.end(JSON.stringify({ error: authResult.error }))
|
res.end(JSON.stringify({ error: authResult.error }))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!roomManager.isReady()) {
|
|
||||||
sendError(res, 'Room manager unavailable', 503)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle workflow deletion notifications from the main API
|
// Handle workflow deletion notifications from the main API
|
||||||
|
|||||||
Reference in New Issue
Block a user