Files
InvokeAI/invokeai/app/api/sockets.py
Lincoln Stein 33ec16deb4 Feature: Shared/private workflows and image boards in multiuser mode (#9018)
* feat: Per-user workflow libraries in multiuser mode (#114)

* Add per-user workflow isolation: migration 28, service updates, router ownership checks, is_public endpoint, schema regeneration, frontend UI

Co-authored-by: lstein <111189+lstein@users.noreply.github.com>

* feat: add shared workflow checkbox to Details panel, auto-tag, gate edit/delete, fix tests

Co-authored-by: lstein <111189+lstein@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: lstein <111189+lstein@users.noreply.github.com>

* Restrict model sync to admin users only (#118)

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: lstein <111189+lstein@users.noreply.github.com>

* feat: distinct splash screens for admin/non-admin users in multiuser mode (#116)

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: lstein <111189+lstein@users.noreply.github.com>

* Disable Save when editing another user's shared workflow in multiuser mode (#120)

* Disable Save when editing another user's shared workflow in multiuser mode

Co-authored-by: lstein <111189+lstein@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: lstein <111189+lstein@users.noreply.github.com>

* chore(app): ruff

* Add board visibility (private/shared/public) feature with tests and UI

Co-authored-by: lstein <111189+lstein@users.noreply.github.com>

* Enforce read-only access for non-owners of shared/public boards in UI

Co-authored-by: lstein <111189+lstein@users.noreply.github.com>

* Fix remaining board access enforcement: invoke icon, drag-out, change-board filter, archive

Co-authored-by: lstein <111189+lstein@users.noreply.github.com>

* fix: allow drag from shared boards to non-board targets (viewer, ref image, etc.)

Previously, images in shared boards owned by another user could not be
dragged at all — the draggable setup was completely skipped in
GalleryImage.tsx when canWriteImages was false. This blocked ALL drop
targets including the viewer, reference image pane, and canvas.

Now images are always draggable. The board-move restriction is enforced
in the dnd target isValid functions instead:
- addImageToBoardDndTarget: rejects moves from shared boards the user
  doesn't own (unless admin or board is public)
- removeImageFromBoardDndTarget: same check

Other drop targets (viewer, reference images, canvas, comparison, etc.)
remain fully functional for shared board images.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(security): add auth requirement to all sensitive routes in multimodal mode

* chore(backend): ruff

* fix (backend): improve user isolation for session queue and recall parameters

 - Sanitize session queue information of all cross-user fields except for the timestamps and status.
 - Recall parameters are now user-scoped.
 - Queue status endpoints now report user-scoped activity rather than global activity
 - Tests added:

  TestSessionQueueSanitization (4 tests):
  1. test_owner_sees_all_fields - Owner sees complete queue item data
  2. test_admin_sees_all_fields - Admin sees complete queue item data
  3. test_non_owner_sees_only_status_timestamps_errors -
     Non-owner sees only item_id, queue_id, status, and timestamps; everything else is redacted
  4. test_sanitization_does_not_mutate_original - Sanitization doesn't modify the original object

  TestRecallParametersIsolation (2 tests):

  5. test_user1_write_does_not_leak_to_user2 - User1's recall params are not visible in user2's client state
  6. test_two_users_independent_state - Both users can write recall params independently without overwriting each other

fix(backend): queue status endpoints report user-scoped stats rather than global stats

* fix(workflow): do not filter default workflows in multiuser mode

  Problem: When categories=['user', 'default'] (or no category filter)
  and user_id was set for multiuser scoping, the SQL query became
     WHERE category IN ('user', 'default') AND user_id = ?,
     which  excluded default workflows (owned by "system").

  Fix: Changed user_id = ? to (user_id = ? OR category = 'default') in
  all 6 occurrences across workflow_records_sqlite.py — in get_many,
  counts_by_category, counts_by_tag, and get_all_tags. Default
  workflows are now always visible regardless of user scoping.

  Tests added (2):
  - test_default_workflows_visible_when_listing_user_and_default — categories=['user','default'] includes both
  - test_default_workflows_visible_when_no_category_filter — no filter still shows defaults

* fix(multiuser): scope queue/recall/intermediates endpoints to current user

Several read-only and event-emitting endpoints were leaking aggregate
cross-user activity in multiuser mode:

- recall_parameters_updated event was broadcast to every queue
  subscriber. Added user_id to the event and routed it to the owner +
  admin rooms only.
- get_queue_status, get_batch_status, counts_by_destination and
  get_intermediates_count now scope counts to the calling user
  (admins still see global state). Removed the now-redundant
  user_pending/user_in_progress fields and simplified QueueCountBadge.
- get_queue_status hides current item_id/session_id/batch_id when the
  current item belongs to another user.

Also fixes test_session_queue_sanitization assertions that lagged
behind the recently expanded redaction set.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* chore(backend): ruff

* fix(multiuser): reject anonymous websockets and scope queue item events

Close three cross-user leaks in the websocket layer:

- _handle_connect() now rejects connections without a valid JWT in
  multiuser mode (previously fell through to user_id="system"), so
  anonymous clients can no longer subscribe to queue rooms and observe
  other users' activity. In single-user mode it still accepts as system
  admin.
- _handle_sub_queue() no longer silently falls back to the system user
  for an unknown sid in multiuser mode; it refuses the subscription.
- QueueItemStatusChangedEvent and BatchEnqueuedEvent are now routed to
  user:{user_id} + admin rooms instead of the full queue room. Both
  events carry unsanitized user_id, batch_id, origin, destination,
  session_id, and error metadata and must not be broadcast.
- BatchEnqueuedEvent gains a user_id field; emit_batch_enqueued and
  enqueue_batch thread it through.

New TestWebSocketAuth suite covers connect accept/reject for both
modes, sub_queue refusal, and private routing of the queue item and
batch events (plus a QueueClearedEvent sanity check).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(multiuser): verify user record on websocket connect

A deleted or deactivated user with an unexpired JWT could still open a
websocket and subscribe to queue rooms. Now _handle_connect() checks the
backing user record (exists + is_active) in multiuser mode, mirroring
the REST auth path in auth_dependencies.py. Fails closed if the user
service is unavailable.

Tests: added deleted-user and inactive-user rejection tests; updated
valid-token test to create the user in the database first.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(multiuser): close bulk download cross-user exfiltration path

Backend:
- POST /download now validates image read access (per-image) and board
  read access (per-board) before queuing the download.
- GET /download/{name} is intentionally unauthenticated because the
  browser triggers it via <a download> which cannot carry Authorization
  headers. Access control relies on POST-time checks, UUID filename
  unguessability, private socket event routing, and single-fetch deletion.
- Added _assert_board_read_access() helper to images router.
- Threaded user_id through bulk download handler, base class, event
  emission, and BulkDownloadEventBase so events carry the initiator.
- Bulk download service now tracks download ownership via _download_owners
  dict (cleaned up on delete).
- Socket bulk_download room subscription restricted to authenticated
  sockets in multiuser mode.
- Added error-catching in FastAPIEventService._dispatch_from_queue to
  prevent silent event dispatch failures.

Frontend:
- Fixed pre-existing race condition where the "Preparing Download" toast
  from the POST response overwrote the "Ready to Download" toast from the
  socket event (background task completes in ~17ms, so the socket event
  can arrive before Redux processes the HTTP response). Toast IDs are now
  distinct: "preparing:{name}" vs "{name}".
- bulk_download_complete/error handlers now dismiss the preparing toast.

Tests (8 new):
- Bulk download by image names rejected for non-owner (403)
- Bulk download by image names allowed for owner (202)
- Bulk download from private board rejected (403)
- Bulk download from shared board allowed (202)
- Admin can bulk download any images (202)
- Bulk download events carry user_id
- Bulk download event emitted to download room
- GET /download unauthenticated returns 404 for unknown files

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(multiuser): enforce board visibility on image listing endpoints

GET /api/v1/images?board_id=... and GET /api/v1/images/names?board_id=...
passed board_id directly to the SQL layer without checking board
visibility. The SQL only applied user_id filtering for board_id="none"
(uncategorized images), so any authenticated user who knew a private
board ID could enumerate its images.

Both endpoints now call _assert_board_read_access() before querying,
returning 403 unless the caller is the board owner, an admin, or the
board is Shared/Public.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* chore(backend): ruff

* fix(multiuser): require image ownership when adding images to boards

add_image_to_board and add_images_to_board only checked write access to
the destination board, never verifying that the caller owned the source
image.  An attacker could add a victim's image to their own board, then
exploit the board-ownership fallback in _assert_image_owner to gain
delete/patch/star/unstar rights on the image.

Both endpoints now call _assert_image_direct_owner which requires direct
image ownership (image_records.user_id) or admin — board ownership is
intentionally not sufficient, preventing the escalation chain.

Also fixed a pre-existing bug where HTTPException from the inner loop in
add_images_to_board was caught by the outer except-Exception and returned
as 500 instead of propagating the correct status code.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* chore(backend): ruff

* fix(multiuser): validate image access in recall parameter resolution

The recall endpoint loaded image files and ran ControlNet preprocessors
on any image_name supplied in control_layers or ip_adapters without
checking that the caller could read the image.  An attacker who knew
another user's image UUID could extract dimensions and, for supported
preprocessors, mint a derived processed image they could then fetch.

Added _assert_recall_image_access() which validates read access for every
image referenced in the request before any resolution or processing
occurs.  Access is granted to the image owner, admins, or when the image
sits on a Shared/Public board.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(multiuser): require admin auth on model install job endpoints

list_model_installs, get_model_install_job, pause, resume,
restart_failed, and restart_file were unauthenticated — any caller who
could reach the API could view sensitive install job fields (source,
local_path, error_traceback) and interfere with installation state.

All six endpoints now require AdminUserOrDefault, consistent with the
neighboring cancel and prune routes.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(multiuser): close bulk download exfiltration and additional review findings

Bulk download capability token exfiltration:
- Socket events now route to user:{user_id} + admin rooms instead of the
  shared 'default' room (the earlier toast race that blocked this approach
  was fixed in a prior commit).
- GET /download/{name} re-requires CurrentUserOrDefault and enforces
  ownership via get_owner().
- Frontend download handler replaced <a download> (which cannot carry auth
  headers) with fetch() + Authorization header + programmatic blob download.

Additional fixes from reviewer tests:
- Public boards now grant write access in _assert_board_write_access and
  mutation rights in _assert_image_owner (BoardVisibility.Public).
- Uncategorized image listing (GET /boards/none/image_names) now filters
  to the caller's images only, preventing cross-user enumeration.
- board_images router uses board_image_records.get_board_for_image()
  instead of images.get_dto() to avoid dependency on image_files service.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(multiuser): add user_id scoping to workflow SQL mutations

Defense-in-depth: the route layer already checks ownership before
calling update/delete/update_is_public/update_opened_at, but the SQL
statements did not include AND user_id = ?, so a bypass of the route
check would allow cross-user mutations.

All four methods now accept an optional user_id parameter.  When
provided, the SQL WHERE clause is scoped to that user.  The route layer
passes current_user.user_id for non-admin callers and None for admins.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(multiuser): allow non-owner uploads to public boards

upload_image() blocked non-owner uploads even to public boards.  The
board write check now allows uploads when board_visibility is Public,
consistent with the public-board semantics in _assert_board_write_access
and _assert_image_owner.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
Co-authored-by: lstein <111189+lstein@users.noreply.github.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Jonathan <34005131+JPPhoto@users.noreply.github.com>
2026-04-13 17:27:20 -04:00

363 lines
17 KiB
Python

# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
from typing import Any
from fastapi import FastAPI
from pydantic import BaseModel
from socketio import ASGIApp, AsyncServer
from invokeai.app.services.auth.token_service import verify_token
from invokeai.app.services.events.events_common import (
BatchEnqueuedEvent,
BulkDownloadCompleteEvent,
BulkDownloadErrorEvent,
BulkDownloadEventBase,
BulkDownloadStartedEvent,
DownloadCancelledEvent,
DownloadCompleteEvent,
DownloadErrorEvent,
DownloadEventBase,
DownloadProgressEvent,
DownloadStartedEvent,
FastAPIEvent,
InvocationCompleteEvent,
InvocationErrorEvent,
InvocationProgressEvent,
InvocationStartedEvent,
ModelEventBase,
ModelInstallCancelledEvent,
ModelInstallCompleteEvent,
ModelInstallDownloadProgressEvent,
ModelInstallDownloadsCompleteEvent,
ModelInstallErrorEvent,
ModelInstallStartedEvent,
ModelLoadCompleteEvent,
ModelLoadStartedEvent,
QueueClearedEvent,
QueueEventBase,
QueueItemStatusChangedEvent,
RecallParametersUpdatedEvent,
register_events,
)
from invokeai.backend.util.logging import InvokeAILogger
logger = InvokeAILogger.get_logger()
class QueueSubscriptionEvent(BaseModel):
"""Event data for subscribing to the socket.io queue room.
This is a pydantic model to ensure the data is in the correct format."""
queue_id: str
class BulkDownloadSubscriptionEvent(BaseModel):
"""Event data for subscribing to the socket.io bulk downloads room.
This is a pydantic model to ensure the data is in the correct format."""
bulk_download_id: str
QUEUE_EVENTS = {
InvocationStartedEvent,
InvocationProgressEvent,
InvocationCompleteEvent,
InvocationErrorEvent,
QueueItemStatusChangedEvent,
BatchEnqueuedEvent,
QueueClearedEvent,
RecallParametersUpdatedEvent,
}
MODEL_EVENTS = {
DownloadCancelledEvent,
DownloadCompleteEvent,
DownloadErrorEvent,
DownloadProgressEvent,
DownloadStartedEvent,
ModelLoadStartedEvent,
ModelLoadCompleteEvent,
ModelInstallDownloadProgressEvent,
ModelInstallDownloadsCompleteEvent,
ModelInstallStartedEvent,
ModelInstallCompleteEvent,
ModelInstallCancelledEvent,
ModelInstallErrorEvent,
}
BULK_DOWNLOAD_EVENTS = {BulkDownloadStartedEvent, BulkDownloadCompleteEvent, BulkDownloadErrorEvent}
class SocketIO:
_sub_queue = "subscribe_queue"
_unsub_queue = "unsubscribe_queue"
_sub_bulk_download = "subscribe_bulk_download"
_unsub_bulk_download = "unsubscribe_bulk_download"
def __init__(self, app: FastAPI):
self._sio = AsyncServer(async_mode="asgi", cors_allowed_origins="*")
self._app = ASGIApp(socketio_server=self._sio, socketio_path="/ws/socket.io")
app.mount("/ws", self._app)
# Track user information for each socket connection
self._socket_users: dict[str, dict[str, Any]] = {}
# Set up authentication middleware
self._sio.on("connect", handler=self._handle_connect)
self._sio.on("disconnect", handler=self._handle_disconnect)
self._sio.on(self._sub_queue, handler=self._handle_sub_queue)
self._sio.on(self._unsub_queue, handler=self._handle_unsub_queue)
self._sio.on(self._sub_bulk_download, handler=self._handle_sub_bulk_download)
self._sio.on(self._unsub_bulk_download, handler=self._handle_unsub_bulk_download)
register_events(QUEUE_EVENTS, self._handle_queue_event)
register_events(MODEL_EVENTS, self._handle_model_event)
register_events(BULK_DOWNLOAD_EVENTS, self._handle_bulk_image_download_event)
async def _handle_connect(self, sid: str, environ: dict, auth: dict | None) -> bool:
"""Handle socket connection and authenticate the user.
Returns True to accept the connection, False to reject it.
Stores user_id in the internal socket users dict for later use.
In multiuser mode, connections without a valid token are rejected outright
so that anonymous clients cannot subscribe to queue rooms and observe
queue activity belonging to other users. In single-user mode, unauthenticated
connections are accepted as the system admin user.
"""
# Extract token from auth data or headers
token = None
if auth and isinstance(auth, dict):
token = auth.get("token")
if not token and environ:
# Try to get token from headers
headers = environ.get("HTTP_AUTHORIZATION", "")
if headers.startswith("Bearer "):
token = headers[7:]
# Verify the token
if token:
token_data = verify_token(token)
if token_data:
# In multiuser mode, also verify the backing user record still
# exists and is active — mirrors the REST auth check in
# auth_dependencies.py. A deleted or deactivated user whose
# JWT has not yet expired must not be allowed to open a socket.
if self._is_multiuser_enabled():
try:
from invokeai.app.api.dependencies import ApiDependencies
user = ApiDependencies.invoker.services.users.get(token_data.user_id)
if user is None or not user.is_active:
logger.warning(f"Rejecting socket {sid}: user {token_data.user_id} not found or inactive")
return False
except Exception:
# If user service is unavailable, fail closed
logger.warning(f"Rejecting socket {sid}: unable to verify user record")
return False
# Store user_id and is_admin in socket users dict
self._socket_users[sid] = {
"user_id": token_data.user_id,
"is_admin": token_data.is_admin,
}
logger.info(
f"Socket {sid} connected with user_id: {token_data.user_id}, is_admin: {token_data.is_admin}"
)
return True
# No valid token provided. In multiuser mode this is not allowed — reject
# the connection so anonymous clients cannot subscribe to queue rooms.
# In single-user mode, fall through and accept the socket as system admin.
if self._is_multiuser_enabled():
logger.warning(
f"Rejecting socket {sid} connection: multiuser mode is enabled and no valid auth token was provided"
)
return False
self._socket_users[sid] = {
"user_id": "system",
"is_admin": True,
}
logger.debug(f"Socket {sid} connected as system admin (single-user mode)")
return True
@staticmethod
def _is_multiuser_enabled() -> bool:
"""Check whether multiuser mode is enabled. Fails closed if configuration
is not yet initialized, which should not happen in practice but prevents
accidentally opening the socket during startup races."""
try:
# Imported here to avoid a circular import at module load time.
from invokeai.app.api.dependencies import ApiDependencies
return bool(ApiDependencies.invoker.services.configuration.multiuser)
except Exception:
# If dependencies are not initialized, fail closed (treat as multiuser)
# so we never accidentally admit an anonymous socket.
return True
async def _handle_disconnect(self, sid: str) -> None:
"""Handle socket disconnection and cleanup user info."""
if sid in self._socket_users:
del self._socket_users[sid]
logger.debug(f"Socket {sid} disconnected and cleaned up")
async def _handle_sub_queue(self, sid: str, data: Any) -> None:
"""Handle queue subscription and add socket to both queue and user-specific rooms."""
queue_id = QueueSubscriptionEvent(**data).queue_id
# Check if we have user info for this socket. In multiuser mode _handle_connect
# will have already rejected any socket without a valid token, so missing user
# info here is a bug — refuse the subscription rather than silently falling back
# to an anonymous system user who could then receive queue item events.
if sid not in self._socket_users:
if self._is_multiuser_enabled():
logger.warning(
f"Refusing queue subscription for socket {sid}: no user info (socket not authenticated via connect event)"
)
return
# Single-user mode: safe to fall back to the system admin user.
self._socket_users[sid] = {
"user_id": "system",
"is_admin": True,
}
user_id = self._socket_users[sid]["user_id"]
is_admin = self._socket_users[sid]["is_admin"]
# Add socket to the queue room
await self._sio.enter_room(sid, queue_id)
# Also add socket to a user-specific room for event filtering
user_room = f"user:{user_id}"
await self._sio.enter_room(sid, user_room)
# If admin, also add to admin room to receive all events
if is_admin:
await self._sio.enter_room(sid, "admin")
logger.debug(
f"Socket {sid} (user_id: {user_id}, is_admin: {is_admin}) subscribed to queue {queue_id} and user room {user_room}"
)
async def _handle_unsub_queue(self, sid: str, data: Any) -> None:
await self._sio.leave_room(sid, QueueSubscriptionEvent(**data).queue_id)
async def _handle_sub_bulk_download(self, sid: str, data: Any) -> None:
# In multiuser mode, only allow authenticated sockets to subscribe.
# Bulk download events are routed to user-specific rooms, so the
# bulk_download_id room subscription is only kept for single-user
# backward compatibility.
if self._is_multiuser_enabled() and sid not in self._socket_users:
logger.warning(f"Refusing bulk download subscription for unknown socket {sid} in multiuser mode")
return
await self._sio.enter_room(sid, BulkDownloadSubscriptionEvent(**data).bulk_download_id)
async def _handle_unsub_bulk_download(self, sid: str, data: Any) -> None:
await self._sio.leave_room(sid, BulkDownloadSubscriptionEvent(**data).bulk_download_id)
async def _handle_queue_event(self, event: FastAPIEvent[QueueEventBase]):
"""Handle queue events with user isolation.
All queue item events (invocation events AND QueueItemStatusChangedEvent) are
private to the owning user and admins. They carry unsanitized user_id, batch_id,
session_id, origin, destination and error metadata, and must never be broadcast
to the whole queue room — otherwise any other authenticated subscriber could
observe cross-user queue activity.
RecallParametersUpdatedEvent is also private to the owner + admins.
BatchEnqueuedEvent carries the enqueuing user's batch_id/origin/counts and
is also routed privately. QueueClearedEvent is the only queue event that
is still broadcast to the whole queue room.
IMPORTANT: Check InvocationEventBase BEFORE QueueItemEventBase since InvocationEventBase
inherits from QueueItemEventBase. The order of isinstance checks matters!
"""
try:
event_name, event_data = event
# Import here to avoid circular dependency
from invokeai.app.services.events.events_common import InvocationEventBase, QueueItemEventBase
# Check InvocationEventBase FIRST (before QueueItemEventBase) since it's a subclass
# Invocation events (progress, started, complete, error) are private to owner + admins
if isinstance(event_data, InvocationEventBase) and hasattr(event_data, "user_id"):
user_room = f"user:{event_data.user_id}"
# Emit to the user's room
await self._sio.emit(event=event_name, data=event_data.model_dump(mode="json"), room=user_room)
# Also emit to admin room so admins can see all events, but strip image preview data
# from InvocationProgressEvent to prevent admins from seeing other users' image content
if isinstance(event_data, InvocationProgressEvent):
admin_event_data = event_data.model_copy(update={"image": None})
await self._sio.emit(event=event_name, data=admin_event_data.model_dump(mode="json"), room="admin")
else:
await self._sio.emit(event=event_name, data=event_data.model_dump(mode="json"), room="admin")
logger.debug(f"Emitted private invocation event {event_name} to user room {user_room} and admin room")
# Other queue item events (QueueItemStatusChangedEvent) carry unsanitized
# user_id, batch_id, session_id, origin, destination and error metadata.
# They are private to the owning user + admins — never broadcast to the
# full queue room.
elif isinstance(event_data, QueueItemEventBase) and hasattr(event_data, "user_id"):
user_room = f"user:{event_data.user_id}"
await self._sio.emit(event=event_name, data=event_data.model_dump(mode="json"), room=user_room)
await self._sio.emit(event=event_name, data=event_data.model_dump(mode="json"), room="admin")
logger.debug(f"Emitted private queue item event {event_name} to user room {user_room} and admin room")
# RecallParametersUpdatedEvent is private - only emit to owner + admins
elif isinstance(event_data, RecallParametersUpdatedEvent):
user_room = f"user:{event_data.user_id}"
await self._sio.emit(event=event_name, data=event_data.model_dump(mode="json"), room=user_room)
await self._sio.emit(event=event_name, data=event_data.model_dump(mode="json"), room="admin")
logger.debug(f"Emitted private recall_parameters_updated event to user room {user_room} and admin room")
# BatchEnqueuedEvent carries the enqueuing user's batch_id, origin, and
# enqueued counts. Route it privately to the owner + admins so other
# users do not observe cross-user batch activity.
elif isinstance(event_data, BatchEnqueuedEvent):
user_room = f"user:{event_data.user_id}"
await self._sio.emit(event=event_name, data=event_data.model_dump(mode="json"), room=user_room)
await self._sio.emit(event=event_name, data=event_data.model_dump(mode="json"), room="admin")
logger.debug(f"Emitted private batch_enqueued event to user room {user_room} and admin room")
else:
# For remaining queue events (e.g. QueueClearedEvent) that do not
# carry user identity, emit to all subscribers in the queue room.
await self._sio.emit(
event=event_name, data=event_data.model_dump(mode="json"), room=event_data.queue_id
)
logger.debug(
f"Emitted general queue event {event_name} to all subscribers in queue {event_data.queue_id}"
)
except Exception as e:
# Log any unhandled exceptions in event handling to prevent silent failures
logger.error(f"Error handling queue event {event[0]}: {e}", exc_info=True)
async def _handle_model_event(self, event: FastAPIEvent[ModelEventBase | DownloadEventBase]) -> None:
await self._sio.emit(event=event[0], data=event[1].model_dump(mode="json"))
async def _handle_bulk_image_download_event(self, event: FastAPIEvent[BulkDownloadEventBase]) -> None:
event_name, event_data = event
# Route to user-specific + admin rooms so that other authenticated
# users cannot learn the bulk_download_item_name (the capability token
# needed to fetch the zip from the unauthenticated GET endpoint).
# In single-user mode (user_id="system"), fall back to the shared
# bulk_download_id room for backward compatibility.
if hasattr(event_data, "user_id") and event_data.user_id != "system":
user_room = f"user:{event_data.user_id}"
await self._sio.emit(event=event_name, data=event_data.model_dump(mode="json"), room=user_room)
await self._sio.emit(event=event_name, data=event_data.model_dump(mode="json"), room="admin")
else:
await self._sio.emit(
event=event_name, data=event_data.model_dump(mode="json"), room=event_data.bulk_download_id
)