improve sockets with reconnecting

This commit is contained in:
Vikhyath Mondreti
2026-01-31 13:25:52 -08:00
parent afead54c2e
commit 2879ab6c0f
6 changed files with 275 additions and 178 deletions

View File

@@ -1,10 +1,11 @@
'use client'
import type React from 'react'
import { createContext, useCallback, useContext, useEffect, useMemo, useState } from 'react'
import { createContext, useCallback, useContext, useEffect, useMemo, useRef, useState } from 'react'
import { createLogger } from '@sim/logger'
import { useQueryClient } from '@tanstack/react-query'
import { useParams } from 'next/navigation'
import { useSocket } from '@/app/workspace/providers/socket-provider'
import {
useWorkspacePermissionsQuery,
type WorkspacePermissions,
@@ -57,14 +58,35 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP
const [hasShownOfflineNotification, setHasShownOfflineNotification] = useState(false)
const hasOperationError = useOperationQueueStore((state) => state.hasOperationError)
const addNotification = useNotificationStore((state) => state.addNotification)
const removeNotification = useNotificationStore((state) => state.removeNotification)
const { isReconnecting } = useSocket()
const reconnectingNotificationIdRef = useRef<string | null>(null)
const isOfflineMode = hasOperationError
useEffect(() => {
if (isReconnecting && !reconnectingNotificationIdRef.current && !isOfflineMode) {
const id = addNotification({
level: 'error',
message: 'Reconnecting...',
})
reconnectingNotificationIdRef.current = id
} else if (!isReconnecting && reconnectingNotificationIdRef.current) {
removeNotification(reconnectingNotificationIdRef.current)
reconnectingNotificationIdRef.current = null
}
}, [isReconnecting, isOfflineMode, addNotification, removeNotification])
useEffect(() => {
if (!isOfflineMode || hasShownOfflineNotification) {
return
}
if (reconnectingNotificationIdRef.current) {
removeNotification(reconnectingNotificationIdRef.current)
reconnectingNotificationIdRef.current = null
}
try {
addNotification({
level: 'error',
@@ -78,7 +100,7 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP
} catch (error) {
logger.error('Failed to add offline notification', { error })
}
}, [addNotification, hasShownOfflineNotification, isOfflineMode])
}, [addNotification, removeNotification, hasShownOfflineNotification, isOfflineMode])
const {
data: workspacePermissions,

View File

@@ -49,6 +49,7 @@ interface SocketContextType {
socket: Socket | null
isConnected: boolean
isConnecting: boolean
isReconnecting: boolean
authFailed: boolean
currentWorkflowId: string | null
currentSocketId: string | null
@@ -88,6 +89,7 @@ const SocketContext = createContext<SocketContextType>({
socket: null,
isConnected: false,
isConnecting: false,
isReconnecting: false,
authFailed: false,
currentWorkflowId: null,
currentSocketId: null,
@@ -122,6 +124,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
const [socket, setSocket] = useState<Socket | null>(null)
const [isConnected, setIsConnected] = useState(false)
const [isConnecting, setIsConnecting] = useState(false)
const [isReconnecting, setIsReconnecting] = useState(false)
const [currentWorkflowId, setCurrentWorkflowId] = useState<string | null>(null)
const [currentSocketId, setCurrentSocketId] = useState<string | null>(null)
const [presenceUsers, setPresenceUsers] = useState<PresenceUser[]>([])
@@ -236,20 +239,19 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
setCurrentWorkflowId(null)
setPresenceUsers([])
logger.info('Socket disconnected', {
reason,
})
// socket.active indicates if auto-reconnect will happen
if (socketInstance.active) {
setIsReconnecting(true)
logger.info('Socket disconnected, will auto-reconnect', { reason })
} else {
setIsReconnecting(false)
logger.info('Socket disconnected, no auto-reconnect', { reason })
}
})
socketInstance.on('connect_error', (error: any) => {
socketInstance.on('connect_error', (error: Error) => {
setIsConnecting(false)
logger.error('Socket connection error:', {
message: error.message,
stack: error.stack,
description: error.description,
type: error.type,
transport: error.transport,
})
logger.error('Socket connection error:', { message: error.message })
// Check if this is an authentication failure
const isAuthError =
@@ -261,43 +263,41 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
logger.warn(
'Authentication failed - stopping reconnection attempts. User may need to refresh/re-login.'
)
// Stop reconnection attempts to prevent infinite loop
socketInstance.disconnect()
// Reset state to allow re-initialization when session is restored
setSocket(null)
setAuthFailed(true)
setIsReconnecting(false)
initializedRef.current = false
} else if (socketInstance.active) {
// Temporary failure, will auto-reconnect
setIsReconnecting(true)
}
})
socketInstance.on('reconnect', (attemptNumber) => {
// Reconnection events are on the Manager (socket.io), not the socket itself
socketInstance.io.on('reconnect', (attemptNumber) => {
setIsConnected(true)
setIsReconnecting(false)
setCurrentSocketId(socketInstance.id ?? null)
logger.info('Socket reconnected successfully', {
attemptNumber,
socketId: socketInstance.id,
transport: socketInstance.io.engine?.transport?.name,
})
// Note: join-workflow is handled by the useEffect watching isConnected
})
socketInstance.on('reconnect_attempt', (attemptNumber) => {
logger.info('Socket reconnection attempt (fresh token will be generated)', {
attemptNumber,
timestamp: new Date().toISOString(),
})
socketInstance.io.on('reconnect_attempt', (attemptNumber) => {
setIsReconnecting(true)
logger.info('Socket reconnection attempt', { attemptNumber })
})
socketInstance.on('reconnect_error', (error: any) => {
logger.error('Socket reconnection error:', {
message: error.message,
attemptNumber: error.attemptNumber,
type: error.type,
})
socketInstance.io.on('reconnect_error', (error: Error) => {
logger.error('Socket reconnection error:', { message: error.message })
})
socketInstance.on('reconnect_failed', () => {
socketInstance.io.on('reconnect_failed', () => {
logger.error('Socket reconnection failed - all attempts exhausted')
setIsReconnecting(false)
setIsConnecting(false)
})
@@ -629,6 +629,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
if (commit) {
socket.emit('workflow-operation', {
workflowId: currentWorkflowId,
operation,
target,
payload,
@@ -645,6 +646,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
}
pendingPositionUpdates.current.set(blockId, {
workflowId: currentWorkflowId,
operation,
target,
payload,
@@ -666,6 +668,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
}
} else {
socket.emit('workflow-operation', {
workflowId: currentWorkflowId,
operation,
target,
payload,
@@ -678,47 +681,53 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
)
const emitSubblockUpdate = useCallback(
(blockId: string, subblockId: string, value: any, operationId?: string) => {
if (socket && currentWorkflowId) {
socket.emit('subblock-update', {
blockId,
subblockId,
value,
timestamp: Date.now(),
operationId,
})
} else {
logger.warn('Cannot emit subblock update: no socket connection or workflow room', {
hasSocket: !!socket,
currentWorkflowId,
blockId,
subblockId,
})
(
blockId: string,
subblockId: string,
value: any,
operationId?: string,
workflowId?: string
) => {
if (!workflowId) {
logger.error('emitSubblockUpdate called without workflowId', { blockId, subblockId })
return
}
if (!socket) {
logger.warn('Cannot emit subblock update: no socket connection', { workflowId, blockId })
return
}
socket.emit('subblock-update', {
workflowId,
blockId,
subblockId,
value,
timestamp: Date.now(),
operationId,
})
},
[socket, currentWorkflowId]
[socket]
)
const emitVariableUpdate = useCallback(
(variableId: string, field: string, value: any, operationId?: string) => {
if (socket && currentWorkflowId) {
socket.emit('variable-update', {
variableId,
field,
value,
timestamp: Date.now(),
operationId,
})
} else {
logger.warn('Cannot emit variable update: no socket connection or workflow room', {
hasSocket: !!socket,
currentWorkflowId,
variableId,
field,
})
(variableId: string, field: string, value: any, operationId?: string, workflowId?: string) => {
if (!workflowId) {
logger.error('emitVariableUpdate called without workflowId', { variableId, field })
return
}
if (!socket) {
logger.warn('Cannot emit variable update: no socket connection', { workflowId, variableId })
return
}
socket.emit('variable-update', {
workflowId,
variableId,
field,
value,
timestamp: Date.now(),
operationId,
})
},
[socket, currentWorkflowId]
[socket]
)
const lastCursorEmit = useRef(0)
@@ -794,6 +803,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
socket,
isConnected,
isConnecting,
isReconnecting,
authFailed,
currentWorkflowId,
currentSocketId,
@@ -820,6 +830,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
socket,
isConnected,
isConnecting,
isReconnecting,
authFailed,
currentWorkflowId,
currentSocketId,

View File

@@ -146,10 +146,6 @@ export function useCollaborativeWorkflow() {
cancelOperationsForVariable,
} = useOperationQueue()
const isInActiveRoom = useCallback(() => {
return !!currentWorkflowId && activeWorkflowId === currentWorkflowId
}, [currentWorkflowId, activeWorkflowId])
// Register emit functions with operation queue store
useEffect(() => {
registerEmitFunctions(
@@ -162,10 +158,19 @@ export function useCollaborativeWorkflow() {
useEffect(() => {
const handleWorkflowOperation = (data: any) => {
const { operation, target, payload, userId } = data
const { operation, target, payload, userId, metadata } = data
if (isApplyingRemoteChange.current) return
// Filter broadcasts by workflowId to prevent cross-workflow updates
if (metadata?.workflowId && metadata.workflowId !== activeWorkflowId) {
logger.debug('Ignoring workflow operation for different workflow', {
broadcastWorkflowId: metadata.workflowId,
activeWorkflowId,
})
return
}
logger.info(`Received ${operation} on ${target} from user ${userId}`)
// Apply the operation to local state
@@ -436,16 +441,24 @@ export function useCollaborativeWorkflow() {
}
const handleSubblockUpdate = (data: any) => {
const { blockId, subblockId, value, userId } = data
const { workflowId, blockId, subblockId, value, userId } = data
if (isApplyingRemoteChange.current) return
// Filter broadcasts by workflowId to prevent cross-workflow updates
if (workflowId && workflowId !== activeWorkflowId) {
logger.debug('Ignoring subblock update for different workflow', {
broadcastWorkflowId: workflowId,
activeWorkflowId,
})
return
}
logger.info(`Received subblock update from user ${userId}: ${blockId}.${subblockId}`)
isApplyingRemoteChange.current = true
try {
// The setValue function automatically uses the active workflow ID
useSubBlockStore.getState().setValue(blockId, subblockId, value)
const blockType = useWorkflowStore.getState().blocks?.[blockId]?.type
if (activeWorkflowId && blockType === 'function' && subblockId === 'code') {
@@ -459,10 +472,19 @@ export function useCollaborativeWorkflow() {
}
const handleVariableUpdate = (data: any) => {
const { variableId, field, value, userId } = data
const { workflowId, variableId, field, value, userId } = data
if (isApplyingRemoteChange.current) return
// Filter broadcasts by workflowId to prevent cross-workflow updates
if (workflowId && workflowId !== activeWorkflowId) {
logger.debug('Ignoring variable update for different workflow', {
broadcastWorkflowId: workflowId,
activeWorkflowId,
})
return
}
logger.info(`Received variable update from user ${userId}: ${variableId}.${field}`)
isApplyingRemoteChange.current = true
@@ -623,13 +645,9 @@ export function useCollaborativeWorkflow() {
return
}
if (!isInActiveRoom()) {
logger.debug('Skipping operation - not in active workflow', {
currentWorkflowId,
activeWorkflowId,
operation,
target,
})
// Queue operations if we have an active workflow - queue handles socket readiness
if (!activeWorkflowId) {
logger.debug('Skipping operation - no active workflow', { operation, target })
return
}
@@ -642,20 +660,13 @@ export function useCollaborativeWorkflow() {
target,
payload,
},
workflowId: activeWorkflowId || '',
workflowId: activeWorkflowId,
userId: session?.user?.id || 'unknown',
})
localAction()
},
[
addToQueue,
session?.user?.id,
isBaselineDiffView,
activeWorkflowId,
isInActiveRoom,
currentWorkflowId,
]
[addToQueue, session?.user?.id, isBaselineDiffView, activeWorkflowId]
)
const collaborativeBatchUpdatePositions = useCallback(
@@ -669,8 +680,8 @@ export function useCollaborativeWorkflow() {
return
}
if (!isInActiveRoom()) {
logger.debug('Skipping batch position update - not in active workflow')
if (!activeWorkflowId) {
logger.debug('Skipping batch position update - no active workflow')
return
}
@@ -714,7 +725,7 @@ export function useCollaborativeWorkflow() {
}
}
},
[isBaselineDiffView, addToQueue, activeWorkflowId, session?.user?.id, isInActiveRoom, undoRedo]
[isBaselineDiffView, addToQueue, activeWorkflowId, session?.user?.id, undoRedo]
)
const collaborativeUpdateBlockName = useCallback(
@@ -858,8 +869,8 @@ export function useCollaborativeWorkflow() {
return
}
if (!isInActiveRoom()) {
logger.debug('Skipping batch update parent - not in active workflow')
if (!activeWorkflowId) {
logger.debug('Skipping batch update parent - no active workflow')
return
}
@@ -928,7 +939,7 @@ export function useCollaborativeWorkflow() {
logger.debug('Batch updated parent for blocks', { updateCount: updates.length })
},
[isBaselineDiffView, isInActiveRoom, undoRedo, addToQueue, activeWorkflowId, session?.user?.id]
[isBaselineDiffView, undoRedo, addToQueue, activeWorkflowId, session?.user?.id]
)
const collaborativeToggleBlockAdvancedMode = useCallback(
@@ -1020,8 +1031,8 @@ export function useCollaborativeWorkflow() {
return false
}
if (!isInActiveRoom()) {
logger.debug('Skipping batch add edges - not in active workflow')
if (!activeWorkflowId) {
logger.debug('Skipping batch add edges - no active workflow')
return false
}
@@ -1055,7 +1066,7 @@ export function useCollaborativeWorkflow() {
return true
},
[isBaselineDiffView, addToQueue, activeWorkflowId, session?.user?.id, isInActiveRoom, undoRedo]
[isBaselineDiffView, addToQueue, activeWorkflowId, session?.user?.id, undoRedo]
)
const collaborativeBatchRemoveEdges = useCallback(
@@ -1064,8 +1075,8 @@ export function useCollaborativeWorkflow() {
return false
}
if (!isInActiveRoom()) {
logger.debug('Skipping batch remove edges - not in active workflow')
if (!activeWorkflowId) {
logger.debug('Skipping batch remove edges - no active workflow')
return false
}
@@ -1113,7 +1124,7 @@ export function useCollaborativeWorkflow() {
logger.info('Batch removed edges', { count: validEdgeIds.length })
return true
},
[isBaselineDiffView, isInActiveRoom, addToQueue, activeWorkflowId, session, undoRedo]
[isBaselineDiffView, addToQueue, activeWorkflowId, session, undoRedo]
)
const collaborativeSetSubblockValue = useCallback(
@@ -1148,11 +1159,9 @@ export function useCollaborativeWorkflow() {
// Best-effort; do not block on clearing
}
// Only emit to socket if in active room
if (!isInActiveRoom()) {
logger.debug('Local update applied, skipping socket emit - not in active workflow', {
currentWorkflowId,
activeWorkflowId,
// Queue socket operation if we have an active workflow
if (!activeWorkflowId) {
logger.debug('Local update applied, skipping socket queue - no active workflow', {
blockId,
subblockId,
})
@@ -1174,14 +1183,7 @@ export function useCollaborativeWorkflow() {
userId: session?.user?.id || 'unknown',
})
},
[
currentWorkflowId,
activeWorkflowId,
addToQueue,
session?.user?.id,
isBaselineDiffView,
isInActiveRoom,
]
[activeWorkflowId, addToQueue, session?.user?.id, isBaselineDiffView]
)
// Immediate tag selection (uses queue but processes immediately, no debouncing)
@@ -1193,13 +1195,8 @@ export function useCollaborativeWorkflow() {
return
}
if (!isInActiveRoom()) {
logger.debug('Skipping tag selection - not in active workflow', {
currentWorkflowId,
activeWorkflowId,
blockId,
subblockId,
})
if (!activeWorkflowId) {
logger.debug('Skipping tag selection - no active workflow', { blockId, subblockId })
return
}
@@ -1220,14 +1217,7 @@ export function useCollaborativeWorkflow() {
userId: session?.user?.id || 'unknown',
})
},
[
isBaselineDiffView,
addToQueue,
currentWorkflowId,
activeWorkflowId,
session?.user?.id,
isInActiveRoom,
]
[isBaselineDiffView, addToQueue, activeWorkflowId, session?.user?.id]
)
const collaborativeUpdateLoopType = useCallback(
@@ -1514,8 +1504,8 @@ export function useCollaborativeWorkflow() {
subBlockValues: Record<string, Record<string, unknown>> = {},
options?: { skipUndoRedo?: boolean }
) => {
if (!isInActiveRoom()) {
logger.debug('Skipping batch add blocks - not in active workflow')
if (!activeWorkflowId) {
logger.debug('Skipping batch add blocks - no active workflow')
return false
}
@@ -1568,7 +1558,7 @@ export function useCollaborativeWorkflow() {
return true
},
[addToQueue, activeWorkflowId, session?.user?.id, isBaselineDiffView, isInActiveRoom, undoRedo]
[addToQueue, activeWorkflowId, session?.user?.id, isBaselineDiffView, undoRedo]
)
const collaborativeBatchRemoveBlocks = useCallback(
@@ -1577,8 +1567,8 @@ export function useCollaborativeWorkflow() {
return false
}
if (!isInActiveRoom()) {
logger.debug('Skipping batch remove blocks - not in active workflow')
if (!activeWorkflowId) {
logger.debug('Skipping batch remove blocks - no active workflow')
return false
}
@@ -1662,7 +1652,6 @@ export function useCollaborativeWorkflow() {
addToQueue,
activeWorkflowId,
session?.user?.id,
isInActiveRoom,
cancelOperationsForBlock,
undoRedo,
]

View File

@@ -39,16 +39,23 @@ export function cleanupPendingSubblocksForSocket(socketId: string): void {
export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
socket.on('subblock-update', async (data) => {
const { blockId, subblockId, value, timestamp, operationId } = data
const {
workflowId: payloadWorkflowId,
blockId,
subblockId,
value,
timestamp,
operationId,
} = data
try {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
if (!sessionWorkflowId || !session) {
logger.debug(`Ignoring subblock update: socket not connected to any workflow room`, {
socketId: socket.id,
hasWorkflowId: !!workflowId,
hasWorkflowId: !!sessionWorkflowId,
hasSession: !!session,
})
socket.emit('operation-forbidden', {
@@ -61,6 +68,24 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
return
}
const workflowId = payloadWorkflowId || sessionWorkflowId
if (payloadWorkflowId && payloadWorkflowId !== sessionWorkflowId) {
logger.warn('Workflow ID mismatch in subblock update', {
payloadWorkflowId,
sessionWorkflowId,
socketId: socket.id,
})
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: 'Workflow ID mismatch',
retryable: true,
})
}
return
}
const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
if (!hasRoom) {
logger.debug(`Ignoring subblock update: workflow room not found`, {
@@ -182,20 +207,17 @@ async function flushSubblockUpdate(
if (updateSuccessful) {
// Broadcast to room excluding all senders (works cross-pod via Redis adapter)
const senderSocketIds = [...pending.opToSocket.values()]
const broadcastPayload = {
workflowId,
blockId,
subblockId,
value,
timestamp,
}
if (senderSocketIds.length > 0) {
io.to(workflowId).except(senderSocketIds).emit('subblock-update', {
blockId,
subblockId,
value,
timestamp,
})
io.to(workflowId).except(senderSocketIds).emit('subblock-update', broadcastPayload)
} else {
io.to(workflowId).emit('subblock-update', {
blockId,
subblockId,
value,
timestamp,
})
io.to(workflowId).emit('subblock-update', broadcastPayload)
}
// Confirm all coalesced operationIds (io.to(socketId) works cross-pod)

View File

@@ -35,16 +35,16 @@ export function cleanupPendingVariablesForSocket(socketId: string): void {
export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
socket.on('variable-update', async (data) => {
const { variableId, field, value, timestamp, operationId } = data
const { workflowId: payloadWorkflowId, variableId, field, value, timestamp, operationId } = data
try {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
if (!sessionWorkflowId || !session) {
logger.debug(`Ignoring variable update: socket not connected to any workflow room`, {
socketId: socket.id,
hasWorkflowId: !!workflowId,
hasWorkflowId: !!sessionWorkflowId,
hasSession: !!session,
})
socket.emit('operation-forbidden', {
@@ -57,6 +57,24 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
return
}
const workflowId = payloadWorkflowId || sessionWorkflowId
if (payloadWorkflowId && payloadWorkflowId !== sessionWorkflowId) {
logger.warn('Workflow ID mismatch in variable update', {
payloadWorkflowId,
sessionWorkflowId,
socketId: socket.id,
})
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: 'Workflow ID mismatch',
retryable: true,
})
}
return
}
const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
if (!hasRoom) {
logger.debug(`Ignoring variable update: workflow room not found`, {
@@ -179,20 +197,17 @@ async function flushVariableUpdate(
if (updateSuccessful) {
// Broadcast to room excluding all senders (works cross-pod via Redis adapter)
const senderSocketIds = [...pending.opToSocket.values()]
const broadcastPayload = {
workflowId,
variableId,
field,
value,
timestamp,
}
if (senderSocketIds.length > 0) {
io.to(workflowId).except(senderSocketIds).emit('variable-update', {
variableId,
field,
value,
timestamp,
})
io.to(workflowId).except(senderSocketIds).emit('variable-update', broadcastPayload)
} else {
io.to(workflowId).emit('variable-update', {
variableId,
field,
value,
timestamp,
})
io.to(workflowId).emit('variable-update', broadcastPayload)
}
// Confirm all coalesced operationIds (io.to(socketId) works cross-pod)

View File

@@ -24,16 +24,40 @@ let emitWorkflowOperation:
| ((operation: string, target: string, payload: any, operationId?: string) => void)
| null = null
let emitSubblockUpdate:
| ((blockId: string, subblockId: string, value: any, operationId?: string) => void)
| ((
blockId: string,
subblockId: string,
value: any,
operationId?: string,
workflowId?: string
) => void)
| null = null
let emitVariableUpdate:
| ((variableId: string, field: string, value: any, operationId?: string) => void)
| ((
variableId: string,
field: string,
value: any,
operationId?: string,
workflowId?: string
) => void)
| null = null
export function registerEmitFunctions(
workflowEmit: (operation: string, target: string, payload: any, operationId?: string) => void,
subblockEmit: (blockId: string, subblockId: string, value: any, operationId?: string) => void,
variableEmit: (variableId: string, field: string, value: any, operationId?: string) => void,
subblockEmit: (
blockId: string,
subblockId: string,
value: any,
operationId?: string,
workflowId?: string
) => void,
variableEmit: (
variableId: string,
field: string,
value: any,
operationId?: string,
workflowId?: string
) => void,
workflowId: string | null
) {
emitWorkflowOperation = workflowEmit
@@ -196,14 +220,16 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
}
if (!retryable) {
logger.debug('Operation marked as non-retryable, removing from queue', { operationId })
logger.error(
'Operation failed with non-retryable error - state out of sync, triggering offline mode',
{
operationId,
operation: operation.operation.operation,
target: operation.operation.target,
}
)
set((state) => ({
operations: state.operations.filter((op) => op.id !== operationId),
isProcessing: false,
}))
get().processNextOperation()
get().triggerOfflineMode()
return
}
@@ -305,11 +331,23 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
const { operation: op, target, payload } = nextOperation.operation
if (op === 'subblock-update' && target === 'subblock') {
if (emitSubblockUpdate) {
emitSubblockUpdate(payload.blockId, payload.subblockId, payload.value, nextOperation.id)
emitSubblockUpdate(
payload.blockId,
payload.subblockId,
payload.value,
nextOperation.id,
nextOperation.workflowId
)
}
} else if (op === 'variable-update' && target === 'variable') {
if (emitVariableUpdate) {
emitVariableUpdate(payload.variableId, payload.field, payload.value, nextOperation.id)
emitVariableUpdate(
payload.variableId,
payload.field,
payload.value,
nextOperation.id,
nextOperation.workflowId
)
}
} else {
if (emitWorkflowOperation) {