mirror of
https://github.com/invoke-ai/InvokeAI.git
synced 2026-02-04 07:15:13 -05:00
Around the time we (I) implemented pydantic events, I noticed a short pause between progress images every 4 or 5 steps when generating with SDXL. It didn't happen with SD1.5, but I did notice that with SD1.5, we'd get 4 or 5 progress events simultaneously. I'd expect one event every ~25ms, matching my it/s with SD1.5. Mysterious! Digging in, I found that the issue is related to our use of a synchronous queue for events. When the event queue is empty, we must call `asyncio.sleep` before checking again. We were sleeping for 100ms. Said another way, every time we clear the event queue, we have to wait 100ms before another event can be dispatched, even if it is put on the queue immediately after we start waiting. In practice, this means our events get buffered into batches, dispatched once every 100ms. This explains why I was getting batches of 4 or 5 SD1.5 progress events at once, but not the intermittent SDXL delay. But this 100ms wait has another effect when the events are put on the queue at intervals that don't perfectly line up with the 100ms wait. This is most noticeable when the time between events is >100ms, and can add up to 100ms of delay before the event is dispatched. For example, say the queue is empty and we start a 100ms wait. Then, immediately after - like 0.01ms later - we push an event on to the queue. We still need to wait another 99.9ms before that event will be dispatched. That's the SDXL delay. The easy fix is to reduce the sleep to something like 0.01 seconds, but this feels kinda dirty. Can't we just wait on the queue and dispatch every event immediately? Not with the normal synchronous queue - but we can with `asyncio.Queue`. I switched the events queue to use `asyncio.Queue` (as seen in this commit), which lets us asynchronously wait on the queue in a loop. Unfortunately, I ran into another issue - events now felt like their timing was inconsistent, but in a different way than with the 100ms sleep. 
The time between pushing events on the queue and dispatching them was not consistently ~0ms as I'd expect - it was highly variable from ~0ms up to ~100ms. This is resolved by passing the asyncio loop directly into the events service and using its methods to create the task and interact with the queue. I don't fully understand why this resolved the issue, because either way we are interacting with the same event loop (as shown by `asyncio.get_running_loop()`). I suppose there's some scheduling magic happening.
152 lines
6.4 KiB
Python
152 lines
6.4 KiB
Python
# Copyright (c) 2022 Kyle Schouviller (https://github.com/kyle0654)
|
|
|
|
import asyncio
|
|
from logging import Logger
|
|
|
|
import torch
|
|
|
|
from invokeai.app.services.board_image_records.board_image_records_sqlite import SqliteBoardImageRecordStorage
|
|
from invokeai.app.services.board_images.board_images_default import BoardImagesService
|
|
from invokeai.app.services.board_records.board_records_sqlite import SqliteBoardRecordStorage
|
|
from invokeai.app.services.boards.boards_default import BoardService
|
|
from invokeai.app.services.bulk_download.bulk_download_default import BulkDownloadService
|
|
from invokeai.app.services.config.config_default import InvokeAIAppConfig
|
|
from invokeai.app.services.download.download_default import DownloadQueueService
|
|
from invokeai.app.services.events.events_fastapievents import FastAPIEventService
|
|
from invokeai.app.services.image_files.image_files_disk import DiskImageFileStorage
|
|
from invokeai.app.services.image_records.image_records_sqlite import SqliteImageRecordStorage
|
|
from invokeai.app.services.images.images_default import ImageService
|
|
from invokeai.app.services.invocation_cache.invocation_cache_memory import MemoryInvocationCache
|
|
from invokeai.app.services.invocation_services import InvocationServices
|
|
from invokeai.app.services.invocation_stats.invocation_stats_default import InvocationStatsService
|
|
from invokeai.app.services.invoker import Invoker
|
|
from invokeai.app.services.model_images.model_images_default import ModelImageFileStorageDisk
|
|
from invokeai.app.services.model_manager.model_manager_default import ModelManagerService
|
|
from invokeai.app.services.model_records.model_records_sql import ModelRecordServiceSQL
|
|
from invokeai.app.services.names.names_default import SimpleNameService
|
|
from invokeai.app.services.object_serializer.object_serializer_disk import ObjectSerializerDisk
|
|
from invokeai.app.services.object_serializer.object_serializer_forward_cache import ObjectSerializerForwardCache
|
|
from invokeai.app.services.session_processor.session_processor_default import (
|
|
DefaultSessionProcessor,
|
|
DefaultSessionRunner,
|
|
)
|
|
from invokeai.app.services.session_queue.session_queue_sqlite import SqliteSessionQueue
|
|
from invokeai.app.services.shared.sqlite.sqlite_util import init_db
|
|
from invokeai.app.services.urls.urls_default import LocalUrlService
|
|
from invokeai.app.services.workflow_records.workflow_records_sqlite import SqliteWorkflowRecordsStorage
|
|
from invokeai.backend.stable_diffusion.diffusion.conditioning_data import ConditioningFieldData
|
|
from invokeai.backend.util.logging import InvokeAILogger
|
|
from invokeai.version.invokeai_version import __version__
|
|
|
|
|
|
# TODO: is there a better way to achieve this?
def check_internet() -> bool:
    """Return True if the internet is reachable.

    It does this by issuing an HTTP request to huggingface.co with a 1-second
    timeout; any failure (DNS error, connection refused, timeout, HTTP error)
    is treated as "offline".
    """
    import urllib.request

    host = "http://huggingface.co"
    try:
        # Use a context manager so the response (and its underlying socket)
        # is always closed — the original leaked the connection.
        with urllib.request.urlopen(host, timeout=1):
            return True
    except Exception:
        # Deliberately broad: any failure at all means "no internet" here.
        return False
