From 385a8afacf713739f476d789c7d1aa71135d6188 Mon Sep 17 00:00:00 2001 From: psychedelicious <4822129+psychedelicious@users.noreply.github.com> Date: Sat, 16 Sep 2023 02:24:22 +1000 Subject: [PATCH] feat: add queue_id & support logic --- invokeai/app/api/routers/session_queue.py | 136 ++++++--- invokeai/app/api/sockets.py | 12 +- invokeai/app/services/events.py | 13 +- .../session_execution_base.py | 27 +- .../session_execution_default.py | 48 ++- .../session_queue/session_queue_base.py | 33 ++- .../session_queue/session_queue_common.py | 65 +++-- .../session_queue/session_queue_sqlite.py | 273 ++++++++++++------ .../web/src/app/components/InvokeAIUI.tsx | 11 +- .../socketio/socketQueueItemStatusChanged.ts | 7 +- .../listeners/userInvokedCanvas.ts | 53 ---- .../features/queue/components/QueueTable.tsx | 26 +- .../queue/components/common/QueueItemCard.tsx | 2 +- .../src/features/queue/store/nanoStores.ts | 5 + .../src/features/queue/store/queueSlice.ts | 24 +- .../web/src/services/api/endpoints/queue.ts | 234 +++++++-------- .../frontend/web/src/services/api/schema.d.ts | 191 ++++++++++-- .../frontend/web/src/services/api/types.ts | 2 +- .../frontend/web/src/services/events/types.ts | 12 +- .../services/events/util/setEventListeners.ts | 10 +- 20 files changed, 730 insertions(+), 454 deletions(-) create mode 100644 invokeai/frontend/web/src/features/queue/store/nanoStores.ts diff --git a/invokeai/app/api/routers/session_queue.py b/invokeai/app/api/routers/session_queue.py index b592b86157..4673e2c6bf 100644 --- a/invokeai/app/api/routers/session_queue.py +++ b/invokeai/app/api/routers/session_queue.py @@ -28,45 +28,48 @@ class SessionQueueAndExecutionStatusResult(SessionQueueStatusResult, SessionExec @session_queue_router.post( - "/enqueue", + "/{queue_id}/enqueue", operation_id="enqueue", responses={ 201: {"model": EnqueueResult}, }, ) async def enqueue( + queue_id: str = Path(description="The queue id to perform this operation on"), graph: Graph = Body(description="The graph to enqueue"), prepend: bool = Body(default=False, description="Whether or not to prepend this batch in the queue"), ) -> EnqueueResult: """Enqueues a graph for single execution.""" - return ApiDependencies.invoker.services.session_queue.enqueue(graph=graph, prepend=prepend) + return ApiDependencies.invoker.services.session_queue.enqueue(queue_id=queue_id, graph=graph, prepend=prepend) @session_queue_router.post( - "/enqueue_batch", + "/{queue_id}/enqueue_batch", operation_id="enqueue_batch", responses={ 201: {"model": EnqueueResult}, }, ) async def enqueue_batch( + queue_id: str = Path(description="The queue id to perform this operation on"), batch: Batch = Body(description="Batch to process"), prepend: bool = Body(default=False, description="Whether or not to prepend this batch in the queue"), ) -> EnqueueResult: """Processes a batch and enqueues the output graphs for execution.""" - return ApiDependencies.invoker.services.session_queue.enqueue_batch(batch=batch, prepend=prepend) + return ApiDependencies.invoker.services.session_queue.enqueue_batch(queue_id=queue_id, batch=batch, prepend=prepend) @session_queue_router.get( - "/list", + "/{queue_id}/list", operation_id="list_queue_items", responses={ 200: {"model": CursorPaginatedResults[SessionQueueItemDTO]}, }, ) async def list_queue_items( + queue_id: str = Path(description="The queue id to perform this operation on"), limit: int = Query(default=50, description="The number of items to fetch"), status: Optional[QUEUE_ITEM_STATUS] = Query(default=None, description="The status of items to fetch"), cursor: Optional[int] = Query(default=None, description="The pagination cursor"), @@ -75,135 +78,184 @@ async def list_queue_items( """Gets all queue items (without graphs)""" return ApiDependencies.invoker.services.session_queue.list_queue_items( - limit=limit, status=status, cursor=cursor, priority=priority + queue_id=queue_id, limit=limit, status=status, order_id=cursor, priority=priority ) @session_queue_router.put( - "/start", + "/{queue_id}/start", operation_id="start", ) -async def start() -> None: +async def start( + queue_id: str = Path(description="The queue id to perform this operation on"), +) -> None: """Starts session queue execution""" - return ApiDependencies.invoker.services.session_execution.start() + return ApiDependencies.invoker.services.session_execution.start( + queue_id=queue_id, + ) @session_queue_router.put( - "/stop", + "/{queue_id}/stop", operation_id="stop", ) -async def stop() -> None: +async def stop( + queue_id: str = Path(description="The queue id to perform this operation on"), +) -> None: """Stops session queue execution, waiting for the currently executing session to finish""" - return ApiDependencies.invoker.services.session_execution.stop() + return ApiDependencies.invoker.services.session_execution.stop( + queue_id=queue_id, + ) @session_queue_router.put( - "/cancel", + "/{queue_id}/cancel", operation_id="cancel", ) -async def cancel() -> None: +async def cancel( + queue_id: str = Path(description="The queue id to perform this operation on"), +) -> None: """Stops session queue execution, immediately canceling the currently-executing session""" - return ApiDependencies.invoker.services.session_execution.cancel() + return ApiDependencies.invoker.services.session_execution.cancel( + queue_id=queue_id, + ) @session_queue_router.put( - "/cancel_by_batch_ids", operation_id="cancel_by_batch_ids", responses={200: {"model": CancelByBatchIDsResult}} + "/{queue_id}/cancel_by_batch_ids", + operation_id="cancel_by_batch_ids", + responses={200: {"model": CancelByBatchIDsResult}}, ) async def cancel_by_batch_ids( + queue_id: str = Path(description="The queue id to perform this operation on"), batch_ids: list[str] = Body(description="The list of batch_ids to cancel all queue items for", embed=True), ) -> CancelByBatchIDsResult: """Immediately cancels all queue items from the given batch ids""" - current = ApiDependencies.invoker.services.session_execution.get_current() + current = ApiDependencies.invoker.services.session_execution.get_current( + queue_id=queue_id, + ) if current is not None and current.batch_id in batch_ids: - ApiDependencies.invoker.services.session_execution.cancel() - return ApiDependencies.invoker.services.session_queue.cancel_by_batch_ids(batch_ids) + ApiDependencies.invoker.services.session_execution.cancel( + queue_id=queue_id, + ) + return ApiDependencies.invoker.services.session_queue.cancel_by_batch_ids(queue_id=queue_id, batch_ids=batch_ids) @session_queue_router.put( - "/clear", + "/{queue_id}/clear", operation_id="clear", responses={ 200: {"model": ClearResult}, }, ) -async def clear() -> ClearResult: +async def clear( + queue_id: str = Path(description="The queue id to perform this operation on"), +) -> ClearResult: """Clears the queue entirely, immediately canceling the currently-executing session""" - ApiDependencies.invoker.services.session_execution.cancel() - return ApiDependencies.invoker.services.session_queue.clear() + ApiDependencies.invoker.services.session_execution.cancel( + queue_id=queue_id, + ) + return ApiDependencies.invoker.services.session_queue.clear( + queue_id=queue_id, + ) @session_queue_router.put( - "/prune", + "/{queue_id}/prune", operation_id="prune", responses={ 200: {"model": PruneResult}, }, ) -async def prune() -> PruneResult: +async def prune( + queue_id: str = Path(description="The queue id to perform this operation on"), +) -> PruneResult: """Prunes all completed or errored queue items""" - return ApiDependencies.invoker.services.session_queue.prune() + return ApiDependencies.invoker.services.session_queue.prune( + queue_id=queue_id, + ) @session_queue_router.get( - "/current", + "/{queue_id}/current", operation_id="current", responses={ 200: {"model": Optional[SessionQueueItem]}, }, ) -async def current() -> Optional[SessionQueueItem]: +async def current( + queue_id: str = Path(description="The queue id to perform this operation on"), +) -> Optional[SessionQueueItem]: """Gets the currently execution queue item""" - return ApiDependencies.invoker.services.session_execution.get_current() + return ApiDependencies.invoker.services.session_execution.get_current( + queue_id=queue_id, + ) @session_queue_router.get( - "/peek", + "/{queue_id}/peek", operation_id="peek", responses={ 200: {"model": Optional[SessionQueueItem]}, }, ) -async def peek() -> Optional[SessionQueueItem]: +async def peek( + queue_id: str = Path(description="The queue id to perform this operation on"), +) -> Optional[SessionQueueItem]: """Gets the next queue item, without executing it""" - return ApiDependencies.invoker.services.session_queue.peek() + return ApiDependencies.invoker.services.session_queue.peek( + queue_id=queue_id, + ) @session_queue_router.get( - "/status", + "/{queue_id}/status", operation_id="get_status", responses={ 200: {"model": SessionQueueAndExecutionStatusResult}, }, ) -async def get_status() -> SessionQueueAndExecutionStatusResult: +async def get_status( + queue_id: str = Path(description="The queue id to perform this operation on"), +) -> SessionQueueAndExecutionStatusResult: """Gets the status of the session queue""" - queue_status = ApiDependencies.invoker.services.session_queue.get_status() - execution_status = ApiDependencies.invoker.services.session_execution.get_status() + queue_status = ApiDependencies.invoker.services.session_queue.get_status( + queue_id=queue_id, + ) + execution_status = ApiDependencies.invoker.services.session_execution.get_status( + queue_id=queue_id, + ) return SessionQueueAndExecutionStatusResult(**queue_status.dict(), **execution_status.dict()) @session_queue_router.get( - "/q/{id}", + "/{queue_id}/i/{item_id}", operation_id="get_queue_item", responses={ 200: {"model": SessionQueueItem}, }, ) -async def get_queue_item(id: int = Path(description="The queue item to get")) -> SessionQueueItem: +async def get_queue_item( + queue_id: str = Path(description="The queue id to perform this operation on"), + item_id: str = Path(description="The queue item to get"), +) -> SessionQueueItem: """Gets a queue item""" - return ApiDependencies.invoker.services.session_queue.get_queue_item(id=id) + return ApiDependencies.invoker.services.session_queue.get_queue_item(queue_id=queue_id, item_id=item_id) @session_queue_router.put( - "/q/{id}/cancel", + "/{queue_id}/i/{item_id}/cancel", operation_id="cancel_queue_item", responses={ 200: {"model": SessionQueueItem}, }, ) async def cancel_queue_item( - id: int = Path(description="The queue item to cancel"), + queue_id: str = Path(description="The queue id to perform this operation on"), + item_id: str = Path(description="The queue item to cancel"), ) -> SessionQueueItem: """Deletes a queue item""" - return ApiDependencies.invoker.services.session_queue.set_queue_item_status(id, "canceled") + return ApiDependencies.invoker.services.session_queue.set_queue_item_status( + queue_id=queue_id, item_id=item_id, status="canceled" + ) diff --git a/invokeai/app/api/sockets.py b/invokeai/app/api/sockets.py index 5f43b53c02..956b557d3e 100644 --- a/invokeai/app/api/sockets.py +++ b/invokeai/app/api/sockets.py @@ -41,11 +41,13 @@ class SocketIO: await self.__sio.emit( event=event[1]["event"], data=event[1]["data"], - room="queue", + room=event[1]["data"]["queue_id"], ) - async def _handle_sub_queue(self, sid, *args, **kwargs): - self.__sio.enter_room(sid, "queue") + async def _handle_sub_queue(self, sid, data, *args, **kwargs): + if "queue_id" in data: + self.__sio.enter_room(sid, data["queue_id"]) - async def _handle_unsub_queue(self, sid, *args, **kwargs): - self.__sio.enter_room(sid, "queue") + async def _handle_unsub_queue(self, sid, data, *args, **kwargs): + if "queue_id" in data: + self.__sio.enter_room(sid, data["queue_id"]) diff --git a/invokeai/app/services/events.py b/invokeai/app/services/events.py index 9f98711899..50492a8313 100644 --- a/invokeai/app/services/events.py +++ b/invokeai/app/services/events.py @@ -200,19 +200,10 @@ class EventServiceBase: self.__emit_queue_event( event_name="queue_item_status_changed", payload=dict( - id=session_queue_item.id, + queue_id=session_queue_item.queue_id, + item_id=session_queue_item.item_id, graph_execution_state_id=session_queue_item.session_id, batch_id=session_queue_item.batch_id, status=session_queue_item.status, ), ) - - def emit_queue_status_changed(self, started: bool, stop_after_current: bool) -> None: - """Emitted when a queue item is status_changed""" - self.__emit_queue_event( - event_name="queue_status_changed", - payload=dict( - started=started, - stop_after_current=stop_after_current, - ), - ) diff --git a/invokeai/app/services/session_execution/session_execution_base.py b/invokeai/app/services/session_execution/session_execution_base.py index 2d407295e3..f41662333a 100644 --- a/invokeai/app/services/session_execution/session_execution_base.py +++ b/invokeai/app/services/session_execution/session_execution_base.py @@ -13,31 +13,46 @@ class SessionExecutionServiceBase(ABC): pass @abstractmethod - def invoke_next(self) -> None: + def invoke_next(self, queue_id: str) -> None: """Invokes the next queue item""" pass @abstractmethod - def start(self) -> None: + def start( + self, + queue_id: str, + ) -> None: """Starts session queue execution""" pass @abstractmethod - def stop(self) -> None: + def stop( + self, + queue_id: str, + ) -> None: """Stops session queue execution after the currently executing session finishes""" pass @abstractmethod - def cancel(self) -> None: + def cancel( + self, + queue_id: str, + ) -> None: """Stops session queue execution, immediately canceling the currently-executing session""" pass @abstractmethod - def get_current(self) -> Optional[SessionQueueItem]: + def get_current( + self, + queue_id: str, + ) -> Optional[SessionQueueItem]: """Gets the currently-executing queue item""" pass @abstractmethod - def get_status(self) -> SessionExecutionStatusResult: + def get_status( + self, + queue_id: str, + ) -> SessionExecutionStatusResult: """Gets the status of the session queue""" pass diff --git a/invokeai/app/services/session_execution/session_execution_default.py b/invokeai/app/services/session_execution/session_execution_default.py index 26f382ccce..349390266c 100644 --- a/invokeai/app/services/session_execution/session_execution_default.py +++ b/invokeai/app/services/session_execution/session_execution_default.py @@ -7,7 +7,7 @@ from invokeai.app.services.events import EventServiceBase from invokeai.app.services.invoker import Invoker from invokeai.app.services.session_execution.session_execution_base import SessionExecutionServiceBase from invokeai.app.services.session_execution.session_execution_common import SessionExecutionStatusResult -from invokeai.app.services.session_queue.session_queue_common import SessionQueueItem, SessionQueueItemNotFoundError +from invokeai.app.services.session_queue.session_queue_common import SessionQueueItem class DefaultSessionExecutionService(SessionExecutionServiceBase): @@ -36,80 +36,72 @@ class DefaultSessionExecutionService(SessionExecutionServiceBase): async def _handle_complete_event(self, event: Event, err: bool) -> None: data = event[1]["data"] - try: - queue_item = self._invoker.services.session_queue.get_queue_item_by_session_id( - data["graph_execution_state_id"] - ) - except SessionQueueItemNotFoundError: - # shouldn't happen - we should have a queue item for every session - queue_item = None + queue_item = self._invoker.services.session_queue.get_queue_item_by_session_id(data["graph_execution_state_id"]) # Sessions are marked complete when they have an error, so we get an `invocation_error` # followed by a `graph_execution_state_complete`. Don't mark queue items complete if # they are already marked error. - if queue_item is not None and queue_item.status != "failed": + if queue_item.status != "failed": queue_item = self._invoker.services.session_queue.set_queue_item_status( - queue_item.id, "failed" if err else "completed" + queue_id=queue_item.queue_id, item_id=queue_item.item_id, status="failed" if err else "completed" ) self._invoker.services.events.emit_queue_item_status_changed(queue_item) if self._stop_after_current: self._stop_after_current = False self._started = False - self._emit_queue_status() self._current = None if self._started: - self.invoke_next() - - def _emit_queue_status(self) -> None: - self._invoker.services.events.emit_queue_status_changed(self._started, self._stop_after_current) + self.invoke_next(queue_id=queue_item.queue_id) def _emit_queue_item_status(self) -> None: if self._current is None: return self._invoker.services.events.emit_queue_item_status_changed(self._current) - def invoke_next(self) -> None: + def invoke_next(self, queue_id: str) -> None: # do not invoke if already invoking if self._current: return - queue_item = self._invoker.services.session_queue.dequeue() + queue_item = self._invoker.services.session_queue.dequeue(queue_id=queue_id) if queue_item is None: # queue empty self._current = None self._started = False self._stop_after_current = False - self._emit_queue_status() return self._current = queue_item + # execute the session self._invoker.services.graph_execution_manager.set(queue_item.session) self._emit_queue_item_status() self._invoker.invoke(self._current.session, invoke_all=True) - def start(self) -> None: + def start( + self, + queue_id: str, + ) -> None: if not self._stop_after_current: self._started = True - self._emit_queue_status() - self.invoke_next() + self.invoke_next(queue_id=queue_id) - def stop(self) -> None: + def stop(self, queue_id: str) -> None: self._started = False self._stop_after_current = True - self._emit_queue_status() - def cancel(self) -> None: + def cancel(self, queue_id: str) -> None: if self._current is not None: self._invoker.services.queue.cancel(self._current.session_id) - self._current = self._invoker.services.session_queue.set_queue_item_status(self._current.id, "canceled") + self._current = self._invoker.services.session_queue.set_queue_item_status( + queue_id=self._current.queue_id, item_id=self._current.item_id, status="canceled" + ) self._emit_queue_item_status() self._current = None self._started = False self._stop_after_current = False - self._emit_queue_status() - def get_current(self) -> Optional[SessionQueueItem]: + def get_current(self, queue_id: str) -> Optional[SessionQueueItem]: return self._current - def get_status(self) -> SessionExecutionStatusResult: + def get_status(self, queue_id: str) -> SessionExecutionStatusResult: return SessionExecutionStatusResult(started=self._started, stop_after_current=self._stop_after_current) diff --git a/invokeai/app/services/session_queue/session_queue_base.py b/invokeai/app/services/session_queue/session_queue_base.py index c810851f47..2f2474219a 100644 --- a/invokeai/app/services/session_queue/session_queue_base.py +++ b/invokeai/app/services/session_queue/session_queue_base.py @@ -27,68 +27,69 @@ class SessionQueueBase(ABC): pass @abstractmethod - def enqueue(self, graph: Graph, prepend: bool) -> EnqueueResult: + def enqueue(self, queue_id: str, graph: Graph, prepend: bool) -> EnqueueResult: """Enqueues a single graph for execution.""" pass @abstractmethod - def enqueue_batch(self, batch: Batch, prepend: bool) -> EnqueueResult: + def enqueue_batch(self, queue_id: str, batch: Batch, prepend: bool) -> EnqueueResult: """Enqueues all permutations of a batch for execution.""" pass @abstractmethod - def dequeue(self) -> Optional[SessionQueueItem]: + def dequeue(self, queue_id: str) -> Optional[SessionQueueItem]: """Dequeues the next session queue item, returning it if one is available.""" pass @abstractmethod - def peek(self) -> Optional[SessionQueueItem]: + def peek(self, queue_id: str) -> Optional[SessionQueueItem]: """Peeks at the next session queue item, returning it if one is available.""" pass @abstractmethod - def clear(self) -> ClearResult: + def clear(self, queue_id: str) -> ClearResult: """Deletes all session queue items""" pass @abstractmethod - def prune(self) -> PruneResult: + def prune(self, queue_id: str) -> PruneResult: """Deletes all completed and errored session queue items""" pass @abstractmethod - def is_empty(self) -> IsEmptyResult: + def is_empty(self, queue_id: str) -> IsEmptyResult: """Checks if the queue is empty""" pass @abstractmethod - def is_full(self) -> IsFullResult: + def is_full(self, queue_id: str) -> IsFullResult: """Checks if the queue is empty""" pass @abstractmethod - def cancel_by_batch_ids(self, batch_ids: list[str]) -> CancelByBatchIDsResult: + def cancel_by_batch_ids(self, queue_id: str, batch_ids: list[str]) -> CancelByBatchIDsResult: """Cancels all queue items with matching batch IDs""" pass @abstractmethod - def get_status(self) -> SessionQueueStatusResult: + def get_status(self, queue_id: str) -> SessionQueueStatusResult: """Gets the number of queue items with each status""" pass @abstractmethod def list_queue_items( self, + queue_id: str, limit: int, priority: int, - cursor: Optional[int] = None, + order_id: Optional[int] = None, status: Optional[QUEUE_ITEM_STATUS] = None, ) -> CursorPaginatedResults[SessionQueueItemDTO]: """Gets a page of session queue items""" pass @abstractmethod - def get_queue_item(self, id: int) -> SessionQueueItem: + def get_queue_item(self, queue_id: str, item_id: str) -> SessionQueueItem: """Gets a session queue item by ID""" pass @@ -98,16 +99,18 @@ class SessionQueueBase(ABC): pass @abstractmethod - def set_queue_item_status(self, id: int, status: QUEUE_ITEM_STATUS) -> SessionQueueItem: + def set_queue_item_status(self, queue_id: str, item_id: str, status: QUEUE_ITEM_STATUS) -> SessionQueueItem: """Sets the status of a session queue item""" pass @abstractmethod - def set_many_queue_item_status(self, ids: list[str], status: QUEUE_ITEM_STATUS) -> SetManyQueueItemStatusResult: + def set_many_queue_item_status( + self, queue_id: str, item_ids: list[str], status: QUEUE_ITEM_STATUS + ) -> SetManyQueueItemStatusResult: """Sets the status of a session queue item""" pass @abstractmethod - def delete_queue_item(self, id: int) -> SessionQueueItem: + def delete_queue_item(self, queue_id: str, item_id: str) -> SessionQueueItem: """Deletes a session queue item by ID""" pass diff --git a/invokeai/app/services/session_queue/session_queue_common.py b/invokeai/app/services/session_queue/session_queue_common.py index 4c566bc1bd..4b44ddddd1 100644 --- a/invokeai/app/services/session_queue/session_queue_common.py +++ b/invokeai/app/services/session_queue/session_queue_common.py @@ -1,7 +1,7 @@ import datetime import json from itertools import chain, product -from typing import Iterable, Literal, NamedTuple, Optional, TypeAlias, Union, cast +from typing import Iterable, Literal, Optional, TypeAlias, Union, cast from pydantic import BaseModel, Field, StrictStr, parse_raw_as, root_validator, validator from pydantic.json import pydantic_encoder @@ -134,6 +134,8 @@ class Batch(BaseModel): # region Queue Items +DEFAULT_QUEUE_ID = "default" + QUEUE_ITEM_STATUS = Literal["pending", "in_progress", "completed", "failed", "canceled"] @@ -150,7 +152,8 @@ def get_session(queue_item_dict: dict) -> GraphExecutionState: class SessionQueueItemWithoutGraph(BaseModel): """Session queue item without the full graph. Used for serialization.""" - id: int = Field(description="The ID of the session queue item") + item_id: str = Field(description="The unique identifier of the session queue item") + order_id: int = Field(description="The auto-incrementing ID of the session queue item") status: QUEUE_ITEM_STATUS = Field(default="pending", description="The status of this queue item") priority: int = Field(default=0, description="The priority of this queue item") batch_id: str = Field(description="The ID of the batch associated with this queue item") @@ -160,6 +163,7 @@ class SessionQueueItemWithoutGraph(BaseModel): field_values: Optional[list[NodeFieldValue]] = Field( default=None, description="The field values that were used for this queue item" ) + queue_id: str = Field(description="The id of the queue with which this item is associated") error: Optional[str] = Field(default=None, description="The error message if this queue item errored") created_at: Union[datetime.datetime, str] = Field(description="When this queue item was created") updated_at: Union[datetime.datetime, str] = Field(description="When this queue item was updated") @@ -174,9 +178,11 @@ class SessionQueueItemWithoutGraph(BaseModel): class Config: schema_extra = { "required": [ - "id", + "item_id", + "order_id", "status", "batch_id", + "queue_id", "session_id", "priority", "session_id", @@ -203,9 +209,11 @@ class SessionQueueItem(SessionQueueItemWithoutGraph): class Config: schema_extra = { "required": [ - "id", + "item_id", + "order_id", "status", "batch_id", + "queue_id", "session_id", "session", "priority", @@ -222,6 +230,7 @@ class SessionQueueItem(SessionQueueItemWithoutGraph): class SessionQueueStatusResult(BaseModel): + queue_id: str = Field(..., description="The ID of the queue") pending: int = Field(..., description="Number of queue items with status 'pending'") in_progress: int = Field(..., description="Number of queue items with status 'in_progress'") completed: int = Field(..., description="Number of queue items with status 'complete'") @@ -232,7 +241,7 @@ class SessionQueueStatusResult(BaseModel): class SetManyQueueItemStatusResult(BaseModel): - ids: list[str] = Field(..., description="The queue item IDs that were updated") + item_ids: list[str] = Field(..., description="The queue item IDs that were updated") status: QUEUE_ITEM_STATUS = Field(..., description="The new status of the queue items") @@ -347,33 +356,43 @@ def calc_session_count(batch: Batch) -> int: return len(data_product) * batch.runs -class ValueTuple(NamedTuple): - session: str - session_id: str - batch_id: str - node_field_values: Optional[str] - priority: int +ValuesToInsert: TypeAlias = list[ + tuple[ + str, # item_id + str, # queue_id + str, # session json + str, # session_id + str, # batch_id + Optional[str], # field_values json + int, # priority + int, # order_id + ] +] +"""(item_id, queue_id, session (json), session_id, batch_id, field_values (json), priority)""" -ValuesToInsert: TypeAlias = list[ValueTuple] - - -def prepare_values_to_insert(batch: Batch, priority: int, max_new_queue_items: int) -> ValuesToInsert: +def prepare_values_to_insert( + queue_id: str, batch: Batch, priority: int, max_new_queue_items: int, order_id: int +) -> ValuesToInsert: values_to_insert: ValuesToInsert = [] - sessions = create_session_nfv_tuples(batch, max_new_queue_items) - for session, field_values in sessions: + session_and_field_value_tuples = create_session_nfv_tuples(batch, max_new_queue_items) + for session, field_values in session_and_field_value_tuples: # sessions must have unique id session.id = uuid_string() values_to_insert.append( - ValueTuple( - session.json(), - session.id, - batch.batch_id, + ( + uuid_string(), # item_id + queue_id, # queue_id + session.json(), # session (json) + session.id, # session_id + batch.batch_id, # batch_id # must use pydantic_encoder bc field_values is a list of models - json.dumps(field_values, default=pydantic_encoder) if field_values else None, - priority, + json.dumps(field_values, default=pydantic_encoder) if field_values else None, # field_values (json) + priority, # priority + order_id, ) ) + order_id += 1 return values_to_insert diff --git a/invokeai/app/services/session_queue/session_queue_sqlite.py b/invokeai/app/services/session_queue/session_queue_sqlite.py index 1ab759fe01..33708aa8e4 100644 --- a/invokeai/app/services/session_queue/session_queue_sqlite.py +++ b/invokeai/app/services/session_queue/session_queue_sqlite.py @@ -6,6 +6,7 @@ from invokeai.app.services.graph import Graph from invokeai.app.services.invoker import Invoker from invokeai.app.services.session_queue.session_queue_base import SessionQueueBase from invokeai.app.services.session_queue.session_queue_common import ( + DEFAULT_QUEUE_ID, QUEUE_ITEM_STATUS, Batch, CancelByBatchIDsResult, @@ -50,8 +51,10 @@ class SqliteSessionQueue(SessionQueueBase): self._cursor.execute( """--sql CREATE TABLE IF NOT EXISTS session_queue ( - id INTEGER PRIMARY KEY AUTOINCREMENT, -- used for ordering, cursor pagination + item_id TEXT NOT NULL PRIMARY KEY, -- the unique identifier of this queue item + order_id INTEGER NOT NULL, -- used for ordering, cursor pagination batch_id TEXT NOT NULL, -- identifier of the batch this queue item belongs to + queue_id TEXT NOT NULL, -- identifier of the queue this queue item belongs to session_id TEXT NOT NULL UNIQUE, -- duplicated data from the session column, for ease of access field_values TEXT, -- NULL if no values are associated with this queue item session TEXT NOT NULL, -- the session to be executed @@ -69,7 +72,13 @@ class SqliteSessionQueue(SessionQueueBase): self._cursor.execute( """--sql - CREATE UNIQUE INDEX IF NOT EXISTS idx_session_queue_id ON session_queue(id); + CREATE UNIQUE INDEX IF NOT EXISTS idx_session_queue_item_id ON session_queue(item_id); + """ + ) + + self._cursor.execute( + """--sql + CREATE UNIQUE INDEX IF NOT EXISTS idx_session_queue_order_id ON session_queue(order_id); """ ) @@ -102,11 +111,14 @@ class SqliteSessionQueue(SessionQueueBase): CREATE TRIGGER IF NOT EXISTS tg_session_queue_completed_at AFTER UPDATE OF status ON session_queue FOR EACH ROW - WHEN NEW.status = 'completed' OR NEW.status = 'failed' or NEW.status = 'canceled' + WHEN + NEW.status = 'completed' + OR NEW.status = 'failed' + OR NEW.status = 'canceled' BEGIN UPDATE session_queue SET completed_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW') - WHERE id = NEW.id; + WHERE item_id = NEW.item_id; END; """ ) @@ -119,7 +131,7 @@ class SqliteSessionQueue(SessionQueueBase): BEGIN UPDATE session_queue SET updated_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW') - WHERE id = old.id; + WHERE item_id = old.item_id; END; """ ) @@ -143,47 +155,67 @@ class SqliteSessionQueue(SessionQueueBase): def start_service(self, invoker: Invoker) -> None: self._invoker = invoker self._set_in_progress_to_canceled() - prune_result = self.prune() + prune_result = self.prune(DEFAULT_QUEUE_ID) self._invoker.services.logger.info(f"Pruned {prune_result.deleted} finished queue items") - def enqueue(self, graph: Graph, prepend: bool) -> EnqueueResult: - return self.enqueue_batch(Batch(graph=graph), prepend=prepend) + def enqueue(self, queue_id: str, graph: Graph, prepend: bool) -> EnqueueResult: + return self.enqueue_batch(queue_id=queue_id, batch=Batch(graph=graph), prepend=prepend) - def _get_current_queue_size(self) -> int: + def _get_current_queue_size(self, queue_id: str) -> int: self._cursor.execute( """--sql SELECT count(*) FROM session_queue - WHERE status = 'pending' - """ + WHERE + queue_id = ? + AND status = 'pending' + """, + (queue_id,), ) return cast(int, self._cursor.fetchone()[0]) - def _get_highest_priority(self) -> int: + def _get_highest_priority(self, queue_id: str) -> int: self._cursor.execute( """--sql SELECT MAX(priority) FROM session_queue - WHERE status = 'pending' - """ + WHERE + queue_id = ? + AND status = 'pending' + """, + (queue_id,), ) return cast(Union[int, None], self._cursor.fetchone()[0]) or 0 - def enqueue_batch(self, batch: Batch, prepend: bool) -> EnqueueResult: + def enqueue_batch(self, queue_id: str, batch: Batch, prepend: bool) -> EnqueueResult: try: self._lock.acquire() # TODO: how does this work in a multi-user scenario? - current_queue_size = self._get_current_queue_size() + current_queue_size = self._get_current_queue_size(queue_id=queue_id) max_queue_size = self._invoker.services.configuration.get_config().max_queue_size max_new_queue_items = max_queue_size - current_queue_size priority = 0 if prepend: - priority = self._get_highest_priority() + 1 + priority = self._get_highest_priority(queue_id=queue_id) + 1 + + self._cursor.execute( + """--sql + SELECT MAX(order_id) + FROM session_queue + """ + ) + max_order_id = cast(Optional[int], self._cursor.fetchone()[0]) or 0 requested_count = calc_session_count(batch) - values_to_insert = prepare_values_to_insert(batch, priority, max_new_queue_items) + values_to_insert = prepare_values_to_insert( + queue_id=queue_id, + batch=batch, + priority=priority, + max_new_queue_items=max_new_queue_items, + order_id=max_order_id + 1, + ) enqueued_count = len(values_to_insert) if requested_count > enqueued_count: @@ -191,8 +223,8 @@ class SqliteSessionQueue(SessionQueueBase): self._cursor.executemany( """--sql - INSERT INTO session_queue (session, session_id, batch_id, field_values, priority) - VALUES (?, ?, ?, ?, ?) + INSERT INTO session_queue (item_id, queue_id, session, session_id, batch_id, field_values, priority, order_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, values_to_insert, ) @@ -209,16 +241,22 @@ class SqliteSessionQueue(SessionQueueBase): priority=priority, ) - def dequeue(self) -> Optional[SessionQueueItem]: + def dequeue(self, queue_id: str) -> Optional[SessionQueueItem]: try: self._lock.acquire() self._cursor.execute( """--sql - SELECT id FROM session_queue - WHERE status = 'pending' - ORDER BY priority DESC, id ASC -- created_at doesn't have high enough precision to be used for ordering + SELECT item_id + FROM session_queue + WHERE + queue_id = ? + AND status = 'pending' + ORDER BY + priority DESC, + order_id ASC -- created_at doesn't have high enough precision to be used for ordering LIMIT 1 - """ + """, + (queue_id,), ) result = cast(Union[sqlite3.Row, None], self._cursor.fetchone()) except Exception: @@ -228,18 +266,24 @@ class SqliteSessionQueue(SessionQueueBase): self._lock.release() if result is None: return None - return self.set_queue_item_status(result[0], "in_progress") + return self.set_queue_item_status(queue_id=queue_id, item_id=result[0], status="in_progress") - def peek(self) -> Optional[SessionQueueItem]: + def peek(self, queue_id: str) -> Optional[SessionQueueItem]: try: self._lock.acquire() self._cursor.execute( """--sql - SELECT * FROM session_queue - WHERE status = 'pending' - ORDER BY priority DESC, created_at ASC + SELECT * + FROM session_queue + WHERE + queue_id = ? + AND status = 'pending' + ORDER BY + priority DESC, + created_at ASC LIMIT 1 - """ + """, + (queue_id,), ) result = cast(Union[sqlite3.Row, None], self._cursor.fetchone()) except Exception: @@ -251,16 +295,18 @@ class SqliteSessionQueue(SessionQueueBase): return None return SessionQueueItem.from_dict(dict(result)) - def set_queue_item_status(self, id: int, status: QUEUE_ITEM_STATUS) -> SessionQueueItem: + def set_queue_item_status(self, queue_id: str, item_id: int, status: QUEUE_ITEM_STATUS) -> SessionQueueItem: try: self._lock.acquire() self._cursor.execute( """--sql UPDATE session_queue SET status = ? - WHERE id = ? + WHERE + queue_id = ? + AND item_id = ? """, - (status, id), + (status, queue_id, item_id), ) self._conn.commit() except Exception: @@ -268,32 +314,39 @@ class SqliteSessionQueue(SessionQueueBase): raise finally: self._lock.release() - return self.get_queue_item(id) + return self.get_queue_item(queue_id=queue_id, item_id=item_id) - def set_many_queue_item_status(self, ids: list[str], status: QUEUE_ITEM_STATUS) -> SetManyQueueItemStatusResult: + def set_many_queue_item_status( + self, queue_id: str, item_ids: list[str], status: QUEUE_ITEM_STATUS + ) -> SetManyQueueItemStatusResult: try: self._lock.acquire() # update the queue items - placeholders = ", ".join(["?" for _ in ids]) + placeholders = ", ".join(["?" for _ in item_ids]) update_query = f"""--sql UPDATE session_queue SET status = ? - WHERE id IN ({placeholders}) + WHERE + queue_id in ? + AND item_id IN ({placeholders}) """ - self._cursor.execute(update_query, [status] + ids) + self._cursor.execute(update_query, [queue_id, status] + item_ids) self._conn.commit() # get queue items from list which were set to the status successfully fetch_query = f"""--sql - SELECT id + SELECT item_id FROM session_queue - WHERE status = ? AND id IN ({placeholders}) + WHERE + queue_id = ? + AND status = ? + AND item_id IN ({placeholders}) """ - self._cursor.execute(fetch_query, [status] + ids) + self._cursor.execute(fetch_query, [queue_id, status] + item_ids) result = cast(list[sqlite3.Row], self._cursor.fetchall()) except Exception: self._conn.rollback() @@ -302,16 +355,18 @@ class SqliteSessionQueue(SessionQueueBase): self._lock.release() updated_ids = [row[0] for row in result] - return SetManyQueueItemStatusResult(ids=updated_ids, status=status) + return SetManyQueueItemStatusResult(item_ids=updated_ids, status=status) - def is_empty(self) -> IsEmptyResult: + def is_empty(self, queue_id: str) -> IsEmptyResult: try: self._lock.acquire() self._cursor.execute( """--sql SELECT count(*) FROM session_queue - """ + WHERE queue_id = ? + """, + (queue_id,), ) is_empty = cast(int, self._cursor.fetchone()[0]) == 0 except Exception: @@ -321,14 +376,16 @@ class SqliteSessionQueue(SessionQueueBase): self._lock.release() return IsEmptyResult(is_empty=is_empty) - def is_full(self) -> IsFullResult: + def is_full(self, queue_id: str) -> IsFullResult: try: self._lock.acquire() self._cursor.execute( """--sql SELECT count(*) FROM session_queue - """ + WHERE queue_id = ? + """, + (queue_id,), ) max_queue_size = self._invoker.services.configuration.max_queue_size is_full = cast(int, self._cursor.fetchone()[0]) >= max_queue_size @@ -339,16 +396,18 @@ class SqliteSessionQueue(SessionQueueBase): self._lock.release() return IsFullResult(is_full=is_full) - def delete_queue_item(self, id: int) -> SessionQueueItem: - queue_item = self.get_queue_item(id) + def delete_queue_item(self, queue_id: str, item_id: int) -> SessionQueueItem: + queue_item = self.get_queue_item(queue_id=queue_id, item_id=item_id) try: self._lock.acquire() self._cursor.execute( """--sql DELETE FROM session_queue - WHERE id = ? + WHERE + queue_id = ? + AND item_id = ? """, - (id,), + (queue_id, item_id), ) self._conn.commit() except Exception: @@ -358,19 +417,25 @@ class SqliteSessionQueue(SessionQueueBase): self._lock.release() return queue_item - def clear(self) -> ClearResult: + def clear(self, queue_id: str) -> ClearResult: try: self._lock.acquire() self._cursor.execute( """--sql - SELECT COUNT(*) FROM session_queue - """ + SELECT COUNT(*) + FROM session_queue + WHERE queue_id = ? + """, + (queue_id,), ) count = self._cursor.fetchone()[0] self._cursor.execute( """--sql - DELETE FROM session_queue - """ + DELETE + FROM session_queue + WHERE queue_id = ? + """, + (queue_id,), ) self._conn.commit() except Exception: @@ -380,22 +445,34 @@ class SqliteSessionQueue(SessionQueueBase): self._lock.release() return ClearResult(deleted=count) - def prune(self) -> PruneResult: + def prune(self, queue_id: str) -> PruneResult: try: - where = "WHERE status = 'completed' OR status = 'failed' OR status = 'canceled'" + where = """--sql + WHERE + queue_id = ? + AND ( + status = 'completed' + OR status = 'failed' + OR status = 'canceled' + ) + """ self._lock.acquire() self._cursor.execute( f"""--sql - SELECT COUNT(*) FROM session_queue + SELECT COUNT(*) + FROM session_queue {where}; - """ + """, + (queue_id,), ) count = self._cursor.fetchone()[0] self._cursor.execute( f"""--sql - DELETE FROM session_queue + DELETE + FROM session_queue {where}; - """ + """, + (queue_id,), ) self._conn.commit() except Exception: @@ -405,17 +482,25 @@ class SqliteSessionQueue(SessionQueueBase): self._lock.release() return PruneResult(deleted=count) - def cancel_by_batch_ids(self, batch_ids: list[str]) -> CancelByBatchIDsResult: + def cancel_by_batch_ids(self, queue_id: str, batch_ids: list[str]) -> CancelByBatchIDsResult: try: self._lock.acquire() placeholders = ", ".join(["?" for _ in batch_ids]) - where = f"WHERE batch_id IN ({placeholders}) AND status != 'canceled' AND status != 'completed'" + where = f"""--sql + WHERE + queue_id = ? + AND batch_id IN ({placeholders}) + AND status != 'canceled' + AND status != 'completed' + """ + params = [queue_id] + batch_ids self._cursor.execute( f"""--sql - SELECT COUNT(*) FROM session_queue + SELECT COUNT(*) + FROM session_queue {where}; """, - batch_ids, + tuple(params), ) count = self._cursor.fetchone()[0] self._cursor.execute( @@ -424,7 +509,7 @@ class SqliteSessionQueue(SessionQueueBase): SET status = 'canceled' {where}; """, - batch_ids, + tuple(params), ) self._conn.commit() except Exception: @@ -434,15 +519,17 @@ class SqliteSessionQueue(SessionQueueBase): self._lock.release() return CancelByBatchIDsResult(canceled=count) - def get_queue_item(self, id: int) -> SessionQueueItem: + def get_queue_item(self, queue_id: str, item_id: int) -> SessionQueueItem: try: self._lock.acquire() self._cursor.execute( """--sql SELECT * FROM session_queue - WHERE id = ? + WHERE + queue_id = ? + AND item_id = ? """, - (id,), + (queue_id, item_id), ) result = cast(Union[sqlite3.Row, None], self._cursor.fetchone()) except Exception: @@ -451,7 +538,7 @@ class SqliteSessionQueue(SessionQueueBase): finally: self._lock.release() if result is None: - raise SessionQueueItemNotFoundError(f"No queue item with id {id}") + raise SessionQueueItemNotFoundError(f"No queue item with id {item_id}") return SessionQueueItem.from_dict(dict(result)) def get_queue_item_by_session_id(self, session_id: str) -> SessionQueueItem: @@ -460,7 +547,8 @@ class SqliteSessionQueue(SessionQueueBase): self._cursor.execute( """--sql SELECT * FROM session_queue - WHERE session_id = ? + WHERE + session_id = ? """, (session_id,), ) @@ -476,15 +564,17 @@ class SqliteSessionQueue(SessionQueueBase): def list_queue_items( self, + queue_id: str, limit: int, priority: int, - cursor: Optional[int] = None, + order_id: Optional[int] = None, status: Optional[QUEUE_ITEM_STATUS] = None, ) -> CursorPaginatedResults[SessionQueueItemDTO]: try: self._lock.acquire() query = """--sql - SELECT id, + SELECT item_id, + order_id, status, priority, field_values, @@ -493,23 +583,33 @@ class SqliteSessionQueue(SessionQueueBase): updated_at, completed_at, session_id, - batch_id + batch_id, + queue_id FROM session_queue - WHERE 1 = 1 + WHERE queue_id = ? """ - params = [] + params: list[Union[str, int]] = [queue_id] if status is not None: - query += " AND status = ?" + query += """--sql + AND status = ? + """ params.append(status) - if cursor is not None: - query += " AND (priority < ?) OR (priority = ? AND id > ?)" - params.extend([priority, priority, cursor]) + if order_id is not None: + query += """--sql + AND (priority < ?) OR (priority = ? AND order_id > ?) + """ + params.extend([priority, priority, order_id]) - query += " ORDER BY priority DESC, id ASC LIMIT ?" + query += """--sql + ORDER BY + priority DESC, + order_id ASC + LIMIT ? + """ params.append(limit + 1) - self._cursor.execute(query, tuple(params)) + self._cursor.execute(query, params) results = cast(list[sqlite3.Row], self._cursor.fetchall()) items = [SessionQueueItemDTO.from_dict(dict(result)) for result in results] has_more = False @@ -524,15 +624,17 @@ class SqliteSessionQueue(SessionQueueBase): self._lock.release() return CursorPaginatedResults(items=items, limit=limit, has_more=has_more) - def get_status(self) -> SessionQueueStatusResult: + def get_status(self, queue_id: str) -> SessionQueueStatusResult: try: self._lock.acquire() self._cursor.execute( """--sql SELECT status, count(*) FROM session_queue + WHERE queue_id = ? GROUP BY status - """ + """, + (queue_id,), ) result = cast(list[sqlite3.Row], self._cursor.fetchall()) total = sum(row[1] for row in result) @@ -544,6 +646,7 @@ class SqliteSessionQueue(SessionQueueBase): self._lock.release() return SessionQueueStatusResult( + queue_id=queue_id, pending=counts.get("pending", 0), in_progress=counts.get("in_progress", 0), completed=counts.get("completed", 0), diff --git a/invokeai/frontend/web/src/app/components/InvokeAIUI.tsx b/invokeai/frontend/web/src/app/components/InvokeAIUI.tsx index 505b1e13b4..8c8c02ee85 100644 --- a/invokeai/frontend/web/src/app/components/InvokeAIUI.tsx +++ b/invokeai/frontend/web/src/app/components/InvokeAIUI.tsx @@ -17,6 +17,7 @@ import '../../i18n'; import AppDndContext from '../../features/dnd/components/AppDndContext'; import { $customStarUI, CustomStarUi } from 'app/store/nanostores/customStarUI'; import { $headerComponent } from 'app/store/nanostores/headerComponent'; +import { $queueId, DEFAULT_QUEUE_ID } from 'features/queue/store/nanoStores'; const App = lazy(() => import('./App')); const ThemeLocaleProvider = lazy(() => import('./ThemeLocaleProvider')); @@ -28,6 +29,7 @@ interface Props extends PropsWithChildren { headerComponent?: ReactNode; middleware?: Middleware[]; projectId?: string; + queueId?: string; selectedImage?: { imageName: string; action: 'sendToImg2Img' | 'sendToCanvas' | 'useAllParameters'; @@ -42,6 +44,7 @@ const InvokeAIUI = ({ headerComponent, middleware, projectId, + queueId, selectedImage, customStarUi, }: Props) => { @@ -61,6 +64,11 @@ const InvokeAIUI = ({ $projectId.set(projectId); } + // configure API client project header + if (queueId) { + $queueId.set(queueId); + } + // reset dynamically added middlewares resetMiddlewares(); @@ -81,8 +89,9 @@ const InvokeAIUI = ({ $baseUrl.set(undefined); $authToken.set(undefined); $projectId.set(undefined); + $queueId.set(DEFAULT_QUEUE_ID); }; - }, [apiUrl, token, middleware, projectId]); + }, [apiUrl, token, middleware, projectId, queueId]); useEffect(() => { if (customStarUi) { diff --git a/invokeai/frontend/web/src/app/store/middleware/listenerMiddleware/listeners/socketio/socketQueueItemStatusChanged.ts b/invokeai/frontend/web/src/app/store/middleware/listenerMiddleware/listeners/socketio/socketQueueItemStatusChanged.ts index 7583212988..0949962cac 100644 --- a/invokeai/frontend/web/src/app/store/middleware/listenerMiddleware/listeners/socketio/socketQueueItemStatusChanged.ts +++ b/invokeai/frontend/web/src/app/store/middleware/listenerMiddleware/listeners/socketio/socketQueueItemStatusChanged.ts @@ -12,18 +12,17 @@ export const addSocketQueueItemStatusChangedEventListener = () => { actionCreator: socketQueueItemStatusChanged, effect: (action, { dispatch, getState }) => { const log = logger('socketio'); - const { id, status: newStatus } = action.payload.data; + const { item_id, status: newStatus } = action.payload.data; log.debug( action.payload, - `Queue item ${id} status updated: ${newStatus}` + `Queue item ${item_id} status updated: ${newStatus}` ); - // pass along the socket event as an application action dispatch(appSocketQueueItemStatusChanged(action.payload)); dispatch( queueApi.util.updateQueryData('listQueueItems', undefined, (draft) => { queueItemsAdapter.updateOne(draft, { - id, + id: item_id, changes: { status: newStatus }, }); }) diff --git a/invokeai/frontend/web/src/app/store/middleware/listenerMiddleware/listeners/userInvokedCanvas.ts b/invokeai/frontend/web/src/app/store/middleware/listenerMiddleware/listeners/userInvokedCanvas.ts index a1e540aa4b..16c5a59e82 100644 --- a/invokeai/frontend/web/src/app/store/middleware/listenerMiddleware/listeners/userInvokedCanvas.ts +++ b/invokeai/frontend/web/src/app/store/middleware/listenerMiddleware/listeners/userInvokedCanvas.ts @@ -189,59 +189,6 @@ export const addUserInvokedCanvasListener = () => { }) ); } - - // // Create the session, store the request id - // const { requestId: sessionCreatedRequestId } = dispatch( - // sessionCreated({ graph }) - // ); - - // // Take the session created action, matching by its request id - // const [sessionCreatedAction] = await take( - // (action): action is ReturnType => - // sessionCreated.fulfilled.match(action) && - // action.meta.requestId === sessionCreatedRequestId - // ); - // const session_id = sessionCreatedAction.payload.id; - - // // Associate the init image with the session, now that we have the session ID - // if (['img2img', 'inpaint'].includes(generationMode) && canvasInitImage) { - // dispatch( - // imagesApi.endpoints.changeImageSessionId.initiate({ - // imageDTO: canvasInitImage, - // session_id, - // }) - // ); - // } - - // // Associate the mask image with the session, now that we have the session ID - // if (['inpaint'].includes(generationMode) && canvasMaskImage) { - // dispatch( - // imagesApi.endpoints.changeImageSessionId.initiate({ - // imageDTO: canvasMaskImage, - // session_id, - // }) - // ); - // } - - // // Prep the canvas staging area if it is not yet initialized - // if (!state.canvas.layerState.stagingArea.boundingBox) { - // dispatch( - // stagingAreaInitialized({ - // batch_id, - // sessionId: session_id, - // boundingBox: { - // ...state.canvas.boundingBoxCoordinates, - // ...state.canvas.boundingBoxDimensions, - // }, - // }) - // ); - // } - - // // Flag the session with the canvas session ID - // dispatch(canvasSessionIdChanged(session_id)); - - // // We are ready to invoke the session! - // dispatch(sessionReadyToInvoke()); }, }); }; diff --git a/invokeai/frontend/web/src/features/queue/components/QueueTable.tsx b/invokeai/frontend/web/src/features/queue/components/QueueTable.tsx index 3023211bbf..28cc06b2e4 100644 --- a/invokeai/frontend/web/src/features/queue/components/QueueTable.tsx +++ b/invokeai/frontend/web/src/features/queue/components/QueueTable.tsx @@ -83,15 +83,10 @@ const QueueTab = () => { return () => osInstance()?.destroy(); }, [scroller, initialize, osInstance]); - const { data: listQueueItemsData } = useListQueueItemsQuery( - { - cursor: listCursor, - priority: listPriority, - }, - { refetchOnMountOrArgChange: true } - ); - - console.log(listCursor, listPriority); + const { data: listQueueItemsData } = useListQueueItemsQuery({ + cursor: listCursor, + priority: listPriority, + }); const queueItems = useMemo(() => { if (!listQueueItemsData) { @@ -101,9 +96,14 @@ const QueueTab = () => { }, [listQueueItemsData]); const handleLoadMore = useCallback(() => { - dispatch(listCursorChanged(queueItems[queueItems.length - 1]?.id)); + if (!listQueueItemsData?.has_more) { + return; + } + dispatch(listCursorChanged(queueItems[queueItems.length - 1]?.order_id)); dispatch(listPriorityChanged(queueItems[queueItems.length - 1]?.priority)); - }, [dispatch, queueItems]); + }, [dispatch, listQueueItemsData?.has_more, queueItems]); + + console.log(listCursor, listPriority); return ( @@ -157,7 +157,7 @@ const FixedHeaderContent: FixedHeaderContent = () => ( const ItemContent: ItemContent = ( index, - { id, status, batch_id, field_values } + { item_id, status, batch_id, field_values } ) => ( <> = ( .filter((v) => v.node_path !== 'metadata_accumulator') .map(({ node_path, field_name, value }) => ( v.node_path !== 'metadata_accumulator') .map(({ node_path, field_name, value }) => ( {node_path}.{field_name} diff --git a/invokeai/frontend/web/src/features/queue/store/nanoStores.ts b/invokeai/frontend/web/src/features/queue/store/nanoStores.ts new file mode 100644 index 0000000000..462cf69d0a --- /dev/null +++ b/invokeai/frontend/web/src/features/queue/store/nanoStores.ts @@ -0,0 +1,5 @@ +import { atom } from 'nanostores'; + +export const DEFAULT_QUEUE_ID = 'default'; + +export const $queueId = atom(DEFAULT_QUEUE_ID); diff --git a/invokeai/frontend/web/src/features/queue/store/queueSlice.ts b/invokeai/frontend/web/src/features/queue/store/queueSlice.ts index 6a27590969..52001b3cec 100644 --- a/invokeai/frontend/web/src/features/queue/store/queueSlice.ts +++ b/invokeai/frontend/web/src/features/queue/store/queueSlice.ts @@ -1,5 +1,4 @@ import { PayloadAction, createSlice } from '@reduxjs/toolkit'; -import { queueApi } from 'services/api/endpoints/queue'; export interface QueueState { listCursor: number | undefined; @@ -23,25 +22,14 @@ export const queueSlice = createSlice({ listPriorityChanged: (state, action: PayloadAction) => { state.listPriority = action.payload; }, - }, - extraReducers(builder) { - builder.addMatcher( - queueApi.endpoints.enqueueBatch.matchPending, - (state) => { - state.listCursor = undefined; - state.listPriority = undefined; - } - ); - builder.addMatcher( - queueApi.endpoints.enqueueGraph.matchPending, - (state) => { - state.listCursor = undefined; - state.listPriority = undefined; - } - ); + listParamsReset: (state) => { + state.listCursor = undefined; + state.listPriority = undefined; + }, }, }); -export const { listCursorChanged, listPriorityChanged } = queueSlice.actions; +export const { listCursorChanged, listPriorityChanged, listParamsReset } = + queueSlice.actions; export default queueSlice.reducer; diff --git a/invokeai/frontend/web/src/services/api/endpoints/queue.ts b/invokeai/frontend/web/src/services/api/endpoints/queue.ts index 26227ee696..c7a72f77ed 100644 --- a/invokeai/frontend/web/src/services/api/endpoints/queue.ts +++ b/invokeai/frontend/web/src/services/api/endpoints/queue.ts @@ -1,25 +1,41 @@ -import { EntityState, createEntityAdapter } from '@reduxjs/toolkit'; +import { + AnyAction, + EntityState, + ThunkDispatch, + createEntityAdapter, +} from '@reduxjs/toolkit'; import queryString from 'query-string'; -import { ApiTagDescription, LIST_TAG, api } from '..'; +import { ApiTagDescription, api } from '..'; import { components, paths } from '../schema'; +import { $queueId } from 'features/queue/store/nanoStores'; +import { listParamsReset } from 'features/queue/store/queueSlice'; const getListQueueItemsUrl = ( - queryArgs?: paths['/api/v1/queue/list']['get']['parameters']['query'] -) => - queryArgs - ? `queue/list?${queryString.stringify(queryArgs, { arrayFormat: 'none' })}` - : 'queue/list'; + queryArgs?: paths['/api/v1/queue/{queue_id}/list']['get']['parameters']['query'] +) => { + const query = queryArgs + ? queryString.stringify(queryArgs, { + arrayFormat: 'none', + }) + : undefined; + + if (query) { + return `queue/${$queueId.get()}/list?${query}`; + } + + return `queue/${$queueId.get()}/list`; +}; export type SessionQueueItemStatus = NonNullable< NonNullable< - paths['/api/v1/queue/list']['get']['parameters']['query'] + paths['/api/v1/queue/{queue_id}/list']['get']['parameters']['query'] >['status'] >; export const queueItemsAdapter = createEntityAdapter< components['schemas']['SessionQueueItemDTO'] >({ - selectId: (queueItem) => queueItem.id, + selectId: (queueItem) => queueItem.item_id, sortComparer: (a, b) => { // Sort by priority in descending order if (a.priority > b.priority) { @@ -30,10 +46,10 @@ export const queueItemsAdapter = createEntityAdapter< } // If priority is the same, sort by id in ascending order - if (a.id < b.id) { + if (a.order_id < b.order_id) { return -1; } - if (a.id > b.id) { + if (a.order_id > b.order_id) { return 1; } @@ -44,11 +60,11 @@ export const queueItemsAdapter = createEntityAdapter< export const queueApi = api.injectEndpoints({ endpoints: (build) => ({ enqueueBatch: build.mutation< - paths['/api/v1/queue/enqueue_batch']['post']['responses']['201']['content']['application/json'], - paths['/api/v1/queue/enqueue_batch']['post']['requestBody']['content']['application/json'] + paths['/api/v1/queue/{queue_id}/enqueue_batch']['post']['responses']['201']['content']['application/json'], + paths['/api/v1/queue/{queue_id}/enqueue_batch']['post']['requestBody']['content']['application/json'] >({ query: (arg) => ({ - url: 'queue/enqueue_batch', + url: `queue/${$queueId.get()}/enqueue_batch`, body: arg, method: 'POST', }), @@ -56,15 +72,23 @@ export const queueApi = api.injectEndpoints({ 'SessionQueueStatus', 'CurrentSessionQueueItem', 'NextSessionQueueItem', - { type: 'SessionQueueItemDTO', id: LIST_TAG }, ], + onQueryStarted: async (arg, api) => { + const { dispatch, queryFulfilled } = api; + try { + await queryFulfilled; + resetListQueryData(dispatch); + } catch { + // no-op + } + }, }), enqueueGraph: build.mutation< - paths['/api/v1/queue/enqueue']['post']['responses']['201']['content']['application/json'], - paths['/api/v1/queue/enqueue']['post']['requestBody']['content']['application/json'] + paths['/api/v1/queue/{queue_id}/enqueue']['post']['responses']['201']['content']['application/json'], + paths['/api/v1/queue/{queue_id}/enqueue']['post']['requestBody']['content']['application/json'] >({ query: (arg) => ({ - url: 'queue/enqueue', + url: `queue/${$queueId.get()}/enqueue`, body: arg, method: 'POST', }), @@ -72,233 +96,197 @@ export const queueApi = api.injectEndpoints({ 'SessionQueueStatus', 'CurrentSessionQueueItem', 'NextSessionQueueItem', - { type: 'SessionQueueItemDTO', id: LIST_TAG }, ], + onQueryStarted: async (arg, api) => { + const { dispatch, queryFulfilled } = api; + try { + await queryFulfilled; + resetListQueryData(dispatch); + } catch { + // no-op + } + }, }), startQueueExecution: build.mutation({ query: () => ({ - url: 'queue/start', + url: `queue/${$queueId.get()}/start`, method: 'PUT', }), invalidatesTags: ['SessionQueueStatus'], }), stopQueueExecution: build.mutation({ query: () => ({ - url: 'queue/stop', + url: `queue/${$queueId.get()}/stop`, method: 'PUT', }), invalidatesTags: ['SessionQueueStatus'], }), cancelQueueExecution: build.mutation({ query: () => ({ - url: 'queue/cancel', + url: `queue/${$queueId.get()}/cancel`, method: 'PUT', }), invalidatesTags: ['SessionQueueStatus'], }), pruneQueue: build.mutation< - paths['/api/v1/queue/prune']['put']['responses']['200']['content']['application/json'], + paths['/api/v1/queue/{queue_id}/prune']['put']['responses']['200']['content']['application/json'], void >({ query: () => ({ - url: 'queue/prune', + url: `queue/${$queueId.get()}/prune`, method: 'PUT', }), - invalidatesTags: [ - 'SessionQueueStatus', - { type: 'SessionQueueItemDTO', id: LIST_TAG }, - ], + invalidatesTags: ['SessionQueueStatus'], onQueryStarted: async (arg, api) => { const { dispatch, queryFulfilled } = api; - const patch = dispatch( - queueApi.util.updateQueryData( - 'listQueueItems', - undefined, - (draft) => { - const ids = queueItemsAdapter - .getSelectors() - .selectAll(draft) - .filter((item) => - ['completed', 'failed', 'canceled'].includes(item.status) - ) - .map((item) => item.id); - queueItemsAdapter.removeMany(draft, ids); - } - ) - ); try { await queryFulfilled; + resetListQueryData(dispatch); } catch { - patch.undo(); + // no-op } }, }), clearQueue: build.mutation< - paths['/api/v1/queue/clear']['put']['responses']['200']['content']['application/json'], + paths['/api/v1/queue/{queue_id}/clear']['put']['responses']['200']['content']['application/json'], void >({ query: () => ({ - url: 'queue/clear', + url: `queue/${$queueId.get()}/clear`, method: 'PUT', }), invalidatesTags: [ 'SessionQueueStatus', 'CurrentSessionQueueItem', 'NextSessionQueueItem', - { type: 'SessionQueueItemDTO', id: LIST_TAG }, ], onQueryStarted: async (arg, api) => { const { dispatch, queryFulfilled } = api; - const listPatch = dispatch( - queueApi.util.updateQueryData( - 'listQueueItems', - undefined, - (draft) => { - queueItemsAdapter.removeAll(draft); - } - ) - ); - const statusPatch = dispatch( - queueApi.util.updateQueryData( - 'getQueueStatus', - undefined, - (draft) => { - draft.started = false; - draft.canceled = 0; - draft.completed = 0; - draft.failed = 0; - draft.in_progress = 0; - draft.pending = 0; - draft.total = 0; - } - ) - ); try { await queryFulfilled; + resetListQueryData(dispatch); } catch { - listPatch.undo(); - statusPatch.undo(); + // no-op } }, }), getCurrentQueueItem: build.query< - paths['/api/v1/queue/current']['get']['responses']['200']['content']['application/json'], + paths['/api/v1/queue/{queue_id}/current']['get']['responses']['200']['content']['application/json'], void >({ query: () => ({ - url: 'queue/current', + url: `queue/${$queueId.get()}/current`, method: 'GET', }), providesTags: (result) => { const tags: ApiTagDescription[] = ['CurrentSessionQueueItem']; if (result) { - tags.push({ type: 'SessionQueueItem', id: result.id }); + tags.push({ type: 'SessionQueueItem', id: result.item_id }); } return tags; }, }), peekNextQueueItem: build.query< - paths['/api/v1/queue/peek']['get']['responses']['200']['content']['application/json'], + paths['/api/v1/queue/{queue_id}/peek']['get']['responses']['200']['content']['application/json'], void >({ query: () => ({ - url: 'queue/peek', + url: `queue/${$queueId.get()}/peek`, method: 'GET', }), providesTags: (result) => { const tags: ApiTagDescription[] = ['NextSessionQueueItem']; if (result) { - tags.push({ type: 'SessionQueueItem', id: result.id }); + tags.push({ type: 'SessionQueueItem', id: result.item_id }); } return tags; }, }), getQueueStatus: build.query< - paths['/api/v1/queue/status']['get']['responses']['200']['content']['application/json'], + paths['/api/v1/queue/{queue_id}/status']['get']['responses']['200']['content']['application/json'], void >({ query: () => ({ - url: 'queue/status', + url: `queue/${$queueId.get()}/status`, method: 'GET', }), providesTags: ['SessionQueueStatus'], }), getQueueItem: build.query< - paths['/api/v1/queue/q/{id}']['get']['responses']['200']['content']['application/json'], - paths['/api/v1/queue/q/{id}']['get']['parameters']['path']['id'] + paths['/api/v1/queue/{queue_id}/i/{item_id}']['get']['responses']['200']['content']['application/json'], + string >({ - query: (id) => ({ - url: `queue/q/${id}`, + query: (item_id) => ({ + url: `queue/${$queueId.get()}/i/${item_id}`, method: 'GET', }), providesTags: (result) => { if (!result) { return []; } - return [{ type: 'SessionQueueItem', id: result.id }]; + return [{ type: 'SessionQueueItem', id: result.item_id }]; }, }), cancelQueueItem: build.mutation< - paths['/api/v1/queue/q/{id}/cancel']['put']['responses']['200']['content']['application/json'], - paths['/api/v1/queue/q/{id}/cancel']['put']['parameters']['path']['id'] + paths['/api/v1/queue/{queue_id}/i/{item_id}/cancel']['put']['responses']['200']['content']['application/json'], + string >({ - query: (id) => ({ - url: `queue/q/${id}/cancel`, + query: (item_id) => ({ + url: `queue/${$queueId.get()}/i/${item_id}/cancel`, method: 'GET', }), }), cancelByBatchIds: build.mutation< - paths['/api/v1/queue/cancel_by_batch_ids']['put']['responses']['200']['content']['application/json'], - paths['/api/v1/queue/cancel_by_batch_ids']['put']['requestBody']['content']['application/json'] + paths['/api/v1/queue/{queue_id}/cancel_by_batch_ids']['put']['responses']['200']['content']['application/json'], + paths['/api/v1/queue/{queue_id}/cancel_by_batch_ids']['put']['requestBody']['content']['application/json'] >({ query: (body) => ({ - url: `queue/cancel_by_batch_ids`, + url: `queue/${$queueId.get()}/cancel_by_batch_ids`, method: 'PUT', body, }), - invalidatesTags: [ - 'SessionQueueStatus', - 'CurrentSessionQueueItem', - 'NextSessionQueueItem', - { type: 'SessionQueueItemDTO', id: LIST_TAG }, - ], + onQueryStarted: async (arg, api) => { + const { dispatch, queryFulfilled } = api; + try { + await queryFulfilled; + resetListQueryData(dispatch); + } catch { + // no-op + } + }, }), listQueueItems: build.query< - EntityState, + EntityState & { + has_more: boolean; + }, { cursor?: number; priority?: number } | undefined >({ query: (queryArgs) => ({ url: getListQueueItemsUrl(queryArgs), method: 'GET', }), - providesTags: (result) => { - if (!result) { - return []; - } - return [{ type: 'SessionQueueItemDTO', id: LIST_TAG }]; - }, serializeQueryArgs: () => { - return 'queue/list'; + return `queue/${$queueId.get()}/list`; }, transformResponse: ( response: components['schemas']['CursorPaginatedResults_SessionQueueItemDTO_'] ) => queueItemsAdapter.addMany( - queueItemsAdapter.getInitialState(), + queueItemsAdapter.getInitialState({ + has_more: response.has_more, + }), response.items ), merge: (cache, response) => { + console.log(cache); queueItemsAdapter.addMany( cache, queueItemsAdapter.getSelectors().selectAll(response) ); + cache.has_more = response.has_more; }, - forceRefetch: ({ currentArg, previousArg }) => { - return ( - currentArg?.cursor !== previousArg?.cursor || - currentArg?.priority !== previousArg?.priority - ); - }, + forceRefetch: ({ currentArg, previousArg }) => currentArg !== previousArg, + keepUnusedDataFor: 60 * 5, // 5 minutes }), }), }); @@ -318,3 +306,19 @@ export const { usePeekNextQueueItemQuery, useListQueueItemsQuery, } = queueApi; + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +const resetListQueryData = (dispatch: ThunkDispatch) => { + dispatch( + queueApi.util.updateQueryData('listQueueItems', undefined, (draft) => { + // remove all items from the list + queueItemsAdapter.removeAll(draft); + // reset the has_more flag + draft.has_more = false; + }) + ); + // set the list cursor and priority to undefined + dispatch(listParamsReset()); + // we have to manually kick off another query to get the first page and re-initialize the list + dispatch(queueApi.endpoints.listQueueItems.initiate(undefined)); +}; diff --git a/invokeai/frontend/web/src/services/api/schema.d.ts b/invokeai/frontend/web/src/services/api/schema.d.ts index 1e2a838cc8..14766faad1 100644 --- a/invokeai/frontend/web/src/services/api/schema.d.ts +++ b/invokeai/frontend/web/src/services/api/schema.d.ts @@ -307,98 +307,98 @@ export type paths = { */ post: operations["set_log_level"]; }; - "/api/v1/queue/enqueue": { + "/api/v1/queue/{queue_id}/enqueue": { /** * Enqueue * @description Enqueues a graph for single execution. */ post: operations["enqueue"]; }; - "/api/v1/queue/enqueue_batch": { + "/api/v1/queue/{queue_id}/enqueue_batch": { /** * Enqueue Batch * @description Processes a batch and enqueues the output graphs for execution. */ post: operations["enqueue_batch"]; }; - "/api/v1/queue/list": { + "/api/v1/queue/{queue_id}/list": { /** * List Queue Items * @description Gets all queue items (without graphs) */ get: operations["list_queue_items"]; }; - "/api/v1/queue/start": { + "/api/v1/queue/{queue_id}/start": { /** * Start * @description Starts session queue execution */ put: operations["start"]; }; - "/api/v1/queue/stop": { + "/api/v1/queue/{queue_id}/stop": { /** * Stop * @description Stops session queue execution, waiting for the currently executing session to finish */ put: operations["stop"]; }; - "/api/v1/queue/cancel": { + "/api/v1/queue/{queue_id}/cancel": { /** * Cancel * @description Stops session queue execution, immediately canceling the currently-executing session */ put: operations["cancel"]; }; - "/api/v1/queue/cancel_by_batch_ids": { + "/api/v1/queue/{queue_id}/cancel_by_batch_ids": { /** * Cancel By Batch Ids * @description Immediately cancels all queue items from the given batch ids */ put: operations["cancel_by_batch_ids"]; }; - "/api/v1/queue/clear": { + "/api/v1/queue/{queue_id}/clear": { /** * Clear * @description Clears the queue entirely, immediately canceling the currently-executing session */ put: operations["clear"]; }; - "/api/v1/queue/prune": { + "/api/v1/queue/{queue_id}/prune": { /** * Prune * @description Prunes all completed or errored queue items */ put: operations["prune"]; }; - "/api/v1/queue/current": { + "/api/v1/queue/{queue_id}/current": { /** * Current * @description Gets the currently execution queue item */ get: operations["current"]; }; - "/api/v1/queue/peek": { + "/api/v1/queue/{queue_id}/peek": { /** * Peek * @description Gets the next queue item, without executing it */ get: operations["peek"]; }; - "/api/v1/queue/status": { + "/api/v1/queue/{queue_id}/status": { /** * Get Status * @description Gets the status of the session queue */ get: operations["get_status"]; }; - "/api/v1/queue/q/{id}": { + "/api/v1/queue/{queue_id}/i/{item_id}": { /** * Get Queue Item * @description Gets a queue item */ get: operations["get_queue_item"]; }; - "/api/v1/queue/q/{id}/cancel": { + "/api/v1/queue/{queue_id}/i/{item_id}/cancel": { /** * Cancel Queue Item * @description Deletes a queue item @@ -6874,6 +6874,11 @@ export type components = { * @description Whether the session queue is pending a stop */ stop_after_current: boolean; + /** + * Queue Id + * @description The ID of the queue + */ + queue_id: string; /** * Pending * @description Number of queue items with status 'pending' @@ -6916,10 +6921,15 @@ export type components = { */ SessionQueueItem: { /** - * Id - * @description The ID of the session queue item + * Item Id + * @description The unique identifier of the session queue item */ - id: number; + item_id: string; + /** + * Order Id + * @description The auto-incrementing ID of the session queue item + */ + order_id: number; /** * Status * @description The status of this queue item @@ -6948,6 +6958,11 @@ export type components = { * @description The field values that were used for this queue item */ field_values?: components["schemas"]["NodeFieldValue"][]; + /** + * Queue Id + * @description The id of the queue with which this item is associated + */ + queue_id: string; /** * Error * @description The error message if this queue item errored @@ -6980,10 +6995,15 @@ export type components = { */ SessionQueueItemDTO: { /** - * Id - * @description The ID of the session queue item + * Item Id + * @description The unique identifier of the session queue item */ - id: number; + item_id: string; + /** + * Order Id + * @description The auto-incrementing ID of the session queue item + */ + order_id: number; /** * Status * @description The status of this queue item @@ -7012,6 +7032,11 @@ export type components = { * @description The field values that were used for this queue item */ field_values?: components["schemas"]["NodeFieldValue"][]; + /** + * Queue Id + * @description The id of the queue with which this item is associated + */ + queue_id: string; /** * Error * @description The error message if this queue item errored @@ -9463,6 +9488,12 @@ export type operations = { * @description Enqueues a graph for single execution. */ enqueue: { + parameters: { + path: { + /** @description The queue id to perform this operation on */ + queue_id: string; + }; + }; requestBody: { content: { "application/json": components["schemas"]["Body_enqueue"]; @@ -9494,6 +9525,12 @@ export type operations = { * @description Processes a batch and enqueues the output graphs for execution. */ enqueue_batch: { + parameters: { + path: { + /** @description The queue id to perform this operation on */ + queue_id: string; + }; + }; requestBody: { content: { "application/json": components["schemas"]["Body_enqueue_batch"]; @@ -9536,6 +9573,10 @@ export type operations = { /** @description The pagination cursor priority */ priority?: number; }; + path: { + /** @description The queue id to perform this operation on */ + queue_id: string; + }; }; responses: { /** @description Successful Response */ @@ -9557,6 +9598,12 @@ export type operations = { * @description Starts session queue execution */ start: { + parameters: { + path: { + /** @description The queue id to perform this operation on */ + queue_id: string; + }; + }; responses: { /** @description Successful Response */ 200: { @@ -9564,6 +9611,12 @@ export type operations = { "application/json": unknown; }; }; + /** @description Validation Error */ + 422: { + content: { + "application/json": components["schemas"]["HTTPValidationError"]; + }; + }; }; }; /** @@ -9571,6 +9624,12 @@ export type operations = { * @description Stops session queue execution, waiting for the currently executing session to finish */ stop: { + parameters: { + path: { + /** @description The queue id to perform this operation on */ + queue_id: string; + }; + }; responses: { /** @description Successful Response */ 200: { @@ -9578,6 +9637,12 @@ export type operations = { "application/json": unknown; }; }; + /** @description Validation Error */ + 422: { + content: { + "application/json": components["schemas"]["HTTPValidationError"]; + }; + }; }; }; /** @@ -9585,6 +9650,12 @@ export type operations = { * @description Stops session queue execution, immediately canceling the currently-executing session */ cancel: { + parameters: { + path: { + /** @description The queue id to perform this operation on */ + queue_id: string; + }; + }; responses: { /** @description Successful Response */ 200: { @@ -9592,6 +9663,12 @@ export type operations = { "application/json": unknown; }; }; + /** @description Validation Error */ + 422: { + content: { + "application/json": components["schemas"]["HTTPValidationError"]; + }; + }; }; }; /** @@ -9599,6 +9676,12 @@ export type operations = { * @description Immediately cancels all queue items from the given batch ids */ cancel_by_batch_ids: { + parameters: { + path: { + /** @description The queue id to perform this operation on */ + queue_id: string; + }; + }; requestBody: { content: { "application/json": components["schemas"]["Body_cancel_by_batch_ids"]; @@ -9624,6 +9707,12 @@ export type operations = { * @description Clears the queue entirely, immediately canceling the currently-executing session */ clear: { + parameters: { + path: { + /** @description The queue id to perform this operation on */ + queue_id: string; + }; + }; responses: { /** @description Successful Response */ 200: { @@ -9631,6 +9720,12 @@ export type operations = { "application/json": components["schemas"]["ClearResult"]; }; }; + /** @description Validation Error */ + 422: { + content: { + "application/json": components["schemas"]["HTTPValidationError"]; + }; + }; }; }; /** @@ -9638,6 +9733,12 @@ export type operations = { * @description Prunes all completed or errored queue items */ prune: { + parameters: { + path: { + /** @description The queue id to perform this operation on */ + queue_id: string; + }; + }; responses: { /** @description Successful Response */ 200: { @@ -9645,6 +9746,12 @@ export type operations = { "application/json": components["schemas"]["PruneResult"]; }; }; + /** @description Validation Error */ + 422: { + content: { + "application/json": components["schemas"]["HTTPValidationError"]; + }; + }; }; }; /** @@ -9652,6 +9759,12 @@ export type operations = { * @description Gets the currently execution queue item */ current: { + parameters: { + path: { + /** @description The queue id to perform this operation on */ + queue_id: string; + }; + }; responses: { /** @description Successful Response */ 200: { @@ -9659,6 +9772,12 @@ export type operations = { "application/json": components["schemas"]["SessionQueueItem"]; }; }; + /** @description Validation Error */ + 422: { + content: { + "application/json": components["schemas"]["HTTPValidationError"]; + }; + }; }; }; /** @@ -9666,6 +9785,12 @@ export type operations = { * @description Gets the next queue item, without executing it */ peek: { + parameters: { + path: { + /** @description The queue id to perform this operation on */ + queue_id: string; + }; + }; responses: { /** @description Successful Response */ 200: { @@ -9673,6 +9798,12 @@ export type operations = { "application/json": components["schemas"]["SessionQueueItem"]; }; }; + /** @description Validation Error */ + 422: { + content: { + "application/json": components["schemas"]["HTTPValidationError"]; + }; + }; }; }; /** @@ -9680,6 +9811,12 @@ export type operations = { * @description Gets the status of the session queue */ get_status: { + parameters: { + path: { + /** @description The queue id to perform this operation on */ + queue_id: string; + }; + }; responses: { /** @description Successful Response */ 200: { @@ -9687,6 +9824,12 @@ export type operations = { "application/json": components["schemas"]["SessionQueueAndExecutionStatusResult"]; }; }; + /** @description Validation Error */ + 422: { + content: { + "application/json": components["schemas"]["HTTPValidationError"]; + }; + }; }; }; /** @@ -9696,8 +9839,10 @@ export type operations = { get_queue_item: { parameters: { path: { + /** @description The queue id to perform this operation on */ + queue_id: string; /** @description The queue item to get */ - id: number; + item_id: string; }; }; responses: { @@ -9722,8 +9867,10 @@ export type operations = { cancel_queue_item: { parameters: { path: { + /** @description The queue id to perform this operation on */ + queue_id: string; /** @description The queue item to cancel */ - id: number; + item_id: string; }; }; responses: { diff --git a/invokeai/frontend/web/src/services/api/types.ts b/invokeai/frontend/web/src/services/api/types.ts index fee527f161..147a04c603 100644 --- a/invokeai/frontend/web/src/services/api/types.ts +++ b/invokeai/frontend/web/src/services/api/types.ts @@ -23,7 +23,7 @@ export type UpdateBoardArg = }; export type BatchConfig = - paths['/api/v1/queue/enqueue_batch']['post']['requestBody']['content']['application/json']; + paths['/api/v1/queue/{queue_id}/enqueue_batch']['post']['requestBody']['content']['application/json']; /** * This is an unsafe type; the object inside is not guaranteed to be valid. diff --git a/invokeai/frontend/web/src/services/events/types.ts b/invokeai/frontend/web/src/services/events/types.ts index 5dc50a8566..5f68439ced 100644 --- a/invokeai/frontend/web/src/services/events/types.ts +++ b/invokeai/frontend/web/src/services/events/types.ts @@ -141,7 +141,8 @@ export type InvocationRetrievalErrorEvent = { * @example socket.on('queue_item_status_changed', (data: QueueItemStatusChangedEvent) => { ... } */ export type QueueItemStatusChangedEvent = { - id: number; + queue_id: string; + item_id: string; graph_execution_state_id: string; batch_id: string; status: components['schemas']['SessionQueueItemDTO']['status']; @@ -164,9 +165,13 @@ export type ClientEmitUnsubscribeSession = { session: string; }; -export type ClientEmitSubscribeQueue = void; +export type ClientEmitSubscribeQueue = { + queue_id: string; +}; -export type ClientEmitUnsubscribeQueue = void; +export type ClientEmitUnsubscribeQueue = { + queue_id: string; +}; export type ServerToClientEvents = { generator_progress: (payload: GeneratorProgressEvent) => void; @@ -181,7 +186,6 @@ export type ServerToClientEvents = { session_retrieval_error: (payload: SessionRetrievalErrorEvent) => void; invocation_retrieval_error: (payload: InvocationRetrievalErrorEvent) => void; queue_item_status_changed: (payload: QueueItemStatusChangedEvent) => void; - queue_status_changed: (payload: QueueStatusChangedEvent) => void; }; export type ClientToServerEvents = { diff --git a/invokeai/frontend/web/src/services/events/util/setEventListeners.ts b/invokeai/frontend/web/src/services/events/util/setEventListeners.ts index 4f4e8adad1..84a9ecfa6b 100644 --- a/invokeai/frontend/web/src/services/events/util/setEventListeners.ts +++ b/invokeai/frontend/web/src/services/events/util/setEventListeners.ts @@ -1,6 +1,7 @@ import { MiddlewareAPI } from '@reduxjs/toolkit'; import { logger } from 'app/logging/logger'; import { AppDispatch, RootState } from 'app/store/store'; +import { $queueId } from 'features/queue/store/nanoStores'; import { addToast } from 'features/system/store/systemSlice'; import { makeToast } from 'features/system/util/makeToast'; import { Socket } from 'socket.io-client'; @@ -16,7 +17,6 @@ import { socketModelLoadCompleted, socketModelLoadStarted, socketQueueItemStatusChanged, - socketQueueStatusChanged, socketSessionRetrievalError, } from '../actions'; import { ClientToServerEvents, ServerToClientEvents } from '../types'; @@ -38,8 +38,8 @@ export const setEventListeners = (arg: SetEventListenersArg) => { log.debug('Connected'); dispatch(socketConnected()); - - socket.emit('subscribe_queue'); + const queue_id = $queueId.get(); + socket.emit('subscribe_queue', { queue_id }); }); socket.on('connect_error', (error) => { @@ -169,8 +169,4 @@ export const setEventListeners = (arg: SetEventListenersArg) => { } dispatch(socketQueueItemStatusChanged({ data })); }); - - socket.on('queue_status_changed', (data) => { - dispatch(socketQueueStatusChanged({ data })); - }); };