Compare commits

...

57 Commits

Author SHA1 Message Date
Ryan Dick  f01e41ceaf  First pass at dynamically calculating the working memory requirements for the VAE decoding operation. Still need to tune SD3 and FLUX.  2024-12-19 15:26:16 -05:00
Ryan Dick  609ed06265  Add AutoencoderKL to the list of models that opt-out of partial loading.  2024-12-19 15:25:23 -05:00
Ryan Dick  f9e899a6ba  Make pinned pytorch version slightly more specific. We need at least 2.4 for access to torch.nn.functional.rms_norm(...).  2024-12-19 14:03:01 -05:00
Ryan Dick  9262c0ec53  Do not raise if a cache entry is deleted twice and ensure that OOM errors propagate up the stack.  2024-12-19 18:32:01 +00:00
Ryan Dick  7fddb06dc4  Add a list of models that opt-out of partial loading.  2024-12-19 16:00:56 +00:00
Ryan Dick  239297caf6  Tidy the API for overriding the working_mem_bytes for a particular operation.  2024-12-19 05:05:04 +00:00
Ryan Dick  20f0b2f4fa  Update app config docstring.  2024-12-19 04:33:26 +00:00
Ryan Dick  cfb8815355  Remove unused and outdated get_cache_size and set_cache_size endpoints.  2024-12-19 04:06:08 +00:00
Ryan Dick  c866b5a799  Allow legacy ram/vram configs to override default behavior if set.  2024-12-19 04:06:08 +00:00
Ryan Dick  3b76812d43  Only support partial model loading on CUDA.  2024-12-18 19:13:15 -05:00
Ryan Dick  a8f3471fc7  Drop models from the cache if we fail loading/unloading them.  2024-12-18 23:53:25 +00:00
Ryan Dick  6d8dee05a9  Use the cpu state dict strategy for managing CachedModelOnlyFullLoad memory.  2024-12-18 22:52:57 +00:00
Ryan Dick  e684e49299  Do not apply the autocast context when models are fully loaded onto the GPU - it adds some overhead.  2024-12-18 21:51:39 +00:00
Ryan Dick  4ce2042d65  Add remove_autocast_from_module_forward(...) utility.  2024-12-18 20:28:32 +00:00
Ryan Dick  05a50b557a  Update logic to enforce max size of RAM cache to avoid overfilling.  2024-12-18 20:21:38 +00:00
Ryan Dick  85e1e9587e  Add info logs each time a model is loaded.  2024-12-18 19:52:54 +00:00
Ryan Dick  8e763e87bb  Allow invocations to request more working VRAM when loading a model via the ModelCache.  2024-12-18 19:52:34 +00:00
Ryan Dick  4a4360a40c  Add enable_partial_loading config.  2024-12-18 17:17:08 +00:00
Ryan Dick  612d6b00e3  In FluxTextEncoderInvocation, make sure model is locked before loading next model.  2024-12-18 17:12:12 +00:00
Ryan Dick  7a5dd084ad  Update MPS cache limit logic.  2024-12-17 23:44:17 -05:00
Ryan Dick  79a4d0890f  WIP - add device_working_mem_gb config  2024-12-18 03:31:37 +00:00
Ryan Dick  e0c899104b  Consolidate the LayerPatching patching modes into a single implementation.  2024-12-17 18:33:36 +00:00
Ryan Dick  c37bb6375c  Rename model_patcher.py -> layer_patcher.py.  2024-12-17 17:19:12 +00:00
Ryan Dick  4716170988  Use torch.device('cpu') instead of 'cpu' when calling .to(), because some custom models don't support the latter.  2024-12-17 17:14:42 +00:00
Ryan Dick  463196d781  Update apply_smart_model_patches() so that layer restore matches the behavior of non-smart mode.  2024-12-17 17:13:45 +00:00
Ryan Dick  e1e756800d  Enable LoRAPatcher.apply_smart_lora_patches(...) throughout the stack.  2024-12-17 15:50:51 +00:00
Ryan Dick  ab337594b8  (minor) Rename num_layers -> num_loras in unit tests.  2024-12-17 15:39:01 +00:00
Ryan Dick  699e4e5995  Add test_apply_smart_lora_patches_to_partially_loaded_model(...).  2024-12-17 15:32:51 +00:00
Ryan Dick  33f17520ca  Add LoRAPatcher.smart_apply_lora_patches()  2024-12-17 15:29:04 +00:00
Ryan Dick  46d061212c  Update CachedModelWithPartialLoad to operate on state_dicts rather than moving torch.nn.Modules around.  2024-12-17 15:18:55 +00:00
Ryan Dick  829dddefc8  Bump bitsandbytes. The new version contains improvements to state_dict loading/saving for LLM.int8 and promises improved speed on some HW.  2024-12-17 15:18:55 +00:00
Ryan Dick  b6c159cfdb  Fix bug with partial offload of model buffers.  2024-12-17 15:18:55 +00:00
Ryan Dick  5a31c467a3  Fix bug in ModelCache that was causing it to offload more models from VRAM than necessary.  2024-12-17 15:18:55 +00:00
Ryan Dick  13dbde2429  Fix handling of torch.nn.Module buffers in CachedModelWithPartialLoad.  2024-12-17 15:18:55 +00:00
Ryan Dick  a8ee72d7fb  Maintain a read-only CPU state dict copy in CachedModelWithPartialLoad.  2024-12-17 15:18:55 +00:00
Ryan Dick  7a002e1b05  Memoize frequently accessed values in CachedModelWithPartialLoad.  2024-12-17 15:18:55 +00:00
Ryan Dick  b50dd8502f  More ModelCache logging improvements.  2024-12-17 15:18:55 +00:00
Ryan Dick  f4c13b057d  Cleanup of ModelCache and added a bunch of debug logging.  2024-12-17 15:18:55 +00:00
Ryan Dick  cb884ee567  Fix a couple of bugs to get basic vanilla partial model load working with the model cache.  2024-12-17 15:18:55 +00:00
Ryan Dick  050d4465e6  WIP - first pass at overhauling ModelCache to work with partial loads.  2024-12-17 15:18:55 +00:00
Ryan Dick  e48bb844b9  Delete experimental torch device autocasting solutions and clean up TorchFunctionAutocastDeviceContext.  2024-12-17 15:18:55 +00:00
Ryan Dick  57eb05983b  Create CachedModelOnlyFullLoad class.  2024-12-17 15:18:55 +00:00
Ryan Dick  dc3be08653  Move CachedModelWithPartialLoad into the main model_cache/ directory.  2024-12-17 15:18:55 +00:00
Ryan Dick  ae1041286f  Get rid of ModelLocker. It was an unnecessary layer of indirection.  2024-12-17 15:18:55 +00:00
Ryan Dick  6e270cc5bf  Move lock(...) and unlock(...) logic from ModelLocker to the ModelCache and make a bunch of ModelCache properties/methods private.  2024-12-17 15:18:55 +00:00
Ryan Dick  6dc447aba8  Pull get_model_cache_key(...) out of ModelCache. The ModelCache should not be concerned with implementation details like the submodel_type.  2024-12-17 15:18:55 +00:00
Ryan Dick  a4c0fcb6c8  Rename model_cache_default.py -> model_cache.py.  2024-12-17 15:18:55 +00:00
Ryan Dick  1f3580716c  Remove ModelCacheBase.  2024-12-17 15:18:55 +00:00
Ryan Dick  405e53f80a  Move CacheStats to its own file.  2024-12-17 15:18:55 +00:00
Ryan Dick  be120ff587  Move CacheRecord out to its own file.  2024-12-17 15:18:55 +00:00
Ryan Dick  f8a3002d34  Rip out ModelLockerBase.  2024-12-17 15:18:55 +00:00
Ryan Dick  c785282c94  Tidy up CachedModel and improve unit test coverage.  2024-12-17 15:18:54 +00:00
Ryan Dick  f4fd3e0cc9  Alternative implementation with torch.nn.Linear module streaming.  2024-12-17 15:18:54 +00:00
Ryan Dick  ae04fa5e60  Add TorchFunctionAutocastContext  2024-12-17 15:18:54 +00:00
Ryan Dick  838e1e1438  Remove debug logs.  2024-12-17 15:18:54 +00:00
Ryan Dick  e3e8e95da6  Add basic CachedModel class with features for partial load/unload.  2024-12-17 15:18:54 +00:00
Ryan Dick  030832f30b  Naive TorchAutocastContext.  2024-12-17 15:18:54 +00:00
46 changed files with 1827 additions and 1258 deletions

View File

@@ -1364,7 +1364,6 @@ the in-memory loaded model:
|----------------|-----------------|------------------|
| `config` | AnyModelConfig | A copy of the model's configuration record for retrieving base type, etc. |
| `model` | AnyModel | The instantiated model (details below) |
| `locker` | ModelLockerBase | A context manager that mediates the movement of the model into VRAM |
### get_model_by_key(key, [submodel]) -> LoadedModel

View File

@@ -4,7 +4,6 @@
import contextlib
import io
import pathlib
import shutil
import traceback
from copy import deepcopy
from enum import Enum
@@ -21,7 +20,6 @@ from starlette.exceptions import HTTPException
from typing_extensions import Annotated
from invokeai.app.api.dependencies import ApiDependencies
from invokeai.app.services.config import get_config
from invokeai.app.services.model_images.model_images_common import ModelImageFileNotFoundException
from invokeai.app.services.model_install.model_install_common import ModelInstallJob
from invokeai.app.services.model_records import (
@@ -37,7 +35,7 @@ from invokeai.backend.model_manager.config import (
ModelFormat,
ModelType,
)
from invokeai.backend.model_manager.load.model_cache.model_cache_base import CacheStats
from invokeai.backend.model_manager.load.model_cache.cache_stats import CacheStats
from invokeai.backend.model_manager.metadata.fetch.huggingface import HuggingFaceMetadataFetch
from invokeai.backend.model_manager.metadata.metadata_base import ModelMetadataWithFiles, UnknownMetadataException
from invokeai.backend.model_manager.search import ModelSearch
@@ -848,74 +846,6 @@ async def get_starter_models() -> StarterModelResponse:
return StarterModelResponse(starter_models=starter_models, starter_bundles=starter_bundles)
@model_manager_router.get(
"/model_cache",
operation_id="get_cache_size",
response_model=float,
summary="Get maximum size of model manager RAM or VRAM cache.",
)
async def get_cache_size(cache_type: CacheType = Query(description="The cache type", default=CacheType.RAM)) -> float:
"""Return the current RAM or VRAM cache size setting (in GB)."""
cache = ApiDependencies.invoker.services.model_manager.load.ram_cache
value = 0.0
if cache_type == CacheType.RAM:
value = cache.max_cache_size
elif cache_type == CacheType.VRAM:
value = cache.max_vram_cache_size
return value
@model_manager_router.put(
"/model_cache",
operation_id="set_cache_size",
response_model=float,
summary="Set maximum size of model manager RAM or VRAM cache, optionally writing new value out to invokeai.yaml config file.",
)
async def set_cache_size(
value: float = Query(description="The new value for the maximum cache size"),
cache_type: CacheType = Query(description="The cache type", default=CacheType.RAM),
persist: bool = Query(description="Write new value out to invokeai.yaml", default=False),
) -> float:
"""Set the current RAM or VRAM cache size setting (in GB). ."""
cache = ApiDependencies.invoker.services.model_manager.load.ram_cache
app_config = get_config()
# Record initial state.
vram_old = app_config.vram
ram_old = app_config.ram
# Prepare target state.
vram_new = vram_old
ram_new = ram_old
if cache_type == CacheType.RAM:
ram_new = value
elif cache_type == CacheType.VRAM:
vram_new = value
else:
raise ValueError(f"Unexpected {cache_type=}.")
config_path = app_config.config_file_path
new_config_path = config_path.with_suffix(".yaml.new")
try:
# Try to apply the target state.
cache.max_vram_cache_size = vram_new
cache.max_cache_size = ram_new
app_config.ram = ram_new
app_config.vram = vram_new
if persist:
app_config.write_file(new_config_path)
shutil.move(new_config_path, config_path)
except Exception as e:
# If there was a failure, restore the initial state.
cache.max_cache_size = ram_old
cache.max_vram_cache_size = vram_old
app_config.ram = ram_old
app_config.vram = vram_old
raise RuntimeError("Failed to update cache size") from e
return value
@model_manager_router.get(
"/stats",
operation_id="get_stats",

View File

@@ -20,8 +20,8 @@ from invokeai.app.invocations.primitives import ConditioningOutput
from invokeai.app.services.shared.invocation_context import InvocationContext
from invokeai.app.util.ti_utils import generate_ti_list
from invokeai.backend.model_patcher import ModelPatcher
from invokeai.backend.patches.layer_patcher import LayerPatcher
from invokeai.backend.patches.model_patch_raw import ModelPatchRaw
from invokeai.backend.patches.model_patcher import LayerPatcher
from invokeai.backend.stable_diffusion.diffusion.conditioning_data import (
BasicConditioningInfo,
ConditioningFieldData,
@@ -82,10 +82,11 @@ class CompelInvocation(BaseInvocation):
# apply all patches while the model is on the target device
text_encoder_info.model_on_device() as (cached_weights, text_encoder),
tokenizer_info as tokenizer,
LayerPatcher.apply_model_patches(
LayerPatcher.apply_smart_model_patches(
model=text_encoder,
patches=_lora_loader(),
prefix="lora_te_",
dtype=TorchDevice.choose_torch_dtype(),
cached_weights=cached_weights,
),
# Apply CLIP Skip after LoRA to prevent LoRA application from failing on skipped layers.
@@ -179,10 +180,11 @@ class SDXLPromptInvocationBase:
# apply all patches while the model is on the target device
text_encoder_info.model_on_device() as (cached_weights, text_encoder),
tokenizer_info as tokenizer,
LayerPatcher.apply_model_patches(
text_encoder,
LayerPatcher.apply_smart_model_patches(
model=text_encoder,
patches=_lora_loader(),
prefix=lora_prefix,
dtype=TorchDevice.choose_torch_dtype(),
cached_weights=cached_weights,
),
# Apply CLIP Skip after LoRA to prevent LoRA application from failing on skipped layers.

View File

@@ -39,8 +39,8 @@ from invokeai.app.util.controlnet_utils import prepare_control_image
from invokeai.backend.ip_adapter.ip_adapter import IPAdapter
from invokeai.backend.model_manager import BaseModelType, ModelVariantType
from invokeai.backend.model_patcher import ModelPatcher
from invokeai.backend.patches.layer_patcher import LayerPatcher
from invokeai.backend.patches.model_patch_raw import ModelPatchRaw
from invokeai.backend.patches.model_patcher import LayerPatcher
from invokeai.backend.stable_diffusion import PipelineIntermediateState
from invokeai.backend.stable_diffusion.denoise_context import DenoiseContext, DenoiseInputs
from invokeai.backend.stable_diffusion.diffusers_pipeline import (
@@ -1003,10 +1003,11 @@ class DenoiseLatentsInvocation(BaseInvocation):
ModelPatcher.apply_freeu(unet, self.unet.freeu_config),
SeamlessExt.static_patch_model(unet, self.unet.seamless_axes), # FIXME
# Apply the LoRA after unet has been moved to its target device for faster patching.
LayerPatcher.apply_model_patches(
LayerPatcher.apply_smart_model_patches(
model=unet,
patches=_lora_loader(),
prefix="lora_unet_",
dtype=unet.dtype,
cached_weights=cached_weights,
),
):

View File

@@ -48,9 +48,9 @@ from invokeai.backend.flux.sampling_utils import (
)
from invokeai.backend.flux.text_conditioning import FluxTextConditioning
from invokeai.backend.model_manager.config import ModelFormat
from invokeai.backend.patches.layer_patcher import LayerPatcher
from invokeai.backend.patches.lora_conversions.flux_lora_constants import FLUX_LORA_TRANSFORMER_PREFIX
from invokeai.backend.patches.model_patch_raw import ModelPatchRaw
from invokeai.backend.patches.model_patcher import LayerPatcher
from invokeai.backend.stable_diffusion.diffusers_pipeline import PipelineIntermediateState
from invokeai.backend.stable_diffusion.diffusion.conditioning_data import FLUXConditioningInfo
from invokeai.backend.util.devices import TorchDevice
@@ -301,36 +301,33 @@ class FluxDenoiseInvocation(BaseInvocation, WithMetadata, WithBoard):
config = transformer_info.config
assert config is not None
# Apply LoRA models to the transformer.
# Note: We apply the LoRA after the transformer has been moved to its target device for faster patching.
# Determine if the model is quantized.
# If the model is quantized, then we need to apply the LoRA weights as sidecar layers. This results in
# slower inference than direct patching, but is agnostic to the quantization format.
if config.format in [ModelFormat.Checkpoint]:
# The model is non-quantized, so we can apply the LoRA weights directly into the model.
exit_stack.enter_context(
LayerPatcher.apply_model_patches(
model=transformer,
patches=self._lora_iterator(context),
prefix=FLUX_LORA_TRANSFORMER_PREFIX,
cached_weights=cached_weights,
)
)
model_is_quantized = False
elif config.format in [
ModelFormat.BnbQuantizedLlmInt8b,
ModelFormat.BnbQuantizednf4b,
ModelFormat.GGUFQuantized,
]:
# The model is quantized, so apply the LoRA weights as sidecar layers. This results in slower inference,
# than directly patching the weights, but is agnostic to the quantization format.
exit_stack.enter_context(
LayerPatcher.apply_model_sidecar_patches(
model=transformer,
patches=self._lora_iterator(context),
prefix=FLUX_LORA_TRANSFORMER_PREFIX,
dtype=inference_dtype,
)
)
model_is_quantized = True
else:
raise ValueError(f"Unsupported model format: {config.format}")
# Apply LoRA models to the transformer.
# Note: We apply the LoRA after the transformer has been moved to its target device for faster patching.
exit_stack.enter_context(
LayerPatcher.apply_smart_model_patches(
model=transformer,
patches=self._lora_iterator(context),
prefix=FLUX_LORA_TRANSFORMER_PREFIX,
dtype=inference_dtype,
cached_weights=cached_weights,
force_sidecar_patching=model_is_quantized,
)
)
# Prepare IP-Adapter extensions.
pos_ip_adapter_extensions, neg_ip_adapter_extensions = self._prep_ip_adapter_extensions(
pos_image_prompt_clip_embeds=pos_image_prompt_clip_embeds,

View File

@@ -18,10 +18,11 @@ from invokeai.app.invocations.primitives import FluxConditioningOutput
from invokeai.app.services.shared.invocation_context import InvocationContext
from invokeai.backend.flux.modules.conditioner import HFEncoder
from invokeai.backend.model_manager.config import ModelFormat
from invokeai.backend.patches.layer_patcher import LayerPatcher
from invokeai.backend.patches.lora_conversions.flux_lora_constants import FLUX_LORA_CLIP_PREFIX
from invokeai.backend.patches.model_patch_raw import ModelPatchRaw
from invokeai.backend.patches.model_patcher import LayerPatcher
from invokeai.backend.stable_diffusion.diffusion.conditioning_data import ConditioningFieldData, FLUXConditioningInfo
from invokeai.backend.util.devices import TorchDevice
@invocation(
@@ -69,14 +70,11 @@ class FluxTextEncoderInvocation(BaseInvocation):
)
def _t5_encode(self, context: InvocationContext) -> torch.Tensor:
t5_tokenizer_info = context.models.load(self.t5_encoder.tokenizer)
t5_text_encoder_info = context.models.load(self.t5_encoder.text_encoder)
prompt = [self.prompt]
with (
t5_text_encoder_info as t5_text_encoder,
t5_tokenizer_info as t5_tokenizer,
context.models.load(self.t5_encoder.text_encoder) as t5_text_encoder,
context.models.load(self.t5_encoder.tokenizer) as t5_tokenizer,
):
assert isinstance(t5_text_encoder, T5EncoderModel)
assert isinstance(t5_tokenizer, T5Tokenizer)
@@ -90,14 +88,12 @@ class FluxTextEncoderInvocation(BaseInvocation):
return prompt_embeds
def _clip_encode(self, context: InvocationContext) -> torch.Tensor:
clip_tokenizer_info = context.models.load(self.clip.tokenizer)
clip_text_encoder_info = context.models.load(self.clip.text_encoder)
prompt = [self.prompt]
clip_text_encoder_info = context.models.load(self.clip.text_encoder)
with (
clip_text_encoder_info.model_on_device() as (cached_weights, clip_text_encoder),
clip_tokenizer_info as clip_tokenizer,
context.models.load(self.clip.tokenizer) as clip_tokenizer,
ExitStack() as exit_stack,
):
assert isinstance(clip_text_encoder, CLIPTextModel)
@@ -111,10 +107,11 @@ class FluxTextEncoderInvocation(BaseInvocation):
if clip_text_encoder_config.format in [ModelFormat.Diffusers]:
# The model is non-quantized, so we can apply the LoRA weights directly into the model.
exit_stack.enter_context(
LayerPatcher.apply_model_patches(
LayerPatcher.apply_smart_model_patches(
model=clip_text_encoder,
patches=self._clip_lora_iterator(context),
prefix=FLUX_LORA_CLIP_PREFIX,
dtype=TorchDevice.choose_torch_dtype(),
cached_weights=cached_weights,
)
)

View File

@@ -3,6 +3,7 @@ from einops import rearrange
from PIL import Image
from invokeai.app.invocations.baseinvocation import BaseInvocation, invocation
from invokeai.app.invocations.constants import LATENT_SCALE_FACTOR
from invokeai.app.invocations.fields import (
FieldDescriptions,
Input,
@@ -38,8 +39,22 @@ class FluxVaeDecodeInvocation(BaseInvocation, WithMetadata, WithBoard):
input=Input.Connection,
)
def _estimate_working_memory(self, latents: torch.Tensor, vae: AutoEncoder) -> int:
"""Estimate the working memory required by the invocation in bytes."""
# It was found experimentally that the peak working memory scales linearly with the number of pixels and the
# element size (precision). This estimate is accurate for both SD1 and SDXL.
out_h = LATENT_SCALE_FACTOR * latents.shape[-2]
out_w = LATENT_SCALE_FACTOR * latents.shape[-1]
element_size = next(vae.parameters()).element_size()
# TODO(ryand): Need to tune this value, it was copied from the SD1 implementation.
scaling_constant = 960 # Determined experimentally.
working_memory = out_h * out_w * element_size * scaling_constant
return working_memory
def _vae_decode(self, vae_info: LoadedModel, latents: torch.Tensor) -> Image.Image:
with vae_info as vae:
estimated_working_memory = self._estimate_working_memory(latents, vae_info.model)
with vae_info.model_on_device(working_mem_bytes=estimated_working_memory) as (_, vae):
assert isinstance(vae, AutoEncoder)
vae_dtype = next(iter(vae.parameters())).dtype
latents = latents.to(device=TorchDevice.choose_torch_device(), dtype=vae_dtype)

View File

@@ -34,7 +34,7 @@ from invokeai.backend.util.devices import TorchDevice
title="Latents to Image",
tags=["latents", "image", "vae", "l2i"],
category="latents",
version="1.3.0",
version="1.3.1",
)
class LatentsToImageInvocation(BaseInvocation, WithMetadata, WithBoard):
"""Generates an image from latents."""
@@ -53,13 +53,32 @@ class LatentsToImageInvocation(BaseInvocation, WithMetadata, WithBoard):
tile_size: int = InputField(default=0, multiple_of=8, description=FieldDescriptions.vae_tile_size)
fp32: bool = InputField(default=DEFAULT_PRECISION == torch.float32, description=FieldDescriptions.fp32)
def _estimate_working_memory(self, latents: torch.Tensor) -> int:
"""Estimate the working memory required by the invocation in bytes."""
# It was found experimentally that the peak working memory scales linearly with the number of pixels and the
# element size (precision). This estimate is accurate for both SD1 and SDXL.
out_h = LATENT_SCALE_FACTOR * latents.shape[-2]
out_w = LATENT_SCALE_FACTOR * latents.shape[-1]
element_size = 4 if self.fp32 else 2
scaling_constant = 960 # Determined experimentally.
working_memory = out_h * out_w * element_size * scaling_constant
if self.fp32:
# If we are running in FP32, then we should account for the likely increase in model size (~250MB).
working_memory += 250 * 2**20
return working_memory
@torch.no_grad()
def invoke(self, context: InvocationContext) -> ImageOutput:
latents = context.tensors.load(self.latents.latents_name)
vae_info = context.models.load(self.vae.vae)
assert isinstance(vae_info.model, (AutoencoderKL, AutoencoderTiny))
with SeamlessExt.static_patch_model(vae_info.model, self.vae.seamless_axes), vae_info as vae:
with (
SeamlessExt.static_patch_model(vae_info.model, self.vae.seamless_axes),
vae_info.model_on_device(working_mem_bytes=self._estimate_working_memory(latents)) as (_, vae),
):
context.util.signal_progress("Running VAE decoder")
assert isinstance(vae, (AutoencoderKL, AutoencoderTiny))
latents = latents.to(vae.device)
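
As a rough worked example of the estimate above (a minimal sketch; the 1024x1024 output size and fp16 precision are assumptions for illustration, and LATENT_SCALE_FACTOR is taken to be the usual 8x latent-to-pixel factor):

```python
# Hypothetical numbers plugged into the working-memory estimate shown in the diff above.
LATENT_SCALE_FACTOR = 8            # assumed: SD latents are 1/8 of the output resolution
latent_h, latent_w = 128, 128      # latents for a 1024x1024 output image
out_h = LATENT_SCALE_FACTOR * latent_h
out_w = LATENT_SCALE_FACTOR * latent_w
element_size = 2                   # fp16; would be 4 for fp32
scaling_constant = 960             # determined experimentally (from the diff)
working_memory = out_h * out_w * element_size * scaling_constant
print(f"{working_memory / 2**30:.2f} GiB")  # ~1.88 GiB reserved for the VAE decode
```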

View File

@@ -6,6 +6,7 @@ from einops import rearrange
from PIL import Image
from invokeai.app.invocations.baseinvocation import BaseInvocation, invocation
from invokeai.app.invocations.constants import LATENT_SCALE_FACTOR
from invokeai.app.invocations.fields import (
FieldDescriptions,
Input,
@@ -26,7 +27,7 @@ from invokeai.backend.util.devices import TorchDevice
title="SD3 Latents to Image",
tags=["latents", "image", "vae", "l2i", "sd3"],
category="latents",
version="1.3.0",
version="1.3.1",
)
class SD3LatentsToImageInvocation(BaseInvocation, WithMetadata, WithBoard):
"""Generates an image from latents."""
@@ -40,13 +41,30 @@ class SD3LatentsToImageInvocation(BaseInvocation, WithMetadata, WithBoard):
input=Input.Connection,
)
def _estimate_working_memory(self, latents: torch.Tensor, vae: AutoencoderKL) -> int:
"""Estimate the working memory required by the invocation in bytes."""
# It was found experimentally that the peak working memory scales linearly with the number of pixels and the
# element size (precision). This estimate is accurate for both SD1 and SDXL.
out_h = LATENT_SCALE_FACTOR * latents.shape[-2]
out_w = LATENT_SCALE_FACTOR * latents.shape[-1]
element_size = next(vae.parameters()).element_size()
# TODO(ryand): Need to tune this value, it was copied from the SD1 implementation.
scaling_constant = 960 # Determined experimentally.
working_memory = out_h * out_w * element_size * scaling_constant
return working_memory
@torch.no_grad()
def invoke(self, context: InvocationContext) -> ImageOutput:
latents = context.tensors.load(self.latents.latents_name)
vae_info = context.models.load(self.vae.vae)
assert isinstance(vae_info.model, (AutoencoderKL))
with SeamlessExt.static_patch_model(vae_info.model, self.vae.seamless_axes), vae_info as vae:
estimated_working_memory = self._estimate_working_memory(latents, vae_info.model)
with (
SeamlessExt.static_patch_model(vae_info.model, self.vae.seamless_axes),
vae_info.model_on_device(working_mem_bytes=estimated_working_memory) as (_, vae),
):
context.util.signal_progress("Running VAE")
assert isinstance(vae, (AutoencoderKL))
latents = latents.to(vae.device)

View File

@@ -17,10 +17,11 @@ from invokeai.app.invocations.model import CLIPField, T5EncoderField
from invokeai.app.invocations.primitives import SD3ConditioningOutput
from invokeai.app.services.shared.invocation_context import InvocationContext
from invokeai.backend.model_manager.config import ModelFormat
from invokeai.backend.patches.layer_patcher import LayerPatcher
from invokeai.backend.patches.lora_conversions.flux_lora_constants import FLUX_LORA_CLIP_PREFIX
from invokeai.backend.patches.model_patch_raw import ModelPatchRaw
from invokeai.backend.patches.model_patcher import LayerPatcher
from invokeai.backend.stable_diffusion.diffusion.conditioning_data import ConditioningFieldData, SD3ConditioningInfo
from invokeai.backend.util.devices import TorchDevice
# The SD3 T5 Max Sequence Length set based on the default in diffusers.
SD3_T5_MAX_SEQ_LEN = 256
@@ -150,10 +151,11 @@ class Sd3TextEncoderInvocation(BaseInvocation):
if clip_text_encoder_config.format in [ModelFormat.Diffusers]:
# The model is non-quantized, so we can apply the LoRA weights directly into the model.
exit_stack.enter_context(
LayerPatcher.apply_model_patches(
LayerPatcher.apply_smart_model_patches(
model=clip_text_encoder,
patches=self._clip_lora_iterator(context, clip_model),
prefix=FLUX_LORA_CLIP_PREFIX,
dtype=TorchDevice.choose_torch_dtype(),
cached_weights=cached_weights,
)
)

View File

@@ -22,8 +22,8 @@ from invokeai.app.invocations.fields import (
from invokeai.app.invocations.model import UNetField
from invokeai.app.invocations.primitives import LatentsOutput
from invokeai.app.services.shared.invocation_context import InvocationContext
from invokeai.backend.patches.layer_patcher import LayerPatcher
from invokeai.backend.patches.model_patch_raw import ModelPatchRaw
from invokeai.backend.patches.model_patcher import LayerPatcher
from invokeai.backend.stable_diffusion.diffusers_pipeline import ControlNetData, PipelineIntermediateState
from invokeai.backend.stable_diffusion.multi_diffusion_pipeline import (
MultiDiffusionPipeline,
@@ -207,7 +207,9 @@ class TiledMultiDiffusionDenoiseLatents(BaseInvocation):
with (
ExitStack() as exit_stack,
unet_info as unet,
LayerPatcher.apply_model_patches(model=unet, patches=_lora_loader(), prefix="lora_unet_"),
LayerPatcher.apply_smart_model_patches(
model=unet, patches=_lora_loader(), prefix="lora_unet_", dtype=unet.dtype
),
):
assert isinstance(unet, UNet2DConditionModel)
latents = latents.to(device=unet.device, dtype=unet.dtype)

View File

@@ -13,7 +13,6 @@ from functools import lru_cache
from pathlib import Path
from typing import Any, Literal, Optional
import psutil
import yaml
from pydantic import BaseModel, Field, PrivateAttr, field_validator
from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, SettingsConfigDict
@@ -25,8 +24,6 @@ from invokeai.frontend.cli.arg_parser import InvokeAIArgs
INIT_FILE = Path("invokeai.yaml")
DB_FILE = Path("invokeai.db")
LEGACY_INIT_FILE = Path("invokeai.init")
DEFAULT_RAM_CACHE = 10.0
DEFAULT_VRAM_CACHE = 0.25
DEVICE = Literal["auto", "cpu", "cuda", "cuda:1", "mps"]
PRECISION = Literal["auto", "float16", "bfloat16", "float32"]
ATTENTION_TYPE = Literal["auto", "normal", "xformers", "sliced", "torch-sdp"]
@@ -36,24 +33,6 @@ LOG_LEVEL = Literal["debug", "info", "warning", "error", "critical"]
CONFIG_SCHEMA_VERSION = "4.0.2"
def get_default_ram_cache_size() -> float:
"""Run a heuristic for the default RAM cache based on installed RAM."""
# On some machines, psutil.virtual_memory().total gives a value that is slightly less than the actual RAM, so the
# limits are set slightly lower than than what we expect the actual RAM to be.
GB = 1024**3
max_ram = psutil.virtual_memory().total / GB
if max_ram >= 60:
return 15.0
if max_ram >= 30:
return 7.5
if max_ram >= 14:
return 4.0
return 2.1 # 2.1 is just large enough for sd 1.5 ;-)
class URLRegexTokenPair(BaseModel):
url_regex: str = Field(description="Regular expression to match against the URL")
token: str = Field(description="Token to use when the URL matches the regex")
@@ -102,10 +81,12 @@ class InvokeAIAppConfig(BaseSettings):
profile_graphs: Enable graph profiling using `cProfile`.
profile_prefix: An optional prefix for profile output files.
profiles_dir: Path to profiles output directory.
ram: Maximum memory amount used by memory model cache for rapid switching (GB).
vram: Amount of VRAM reserved for model storage (GB).
lazy_offload: Keep models in VRAM until their space is needed.
ram: The maximum amount of CPU RAM to use for model caching in GB. If unset, the limit will be configured based on the available RAM. In most cases, it is recommended to leave this unset.
vram: The amount of VRAM to use for model caching in GB. If unset, the limit will be configured based on the available VRAM and the device_working_mem_gb. In most cases, it is recommended to leave this unset.
lazy_offload: DEPRECATED: This setting is no longer used. Lazy-offloading is enabled by default. This config setting will be removed once the new model cache behaviour is out of beta.
log_memory_usage: If True, a memory snapshot will be captured before and after every model cache operation, and the result will be logged (at debug level). There is a time cost to capturing the memory snapshots, so it is recommended to only enable this feature if you are actively inspecting the model cache's behaviour.
device_working_mem_gb: The amount of working memory to keep available on the compute device (in GB). Has no effect if running on CPU. If you are experiencing OOM errors, try increasing this value.
enable_partial_loading: Enable partial loading of models. This enables models to run with reduced VRAM requirements (at the cost of slower speed) by streaming the model from RAM to VRAM as its used. In some edge cases, partial loading can cause models to run more slowly if they were previously being fully loaded into VRAM.
device: Preferred execution device. `auto` will choose the device depending on the hardware platform and the installed torch capabilities.<br>Valid values: `auto`, `cpu`, `cuda`, `cuda:1`, `mps`
precision: Floating point precision. `float16` will consume half the memory of `float32` but produce slightly lower-quality images. The `auto` setting will guess the proper precision based on your video card and operating system.<br>Valid values: `auto`, `float16`, `bfloat16`, `float32`
sequential_guidance: Whether to calculate guidance in serial instead of in parallel, lowering memory requirements.
@@ -172,10 +153,12 @@ class InvokeAIAppConfig(BaseSettings):
profiles_dir: Path = Field(default=Path("profiles"), description="Path to profiles output directory.")
# CACHE
ram: float = Field(default_factory=get_default_ram_cache_size, gt=0, description="Maximum memory amount used by memory model cache for rapid switching (GB).")
vram: float = Field(default=DEFAULT_VRAM_CACHE, ge=0, description="Amount of VRAM reserved for model storage (GB).")
lazy_offload: bool = Field(default=True, description="Keep models in VRAM until their space is needed.")
ram: Optional[float] = Field(default=None, gt=0, description="The maximum amount of CPU RAM to use for model caching in GB. If unset, the limit will be configured based on the available RAM. In most cases, it is recommended to leave this unset.")
vram: Optional[float] = Field(default=None, ge=0, description="The amount of VRAM to use for model caching in GB. If unset, the limit will be configured based on the available VRAM and the device_working_mem_gb. In most cases, it is recommended to leave this unset.")
lazy_offload: bool = Field(default=True, description="DEPRECATED: This setting is no longer used. Lazy-offloading is enabled by default. This config setting will be removed once the new model cache behaviour is out of beta.")
log_memory_usage: bool = Field(default=False, description="If True, a memory snapshot will be captured before and after every model cache operation, and the result will be logged (at debug level). There is a time cost to capturing the memory snapshots, so it is recommended to only enable this feature if you are actively inspecting the model cache's behaviour.")
device_working_mem_gb: float = Field(default=2, description="The amount of working memory to keep available on the compute device (in GB). Has no effect if running on CPU. If you are experiencing OOM errors, try increasing this value.")
enable_partial_loading: bool = Field(default=False, description="Enable partial loading of models. This enables models to run with reduced VRAM requirements (at the cost of slower speed) by streaming the model from RAM to VRAM as its used. In some edge cases, partial loading can cause models to run more slowly if they were previously being fully loaded into VRAM.")
# DEVICE
device: DEVICE = Field(default="auto", description="Preferred execution device. `auto` will choose the device depending on the hardware platform and the installed torch capabilities.")
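
For reference, a minimal sketch of inspecting the new cache-related settings at runtime (defaults taken from the fields above; `get_config` is imported as in the router diff earlier):

```python
from invokeai.app.services.config import get_config

config = get_config()
print(config.enable_partial_loading)  # False by default
print(config.device_working_mem_gb)   # 2 (GB) by default; raise this if you hit OOM errors
print(config.ram, config.vram)        # None by default; cache limits are then derived from available memory
```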

View File

@@ -22,7 +22,6 @@ class ModelCacheStatsSummary:
"""The stats for the model cache."""
high_water_mark_gb: float
cache_size_gb: float
total_usage_gb: float
cache_hits: int
cache_misses: int
@@ -79,7 +78,7 @@ class InvocationStatsSummary:
_str += f" Model cache misses: {self.model_cache_stats.cache_misses}\n"
_str += f" Models cached: {self.model_cache_stats.models_cached}\n"
_str += f" Models cleared from cache: {self.model_cache_stats.models_cleared}\n"
_str += f" Cache high water mark: {self.model_cache_stats.high_water_mark_gb:4.2f}/{self.model_cache_stats.cache_size_gb:4.2f}G\n"
_str += f" Cache high water mark: {self.model_cache_stats.high_water_mark_gb:4.2f}G\n"
return _str

View File

@@ -20,7 +20,7 @@ from invokeai.app.services.invocation_stats.invocation_stats_common import (
NodeExecutionStatsSummary,
)
from invokeai.app.services.invoker import Invoker
from invokeai.backend.model_manager.load.model_cache import CacheStats
from invokeai.backend.model_manager.load.model_cache.cache_stats import CacheStats
# Size of 1GB in bytes.
GB = 2**30
@@ -111,7 +111,6 @@ class InvocationStatsService(InvocationStatsServiceBase):
cache_hits=cache_stats.hits,
cache_misses=cache_stats.misses,
high_water_mark_gb=cache_stats.high_watermark / GB,
cache_size_gb=cache_stats.cache_size / GB,
total_usage_gb=sum(list(cache_stats.loaded_model_sizes.values())) / GB,
models_cached=cache_stats.in_cache,
models_cleared=cache_stats.cleared,

View File

@@ -7,14 +7,18 @@ from typing import Callable, Optional
from invokeai.backend.model_manager import AnyModel, AnyModelConfig, SubModelType
from invokeai.backend.model_manager.load import LoadedModel, LoadedModelWithoutConfig
from invokeai.backend.model_manager.load.model_cache.model_cache_base import ModelCacheBase
from invokeai.backend.model_manager.load.model_cache.model_cache import ModelCache
class ModelLoadServiceBase(ABC):
"""Wrapper around AnyModelLoader."""
@abstractmethod
def load_model(self, model_config: AnyModelConfig, submodel_type: Optional[SubModelType] = None) -> LoadedModel:
def load_model(
self,
model_config: AnyModelConfig,
submodel_type: Optional[SubModelType] = None,
) -> LoadedModel:
"""
Given a model's configuration, load it and return the LoadedModel object.
@@ -24,7 +28,7 @@ class ModelLoadServiceBase(ABC):
@property
@abstractmethod
def ram_cache(self) -> ModelCacheBase[AnyModel]:
def ram_cache(self) -> ModelCache:
"""Return the RAM cache used by this loader."""
@abstractmethod

View File

@@ -18,7 +18,7 @@ from invokeai.backend.model_manager.load import (
ModelLoaderRegistry,
ModelLoaderRegistryBase,
)
from invokeai.backend.model_manager.load.model_cache.model_cache_base import ModelCacheBase
from invokeai.backend.model_manager.load.model_cache.model_cache import ModelCache
from invokeai.backend.model_manager.load.model_loaders.generic_diffusers import GenericDiffusersLoader
from invokeai.backend.util.devices import TorchDevice
from invokeai.backend.util.logging import InvokeAILogger
@@ -30,7 +30,7 @@ class ModelLoadService(ModelLoadServiceBase):
def __init__(
self,
app_config: InvokeAIAppConfig,
ram_cache: ModelCacheBase[AnyModel],
ram_cache: ModelCache,
registry: Optional[Type[ModelLoaderRegistryBase]] = ModelLoaderRegistry,
):
"""Initialize the model load service."""
@@ -45,11 +45,15 @@ class ModelLoadService(ModelLoadServiceBase):
self._invoker = invoker
@property
def ram_cache(self) -> ModelCacheBase[AnyModel]:
def ram_cache(self) -> ModelCache:
"""Return the RAM cache used by this loader."""
return self._ram_cache
def load_model(self, model_config: AnyModelConfig, submodel_type: Optional[SubModelType] = None) -> LoadedModel:
def load_model(
self,
model_config: AnyModelConfig,
submodel_type: Optional[SubModelType] = None,
) -> LoadedModel:
"""
Given a model's configuration, load it and return the LoadedModel object.
@@ -78,9 +82,8 @@ class ModelLoadService(ModelLoadServiceBase):
self, model_path: Path, loader: Optional[Callable[[Path], AnyModel]] = None
) -> LoadedModelWithoutConfig:
cache_key = str(model_path)
ram_cache = self.ram_cache
try:
return LoadedModelWithoutConfig(_locker=ram_cache.get(key=cache_key))
return LoadedModelWithoutConfig(cache_record=self._ram_cache.get(key=cache_key), cache=self._ram_cache)
except IndexError:
pass
@@ -109,5 +112,5 @@ class ModelLoadService(ModelLoadServiceBase):
)
assert loader is not None
raw_model = loader(model_path)
ram_cache.put(key=cache_key, model=raw_model)
return LoadedModelWithoutConfig(_locker=ram_cache.get(key=cache_key))
self._ram_cache.put(key=cache_key, model=raw_model)
return LoadedModelWithoutConfig(cache_record=self._ram_cache.get(key=cache_key), cache=self._ram_cache)

View File

@@ -16,7 +16,8 @@ from invokeai.app.services.model_load.model_load_base import ModelLoadServiceBas
from invokeai.app.services.model_load.model_load_default import ModelLoadService
from invokeai.app.services.model_manager.model_manager_base import ModelManagerServiceBase
from invokeai.app.services.model_records.model_records_base import ModelRecordServiceBase
from invokeai.backend.model_manager.load import ModelCache, ModelLoaderRegistry
from invokeai.backend.model_manager.load.model_cache.model_cache import ModelCache
from invokeai.backend.model_manager.load.model_loader_registry import ModelLoaderRegistry
from invokeai.backend.util.devices import TorchDevice
from invokeai.backend.util.logging import InvokeAILogger
@@ -81,9 +82,10 @@ class ModelManagerService(ModelManagerServiceBase):
logger.setLevel(app_config.log_level.upper())
ram_cache = ModelCache(
max_cache_size=app_config.ram,
max_vram_cache_size=app_config.vram,
lazy_offloading=app_config.lazy_offload,
execution_device_working_mem_gb=app_config.device_working_mem_gb,
enable_partial_loading=app_config.enable_partial_loading,
max_ram_cache_size_gb=app_config.ram,
max_vram_cache_size_gb=app_config.vram,
logger=logger,
execution_device=execution_device or TorchDevice.choose_torch_device(),
)

View File

@@ -361,7 +361,9 @@ class ModelsInterface(InvocationContextInterface):
return self._services.model_manager.store.exists(identifier.key)
def load(
self, identifier: Union[str, "ModelIdentifierField"], submodel_type: Optional[SubModelType] = None
self,
identifier: Union[str, "ModelIdentifierField"],
submodel_type: Optional[SubModelType] = None,
) -> LoadedModel:
"""Load a model.

View File

@@ -8,7 +8,7 @@ from pathlib import Path
from invokeai.backend.model_manager.load.load_base import LoadedModel, LoadedModelWithoutConfig, ModelLoaderBase
from invokeai.backend.model_manager.load.load_default import ModelLoader
from invokeai.backend.model_manager.load.model_cache.model_cache_default import ModelCache
from invokeai.backend.model_manager.load.model_cache.model_cache import ModelCache
from invokeai.backend.model_manager.load.model_loader_registry import ModelLoaderRegistry, ModelLoaderRegistryBase
# This registers the subclasses that implement loaders of specific model types

View File

@@ -5,7 +5,6 @@ Base class for model loading in InvokeAI.
from abc import ABC, abstractmethod
from contextlib import contextmanager
from dataclasses import dataclass
from logging import Logger
from pathlib import Path
from typing import Any, Dict, Generator, Optional, Tuple
@@ -18,19 +17,17 @@ from invokeai.backend.model_manager.config import (
AnyModelConfig,
SubModelType,
)
from invokeai.backend.model_manager.load.model_cache.model_cache_base import ModelCacheBase, ModelLockerBase
from invokeai.backend.model_manager.load.model_cache.cache_record import CacheRecord
from invokeai.backend.model_manager.load.model_cache.model_cache import ModelCache
@dataclass
class LoadedModelWithoutConfig:
"""
Context manager object that mediates transfer from RAM<->VRAM.
"""Context manager object that mediates transfer from RAM<->VRAM.
This is a context manager object that has two distinct APIs:
1. Older API (deprecated):
Use the LoadedModel object directly as a context manager.
It will move the model into VRAM (on CUDA devices), and
Use the LoadedModel object directly as a context manager. It will move the model into VRAM (on CUDA devices), and
return the model in a form suitable for passing to torch.
Example:
```
@@ -40,13 +37,9 @@ class LoadedModelWithoutConfig:
```
2. Newer API (recommended):
Call the LoadedModel's `model_on_device()` method in a
context. It returns a tuple consisting of a copy of
the model's state dict in CPU RAM followed by a copy
of the model in VRAM. The state dict is provided to allow
LoRAs and other model patchers to return the model to
its unpatched state without expensive copy and restore
operations.
Call the LoadedModel's `model_on_device()` method in a context. It returns a tuple consisting of a copy of the
model's state dict in CPU RAM followed by a copy of the model in VRAM. The state dict is provided to allow LoRAs and
other model patchers to return the model to its unpatched state without expensive copy and restore operations.
Example:
```
@@ -55,43 +48,53 @@ class LoadedModelWithoutConfig:
image = vae.decode(latents)[0]
```
The state_dict should be treated as a read-only object and
never modified. Also be aware that some loadable models do
not have a state_dict, in which case this value will be None.
The state_dict should be treated as a read-only object and never modified. Also be aware that some loadable models
do not have a state_dict, in which case this value will be None.
"""
_locker: ModelLockerBase
def __init__(self, cache_record: CacheRecord, cache: ModelCache):
self._cache_record = cache_record
self._cache = cache
def __enter__(self) -> AnyModel:
"""Context entry."""
self._locker.lock()
self._cache.lock(self._cache_record.key, None)
return self.model
def __exit__(self, *args: Any, **kwargs: Any) -> None:
"""Context exit."""
self._locker.unlock()
self._cache.unlock(self._cache_record.key)
@contextmanager
def model_on_device(self) -> Generator[Tuple[Optional[Dict[str, torch.Tensor]], AnyModel], None, None]:
"""Return a tuple consisting of the model's state dict (if it exists) and the locked model on execution device."""
locked_model = self._locker.lock()
def model_on_device(
self, working_mem_bytes: Optional[int] = None
) -> Generator[Tuple[Optional[Dict[str, torch.Tensor]], AnyModel], None, None]:
"""Return a tuple consisting of the model's state dict (if it exists) and the locked model on execution device.
:param working_mem_bytes: The amount of working memory to keep available on the compute device when loading the
model.
"""
self._cache.lock(self._cache_record.key, working_mem_bytes)
try:
state_dict = self._locker.get_state_dict()
yield (state_dict, locked_model)
yield (self._cache_record.cached_model.get_cpu_state_dict(), self._cache_record.cached_model.model)
finally:
self._locker.unlock()
self._cache.unlock(self._cache_record.key)
@property
def model(self) -> AnyModel:
"""Return the model without locking it."""
return self._locker.model
return self._cache_record.cached_model.model
@dataclass
class LoadedModel(LoadedModelWithoutConfig):
"""Context manager object that mediates transfer from RAM<->VRAM."""
config: Optional[AnyModelConfig] = None
def __init__(
self,
config: Optional[AnyModelConfig],
cache_record: CacheRecord,
cache: ModelCache,
):
super().__init__(cache_record=cache_record, cache=cache)
self.config = config
# TODO(MM2):
@@ -110,13 +113,17 @@ class ModelLoaderBase(ABC):
self,
app_config: InvokeAIAppConfig,
logger: Logger,
ram_cache: ModelCacheBase[AnyModel],
ram_cache: ModelCache,
):
"""Initialize the loader."""
pass
@abstractmethod
def load_model(self, model_config: AnyModelConfig, submodel_type: Optional[SubModelType] = None) -> LoadedModel:
def load_model(
self,
model_config: AnyModelConfig,
submodel_type: Optional[SubModelType] = None,
) -> LoadedModel:
"""
Return a model given its confguration.
@@ -138,6 +145,6 @@ class ModelLoaderBase(ABC):
@property
@abstractmethod
def ram_cache(self) -> ModelCacheBase[AnyModel]:
def ram_cache(self) -> ModelCache:
"""Return the ram cache associated with this loader."""
pass
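
A minimal usage sketch of the reworked `model_on_device()` API shown above (the `vae_info` variable and the 1 GiB working-memory figure are illustrative assumptions):

```python
# vae_info is a LoadedModel returned by e.g. context.models.load(...)
with vae_info.model_on_device(working_mem_bytes=1 * 2**30) as (cpu_state_dict, vae):
    # cpu_state_dict: read-only CPU copy of the weights (or None if the model has no state dict).
    # vae: the model instance, locked onto the execution device for the duration of the context.
    image = vae.decode(latents)[0]
```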

View File

@@ -14,7 +14,8 @@ from invokeai.backend.model_manager import (
)
from invokeai.backend.model_manager.config import DiffusersConfigBase
from invokeai.backend.model_manager.load.load_base import LoadedModel, ModelLoaderBase
from invokeai.backend.model_manager.load.model_cache.model_cache_base import ModelCacheBase, ModelLockerBase
from invokeai.backend.model_manager.load.model_cache.cache_record import CacheRecord
from invokeai.backend.model_manager.load.model_cache.model_cache import ModelCache, get_model_cache_key
from invokeai.backend.model_manager.load.model_util import calc_model_size_by_fs
from invokeai.backend.model_manager.load.optimizations import skip_torch_weight_init
from invokeai.backend.util.devices import TorchDevice
@@ -28,7 +29,7 @@ class ModelLoader(ModelLoaderBase):
self,
app_config: InvokeAIAppConfig,
logger: Logger,
ram_cache: ModelCacheBase[AnyModel],
ram_cache: ModelCache,
):
"""Initialize the loader."""
self._app_config = app_config
@@ -37,7 +38,11 @@ class ModelLoader(ModelLoaderBase):
self._torch_dtype = TorchDevice.choose_torch_dtype()
self._torch_device = TorchDevice.choose_torch_device()
def load_model(self, model_config: AnyModelConfig, submodel_type: Optional[SubModelType] = None) -> LoadedModel:
def load_model(
self,
model_config: AnyModelConfig,
submodel_type: Optional[SubModelType] = None,
) -> LoadedModel:
"""
Return a model given its configuration.
@@ -54,11 +59,11 @@ class ModelLoader(ModelLoaderBase):
raise InvalidModelConfigException(f"Files for model '{model_config.name}' not found at {model_path}")
with skip_torch_weight_init():
locker = self._load_and_cache(model_config, submodel_type)
return LoadedModel(config=model_config, _locker=locker)
cache_record = self._load_and_cache(model_config, submodel_type)
return LoadedModel(config=model_config, cache_record=cache_record, cache=self._ram_cache)
@property
def ram_cache(self) -> ModelCacheBase[AnyModel]:
def ram_cache(self) -> ModelCache:
"""Return the ram cache associated with this loader."""
return self._ram_cache
@@ -66,10 +71,10 @@ class ModelLoader(ModelLoaderBase):
model_base = self._app_config.models_path
return (model_base / config.path).resolve()
def _load_and_cache(self, config: AnyModelConfig, submodel_type: Optional[SubModelType] = None) -> ModelLockerBase:
def _load_and_cache(self, config: AnyModelConfig, submodel_type: Optional[SubModelType] = None) -> CacheRecord:
stats_name = ":".join([config.base, config.type, config.name, (submodel_type or "")])
try:
return self._ram_cache.get(config.key, submodel_type, stats_name=stats_name)
return self._ram_cache.get(key=get_model_cache_key(config.key, submodel_type), stats_name=stats_name)
except IndexError:
pass
@@ -78,16 +83,11 @@ class ModelLoader(ModelLoaderBase):
loaded_model = self._load_model(config, submodel_type)
self._ram_cache.put(
config.key,
submodel_type=submodel_type,
get_model_cache_key(config.key, submodel_type),
model=loaded_model,
)
return self._ram_cache.get(
key=config.key,
submodel_type=submodel_type,
stats_name=stats_name,
)
return self._ram_cache.get(key=get_model_cache_key(config.key, submodel_type), stats_name=stats_name)
def get_size_fs(
self, config: AnyModelConfig, model_path: Path, submodel_type: Optional[SubModelType] = None

View File

@@ -1,6 +0,0 @@
"""Init file for ModelCache."""
from .model_cache_base import ModelCacheBase, CacheStats # noqa F401
from .model_cache_default import ModelCache # noqa F401
_all__ = ["ModelCacheBase", "ModelCache", "CacheStats"]

View File

@@ -0,0 +1,31 @@
from dataclasses import dataclass
from invokeai.backend.model_manager.load.model_cache.cached_model.cached_model_only_full_load import (
CachedModelOnlyFullLoad,
)
from invokeai.backend.model_manager.load.model_cache.cached_model.cached_model_with_partial_load import (
CachedModelWithPartialLoad,
)
@dataclass
class CacheRecord:
"""A class that represents a model in the model cache."""
# Cache key.
key: str
# Model in memory.
cached_model: CachedModelWithPartialLoad | CachedModelOnlyFullLoad
# If locks > 0, the model is actively being used, so we should do our best to keep it on the compute device.
_locks: int = 0
def lock(self) -> None:
self._locks += 1
def unlock(self) -> None:
self._locks -= 1
assert self._locks >= 0
@property
def is_locked(self) -> bool:
return self._locks > 0
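
A short sketch of how the lock counter is intended to be used (the key and `cached_model` values are placeholders):

```python
record = CacheRecord(key="some-model-key:vae", cached_model=cached_model)

record.lock()                # the model is in active use...
assert record.is_locked      # ...so the cache should try to keep it on the compute device
record.unlock()
assert not record.is_locked  # now a candidate for partial or full offload
```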

View File

@@ -0,0 +1,14 @@
from dataclasses import dataclass, field
from typing import Dict
@dataclass
class CacheStats(object):
"""Collect statistics on cache performance."""
hits: int = 0 # cache hits
misses: int = 0 # cache misses
high_watermark: int = 0 # amount of cache used
in_cache: int = 0 # number of models in cache
cleared: int = 0 # number of models cleared to make space
loaded_model_sizes: Dict[str, int] = field(default_factory=dict)

View File

@@ -0,0 +1,97 @@
from typing import Any
import torch
class CachedModelOnlyFullLoad:
"""A wrapper around a PyTorch model to handle full loads and unloads between the CPU and the compute device.
Note: "VRAM" is used throughout this class to refer to the memory on the compute device. It could be CUDA memory,
MPS memory, etc.
"""
def __init__(self, model: torch.nn.Module | Any, compute_device: torch.device, total_bytes: int):
"""Initialize a CachedModelOnlyFullLoad.
Args:
model (torch.nn.Module | Any): The model to wrap. Should be on the CPU.
compute_device (torch.device): The compute device to move the model to.
total_bytes (int): The total size (in bytes) of all the weights in the model.
"""
# model is often a torch.nn.Module, but could be any model type. Throughout this class, we handle both cases.
self._model = model
self._compute_device = compute_device
self._offload_device = torch.device("cpu")
# A CPU read-only copy of the model's state dict.
self._cpu_state_dict: dict[str, torch.Tensor] | None = None
if isinstance(model, torch.nn.Module):
self._cpu_state_dict = model.state_dict()
self._total_bytes = total_bytes
self._is_in_vram = False
@property
def model(self) -> torch.nn.Module:
return self._model
def get_cpu_state_dict(self) -> dict[str, torch.Tensor] | None:
"""Get a read-only copy of the model's state dict in RAM."""
# TODO(ryand): Document this better.
return self._cpu_state_dict
def total_bytes(self) -> int:
"""Get the total size (in bytes) of all the weights in the model."""
return self._total_bytes
def cur_vram_bytes(self) -> int:
"""Get the size (in bytes) of the weights that are currently in VRAM."""
if self._is_in_vram:
return self._total_bytes
else:
return 0
def is_in_vram(self) -> bool:
"""Return true if the model is currently in VRAM."""
return self._is_in_vram
def full_load_to_vram(self) -> int:
"""Load all weights into VRAM (if supported by the model).
Returns:
The number of bytes loaded into VRAM.
"""
if self._is_in_vram:
# Already in VRAM.
return 0
if not hasattr(self._model, "to"):
# Model doesn't support moving to a device.
return 0
if self._cpu_state_dict is not None:
new_state_dict: dict[str, torch.Tensor] = {}
for k, v in self._cpu_state_dict.items():
new_state_dict[k] = v.to(self._compute_device, copy=True)
self._model.load_state_dict(new_state_dict, assign=True)
self._model.to(self._compute_device)
self._is_in_vram = True
return self._total_bytes
def full_unload_from_vram(self) -> int:
"""Unload all weights from VRAM.
Returns:
The number of bytes unloaded from VRAM.
"""
if not self._is_in_vram:
# Already in RAM.
return 0
if self._cpu_state_dict is not None:
self._model.load_state_dict(self._cpu_state_dict, assign=True)
self._model.to(self._offload_device)
self._is_in_vram = False
return self._total_bytes
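
A minimal sketch of the full-load/unload cycle (assumes a CUDA device is available; the toy model is purely illustrative):

```python
import torch

model = torch.nn.Linear(32, 32)
total_bytes = sum(p.numel() * p.element_size() for p in model.parameters())
cached = CachedModelOnlyFullLoad(
    model=model, compute_device=torch.device("cuda"), total_bytes=total_bytes
)

loaded = cached.full_load_to_vram()     # returns total_bytes on the first call, 0 if already loaded
assert cached.is_in_vram()
freed = cached.full_unload_from_vram()  # restores the read-only CPU state dict; returns total_bytes
assert not cached.is_in_vram()
```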

View File

@@ -0,0 +1,157 @@
import torch
from invokeai.backend.model_manager.load.model_cache.torch_function_autocast_context import (
add_autocast_to_module_forward,
remove_autocast_from_module_forward,
)
from invokeai.backend.util.calc_tensor_size import calc_tensor_size
def set_nested_attr(obj: object, attr: str, value: object):
"""A helper function that extends setattr() to support nested attributes.
Example:
set_nested_attr(model, "module.encoder.conv1.weight", new_conv1_weight)
"""
attrs = attr.split(".")
for attr in attrs[:-1]:
obj = getattr(obj, attr)
setattr(obj, attrs[-1], value)
class CachedModelWithPartialLoad:
"""A wrapper around a PyTorch model to handle partial loads and unloads between the CPU and the compute device.
Note: "VRAM" is used throughout this class to refer to the memory on the compute device. It could be CUDA memory,
MPS memory, etc.
"""
def __init__(self, model: torch.nn.Module, compute_device: torch.device):
self._model = model
self._compute_device = compute_device
# A CPU read-only copy of the model's state dict.
self._cpu_state_dict: dict[str, torch.Tensor] = model.state_dict()
# TODO(ryand): Handle the case where the model sizes changes after initial load (e.g. due to dtype casting).
# Consider how we should handle this for both self._total_bytes and self._cur_vram_bytes.
self._total_bytes = sum(calc_tensor_size(p) for p in self._cpu_state_dict.values())
self._cur_vram_bytes: int | None = None
self._update_model_autocast_context()
@property
def model(self) -> torch.nn.Module:
return self._model
def get_cpu_state_dict(self) -> dict[str, torch.Tensor] | None:
"""Get a read-only copy of the model's state dict in RAM."""
# TODO(ryand): Document this better.
return self._cpu_state_dict
def total_bytes(self) -> int:
"""Get the total size (in bytes) of all the weights in the model."""
return self._total_bytes
def cur_vram_bytes(self) -> int:
"""Get the size (in bytes) of the weights that are currently in VRAM."""
if self._cur_vram_bytes is None:
cur_state_dict = self._model.state_dict()
self._cur_vram_bytes = sum(
calc_tensor_size(p) for p in cur_state_dict.values() if p.device.type == self._compute_device.type
)
return self._cur_vram_bytes
def full_load_to_vram(self) -> int:
"""Load all weights into VRAM."""
return self.partial_load_to_vram(self.total_bytes())
def full_unload_from_vram(self) -> int:
"""Unload all weights from VRAM."""
return self.partial_unload_from_vram(self.total_bytes())
@torch.no_grad()
def partial_load_to_vram(self, vram_bytes_to_load: int) -> int:
"""Load more weights into VRAM without exceeding vram_bytes_to_load.
Returns:
The number of bytes loaded into VRAM.
"""
# TODO(ryand): Handle the case where an exception is thrown while loading or unloading weights. At the very
# least, we should reset self._cur_vram_bytes to None.
vram_bytes_loaded = 0
cur_state_dict = self._model.state_dict()
for key, param in cur_state_dict.items():
if param.device.type == self._compute_device.type:
continue
param_size = calc_tensor_size(param)
if vram_bytes_loaded + param_size > vram_bytes_to_load:
# TODO(ryand): Should we just break here? If we couldn't fit this parameter into VRAM, is it really
# worth continuing to search for a smaller parameter that would fit?
continue
cur_state_dict[key] = param.to(self._compute_device, copy=True)
vram_bytes_loaded += param_size
if vram_bytes_loaded > 0:
# We load the entire state dict, not just the parameters that changed, in case there are modules that
# override _load_from_state_dict() and do some funky stuff that requires the entire state dict.
# Alternatively, in the future, grouping parameters by module could probably solve this problem.
self._model.load_state_dict(cur_state_dict, assign=True)
if self._cur_vram_bytes is not None:
self._cur_vram_bytes += vram_bytes_loaded
if self._cur_vram_bytes == self.total_bytes():
# HACK(ryand): The model should already be on the compute device, but we have to call this to ensure that
# all non-persistent buffers are moved (i.e. buffers that are not registered in the state dict).
self._model.to(self._compute_device)
self._update_model_autocast_context()
return vram_bytes_loaded
@torch.no_grad()
def partial_unload_from_vram(self, vram_bytes_to_free: int) -> int:
"""Unload weights from VRAM until vram_bytes_to_free bytes are freed. Or the entire model is unloaded.
Returns:
The number of bytes unloaded from VRAM.
"""
vram_bytes_freed = 0
offload_device = "cpu"
cur_state_dict = self._model.state_dict()
for key, param in cur_state_dict.items():
if vram_bytes_freed >= vram_bytes_to_free:
break
if param.device.type == offload_device:
continue
cur_state_dict[key] = self._cpu_state_dict[key]
vram_bytes_freed += calc_tensor_size(param)
if vram_bytes_freed > 0:
self._model.load_state_dict(cur_state_dict, assign=True)
if self._cur_vram_bytes is not None:
self._cur_vram_bytes -= vram_bytes_freed
self._update_model_autocast_context()
return vram_bytes_freed
def _update_model_autocast_context(self):
"""A helper function that should be called whenever the model's VRAM usage changes to add/remove the autocast
context.
"""
if self.cur_vram_bytes() == self.total_bytes():
# We remove the autocast context when the model is fully loaded into VRAM, because the context causes some
# runtime overhead.
remove_autocast_from_module_forward(self._model)
else:
# Monkey-patch the model to add autocasting to the model's forward method.
add_autocast_to_module_forward(self._model, self._compute_device)
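For orientation, here is a minimal usage sketch of the wrapper above. It assumes a CUDA device is available (partial loading is only enabled on CUDA elsewhere in this branch); the toy model and the byte budget are illustrative and not part of the source:
```
import torch

# Hypothetical illustration of the CachedModelWithPartialLoad API defined above.
model = torch.nn.Sequential(torch.nn.Linear(1024, 1024), torch.nn.Linear(1024, 1024))
cached = CachedModelWithPartialLoad(model, compute_device=torch.device("cuda"))

print(cached.total_bytes())     # Total size of all weights, in bytes.
print(cached.cur_vram_bytes())  # 0 - nothing is on the compute device yet.

# Load at most ~2 MB worth of parameters onto the compute device.
loaded_bytes = cached.partial_load_to_vram(2 * 2**20)
print(loaded_bytes, cached.cur_vram_bytes())

# Unload everything again.
cached.partial_unload_from_vram(cached.total_bytes())
assert cached.cur_vram_bytes() == 0
```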

View File

@@ -0,0 +1,554 @@
import gc
import logging
import time
from logging import Logger
from typing import Dict, List, Optional
import psutil
import torch
from diffusers.models.autoencoders.autoencoder_kl import AutoencoderKL
from invokeai.backend.flux.ip_adapter.xlabs_ip_adapter_flux import XlabsIpAdapterFlux
from invokeai.backend.flux.modules.autoencoder import AutoEncoder
from invokeai.backend.model_manager import AnyModel, SubModelType
from invokeai.backend.model_manager.load.memory_snapshot import MemorySnapshot
from invokeai.backend.model_manager.load.model_cache.cache_record import CacheRecord
from invokeai.backend.model_manager.load.model_cache.cache_stats import CacheStats
from invokeai.backend.model_manager.load.model_cache.cached_model.cached_model_only_full_load import (
CachedModelOnlyFullLoad,
)
from invokeai.backend.model_manager.load.model_cache.cached_model.cached_model_with_partial_load import (
CachedModelWithPartialLoad,
)
from invokeai.backend.model_manager.load.model_util import calc_model_size_by_data
from invokeai.backend.util.devices import TorchDevice
from invokeai.backend.util.logging import InvokeAILogger
from invokeai.backend.util.prefix_logger_adapter import PrefixedLoggerAdapter
# Size of a GB in bytes.
GB = 2**30
# Size of a MB in bytes.
MB = 2**20
# TODO(ryand): Where should this go? The ModelCache shouldn't be concerned with submodels.
def get_model_cache_key(model_key: str, submodel_type: Optional[SubModelType] = None) -> str:
"""Get the cache key for a model based on the optional submodel type."""
if submodel_type:
return f"{model_key}:{submodel_type.value}"
else:
return model_key
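# Illustrative example (hypothetical values): a model key "abc123" combined with a submodel type whose
# value is "vae" yields "abc123:vae"; with no submodel type, "abc123" is returned unchanged.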
class ModelCache:
"""A cache for managing models in memory.
The cache is based on two levels of model storage:
- execution_device: The device where most models are executed (typically "cuda", "mps", or "cpu").
- storage_device: The device where models are offloaded when not in active use (typically "cpu").
The model cache is based on the following assumptions:
- storage_device_mem_size > execution_device_mem_size
- disk_to_storage_device_transfer_time >> storage_device_to_execution_device_transfer_time
A copy of all models in the cache is always kept on the storage_device. A subset of the models also have a copy on
the execution_device.
Models are moved between the storage_device and the execution_device as necessary. Cache size limits are enforced
on both the storage_device and the execution_device. The execution_device cache uses a smallest-first offload
policy. The storage_device cache uses a least-recently-used (LRU) offload policy.
Note: Neither of these offload policies has really been compared against alternatives. It's likely that different
policies would be better, although the optimal policies are likely heavily dependent on usage patterns and HW
configuration.
Models are added to the cache with put() and retrieved with get(). A model must be locked with lock() while it
is in active use on the execution device, and unlocked with unlock() when it is no longer needed there.
Example usage:
```
cache = ModelCache(execution_device_working_mem_gb=3, enable_partial_loading=True)
cache.put("sd-1.5-unet", unet_model)
cache_record = cache.get("sd-1.5-unet")
cache.lock("sd-1.5-unet", working_mem_bytes=None)
do_something_on_gpu(cache_record.cached_model.model)
cache.unlock("sd-1.5-unet")
```
"""
def __init__(
self,
execution_device_working_mem_gb: float,
enable_partial_loading: bool,
max_ram_cache_size_gb: float | None = None,
max_vram_cache_size_gb: float | None = None,
execution_device: torch.device | str = "cuda",
storage_device: torch.device | str = "cpu",
log_memory_usage: bool = False,
logger: Optional[Logger] = None,
):
"""Initialize the model RAM cache.
:param execution_device_working_mem_gb: The amount of working memory to keep on the GPU (in GB) i.e. non-model
VRAM.
:param enable_partial_loading: Whether to enable partial loading of models.
:param max_ram_cache_size_gb: The maximum amount of CPU RAM to use for model caching in GB. This parameter is
kept to maintain compatibility with previous versions of the model cache, but should be deprecated in the
future. If set, this parameter overrides the default cache size logic.
:param max_vram_cache_size_gb: The amount of VRAM to use for model caching in GB. This parameter is kept to
maintain compatibility with previous versions of the model cache, but should be deprecated in the future.
If set, this parameter overrides the default cache size logic.
:param execution_device: Torch device to load active model into [torch.device('cuda')]
:param storage_device: Torch device to save inactive model in [torch.device('cpu')]
:param log_memory_usage: If True, a memory snapshot will be captured before and after every model cache
operation, and the result will be logged (at debug level). There is a time cost to capturing the memory
snapshots, so it is recommended to disable this feature unless you are actively inspecting the model cache's
behaviour.
:param logger: InvokeAILogger to use (otherwise creates one)
"""
self._enable_partial_loading = enable_partial_loading
self._execution_device_working_mem_gb = execution_device_working_mem_gb
self._execution_device: torch.device = torch.device(execution_device)
self._storage_device: torch.device = torch.device(storage_device)
self._max_ram_cache_size_gb = max_ram_cache_size_gb
self._max_vram_cache_size_gb = max_vram_cache_size_gb
self._logger = PrefixedLoggerAdapter(
logger or InvokeAILogger.get_logger(self.__class__.__name__), "MODEL CACHE"
)
self._log_memory_usage = log_memory_usage
self._stats: Optional[CacheStats] = None
self._cached_models: Dict[str, CacheRecord] = {}
self._cache_stack: List[str] = []
@property
def stats(self) -> Optional[CacheStats]:
"""Return collected CacheStats object."""
return self._stats
@stats.setter
def stats(self, stats: CacheStats) -> None:
"""Set the CacheStats object for collecting cache statistics."""
self._stats = stats
def put(self, key: str, model: AnyModel) -> None:
"""Add a model to the cache."""
if key in self._cached_models:
self._logger.debug(
f"Attempted to add model {key} ({model.__class__.__name__}), but it already exists in the cache. No action necessary."
)
return
size = calc_model_size_by_data(self._logger, model)
self.make_room(size)
# Partial loading only makes sense on CUDA.
# - When running on CPU, there is no 'loading' to do.
# - When running on MPS, memory is shared with the CPU, so the default OS memory management already handles this
# well.
running_with_cuda = self._execution_device.type == "cuda"
# Specific models that opt-out of partial loading.
partial_loading_opt_out_models = (
# The following models have multiple entrypoints. Our auto-casting context management is only applied to the
# forward method, so a partially loaded AutoEncoder could fail if another entrypoint is used. These models
# can be supported in the future by improving the autocast context management.
# AutoEncoder has three entrypoints: encode, decode, and forward.
AutoEncoder,
# XlabsIpAdapterFlux is a wrapper around two models that are called directly.
XlabsIpAdapterFlux,
# Like AutoEncoder, AutoencoderKL exposes encode() and decode() entrypoints in addition to forward().
AutoencoderKL,
)
# Wrap model.
if (
isinstance(model, torch.nn.Module)
and running_with_cuda
and self._enable_partial_loading
and not isinstance(model, partial_loading_opt_out_models)
):
wrapped_model = CachedModelWithPartialLoad(model, self._execution_device)
else:
wrapped_model = CachedModelOnlyFullLoad(model, self._execution_device, size)
cache_record = CacheRecord(key=key, cached_model=wrapped_model)
self._cached_models[key] = cache_record
self._cache_stack.append(key)
self._logger.debug(
f"Added model {key} (Type: {model.__class__.__name__}, Wrap mode: {wrapped_model.__class__.__name__}, Model size: {size/MB:.2f}MB)"
)
def get(self, key: str, stats_name: Optional[str] = None) -> CacheRecord:
"""Retrieve a model from the cache.
:param key: Model key
:param stats_name: A human-readable id for the model for the purposes of stats reporting.
Raises IndexError if the model is not in the cache.
"""
if key in self._cached_models:
if self.stats:
self.stats.hits += 1
else:
if self.stats:
self.stats.misses += 1
self._logger.debug(f"Cache miss: {key}")
raise IndexError(f"The model with key {key} is not in the cache.")
cache_entry = self._cached_models[key]
# more stats
if self.stats:
stats_name = stats_name or key
self.stats.high_watermark = max(self.stats.high_watermark, self._get_ram_in_use())
self.stats.in_cache = len(self._cached_models)
self.stats.loaded_model_sizes[stats_name] = max(
self.stats.loaded_model_sizes.get(stats_name, 0), cache_entry.cached_model.total_bytes()
)
# this moves the entry to the top (right end) of the stack
self._cache_stack = [k for k in self._cache_stack if k != key]
self._cache_stack.append(key)
self._logger.debug(f"Cache hit: {key} (Type: {cache_entry.cached_model.model.__class__.__name__})")
return cache_entry
def lock(self, key: str, working_mem_bytes: Optional[int]) -> None:
"""Lock a model for use and move it into VRAM.
:param working_mem_bytes: The number of bytes of working memory to keep on the GPU while this model is loaded on
the GPU. If None, self._execution_device_working_mem_gb is used.
"""
cache_entry = self._cached_models[key]
cache_entry.lock()
self._logger.debug(f"Locking model {key} (Type: {cache_entry.cached_model.model.__class__.__name__})")
if self._execution_device.type == "cpu":
# Models don't need to be loaded into VRAM if we're running on CPU.
return
try:
self._load_locked_model(cache_entry, working_mem_bytes)
self._logger.debug(
f"Finished locking model {key} (Type: {cache_entry.cached_model.model.__class__.__name__})"
)
except torch.cuda.OutOfMemoryError:
self._logger.warning("Insufficient GPU memory to load model. Aborting")
cache_entry.unlock()
raise
except Exception:
cache_entry.unlock()
raise
self._log_cache_state()
def unlock(self, key: str) -> None:
"""Unlock a model."""
cache_entry = self._cached_models[key]
cache_entry.unlock()
self._logger.debug(f"Unlocked model {key} (Type: {cache_entry.cached_model.model.__class__.__name__})")
def _load_locked_model(self, cache_entry: CacheRecord, working_mem_bytes: Optional[int] = None) -> None:
"""Helper function for self.lock(). Loads a locked model into VRAM."""
start_time = time.time()
vram_available = self._get_vram_available(working_mem_bytes)
# Calculate model_vram_needed, the amount of additional VRAM that will be used if we fully load the model into
# VRAM.
model_cur_vram_bytes = cache_entry.cached_model.cur_vram_bytes()
model_total_bytes = cache_entry.cached_model.total_bytes()
model_vram_needed = model_total_bytes - model_cur_vram_bytes
# The amount of VRAM that must be freed to make room for model_vram_needed.
vram_bytes_to_free = max(0, model_vram_needed - vram_available)
self._logger.debug(
f"Before unloading: {self._get_vram_state_str(model_cur_vram_bytes, model_total_bytes, vram_available)}"
)
# Make room for the model in VRAM.
# 1. If the model can fit entirely in VRAM, then make enough room for it to be loaded fully.
# 2. If the model can't fit fully into VRAM, then unload all other models and load as much of the model as
# possible.
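# Illustrative example (hypothetical numbers): if the model needs 4 GB of additional VRAM and only 1 GB is
# currently available to the cache, we ask the unlocked entries to free 3 GB before loading as much of the
# model as fits.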
vram_bytes_freed = self._offload_unlocked_models(vram_bytes_to_free)
self._logger.debug(f"Unloaded models (if necessary): vram_bytes_freed={(vram_bytes_freed/MB):.2f}MB")
# Check the updated vram_available after offloading.
vram_available = self._get_vram_available(working_mem_bytes)
self._logger.debug(
f"After unloading: {self._get_vram_state_str(model_cur_vram_bytes, model_total_bytes, vram_available)}"
)
# Move as much of the model as possible into VRAM.
# For testing, only allow 10% of the model to be loaded into VRAM.
# vram_available = int(model_vram_needed * 0.1)
model_bytes_loaded = self._move_model_to_vram(cache_entry, vram_available)
model_cur_vram_bytes = cache_entry.cached_model.cur_vram_bytes()
vram_available = self._get_vram_available(working_mem_bytes)
loaded_percent = model_cur_vram_bytes / model_total_bytes if model_total_bytes > 0 else 0
self._logger.info(
f"Loaded model '{cache_entry.key}' ({cache_entry.cached_model.model.__class__.__name__}) onto "
f"{self._execution_device.type} device in {(time.time() - start_time):.2f}s. "
f"Total model size: {model_total_bytes/MB:.2f}MB, "
f"VRAM: {model_cur_vram_bytes/MB:.2f}MB ({loaded_percent:.1%})"
)
self._logger.debug(f"Loaded model onto execution device: model_bytes_loaded={(model_bytes_loaded/MB):.2f}MB, ")
self._logger.debug(
f"After loading: {self._get_vram_state_str(model_cur_vram_bytes, model_total_bytes, vram_available)}"
)
def _move_model_to_vram(self, cache_entry: CacheRecord, vram_available: int) -> int:
try:
if isinstance(cache_entry.cached_model, CachedModelWithPartialLoad):
return cache_entry.cached_model.partial_load_to_vram(vram_available)
elif isinstance(cache_entry.cached_model, CachedModelOnlyFullLoad): # type: ignore
# Partial load is not supported, so we have no choice but to try to fit it all into VRAM.
return cache_entry.cached_model.full_load_to_vram()
else:
raise ValueError(f"Unsupported cached model type: {type(cache_entry.cached_model)}")
except Exception as e:
if isinstance(e, torch.cuda.OutOfMemoryError):
self._logger.warning("Insufficient GPU memory to load model. Aborting")
# If an exception occurs, the model could be left in a bad state, so we delete it from the cache entirely.
self._delete_cache_entry(cache_entry)
raise
def _move_model_to_ram(self, cache_entry: CacheRecord, vram_bytes_to_free: int) -> int:
try:
if isinstance(cache_entry.cached_model, CachedModelWithPartialLoad):
return cache_entry.cached_model.partial_unload_from_vram(vram_bytes_to_free)
elif isinstance(cache_entry.cached_model, CachedModelOnlyFullLoad): # type: ignore
return cache_entry.cached_model.full_unload_from_vram()
else:
raise ValueError(f"Unsupported cached model type: {type(cache_entry.cached_model)}")
except Exception:
# If an exception occurs, the model could be left in a bad state, so we delete it from the cache entirely.
self._delete_cache_entry(cache_entry)
raise
def _get_vram_available(self, working_mem_bytes: Optional[int] = None) -> int:
"""Calculate the amount of additional VRAM available for the cache to use (takes into account the working
memory).
"""
# If self._max_vram_cache_size_gb is set, then it overrides the default logic.
if self._max_vram_cache_size_gb is not None:
vram_total_available_to_cache = int(self._max_vram_cache_size_gb * GB)
return vram_total_available_to_cache - self._get_vram_in_use()
working_mem_bytes_default = int(self._execution_device_working_mem_gb * GB)
working_mem_bytes = max(working_mem_bytes or working_mem_bytes_default, working_mem_bytes_default)
if self._execution_device.type == "cuda":
vram_reserved = torch.cuda.memory_reserved(self._execution_device)
vram_free, _vram_total = torch.cuda.mem_get_info(self._execution_device)
vram_available_to_process = vram_free + vram_reserved
elif self._execution_device.type == "mps":
vram_reserved = torch.mps.driver_allocated_memory()
# TODO(ryand): Is it accurate that MPS shares memory with the CPU?
vram_free = psutil.virtual_memory().available
vram_available_to_process = vram_free + vram_reserved
else:
raise ValueError(f"Unsupported execution device: {self._execution_device.type}")
vram_total_available_to_cache = vram_available_to_process - working_mem_bytes
vram_cur_available_to_cache = vram_total_available_to_cache - self._get_vram_in_use()
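# Illustrative example (hypothetical numbers): if 8 GB of VRAM is visible to this process and the working
# memory is 3 GB, the cache may occupy up to 5 GB in total; if cached models already hold 2 GB, then 3 GB
# remain available for additional loads.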
return vram_cur_available_to_cache
def _get_vram_in_use(self) -> int:
"""Get the amount of VRAM currently in use by the cache."""
return sum(ce.cached_model.cur_vram_bytes() for ce in self._cached_models.values())
def _get_ram_available(self) -> int:
"""Get the amount of RAM available for the cache to use, while keeping memory pressure under control."""
# If self._max_ram_cache_size_gb is set, then it overrides the default logic.
if self._max_ram_cache_size_gb is not None:
ram_total_available_to_cache = int(self._max_ram_cache_size_gb * GB)
return ram_total_available_to_cache - self._get_ram_in_use()
virtual_memory = psutil.virtual_memory()
ram_total = virtual_memory.total
ram_available = virtual_memory.available
ram_used = ram_total - ram_available
# The total size of all the models in the cache will often be larger than the amount of RAM reported by psutil
# (due to lazy-loading and OS RAM caching behaviour). We could just rely on the psutil values, but it feels
# like a bad idea to over-fill the model cache. So, for now, we'll try to keep the total size of models in the
# cache under the total amount of system RAM.
cache_ram_used = self._get_ram_in_use()
ram_used = max(cache_ram_used, ram_used)
# Aim to keep 10% of RAM free.
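# Illustrative example (hypothetical numbers): with 32 GB of system RAM and 20 GB in use, the cache may
# still grow by roughly 0.9 * 32 - 20 = 8.8 GB.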
return int(ram_total * 0.9) - ram_used
def _get_ram_in_use(self) -> int:
"""Get the amount of RAM currently in use."""
return sum(ce.cached_model.total_bytes() for ce in self._cached_models.values())
def _capture_memory_snapshot(self) -> Optional[MemorySnapshot]:
if self._log_memory_usage:
return MemorySnapshot.capture()
return None
def _get_vram_state_str(self, model_cur_vram_bytes: int, model_total_bytes: int, vram_available: int) -> str:
"""Helper function for preparing a VRAM state log string."""
model_cur_vram_bytes_percent = model_cur_vram_bytes / model_total_bytes if model_total_bytes > 0 else 0
return (
f"model_total={model_total_bytes/MB:.0f} MB, "
+ f"model_vram={model_cur_vram_bytes/MB:.0f} MB ({model_cur_vram_bytes_percent:.1%} %), "
# + f"vram_total={int(self._max_vram_cache_size * GB)/MB:.0f} MB, "
+ f"vram_available={(vram_available/MB):.0f} MB, "
)
def _offload_unlocked_models(self, vram_bytes_to_free: int) -> int:
"""Offload models from the execution_device until vram_bytes_to_free bytes are freed, or all models are
offloaded. Of course, locked models are not offloaded.
Returns:
int: The number of bytes freed.
"""
self._logger.debug(f"Offloading unlocked models with goal of freeing {vram_bytes_to_free/MB:.2f}MB of VRAM.")
vram_bytes_freed = 0
# TODO(ryand): Give more thought to the offloading policy used here.
cache_entries_increasing_size = sorted(self._cached_models.values(), key=lambda x: x.cached_model.total_bytes())
for cache_entry in cache_entries_increasing_size:
if vram_bytes_freed >= vram_bytes_to_free:
break
if cache_entry.is_locked:
continue
cache_entry_bytes_freed = self._move_model_to_ram(cache_entry, vram_bytes_to_free - vram_bytes_freed)
if cache_entry_bytes_freed > 0:
self._logger.debug(
f"Unloaded {cache_entry.key} from VRAM to free {(cache_entry_bytes_freed/MB):.0f} MB."
)
vram_bytes_freed += cache_entry_bytes_freed
TorchDevice.empty_cache()
return vram_bytes_freed
def _log_cache_state(self, title: str = "Model cache state:", include_entry_details: bool = True):
if self._logger.getEffectiveLevel() > logging.DEBUG:
# Short circuit if the logger is not set to debug. Some of the data lookups could take a non-negligible
# amount of time.
return
log = f"{title}\n"
log_format = " {:<30} Limit: {:>7.1f} MB, Used: {:>7.1f} MB ({:>5.1%}), Available: {:>7.1f} MB ({:>5.1%})\n"
ram_in_use_bytes = self._get_ram_in_use()
ram_available_bytes = self._get_ram_available()
ram_size_bytes = ram_in_use_bytes + ram_available_bytes
ram_in_use_bytes_percent = ram_in_use_bytes / ram_size_bytes if ram_size_bytes > 0 else 0
ram_available_bytes_percent = ram_available_bytes / ram_size_bytes if ram_size_bytes > 0 else 0
log += log_format.format(
f"Storage Device ({self._storage_device.type})",
ram_size_bytes / MB,
ram_in_use_bytes / MB,
ram_in_use_bytes_percent,
ram_available_bytes / MB,
ram_available_bytes_percent,
)
if self._execution_device.type != "cpu":
vram_in_use_bytes = self._get_vram_in_use()
vram_available_bytes = self._get_vram_available()
vram_size_bytes = vram_in_use_bytes + vram_available_bytes
vram_in_use_bytes_percent = vram_in_use_bytes / vram_size_bytes if vram_size_bytes > 0 else 0
vram_available_bytes_percent = vram_available_bytes / vram_size_bytes if vram_size_bytes > 0 else 0
log += log_format.format(
f"Compute Device ({self._execution_device.type})",
vram_size_bytes / MB,
vram_in_use_bytes / MB,
vram_in_use_bytes_percent,
vram_available_bytes / MB,
vram_available_bytes_percent,
)
if torch.cuda.is_available():
log += " {:<30} {:.1f} MB\n".format("CUDA Memory Allocated:", torch.cuda.memory_allocated() / MB)
log += " {:<30} {}\n".format("Total models:", len(self._cached_models))
if include_entry_details and len(self._cached_models) > 0:
log += " Models:\n"
log_format = (
" {:<80} total={:>7.1f} MB, vram={:>7.1f} MB ({:>5.1%}), ram={:>7.1f} MB ({:>5.1%}), locked={}\n"
)
for cache_record in self._cached_models.values():
total_bytes = cache_record.cached_model.total_bytes()
cur_vram_bytes = cache_record.cached_model.cur_vram_bytes()
cur_vram_bytes_percent = cur_vram_bytes / total_bytes if total_bytes > 0 else 0
cur_ram_bytes = total_bytes - cur_vram_bytes
cur_ram_bytes_percent = cur_ram_bytes / total_bytes if total_bytes > 0 else 0
log += log_format.format(
f"{cache_record.key} ({cache_record.cached_model.model.__class__.__name__}):",
total_bytes / MB,
cur_vram_bytes / MB,
cur_vram_bytes_percent,
cur_ram_bytes / MB,
cur_ram_bytes_percent,
cache_record.is_locked,
)
self._logger.debug(log)
def make_room(self, bytes_needed: int) -> None:
"""Make enough room in the cache to accommodate a new model of indicated size.
Note: This function deletes all of the cache's internal references to a model in order to free it. If there are
external references to the model, there's nothing that the cache can do about it, and those models will not be
garbage-collected.
"""
self._logger.debug(f"Making room for {bytes_needed/MB:.2f}MB of RAM.")
self._log_cache_state(title="Before dropping models:")
ram_bytes_available = self._get_ram_available()
ram_bytes_to_free = max(0, bytes_needed - ram_bytes_available)
ram_bytes_freed = 0
pos = 0
models_cleared = 0
while ram_bytes_freed < ram_bytes_to_free and pos < len(self._cache_stack):
model_key = self._cache_stack[pos]
cache_entry = self._cached_models[model_key]
if not cache_entry.is_locked:
ram_bytes_freed += cache_entry.cached_model.total_bytes()
self._logger.debug(
f"Dropping {model_key} from RAM cache to free {(cache_entry.cached_model.total_bytes()/MB):.2f}MB."
)
self._delete_cache_entry(cache_entry)
del cache_entry
models_cleared += 1
else:
pos += 1
if models_cleared > 0:
# There would likely be some 'garbage' to be collected regardless of whether a model was cleared or not, but
# there is a significant time cost to calling `gc.collect()`, so we want to use it sparingly. (The time cost
# is high even if no garbage gets collected.)
#
# Calling gc.collect(...) when a model is cleared seems like a good middle-ground:
# - If models had to be cleared, it's a signal that we are close to our memory limit.
# - If models were cleared, there's a good chance that there's a significant amount of garbage to be
# collected.
#
# Keep in mind that gc is only responsible for handling reference cycles. Most objects should be cleaned up
# immediately when their reference count hits 0.
if self.stats:
self.stats.cleared = models_cleared
gc.collect()
TorchDevice.empty_cache()
self._logger.debug(f"Dropped {models_cleared} models to free {ram_bytes_freed/MB:.2f}MB of RAM.")
self._log_cache_state(title="After dropping models:")
def _delete_cache_entry(self, cache_entry: CacheRecord) -> None:
"""Delete cache_entry from the cache if it exists. No exception is thrown if it doesn't exist."""
self._cache_stack = [key for key in self._cache_stack if key != cache_entry.key]
self._cached_models.pop(cache_entry.key, None)
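A slightly fuller sketch than the docstring example above, showing the per-lock working-memory override and stats collection. The key "vae", `vae_model`, and `do_decode()` are hypothetical stand-ins, and it is assumed that CacheStats can be constructed with default values as in the previous implementation:
```
import torch

cache = ModelCache(
    execution_device_working_mem_gb=3,
    enable_partial_loading=True,
    execution_device="cuda" if torch.cuda.is_available() else "cpu",
)
cache.stats = CacheStats()

cache.put("vae", vae_model)
record = cache.get("vae", stats_name="vae")

# Request 4 GB of working memory for this operation instead of the 3 GB default.
cache.lock("vae", working_mem_bytes=4 * GB)
try:
    do_decode(record.cached_model.model)  # Hypothetical inference call.
finally:
    cache.unlock("vae")

print(f"hits={cache.stats.hits}, misses={cache.stats.misses}")
```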

View File

@@ -1,221 +0,0 @@
# Copyright (c) 2024 Lincoln D. Stein and the InvokeAI Development team
# TODO: Add Stalker's proper name to copyright
"""
Manage a RAM cache of diffusion/transformer models for fast switching.
They are moved between GPU VRAM and CPU RAM as necessary. If the cache
grows larger than a preset maximum, then the least recently used
model will be cleared and (re)loaded from disk when next needed.
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from logging import Logger
from typing import Dict, Generic, Optional, TypeVar
import torch
from invokeai.backend.model_manager.config import AnyModel, SubModelType
class ModelLockerBase(ABC):
"""Base class for the model locker used by the loader."""
@abstractmethod
def lock(self) -> AnyModel:
"""Lock the contained model and move it into VRAM."""
pass
@abstractmethod
def unlock(self) -> None:
"""Unlock the contained model, and remove it from VRAM."""
pass
@abstractmethod
def get_state_dict(self) -> Optional[Dict[str, torch.Tensor]]:
"""Return the state dict (if any) for the cached model."""
pass
@property
@abstractmethod
def model(self) -> AnyModel:
"""Return the model."""
pass
T = TypeVar("T")
@dataclass
class CacheRecord(Generic[T]):
"""
Elements of the cache:
key: Unique key for each model, same as used in the models database.
model: Model in memory.
state_dict: A read-only copy of the model's state dict in RAM. It will be
used as a template for creating a copy in the VRAM.
size: Size of the model
loaded: True if the model's state dict is currently in VRAM
Before a model is executed, the state_dict template is copied into VRAM,
and then injected into the model. When the model is finished, the VRAM
copy of the state dict is deleted, and the RAM version is reinjected
into the model.
The state_dict should be treated as a read-only attribute. Do not attempt
to patch or otherwise modify it. Instead, patch the copy of the state_dict
after it is loaded into the execution device (e.g. CUDA) using the `LoadedModel`
context manager call `model_on_device()`.
"""
key: str
model: T
device: torch.device
state_dict: Optional[Dict[str, torch.Tensor]]
size: int
loaded: bool = False
_locks: int = 0
def lock(self) -> None:
"""Lock this record."""
self._locks += 1
def unlock(self) -> None:
"""Unlock this record."""
self._locks -= 1
assert self._locks >= 0
@property
def locked(self) -> bool:
"""Return true if record is locked."""
return self._locks > 0
@dataclass
class CacheStats(object):
"""Collect statistics on cache performance."""
hits: int = 0 # cache hits
misses: int = 0 # cache misses
high_watermark: int = 0 # amount of cache used
in_cache: int = 0 # number of models in cache
cleared: int = 0 # number of models cleared to make space
cache_size: int = 0 # total size of cache
loaded_model_sizes: Dict[str, int] = field(default_factory=dict)
class ModelCacheBase(ABC, Generic[T]):
"""Virtual base class for RAM model cache."""
@property
@abstractmethod
def storage_device(self) -> torch.device:
"""Return the storage device (e.g. "CPU" for RAM)."""
pass
@property
@abstractmethod
def execution_device(self) -> torch.device:
"""Return the exection device (e.g. "cuda" for VRAM)."""
pass
@property
@abstractmethod
def lazy_offloading(self) -> bool:
"""Return true if the cache is configured to lazily offload models in VRAM."""
pass
@property
@abstractmethod
def max_cache_size(self) -> float:
"""Return the maximum size the RAM cache can grow to."""
pass
@max_cache_size.setter
@abstractmethod
def max_cache_size(self, value: float) -> None:
"""Set the cap on vram cache size."""
@property
@abstractmethod
def max_vram_cache_size(self) -> float:
"""Return the maximum size the VRAM cache can grow to."""
pass
@max_vram_cache_size.setter
@abstractmethod
def max_vram_cache_size(self, value: float) -> float:
"""Set the maximum size the VRAM cache can grow to."""
pass
@abstractmethod
def offload_unlocked_models(self, size_required: int) -> None:
"""Offload from VRAM any models not actively in use."""
pass
@abstractmethod
def move_model_to_device(self, cache_entry: CacheRecord[AnyModel], target_device: torch.device) -> None:
"""Move model into the indicated device."""
pass
@property
@abstractmethod
def stats(self) -> Optional[CacheStats]:
"""Return collected CacheStats object."""
pass
@stats.setter
@abstractmethod
def stats(self, stats: CacheStats) -> None:
"""Set the CacheStats object for collectin cache statistics."""
pass
@property
@abstractmethod
def logger(self) -> Logger:
"""Return the logger used by the cache."""
pass
@abstractmethod
def make_room(self, size: int) -> None:
"""Make enough room in the cache to accommodate a new model of indicated size."""
pass
@abstractmethod
def put(
self,
key: str,
model: T,
submodel_type: Optional[SubModelType] = None,
) -> None:
"""Store model under key and optional submodel_type."""
pass
@abstractmethod
def get(
self,
key: str,
submodel_type: Optional[SubModelType] = None,
stats_name: Optional[str] = None,
) -> ModelLockerBase:
"""
Retrieve model using key and optional submodel_type.
:param key: Opaque model key
:param submodel_type: Type of the submodel to fetch
:param stats_name: A human-readable id for the model for the purposes of
stats reporting.
This may raise an IndexError if the model is not in the cache.
"""
pass
@abstractmethod
def cache_size(self) -> int:
"""Get the total size of the models currently cached."""
pass
@abstractmethod
def print_cuda_stats(self) -> None:
"""Log debugging information on CUDA usage."""
pass

View File

@@ -1,426 +0,0 @@
# Copyright (c) 2024 Lincoln D. Stein and the InvokeAI Development team
# TODO: Add Stalker's proper name to copyright
""" """
import gc
import math
import time
from contextlib import suppress
from logging import Logger
from typing import Dict, List, Optional
import torch
from invokeai.backend.model_manager import AnyModel, SubModelType
from invokeai.backend.model_manager.load.memory_snapshot import MemorySnapshot, get_pretty_snapshot_diff
from invokeai.backend.model_manager.load.model_cache.model_cache_base import (
CacheRecord,
CacheStats,
ModelCacheBase,
ModelLockerBase,
)
from invokeai.backend.model_manager.load.model_cache.model_locker import ModelLocker
from invokeai.backend.model_manager.load.model_util import calc_model_size_by_data
from invokeai.backend.util.devices import TorchDevice
from invokeai.backend.util.logging import InvokeAILogger
# Size of a GB in bytes.
GB = 2**30
# Size of a MB in bytes.
MB = 2**20
class ModelCache(ModelCacheBase[AnyModel]):
"""A cache for managing models in memory.
The cache is based on two levels of model storage:
- execution_device: The device where most models are executed (typically "cuda", "mps", or "cpu").
- storage_device: The device where models are offloaded when not in active use (typically "cpu").
The model cache is based on the following assumptions:
- storage_device_mem_size > execution_device_mem_size
- disk_to_storage_device_transfer_time >> storage_device_to_execution_device_transfer_time
A copy of all models in the cache is always kept on the storage_device. A subset of the models also have a copy on
the execution_device.
Models are moved between the storage_device and the execution_device as necessary. Cache size limits are enforced
on both the storage_device and the execution_device. The execution_device cache uses a smallest-first offload
policy. The storage_device cache uses a least-recently-used (LRU) offload policy.
Note: Neither of these offload policies has really been compared against alternatives. It's likely that different
policies would be better, although the optimal policies are likely heavily dependent on usage patterns and HW
configuration.
The cache returns context manager generators designed to load the model into the execution device (often GPU) within
the context, and unload outside the context.
Example usage:
```
cache = ModelCache(max_cache_size=7.5, max_vram_cache_size=6.0)
with cache.get_model('runwayml/stable-diffusion-1-5') as SD1:
do_something_on_gpu(SD1)
```
"""
def __init__(
self,
max_cache_size: float,
max_vram_cache_size: float,
execution_device: torch.device = torch.device("cuda"),
storage_device: torch.device = torch.device("cpu"),
precision: torch.dtype = torch.float16,
lazy_offloading: bool = True,
log_memory_usage: bool = False,
logger: Optional[Logger] = None,
):
"""
Initialize the model RAM cache.
:param max_cache_size: Maximum size of the storage_device cache in GBs.
:param max_vram_cache_size: Maximum size of the execution_device cache in GBs.
:param execution_device: Torch device to load active model into [torch.device('cuda')]
:param storage_device: Torch device to save inactive model in [torch.device('cpu')]
:param precision: Precision for loaded models [torch.float16]
:param lazy_offloading: Keep model in VRAM until another model needs to be loaded
:param log_memory_usage: If True, a memory snapshot will be captured before and after every model cache
operation, and the result will be logged (at debug level). There is a time cost to capturing the memory
snapshots, so it is recommended to disable this feature unless you are actively inspecting the model cache's
behaviour.
:param logger: InvokeAILogger to use (otherwise creates one)
"""
# allow lazy offloading only when vram cache enabled
self._lazy_offloading = lazy_offloading and max_vram_cache_size > 0
self._max_cache_size: float = max_cache_size
self._max_vram_cache_size: float = max_vram_cache_size
self._execution_device: torch.device = execution_device
self._storage_device: torch.device = storage_device
self._logger = logger or InvokeAILogger.get_logger(self.__class__.__name__)
self._log_memory_usage = log_memory_usage
self._stats: Optional[CacheStats] = None
self._cached_models: Dict[str, CacheRecord[AnyModel]] = {}
self._cache_stack: List[str] = []
@property
def logger(self) -> Logger:
"""Return the logger used by the cache."""
return self._logger
@property
def lazy_offloading(self) -> bool:
"""Return true if the cache is configured to lazily offload models in VRAM."""
return self._lazy_offloading
@property
def storage_device(self) -> torch.device:
"""Return the storage device (e.g. "CPU" for RAM)."""
return self._storage_device
@property
def execution_device(self) -> torch.device:
"""Return the exection device (e.g. "cuda" for VRAM)."""
return self._execution_device
@property
def max_cache_size(self) -> float:
"""Return the cap on cache size."""
return self._max_cache_size
@max_cache_size.setter
def max_cache_size(self, value: float) -> None:
"""Set the cap on cache size."""
self._max_cache_size = value
@property
def max_vram_cache_size(self) -> float:
"""Return the cap on vram cache size."""
return self._max_vram_cache_size
@max_vram_cache_size.setter
def max_vram_cache_size(self, value: float) -> None:
"""Set the cap on vram cache size."""
self._max_vram_cache_size = value
@property
def stats(self) -> Optional[CacheStats]:
"""Return collected CacheStats object."""
return self._stats
@stats.setter
def stats(self, stats: CacheStats) -> None:
"""Set the CacheStats object for collectin cache statistics."""
self._stats = stats
def cache_size(self) -> int:
"""Get the total size of the models currently cached."""
total = 0
for cache_record in self._cached_models.values():
total += cache_record.size
return total
def put(
self,
key: str,
model: AnyModel,
submodel_type: Optional[SubModelType] = None,
) -> None:
"""Store model under key and optional submodel_type."""
key = self._make_cache_key(key, submodel_type)
if key in self._cached_models:
return
size = calc_model_size_by_data(self.logger, model)
self.make_room(size)
running_on_cpu = self.execution_device == torch.device("cpu")
state_dict = model.state_dict() if isinstance(model, torch.nn.Module) and not running_on_cpu else None
cache_record = CacheRecord(key=key, model=model, device=self.storage_device, state_dict=state_dict, size=size)
self._cached_models[key] = cache_record
self._cache_stack.append(key)
def get(
self,
key: str,
submodel_type: Optional[SubModelType] = None,
stats_name: Optional[str] = None,
) -> ModelLockerBase:
"""
Retrieve model using key and optional submodel_type.
:param key: Opaque model key
:param submodel_type: Type of the submodel to fetch
:param stats_name: A human-readable id for the model for the purposes of
stats reporting.
This may raise an IndexError if the model is not in the cache.
"""
key = self._make_cache_key(key, submodel_type)
if key in self._cached_models:
if self.stats:
self.stats.hits += 1
else:
if self.stats:
self.stats.misses += 1
raise IndexError(f"The model with key {key} is not in the cache.")
cache_entry = self._cached_models[key]
# more stats
if self.stats:
stats_name = stats_name or key
self.stats.cache_size = int(self._max_cache_size * GB)
self.stats.high_watermark = max(self.stats.high_watermark, self.cache_size())
self.stats.in_cache = len(self._cached_models)
self.stats.loaded_model_sizes[stats_name] = max(
self.stats.loaded_model_sizes.get(stats_name, 0), cache_entry.size
)
# this moves the entry to the top (right end) of the stack
with suppress(Exception):
self._cache_stack.remove(key)
self._cache_stack.append(key)
return ModelLocker(
cache=self,
cache_entry=cache_entry,
)
def _capture_memory_snapshot(self) -> Optional[MemorySnapshot]:
if self._log_memory_usage:
return MemorySnapshot.capture()
return None
def _make_cache_key(self, model_key: str, submodel_type: Optional[SubModelType] = None) -> str:
if submodel_type:
return f"{model_key}:{submodel_type.value}"
else:
return model_key
def offload_unlocked_models(self, size_required: int) -> None:
"""Offload models from the execution_device to make room for size_required.
:param size_required: The amount of space to clear in the execution_device cache, in bytes.
"""
reserved = self._max_vram_cache_size * GB
vram_in_use = torch.cuda.memory_allocated() + size_required
self.logger.debug(f"{(vram_in_use/GB):.2f}GB VRAM needed for models; max allowed={(reserved/GB):.2f}GB")
for _, cache_entry in sorted(self._cached_models.items(), key=lambda x: x[1].size):
if vram_in_use <= reserved:
break
if not cache_entry.loaded:
continue
if not cache_entry.locked:
self.move_model_to_device(cache_entry, self.storage_device)
cache_entry.loaded = False
vram_in_use = torch.cuda.memory_allocated() + size_required
self.logger.debug(
f"Removing {cache_entry.key} from VRAM to free {(cache_entry.size/GB):.2f}GB; vram free = {(torch.cuda.memory_allocated()/GB):.2f}GB"
)
TorchDevice.empty_cache()
def move_model_to_device(self, cache_entry: CacheRecord[AnyModel], target_device: torch.device) -> None:
"""Move model into the indicated device.
:param cache_entry: The CacheRecord for the model
:param target_device: The torch.device to move the model into
May raise a torch.cuda.OutOfMemoryError
"""
self.logger.debug(f"Called to move {cache_entry.key} to {target_device}")
source_device = cache_entry.device
# Note: We compare device types only so that 'cuda' == 'cuda:0'.
# This would need to be revised to support multi-GPU.
if torch.device(source_device).type == torch.device(target_device).type:
return
# Some models don't have a `to` method, in which case they run in RAM/CPU.
if not hasattr(cache_entry.model, "to"):
return
# This roundabout method for moving the model around is done to avoid
# the cost of moving the model from RAM to VRAM and then back from VRAM to RAM.
# When moving to VRAM, we copy (not move) each element of the state dict from
# RAM to a new state dict in VRAM, and then inject it into the model.
# This operation is slightly faster than running `to()` on the whole model.
#
# When the model needs to be removed from VRAM we simply delete the copy
# of the state dict in VRAM, and reinject the state dict that is cached
# in RAM into the model. So this operation is very fast.
start_model_to_time = time.time()
snapshot_before = self._capture_memory_snapshot()
try:
if cache_entry.state_dict is not None:
assert hasattr(cache_entry.model, "load_state_dict")
if target_device == self.storage_device:
cache_entry.model.load_state_dict(cache_entry.state_dict, assign=True)
else:
new_dict: Dict[str, torch.Tensor] = {}
for k, v in cache_entry.state_dict.items():
new_dict[k] = v.to(target_device, copy=True)
cache_entry.model.load_state_dict(new_dict, assign=True)
cache_entry.model.to(target_device)
cache_entry.device = target_device
except Exception as e: # blow away cache entry
self._delete_cache_entry(cache_entry)
raise e
snapshot_after = self._capture_memory_snapshot()
end_model_to_time = time.time()
self.logger.debug(
f"Moved model '{cache_entry.key}' from {source_device} to"
f" {target_device} in {(end_model_to_time-start_model_to_time):.2f}s."
f"Estimated model size: {(cache_entry.size/GB):.3f} GB."
f"{get_pretty_snapshot_diff(snapshot_before, snapshot_after)}"
)
if (
snapshot_before is not None
and snapshot_after is not None
and snapshot_before.vram is not None
and snapshot_after.vram is not None
):
vram_change = abs(snapshot_before.vram - snapshot_after.vram)
# If the estimated model size does not match the change in VRAM, log a warning.
if not math.isclose(
vram_change,
cache_entry.size,
rel_tol=0.1,
abs_tol=10 * MB,
):
self.logger.debug(
f"Moving model '{cache_entry.key}' from {source_device} to"
f" {target_device} caused an unexpected change in VRAM usage. The model's"
" estimated size may be incorrect. Estimated model size:"
f" {(cache_entry.size/GB):.3f} GB.\n"
f"{get_pretty_snapshot_diff(snapshot_before, snapshot_after)}"
)
def print_cuda_stats(self) -> None:
"""Log CUDA diagnostics."""
vram = "%4.2fG" % (torch.cuda.memory_allocated() / GB)
ram = "%4.2fG" % (self.cache_size() / GB)
in_ram_models = 0
in_vram_models = 0
locked_in_vram_models = 0
for cache_record in self._cached_models.values():
if hasattr(cache_record.model, "device"):
if cache_record.model.device == self.storage_device:
in_ram_models += 1
else:
in_vram_models += 1
if cache_record.locked:
locked_in_vram_models += 1
self.logger.debug(
f"Current VRAM/RAM usage: {vram}/{ram}; models_in_ram/models_in_vram(locked) ="
f" {in_ram_models}/{in_vram_models}({locked_in_vram_models})"
)
def make_room(self, size: int) -> None:
"""Make enough room in the cache to accommodate a new model of indicated size.
Note: This function deletes all of the cache's internal references to a model in order to free it. If there are
external references to the model, there's nothing that the cache can do about it, and those models will not be
garbage-collected.
"""
bytes_needed = size
maximum_size = self.max_cache_size * GB # stored in GB, convert to bytes
current_size = self.cache_size()
if current_size + bytes_needed > maximum_size:
self.logger.debug(
f"Max cache size exceeded: {(current_size/GB):.2f}/{self.max_cache_size:.2f} GB, need an additional"
f" {(bytes_needed/GB):.2f} GB"
)
self.logger.debug(f"Before making_room: cached_models={len(self._cached_models)}")
pos = 0
models_cleared = 0
while current_size + bytes_needed > maximum_size and pos < len(self._cache_stack):
model_key = self._cache_stack[pos]
cache_entry = self._cached_models[model_key]
device = cache_entry.model.device if hasattr(cache_entry.model, "device") else None
self.logger.debug(
f"Model: {model_key}, locks: {cache_entry._locks}, device: {device}, loaded: {cache_entry.loaded}"
)
if not cache_entry.locked:
self.logger.debug(
f"Removing {model_key} from RAM cache to free at least {(size/GB):.2f} GB (-{(cache_entry.size/GB):.2f} GB)"
)
current_size -= cache_entry.size
models_cleared += 1
self._delete_cache_entry(cache_entry)
del cache_entry
else:
pos += 1
if models_cleared > 0:
# There would likely be some 'garbage' to be collected regardless of whether a model was cleared or not, but
# there is a significant time cost to calling `gc.collect()`, so we want to use it sparingly. (The time cost
# is high even if no garbage gets collected.)
#
# Calling gc.collect(...) when a model is cleared seems like a good middle-ground:
# - If models had to be cleared, it's a signal that we are close to our memory limit.
# - If models were cleared, there's a good chance that there's a significant amount of garbage to be
# collected.
#
# Keep in mind that gc is only responsible for handling reference cycles. Most objects should be cleaned up
# immediately when their reference count hits 0.
if self.stats:
self.stats.cleared = models_cleared
gc.collect()
TorchDevice.empty_cache()
self.logger.debug(f"After making room: cached_models={len(self._cached_models)}")
def _delete_cache_entry(self, cache_entry: CacheRecord[AnyModel]) -> None:
self._cache_stack.remove(cache_entry.key)
del self._cached_models[cache_entry.key]

View File

@@ -1,64 +0,0 @@
"""
Base class and implementation of a class that moves models in and out of VRAM.
"""
from typing import Dict, Optional
import torch
from invokeai.backend.model_manager import AnyModel
from invokeai.backend.model_manager.load.model_cache.model_cache_base import (
CacheRecord,
ModelCacheBase,
ModelLockerBase,
)
class ModelLocker(ModelLockerBase):
"""Internal class that mediates movement in and out of GPU."""
def __init__(self, cache: ModelCacheBase[AnyModel], cache_entry: CacheRecord[AnyModel]):
"""
Initialize the model locker.
:param cache: The ModelCache object
:param cache_entry: The entry in the model cache
"""
self._cache = cache
self._cache_entry = cache_entry
@property
def model(self) -> AnyModel:
"""Return the model without moving it around."""
return self._cache_entry.model
def get_state_dict(self) -> Optional[Dict[str, torch.Tensor]]:
"""Return the state dict (if any) for the cached model."""
return self._cache_entry.state_dict
def lock(self) -> AnyModel:
"""Move the model into the execution device (GPU) and lock it."""
self._cache_entry.lock()
try:
if self._cache.lazy_offloading:
self._cache.offload_unlocked_models(self._cache_entry.size)
self._cache.move_model_to_device(self._cache_entry, self._cache.execution_device)
self._cache_entry.loaded = True
self._cache.logger.debug(f"Locking {self._cache_entry.key} in {self._cache.execution_device}")
self._cache.print_cuda_stats()
except torch.cuda.OutOfMemoryError:
self._cache.logger.warning("Insufficient GPU memory to load model. Aborting")
self._cache_entry.unlock()
raise
except Exception:
self._cache_entry.unlock()
raise
return self.model
def unlock(self) -> None:
"""Call upon exit from context."""
self._cache_entry.unlock()
if not self._cache.lazy_offloading:
self._cache.offload_unlocked_models(0)
self._cache.print_cuda_stats()

View File

@@ -0,0 +1,42 @@
from typing import Any, Callable
import torch
from torch.overrides import TorchFunctionMode
def add_autocast_to_module_forward(m: torch.nn.Module, to_device: torch.device):
"""Monkey-patch m.forward(...) with a new forward(...) method that activates device autocasting for its duration."""
old_forward = m.forward
def new_forward(*args: Any, **kwargs: Any):
with TorchFunctionAutocastDeviceContext(to_device):
return old_forward(*args, **kwargs)
m.old_forward = old_forward # type: ignore
m.forward = new_forward
def remove_autocast_from_module_forward(m: torch.nn.Module):
"""Remove the autocast context from m.forward(...) and restore the old forward method."""
if not hasattr(m, "old_forward"):
return
m.forward = m.old_forward
del m.old_forward
def _cast_to_device_and_run(
func: Callable[..., Any], args: tuple[Any, ...], kwargs: dict[str, Any], to_device: torch.device
):
args_on_device = [a.to(to_device) if isinstance(a, torch.Tensor) else a for a in args]
kwargs_on_device = {k: v.to(to_device) if isinstance(v, torch.Tensor) else v for k, v in kwargs.items()}
return func(*args_on_device, **kwargs_on_device)
class TorchFunctionAutocastDeviceContext(TorchFunctionMode):
def __init__(self, to_device: torch.device):
self._to_device = to_device
def __torch_function__(
self, func: Callable[..., Any], types, args: tuple[Any, ...] = (), kwargs: dict[str, Any] | None = None
):
return _cast_to_device_and_run(func, args, kwargs or {}, self._to_device)
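A minimal sketch of how the two helpers above might be exercised. It assumes the functions are importable from this module; on CPU-only hardware the casts are no-ops but the code still runs:
```
import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# A module whose weights stay on the CPU.
linear = torch.nn.Linear(8, 8)

# Patch forward() so that tensors are cast to `device` on the fly during the call.
add_autocast_to_module_forward(linear, device)

x = torch.randn(2, 8)  # Input created on the CPU.
y = linear(x)          # Weights and inputs are copied to `device` inside forward().
print(y.device)

# Restore the original forward() method.
remove_autocast_from_module_forward(linear)
```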

View File

@@ -18,7 +18,7 @@ from invokeai.backend.model_manager import (
SubModelType,
)
from invokeai.backend.model_manager.load.load_default import ModelLoader
from invokeai.backend.model_manager.load.model_cache.model_cache_base import ModelCacheBase
from invokeai.backend.model_manager.load.model_cache.model_cache import ModelCache
from invokeai.backend.model_manager.load.model_loader_registry import ModelLoaderRegistry
from invokeai.backend.patches.lora_conversions.flux_control_lora_utils import (
is_state_dict_likely_flux_control,
@@ -46,7 +46,7 @@ class LoRALoader(ModelLoader):
self,
app_config: InvokeAIAppConfig,
logger: Logger,
ram_cache: ModelCacheBase[AnyModel],
ram_cache: ModelCache,
):
"""Initialize the loader."""
super().__init__(app_config, logger, ram_cache)

View File

@@ -25,6 +25,7 @@ from invokeai.backend.model_manager.config import (
DiffusersConfigBase,
MainCheckpointConfig,
)
from invokeai.backend.model_manager.load.model_cache.model_cache import get_model_cache_key
from invokeai.backend.model_manager.load.model_loader_registry import ModelLoaderRegistry
from invokeai.backend.model_manager.load.model_loaders.generic_diffusers import GenericDiffusersLoader
from invokeai.backend.util.silence_warnings import SilenceWarnings
@@ -132,5 +133,5 @@ class StableDiffusionDiffusersModel(GenericDiffusersLoader):
if subtype == submodel_type:
continue
if submodel := getattr(pipeline, subtype.value, None):
self._ram_cache.put(config.key, submodel_type=subtype, model=submodel)
self._ram_cache.put(get_model_cache_key(config.key, subtype), model=submodel)
return getattr(pipeline, submodel_type.value)

View File

@@ -17,58 +17,66 @@ class LayerPatcher:
@staticmethod
@torch.no_grad()
@contextmanager
def apply_model_patches(
def apply_smart_model_patches(
model: torch.nn.Module,
patches: Iterable[Tuple[ModelPatchRaw, float]],
prefix: str,
dtype: torch.dtype,
cached_weights: Optional[Dict[str, torch.Tensor]] = None,
force_direct_patching: bool = False,
force_sidecar_patching: bool = False,
):
"""Apply one or more LoRA patches to a model within a context manager.
Args:
model (torch.nn.Module): The model to patch.
patches (Iterable[Tuple[LoRAModelRaw, float]]): An iterator that returns tuples of LoRA patches and
associated weights. An iterator is used so that the LoRA patches do not need to be loaded into memory
all at once.
prefix (str): The keys in the patches will be filtered to only include weights with this prefix.
cached_weights (Optional[Dict[str, torch.Tensor]], optional): Read-only copy of the model's state dict in
CPU RAM, for efficient unpatching purposes.
"""Apply 'smart' model patching that chooses whether to use direct patching or a sidecar wrapper for each
module.
"""
# original_weights are stored for unpatching layers that are directly patched.
original_weights = OriginalWeightsStorage(cached_weights)
# original_modules are stored for unpatching layers that are wrapped in a LoRASidecarWrapper.
original_modules: dict[str, torch.nn.Module] = {}
try:
for patch, patch_weight in patches:
LayerPatcher.apply_model_patch(
LayerPatcher.apply_smart_model_patch(
model=model,
prefix=prefix,
patch=patch,
patch_weight=patch_weight,
original_weights=original_weights,
original_modules=original_modules,
dtype=dtype,
force_direct_patching=force_direct_patching,
force_sidecar_patching=force_sidecar_patching,
)
del patch
yield
finally:
# Restore directly patched layers.
for param_key, weight in original_weights.get_changed_weights():
cur_param = model.get_parameter(param_key)
cur_param.data = weight.to(dtype=cur_param.dtype, device=cur_param.device, copy=True)
# Restore LoRASidecarWrapper modules.
# Note: This logic assumes no nested modules in original_modules.
for module_key, orig_module in original_modules.items():
module_parent_key, module_name = LayerPatcher._split_parent_key(module_key)
parent_module = model.get_submodule(module_parent_key)
LayerPatcher._set_submodule(parent_module, module_name, orig_module)
@staticmethod
@torch.no_grad()
def apply_model_patch(
def apply_smart_model_patch(
model: torch.nn.Module,
prefix: str,
patch: ModelPatchRaw,
patch_weight: float,
original_weights: OriginalWeightsStorage,
original_modules: dict[str, torch.nn.Module],
dtype: torch.dtype,
force_direct_patching: bool,
force_sidecar_patching: bool,
):
"""Apply a single LoRA patch to a model.
Args:
model (torch.nn.Module): The model to patch.
prefix (str): A string prefix that precedes keys used in the LoRAs weight layers.
patch (LoRAModelRaw): The LoRA model to patch in.
patch_weight (float): The weight of the LoRA patch.
original_weights (OriginalWeightsStorage): Storage for the original weights of the model, for unpatching.
"""Apply a single LoRA patch to a model using the 'smart' patching strategy that chooses whether to use direct
patching or a sidecar wrapper for each module.
"""
if patch_weight == 0:
return
@@ -89,13 +97,51 @@ class LayerPatcher:
model, layer_key[prefix_len:], layer_key_is_flattened=layer_keys_are_flattened
)
LayerPatcher._apply_model_layer_patch(
module_to_patch=module,
module_to_patch_key=module_key,
patch=layer,
patch_weight=patch_weight,
original_weights=original_weights,
)
# Decide whether to use direct patching or a sidecar wrapper.
# Direct patching is preferred, because it results in better runtime speed.
# Reasons to use sidecar patching:
# - The module is quantized, so the caller passed force_sidecar_patching=True.
# - The module is already wrapped in a BaseSidecarWrapper.
# - The module is on the CPU (and we don't want to store a second full copy of the original weights on the
# CPU, since this would double the RAM usage)
# NOTE: For now, we don't check if the layer is quantized here. We assume that this is checked in the caller
# and that the caller will set force_sidecar_patching=True if the layer is quantized.
# TODO(ryand): Handle the case where we are running without a GPU. Should we set a config flag that allows
# forcing full patching even on the CPU?
use_sidecar_patching = False
if force_direct_patching and force_sidecar_patching:
raise ValueError("Cannot force both direct and sidecar patching.")
elif force_direct_patching:
use_sidecar_patching = False
elif force_sidecar_patching:
use_sidecar_patching = True
elif isinstance(module, BaseSidecarWrapper):
use_sidecar_patching = True
elif LayerPatcher._is_any_part_of_layer_on_cpu(module):
use_sidecar_patching = True
if use_sidecar_patching:
LayerPatcher._apply_model_layer_wrapper_patch(
model=model,
module_to_patch=module,
module_to_patch_key=module_key,
patch=layer,
patch_weight=patch_weight,
original_modules=original_modules,
dtype=dtype,
)
else:
LayerPatcher._apply_model_layer_patch(
module_to_patch=module,
module_to_patch_key=module_key,
patch=layer,
patch_weight=patch_weight,
original_weights=original_weights,
)
@staticmethod
def _is_any_part_of_layer_on_cpu(layer: torch.nn.Module) -> bool:
return any(p.device.type == "cpu" for p in layer.parameters())
@staticmethod
@torch.no_grad()
@@ -143,89 +189,6 @@ class LayerPatcher:
patch.to(device=TorchDevice.CPU_DEVICE)
@staticmethod
@torch.no_grad()
@contextmanager
def apply_model_sidecar_patches(
model: torch.nn.Module,
patches: Iterable[Tuple[ModelPatchRaw, float]],
prefix: str,
dtype: torch.dtype,
):
"""Apply one or more LoRA sidecar patches to a model within a context manager. Sidecar patches incur some
overhead compared to normal LoRA patching, but they allow LoRA layers to be applied to base layers in any
quantization format.
Args:
model (torch.nn.Module): The model to patch.
patches (Iterable[Tuple[LoRAModelRaw, float]]): An iterator that returns tuples of LoRA patches and
associated weights. An iterator is used so that the LoRA patches do not need to be loaded into memory
all at once.
prefix (str): The keys in the patches will be filtered to only include weights with this prefix.
dtype (torch.dtype): The compute dtype of the sidecar layers. This cannot easily be inferred from the model,
since the sidecar layers are typically applied on top of quantized layers whose weight dtype is
different from their compute dtype.
"""
original_modules: dict[str, torch.nn.Module] = {}
try:
for patch, patch_weight in patches:
LayerPatcher._apply_model_sidecar_patch(
model=model,
prefix=prefix,
patch=patch,
patch_weight=patch_weight,
original_modules=original_modules,
dtype=dtype,
)
yield
finally:
# Restore original modules.
# Note: This logic assumes no nested modules in original_modules.
for module_key, orig_module in original_modules.items():
module_parent_key, module_name = LayerPatcher._split_parent_key(module_key)
parent_module = model.get_submodule(module_parent_key)
LayerPatcher._set_submodule(parent_module, module_name, orig_module)
@staticmethod
def _apply_model_sidecar_patch(
model: torch.nn.Module,
patch: ModelPatchRaw,
patch_weight: float,
prefix: str,
original_modules: dict[str, torch.nn.Module],
dtype: torch.dtype,
):
"""Apply a single LoRA sidecar patch to a model."""
if patch_weight == 0:
return
# If the layer keys contain a dot, then they are not flattened, and can be directly used to access model
# submodules. If the layer keys do not contain a dot, then they are flattened, meaning that all '.' have been
# replaced with '_'. Non-flattened keys are preferred, because they allow submodules to be accessed directly
# without searching, but some legacy code still uses flattened keys.
layer_keys_are_flattened = "." not in next(iter(patch.layers.keys()))
prefix_len = len(prefix)
for layer_key, layer in patch.layers.items():
if not layer_key.startswith(prefix):
continue
module_key, module = LayerPatcher._get_submodule(
model, layer_key[prefix_len:], layer_key_is_flattened=layer_keys_are_flattened
)
LayerPatcher._apply_model_layer_wrapper_patch(
model=model,
module_to_patch=module,
module_to_patch_key=module_key,
patch=layer,
patch_weight=patch_weight,
original_modules=original_modules,
dtype=dtype,
)
@staticmethod
@torch.no_grad()
def _apply_model_layer_wrapper_patch(

View File

@@ -25,12 +25,9 @@ class InvokeInt8Params(bnb.nn.Int8Params):
self.CB = self.data
self.SCB = self.SCB.cuda()
else:
# we store the 8-bit row-major weight
# we convert this weight to the Turing/Ampere weight during the first inference pass
# We quantize the weight and store in 8bit row-major
B = self.data.contiguous().half().cuda(device)
CB, CBt, SCB, SCBt, coo_tensorB = bnb.functional.double_quant(B)
del CBt
del SCBt
CB, SCB, _ = bnb.functional.int8_vectorwise_quant(B)
self.data = CB
self.CB = CB
self.SCB = SCB
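For reference, a minimal sketch of the new quantization call used above. This assumes bitsandbytes >= 0.45 (matching the pin bumped later in this diff) and an available CUDA device; the tensor shape is arbitrary, and the return-value roles are as suggested by the code above (int8 weight CB, scale factors SCB, plus a third value that is discarded).

import bitsandbytes as bnb
import torch

# Mirror the updated code path: int8_vectorwise_quant() replaces the removed double_quant().
B = torch.randn(64, 64).contiguous().half().cuda()
CB, SCB, _ = bnb.functional.int8_vectorwise_quant(B)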

View File

@@ -5,8 +5,8 @@ from typing import TYPE_CHECKING
from diffusers import UNet2DConditionModel
from invokeai.backend.patches.layer_patcher import LayerPatcher
from invokeai.backend.patches.model_patch_raw import ModelPatchRaw
from invokeai.backend.patches.model_patcher import LayerPatcher
from invokeai.backend.stable_diffusion.extensions.base import ExtensionBase
if TYPE_CHECKING:
@@ -31,12 +31,16 @@ class LoRAExt(ExtensionBase):
def patch_unet(self, unet: UNet2DConditionModel, original_weights: OriginalWeightsStorage):
lora_model = self._node_context.models.load(self._model_id).model
assert isinstance(lora_model, ModelPatchRaw)
LayerPatcher.apply_model_patch(
LayerPatcher.apply_smart_model_patch(
model=unet,
prefix="lora_unet_",
patch=lora_model,
patch_weight=self._weight,
original_weights=original_weights,
original_modules={},
dtype=unet.dtype,
force_direct_patching=True,
force_sidecar_patching=False,
)
del lora_model

View File

@@ -0,0 +1,12 @@
import logging
from typing import Any, MutableMapping
# Issue with type hints related to LoggerAdapter: https://github.com/python/typeshed/issues/7855
class PrefixedLoggerAdapter(logging.LoggerAdapter): # type: ignore
def __init__(self, logger: logging.Logger, prefix: str):
super().__init__(logger, {})
self.prefix = prefix
def process(self, msg: str, kwargs: MutableMapping[str, Any]) -> tuple[str, MutableMapping[str, Any]]:
return f"[{self.prefix}] {msg}", kwargs

View File

@@ -34,7 +34,7 @@ classifiers = [
dependencies = [
# Core generation dependencies, pinned for reproducible builds.
"accelerate==1.0.1",
"bitsandbytes==0.43.3; sys_platform!='darwin'",
"bitsandbytes==0.45.0; sys_platform!='darwin'",
"clip_anytorch==2.6.0", # replacing "clip @ https://github.com/openai/CLIP/archive/eaa22acb90a5876642d0507623e859909230a52d.zip",
"compel==2.0.2",
"controlnet-aux==0.0.7",
@@ -52,7 +52,7 @@ dependencies = [
"sentencepiece==0.2.0",
"spandrel==0.3.4",
"timm==0.6.13", # needed to override timm latest in controlnet_aux, see https://github.com/isl-org/ZoeDepth/issues/26
"torch<2.5.0", # torch and related dependencies are loosely pinned, will respect requirement of `diffusers[torch]`
"torch~=2.4", # torch and related dependencies are loosely pinned, will respect requirement of `diffusers[torch]`
"torchmetrics",
"torchsde",
"torchvision",

View File

@@ -0,0 +1,50 @@
import pytest
import torch
from invokeai.backend.model_manager.load.model_cache.cached_model.cached_model_only_full_load import (
CachedModelOnlyFullLoad,
)
from tests.backend.model_manager.load.model_cache.dummy_module import DummyModule
parameterize_mps_and_cuda = pytest.mark.parametrize(
("device"),
[
pytest.param(
"mps", marks=pytest.mark.skipif(not torch.backends.mps.is_available(), reason="MPS is not available.")
),
pytest.param("cuda", marks=pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA is not available.")),
],
)
@parameterize_mps_and_cuda
def test_cached_model_total_bytes(device: str):
model = DummyModule()
cached_model = CachedModelOnlyFullLoad(model=model, compute_device=torch.device(device), total_bytes=100)
assert cached_model.total_bytes() == 100
@parameterize_mps_and_cuda
def test_cached_model_is_in_vram(device: str):
model = DummyModule()
cached_model = CachedModelOnlyFullLoad(model=model, compute_device=torch.device(device), total_bytes=100)
assert not cached_model.is_in_vram()
cached_model.full_load_to_vram()
assert cached_model.is_in_vram()
cached_model.full_unload_from_vram()
assert not cached_model.is_in_vram()
@parameterize_mps_and_cuda
def test_cached_model_full_load_and_unload(device: str):
model = DummyModule()
cached_model = CachedModelOnlyFullLoad(model=model, compute_device=torch.device(device), total_bytes=100)
assert cached_model.full_load_to_vram() == 100
assert cached_model.is_in_vram()
assert all(p.device.type == device for p in cached_model.model.parameters())
assert cached_model.full_unload_from_vram() == 100
assert not cached_model.is_in_vram()
assert all(p.device.type == "cpu" for p in cached_model.model.parameters())

View File

@@ -0,0 +1,184 @@
import itertools
import pytest
import torch
from invokeai.backend.model_manager.load.model_cache.cached_model.cached_model_with_partial_load import (
CachedModelWithPartialLoad,
)
from invokeai.backend.util.calc_tensor_size import calc_tensor_size
from tests.backend.model_manager.load.model_cache.dummy_module import DummyModule
parameterize_mps_and_cuda = pytest.mark.parametrize(
("device"),
[
pytest.param(
"mps", marks=pytest.mark.skipif(not torch.backends.mps.is_available(), reason="MPS is not available.")
),
pytest.param("cuda", marks=pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA is not available.")),
],
)
@parameterize_mps_and_cuda
def test_cached_model_total_bytes(device: str):
model = DummyModule()
cached_model = CachedModelWithPartialLoad(model=model, compute_device=torch.device(device))
linear_numel = 10 * 10 + 10
buffer_numel = 10 * 10
assert cached_model.total_bytes() == (2 * linear_numel + buffer_numel) * 4
@parameterize_mps_and_cuda
def test_cached_model_cur_vram_bytes(device: str):
model = DummyModule()
# Model starts in CPU memory.
cached_model = CachedModelWithPartialLoad(model=model, compute_device=torch.device(device))
assert cached_model.cur_vram_bytes() == 0
# Full load the model into VRAM.
cached_model.full_load_to_vram()
assert cached_model.cur_vram_bytes() > 0
assert cached_model.cur_vram_bytes() == cached_model.total_bytes()
assert all(p.device.type == device for p in model.parameters())
assert all(p.device.type == device for p in model.buffers())
@parameterize_mps_and_cuda
def test_cached_model_partial_load(device: str):
model = DummyModule()
# Model starts in CPU memory.
cached_model = CachedModelWithPartialLoad(model=model, compute_device=torch.device(device))
model_total_bytes = cached_model.total_bytes()
assert cached_model.cur_vram_bytes() == 0
# Partially load the model into VRAM.
target_vram_bytes = int(model_total_bytes * 0.6)
loaded_bytes = cached_model.partial_load_to_vram(target_vram_bytes)
assert loaded_bytes > 0
assert loaded_bytes < model_total_bytes
assert loaded_bytes == cached_model.cur_vram_bytes()
assert loaded_bytes == sum(
calc_tensor_size(p) for p in itertools.chain(model.parameters(), model.buffers()) if p.device.type == device
)
@parameterize_mps_and_cuda
def test_cached_model_partial_unload(device: str):
model = DummyModule()
# Model starts in CPU memory.
cached_model = CachedModelWithPartialLoad(model=model, compute_device=torch.device(device))
model_total_bytes = cached_model.total_bytes()
assert cached_model.cur_vram_bytes() == 0
# Full load the model into VRAM.
cached_model.full_load_to_vram()
assert cached_model.cur_vram_bytes() == model_total_bytes
# Partially unload the model from VRAM.
bytes_to_free = int(model_total_bytes * 0.4)
freed_bytes = cached_model.partial_unload_from_vram(bytes_to_free)
assert freed_bytes >= bytes_to_free
assert freed_bytes < model_total_bytes
assert freed_bytes == model_total_bytes - cached_model.cur_vram_bytes()
assert freed_bytes == sum(
calc_tensor_size(p) for p in itertools.chain(model.parameters(), model.buffers()) if p.device.type == "cpu"
)
@parameterize_mps_and_cuda
def test_cached_model_full_load_and_unload(device: str):
model = DummyModule()
cached_model = CachedModelWithPartialLoad(model=model, compute_device=torch.device(device))
# Model starts in CPU memory.
model_total_bytes = cached_model.total_bytes()
assert cached_model.cur_vram_bytes() == 0
# Full load the model into VRAM.
loaded_bytes = cached_model.full_load_to_vram()
assert loaded_bytes > 0
assert loaded_bytes == model_total_bytes
assert loaded_bytes == cached_model.cur_vram_bytes()
assert all(p.device.type == device for p in itertools.chain(model.parameters(), model.buffers()))
# Full unload the model from VRAM.
unloaded_bytes = cached_model.full_unload_from_vram()
assert unloaded_bytes > 0
assert unloaded_bytes == model_total_bytes
assert cached_model.cur_vram_bytes() == 0
assert all(p.device.type == "cpu" for p in itertools.chain(model.parameters(), model.buffers()))
@parameterize_mps_and_cuda
def test_cached_model_full_load_from_partial(device: str):
model = DummyModule()
cached_model = CachedModelWithPartialLoad(model=model, compute_device=torch.device(device))
# Model starts in CPU memory.
model_total_bytes = cached_model.total_bytes()
assert cached_model.cur_vram_bytes() == 0
# Partially load the model into VRAM.
target_vram_bytes = int(model_total_bytes * 0.6)
loaded_bytes = cached_model.partial_load_to_vram(target_vram_bytes)
assert loaded_bytes > 0
assert loaded_bytes < model_total_bytes
assert loaded_bytes == cached_model.cur_vram_bytes()
# Full load the rest of the model into VRAM.
loaded_bytes_2 = cached_model.full_load_to_vram()
assert loaded_bytes_2 > 0
assert loaded_bytes_2 < model_total_bytes
assert loaded_bytes + loaded_bytes_2 == cached_model.cur_vram_bytes()
assert loaded_bytes + loaded_bytes_2 == model_total_bytes
assert all(p.device.type == device for p in itertools.chain(model.parameters(), model.buffers()))
@parameterize_mps_and_cuda
def test_cached_model_full_unload_from_partial(device: str):
model = DummyModule()
cached_model = CachedModelWithPartialLoad(model=model, compute_device=torch.device(device))
# Model starts in CPU memory.
model_total_bytes = cached_model.total_bytes()
assert cached_model.cur_vram_bytes() == 0
# Partially load the model into VRAM.
target_vram_bytes = int(model_total_bytes * 0.6)
loaded_bytes = cached_model.partial_load_to_vram(target_vram_bytes)
assert loaded_bytes > 0
assert loaded_bytes < model_total_bytes
assert loaded_bytes == cached_model.cur_vram_bytes()
# Full unload the model from VRAM.
unloaded_bytes = cached_model.full_unload_from_vram()
assert unloaded_bytes > 0
assert unloaded_bytes == loaded_bytes
assert cached_model.cur_vram_bytes() == 0
assert all(p.device.type == "cpu" for p in itertools.chain(model.parameters(), model.buffers()))
@parameterize_mps_and_cuda
def test_cached_model_get_cpu_state_dict(device: str):
model = DummyModule()
cached_model = CachedModelWithPartialLoad(model=model, compute_device=torch.device(device))
# Model starts in CPU memory.
assert cached_model.cur_vram_bytes() == 0
# The CPU state dict can be accessed and has the expected properties.
cpu_state_dict = cached_model.get_cpu_state_dict()
assert cpu_state_dict is not None
assert len(cpu_state_dict) == len(model.state_dict())
assert all(p.device.type == "cpu" for p in cpu_state_dict.values())
# Full load the model into VRAM.
cached_model.full_load_to_vram()
assert cached_model.cur_vram_bytes() == cached_model.total_bytes()
# The CPU state dict is still available, and still on the CPU.
cpu_state_dict = cached_model.get_cpu_state_dict()
assert cpu_state_dict is not None
assert len(cpu_state_dict) == len(model.state_dict())
assert all(p.device.type == "cpu" for p in cpu_state_dict.values())

View File

@@ -0,0 +1,14 @@
import torch
class DummyModule(torch.nn.Module):
def __init__(self):
super().__init__()
self.linear1 = torch.nn.Linear(10, 10)
self.linear2 = torch.nn.Linear(10, 10)
self.register_buffer("buffer1", torch.ones(10, 10))
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = self.linear1(x)
x = self.linear2(x)
return x

View File

@@ -0,0 +1,76 @@
import pytest
import torch
from invokeai.backend.model_manager.load.model_cache.torch_function_autocast_context import (
TorchFunctionAutocastDeviceContext,
add_autocast_to_module_forward,
remove_autocast_from_module_forward,
)
from tests.backend.model_manager.load.model_cache.dummy_module import DummyModule
def test_torch_function_autocast_device_context():
if not torch.cuda.is_available():
pytest.skip("CUDA is not available.")
model = DummyModule()
# Model parameters should start off on the CPU.
assert all(p.device.type == "cpu" for p in model.parameters())
with TorchFunctionAutocastDeviceContext(to_device=torch.device("cuda")):
x = torch.randn(10, 10, device="cuda")
y = model(x)
# The model output should be on the GPU.
assert y.device.type == "cuda"
# The model parameters should still be on the CPU.
assert all(p.device.type == "cpu" for p in model.parameters())
def test_add_autocast_to_module_forward():
if not torch.cuda.is_available():
pytest.skip("CUDA is not available.")
model = DummyModule()
assert all(p.device.type == "cpu" for p in model.parameters())
add_autocast_to_module_forward(model, torch.device("cuda"))
# After adding autocast, the model parameters should still be on the CPU.
assert all(p.device.type == "cpu" for p in model.parameters())
x = torch.randn(10, 10, device="cuda")
y = model(x)
# The model output should be on the GPU.
assert y.device.type == "cuda"
# The model parameters should still be on the CPU.
assert all(p.device.type == "cpu" for p in model.parameters())
# The autocast context should automatically be disabled after the model forward call completes.
# So, attempting to perform an operation with conflicting devices should raise an error.
with pytest.raises(RuntimeError):
_ = torch.randn(10, device="cuda") * torch.randn(10, device="cpu")
def test_remove_autocast_from_module_forward():
if not torch.cuda.is_available():
pytest.skip("CUDA is not available.")
model = DummyModule()
# Model parameters should start off on the CPU.
assert all(p.device.type == "cpu" for p in model.parameters())
add_autocast_to_module_forward(model, torch.device("cuda"))
# While the autocast context is active, we should be able to perform operations on the GPU.
x = torch.randn(10, 10, device="cuda")
y = model(x)
assert y.device.type == "cuda"
remove_autocast_from_module_forward(model)
# After removing the autocast context, we should no longer be able to run the model's forward method on GPU inputs.
with pytest.raises(RuntimeError):
_ = model(x)

View File

@@ -25,7 +25,7 @@ from invokeai.backend.model_manager.config import (
ModelVariantType,
VAEDiffusersConfig,
)
from invokeai.backend.model_manager.load import ModelCache
from invokeai.backend.model_manager.load.model_cache.model_cache import ModelCache
from invokeai.backend.util.logging import InvokeAILogger
from tests.backend.model_manager.model_metadata.metadata_examples import (
HFTestLoraMetadata,
@@ -91,9 +91,10 @@ def mm2_download_queue(mm2_session: Session) -> DownloadQueueServiceBase:
@pytest.fixture
def mm2_loader(mm2_app_config: InvokeAIAppConfig) -> ModelLoadServiceBase:
ram_cache = ModelCache(
execution_device_working_mem_gb=mm2_app_config.device_working_mem_gb,
enable_partial_loading=mm2_app_config.enable_partial_loading,
execution_device="cpu",
logger=InvokeAILogger.get_logger(),
max_cache_size=mm2_app_config.ram,
max_vram_cache_size=mm2_app_config.vram,
)
return ModelLoadService(
app_config=mm2_app_config,

View File

@@ -0,0 +1,305 @@
import pytest
import torch
from invokeai.backend.model_manager.load.model_cache.cached_model.cached_model_with_partial_load import (
CachedModelWithPartialLoad,
)
from invokeai.backend.patches.layer_patcher import LayerPatcher
from invokeai.backend.patches.layers.lora_layer import LoRALayer
from invokeai.backend.patches.model_patch_raw import ModelPatchRaw
from invokeai.backend.patches.sidecar_wrappers.base_sidecar_wrapper import BaseSidecarWrapper
class DummyModuleWithOneLayer(torch.nn.Module):
def __init__(self, in_features: int, out_features: int, device: str, dtype: torch.dtype):
super().__init__()
self.linear_layer_1 = torch.nn.Linear(in_features, out_features, device=device, dtype=dtype)
def forward(self, x: torch.Tensor) -> torch.Tensor:
return self.linear_layer_1(x)
class DummyModuleWithTwoLayers(torch.nn.Module):
def __init__(self, in_features: int, out_features: int, device: str, dtype: torch.dtype):
super().__init__()
self.linear_layer_1 = torch.nn.Linear(in_features, out_features, device=device, dtype=dtype)
self.linear_layer_2 = torch.nn.Linear(out_features, out_features, device=device, dtype=dtype)
def forward(self, x: torch.Tensor) -> torch.Tensor:
return self.linear_layer_2(self.linear_layer_1(x))
@pytest.mark.parametrize(
"device",
[
"cpu",
pytest.param("cuda", marks=pytest.mark.skipif(not torch.cuda.is_available(), reason="requires CUDA device")),
],
)
@pytest.mark.parametrize("num_loras", [1, 2])
@pytest.mark.parametrize(
["force_sidecar_patching", "force_direct_patching"], [(True, False), (False, True), (False, False)]
)
@torch.no_grad()
def test_apply_smart_model_patches(
device: str, num_loras: int, force_sidecar_patching: bool, force_direct_patching: bool
):
"""Test the basic behavior of ModelPatcher.apply_smart_model_patches(...). Check that unpatching works correctly."""
dtype = torch.float16
linear_in_features = 4
linear_out_features = 8
lora_rank = 2
model = DummyModuleWithOneLayer(linear_in_features, linear_out_features, device=device, dtype=dtype)
# Initialize num_loras LoRA models with weights of 0.5.
lora_weight = 0.5
lora_models: list[tuple[ModelPatchRaw, float]] = []
for _ in range(num_loras):
lora_layers = {
"linear_layer_1": LoRALayer.from_state_dict_values(
values={
"lora_down.weight": torch.ones((lora_rank, linear_in_features), device="cpu", dtype=torch.float16),
"lora_up.weight": torch.ones((linear_out_features, lora_rank), device="cpu", dtype=torch.float16),
},
)
}
lora = ModelPatchRaw(lora_layers)
lora_models.append((lora, lora_weight))
orig_linear_weight = model.linear_layer_1.weight.data.detach().clone()
expected_patched_linear_weight = orig_linear_weight + (lora_rank * lora_weight * num_loras)
# Run inference before patching the model.
input = torch.randn(1, linear_in_features, device=device, dtype=dtype)
output_before_patch = model(input)
expect_sidecar_wrappers = device == "cpu"
if force_sidecar_patching:
expect_sidecar_wrappers = True
elif force_direct_patching:
expect_sidecar_wrappers = False
# Patch the model and run inference during the patch.
with LayerPatcher.apply_smart_model_patches(
model=model,
patches=lora_models,
prefix="",
dtype=dtype,
force_direct_patching=force_direct_patching,
force_sidecar_patching=force_sidecar_patching,
):
if expect_sidecar_wrappers:
# There should be sidecar wrappers in the model.
assert isinstance(model.linear_layer_1, BaseSidecarWrapper)
else:
# There should be no sidecar wrappers in the model.
assert not isinstance(model.linear_layer_1, BaseSidecarWrapper)
torch.testing.assert_close(model.linear_layer_1.weight.data, expected_patched_linear_weight)
# After patching, the patched model should still be on its original device.
assert model.linear_layer_1.weight.data.device.type == device
# After patching, all LoRA layer weights should have been moved back to the cpu.
for lora, _ in lora_models:
assert lora.layers["linear_layer_1"].up.device.type == "cpu"
assert lora.layers["linear_layer_1"].down.device.type == "cpu"
output_during_patch = model(input)
# Run inference after unpatching.
output_after_patch = model(input)
# Check that the output before patching is different from the output during patching.
assert not torch.allclose(output_before_patch, output_during_patch)
# Check that the output before patching is the same as the output after patching.
assert torch.allclose(output_before_patch, output_after_patch)
@pytest.mark.parametrize(["num_loras"], [(1,), (2,)])
@torch.no_grad()
def test_apply_smart_lora_patches_to_partially_loaded_model(num_loras: int):
"""Test the behavior of ModelPatcher.apply_smart_lora_patches(...) when it is applied to a
CachedModelWithPartialLoad that is partially loaded into VRAM.
"""
if not torch.cuda.is_available():
pytest.skip("requires CUDA device")
# Initialize the model on the CPU.
dtype = torch.float16
linear_in_features = 4
linear_out_features = 8
lora_rank = 2
model = DummyModuleWithTwoLayers(linear_in_features, linear_out_features, device="cpu", dtype=dtype)
cached_model = CachedModelWithPartialLoad(model=model, compute_device=torch.device("cuda"))
model_total_bytes = cached_model.total_bytes()
assert cached_model.cur_vram_bytes() == 0
# Partially load the model into VRAM.
target_vram_bytes = int(model_total_bytes * 0.6)
_ = cached_model.partial_load_to_vram(target_vram_bytes)
assert cached_model.model.linear_layer_1.weight.device.type == "cuda"
assert cached_model.model.linear_layer_2.weight.device.type == "cpu"
# Initialize num_loras LoRA models with weights of 0.5.
lora_weight = 0.5
lora_models: list[tuple[ModelPatchRaw, float]] = []
for _ in range(num_loras):
lora_layers = {
"linear_layer_1": LoRALayer.from_state_dict_values(
values={
"lora_down.weight": torch.ones((lora_rank, linear_in_features), device="cpu", dtype=torch.float16),
"lora_up.weight": torch.ones((linear_out_features, lora_rank), device="cpu", dtype=torch.float16),
},
),
"linear_layer_2": LoRALayer.from_state_dict_values(
values={
"lora_down.weight": torch.ones((lora_rank, linear_out_features), device="cpu", dtype=torch.float16),
"lora_up.weight": torch.ones((linear_out_features, lora_rank), device="cpu", dtype=torch.float16),
},
),
}
lora = ModelPatchRaw(lora_layers)
lora_models.append((lora, lora_weight))
# Run inference before patching the model.
input = torch.randn(1, linear_in_features, device="cuda", dtype=dtype)
output_before_patch = cached_model.model(input)
# Patch the model and run inference during the patch.
with LayerPatcher.apply_smart_model_patches(model=cached_model.model, patches=lora_models, prefix="", dtype=dtype):
# Check that the second layer is wrapped in a sidecar wrapper, but the first layer is not.
assert not isinstance(cached_model.model.linear_layer_1, BaseSidecarWrapper)
assert isinstance(cached_model.model.linear_layer_2, BaseSidecarWrapper)
output_during_patch = cached_model.model(input)
# Run inference after unpatching.
output_after_patch = cached_model.model(input)
# Check that the output before patching is different from the output during patching.
assert not torch.allclose(output_before_patch, output_during_patch)
# Check that the output before patching is the same as the output after patching.
assert torch.allclose(output_before_patch, output_after_patch)
@torch.no_grad()
@pytest.mark.parametrize(["num_loras"], [(1,), (2,)])
def test_all_patching_methods_produce_same_output(num_loras: int):
"""Test that apply_lora_wrapper_patches(...) produces the same model outputs as apply_lora_patches(...)."""
dtype = torch.float32
linear_in_features = 4
linear_out_features = 8
lora_rank = 2
model = DummyModuleWithOneLayer(linear_in_features, linear_out_features, device="cpu", dtype=dtype)
# Initialize num_loras LoRA models with weights of 0.5.
lora_weight = 0.5
lora_models: list[tuple[ModelPatchRaw, float]] = []
for _ in range(num_loras):
lora_layers = {
"linear_layer_1": LoRALayer.from_state_dict_values(
values={
"lora_down.weight": torch.ones((lora_rank, linear_in_features), device="cpu", dtype=torch.float16),
"lora_up.weight": torch.ones((linear_out_features, lora_rank), device="cpu", dtype=torch.float16),
},
)
}
lora = ModelPatchRaw(lora_layers)
lora_models.append((lora, lora_weight))
input = torch.randn(1, linear_in_features, device="cpu", dtype=dtype)
with LayerPatcher.apply_smart_model_patches(
model=model, patches=lora_models, prefix="", dtype=dtype, force_direct_patching=True
):
output_force_direct = model(input)
with LayerPatcher.apply_smart_model_patches(
model=model, patches=lora_models, prefix="", dtype=dtype, force_sidecar_patching=True
):
output_force_sidecar = model(input)
with LayerPatcher.apply_smart_model_patches(model=model, patches=lora_models, prefix="", dtype=dtype):
output_smart = model(input)
# Note: We set atol=1e-5 because the test failed occasionally with the default atol=1e-8. Slight numerical
# differences are tolerable and expected due to the differences between sidecar and direct patching.
assert torch.allclose(output_force_direct, output_force_sidecar, atol=1e-5)
assert torch.allclose(output_force_direct, output_smart, atol=1e-5)
@pytest.mark.skipif(not torch.cuda.is_available(), reason="requires CUDA device")
@torch.no_grad()
def test_apply_smart_model_patches_change_device():
"""Test that if LoRA patching is applied on the CPU, and then the patched model is moved to the GPU, unpatching
still behaves correctly.
"""
linear_in_features = 4
linear_out_features = 8
lora_dim = 2
# Initialize the model on the CPU.
model = DummyModuleWithOneLayer(linear_in_features, linear_out_features, device="cpu", dtype=torch.float16)
lora_layers = {
"linear_layer_1": LoRALayer.from_state_dict_values(
values={
"lora_down.weight": torch.ones((lora_dim, linear_in_features), device="cpu", dtype=torch.float16),
"lora_up.weight": torch.ones((linear_out_features, lora_dim), device="cpu", dtype=torch.float16),
},
)
}
lora = ModelPatchRaw(lora_layers)
orig_linear_weight = model.linear_layer_1.weight.data.detach().clone()
with LayerPatcher.apply_smart_model_patches(
model=model, patches=[(lora, 0.5)], prefix="", dtype=torch.float16, force_direct_patching=True
):
# After patching, all LoRA layer weights should have been moved back to the cpu.
assert lora_layers["linear_layer_1"].up.device.type == "cpu"
assert lora_layers["linear_layer_1"].down.device.type == "cpu"
# After patching, the patched model should still be on the CPU.
assert model.linear_layer_1.weight.data.device.type == "cpu"
# There should be no sidecar wrappers in the model.
assert not isinstance(model.linear_layer_1, BaseSidecarWrapper)
# Move the model to the GPU.
assert model.to("cuda")
# After unpatching, the original model weights should have been restored on the GPU.
assert model.linear_layer_1.weight.data.device.type == "cuda"
torch.testing.assert_close(model.linear_layer_1.weight.data, orig_linear_weight, check_device=False)
def test_apply_smart_model_patches_force_sidecar_and_direct_patching():
"""Test that ModelPatcher.apply_smart_model_patches(..., force_direct_patching=True, force_sidecar_patching=True)
raises an error.
"""
linear_in_features = 4
linear_out_features = 8
lora_rank = 2
model = DummyModuleWithOneLayer(linear_in_features, linear_out_features, device="cpu", dtype=torch.float16)
lora_layers = {
"linear_layer_1": LoRALayer.from_state_dict_values(
values={
"lora_down.weight": torch.ones((lora_rank, linear_in_features), device="cpu", dtype=torch.float16),
"lora_up.weight": torch.ones((linear_out_features, lora_rank), device="cpu", dtype=torch.float16),
},
)
}
lora = ModelPatchRaw(lora_layers)
with pytest.raises(ValueError, match="Cannot force both direct and sidecar patching."):
with LayerPatcher.apply_smart_model_patches(
model=model,
patches=[(lora, 0.5)],
prefix="",
dtype=torch.float16,
force_direct_patching=True,
force_sidecar_patching=True,
):
pass

View File

@@ -1,197 +0,0 @@
import pytest
import torch
from invokeai.backend.patches.layers.lora_layer import LoRALayer
from invokeai.backend.patches.model_patch_raw import ModelPatchRaw
from invokeai.backend.patches.model_patcher import LayerPatcher
class DummyModule(torch.nn.Module):
def __init__(self, in_features: int, out_features: int, device: str, dtype: torch.dtype):
super().__init__()
self.linear_layer_1 = torch.nn.Linear(in_features, out_features, device=device, dtype=dtype)
def forward(self, x: torch.Tensor) -> torch.Tensor:
return self.linear_layer_1(x)
@pytest.mark.parametrize(
["device", "num_layers"],
[
("cpu", 1),
pytest.param("cuda", 1, marks=pytest.mark.skipif(not torch.cuda.is_available(), reason="requires CUDA device")),
("cpu", 2),
pytest.param("cuda", 2, marks=pytest.mark.skipif(not torch.cuda.is_available(), reason="requires CUDA device")),
],
)
@torch.no_grad()
def test_apply_lora_patches(device: str, num_layers: int):
"""Test the basic behavior of ModelPatcher.apply_lora_patches(...). Check that patching and unpatching produce the
correct result, and that model/LoRA tensors are moved between devices as expected.
"""
linear_in_features = 4
linear_out_features = 8
lora_rank = 2
model = DummyModule(linear_in_features, linear_out_features, device=device, dtype=torch.float16)
# Initialize num_layers LoRA models with weights of 0.5.
lora_weight = 0.5
lora_models: list[tuple[ModelPatchRaw, float]] = []
for _ in range(num_layers):
lora_layers = {
"linear_layer_1": LoRALayer.from_state_dict_values(
values={
"lora_down.weight": torch.ones((lora_rank, linear_in_features), device="cpu", dtype=torch.float16),
"lora_up.weight": torch.ones((linear_out_features, lora_rank), device="cpu", dtype=torch.float16),
},
)
}
lora = ModelPatchRaw(lora_layers)
lora_models.append((lora, lora_weight))
orig_linear_weight = model.linear_layer_1.weight.data.detach().clone()
expected_patched_linear_weight = orig_linear_weight + (lora_rank * lora_weight * num_layers)
with LayerPatcher.apply_model_patches(model=model, patches=lora_models, prefix=""):
# After patching, all LoRA layer weights should have been moved back to the cpu.
for lora, _ in lora_models:
assert lora.layers["linear_layer_1"].up.device.type == "cpu"
assert lora.layers["linear_layer_1"].down.device.type == "cpu"
# After patching, the patched model should still be on its original device.
assert model.linear_layer_1.weight.data.device.type == device
torch.testing.assert_close(model.linear_layer_1.weight.data, expected_patched_linear_weight)
# After unpatching, the original model weights should have been restored on the original device.
assert model.linear_layer_1.weight.data.device.type == device
torch.testing.assert_close(model.linear_layer_1.weight.data, orig_linear_weight)
@pytest.mark.skipif(not torch.cuda.is_available(), reason="requires CUDA device")
@torch.no_grad()
def test_apply_lora_patches_change_device():
"""Test that if LoRA patching is applied on the CPU, and then the patched model is moved to the GPU, unpatching
still behaves correctly.
"""
linear_in_features = 4
linear_out_features = 8
lora_dim = 2
# Initialize the model on the CPU.
model = DummyModule(linear_in_features, linear_out_features, device="cpu", dtype=torch.float16)
lora_layers = {
"linear_layer_1": LoRALayer.from_state_dict_values(
values={
"lora_down.weight": torch.ones((lora_dim, linear_in_features), device="cpu", dtype=torch.float16),
"lora_up.weight": torch.ones((linear_out_features, lora_dim), device="cpu", dtype=torch.float16),
},
)
}
lora = ModelPatchRaw(lora_layers)
orig_linear_weight = model.linear_layer_1.weight.data.detach().clone()
with LayerPatcher.apply_model_patches(model=model, patches=[(lora, 0.5)], prefix=""):
# After patching, all LoRA layer weights should have been moved back to the cpu.
assert lora_layers["linear_layer_1"].up.device.type == "cpu"
assert lora_layers["linear_layer_1"].down.device.type == "cpu"
# After patching, the patched model should still be on the CPU.
assert model.linear_layer_1.weight.data.device.type == "cpu"
# Move the model to the GPU.
assert model.to("cuda")
# After unpatching, the original model weights should have been restored on the GPU.
assert model.linear_layer_1.weight.data.device.type == "cuda"
torch.testing.assert_close(model.linear_layer_1.weight.data, orig_linear_weight, check_device=False)
@pytest.mark.parametrize(
["device", "num_layers"],
[
("cpu", 1),
pytest.param("cuda", 1, marks=pytest.mark.skipif(not torch.cuda.is_available(), reason="requires CUDA device")),
("cpu", 2),
pytest.param("cuda", 2, marks=pytest.mark.skipif(not torch.cuda.is_available(), reason="requires CUDA device")),
],
)
def test_apply_lora_sidecar_patches(device: str, num_layers: int):
"""Test the basic behavior of ModelPatcher.apply_lora_sidecar_patches(...). Check that unpatching works correctly."""
dtype = torch.float16
linear_in_features = 4
linear_out_features = 8
lora_rank = 2
model = DummyModule(linear_in_features, linear_out_features, device=device, dtype=dtype)
# Initialize num_layers LoRA models with weights of 0.5.
lora_weight = 0.5
lora_models: list[tuple[ModelPatchRaw, float]] = []
for _ in range(num_layers):
lora_layers = {
"linear_layer_1": LoRALayer.from_state_dict_values(
values={
"lora_down.weight": torch.ones((lora_rank, linear_in_features), device="cpu", dtype=torch.float16),
"lora_up.weight": torch.ones((linear_out_features, lora_rank), device="cpu", dtype=torch.float16),
},
)
}
lora = ModelPatchRaw(lora_layers)
lora_models.append((lora, lora_weight))
# Run inference before patching the model.
input = torch.randn(1, linear_in_features, device=device, dtype=dtype)
output_before_patch = model(input)
# Patch the model and run inference during the patch.
with LayerPatcher.apply_model_sidecar_patches(model=model, patches=lora_models, prefix="", dtype=dtype):
output_during_patch = model(input)
# Run inference after unpatching.
output_after_patch = model(input)
# Check that the output before patching is different from the output during patching.
assert not torch.allclose(output_before_patch, output_during_patch)
# Check that the output before patching is the same as the output after patching.
assert torch.allclose(output_before_patch, output_after_patch)
@torch.no_grad()
@pytest.mark.parametrize(["num_layers"], [(1,), (2,)])
def test_apply_lora_sidecar_patches_matches_apply_lora_patches(num_layers: int):
"""Test that apply_lora_sidecar_patches(...) produces the same model outputs as apply_lora_patches(...)."""
dtype = torch.float32
linear_in_features = 4
linear_out_features = 8
lora_rank = 2
model = DummyModule(linear_in_features, linear_out_features, device="cpu", dtype=dtype)
# Initialize num_layers LoRA models with weights of 0.5.
lora_weight = 0.5
lora_models: list[tuple[ModelPatchRaw, float]] = []
for _ in range(num_layers):
lora_layers = {
"linear_layer_1": LoRALayer.from_state_dict_values(
values={
"lora_down.weight": torch.ones((lora_rank, linear_in_features), device="cpu", dtype=torch.float16),
"lora_up.weight": torch.ones((linear_out_features, lora_rank), device="cpu", dtype=torch.float16),
},
)
}
lora = ModelPatchRaw(lora_layers)
lora_models.append((lora, lora_weight))
input = torch.randn(1, linear_in_features, device="cpu", dtype=dtype)
with LayerPatcher.apply_model_patches(model=model, patches=lora_models, prefix=""):
output_lora_patches = model(input)
with LayerPatcher.apply_model_sidecar_patches(model=model, patches=lora_models, prefix="", dtype=dtype):
output_lora_sidecar_patches = model(input)
# Note: We set atol=1e-5 because the test failed occasionally with the default atol=1e-8. Slight numerical
# differences are tolerable and expected due to the difference between sidecar vs. patching.
assert torch.allclose(output_lora_patches, output_lora_sidecar_patches, atol=1e-5)