from typing import Optional

from fastapi import Body, HTTPException, Path, Query
from fastapi.routing import APIRouter
from pydantic import BaseModel

from invokeai.app.api.auth_dependencies import AdminUserOrDefault, CurrentUserOrDefault
from invokeai.app.api.dependencies import ApiDependencies
from invokeai.app.services.session_processor.session_processor_common import SessionProcessorStatus
from invokeai.app.services.session_queue.session_queue_common import (
    Batch,
    BatchStatus,
    CancelAllExceptCurrentResult,
    CancelByBatchIDsResult,
    CancelByDestinationResult,
    ClearResult,
    DeleteAllExceptCurrentResult,
    DeleteByDestinationResult,
    EnqueueBatchResult,
    ItemIdsResult,
    PruneResult,
    RetryItemsResult,
    SessionQueueCountsByDestination,
    SessionQueueItem,
    SessionQueueItemNotFoundError,
    SessionQueueStatus,
)
from invokeai.app.services.shared.graph import Graph, GraphExecutionState
from invokeai.app.services.shared.sqlite.sqlite_common import SQLiteDirection

session_queue_router = APIRouter(prefix="/v1/queue", tags=["queue"])

# NOTE: FastAPI publishes route-handler docstrings as OpenAPI operation descriptions, so the
# handler docstrings below are kept short; implementation notes live in `#` comments instead.


class SessionQueueAndProcessorStatus(BaseModel):
    """The overall status of session queue and processor"""

    queue: SessionQueueStatus
    processor: SessionProcessorStatus


def _visible_user_id(current_user: CurrentUserOrDefault) -> Optional[str]:
    """Return the user id that queue operations should be scoped to for this caller.

    Admins get `None`, which the handlers in this module pass to the queue service as "do not
    restrict by user". Everyone else gets their own user id.
    """
    return None if current_user.is_admin else current_user.user_id


def sanitize_queue_item_for_user(
    queue_item: SessionQueueItem, current_user_id: str, is_admin: bool
) -> SessionQueueItem:
    """Sanitize queue item for non-admin users viewing other users' items.

    For non-admin users viewing queue items belonging to other users, only the fields that are
    not explicitly cleared below (e.g. item id, queue id, status and timestamps) are exposed.
    User identity, batch/session ids, origin/destination, priority, generation parameters,
    workflows, error details and the session graph are all stripped or replaced by placeholders.

    Args:
        queue_item: The queue item to sanitize
        current_user_id: The ID of the current user viewing the item
        is_admin: Whether the current user is an admin

    Returns:
        The original queue item when the viewer is an admin or the owner; otherwise a copy with
        the sensitive fields cleared. The input item is never mutated.
    """
    # Admins and item owners can see everything
    if is_admin or queue_item.user_id == current_user_id:
        return queue_item

    # A shallow copy is sufficient: every sensitive attribute is rebound on the copy rather than
    # mutated in place, so the original item is left untouched.
    sanitized_item = queue_item.model_copy(deep=False)

    # Identity of the owning user
    sanitized_item.user_id = "redacted"
    sanitized_item.user_display_name = None
    sanitized_item.user_email = None

    # Identifiers and routing information that could be correlated with other requests
    sanitized_item.batch_id = "redacted"
    sanitized_item.session_id = "redacted"
    sanitized_item.origin = None
    sanitized_item.destination = None
    sanitized_item.priority = 0
    sanitized_item.retried_from_item_id = None

    # Generation parameters
    sanitized_item.field_values = None
    sanitized_item.workflow = None

    # Error details are cleared as well
    sanitized_item.error_type = None
    sanitized_item.error_message = None
    sanitized_item.error_traceback = None

    # Replace the execution state with an empty placeholder graph
    sanitized_item.session = GraphExecutionState(
        id="redacted",
        graph=Graph(),
    )
    return sanitized_item


