mirror of
https://github.com/invoke-ai/InvokeAI.git
synced 2026-01-22 18:47:56 -05:00
Compare commits
44 Commits
v4.2.7
...
ryan/groun
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b5832768dc | ||
|
|
fca119773b | ||
|
|
0193267a53 | ||
|
|
73386826d6 | ||
|
|
9f448fecb7 | ||
|
|
bcd1483a14 | ||
|
|
e206890e25 | ||
|
|
0a7048f650 | ||
|
|
e8ecf5e155 | ||
|
|
33e8604b57 | ||
|
|
cec7399366 | ||
|
|
bdae81e429 | ||
|
|
67c32f3d6c | ||
|
|
5701c79fab | ||
|
|
2da9f913f3 | ||
|
|
6b10b59abe | ||
|
|
918f77bce0 | ||
|
|
aca2a2fa13 | ||
|
|
ff6398f7d8 | ||
|
|
2ad13ac7eb | ||
|
|
693a3eaff5 | ||
|
|
171a4e6d80 | ||
|
|
e3a75a8adf | ||
|
|
ee7503ce13 | ||
|
|
310719eb4c | ||
|
|
e8e24822ec | ||
|
|
c57a7afb87 | ||
|
|
84d028898c | ||
|
|
ed0174fbc6 | ||
|
|
5b84e117b2 | ||
|
|
5810cee6c9 | ||
|
|
bd8890be11 | ||
|
|
adf1a977ea | ||
|
|
6af659b1da | ||
|
|
416d29fb83 | ||
|
|
19c00241c6 | ||
|
|
c323a760a5 | ||
|
|
9d1fcba415 | ||
|
|
ca21996a97 | ||
|
|
62aa064e56 | ||
|
|
87eb018380 | ||
|
|
5003e5d763 | ||
|
|
58f3072b91 | ||
|
|
9e7b470189 |
@@ -37,9 +37,9 @@ from invokeai.app.services.shared.invocation_context import InvocationContext
|
||||
from invokeai.app.util.controlnet_utils import prepare_control_image
|
||||
from invokeai.backend.ip_adapter.ip_adapter import IPAdapter
|
||||
from invokeai.backend.lora import LoRAModelRaw
|
||||
from invokeai.backend.model_manager import BaseModelType
|
||||
from invokeai.backend.model_manager import BaseModelType, ModelVariantType
|
||||
from invokeai.backend.model_patcher import ModelPatcher
|
||||
from invokeai.backend.stable_diffusion import PipelineIntermediateState, set_seamless
|
||||
from invokeai.backend.stable_diffusion import PipelineIntermediateState
|
||||
from invokeai.backend.stable_diffusion.denoise_context import DenoiseContext, DenoiseInputs
|
||||
from invokeai.backend.stable_diffusion.diffusers_pipeline import (
|
||||
ControlNetData,
|
||||
@@ -60,8 +60,12 @@ from invokeai.backend.stable_diffusion.diffusion_backend import StableDiffusionB
|
||||
from invokeai.backend.stable_diffusion.extension_callback_type import ExtensionCallbackType
|
||||
from invokeai.backend.stable_diffusion.extensions.controlnet import ControlNetExt
|
||||
from invokeai.backend.stable_diffusion.extensions.freeu import FreeUExt
|
||||
from invokeai.backend.stable_diffusion.extensions.inpaint import InpaintExt
|
||||
from invokeai.backend.stable_diffusion.extensions.inpaint_model import InpaintModelExt
|
||||
from invokeai.backend.stable_diffusion.extensions.preview import PreviewExt
|
||||
from invokeai.backend.stable_diffusion.extensions.rescale_cfg import RescaleCFGExt
|
||||
from invokeai.backend.stable_diffusion.extensions.seamless import SeamlessExt
|
||||
from invokeai.backend.stable_diffusion.extensions.t2i_adapter import T2IAdapterExt
|
||||
from invokeai.backend.stable_diffusion.extensions_manager import ExtensionsManager
|
||||
from invokeai.backend.stable_diffusion.schedulers import SCHEDULER_MAP
|
||||
from invokeai.backend.stable_diffusion.schedulers.schedulers import SCHEDULER_NAME_VALUES
|
||||
@@ -498,6 +502,33 @@ class DenoiseLatentsInvocation(BaseInvocation):
|
||||
)
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def parse_t2i_adapter_field(
|
||||
exit_stack: ExitStack,
|
||||
context: InvocationContext,
|
||||
t2i_adapters: Optional[Union[T2IAdapterField, list[T2IAdapterField]]],
|
||||
ext_manager: ExtensionsManager,
|
||||
) -> None:
|
||||
if t2i_adapters is None:
|
||||
return
|
||||
|
||||
# Handle the possibility that t2i_adapters could be a list or a single T2IAdapterField.
|
||||
if isinstance(t2i_adapters, T2IAdapterField):
|
||||
t2i_adapters = [t2i_adapters]
|
||||
|
||||
for t2i_adapter_field in t2i_adapters:
|
||||
ext_manager.add_extension(
|
||||
T2IAdapterExt(
|
||||
node_context=context,
|
||||
model_id=t2i_adapter_field.t2i_adapter_model,
|
||||
image=context.images.get_pil(t2i_adapter_field.image.image_name),
|
||||
weight=t2i_adapter_field.weight,
|
||||
begin_step_percent=t2i_adapter_field.begin_step_percent,
|
||||
end_step_percent=t2i_adapter_field.end_step_percent,
|
||||
resize_mode=t2i_adapter_field.resize_mode,
|
||||
)
|
||||
)
|
||||
|
||||
def prep_ip_adapter_image_prompts(
|
||||
self,
|
||||
context: InvocationContext,
|
||||
@@ -707,7 +738,7 @@ class DenoiseLatentsInvocation(BaseInvocation):
|
||||
else:
|
||||
masked_latents = torch.where(mask < 0.5, 0.0, latents)
|
||||
|
||||
return 1 - mask, masked_latents, self.denoise_mask.gradient
|
||||
return mask, masked_latents, self.denoise_mask.gradient
|
||||
|
||||
@staticmethod
|
||||
def prepare_noise_and_latents(
|
||||
@@ -765,10 +796,6 @@ class DenoiseLatentsInvocation(BaseInvocation):
|
||||
dtype = TorchDevice.choose_torch_dtype()
|
||||
|
||||
seed, noise, latents = self.prepare_noise_and_latents(context, self.noise, self.latents)
|
||||
latents = latents.to(device=device, dtype=dtype)
|
||||
if noise is not None:
|
||||
noise = noise.to(device=device, dtype=dtype)
|
||||
|
||||
_, _, latent_height, latent_width = latents.shape
|
||||
|
||||
conditioning_data = self.get_conditioning_data(
|
||||
@@ -801,21 +828,6 @@ class DenoiseLatentsInvocation(BaseInvocation):
|
||||
denoising_end=self.denoising_end,
|
||||
)
|
||||
|
||||
denoise_ctx = DenoiseContext(
|
||||
inputs=DenoiseInputs(
|
||||
orig_latents=latents,
|
||||
timesteps=timesteps,
|
||||
init_timestep=init_timestep,
|
||||
noise=noise,
|
||||
seed=seed,
|
||||
scheduler_step_kwargs=scheduler_step_kwargs,
|
||||
conditioning_data=conditioning_data,
|
||||
attention_processor_cls=CustomAttnProcessor2_0,
|
||||
),
|
||||
unet=None,
|
||||
scheduler=scheduler,
|
||||
)
|
||||
|
||||
# get the unet's config so that we can pass the base to sd_step_callback()
|
||||
unet_config = context.models.get_config(self.unet.unet.key)
|
||||
|
||||
@@ -833,6 +845,40 @@ class DenoiseLatentsInvocation(BaseInvocation):
|
||||
if self.unet.freeu_config:
|
||||
ext_manager.add_extension(FreeUExt(self.unet.freeu_config))
|
||||
|
||||
### seamless
|
||||
if self.unet.seamless_axes:
|
||||
ext_manager.add_extension(SeamlessExt(self.unet.seamless_axes))
|
||||
|
||||
### inpaint
|
||||
mask, masked_latents, is_gradient_mask = self.prep_inpaint_mask(context, latents)
|
||||
# NOTE: We used to identify inpainting models by inpecting the shape of the loaded UNet model weights. Now we
|
||||
# use the ModelVariantType config. During testing, there was a report of a user with models that had an
|
||||
# incorrect ModelVariantType value. Re-installing the model fixed the issue. If this issue turns out to be
|
||||
# prevalent, we will have to revisit how we initialize the inpainting extensions.
|
||||
if unet_config.variant == ModelVariantType.Inpaint:
|
||||
ext_manager.add_extension(InpaintModelExt(mask, masked_latents, is_gradient_mask))
|
||||
elif mask is not None:
|
||||
ext_manager.add_extension(InpaintExt(mask, is_gradient_mask))
|
||||
|
||||
# Initialize context for modular denoise
|
||||
latents = latents.to(device=device, dtype=dtype)
|
||||
if noise is not None:
|
||||
noise = noise.to(device=device, dtype=dtype)
|
||||
denoise_ctx = DenoiseContext(
|
||||
inputs=DenoiseInputs(
|
||||
orig_latents=latents,
|
||||
timesteps=timesteps,
|
||||
init_timestep=init_timestep,
|
||||
noise=noise,
|
||||
seed=seed,
|
||||
scheduler_step_kwargs=scheduler_step_kwargs,
|
||||
conditioning_data=conditioning_data,
|
||||
attention_processor_cls=CustomAttnProcessor2_0,
|
||||
),
|
||||
unet=None,
|
||||
scheduler=scheduler,
|
||||
)
|
||||
|
||||
# context for loading additional models
|
||||
with ExitStack() as exit_stack:
|
||||
# later should be smth like:
|
||||
@@ -840,6 +886,7 @@ class DenoiseLatentsInvocation(BaseInvocation):
|
||||
# ext = extension_field.to_extension(exit_stack, context, ext_manager)
|
||||
# ext_manager.add_extension(ext)
|
||||
self.parse_controlnet_field(exit_stack, context, self.control, ext_manager)
|
||||
self.parse_t2i_adapter_field(exit_stack, context, self.t2i_adapter, ext_manager)
|
||||
|
||||
# ext: t2i/ip adapter
|
||||
ext_manager.run_callback(ExtensionCallbackType.SETUP, denoise_ctx)
|
||||
@@ -871,6 +918,10 @@ class DenoiseLatentsInvocation(BaseInvocation):
|
||||
seed, noise, latents = self.prepare_noise_and_latents(context, self.noise, self.latents)
|
||||
|
||||
mask, masked_latents, gradient_mask = self.prep_inpaint_mask(context, latents)
|
||||
# At this point, the mask ranges from 0 (leave unchanged) to 1 (inpaint).
|
||||
# We invert the mask here for compatibility with the old backend implementation.
|
||||
if mask is not None:
|
||||
mask = 1 - mask
|
||||
|
||||
# TODO(ryand): I have hard-coded `do_classifier_free_guidance=True` to mirror the behaviour of ControlNets,
|
||||
# below. Investigate whether this is appropriate.
|
||||
@@ -915,7 +966,7 @@ class DenoiseLatentsInvocation(BaseInvocation):
|
||||
ExitStack() as exit_stack,
|
||||
unet_info.model_on_device() as (model_state_dict, unet),
|
||||
ModelPatcher.apply_freeu(unet, self.unet.freeu_config),
|
||||
set_seamless(unet, self.unet.seamless_axes), # FIXME
|
||||
SeamlessExt.static_patch_model(unet, self.unet.seamless_axes), # FIXME
|
||||
# Apply the LoRA after unet has been moved to its target device for faster patching.
|
||||
ModelPatcher.apply_lora_unet(
|
||||
unet,
|
||||
|
||||
@@ -242,6 +242,23 @@ class ConditioningField(BaseModel):
|
||||
)
|
||||
|
||||
|
||||
class BoundingBoxField(BaseModel):
|
||||
"""A bounding box primitive value."""
|
||||
|
||||
x_min: int = Field(ge=0, description="The minimum x-coordinate of the bounding box (inclusive).")
|
||||
x_max: int = Field(ge=0, description="The maximum x-coordinate of the bounding box (exclusive).")
|
||||
y_min: int = Field(ge=0, description="The minimum y-coordinate of the bounding box (inclusive).")
|
||||
y_max: int = Field(ge=0, description="The maximum y-coordinate of the bounding box (exclusive).")
|
||||
|
||||
score: Optional[float] = Field(
|
||||
default=None,
|
||||
ge=0.0,
|
||||
le=1.0,
|
||||
description="The score associated with the bounding box. In the range [0, 1]. This value is typically set "
|
||||
"when the bounding box was produced by a detector and has an associated confidence score.",
|
||||
)
|
||||
|
||||
|
||||
class MetadataField(RootModel[dict[str, Any]]):
|
||||
"""
|
||||
Pydantic model for metadata with custom root of type dict[str, Any].
|
||||
|
||||
95
invokeai/app/invocations/grounding_dino.py
Normal file
95
invokeai/app/invocations/grounding_dino.py
Normal file
@@ -0,0 +1,95 @@
|
||||
from pathlib import Path
|
||||
|
||||
import torch
|
||||
from PIL import Image
|
||||
from transformers import pipeline
|
||||
from transformers.pipelines import ZeroShotObjectDetectionPipeline
|
||||
|
||||
from invokeai.app.invocations.baseinvocation import BaseInvocation, invocation
|
||||
from invokeai.app.invocations.fields import BoundingBoxField, ImageField, InputField
|
||||
from invokeai.app.invocations.primitives import BoundingBoxCollectionOutput
|
||||
from invokeai.app.services.shared.invocation_context import InvocationContext
|
||||
from invokeai.backend.image_util.grounding_dino.detection_result import DetectionResult
|
||||
from invokeai.backend.image_util.grounding_dino.grounding_dino_pipeline import GroundingDinoPipeline
|
||||
|
||||
GROUNDING_DINO_MODEL_ID = "IDEA-Research/grounding-dino-tiny"
|
||||
|
||||
|
||||
@invocation(
|
||||
"grounding_dino",
|
||||
title="Grounding DINO (Text Prompt Object Detection)",
|
||||
tags=["prompt", "object detection"],
|
||||
category="image",
|
||||
version="1.0.0",
|
||||
)
|
||||
class GroundingDinoInvocation(BaseInvocation):
|
||||
"""Runs a Grounding DINO model (https://arxiv.org/pdf/2303.05499). Performs zero-shot bounding-box object detection
|
||||
from a text prompt.
|
||||
|
||||
Reference:
|
||||
- https://huggingface.co/docs/transformers/v4.43.3/en/model_doc/grounding-dino#grounded-sam
|
||||
- https://github.com/NielsRogge/Transformers-Tutorials/blob/a39f33ac1557b02ebfb191ea7753e332b5ca933f/Grounding%20DINO/GroundingDINO_with_Segment_Anything.ipynb
|
||||
"""
|
||||
|
||||
prompt: str = InputField(description="The prompt describing the object to segment.")
|
||||
image: ImageField = InputField(description="The image to segment.")
|
||||
detection_threshold: float = InputField(
|
||||
description="The detection threshold for the Grounding DINO model. All detected bounding boxes with scores above this threshold will be returned.",
|
||||
ge=0.0,
|
||||
le=1.0,
|
||||
default=0.3,
|
||||
)
|
||||
|
||||
@torch.no_grad()
|
||||
def invoke(self, context: InvocationContext) -> BoundingBoxCollectionOutput:
|
||||
# The model expects a 3-channel RGB image.
|
||||
image_pil = context.images.get_pil(self.image.image_name, mode="RGB")
|
||||
|
||||
detections = self._detect(
|
||||
context=context, image=image_pil, labels=[self.prompt], threshold=self.detection_threshold
|
||||
)
|
||||
|
||||
# Convert detections to BoundingBoxCollectionOutput.
|
||||
bounding_boxes: list[BoundingBoxField] = []
|
||||
for detection in detections:
|
||||
bounding_boxes.append(
|
||||
BoundingBoxField(
|
||||
x_min=detection.box.xmin,
|
||||
x_max=detection.box.xmax,
|
||||
y_min=detection.box.ymin,
|
||||
y_max=detection.box.ymax,
|
||||
score=detection.score,
|
||||
)
|
||||
)
|
||||
return BoundingBoxCollectionOutput(collection=bounding_boxes)
|
||||
|
||||
@staticmethod
|
||||
def _load_grounding_dino(model_path: Path):
|
||||
grounding_dino_pipeline = pipeline(
|
||||
model=str(model_path),
|
||||
task="zero-shot-object-detection",
|
||||
local_files_only=True,
|
||||
# TODO(ryand): Setting the torch_dtype here doesn't work. Investigate whether fp16 is supported by the
|
||||
# model, and figure out how to make it work in the pipeline.
|
||||
# torch_dtype=TorchDevice.choose_torch_dtype(),
|
||||
)
|
||||
assert isinstance(grounding_dino_pipeline, ZeroShotObjectDetectionPipeline)
|
||||
return GroundingDinoPipeline(grounding_dino_pipeline)
|
||||
|
||||
def _detect(
|
||||
self,
|
||||
context: InvocationContext,
|
||||
image: Image.Image,
|
||||
labels: list[str],
|
||||
threshold: float = 0.3,
|
||||
) -> list[DetectionResult]:
|
||||
"""Use Grounding DINO to detect bounding boxes for a set of labels in an image."""
|
||||
# TODO(ryand): I copied this "."-handling logic from the transformers example code. Test it and see if it
|
||||
# actually makes a difference.
|
||||
labels = [label if label.endswith(".") else label + "." for label in labels]
|
||||
|
||||
with context.models.load_remote_model(
|
||||
source=GROUNDING_DINO_MODEL_ID, loader=GroundingDinoInvocation._load_grounding_dino
|
||||
) as detector:
|
||||
assert isinstance(detector, GroundingDinoPipeline)
|
||||
return detector.detect(image=image, candidate_labels=labels, threshold=threshold)
|
||||
@@ -24,7 +24,7 @@ from invokeai.app.invocations.fields import (
|
||||
from invokeai.app.invocations.model import VAEField
|
||||
from invokeai.app.invocations.primitives import ImageOutput
|
||||
from invokeai.app.services.shared.invocation_context import InvocationContext
|
||||
from invokeai.backend.stable_diffusion import set_seamless
|
||||
from invokeai.backend.stable_diffusion.extensions.seamless import SeamlessExt
|
||||
from invokeai.backend.stable_diffusion.vae_tiling import patch_vae_tiling_params
|
||||
from invokeai.backend.util.devices import TorchDevice
|
||||
|
||||
@@ -59,7 +59,7 @@ class LatentsToImageInvocation(BaseInvocation, WithMetadata, WithBoard):
|
||||
|
||||
vae_info = context.models.load(self.vae.vae)
|
||||
assert isinstance(vae_info.model, (AutoencoderKL, AutoencoderTiny))
|
||||
with set_seamless(vae_info.model, self.vae.seamless_axes), vae_info as vae:
|
||||
with SeamlessExt.static_patch_model(vae_info.model, self.vae.seamless_axes), vae_info as vae:
|
||||
assert isinstance(vae, (AutoencoderKL, AutoencoderTiny))
|
||||
latents = latents.to(vae.device)
|
||||
if self.fp32:
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
import numpy as np
|
||||
import torch
|
||||
from PIL import Image
|
||||
|
||||
from invokeai.app.invocations.baseinvocation import BaseInvocation, Classification, InvocationContext, invocation
|
||||
from invokeai.app.invocations.fields import ImageField, InputField, TensorField, WithMetadata
|
||||
from invokeai.app.invocations.primitives import MaskOutput
|
||||
from invokeai.app.invocations.fields import ImageField, InputField, TensorField, WithBoard, WithMetadata
|
||||
from invokeai.app.invocations.primitives import ImageOutput, MaskOutput
|
||||
|
||||
|
||||
@invocation(
|
||||
@@ -118,3 +119,28 @@ class ImageMaskToTensorInvocation(BaseInvocation, WithMetadata):
|
||||
height=mask.shape[1],
|
||||
width=mask.shape[2],
|
||||
)
|
||||
|
||||
|
||||
@invocation(
|
||||
"tensor_mask_to_image",
|
||||
title="Tensor Mask to Image",
|
||||
tags=["mask"],
|
||||
category="mask",
|
||||
version="1.0.0",
|
||||
)
|
||||
class MaskTensorToImageInvocation(BaseInvocation, WithMetadata, WithBoard):
|
||||
"""Convert a mask tensor to an image."""
|
||||
|
||||
mask: TensorField = InputField(description="The mask tensor to convert.")
|
||||
|
||||
def invoke(self, context: InvocationContext) -> ImageOutput:
|
||||
mask = context.tensors.load(self.mask.tensor_name)
|
||||
# Ensure that the mask is binary.
|
||||
if mask.dtype != torch.bool:
|
||||
mask = mask > 0.5
|
||||
mask_np = mask.float().cpu().detach().numpy() * 255
|
||||
mask_np = mask_np.astype(np.uint8)
|
||||
|
||||
mask_pil = Image.fromarray(mask_np, mode="L")
|
||||
image_dto = context.images.save(image=mask_pil)
|
||||
return ImageOutput.build(image_dto)
|
||||
|
||||
@@ -7,6 +7,7 @@ import torch
|
||||
from invokeai.app.invocations.baseinvocation import BaseInvocation, BaseInvocationOutput, invocation, invocation_output
|
||||
from invokeai.app.invocations.constants import LATENT_SCALE_FACTOR
|
||||
from invokeai.app.invocations.fields import (
|
||||
BoundingBoxField,
|
||||
ColorField,
|
||||
ConditioningField,
|
||||
DenoiseMaskField,
|
||||
@@ -469,3 +470,24 @@ class ConditioningCollectionInvocation(BaseInvocation):
|
||||
|
||||
|
||||
# endregion
|
||||
|
||||
# region BoundingBox
|
||||
|
||||
|
||||
@invocation_output("bounding_box_output")
|
||||
class BoundingBoxOutput(BaseInvocationOutput):
|
||||
"""Base class for nodes that output a single bounding box"""
|
||||
|
||||
bounding_box: BoundingBoxField = OutputField(description="The output bounding box.")
|
||||
|
||||
|
||||
@invocation_output("bounding_box_collection_output")
|
||||
class BoundingBoxCollectionOutput(BaseInvocationOutput):
|
||||
"""Base class for nodes that output a collection of bounding boxes"""
|
||||
|
||||
collection: list[BoundingBoxField] = OutputField(
|
||||
description="The output bounding boxes.",
|
||||
)
|
||||
|
||||
|
||||
# endregion
|
||||
|
||||
155
invokeai/app/invocations/segment_anything_model.py
Normal file
155
invokeai/app/invocations/segment_anything_model.py
Normal file
@@ -0,0 +1,155 @@
|
||||
from pathlib import Path
|
||||
from typing import Literal
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
from PIL import Image
|
||||
from transformers import AutoModelForMaskGeneration, AutoProcessor
|
||||
from transformers.models.sam import SamModel
|
||||
from transformers.models.sam.processing_sam import SamProcessor
|
||||
|
||||
from invokeai.app.invocations.baseinvocation import BaseInvocation, invocation
|
||||
from invokeai.app.invocations.fields import BoundingBoxField, ImageField, InputField, TensorField
|
||||
from invokeai.app.invocations.primitives import MaskOutput
|
||||
from invokeai.app.services.shared.invocation_context import InvocationContext
|
||||
from invokeai.backend.image_util.segment_anything.mask_refinement import mask_to_polygon, polygon_to_mask
|
||||
from invokeai.backend.image_util.segment_anything.segment_anything_model import SegmentAnythingModel
|
||||
|
||||
SEGMENT_ANYTHING_MODEL_ID = "facebook/sam-vit-base"
|
||||
|
||||
|
||||
@invocation(
|
||||
"segment_anything_model",
|
||||
title="Segment Anything Model",
|
||||
tags=["prompt", "segmentation"],
|
||||
category="segmentation",
|
||||
version="1.0.0",
|
||||
)
|
||||
class SegmentAnythingModelInvocation(BaseInvocation):
|
||||
"""Runs a Segment Anything Model (https://arxiv.org/pdf/2304.02643).
|
||||
|
||||
Reference:
|
||||
- https://huggingface.co/docs/transformers/v4.43.3/en/model_doc/grounding-dino#grounded-sam
|
||||
- https://github.com/NielsRogge/Transformers-Tutorials/blob/a39f33ac1557b02ebfb191ea7753e332b5ca933f/Grounding%20DINO/GroundingDINO_with_Segment_Anything.ipynb
|
||||
"""
|
||||
|
||||
image: ImageField = InputField(description="The image to segment.")
|
||||
bounding_boxes: list[BoundingBoxField] = InputField(description="The bounding boxes to prompt the SAM model with.")
|
||||
apply_polygon_refinement: bool = InputField(
|
||||
description="Whether to apply polygon refinement to the masks. This will smooth the edges of the masks slightly and ensure that each mask consists of a single closed polygon (before merging).",
|
||||
default=True,
|
||||
)
|
||||
mask_filter: Literal["all", "largest", "highest_box_score"] = InputField(
|
||||
description="The filtering to apply to the detected masks before merging them into a final output.",
|
||||
default="all",
|
||||
)
|
||||
|
||||
@torch.no_grad()
|
||||
def invoke(self, context: InvocationContext) -> MaskOutput:
|
||||
# The models expect a 3-channel RGB image.
|
||||
image_pil = context.images.get_pil(self.image.image_name, mode="RGB")
|
||||
|
||||
if len(self.bounding_boxes) == 0:
|
||||
combined_mask = torch.zeros(image_pil.size[::-1], dtype=torch.bool)
|
||||
else:
|
||||
masks = self._segment(context=context, image=image_pil)
|
||||
masks = self._filter_masks(masks=masks, bounding_boxes=self.bounding_boxes)
|
||||
|
||||
# masks contains bool values, so we merge them via max-reduce.
|
||||
combined_mask, _ = torch.stack(masks).max(dim=0)
|
||||
|
||||
mask_tensor_name = context.tensors.save(combined_mask)
|
||||
height, width = combined_mask.shape
|
||||
return MaskOutput(mask=TensorField(tensor_name=mask_tensor_name), width=width, height=height)
|
||||
|
||||
@staticmethod
|
||||
def _load_sam_model(model_path: Path):
|
||||
sam_model = AutoModelForMaskGeneration.from_pretrained(
|
||||
model_path,
|
||||
local_files_only=True,
|
||||
# TODO(ryand): Setting the torch_dtype here doesn't work. Investigate whether fp16 is supported by the
|
||||
# model, and figure out how to make it work in the pipeline.
|
||||
# torch_dtype=TorchDevice.choose_torch_dtype(),
|
||||
)
|
||||
assert isinstance(sam_model, SamModel)
|
||||
|
||||
sam_processor = AutoProcessor.from_pretrained(model_path, local_files_only=True)
|
||||
assert isinstance(sam_processor, SamProcessor)
|
||||
return SegmentAnythingModel(sam_model=sam_model, sam_processor=sam_processor)
|
||||
|
||||
def _segment(
|
||||
self,
|
||||
context: InvocationContext,
|
||||
image: Image.Image,
|
||||
) -> list[torch.Tensor]:
|
||||
"""Use Segment Anything (SAM) to generate masks given an image + a set of bounding boxes."""
|
||||
# Convert the bounding boxes to the SAM input format.
|
||||
sam_bounding_boxes = [[bb.x_min, bb.y_min, bb.x_max, bb.y_max] for bb in self.bounding_boxes]
|
||||
|
||||
with (
|
||||
context.models.load_remote_model(
|
||||
source=SEGMENT_ANYTHING_MODEL_ID, loader=SegmentAnythingModelInvocation._load_sam_model
|
||||
) as sam_pipeline,
|
||||
):
|
||||
assert isinstance(sam_pipeline, SegmentAnythingModel)
|
||||
masks = sam_pipeline.segment(image=image, bounding_boxes=sam_bounding_boxes)
|
||||
|
||||
masks = self._process_masks(masks)
|
||||
if self.apply_polygon_refinement:
|
||||
masks = self._apply_polygon_refinement(masks)
|
||||
|
||||
return masks
|
||||
|
||||
def _process_masks(self, masks: torch.Tensor) -> list[torch.Tensor]:
|
||||
"""Convert the tensor output from the Segment Anything model from a tensor of shape
|
||||
[num_masks, channels, height, width] to a list of tensors of shape [height, width].
|
||||
"""
|
||||
assert masks.dtype == torch.bool
|
||||
# [num_masks, channels, height, width] -> [num_masks, height, width]
|
||||
masks, _ = masks.max(dim=1)
|
||||
# Split the first dimension into a list of masks.
|
||||
return list(masks.cpu().unbind(dim=0))
|
||||
|
||||
def _apply_polygon_refinement(self, masks: list[torch.Tensor]) -> list[torch.Tensor]:
|
||||
"""Apply polygon refinement to the masks.
|
||||
|
||||
Convert each mask to a polygon, then back to a mask. This has the following effect:
|
||||
- Smooth the edges of the mask slightly.
|
||||
- Ensure that each mask consists of a single closed polygon
|
||||
- Removes small mask pieces.
|
||||
- Removes holes from the mask.
|
||||
"""
|
||||
# Convert tensor masks to np masks.
|
||||
np_masks = [mask.cpu().numpy().astype(np.uint8) for mask in masks]
|
||||
|
||||
# Apply polygon refinement.
|
||||
for idx, mask in enumerate(np_masks):
|
||||
shape = mask.shape
|
||||
assert len(shape) == 2 # Assert length to satisfy type checker.
|
||||
polygon = mask_to_polygon(mask)
|
||||
mask = polygon_to_mask(polygon, shape)
|
||||
np_masks[idx] = mask
|
||||
|
||||
# Convert np masks back to tensor masks.
|
||||
masks = [torch.tensor(mask, dtype=torch.bool) for mask in np_masks]
|
||||
|
||||
return masks
|
||||
|
||||
def _filter_masks(self, masks: list[torch.Tensor], bounding_boxes: list[BoundingBoxField]) -> list[torch.Tensor]:
|
||||
"""Filter the detected masks based on the specified mask filter."""
|
||||
assert len(masks) == len(bounding_boxes)
|
||||
|
||||
if self.mask_filter == "all":
|
||||
return masks
|
||||
elif self.mask_filter == "largest":
|
||||
# Find the largest mask.
|
||||
return [max(masks, key=lambda x: float(x.sum()))]
|
||||
elif self.mask_filter == "highest_box_score":
|
||||
# Find the index of the bounding box with the highest score.
|
||||
# Note that we fallback to -1.0 if the score is None. This is mainly to satisfy the type checker. In most
|
||||
# cases the scores should all be non-None when using this filtering mode. That being said, -1.0 is a
|
||||
# reasonable fallback since the expected score range is [0.0, 1.0].
|
||||
max_score_idx = max(range(len(bounding_boxes)), key=lambda i: bounding_boxes[i].score or -1.0)
|
||||
return [masks[max_score_idx]]
|
||||
else:
|
||||
raise ValueError(f"Invalid mask filter: {self.mask_filter}")
|
||||
@@ -0,0 +1,22 @@
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
|
||||
class BoundingBox(BaseModel):
|
||||
"""Bounding box helper class."""
|
||||
|
||||
xmin: int
|
||||
ymin: int
|
||||
xmax: int
|
||||
ymax: int
|
||||
|
||||
|
||||
class DetectionResult(BaseModel):
|
||||
"""Detection result from Grounding DINO."""
|
||||
|
||||
score: float
|
||||
label: str
|
||||
box: BoundingBox
|
||||
model_config = ConfigDict(
|
||||
# Allow arbitrary types for mask, since it will be a numpy array.
|
||||
arbitrary_types_allowed=True
|
||||
)
|
||||
@@ -0,0 +1,36 @@
|
||||
from typing import Optional
|
||||
|
||||
import torch
|
||||
from PIL import Image
|
||||
from transformers.pipelines import ZeroShotObjectDetectionPipeline
|
||||
|
||||
from invokeai.backend.image_util.grounding_dino.detection_result import DetectionResult
|
||||
from invokeai.backend.raw_model import RawModel
|
||||
|
||||
|
||||
class GroundingDinoPipeline(RawModel):
|
||||
"""A wrapper class for a ZeroShotObjectDetectionPipeline that makes it compatible with the model manager's memory
|
||||
management system.
|
||||
"""
|
||||
|
||||
def __init__(self, pipeline: ZeroShotObjectDetectionPipeline):
|
||||
self._pipeline = pipeline
|
||||
|
||||
def detect(self, image: Image.Image, candidate_labels: list[str], threshold: float = 0.1) -> list[DetectionResult]:
|
||||
results = self._pipeline(image=image, candidate_labels=candidate_labels, threshold=threshold)
|
||||
results = [DetectionResult.model_validate(result) for result in results]
|
||||
return results
|
||||
|
||||
def to(self, device: Optional[torch.device] = None, dtype: Optional[torch.dtype] = None):
|
||||
# HACK(ryand): The GroundingDinoPipeline does not work on MPS devices. We only allow it to be moved to CPU or
|
||||
# CUDA.
|
||||
if device is not None and device.type not in {"cpu", "cuda"}:
|
||||
device = None
|
||||
self._pipeline.model.to(device=device, dtype=dtype)
|
||||
self._pipeline.device = self._pipeline.model.device
|
||||
|
||||
def calc_size(self) -> int:
|
||||
# HACK(ryand): Fix the circular import issue.
|
||||
from invokeai.backend.model_manager.load.model_util import calc_module_size
|
||||
|
||||
return calc_module_size(self._pipeline.model)
|
||||
@@ -0,0 +1,50 @@
|
||||
# This file contains utilities for Grounded-SAM mask refinement based on:
|
||||
# https://github.com/NielsRogge/Transformers-Tutorials/blob/a39f33ac1557b02ebfb191ea7753e332b5ca933f/Grounding%20DINO/GroundingDINO_with_Segment_Anything.ipynb
|
||||
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
import numpy.typing as npt
|
||||
|
||||
|
||||
def mask_to_polygon(mask: npt.NDArray[np.uint8]) -> list[tuple[int, int]]:
|
||||
"""Convert a binary mask to a polygon.
|
||||
|
||||
Returns:
|
||||
list[list[int]]: List of (x, y) coordinates representing the vertices of the polygon.
|
||||
"""
|
||||
# Find contours in the binary mask.
|
||||
contours, _ = cv2.findContours(mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||||
|
||||
# Find the contour with the largest area.
|
||||
largest_contour = max(contours, key=cv2.contourArea)
|
||||
|
||||
# Extract the vertices of the contour.
|
||||
polygon = largest_contour.reshape(-1, 2).tolist()
|
||||
|
||||
return polygon
|
||||
|
||||
|
||||
def polygon_to_mask(
|
||||
polygon: list[tuple[int, int]], image_shape: tuple[int, int], fill_value: int = 1
|
||||
) -> npt.NDArray[np.uint8]:
|
||||
"""Convert a polygon to a segmentation mask.
|
||||
|
||||
Args:
|
||||
polygon (list): List of (x, y) coordinates representing the vertices of the polygon.
|
||||
image_shape (tuple): Shape of the image (height, width) for the mask.
|
||||
fill_value (int): Value to fill the polygon with.
|
||||
|
||||
Returns:
|
||||
np.ndarray: Segmentation mask with the polygon filled (with value 255).
|
||||
"""
|
||||
# Create an empty mask.
|
||||
mask = np.zeros(image_shape, dtype=np.uint8)
|
||||
|
||||
# Convert polygon to an array of points.
|
||||
pts = np.array(polygon, dtype=np.int32)
|
||||
|
||||
# Fill the polygon with white color (255).
|
||||
cv2.fillPoly(mask, [pts], color=(fill_value,))
|
||||
|
||||
return mask
|
||||
@@ -0,0 +1,53 @@
|
||||
from typing import Optional
|
||||
|
||||
import torch
|
||||
from PIL import Image
|
||||
from transformers.models.sam import SamModel
|
||||
from transformers.models.sam.processing_sam import SamProcessor
|
||||
|
||||
from invokeai.backend.raw_model import RawModel
|
||||
|
||||
|
||||
class SegmentAnythingModel(RawModel):
|
||||
"""A wrapper class for the transformers SAM model and processor that makes it compatible with the model manager."""
|
||||
|
||||
def __init__(self, sam_model: SamModel, sam_processor: SamProcessor):
|
||||
self._sam_model = sam_model
|
||||
self._sam_processor = sam_processor
|
||||
|
||||
def to(self, device: Optional[torch.device] = None, dtype: Optional[torch.dtype] = None):
|
||||
# HACK(ryand): The SAM pipeline does not work on MPS devices. We only allow it to be moved to CPU or CUDA.
|
||||
if device is not None and device.type not in {"cpu", "cuda"}:
|
||||
device = None
|
||||
self._sam_model.to(device=device, dtype=dtype)
|
||||
|
||||
def calc_size(self) -> int:
|
||||
# HACK(ryand): Fix the circular import issue.
|
||||
from invokeai.backend.model_manager.load.model_util import calc_module_size
|
||||
|
||||
return calc_module_size(self._sam_model)
|
||||
|
||||
def segment(self, image: Image.Image, bounding_boxes: list[list[int]]) -> torch.Tensor:
|
||||
"""Run the SAM model.
|
||||
|
||||
Args:
|
||||
image (Image.Image): The image to segment.
|
||||
bounding_boxes (list[list[int]]): The bounding box prompts. Each bounding box is in the format
|
||||
[xmin, ymin, xmax, ymax].
|
||||
|
||||
Returns:
|
||||
torch.Tensor: The segmentation masks. dtype: torch.bool. shape: [num_masks, channels, height, width].
|
||||
"""
|
||||
# Add batch dimension of 1 to the bounding boxes.
|
||||
boxes = [bounding_boxes]
|
||||
inputs = self._sam_processor(images=image, input_boxes=boxes, return_tensors="pt").to(self._sam_model.device)
|
||||
outputs = self._sam_model(**inputs)
|
||||
masks = self._sam_processor.post_process_masks(
|
||||
masks=outputs.pred_masks,
|
||||
original_sizes=inputs.original_sizes,
|
||||
reshaped_input_sizes=inputs.reshaped_input_sizes,
|
||||
)
|
||||
|
||||
# There should be only one batch.
|
||||
assert len(masks) == 1
|
||||
return masks[0]
|
||||
@@ -11,6 +11,8 @@ from diffusers.pipelines.pipeline_utils import DiffusionPipeline
|
||||
from diffusers.schedulers.scheduling_utils import SchedulerMixin
|
||||
from transformers import CLIPTokenizer
|
||||
|
||||
from invokeai.backend.image_util.grounding_dino.grounding_dino_pipeline import GroundingDinoPipeline
|
||||
from invokeai.backend.image_util.segment_anything.segment_anything_model import SegmentAnythingModel
|
||||
from invokeai.backend.ip_adapter.ip_adapter import IPAdapter
|
||||
from invokeai.backend.lora import LoRAModelRaw
|
||||
from invokeai.backend.model_manager.config import AnyModel
|
||||
@@ -34,7 +36,17 @@ def calc_model_size_by_data(logger: logging.Logger, model: AnyModel) -> int:
|
||||
elif isinstance(model, CLIPTokenizer):
|
||||
# TODO(ryand): Accurately calculate the tokenizer's size. It's small enough that it shouldn't matter for now.
|
||||
return 0
|
||||
elif isinstance(model, (TextualInversionModelRaw, IPAdapter, LoRAModelRaw, SpandrelImageToImageModel)):
|
||||
elif isinstance(
|
||||
model,
|
||||
(
|
||||
TextualInversionModelRaw,
|
||||
IPAdapter,
|
||||
LoRAModelRaw,
|
||||
SpandrelImageToImageModel,
|
||||
GroundingDinoPipeline,
|
||||
SegmentAnythingModel,
|
||||
),
|
||||
):
|
||||
return model.calc_size()
|
||||
else:
|
||||
# TODO(ryand): Promote this from a log to an exception once we are confident that we are handling all of the
|
||||
|
||||
@@ -7,11 +7,9 @@ from invokeai.backend.stable_diffusion.diffusers_pipeline import ( # noqa: F401
|
||||
StableDiffusionGeneratorPipeline,
|
||||
)
|
||||
from invokeai.backend.stable_diffusion.diffusion import InvokeAIDiffuserComponent # noqa: F401
|
||||
from invokeai.backend.stable_diffusion.seamless import set_seamless # noqa: F401
|
||||
|
||||
__all__ = [
|
||||
"PipelineIntermediateState",
|
||||
"StableDiffusionGeneratorPipeline",
|
||||
"InvokeAIDiffuserComponent",
|
||||
"set_seamless",
|
||||
]
|
||||
|
||||
120
invokeai/backend/stable_diffusion/extensions/inpaint.py
Normal file
120
invokeai/backend/stable_diffusion/extensions/inpaint.py
Normal file
@@ -0,0 +1,120 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
|
||||
import einops
|
||||
import torch
|
||||
from diffusers import UNet2DConditionModel
|
||||
|
||||
from invokeai.backend.stable_diffusion.extension_callback_type import ExtensionCallbackType
|
||||
from invokeai.backend.stable_diffusion.extensions.base import ExtensionBase, callback
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from invokeai.backend.stable_diffusion.denoise_context import DenoiseContext
|
||||
|
||||
|
||||
class InpaintExt(ExtensionBase):
|
||||
"""An extension for inpainting with non-inpainting models. See `InpaintModelExt` for inpainting with inpainting
|
||||
models.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
mask: torch.Tensor,
|
||||
is_gradient_mask: bool,
|
||||
):
|
||||
"""Initialize InpaintExt.
|
||||
Args:
|
||||
mask (torch.Tensor): The inpainting mask. Shape: (1, 1, latent_height, latent_width). Values are
|
||||
expected to be in the range [0, 1]. A value of 1 means that the corresponding 'pixel' should not be
|
||||
inpainted.
|
||||
is_gradient_mask (bool): If True, mask is interpreted as a gradient mask meaning that the mask values range
|
||||
from 0 to 1. If False, mask is interpreted as binary mask meaning that the mask values are either 0 or
|
||||
1.
|
||||
"""
|
||||
super().__init__()
|
||||
self._mask = mask
|
||||
self._is_gradient_mask = is_gradient_mask
|
||||
|
||||
# Noise, which used to noisify unmasked part of image
|
||||
# if noise provided to context, then it will be used
|
||||
# if no noise provided, then noise will be generated based on seed
|
||||
self._noise: Optional[torch.Tensor] = None
|
||||
|
||||
@staticmethod
|
||||
def _is_normal_model(unet: UNet2DConditionModel):
|
||||
"""Checks if the provided UNet belongs to a regular model.
|
||||
The `in_channels` of a UNet vary depending on model type:
|
||||
- normal - 4
|
||||
- depth - 5
|
||||
- inpaint - 9
|
||||
"""
|
||||
return unet.conv_in.in_channels == 4
|
||||
|
||||
def _apply_mask(self, ctx: DenoiseContext, latents: torch.Tensor, t: torch.Tensor) -> torch.Tensor:
|
||||
batch_size = latents.size(0)
|
||||
mask = einops.repeat(self._mask, "b c h w -> (repeat b) c h w", repeat=batch_size)
|
||||
if t.dim() == 0:
|
||||
# some schedulers expect t to be one-dimensional.
|
||||
# TODO: file diffusers bug about inconsistency?
|
||||
t = einops.repeat(t, "-> batch", batch=batch_size)
|
||||
# Noise shouldn't be re-randomized between steps here. The multistep schedulers
|
||||
# get very confused about what is happening from step to step when we do that.
|
||||
mask_latents = ctx.scheduler.add_noise(ctx.inputs.orig_latents, self._noise, t)
|
||||
# TODO: Do we need to also apply scheduler.scale_model_input? Or is add_noise appropriately scaled already?
|
||||
# mask_latents = self.scheduler.scale_model_input(mask_latents, t)
|
||||
mask_latents = einops.repeat(mask_latents, "b c h w -> (repeat b) c h w", repeat=batch_size)
|
||||
if self._is_gradient_mask:
|
||||
threshold = (t.item()) / ctx.scheduler.config.num_train_timesteps
|
||||
mask_bool = mask < 1 - threshold
|
||||
masked_input = torch.where(mask_bool, latents, mask_latents)
|
||||
else:
|
||||
masked_input = torch.lerp(latents, mask_latents.to(dtype=latents.dtype), mask.to(dtype=latents.dtype))
|
||||
return masked_input
|
||||
|
||||
@callback(ExtensionCallbackType.PRE_DENOISE_LOOP)
|
||||
def init_tensors(self, ctx: DenoiseContext):
|
||||
if not self._is_normal_model(ctx.unet):
|
||||
raise ValueError(
|
||||
"InpaintExt should be used only on normal (non-inpainting) models. This could be caused by an "
|
||||
"inpainting model that was incorrectly marked as a non-inpainting model. In some cases, this can be "
|
||||
"fixed by removing and re-adding the model (so that it gets re-probed)."
|
||||
)
|
||||
|
||||
self._mask = self._mask.to(device=ctx.latents.device, dtype=ctx.latents.dtype)
|
||||
|
||||
self._noise = ctx.inputs.noise
|
||||
# 'noise' might be None if the latents have already been noised (e.g. when running the SDXL refiner).
|
||||
# We still need noise for inpainting, so we generate it from the seed here.
|
||||
if self._noise is None:
|
||||
self._noise = torch.randn(
|
||||
ctx.latents.shape,
|
||||
dtype=torch.float32,
|
||||
device="cpu",
|
||||
generator=torch.Generator(device="cpu").manual_seed(ctx.seed),
|
||||
).to(device=ctx.latents.device, dtype=ctx.latents.dtype)
|
||||
|
||||
# Use negative order to make extensions with default order work with patched latents
|
||||
@callback(ExtensionCallbackType.PRE_STEP, order=-100)
|
||||
def apply_mask_to_initial_latents(self, ctx: DenoiseContext):
|
||||
ctx.latents = self._apply_mask(ctx, ctx.latents, ctx.timestep)
|
||||
|
||||
# TODO: redo this with preview events rewrite
|
||||
# Use negative order to make extensions with default order work with patched latents
|
||||
@callback(ExtensionCallbackType.POST_STEP, order=-100)
|
||||
def apply_mask_to_step_output(self, ctx: DenoiseContext):
|
||||
timestep = ctx.scheduler.timesteps[-1]
|
||||
if hasattr(ctx.step_output, "denoised"):
|
||||
ctx.step_output.denoised = self._apply_mask(ctx, ctx.step_output.denoised, timestep)
|
||||
elif hasattr(ctx.step_output, "pred_original_sample"):
|
||||
ctx.step_output.pred_original_sample = self._apply_mask(ctx, ctx.step_output.pred_original_sample, timestep)
|
||||
else:
|
||||
ctx.step_output.pred_original_sample = self._apply_mask(ctx, ctx.step_output.prev_sample, timestep)
|
||||
|
||||
# Restore unmasked part after the last step is completed
|
||||
@callback(ExtensionCallbackType.POST_DENOISE_LOOP)
|
||||
def restore_unmasked(self, ctx: DenoiseContext):
|
||||
if self._is_gradient_mask:
|
||||
ctx.latents = torch.where(self._mask < 1, ctx.latents, ctx.inputs.orig_latents)
|
||||
else:
|
||||
ctx.latents = torch.lerp(ctx.latents, ctx.inputs.orig_latents, self._mask)
|
||||
@@ -0,0 +1,88 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
|
||||
import torch
|
||||
from diffusers import UNet2DConditionModel
|
||||
|
||||
from invokeai.backend.stable_diffusion.extension_callback_type import ExtensionCallbackType
|
||||
from invokeai.backend.stable_diffusion.extensions.base import ExtensionBase, callback
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from invokeai.backend.stable_diffusion.denoise_context import DenoiseContext
|
||||
|
||||
|
||||
class InpaintModelExt(ExtensionBase):
|
||||
"""An extension for inpainting with inpainting models. See `InpaintExt` for inpainting with non-inpainting
|
||||
models.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
mask: Optional[torch.Tensor],
|
||||
masked_latents: Optional[torch.Tensor],
|
||||
is_gradient_mask: bool,
|
||||
):
|
||||
"""Initialize InpaintModelExt.
|
||||
Args:
|
||||
mask (Optional[torch.Tensor]): The inpainting mask. Shape: (1, 1, latent_height, latent_width). Values are
|
||||
expected to be in the range [0, 1]. A value of 1 means that the corresponding 'pixel' should not be
|
||||
inpainted.
|
||||
masked_latents (Optional[torch.Tensor]): Latents of initial image, with masked out by black color inpainted area.
|
||||
If mask provided, then too should be provided. Shape: (1, 1, latent_height, latent_width)
|
||||
is_gradient_mask (bool): If True, mask is interpreted as a gradient mask meaning that the mask values range
|
||||
from 0 to 1. If False, mask is interpreted as binary mask meaning that the mask values are either 0 or
|
||||
1.
|
||||
"""
|
||||
super().__init__()
|
||||
if mask is not None and masked_latents is None:
|
||||
raise ValueError("Source image required for inpaint mask when inpaint model used!")
|
||||
|
||||
# Inverse mask, because inpaint models treat mask as: 0 - remain same, 1 - inpaint
|
||||
self._mask = None
|
||||
if mask is not None:
|
||||
self._mask = 1 - mask
|
||||
self._masked_latents = masked_latents
|
||||
self._is_gradient_mask = is_gradient_mask
|
||||
|
||||
@staticmethod
|
||||
def _is_inpaint_model(unet: UNet2DConditionModel):
|
||||
"""Checks if the provided UNet belongs to a regular model.
|
||||
The `in_channels` of a UNet vary depending on model type:
|
||||
- normal - 4
|
||||
- depth - 5
|
||||
- inpaint - 9
|
||||
"""
|
||||
return unet.conv_in.in_channels == 9
|
||||
|
||||
@callback(ExtensionCallbackType.PRE_DENOISE_LOOP)
|
||||
def init_tensors(self, ctx: DenoiseContext):
|
||||
if not self._is_inpaint_model(ctx.unet):
|
||||
raise ValueError("InpaintModelExt should be used only on inpaint models!")
|
||||
|
||||
if self._mask is None:
|
||||
self._mask = torch.ones_like(ctx.latents[:1, :1])
|
||||
self._mask = self._mask.to(device=ctx.latents.device, dtype=ctx.latents.dtype)
|
||||
|
||||
if self._masked_latents is None:
|
||||
self._masked_latents = torch.zeros_like(ctx.latents[:1])
|
||||
self._masked_latents = self._masked_latents.to(device=ctx.latents.device, dtype=ctx.latents.dtype)
|
||||
|
||||
# Do last so that other extensions works with normal latents
|
||||
@callback(ExtensionCallbackType.PRE_UNET, order=1000)
|
||||
def append_inpaint_layers(self, ctx: DenoiseContext):
|
||||
batch_size = ctx.unet_kwargs.sample.shape[0]
|
||||
b_mask = torch.cat([self._mask] * batch_size)
|
||||
b_masked_latents = torch.cat([self._masked_latents] * batch_size)
|
||||
ctx.unet_kwargs.sample = torch.cat(
|
||||
[ctx.unet_kwargs.sample, b_mask, b_masked_latents],
|
||||
dim=1,
|
||||
)
|
||||
|
||||
# Restore unmasked part as inpaint model can change unmasked part slightly
|
||||
@callback(ExtensionCallbackType.POST_DENOISE_LOOP)
|
||||
def restore_unmasked(self, ctx: DenoiseContext):
|
||||
if self._is_gradient_mask:
|
||||
ctx.latents = torch.where(self._mask > 0, ctx.latents, ctx.inputs.orig_latents)
|
||||
else:
|
||||
ctx.latents = torch.lerp(ctx.inputs.orig_latents, ctx.latents, self._mask)
|
||||
71
invokeai/backend/stable_diffusion/extensions/seamless.py
Normal file
71
invokeai/backend/stable_diffusion/extensions/seamless.py
Normal file
@@ -0,0 +1,71 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from contextlib import contextmanager
|
||||
from typing import Callable, Dict, List, Optional, Tuple
|
||||
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
from diffusers import UNet2DConditionModel
|
||||
from diffusers.models.lora import LoRACompatibleConv
|
||||
|
||||
from invokeai.backend.stable_diffusion.extensions.base import ExtensionBase
|
||||
|
||||
|
||||
class SeamlessExt(ExtensionBase):
|
||||
def __init__(
|
||||
self,
|
||||
seamless_axes: List[str],
|
||||
):
|
||||
super().__init__()
|
||||
self._seamless_axes = seamless_axes
|
||||
|
||||
@contextmanager
|
||||
def patch_unet(self, unet: UNet2DConditionModel, cached_weights: Optional[Dict[str, torch.Tensor]] = None):
|
||||
with self.static_patch_model(
|
||||
model=unet,
|
||||
seamless_axes=self._seamless_axes,
|
||||
):
|
||||
yield
|
||||
|
||||
@staticmethod
|
||||
@contextmanager
|
||||
def static_patch_model(
|
||||
model: torch.nn.Module,
|
||||
seamless_axes: List[str],
|
||||
):
|
||||
if not seamless_axes:
|
||||
yield
|
||||
return
|
||||
|
||||
x_mode = "circular" if "x" in seamless_axes else "constant"
|
||||
y_mode = "circular" if "y" in seamless_axes else "constant"
|
||||
|
||||
# override conv_forward
|
||||
# https://github.com/huggingface/diffusers/issues/556#issuecomment-1993287019
|
||||
def _conv_forward_asymmetric(
|
||||
self, input: torch.Tensor, weight: torch.Tensor, bias: Optional[torch.Tensor] = None
|
||||
):
|
||||
self.paddingX = (self._reversed_padding_repeated_twice[0], self._reversed_padding_repeated_twice[1], 0, 0)
|
||||
self.paddingY = (0, 0, self._reversed_padding_repeated_twice[2], self._reversed_padding_repeated_twice[3])
|
||||
working = torch.nn.functional.pad(input, self.paddingX, mode=x_mode)
|
||||
working = torch.nn.functional.pad(working, self.paddingY, mode=y_mode)
|
||||
return torch.nn.functional.conv2d(
|
||||
working, weight, bias, self.stride, torch.nn.modules.utils._pair(0), self.dilation, self.groups
|
||||
)
|
||||
|
||||
original_layers: List[Tuple[nn.Conv2d, Callable]] = []
|
||||
try:
|
||||
for layer in model.modules():
|
||||
if not isinstance(layer, torch.nn.Conv2d):
|
||||
continue
|
||||
|
||||
if isinstance(layer, LoRACompatibleConv) and layer.lora_layer is None:
|
||||
layer.lora_layer = lambda *x: 0
|
||||
original_layers.append((layer, layer._conv_forward))
|
||||
layer._conv_forward = _conv_forward_asymmetric.__get__(layer, torch.nn.Conv2d)
|
||||
|
||||
yield
|
||||
|
||||
finally:
|
||||
for layer, orig_conv_forward in original_layers:
|
||||
layer._conv_forward = orig_conv_forward
|
||||
120
invokeai/backend/stable_diffusion/extensions/t2i_adapter.py
Normal file
120
invokeai/backend/stable_diffusion/extensions/t2i_adapter.py
Normal file
@@ -0,0 +1,120 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
from typing import TYPE_CHECKING, List, Optional, Union
|
||||
|
||||
import torch
|
||||
from diffusers import T2IAdapter
|
||||
from PIL.Image import Image
|
||||
|
||||
from invokeai.app.util.controlnet_utils import prepare_control_image
|
||||
from invokeai.backend.model_manager import BaseModelType
|
||||
from invokeai.backend.stable_diffusion.diffusion.conditioning_data import ConditioningMode
|
||||
from invokeai.backend.stable_diffusion.extension_callback_type import ExtensionCallbackType
|
||||
from invokeai.backend.stable_diffusion.extensions.base import ExtensionBase, callback
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from invokeai.app.invocations.model import ModelIdentifierField
|
||||
from invokeai.app.services.shared.invocation_context import InvocationContext
|
||||
from invokeai.app.util.controlnet_utils import CONTROLNET_RESIZE_VALUES
|
||||
from invokeai.backend.stable_diffusion.denoise_context import DenoiseContext
|
||||
|
||||
|
||||
class T2IAdapterExt(ExtensionBase):
|
||||
def __init__(
|
||||
self,
|
||||
node_context: InvocationContext,
|
||||
model_id: ModelIdentifierField,
|
||||
image: Image,
|
||||
weight: Union[float, List[float]],
|
||||
begin_step_percent: float,
|
||||
end_step_percent: float,
|
||||
resize_mode: CONTROLNET_RESIZE_VALUES,
|
||||
):
|
||||
super().__init__()
|
||||
self._node_context = node_context
|
||||
self._model_id = model_id
|
||||
self._image = image
|
||||
self._weight = weight
|
||||
self._resize_mode = resize_mode
|
||||
self._begin_step_percent = begin_step_percent
|
||||
self._end_step_percent = end_step_percent
|
||||
|
||||
self._adapter_state: Optional[List[torch.Tensor]] = None
|
||||
|
||||
# The max_unet_downscale is the maximum amount that the UNet model downscales the latent image internally.
|
||||
model_config = self._node_context.models.get_config(self._model_id.key)
|
||||
if model_config.base == BaseModelType.StableDiffusion1:
|
||||
self._max_unet_downscale = 8
|
||||
elif model_config.base == BaseModelType.StableDiffusionXL:
|
||||
self._max_unet_downscale = 4
|
||||
else:
|
||||
raise ValueError(f"Unexpected T2I-Adapter base model type: '{model_config.base}'.")
|
||||
|
||||
@callback(ExtensionCallbackType.SETUP)
|
||||
def setup(self, ctx: DenoiseContext):
|
||||
t2i_model: T2IAdapter
|
||||
with self._node_context.models.load(self._model_id) as t2i_model:
|
||||
_, _, latents_height, latents_width = ctx.inputs.orig_latents.shape
|
||||
|
||||
self._adapter_state = self._run_model(
|
||||
model=t2i_model,
|
||||
image=self._image,
|
||||
latents_height=latents_height,
|
||||
latents_width=latents_width,
|
||||
)
|
||||
|
||||
def _run_model(
|
||||
self,
|
||||
model: T2IAdapter,
|
||||
image: Image,
|
||||
latents_height: int,
|
||||
latents_width: int,
|
||||
):
|
||||
# Resize the T2I-Adapter input image.
|
||||
# We select the resize dimensions so that after the T2I-Adapter's total_downscale_factor is applied, the
|
||||
# result will match the latent image's dimensions after max_unet_downscale is applied.
|
||||
input_height = latents_height // self._max_unet_downscale * model.total_downscale_factor
|
||||
input_width = latents_width // self._max_unet_downscale * model.total_downscale_factor
|
||||
|
||||
# Note: We have hard-coded `do_classifier_free_guidance=False`. This is because we only want to prepare
|
||||
# a single image. If CFG is enabled, we will duplicate the resultant tensor after applying the
|
||||
# T2I-Adapter model.
|
||||
#
|
||||
# Note: We re-use the `prepare_control_image(...)` from ControlNet for T2I-Adapter, because it has many
|
||||
# of the same requirements (e.g. preserving binary masks during resize).
|
||||
t2i_image = prepare_control_image(
|
||||
image=image,
|
||||
do_classifier_free_guidance=False,
|
||||
width=input_width,
|
||||
height=input_height,
|
||||
num_channels=model.config["in_channels"],
|
||||
device=model.device,
|
||||
dtype=model.dtype,
|
||||
resize_mode=self._resize_mode,
|
||||
)
|
||||
|
||||
return model(t2i_image)
|
||||
|
||||
@callback(ExtensionCallbackType.PRE_UNET)
|
||||
def pre_unet_step(self, ctx: DenoiseContext):
|
||||
# skip if model not active in current step
|
||||
total_steps = len(ctx.inputs.timesteps)
|
||||
first_step = math.floor(self._begin_step_percent * total_steps)
|
||||
last_step = math.ceil(self._end_step_percent * total_steps)
|
||||
if ctx.step_index < first_step or ctx.step_index > last_step:
|
||||
return
|
||||
|
||||
weight = self._weight
|
||||
if isinstance(weight, list):
|
||||
weight = weight[ctx.step_index]
|
||||
|
||||
adapter_state = self._adapter_state
|
||||
if ctx.conditioning_mode == ConditioningMode.Both:
|
||||
adapter_state = [torch.cat([v] * 2) for v in adapter_state]
|
||||
|
||||
if ctx.unet_kwargs.down_intrablock_additional_residuals is None:
|
||||
ctx.unet_kwargs.down_intrablock_additional_residuals = [v * weight for v in adapter_state]
|
||||
else:
|
||||
for i, value in enumerate(adapter_state):
|
||||
ctx.unet_kwargs.down_intrablock_additional_residuals[i] += value * weight
|
||||
@@ -1,51 +0,0 @@
|
||||
from contextlib import contextmanager
|
||||
from typing import Callable, List, Optional, Tuple, Union
|
||||
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
from diffusers.models.autoencoders.autoencoder_kl import AutoencoderKL
|
||||
from diffusers.models.autoencoders.autoencoder_tiny import AutoencoderTiny
|
||||
from diffusers.models.lora import LoRACompatibleConv
|
||||
from diffusers.models.unets.unet_2d_condition import UNet2DConditionModel
|
||||
|
||||
|
||||
@contextmanager
|
||||
def set_seamless(model: Union[UNet2DConditionModel, AutoencoderKL, AutoencoderTiny], seamless_axes: List[str]):
|
||||
if not seamless_axes:
|
||||
yield
|
||||
return
|
||||
|
||||
# override conv_forward
|
||||
# https://github.com/huggingface/diffusers/issues/556#issuecomment-1993287019
|
||||
def _conv_forward_asymmetric(self, input: torch.Tensor, weight: torch.Tensor, bias: Optional[torch.Tensor] = None):
|
||||
self.paddingX = (self._reversed_padding_repeated_twice[0], self._reversed_padding_repeated_twice[1], 0, 0)
|
||||
self.paddingY = (0, 0, self._reversed_padding_repeated_twice[2], self._reversed_padding_repeated_twice[3])
|
||||
working = torch.nn.functional.pad(input, self.paddingX, mode=x_mode)
|
||||
working = torch.nn.functional.pad(working, self.paddingY, mode=y_mode)
|
||||
return torch.nn.functional.conv2d(
|
||||
working, weight, bias, self.stride, torch.nn.modules.utils._pair(0), self.dilation, self.groups
|
||||
)
|
||||
|
||||
original_layers: List[Tuple[nn.Conv2d, Callable]] = []
|
||||
|
||||
try:
|
||||
x_mode = "circular" if "x" in seamless_axes else "constant"
|
||||
y_mode = "circular" if "y" in seamless_axes else "constant"
|
||||
|
||||
conv_layers: List[torch.nn.Conv2d] = []
|
||||
|
||||
for module in model.modules():
|
||||
if isinstance(module, torch.nn.Conv2d):
|
||||
conv_layers.append(module)
|
||||
|
||||
for layer in conv_layers:
|
||||
if isinstance(layer, LoRACompatibleConv) and layer.lora_layer is None:
|
||||
layer.lora_layer = lambda *x: 0
|
||||
original_layers.append((layer, layer._conv_forward))
|
||||
layer._conv_forward = _conv_forward_asymmetric.__get__(layer, torch.nn.Conv2d)
|
||||
|
||||
yield
|
||||
|
||||
finally:
|
||||
for layer, orig_conv_forward in original_layers:
|
||||
layer._conv_forward = orig_conv_forward
|
||||
@@ -10,32 +10,32 @@ import {
|
||||
import { boardsApi } from 'services/api/endpoints/boards';
|
||||
import { imagesApi } from 'services/api/endpoints/images';
|
||||
|
||||
// Type inference doesn't work for this if you inline it in the listener for some reason
|
||||
const matchAnyBoardDeleted = isAnyOf(
|
||||
imagesApi.endpoints.deleteBoard.matchFulfilled,
|
||||
imagesApi.endpoints.deleteBoardAndImages.matchFulfilled
|
||||
);
|
||||
|
||||
export const addArchivedOrDeletedBoardListener = (startAppListening: AppStartListening) => {
|
||||
/**
|
||||
* The auto-add board shouldn't be set to an archived board or deleted board. When we archive a board, delete
|
||||
* a board, or change a the archived board visibility flag, we may need to reset the auto-add board.
|
||||
*/
|
||||
startAppListening({
|
||||
matcher: isAnyOf(
|
||||
// If a board is deleted, we'll need to reset the auto-add board
|
||||
imagesApi.endpoints.deleteBoard.matchFulfilled,
|
||||
imagesApi.endpoints.deleteBoardAndImages.matchFulfilled
|
||||
),
|
||||
matcher: matchAnyBoardDeleted,
|
||||
effect: async (action, { dispatch, getState }) => {
|
||||
const state = getState();
|
||||
const queryArgs = selectListBoardsQueryArgs(state);
|
||||
const queryResult = boardsApi.endpoints.listAllBoards.select(queryArgs)(state);
|
||||
const deletedBoardId = action.meta.arg.originalArgs;
|
||||
const { autoAddBoardId, selectedBoardId } = state.gallery;
|
||||
|
||||
if (!queryResult.data) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!queryResult.data.find((board) => board.board_id === selectedBoardId)) {
|
||||
// If the deleted board was currently selected, we should reset the selected board to uncategorized
|
||||
if (deletedBoardId === selectedBoardId) {
|
||||
dispatch(boardIdSelected({ boardId: 'none' }));
|
||||
dispatch(galleryViewChanged('images'));
|
||||
}
|
||||
if (!queryResult.data.find((board) => board.board_id === autoAddBoardId)) {
|
||||
|
||||
// If the deleted board was selected for auto-add, we should reset the auto-add board to uncategorized
|
||||
if (deletedBoardId === autoAddBoardId) {
|
||||
dispatch(autoAddBoardIdChanged('none'));
|
||||
}
|
||||
},
|
||||
@@ -46,14 +46,8 @@ export const addArchivedOrDeletedBoardListener = (startAppListening: AppStartLis
|
||||
matcher: boardsApi.endpoints.updateBoard.matchFulfilled,
|
||||
effect: async (action, { dispatch, getState }) => {
|
||||
const state = getState();
|
||||
const queryArgs = selectListBoardsQueryArgs(state);
|
||||
const queryResult = boardsApi.endpoints.listAllBoards.select(queryArgs)(state);
|
||||
const { shouldShowArchivedBoards } = state.gallery;
|
||||
|
||||
if (!queryResult.data) {
|
||||
return;
|
||||
}
|
||||
|
||||
const wasArchived = action.meta.arg.originalArgs.changes.archived === true;
|
||||
|
||||
if (wasArchived && !shouldShowArchivedBoards) {
|
||||
@@ -71,7 +65,7 @@ export const addArchivedOrDeletedBoardListener = (startAppListening: AppStartLis
|
||||
const shouldShowArchivedBoards = action.payload;
|
||||
|
||||
// We only need to take action if we have just hidden archived boards.
|
||||
if (!shouldShowArchivedBoards) {
|
||||
if (shouldShowArchivedBoards) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -86,14 +80,16 @@ export const addArchivedOrDeletedBoardListener = (startAppListening: AppStartLis
|
||||
|
||||
// Handle the case where selected board is archived
|
||||
const selectedBoard = queryResult.data.find((b) => b.board_id === selectedBoardId);
|
||||
if (selectedBoard && selectedBoard.archived) {
|
||||
if (!selectedBoard || selectedBoard.archived) {
|
||||
// If we can't find the selected board or it's archived, we should reset the selected board to uncategorized
|
||||
dispatch(boardIdSelected({ boardId: 'none' }));
|
||||
dispatch(galleryViewChanged('images'));
|
||||
}
|
||||
|
||||
// Handle the case where auto-add board is archived
|
||||
const autoAddBoard = queryResult.data.find((b) => b.board_id === autoAddBoardId);
|
||||
if (autoAddBoard && autoAddBoard.archived) {
|
||||
if (!autoAddBoard || autoAddBoard.archived) {
|
||||
// If we can't find the auto-add board or it's archived, we should reset the selected board to uncategorized
|
||||
dispatch(autoAddBoardIdChanged('none'));
|
||||
}
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user