refactored code; added watermark and nsfw facilities to app config route

This commit is contained in:
Lincoln Stein
2023-07-24 22:02:57 -04:00
committed by psychedelicious
parent 5beb11f4e2
commit 85ad5ef204
7 changed files with 139 additions and 138 deletions

View File

@@ -12,5 +12,4 @@ from .model_management import (
ModelManager, ModelCache, BaseModelType,
ModelType, SubModelType, ModelInfo
)
from .safety_checker import SafetyChecker
from .model_management.models import SilenceWarnings

View File

@@ -28,7 +28,6 @@ from diffusers.schedulers import SchedulerMixin as Scheduler
import invokeai.backend.util.logging as logger
from ..image_util import configure_model_padding
from ..util.util import rand_perlin_2d
from ..safety_checker import SafetyChecker
from ..stable_diffusion.diffusers_pipeline import StableDiffusionGeneratorPipeline
from ..stable_diffusion.schedulers import SCHEDULER_MAP
@@ -52,7 +51,6 @@ class InvokeAIGeneratorBasicParams:
v_symmetry_time_pct: Optional[float]=None
variation_amount: float = 0.0
with_variations: list=field(default_factory=list)
safety_checker: Optional[SafetyChecker]=None
@dataclass
class InvokeAIGeneratorOutput:
@@ -240,7 +238,6 @@ class Generator:
self.seed = None
self.latent_channels = model.unet.config.in_channels
self.downsampling_factor = downsampling # BUG: should come from model or config
self.safety_checker = None
self.perlin = 0.0
self.threshold = 0
self.variation_amount = 0
@@ -277,12 +274,10 @@ class Generator:
perlin=0.0,
h_symmetry_time_pct=None,
v_symmetry_time_pct=None,
safety_checker: SafetyChecker=None,
free_gpu_mem: bool = False,
**kwargs,
):
scope = nullcontext
self.safety_checker = safety_checker
self.free_gpu_mem = free_gpu_mem
attention_maps_images = []
attention_maps_callback = lambda saver: attention_maps_images.append(
@@ -329,9 +324,6 @@ class Generator:
# Pass on the seed in case a layer beneath us needs to generate noise on its own.
image = make_image(x_T, seed)
if self.safety_checker is not None:
image = self.safety_checker.check(image)
results.append([image, seed, attention_maps_images])
if image_callback is not None:

View File

@@ -0,0 +1,34 @@
"""
This module defines a singleton object, "invisible_watermark" that
wraps the invisible watermark model. It respects the global "invisible_watermark"
configuration variable, that allows the watermarking to be supressed.
"""
import numpy as np
import cv2
from PIL import Image
from imwatermark import WatermarkEncoder
from invokeai.app.services.config import InvokeAIAppConfig
import invokeai.backend.util.logging as logger
config = InvokeAIAppConfig.get_config()
class InvisibleWatermark:
"""
Wrapper around InvisibleWatermark module.
"""
@classmethod
def invisible_watermark_available(self) -> bool:
return config.invisible_watermark
@classmethod
def add_watermark(self, image: Image, watermark_text:str) -> Image:
if not self.invisible_watermark_available():
return image
logger.debug(f'Applying invisible watermark "{watermark_text}"')
bgr = cv2.cvtColor(np.array(image.convert("RGB")), cv2.COLOR_RGB2BGR)
encoder = WatermarkEncoder()
encoder.set_watermark('bytes', watermark_text.encode('utf-8'))
bgr_encoded = encoder.encode(bgr, 'dwtDct')
return Image.fromarray(
cv2.cvtColor(bgr_encoded, cv2.COLOR_BGR2RGB)
).convert("RGBA")

View File

@@ -0,0 +1,63 @@
"""
This module defines a singleton object, "safety_checker" that
wraps the safety_checker model. It respects the global "nsfw_checker"
configuration variable, that allows the checker to be supressed.
"""
import numpy as np
from PIL import Image
from invokeai.backend import SilenceWarnings
from invokeai.app.services.config import InvokeAIAppConfig
from invokeai.backend.util.devices import choose_torch_device
import invokeai.backend.util.logging as logger
config = InvokeAIAppConfig.get_config()
CHECKER_PATH = 'core/convert/stable-diffusion-safety-checker'
class SafetyChecker:
"""
Wrapper around SafetyChecker model.
"""
safety_checker = None
feature_extractor = None
tried_load: bool = False
@classmethod
def _load_safety_checker(self):
if self.tried_load:
return
if config.nsfw_checker:
try:
from diffusers.pipelines.stable_diffusion.safety_checker import StableDiffusionSafetyChecker
from transformers import AutoFeatureExtractor
self.safety_checker = StableDiffusionSafetyChecker.from_pretrained(
config.models_path / CHECKER_PATH
)
self.feature_extractor = AutoFeatureExtractor.from_pretrained(
config.models_path / CHECKER_PATH)
logger.info('NSFW checker initialized')
except Exception as e:
logger.warning(f'Could not load NSFW checker: {str(e)}')
else:
logger.info('NSFW checker loading disabled')
self.tried_load = True
@classmethod
def safety_checker_available(self) -> bool:
self._load_safety_checker()
return self.safety_checker is not None
@classmethod
def has_nsfw_concept(self, image: Image) -> bool:
if not self.safety_checker_available():
return False
device = choose_torch_device()
features = self.feature_extractor([image], return_tensors="pt")
features.to(device)
self.safety_checker.to(device)
x_image = np.array(image).astype(np.float32) / 255.0
x_image = x_image[None].transpose(0, 3, 1, 2)
with SilenceWarnings():
checked_image, has_nsfw_concept = self.safety_checker(images=x_image, clip_input=features.pixel_values)
return has_nsfw_concept[0]

View File

@@ -1,77 +0,0 @@
'''
SafetyChecker class - checks images against the StabilityAI NSFW filter
and blurs images that contain potential NSFW content.
'''
import diffusers
import numpy as np
import torch
import traceback
from diffusers.pipelines.stable_diffusion.safety_checker import (
StableDiffusionSafetyChecker,
)
from pathlib import Path
from PIL import Image, ImageFilter
from transformers import AutoFeatureExtractor
import invokeai.assets.web as web_assets
import invokeai.backend.util.logging as logger
from invokeai.app.services.config import InvokeAIAppConfig
from .util import CPU_DEVICE
config = InvokeAIAppConfig.get_config()
class SafetyChecker(object):
CAUTION_IMG = "caution.png"
def __init__(self, device: torch.device):
path = Path(web_assets.__path__[0]) / self.CAUTION_IMG
caution = Image.open(path)
self.caution_img = caution.resize((caution.width // 2, caution.height // 2))
self.device = device
try:
safety_model_id = config.models_path / 'core/convert/stable-diffusion-safety-checker'
feature_extractor_id = config.models_path / 'core/convert/stable-diffusion-safety-checker-extractor'
self.safety_checker = StableDiffusionSafetyChecker.from_pretrained(safety_model_id)
self.safety_feature_extractor = AutoFeatureExtractor.from_pretrained(feature_extractor_id)
except Exception:
logger.error(
"An error was encountered while installing the safety checker:"
)
print(traceback.format_exc())
def check(self, image: Image.Image):
"""
Check provided image against the StabilityAI safety checker and return
"""
self.safety_checker.to(self.device)
features = self.safety_feature_extractor([image], return_tensors="pt")
features.to(self.device)
# unfortunately checker requires the numpy version, so we have to convert back
x_image = np.array(image).astype(np.float32) / 255.0
x_image = x_image[None].transpose(0, 3, 1, 2)
diffusers.logging.set_verbosity_error()
checked_image, has_nsfw_concept = self.safety_checker(
images=x_image, clip_input=features.pixel_values
)
self.safety_checker.to(CPU_DEVICE) # offload
if has_nsfw_concept[0]:
logger.warning(
"An image with potential non-safe content has been detected. A blurred image will be returned."
)
return self.blur(image)
else:
return image
def blur(self, input):
blurry = input.filter(filter=ImageFilter.GaussianBlur(radius=32))
try:
if caution := self.caution_img:
blurry.paste(caution, (0, 0), caution)
except FileNotFoundError:
pass
return blurry