@session_queue_router.post(
    "/{queue_id}/enqueue_batch",
    operation_id="enqueue_batch",
    responses={
        201: {"model": EnqueueBatchResult},
    },
)
async def enqueue_batch(
    current_user: CurrentUserOrDefault,
    queue_id: str = Path(description="The queue id to perform this operation on"),
    batch: Batch = Body(description="Batch to process"),
    prepend: bool = Body(default=False, description="Whether or not to prepend this batch in the queue"),
) -> EnqueueBatchResult:
    """Processes a batch and enqueues the output graphs for execution for the current user."""
    try:
        # The enqueued items are attributed to the calling user.
        return await ApiDependencies.invoker.services.session_queue.enqueue_batch(
            queue_id=queue_id, batch=batch, prepend=prepend, user_id=current_user.user_id
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Unexpected error while enqueuing batch: {e}") from e


@session_queue_router.get(
    "/{queue_id}/list_all",
    operation_id="list_all_queue_items",
    responses={
        200: {"model": list[SessionQueueItem]},
    },
)
async def list_all_queue_items(
    current_user: CurrentUserOrDefault,
    queue_id: str = Path(description="The queue id to perform this operation on"),
    destination: Optional[str] = Query(default=None, description="The destination of queue items to fetch"),
) -> list[SessionQueueItem]:
    """Gets all queue items"""
    try:
        items = ApiDependencies.invoker.services.session_queue.list_all_queue_items(
            queue_id=queue_id,
            destination=destination,
        )
        # Items are not filtered by owner here; other users' items are returned in sanitized form.
        return [sanitize_queue_item_for_user(item, current_user.user_id, current_user.is_admin) for item in items]
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Unexpected error while listing all queue items: {e}") from e


@session_queue_router.get(
    "/{queue_id}/item_ids",
    operation_id="get_queue_item_ids",
    responses={
        200: {"model": ItemIdsResult},
    },
)
async def get_queue_item_ids(
    current_user: CurrentUserOrDefault,
    queue_id: str = Path(description="The queue id to perform this operation on"),
    order_dir: SQLiteDirection = Query(default=SQLiteDirection.Descending, description="The order of sort"),
) -> ItemIdsResult:
    """Gets all queue item ids that match the given parameters. Non-admin users only see their own items."""
    try:
        return ApiDependencies.invoker.services.session_queue.get_queue_item_ids(
            queue_id=queue_id, order_dir=order_dir, user_id=_visible_user_id(current_user)
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Unexpected error while listing all queue item ids: {e}") from e


@session_queue_router.post(
    "/{queue_id}/items_by_ids",
    operation_id="get_queue_items_by_item_ids",
    responses={200: {"model": list[SessionQueueItem]}},
)
async def get_queue_items_by_item_ids(
    current_user: CurrentUserOrDefault,
    queue_id: str = Path(description="The queue id to perform this operation on"),
    item_ids: list[int] = Body(
        embed=True, description="Object containing list of queue item ids to fetch queue items for"
    ),
) -> list[SessionQueueItem]:
    """Gets queue items for the specified queue item ids. Maintains order of item ids."""
    try:
        session_queue_service = ApiDependencies.invoker.services.session_queue

        # Fetch queue items preserving the order of requested item ids
        queue_items: list[SessionQueueItem] = []
        for item_id in item_ids:
            try:
                queue_item = session_queue_service.get_queue_item(item_id=item_id)
                if queue_item.queue_id != queue_id:
                    # Auth protection for items from other queues
                    continue
                # Sanitize item for non-admin users
                sanitized_item = sanitize_queue_item_for_user(queue_item, current_user.user_id, current_user.is_admin)
                queue_items.append(sanitized_item)
            except Exception:
                # Skip missing queue items - they may have been deleted between item id fetch and queue item fetch
                # NOTE(review): this best-effort skip also hides any other per-item failure.
                continue

        return queue_items
    except Exception as e:
        raise HTTPException(status_code=500, detail="Failed to get queue items") from e


@session_queue_router.put(
    "/{queue_id}/processor/resume",
    operation_id="resume",
    responses={200: {"model": SessionProcessorStatus}},
)
async def resume(
    current_user: AdminUserOrDefault,
    queue_id: str = Path(description="The queue id to perform this operation on"),
) -> SessionProcessorStatus:
    """Resumes session processor. Admin only."""
    # `current_user` is not read in the body; it is declared so the admin dependency is resolved.
    try:
        return ApiDependencies.invoker.services.session_processor.resume()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Unexpected error while resuming queue: {e}") from e


@session_queue_router.put(
    "/{queue_id}/processor/pause",
    operation_id="pause",
    responses={200: {"model": SessionProcessorStatus}},
)
async def pause(
    current_user: AdminUserOrDefault,
    queue_id: str = Path(description="The queue id to perform this operation on"),
) -> SessionProcessorStatus:
    """Pauses session processor. Admin only."""
    # `current_user` is not read in the body; it is declared so the admin dependency is resolved.
    try:
        return ApiDependencies.invoker.services.session_processor.pause()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Unexpected error while pausing queue: {e}") from e


@session_queue_router.put(
    "/{queue_id}/cancel_all_except_current",
    operation_id="cancel_all_except_current",
    responses={200: {"model": CancelAllExceptCurrentResult}},
)
async def cancel_all_except_current(
    current_user: CurrentUserOrDefault,
    queue_id: str = Path(description="The queue id to perform this operation on"),
) -> CancelAllExceptCurrentResult:
    """Immediately cancels all queue items except in-processing items. Non-admin users can only cancel their own
    items."""
    try:
        # Admin users can cancel all items, non-admin users can only cancel their own
        return ApiDependencies.invoker.services.session_queue.cancel_all_except_current(
            queue_id=queue_id, user_id=_visible_user_id(current_user)
        )
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Unexpected error while canceling all except current: {e}"
        ) from e


@session_queue_router.put(
    "/{queue_id}/delete_all_except_current",
    operation_id="delete_all_except_current",
    responses={200: {"model": DeleteAllExceptCurrentResult}},
)
async def delete_all_except_current(
    current_user: CurrentUserOrDefault,
    queue_id: str = Path(description="The queue id to perform this operation on"),
) -> DeleteAllExceptCurrentResult:
    """Immediately deletes all queue items except in-processing items. Non-admin users can only delete their own
    items."""
    try:
        # Admin users can delete all items, non-admin users can only delete their own
        return ApiDependencies.invoker.services.session_queue.delete_all_except_current(
            queue_id=queue_id, user_id=_visible_user_id(current_user)
        )
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Unexpected error while deleting all except current: {e}"
        ) from e


