feat: canvas flow rework (wip)

This commit is contained in:
psychedelicious
2025-06-03 14:49:00 +10:00
parent 791f9de99e
commit 6f3a349ad4
7 changed files with 94 additions and 260 deletions

View File

@@ -91,7 +91,7 @@ async def list_queue_items(
@session_queue_router.get(
"/{queue_id}/all",
"/{queue_id}/list_all",
operation_id="list_all_queue_items",
responses={
200: {"model": list[SessionQueueItem]},

View File

@@ -66,7 +66,7 @@ import { useHotkeys } from 'react-hotkeys-hook';
import { Trans, useTranslation } from 'react-i18next';
import { PiDotsThreeOutlineVerticalFill, PiUploadBold } from 'react-icons/pi';
import { getImageDTOSafe, useGetImageDTOQuery } from 'services/api/endpoints/images';
import { queueItemsAdapterSelectors, useListQueueItemsQuery } from 'services/api/endpoints/queue';
import { useListAllQueueItemsQuery } from 'services/api/endpoints/queue';
import type { ImageDTO, S } from 'services/api/types';
import type { ProgressAndResult } from 'services/events/stores';
import { $progressImages, $socket, useMapSelector } from 'services/events/stores';
@@ -351,17 +351,12 @@ const StagingArea = memo(() => {
const [canScrollLeft, setCanScrollLeft] = useState(false);
const [canScrollRight, setCanScrollRight] = useState(false);
const scrollableRef = useRef<HTMLDivElement>(null);
const { data } = useListQueueItemsQuery({ destination: 'canvas' });
const items = useMemo(() => {
if (!data) {
return EMPTY_ARRAY;
}
return queueItemsAdapterSelectors.selectAll(data);
}, [data]);
const { data } = useListAllQueueItemsQuery({ destination: 'canvas' });
const items = useMemo(() => data?.filter(({ status }) => status !== 'canceled') ?? EMPTY_ARRAY, [data]);
const selectedItem = useMemo(
() =>
data && selectedItemId !== null ? queueItemsAdapterSelectors.selectById(data, String(selectedItemId)) : null,
[data, selectedItemId]
items.length > 0 && selectedItemId !== null ? items.find(({ item_id }) => item_id === selectedItemId) : null,
[items, selectedItemId]
);
useEffect(() => {

View File

@@ -13,7 +13,7 @@ import { memo, useCallback, useEffect, useMemo, useRef, useState } from 'react';
import { useTranslation } from 'react-i18next';
import type { Components, ItemContent } from 'react-virtuoso';
import { Virtuoso } from 'react-virtuoso';
import { queueItemsAdapterSelectors, useListQueueItemsQuery } from 'services/api/endpoints/queue';
import { useListQueueItemsQuery } from 'services/api/endpoints/queue';
import type { S } from 'services/api/types';
import QueueItemComponent from './QueueItemComponent';
@@ -70,7 +70,7 @@ const QueueList = () => {
if (!listQueueItemsData) {
return [];
}
return queueItemsAdapterSelectors.selectAll(listQueueItemsData);
return listQueueItemsData.items;
}, [listQueueItemsData]);
const handleLoadMore = useCallback(() => {

View File

@@ -1,14 +1,9 @@
import type { EntityState, ThunkDispatch, UnknownAction } from '@reduxjs/toolkit';
import { createEntityAdapter } from '@reduxjs/toolkit';
import { getSelectorsOptions } from 'app/store/createMemoizedSelector';
import { $queueId } from 'app/store/nanostores/queueId';
import { listParamsReset } from 'features/queue/store/queueSlice';
import queryString from 'query-string';
import type { components, paths } from 'services/api/schema';
import type { S } from 'services/api/types';
import type { ApiTagDescription } from '..';
import { api, buildV1Url, LIST_TAG } from '..';
import { api, buildV1Url, LIST_ALL_TAG, LIST_TAG } from '..';
/**
* Builds an endpoint URL for the queue router
@@ -36,30 +31,6 @@ export type SessionQueueItemStatus = NonNullable<
NonNullable<paths['/api/v1/queue/{queue_id}/list']['get']['parameters']['query']>['status']
>;
export const queueItemsAdapter = createEntityAdapter<S['SessionQueueItem'], string>({
selectId: (queueItem) => String(queueItem.item_id),
sortComparer: (a, b) => {
// Sort by priority in descending order
if (a.priority > b.priority) {
return -1;
}
if (a.priority < b.priority) {
return 1;
}
// If priority is the same, sort by id in ascending order
if (a.item_id < b.item_id) {
return -1;
}
if (a.item_id > b.item_id) {
return 1;
}
return 0;
},
});
export const queueItemsAdapterSelectors = queueItemsAdapter.getSelectors(undefined, getSelectorsOptions);
export const queueApi = api.injectEndpoints({
endpoints: (build) => ({
enqueueBatch: build.mutation<
@@ -71,58 +42,14 @@ export const queueApi = api.injectEndpoints({
body: arg,
method: 'POST',
}),
invalidatesTags: ['CurrentSessionQueueItem', 'NextSessionQueueItem', 'QueueCountsByDestination'],
onQueryStarted: async (arg, api) => {
const { dispatch, queryFulfilled } = api;
try {
const { data } = await queryFulfilled;
resetListQueryData(dispatch);
/**
* When a batch is enqueued, we need to update the queue status. While it might be templting to invalidate the
* `SessionQueueStatus` tag here, this can introduce a race condition when the queue item executes quickly:
*
* - Enqueue via this query
* - On success, we invalidate `SessionQueueStatus` tag - network request sent to server
* - The server gets the queue status request and responds, but this takes some time... in the meantime:
* - The new queue item starts executing, and we receive a socket queue item status changed event
* - We optimistically update the queue status in the queue item status changed socket handler
* - At this point, the queue status is correct
* - Finally, we get the queue status from the tag invalidation request - but it's reporting the queue status
* from _before_ the last queue event
* - The queue status is now incorrect!
*
* Ok, what if we just never did optimistic updates and invalidated the tag in the queue event handlers instead?
* It's much simpler that way, but it causes a lot of network requests - 3 per queue item, as it moves from
* pending -> in_progress -> completed/failed/canceled.
*
* We can do a bit of extra work here, incrementing the pending and total counts in the queue status, and do
* similar optimistic updates in the socket handler. Because this optimistic update runs immediately after the
* enqueue network request, it should always occur _before_ the next queue event, so no race condition:
*
* - Enqueue batch via this query
* - On success, optimistically update - this happens immediately on the HTTP OK - before the next queue event
* - At this point, the queue status is correct
* - A queue item status changes and we receive a socket event w/ updated status
* - Update status optimistically in socket handler
* - Queue status is still correct
*
* This problem occurs most commonly with canvas filters like Canny edge detection, which are single-node
* graphs that execute very quickly. Image generation graphs take long enough to not trigger this race
* condition - even when all nodes are cached on the server.
*/
dispatch(
queueApi.util.updateQueryData('getQueueStatus', undefined, (draft) => {
if (!draft) {
return;
}
draft.queue.pending += data.enqueued;
draft.queue.total += data.enqueued;
})
);
} catch {
// no-op
}
},
invalidatesTags: [
'SessionQueueStatus',
'CurrentSessionQueueItem',
'NextSessionQueueItem',
'QueueCountsByDestination',
{ type: 'SessionQueueItem', id: LIST_TAG },
{ type: 'SessionQueueItem', id: LIST_ALL_TAG },
],
}),
resumeProcessor: build.mutation<
paths['/api/v1/queue/{queue_id}/processor/resume']['put']['responses']['200']['content']['application/json'],
@@ -152,16 +79,7 @@ export const queueApi = api.injectEndpoints({
url: buildQueueUrl('prune'),
method: 'PUT',
}),
invalidatesTags: ['SessionQueueStatus', 'BatchStatus'],
onQueryStarted: async (arg, api) => {
const { dispatch, queryFulfilled } = api;
try {
await queryFulfilled;
resetListQueryData(dispatch);
} catch {
// no-op
}
},
invalidatesTags: ['SessionQueueStatus', 'BatchStatus', { type: 'SessionQueueItem', id: LIST_TAG }],
}),
clearQueue: build.mutation<
paths['/api/v1/queue/{queue_id}/clear']['put']['responses']['200']['content']['application/json'],
@@ -178,16 +96,9 @@ export const queueApi = api.injectEndpoints({
'CurrentSessionQueueItem',
'NextSessionQueueItem',
'QueueCountsByDestination',
{ type: 'SessionQueueItem', id: LIST_TAG },
{ type: 'SessionQueueItem', id: LIST_ALL_TAG },
],
onQueryStarted: async (arg, api) => {
const { dispatch, queryFulfilled } = api;
try {
await queryFulfilled;
resetListQueryData(dispatch);
} catch {
// no-op
}
},
}),
getCurrentQueueItem: build.query<
paths['/api/v1/queue/{queue_id}/current']['get']['responses']['200']['content']['application/json'],
@@ -271,25 +182,6 @@ export const queueApi = api.injectEndpoints({
url: buildQueueUrl(`i/${item_id}/cancel`),
method: 'PUT',
}),
onQueryStarted: async (item_id, { dispatch, queryFulfilled }) => {
try {
const { data } = await queryFulfilled;
dispatch(
queueApi.util.updateQueryData('listQueueItems', undefined, (draft) => {
queueItemsAdapter.updateOne(draft, {
id: String(item_id),
changes: {
status: data.status,
completed_at: data.completed_at,
updated_at: data.updated_at,
},
});
})
);
} catch {
// no-op
}
},
invalidatesTags: (result) => {
if (!result) {
return [];
@@ -313,16 +205,19 @@ export const queueApi = api.injectEndpoints({
method: 'PUT',
body,
}),
onQueryStarted: async (arg, api) => {
const { dispatch, queryFulfilled } = api;
try {
await queryFulfilled;
resetListQueryData(dispatch);
} catch {
// no-op
invalidatesTags: (result, error, { batch_ids }) => {
if (!result) {
return [];
}
return [
'SessionQueueStatus',
'BatchStatus',
'QueueCountsByDestination',
{ type: 'SessionQueueItem', id: LIST_TAG },
{ type: 'SessionQueueItem', id: LIST_ALL_TAG },
...batch_ids.map((id) => ({ type: 'BatchStatus', id }) satisfies ApiTagDescription),
];
},
invalidatesTags: ['SessionQueueStatus', 'BatchStatus', 'QueueCountsByDestination'],
}),
cancelByBatchDestination: build.mutation<
paths['/api/v1/queue/{queue_id}/cancel_by_destination']['put']['responses']['200']['content']['application/json'],
@@ -333,20 +228,17 @@ export const queueApi = api.injectEndpoints({
method: 'PUT',
params,
}),
onQueryStarted: async (arg, api) => {
const { dispatch, queryFulfilled } = api;
try {
await queryFulfilled;
resetListQueryData(dispatch);
} catch {
// no-op
}
},
invalidatesTags: (result, error, { destination }) => {
if (!result) {
return [];
}
return ['SessionQueueStatus', 'BatchStatus', { type: 'QueueCountsByDestination', id: destination }];
return [
'SessionQueueStatus',
'BatchStatus',
{ type: 'SessionQueueItem', id: LIST_TAG },
{ type: 'SessionQueueItem', id: LIST_ALL_TAG },
{ type: 'QueueCountsByDestination', id: destination },
];
},
}),
cancelAllExceptCurrent: build.mutation<
@@ -357,16 +249,7 @@ export const queueApi = api.injectEndpoints({
url: buildQueueUrl('cancel_all_except_current'),
method: 'PUT',
}),
onQueryStarted: async (arg, api) => {
const { dispatch, queryFulfilled } = api;
try {
await queryFulfilled;
resetListQueryData(dispatch);
} catch {
// no-op
}
},
invalidatesTags: ['SessionQueueStatus', 'BatchStatus', 'QueueCountsByDestination'],
invalidatesTags: ['SessionQueueStatus', 'BatchStatus', 'QueueCountsByDestination', 'SessionQueueItem'],
}),
retryItemsById: build.mutation<
paths['/api/v1/queue/{queue_id}/retry_items_by_id']['put']['responses']['200']['content']['application/json'],
@@ -377,44 +260,64 @@ export const queueApi = api.injectEndpoints({
method: 'PUT',
body,
}),
onQueryStarted: async (arg, api) => {
const { dispatch, queryFulfilled } = api;
try {
await queryFulfilled;
resetListQueryData(dispatch);
} catch {
// no-op
invalidatesTags: (result, error, item_ids) => {
if (!result) {
return [];
}
return [
'CurrentSessionQueueItem',
'NextSessionQueueItem',
'QueueCountsByDestination',
{ type: 'SessionQueueItem', id: LIST_TAG },
{ type: 'SessionQueueItem', id: LIST_ALL_TAG },
...item_ids.map((id) => ({ type: 'SessionQueueItem', id }) satisfies ApiTagDescription),
];
},
invalidatesTags: ['CurrentSessionQueueItem', 'NextSessionQueueItem', 'QueueCountsByDestination'],
}),
listQueueItems: build.query<
EntityState<S['SessionQueueItem'], string> & {
has_more: boolean;
},
components['schemas']['CursorPaginatedResults_SessionQueueItem_'],
{ cursor?: number; priority?: number; destination?: string } | undefined
>({
query: (queryArgs) => ({
url: getListQueueItemsUrl(queryArgs),
method: 'GET',
}),
serializeQueryArgs: () => {
return buildQueueUrl('list');
},
transformResponse: (response: components['schemas']['CursorPaginatedResults_SessionQueueItem_']) =>
queueItemsAdapter.upsertMany(
queueItemsAdapter.getInitialState({
has_more: response.has_more,
}),
response.items
),
merge: (cache, response) => {
queueItemsAdapter.upsertMany(cache, queueItemsAdapterSelectors.selectAll(response));
cache.has_more = response.has_more;
},
forceRefetch: ({ currentArg, previousArg }) => currentArg !== previousArg,
keepUnusedDataFor: 60 * 5, // 5 minutes
providesTags: ['FetchOnReconnect', { type: 'SessionQueueItem', id: LIST_TAG }],
providesTags: (result, _error, _args) => {
if (!result) {
return [];
}
return [
'FetchOnReconnect',
{ type: 'SessionQueueItem', id: LIST_TAG },
...result.items.map(({ item_id }) => ({ type: 'SessionQueueItem', id: item_id }) satisfies ApiTagDescription),
];
},
}),
listAllQueueItems: build.query<
paths['/api/v1/queue/{queue_id}/list_all']['get']['responses']['200']['content']['application/json'],
paths['/api/v1/queue/{queue_id}/list_all']['get']['parameters']['query']
>({
query: (queryArgs) => {
const q = queryArgs
? queryString.stringify(queryArgs, {
arrayFormat: 'none',
})
: undefined;
return q ? buildQueueUrl(`list_all?${q}`) : buildQueueUrl('list_all');
},
providesTags: (result, _error, _args) => {
if (!result) {
return [];
}
const tags: ApiTagDescription[] = [
'FetchOnReconnect',
{ type: 'SessionQueueItem', id: LIST_ALL_TAG },
...result.map(({ item_id }) => ({ type: 'SessionQueueItem', id: item_id }) satisfies ApiTagDescription),
];
return tags;
},
}),
getQueueCountsByDestination: build.query<
paths['/api/v1/queue/{queue_id}/counts_by_destination']['get']['responses']['200']['content']['application/json'],
@@ -440,6 +343,7 @@ export const {
useGetQueueStatusQuery,
useGetQueueItemQuery,
useListQueueItemsQuery,
useListAllQueueItemsQuery,
useCancelQueueItemMutation,
useGetBatchStatusQuery,
useGetCurrentQueueItemQuery,
@@ -450,24 +354,6 @@ export const {
export const selectQueueStatus = queueApi.endpoints.getQueueStatus.select();
export const selectCanvasQueueCounts = queueApi.endpoints.getQueueCountsByDestination.select({ destination: 'canvas' });
const resetListQueryData = (
// eslint-disable-next-line @typescript-eslint/no-explicit-any
dispatch: ThunkDispatch<any, any, UnknownAction>
) => {
dispatch(
queueApi.util.updateQueryData('listQueueItems', undefined, (draft) => {
// remove all items from the list
queueItemsAdapter.removeAll(draft);
// reset the has_more flag
draft.has_more = false;
})
);
// set the list cursor and priority to undefined
dispatch(listParamsReset());
// we have to manually kick off another query to get the first page and re-initialize the list
dispatch(queueApi.endpoints.listQueueItems.initiate(undefined));
};
export const enqueueMutationFixedCacheKeyOptions = {
fixedCacheKey: 'enqueueBatch',
} as const;

View File

@@ -56,6 +56,7 @@ const tagTypes = [
] as const;
export type ApiTagDescription = TagDescription<(typeof tagTypes)[number]>;
export const LIST_TAG = 'LIST';
export const LIST_ALL_TAG = 'LIST_ALL';
const dynamicBaseQuery: BaseQueryFn<string | FetchArgs, unknown, FetchBaseQueryError> = (args, api, extraOptions) => {
const baseUrl = $baseUrl.get();

View File

@@ -1164,7 +1164,7 @@ export type paths = {
patch?: never;
trace?: never;
};
"/api/v1/queue/{queue_id}/all": {
"/api/v1/queue/{queue_id}/list_all": {
parameters: {
query?: never;
header?: never;

View File

@@ -21,7 +21,7 @@ import { toast } from 'features/toast/toast';
import { t } from 'i18next';
import { forEach, isNil, round } from 'lodash-es';
import type { ApiTagDescription } from 'services/api';
import { api, LIST_TAG } from 'services/api';
import { api, LIST_ALL_TAG, LIST_TAG } from 'services/api';
import { modelsApi } from 'services/api/endpoints/models';
import { queueApi } from 'services/api/endpoints/queue';
import { workflowsApi } from 'services/api/endpoints/workflows';
@@ -363,68 +363,20 @@ export const setEventListeners = ({ socket, store, setIsConnected }: SetEventLis
socket.on('queue_item_status_changed', (data) => {
// we've got new status for the queue item, batch and queue
const {
item_id,
session_id,
status,
started_at,
updated_at,
completed_at,
batch_status,
queue_status,
error_type,
error_message,
error_traceback,
destination,
credits,
} = data;
const { item_id, session_id, status, batch_status, error_type, error_message, destination } = data;
log.debug({ data }, `Queue item ${item_id} status updated: ${status}`);
// // Update this specific queue item in the list of queue items (this is the queue item DTO, without the session)
// dispatch(
// queueApi.util.updateQueryData('listQueueItems', undefined, (draft) => {
// queueItemsAdapter.updateOne(draft, {
// id: String(item_id),
// changes: {
// status,
// started_at,
// updated_at: updated_at ?? undefined,
// completed_at: completed_at ?? undefined,
// error_type,
// error_message,
// error_traceback,
// credits,
// },
// });
// })
// );
// Optimistic update of the queue status. We prefer to do an optimistic update over tag invalidation due to the
// frequency of `queue_item_status_changed` events.
dispatch(
queueApi.util.updateQueryData('getQueueStatus', undefined, (draft) => {
if (!draft) {
return;
}
/**
* Update the queue status - though the getQueueStatus query response contains the processor status (i.e. running
* or paused), that data is not provided in the event we are handling. So we can only update `draft.queue` here.
*/
Object.assign(draft.queue, queue_status);
})
);
// Update the batch status
dispatch(queueApi.util.updateQueryData('getBatchStatus', { batch_id: batch_status.batch_id }, () => batch_status));
// Invalidate caches for things we cannot easily update
const tagsToInvalidate: ApiTagDescription[] = [
'SessionQueueStatus',
'CurrentSessionQueueItem',
'NextSessionQueueItem',
'InvocationCacheStatus',
{ type: 'SessionQueueItem', id: item_id },
{ type: 'SessionQueueItem', id: LIST_TAG },
{ type: 'SessionQueueItem', id: LIST_ALL_TAG },
{ type: 'BatchStatus', id: batch_status.batch_id },
];
if (destination) {
tagsToInvalidate.push({ type: 'QueueCountsByDestination', id: destination });