improvement(ratelimits, sockets): increase across all plans, reconnecting notif for sockets (#3096)

* improvement(rate-limits): increase across all plans

* improve socket handling with a reconnecting notification

* address bugbot comment

* fix typing
Vikhyath Mondreti
2026-01-31 16:48:57 -08:00
committed by GitHub
parent 6cb3977dd9
commit e1ac201936
13 changed files with 366 additions and 257 deletions

View File

@@ -27,16 +27,16 @@ All API responses include information about your workflow execution limits and u
"limits": {
"workflowExecutionRateLimit": {
"sync": {
"requestsPerMinute": 60, // Sustained rate limit per minute
"maxBurst": 120, // Maximum burst capacity
"remaining": 118, // Current tokens available (up to maxBurst)
"resetAt": "..." // When tokens next refill
"requestsPerMinute": 150, // Sustained rate limit per minute
"maxBurst": 300, // Maximum burst capacity
"remaining": 298, // Current tokens available (up to maxBurst)
"resetAt": "..." // When tokens next refill
},
"async": {
"requestsPerMinute": 200, // Sustained rate limit per minute
"maxBurst": 400, // Maximum burst capacity
"remaining": 398, // Current tokens available
"resetAt": "..." // When tokens next refill
"requestsPerMinute": 1000, // Sustained rate limit per minute
"maxBurst": 2000, // Maximum burst capacity
"remaining": 1998, // Current tokens available
"resetAt": "..." // When tokens next refill
}
},
"usage": {
@@ -107,28 +107,28 @@ Query workflow execution logs with extensive filtering options.
}
],
"nextCursor": "eyJzIjoiMjAyNS0wMS0wMVQxMjozNDo1Ni43ODlaIiwiaWQiOiJsb2dfYWJjMTIzIn0",
"limits": {
"workflowExecutionRateLimit": {
"sync": {
"requestsPerMinute": 60,
"maxBurst": 120,
"remaining": 118,
"resetAt": "2025-01-01T12:35:56.789Z"
"limits": {
"workflowExecutionRateLimit": {
"sync": {
"requestsPerMinute": 150,
"maxBurst": 300,
"remaining": 298,
"resetAt": "2025-01-01T12:35:56.789Z"
},
"async": {
"requestsPerMinute": 1000,
"maxBurst": 2000,
"remaining": 1998,
"resetAt": "2025-01-01T12:35:56.789Z"
}
},
"async": {
"requestsPerMinute": 200,
"maxBurst": 400,
"remaining": 398,
"resetAt": "2025-01-01T12:35:56.789Z"
"usage": {
"currentPeriodCost": 1.234,
"limit": 10,
"plan": "pro",
"isExceeded": false
}
},
"usage": {
"currentPeriodCost": 1.234,
"limit": 10,
"plan": "pro",
"isExceeded": false
}
}
}
```
</Tab>
@@ -188,15 +188,15 @@ Retrieve detailed information about a specific log entry.
"limits": {
"workflowExecutionRateLimit": {
"sync": {
"requestsPerMinute": 60,
"maxBurst": 120,
"remaining": 118,
"requestsPerMinute": 150,
"maxBurst": 300,
"remaining": 298,
"resetAt": "2025-01-01T12:35:56.789Z"
},
"async": {
"requestsPerMinute": 200,
"maxBurst": 400,
"remaining": 398,
"requestsPerMinute": 1000,
"maxBurst": 2000,
"remaining": 1998,
"resetAt": "2025-01-01T12:35:56.789Z"
}
},
@@ -477,10 +477,10 @@ The API uses a **token bucket algorithm** for rate limiting, providing fair usag
| Plan | Requests/Minute | Burst Capacity |
|------|-----------------|----------------|
| Free | 10 | 20 |
| Pro | 30 | 60 |
| Team | 60 | 120 |
| Enterprise | 120 | 240 |
| Free | 30 | 60 |
| Pro | 100 | 200 |
| Team | 200 | 400 |
| Enterprise | 500 | 1000 |
**How it works:**
- Tokens refill at `requestsPerMinute` rate
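
The refill mechanics can be pictured with a small standalone sketch. This is illustrative only (not the service's actual limiter) and assumes tokens refill continuously at `requestsPerMinute / 60` per second, capped at `maxBurst`:

```typescript
// Illustrative token bucket: refills continuously, capped at maxBurst.
interface Bucket {
  tokens: number
  lastRefill: number // epoch ms of the last refill calculation
}

function tryConsume(
  bucket: Bucket,
  requestsPerMinute: number,
  maxBurst: number,
  now = Date.now()
): boolean {
  const elapsedSeconds = (now - bucket.lastRefill) / 1000
  bucket.tokens = Math.min(maxBurst, bucket.tokens + elapsedSeconds * (requestsPerMinute / 60))
  bucket.lastRefill = now
  if (bucket.tokens >= 1) {
    bucket.tokens -= 1 // each request costs one token
    return true
  }
  return false // drained: the caller should wait until resetAt
}
```

With the new Pro sync values (150 per minute, burst 300), a fully drained bucket regains one token every 0.4 seconds.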

View File

@@ -170,16 +170,16 @@ curl -X GET -H "X-API-Key: YOUR_API_KEY" -H "Content-Type: application/json" htt
"rateLimit": {
"sync": {
"isLimited": false,
"requestsPerMinute": 25,
"maxBurst": 50,
"remaining": 50,
"requestsPerMinute": 150,
"maxBurst": 300,
"remaining": 300,
"resetAt": "2025-09-08T22:51:55.999Z"
},
"async": {
"isLimited": false,
"requestsPerMinute": 200,
"maxBurst": 400,
"remaining": 400,
"requestsPerMinute": 1000,
"maxBurst": 2000,
"remaining": 2000,
"resetAt": "2025-09-08T22:51:56.155Z"
},
"authType": "api"
@@ -206,11 +206,11 @@ curl -X GET -H "X-API-Key: YOUR_API_KEY" -H "Content-Type: application/json" htt
Different subscription plans have different usage limits:
| Plan | Monthly Usage Limit | Rate Limits (per minute) |
|------|-------------------|-------------------------|
| **Free** | $20 | 5 sync, 10 async |
| **Pro** | $100 | 10 sync, 50 async |
| **Team** | $500 (pooled) | 50 sync, 100 async |
| Plan | Monthly Usage Included | Rate Limits (per minute) |
|------|------------------------|-------------------------|
| **Free** | $20 | 50 sync, 200 async |
| **Pro** | $20 (adjustable) | 150 sync, 1,000 async |
| **Team** | $40/seat (pooled, adjustable) | 300 sync, 2,500 async |
| **Enterprise** | Custom | Custom |
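
Because the responses shown earlier on this page include a `rateLimit` block, a client can throttle itself against whatever plan it is on without hard-coding these numbers. A minimal sketch, assuming the response shape documented above and a placeholder URL:

```typescript
// Sketch only: read the documented rateLimit block and wait out an empty bucket.
async function callApi(url: string, apiKey: string): Promise<any> {
  const res = await fetch(url, { headers: { 'X-API-Key': apiKey } })
  const body = await res.json()
  const sync = body.rateLimit?.sync
  if (sync?.isLimited || sync?.remaining === 0) {
    // resetAt marks when tokens next refill; sleep until then before retrying
    const waitMs = Math.max(0, new Date(sync.resetAt).getTime() - Date.now())
    await new Promise((resolve) => setTimeout(resolve, waitMs))
  }
  return body
}
```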
## Billing Model

View File

@@ -1,10 +1,11 @@
'use client'
import type React from 'react'
import { createContext, useCallback, useContext, useEffect, useMemo, useState } from 'react'
import { createContext, useCallback, useContext, useEffect, useMemo, useRef, useState } from 'react'
import { createLogger } from '@sim/logger'
import { useQueryClient } from '@tanstack/react-query'
import { useParams } from 'next/navigation'
import { useSocket } from '@/app/workspace/providers/socket-provider'
import {
useWorkspacePermissionsQuery,
type WorkspacePermissions,
@@ -57,14 +58,42 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP
const [hasShownOfflineNotification, setHasShownOfflineNotification] = useState(false)
const hasOperationError = useOperationQueueStore((state) => state.hasOperationError)
const addNotification = useNotificationStore((state) => state.addNotification)
const removeNotification = useNotificationStore((state) => state.removeNotification)
const { isReconnecting } = useSocket()
const reconnectingNotificationIdRef = useRef<string | null>(null)
const isOfflineMode = hasOperationError
useEffect(() => {
if (isReconnecting && !reconnectingNotificationIdRef.current && !isOfflineMode) {
const id = addNotification({
level: 'error',
message: 'Reconnecting...',
})
reconnectingNotificationIdRef.current = id
} else if (!isReconnecting && reconnectingNotificationIdRef.current) {
removeNotification(reconnectingNotificationIdRef.current)
reconnectingNotificationIdRef.current = null
}
return () => {
if (reconnectingNotificationIdRef.current) {
removeNotification(reconnectingNotificationIdRef.current)
reconnectingNotificationIdRef.current = null
}
}
}, [isReconnecting, isOfflineMode, addNotification, removeNotification])
useEffect(() => {
if (!isOfflineMode || hasShownOfflineNotification) {
return
}
if (reconnectingNotificationIdRef.current) {
removeNotification(reconnectingNotificationIdRef.current)
reconnectingNotificationIdRef.current = null
}
try {
addNotification({
level: 'error',
@@ -78,7 +107,7 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP
} catch (error) {
logger.error('Failed to add offline notification', { error })
}
}, [addNotification, hasShownOfflineNotification, isOfflineMode])
}, [addNotification, removeNotification, hasShownOfflineNotification, isOfflineMode])
const {
data: workspacePermissions,

View File

@@ -13,8 +13,8 @@ import { SlackMonoIcon } from '@/components/icons'
import type { PlanFeature } from '@/app/workspace/[workspaceId]/w/components/sidebar/components/settings-modal/components/subscription/components/plan-card'
export const PRO_PLAN_FEATURES: PlanFeature[] = [
{ icon: Zap, text: '25 runs per minute (sync)' },
{ icon: Clock, text: '200 runs per minute (async)' },
{ icon: Zap, text: '150 runs per minute (sync)' },
{ icon: Clock, text: '1,000 runs per minute (async)' },
{ icon: HardDrive, text: '50GB file storage' },
{ icon: Building2, text: 'Unlimited workspaces' },
{ icon: Users, text: 'Unlimited invites' },
@@ -22,8 +22,8 @@ export const PRO_PLAN_FEATURES: PlanFeature[] = [
]
export const TEAM_PLAN_FEATURES: PlanFeature[] = [
{ icon: Zap, text: '75 runs per minute (sync)' },
{ icon: Clock, text: '500 runs per minute (async)' },
{ icon: Zap, text: '300 runs per minute (sync)' },
{ icon: Clock, text: '2,500 runs per minute (async)' },
{ icon: HardDrive, text: '500GB file storage (pooled)' },
{ icon: Building2, text: 'Unlimited workspaces' },
{ icon: Users, text: 'Unlimited invites' },

View File

@@ -49,6 +49,7 @@ interface SocketContextType {
socket: Socket | null
isConnected: boolean
isConnecting: boolean
isReconnecting: boolean
authFailed: boolean
currentWorkflowId: string | null
currentSocketId: string | null
@@ -66,9 +67,16 @@ interface SocketContextType {
blockId: string,
subblockId: string,
value: any,
operationId?: string
operationId: string | undefined,
workflowId: string
) => void
emitVariableUpdate: (
variableId: string,
field: string,
value: any,
operationId: string | undefined,
workflowId: string
) => void
emitVariableUpdate: (variableId: string, field: string, value: any, operationId?: string) => void
emitCursorUpdate: (cursor: { x: number; y: number } | null) => void
emitSelectionUpdate: (selection: { type: 'block' | 'edge' | 'none'; id?: string }) => void
@@ -88,6 +96,7 @@ const SocketContext = createContext<SocketContextType>({
socket: null,
isConnected: false,
isConnecting: false,
isReconnecting: false,
authFailed: false,
currentWorkflowId: null,
currentSocketId: null,
@@ -122,6 +131,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
const [socket, setSocket] = useState<Socket | null>(null)
const [isConnected, setIsConnected] = useState(false)
const [isConnecting, setIsConnecting] = useState(false)
const [isReconnecting, setIsReconnecting] = useState(false)
const [currentWorkflowId, setCurrentWorkflowId] = useState<string | null>(null)
const [currentSocketId, setCurrentSocketId] = useState<string | null>(null)
const [presenceUsers, setPresenceUsers] = useState<PresenceUser[]>([])
@@ -236,20 +246,19 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
setCurrentWorkflowId(null)
setPresenceUsers([])
logger.info('Socket disconnected', {
reason,
})
// socket.active indicates if auto-reconnect will happen
if (socketInstance.active) {
setIsReconnecting(true)
logger.info('Socket disconnected, will auto-reconnect', { reason })
} else {
setIsReconnecting(false)
logger.info('Socket disconnected, no auto-reconnect', { reason })
}
})
socketInstance.on('connect_error', (error: any) => {
socketInstance.on('connect_error', (error: Error) => {
setIsConnecting(false)
logger.error('Socket connection error:', {
message: error.message,
stack: error.stack,
description: error.description,
type: error.type,
transport: error.transport,
})
logger.error('Socket connection error:', { message: error.message })
// Check if this is an authentication failure
const isAuthError =
@@ -261,43 +270,41 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
logger.warn(
'Authentication failed - stopping reconnection attempts. User may need to refresh/re-login.'
)
// Stop reconnection attempts to prevent infinite loop
socketInstance.disconnect()
// Reset state to allow re-initialization when session is restored
setSocket(null)
setAuthFailed(true)
setIsReconnecting(false)
initializedRef.current = false
} else if (socketInstance.active) {
// Temporary failure, will auto-reconnect
setIsReconnecting(true)
}
})
socketInstance.on('reconnect', (attemptNumber) => {
// Reconnection events are on the Manager (socket.io), not the socket itself
socketInstance.io.on('reconnect', (attemptNumber) => {
setIsConnected(true)
setIsReconnecting(false)
setCurrentSocketId(socketInstance.id ?? null)
logger.info('Socket reconnected successfully', {
attemptNumber,
socketId: socketInstance.id,
transport: socketInstance.io.engine?.transport?.name,
})
// Note: join-workflow is handled by the useEffect watching isConnected
})
socketInstance.on('reconnect_attempt', (attemptNumber) => {
logger.info('Socket reconnection attempt (fresh token will be generated)', {
attemptNumber,
timestamp: new Date().toISOString(),
})
socketInstance.io.on('reconnect_attempt', (attemptNumber) => {
setIsReconnecting(true)
logger.info('Socket reconnection attempt', { attemptNumber })
})
socketInstance.on('reconnect_error', (error: any) => {
logger.error('Socket reconnection error:', {
message: error.message,
attemptNumber: error.attemptNumber,
type: error.type,
})
socketInstance.io.on('reconnect_error', (error: Error) => {
logger.error('Socket reconnection error:', { message: error.message })
})
socketInstance.on('reconnect_failed', () => {
socketInstance.io.on('reconnect_failed', () => {
logger.error('Socket reconnection failed - all attempts exhausted')
setIsReconnecting(false)
setIsConnecting(false)
})
@@ -629,6 +636,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
if (commit) {
socket.emit('workflow-operation', {
workflowId: currentWorkflowId,
operation,
target,
payload,
@@ -645,6 +653,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
}
pendingPositionUpdates.current.set(blockId, {
workflowId: currentWorkflowId,
operation,
target,
payload,
@@ -666,6 +675,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
}
} else {
socket.emit('workflow-operation', {
workflowId: currentWorkflowId,
operation,
target,
payload,
@@ -678,47 +688,51 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
)
const emitSubblockUpdate = useCallback(
(blockId: string, subblockId: string, value: any, operationId?: string) => {
if (socket && currentWorkflowId) {
socket.emit('subblock-update', {
blockId,
subblockId,
value,
timestamp: Date.now(),
operationId,
})
} else {
logger.warn('Cannot emit subblock update: no socket connection or workflow room', {
hasSocket: !!socket,
currentWorkflowId,
blockId,
subblockId,
})
(
blockId: string,
subblockId: string,
value: any,
operationId: string | undefined,
workflowId: string
) => {
if (!socket) {
logger.warn('Cannot emit subblock update: no socket connection', { workflowId, blockId })
return
}
socket.emit('subblock-update', {
workflowId,
blockId,
subblockId,
value,
timestamp: Date.now(),
operationId,
})
},
[socket, currentWorkflowId]
[socket]
)
const emitVariableUpdate = useCallback(
(variableId: string, field: string, value: any, operationId?: string) => {
if (socket && currentWorkflowId) {
socket.emit('variable-update', {
variableId,
field,
value,
timestamp: Date.now(),
operationId,
})
} else {
logger.warn('Cannot emit variable update: no socket connection or workflow room', {
hasSocket: !!socket,
currentWorkflowId,
variableId,
field,
})
(
variableId: string,
field: string,
value: any,
operationId: string | undefined,
workflowId: string
) => {
if (!socket) {
logger.warn('Cannot emit variable update: no socket connection', { workflowId, variableId })
return
}
socket.emit('variable-update', {
workflowId,
variableId,
field,
value,
timestamp: Date.now(),
operationId,
})
},
[socket, currentWorkflowId]
[socket]
)
const lastCursorEmit = useRef(0)
@@ -794,6 +808,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
socket,
isConnected,
isConnecting,
isReconnecting,
authFailed,
currentWorkflowId,
currentSocketId,
@@ -820,6 +835,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
socket,
isConnected,
isConnecting,
isReconnecting,
authFailed,
currentWorkflowId,
currentSocketId,
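
For reference, the Manager-versus-Socket split the provider now relies on looks like this in isolation. A minimal sketch using socket.io-client v4 with a placeholder URL (the real provider builds its own connection options):

```typescript
import { io } from 'socket.io-client'

// Placeholder URL for illustration only.
const socket = io('https://collab.example.invalid', { reconnection: true })

// Socket-level events: connect / disconnect / connect_error.
socket.on('disconnect', (reason) => {
  // socket.active is true when the client will automatically try to reconnect.
  console.log('disconnected', reason, 'auto-reconnect:', socket.active)
})

// Reconnection lifecycle events fire on the Manager (socket.io), not the socket.
socket.io.on('reconnect_attempt', (attempt) => console.log('reconnect attempt', attempt))
socket.io.on('reconnect', (attempt) => console.log('reconnected after', attempt, 'attempts'))
socket.io.on('reconnect_error', (error) => console.log('reconnect error', error.message))
socket.io.on('reconnect_failed', () => console.log('all reconnection attempts exhausted'))
```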

View File

@@ -13,8 +13,8 @@ interface FreeTierUpgradeEmailProps {
const proFeatures = [
{ label: '$20/month', desc: 'in credits included' },
{ label: '25 runs/min', desc: 'sync executions' },
{ label: '200 runs/min', desc: 'async executions' },
{ label: '150 runs/min', desc: 'sync executions' },
{ label: '1,000 runs/min', desc: 'async executions' },
{ label: '50GB storage', desc: 'for files & assets' },
{ label: 'Unlimited', desc: 'workspaces & invites' },
]

View File

@@ -146,10 +146,6 @@ export function useCollaborativeWorkflow() {
cancelOperationsForVariable,
} = useOperationQueue()
const isInActiveRoom = useCallback(() => {
return !!currentWorkflowId && activeWorkflowId === currentWorkflowId
}, [currentWorkflowId, activeWorkflowId])
// Register emit functions with operation queue store
useEffect(() => {
registerEmitFunctions(
@@ -162,10 +158,19 @@ export function useCollaborativeWorkflow() {
useEffect(() => {
const handleWorkflowOperation = (data: any) => {
const { operation, target, payload, userId } = data
const { operation, target, payload, userId, metadata } = data
if (isApplyingRemoteChange.current) return
// Filter broadcasts by workflowId to prevent cross-workflow updates
if (metadata?.workflowId && metadata.workflowId !== activeWorkflowId) {
logger.debug('Ignoring workflow operation for different workflow', {
broadcastWorkflowId: metadata.workflowId,
activeWorkflowId,
})
return
}
logger.info(`Received ${operation} on ${target} from user ${userId}`)
// Apply the operation to local state
@@ -436,16 +441,24 @@ export function useCollaborativeWorkflow() {
}
const handleSubblockUpdate = (data: any) => {
const { blockId, subblockId, value, userId } = data
const { workflowId, blockId, subblockId, value, userId } = data
if (isApplyingRemoteChange.current) return
// Filter broadcasts by workflowId to prevent cross-workflow updates
if (workflowId && workflowId !== activeWorkflowId) {
logger.debug('Ignoring subblock update for different workflow', {
broadcastWorkflowId: workflowId,
activeWorkflowId,
})
return
}
logger.info(`Received subblock update from user ${userId}: ${blockId}.${subblockId}`)
isApplyingRemoteChange.current = true
try {
// The setValue function automatically uses the active workflow ID
useSubBlockStore.getState().setValue(blockId, subblockId, value)
const blockType = useWorkflowStore.getState().blocks?.[blockId]?.type
if (activeWorkflowId && blockType === 'function' && subblockId === 'code') {
@@ -459,10 +472,19 @@ export function useCollaborativeWorkflow() {
}
const handleVariableUpdate = (data: any) => {
const { variableId, field, value, userId } = data
const { workflowId, variableId, field, value, userId } = data
if (isApplyingRemoteChange.current) return
// Filter broadcasts by workflowId to prevent cross-workflow updates
if (workflowId && workflowId !== activeWorkflowId) {
logger.debug('Ignoring variable update for different workflow', {
broadcastWorkflowId: workflowId,
activeWorkflowId,
})
return
}
logger.info(`Received variable update from user ${userId}: ${variableId}.${field}`)
isApplyingRemoteChange.current = true
@@ -623,13 +645,9 @@ export function useCollaborativeWorkflow() {
return
}
if (!isInActiveRoom()) {
logger.debug('Skipping operation - not in active workflow', {
currentWorkflowId,
activeWorkflowId,
operation,
target,
})
// Queue operations if we have an active workflow - queue handles socket readiness
if (!activeWorkflowId) {
logger.debug('Skipping operation - no active workflow', { operation, target })
return
}
@@ -642,20 +660,13 @@ export function useCollaborativeWorkflow() {
target,
payload,
},
workflowId: activeWorkflowId || '',
workflowId: activeWorkflowId,
userId: session?.user?.id || 'unknown',
})
localAction()
},
[
addToQueue,
session?.user?.id,
isBaselineDiffView,
activeWorkflowId,
isInActiveRoom,
currentWorkflowId,
]
[addToQueue, session?.user?.id, isBaselineDiffView, activeWorkflowId]
)
const collaborativeBatchUpdatePositions = useCallback(
@@ -669,8 +680,8 @@ export function useCollaborativeWorkflow() {
return
}
if (!isInActiveRoom()) {
logger.debug('Skipping batch position update - not in active workflow')
if (!activeWorkflowId) {
logger.debug('Skipping batch position update - no active workflow')
return
}
@@ -714,7 +725,7 @@ export function useCollaborativeWorkflow() {
}
}
},
[isBaselineDiffView, addToQueue, activeWorkflowId, session?.user?.id, isInActiveRoom, undoRedo]
[isBaselineDiffView, addToQueue, activeWorkflowId, session?.user?.id, undoRedo]
)
const collaborativeUpdateBlockName = useCallback(
@@ -858,8 +869,8 @@ export function useCollaborativeWorkflow() {
return
}
if (!isInActiveRoom()) {
logger.debug('Skipping batch update parent - not in active workflow')
if (!activeWorkflowId) {
logger.debug('Skipping batch update parent - no active workflow')
return
}
@@ -928,7 +939,7 @@ export function useCollaborativeWorkflow() {
logger.debug('Batch updated parent for blocks', { updateCount: updates.length })
},
[isBaselineDiffView, isInActiveRoom, undoRedo, addToQueue, activeWorkflowId, session?.user?.id]
[isBaselineDiffView, undoRedo, addToQueue, activeWorkflowId, session?.user?.id]
)
const collaborativeToggleBlockAdvancedMode = useCallback(
@@ -1020,8 +1031,8 @@ export function useCollaborativeWorkflow() {
return false
}
if (!isInActiveRoom()) {
logger.debug('Skipping batch add edges - not in active workflow')
if (!activeWorkflowId) {
logger.debug('Skipping batch add edges - no active workflow')
return false
}
@@ -1055,7 +1066,7 @@ export function useCollaborativeWorkflow() {
return true
},
[isBaselineDiffView, addToQueue, activeWorkflowId, session?.user?.id, isInActiveRoom, undoRedo]
[isBaselineDiffView, addToQueue, activeWorkflowId, session?.user?.id, undoRedo]
)
const collaborativeBatchRemoveEdges = useCallback(
@@ -1064,8 +1075,8 @@ export function useCollaborativeWorkflow() {
return false
}
if (!isInActiveRoom()) {
logger.debug('Skipping batch remove edges - not in active workflow')
if (!activeWorkflowId) {
logger.debug('Skipping batch remove edges - no active workflow')
return false
}
@@ -1113,7 +1124,7 @@ export function useCollaborativeWorkflow() {
logger.info('Batch removed edges', { count: validEdgeIds.length })
return true
},
[isBaselineDiffView, isInActiveRoom, addToQueue, activeWorkflowId, session, undoRedo]
[isBaselineDiffView, addToQueue, activeWorkflowId, session, undoRedo]
)
const collaborativeSetSubblockValue = useCallback(
@@ -1148,11 +1159,9 @@ export function useCollaborativeWorkflow() {
// Best-effort; do not block on clearing
}
// Only emit to socket if in active room
if (!isInActiveRoom()) {
logger.debug('Local update applied, skipping socket emit - not in active workflow', {
currentWorkflowId,
activeWorkflowId,
// Queue socket operation if we have an active workflow
if (!activeWorkflowId) {
logger.debug('Local update applied, skipping socket queue - no active workflow', {
blockId,
subblockId,
})
@@ -1174,14 +1183,7 @@ export function useCollaborativeWorkflow() {
userId: session?.user?.id || 'unknown',
})
},
[
currentWorkflowId,
activeWorkflowId,
addToQueue,
session?.user?.id,
isBaselineDiffView,
isInActiveRoom,
]
[activeWorkflowId, addToQueue, session?.user?.id, isBaselineDiffView]
)
// Immediate tag selection (uses queue but processes immediately, no debouncing)
@@ -1193,13 +1195,8 @@ export function useCollaborativeWorkflow() {
return
}
if (!isInActiveRoom()) {
logger.debug('Skipping tag selection - not in active workflow', {
currentWorkflowId,
activeWorkflowId,
blockId,
subblockId,
})
if (!activeWorkflowId) {
logger.debug('Skipping tag selection - no active workflow', { blockId, subblockId })
return
}
@@ -1220,14 +1217,7 @@ export function useCollaborativeWorkflow() {
userId: session?.user?.id || 'unknown',
})
},
[
isBaselineDiffView,
addToQueue,
currentWorkflowId,
activeWorkflowId,
session?.user?.id,
isInActiveRoom,
]
[isBaselineDiffView, addToQueue, activeWorkflowId, session?.user?.id]
)
const collaborativeUpdateLoopType = useCallback(
@@ -1514,8 +1504,8 @@ export function useCollaborativeWorkflow() {
subBlockValues: Record<string, Record<string, unknown>> = {},
options?: { skipUndoRedo?: boolean }
) => {
if (!isInActiveRoom()) {
logger.debug('Skipping batch add blocks - not in active workflow')
if (!activeWorkflowId) {
logger.debug('Skipping batch add blocks - no active workflow')
return false
}
@@ -1568,7 +1558,7 @@ export function useCollaborativeWorkflow() {
return true
},
[addToQueue, activeWorkflowId, session?.user?.id, isBaselineDiffView, isInActiveRoom, undoRedo]
[addToQueue, activeWorkflowId, session?.user?.id, isBaselineDiffView, undoRedo]
)
const collaborativeBatchRemoveBlocks = useCallback(
@@ -1577,8 +1567,8 @@ export function useCollaborativeWorkflow() {
return false
}
if (!isInActiveRoom()) {
logger.debug('Skipping batch remove blocks - not in active workflow')
if (!activeWorkflowId) {
logger.debug('Skipping batch remove blocks - no active workflow')
return false
}
@@ -1662,7 +1652,6 @@ export function useCollaborativeWorkflow() {
addToQueue,
activeWorkflowId,
session?.user?.id,
isInActiveRoom,
cancelOperationsForBlock,
undoRedo,
]
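
The guard the hook now applies to every incoming broadcast can be stated on its own. A sketch, with the payload fields assumed from the handlers above (broadcasts from older servers that omit `workflowId` are still applied):

```typescript
// Apply a remote update only if it targets the workflow currently open in this tab.
function isForActiveWorkflow(
  broadcastWorkflowId: string | undefined,
  activeWorkflowId: string | null
): boolean {
  if (!broadcastWorkflowId) return true // no workflowId on the payload: apply as before
  return broadcastWorkflowId === activeWorkflowId
}
```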

View File

@@ -161,14 +161,14 @@ export const env = createEnv({
// Rate Limiting Configuration
RATE_LIMIT_WINDOW_MS: z.string().optional().default('60000'), // Rate limit window duration in milliseconds (default: 1 minute)
MANUAL_EXECUTION_LIMIT: z.string().optional().default('999999'),// Manual execution bypass value (effectively unlimited)
RATE_LIMIT_FREE_SYNC: z.string().optional().default('10'), // Free tier sync API executions per minute
RATE_LIMIT_FREE_ASYNC: z.string().optional().default('50'), // Free tier async API executions per minute
RATE_LIMIT_PRO_SYNC: z.string().optional().default('25'), // Pro tier sync API executions per minute
RATE_LIMIT_PRO_ASYNC: z.string().optional().default('200'), // Pro tier async API executions per minute
RATE_LIMIT_TEAM_SYNC: z.string().optional().default('75'), // Team tier sync API executions per minute
RATE_LIMIT_TEAM_ASYNC: z.string().optional().default('500'), // Team tier async API executions per minute
RATE_LIMIT_ENTERPRISE_SYNC: z.string().optional().default('150'), // Enterprise tier sync API executions per minute
RATE_LIMIT_ENTERPRISE_ASYNC: z.string().optional().default('1000'), // Enterprise tier async API executions per minute
RATE_LIMIT_FREE_SYNC: z.string().optional().default('50'), // Free tier sync API executions per minute
RATE_LIMIT_FREE_ASYNC: z.string().optional().default('200'), // Free tier async API executions per minute
RATE_LIMIT_PRO_SYNC: z.string().optional().default('150'), // Pro tier sync API executions per minute
RATE_LIMIT_PRO_ASYNC: z.string().optional().default('1000'), // Pro tier async API executions per minute
RATE_LIMIT_TEAM_SYNC: z.string().optional().default('300'), // Team tier sync API executions per minute
RATE_LIMIT_TEAM_ASYNC: z.string().optional().default('2500'), // Team tier async API executions per minute
RATE_LIMIT_ENTERPRISE_SYNC: z.string().optional().default('600'), // Enterprise tier sync API executions per minute
RATE_LIMIT_ENTERPRISE_ASYNC: z.string().optional().default('5000'), // Enterprise tier async API executions per minute
// Knowledge Base Processing Configuration - Shared across all processing methods
KB_CONFIG_MAX_DURATION: z.number().optional().default(600), // Max processing duration in seconds (10 minutes)

View File

@@ -28,24 +28,24 @@ function createBucketConfig(ratePerMinute: number, burstMultiplier = 2): TokenBu
export const RATE_LIMITS: Record<SubscriptionPlan, RateLimitConfig> = {
free: {
sync: createBucketConfig(Number.parseInt(env.RATE_LIMIT_FREE_SYNC) || 10),
async: createBucketConfig(Number.parseInt(env.RATE_LIMIT_FREE_ASYNC) || 50),
apiEndpoint: createBucketConfig(10),
},
pro: {
sync: createBucketConfig(Number.parseInt(env.RATE_LIMIT_PRO_SYNC) || 25),
async: createBucketConfig(Number.parseInt(env.RATE_LIMIT_PRO_ASYNC) || 200),
sync: createBucketConfig(Number.parseInt(env.RATE_LIMIT_FREE_SYNC) || 50),
async: createBucketConfig(Number.parseInt(env.RATE_LIMIT_FREE_ASYNC) || 200),
apiEndpoint: createBucketConfig(30),
},
pro: {
sync: createBucketConfig(Number.parseInt(env.RATE_LIMIT_PRO_SYNC) || 150),
async: createBucketConfig(Number.parseInt(env.RATE_LIMIT_PRO_ASYNC) || 1000),
apiEndpoint: createBucketConfig(100),
},
team: {
sync: createBucketConfig(Number.parseInt(env.RATE_LIMIT_TEAM_SYNC) || 75),
async: createBucketConfig(Number.parseInt(env.RATE_LIMIT_TEAM_ASYNC) || 500),
apiEndpoint: createBucketConfig(60),
sync: createBucketConfig(Number.parseInt(env.RATE_LIMIT_TEAM_SYNC) || 300),
async: createBucketConfig(Number.parseInt(env.RATE_LIMIT_TEAM_ASYNC) || 2500),
apiEndpoint: createBucketConfig(200),
},
enterprise: {
sync: createBucketConfig(Number.parseInt(env.RATE_LIMIT_ENTERPRISE_SYNC) || 150),
async: createBucketConfig(Number.parseInt(env.RATE_LIMIT_ENTERPRISE_ASYNC) || 1000),
apiEndpoint: createBucketConfig(120),
sync: createBucketConfig(Number.parseInt(env.RATE_LIMIT_ENTERPRISE_SYNC) || 600),
async: createBucketConfig(Number.parseInt(env.RATE_LIMIT_ENTERPRISE_ASYNC) || 5000),
apiEndpoint: createBucketConfig(500),
},
}
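
Given `burstMultiplier = 2` in `createBucketConfig`, the burst capacities shown in the docs follow directly from these per-minute defaults. A quick sanity check of the new Pro sync values (a sketch that assumes the helper simply multiplies the rate by the multiplier):

```typescript
// New Pro sync defaults: 150 requests/minute with burstMultiplier = 2.
const requestsPerMinute = 150
const maxBurst = requestsPerMinute * 2 // 300, matching the documented Pro sync burst
const refillPerSecond = requestsPerMinute / 60 // 2.5 tokens refill each second
```

The `|| 150`-style fallbacks in the config keep these defaults sane even if an env override is unset or not a number (in which case `Number.parseInt` yields `NaN`).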

View File

@@ -39,16 +39,23 @@ export function cleanupPendingSubblocksForSocket(socketId: string): void {
export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
socket.on('subblock-update', async (data) => {
const { blockId, subblockId, value, timestamp, operationId } = data
const {
workflowId: payloadWorkflowId,
blockId,
subblockId,
value,
timestamp,
operationId,
} = data
try {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
if (!sessionWorkflowId || !session) {
logger.debug(`Ignoring subblock update: socket not connected to any workflow room`, {
socketId: socket.id,
hasWorkflowId: !!workflowId,
hasWorkflowId: !!sessionWorkflowId,
hasSession: !!session,
})
socket.emit('operation-forbidden', {
@@ -61,6 +68,24 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
return
}
const workflowId = payloadWorkflowId || sessionWorkflowId
if (payloadWorkflowId && payloadWorkflowId !== sessionWorkflowId) {
logger.warn('Workflow ID mismatch in subblock update', {
payloadWorkflowId,
sessionWorkflowId,
socketId: socket.id,
})
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: 'Workflow ID mismatch',
retryable: true,
})
}
return
}
const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
if (!hasRoom) {
logger.debug(`Ignoring subblock update: workflow room not found`, {
@@ -182,20 +207,17 @@ async function flushSubblockUpdate(
if (updateSuccessful) {
// Broadcast to room excluding all senders (works cross-pod via Redis adapter)
const senderSocketIds = [...pending.opToSocket.values()]
const broadcastPayload = {
workflowId,
blockId,
subblockId,
value,
timestamp,
}
if (senderSocketIds.length > 0) {
io.to(workflowId).except(senderSocketIds).emit('subblock-update', {
blockId,
subblockId,
value,
timestamp,
})
io.to(workflowId).except(senderSocketIds).emit('subblock-update', broadcastPayload)
} else {
io.to(workflowId).emit('subblock-update', {
blockId,
subblockId,
value,
timestamp,
})
io.to(workflowId).emit('subblock-update', broadcastPayload)
}
// Confirm all coalesced operationIds (io.to(socketId) works cross-pod)

View File

@@ -35,16 +35,16 @@ export function cleanupPendingVariablesForSocket(socketId: string): void {
export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
socket.on('variable-update', async (data) => {
const { variableId, field, value, timestamp, operationId } = data
const { workflowId: payloadWorkflowId, variableId, field, value, timestamp, operationId } = data
try {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
if (!sessionWorkflowId || !session) {
logger.debug(`Ignoring variable update: socket not connected to any workflow room`, {
socketId: socket.id,
hasWorkflowId: !!workflowId,
hasWorkflowId: !!sessionWorkflowId,
hasSession: !!session,
})
socket.emit('operation-forbidden', {
@@ -57,6 +57,24 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
return
}
const workflowId = payloadWorkflowId || sessionWorkflowId
if (payloadWorkflowId && payloadWorkflowId !== sessionWorkflowId) {
logger.warn('Workflow ID mismatch in variable update', {
payloadWorkflowId,
sessionWorkflowId,
socketId: socket.id,
})
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: 'Workflow ID mismatch',
retryable: true,
})
}
return
}
const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
if (!hasRoom) {
logger.debug(`Ignoring variable update: workflow room not found`, {
@@ -179,20 +197,17 @@ async function flushVariableUpdate(
if (updateSuccessful) {
// Broadcast to room excluding all senders (works cross-pod via Redis adapter)
const senderSocketIds = [...pending.opToSocket.values()]
const broadcastPayload = {
workflowId,
variableId,
field,
value,
timestamp,
}
if (senderSocketIds.length > 0) {
io.to(workflowId).except(senderSocketIds).emit('variable-update', {
variableId,
field,
value,
timestamp,
})
io.to(workflowId).except(senderSocketIds).emit('variable-update', broadcastPayload)
} else {
io.to(workflowId).emit('variable-update', {
variableId,
field,
value,
timestamp,
})
io.to(workflowId).emit('variable-update', broadcastPayload)
}
// Confirm all coalesced operationIds (io.to(socketId) works cross-pod)

View File

@@ -24,16 +24,40 @@ let emitWorkflowOperation:
| ((operation: string, target: string, payload: any, operationId?: string) => void)
| null = null
let emitSubblockUpdate:
| ((blockId: string, subblockId: string, value: any, operationId?: string) => void)
| ((
blockId: string,
subblockId: string,
value: any,
operationId: string | undefined,
workflowId: string
) => void)
| null = null
let emitVariableUpdate:
| ((variableId: string, field: string, value: any, operationId?: string) => void)
| ((
variableId: string,
field: string,
value: any,
operationId: string | undefined,
workflowId: string
) => void)
| null = null
export function registerEmitFunctions(
workflowEmit: (operation: string, target: string, payload: any, operationId?: string) => void,
subblockEmit: (blockId: string, subblockId: string, value: any, operationId?: string) => void,
variableEmit: (variableId: string, field: string, value: any, operationId?: string) => void,
subblockEmit: (
blockId: string,
subblockId: string,
value: any,
operationId: string | undefined,
workflowId: string
) => void,
variableEmit: (
variableId: string,
field: string,
value: any,
operationId: string | undefined,
workflowId: string
) => void,
workflowId: string | null
) {
emitWorkflowOperation = workflowEmit
@@ -196,14 +220,16 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
}
if (!retryable) {
logger.debug('Operation marked as non-retryable, removing from queue', { operationId })
logger.error(
'Operation failed with non-retryable error - state out of sync, triggering offline mode',
{
operationId,
operation: operation.operation.operation,
target: operation.operation.target,
}
)
set((state) => ({
operations: state.operations.filter((op) => op.id !== operationId),
isProcessing: false,
}))
get().processNextOperation()
get().triggerOfflineMode()
return
}
@@ -305,11 +331,23 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
const { operation: op, target, payload } = nextOperation.operation
if (op === 'subblock-update' && target === 'subblock') {
if (emitSubblockUpdate) {
emitSubblockUpdate(payload.blockId, payload.subblockId, payload.value, nextOperation.id)
emitSubblockUpdate(
payload.blockId,
payload.subblockId,
payload.value,
nextOperation.id,
nextOperation.workflowId
)
}
} else if (op === 'variable-update' && target === 'variable') {
if (emitVariableUpdate) {
emitVariableUpdate(payload.variableId, payload.field, payload.value, nextOperation.id)
emitVariableUpdate(
payload.variableId,
payload.field,
payload.value,
nextOperation.id,
nextOperation.workflowId
)
}
} else {
if (emitWorkflowOperation) {

View File

@@ -125,8 +125,8 @@ app:
# Rate Limiting Configuration (per minute)
RATE_LIMIT_WINDOW_MS: "60000" # Rate limit window duration (1 minute)
RATE_LIMIT_FREE_SYNC: "10" # Sync API executions per minute
RATE_LIMIT_FREE_ASYNC: "50" # Async API executions per minute
RATE_LIMIT_FREE_SYNC: "50" # Sync API executions per minute
RATE_LIMIT_FREE_ASYNC: "200" # Async API executions per minute
# UI Branding & Whitelabeling Configuration
NEXT_PUBLIC_BRAND_NAME: "Sim" # Custom brand name