|
|
|
|
|
|
# Module-level logger; also serves as the default `logger` argument for
# ApiDependencies.initialize().
logger = InvokeAILogger.get_logger()
|
|
|
|
|
|
class ApiDependencies:
    """Contains and initializes all dependencies for the API"""

    # Process-wide Invoker instance; set by initialize() and stopped by shutdown().
    invoker: Invoker

    @staticmethod
    def initialize(
        config: InvokeAIAppConfig,
        event_handler_id: int,
        loop: asyncio.AbstractEventLoop,
        logger: Logger = logger,
    ) -> None:
        """Construct every service, wire them into an Invoker, and store it on the class.

        Args:
            config: The resolved application configuration.
            event_handler_id: Identifier handed to the FastAPI event service.
            loop: The running asyncio event loop. It is passed explicitly so the
                events service can schedule its dispatch task and interact with
                its queue on the correct loop (avoids inconsistent event timing).
            logger: Logger to use; defaults to the module-level InvokeAI logger.

        Raises:
            ValueError: If the configuration has no output folder set.
        """
        logger.info(f"InvokeAI version {__version__}")
        logger.info(f"Root directory = {str(config.root_path)}")

        output_folder = config.outputs_path
        if output_folder is None:
            raise ValueError("Output folder is not set")

        image_files = DiskImageFileStorage(f"{output_folder}/images")

        model_images_folder = config.models_path

        # init_db may run migrations; image_files is required by one of them.
        db = init_db(config=config, logger=logger, image_files=image_files)

        configuration = config
        # NOTE: a no-op `logger = logger` self-assignment was removed here; the
        # parameter is already bound in this scope.

        board_image_records = SqliteBoardImageRecordStorage(db=db)
        board_images = BoardImagesService()
        board_records = SqliteBoardRecordStorage(db=db)
        boards = BoardService()
        # The events service needs the loop so it can dispatch immediately
        # from its asyncio.Queue instead of polling with a sleep.
        events = FastAPIEventService(event_handler_id, loop=loop)
        bulk_download = BulkDownloadService()
        image_records = SqliteImageRecordStorage(db=db)
        images = ImageService()
        invocation_cache = MemoryInvocationCache(max_cache_size=config.node_cache_size)
        # Ephemeral on-disk serializers (wiped per-session) with an in-memory
        # forward cache for latents/conditioning tensors.
        tensors = ObjectSerializerForwardCache(
            ObjectSerializerDisk[torch.Tensor](output_folder / "tensors", ephemeral=True)
        )
        conditioning = ObjectSerializerForwardCache(
            ObjectSerializerDisk[ConditioningFieldData](output_folder / "conditioning", ephemeral=True)
        )
        # Construction order matters below: events -> download queue -> model manager.
        download_queue_service = DownloadQueueService(app_config=configuration, event_bus=events)
        model_images_service = ModelImageFileStorageDisk(model_images_folder / "model_images")
        model_manager = ModelManagerService.build_model_manager(
            app_config=configuration,
            model_record_service=ModelRecordServiceSQL(db=db, logger=logger),
            download_queue=download_queue_service,
            events=events,
        )
        names = SimpleNameService()
        performance_statistics = InvocationStatsService()
        session_processor = DefaultSessionProcessor(session_runner=DefaultSessionRunner())
        session_queue = SqliteSessionQueue(db=db)
        urls = LocalUrlService()
        workflow_records = SqliteWorkflowRecordsStorage(db=db)

        services = InvocationServices(
            board_image_records=board_image_records,
            board_images=board_images,
            board_records=board_records,
            boards=boards,
            bulk_download=bulk_download,
            configuration=configuration,
            events=events,
            image_files=image_files,
            image_records=image_records,
            images=images,
            invocation_cache=invocation_cache,
            logger=logger,
            model_images=model_images_service,
            model_manager=model_manager,
            download_queue=download_queue_service,
            names=names,
            performance_statistics=performance_statistics,
            session_processor=session_processor,
            session_queue=session_queue,
            urls=urls,
            workflow_records=workflow_records,
            tensors=tensors,
            conditioning=conditioning,
        )

        ApiDependencies.invoker = Invoker(services)
        # Prune stale rows (e.g. orphaned sessions) left from previous runs.
        db.clean()

    @staticmethod
    def shutdown() -> None:
        """Stop the invoker (and with it all services), if it was initialized."""
        if ApiDependencies.invoker:
            ApiDependencies.invoker.stop()
|