From 2879ab6c0fbb566bc93e3747b563a7b9738ed61f Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Sat, 31 Jan 2026 13:25:52 -0800 Subject: [PATCH] improve sockets with reconnecting --- .../workspace-permissions-provider.tsx | 26 +++- .../workspace/providers/socket-provider.tsx | 133 ++++++++++-------- apps/sim/hooks/use-collaborative-workflow.ts | 129 ++++++++--------- apps/sim/socket/handlers/subblocks.ts | 54 ++++--- apps/sim/socket/handlers/variables.ts | 47 ++++--- apps/sim/stores/operation-queue/store.ts | 64 +++++++-- 6 files changed, 275 insertions(+), 178 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx b/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx index ab198204d..1429e2eba 100644 --- a/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx +++ b/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx @@ -1,10 +1,11 @@ 'use client' import type React from 'react' -import { createContext, useCallback, useContext, useEffect, useMemo, useState } from 'react' +import { createContext, useCallback, useContext, useEffect, useMemo, useRef, useState } from 'react' import { createLogger } from '@sim/logger' import { useQueryClient } from '@tanstack/react-query' import { useParams } from 'next/navigation' +import { useSocket } from '@/app/workspace/providers/socket-provider' import { useWorkspacePermissionsQuery, type WorkspacePermissions, @@ -57,14 +58,35 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP const [hasShownOfflineNotification, setHasShownOfflineNotification] = useState(false) const hasOperationError = useOperationQueueStore((state) => state.hasOperationError) const addNotification = useNotificationStore((state) => state.addNotification) + const removeNotification = useNotificationStore((state) => state.removeNotification) + const { isReconnecting } = useSocket() + const reconnectingNotificationIdRef = useRef(null) const isOfflineMode = hasOperationError + useEffect(() => { + if (isReconnecting && !reconnectingNotificationIdRef.current && !isOfflineMode) { + const id = addNotification({ + level: 'error', + message: 'Reconnecting...', + }) + reconnectingNotificationIdRef.current = id + } else if (!isReconnecting && reconnectingNotificationIdRef.current) { + removeNotification(reconnectingNotificationIdRef.current) + reconnectingNotificationIdRef.current = null + } + }, [isReconnecting, isOfflineMode, addNotification, removeNotification]) + useEffect(() => { if (!isOfflineMode || hasShownOfflineNotification) { return } + if (reconnectingNotificationIdRef.current) { + removeNotification(reconnectingNotificationIdRef.current) + reconnectingNotificationIdRef.current = null + } + try { addNotification({ level: 'error', @@ -78,7 +100,7 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP } catch (error) { logger.error('Failed to add offline notification', { error }) } - }, [addNotification, hasShownOfflineNotification, isOfflineMode]) + }, [addNotification, removeNotification, hasShownOfflineNotification, isOfflineMode]) const { data: workspacePermissions, diff --git a/apps/sim/app/workspace/providers/socket-provider.tsx b/apps/sim/app/workspace/providers/socket-provider.tsx index 0b4c5d017..9904f2db3 100644 --- a/apps/sim/app/workspace/providers/socket-provider.tsx +++ b/apps/sim/app/workspace/providers/socket-provider.tsx @@ -49,6 +49,7 @@ interface SocketContextType { socket: Socket | null isConnected: boolean isConnecting: boolean + isReconnecting: boolean authFailed: boolean currentWorkflowId: string | null currentSocketId: string | null @@ -88,6 +89,7 @@ const SocketContext = createContext({ socket: null, isConnected: false, isConnecting: false, + isReconnecting: false, authFailed: false, currentWorkflowId: null, currentSocketId: null, @@ -122,6 +124,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { const [socket, setSocket] = useState(null) const [isConnected, setIsConnected] = useState(false) const [isConnecting, setIsConnecting] = useState(false) + const [isReconnecting, setIsReconnecting] = useState(false) const [currentWorkflowId, setCurrentWorkflowId] = useState(null) const [currentSocketId, setCurrentSocketId] = useState(null) const [presenceUsers, setPresenceUsers] = useState([]) @@ -236,20 +239,19 @@ export function SocketProvider({ children, user }: SocketProviderProps) { setCurrentWorkflowId(null) setPresenceUsers([]) - logger.info('Socket disconnected', { - reason, - }) + // socket.active indicates if auto-reconnect will happen + if (socketInstance.active) { + setIsReconnecting(true) + logger.info('Socket disconnected, will auto-reconnect', { reason }) + } else { + setIsReconnecting(false) + logger.info('Socket disconnected, no auto-reconnect', { reason }) + } }) - socketInstance.on('connect_error', (error: any) => { + socketInstance.on('connect_error', (error: Error) => { setIsConnecting(false) - logger.error('Socket connection error:', { - message: error.message, - stack: error.stack, - description: error.description, - type: error.type, - transport: error.transport, - }) + logger.error('Socket connection error:', { message: error.message }) // Check if this is an authentication failure const isAuthError = @@ -261,43 +263,41 @@ export function SocketProvider({ children, user }: SocketProviderProps) { logger.warn( 'Authentication failed - stopping reconnection attempts. User may need to refresh/re-login.' ) - // Stop reconnection attempts to prevent infinite loop socketInstance.disconnect() - // Reset state to allow re-initialization when session is restored setSocket(null) setAuthFailed(true) + setIsReconnecting(false) initializedRef.current = false + } else if (socketInstance.active) { + // Temporary failure, will auto-reconnect + setIsReconnecting(true) } }) - socketInstance.on('reconnect', (attemptNumber) => { + // Reconnection events are on the Manager (socket.io), not the socket itself + socketInstance.io.on('reconnect', (attemptNumber) => { setIsConnected(true) + setIsReconnecting(false) setCurrentSocketId(socketInstance.id ?? null) logger.info('Socket reconnected successfully', { attemptNumber, socketId: socketInstance.id, transport: socketInstance.io.engine?.transport?.name, }) - // Note: join-workflow is handled by the useEffect watching isConnected }) - socketInstance.on('reconnect_attempt', (attemptNumber) => { - logger.info('Socket reconnection attempt (fresh token will be generated)', { - attemptNumber, - timestamp: new Date().toISOString(), - }) + socketInstance.io.on('reconnect_attempt', (attemptNumber) => { + setIsReconnecting(true) + logger.info('Socket reconnection attempt', { attemptNumber }) }) - socketInstance.on('reconnect_error', (error: any) => { - logger.error('Socket reconnection error:', { - message: error.message, - attemptNumber: error.attemptNumber, - type: error.type, - }) + socketInstance.io.on('reconnect_error', (error: Error) => { + logger.error('Socket reconnection error:', { message: error.message }) }) - socketInstance.on('reconnect_failed', () => { + socketInstance.io.on('reconnect_failed', () => { logger.error('Socket reconnection failed - all attempts exhausted') + setIsReconnecting(false) setIsConnecting(false) }) @@ -629,6 +629,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { if (commit) { socket.emit('workflow-operation', { + workflowId: currentWorkflowId, operation, target, payload, @@ -645,6 +646,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { } pendingPositionUpdates.current.set(blockId, { + workflowId: currentWorkflowId, operation, target, payload, @@ -666,6 +668,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { } } else { socket.emit('workflow-operation', { + workflowId: currentWorkflowId, operation, target, payload, @@ -678,47 +681,53 @@ export function SocketProvider({ children, user }: SocketProviderProps) { ) const emitSubblockUpdate = useCallback( - (blockId: string, subblockId: string, value: any, operationId?: string) => { - if (socket && currentWorkflowId) { - socket.emit('subblock-update', { - blockId, - subblockId, - value, - timestamp: Date.now(), - operationId, - }) - } else { - logger.warn('Cannot emit subblock update: no socket connection or workflow room', { - hasSocket: !!socket, - currentWorkflowId, - blockId, - subblockId, - }) + ( + blockId: string, + subblockId: string, + value: any, + operationId?: string, + workflowId?: string + ) => { + if (!workflowId) { + logger.error('emitSubblockUpdate called without workflowId', { blockId, subblockId }) + return } + if (!socket) { + logger.warn('Cannot emit subblock update: no socket connection', { workflowId, blockId }) + return + } + socket.emit('subblock-update', { + workflowId, + blockId, + subblockId, + value, + timestamp: Date.now(), + operationId, + }) }, - [socket, currentWorkflowId] + [socket] ) const emitVariableUpdate = useCallback( - (variableId: string, field: string, value: any, operationId?: string) => { - if (socket && currentWorkflowId) { - socket.emit('variable-update', { - variableId, - field, - value, - timestamp: Date.now(), - operationId, - }) - } else { - logger.warn('Cannot emit variable update: no socket connection or workflow room', { - hasSocket: !!socket, - currentWorkflowId, - variableId, - field, - }) + (variableId: string, field: string, value: any, operationId?: string, workflowId?: string) => { + if (!workflowId) { + logger.error('emitVariableUpdate called without workflowId', { variableId, field }) + return } + if (!socket) { + logger.warn('Cannot emit variable update: no socket connection', { workflowId, variableId }) + return + } + socket.emit('variable-update', { + workflowId, + variableId, + field, + value, + timestamp: Date.now(), + operationId, + }) }, - [socket, currentWorkflowId] + [socket] ) const lastCursorEmit = useRef(0) @@ -794,6 +803,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { socket, isConnected, isConnecting, + isReconnecting, authFailed, currentWorkflowId, currentSocketId, @@ -820,6 +830,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { socket, isConnected, isConnecting, + isReconnecting, authFailed, currentWorkflowId, currentSocketId, diff --git a/apps/sim/hooks/use-collaborative-workflow.ts b/apps/sim/hooks/use-collaborative-workflow.ts index ea0600330..caf0aad9f 100644 --- a/apps/sim/hooks/use-collaborative-workflow.ts +++ b/apps/sim/hooks/use-collaborative-workflow.ts @@ -146,10 +146,6 @@ export function useCollaborativeWorkflow() { cancelOperationsForVariable, } = useOperationQueue() - const isInActiveRoom = useCallback(() => { - return !!currentWorkflowId && activeWorkflowId === currentWorkflowId - }, [currentWorkflowId, activeWorkflowId]) - // Register emit functions with operation queue store useEffect(() => { registerEmitFunctions( @@ -162,10 +158,19 @@ export function useCollaborativeWorkflow() { useEffect(() => { const handleWorkflowOperation = (data: any) => { - const { operation, target, payload, userId } = data + const { operation, target, payload, userId, metadata } = data if (isApplyingRemoteChange.current) return + // Filter broadcasts by workflowId to prevent cross-workflow updates + if (metadata?.workflowId && metadata.workflowId !== activeWorkflowId) { + logger.debug('Ignoring workflow operation for different workflow', { + broadcastWorkflowId: metadata.workflowId, + activeWorkflowId, + }) + return + } + logger.info(`Received ${operation} on ${target} from user ${userId}`) // Apply the operation to local state @@ -436,16 +441,24 @@ export function useCollaborativeWorkflow() { } const handleSubblockUpdate = (data: any) => { - const { blockId, subblockId, value, userId } = data + const { workflowId, blockId, subblockId, value, userId } = data if (isApplyingRemoteChange.current) return + // Filter broadcasts by workflowId to prevent cross-workflow updates + if (workflowId && workflowId !== activeWorkflowId) { + logger.debug('Ignoring subblock update for different workflow', { + broadcastWorkflowId: workflowId, + activeWorkflowId, + }) + return + } + logger.info(`Received subblock update from user ${userId}: ${blockId}.${subblockId}`) isApplyingRemoteChange.current = true try { - // The setValue function automatically uses the active workflow ID useSubBlockStore.getState().setValue(blockId, subblockId, value) const blockType = useWorkflowStore.getState().blocks?.[blockId]?.type if (activeWorkflowId && blockType === 'function' && subblockId === 'code') { @@ -459,10 +472,19 @@ export function useCollaborativeWorkflow() { } const handleVariableUpdate = (data: any) => { - const { variableId, field, value, userId } = data + const { workflowId, variableId, field, value, userId } = data if (isApplyingRemoteChange.current) return + // Filter broadcasts by workflowId to prevent cross-workflow updates + if (workflowId && workflowId !== activeWorkflowId) { + logger.debug('Ignoring variable update for different workflow', { + broadcastWorkflowId: workflowId, + activeWorkflowId, + }) + return + } + logger.info(`Received variable update from user ${userId}: ${variableId}.${field}`) isApplyingRemoteChange.current = true @@ -623,13 +645,9 @@ export function useCollaborativeWorkflow() { return } - if (!isInActiveRoom()) { - logger.debug('Skipping operation - not in active workflow', { - currentWorkflowId, - activeWorkflowId, - operation, - target, - }) + // Queue operations if we have an active workflow - queue handles socket readiness + if (!activeWorkflowId) { + logger.debug('Skipping operation - no active workflow', { operation, target }) return } @@ -642,20 +660,13 @@ export function useCollaborativeWorkflow() { target, payload, }, - workflowId: activeWorkflowId || '', + workflowId: activeWorkflowId, userId: session?.user?.id || 'unknown', }) localAction() }, - [ - addToQueue, - session?.user?.id, - isBaselineDiffView, - activeWorkflowId, - isInActiveRoom, - currentWorkflowId, - ] + [addToQueue, session?.user?.id, isBaselineDiffView, activeWorkflowId] ) const collaborativeBatchUpdatePositions = useCallback( @@ -669,8 +680,8 @@ export function useCollaborativeWorkflow() { return } - if (!isInActiveRoom()) { - logger.debug('Skipping batch position update - not in active workflow') + if (!activeWorkflowId) { + logger.debug('Skipping batch position update - no active workflow') return } @@ -714,7 +725,7 @@ export function useCollaborativeWorkflow() { } } }, - [isBaselineDiffView, addToQueue, activeWorkflowId, session?.user?.id, isInActiveRoom, undoRedo] + [isBaselineDiffView, addToQueue, activeWorkflowId, session?.user?.id, undoRedo] ) const collaborativeUpdateBlockName = useCallback( @@ -858,8 +869,8 @@ export function useCollaborativeWorkflow() { return } - if (!isInActiveRoom()) { - logger.debug('Skipping batch update parent - not in active workflow') + if (!activeWorkflowId) { + logger.debug('Skipping batch update parent - no active workflow') return } @@ -928,7 +939,7 @@ export function useCollaborativeWorkflow() { logger.debug('Batch updated parent for blocks', { updateCount: updates.length }) }, - [isBaselineDiffView, isInActiveRoom, undoRedo, addToQueue, activeWorkflowId, session?.user?.id] + [isBaselineDiffView, undoRedo, addToQueue, activeWorkflowId, session?.user?.id] ) const collaborativeToggleBlockAdvancedMode = useCallback( @@ -1020,8 +1031,8 @@ export function useCollaborativeWorkflow() { return false } - if (!isInActiveRoom()) { - logger.debug('Skipping batch add edges - not in active workflow') + if (!activeWorkflowId) { + logger.debug('Skipping batch add edges - no active workflow') return false } @@ -1055,7 +1066,7 @@ export function useCollaborativeWorkflow() { return true }, - [isBaselineDiffView, addToQueue, activeWorkflowId, session?.user?.id, isInActiveRoom, undoRedo] + [isBaselineDiffView, addToQueue, activeWorkflowId, session?.user?.id, undoRedo] ) const collaborativeBatchRemoveEdges = useCallback( @@ -1064,8 +1075,8 @@ export function useCollaborativeWorkflow() { return false } - if (!isInActiveRoom()) { - logger.debug('Skipping batch remove edges - not in active workflow') + if (!activeWorkflowId) { + logger.debug('Skipping batch remove edges - no active workflow') return false } @@ -1113,7 +1124,7 @@ export function useCollaborativeWorkflow() { logger.info('Batch removed edges', { count: validEdgeIds.length }) return true }, - [isBaselineDiffView, isInActiveRoom, addToQueue, activeWorkflowId, session, undoRedo] + [isBaselineDiffView, addToQueue, activeWorkflowId, session, undoRedo] ) const collaborativeSetSubblockValue = useCallback( @@ -1148,11 +1159,9 @@ export function useCollaborativeWorkflow() { // Best-effort; do not block on clearing } - // Only emit to socket if in active room - if (!isInActiveRoom()) { - logger.debug('Local update applied, skipping socket emit - not in active workflow', { - currentWorkflowId, - activeWorkflowId, + // Queue socket operation if we have an active workflow + if (!activeWorkflowId) { + logger.debug('Local update applied, skipping socket queue - no active workflow', { blockId, subblockId, }) @@ -1174,14 +1183,7 @@ export function useCollaborativeWorkflow() { userId: session?.user?.id || 'unknown', }) }, - [ - currentWorkflowId, - activeWorkflowId, - addToQueue, - session?.user?.id, - isBaselineDiffView, - isInActiveRoom, - ] + [activeWorkflowId, addToQueue, session?.user?.id, isBaselineDiffView] ) // Immediate tag selection (uses queue but processes immediately, no debouncing) @@ -1193,13 +1195,8 @@ export function useCollaborativeWorkflow() { return } - if (!isInActiveRoom()) { - logger.debug('Skipping tag selection - not in active workflow', { - currentWorkflowId, - activeWorkflowId, - blockId, - subblockId, - }) + if (!activeWorkflowId) { + logger.debug('Skipping tag selection - no active workflow', { blockId, subblockId }) return } @@ -1220,14 +1217,7 @@ export function useCollaborativeWorkflow() { userId: session?.user?.id || 'unknown', }) }, - [ - isBaselineDiffView, - addToQueue, - currentWorkflowId, - activeWorkflowId, - session?.user?.id, - isInActiveRoom, - ] + [isBaselineDiffView, addToQueue, activeWorkflowId, session?.user?.id] ) const collaborativeUpdateLoopType = useCallback( @@ -1514,8 +1504,8 @@ export function useCollaborativeWorkflow() { subBlockValues: Record> = {}, options?: { skipUndoRedo?: boolean } ) => { - if (!isInActiveRoom()) { - logger.debug('Skipping batch add blocks - not in active workflow') + if (!activeWorkflowId) { + logger.debug('Skipping batch add blocks - no active workflow') return false } @@ -1568,7 +1558,7 @@ export function useCollaborativeWorkflow() { return true }, - [addToQueue, activeWorkflowId, session?.user?.id, isBaselineDiffView, isInActiveRoom, undoRedo] + [addToQueue, activeWorkflowId, session?.user?.id, isBaselineDiffView, undoRedo] ) const collaborativeBatchRemoveBlocks = useCallback( @@ -1577,8 +1567,8 @@ export function useCollaborativeWorkflow() { return false } - if (!isInActiveRoom()) { - logger.debug('Skipping batch remove blocks - not in active workflow') + if (!activeWorkflowId) { + logger.debug('Skipping batch remove blocks - no active workflow') return false } @@ -1662,7 +1652,6 @@ export function useCollaborativeWorkflow() { addToQueue, activeWorkflowId, session?.user?.id, - isInActiveRoom, cancelOperationsForBlock, undoRedo, ] diff --git a/apps/sim/socket/handlers/subblocks.ts b/apps/sim/socket/handlers/subblocks.ts index a630151dd..23896fed3 100644 --- a/apps/sim/socket/handlers/subblocks.ts +++ b/apps/sim/socket/handlers/subblocks.ts @@ -39,16 +39,23 @@ export function cleanupPendingSubblocksForSocket(socketId: string): void { export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) { socket.on('subblock-update', async (data) => { - const { blockId, subblockId, value, timestamp, operationId } = data + const { + workflowId: payloadWorkflowId, + blockId, + subblockId, + value, + timestamp, + operationId, + } = data try { - const workflowId = await roomManager.getWorkflowIdForSocket(socket.id) + const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id) const session = await roomManager.getUserSession(socket.id) - if (!workflowId || !session) { + if (!sessionWorkflowId || !session) { logger.debug(`Ignoring subblock update: socket not connected to any workflow room`, { socketId: socket.id, - hasWorkflowId: !!workflowId, + hasWorkflowId: !!sessionWorkflowId, hasSession: !!session, }) socket.emit('operation-forbidden', { @@ -61,6 +68,24 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager: return } + const workflowId = payloadWorkflowId || sessionWorkflowId + + if (payloadWorkflowId && payloadWorkflowId !== sessionWorkflowId) { + logger.warn('Workflow ID mismatch in subblock update', { + payloadWorkflowId, + sessionWorkflowId, + socketId: socket.id, + }) + if (operationId) { + socket.emit('operation-failed', { + operationId, + error: 'Workflow ID mismatch', + retryable: true, + }) + } + return + } + const hasRoom = await roomManager.hasWorkflowRoom(workflowId) if (!hasRoom) { logger.debug(`Ignoring subblock update: workflow room not found`, { @@ -182,20 +207,17 @@ async function flushSubblockUpdate( if (updateSuccessful) { // Broadcast to room excluding all senders (works cross-pod via Redis adapter) const senderSocketIds = [...pending.opToSocket.values()] + const broadcastPayload = { + workflowId, + blockId, + subblockId, + value, + timestamp, + } if (senderSocketIds.length > 0) { - io.to(workflowId).except(senderSocketIds).emit('subblock-update', { - blockId, - subblockId, - value, - timestamp, - }) + io.to(workflowId).except(senderSocketIds).emit('subblock-update', broadcastPayload) } else { - io.to(workflowId).emit('subblock-update', { - blockId, - subblockId, - value, - timestamp, - }) + io.to(workflowId).emit('subblock-update', broadcastPayload) } // Confirm all coalesced operationIds (io.to(socketId) works cross-pod) diff --git a/apps/sim/socket/handlers/variables.ts b/apps/sim/socket/handlers/variables.ts index 0421b4e60..5b36873a8 100644 --- a/apps/sim/socket/handlers/variables.ts +++ b/apps/sim/socket/handlers/variables.ts @@ -35,16 +35,16 @@ export function cleanupPendingVariablesForSocket(socketId: string): void { export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) { socket.on('variable-update', async (data) => { - const { variableId, field, value, timestamp, operationId } = data + const { workflowId: payloadWorkflowId, variableId, field, value, timestamp, operationId } = data try { - const workflowId = await roomManager.getWorkflowIdForSocket(socket.id) + const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id) const session = await roomManager.getUserSession(socket.id) - if (!workflowId || !session) { + if (!sessionWorkflowId || !session) { logger.debug(`Ignoring variable update: socket not connected to any workflow room`, { socketId: socket.id, - hasWorkflowId: !!workflowId, + hasWorkflowId: !!sessionWorkflowId, hasSession: !!session, }) socket.emit('operation-forbidden', { @@ -57,6 +57,24 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager: return } + const workflowId = payloadWorkflowId || sessionWorkflowId + + if (payloadWorkflowId && payloadWorkflowId !== sessionWorkflowId) { + logger.warn('Workflow ID mismatch in variable update', { + payloadWorkflowId, + sessionWorkflowId, + socketId: socket.id, + }) + if (operationId) { + socket.emit('operation-failed', { + operationId, + error: 'Workflow ID mismatch', + retryable: true, + }) + } + return + } + const hasRoom = await roomManager.hasWorkflowRoom(workflowId) if (!hasRoom) { logger.debug(`Ignoring variable update: workflow room not found`, { @@ -179,20 +197,17 @@ async function flushVariableUpdate( if (updateSuccessful) { // Broadcast to room excluding all senders (works cross-pod via Redis adapter) const senderSocketIds = [...pending.opToSocket.values()] + const broadcastPayload = { + workflowId, + variableId, + field, + value, + timestamp, + } if (senderSocketIds.length > 0) { - io.to(workflowId).except(senderSocketIds).emit('variable-update', { - variableId, - field, - value, - timestamp, - }) + io.to(workflowId).except(senderSocketIds).emit('variable-update', broadcastPayload) } else { - io.to(workflowId).emit('variable-update', { - variableId, - field, - value, - timestamp, - }) + io.to(workflowId).emit('variable-update', broadcastPayload) } // Confirm all coalesced operationIds (io.to(socketId) works cross-pod) diff --git a/apps/sim/stores/operation-queue/store.ts b/apps/sim/stores/operation-queue/store.ts index e9f3deedc..8801a4d91 100644 --- a/apps/sim/stores/operation-queue/store.ts +++ b/apps/sim/stores/operation-queue/store.ts @@ -24,16 +24,40 @@ let emitWorkflowOperation: | ((operation: string, target: string, payload: any, operationId?: string) => void) | null = null let emitSubblockUpdate: - | ((blockId: string, subblockId: string, value: any, operationId?: string) => void) + | (( + blockId: string, + subblockId: string, + value: any, + operationId?: string, + workflowId?: string + ) => void) | null = null let emitVariableUpdate: - | ((variableId: string, field: string, value: any, operationId?: string) => void) + | (( + variableId: string, + field: string, + value: any, + operationId?: string, + workflowId?: string + ) => void) | null = null export function registerEmitFunctions( workflowEmit: (operation: string, target: string, payload: any, operationId?: string) => void, - subblockEmit: (blockId: string, subblockId: string, value: any, operationId?: string) => void, - variableEmit: (variableId: string, field: string, value: any, operationId?: string) => void, + subblockEmit: ( + blockId: string, + subblockId: string, + value: any, + operationId?: string, + workflowId?: string + ) => void, + variableEmit: ( + variableId: string, + field: string, + value: any, + operationId?: string, + workflowId?: string + ) => void, workflowId: string | null ) { emitWorkflowOperation = workflowEmit @@ -196,14 +220,16 @@ export const useOperationQueueStore = create((set, get) => } if (!retryable) { - logger.debug('Operation marked as non-retryable, removing from queue', { operationId }) + logger.error( + 'Operation failed with non-retryable error - state out of sync, triggering offline mode', + { + operationId, + operation: operation.operation.operation, + target: operation.operation.target, + } + ) - set((state) => ({ - operations: state.operations.filter((op) => op.id !== operationId), - isProcessing: false, - })) - - get().processNextOperation() + get().triggerOfflineMode() return } @@ -305,11 +331,23 @@ export const useOperationQueueStore = create((set, get) => const { operation: op, target, payload } = nextOperation.operation if (op === 'subblock-update' && target === 'subblock') { if (emitSubblockUpdate) { - emitSubblockUpdate(payload.blockId, payload.subblockId, payload.value, nextOperation.id) + emitSubblockUpdate( + payload.blockId, + payload.subblockId, + payload.value, + nextOperation.id, + nextOperation.workflowId + ) } } else if (op === 'variable-update' && target === 'variable') { if (emitVariableUpdate) { - emitVariableUpdate(payload.variableId, payload.field, payload.value, nextOperation.id) + emitVariableUpdate( + payload.variableId, + payload.field, + payload.value, + nextOperation.id, + nextOperation.workflowId + ) } } else { if (emitWorkflowOperation) {