fix(ui): race condition w/ queue counts

This commit is contained in:
psychedelicious
2025-07-28 19:17:45 +10:00
parent 4f3a5dcc43
commit 6b810cb3fb
2 changed files with 12 additions and 44 deletions

View File

@@ -71,7 +71,6 @@ export const queueApi = api.injectEndpoints({
method: 'POST',
}),
invalidatesTags: [
'SessionQueueStatus',
'CurrentSessionQueueItem',
'NextSessionQueueItem',
'QueueCountsByDestination',
@@ -83,46 +82,10 @@ export const queueApi = api.injectEndpoints({
try {
const { data } = await queryFulfilled;
resetListQueryData(dispatch);
/**
* When a batch is enqueued, we need to update the queue status. While it might be templting to invalidate the
* `SessionQueueStatus` tag here, this can introduce a race condition when the queue item executes quickly:
*
* - Enqueue via this query
* - On success, we invalidate `SessionQueueStatus` tag - network request sent to server
* - The server gets the queue status request and responds, but this takes some time... in the meantime:
* - The new queue item starts executing, and we receive a socket queue item status changed event
* - We optimistically update the queue status in the queue item status changed socket handler
* - At this point, the queue status is correct
* - Finally, we get the queue status from the tag invalidation request - but it's reporting the queue status
* from _before_ the last queue event
* - The queue status is now incorrect!
*
* Ok, what if we just never did optimistic updates and invalidated the tag in the queue event handlers instead?
* It's much simpler that way, but it causes a lot of network requests - 3 per queue item, as it moves from
* pending -> in_progress -> completed/failed/canceled.
*
* We can do a bit of extra work here, incrementing the pending and total counts in the queue status, and do
* similar optimistic updates in the socket handler. Because this optimistic update runs immediately after the
* enqueue network request, it should always occur _before_ the next queue event, so no race condition:
*
* - Enqueue batch via this query
* - On success, optimistically update - this happens immediately on the HTTP OK - before the next queue event
* - At this point, the queue status is correct
* - A queue item status changes and we receive a socket event w/ updated status
* - Update status optimistically in socket handler
* - Queue status is still correct
*
* This problem occurs most commonly with canvas filters like Canny edge detection, which are single-node
* graphs that execute very quickly. Image generation graphs take long enough to not trigger this race
* condition - even when all nodes are cached on the server.
*/
dispatch(
queueApi.util.updateQueryData('getQueueStatus', undefined, (draft) => {
if (!draft) {
return;
}
draft.queue.pending += data.enqueued;
draft.queue.total += data.enqueued;
draft.queue.in_progress += data.item_ids.length;
draft.queue.total += data.item_ids.length;
})
);
} catch {

View File

@@ -381,7 +381,6 @@ export const setEventListeners = ({ socket, store, setIsConnected }: SetEventLis
// Invalidate caches for things we cannot easily update
const tagsToInvalidate: ApiTagDescription[] = [
'SessionQueueStatus',
'CurrentSessionQueueItem',
'NextSessionQueueItem',
'InvocationCacheStatus',
@@ -394,6 +393,16 @@ export const setEventListeners = ({ socket, store, setIsConnected }: SetEventLis
tagsToInvalidate.push({ type: 'QueueCountsByDestination', id: destination });
}
dispatch(queueApi.util.invalidateTags(tagsToInvalidate));
dispatch(
queueApi.util.updateQueryData('getQueueStatus', undefined, (draft) => {
draft.queue = data.queue_status;
})
);
dispatch(
queueApi.util.updateQueryData('getBatchStatus', { batch_id: data.batch_id }, (draft) => {
Object.assign(draft, data.batch_status);
})
);
if (status === 'in_progress') {
forEach($nodeExecutionStates.get(), (nes) => {
@@ -408,10 +417,6 @@ export const setEventListeners = ({ socket, store, setIsConnected }: SetEventLis
clone.outputs = [];
$nodeExecutionStates.setKey(clone.nodeId, clone);
});
if (data.origin === 'canvas') {
// store.dispatch(stagingAreaGenerationStarted({ sessionId: session_id }));
// $progressImages.setKey(session_id, { sessionId: session_id, isFinished: false });
}
} else if (status === 'completed' || status === 'failed' || status === 'canceled') {
finishedQueueItemIds.set(item_id, true);
if (status === 'failed' && error_type) {