Files
InvokeAI/invokeai/app/services/session_queue/session_queue_base.py
psychedelicious 402cf9b0ee feat: refactor services folder/module structure
Refactor services folder/module structure.

**Motivation**

While working on our services I've repeatedly encountered circular imports and a general lack of clarity regarding where to put things. The structure introduced goes a long way towards resolving those issues, setting us up for a clean structure going forward.

**Services**

Services are now in their own folder with a few files:

- `services/{service_name}/__init__.py`: init as needed, mostly empty now
- `services/{service_name}/{service_name}_base.py`: the base class for the service
- `services/{service_name}/{service_name}_{impl_type}.py`: the default concrete implementation of the service - typically one of `sqlite`, `default`, or `memory`
- `services/{service_name}/{service_name}_common.py`: any common items - models, exceptions, utilities, etc

Though it's a bit verbose to have the service name both as the folder name and the prefix for files, I found it is _extremely_ confusing to have all of the base classes just be named `base.py`. So, at the cost of some verbosity when importing things, I've included the service name in the filename.

There are some minor logic changes. For example, in `InvocationProcessor`, instead of assigning the model manager service to a variable to be used later in the file, the service is used directly via the `Invoker`.

**Shared**

Things that are used across disparate services are in `services/shared/`:

- `default_graphs.py`: previously in `services/`
- `graphs.py`: previously in `services/`
- `paginatation`: generic pagination models used in a few services
- `sqlite`: the `SqliteDatabase` class, other sqlite-specific things
2023-10-12 12:15:06 -04:00

113 lines
3.3 KiB
Python

from abc import ABC, abstractmethod
from typing import Optional
from invokeai.app.services.session_queue.session_queue_common import (
QUEUE_ITEM_STATUS,
Batch,
BatchStatus,
CancelByBatchIDsResult,
CancelByQueueIDResult,
ClearResult,
EnqueueBatchResult,
EnqueueGraphResult,
IsEmptyResult,
IsFullResult,
PruneResult,
SessionQueueItem,
SessionQueueItemDTO,
SessionQueueStatus,
)
from invokeai.app.services.shared.graph import Graph
from invokeai.app.services.shared.pagination import CursorPaginatedResults
class SessionQueueBase(ABC):
"""Base class for session queue"""
@abstractmethod
def dequeue(self) -> Optional[SessionQueueItem]:
"""Dequeues the next session queue item."""
pass
@abstractmethod
def enqueue_graph(self, queue_id: str, graph: Graph, prepend: bool) -> EnqueueGraphResult:
"""Enqueues a single graph for execution."""
pass
@abstractmethod
def enqueue_batch(self, queue_id: str, batch: Batch, prepend: bool) -> EnqueueBatchResult:
"""Enqueues all permutations of a batch for execution."""
pass
@abstractmethod
def get_current(self, queue_id: str) -> Optional[SessionQueueItem]:
"""Gets the currently-executing session queue item"""
pass
@abstractmethod
def get_next(self, queue_id: str) -> Optional[SessionQueueItem]:
"""Gets the next session queue item (does not dequeue it)"""
pass
@abstractmethod
def clear(self, queue_id: str) -> ClearResult:
"""Deletes all session queue items"""
pass
@abstractmethod
def prune(self, queue_id: str) -> PruneResult:
"""Deletes all completed and errored session queue items"""
pass
@abstractmethod
def is_empty(self, queue_id: str) -> IsEmptyResult:
"""Checks if the queue is empty"""
pass
@abstractmethod
def is_full(self, queue_id: str) -> IsFullResult:
"""Checks if the queue is empty"""
pass
@abstractmethod
def get_queue_status(self, queue_id: str) -> SessionQueueStatus:
"""Gets the status of the queue"""
pass
@abstractmethod
def get_batch_status(self, queue_id: str, batch_id: str) -> BatchStatus:
"""Gets the status of a batch"""
pass
@abstractmethod
def cancel_queue_item(self, item_id: int, error: Optional[str] = None) -> SessionQueueItem:
"""Cancels a session queue item"""
pass
@abstractmethod
def cancel_by_batch_ids(self, queue_id: str, batch_ids: list[str]) -> CancelByBatchIDsResult:
"""Cancels all queue items with matching batch IDs"""
pass
@abstractmethod
def cancel_by_queue_id(self, queue_id: str) -> CancelByQueueIDResult:
"""Cancels all queue items with matching queue ID"""
pass
@abstractmethod
def list_queue_items(
self,
queue_id: str,
limit: int,
priority: int,
cursor: Optional[int] = None,
status: Optional[QUEUE_ITEM_STATUS] = None,
) -> CursorPaginatedResults[SessionQueueItemDTO]:
"""Gets a page of session queue items"""
pass
@abstractmethod
def get_queue_item(self, item_id: int) -> SessionQueueItem:
"""Gets a session queue item by ID"""
pass