mirror of
https://github.com/invoke-ai/InvokeAI.git
synced 2026-01-23 00:38:08 -05:00
There's no longer any need for session-scoped events now that we have the session queue. Session started/completed/canceled map 1-to-1 to queue item status events, but queue item status events also have an event for the failed state. We can simplify queue and processor handling substantially by removing session events and instead using queue item events. - Remove the session-scoped events entirely. - Remove all event handling from the session queue. The processor still needs to respond to some events from the queue: `QueueClearedEvent`, `BatchEnqueuedEvent` and `QueueItemStatusChangedEvent`. - Pass an `is_canceled` callback to the invocation context instead of the cancel event. - Update processor logic to ensure the local instance of the current queue item is synced with the instance in the database. This prevents race conditions and ensures lifecycle callbacks do not get stale queue items. - Update docstrings and comments. - Add a `complete_queue_item` method to the session queue service as an explicit way to mark a queue item as successfully completed. Previously, the queue listened for session complete events to do this. Closes #6442
124 lines
3.6 KiB
Python
124 lines
3.6 KiB
Python
from abc import ABC, abstractmethod
|
|
from typing import Optional
|
|
|
|
from invokeai.app.services.session_queue.session_queue_common import (
|
|
QUEUE_ITEM_STATUS,
|
|
Batch,
|
|
BatchStatus,
|
|
CancelByBatchIDsResult,
|
|
CancelByQueueIDResult,
|
|
ClearResult,
|
|
EnqueueBatchResult,
|
|
IsEmptyResult,
|
|
IsFullResult,
|
|
PruneResult,
|
|
SessionQueueItem,
|
|
SessionQueueItemDTO,
|
|
SessionQueueStatus,
|
|
)
|
|
from invokeai.app.services.shared.graph import GraphExecutionState
|
|
from invokeai.app.services.shared.pagination import CursorPaginatedResults
|
|
|
|
|
|
class SessionQueueBase(ABC):
    """Base class for session queue.

    This is a pure interface: every method is declared with ``@abstractmethod``
    and has no behaviour of its own, so a concrete subclass must implement all
    of them before it can be instantiated.
    """

    @abstractmethod
    def dequeue(self) -> Optional[SessionQueueItem]:
        """Dequeues the next session queue item.

        Returns:
            The dequeued item, or ``None``. NOTE(review): presumably ``None``
            means nothing was available to dequeue -- confirm against the
            concrete implementation.
        """
        pass

    @abstractmethod
    def enqueue_batch(self, queue_id: str, batch: Batch, prepend: bool) -> EnqueueBatchResult:
        """Enqueues all permutations of a batch for execution.

        Args:
            queue_id: ID of the queue to enqueue into.
            batch: The batch whose permutations are enqueued.
            prepend: NOTE(review): presumably places the new items ahead of
                already-queued ones when true -- confirm with the implementation.
        """
        pass

    @abstractmethod
    def get_current(self, queue_id: str) -> Optional[SessionQueueItem]:
        """Gets the currently-executing session queue item, if any."""
        pass

    @abstractmethod
    def get_next(self, queue_id: str) -> Optional[SessionQueueItem]:
        """Gets the next session queue item (does not dequeue it)."""
        pass

    @abstractmethod
    def clear(self, queue_id: str) -> ClearResult:
        """Deletes all session queue items."""
        pass

    @abstractmethod
    def prune(self, queue_id: str) -> PruneResult:
        """Deletes all completed and errored session queue items."""
        pass

    @abstractmethod
    def is_empty(self, queue_id: str) -> IsEmptyResult:
        """Checks if the queue is empty."""
        pass

    @abstractmethod
    def is_full(self, queue_id: str) -> IsFullResult:
        """Checks if the queue is full."""
        pass

    @abstractmethod
    def get_queue_status(self, queue_id: str) -> SessionQueueStatus:
        """Gets the status of the queue."""
        pass

    @abstractmethod
    def get_batch_status(self, queue_id: str, batch_id: str) -> BatchStatus:
        """Gets the status of a batch."""
        pass

    @abstractmethod
    def complete_queue_item(self, item_id: int) -> SessionQueueItem:
        """Completes a session queue item.

        This is the explicit way to mark a queue item as successfully completed.
        """
        pass

    @abstractmethod
    def cancel_queue_item(self, item_id: int) -> SessionQueueItem:
        """Cancels a session queue item."""
        pass

    @abstractmethod
    def fail_queue_item(
        self, item_id: int, error_type: str, error_message: str, error_traceback: str
    ) -> SessionQueueItem:
        """Fails a session queue item.

        Args:
            item_id: ID of the queue item to fail.
            error_type: Caller-supplied string naming the kind of error.
            error_message: Caller-supplied error message.
            error_traceback: Caller-supplied traceback text.
        """
        pass

    @abstractmethod
    def cancel_by_batch_ids(self, queue_id: str, batch_ids: list[str]) -> CancelByBatchIDsResult:
        """Cancels all queue items with matching batch IDs."""
        pass

    @abstractmethod
    def cancel_by_queue_id(self, queue_id: str) -> CancelByQueueIDResult:
        """Cancels all queue items with matching queue ID."""
        pass

    @abstractmethod
    def list_queue_items(
        self,
        queue_id: str,
        limit: int,
        priority: int,
        cursor: Optional[int] = None,
        status: Optional[QUEUE_ITEM_STATUS] = None,
    ) -> CursorPaginatedResults[SessionQueueItemDTO]:
        """Gets a page of session queue items.

        Args:
            queue_id: ID of the queue to list.
            limit: NOTE(review): presumably the maximum page size -- confirm.
            priority: NOTE(review): presumably paired with ``cursor`` to
                locate the page start -- confirm with the implementation.
            cursor: Optional pagination cursor; defaults to ``None``.
            status: Optional queue item status (presumably a filter -- confirm);
                defaults to ``None``.
        """
        pass

    @abstractmethod
    def get_queue_item(self, item_id: int) -> SessionQueueItem:
        """Gets a session queue item by ID."""
        pass

    @abstractmethod
    def set_queue_item_session(self, item_id: int, session: GraphExecutionState) -> SessionQueueItem:
        """Sets the session for a session queue item. Use this to update the session state."""
        pass
|