From fd82763412ae1e397b206aa2b46e38c00a455c3a Mon Sep 17 00:00:00 2001 From: Sergey Borisov Date: Thu, 18 May 2023 03:56:52 +0300 Subject: [PATCH] Model manager draft --- invokeai/backend/__init__.py | 2 +- invokeai/backend/model_management/__init__.py | 2 +- .../backend/model_management/model_cache.py | 1022 +++++++++-------- .../backend/model_management/model_manager.py | 48 +- .../frontend/web/src/services/thunks/model.ts | 7 +- 5 files changed, 561 insertions(+), 520 deletions(-) diff --git a/invokeai/backend/__init__.py b/invokeai/backend/__init__.py index e06e220ffe..448bd59f00 100644 --- a/invokeai/backend/__init__.py +++ b/invokeai/backend/__init__.py @@ -10,7 +10,7 @@ from .generator import ( Img2Img, Inpaint ) -from .model_management import ModelManager, ModelCache, ModelStatus, SDModelType, SDModelInfo +from .model_management import ModelManager, ModelCache, SDModelType, SDModelInfo from .safety_checker import SafetyChecker from .args import Args from .globals import Globals diff --git a/invokeai/backend/model_management/__init__.py b/invokeai/backend/model_management/__init__.py index 89114d7de4..2ed7e21ef8 100644 --- a/invokeai/backend/model_management/__init__.py +++ b/invokeai/backend/model_management/__init__.py @@ -2,4 +2,4 @@ Initialization file for invokeai.backend.model_management """ from .model_manager import ModelManager, SDModelInfo -from .model_cache import ModelCache, ModelStatus, SDModelType +from .model_cache import ModelCache, SDModelType diff --git a/invokeai/backend/model_management/model_cache.py b/invokeai/backend/model_management/model_cache.py index ff1be73b02..8050eb58b8 100644 --- a/invokeai/backend/model_management/model_cache.py +++ b/invokeai/backend/model_management/model_cache.py @@ -18,125 +18,447 @@ context. 
 Use like this:
 
 import contextlib
 import gc
+import os
+import sys
+import json
 import hashlib
 import warnings
-from collections import Counter
 from contextlib import suppress
 from enum import Enum
 from pathlib import Path
-from typing import Dict, Sequence, Union, Set, Tuple, types, Optional
+from typing import Dict, Sequence, Union, Tuple, types, Optional, List, Type, Any
 import torch
 import safetensors.torch
-
-from diffusers import DiffusionPipeline, StableDiffusionPipeline, AutoencoderKL, SchedulerMixin, UNet2DConditionModel, ConfigMixin
+
+from diffusers import DiffusionPipeline, SchedulerMixin, ConfigMixin
 from diffusers import logging as diffusers_logging
-from diffusers.pipelines.stable_diffusion.safety_checker import \
-    StableDiffusionSafetyChecker
 from huggingface_hub import HfApi
+from huggingface_hub import scan_cache_dir
 from picklescan.scanner import scan_file_path
 from pydantic import BaseModel
-from transformers import CLIPFeatureExtractor, CLIPTextModel, CLIPTokenizer
+import transformers
 from transformers import logging as transformers_logging
 import invokeai.backend.util.logging as logger
 from ..globals import global_cache_dir
-from ..stable_diffusion import StableDiffusionGeneratorPipeline
+
+
+def get_model_path(repo_id_or_path: str):
+    # a local path is used as-is; otherwise resolve the repo_id inside the HF hub cache
+    if os.path.exists(repo_id_or_path):
+        return repo_id_or_path
+
+    cache = scan_cache_dir(global_cache_dir("hub"))
+    for repo in cache.repos:
+        if repo.repo_id != repo_id_or_path:
+            continue
+        for rev in repo.revisions:
+            if "main" in rev.refs:
+                return rev.snapshot_path
+    raise Exception(f"{repo_id_or_path} - not found")
+
+def calc_model_size_by_fs(
+    repo_id_or_path: str,
+    subfolder: Optional[str] = None,
+    variant: Optional[str] = None
+):
+    model_path = get_model_path(repo_id_or_path)
+    if subfolder is not None:
+        model_path = os.path.join(model_path, subfolder)
+
+    all_files = os.listdir(model_path)
+    all_files = [f for f in all_files if os.path.isfile(os.path.join(model_path, f))]
+
+    fp16_files = set([f for f in all_files if ".fp16." in f or ".fp16-" in f])
+    bit8_files = set([f for f in all_files if ".8bit." in f or ".8bit-" in f])
+    other_files = set(all_files) - fp16_files - bit8_files
+
+    if variant is None:
+        files = other_files
+    elif variant == "fp16":
+        files = fp16_files
+    elif variant == "8bit":
+        files = bit8_files
+    else:
+        raise NotImplementedError(f"Unknown variant: {variant}")
+
+    # try to read the size from an index file, if one exists
+    index_postfix = ".index.json"
+    if variant is not None:
+        index_postfix = f".index.{variant}.json"
+
+    for file in files:
+        if not file.endswith(index_postfix):
+            continue
+        try:
+            with open(os.path.join(model_path, file), "r") as f:
+                index_data = json.loads(f.read())
+            return int(index_data["metadata"]["total_size"])
+        except:
+            pass
+
+    # calculate files size if there is no index file
+    formats = [
+        (".safetensors",), # safetensors
+        (".bin",),         # torch
+        (".onnx", ".pb"),  # onnx
+        (".msgpack",),     # flax
+        (".ckpt",),        # tf
+        (".h5",),          # tf2
+    ]
+
+    for file_format in formats:
+        model_files = [f for f in files if f.endswith(file_format)]
+        if len(model_files) == 0:
+            continue
+
+        model_size = 0
+        for model_file in model_files:
+            file_stats = os.stat(os.path.join(model_path, model_file))
+            model_size += file_stats.st_size
+        return model_size
+
+    #raise NotImplementedError(f"Unknown model structure! Files: {all_files}")
+    return 0 # scheduler/feature_extractor/tokenizer - models without loading to gpu
+
+
+def calc_model_size_by_data(model) -> int:
+    if isinstance(model, DiffusionPipeline):
+        return _calc_pipeline_by_data(model)
+    elif isinstance(model, torch.nn.Module):
+        return _calc_model_by_data(model)
+    else:
+        return 0
+
+
+def _calc_pipeline_by_data(pipeline) -> int:
+    res = 0
+    for submodel_key in pipeline.components.keys():
+        submodel = getattr(pipeline, submodel_key)
+        if submodel is not None and isinstance(submodel, torch.nn.Module):
+            res += _calc_model_by_data(submodel)
+    return res
+
+
+def _calc_model_by_data(model) -> int:
+    mem_params = sum([param.nelement()*param.element_size() for param in model.parameters()])
+    mem_bufs = sum([buf.nelement()*buf.element_size() for buf in model.buffers()])
+    mem = mem_params + mem_bufs # in bytes
+    return mem
+
+
+class SDModelType(str, Enum):
+    Diffusers = "diffusers"
+    Classifier = "classifier"
+    UNet = "unet"
+    TextEncoder = "text_encoder"
+    Tokenizer = "tokenizer"
+    Vae = "vae"
+    Scheduler = "scheduler"
+    FeatureExtractor = "feature_extractor"
+
+
+class ModelInfoBase:
+    #model_path: str
+    #model_type: SDModelType
+
+    def __init__(self, repo_id_or_path: str, model_type: SDModelType):
+        self.repo_id_or_path = repo_id_or_path # TODO: or always use path?
+        self.model_path = get_model_path(repo_id_or_path)
+        self.model_type = model_type
+
+    def _definition_to_type(self, subtypes: List[str]) -> Type:
+        if len(subtypes) < 2:
+            raise Exception("Invalid subfolder definition!")
+        if subtypes[0] in ["diffusers", "transformers"]:
+            res_type = sys.modules[subtypes[0]]
+            subtypes = subtypes[1:]
+        else:
+            res_type = sys.modules["diffusers"]
+            res_type = getattr(res_type, "pipelines")
+
+        for subtype in subtypes:
+            res_type = getattr(res_type, subtype)
+        return res_type
+
+
+class DiffusersModelInfo(ModelInfoBase):
+    #child_types: Dict[str, Type]
+    #child_sizes: Dict[str, int]
+
+    def __init__(self, repo_id_or_path: str, model_type: SDModelType):
+        assert model_type == SDModelType.Diffusers
+        super().__init__(repo_id_or_path, model_type)
+
+        self.child_types: Dict[str, Type] = dict()
+        self.child_sizes: Dict[str, int] = dict()
+
+        try:
+            config_data = DiffusionPipeline.load_config(repo_id_or_path)
+            #config_data = json.loads(os.path.join(self.model_path, "model_index.json"))
+        except:
+            raise Exception("Invalid diffusers model! (model_index.json not found or invalid)")
+
+        config_data.pop("_ignore_files", None)
+
+        # retrieve all folder_names that contain relevant files
+        child_components = [k for k, v in config_data.items() if isinstance(v, list)]
+
+        for child_name in child_components:
+            child_type = self._definition_to_type(config_data[child_name])
+            self.child_types[child_name] = child_type
+            self.child_sizes[child_name] = calc_model_size_by_fs(repo_id_or_path, subfolder=child_name)
+
+    def get_size(self, child_type: Optional[SDModelType] = None):
+        if child_type is None:
+            return sum(self.child_sizes.values())
+        else:
+            return self.child_sizes[child_type]
+
+    def get_model(
+        self,
+        child_type: Optional[SDModelType] = None,
+        torch_dtype: Optional[torch.dtype] = None,
+    ):
+        # return pipeline in different function to pass more arguments
+        if child_type is None:
+            raise Exception("Child model type can't be null on diffusers model")
+        if child_type not in self.child_types:
+            return None # TODO: or raise
+
+        # TODO:
+        for variant in ["fp16", "main", None]:
+            try:
+                model = self.child_types[child_type].from_pretrained(
+                    self.repo_id_or_path,
+                    subfolder=child_type.value,
+                    cache_dir=global_cache_dir('hub'),
+                    torch_dtype=torch_dtype,
+                    variant=variant,
+                )
+                break
+            except Exception as e:
+                logger.warning(f"Failed to load {child_type} (variant={variant}): {e}")
+        else:
+            # every variant failed to load
+            raise Exception(f"Failed to load {child_type} of {self.repo_id_or_path}")
+
+        # calc more accurate size
+        self.child_sizes[child_type] = calc_model_size_by_data(model)
+        return model
+
+    def get_pipeline(self, **kwargs):
+        return DiffusionPipeline.from_pretrained(
+            self.repo_id_or_path,
+            **kwargs,
+        )
+
+
+class EmptyConfigLoader(ConfigMixin):
+
+    @classmethod
+    def load_config(cls, *args, **kwargs):
+        cls.config_name = kwargs.pop("config_name")
+        return super().load_config(*args, **kwargs)
+
+
+class ClassifierModelInfo(ModelInfoBase):
+    #child_types: Dict[str, Type]
+    #child_sizes: Dict[str, int]
+
+    def __init__(self, repo_id_or_path: str, model_type: SDModelType):
+        assert model_type == SDModelType.Classifier
+        super().__init__(repo_id_or_path, model_type)
+
+        self.child_types: Dict[str, Type] = dict()
+        self.child_sizes: Dict[str, int] = dict()
+
+        try:
+            main_config = EmptyConfigLoader.load_config(repo_id_or_path, config_name="config.json")
+            #main_config = json.loads(os.path.join(self.model_path, "config.json"))
+        except:
+            raise Exception("Invalid classifier model! (config.json not found or invalid)")
+
+        self._load_tokenizer(main_config)
+        self._load_text_encoder(main_config)
+        self._load_feature_extractor(main_config)
+
+    def _load_tokenizer(self, main_config: dict):
+        try:
+            tokenizer_config = EmptyConfigLoader.load_config(self.repo_id_or_path, config_name="tokenizer_config.json")
+            #tokenizer_config = json.loads(os.path.join(self.model_path, "tokenizer_config.json"))
+        except:
+            raise Exception("Invalid classifier model! (Failed to load tokenizer_config.json)")
+
+        if "tokenizer_class" in tokenizer_config:
+            tokenizer_class_name = tokenizer_config["tokenizer_class"]
+        elif "model_type" in main_config:
+            tokenizer_class_name = transformers.models.auto.tokenization_auto.TOKENIZER_MAPPING_NAMES[main_config["model_type"]]
+        else:
+            raise Exception("Invalid classifier model! (Failed to detect tokenizer type)")
+
+        self.child_types[SDModelType.Tokenizer] = self._definition_to_type(["transformers", tokenizer_class_name])
+        self.child_sizes[SDModelType.Tokenizer] = 0
+
+    def _load_text_encoder(self, main_config: dict):
+        if "architectures" in main_config and len(main_config["architectures"]) > 0:
+            text_encoder_class_name = main_config["architectures"][0]
+        elif "model_type" in main_config:
+            text_encoder_class_name = transformers.models.auto.modeling_auto.MODEL_FOR_PRETRAINING_MAPPING_NAMES[main_config["model_type"]]
+        else:
+            raise Exception("Invalid classifier model! (Failed to detect text_encoder type)")
+
+        self.child_types[SDModelType.TextEncoder] = self._definition_to_type(["transformers", text_encoder_class_name])
+        self.child_sizes[SDModelType.TextEncoder] = calc_model_size_by_fs(self.repo_id_or_path)
+
+    def _load_feature_extractor(self, main_config: dict):
+        self.child_sizes[SDModelType.FeatureExtractor] = 0
+        try:
+            feature_extractor_config = EmptyConfigLoader.load_config(self.repo_id_or_path, config_name="preprocessor_config.json")
+        except:
+            return # feature extractor not passed with t5
+
+        try:
+            feature_extractor_class_name = feature_extractor_config["feature_extractor_type"]
+            self.child_types[SDModelType.FeatureExtractor] = self._definition_to_type(["transformers", feature_extractor_class_name])
+        except:
+            raise Exception("Invalid classifier model! (Unknown feature_extractor type)")
+
+    def get_size(self, child_type: Optional[SDModelType] = None):
+        if child_type is None:
+            return sum(self.child_sizes.values())
+        else:
+            return self.child_sizes[child_type]
+
+    def get_model(
+        self,
+        child_type: Optional[SDModelType] = None,
+        torch_dtype: Optional[torch.dtype] = None,
+    ):
+        if child_type is None:
+            raise Exception("Child model type can't be null on classifier model")
+        if child_type not in self.child_types:
+            return None # TODO: or raise
+
+        model = self.child_types[child_type].from_pretrained(
+            self.repo_id_or_path,
+            subfolder=child_type.value,
+            cache_dir=global_cache_dir('hub'),
+            torch_dtype=torch_dtype,
+        )
+        # calc more accurate size
+        self.child_sizes[child_type] = calc_model_size_by_data(model)
+        return model
+
+
+class VaeModelInfo(ModelInfoBase):
+    #vae_class: Type
+    #model_size: int
+
+    def __init__(self, repo_id_or_path: str, model_type: SDModelType):
+        assert model_type == SDModelType.Vae
+        super().__init__(repo_id_or_path, model_type)
+
+        try:
+            config = EmptyConfigLoader.load_config(repo_id_or_path, config_name="config.json")
+            #config = json.loads(os.path.join(self.model_path, "config.json"))
+        except:
+            raise Exception("Invalid vae model! (config.json not found or invalid)")
+
+        try:
+            vae_class_name = config.get("_class_name", "AutoencoderKL")
+            self.vae_class = self._definition_to_type(["diffusers", vae_class_name])
+            self.model_size = calc_model_size_by_fs(repo_id_or_path)
+        except:
+            raise Exception("Invalid vae model! (Unknown vae type)")
(Unkown vae type)") + + def get_size(self, child_type: Optional[SDModelType] = None): + if child_type is not None: + raise Exception("There is no child models in vae model") + return self.model_size + + def get_model( + self, + child_type: Optional[SDModelType] = None, + torch_dtype: Optional[torch.dtype] = None, + ): + if child_type is not None: + raise Exception("There is no child models in vae model") + + model = self.vae_type.from_pretrained( + self.repo_id_or_path, + cache_dir=global_cache_dir('hub'), + torch_dtype=torch_dtype, + ) + # calc more accurate size + self.model_size = calc_model_size_by_data(model) + return model + + +MODEL_TYPES = { + SDModelType.Diffusers: DiffusersModelInfo, + SDModelType.Classifier: ClassifierModelInfo, + SDModelType.Vae: VaeModelInfo, +} + # Maximum size of the cache, in gigs # Default is roughly enough to hold three fp16 diffusers models in RAM simultaneously DEFAULT_MAX_CACHE_SIZE = 6.0 # actual size of a gig -GIG = 1073741824 - -# This is the mapping from the stable diffusion submodel dict key to the class -class LoraType(dict): - pass -class TIType(dict): - pass - -class SDModelType(str, Enum): - Diffusers="diffusers" # whole pipeline - Vae="vae" # diffusers parts - TextEncoder="text_encoder" - Tokenizer="tokenizer" - UNet="unet" - Scheduler="scheduler" - SafetyChecker="safety_checker" - FeatureExtractor="feature_extractor" - # These are all loaded as dicts of tensors, and we - # distinguish them by class - Lora="lora" - TextualInversion="textual_inversion" +GIG = 1073741824 # TODO: class EmptyScheduler(SchedulerMixin, ConfigMixin): pass -MODEL_CLASSES = { - SDModelType.Diffusers: StableDiffusionGeneratorPipeline, - SDModelType.Vae: AutoencoderKL, - SDModelType.TextEncoder: CLIPTextModel, # TODO: t5 - SDModelType.Tokenizer: CLIPTokenizer, # TODO: t5 - SDModelType.UNet: UNet2DConditionModel, - SDModelType.Scheduler: EmptyScheduler, - SDModelType.SafetyChecker: StableDiffusionSafetyChecker, - SDModelType.FeatureExtractor: CLIPFeatureExtractor, - - SDModelType.Lora: LoraType, - SDModelType.TextualInversion: TIType, -} - -DIFFUSERS_PARTS = { - SDModelType.Vae, - SDModelType.TextEncoder, - SDModelType.Tokenizer, - SDModelType.UNet, - SDModelType.Scheduler, - SDModelType.SafetyChecker, - SDModelType.FeatureExtractor, -} - -class ModelStatus(Enum): - unknown='unknown' - not_loaded='not loaded' - in_ram='cached' - in_vram='in gpu' - active='locked in gpu' - -# This is used to guesstimate the size of a model before we load it. -# After loading, we will know it exactly. 
-# Sizes are in Gigs, estimated for float16; double for float32 -SIZE_GUESSTIMATE = { - SDModelType.Diffusers: 2.2, - SDModelType.Vae: 0.35, - SDModelType.TextEncoder: 0.5, - SDModelType.Tokenizer: 0.001, - SDModelType.UNet: 3.4, - SDModelType.Scheduler: 0.001, - SDModelType.SafetyChecker: 1.2, - SDModelType.FeatureExtractor: 0.001, - SDModelType.Lora: 0.1, - SDModelType.TextualInversion: 0.001, -} - -# The list of model classes we know how to fetch, for typechecking -ModelClass = Union[tuple([x for x in MODEL_CLASSES.values()])] -DiffusionClasses = (StableDiffusionGeneratorPipeline, AutoencoderKL, EmptyScheduler, UNet2DConditionModel, CLIPTextModel) - -class UnsafeModelException(Exception): - "Raised when a legacy model file fails the picklescan test" - pass - -class UnscannableModelException(Exception): - "Raised when picklescan is unable to scan a legacy model file" - pass - class ModelLocker(object): "Forward declaration" pass +class ModelCache(object): + "Forward declaration" + pass + +class _CacheRecord: + model: Any + size: int + _locks: int + _cache: ModelCache + + def __init__(self, cache, model: Any, size: int): + self._cache = cache + self.model = model + self.size = size + self._locks = 0 + + def lock(self): + self._locks += 1 + + def unlock(self): + self._locks -= 1 + assert self._locks >= 0 + + @property + def locked(self): + return self._locks > 0 + + @property + def loaded(self): + if self.model is not None and hasattr(self.model, "device"): + return self.model.device != self._cache.storage_device + else: + return False + + class ModelCache(object): def __init__( self, @@ -158,7 +480,11 @@ class ModelCache(object): :param sequential_offload: Conserve VRAM by loading and unloading each stage of the pipeline sequentially :param sha_chunksize: Chunksize to use when calculating sha256 model hash ''' - self.models: dict = dict() + max_cache_size = 9999 + execution_device = torch.device('cuda') + + self.models: Dict[str, _CacheRecord] = dict() + self.model_infos: Dict[str, ModelInfoBase] = dict() self.stack: Sequence = list() self.lazy_offloading = lazy_offloading self.sequential_offload: bool=sequential_offload @@ -169,145 +495,105 @@ class ModelCache(object): self.storage_device: torch.device=storage_device self.sha_chunksize=sha_chunksize self.logger = logger - self.loaded_models: set = set() # set of model keys loaded in GPU - self.locked_models: Counter = Counter() # set of model keys locked in GPU - self.model_sizes: Dict[str,int] = dict() + + def get_key( + self, + model_path: str, + model_type: SDModelType, + revision: Optional[str] = None, + submodel_type: Optional[SDModelType] = None, + ): + revision = revision or "main" + + key = f"{model_path}:{model_type}:{revision}" + if submodel_type: + key += f":{submodel_type}" + return key + + #def get_model( + # self, + # repo_id_or_path: Union[str, Path], + # model_type: SDModelType = SDModelType.Diffusers, + # subfolder: Path = None, + # submodel: SDModelType = None, + # revision: str = None, + # attach_model_part: Tuple[SDModelType, str] = (None, None), + # gpu_load: bool = True, + #) -> ModelLocker: # ?? 
what does it return + def _get_model_info( + self, + model_path: str, + model_type: SDModelType, + revision: str, + ): + model_info_key = self.get_key( + model_path=model_path, + model_type=model_type, + revision=revision, + submodel_type=None, + ) + + if model_info_key not in self.model_infos: + if model_type not in MODEL_TYPES: + raise Exception(f"Unknown/unsupported model type: {model_type}") + + self.model_infos[model_info_key] = MODEL_TYPES[model_type]( + model_path, + model_type, + ) + + return self.model_infos[model_info_key] def get_model( self, repo_id_or_path: Union[str, Path], model_type: SDModelType = SDModelType.Diffusers, - subfolder: Path = None, submodel: SDModelType = None, revision: str = None, - attach_model_parts: Optional[Set[Tuple[SDModelType, str]]] = None, gpu_load: bool = True, - ) -> ModelLocker: # ?? what does it return - ''' - Load and return a HuggingFace model wrapped in a context manager generator, with RAM caching. - Use like this: + ) -> Any: - cache = ModelCache() - with cache.get_model('stabilityai/stable-diffusion-2') as model: - do_something_with_the_model(model) - - While in context, model will be locked into GPU. If you want to do something - with the model while it is in RAM, just use the context's `model` attribute: - - context = cache.get_model('stabilityai/stable-diffusion-2') - context.model.device - # device(type='cpu') - - with context as model: - model.device - # device(type='cuda') - - You can fetch an individual part of a diffusers model by passing the submodel - argument: - - vae_context = cache.get_model( - 'stabilityai/sd-stable-diffusion-2', - submodel=SDModelType.Vae - ) - - This is equivalent to: - - vae_context = cache.get_model( - 'stabilityai/sd-stable-diffusion-2', - model_type = SDModelType.Vae, - subfolder='vae' - ) - - Vice versa, you can load and attach an external submodel to a diffusers model - before returning it by passing the attach_submodel argument. This only works with - diffusers models: - - pipeline_context = cache.get_model( - 'runwayml/stable-diffusion-v1-5', - attach_model_parts=set( - [SDModelType.Vae,'stabilityai/sd-vae-ft-mse'] - [SDModelType.UNet,'runwayml/stable-diffusion-1.5','unet'] #type, ID, subfolder - ) - ) - - The model will be locked into GPU VRAM for the duration of the context. - :param repo_id_or_path: either the HuggingFace repo_id or a Path to a local model - :param model_type: An SDModelType enum indicating the type of the (parent) model - :param subfolder: name of a subfolder in which the model can be found, e.g. "vae" - :param submodel: an SDModelType enum indicating the model part to return, e.g. SDModelType.Vae - :param attach_model_parts: load and attach a diffusers model component. 
Pass a set of tuple of format (SDModelType,repo_id_or_path,subfolder) - :param revision: model revision - :param gpu_load: load the model into GPU [default True] - ''' - key = self._model_key( # internal unique identifier for the model - repo_id_or_path, - revision, - subfolder, - model_type, + model_path = get_model_path(repo_id_or_path) + model_info = self._get_model_info( + model_path=model_path, + model_type=model_type, + revision=revision, ) - # optimization: if caller is asking to load a submodel of a diffusers pipeline, then - # check whether it is already cached in RAM and return it instead of loading from disk again - if subfolder and not submodel: - possible_parent_key = self._model_key( - repo_id_or_path, - revision, - None, - SDModelType.Diffusers - ) - if possible_parent_key in self.models: - key = possible_parent_key - submodel = model_type + key = self.get_key( + model_path=model_path, + model_type=model_type, + revision=revision, + submodel_type=submodel, + ) - # Look for the model in the cache RAM - if key in self.models: # cached - move to bottom of stack (most recently used) - with contextlib.suppress(ValueError): - self.stack.remove(key) - self.stack.append(key) - model = self.models[key] - - else: # not cached -load - self.logger.info(f'Loading model {repo_id_or_path}, type {model_type}') + if key not in self.models: + self.logger.info(f'Loading model {repo_id_or_path}, type {model_type}:{submodel}') # this will remove older cached models until # there is sufficient room to load the requested model - self._make_cache_room(key, model_type) + self._make_cache_room(model_info.get_size(submodel)) # clean memory to make MemoryUsage() more accurate gc.collect() - model = self._load_model_from_storage( - repo_id_or_path=repo_id_or_path, - model_type=model_type, - subfolder=subfolder, - revision=revision, - ) - - if mem_used := self.calc_model_size(model): + model_obj = model_info.get_model(submodel, torch_dtype=self.precision) + if mem_used := model_info.get_size(submodel): logger.debug(f'CPU RAM used for load: {(mem_used/GIG):.2f} GB') - self.model_sizes[key] = mem_used # remember size of this model for cache cleansing self.current_cache_size += mem_used # increment size of the cache - # this is a bit of legacy work needed to support the old-style "load this diffuser with custom VAE" - if model_type == SDModelType.Diffusers and attach_model_parts: - for attach_model_part in attach_model_parts: - self.attach_part(model, *attach_model_part) + self.models[key] = _CacheRecord(self, model_obj, mem_used) - self.stack.append(key) # add to LRU cache - self.models[key] = model # keep copy of model in dict - - if submodel: - model = getattr(model, submodel) + with suppress(Exception): + self.stack.remove(key) + self.stack.append(key) - return self.ModelLocker(self, key, model, gpu_load) + return self.ModelLocker(self, key, self.models[key].model, gpu_load) def uncache_model(self, key: str): '''Remove corresponding model from the cache''' - if key is not None and key in self.models: - self.models.pop(key, None) - self.locked_models.pop(key, None) - self.loaded_models.discard(key) - with contextlib.suppress(ValueError): - self.stack.remove(key) + self.models.pop(key, None) + with contextlib.suppress(ValueError): + self.stack.remove(key) class ModelLocker(object): def __init__(self, cache, key, model, gpu_load): @@ -318,109 +604,45 @@ class ModelCache(object): # is garbage collected. Needs testing! 
self.model = model - def __enter__(self)->ModelClass: - cache = self.cache - key = self.key - model = self.model - + def __enter__(self) -> Any: + if not hasattr(self.model, 'to'): + return self.model + + cache_entry = self.cache.models[self.key] + # NOTE that the model has to have the to() method in order for this # code to move it into GPU! - if self.gpu_load and hasattr(model,'to'): - cache.loaded_models.add(key) - cache.locked_models[key] += 1 + if self.gpu_load: + cache_entry.lock() - if cache.lazy_offloading: - cache._offload_unlocked_models() + if self.cache.lazy_offloading: + self.cache._offload_unlocked_models() - if model.device != cache.execution_device and \ - not (self.cache.sequential_offload \ - and isinstance(model, StableDiffusionGeneratorPipeline) - ): - - cache.logger.debug(f'Moving {key} into {cache.execution_device}') + if self.model.device != self.cache.execution_device: + self.cache.logger.debug(f'Moving {self.key} into {self.cache.execution_device}') with VRAMUsage() as mem: - self._to(model,cache.execution_device) - - self.cache.logger.debug(f'Locked {key} in {cache.execution_device}') - cache.logger.debug(f'GPU VRAM used for load: {(mem.vram_used/GIG):.2f} GB') - cache.model_sizes[key] = mem.vram_used # more accurate size + self.model.to(self.cache.execution_device) # move into GPU + self.cache.logger.debug(f'GPU VRAM used for load: {(mem.vram_used/GIG):.2f} GB') - cache._print_cuda_stats() - - else: - # in the event that the caller wants the model in RAM, we - # move it into CPU if it is in GPU and not locked - if hasattr(model, 'to') and (key in cache.loaded_models - and cache.locked_models[key] == 0): - self._to(model,cache.storage_device) - # model.to(cache.storage_device) - cache.loaded_models.remove(key) - return model + self.cache.logger.debug(f'Locking {self.key} in {self.cache.execution_device}') + self.cache._print_cuda_stats() + + # TODO: not fully understand + # in the event that the caller wants the model in RAM, we + # move it into CPU if it is in GPU and not locked + elif cache_entry.loaded and not cache_entry.locked: + self.model.to(self.cache.storage_device) + + return self.model def __exit__(self, type, value, traceback): if not hasattr(self.model, 'to'): return - key = self.key - cache = self.cache - cache.locked_models[key] -= 1 - if not cache.lazy_offloading: - cache._offload_unlocked_models() - cache._print_cuda_stats() - - def _to(self, model, device): - model.to(device) - if isinstance(model,MODEL_CLASSES[SDModelType.Diffusers]): - for part in DIFFUSERS_PARTS: - with suppress(Exception): - getattr(model,part).to(device) - - def attach_part( - self, - diffusers_model: StableDiffusionPipeline, - part_type: SDModelType, - part_id: str, - subfolder: Optional[str] = None - ): - ''' - Attach a diffusers model part to a diffusers model. This can be - used to replace the VAE, tokenizer, textencoder, unet, etc. - :param diffuser_model: The diffusers model to attach the part to. 
- :param part_type: An SD ModelType indicating the part - :param part_id: A HF repo_id for the part - ''' - part = self._load_diffusers_from_storage( - part_id, - model_type=part_type, - subfolder=subfolder, - ) - if hasattr(part,'to'): - part.to(diffusers_model.device) - setattr(diffusers_model, part_type, part) - self.logger.debug(f'Attached {part_type} {part_id}') - - def status( - self, - repo_id_or_path: Union[str, Path], - model_type: SDModelType = SDModelType.Diffusers, - revision: str = None, - subfolder: Path = None, - ) -> ModelStatus: - key = self._model_key( - repo_id_or_path, - revision, - subfolder, - model_type, - ) - if key not in self.models: - return ModelStatus.not_loaded - if key in self.loaded_models: - if self.locked_models[key] > 0: - return ModelStatus.active - else: - return ModelStatus.in_vram - else: - return ModelStatus.in_ram + self.cache.models[self.key].unlock() + if not self.cache.lazy_offloading: + self.cache._offload_unlocked_models() + self.cache._print_cuda_stats() def model_hash( self, @@ -443,178 +665,53 @@ class ModelCache(object): "Return the current size of the cache, in GB" return self.current_cache_size / GIG - @classmethod - def scan_model(cls, model_name, checkpoint): - """ - Apply picklescanner to the indicated checkpoint and issue a warning - and option to exit if an infected file is identified. - """ - # scan model - logger.debug(f"Scanning Model: {model_name}") - scan_result = scan_file_path(checkpoint) - if scan_result.infected_files != 0: - if scan_result.infected_files == 1: - raise UnsafeModelException("The legacy model you are trying to load may contain malware. Aborting.") - else: - raise UnscannableModelException("InvokeAI was unable to scan the legacy model you requested. Aborting") - else: - logger.debug("Model scanned ok") - - @staticmethod - def _model_key(path, revision, subfolder, model_class) -> str: - return ':'.join([ - str(path), - str(revision or ''), - str(subfolder or ''), - model_class, - ]) - def _has_cuda(self) -> bool: return self.execution_device.type == 'cuda' def _print_cuda_stats(self): vram = "%4.2fG" % (torch.cuda.memory_allocated() / GIG) ram = "%4.2fG" % (self.current_cache_size / GIG) - cached_models = len(self.models) - loaded_models = len(self.loaded_models) - locked_models = len([x for x in self.locked_models if self.locked_models[x]>0]) - logger.debug(f"Current VRAM/RAM usage: {vram}/{ram}; cached_models/loaded_models/locked_models = {cached_models}/{loaded_models}/{locked_models}") - def _make_cache_room(self, key, model_type): + loaded_models = 0 + locked_models = 0 + for cache_entry in self.models.values(): + if cache_entry.loaded: + loaded_models += 1 + if cache_entry.locked: + locked_models += 1 + + logger.debug(f"Current VRAM/RAM usage: {vram}/{ram}; locked_models/loaded_models = {locked_models}/{loaded_models}") + + def _make_cache_room(self, model_size): # calculate how much memory this model will require - multiplier = 2 if self.precision==torch.float32 else 1 - bytes_needed = int(self.model_sizes.get(key,0) or SIZE_GUESSTIMATE.get(model_type,0.5)*GIG*multiplier) + #multiplier = 2 if self.precision==torch.float32 else 1 + bytes_needed = model_size maximum_size = self.max_cache_size * GIG # stored in GB, convert to bytes current_size = self.current_cache_size - adjective = 'guesstimated' if key not in self.model_sizes else 'known from previous load' - logger.debug(f'{(bytes_needed/GIG):.2f} GB needed to load this model ({adjective})') - while current_size+bytes_needed > maximum_size: - if 
least_recently_used_key := self.stack.pop(0): - model_size = self.model_sizes.get(least_recently_used_key,0) - logger.debug(f'Max cache size exceeded: cache_size={(current_size/GIG):.2f} GB, need an additional {(bytes_needed/GIG):.2f} GB') - logger.debug(f'Unloading model {least_recently_used_key} to free {(model_size/GIG):.2f} GB') - self.uncache_model(least_recently_used_key) - current_size -= model_size + if current_size + bytes_needed > maximum_size: + logger.debug(f'Max cache size exceeded: {(current_size/GIG):.2f}/{self.max_cache_size:.2f} GB, need an additional {(bytes_needed/GIG):.2f} GB') + + pos = 0 + while current_size + bytes_needed > maximum_size and current_size > 0 and len(self.stack) > 0 and pos < len(self.stack): + model_key = self.stack[pos] + cache_entry = self.models[model_key] + if not cache_entry.locked: + logger.debug(f'Unloading model {model_key} to free {(model_size/GIG):.2f} GB (-{(cache_entry.size/GIG):.2f} GB)') + self.uncache_model(model_key) # del self.stack[pos] + current_size -= cache_entry.size + else: + pos += 1 + self.current_cache_size = current_size gc.collect() def _offload_unlocked_models(self): - to_offload = set() - for key in self.loaded_models: - if key not in self.locked_models or self.locked_models[key] == 0: + for key in self.models.keys(): + cache_entry = self.models[key] + if not cache_entry.locked and cache_entry.loaded: self.logger.debug(f'Offloading {key} from {self.execution_device} into {self.storage_device}') - to_offload.add(key) - for key in to_offload: - self.models[key].to(self.storage_device) - self.loaded_models.remove(key) - - def _load_model_from_storage( - self, - repo_id_or_path: Union[str, Path], - subfolder: Optional[Path] = None, - revision: Optional[str] = None, - model_type: SDModelType = SDModelType.Diffusers, - ) -> ModelClass: - ''' - Load and return a HuggingFace model. - :param repo_id_or_path: either the HuggingFace repo_id or a Path to a local model - :param subfolder: name of a subfolder in which the model can be found, e.g. "vae" - :param revision: model revision - :param model_type: type of model to return, defaults to SDModelType.Diffusers - ''' - # silence transformer and diffuser warnings - with SilenceWarnings(): - if model_type==SDModelType.Lora: - model = self._load_lora_from_storage(repo_id_or_path) - elif model_type==SDModelType.TextualInversion: - model = self._load_ti_from_storage(repo_id_or_path) - else: - model = self._load_diffusers_from_storage( - repo_id_or_path, - subfolder, - revision, - model_type, - ) - if self.sequential_offload and isinstance(model, StableDiffusionGeneratorPipeline): - model.enable_offload_submodels(self.execution_device) - return model - - def _load_diffusers_from_storage( - self, - repo_id_or_path: Union[str, Path], - subfolder: Optional[Path] = None, - revision: Optional[str] = None, - model_type: ModelClass = StableDiffusionGeneratorPipeline, - ) -> ModelClass: - ''' - Load and return a HuggingFace model using from_pretrained(). - :param repo_id_or_path: either the HuggingFace repo_id or a Path to a local model - :param subfolder: name of a subfolder in which the model can be found, e.g. 
"vae" - :param revision: model revision - :param model_class: class of model to return, defaults to StableDiffusionGeneratorPIpeline - ''' - - model_class = MODEL_CLASSES[model_type] - - if revision is not None: - revisions = [revision] - elif self.precision == torch.float16: - revisions = ['fp16', 'main'] - else: - revisions = ['main'] - - extra_args = dict() - if model_class in DiffusionClasses: - extra_args.update( - torch_dtype=self.precision, - ) - if model_class == StableDiffusionGeneratorPipeline: - extra_args.update( - safety_checker=None, - ) - - for rev in revisions: - try: - model = model_class.from_pretrained( - repo_id_or_path, - revision=rev, - subfolder=subfolder or '.', - cache_dir=global_cache_dir('hub'), - **extra_args, - ) - self.logger.debug(f'Found revision {rev}') - break - except OSError: - pass - return model - - def _load_lora_from_storage(self, lora_path: Path) -> LoraType: - assert False, "_load_lora_from_storage() is not yet implemented" - - def _load_ti_from_storage(self, lora_path: Path) -> TIType: - assert False, "_load_ti_from_storage() is not yet implemented" - - def _legacy_model_hash(self, checkpoint_path: Union[str, Path]) -> str: - sha = hashlib.sha256() - path = Path(checkpoint_path) - assert path.is_file(),f"File {checkpoint_path} not found" - - hashpath = path.parent / f"{path.name}.sha256" - if hashpath.exists() and path.stat().st_mtime <= hashpath.stat().st_mtime: - with open(hashpath) as f: - hash = f.read() - return hash - - logger.debug(f'computing hash of model {path.name}') - with open(path, "rb") as f: - while chunk := f.read(self.sha_chunksize): - sha.update(chunk) - hash = sha.hexdigest() - - with open(hashpath, "w") as f: - f.write(hash) - return hash + cache_entry.model.to(self.storage_device) def _local_model_hash(self, model_path: Union[str, Path]) -> str: sha = hashlib.sha256() @@ -649,31 +746,6 @@ class ModelCache(object): raise KeyError(f"Revision '{revision}' not found in {repo_id}") return desired_revisions[0].target_commit - @staticmethod - def calc_model_size(model) -> int: - if isinstance(model,DiffusionPipeline): - return ModelCache._calc_pipeline(model) - elif isinstance(model,torch.nn.Module): - return ModelCache._calc_model(model) - else: - return None - - @staticmethod - def _calc_pipeline(pipeline) -> int: - res = 0 - for submodel_key in pipeline.components.keys(): - submodel = getattr(pipeline, submodel_key) - if submodel is not None and isinstance(submodel, torch.nn.Module): - res += ModelCache._calc_model(submodel) - return res - - @staticmethod - def _calc_model(model) -> int: - mem_params = sum([param.nelement()*param.element_size() for param in model.parameters()]) - mem_bufs = sum([buf.nelement()*buf.element_size() for buf in model.buffers()]) - mem = mem_params + mem_bufs # in bytes - return mem - class SilenceWarnings(object): def __init__(self): self.transformers_verbosity = transformers_logging.get_verbosity() diff --git a/invokeai/backend/model_management/model_manager.py b/invokeai/backend/model_management/model_manager.py index 98f6eb39bd..6fe1c38168 100644 --- a/invokeai/backend/model_management/model_manager.py +++ b/invokeai/backend/model_management/model_manager.py @@ -15,8 +15,6 @@ return a SDModelInfo object that contains the following attributes: * revision -- revision of the model if coming from a repo id, e.g. 
'fp16' * precision -- torch precision of the model - * status -- a ModelStatus enum corresponding to one of - 'not_loaded', 'in_ram', 'in_vram' or 'active' Typical usage: @@ -157,8 +155,8 @@ from invokeai.backend.globals import (Globals, global_cache_dir, from invokeai.backend.util import download_with_resume from ..util import CUDA_DEVICE -from .model_cache import (ModelCache, ModelLocker, ModelStatus, SDModelType, - SilenceWarnings, DIFFUSERS_PARTS) +from .model_cache import (ModelCache, ModelLocker, SDModelType, + SilenceWarnings) # We are only starting to number the config file with release 3. # The config file version doesn't have to start at release version, but it will help @@ -174,7 +172,6 @@ class SDModelInfo(): hash: str location: Union[Path,str] precision: torch.dtype - subfolder: Path = None revision: str = None _cache: ModelCache = None @@ -183,17 +180,6 @@ class SDModelInfo(): def __exit__(self,*args, **kwargs): self.context.__exit__(*args, **kwargs) - - @property - def status(self)->ModelStatus: - '''Return load status of this model as a model_cache.ModelStatus enum''' - if not self._cache: - return ModelStatus.unknown - return self._cache.status( - self.location, - revision = self.revision, - subfolder = self.subfolder - ) class InvalidModelError(Exception): "Raised when an invalid model is requested" @@ -355,7 +341,6 @@ class ModelManager(object): or global_resolve_path(mconfig.get('weights') ) - subfolder = mconfig.get('subfolder') revision = mconfig.get('revision') hash = self.cache.model_hash(location, revision) @@ -367,31 +352,18 @@ class ModelManager(object): model_type = submodel submodel = None - # We don't need to load whole model if the user is asking for just a piece of it - elif model_type == SDModelType.Diffusers and submodel and not subfolder: - model_type = submodel - subfolder = submodel.value - submodel = None - # to support the traditional way of attaching a VAE # to a model, we hacked in `attach_model_part` - # TODO: generalize this - external_parts = set() - if model_type == SDModelType.Diffusers: - for part in DIFFUSERS_PARTS: - with suppress(Exception): - if part_config := mconfig.get(part): - id = part_config.get('path') or part_config.get('repo_id') - subfolder = part_config.get('subfolder') - external_parts.add((part, id, subfolder)) + # TODO: + if model_type == SDModelType.Vae and "vae" in mconfig: + print("NOT_IMPLEMENTED - RETURN CUSTOM VAE") + model_context = self.cache.get_model( location, model_type = model_type, revision = revision, - subfolder = subfolder, submodel = submodel, - attach_model_parts = external_parts, ) # in case we need to communicate information about this @@ -407,7 +379,6 @@ class ModelManager(object): location = location, revision = revision, precision = self.cache.precision, - subfolder = subfolder, _cache = self.cache ) @@ -513,18 +484,13 @@ class ModelManager(object): model_format = stanza.get('format') # Common Attribs - status = self.cache.status( - stanza.get('weights') or stanza.get('repo_id'), - revision=stanza.get('revision'), - subfolder=stanza.get('subfolder'), - ) description = stanza.get("description", None) models[stanza_type][model_name].update( model_name=model_name, model_type=stanza_type, format=model_format, description=description, - status=status.value, + status="unknown", # TODO: no more status as model loaded separately ) # Checkpoint Config Parse diff --git a/invokeai/frontend/web/src/services/thunks/model.ts b/invokeai/frontend/web/src/services/thunks/model.ts index 84f7a24e81..97008d8092 100644 --- 
a/invokeai/frontend/web/src/services/thunks/model.ts +++ b/invokeai/frontend/web/src/services/thunks/model.ts @@ -14,7 +14,7 @@ export const receivedModels = createAppAsyncThunk( const response = await ModelsService.listModels(); const deserializedModels = reduce( - response.models, + response.models['diffusers'], (modelsAccumulator, model, modelName) => { modelsAccumulator[modelName] = { ...model, name: modelName }; @@ -23,7 +23,10 @@ export const receivedModels = createAppAsyncThunk( {} as Record ); - models.info({ response }, `Received ${size(response.models)} models`); + models.info( + { response }, + `Received ${size(response.models['diffusers'])} models` + ); return deserializedModels; }
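Example of the reworked cache API (a minimal, hypothetical usage sketch, not part of the diff): the new ModelCache.get_model() takes a repo id or local path plus an SDModelType, optionally a submodel, and returns a ModelLocker whose with-block locks the model onto the execution device. The repo id, default constructor arguments, and do_something_with() below are illustrative assumptions only.

    from invokeai.backend.model_management import ModelCache, SDModelType

    cache = ModelCache()  # constructor arguments elided; defaults assumed

    # ask for just the UNet of a diffusers pipeline; this loads it into RAM
    unet_locker = cache.get_model(
        'runwayml/stable-diffusion-v1-5',   # assumed repo id, for illustration only
        model_type=SDModelType.Diffusers,
        submodel=SDModelType.UNet,
    )

    with unet_locker as unet:
        # inside the context the submodel is locked on the execution device;
        # on exit it becomes eligible for offload back to the storage device
        do_something_with(unet)             # hypothetical caller code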