From e1ac20193651c0dd63b7264288b0f1d70118ca45 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Sat, 31 Jan 2026 16:48:57 -0800 Subject: [PATCH] improvement(ratelimits, sockets): increase across all plans, reconnecting notif for sockets (#3096) * improvement(rate-limits): increase across all plans * improve sockets with reconnecting * address bugbot comment * fix typing --- apps/docs/content/docs/en/execution/api.mdx | 74 ++++----- apps/docs/content/docs/en/execution/costs.mdx | 22 +-- .../workspace-permissions-provider.tsx | 33 +++- .../components/subscription/plan-configs.ts | 8 +- .../workspace/providers/socket-provider.tsx | 142 ++++++++++-------- .../billing/free-tier-upgrade-email.tsx | 4 +- apps/sim/hooks/use-collaborative-workflow.ts | 129 ++++++++-------- apps/sim/lib/core/config/env.ts | 16 +- apps/sim/lib/core/rate-limiter/types.ts | 26 ++-- apps/sim/socket/handlers/subblocks.ts | 54 +++++-- apps/sim/socket/handlers/variables.ts | 47 ++++-- apps/sim/stores/operation-queue/store.ts | 64 ++++++-- helm/sim/values.yaml | 4 +- 13 files changed, 366 insertions(+), 257 deletions(-) diff --git a/apps/docs/content/docs/en/execution/api.mdx b/apps/docs/content/docs/en/execution/api.mdx index 85e02eb02..860b44571 100644 --- a/apps/docs/content/docs/en/execution/api.mdx +++ b/apps/docs/content/docs/en/execution/api.mdx @@ -27,16 +27,16 @@ All API responses include information about your workflow execution limits and u "limits": { "workflowExecutionRateLimit": { "sync": { - "requestsPerMinute": 60, // Sustained rate limit per minute - "maxBurst": 120, // Maximum burst capacity - "remaining": 118, // Current tokens available (up to maxBurst) - "resetAt": "..." // When tokens next refill + "requestsPerMinute": 150, // Sustained rate limit per minute + "maxBurst": 300, // Maximum burst capacity + "remaining": 298, // Current tokens available (up to maxBurst) + "resetAt": "..." // When tokens next refill }, "async": { - "requestsPerMinute": 200, // Sustained rate limit per minute - "maxBurst": 400, // Maximum burst capacity - "remaining": 398, // Current tokens available - "resetAt": "..." // When tokens next refill + "requestsPerMinute": 1000, // Sustained rate limit per minute + "maxBurst": 2000, // Maximum burst capacity + "remaining": 1998, // Current tokens available + "resetAt": "..." // When tokens next refill } }, "usage": { @@ -107,28 +107,28 @@ Query workflow execution logs with extensive filtering options. } ], "nextCursor": "eyJzIjoiMjAyNS0wMS0wMVQxMjozNDo1Ni43ODlaIiwiaWQiOiJsb2dfYWJjMTIzIn0", - "limits": { - "workflowExecutionRateLimit": { - "sync": { - "requestsPerMinute": 60, - "maxBurst": 120, - "remaining": 118, - "resetAt": "2025-01-01T12:35:56.789Z" +"limits": { + "workflowExecutionRateLimit": { + "sync": { + "requestsPerMinute": 150, + "maxBurst": 300, + "remaining": 298, + "resetAt": "2025-01-01T12:35:56.789Z" + }, + "async": { + "requestsPerMinute": 1000, + "maxBurst": 2000, + "remaining": 1998, + "resetAt": "2025-01-01T12:35:56.789Z" + } }, - "async": { - "requestsPerMinute": 200, - "maxBurst": 400, - "remaining": 398, - "resetAt": "2025-01-01T12:35:56.789Z" + "usage": { + "currentPeriodCost": 1.234, + "limit": 10, + "plan": "pro", + "isExceeded": false } - }, - "usage": { - "currentPeriodCost": 1.234, - "limit": 10, - "plan": "pro", - "isExceeded": false } - } } ``` @@ -188,15 +188,15 @@ Retrieve detailed information about a specific log entry. "limits": { "workflowExecutionRateLimit": { "sync": { - "requestsPerMinute": 60, - "maxBurst": 120, - "remaining": 118, + "requestsPerMinute": 150, + "maxBurst": 300, + "remaining": 298, "resetAt": "2025-01-01T12:35:56.789Z" }, "async": { - "requestsPerMinute": 200, - "maxBurst": 400, - "remaining": 398, + "requestsPerMinute": 1000, + "maxBurst": 2000, + "remaining": 1998, "resetAt": "2025-01-01T12:35:56.789Z" } }, @@ -477,10 +477,10 @@ The API uses a **token bucket algorithm** for rate limiting, providing fair usag | Plan | Requests/Minute | Burst Capacity | |------|-----------------|----------------| -| Free | 10 | 20 | -| Pro | 30 | 60 | -| Team | 60 | 120 | -| Enterprise | 120 | 240 | +| Free | 30 | 60 | +| Pro | 100 | 200 | +| Team | 200 | 400 | +| Enterprise | 500 | 1000 | **How it works:** - Tokens refill at `requestsPerMinute` rate diff --git a/apps/docs/content/docs/en/execution/costs.mdx b/apps/docs/content/docs/en/execution/costs.mdx index 65dcd8a58..5d88091b1 100644 --- a/apps/docs/content/docs/en/execution/costs.mdx +++ b/apps/docs/content/docs/en/execution/costs.mdx @@ -170,16 +170,16 @@ curl -X GET -H "X-API-Key: YOUR_API_KEY" -H "Content-Type: application/json" htt "rateLimit": { "sync": { "isLimited": false, - "requestsPerMinute": 25, - "maxBurst": 50, - "remaining": 50, + "requestsPerMinute": 150, + "maxBurst": 300, + "remaining": 300, "resetAt": "2025-09-08T22:51:55.999Z" }, "async": { "isLimited": false, - "requestsPerMinute": 200, - "maxBurst": 400, - "remaining": 400, + "requestsPerMinute": 1000, + "maxBurst": 2000, + "remaining": 2000, "resetAt": "2025-09-08T22:51:56.155Z" }, "authType": "api" @@ -206,11 +206,11 @@ curl -X GET -H "X-API-Key: YOUR_API_KEY" -H "Content-Type: application/json" htt Different subscription plans have different usage limits: -| Plan | Monthly Usage Limit | Rate Limits (per minute) | -|------|-------------------|-------------------------| -| **Free** | $20 | 5 sync, 10 async | -| **Pro** | $100 | 10 sync, 50 async | -| **Team** | $500 (pooled) | 50 sync, 100 async | +| Plan | Monthly Usage Included | Rate Limits (per minute) | +|------|------------------------|-------------------------| +| **Free** | $20 | 50 sync, 200 async | +| **Pro** | $20 (adjustable) | 150 sync, 1,000 async | +| **Team** | $40/seat (pooled, adjustable) | 300 sync, 2,500 async | | **Enterprise** | Custom | Custom | ## Billing Model diff --git a/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx b/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx index ab198204d..a501f0d23 100644 --- a/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx +++ b/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx @@ -1,10 +1,11 @@ 'use client' import type React from 'react' -import { createContext, useCallback, useContext, useEffect, useMemo, useState } from 'react' +import { createContext, useCallback, useContext, useEffect, useMemo, useRef, useState } from 'react' import { createLogger } from '@sim/logger' import { useQueryClient } from '@tanstack/react-query' import { useParams } from 'next/navigation' +import { useSocket } from '@/app/workspace/providers/socket-provider' import { useWorkspacePermissionsQuery, type WorkspacePermissions, @@ -57,14 +58,42 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP const [hasShownOfflineNotification, setHasShownOfflineNotification] = useState(false) const hasOperationError = useOperationQueueStore((state) => state.hasOperationError) const addNotification = useNotificationStore((state) => state.addNotification) + const removeNotification = useNotificationStore((state) => state.removeNotification) + const { isReconnecting } = useSocket() + const reconnectingNotificationIdRef = useRef(null) const isOfflineMode = hasOperationError + useEffect(() => { + if (isReconnecting && !reconnectingNotificationIdRef.current && !isOfflineMode) { + const id = addNotification({ + level: 'error', + message: 'Reconnecting...', + }) + reconnectingNotificationIdRef.current = id + } else if (!isReconnecting && reconnectingNotificationIdRef.current) { + removeNotification(reconnectingNotificationIdRef.current) + reconnectingNotificationIdRef.current = null + } + + return () => { + if (reconnectingNotificationIdRef.current) { + removeNotification(reconnectingNotificationIdRef.current) + reconnectingNotificationIdRef.current = null + } + } + }, [isReconnecting, isOfflineMode, addNotification, removeNotification]) + useEffect(() => { if (!isOfflineMode || hasShownOfflineNotification) { return } + if (reconnectingNotificationIdRef.current) { + removeNotification(reconnectingNotificationIdRef.current) + reconnectingNotificationIdRef.current = null + } + try { addNotification({ level: 'error', @@ -78,7 +107,7 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP } catch (error) { logger.error('Failed to add offline notification', { error }) } - }, [addNotification, hasShownOfflineNotification, isOfflineMode]) + }, [addNotification, removeNotification, hasShownOfflineNotification, isOfflineMode]) const { data: workspacePermissions, diff --git a/apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components/settings-modal/components/subscription/plan-configs.ts b/apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components/settings-modal/components/subscription/plan-configs.ts index f604dd76e..3438d72b2 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components/settings-modal/components/subscription/plan-configs.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components/settings-modal/components/subscription/plan-configs.ts @@ -13,8 +13,8 @@ import { SlackMonoIcon } from '@/components/icons' import type { PlanFeature } from '@/app/workspace/[workspaceId]/w/components/sidebar/components/settings-modal/components/subscription/components/plan-card' export const PRO_PLAN_FEATURES: PlanFeature[] = [ - { icon: Zap, text: '25 runs per minute (sync)' }, - { icon: Clock, text: '200 runs per minute (async)' }, + { icon: Zap, text: '150 runs per minute (sync)' }, + { icon: Clock, text: '1,000 runs per minute (async)' }, { icon: HardDrive, text: '50GB file storage' }, { icon: Building2, text: 'Unlimited workspaces' }, { icon: Users, text: 'Unlimited invites' }, @@ -22,8 +22,8 @@ export const PRO_PLAN_FEATURES: PlanFeature[] = [ ] export const TEAM_PLAN_FEATURES: PlanFeature[] = [ - { icon: Zap, text: '75 runs per minute (sync)' }, - { icon: Clock, text: '500 runs per minute (async)' }, + { icon: Zap, text: '300 runs per minute (sync)' }, + { icon: Clock, text: '2,500 runs per minute (async)' }, { icon: HardDrive, text: '500GB file storage (pooled)' }, { icon: Building2, text: 'Unlimited workspaces' }, { icon: Users, text: 'Unlimited invites' }, diff --git a/apps/sim/app/workspace/providers/socket-provider.tsx b/apps/sim/app/workspace/providers/socket-provider.tsx index 0b4c5d017..425c64882 100644 --- a/apps/sim/app/workspace/providers/socket-provider.tsx +++ b/apps/sim/app/workspace/providers/socket-provider.tsx @@ -49,6 +49,7 @@ interface SocketContextType { socket: Socket | null isConnected: boolean isConnecting: boolean + isReconnecting: boolean authFailed: boolean currentWorkflowId: string | null currentSocketId: string | null @@ -66,9 +67,16 @@ interface SocketContextType { blockId: string, subblockId: string, value: any, - operationId?: string + operationId: string | undefined, + workflowId: string + ) => void + emitVariableUpdate: ( + variableId: string, + field: string, + value: any, + operationId: string | undefined, + workflowId: string ) => void - emitVariableUpdate: (variableId: string, field: string, value: any, operationId?: string) => void emitCursorUpdate: (cursor: { x: number; y: number } | null) => void emitSelectionUpdate: (selection: { type: 'block' | 'edge' | 'none'; id?: string }) => void @@ -88,6 +96,7 @@ const SocketContext = createContext({ socket: null, isConnected: false, isConnecting: false, + isReconnecting: false, authFailed: false, currentWorkflowId: null, currentSocketId: null, @@ -122,6 +131,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { const [socket, setSocket] = useState(null) const [isConnected, setIsConnected] = useState(false) const [isConnecting, setIsConnecting] = useState(false) + const [isReconnecting, setIsReconnecting] = useState(false) const [currentWorkflowId, setCurrentWorkflowId] = useState(null) const [currentSocketId, setCurrentSocketId] = useState(null) const [presenceUsers, setPresenceUsers] = useState([]) @@ -236,20 +246,19 @@ export function SocketProvider({ children, user }: SocketProviderProps) { setCurrentWorkflowId(null) setPresenceUsers([]) - logger.info('Socket disconnected', { - reason, - }) + // socket.active indicates if auto-reconnect will happen + if (socketInstance.active) { + setIsReconnecting(true) + logger.info('Socket disconnected, will auto-reconnect', { reason }) + } else { + setIsReconnecting(false) + logger.info('Socket disconnected, no auto-reconnect', { reason }) + } }) - socketInstance.on('connect_error', (error: any) => { + socketInstance.on('connect_error', (error: Error) => { setIsConnecting(false) - logger.error('Socket connection error:', { - message: error.message, - stack: error.stack, - description: error.description, - type: error.type, - transport: error.transport, - }) + logger.error('Socket connection error:', { message: error.message }) // Check if this is an authentication failure const isAuthError = @@ -261,43 +270,41 @@ export function SocketProvider({ children, user }: SocketProviderProps) { logger.warn( 'Authentication failed - stopping reconnection attempts. User may need to refresh/re-login.' ) - // Stop reconnection attempts to prevent infinite loop socketInstance.disconnect() - // Reset state to allow re-initialization when session is restored setSocket(null) setAuthFailed(true) + setIsReconnecting(false) initializedRef.current = false + } else if (socketInstance.active) { + // Temporary failure, will auto-reconnect + setIsReconnecting(true) } }) - socketInstance.on('reconnect', (attemptNumber) => { + // Reconnection events are on the Manager (socket.io), not the socket itself + socketInstance.io.on('reconnect', (attemptNumber) => { setIsConnected(true) + setIsReconnecting(false) setCurrentSocketId(socketInstance.id ?? null) logger.info('Socket reconnected successfully', { attemptNumber, socketId: socketInstance.id, transport: socketInstance.io.engine?.transport?.name, }) - // Note: join-workflow is handled by the useEffect watching isConnected }) - socketInstance.on('reconnect_attempt', (attemptNumber) => { - logger.info('Socket reconnection attempt (fresh token will be generated)', { - attemptNumber, - timestamp: new Date().toISOString(), - }) + socketInstance.io.on('reconnect_attempt', (attemptNumber) => { + setIsReconnecting(true) + logger.info('Socket reconnection attempt', { attemptNumber }) }) - socketInstance.on('reconnect_error', (error: any) => { - logger.error('Socket reconnection error:', { - message: error.message, - attemptNumber: error.attemptNumber, - type: error.type, - }) + socketInstance.io.on('reconnect_error', (error: Error) => { + logger.error('Socket reconnection error:', { message: error.message }) }) - socketInstance.on('reconnect_failed', () => { + socketInstance.io.on('reconnect_failed', () => { logger.error('Socket reconnection failed - all attempts exhausted') + setIsReconnecting(false) setIsConnecting(false) }) @@ -629,6 +636,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { if (commit) { socket.emit('workflow-operation', { + workflowId: currentWorkflowId, operation, target, payload, @@ -645,6 +653,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { } pendingPositionUpdates.current.set(blockId, { + workflowId: currentWorkflowId, operation, target, payload, @@ -666,6 +675,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { } } else { socket.emit('workflow-operation', { + workflowId: currentWorkflowId, operation, target, payload, @@ -678,47 +688,51 @@ export function SocketProvider({ children, user }: SocketProviderProps) { ) const emitSubblockUpdate = useCallback( - (blockId: string, subblockId: string, value: any, operationId?: string) => { - if (socket && currentWorkflowId) { - socket.emit('subblock-update', { - blockId, - subblockId, - value, - timestamp: Date.now(), - operationId, - }) - } else { - logger.warn('Cannot emit subblock update: no socket connection or workflow room', { - hasSocket: !!socket, - currentWorkflowId, - blockId, - subblockId, - }) + ( + blockId: string, + subblockId: string, + value: any, + operationId: string | undefined, + workflowId: string + ) => { + if (!socket) { + logger.warn('Cannot emit subblock update: no socket connection', { workflowId, blockId }) + return } + socket.emit('subblock-update', { + workflowId, + blockId, + subblockId, + value, + timestamp: Date.now(), + operationId, + }) }, - [socket, currentWorkflowId] + [socket] ) const emitVariableUpdate = useCallback( - (variableId: string, field: string, value: any, operationId?: string) => { - if (socket && currentWorkflowId) { - socket.emit('variable-update', { - variableId, - field, - value, - timestamp: Date.now(), - operationId, - }) - } else { - logger.warn('Cannot emit variable update: no socket connection or workflow room', { - hasSocket: !!socket, - currentWorkflowId, - variableId, - field, - }) + ( + variableId: string, + field: string, + value: any, + operationId: string | undefined, + workflowId: string + ) => { + if (!socket) { + logger.warn('Cannot emit variable update: no socket connection', { workflowId, variableId }) + return } + socket.emit('variable-update', { + workflowId, + variableId, + field, + value, + timestamp: Date.now(), + operationId, + }) }, - [socket, currentWorkflowId] + [socket] ) const lastCursorEmit = useRef(0) @@ -794,6 +808,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { socket, isConnected, isConnecting, + isReconnecting, authFailed, currentWorkflowId, currentSocketId, @@ -820,6 +835,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { socket, isConnected, isConnecting, + isReconnecting, authFailed, currentWorkflowId, currentSocketId, diff --git a/apps/sim/components/emails/billing/free-tier-upgrade-email.tsx b/apps/sim/components/emails/billing/free-tier-upgrade-email.tsx index 464221d89..9f42559d2 100644 --- a/apps/sim/components/emails/billing/free-tier-upgrade-email.tsx +++ b/apps/sim/components/emails/billing/free-tier-upgrade-email.tsx @@ -13,8 +13,8 @@ interface FreeTierUpgradeEmailProps { const proFeatures = [ { label: '$20/month', desc: 'in credits included' }, - { label: '25 runs/min', desc: 'sync executions' }, - { label: '200 runs/min', desc: 'async executions' }, + { label: '150 runs/min', desc: 'sync executions' }, + { label: '1,000 runs/min', desc: 'async executions' }, { label: '50GB storage', desc: 'for files & assets' }, { label: 'Unlimited', desc: 'workspaces & invites' }, ] diff --git a/apps/sim/hooks/use-collaborative-workflow.ts b/apps/sim/hooks/use-collaborative-workflow.ts index ea0600330..caf0aad9f 100644 --- a/apps/sim/hooks/use-collaborative-workflow.ts +++ b/apps/sim/hooks/use-collaborative-workflow.ts @@ -146,10 +146,6 @@ export function useCollaborativeWorkflow() { cancelOperationsForVariable, } = useOperationQueue() - const isInActiveRoom = useCallback(() => { - return !!currentWorkflowId && activeWorkflowId === currentWorkflowId - }, [currentWorkflowId, activeWorkflowId]) - // Register emit functions with operation queue store useEffect(() => { registerEmitFunctions( @@ -162,10 +158,19 @@ export function useCollaborativeWorkflow() { useEffect(() => { const handleWorkflowOperation = (data: any) => { - const { operation, target, payload, userId } = data + const { operation, target, payload, userId, metadata } = data if (isApplyingRemoteChange.current) return + // Filter broadcasts by workflowId to prevent cross-workflow updates + if (metadata?.workflowId && metadata.workflowId !== activeWorkflowId) { + logger.debug('Ignoring workflow operation for different workflow', { + broadcastWorkflowId: metadata.workflowId, + activeWorkflowId, + }) + return + } + logger.info(`Received ${operation} on ${target} from user ${userId}`) // Apply the operation to local state @@ -436,16 +441,24 @@ export function useCollaborativeWorkflow() { } const handleSubblockUpdate = (data: any) => { - const { blockId, subblockId, value, userId } = data + const { workflowId, blockId, subblockId, value, userId } = data if (isApplyingRemoteChange.current) return + // Filter broadcasts by workflowId to prevent cross-workflow updates + if (workflowId && workflowId !== activeWorkflowId) { + logger.debug('Ignoring subblock update for different workflow', { + broadcastWorkflowId: workflowId, + activeWorkflowId, + }) + return + } + logger.info(`Received subblock update from user ${userId}: ${blockId}.${subblockId}`) isApplyingRemoteChange.current = true try { - // The setValue function automatically uses the active workflow ID useSubBlockStore.getState().setValue(blockId, subblockId, value) const blockType = useWorkflowStore.getState().blocks?.[blockId]?.type if (activeWorkflowId && blockType === 'function' && subblockId === 'code') { @@ -459,10 +472,19 @@ export function useCollaborativeWorkflow() { } const handleVariableUpdate = (data: any) => { - const { variableId, field, value, userId } = data + const { workflowId, variableId, field, value, userId } = data if (isApplyingRemoteChange.current) return + // Filter broadcasts by workflowId to prevent cross-workflow updates + if (workflowId && workflowId !== activeWorkflowId) { + logger.debug('Ignoring variable update for different workflow', { + broadcastWorkflowId: workflowId, + activeWorkflowId, + }) + return + } + logger.info(`Received variable update from user ${userId}: ${variableId}.${field}`) isApplyingRemoteChange.current = true @@ -623,13 +645,9 @@ export function useCollaborativeWorkflow() { return } - if (!isInActiveRoom()) { - logger.debug('Skipping operation - not in active workflow', { - currentWorkflowId, - activeWorkflowId, - operation, - target, - }) + // Queue operations if we have an active workflow - queue handles socket readiness + if (!activeWorkflowId) { + logger.debug('Skipping operation - no active workflow', { operation, target }) return } @@ -642,20 +660,13 @@ export function useCollaborativeWorkflow() { target, payload, }, - workflowId: activeWorkflowId || '', + workflowId: activeWorkflowId, userId: session?.user?.id || 'unknown', }) localAction() }, - [ - addToQueue, - session?.user?.id, - isBaselineDiffView, - activeWorkflowId, - isInActiveRoom, - currentWorkflowId, - ] + [addToQueue, session?.user?.id, isBaselineDiffView, activeWorkflowId] ) const collaborativeBatchUpdatePositions = useCallback( @@ -669,8 +680,8 @@ export function useCollaborativeWorkflow() { return } - if (!isInActiveRoom()) { - logger.debug('Skipping batch position update - not in active workflow') + if (!activeWorkflowId) { + logger.debug('Skipping batch position update - no active workflow') return } @@ -714,7 +725,7 @@ export function useCollaborativeWorkflow() { } } }, - [isBaselineDiffView, addToQueue, activeWorkflowId, session?.user?.id, isInActiveRoom, undoRedo] + [isBaselineDiffView, addToQueue, activeWorkflowId, session?.user?.id, undoRedo] ) const collaborativeUpdateBlockName = useCallback( @@ -858,8 +869,8 @@ export function useCollaborativeWorkflow() { return } - if (!isInActiveRoom()) { - logger.debug('Skipping batch update parent - not in active workflow') + if (!activeWorkflowId) { + logger.debug('Skipping batch update parent - no active workflow') return } @@ -928,7 +939,7 @@ export function useCollaborativeWorkflow() { logger.debug('Batch updated parent for blocks', { updateCount: updates.length }) }, - [isBaselineDiffView, isInActiveRoom, undoRedo, addToQueue, activeWorkflowId, session?.user?.id] + [isBaselineDiffView, undoRedo, addToQueue, activeWorkflowId, session?.user?.id] ) const collaborativeToggleBlockAdvancedMode = useCallback( @@ -1020,8 +1031,8 @@ export function useCollaborativeWorkflow() { return false } - if (!isInActiveRoom()) { - logger.debug('Skipping batch add edges - not in active workflow') + if (!activeWorkflowId) { + logger.debug('Skipping batch add edges - no active workflow') return false } @@ -1055,7 +1066,7 @@ export function useCollaborativeWorkflow() { return true }, - [isBaselineDiffView, addToQueue, activeWorkflowId, session?.user?.id, isInActiveRoom, undoRedo] + [isBaselineDiffView, addToQueue, activeWorkflowId, session?.user?.id, undoRedo] ) const collaborativeBatchRemoveEdges = useCallback( @@ -1064,8 +1075,8 @@ export function useCollaborativeWorkflow() { return false } - if (!isInActiveRoom()) { - logger.debug('Skipping batch remove edges - not in active workflow') + if (!activeWorkflowId) { + logger.debug('Skipping batch remove edges - no active workflow') return false } @@ -1113,7 +1124,7 @@ export function useCollaborativeWorkflow() { logger.info('Batch removed edges', { count: validEdgeIds.length }) return true }, - [isBaselineDiffView, isInActiveRoom, addToQueue, activeWorkflowId, session, undoRedo] + [isBaselineDiffView, addToQueue, activeWorkflowId, session, undoRedo] ) const collaborativeSetSubblockValue = useCallback( @@ -1148,11 +1159,9 @@ export function useCollaborativeWorkflow() { // Best-effort; do not block on clearing } - // Only emit to socket if in active room - if (!isInActiveRoom()) { - logger.debug('Local update applied, skipping socket emit - not in active workflow', { - currentWorkflowId, - activeWorkflowId, + // Queue socket operation if we have an active workflow + if (!activeWorkflowId) { + logger.debug('Local update applied, skipping socket queue - no active workflow', { blockId, subblockId, }) @@ -1174,14 +1183,7 @@ export function useCollaborativeWorkflow() { userId: session?.user?.id || 'unknown', }) }, - [ - currentWorkflowId, - activeWorkflowId, - addToQueue, - session?.user?.id, - isBaselineDiffView, - isInActiveRoom, - ] + [activeWorkflowId, addToQueue, session?.user?.id, isBaselineDiffView] ) // Immediate tag selection (uses queue but processes immediately, no debouncing) @@ -1193,13 +1195,8 @@ export function useCollaborativeWorkflow() { return } - if (!isInActiveRoom()) { - logger.debug('Skipping tag selection - not in active workflow', { - currentWorkflowId, - activeWorkflowId, - blockId, - subblockId, - }) + if (!activeWorkflowId) { + logger.debug('Skipping tag selection - no active workflow', { blockId, subblockId }) return } @@ -1220,14 +1217,7 @@ export function useCollaborativeWorkflow() { userId: session?.user?.id || 'unknown', }) }, - [ - isBaselineDiffView, - addToQueue, - currentWorkflowId, - activeWorkflowId, - session?.user?.id, - isInActiveRoom, - ] + [isBaselineDiffView, addToQueue, activeWorkflowId, session?.user?.id] ) const collaborativeUpdateLoopType = useCallback( @@ -1514,8 +1504,8 @@ export function useCollaborativeWorkflow() { subBlockValues: Record> = {}, options?: { skipUndoRedo?: boolean } ) => { - if (!isInActiveRoom()) { - logger.debug('Skipping batch add blocks - not in active workflow') + if (!activeWorkflowId) { + logger.debug('Skipping batch add blocks - no active workflow') return false } @@ -1568,7 +1558,7 @@ export function useCollaborativeWorkflow() { return true }, - [addToQueue, activeWorkflowId, session?.user?.id, isBaselineDiffView, isInActiveRoom, undoRedo] + [addToQueue, activeWorkflowId, session?.user?.id, isBaselineDiffView, undoRedo] ) const collaborativeBatchRemoveBlocks = useCallback( @@ -1577,8 +1567,8 @@ export function useCollaborativeWorkflow() { return false } - if (!isInActiveRoom()) { - logger.debug('Skipping batch remove blocks - not in active workflow') + if (!activeWorkflowId) { + logger.debug('Skipping batch remove blocks - no active workflow') return false } @@ -1662,7 +1652,6 @@ export function useCollaborativeWorkflow() { addToQueue, activeWorkflowId, session?.user?.id, - isInActiveRoom, cancelOperationsForBlock, undoRedo, ] diff --git a/apps/sim/lib/core/config/env.ts b/apps/sim/lib/core/config/env.ts index 86ccbc2c8..6bd9df299 100644 --- a/apps/sim/lib/core/config/env.ts +++ b/apps/sim/lib/core/config/env.ts @@ -161,14 +161,14 @@ export const env = createEnv({ // Rate Limiting Configuration RATE_LIMIT_WINDOW_MS: z.string().optional().default('60000'), // Rate limit window duration in milliseconds (default: 1 minute) MANUAL_EXECUTION_LIMIT: z.string().optional().default('999999'),// Manual execution bypass value (effectively unlimited) - RATE_LIMIT_FREE_SYNC: z.string().optional().default('10'), // Free tier sync API executions per minute - RATE_LIMIT_FREE_ASYNC: z.string().optional().default('50'), // Free tier async API executions per minute - RATE_LIMIT_PRO_SYNC: z.string().optional().default('25'), // Pro tier sync API executions per minute - RATE_LIMIT_PRO_ASYNC: z.string().optional().default('200'), // Pro tier async API executions per minute - RATE_LIMIT_TEAM_SYNC: z.string().optional().default('75'), // Team tier sync API executions per minute - RATE_LIMIT_TEAM_ASYNC: z.string().optional().default('500'), // Team tier async API executions per minute - RATE_LIMIT_ENTERPRISE_SYNC: z.string().optional().default('150'), // Enterprise tier sync API executions per minute - RATE_LIMIT_ENTERPRISE_ASYNC: z.string().optional().default('1000'), // Enterprise tier async API executions per minute + RATE_LIMIT_FREE_SYNC: z.string().optional().default('50'), // Free tier sync API executions per minute + RATE_LIMIT_FREE_ASYNC: z.string().optional().default('200'), // Free tier async API executions per minute + RATE_LIMIT_PRO_SYNC: z.string().optional().default('150'), // Pro tier sync API executions per minute + RATE_LIMIT_PRO_ASYNC: z.string().optional().default('1000'), // Pro tier async API executions per minute + RATE_LIMIT_TEAM_SYNC: z.string().optional().default('300'), // Team tier sync API executions per minute + RATE_LIMIT_TEAM_ASYNC: z.string().optional().default('2500'), // Team tier async API executions per minute + RATE_LIMIT_ENTERPRISE_SYNC: z.string().optional().default('600'), // Enterprise tier sync API executions per minute + RATE_LIMIT_ENTERPRISE_ASYNC: z.string().optional().default('5000'), // Enterprise tier async API executions per minute // Knowledge Base Processing Configuration - Shared across all processing methods KB_CONFIG_MAX_DURATION: z.number().optional().default(600), // Max processing duration in seconds (10 minutes) diff --git a/apps/sim/lib/core/rate-limiter/types.ts b/apps/sim/lib/core/rate-limiter/types.ts index 282ee09e0..9dac4edb4 100644 --- a/apps/sim/lib/core/rate-limiter/types.ts +++ b/apps/sim/lib/core/rate-limiter/types.ts @@ -28,24 +28,24 @@ function createBucketConfig(ratePerMinute: number, burstMultiplier = 2): TokenBu export const RATE_LIMITS: Record = { free: { - sync: createBucketConfig(Number.parseInt(env.RATE_LIMIT_FREE_SYNC) || 10), - async: createBucketConfig(Number.parseInt(env.RATE_LIMIT_FREE_ASYNC) || 50), - apiEndpoint: createBucketConfig(10), - }, - pro: { - sync: createBucketConfig(Number.parseInt(env.RATE_LIMIT_PRO_SYNC) || 25), - async: createBucketConfig(Number.parseInt(env.RATE_LIMIT_PRO_ASYNC) || 200), + sync: createBucketConfig(Number.parseInt(env.RATE_LIMIT_FREE_SYNC) || 50), + async: createBucketConfig(Number.parseInt(env.RATE_LIMIT_FREE_ASYNC) || 200), apiEndpoint: createBucketConfig(30), }, + pro: { + sync: createBucketConfig(Number.parseInt(env.RATE_LIMIT_PRO_SYNC) || 150), + async: createBucketConfig(Number.parseInt(env.RATE_LIMIT_PRO_ASYNC) || 1000), + apiEndpoint: createBucketConfig(100), + }, team: { - sync: createBucketConfig(Number.parseInt(env.RATE_LIMIT_TEAM_SYNC) || 75), - async: createBucketConfig(Number.parseInt(env.RATE_LIMIT_TEAM_ASYNC) || 500), - apiEndpoint: createBucketConfig(60), + sync: createBucketConfig(Number.parseInt(env.RATE_LIMIT_TEAM_SYNC) || 300), + async: createBucketConfig(Number.parseInt(env.RATE_LIMIT_TEAM_ASYNC) || 2500), + apiEndpoint: createBucketConfig(200), }, enterprise: { - sync: createBucketConfig(Number.parseInt(env.RATE_LIMIT_ENTERPRISE_SYNC) || 150), - async: createBucketConfig(Number.parseInt(env.RATE_LIMIT_ENTERPRISE_ASYNC) || 1000), - apiEndpoint: createBucketConfig(120), + sync: createBucketConfig(Number.parseInt(env.RATE_LIMIT_ENTERPRISE_SYNC) || 600), + async: createBucketConfig(Number.parseInt(env.RATE_LIMIT_ENTERPRISE_ASYNC) || 5000), + apiEndpoint: createBucketConfig(500), }, } diff --git a/apps/sim/socket/handlers/subblocks.ts b/apps/sim/socket/handlers/subblocks.ts index a630151dd..23896fed3 100644 --- a/apps/sim/socket/handlers/subblocks.ts +++ b/apps/sim/socket/handlers/subblocks.ts @@ -39,16 +39,23 @@ export function cleanupPendingSubblocksForSocket(socketId: string): void { export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) { socket.on('subblock-update', async (data) => { - const { blockId, subblockId, value, timestamp, operationId } = data + const { + workflowId: payloadWorkflowId, + blockId, + subblockId, + value, + timestamp, + operationId, + } = data try { - const workflowId = await roomManager.getWorkflowIdForSocket(socket.id) + const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id) const session = await roomManager.getUserSession(socket.id) - if (!workflowId || !session) { + if (!sessionWorkflowId || !session) { logger.debug(`Ignoring subblock update: socket not connected to any workflow room`, { socketId: socket.id, - hasWorkflowId: !!workflowId, + hasWorkflowId: !!sessionWorkflowId, hasSession: !!session, }) socket.emit('operation-forbidden', { @@ -61,6 +68,24 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager: return } + const workflowId = payloadWorkflowId || sessionWorkflowId + + if (payloadWorkflowId && payloadWorkflowId !== sessionWorkflowId) { + logger.warn('Workflow ID mismatch in subblock update', { + payloadWorkflowId, + sessionWorkflowId, + socketId: socket.id, + }) + if (operationId) { + socket.emit('operation-failed', { + operationId, + error: 'Workflow ID mismatch', + retryable: true, + }) + } + return + } + const hasRoom = await roomManager.hasWorkflowRoom(workflowId) if (!hasRoom) { logger.debug(`Ignoring subblock update: workflow room not found`, { @@ -182,20 +207,17 @@ async function flushSubblockUpdate( if (updateSuccessful) { // Broadcast to room excluding all senders (works cross-pod via Redis adapter) const senderSocketIds = [...pending.opToSocket.values()] + const broadcastPayload = { + workflowId, + blockId, + subblockId, + value, + timestamp, + } if (senderSocketIds.length > 0) { - io.to(workflowId).except(senderSocketIds).emit('subblock-update', { - blockId, - subblockId, - value, - timestamp, - }) + io.to(workflowId).except(senderSocketIds).emit('subblock-update', broadcastPayload) } else { - io.to(workflowId).emit('subblock-update', { - blockId, - subblockId, - value, - timestamp, - }) + io.to(workflowId).emit('subblock-update', broadcastPayload) } // Confirm all coalesced operationIds (io.to(socketId) works cross-pod) diff --git a/apps/sim/socket/handlers/variables.ts b/apps/sim/socket/handlers/variables.ts index 0421b4e60..5b36873a8 100644 --- a/apps/sim/socket/handlers/variables.ts +++ b/apps/sim/socket/handlers/variables.ts @@ -35,16 +35,16 @@ export function cleanupPendingVariablesForSocket(socketId: string): void { export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) { socket.on('variable-update', async (data) => { - const { variableId, field, value, timestamp, operationId } = data + const { workflowId: payloadWorkflowId, variableId, field, value, timestamp, operationId } = data try { - const workflowId = await roomManager.getWorkflowIdForSocket(socket.id) + const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id) const session = await roomManager.getUserSession(socket.id) - if (!workflowId || !session) { + if (!sessionWorkflowId || !session) { logger.debug(`Ignoring variable update: socket not connected to any workflow room`, { socketId: socket.id, - hasWorkflowId: !!workflowId, + hasWorkflowId: !!sessionWorkflowId, hasSession: !!session, }) socket.emit('operation-forbidden', { @@ -57,6 +57,24 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager: return } + const workflowId = payloadWorkflowId || sessionWorkflowId + + if (payloadWorkflowId && payloadWorkflowId !== sessionWorkflowId) { + logger.warn('Workflow ID mismatch in variable update', { + payloadWorkflowId, + sessionWorkflowId, + socketId: socket.id, + }) + if (operationId) { + socket.emit('operation-failed', { + operationId, + error: 'Workflow ID mismatch', + retryable: true, + }) + } + return + } + const hasRoom = await roomManager.hasWorkflowRoom(workflowId) if (!hasRoom) { logger.debug(`Ignoring variable update: workflow room not found`, { @@ -179,20 +197,17 @@ async function flushVariableUpdate( if (updateSuccessful) { // Broadcast to room excluding all senders (works cross-pod via Redis adapter) const senderSocketIds = [...pending.opToSocket.values()] + const broadcastPayload = { + workflowId, + variableId, + field, + value, + timestamp, + } if (senderSocketIds.length > 0) { - io.to(workflowId).except(senderSocketIds).emit('variable-update', { - variableId, - field, - value, - timestamp, - }) + io.to(workflowId).except(senderSocketIds).emit('variable-update', broadcastPayload) } else { - io.to(workflowId).emit('variable-update', { - variableId, - field, - value, - timestamp, - }) + io.to(workflowId).emit('variable-update', broadcastPayload) } // Confirm all coalesced operationIds (io.to(socketId) works cross-pod) diff --git a/apps/sim/stores/operation-queue/store.ts b/apps/sim/stores/operation-queue/store.ts index e9f3deedc..657535c5b 100644 --- a/apps/sim/stores/operation-queue/store.ts +++ b/apps/sim/stores/operation-queue/store.ts @@ -24,16 +24,40 @@ let emitWorkflowOperation: | ((operation: string, target: string, payload: any, operationId?: string) => void) | null = null let emitSubblockUpdate: - | ((blockId: string, subblockId: string, value: any, operationId?: string) => void) + | (( + blockId: string, + subblockId: string, + value: any, + operationId: string | undefined, + workflowId: string + ) => void) | null = null let emitVariableUpdate: - | ((variableId: string, field: string, value: any, operationId?: string) => void) + | (( + variableId: string, + field: string, + value: any, + operationId: string | undefined, + workflowId: string + ) => void) | null = null export function registerEmitFunctions( workflowEmit: (operation: string, target: string, payload: any, operationId?: string) => void, - subblockEmit: (blockId: string, subblockId: string, value: any, operationId?: string) => void, - variableEmit: (variableId: string, field: string, value: any, operationId?: string) => void, + subblockEmit: ( + blockId: string, + subblockId: string, + value: any, + operationId: string | undefined, + workflowId: string + ) => void, + variableEmit: ( + variableId: string, + field: string, + value: any, + operationId: string | undefined, + workflowId: string + ) => void, workflowId: string | null ) { emitWorkflowOperation = workflowEmit @@ -196,14 +220,16 @@ export const useOperationQueueStore = create((set, get) => } if (!retryable) { - logger.debug('Operation marked as non-retryable, removing from queue', { operationId }) + logger.error( + 'Operation failed with non-retryable error - state out of sync, triggering offline mode', + { + operationId, + operation: operation.operation.operation, + target: operation.operation.target, + } + ) - set((state) => ({ - operations: state.operations.filter((op) => op.id !== operationId), - isProcessing: false, - })) - - get().processNextOperation() + get().triggerOfflineMode() return } @@ -305,11 +331,23 @@ export const useOperationQueueStore = create((set, get) => const { operation: op, target, payload } = nextOperation.operation if (op === 'subblock-update' && target === 'subblock') { if (emitSubblockUpdate) { - emitSubblockUpdate(payload.blockId, payload.subblockId, payload.value, nextOperation.id) + emitSubblockUpdate( + payload.blockId, + payload.subblockId, + payload.value, + nextOperation.id, + nextOperation.workflowId + ) } } else if (op === 'variable-update' && target === 'variable') { if (emitVariableUpdate) { - emitVariableUpdate(payload.variableId, payload.field, payload.value, nextOperation.id) + emitVariableUpdate( + payload.variableId, + payload.field, + payload.value, + nextOperation.id, + nextOperation.workflowId + ) } } else { if (emitWorkflowOperation) { diff --git a/helm/sim/values.yaml b/helm/sim/values.yaml index e78e0f917..aa227c987 100644 --- a/helm/sim/values.yaml +++ b/helm/sim/values.yaml @@ -125,8 +125,8 @@ app: # Rate Limiting Configuration (per minute) RATE_LIMIT_WINDOW_MS: "60000" # Rate limit window duration (1 minute) - RATE_LIMIT_FREE_SYNC: "10" # Sync API executions per minute - RATE_LIMIT_FREE_ASYNC: "50" # Async API executions per minute + RATE_LIMIT_FREE_SYNC: "50" # Sync API executions per minute + RATE_LIMIT_FREE_ASYNC: "200" # Async API executions per minute # UI Branding & Whitelabeling Configuration NEXT_PUBLIC_BRAND_NAME: "Sim" # Custom brand name