Compare commits

..

4 Commits

Author SHA1 Message Date
Vikhyath Mondreti
49fcdc64e4 update env vars 2026-02-04 11:20:33 -08:00
Vikhyath Mondreti
76e7f57b8a improvement(timeouts): sync to 50 min, self-hosted maxed out 2026-02-04 11:07:02 -08:00
Vikhyath Mondreti
a627faabe7 feat(timeouts): execution timeout limits (#3120)
* feat(timeouts): execution timeout limits

* fix type issues

* add to docs

* update stale exec cleanup route

* update more callsites

* update tests

* address bugbot comments

* remove import expression

* support streaming and async paths'

* fix streaming path

* add hitl and workflow handler

* make sync path match

* consolidate

* timeout errors

* validation errors typed

* import order

* Merge staging into feat/timeout-lims

Resolved conflicts:
- stt/route.ts: Keep both execution timeout and security imports
- textract/parse/route.ts: Keep both execution timeout and validation imports
- use-workflow-execution.ts: Keep cancellation console entry from feature branch
- input-validation.ts: Remove server functions (moved to .server.ts in staging)
- tools/index.ts: Keep execution timeout, use .server import for security

* make run from block consistent

* revert console update change

* fix subflow errors

* clean up base 64 cache correctly

* update docs

* consolidate workflow execution and run from block hook code

* remove unused constant

* fix cleanup base64 sse

* fix run from block tracespan
2026-02-04 10:26:36 -08:00
Vikhyath Mondreti
f811594875 improvement(rooms): redis client closed should fail with indicator (#3115)
* improvement(rooms): redis client closed should fail fast

* bugbot comment

* consolidate
2026-02-03 23:48:46 -08:00
13 changed files with 159 additions and 58 deletions

View File

@@ -220,9 +220,9 @@ Workflows have maximum execution time limits based on your subscription plan:
| Plan | Sync Execution | Async Execution |
|------|----------------|-----------------|
| **Free** | 5 minutes | 10 minutes |
| **Pro** | 60 minutes | 90 minutes |
| **Team** | 60 minutes | 90 minutes |
| **Enterprise** | 60 minutes | 90 minutes |
| **Pro** | 50 minutes | 90 minutes |
| **Team** | 50 minutes | 90 minutes |
| **Enterprise** | 50 minutes | 90 minutes |
**Sync executions** run immediately and return results directly. These are triggered via the API with `async: false` (default) or through the UI.
**Async executions** (triggered via API with `async: true`, webhooks, or schedules) run in the background. Async time limits are up to 2x the sync limit, capped at 90 minutes.

View File

@@ -15,7 +15,7 @@ import type { PlanFeature } from '@/app/workspace/[workspaceId]/w/components/sid
export const PRO_PLAN_FEATURES: PlanFeature[] = [
{ icon: Zap, text: '150 runs per minute (sync)' },
{ icon: Clock, text: '1,000 runs per minute (async)' },
{ icon: Timer, text: '60 min sync execution limit' },
{ icon: Timer, text: '50 min sync execution limit' },
{ icon: HardDrive, text: '50GB file storage' },
{ icon: Users, text: 'Unlimited invites' },
{ icon: Database, text: 'Unlimited log retention' },
@@ -24,7 +24,7 @@ export const PRO_PLAN_FEATURES: PlanFeature[] = [
export const TEAM_PLAN_FEATURES: PlanFeature[] = [
{ icon: Zap, text: '300 runs per minute (sync)' },
{ icon: Clock, text: '2,500 runs per minute (async)' },
{ icon: Timer, text: '60 min sync execution limit' },
{ icon: Timer, text: '50 min sync execution limit' },
{ icon: HardDrive, text: '500GB file storage (pooled)' },
{ icon: Users, text: 'Unlimited invites' },
{ icon: Database, text: 'Unlimited log retention' },

View File

@@ -14,6 +14,7 @@ import { createLogger } from '@sim/logger'
import { useParams } from 'next/navigation'
import { io, type Socket } from 'socket.io-client'
import { getEnv } from '@/lib/core/config/env'
import { useOperationQueueStore } from '@/stores/operation-queue/store'
const logger = createLogger('SocketContext')
@@ -138,6 +139,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
const [authFailed, setAuthFailed] = useState(false)
const initializedRef = useRef(false)
const socketRef = useRef<Socket | null>(null)
const triggerOfflineMode = useOperationQueueStore((state) => state.triggerOfflineMode)
const params = useParams()
const urlWorkflowId = params?.workflowId as string | undefined
@@ -341,9 +343,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
})
})
socketInstance.on('join-workflow-error', ({ error }) => {
socketInstance.on('join-workflow-error', ({ error, code }) => {
isRejoiningRef.current = false
logger.error('Failed to join workflow:', error)
logger.error('Failed to join workflow:', { error, code })
if (code === 'ROOM_MANAGER_UNAVAILABLE') {
triggerOfflineMode()
}
})
socketInstance.on('workflow-operation', (data) => {

View File

@@ -171,9 +171,9 @@ export const env = createEnv({
RATE_LIMIT_ENTERPRISE_ASYNC: z.string().optional().default('5000'), // Enterprise tier async API executions per minute
EXECUTION_TIMEOUT_FREE: z.string().optional().default('300'),
EXECUTION_TIMEOUT_PRO: z.string().optional().default('3600'),
EXECUTION_TIMEOUT_TEAM: z.string().optional().default('3600'),
EXECUTION_TIMEOUT_ENTERPRISE: z.string().optional().default('3600'),
EXECUTION_TIMEOUT_PRO: z.string().optional().default('3000'),
EXECUTION_TIMEOUT_TEAM: z.string().optional().default('3000'),
EXECUTION_TIMEOUT_ENTERPRISE: z.string().optional().default('3000'),
// Knowledge Base Processing Configuration - Shared across all processing methods
KB_CONFIG_MAX_DURATION: z.number().optional().default(600), // Max processing duration in seconds (10 minutes)

View File

@@ -1,4 +1,5 @@
import { env } from '@/lib/core/config/env'
import { isBillingEnabled } from '@/lib/core/config/feature-flags'
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
interface ExecutionTimeoutConfig {
@@ -8,9 +9,9 @@ interface ExecutionTimeoutConfig {
const DEFAULT_SYNC_TIMEOUTS_SECONDS = {
free: 300,
pro: 3600,
team: 3600,
enterprise: 3600,
pro: 3000,
team: 3000,
enterprise: 3000,
} as const
const ASYNC_MULTIPLIER = 2
@@ -56,6 +57,9 @@ export function getExecutionTimeout(
plan: SubscriptionPlan | undefined,
type: 'sync' | 'async' = 'sync'
): number {
if (!isBillingEnabled) {
return EXECUTION_TIMEOUTS.enterprise[type]
}
return EXECUTION_TIMEOUTS[plan || 'free'][type]
}
@@ -63,7 +67,9 @@ export function getMaxExecutionTimeout(): number {
return EXECUTION_TIMEOUTS.enterprise.async
}
export const DEFAULT_EXECUTION_TIMEOUT_MS = EXECUTION_TIMEOUTS.free.sync
export const DEFAULT_EXECUTION_TIMEOUT_MS = isBillingEnabled
? EXECUTION_TIMEOUTS.free.sync
: EXECUTION_TIMEOUTS.enterprise.sync
export function isTimeoutError(error: unknown): boolean {
if (!error) return false

View File

@@ -12,39 +12,70 @@ import {
import { persistWorkflowOperation } from '@/socket/database/operations'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
import { checkRolePermission } from '@/socket/middleware/permissions'
import type { IRoomManager } from '@/socket/rooms'
import type { IRoomManager, UserSession } from '@/socket/rooms'
import { WorkflowOperationSchema } from '@/socket/validation/schemas'
const logger = createLogger('OperationsHandlers')
export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
socket.on('workflow-operation', async (data) => {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
socket.emit('operation-forbidden', {
type: 'SESSION_ERROR',
message: 'Session expired, please rejoin workflow',
})
if (data?.operationId) {
socket.emit('operation-failed', { operationId: data.operationId, error: 'Session expired' })
const emitOperationError = (
forbidden: { type: string; message: string; operation?: string; target?: string },
failed?: { error: string; retryable?: boolean }
) => {
socket.emit('operation-forbidden', forbidden)
if (failed && data?.operationId) {
socket.emit('operation-failed', { operationId: data.operationId, ...failed })
}
}
if (!roomManager.isReady()) {
emitOperationError(
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
{ error: 'Realtime unavailable', retryable: true }
)
return
}
const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
let workflowId: string | null = null
let session: UserSession | null = null
try {
workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
session = await roomManager.getUserSession(socket.id)
} catch (error) {
logger.error('Error loading session for workflow operation:', error)
emitOperationError(
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
{ error: 'Realtime unavailable', retryable: true }
)
return
}
if (!workflowId || !session) {
emitOperationError(
{ type: 'SESSION_ERROR', message: 'Session expired, please rejoin workflow' },
{ error: 'Session expired' }
)
return
}
let hasRoom = false
try {
hasRoom = await roomManager.hasWorkflowRoom(workflowId)
} catch (error) {
logger.error('Error checking workflow room:', error)
emitOperationError(
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
{ error: 'Realtime unavailable', retryable: true }
)
return
}
if (!hasRoom) {
socket.emit('operation-forbidden', {
type: 'ROOM_NOT_FOUND',
message: 'Workflow room not found',
})
if (data?.operationId) {
socket.emit('operation-failed', {
operationId: data.operationId,
error: 'Workflow room not found',
})
}
emitOperationError(
{ type: 'ROOM_NOT_FOUND', message: 'Workflow room not found' },
{ error: 'Workflow room not found' }
)
return
}
@@ -77,15 +108,15 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
// Check permissions from cached role for all other operations
if (!userPresence) {
logger.warn(`User presence not found for socket ${socket.id}`)
socket.emit('operation-forbidden', {
type: 'SESSION_ERROR',
message: 'User session not found',
operation,
target,
})
if (operationId) {
socket.emit('operation-failed', { operationId, error: 'User session not found' })
}
emitOperationError(
{
type: 'SESSION_ERROR',
message: 'User session not found',
operation,
target,
},
{ error: 'User session not found' }
)
return
}
@@ -97,7 +128,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
logger.warn(
`User ${session.userId} (role: ${userPresence.role}) forbidden from ${operation} on ${target}`
)
socket.emit('operation-forbidden', {
emitOperationError({
type: 'INSUFFICIENT_PERMISSIONS',
message: `${permissionCheck.reason} on '${target}'`,
operation,

View File

@@ -48,6 +48,21 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
operationId,
} = data
if (!roomManager.isReady()) {
socket.emit('operation-forbidden', {
type: 'ROOM_MANAGER_UNAVAILABLE',
message: 'Realtime unavailable',
})
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: 'Realtime unavailable',
retryable: true,
})
}
return
}
try {
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)

View File

@@ -37,6 +37,21 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
socket.on('variable-update', async (data) => {
const { workflowId: payloadWorkflowId, variableId, field, value, timestamp, operationId } = data
if (!roomManager.isReady()) {
socket.emit('operation-forbidden', {
type: 'ROOM_MANAGER_UNAVAILABLE',
message: 'Realtime unavailable',
})
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: 'Realtime unavailable',
retryable: true,
})
}
return
}
try {
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)

View File

@@ -20,6 +20,15 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
return
}
if (!roomManager.isReady()) {
logger.warn(`Join workflow rejected: Room manager unavailable`)
socket.emit('join-workflow-error', {
error: 'Realtime unavailable',
code: 'ROOM_MANAGER_UNAVAILABLE',
})
return
}
logger.info(`Join workflow request from ${userId} (${userName}) for workflow ${workflowId}`)
// Verify workflow access
@@ -128,12 +137,20 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
// Undo socket.join and room manager entry if any operation failed
socket.leave(workflowId)
await roomManager.removeUserFromRoom(socket.id)
socket.emit('join-workflow-error', { error: 'Failed to join workflow' })
const isReady = roomManager.isReady()
socket.emit('join-workflow-error', {
error: isReady ? 'Failed to join workflow' : 'Realtime unavailable',
code: isReady ? undefined : 'ROOM_MANAGER_UNAVAILABLE',
})
}
})
socket.on('leave-workflow', async () => {
try {
if (!roomManager.isReady()) {
return
}
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)

View File

@@ -26,6 +26,10 @@ export class MemoryRoomManager implements IRoomManager {
logger.info('MemoryRoomManager initialized (single-pod mode)')
}
isReady(): boolean {
return true
}
async shutdown(): Promise<void> {
this.workflowRooms.clear()
this.socketToWorkflow.clear()

View File

@@ -96,17 +96,6 @@ export class RedisRoomManager implements IRoomManager {
this._io = io
this.redis = createClient({
url: redisUrl,
socket: {
reconnectStrategy: (retries) => {
if (retries > 10) {
logger.error('Redis reconnection failed after 10 attempts')
return new Error('Redis reconnection failed')
}
const delay = Math.min(retries * 100, 3000)
logger.warn(`Redis reconnecting in ${delay}ms (attempt ${retries})`)
return delay
},
},
})
this.redis.on('error', (err) => {
@@ -122,12 +111,21 @@ export class RedisRoomManager implements IRoomManager {
logger.info('Redis client ready')
this.isConnected = true
})
this.redis.on('end', () => {
logger.warn('Redis client connection closed')
this.isConnected = false
})
}
get io(): Server {
return this._io
}
isReady(): boolean {
return this.isConnected
}
async initialize(): Promise<void> {
if (this.isConnected) return

View File

@@ -48,6 +48,11 @@ export interface IRoomManager {
*/
initialize(): Promise<void>
/**
* Whether the room manager is ready to serve requests
*/
isReady(): boolean
/**
* Clean shutdown
*/

View File

@@ -85,6 +85,11 @@ export function createHttpHandler(roomManager: IRoomManager, logger: Logger) {
res.end(JSON.stringify({ error: authResult.error }))
return
}
if (!roomManager.isReady()) {
sendError(res, 'Room manager unavailable', 503)
return
}
}
// Handle workflow deletion notifications from the main API