fix(queueing): make debouncing native to queue (#682)

This commit is contained in:
Vikhyath Mondreti
2025-07-14 15:33:25 -07:00
committed by GitHub
parent 8a9bc4e929
commit e8c51e99a2
2 changed files with 82 additions and 57 deletions

View File

@@ -181,7 +181,7 @@ export function useSubBlockValue<T = any>(
triggerWorkflowUpdate = false,
options?: UseSubBlockValueOptions
): readonly [T | null, (value: T) => void] {
const { debounceMs = 150, isStreaming = false, onStreamingEnd } = options || {}
const { isStreaming = false, onStreamingEnd } = options || {}
const { collaborativeSetSubblockValue } = useCollaborativeWorkflow()
@@ -202,8 +202,7 @@ export function useSubBlockValue<T = any>(
// Previous model reference for detecting model changes
const prevModelRef = useRef<string | null>(null)
// Debouncing refs
const debounceTimerRef = useRef<NodeJS.Timeout | null>(null)
// Streaming refs
const lastEmittedValueRef = useRef<T | null>(null)
const streamingValueRef = useRef<T | null>(null)
const wasStreamingRef = useRef<boolean>(false)
@@ -232,15 +231,6 @@ export function useSubBlockValue<T = any>(
// Compute the modelValue based on block type
const modelValue = isProviderBasedBlock ? (modelSubBlockValue as string) : null
// Cleanup timer on unmount
useEffect(() => {
return () => {
if (debounceTimerRef.current) {
clearTimeout(debounceTimerRef.current)
}
}
}, [])
// Emit the value to socket/DB
const emitValue = useCallback(
(value: T) => {
@@ -299,26 +289,12 @@ export function useSubBlockValue<T = any>(
storeApiKeyValue(blockId, blockType, modelValue, newValue, storeValue)
}
// Clear any existing debounce timer
if (debounceTimerRef.current) {
clearTimeout(debounceTimerRef.current)
debounceTimerRef.current = null
}
// If streaming, just store the value without emitting
if (isStreaming) {
streamingValueRef.current = valueCopy
} else {
// Detect large changes for extended debounce
const isLargeChange = detectLargeChange(lastEmittedValueRef.current, valueCopy)
const effectiveDebounceMs = isLargeChange ? debounceMs * 2 : debounceMs
// Debounce the socket emission
debounceTimerRef.current = setTimeout(() => {
if (valueRef.current !== null && valueRef.current !== lastEmittedValueRef.current) {
emitValue(valueCopy)
}
}, effectiveDebounceMs)
// Emit immediately - let the operation queue handle debouncing and deduplication
emitValue(valueCopy)
}
if (triggerWorkflowUpdate) {
@@ -335,7 +311,6 @@ export function useSubBlockValue<T = any>(
triggerWorkflowUpdate,
modelValue,
isStreaming,
debounceMs,
emitValue,
]
)
@@ -412,26 +387,3 @@ export function useSubBlockValue<T = any>(
// Return appropriate tuple based on whether options were provided
return [storeValue !== undefined ? storeValue : initialValue, setValue] as const
}
// Helper function to detect large changes
function detectLargeChange(oldValue: any, newValue: any): boolean {
// Handle null/undefined
if (oldValue == null && newValue == null) return false
if (oldValue == null || newValue == null) return true
// For strings, check if it's a large paste or deletion
if (typeof oldValue === 'string' && typeof newValue === 'string') {
const sizeDiff = Math.abs(newValue.length - oldValue.length)
// Consider it a large change if more than 50 characters changed at once
return sizeDiff > 50
}
// For arrays, check length difference
if (Array.isArray(oldValue) && Array.isArray(newValue)) {
const sizeDiff = Math.abs(newValue.length - oldValue.length)
return sizeDiff > 5
}
// For other types, always treat as small change
return false
}

View File

@@ -34,6 +34,7 @@ interface OperationQueueState {
const retryTimeouts = new Map<string, NodeJS.Timeout>()
const operationTimeouts = new Map<string, NodeJS.Timeout>()
const subblockDebounceTimeouts = new Map<string, NodeJS.Timeout>()
let emitWorkflowOperation:
| ((operation: string, target: string, payload: any, operationId?: string) => void)
@@ -59,6 +60,54 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
hasOperationError: false,
addToQueue: (operation) => {
// Handle debouncing for subblock operations
if (
operation.operation.operation === 'subblock-update' &&
operation.operation.target === 'subblock'
) {
const { blockId, subblockId } = operation.operation.payload
const debounceKey = `${blockId}-${subblockId}`
const existingTimeout = subblockDebounceTimeouts.get(debounceKey)
if (existingTimeout) {
clearTimeout(existingTimeout)
}
set((state) => ({
operations: state.operations.filter(
(op) =>
!(
op.status === 'pending' &&
op.operation.operation === 'subblock-update' &&
op.operation.target === 'subblock' &&
op.operation.payload?.blockId === blockId &&
op.operation.payload?.subblockId === subblockId
)
),
}))
const timeoutId = setTimeout(() => {
subblockDebounceTimeouts.delete(debounceKey)
const queuedOp: QueuedOperation = {
...operation,
timestamp: Date.now(),
retryCount: 0,
status: 'pending',
}
set((state) => ({
operations: [...state.operations, queuedOp],
}))
get().processNextOperation()
}, 150) // 150ms debounce for subblock operations
subblockDebounceTimeouts.set(debounceKey, timeoutId)
return
}
// Handle non-subblock operations (existing logic)
const state = get()
// Check for duplicate operation ID
@@ -80,13 +129,8 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
// For block operations, check the block ID specifically
((operation.operation.target === 'block' &&
op.operation.payload?.id === operation.operation.payload?.id) ||
// For subblock operations, check blockId and subblockId
(operation.operation.target === 'subblock' &&
op.operation.payload?.blockId === operation.operation.payload?.blockId &&
op.operation.payload?.subblockId === operation.operation.payload?.subblockId) ||
// For other operations, fall back to full payload comparison
(operation.operation.target !== 'block' &&
operation.operation.target !== 'subblock' &&
JSON.stringify(op.operation.payload) === JSON.stringify(operation.operation.payload)))
)
@@ -127,6 +171,7 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
confirmOperation: (operationId) => {
const state = get()
const operation = state.operations.find((op) => op.id === operationId)
const newOperations = state.operations.filter((op) => op.id !== operationId)
const retryTimeout = retryTimeouts.get(operationId)
@@ -141,6 +186,20 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
operationTimeouts.delete(operationId)
}
// Clean up any debounce timeouts for subblock operations
if (
operation?.operation.operation === 'subblock-update' &&
operation.operation.target === 'subblock'
) {
const { blockId, subblockId } = operation.operation.payload
const debounceKey = `${blockId}-${subblockId}`
const debounceTimeout = subblockDebounceTimeouts.get(debounceKey)
if (debounceTimeout) {
clearTimeout(debounceTimeout)
subblockDebounceTimeouts.delete(debounceKey)
}
}
logger.debug('Removing operation from queue', {
operationId,
remainingOps: newOperations.length,
@@ -166,6 +225,20 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
operationTimeouts.delete(operationId)
}
// Clean up any debounce timeouts for subblock operations
if (
operation.operation.operation === 'subblock-update' &&
operation.operation.target === 'subblock'
) {
const { blockId, subblockId } = operation.operation.payload
const debounceKey = `${blockId}-${subblockId}`
const debounceTimeout = subblockDebounceTimeouts.get(debounceKey)
if (debounceTimeout) {
clearTimeout(debounceTimeout)
subblockDebounceTimeouts.delete(debounceKey)
}
}
if (operation.retryCount < 3) {
const newRetryCount = operation.retryCount + 1
const delay = 2 ** newRetryCount * 1000 // 2s, 4s, 8s