fix(sockets): move debounce to server side (#1265)

* fix(sockets): move debounce to server side

* remove comments / unused onBlur
This commit is contained in:
Vikhyath Mondreti
2025-09-06 12:49:35 -07:00
committed by GitHub
parent 851031239d
commit 1e14743391
9 changed files with 378 additions and 493 deletions

View File

@@ -14,7 +14,6 @@ import { useSubBlockValue } from '@/app/workspace/[workspaceId]/w/[workflowId]/c
import { useWand } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-wand'
import type { SubBlockConfig } from '@/blocks/types'
import { useTagSelection } from '@/hooks/use-tag-selection'
import { useOperationQueueStore } from '@/stores/operation-queue/store'
const logger = createLogger('LongInput')
@@ -382,11 +381,6 @@ export function LongInput({
onScroll={handleScroll}
onWheel={handleWheel}
onKeyDown={handleKeyDown}
onBlur={() => {
try {
useOperationQueueStore.getState().flushDebouncedForBlock(blockId)
} catch {}
}}
onFocus={() => {
setShowEnvVars(false)
setShowTags(false)

View File

@@ -14,7 +14,6 @@ import { useSubBlockValue } from '@/app/workspace/[workspaceId]/w/[workflowId]/c
import { useWand } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-wand'
import type { SubBlockConfig } from '@/blocks/types'
import { useTagSelection } from '@/hooks/use-tag-selection'
import { useOperationQueueStore } from '@/stores/operation-queue/store'
const logger = createLogger('ShortInput')
@@ -396,9 +395,6 @@ export function ShortInput({
onBlur={() => {
setIsFocused(false)
setShowEnvVars(false)
try {
useOperationQueueStore.getState().flushDebouncedForBlock(blockId)
} catch {}
}}
onDrop={handleDrop}
onDragOver={handleDragOver}

View File

@@ -18,7 +18,7 @@ interface UseSubBlockValueOptions {
/**
* Custom hook to get and set values for a sub-block in a workflow.
* Handles complex object values properly by using deep equality comparison.
* Includes automatic debouncing and explicit streaming mode for AI generation.
* Supports explicit streaming mode for AI generation.
*
* @param blockId The ID of the block containing the sub-block
* @param subBlockId The ID of the sub-block
@@ -181,7 +181,7 @@ export function useSubBlockValue<T = any>(
}
}
// Emit immediately - let the operation queue handle debouncing and deduplication
// Emit immediately; the client queue coalesces same-key ops and the server debounces
emitValue(valueCopy)
if (triggerWorkflowUpdate) {

View File

@@ -514,16 +514,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
`URL workflow changed from ${currentWorkflowId} to ${urlWorkflowId}, switching rooms`
)
try {
const { useOperationQueueStore } = require('@/stores/operation-queue/store')
// Flush debounced updates for the old workflow before switching rooms
if (currentWorkflowId) {
useOperationQueueStore.getState().flushDebouncedForWorkflow(currentWorkflowId)
} else {
useOperationQueueStore.getState().flushAllDebounced()
}
} catch {}
// Leave current workflow first if we're in one
if (currentWorkflowId) {
logger.info(`Leaving current workflow ${currentWorkflowId} before joining ${urlWorkflowId}`)
@@ -583,7 +573,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
logger.info(`Leaving workflow: ${currentWorkflowId}`)
try {
const { useOperationQueueStore } = require('@/stores/operation-queue/store')
useOperationQueueStore.getState().flushDebouncedForWorkflow(currentWorkflowId)
useOperationQueueStore.getState().cancelOperationsForWorkflow(currentWorkflowId)
} catch {}
socket.emit('leave-workflow')

View File

@@ -8,6 +8,16 @@ import type { RoomManager } from '@/socket-server/rooms/manager'
const logger = createLogger('SubblocksHandlers')
type PendingSubblock = {
latest: { blockId: string; subblockId: string; value: any; timestamp: number }
timeout: NodeJS.Timeout
// Map operationId -> socketId to emit confirmations/failures to correct clients
opToSocket: Map<string, string>
}
// Keyed by `${workflowId}:${blockId}:${subblockId}`
const pendingSubblockUpdates = new Map<string, PendingSubblock>()
export function setupSubblocksHandlers(
socket: AuthenticatedSocket,
deps: HandlerDependencies | RoomManager
@@ -46,93 +56,31 @@ export function setupSubblocksHandlers(
userPresence.lastActivity = Date.now()
}
// First, verify that the workflow still exists in the database
const workflowExists = await db
.select({ id: workflow.id })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
if (workflowExists.length === 0) {
logger.warn(`Ignoring subblock update: workflow ${workflowId} no longer exists`, {
socketId: socket.id,
blockId,
subblockId,
})
roomManager.cleanupUserFromRoom(socket.id, workflowId)
return
}
let updateSuccessful = false
await db.transaction(async (tx) => {
const [block] = await tx
.select({ subBlocks: workflowBlocks.subBlocks })
.from(workflowBlocks)
.where(and(eq(workflowBlocks.id, blockId), eq(workflowBlocks.workflowId, workflowId)))
.limit(1)
if (!block) {
// Block was deleted - this is a normal race condition in collaborative editing
logger.debug(
`Ignoring subblock update for deleted block: ${workflowId}/${blockId}.${subblockId}`
)
return
}
const subBlocks = (block.subBlocks as any) || {}
if (!subBlocks[subblockId]) {
// Create new subblock with minimal structure
subBlocks[subblockId] = {
id: subblockId,
type: 'unknown', // Will be corrected by next collaborative update
value: value,
// Server-side debounce/coalesce by workflowId+blockId+subblockId
const debouncedKey = `${workflowId}:${blockId}:${subblockId}`
const existing = pendingSubblockUpdates.get(debouncedKey)
if (existing) {
clearTimeout(existing.timeout)
existing.latest = { blockId, subblockId, value, timestamp }
if (operationId) existing.opToSocket.set(operationId, socket.id)
existing.timeout = setTimeout(async () => {
await flushSubblockUpdate(workflowId, existing, roomManager)
pendingSubblockUpdates.delete(debouncedKey)
}, 25)
} else {
const opToSocket = new Map<string, string>()
if (operationId) opToSocket.set(operationId, socket.id)
const timeout = setTimeout(async () => {
const pending = pendingSubblockUpdates.get(debouncedKey)
if (pending) {
await flushSubblockUpdate(workflowId, pending, roomManager)
pendingSubblockUpdates.delete(debouncedKey)
}
} else {
// Preserve existing id and type, only update value
subBlocks[subblockId] = {
...subBlocks[subblockId],
value: value,
}
}
await tx
.update(workflowBlocks)
.set({
subBlocks: subBlocks,
updatedAt: new Date(),
})
.where(and(eq(workflowBlocks.id, blockId), eq(workflowBlocks.workflowId, workflowId)))
updateSuccessful = true
})
// Only broadcast to other clients if the update was successful
if (updateSuccessful) {
socket.to(workflowId).emit('subblock-update', {
blockId,
subblockId,
value,
timestamp,
senderId: socket.id,
userId: session.userId,
})
// Emit confirmation if operationId is provided
if (operationId) {
socket.emit('operation-confirmed', {
operationId,
serverTimestamp: Date.now(),
})
}
logger.debug(`Subblock update in workflow ${workflowId}: ${blockId}.${subblockId}`)
} else if (operationId) {
// Block was deleted - notify client that operation completed (but didn't update anything)
socket.emit('operation-failed', {
operationId,
error: 'Block no longer exists',
retryable: false, // No point retrying for deleted blocks
}, 25)
pendingSubblockUpdates.set(debouncedKey, {
latest: { blockId, subblockId, value, timestamp },
timeout,
opToSocket,
})
}
} catch (error) {
@@ -140,12 +88,12 @@ export function setupSubblocksHandlers(
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
// Emit operation-failed for queue-tracked operations
// Best-effort failure for the single operation if provided
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: errorMessage,
retryable: true, // Subblock updates are generally retryable
retryable: true,
})
}
@@ -159,3 +107,119 @@ export function setupSubblocksHandlers(
}
})
}
async function flushSubblockUpdate(
workflowId: string,
pending: PendingSubblock,
roomManager: RoomManager
) {
const { blockId, subblockId, value, timestamp } = pending.latest
try {
// Verify workflow still exists
const workflowExists = await db
.select({ id: workflow.id })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
if (workflowExists.length === 0) {
pending.opToSocket.forEach((socketId, opId) => {
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-failed', {
operationId: opId,
error: 'Workflow not found',
retryable: false,
})
}
})
return
}
let updateSuccessful = false
await db.transaction(async (tx) => {
const [block] = await tx
.select({ subBlocks: workflowBlocks.subBlocks })
.from(workflowBlocks)
.where(and(eq(workflowBlocks.id, blockId), eq(workflowBlocks.workflowId, workflowId)))
.limit(1)
if (!block) {
return
}
const subBlocks = (block.subBlocks as any) || {}
if (!subBlocks[subblockId]) {
subBlocks[subblockId] = { id: subblockId, type: 'unknown', value }
} else {
subBlocks[subblockId] = { ...subBlocks[subblockId], value }
}
await tx
.update(workflowBlocks)
.set({ subBlocks, updatedAt: new Date() })
.where(and(eq(workflowBlocks.id, blockId), eq(workflowBlocks.workflowId, workflowId)))
updateSuccessful = true
})
if (updateSuccessful) {
// Broadcast to other clients (exclude senders to avoid overwriting their local state)
const senderSocketIds = new Set(pending.opToSocket.values())
const io = (roomManager as any).io
if (io) {
// Get all sockets in the room
const roomSockets = io.sockets.adapter.rooms.get(workflowId)
if (roomSockets) {
roomSockets.forEach((socketId: string) => {
// Only emit to sockets that didn't send any of the coalesced ops
if (!senderSocketIds.has(socketId)) {
const sock = io.sockets.sockets.get(socketId)
if (sock) {
sock.emit('subblock-update', {
blockId,
subblockId,
value,
timestamp,
})
}
}
})
}
}
// Confirm all coalesced operationIds
pending.opToSocket.forEach((socketId, opId) => {
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-confirmed', { operationId: opId, serverTimestamp: Date.now() })
}
})
logger.debug(`Flushed subblock update ${workflowId}: ${blockId}.${subblockId}`)
} else {
pending.opToSocket.forEach((socketId, opId) => {
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-failed', {
operationId: opId,
error: 'Block no longer exists',
retryable: false,
})
}
})
}
} catch (error) {
logger.error('Error flushing subblock update:', error)
pending.opToSocket.forEach((socketId, opId) => {
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-failed', {
operationId: opId,
error: error instanceof Error ? error.message : 'Unknown error',
retryable: true,
})
}
})
}
}

View File

@@ -8,6 +8,15 @@ import type { RoomManager } from '@/socket-server/rooms/manager'
const logger = createLogger('VariablesHandlers')
type PendingVariable = {
latest: { variableId: string; field: string; value: any; timestamp: number }
timeout: NodeJS.Timeout
opToSocket: Map<string, string>
}
// Keyed by `${workflowId}:${variableId}:${field}`
const pendingVariableUpdates = new Map<string, PendingVariable>()
export function setupVariablesHandlers(
socket: AuthenticatedSocket,
deps: HandlerDependencies | RoomManager
@@ -47,85 +56,30 @@ export function setupVariablesHandlers(
userPresence.lastActivity = Date.now()
}
const workflowExists = await db
.select({ id: workflow.id })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
if (workflowExists.length === 0) {
logger.warn(`Ignoring variable update: workflow ${workflowId} no longer exists`, {
socketId: socket.id,
variableId,
field,
})
roomManager.cleanupUserFromRoom(socket.id, workflowId)
return
}
let updateSuccessful = false
await db.transaction(async (tx) => {
const [workflowRecord] = await tx
.select({ variables: workflow.variables })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
if (!workflowRecord) {
logger.debug(
`Ignoring variable update for deleted workflow: ${workflowId}/${variableId}.${field}`
)
return
}
const variables = (workflowRecord.variables as any) || {}
if (!variables[variableId]) {
logger.debug(
`Ignoring variable update for deleted variable: ${workflowId}/${variableId}.${field}`
)
return
}
variables[variableId] = {
...variables[variableId],
[field]: value,
}
await tx
.update(workflow)
.set({
variables: variables,
updatedAt: new Date(),
})
.where(eq(workflow.id, workflowId))
updateSuccessful = true
})
if (updateSuccessful) {
socket.to(workflowId).emit('variable-update', {
variableId,
field,
value,
timestamp,
senderId: socket.id,
userId: session.userId,
})
if (operationId) {
socket.emit('operation-confirmed', {
operationId,
serverTimestamp: Date.now(),
})
}
logger.debug(`Variable update in workflow ${workflowId}: ${variableId}.${field}`)
} else if (operationId) {
socket.emit('operation-failed', {
operationId,
error: 'Variable no longer exists',
retryable: false,
const debouncedKey = `${workflowId}:${variableId}:${field}`
const existing = pendingVariableUpdates.get(debouncedKey)
if (existing) {
clearTimeout(existing.timeout)
existing.latest = { variableId, field, value, timestamp }
if (operationId) existing.opToSocket.set(operationId, socket.id)
existing.timeout = setTimeout(async () => {
await flushVariableUpdate(workflowId, existing, roomManager)
pendingVariableUpdates.delete(debouncedKey)
}, 25)
} else {
const opToSocket = new Map<string, string>()
if (operationId) opToSocket.set(operationId, socket.id)
const timeout = setTimeout(async () => {
const pending = pendingVariableUpdates.get(debouncedKey)
if (pending) {
await flushVariableUpdate(workflowId, pending, roomManager)
pendingVariableUpdates.delete(debouncedKey)
}
}, 25)
pendingVariableUpdates.set(debouncedKey, {
latest: { variableId, field, value, timestamp },
timeout,
opToSocket,
})
}
} catch (error) {
@@ -150,3 +104,118 @@ export function setupVariablesHandlers(
}
})
}
async function flushVariableUpdate(
workflowId: string,
pending: PendingVariable,
roomManager: RoomManager
) {
const { variableId, field, value, timestamp } = pending.latest
try {
const workflowExists = await db
.select({ id: workflow.id })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
if (workflowExists.length === 0) {
pending.opToSocket.forEach((socketId, opId) => {
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-failed', {
operationId: opId,
error: 'Workflow not found',
retryable: false,
})
}
})
return
}
let updateSuccessful = false
await db.transaction(async (tx) => {
const [workflowRecord] = await tx
.select({ variables: workflow.variables })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
if (!workflowRecord) {
return
}
const variables = (workflowRecord.variables as any) || {}
if (!variables[variableId]) {
return
}
variables[variableId] = {
...variables[variableId],
[field]: value,
}
await tx
.update(workflow)
.set({ variables, updatedAt: new Date() })
.where(eq(workflow.id, workflowId))
updateSuccessful = true
})
if (updateSuccessful) {
// Broadcast to other clients (exclude senders to avoid overwriting their local state)
const senderSocketIds = new Set(pending.opToSocket.values())
const io = (roomManager as any).io
if (io) {
const roomSockets = io.sockets.adapter.rooms.get(workflowId)
if (roomSockets) {
roomSockets.forEach((socketId: string) => {
if (!senderSocketIds.has(socketId)) {
const sock = io.sockets.sockets.get(socketId)
if (sock) {
sock.emit('variable-update', {
variableId,
field,
value,
timestamp,
})
}
}
})
}
}
pending.opToSocket.forEach((socketId, opId) => {
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-confirmed', { operationId: opId, serverTimestamp: Date.now() })
}
})
logger.debug(`Flushed variable update ${workflowId}: ${variableId}.${field}`)
} else {
pending.opToSocket.forEach((socketId, opId) => {
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-failed', {
operationId: opId,
error: 'Variable no longer exists',
retryable: false,
})
}
})
}
} catch (error) {
logger.error('Error flushing variable update:', error)
pending.opToSocket.forEach((socketId, opId) => {
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-failed', {
operationId: opId,
error: error instanceof Error ? error.message : 'Unknown error',
retryable: true,
})
}
})
}
}

View File

@@ -260,6 +260,10 @@ export class RoomManager {
}
}
emitToWorkflow<T = unknown>(workflowId: string, event: string, payload: T): void {
this.io.to(workflowId).emit(event, payload)
}
/**
* Get the number of unique users in a workflow room
* (not the number of socket connections)

View File

@@ -93,11 +93,6 @@ function handleBeforeUnload(event: BeforeUnloadEvent): void {
}
}
try {
const { useOperationQueueStore } = require('@/stores/operation-queue/store')
useOperationQueueStore.getState().flushAllDebounced()
} catch {}
// Standard beforeunload pattern
event.preventDefault()
event.returnValue = ''

View File

@@ -31,10 +31,6 @@ interface OperationQueueState {
cancelOperationsForBlock: (blockId: string) => void
cancelOperationsForVariable: (variableId: string) => void
flushAllDebounced: () => void
flushDebouncedForBlock: (blockId: string) => void
flushDebouncedForVariable: (variableId: string) => void
flushDebouncedForWorkflow: (workflowId: string) => void
cancelOperationsForWorkflow: (workflowId: string) => void
triggerOfflineMode: () => void
@@ -44,14 +40,6 @@ interface OperationQueueState {
const retryTimeouts = new Map<string, NodeJS.Timeout>()
const operationTimeouts = new Map<string, NodeJS.Timeout>()
type PendingDebouncedOperation = {
timeout: NodeJS.Timeout
op: Omit<QueuedOperation, 'timestamp' | 'retryCount' | 'status'>
}
const subblockDebounced = new Map<string, PendingDebouncedOperation>()
const variableDebounced = new Map<string, PendingDebouncedOperation>()
let emitWorkflowOperation:
| ((operation: string, target: string, payload: any, operationId?: string) => void)
| null = null
@@ -82,107 +70,52 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
hasOperationError: false,
addToQueue: (operation) => {
// Handle debouncing for regular subblock operations (but not immediate ones like tag selections)
// Immediate coalescing without client-side debouncing:
// For subblock updates, keep only latest pending op for the same blockId+subblockId
if (
operation.operation.operation === 'subblock-update' &&
operation.operation.target === 'subblock' &&
!operation.immediate
operation.operation.target === 'subblock'
) {
const { blockId, subblockId } = operation.operation.payload
const debounceKey = `${blockId}-${subblockId}`
const existing = subblockDebounced.get(debounceKey)
if (existing) {
clearTimeout(existing.timeout)
}
set((state) => ({
operations: state.operations.filter(
(op) =>
!(
op.status === 'pending' &&
op.operation.operation === 'subblock-update' &&
op.operation.target === 'subblock' &&
op.operation.payload?.blockId === blockId &&
op.operation.payload?.subblockId === subblockId
)
),
operations: [
...state.operations.filter(
(op) =>
!(
op.status === 'pending' &&
op.operation.operation === 'subblock-update' &&
op.operation.target === 'subblock' &&
op.operation.payload?.blockId === blockId &&
op.operation.payload?.subblockId === subblockId
)
),
],
}))
const timeoutId = setTimeout(() => {
const pending = subblockDebounced.get(debounceKey)
subblockDebounced.delete(debounceKey)
if (pending) {
const queuedOp: QueuedOperation = {
...pending.op,
timestamp: Date.now(),
retryCount: 0,
status: 'pending',
}
set((state) => ({
operations: [...state.operations, queuedOp],
}))
get().processNextOperation()
}
}, 25)
subblockDebounced.set(debounceKey, { timeout: timeoutId, op: operation })
return
}
// Handle debouncing for variable operations
// For variable updates, keep only latest pending op for same variableId+field
if (
operation.operation.operation === 'variable-update' &&
operation.operation.target === 'variable' &&
!operation.immediate
operation.operation.target === 'variable'
) {
const { variableId, field } = operation.operation.payload
const debounceKey = `${variableId}-${field}`
const existing = variableDebounced.get(debounceKey)
if (existing) {
clearTimeout(existing.timeout)
}
set((state) => ({
operations: state.operations.filter(
(op) =>
!(
op.status === 'pending' &&
op.operation.operation === 'variable-update' &&
op.operation.target === 'variable' &&
op.operation.payload?.variableId === variableId &&
op.operation.payload?.field === field
)
),
operations: [
...state.operations.filter(
(op) =>
!(
op.status === 'pending' &&
op.operation.operation === 'variable-update' &&
op.operation.target === 'variable' &&
op.operation.payload?.variableId === variableId &&
op.operation.payload?.field === field
)
),
],
}))
const timeoutId = setTimeout(() => {
const pending = variableDebounced.get(debounceKey)
variableDebounced.delete(debounceKey)
if (pending) {
const queuedOp: QueuedOperation = {
...pending.op,
timestamp: Date.now(),
retryCount: 0,
status: 'pending',
}
set((state) => ({
operations: [...state.operations, queuedOp],
}))
get().processNextOperation()
}
}, 25)
variableDebounced.set(debounceKey, { timeout: timeoutId, op: operation })
return
}
// Handle non-subblock operations (existing logic)
// Handle remaining logic
const state = get()
// Check for duplicate operation ID
@@ -261,34 +194,6 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
operationTimeouts.delete(operationId)
}
// Clean up any debounce timeouts for subblock operations
if (
operation?.operation.operation === 'subblock-update' &&
operation.operation.target === 'subblock'
) {
const { blockId, subblockId } = operation.operation.payload
const debounceKey = `${blockId}-${subblockId}`
const pending = subblockDebounced.get(debounceKey)
if (pending) {
clearTimeout(pending.timeout)
subblockDebounced.delete(debounceKey)
}
}
// Clean up any debounce timeouts for variable operations
if (
operation?.operation.operation === 'variable-update' &&
operation.operation.target === 'variable'
) {
const { variableId, field } = operation.operation.payload
const debounceKey = `${variableId}-${field}`
const pending = variableDebounced.get(debounceKey)
if (pending) {
clearTimeout(pending.timeout)
variableDebounced.delete(debounceKey)
}
}
logger.debug('Removing operation from queue', {
operationId,
remainingOps: newOperations.length,
@@ -314,34 +219,6 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
operationTimeouts.delete(operationId)
}
// Clean up any debounce timeouts for subblock operations
if (
operation.operation.operation === 'subblock-update' &&
operation.operation.target === 'subblock'
) {
const { blockId, subblockId } = operation.operation.payload
const debounceKey = `${blockId}-${subblockId}`
const pending = subblockDebounced.get(debounceKey)
if (pending) {
clearTimeout(pending.timeout)
subblockDebounced.delete(debounceKey)
}
}
// Clean up any debounce timeouts for variable operations
if (
operation.operation.operation === 'variable-update' &&
operation.operation.target === 'variable'
) {
const { variableId, field } = operation.operation.payload
const debounceKey = `${variableId}-${field}`
const pending = variableDebounced.get(debounceKey)
if (pending) {
clearTimeout(pending.timeout)
variableDebounced.delete(debounceKey)
}
}
if (!retryable) {
logger.debug('Operation marked as non-retryable, removing from queue', { operationId })
@@ -354,14 +231,30 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
return
}
if (operation.retryCount < 3) {
const newRetryCount = operation.retryCount + 1
const delay = 2 ** newRetryCount * 1000 // 2s, 4s, 8s
// More aggressive retry for subblock/variable updates, less aggressive for structural ops
const isSubblockOrVariable =
(operation.operation.operation === 'subblock-update' &&
operation.operation.target === 'subblock') ||
(operation.operation.operation === 'variable-update' &&
operation.operation.target === 'variable')
logger.warn(`Operation failed, retrying in ${delay}ms (attempt ${newRetryCount}/3)`, {
operationId,
retryCount: newRetryCount,
})
const maxRetries = isSubblockOrVariable ? 5 : 3 // 5 retries for text, 3 for structural
if (operation.retryCount < maxRetries) {
const newRetryCount = operation.retryCount + 1
// Faster retries for subblock/variable, exponential for structural
const delay = isSubblockOrVariable
? Math.min(1000 * newRetryCount, 3000) // 1s, 2s, 3s, 3s, 3s (cap at 3s)
: 2 ** newRetryCount * 1000 // 2s, 4s, 8s (exponential for structural)
logger.warn(
`Operation failed, retrying in ${delay}ms (attempt ${newRetryCount}/${maxRetries})`,
{
operationId,
retryCount: newRetryCount,
operation: operation.operation.operation,
}
)
// Update retry count and mark as pending for retry
set((state) => ({
@@ -381,7 +274,12 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
retryTimeouts.set(operationId, timeout)
} else {
logger.error('Operation failed after max retries, triggering offline mode', { operationId })
// Always trigger offline mode when we can't persist - never silently drop data
logger.error('Operation failed after max retries, triggering offline mode', {
operationId,
operation: operation.operation.operation,
retryCount: operation.retryCount,
})
get().triggerOfflineMode()
}
},
@@ -452,14 +350,22 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
}
}
// Create operation timeout
// Create operation timeout - longer for subblock/variable updates to handle reconnects
const isSubblockOrVariable =
(nextOperation.operation.operation === 'subblock-update' &&
nextOperation.operation.target === 'subblock') ||
(nextOperation.operation.operation === 'variable-update' &&
nextOperation.operation.target === 'variable')
const timeoutDuration = isSubblockOrVariable ? 15000 : 5000 // 15s for text edits, 5s for structural ops
const timeoutId = setTimeout(() => {
logger.warn('Operation timeout - no server response after 5 seconds', {
logger.warn(`Operation timeout - no server response after ${timeoutDuration}ms`, {
operationId: nextOperation.id,
operation: nextOperation.operation.operation,
})
operationTimeouts.delete(nextOperation.id)
get().handleOperationTimeout(nextOperation.id)
}, 5000)
}, timeoutDuration)
operationTimeouts.set(nextOperation.id, timeoutId)
},
@@ -467,15 +373,7 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
cancelOperationsForBlock: (blockId: string) => {
logger.debug('Canceling all operations for block', { blockId })
// Cancel all debounce timeouts for this block's subblocks
const keysToDelete: string[] = []
for (const [key, pending] of subblockDebounced.entries()) {
if (key.startsWith(`${blockId}-`)) {
clearTimeout(pending.timeout)
keysToDelete.push(key)
}
}
keysToDelete.forEach((key) => subblockDebounced.delete(key))
// No debounced timeouts to cancel (moved to server-side)
// Find and cancel operation timeouts for operations related to this block
const state = get()
@@ -516,7 +414,6 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
logger.debug('Cancelled operations for block', {
blockId,
cancelledDebounceTimeouts: keysToDelete.length,
cancelledOperations: operationsToCancel.length,
})
@@ -527,15 +424,7 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
cancelOperationsForVariable: (variableId: string) => {
logger.debug('Canceling all operations for variable', { variableId })
// Cancel all debounce timeouts for this variable
const keysToDelete: string[] = []
for (const [key, pending] of variableDebounced.entries()) {
if (key.startsWith(`${variableId}-`)) {
clearTimeout(pending.timeout)
keysToDelete.push(key)
}
}
keysToDelete.forEach((key) => variableDebounced.delete(key))
// No debounced timeouts to cancel (moved to server-side)
// Find and cancel operation timeouts for operations related to this variable
const state = get()
@@ -578,7 +467,6 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
logger.debug('Cancelled operations for variable', {
variableId,
cancelledDebounceTimeouts: keysToDelete.length,
cancelledOperations: operationsToCancel.length,
})
@@ -586,120 +474,6 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
get().processNextOperation()
},
flushAllDebounced: () => {
const toEnqueue: Omit<QueuedOperation, 'timestamp' | 'retryCount' | 'status'>[] = []
subblockDebounced.forEach((pending, key) => {
clearTimeout(pending.timeout)
subblockDebounced.delete(key)
toEnqueue.push(pending.op)
})
variableDebounced.forEach((pending, key) => {
clearTimeout(pending.timeout)
variableDebounced.delete(key)
toEnqueue.push(pending.op)
})
if (toEnqueue.length === 0) return
set((state) => ({
operations: [
...state.operations,
...toEnqueue.map((op) => ({
...op,
timestamp: Date.now(),
retryCount: 0,
status: 'pending' as const,
})),
],
}))
get().processNextOperation()
},
flushDebouncedForBlock: (blockId: string) => {
const toEnqueue: Omit<QueuedOperation, 'timestamp' | 'retryCount' | 'status'>[] = []
const keys: string[] = []
subblockDebounced.forEach((pending, key) => {
if (key.startsWith(`${blockId}-`)) {
clearTimeout(pending.timeout)
keys.push(key)
toEnqueue.push(pending.op)
}
})
keys.forEach((k) => subblockDebounced.delete(k))
if (toEnqueue.length === 0) return
set((state) => ({
operations: [
...state.operations,
...toEnqueue.map((op) => ({
...op,
timestamp: Date.now(),
retryCount: 0,
status: 'pending' as const,
})),
],
}))
get().processNextOperation()
},
flushDebouncedForVariable: (variableId: string) => {
const toEnqueue: Omit<QueuedOperation, 'timestamp' | 'retryCount' | 'status'>[] = []
const keys: string[] = []
variableDebounced.forEach((pending, key) => {
if (key.startsWith(`${variableId}-`)) {
clearTimeout(pending.timeout)
keys.push(key)
toEnqueue.push(pending.op)
}
})
keys.forEach((k) => variableDebounced.delete(k))
if (toEnqueue.length === 0) return
set((state) => ({
operations: [
...state.operations,
...toEnqueue.map((op) => ({
...op,
timestamp: Date.now(),
retryCount: 0,
status: 'pending' as const,
})),
],
}))
get().processNextOperation()
},
flushDebouncedForWorkflow: (workflowId: string) => {
const toEnqueue: Omit<QueuedOperation, 'timestamp' | 'retryCount' | 'status'>[] = []
const subblockKeys: string[] = []
subblockDebounced.forEach((pending, key) => {
if (pending.op.workflowId === workflowId) {
clearTimeout(pending.timeout)
subblockKeys.push(key)
toEnqueue.push(pending.op)
}
})
subblockKeys.forEach((k) => subblockDebounced.delete(k))
const variableKeys: string[] = []
variableDebounced.forEach((pending, key) => {
if (pending.op.workflowId === workflowId) {
clearTimeout(pending.timeout)
variableKeys.push(key)
toEnqueue.push(pending.op)
}
})
variableKeys.forEach((k) => variableDebounced.delete(k))
if (toEnqueue.length === 0) return
set((state) => ({
operations: [
...state.operations,
...toEnqueue.map((op) => ({
...op,
timestamp: Date.now(),
retryCount: 0,
status: 'pending' as const,
})),
],
}))
get().processNextOperation()
},
cancelOperationsForWorkflow: (workflowId: string) => {
const state = get()
retryTimeouts.forEach((timeout, opId) => {