mirror of
https://github.com/simstudioai/sim.git
synced 2026-02-04 19:55:08 -05:00
Compare commits
28 Commits
improvemen
...
feat/timeo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fc2f9853c5 | ||
|
|
eff4fc922b | ||
|
|
e63a642f11 | ||
|
|
cb63e98dc8 | ||
|
|
7f23b9057c | ||
|
|
7b75f156b4 | ||
|
|
7d28f62398 | ||
|
|
b890120afb | ||
|
|
c519034c8d | ||
|
|
17f02f8ed2 | ||
|
|
32a571a22a | ||
|
|
593bda7d0b | ||
|
|
5565677f7c | ||
|
|
06ddd80ab4 | ||
|
|
fe27adfb7c | ||
|
|
ee06ee34f6 | ||
|
|
39d75892a3 | ||
|
|
424b6e6a61 | ||
|
|
066850b65a | ||
|
|
c332efd1e4 | ||
|
|
d2e4afd15b | ||
|
|
bbf5c66abd | ||
|
|
f104659330 | ||
|
|
eac163cfd0 | ||
|
|
b53ed5dae0 | ||
|
|
d7259e304a | ||
|
|
501b44e05a | ||
|
|
7c1e7273de |
@@ -220,9 +220,9 @@ Workflows have maximum execution time limits based on your subscription plan:
|
||||
| Plan | Sync Execution | Async Execution |
|
||||
|------|----------------|-----------------|
|
||||
| **Free** | 5 minutes | 10 minutes |
|
||||
| **Pro** | 50 minutes | 90 minutes |
|
||||
| **Team** | 50 minutes | 90 minutes |
|
||||
| **Enterprise** | 50 minutes | 90 minutes |
|
||||
| **Pro** | 60 minutes | 90 minutes |
|
||||
| **Team** | 60 minutes | 90 minutes |
|
||||
| **Enterprise** | 60 minutes | 90 minutes |
|
||||
|
||||
**Sync executions** run immediately and return results directly. These are triggered via the API with `async: false` (default) or through the UI.
|
||||
**Async executions** (triggered via API with `async: true`, webhooks, or schedules) run in the background. Async time limits are up to 2x the sync limit, capped at 90 minutes.
|
||||
|
||||
@@ -15,7 +15,7 @@ import type { PlanFeature } from '@/app/workspace/[workspaceId]/w/components/sid
|
||||
export const PRO_PLAN_FEATURES: PlanFeature[] = [
|
||||
{ icon: Zap, text: '150 runs per minute (sync)' },
|
||||
{ icon: Clock, text: '1,000 runs per minute (async)' },
|
||||
{ icon: Timer, text: '50 min sync execution limit' },
|
||||
{ icon: Timer, text: '60 min sync execution limit' },
|
||||
{ icon: HardDrive, text: '50GB file storage' },
|
||||
{ icon: Users, text: 'Unlimited invites' },
|
||||
{ icon: Database, text: 'Unlimited log retention' },
|
||||
@@ -24,7 +24,7 @@ export const PRO_PLAN_FEATURES: PlanFeature[] = [
|
||||
export const TEAM_PLAN_FEATURES: PlanFeature[] = [
|
||||
{ icon: Zap, text: '300 runs per minute (sync)' },
|
||||
{ icon: Clock, text: '2,500 runs per minute (async)' },
|
||||
{ icon: Timer, text: '50 min sync execution limit' },
|
||||
{ icon: Timer, text: '60 min sync execution limit' },
|
||||
{ icon: HardDrive, text: '500GB file storage (pooled)' },
|
||||
{ icon: Users, text: 'Unlimited invites' },
|
||||
{ icon: Database, text: 'Unlimited log retention' },
|
||||
|
||||
@@ -14,7 +14,6 @@ import { createLogger } from '@sim/logger'
|
||||
import { useParams } from 'next/navigation'
|
||||
import { io, type Socket } from 'socket.io-client'
|
||||
import { getEnv } from '@/lib/core/config/env'
|
||||
import { useOperationQueueStore } from '@/stores/operation-queue/store'
|
||||
|
||||
const logger = createLogger('SocketContext')
|
||||
|
||||
@@ -139,7 +138,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
|
||||
const [authFailed, setAuthFailed] = useState(false)
|
||||
const initializedRef = useRef(false)
|
||||
const socketRef = useRef<Socket | null>(null)
|
||||
const triggerOfflineMode = useOperationQueueStore((state) => state.triggerOfflineMode)
|
||||
|
||||
const params = useParams()
|
||||
const urlWorkflowId = params?.workflowId as string | undefined
|
||||
@@ -343,12 +341,9 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
|
||||
})
|
||||
})
|
||||
|
||||
socketInstance.on('join-workflow-error', ({ error, code }) => {
|
||||
socketInstance.on('join-workflow-error', ({ error }) => {
|
||||
isRejoiningRef.current = false
|
||||
logger.error('Failed to join workflow:', { error, code })
|
||||
if (code === 'ROOM_MANAGER_UNAVAILABLE') {
|
||||
triggerOfflineMode()
|
||||
}
|
||||
logger.error('Failed to join workflow:', error)
|
||||
})
|
||||
|
||||
socketInstance.on('workflow-operation', (data) => {
|
||||
|
||||
@@ -171,9 +171,9 @@ export const env = createEnv({
|
||||
RATE_LIMIT_ENTERPRISE_ASYNC: z.string().optional().default('5000'), // Enterprise tier async API executions per minute
|
||||
|
||||
EXECUTION_TIMEOUT_FREE: z.string().optional().default('300'),
|
||||
EXECUTION_TIMEOUT_PRO: z.string().optional().default('3000'),
|
||||
EXECUTION_TIMEOUT_TEAM: z.string().optional().default('3000'),
|
||||
EXECUTION_TIMEOUT_ENTERPRISE: z.string().optional().default('3000'),
|
||||
EXECUTION_TIMEOUT_PRO: z.string().optional().default('3600'),
|
||||
EXECUTION_TIMEOUT_TEAM: z.string().optional().default('3600'),
|
||||
EXECUTION_TIMEOUT_ENTERPRISE: z.string().optional().default('3600'),
|
||||
|
||||
// Knowledge Base Processing Configuration - Shared across all processing methods
|
||||
KB_CONFIG_MAX_DURATION: z.number().optional().default(600), // Max processing duration in seconds (10 minutes)
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import { env } from '@/lib/core/config/env'
|
||||
import { isBillingEnabled } from '@/lib/core/config/feature-flags'
|
||||
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
|
||||
|
||||
interface ExecutionTimeoutConfig {
|
||||
@@ -9,9 +8,9 @@ interface ExecutionTimeoutConfig {
|
||||
|
||||
const DEFAULT_SYNC_TIMEOUTS_SECONDS = {
|
||||
free: 300,
|
||||
pro: 3000,
|
||||
team: 3000,
|
||||
enterprise: 3000,
|
||||
pro: 3600,
|
||||
team: 3600,
|
||||
enterprise: 3600,
|
||||
} as const
|
||||
|
||||
const ASYNC_MULTIPLIER = 2
|
||||
@@ -57,9 +56,6 @@ export function getExecutionTimeout(
|
||||
plan: SubscriptionPlan | undefined,
|
||||
type: 'sync' | 'async' = 'sync'
|
||||
): number {
|
||||
if (!isBillingEnabled) {
|
||||
return EXECUTION_TIMEOUTS.enterprise[type]
|
||||
}
|
||||
return EXECUTION_TIMEOUTS[plan || 'free'][type]
|
||||
}
|
||||
|
||||
@@ -67,9 +63,7 @@ export function getMaxExecutionTimeout(): number {
|
||||
return EXECUTION_TIMEOUTS.enterprise.async
|
||||
}
|
||||
|
||||
export const DEFAULT_EXECUTION_TIMEOUT_MS = isBillingEnabled
|
||||
? EXECUTION_TIMEOUTS.free.sync
|
||||
: EXECUTION_TIMEOUTS.enterprise.sync
|
||||
export const DEFAULT_EXECUTION_TIMEOUT_MS = EXECUTION_TIMEOUTS.free.sync
|
||||
|
||||
export function isTimeoutError(error: unknown): boolean {
|
||||
if (!error) return false
|
||||
|
||||
@@ -12,70 +12,39 @@ import {
|
||||
import { persistWorkflowOperation } from '@/socket/database/operations'
|
||||
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
|
||||
import { checkRolePermission } from '@/socket/middleware/permissions'
|
||||
import type { IRoomManager, UserSession } from '@/socket/rooms'
|
||||
import type { IRoomManager } from '@/socket/rooms'
|
||||
import { WorkflowOperationSchema } from '@/socket/validation/schemas'
|
||||
|
||||
const logger = createLogger('OperationsHandlers')
|
||||
|
||||
export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
|
||||
socket.on('workflow-operation', async (data) => {
|
||||
const emitOperationError = (
|
||||
forbidden: { type: string; message: string; operation?: string; target?: string },
|
||||
failed?: { error: string; retryable?: boolean }
|
||||
) => {
|
||||
socket.emit('operation-forbidden', forbidden)
|
||||
if (failed && data?.operationId) {
|
||||
socket.emit('operation-failed', { operationId: data.operationId, ...failed })
|
||||
}
|
||||
}
|
||||
|
||||
if (!roomManager.isReady()) {
|
||||
emitOperationError(
|
||||
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
|
||||
{ error: 'Realtime unavailable', retryable: true }
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
let workflowId: string | null = null
|
||||
let session: UserSession | null = null
|
||||
|
||||
try {
|
||||
workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
|
||||
session = await roomManager.getUserSession(socket.id)
|
||||
} catch (error) {
|
||||
logger.error('Error loading session for workflow operation:', error)
|
||||
emitOperationError(
|
||||
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
|
||||
{ error: 'Realtime unavailable', retryable: true }
|
||||
)
|
||||
return
|
||||
}
|
||||
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
|
||||
const session = await roomManager.getUserSession(socket.id)
|
||||
|
||||
if (!workflowId || !session) {
|
||||
emitOperationError(
|
||||
{ type: 'SESSION_ERROR', message: 'Session expired, please rejoin workflow' },
|
||||
{ error: 'Session expired' }
|
||||
)
|
||||
socket.emit('operation-forbidden', {
|
||||
type: 'SESSION_ERROR',
|
||||
message: 'Session expired, please rejoin workflow',
|
||||
})
|
||||
if (data?.operationId) {
|
||||
socket.emit('operation-failed', { operationId: data.operationId, error: 'Session expired' })
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
let hasRoom = false
|
||||
try {
|
||||
hasRoom = await roomManager.hasWorkflowRoom(workflowId)
|
||||
} catch (error) {
|
||||
logger.error('Error checking workflow room:', error)
|
||||
emitOperationError(
|
||||
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
|
||||
{ error: 'Realtime unavailable', retryable: true }
|
||||
)
|
||||
return
|
||||
}
|
||||
const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
|
||||
if (!hasRoom) {
|
||||
emitOperationError(
|
||||
{ type: 'ROOM_NOT_FOUND', message: 'Workflow room not found' },
|
||||
{ error: 'Workflow room not found' }
|
||||
)
|
||||
socket.emit('operation-forbidden', {
|
||||
type: 'ROOM_NOT_FOUND',
|
||||
message: 'Workflow room not found',
|
||||
})
|
||||
if (data?.operationId) {
|
||||
socket.emit('operation-failed', {
|
||||
operationId: data.operationId,
|
||||
error: 'Workflow room not found',
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -108,15 +77,15 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
|
||||
// Check permissions from cached role for all other operations
|
||||
if (!userPresence) {
|
||||
logger.warn(`User presence not found for socket ${socket.id}`)
|
||||
emitOperationError(
|
||||
{
|
||||
type: 'SESSION_ERROR',
|
||||
message: 'User session not found',
|
||||
operation,
|
||||
target,
|
||||
},
|
||||
{ error: 'User session not found' }
|
||||
)
|
||||
socket.emit('operation-forbidden', {
|
||||
type: 'SESSION_ERROR',
|
||||
message: 'User session not found',
|
||||
operation,
|
||||
target,
|
||||
})
|
||||
if (operationId) {
|
||||
socket.emit('operation-failed', { operationId, error: 'User session not found' })
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -128,7 +97,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
|
||||
logger.warn(
|
||||
`User ${session.userId} (role: ${userPresence.role}) forbidden from ${operation} on ${target}`
|
||||
)
|
||||
emitOperationError({
|
||||
socket.emit('operation-forbidden', {
|
||||
type: 'INSUFFICIENT_PERMISSIONS',
|
||||
message: `${permissionCheck.reason} on '${target}'`,
|
||||
operation,
|
||||
|
||||
@@ -48,21 +48,6 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
|
||||
operationId,
|
||||
} = data
|
||||
|
||||
if (!roomManager.isReady()) {
|
||||
socket.emit('operation-forbidden', {
|
||||
type: 'ROOM_MANAGER_UNAVAILABLE',
|
||||
message: 'Realtime unavailable',
|
||||
})
|
||||
if (operationId) {
|
||||
socket.emit('operation-failed', {
|
||||
operationId,
|
||||
error: 'Realtime unavailable',
|
||||
retryable: true,
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
|
||||
const session = await roomManager.getUserSession(socket.id)
|
||||
|
||||
@@ -37,21 +37,6 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
|
||||
socket.on('variable-update', async (data) => {
|
||||
const { workflowId: payloadWorkflowId, variableId, field, value, timestamp, operationId } = data
|
||||
|
||||
if (!roomManager.isReady()) {
|
||||
socket.emit('operation-forbidden', {
|
||||
type: 'ROOM_MANAGER_UNAVAILABLE',
|
||||
message: 'Realtime unavailable',
|
||||
})
|
||||
if (operationId) {
|
||||
socket.emit('operation-failed', {
|
||||
operationId,
|
||||
error: 'Realtime unavailable',
|
||||
retryable: true,
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
|
||||
const session = await roomManager.getUserSession(socket.id)
|
||||
|
||||
@@ -20,15 +20,6 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
|
||||
return
|
||||
}
|
||||
|
||||
if (!roomManager.isReady()) {
|
||||
logger.warn(`Join workflow rejected: Room manager unavailable`)
|
||||
socket.emit('join-workflow-error', {
|
||||
error: 'Realtime unavailable',
|
||||
code: 'ROOM_MANAGER_UNAVAILABLE',
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
logger.info(`Join workflow request from ${userId} (${userName}) for workflow ${workflowId}`)
|
||||
|
||||
// Verify workflow access
|
||||
@@ -137,20 +128,12 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
|
||||
// Undo socket.join and room manager entry if any operation failed
|
||||
socket.leave(workflowId)
|
||||
await roomManager.removeUserFromRoom(socket.id)
|
||||
const isReady = roomManager.isReady()
|
||||
socket.emit('join-workflow-error', {
|
||||
error: isReady ? 'Failed to join workflow' : 'Realtime unavailable',
|
||||
code: isReady ? undefined : 'ROOM_MANAGER_UNAVAILABLE',
|
||||
})
|
||||
socket.emit('join-workflow-error', { error: 'Failed to join workflow' })
|
||||
}
|
||||
})
|
||||
|
||||
socket.on('leave-workflow', async () => {
|
||||
try {
|
||||
if (!roomManager.isReady()) {
|
||||
return
|
||||
}
|
||||
|
||||
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
|
||||
const session = await roomManager.getUserSession(socket.id)
|
||||
|
||||
|
||||
@@ -26,10 +26,6 @@ export class MemoryRoomManager implements IRoomManager {
|
||||
logger.info('MemoryRoomManager initialized (single-pod mode)')
|
||||
}
|
||||
|
||||
isReady(): boolean {
|
||||
return true
|
||||
}
|
||||
|
||||
async shutdown(): Promise<void> {
|
||||
this.workflowRooms.clear()
|
||||
this.socketToWorkflow.clear()
|
||||
|
||||
@@ -96,6 +96,17 @@ export class RedisRoomManager implements IRoomManager {
|
||||
this._io = io
|
||||
this.redis = createClient({
|
||||
url: redisUrl,
|
||||
socket: {
|
||||
reconnectStrategy: (retries) => {
|
||||
if (retries > 10) {
|
||||
logger.error('Redis reconnection failed after 10 attempts')
|
||||
return new Error('Redis reconnection failed')
|
||||
}
|
||||
const delay = Math.min(retries * 100, 3000)
|
||||
logger.warn(`Redis reconnecting in ${delay}ms (attempt ${retries})`)
|
||||
return delay
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
this.redis.on('error', (err) => {
|
||||
@@ -111,21 +122,12 @@ export class RedisRoomManager implements IRoomManager {
|
||||
logger.info('Redis client ready')
|
||||
this.isConnected = true
|
||||
})
|
||||
|
||||
this.redis.on('end', () => {
|
||||
logger.warn('Redis client connection closed')
|
||||
this.isConnected = false
|
||||
})
|
||||
}
|
||||
|
||||
get io(): Server {
|
||||
return this._io
|
||||
}
|
||||
|
||||
isReady(): boolean {
|
||||
return this.isConnected
|
||||
}
|
||||
|
||||
async initialize(): Promise<void> {
|
||||
if (this.isConnected) return
|
||||
|
||||
|
||||
@@ -48,11 +48,6 @@ export interface IRoomManager {
|
||||
*/
|
||||
initialize(): Promise<void>
|
||||
|
||||
/**
|
||||
* Whether the room manager is ready to serve requests
|
||||
*/
|
||||
isReady(): boolean
|
||||
|
||||
/**
|
||||
* Clean shutdown
|
||||
*/
|
||||
|
||||
@@ -85,11 +85,6 @@ export function createHttpHandler(roomManager: IRoomManager, logger: Logger) {
|
||||
res.end(JSON.stringify({ error: authResult.error }))
|
||||
return
|
||||
}
|
||||
|
||||
if (!roomManager.isReady()) {
|
||||
sendError(res, 'Room manager unavailable', 503)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Handle workflow deletion notifications from the main API
|
||||
|
||||
Reference in New Issue
Block a user