@session_queue_router.put(
    "/{queue_id}/cancel_by_batch_ids",
    operation_id="cancel_by_batch_ids",
    responses={200: {"model": CancelByBatchIDsResult}},
)
async def cancel_by_batch_ids(
    current_user: CurrentUserOrDefault,
    queue_id: str = Path(description="The queue id to perform this operation on"),
    batch_ids: list[str] = Body(description="The list of batch_ids to cancel all queue items for", embed=True),
) -> CancelByBatchIDsResult:
    """Immediately cancels all queue items from the given batch ids. Non-admin users can only cancel their own
    items."""
    try:
        # Admin users can cancel all items, non-admin users can only cancel their own
        return ApiDependencies.invoker.services.session_queue.cancel_by_batch_ids(
            queue_id=queue_id, batch_ids=batch_ids, user_id=_visible_user_id(current_user)
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Unexpected error while canceling by batch id: {e}") from e


@session_queue_router.put(
    "/{queue_id}/cancel_by_destination",
    operation_id="cancel_by_destination",
    responses={200: {"model": CancelByDestinationResult}},
)
async def cancel_by_destination(
    current_user: CurrentUserOrDefault,
    queue_id: str = Path(description="The queue id to perform this operation on"),
    destination: str = Query(description="The destination to cancel all queue items for"),
) -> CancelByDestinationResult:
    """Immediately cancels all queue items with the given destination. Non-admin users can only cancel their own
    items."""
    try:
        # Admin users can cancel all items, non-admin users can only cancel their own
        return ApiDependencies.invoker.services.session_queue.cancel_by_destination(
            queue_id=queue_id, destination=destination, user_id=_visible_user_id(current_user)
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Unexpected error while canceling by destination: {e}") from e


@session_queue_router.put(
    "/{queue_id}/retry_items_by_id",
    operation_id="retry_items_by_id",
    responses={200: {"model": RetryItemsResult}},
)
async def retry_items_by_id(
    current_user: CurrentUserOrDefault,
    queue_id: str = Path(description="The queue id to perform this operation on"),
    item_ids: list[int] = Body(description="The queue item ids to retry"),
) -> RetryItemsResult:
    """Retries the given queue items. Users can only retry their own items unless they are an admin."""
    try:
        session_queue_service = ApiDependencies.invoker.services.session_queue

        # Check authorization: user must own all items or be an admin.
        # The whole request is rejected as soon as one foreign item is found.
        if not current_user.is_admin:
            for item_id in item_ids:
                try:
                    queue_item = session_queue_service.get_queue_item(item_id)
                    if queue_item.user_id != current_user.user_id:
                        raise HTTPException(
                            status_code=403, detail=f"You do not have permission to retry queue item {item_id}"
                        )
                except SessionQueueItemNotFoundError:
                    # Skip items that don't exist - they will be handled by retry_items_by_id
                    continue

        return session_queue_service.retry_items_by_id(queue_id=queue_id, item_ids=item_ids)
    except HTTPException:
        # Let the 403 raised above through unchanged instead of re-wrapping it as a 500
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Unexpected error while retrying queue items: {e}") from e


@session_queue_router.put(
    "/{queue_id}/clear",
    operation_id="clear",
    responses={
        200: {"model": ClearResult},
    },
)
async def clear(
    current_user: CurrentUserOrDefault,
    queue_id: str = Path(description="The queue id to perform this operation on"),
) -> ClearResult:
    """Clears the queue entirely. Admin users clear all items; non-admin users only clear their own items.
    If there's a currently-executing item, users can only cancel it if they own it or are an admin."""
    try:
        session_queue_service = ApiDependencies.invoker.services.session_queue

        queue_item = session_queue_service.get_current(queue_id)
        if queue_item is not None:
            # Check authorization for canceling the current item
            # NOTE(review): a non-admin gets a 403 (and nothing is cleared) whenever another user's
            # item is currently executing - confirm this is intended rather than skipping the cancel.
            if queue_item.user_id != current_user.user_id and not current_user.is_admin:
                raise HTTPException(
                    status_code=403, detail="You do not have permission to cancel the currently executing queue item"
                )
            session_queue_service.cancel_queue_item(queue_item.item_id)

        # Admin users can clear all items, non-admin users can only clear their own
        return session_queue_service.clear(queue_id, user_id=_visible_user_id(current_user))
    except HTTPException:
        # Let the 403 raised above through unchanged instead of re-wrapping it as a 500
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Unexpected error while clearing queue: {e}") from e


@session_queue_router.put(
    "/{queue_id}/prune",
    operation_id="prune",
    responses={
        200: {"model": PruneResult},
    },
)
async def prune(
    current_user: CurrentUserOrDefault,
    queue_id: str = Path(description="The queue id to perform this operation on"),
) -> PruneResult:
    """Prunes all completed or errored queue items. Non-admin users can only prune their own items."""
    try:
        # Admin users can prune all items, non-admin users can only prune their own
        return ApiDependencies.invoker.services.session_queue.prune(queue_id, user_id=_visible_user_id(current_user))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Unexpected error while pruning queue: {e}") from e


@session_queue_router.get(
    "/{queue_id}/current",
    operation_id="get_current_queue_item",
    responses={
        200: {"model": Optional[SessionQueueItem]},
    },
)
async def get_current_queue_item(
    current_user: CurrentUserOrDefault,
    queue_id: str = Path(description="The queue id to perform this operation on"),
) -> Optional[SessionQueueItem]:
    """Gets the currently execution queue item"""
    try:
        item = ApiDependencies.invoker.services.session_queue.get_current(queue_id)
        if item is not None:
            # Another user's item is returned in sanitized form to non-admins
            item = sanitize_queue_item_for_user(item, current_user.user_id, current_user.is_admin)
        return item
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Unexpected error while getting current queue item: {e}") from e


@session_queue_router.get(
    "/{queue_id}/next",
    operation_id="get_next_queue_item",
    responses={
        200: {"model": Optional[SessionQueueItem]},
    },
)
async def get_next_queue_item(
    current_user: CurrentUserOrDefault,
    queue_id: str = Path(description="The queue id to perform this operation on"),
) -> Optional[SessionQueueItem]:
    """Gets the next queue item, without executing it"""
    try:
        item = ApiDependencies.invoker.services.session_queue.get_next(queue_id)
        if item is not None:
            # Another user's item is returned in sanitized form to non-admins
            item = sanitize_queue_item_for_user(item, current_user.user_id, current_user.is_admin)
        return item
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Unexpected error while getting next queue item: {e}") from e


@session_queue_router.get(
    "/{queue_id}/status",
    operation_id="get_queue_status",
    responses={
        200: {"model": SessionQueueAndProcessorStatus},
    },
)
async def get_queue_status(
    current_user: CurrentUserOrDefault,
    queue_id: str = Path(description="The queue id to perform this operation on"),
) -> SessionQueueAndProcessorStatus:
    """Gets the status of the session queue. Non-admin users see only their own counts and cannot see current item
    details unless they own it."""
    try:
        # The per-user scoping of the queue status is delegated to the queue service via `user_id`;
        # the processor status is not user-scoped here.
        queue = ApiDependencies.invoker.services.session_queue.get_queue_status(
            queue_id, user_id=_visible_user_id(current_user)
        )
        processor = ApiDependencies.invoker.services.session_processor.get_status()
        return SessionQueueAndProcessorStatus(queue=queue, processor=processor)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Unexpected error while getting queue status: {e}") from e


@session_queue_router.get(
    "/{queue_id}/b/{batch_id}/status",
    operation_id="get_batch_status",
    responses={
        200: {"model": BatchStatus},
    },
)
async def get_batch_status(
    current_user: CurrentUserOrDefault,
    queue_id: str = Path(description="The queue id to perform this operation on"),
    batch_id: str = Path(description="The batch to get the status of"),
) -> BatchStatus:
    """Gets the status of a batch. Non-admin users only see their own batches."""
    try:
        return ApiDependencies.invoker.services.session_queue.get_batch_status(
            queue_id=queue_id, batch_id=batch_id, user_id=_visible_user_id(current_user)
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Unexpected error while getting batch status: {e}") from e


@session_queue_router.get(
    "/{queue_id}/i/{item_id}",
    operation_id="get_queue_item",
    responses={
        200: {"model": SessionQueueItem},
    },
    response_model_exclude_none=True,
)
async def get_queue_item(
    current_user: CurrentUserOrDefault,
    queue_id: str = Path(description="The queue id to perform this operation on"),
    item_id: int = Path(description="The queue item to get"),
) -> SessionQueueItem:
    """Gets a queue item"""
    try:
        queue_item = ApiDependencies.invoker.services.session_queue.get_queue_item(item_id=item_id)
        # An item that exists but belongs to a different queue is reported as not found
        if queue_item.queue_id != queue_id:
            raise HTTPException(status_code=404, detail=f"Queue item with id {item_id} not found in queue {queue_id}")
        # Sanitize item for non-admin users
        return sanitize_queue_item_for_user(queue_item, current_user.user_id, current_user.is_admin)
    except SessionQueueItemNotFoundError as e:
        raise HTTPException(
            status_code=404, detail=f"Queue item with id {item_id} not found in queue {queue_id}"
        ) from e
    except HTTPException:
        # HTTPException is an Exception subclass: without this handler the 404 raised above for a
        # queue mismatch would be caught below and turned into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Unexpected error while fetching queue item: {e}") from e


@session_queue_router.delete(
    "/{queue_id}/i/{item_id}",
    operation_id="delete_queue_item",
)
async def delete_queue_item(
    current_user: CurrentUserOrDefault,
    queue_id: str = Path(description="The queue id to perform this operation on"),
    item_id: int = Path(description="The queue item to delete"),
) -> None:
    """Deletes a queue item. Users can only delete their own items unless they are an admin."""
    try:
        session_queue_service = ApiDependencies.invoker.services.session_queue

        # Get the queue item to check ownership
        # NOTE(review): unlike get_queue_item, the item's queue_id is not compared with `queue_id` here.
        queue_item = session_queue_service.get_queue_item(item_id)

        # Check authorization: user must own the item or be an admin
        if queue_item.user_id != current_user.user_id and not current_user.is_admin:
            raise HTTPException(status_code=403, detail="You do not have permission to delete this queue item")

        session_queue_service.delete_queue_item(item_id)
    except SessionQueueItemNotFoundError as e:
        raise HTTPException(
            status_code=404, detail=f"Queue item with id {item_id} not found in queue {queue_id}"
        ) from e
    except HTTPException:
        # Let the 403 raised above through unchanged instead of re-wrapping it as a 500
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Unexpected error while deleting queue item: {e}") from e


@session_queue_router.put(
    "/{queue_id}/i/{item_id}/cancel",
    operation_id="cancel_queue_item",
    responses={
        200: {"model": SessionQueueItem},
    },
)
async def cancel_queue_item(
    current_user: CurrentUserOrDefault,
    queue_id: str = Path(description="The queue id to perform this operation on"),
    item_id: int = Path(description="The queue item to cancel"),
) -> SessionQueueItem:
    """Cancels a queue item. Users can only cancel their own items unless they are an admin."""
    try:
        session_queue_service = ApiDependencies.invoker.services.session_queue

        # Get the queue item to check ownership
        # NOTE(review): unlike get_queue_item, the item's queue_id is not compared with `queue_id` here.
        queue_item = session_queue_service.get_queue_item(item_id)

        # Check authorization: user must own the item or be an admin
        if queue_item.user_id != current_user.user_id and not current_user.is_admin:
            raise HTTPException(status_code=403, detail="You do not have permission to cancel this queue item")

        return session_queue_service.cancel_queue_item(item_id)
    except SessionQueueItemNotFoundError as e:
        raise HTTPException(
            status_code=404, detail=f"Queue item with id {item_id} not found in queue {queue_id}"
        ) from e
    except HTTPException:
        # Let the 403 raised above through unchanged instead of re-wrapping it as a 500
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Unexpected error while canceling queue item: {e}") from e


@session_queue_router.get(
    "/{queue_id}/counts_by_destination",
    operation_id="counts_by_destination",
    responses={200: {"model": SessionQueueCountsByDestination}},
)
async def counts_by_destination(
    current_user: CurrentUserOrDefault,
    queue_id: str = Path(description="The queue id to query"),
    destination: str = Query(description="The destination to query"),
) -> SessionQueueCountsByDestination:
    """Gets the counts of queue items by destination. Non-admin users only see their own items."""
    try:
        return ApiDependencies.invoker.services.session_queue.get_counts_by_destination(
            queue_id=queue_id, destination=destination, user_id=_visible_user_id(current_user)
        )
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Unexpected error while fetching counts by destination: {e}"
        ) from e


@session_queue_router.delete(
    "/{queue_id}/d/{destination}",
    operation_id="delete_by_destination",
    responses={200: {"model": DeleteByDestinationResult}},
)
async def delete_by_destination(
    current_user: CurrentUserOrDefault,
    queue_id: str = Path(description="The queue id to query"),
    destination: str = Path(description="The destination to query"),
) -> DeleteByDestinationResult:
    """Deletes all items with the given destination. Non-admin users can only delete their own items."""
    try:
        # Admin users can delete all items, non-admin users can only delete their own
        return ApiDependencies.invoker.services.session_queue.delete_by_destination(
            queue_id=queue_id, destination=destination, user_id=_visible_user_id(current_user)
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Unexpected error while deleting by destination: {e}") from e