Merge branch 'main' into add_lora_support

Jordan
2023-02-22 13:28:04 -08:00
committed by GitHub
39 changed files with 2416 additions and 2914 deletions


@@ -211,7 +211,7 @@ class Generate:
Globals.full_precision = self.precision == "float32"
if is_xformers_available():
if not Globals.disable_xformers:
if torch.cuda.is_available() and not Globals.disable_xformers:
print(">> xformers memory-efficient attention is available and enabled")
else:
print(
@@ -221,9 +221,13 @@ class Generate:
print(">> xformers not installed")
# model caching system for fast switching
self.model_manager = ModelManager(mconfig, self.device, self.precision,
max_loaded_models=max_loaded_models,
sequential_offload=self.free_gpu_mem)
self.model_manager = ModelManager(
mconfig,
self.device,
self.precision,
max_loaded_models=max_loaded_models,
sequential_offload=self.free_gpu_mem,
)
# don't accept invalid models
fallback = self.model_manager.default_model() or FALLBACK_MODEL_NAME
model = model or fallback
@@ -246,7 +250,7 @@ class Generate:
# load safety checker if requested
if safety_checker:
try:
print(">> Initializing safety checker")
print(">> Initializing NSFW checker")
from diffusers.pipelines.stable_diffusion.safety_checker import (
StableDiffusionSafetyChecker,
)
@@ -270,6 +274,8 @@ class Generate:
"** An error was encountered while installing the safety checker:"
)
print(traceback.format_exc())
else:
print(">> NSFW checker is disabled")
def prompt2png(self, prompt, outdir, **kwargs):
"""


@@ -5,16 +5,19 @@ import sys
import traceback
from argparse import Namespace
from pathlib import Path
from typing import List, Optional, Union
from typing import Union
import click
from compel import PromptParser
if sys.platform == "darwin":
os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"
import pyparsing # type: ignore
import ldm.invoke
from ..generate import Generate
from .args import (Args, dream_cmd_from_png, metadata_dumps,
metadata_from_png)
@@ -24,7 +27,6 @@ from .image_util import make_grid
from .log import write_log
from .model_manager import ModelManager
from .pngwriter import PngWriter, retrieve_metadata, write_metadata
from .prompt_parser import PromptParser
from .readline import Completer, get_completer
from ..util import url_attachment_name
@@ -82,6 +84,7 @@ def main():
import transformers # type: ignore
from ldm.generate import Generate
transformers.logging.set_verbosity_error()
import diffusers
@@ -154,11 +157,14 @@ def main():
report_model_error(opt, e)
# try to autoconvert new models
# autoimport new .ckpt files
if path := opt.autoimport:
gen.model_manager.heuristic_import(
str(path), convert=False, commit_to_conf=opt.conf
)
if path := opt.autoconvert:
gen.model_manager.autoconvert_weights(
conf_path=opt.conf,
weights_directory=path,
gen.model_manager.heuristic_import(
str(path), convert=True, commit_to_conf=opt.conf
)
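For reference, a minimal sketch of how the two startup flags funnel into heuristic_import(), assuming a gen object carrying a model_manager and an opt namespace as in main(); the helper name is hypothetical.
# Sketch only: both startup flags call heuristic_import(); only `convert` differs.
def autoimport_at_startup(gen, opt):
    for path, convert in ((opt.autoimport, False), (opt.autoconvert, True)):
        if path:
            gen.model_manager.heuristic_import(
                str(path), convert=convert, commit_to_conf=opt.conf
            )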
# web server loops forever
@@ -528,32 +534,25 @@ def do_command(command: str, gen, opt: Args, completer) -> tuple:
"** please provide (1) a URL to a .ckpt file to import; (2) a local path to a .ckpt file; or (3) a diffusers repository id in the form stabilityai/stable-diffusion-2-1"
)
else:
import_model(path[1], gen, opt, completer)
completer.add_history(command)
try:
import_model(path[1], gen, opt, completer)
completer.add_history(command)
except KeyboardInterrupt:
print('\n')
operation = None
elif command.startswith("!convert"):
elif command.startswith(("!convert","!optimize")):
path = shlex.split(command)
if len(path) < 2:
print("** please provide the path to a .ckpt or .safetensors model")
elif not os.path.exists(path[1]):
print(f"** {path[1]}: model not found")
else:
optimize_model(path[1], gen, opt, completer)
completer.add_history(command)
try:
convert_model(path[1], gen, opt, completer)
completer.add_history(command)
except KeyboardInterrupt:
print('\n')
operation = None
elif command.startswith("!optimize"):
path = shlex.split(command)
if len(path) < 2:
print("** please provide an installed model name")
elif not path[1] in gen.model_manager.list_models():
print(f"** {path[1]}: model not found")
else:
optimize_model(path[1], gen, opt, completer)
completer.add_history(command)
operation = None
elif command.startswith("!edit"):
path = shlex.split(command)
if len(path) < 2:
@@ -625,190 +624,69 @@ def set_default_output_dir(opt: Args, completer: Completer):
completer.set_default_dir(opt.outdir)
def import_model(model_path: str, gen, opt, completer):
def import_model(model_path: str, gen, opt, completer, convert=False) -> str:
"""
model_path can be (1) a URL to a .ckpt file; (2) a local .ckpt file path;
(3) a huggingface repository id; or (4) a local directory containing a
diffusers model.
"""
model_path = model_path.replace('\\','/') # windows
model_path = model_path.replace("\\", "/") # windows
default_name = Path(model_path).stem
model_name = None
model_desc = None
if model_path.startswith(("http:", "https:", "ftp:")):
model_name = import_ckpt_model(model_path, gen, opt, completer)
elif (
os.path.exists(model_path)
and model_path.endswith((".ckpt", ".safetensors"))
and os.path.isfile(model_path)
if (
Path(model_path).is_dir()
and not (Path(model_path) / "model_index.json").exists()
):
model_name = import_ckpt_model(model_path, gen, opt, completer)
elif os.path.isdir(model_path):
# Allow for a directory containing multiple models.
models = list(Path(model_path).rglob("*.ckpt")) + list(
Path(model_path).rglob("*.safetensors")
)
if models:
# Only the last model name will be used below.
for model in sorted(models):
if click.confirm(f"Import {model.stem} ?", default=True):
model_name = import_ckpt_model(model, gen, opt, completer)
print()
else:
model_name = import_diffuser_model(Path(model_path), gen, opt, completer)
elif re.match(r"^[\w.+-]+/[\w.+-]+$", model_path):
model_name = import_diffuser_model(model_path, gen, opt, completer)
pass
else:
print(
f"** {model_path} is neither the path to a .ckpt file nor a diffusers repository id. Can't import."
)
if model_path.startswith(('http:','https:')):
try:
default_name = url_attachment_name(model_path)
default_name = Path(default_name).stem
except Exception as e:
print(f'** URL: {str(e)}')
model_name, model_desc = _get_model_name_and_desc(
gen.model_manager,
completer,
model_name=default_name,
)
imported_name = gen.model_manager.heuristic_import(
model_path,
model_name=model_name,
description=model_desc,
convert=convert,
)
if not model_name:
if not imported_name:
print("** Import failed or was skipped")
return
if not _verify_load(model_name, gen):
if not _verify_load(imported_name, gen):
print("** model failed to load. Discarding configuration entry")
gen.model_manager.del_model(model_name)
gen.model_manager.del_model(imported_name)
return
if click.confirm('Make this the default model?', default=False):
gen.model_manager.set_default_model(model_name)
if click.confirm("Make this the default model?", default=False):
gen.model_manager.set_default_model(imported_name)
gen.model_manager.commit(opt.conf)
completer.update_models(gen.model_manager.list_models())
print(f">> {model_name} successfully installed")
def import_checkpoint_list(models: List[Path], gen, opt, completer)->List[str]:
'''
Does a mass import of all the checkpoint/safetensors on a path list
'''
model_names = list()
choice = input('** Directory of checkpoint/safetensors models detected. Install <a>ll or <s>elected models? [a] ') or 'a'
do_all = choice.startswith('a')
if do_all:
config_file = _ask_for_config_file(models[0], completer, plural=True)
manager = gen.model_manager
for model in sorted(models):
model_name = f'{model.stem}'
model_description = f'Imported model {model_name}'
if model_name in manager.model_names():
print(f'** {model_name} is already imported. Skipping.')
elif manager.import_ckpt_model(
model,
config = config_file,
model_name = model_name,
model_description = model_description,
commit_to_conf = opt.conf):
model_names.append(model_name)
print(f'>> Model {model_name} imported successfully')
else:
print(f'** Model {model} failed to import')
else:
for model in sorted(models):
if click.confirm(f'Import {model.stem} ?', default=True):
if model_name := import_ckpt_model(model, gen, opt, completer):
print(f'>> Model {model.stem} imported successfully')
model_names.append(model_name)
else:
print(f'** Model {model} failed to import')
print()
return model_names
def import_diffuser_model(
path_or_repo: Union[Path, str], gen, _, completer
) -> Optional[str]:
path_or_repo = path_or_repo.replace('\\','/') # windows
manager = gen.model_manager
default_name = Path(path_or_repo).stem
default_description = f"Imported model {default_name}"
model_name, model_description = _get_model_name_and_desc(
manager,
completer,
model_name=default_name,
model_description=default_description,
)
vae = None
if click.confirm('Replace this model\'s VAE with "stabilityai/sd-vae-ft-mse"?', default=False):
vae = dict(repo_id='stabilityai/sd-vae-ft-mse')
if not manager.import_diffuser_model(
path_or_repo, model_name=model_name, vae=vae, description=model_description
):
print("** model failed to import")
return None
return model_name
def import_ckpt_model(
path_or_url: Union[Path, str], gen, opt, completer
) -> Optional[str]:
path_or_url = path_or_url.replace('\\','/')
manager = gen.model_manager
is_a_url = str(path_or_url).startswith(('http:','https:'))
base_name = Path(url_attachment_name(path_or_url)).name if is_a_url else Path(path_or_url).name
default_name = Path(base_name).stem
default_description = f"Imported model {default_name}"
model_name, model_description = _get_model_name_and_desc(
manager,
completer,
model_name=default_name,
model_description=default_description,
)
config_file = None
default = (
Path(Globals.root, "configs/stable-diffusion/v1-inpainting-inference.yaml")
if re.search("inpaint", default_name, flags=re.IGNORECASE)
else Path(Globals.root, "configs/stable-diffusion/v1-inference.yaml")
)
completer.complete_extensions((".yaml", ".yml"))
completer.set_line(str(default))
done = False
while not done:
config_file = input("Configuration file for this model: ").strip()
done = os.path.exists(config_file)
completer.complete_extensions((".ckpt", ".safetensors"))
vae = None
default = Path(
Globals.root, "models/ldm/stable-diffusion-v1/vae-ft-mse-840000-ema-pruned.ckpt"
)
completer.set_line(str(default))
done = False
while not done:
vae = input("VAE file for this model (leave blank for none): ").strip() or None
done = (not vae) or os.path.exists(vae)
completer.complete_extensions(None)
if not manager.import_ckpt_model(
path_or_url,
config=config_file,
vae=vae,
model_name=model_name,
model_description=model_description,
commit_to_conf=opt.conf,
):
print("** model failed to import")
return None
return model_name
def _verify_load(model_name: str, gen) -> bool:
print(">> Verifying that new model loads...")
current_model = gen.model_name
try:
if not gen.model_manager.get_model(model_name):
if not gen.set_model(model_name):
return False
except Exception as e:
print(f'** model failed to load: {str(e)}')
print('** note that importing 2.X checkpoints is not supported. Please use !convert_model instead.')
print(f"** model failed to load: {str(e)}")
print(
"** note that importing 2.X checkpoints is not supported. Please use !convert_model instead."
)
return False
if click.confirm('Keep model loaded?', default=True):
if click.confirm("Keep model loaded?", default=True):
gen.set_model(model_name)
else:
print(">> Restoring previous model")
@@ -820,6 +698,7 @@ def _get_model_name_and_desc(
model_manager, completer, model_name: str = "", model_description: str = ""
):
model_name = _get_model_name(model_manager.list_models(), completer, model_name)
model_description = model_description or f"Imported model {model_name}"
completer.set_line(model_description)
model_description = (
input(f"Description for this model [{model_description}]: ").strip()
@@ -827,46 +706,11 @@ def _get_model_name_and_desc(
)
return model_name, model_description
def _ask_for_config_file(model_path: Union[str,Path], completer, plural: bool=False)->Path:
default = '1'
if re.search('inpaint',str(model_path),flags=re.IGNORECASE):
default = '3'
choices={
'1': 'v1-inference.yaml',
'2': 'v2-inference-v.yaml',
'3': 'v1-inpainting-inference.yaml',
}
prompt = '''What type of models are these?:
[1] Models based on Stable Diffusion 1.X
[2] Models based on Stable Diffusion 2.X
[3] Inpainting models based on Stable Diffusion 1.X
[4] Something else''' if plural else '''What type of model is this?:
[1] A model based on Stable Diffusion 1.X
[2] A model based on Stable Diffusion 2.X
[3] An inpainting model based on Stable Diffusion 1.X
[4] Something else'''
print(prompt)
choice = input(f'Your choice: [{default}] ')
choice = choice.strip() or default
if config_file := choices.get(choice,None):
return Path('configs','stable-diffusion',config_file)
# otherwise ask user to select
done = False
completer.complete_extensions(('.yaml','.yml'))
completer.set_line(str(Path(Globals.root,'configs/stable-diffusion/')))
while not done:
config_path = input('Configuration file for this model (leave blank to abort): ').strip()
done = not config_path or os.path.exists(config_path)
return config_path
def optimize_model(model_name_or_path: Union[Path,str], gen, opt, completer):
model_name_or_path = model_name_or_path.replace('\\','/') # windows
def convert_model(model_name_or_path: Union[Path, str], gen, opt, completer) -> str:
model_name_or_path = model_name_or_path.replace("\\", "/") # windows
manager = gen.model_manager
ckpt_path = None
original_config_file=None
original_config_file = None
if model_name_or_path == gen.model_name:
print("** Can't convert the active model. !switch to another model first. **")
return
@@ -876,61 +720,39 @@ def optimize_model(model_name_or_path: Union[Path,str], gen, opt, completer):
original_config_file = Path(model_info["config"])
model_name = model_name_or_path
model_description = model_info["description"]
vae = model_info["vae"]
else:
print(f"** {model_name_or_path} is not a legacy .ckpt weights file")
return
elif os.path.exists(model_name_or_path):
original_config_file = original_config_file or _ask_for_config_file(model_name_or_path, completer)
if not original_config_file:
return
ckpt_path = Path(model_name_or_path)
model_name, model_description = _get_model_name_and_desc(
manager, completer, ckpt_path.stem, f"Converted model {ckpt_path.stem}"
if vae_repo := ldm.invoke.model_manager.VAE_TO_REPO_ID.get(Path(vae).stem):
vae_repo = dict(repo_id=vae_repo)
else:
vae_repo = None
model_name = manager.convert_and_import(
ckpt_path,
diffusers_path=Path(
Globals.root, "models", Globals.converted_ckpts_dir, model_name_or_path
),
model_name=model_name,
model_description=model_description,
original_config_file=original_config_file,
vae=vae_repo,
)
else:
print(
f"** {model_name_or_path} is neither an existing model nor the path to a .ckpt file"
)
try:
model_name = import_model(model_name_or_path, gen, opt, completer, convert=True)
except KeyboardInterrupt:
return
if not model_name:
print("** Conversion failed. Aborting.")
return
if not ckpt_path.is_absolute():
ckpt_path = Path(Globals.root, ckpt_path)
if original_config_file and not original_config_file.is_absolute():
original_config_file = Path(Globals.root, original_config_file)
diffuser_path = Path(
Globals.root, "models", Globals.converted_ckpts_dir, model_name
)
if diffuser_path.exists():
print(
f"** {model_name_or_path} is already optimized. Will not overwrite. If this is an error, please remove the directory {diffuser_path} and try again."
)
return
vae = None
if click.confirm('Replace this model\'s VAE with "stabilityai/sd-vae-ft-mse"?', default=False):
vae = dict(repo_id='stabilityai/sd-vae-ft-mse')
new_config = gen.model_manager.convert_and_import(
ckpt_path,
diffuser_path,
model_name=model_name,
model_description=model_description,
vae=vae,
original_config_file=original_config_file,
commit_to_conf=opt.conf,
)
if not new_config:
return
completer.update_models(gen.model_manager.list_models())
if click.confirm(f'Load optimized model {model_name}?', default=True):
gen.set_model(model_name)
if click.confirm(f'Delete the original .ckpt file at {ckpt_path}?',default=False):
manager.commit(opt.conf)
if click.confirm(f"Delete the original .ckpt file at {ckpt_path}?", default=False):
ckpt_path.unlink(missing_ok=True)
print(f"{ckpt_path} deleted")
return model_name
def del_config(model_name: str, gen, opt, completer):
@@ -942,11 +764,15 @@ def del_config(model_name: str, gen, opt, completer):
print(f"** Unknown model {model_name}")
return
if not click.confirm(f'Remove {model_name} from the list of models known to InvokeAI?',default=True):
if not click.confirm(
f"Remove {model_name} from the list of models known to InvokeAI?", default=True
):
return
delete_completely = click.confirm('Completely remove the model file or directory from disk?',default=False)
gen.model_manager.del_model(model_name,delete_files=delete_completely)
delete_completely = click.confirm(
"Completely remove the model file or directory from disk?", default=False
)
gen.model_manager.del_model(model_name, delete_files=delete_completely)
gen.model_manager.commit(opt.conf)
print(f"** {model_name} deleted")
completer.update_models(gen.model_manager.list_models())
@@ -969,13 +795,30 @@ def edit_model(model_name: str, gen, opt, completer):
completer.set_line(info[attribute])
info[attribute] = input(f"{attribute}: ") or info[attribute]
if info["format"] == "diffusers":
vae = info.get("vae", dict(repo_id=None, path=None, subfolder=None))
completer.set_line(vae.get("repo_id") or "stabilityai/sd-vae-ft-mse")
vae["repo_id"] = input("External VAE repo_id: ").strip() or None
if not vae["repo_id"]:
completer.set_line(vae.get("path") or "")
vae["path"] = (
input("Path to a local diffusers VAE model (usually none): ").strip()
or None
)
completer.set_line(vae.get("subfolder") or "")
vae["subfolder"] = (
input("Name of subfolder containing the VAE model (usually none): ").strip()
or None
)
info["vae"] = vae
if new_name != model_name:
manager.del_model(model_name)
# this does the update
manager.add_model(new_name, info, True)
if click.confirm('Make this the default model?',default=False):
if click.confirm("Make this the default model?", default=False):
manager.set_default_model(new_name)
manager.commit(opt.conf)
completer.update_models(manager.list_models())
@@ -1353,7 +1196,10 @@ def report_model_error(opt: Namespace, e: Exception):
"** Reconfiguration is being forced by environment variable INVOKE_MODEL_RECONFIGURE"
)
else:
if click.confirm('Do you want to run invokeai-configure script to select and/or reinstall models?', default=True):
if not click.confirm(
'Do you want to run invokeai-configure script to select and/or reinstall models?',
default=False
):
return
print("invokeai-configure is launching....\n")


@@ -93,11 +93,13 @@ import shlex
import sys
from argparse import Namespace
from pathlib import Path
from typing import List
import ldm.invoke
import ldm.invoke.pngwriter
from ldm.invoke.conditioning import split_weighted_subprompts
from ldm.invoke.globals import Globals
from ldm.invoke.prompt_parser import split_weighted_subprompts
APP_ID = ldm.invoke.__app_id__
APP_NAME = ldm.invoke.__app_name__
@@ -172,10 +174,10 @@ class Args(object):
self._arg_switches = self.parse_cmd('') # fill in defaults
self._cmd_switches = self.parse_cmd('') # fill in defaults
def parse_args(self):
def parse_args(self, args: List[str]=None):
'''Parse the shell switches and store.'''
sysargs = args if args is not None else sys.argv[1:]
try:
sysargs = sys.argv[1:]
# pre-parse before we do any initialization to get root directory
# and intercept --version request
switches = self._arg_parser.parse_args(sysargs)
@@ -538,11 +540,17 @@ class Args(object):
default=False,
help='Check for and blur potentially NSFW images. Use --no-nsfw_checker to disable.',
)
model_group.add_argument(
'--autoimport',
default=None,
type=str,
help='Check the indicated directory for .ckpt/.safetensors weights files at startup and import directly',
)
model_group.add_argument(
'--autoconvert',
default=None,
type=str,
help='Check the indicated directory for .ckpt weights files at startup and import as optimized diffuser models',
help='Check the indicated directory for .ckpt/.safetensors weights files at startup and import as optimized diffuser models',
)
model_group.add_argument(
'--patchmatch',
@@ -560,8 +568,8 @@ class Args(object):
'--outdir',
'-o',
type=str,
help='Directory to save generated images and a log of prompts and seeds. Default: outputs/img-samples',
default='outputs/img-samples',
help='Directory to save generated images and a log of prompts and seeds. Default: ROOTDIR/outputs',
default='outputs',
)
file_group.add_argument(
'--prompt_as_dir',


@@ -803,6 +803,7 @@ def load_pipeline_from_original_stable_diffusion_ckpt(
extract_ema:bool=True,
upcast_attn:bool=False,
vae:AutoencoderKL=None,
precision:torch.dtype=torch.float32,
return_generator_pipeline:bool=False,
)->Union[StableDiffusionPipeline,StableDiffusionGeneratorPipeline]:
'''
@@ -828,6 +829,7 @@ def load_pipeline_from_original_stable_diffusion_ckpt(
checkpoints that have both EMA and non-EMA weights. Whether to extract the EMA weights
or not. Defaults to `False`. Pass `True` to extract the EMA weights. EMA weights usually yield higher
quality images for inference. Non-EMA weights are usually better to continue fine-tuning.
:param precision: precision to use - torch.float16, torch.float32 or torch.autocast
:param upcast_attention: Whether the attention computation should always be upcasted. This is necessary when
running stable diffusion 2.1.
'''
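A hedged example of calling the loader with the new precision keyword; the checkpoint_path argument and its value are assumptions for illustration, while precision and return_generator_pipeline come from the signature above.
import torch

# Assumed call; the checkpoint path is a placeholder. The vae, text encoder,
# unet and safety checker are cast to `precision` inside the loader.
pipe = load_pipeline_from_original_stable_diffusion_ckpt(
    checkpoint_path="/path/to/model.ckpt",
    precision=torch.float16,
    return_generator_pipeline=True,
)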
@@ -988,12 +990,12 @@ def load_pipeline_from_original_stable_diffusion_ckpt(
safety_checker = StableDiffusionSafetyChecker.from_pretrained('CompVis/stable-diffusion-safety-checker',cache_dir=global_cache_dir("hub"))
feature_extractor = AutoFeatureExtractor.from_pretrained("CompVis/stable-diffusion-safety-checker",cache_dir=cache_dir)
pipe = pipeline_class(
vae=vae,
text_encoder=text_model,
vae=vae.to(precision),
text_encoder=text_model.to(precision),
tokenizer=tokenizer,
unet=unet,
unet=unet.to(precision),
scheduler=scheduler,
safety_checker=safety_checker,
safety_checker=safety_checker.to(precision),
feature_extractor=feature_extractor,
)
else:


@@ -7,61 +7,116 @@ get_uc_and_c_and_ec() get the conditioned and unconditioned latent, an
'''
import re
from typing import Union
from typing import Union, Optional, Any
import torch
from transformers import CLIPTokenizer, CLIPTextModel
from .prompt_parser import PromptParser, Blend, FlattenedPrompt, \
CrossAttentionControlledFragment, CrossAttentionControlSubstitute, Fragment
from ..models.diffusion import cross_attention_control
from compel import Compel
from compel.prompt_parser import FlattenedPrompt, Blend, Fragment, CrossAttentionControlSubstitute, PromptParser
from .devices import torch_dtype
from ..models.diffusion.shared_invokeai_diffusion import InvokeAIDiffuserComponent
from ..modules.encoders.modules import WeightedFrozenCLIPEmbedder
from ..modules.prompt_to_embeddings_converter import WeightedPromptFragmentsToEmbeddingsConverter
from ldm.invoke.globals import Globals
def get_tokenizer(model) -> CLIPTokenizer:
# TODO remove legacy ckpt fallback handling
return (getattr(model, 'tokenizer', None) # diffusers
or model.cond_stage_model.tokenizer) # ldm
def get_text_encoder(model) -> Any:
# TODO remove legacy ckpt fallback handling
return (getattr(model, 'text_encoder', None) # diffusers
or UnsqueezingLDMTransformer(model.cond_stage_model.transformer)) # ldm
class UnsqueezingLDMTransformer:
def __init__(self, ldm_transformer):
self.ldm_transformer = ldm_transformer
@property
def device(self):
return self.ldm_transformer.device
def __call__(self, *args, **kwargs):
insufficiently_unsqueezed_tensor = self.ldm_transformer(*args, **kwargs)
return insufficiently_unsqueezed_tensor.unsqueeze(0)
def get_uc_and_c_and_ec(prompt_string, model, log_tokens=False, skip_normalize_legacy_blend=False):
# lazy-load any deferred textual inversions.
# this might take a couple of seconds the first time a textual inversion is used.
model.textual_inversion_manager.create_deferred_token_ids_for_any_trigger_terms(prompt_string)
prompt, negative_prompt = get_prompt_structure(prompt_string,
skip_normalize_legacy_blend=skip_normalize_legacy_blend)
conditioning = _get_conditioning_for_prompt(prompt, negative_prompt, model, log_tokens)
tokenizer = get_tokenizer(model)
text_encoder = get_text_encoder(model)
compel = Compel(tokenizer=tokenizer,
text_encoder=text_encoder,
textual_inversion_manager=model.textual_inversion_manager,
dtype_for_device_getter=torch_dtype)
return conditioning
positive_prompt_string, negative_prompt_string = split_prompt_to_positive_and_negative(prompt_string)
legacy_blend = try_parse_legacy_blend(positive_prompt_string, skip_normalize_legacy_blend)
positive_prompt: FlattenedPrompt|Blend
if legacy_blend is not None:
positive_prompt = legacy_blend
else:
positive_prompt = Compel.parse_prompt_string(positive_prompt_string)
negative_prompt: FlattenedPrompt|Blend = Compel.parse_prompt_string(negative_prompt_string)
if log_tokens or getattr(Globals, "log_tokenization", False):
log_tokenization(positive_prompt, negative_prompt, tokenizer=tokenizer)
c, options = compel.build_conditioning_tensor_for_prompt_object(positive_prompt)
uc, _ = compel.build_conditioning_tensor_for_prompt_object(negative_prompt)
tokens_count = get_max_token_count(tokenizer, positive_prompt)
ec = InvokeAIDiffuserComponent.ExtraConditioningInfo(tokens_count_including_eos_bos=tokens_count,
cross_attention_control_args=options.get(
'cross_attention_control', None))
return uc, c, ec
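A stand-alone sketch of the Compel flow that get_uc_and_c_and_ec() now follows, assuming pipe is a diffusers pipeline exposing a tokenizer and text encoder; variable names are illustrative.
from compel import Compel

# `pipe` is assumed; Compel builds conditioning tensors from prompt strings.
compel = Compel(tokenizer=pipe.tokenizer, text_encoder=pipe.text_encoder)
c = compel.build_conditioning_tensor("a photograph of an astronaut riding a horse")
uc = compel.build_conditioning_tensor("")   # empty negative prompt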
def get_prompt_structure(prompt_string, skip_normalize_legacy_blend: bool = False) -> (
Union[FlattenedPrompt, Blend], FlattenedPrompt):
"""
parse the passed-in prompt string and return tuple (positive_prompt, negative_prompt)
"""
prompt, negative_prompt = _parse_prompt_string(prompt_string,
skip_normalize_legacy_blend=skip_normalize_legacy_blend)
return prompt, negative_prompt
Union[FlattenedPrompt, Blend], FlattenedPrompt):
positive_prompt_string, negative_prompt_string = split_prompt_to_positive_and_negative(prompt_string)
legacy_blend = try_parse_legacy_blend(positive_prompt_string, skip_normalize_legacy_blend)
positive_prompt: FlattenedPrompt|Blend
if legacy_blend is not None:
positive_prompt = legacy_blend
else:
positive_prompt = Compel.parse_prompt_string(positive_prompt_string)
negative_prompt: FlattenedPrompt|Blend = Compel.parse_prompt_string(negative_prompt_string)
return positive_prompt, negative_prompt
def get_max_token_count(tokenizer, prompt: FlattenedPrompt|Blend, truncate_if_too_long=True) -> int:
if type(prompt) is Blend:
blend: Blend = prompt
return max([get_max_token_count(tokenizer, c, truncate_if_too_long) for c in blend.prompts])
else:
return len(get_tokens_for_prompt_object(tokenizer, prompt, truncate_if_too_long))
def get_tokens_for_prompt(model, parsed_prompt: FlattenedPrompt, truncate_if_too_long=True) -> [str]:
def get_tokens_for_prompt_object(tokenizer, parsed_prompt: FlattenedPrompt, truncate_if_too_long=True) -> [str]:
if type(parsed_prompt) is Blend:
raise ValueError("Blend is not supported here - you need to get tokens for each of its .children")
text_fragments = [x.text if type(x) is Fragment else
(" ".join([f.text for f in x.original]) if type(x) is CrossAttentionControlSubstitute else
str(x))
for x in parsed_prompt.children]
text = " ".join(text_fragments)
tokens = model.cond_stage_model.tokenizer.tokenize(text)
tokens = tokenizer.tokenize(text)
if truncate_if_too_long:
max_tokens_length = model.cond_stage_model.max_length - 2 # typically 75
max_tokens_length = tokenizer.model_max_length - 2 # typically 75
tokens = tokens[0:max_tokens_length]
return tokens
def _parse_prompt_string(prompt_string_uncleaned, skip_normalize_legacy_blend=False) -> Union[FlattenedPrompt, Blend]:
# Extract Unconditioned Words From Prompt
def split_prompt_to_positive_and_negative(prompt_string_uncleaned):
unconditioned_words = ''
unconditional_regex = r'\[(.*?)\]'
unconditionals = re.findall(unconditional_regex, prompt_string_uncleaned)
if len(unconditionals) > 0:
unconditioned_words = ' '.join(unconditionals)
@@ -71,210 +126,57 @@ def _parse_prompt_string(prompt_string_uncleaned, skip_normalize_legacy_blend=Fa
prompt_string_cleaned = re.sub(' +', ' ', clean_prompt)
else:
prompt_string_cleaned = prompt_string_uncleaned
pp = PromptParser()
parsed_prompt: Union[FlattenedPrompt, Blend] = None
legacy_blend: Blend = pp.parse_legacy_blend(prompt_string_cleaned, skip_normalize_legacy_blend)
if legacy_blend is not None:
parsed_prompt = legacy_blend
else:
# we don't support conjunctions for now
parsed_prompt = pp.parse_conjunction(prompt_string_cleaned).prompts[0]
parsed_negative_prompt: FlattenedPrompt = pp.parse_conjunction(unconditioned_words).prompts[0]
return parsed_prompt, parsed_negative_prompt
return prompt_string_cleaned, unconditioned_words
def _get_conditioning_for_prompt(parsed_prompt: Union[Blend, FlattenedPrompt], parsed_negative_prompt: FlattenedPrompt,
model, log_tokens=False) \
-> tuple[torch.Tensor, torch.Tensor, InvokeAIDiffuserComponent.ExtraConditioningInfo]:
"""
Process prompt structure and tokens, and return (conditioning, unconditioning, extra_conditioning_info)
"""
def log_tokenization(positive_prompt: Blend | FlattenedPrompt,
negative_prompt: Blend | FlattenedPrompt,
tokenizer):
print(f"\n>> [TOKENLOG] Parsed Prompt: {positive_prompt}")
print(f"\n>> [TOKENLOG] Parsed Negative Prompt: {negative_prompt}")
if log_tokens or getattr(Globals, "log_tokenization", False):
print(f"\n>> [TOKENLOG] Parsed Prompt: {parsed_prompt}")
print(f"\n>> [TOKENLOG] Parsed Negative Prompt: {parsed_negative_prompt}")
log_tokenization_for_prompt_object(positive_prompt, tokenizer)
log_tokenization_for_prompt_object(negative_prompt, tokenizer, display_label_prefix="(negative prompt)")
conditioning = None
cac_args: cross_attention_control.Arguments = None
if type(parsed_prompt) is Blend:
conditioning = _get_conditioning_for_blend(model, parsed_prompt, log_tokens)
elif type(parsed_prompt) is FlattenedPrompt:
if parsed_prompt.wants_cross_attention_control:
conditioning, cac_args = _get_conditioning_for_cross_attention_control(model, parsed_prompt, log_tokens)
def log_tokenization_for_prompt_object(p: Blend | FlattenedPrompt, tokenizer, display_label_prefix=None):
display_label_prefix = display_label_prefix or ""
if type(p) is Blend:
blend: Blend = p
for i, c in enumerate(blend.prompts):
log_tokenization_for_prompt_object(
c, tokenizer,
display_label_prefix=f"{display_label_prefix}(blend part {i + 1}, weight={blend.weights[i]})")
elif type(p) is FlattenedPrompt:
flattened_prompt: FlattenedPrompt = p
if flattened_prompt.wants_cross_attention_control:
original_fragments = []
edited_fragments = []
for f in flattened_prompt.children:
if type(f) is CrossAttentionControlSubstitute:
original_fragments += f.original
edited_fragments += f.edited
else:
original_fragments.append(f)
edited_fragments.append(f)
original_text = " ".join([x.text for x in original_fragments])
log_tokenization_for_text(original_text, tokenizer,
display_label=f"{display_label_prefix}(.swap originals)")
edited_text = " ".join([x.text for x in edited_fragments])
log_tokenization_for_text(edited_text, tokenizer,
display_label=f"{display_label_prefix}(.swap replacements)")
else:
conditioning, _ = _get_embeddings_and_tokens_for_prompt(model,
parsed_prompt,
log_tokens=log_tokens,
log_display_label="(prompt)")
else:
raise ValueError(f"parsed_prompt is '{type(parsed_prompt)}' which is not a supported prompt type")
unconditioning, _ = _get_embeddings_and_tokens_for_prompt(model,
parsed_negative_prompt,
log_tokens=log_tokens,
log_display_label="(unconditioning)")
if isinstance(conditioning, dict):
# hybrid conditioning is in play
unconditioning, conditioning = _flatten_hybrid_conditioning(unconditioning, conditioning)
if cac_args is not None:
print(
">> Hybrid conditioning cannot currently be combined with cross attention control. Cross attention control will be ignored.")
cac_args = None
if type(parsed_prompt) is Blend:
blend: Blend = parsed_prompt
all_token_sequences = [get_tokens_for_prompt(model, p) for p in blend.prompts]
longest_token_sequence = max(all_token_sequences, key=lambda t: len(t))
eos_token_index = len(longest_token_sequence)+1
else:
tokens = get_tokens_for_prompt(model, parsed_prompt)
eos_token_index = len(tokens)+1
return (
unconditioning, conditioning, InvokeAIDiffuserComponent.ExtraConditioningInfo(
tokens_count_including_eos_bos=eos_token_index + 1,
cross_attention_control_args=cac_args
)
)
text = " ".join([x.text for x in flattened_prompt.children])
log_tokenization_for_text(text, tokenizer, display_label=display_label_prefix)
def _get_conditioning_for_cross_attention_control(model, prompt: FlattenedPrompt, log_tokens: bool = True):
original_prompt = FlattenedPrompt()
edited_prompt = FlattenedPrompt()
# for name, a0, a1, b0, b1 in edit_opcodes: only name == 'equal' is currently parsed
original_token_count = 0
edited_token_count = 0
edit_options = []
edit_opcodes = []
# beginning of sequence
edit_opcodes.append(
('equal', original_token_count, original_token_count + 1, edited_token_count, edited_token_count + 1))
edit_options.append(None)
original_token_count += 1
edited_token_count += 1
for fragment in prompt.children:
if type(fragment) is CrossAttentionControlSubstitute:
original_prompt.append(fragment.original)
edited_prompt.append(fragment.edited)
to_replace_token_count = _get_tokens_length(model, fragment.original)
replacement_token_count = _get_tokens_length(model, fragment.edited)
edit_opcodes.append(('replace',
original_token_count, original_token_count + to_replace_token_count,
edited_token_count, edited_token_count + replacement_token_count
))
original_token_count += to_replace_token_count
edited_token_count += replacement_token_count
edit_options.append(fragment.options)
# elif type(fragment) is CrossAttentionControlAppend:
# edited_prompt.append(fragment.fragment)
else:
# regular fragment
original_prompt.append(fragment)
edited_prompt.append(fragment)
count = _get_tokens_length(model, [fragment])
edit_opcodes.append(('equal', original_token_count, original_token_count + count, edited_token_count,
edited_token_count + count))
edit_options.append(None)
original_token_count += count
edited_token_count += count
# end of sequence
edit_opcodes.append(
('equal', original_token_count, original_token_count + 1, edited_token_count, edited_token_count + 1))
edit_options.append(None)
original_token_count += 1
edited_token_count += 1
original_embeddings, original_tokens = _get_embeddings_and_tokens_for_prompt(model,
original_prompt,
log_tokens=log_tokens,
log_display_label="(.swap originals)")
# naïvely building a single edited_embeddings like this disregards the effects of changing the absolute location of
# subsequent tokens when there is >1 edit and earlier edits change the total token count.
# eg "a cat.swap(smiling dog, s_start=0.5) eating a hotdog.swap(pizza)" - when the 'pizza' edit is active but the
# 'cat' edit is not, the 'pizza' feature vector will nevertheless be affected by the introduction of the extra
# token 'smiling' in the inactive 'cat' edit.
# todo: build multiple edited_embeddings, one for each edit, and pass just the edited fragments through to the CrossAttentionControl functions
edited_embeddings, edited_tokens = _get_embeddings_and_tokens_for_prompt(model,
edited_prompt,
log_tokens=log_tokens,
log_display_label="(.swap replacements)")
conditioning = original_embeddings
edited_conditioning = edited_embeddings
# print('>> got edit_opcodes', edit_opcodes, 'options', edit_options)
cac_args = cross_attention_control.Arguments(
edited_conditioning=edited_conditioning,
edit_opcodes=edit_opcodes,
edit_options=edit_options
)
return conditioning, cac_args
def _get_conditioning_for_blend(model, blend: Blend, log_tokens: bool = False):
embeddings_to_blend = None
for i, flattened_prompt in enumerate(blend.prompts):
this_embedding, _ = _get_embeddings_and_tokens_for_prompt(model,
flattened_prompt,
log_tokens=log_tokens,
log_display_label=f"(blend part {i + 1}, weight={blend.weights[i]})")
embeddings_to_blend = this_embedding if embeddings_to_blend is None else torch.cat(
(embeddings_to_blend, this_embedding))
conditioning = WeightedPromptFragmentsToEmbeddingsConverter.apply_embedding_weights(embeddings_to_blend.unsqueeze(0),
blend.weights,
normalize=blend.normalize_weights)
return conditioning
def _get_embeddings_and_tokens_for_prompt(model, flattened_prompt: FlattenedPrompt, log_tokens: bool = False,
log_display_label: str = None):
if type(flattened_prompt) is not FlattenedPrompt:
raise Exception(f"embeddings can only be made from FlattenedPrompts, got {type(flattened_prompt)} instead")
fragments = [x.text for x in flattened_prompt.children]
weights = [x.weight for x in flattened_prompt.children]
embeddings, tokens = model.get_learned_conditioning([fragments], return_tokens=True, fragment_weights=[weights])
if log_tokens or getattr(Globals, "log_tokenization", False):
text = " ".join(fragments)
log_tokenization(text, model, display_label=log_display_label)
return embeddings, tokens
def _get_tokens_length(model, fragments: list[Fragment]):
fragment_texts = [x.text for x in fragments]
tokens = model.cond_stage_model.get_token_ids(fragment_texts, include_start_and_end_markers=False)
return sum([len(x) for x in tokens])
def _flatten_hybrid_conditioning(uncond, cond):
'''
This handles the choice between a conditional conditioning
that is a tensor (used by cross attention) vs one that has additional
dimensions as well, as used by 'hybrid'
'''
assert isinstance(uncond, dict)
assert isinstance(cond, dict)
cond_flattened = dict()
for k in cond:
if isinstance(cond[k], list):
cond_flattened[k] = [
torch.cat([uncond[k][i], cond[k][i]])
for i in range(len(cond[k]))
]
else:
cond_flattened[k] = torch.cat([uncond[k], cond[k]])
return uncond, cond_flattened
def log_tokenization(text, model, display_label=None):
def log_tokenization_for_text(text, tokenizer, display_label=None):
""" shows how the prompt is tokenized
# usually tokens have '</w>' to indicate end-of-word,
# but for readability it has been replaced with ' '
"""
tokens = model.cond_stage_model.tokenizer.tokenize(text)
tokens = tokenizer.tokenize(text)
tokenized = ""
discarded = ""
usedTokens = 0
@@ -284,7 +186,7 @@ def log_tokenization(text, model, display_label=None):
token = tokens[i].replace('</w>', ' ')
# alternate color
s = (usedTokens % 6) + 1
if i < model.cond_stage_model.max_length:
if i < tokenizer.model_max_length:
tokenized = tokenized + f"\x1b[0;3{s};40m{token}"
usedTokens += 1
else: # over max token length
@@ -293,7 +195,58 @@ def log_tokenization(text, model, display_label=None):
if usedTokens > 0:
print(f'\n>> [TOKENLOG] Tokens {display_label or ""} ({usedTokens}):')
print(f'{tokenized}\x1b[0m')
if discarded != "":
print(f'\n>> [TOKENLOG] Tokens Discarded ({totalTokens - usedTokens}):')
print(f'{discarded}\x1b[0m')
def try_parse_legacy_blend(text: str, skip_normalize: bool=False) -> Optional[Blend]:
weighted_subprompts = split_weighted_subprompts(text, skip_normalize=skip_normalize)
if len(weighted_subprompts) <= 1:
return None
strings = [x[0] for x in weighted_subprompts]
weights = [x[1] for x in weighted_subprompts]
pp = PromptParser()
parsed_conjunctions = [pp.parse_conjunction(x) for x in strings]
flattened_prompts = [x.prompts[0] for x in parsed_conjunctions]
return Blend(prompts=flattened_prompts, weights=weights, normalize_weights=not skip_normalize)
def split_weighted_subprompts(text, skip_normalize=False)->list:
"""
Legacy blend parsing.
grabs all text up to the first occurrence of ':'
uses the grabbed text as a sub-prompt, and takes the value following ':' as weight
if ':' has no value defined, defaults to 1.0
repeats until no text remaining
"""
prompt_parser = re.compile("""
(?P<prompt> # capture group for 'prompt'
(?:\\\:|[^:])+ # match one or more non ':' characters or escaped colons '\:'
) # end 'prompt'
(?: # non-capture group
:+ # match one or more ':' characters
(?P<weight> # capture group for 'weight'
-?\d+(?:\.\d+)? # match positive or negative integer or decimal number
)? # end weight capture group, make optional
\s* # strip spaces after weight
| # OR
$ # else, if no ':' then match end of line
) # end non-capture group
""", re.VERBOSE)
parsed_prompts = [(match.group("prompt").replace("\\:", ":"), float(
match.group("weight") or 1)) for match in re.finditer(prompt_parser, text)]
if skip_normalize:
return parsed_prompts
weight_sum = sum(map(lambda x: x[1], parsed_prompts))
if weight_sum == 0:
print(
"* Warning: Subprompt weights add up to zero. Discarding and using even weights instead.")
equal_weight = 1 / max(len(parsed_prompts), 1)
return [(x[0], equal_weight) for x in parsed_prompts]
return [(x[0], x[1] / weight_sum) for x in parsed_prompts]
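A small worked example of the legacy blend syntax handled above, shown as comments.
# split_weighted_subprompts("a cat:2 a dog")
#   raw parse  -> [("a cat", 2.0), ("a dog", 1.0)]
#   normalized -> [("a cat", 2/3), ("a dog", 1/3)]   # weights sum to 1.0
# With skip_normalize=True the raw (unnormalized) list is returned instead.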

File diff suppressed because it is too large


@@ -0,0 +1,495 @@
#!/usr/bin/env python
# Copyright (c) 2022 Lincoln D. Stein (https://github.com/lstein)
# Before running stable-diffusion on an internet-isolated machine,
# run this script from one with internet connectivity. The
# two machines must share a common .cache directory.
"""
This is the npyscreen frontend to the model installation application.
The work is actually done in backend code in model_install_backend.py.
"""
import argparse
import curses
import os
import sys
import traceback
from argparse import Namespace
from pathlib import Path
from typing import List
import npyscreen
import torch
from npyscreen import widget
from omegaconf import OmegaConf
from ..devices import choose_precision, choose_torch_device
from ..globals import Globals, global_config_dir
from .model_install_backend import (Dataset_path, default_config_file,
default_dataset, get_root,
install_requested_models,
recommended_datasets)
from .widgets import (MultiSelectColumns, TextBox,
OffsetButtonPress, CenteredTitleText)
class addModelsForm(npyscreen.FormMultiPage):
# for responsive resizing - disabled
#FIX_MINIMUM_SIZE_WHEN_CREATED = False
def __init__(self, parentApp, name, multipage=False, *args, **keywords):
self.multipage = multipage
self.initial_models = OmegaConf.load(Dataset_path)
try:
self.existing_models = OmegaConf.load(default_config_file())
except:
self.existing_models = dict()
self.starter_model_list = [
x for x in list(self.initial_models.keys()) if x not in self.existing_models
]
self.installed_models = dict()
super().__init__(parentApp=parentApp, name=name, *args, **keywords)
def create(self):
window_height, window_width = curses.initscr().getmaxyx()
starter_model_labels = self._get_starter_model_labels()
recommended_models = [
x
for x in self.starter_model_list
if self.initial_models[x].get("recommended", False)
]
self.installed_models = sorted(
[x for x in list(self.initial_models.keys()) if x in self.existing_models]
)
self.nextrely -= 1
self.add_widget_intelligent(
npyscreen.FixedText,
value="Use ctrl-N and ctrl-P to move to the <N>ext and <P>revious fields,",
editable=False,
color='CAUTION',
)
self.add_widget_intelligent(
npyscreen.FixedText,
value="Use cursor arrows to make a selection, and space to toggle checkboxes.",
editable=False,
color='CAUTION'
)
self.nextrely += 1
if len(self.installed_models) > 0:
self.add_widget_intelligent(
CenteredTitleText,
name="== INSTALLED STARTER MODELS ==",
editable=False,
color="CONTROL",
)
self.nextrely -= 1
self.add_widget_intelligent(
CenteredTitleText,
name="Currently installed starter models. Uncheck to delete:",
editable=False,
labelColor="CAUTION",
)
self.nextrely -= 1
columns = self._get_columns()
self.previously_installed_models = self.add_widget_intelligent(
MultiSelectColumns,
columns=columns,
values=self.installed_models,
value=[x for x in range(0, len(self.installed_models))],
max_height=1 + len(self.installed_models) // columns,
relx=4,
slow_scroll=True,
scroll_exit=True,
)
self.purge_deleted = self.add_widget_intelligent(
npyscreen.Checkbox,
name="Purge deleted models from disk",
value=False,
scroll_exit=True,
relx=4,
)
self.nextrely += 1
self.add_widget_intelligent(
CenteredTitleText,
name="== STARTER MODELS (recommended ones selected) ==",
editable=False,
color="CONTROL",
)
self.nextrely -= 1
self.add_widget_intelligent(
CenteredTitleText,
name="Select from a starter set of Stable Diffusion models from HuggingFace:",
editable=False,
labelColor="CAUTION",
)
self.nextrely -= 1
# if user has already installed some initial models, then don't patronize them
# by showing more recommendations
show_recommended = not self.existing_models
self.models_selected = self.add_widget_intelligent(
npyscreen.MultiSelect,
name="Install Starter Models",
values=starter_model_labels,
value=[
self.starter_model_list.index(x)
for x in self.starter_model_list
if show_recommended and x in recommended_models
],
max_height=len(starter_model_labels) + 1,
relx=4,
scroll_exit=True,
)
self.add_widget_intelligent(
CenteredTitleText,
name='== IMPORT LOCAL AND REMOTE MODELS ==',
editable=False,
color="CONTROL",
)
self.nextrely -= 1
for line in [
"In the box below, enter URLs, file paths, or HuggingFace repository IDs.",
"Separate model names by lines or whitespace (Use shift-control-V to paste):",
]:
self.add_widget_intelligent(
CenteredTitleText,
name=line,
editable=False,
labelColor="CONTROL",
relx = 4,
)
self.nextrely -= 1
self.import_model_paths = self.add_widget_intelligent(
TextBox, max_height=5, scroll_exit=True, editable=True, relx=4
)
self.nextrely += 1
self.show_directory_fields = self.add_widget_intelligent(
npyscreen.FormControlCheckbox,
name="Select a directory for models to import",
value=False,
)
self.autoload_directory = self.add_widget_intelligent(
npyscreen.TitleFilename,
name="Directory (<tab> autocompletes):",
select_dir=True,
must_exist=True,
use_two_lines=False,
labelColor="DANGER",
begin_entry_at=34,
scroll_exit=True,
)
self.autoscan_on_startup = self.add_widget_intelligent(
npyscreen.Checkbox,
name="Scan this directory each time InvokeAI starts for new models to import",
value=False,
relx=4,
scroll_exit=True,
)
self.nextrely += 1
self.convert_models = self.add_widget_intelligent(
npyscreen.TitleSelectOne,
name="== CONVERT IMPORTED MODELS INTO DIFFUSERS==",
values=["Keep original format", "Convert to diffusers"],
value=0,
begin_entry_at=4,
max_height=4,
hidden=True, # will appear when imported models box is edited
scroll_exit=True,
)
self.cancel = self.add_widget_intelligent(
npyscreen.ButtonPress,
name="CANCEL",
rely=-3,
when_pressed_function=self.on_cancel,
)
done_label = "DONE"
back_label = "BACK"
button_length = len(done_label)
button_offset = 0
if self.multipage:
button_length += len(back_label) + 1
button_offset += len(back_label) + 1
self.back_button = self.add_widget_intelligent(
OffsetButtonPress,
name=back_label,
relx=(window_width - button_length) // 2,
offset=-3,
rely=-3,
when_pressed_function=self.on_back,
)
self.ok_button = self.add_widget_intelligent(
OffsetButtonPress,
name=done_label,
offset=+3,
relx=button_offset + 1 + (window_width - button_length) // 2,
rely=-3,
when_pressed_function=self.on_ok,
)
for i in [self.autoload_directory, self.autoscan_on_startup]:
self.show_directory_fields.addVisibleWhenSelected(i)
self.show_directory_fields.when_value_edited = self._clear_scan_directory
self.import_model_paths.when_value_edited = self._show_hide_convert
self.autoload_directory.when_value_edited = self._show_hide_convert
def resize(self):
super().resize()
self.models_selected.values = self._get_starter_model_labels()
def _clear_scan_directory(self):
if not self.show_directory_fields.value:
self.autoload_directory.value = ""
def _show_hide_convert(self):
model_paths = self.import_model_paths.value or ""
autoload_directory = self.autoload_directory.value or ""
self.convert_models.hidden = (
len(model_paths) == 0 and len(autoload_directory) == 0
)
def _get_starter_model_labels(self) -> List[str]:
window_height, window_width = curses.initscr().getmaxyx()
label_width = 25
checkbox_width = 4
spacing_width = 2
description_width = window_width - label_width - checkbox_width - spacing_width
im = self.initial_models
names = self.starter_model_list
descriptions = [
im[x].description[0 : description_width - 3] + "..."
if len(im[x].description) > description_width
else im[x].description
for x in names
]
return [
f"%-{label_width}s %s" % (names[x], descriptions[x])
for x in range(0, len(names))
]
def _get_columns(self) -> int:
window_height, window_width = curses.initscr().getmaxyx()
cols = (
4
if window_width > 240
else 3
if window_width > 160
else 2
if window_width > 80
else 1
)
return min(cols, len(self.installed_models))
def on_ok(self):
self.parentApp.setNextForm(None)
self.editing = False
self.parentApp.user_cancelled = False
self.marshall_arguments()
def on_back(self):
self.parentApp.switchFormPrevious()
self.editing = False
def on_cancel(self):
if npyscreen.notify_yes_no(
"Are you sure you want to cancel?\nYou may re-run this script later using the invoke.sh or invoke.bat command.\n"
):
self.parentApp.setNextForm(None)
self.parentApp.user_cancelled = True
self.editing = False
def marshall_arguments(self):
"""
Assemble arguments and store as attributes of the application:
.starter_models: dict of model names to install from INITIAL_CONFIGURE.yaml
True => Install
False => Remove
.scan_directory: Path to a directory of models to scan and import
.autoscan_on_startup: True if invokeai should scan and import at startup time
.import_model_paths: list of URLs, repo_ids and file paths to import
.convert_to_diffusers: if True, convert legacy checkpoints into diffusers
"""
# we're using a global here rather than storing the result in the parentapp
# due to some bug in npyscreen that is causing attributes to be lost
selections = self.parentApp.user_selections
# starter models to install/remove
starter_models = dict(
map(
lambda x: (self.starter_model_list[x], True), self.models_selected.value
)
)
selections.purge_deleted_models = False
if hasattr(self, "previously_installed_models"):
unchecked = [
self.previously_installed_models.values[x]
for x in range(0, len(self.previously_installed_models.values))
if x not in self.previously_installed_models.value
]
starter_models.update(map(lambda x: (x, False), unchecked))
selections.purge_deleted_models = self.purge_deleted.value
selections.starter_models = starter_models
# load directory and whether to scan on startup
if self.show_directory_fields.value:
selections.scan_directory = self.autoload_directory.value
selections.autoscan_on_startup = self.autoscan_on_startup.value
else:
selections.scan_directory = None
selections.autoscan_on_startup = False
# URLs and the like
selections.import_model_paths = self.import_model_paths.value.split()
selections.convert_to_diffusers = self.convert_models.value[0] == 1
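For illustration, a hedged example of a populated selections namespace; the field names match the Namespace defined in AddModelApplication below, and the values are invented.
from argparse import Namespace

example_selections = Namespace(
    starter_models={"stable-diffusion-1.5": True},   # invented model key
    purge_deleted_models=False,
    scan_directory="/home/user/checkpoints",
    autoscan_on_startup=True,
    import_model_paths=["https://example.com/model.safetensors"],
    convert_to_diffusers=True,
)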
class AddModelApplication(npyscreen.NPSAppManaged):
def __init__(self):
super().__init__()
self.user_cancelled = False
self.user_selections = Namespace(
starter_models=None,
purge_deleted_models=False,
scan_directory=None,
autoscan_on_startup=None,
import_model_paths=None,
convert_to_diffusers=None,
)
def onStart(self):
npyscreen.setTheme(npyscreen.Themes.DefaultTheme)
self.main_form = self.addForm(
"MAIN", addModelsForm, name="Install Stable Diffusion Models"
)
# --------------------------------------------------------
def process_and_execute(opt: Namespace, selections: Namespace):
models_to_remove = [
x for x in selections.starter_models if not selections.starter_models[x]
]
models_to_install = [
x for x in selections.starter_models if selections.starter_models[x]
]
directory_to_scan = selections.scan_directory
scan_at_startup = selections.autoscan_on_startup
potential_models_to_install = selections.import_model_paths
convert_to_diffusers = selections.convert_to_diffusers
install_requested_models(
install_initial_models=models_to_install,
remove_models=models_to_remove,
scan_directory=Path(directory_to_scan) if directory_to_scan else None,
external_models=potential_models_to_install,
scan_at_startup=scan_at_startup,
convert_to_diffusers=convert_to_diffusers,
precision="float32"
if opt.full_precision
else choose_precision(torch.device(choose_torch_device())),
purge_deleted=selections.purge_deleted_models,
config_file_path=Path(opt.config_file) if opt.config_file else None,
)
# --------------------------------------------------------
def select_and_download_models(opt: Namespace):
precision = (
"float32"
if opt.full_precision
else choose_precision(torch.device(choose_torch_device()))
)
if opt.default_only:
install_requested_models(
install_initial_models=default_dataset(),
precision=precision,
)
elif opt.yes_to_all:
install_requested_models(
install_initial_models=recommended_datasets(),
precision=precision,
)
else:
installApp = AddModelApplication()
installApp.run()
if not installApp.user_cancelled:
process_and_execute(opt, installApp.user_selections)
# -------------------------------------
def main():
parser = argparse.ArgumentParser(description="InvokeAI model downloader")
parser.add_argument(
"--full-precision",
dest="full_precision",
action=argparse.BooleanOptionalAction,
type=bool,
default=False,
help="use 32-bit weights instead of faster 16-bit weights",
)
parser.add_argument(
"--yes",
"-y",
dest="yes_to_all",
action="store_true",
help='answer "yes" to all prompts',
)
parser.add_argument(
"--default_only",
action="store_true",
help="only install the default model",
)
parser.add_argument(
"--config_file",
"-c",
dest="config_file",
type=str,
default=None,
help="path to configuration file to create",
)
parser.add_argument(
"--root_dir",
dest="root",
type=str,
default=None,
help="path to root of install directory",
)
opt = parser.parse_args()
# setting a global here
Globals.root = os.path.expanduser(get_root(opt.root) or "")
if not global_config_dir().exists():
print(
">> Your InvokeAI root directory is not set up. Calling invokeai-configure."
)
import ldm.invoke.config.invokeai_configure
ldm.invoke.config.invokeai_configure.main()
sys.exit(0)
try:
select_and_download_models(opt)
except AssertionError as e:
print(str(e))
sys.exit(-1)
except KeyboardInterrupt:
print("\nGoodbye! Come back soon.")
except (widget.NotEnoughSpaceForWidget, Exception) as e:
if str(e).startswith("Height of 1 allocated"):
print(
"** Insufficient vertical space for the interface. Please make your window taller and try again"
)
elif str(e).startswith("addwstr"):
print(
"** Insufficient horizontal space for the interface. Please make your window wider and try again."
)
else:
print(f"** An error has occurred: {str(e)}")
traceback.print_exc()
sys.exit(-1)
# -------------------------------------
if __name__ == "__main__":
main()


@@ -0,0 +1,452 @@
"""
Utility (backend) functions used by model_install.py
"""
import os
import re
import shutil
import sys
import warnings
from pathlib import Path
from tempfile import TemporaryFile
import requests
from diffusers import AutoencoderKL
from huggingface_hub import hf_hub_url
from omegaconf import OmegaConf
from omegaconf.dictconfig import DictConfig
from tqdm import tqdm
from typing import List
import invokeai.configs as configs
from ..generator.diffusers_pipeline import StableDiffusionGeneratorPipeline
from ..globals import Globals, global_cache_dir, global_config_dir
from ..model_manager import ModelManager
warnings.filterwarnings("ignore")
# --------------------------globals-----------------------
Model_dir = "models"
Weights_dir = "ldm/stable-diffusion-v1/"
# the initial "configs" dir is now bundled in the `invokeai.configs` package
Dataset_path = Path(configs.__path__[0]) / "INITIAL_MODELS.yaml"
# initial models omegaconf
Datasets = None
Config_preamble = """
# This file describes the alternative machine learning models
# available to InvokeAI script.
#
# To add a new model, follow the examples below. Each
# model requires a model config file, a weights file,
# and the width and height of the images it
# was trained on.
"""
def default_config_file():
return Path(global_config_dir()) / "models.yaml"
def sd_configs():
return Path(global_config_dir()) / "stable-diffusion"
def initial_models():
global Datasets
if Datasets:
return Datasets
return (Datasets := OmegaConf.load(Dataset_path))
def install_requested_models(
install_initial_models: List[str] = None,
remove_models: List[str] = None,
scan_directory: Path = None,
external_models: List[str] = None,
scan_at_startup: bool = False,
convert_to_diffusers: bool = False,
precision: str = "float16",
purge_deleted: bool = False,
config_file_path: Path = None,
):
config_file_path=config_file_path or default_config_file()
if not config_file_path.exists():
open(config_file_path,'w').close()
model_manager= ModelManager(OmegaConf.load(config_file_path),precision=precision)
if remove_models and len(remove_models) > 0:
print("== DELETING UNCHECKED STARTER MODELS ==")
for model in remove_models:
print(f'{model}...')
model_manager.del_model(model, delete_files=purge_deleted)
model_manager.commit(config_file_path)
if install_initial_models and len(install_initial_models) > 0:
print("== INSTALLING SELECTED STARTER MODELS ==")
successfully_downloaded = download_weight_datasets(
models=install_initial_models,
access_token=None,
precision=precision,
) # FIX: for historical reasons, we don't use model manager here
update_config_file(successfully_downloaded, config_file_path)
if len(successfully_downloaded) < len(install_initial_models):
print("** Some of the model downloads were not successful")
# the config file was changed behind the model manager's back, so reload it
model_manager= ModelManager(OmegaConf.load(config_file_path),precision=precision)
external_models = external_models or list()
if scan_directory:
external_models.append(str(scan_directory))
if len(external_models)>0:
print("== INSTALLING EXTERNAL MODELS ==")
for path_url_or_repo in external_models:
try:
model_manager.heuristic_import(
path_url_or_repo,
convert=convert_to_diffusers,
commit_to_conf=config_file_path
)
except KeyboardInterrupt:
sys.exit(-1)
except Exception as e:
    print(f"** Import of {path_url_or_repo} failed: {e}")
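# Persist the scan directory in the user's init file so the same directory is
# re-scanned (and optionally converted) on every startup.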
if scan_at_startup and scan_directory.is_dir():
argument = '--autoconvert' if convert_to_diffusers else '--autoimport'
initfile = Path(Globals.root, Globals.initfile)
replacement = Path(Globals.root, f'{Globals.initfile}.new')
with open(initfile, "r") as infile, open(replacement, "w") as outfile:
    while line := infile.readline():
        if not line.startswith(argument):
            outfile.writelines([line])
    outfile.writelines([f"{argument} {scan_directory}\n"])
os.replace(replacement,initfile)
# -------------------------------------
def yes_or_no(prompt: str, default_yes=True):
default = "y" if default_yes else "n"
response = input(f"{prompt} [{default}] ") or default
if default_yes:
return response[0] not in ("n", "N")
else:
return response[0] in ("y", "Y")
# -------------------------------------
def get_root(root: str = None) -> str:
if root:
return root
elif os.environ.get("INVOKEAI_ROOT"):
return os.environ.get("INVOKEAI_ROOT")
else:
return Globals.root
# ---------------------------------------------
def recommended_datasets() -> dict:
datasets = dict()
for ds in initial_models().keys():
if initial_models()[ds].get("recommended", False):
datasets[ds] = True
return datasets
# ---------------------------------------------
def default_dataset() -> dict:
datasets = dict()
for ds in initial_models().keys():
if initial_models()[ds].get("default", False):
datasets[ds] = True
return datasets
# ---------------------------------------------
def all_datasets() -> dict:
datasets = dict()
for ds in initial_models().keys():
datasets[ds] = True
return datasets
# ---------------------------------------------
# look for legacy model.ckpt in models directory and offer to
# normalize its name
def migrate_models_ckpt():
model_path = os.path.join(Globals.root, Model_dir, Weights_dir)
if not os.path.exists(os.path.join(model_path, "model.ckpt")):
return
new_name = initial_models()["stable-diffusion-1.4"]["file"]
print(f'The Stable Diffusion v1.4 "model.ckpt" file is already installed. It will be renamed to {new_name} to avoid confusion.')
print(f"model.ckpt => {new_name}")
os.replace(
os.path.join(model_path, "model.ckpt"), os.path.join(model_path, new_name)
)
# ---------------------------------------------
def download_weight_datasets(
models: List[str], access_token: str, precision: str = "float32"
):
migrate_models_ckpt()
successful = dict()
for mod in models:
print(f"Downloading {mod}:")
successful[mod] = _download_repo_or_file(
initial_models()[mod], access_token, precision=precision
)
return successful
def _download_repo_or_file(
mconfig: DictConfig, access_token: str, precision: str = "float32"
) -> Path:
path = None
if mconfig["format"] == "ckpt":
path = _download_ckpt_weights(mconfig, access_token)
else:
path = _download_diffusion_weights(mconfig, access_token, precision=precision)
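# if the model's config names an external diffusers VAE, fetch that repo as well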
if "vae" in mconfig and "repo_id" in mconfig["vae"]:
_download_diffusion_weights(
mconfig["vae"], access_token, precision=precision
)
return path
def _download_ckpt_weights(mconfig: DictConfig, access_token: str) -> Path:
repo_id = mconfig["repo_id"]
filename = mconfig["file"]
cache_dir = os.path.join(Globals.root, Model_dir, Weights_dir)
return hf_download_with_resume(
repo_id=repo_id,
model_dir=cache_dir,
model_name=filename,
access_token=access_token,
)
# ---------------------------------------------
def download_from_hf(
model_class: object, model_name: str, cache_subdir: Path = Path("hub"), **kwargs
):
print("", file=sys.stderr) # to prevent tqdm from overwriting
path = global_cache_dir(cache_subdir)
model = model_class.from_pretrained(
model_name,
cache_dir=path,
resume_download=True,
**kwargs,
)
model_name = "--".join(("models", *model_name.split("/")))
return path / model_name if model else None
def _download_diffusion_weights(
mconfig: DictConfig, access_token: str, precision: str = "float32"
):
repo_id = mconfig["repo_id"]
model_class = (
StableDiffusionGeneratorPipeline
if mconfig.get("format", None) == "diffusers"
else AutoencoderKL
)
extra_arg_list = [{"revision": "fp16"}, {}] if precision == "float16" else [{}]
path = None
for extra_args in extra_arg_list:
try:
path = download_from_hf(
model_class,
repo_id,
cache_subdir="diffusers",
safety_checker=None,
**extra_args,
)
except OSError as e:
if str(e).startswith("fp16 is not a valid"):
pass
else:
print(f"An unexpected error occurred while downloading the model: {e})")
if path:
break
return path
# ---------------------------------------------
def hf_download_with_resume(
repo_id: str, model_dir: str, model_name: str, access_token: str = None
) -> Path:
model_dest = Path(os.path.join(model_dir, model_name))
os.makedirs(model_dir, exist_ok=True)
url = hf_hub_url(repo_id, model_name)
header = {"Authorization": f"Bearer {access_token}"} if access_token else {}
open_mode = "wb"
exist_size = 0
if os.path.exists(model_dest):
exist_size = os.path.getsize(model_dest)
header["Range"] = f"bytes={exist_size}-"
open_mode = "ab"
resp = requests.get(url, headers=header, stream=True)
total = int(resp.headers.get("content-length", 0))
if (
resp.status_code == 416
): # "range not satisfiable", which means nothing to return
print(f"* {model_name}: complete file found. Skipping.")
return model_dest
elif resp.status_code != 200:
print(f"** An error occurred during downloading {model_name}: {resp.reason}")
elif exist_size > 0:
print(f"* {model_name}: partial file found. Resuming...")
else:
print(f"* {model_name}: Downloading...")
try:
if total < 2000:
print(f"*** ERROR DOWNLOADING {model_name}: {resp.text}")
return None
with open(model_dest, open_mode) as file, tqdm(
desc=model_name,
initial=exist_size,
total=total + exist_size,
unit="iB",
unit_scale=True,
unit_divisor=1000,
) as bar:
for data in resp.iter_content(chunk_size=1024):
size = file.write(data)
bar.update(size)
except Exception as e:
print(f"An error occurred while downloading {model_name}: {str(e)}")
return None
return model_dest
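# Minimal usage sketch (the repo and file names are real Hugging Face
# identifiers, shown only as an example): because a byte-range header is sent
# when a partial file already exists, re-running the same call resumes an
# interrupted download instead of starting over.
#
#   ckpt = hf_download_with_resume(
#       repo_id="CompVis/stable-diffusion-v-1-4-original",
#       model_dir=os.path.join(Globals.root, Model_dir, Weights_dir),
#       model_name="sd-v1-4.ckpt",
#   )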
# ---------------------------------------------
def update_config_file(successfully_downloaded: dict, config_file: Path):
config_file = (
Path(config_file) if config_file is not None else default_config_file()
)
# In some cases (incomplete setup, etc), the default configs directory might be missing.
# Create it if it doesn't exist.
# this check is ignored if opt.config_file is specified - user is assumed to know what they
# are doing if they are passing a custom config file from elsewhere.
if config_file is default_config_file() and not config_file.parent.exists():
configs_src = Dataset_path.parent
configs_dest = default_config_file().parent
shutil.copytree(configs_src, configs_dest, dirs_exist_ok=True)
yaml = new_config_file_contents(successfully_downloaded, config_file)
try:
backup = None
if os.path.exists(config_file):
print(
f"** {config_file.name} exists. Renaming to {config_file.stem}.yaml.orig"
)
backup = config_file.with_suffix(".yaml.orig")
## Ugh. Windows is unable to overwrite an existing backup file, raises a WinError 183
if sys.platform == "win32" and backup.is_file():
backup.unlink()
config_file.rename(backup)
with TemporaryFile() as tmp:
tmp.write(Config_preamble.encode())
tmp.write(yaml.encode())
with open(str(config_file.expanduser().resolve()), "wb") as new_config:
tmp.seek(0)
new_config.write(tmp.read())
except Exception as e:
print(f"**Error creating config file {config_file}: {str(e)} **")
if backup is not None:
print("restoring previous config file")
## workaround, for WinError 183, see above
if sys.platform == "win32" and config_file.is_file():
config_file.unlink()
backup.rename(config_file)
return
print(f"Successfully created new configuration file {config_file}")
# ---------------------------------------------
def new_config_file_contents(
successfully_downloaded: dict, config_file: Path,
) -> str:
if config_file.exists():
conf = OmegaConf.load(str(config_file.expanduser().resolve()))
else:
conf = OmegaConf.create()
default_selected = None
for model in successfully_downloaded:
# A bit hacky: check whether a checkpoint version of the model was
# previously defined, and whether the current model is a diffusers
# version (indicated by a directory path).
if conf.get(model) and Path(successfully_downloaded[model]).is_dir():
delete_weights(model, conf[model])
stanza = {}
mod = initial_models()[model]
stanza["description"] = mod["description"]
stanza["repo_id"] = mod["repo_id"]
stanza["format"] = mod["format"]
# diffusers don't need width and height (probably .ckpt doesn't either)
# so we no longer require these in INITIAL_MODELS.yaml
if "width" in mod:
stanza["width"] = mod["width"]
if "height" in mod:
stanza["height"] = mod["height"]
if "file" in mod:
stanza["weights"] = os.path.relpath(
successfully_downloaded[model], start=Globals.root
)
stanza["config"] = os.path.normpath(os.path.join(sd_configs(), mod["config"]))
if "vae" in mod:
if "file" in mod["vae"]:
stanza["vae"] = os.path.normpath(
os.path.join(Model_dir, Weights_dir, mod["vae"]["file"])
)
else:
stanza["vae"] = mod["vae"]
if mod.get("default", False):
stanza["default"] = True
default_selected = True
conf[model] = stanza
# if no default model was chosen, then we select the first
# one in the list
if not default_selected:
conf[list(successfully_downloaded.keys())[0]]["default"] = True
return OmegaConf.to_yaml(conf)
# ---------------------------------------------
def delete_weights(model_name: str, conf_stanza: dict):
if not (weights := conf_stanza.get("weights")):
return
if re.search("/VAE/", conf_stanza.get("config", "")):  # never delete stand-alone VAE weights
return
print(
f"\n** The checkpoint version of {model_name} is superseded by the diffusers version. Deleting the original file {weights}?"
)
weights = Path(weights)
if not weights.is_absolute():
weights = Path(Globals.root) / weights
try:
weights.unlink()
except OSError as e:
print(str(e))
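# A minimal sketch of how a front end might drive this module; the selections
# below are illustrative (model keys come from INITIAL_MODELS.yaml, external
# entries may be repo_ids, URLs or local paths).
if __name__ == "__main__":
    install_requested_models(
        install_initial_models=["stable-diffusion-1.4"],
        remove_models=[],
        external_models=["stabilityai/sd-vae-ft-mse"],
        scan_directory=None,
        scan_at_startup=False,
        convert_to_diffusers=False,
        precision="float16",
        purge_deleted=False,
        config_file_path=default_config_file(),
    )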

View File

@@ -0,0 +1,139 @@
'''
Widget class definitions used by model_select.py, merge_diffusers.py and textual_inversion.py
'''
import math
import npyscreen
import curses
class IntSlider(npyscreen.Slider):
def translate_value(self):
stri = "%2d / %2d" % (self.value, self.out_of)
l = (len(str(self.out_of))) * 2 + 4
stri = stri.rjust(l)
return stri
# -------------------------------------
class CenteredTitleText(npyscreen.TitleText):
def __init__(self,*args,**keywords):
super().__init__(*args,**keywords)
self.resize()
def resize(self):
super().resize()
maxy, maxx = self.parent.curses_pad.getmaxyx()
label = self.name
self.relx = (maxx - len(label)) // 2
begin_entry_at = -self.relx + 2
# -------------------------------------
class CenteredButtonPress(npyscreen.ButtonPress):
def resize(self):
super().resize()
maxy, maxx = self.parent.curses_pad.getmaxyx()
label = self.name
self.relx = (maxx - len(label)) // 2
# -------------------------------------
class OffsetButtonPress(npyscreen.ButtonPress):
def __init__(self, screen, offset=0, *args, **keywords):
super().__init__(screen, *args, **keywords)
self.offset = offset
def resize(self):
maxy, maxx = self.parent.curses_pad.getmaxyx()
width = len(self.name)
self.relx = self.offset + (maxx - width) // 2
class IntTitleSlider(npyscreen.TitleText):
_entry_type = IntSlider
class FloatSlider(npyscreen.Slider):
# this is supposed to adjust display precision, but doesn't
def translate_value(self):
stri = "%3.2f / %3.2f" % (self.value, self.out_of)
l = (len(str(self.out_of))) * 2 + 4
stri = stri.rjust(l)
return stri
class FloatTitleSlider(npyscreen.TitleText):
_entry_type = FloatSlider
class MultiSelectColumns(npyscreen.MultiSelect):
def __init__(self, screen, columns: int = 1, values: list = None, **keywords):
    self.columns = columns
    values = values or []  # avoid a shared mutable default argument
    self.value_cnt = len(values)
    self.rows = math.ceil(self.value_cnt / self.columns)
    super().__init__(screen, values=values, **keywords)
def make_contained_widgets(self):
self._my_widgets = []
column_width = self.width // self.columns
for h in range(self.value_cnt):
self._my_widgets.append(
self._contained_widgets(self.parent,
rely=self.rely + (h % self.rows) * self._contained_widget_height,
relx=self.relx + (h // self.rows) * column_width,
max_width=column_width,
max_height=self.__class__._contained_widget_height,
)
)
def set_up_handlers(self):
super().set_up_handlers()
self.handlers.update({
curses.KEY_UP: self.h_cursor_line_left,
curses.KEY_DOWN: self.h_cursor_line_right,
}
)
def h_cursor_line_down(self, ch):
self.cursor_line += self.rows
if self.cursor_line >= len(self.values):
if self.scroll_exit:
self.cursor_line = len(self.values)-self.rows
self.h_exit_down(ch)
return True
else:
self.cursor_line -= self.rows
return True
def h_cursor_line_up(self, ch):
self.cursor_line -= self.rows
if self.cursor_line < 0:
if self.scroll_exit:
self.cursor_line = 0
self.h_exit_up(ch)
else:
self.cursor_line = 0
def h_cursor_line_left(self,ch):
super().h_cursor_line_up(ch)
def h_cursor_line_right(self,ch):
super().h_cursor_line_down(ch)
class TextBox(npyscreen.MultiLineEdit):
def update(self, clear=True):
if clear: self.clear()
HEIGHT = self.height
WIDTH = self.width
# draw box.
self.parent.curses_pad.hline(self.rely, self.relx, curses.ACS_HLINE, WIDTH)
self.parent.curses_pad.hline(self.rely + HEIGHT, self.relx, curses.ACS_HLINE, WIDTH)
self.parent.curses_pad.vline(self.rely, self.relx, curses.ACS_VLINE, self.height)
self.parent.curses_pad.vline(self.rely, self.relx+WIDTH, curses.ACS_VLINE, HEIGHT)
# draw corners
self.parent.curses_pad.addch(self.rely, self.relx, curses.ACS_ULCORNER, )
self.parent.curses_pad.addch(self.rely, self.relx+WIDTH, curses.ACS_URCORNER, )
self.parent.curses_pad.addch(self.rely+HEIGHT, self.relx, curses.ACS_LLCORNER, )
self.parent.curses_pad.addch(self.rely+HEIGHT, self.relx+WIDTH, curses.ACS_LRCORNER, )
# fool our superclass into thinking drawing area is smaller - this is really hacky but it seems to work
(relx,rely,height,width) = (self.relx, self.rely, self.height, self.width)
self.relx += 1
self.rely += 1
self.height -= 1
self.width -= 1
super().update(clear=False)
(self.relx,self.rely,self.height,self.width) = (relx, rely, height, width)
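# A small, self-contained demo of the custom widgets above (not used by the
# installer itself); the form and field names are made up for illustration.
class _WidgetDemoForm(npyscreen.ActionForm):
    def create(self):
        self.add(CenteredTitleText, name="InvokeAI widget demo", editable=False)
        self.add(
            MultiSelectColumns,
            name="Models",
            columns=3,
            values=[f"model-{i}" for i in range(9)],
            max_height=5,
            scroll_exit=True,
        )
        self.add(TextBox, name="Notes", max_height=5, scroll_exit=True)

class _WidgetDemoApp(npyscreen.NPSAppManaged):
    def onStart(self):
        self.addForm("MAIN", _WidgetDemoForm, name="widget demo")

if __name__ == "__main__":
    _WidgetDemoApp().run()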

View File

@@ -339,7 +339,6 @@ class Generator:
if self.caution_img:
return self.caution_img
path = Path(web_assets.__path__[0]) / CAUTION_IMG
print(f'DEBUG: path to caution = {path}')
caution = Image.open(path)
self.caution_img = caution.resize((caution.width // 2, caution.height //2))
return self.caution_img

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import dataclasses
import inspect
import psutil
import secrets
from collections.abc import Sequence
from dataclasses import dataclass, field
@@ -32,7 +33,7 @@ from ldm.modules.lora_manager import LoraManager
from ..devices import normalize_device, CPU_DEVICE
from ..offloading import LazilyLoadedModelGroup, FullyLoadedModelGroup, ModelGroup
from ...models.diffusion.cross_attention_map_saving import AttentionMapSaver
from ...modules.prompt_to_embeddings_converter import WeightedPromptFragmentsToEmbeddingsConverter
from compel import EmbeddingsProvider
@dataclass
@@ -297,7 +298,7 @@ class StableDiffusionGeneratorPipeline(StableDiffusionPipeline):
self.lora_manager = LoraManager(self)
# InvokeAI's interface for text embeddings and whatnot
self.prompt_fragments_to_embeddings_converter = WeightedPromptFragmentsToEmbeddingsConverter(
self.embeddings_provider = EmbeddingsProvider(
tokenizer=self.tokenizer,
text_encoder=self.text_encoder,
textual_inversion_manager=self.textual_inversion_manager
@@ -311,7 +312,7 @@ class StableDiffusionGeneratorPipeline(StableDiffusionPipeline):
"""
if xformers is available, use it, otherwise use sliced attention.
"""
if is_xformers_available() and not Globals.disable_xformers:
if torch.cuda.is_available() and is_xformers_available() and not Globals.disable_xformers:
self.enable_xformers_memory_efficient_attention()
else:
if torch.backends.mps.is_available():
@@ -729,15 +730,15 @@ class StableDiffusionGeneratorPipeline(StableDiffusionPipeline):
"""
Compatibility function for ldm.models.diffusion.ddpm.LatentDiffusion.
"""
return self.prompt_fragments_to_embeddings_converter.get_embeddings_for_weighted_prompt_fragments(
text=c,
fragment_weights=fragment_weights,
return self.embeddings_provider.get_embeddings_for_weighted_prompt_fragments(
text_batch=c,
fragment_weights_batch=fragment_weights,
should_return_tokens=return_tokens,
device=self._model_group.device_for(self.unet))
@property
def cond_stage_model(self):
return self.prompt_fragments_to_embeddings_converter
return self.embeddings_provider
@torch.inference_mode()
def _tokenize(self, prompt: Union[str, List[str]]):

View File

@@ -40,8 +40,6 @@ class Omnibus(Img2Img,Txt2Img):
self.perlin = perlin
num_samples = 1
print('DEBUG: IN OMNIBUS')
sampler.make_schedule(
ddim_num_steps=steps, ddim_eta=ddim_eta, verbose=False
)

View File

@@ -20,6 +20,7 @@ from diffusers import logging as dlogging
from npyscreen import widget
from omegaconf import OmegaConf
from ldm.invoke.config.widgets import FloatTitleSlider
from ldm.invoke.globals import (Globals, global_cache_dir, global_config_file,
global_models_dir, global_set_root)
from ldm.invoke.model_manager import ModelManager
@@ -172,18 +173,6 @@ def _parse_args() -> Namespace:
# ------------------------- GUI HERE -------------------------
class FloatSlider(npyscreen.Slider):
def translate_value(self):
stri = "%3.2f / %3.2f" % (self.value, self.out_of)
l = (len(str(self.out_of))) * 2 + 4
stri = stri.rjust(l)
return stri
class FloatTitleSlider(npyscreen.TitleText):
_entry_type = FloatSlider
class mergeModelsForm(npyscreen.FormMultiPageAction):
interpolations = ["weighted_sum", "sigmoid", "inv_sigmoid"]

View File

@@ -11,10 +11,12 @@ import gc
import hashlib
import io
import os
import re
import sys
import textwrap
import time
import warnings
from enum import Enum
from pathlib import Path
from shutil import move, rmtree
from typing import Any, Optional, Union
@@ -31,12 +33,22 @@ from omegaconf.dictconfig import DictConfig
from picklescan.scanner import scan_file_path
from ldm.invoke.devices import CPU_DEVICE
from ldm.invoke.generator.diffusers_pipeline import \
StableDiffusionGeneratorPipeline
from ldm.invoke.globals import (Globals, global_autoscan_dir, global_cache_dir,
global_models_dir)
from ldm.util import (ask_user, download_with_resume,
url_attachment_name, instantiate_from_config)
from ldm.invoke.generator.diffusers_pipeline import StableDiffusionGeneratorPipeline
from ldm.invoke.globals import Globals, global_cache_dir
from ldm.util import (
ask_user,
download_with_resume,
instantiate_from_config,
url_attachment_name,
)
class SDLegacyType(Enum):
V1 = 1
V1_INPAINT = 2
V2 = 3
UNKNOWN = 99
DEFAULT_MAX_MODELS = 2
VAE_TO_REPO_ID = { # hack, see note in convert_and_import()
@@ -51,7 +63,7 @@ class ModelManager(object):
device_type: torch.device = CPU_DEVICE,
precision: str = "float16",
max_loaded_models=DEFAULT_MAX_MODELS,
sequential_offload = False
sequential_offload=False,
):
"""
Initialize with the path to the models.yaml config file,
@@ -129,6 +141,7 @@ class ModelManager(object):
for model_name in self.config:
if self.config[model_name].get("default"):
return model_name
return list(self.config.keys())[0] # first one
def set_default_model(self, model_name: str) -> None:
"""
@@ -375,21 +388,31 @@ class ModelManager(object):
print(
f">> Converting legacy checkpoint {model_name} into a diffusers model..."
)
from ldm.invoke.ckpt_to_diffuser import \
load_pipeline_from_original_stable_diffusion_ckpt
from ldm.invoke.ckpt_to_diffuser import (
load_pipeline_from_original_stable_diffusion_ckpt,
)
self.offload_model(self.current_model)
if vae_config := self._choose_diffusers_vae(model_name):
vae = self._load_vae(vae_config)
if self._has_cuda():
torch.cuda.empty_cache()
pipeline = load_pipeline_from_original_stable_diffusion_ckpt(
checkpoint_path=weights,
original_config_file=config,
vae=vae,
return_generator_pipeline=True,
precision=torch.float16
if self.precision == "float16"
else torch.float32,
)
if self.sequential_offload:
pipeline.enable_offload_submodels(self.device)
else:
pipeline.to(self.device)
return (
pipeline.to(self.device).to(
torch.float16 if self.precision == "float16" else torch.float32
),
pipeline,
width,
height,
"NOHASH",
@@ -466,19 +489,6 @@ class ModelManager(object):
for module in model.modules():
if isinstance(module, (torch.nn.Conv2d, torch.nn.ConvTranspose2d)):
module._orig_padding_mode = module.padding_mode
# usage statistics
toc = time.time()
print(">> Model loaded in", "%4.2fs" % (toc - tic))
if self._has_cuda():
print(
">> Max VRAM used to load the model:",
"%4.2fG" % (torch.cuda.max_memory_allocated() / 1e9),
"\n>> Current VRAM usage:"
"%4.2fG" % (torch.cuda.memory_allocated() / 1e9),
)
return model, width, height, model_hash
def _load_diffusers_model(self, mconfig):
@@ -496,8 +506,8 @@ class ModelManager(object):
safety_checker=None, local_files_only=not Globals.internet_available
)
if "vae" in mconfig and mconfig["vae"] is not None:
vae = self._load_vae(mconfig["vae"])
pipeline_args.update(vae=vae)
if vae := self._load_vae(mconfig["vae"]):
pipeline_args.update(vae=vae)
if not isinstance(name_or_path, Path):
pipeline_args.update(cache_dir=global_cache_dir("diffusers"))
if using_fp16:
@@ -555,7 +565,7 @@ class ModelManager(object):
f'"{model_name}" is not a known model name. Please check your models.yaml file'
)
if "path" in mconfig:
if "path" in mconfig and mconfig["path"] is not None:
path = Path(mconfig["path"])
if not path.is_absolute():
path = Path(Globals.root, path).resolve()
@@ -610,13 +620,13 @@ class ModelManager(object):
print("### Exiting InvokeAI")
sys.exit()
else:
print(">> Model scanned ok!")
print(">> Model scanned ok")
def import_diffuser_model(
self,
repo_or_path: Union[str, Path],
model_name: str = None,
description: str = None,
model_description: str = None,
vae: dict = None,
commit_to_conf: Path = None,
) -> bool:
@@ -632,21 +642,24 @@ class ModelManager(object):
models.yaml file.
"""
model_name = model_name or Path(repo_or_path).stem
description = description or f"imported diffusers model {model_name}"
description = model_description or f"imported diffusers model {model_name}"
new_config = dict(
description=description,
description=model_description,
vae=vae,
format="diffusers",
)
print(f"DEBUG: here i am 1")
if isinstance(repo_or_path, Path) and repo_or_path.exists():
new_config.update(path=str(repo_or_path))
else:
new_config.update(repo_id=repo_or_path)
print(f"DEBUG: here i am 2")
self.add_model(model_name, new_config, True)
print(f"DEBUG: config = {self.config}")
if commit_to_conf:
self.commit(commit_to_conf)
return True
return model_name
def import_ckpt_model(
self,
@@ -656,7 +669,7 @@ class ModelManager(object):
model_name: str = None,
model_description: str = None,
commit_to_conf: Path = None,
) -> bool:
) -> str:
"""
Attempts to install the indicated ckpt file and returns True if successful.
@@ -673,19 +686,23 @@ class ModelManager(object):
then these will be derived from the weight file name. If you provide a commit_to_conf
path to the configuration file, then the new entry will be committed to the
models.yaml file.
Return value is the name of the imported file, or None if an error occurred.
"""
if str(weights).startswith(("http:", "https:")):
model_name = model_name or url_attachment_name(weights)
weights_path = self._resolve_path(weights, "models/ldm/stable-diffusion-v1")
config_path = self._resolve_path(config, "configs/stable-diffusion")
config_path = self._resolve_path(config, "configs/stable-diffusion")
if weights_path is None or not weights_path.exists():
return False
return
if config_path is None or not config_path.exists():
return False
return
model_name = model_name or Path(weights).stem # note this gives ugly pathnames if used on a URL without a Content-Disposition header
model_name = (
model_name or Path(weights).stem
) # note this gives ugly pathnames if used on a URL without a Content-Disposition header
model_description = (
model_description or f"imported stable diffusion weights file {model_name}"
)
@@ -702,43 +719,205 @@ class ModelManager(object):
self.add_model(model_name, new_config, True)
if commit_to_conf:
self.commit(commit_to_conf)
return True
return model_name
def autoconvert_weights(
@classmethod
def probe_model_type(cls, checkpoint: dict) -> SDLegacyType:
"""
Given a pickle or safetensors model object, probes contents
of the object and returns an SDLegacyType indicating its
format. Valid return values include:
SDLegacyType.V1
SDLegacyType.V1_INPAINT
SDLegacyType.V2
SDLegacyType.UNKNOWN
"""
key_name = "model.diffusion_model.input_blocks.2.1.transformer_blocks.0.attn2.to_k.weight"
if key_name in checkpoint and checkpoint[key_name].shape[-1] == 1024:
return SDLegacyType.V2
try:
state_dict = checkpoint.get("state_dict") or checkpoint
in_channels = state_dict[
"model.diffusion_model.input_blocks.0.0.weight"
].shape[1]
if in_channels == 9:
return SDLegacyType.V1_INPAINT
elif in_channels == 4:
return SDLegacyType.V1
else:
return SDLegacyType.UNKNOWN
except KeyError:
return SDLegacyType.UNKNOWN
def heuristic_import(
self,
conf_path: Path,
weights_directory: Path = None,
dest_directory: Path = None,
):
path_url_or_repo: str,
convert: bool = False,
model_name: str = None,
description: str = None,
commit_to_conf: Path = None,
) -> str:
"""
Scan the indicated directory for .ckpt files, convert into diffuser models,
and import.
Accept a string which could be:
- a HF diffusers repo_id
- a URL pointing to a legacy .ckpt or .safetensors file
- a local path pointing to a legacy .ckpt or .safetensors file
- a local directory containing .ckpt and .safetensors files
- a local directory containing a diffusers model
After determining the nature of the model and downloading it
(if necessary), the file is probed to determine the correct
configuration file (if needed) and it is imported.
The model_name and/or description can be provided. If not, they will
be generated automatically.
If convert is true, legacy models will be converted to diffusers
before importing.
If commit_to_conf is provided, the newly loaded model will be written
to the `models.yaml` file at the indicated path. Otherwise, the changes
will only remain in memory.
The (potentially derived) name of the model is returned on success, or None
on failure. When multiple models are added from a directory, only the last
imported one is returned.
"""
weights_directory = weights_directory or global_autoscan_dir()
dest_directory = dest_directory or Path(
global_models_dir(), Globals.converted_ckpts_dir
)
model_path: Path = None
thing = path_url_or_repo # to save typing
print(">> Checking for unconverted .ckpt files in {weights_directory}")
ckpt_files = dict()
for root, dirs, files in os.walk(weights_directory):
for f in files:
if not f.endswith(".ckpt"):
continue
basename = Path(f).stem
dest = Path(dest_directory, basename)
if not dest.exists():
ckpt_files[Path(root, f)] = dest
print(f">> Probing {thing} for import")
if len(ckpt_files) == 0:
if thing.startswith(("http:", "https:", "ftp:")):
print(f" | {thing} appears to be a URL")
model_path = self._resolve_path(
thing, "models/ldm/stable-diffusion-v1"
) # _resolve_path does a download if needed
elif Path(thing).is_file() and thing.endswith((".ckpt", ".safetensors")):
if Path(thing).stem in ["model", "diffusion_pytorch_model"]:
print(
f" | {Path(thing).name} appears to be part of a diffusers model. Skipping import"
)
return
else:
print(f" | {thing} appears to be a checkpoint file on disk")
model_path = self._resolve_path(thing, "models/ldm/stable-diffusion-v1")
elif Path(thing).is_dir() and Path(thing, "model_index.json").exists():
print(f" | {thing} appears to be a diffusers file on disk")
model_name = self.import_diffuser_model(
thing,
vae=dict(repo_id="stabilityai/sd-vae-ft-mse"),
model_name=model_name,
description=description,
commit_to_conf=commit_to_conf,
)
elif Path(thing).is_dir():
if (Path(thing) / "model_index.json").exists():
print(f">> {thing} appears to be a diffusers model.")
model_name = self.import_diffuser_model(
thing, commit_to_conf=commit_to_conf
)
else:
print(
f">> {thing} appears to be a directory. Will scan for models to import"
)
for m in list(Path(thing).rglob("*.ckpt")) + list(
Path(thing).rglob("*.safetensors")
):
if model_name := self.heuristic_import(
str(m), convert, commit_to_conf=commit_to_conf
):
print(f" >> {model_name} successfully imported")
return model_name
elif re.match(r"^[\w.+-]+/[\w.+-]+$", thing):
print(f" | {thing} appears to be a HuggingFace diffusers repo_id")
model_name = self.import_diffuser_model(
thing, commit_to_conf=commit_to_conf
)
pipeline, _, _, _ = self._load_diffusers_model(self.config[model_name])
else:
print(
f"** {thing}: Unknown thing. Please provide a URL, file path, directory or HuggingFace repo_id"
)
# Model_path is set in the event of a legacy checkpoint file.
# If not set, we're all done
if not model_path:
return
print(
f">> New .ckpt file(s) found in {weights_directory}. Optimizing and importing..."
if model_path.stem in self.config: # already imported
print(" | Already imported. Skipping")
return
# another round of heuristics to guess the correct config file.
checkpoint = (
safetensors.torch.load_file(model_path)
if model_path.suffix == ".safetensors"
else torch.load(model_path)
)
for ckpt in ckpt_files:
self.convert_and_import(ckpt, ckpt_files[ckpt])
self.commit(conf_path)
model_type = self.probe_model_type(checkpoint)
model_config_file = None
if model_type == SDLegacyType.V1:
print(" | SD-v1 model detected")
model_config_file = Path(
Globals.root, "configs/stable-diffusion/v1-inference.yaml"
)
elif model_type == SDLegacyType.V1_INPAINT:
print(" | SD-v1 inpainting model detected")
model_config_file = Path(
Globals.root, "configs/stable-diffusion/v1-inpainting-inference.yaml"
)
elif model_type == SDLegacyType.V2:
print(
" | SD-v2 model detected; model will be converted to diffusers format"
)
model_config_file = Path(
Globals.root, "configs/stable-diffusion/v2-inference-v.yaml"
)
convert = True
else:
print(
f"** {thing} is a legacy checkpoint file but not in a known Stable Diffusion model. Skipping import"
)
return
if convert:
diffuser_path = Path(
Globals.root, "models", Globals.converted_ckpts_dir, model_path.stem
)
model_name = self.convert_and_import(
model_path,
diffusers_path=diffuser_path,
vae=dict(repo_id="stabilityai/sd-vae-ft-mse"),
model_name=model_name,
model_description=description,
original_config_file=model_config_file,
commit_to_conf=commit_to_conf,
)
else:
model_name = self.import_ckpt_model(
model_path,
config=model_config_file,
model_name=model_name,
model_description=description,
vae=str(
Path(
Globals.root,
"models/ldm/stable-diffusion-v1/vae-ft-mse-840000-ema-pruned.ckpt",
)
),
commit_to_conf=commit_to_conf,
)
if commit_to_conf:
self.commit(commit_to_conf)
return model_name
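# Illustrative calls on an existing ModelManager instance (the variable name,
# URL and local path below are hypothetical):
#
#   mgr.heuristic_import("stabilityai/stable-diffusion-2-1")            # HF diffusers repo_id
#   mgr.heuristic_import("https://example.org/some-model.safetensors")  # URL to a legacy checkpoint
#   mgr.heuristic_import("/path/to/checkpoints", convert=True)          # scan a local directory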
def convert_and_import(
self,
@@ -754,6 +933,12 @@ class ModelManager(object):
Convert a legacy ckpt weights file to diffuser model and import
into models.yaml.
"""
ckpt_path = self._resolve_path(ckpt_path, "models/ldm/stable-diffusion-v1")
if original_config_file:
original_config_file = self._resolve_path(
original_config_file, "configs/stable-diffusion"
)
new_config = None
from ldm.invoke.ckpt_to_diffuser import convert_ckpt_to_diffuser
@@ -768,7 +953,7 @@ class ModelManager(object):
model_description = model_description or f"Optimized version of {model_name}"
print(f">> Optimizing {model_name} (30-60s)")
try:
# By passing the specified VAE too the conversion function, the autoencoder
# By passing the specified VAE to the conversion function, the autoencoder
# will be built into the model rather than tacked on afterward via the config file
vae_model = self._load_vae(vae) if vae else None
convert_ckpt_to_diffuser(
@@ -795,9 +980,11 @@ class ModelManager(object):
print(">> Conversion succeeded")
except Exception as e:
print(f"** Conversion failed: {str(e)}")
print("** If you are trying to convert an inpainting or 2.X model, please indicate the correct config file (e.g. v1-inpainting-inference.yaml)")
print(
"** If you are trying to convert an inpainting or 2.X model, please indicate the correct config file (e.g. v1-inpainting-inference.yaml)"
)
return new_config
return model_name
def search_models(self, search_folder):
print(f">> Finding Models In: {search_folder}")
@@ -812,10 +999,11 @@ class ModelManager(object):
found_models = []
for file in files:
location = str(file.resolve()).replace("\\", "/")
if 'model.safetensors' not in location and 'diffusion_pytorch_model.safetensors' not in location:
found_models.append(
{"name": file.stem, "location": location}
)
if (
"model.safetensors" not in location
and "diffusion_pytorch_model.safetensors" not in location
):
found_models.append({"name": file.stem, "location": location})
return search_folder, found_models
@@ -975,7 +1163,7 @@ class ModelManager(object):
print("** Migration is done. Continuing...")
def _resolve_path(
self, source: Union[str, Path], dest_directory: str
self, source: Union[str, Path], dest_directory: str
) -> Optional[Path]:
resolved_path = None
if str(source).startswith(("http:", "https:", "ftp:")):
@@ -1113,7 +1301,12 @@ class ModelManager(object):
def _load_vae(self, vae_config) -> AutoencoderKL:
vae_args = {}
name_or_path = self.model_name_or_path(vae_config)
try:
name_or_path = self.model_name_or_path(vae_config)
except Exception:
return None
if name_or_path is None:
return None
using_fp16 = self.precision == "float16"
vae_args.update(

View File

@@ -1,655 +0,0 @@
import string
from typing import Union, Optional
import re
import pyparsing as pp
'''
This module parses prompt strings and produces tree-like structures that can be used to generate and control the conditioning tensors for weighted subprompts.
Useful class exports:
PromptParser - parses prompts
Useful function exports:
split_weighted_subprompts() splits subprompts, then normalizes and weights them
log_tokenization() prints out colour-coded tokens and warns if truncated
'''
class Prompt():
"""
Mid-level structure for storing the tree-like result of parsing a prompt. A Prompt may not represent the whole of
the singular user-defined "prompt string" (although it can) - for example, if the user specifies a Blend, the objects
that are to be blended together are stored individually as Prompt objects.
Nesting makes this object not suitable for directly tokenizing; instead call flatten() on the containing Conjunction
to produce a FlattenedPrompt.
"""
def __init__(self, parts: list):
for c in parts:
if type(c) is not Attention and not issubclass(type(c), BaseFragment) and type(c) is not pp.ParseResults:
raise PromptParser.ParsingException(f"Prompt cannot contain {type(c).__name__} ({c}), only {[c.__name__ for c in BaseFragment.__subclasses__()]} are allowed")
self.children = parts
def __repr__(self):
return f"Prompt:{self.children}"
def __eq__(self, other):
return type(other) is Prompt and other.children == self.children
class BaseFragment:
pass
class FlattenedPrompt():
"""
A Prompt that has been passed through flatten(). Its children can be readily tokenized.
"""
def __init__(self, parts: list=[]):
self.children = []
for part in parts:
self.append(part)
def append(self, fragment: Union[list, BaseFragment, tuple]):
# verify type correctness
if type(fragment) is list:
for x in fragment:
self.append(x)
elif issubclass(type(fragment), BaseFragment):
self.children.append(fragment)
elif type(fragment) is tuple:
# upgrade tuples to Fragments
if type(fragment[0]) is not str or (type(fragment[1]) is not float and type(fragment[1]) is not int):
raise PromptParser.ParsingException(
f"FlattenedPrompt cannot contain {fragment}, only Fragments or (str, float) tuples are allowed")
self.children.append(Fragment(fragment[0], fragment[1]))
else:
raise PromptParser.ParsingException(
f"FlattenedPrompt cannot contain {fragment}, only Fragments or (str, float) tuples are allowed")
@property
def is_empty(self):
return len(self.children) == 0 or \
(len(self.children) == 1 and len(self.children[0].text) == 0)
@property
def wants_cross_attention_control(self):
return any(
[issubclass(type(x), CrossAttentionControlledFragment) for x in self.children]
)
def __repr__(self):
return f"FlattenedPrompt:{self.children}"
def __eq__(self, other):
return type(other) is FlattenedPrompt and other.children == self.children
class Fragment(BaseFragment):
"""
A Fragment is a chunk of plain text and an optional weight. The text should be passed as-is to the CLIP tokenizer.
"""
def __init__(self, text: str, weight: float=1):
assert(type(text) is str)
if '\\"' in text or '\\(' in text or '\\)' in text:
#print("Fragment converting escaped \( \) \\\" into ( ) \"")
text = text.replace('\\(', '(').replace('\\)', ')').replace('\\"', '"')
self.text = text
self.weight = float(weight)
def __repr__(self):
return "Fragment:'"+self.text+"'@"+str(self.weight)
def __eq__(self, other):
return type(other) is Fragment \
and other.text == self.text \
and other.weight == self.weight
class Attention():
"""
Nestable weight control for fragments. Each object in the children array may in turn be an Attention object;
weights should be considered to accumulate as the tree is traversed to deeper levels of nesting.
Do not traverse directly; instead obtain a FlattenedPrompt by calling Flatten() on a top-level Conjunction object.
"""
def __init__(self, weight: float, children: list):
if type(weight) is not float:
raise PromptParser.ParsingException(
f"Attention weight must be float (got {type(weight).__name__} {weight})")
self.weight = weight
if type(children) is not list:
raise PromptParser.ParsingException(f"cannot make Attention with non-list of children (got {type(children)})")
assert(type(children) is list)
self.children = children
#print(f"A: requested attention '{children}' to {weight}")
def __repr__(self):
return f"Attention:{self.children} * {self.weight}"
def __eq__(self, other):
return type(other) is Attention and other.weight == self.weight and other.children == self.children
class CrossAttentionControlledFragment(BaseFragment):
pass
class CrossAttentionControlSubstitute(CrossAttentionControlledFragment):
"""
A Cross-Attention Controlled ('prompt2prompt') fragment, for use inside a Prompt, Attention, or FlattenedPrompt.
Representing an "original" word sequence that supplies feature vectors for an initial diffusion operation, and an
"edited" word sequence, to which the attention maps produced by the "original" word sequence are applied. Intuitively,
the result should be an "edited" image that looks like the "original" image with concepts swapped.
eg "a cat sitting on a car" (original) -> "a smiling dog sitting on a car" (edited): the edited image should look
almost exactly the same as the original, but with a smiling dog rendered in place of the cat. The
CrossAttentionControlSubstitute object representing this swap may be confined to the tokens being swapped:
CrossAttentionControlSubstitute(original=[Fragment('cat')], edited=[Fragment('dog')])
or it may represent a larger portion of the token sequence:
CrossAttentionControlSubstitute(original=[Fragment('a cat sitting on a car')],
edited=[Fragment('a smiling dog sitting on a car')])
In either case expect it to be embedded in a Prompt or FlattenedPrompt:
FlattenedPrompt([
Fragment('a'),
CrossAttentionControlSubstitute(original=[Fragment('cat')], edited=[Fragment('dog')]),
Fragment('sitting on a car')
])
"""
def __init__(self, original: list, edited: list, options: dict=None):
self.original = original if len(original)>0 else [Fragment('')]
self.edited = edited if len(edited)>0 else [Fragment('')]
default_options = {
's_start': 0.0,
's_end': 0.2062994740159002, # ~= shape_freedom=0.5
't_start': 0.1,
't_end': 1.0
}
merged_options = default_options
if options is not None:
shape_freedom = options.pop('shape_freedom', None)
if shape_freedom is not None:
# high shape freedom = SD can do what it wants with the shape of the object
# high shape freedom => s_end = 0
# low shape freedom => s_end = 1
# shape freedom is in a "linear" space, while noticeable changes to s_end are typically closer around 0,
# and there is very little perceptible difference as s_end increases above 0.5
# so for shape_freedom = 0.5 we probably want s_end to be 0.2
# -> cube root and subtract from 1.0
merged_options['s_end'] = 1.0 - shape_freedom ** (1. / 3.)
#print('converted shape_freedom argument to', merged_options)
merged_options.update(options)
self.options = merged_options
def __repr__(self):
return f"CrossAttentionControlSubstitute:({self.original}->{self.edited} ({self.options})"
def __eq__(self, other):
return type(other) is CrossAttentionControlSubstitute \
and other.original == self.original \
and other.edited == self.edited \
and other.options == self.options
class CrossAttentionControlAppend(CrossAttentionControlledFragment):
def __init__(self, fragment: Fragment):
self.fragment = fragment
def __repr__(self):
return "CrossAttentionControlAppend:",self.fragment
def __eq__(self, other):
return type(other) is CrossAttentionControlAppend \
and other.fragment == self.fragment
class Conjunction():
"""
Storage for one or more Prompts or Blends, each of which is to be separately diffused and then the results merged
by weighted sum in latent space.
"""
def __init__(self, prompts: list, weights: list = None):
# force everything to be a Prompt
#print("making conjunction with", prompts, "types", [type(p).__name__ for p in prompts])
self.prompts = [x if (type(x) is Prompt
or type(x) is Blend
or type(x) is FlattenedPrompt)
else Prompt(x) for x in prompts]
self.weights = [1.0]*len(self.prompts) if (weights is None or len(weights)==0) else list(weights)
if len(self.weights) != len(self.prompts):
raise PromptParser.ParsingException(f"while parsing Conjunction: mismatched parts/weights counts {prompts}, {weights}")
self.type = 'AND'
def __repr__(self):
return f"Conjunction:{self.prompts} | weights {self.weights}"
def __eq__(self, other):
return type(other) is Conjunction \
and other.prompts == self.prompts \
and other.weights == self.weights
class Blend():
"""
Stores a Blend of multiple Prompts. To apply, build feature vectors for each of the child Prompts and then perform a
weighted blend of the feature vectors to produce a single feature vector that is effectively a lerp between the
Prompts.
"""
def __init__(self, prompts: list, weights: list[float], normalize_weights: bool=True):
#print("making Blend with prompts", prompts, "and weights", weights)
weights = [1.0]*len(prompts) if (weights is None or len(weights)==0) else list(weights)
if len(prompts) != len(weights):
raise PromptParser.ParsingException(f"while parsing Blend: mismatched prompts/weights counts {prompts}, {weights}")
for p in prompts:
if type(p) is not Prompt and type(p) is not FlattenedPrompt:
raise(PromptParser.ParsingException(f"{type(p)} cannot be added to a Blend, only Prompts or FlattenedPrompts"))
for f in p.children:
if isinstance(f, CrossAttentionControlSubstitute):
raise(PromptParser.ParsingException(f"while parsing Blend: sorry, you cannot do .swap() as part of a Blend"))
# upcast all lists to Prompt objects
self.prompts = [x if (type(x) is Prompt or type(x) is FlattenedPrompt)
                else Prompt(x)
                for x in prompts]
self.weights = weights
self.normalize_weights = normalize_weights
@property
def wants_cross_attention_control(self):
# blends cannot cross-attention control
return False
def __repr__(self):
return f"Blend:{self.prompts} | weights {' ' if self.normalize_weights else '(non-normalized) '}{self.weights}"
def __eq__(self, other):
return other.__repr__() == self.__repr__()
class PromptParser():
class ParsingException(Exception):
pass
class UnrecognizedOperatorException(ParsingException):
def __init__(self, operator:str):
super().__init__("Unrecognized operator: " + operator)
def __init__(self, attention_plus_base=1.1, attention_minus_base=0.9):
self.conjunction, self.prompt = build_parser_syntax(attention_plus_base, attention_minus_base)
def parse_conjunction(self, prompt: str) -> Conjunction:
'''
:param prompt: The prompt string to parse
:return: a Conjunction representing the parsed results.
'''
#print(f"!!parsing '{prompt}'")
if len(prompt.strip()) == 0:
return Conjunction(prompts=[FlattenedPrompt([('', 1.0)])], weights=[1.0])
root = self.conjunction.parse_string(prompt)
#print(f"'{prompt}' parsed to root", root)
#fused = fuse_fragments(parts)
#print("fused to", fused)
return self.flatten(root[0])
def parse_legacy_blend(self, text: str, skip_normalize: bool = False) -> Optional[Blend]:
weighted_subprompts = split_weighted_subprompts(text, skip_normalize=skip_normalize)
if len(weighted_subprompts) <= 1:
return None
strings = [x[0] for x in weighted_subprompts]
weights = [x[1] for x in weighted_subprompts]
parsed_conjunctions = [self.parse_conjunction(x) for x in strings]
flattened_prompts = [x.prompts[0] for x in parsed_conjunctions]
return Blend(prompts=flattened_prompts, weights=weights, normalize_weights=not skip_normalize)
def flatten(self, root: Conjunction, verbose = False) -> Conjunction:
"""
Flattening a Conjunction traverses all of the nested tree-like structures in each of its Prompts or Blends,
producing from each of these walks a linear sequence of Fragment or CrossAttentionControlSubstitute objects
that can be readily tokenized without the need to walk a complex tree structure.
:param root: The Conjunction to flatten.
:return: A Conjunction containing the result of flattening each of the prompts in the passed-in root.
"""
def fuse_fragments(items):
# print("fusing fragments in ", items)
result = []
for x in items:
if type(x) is CrossAttentionControlSubstitute:
original_fused = fuse_fragments(x.original)
edited_fused = fuse_fragments(x.edited)
result.append(CrossAttentionControlSubstitute(original_fused, edited_fused, options=x.options))
else:
last_weight = result[-1].weight \
if (len(result) > 0 and not issubclass(type(result[-1]), CrossAttentionControlledFragment)) \
else None
this_text = x.text
this_weight = x.weight
if last_weight is not None and last_weight == this_weight:
last_text = result[-1].text
result[-1] = Fragment(last_text + ' ' + this_text, last_weight)
else:
result.append(x)
return result
def flatten_internal(node, weight_scale, results, prefix):
verbose and print(prefix + "flattening", node, "...")
if type(node) is pp.ParseResults or type(node) is list:
for x in node:
results = flatten_internal(x, weight_scale, results, prefix+' pr ')
#print(prefix, " ParseResults expanded, results is now", results)
elif type(node) is Attention:
# if node.weight < 1:
# todo: inject a blend when flattening attention with weight < 1
for index,c in enumerate(node.children):
results = flatten_internal(c, weight_scale * node.weight, results, prefix + f" att{index} ")
elif type(node) is Fragment:
results += [Fragment(node.text, node.weight*weight_scale)]
elif type(node) is CrossAttentionControlSubstitute:
original = flatten_internal(node.original, weight_scale, [], prefix + ' CAo ')
edited = flatten_internal(node.edited, weight_scale, [], prefix + ' CAe ')
results += [CrossAttentionControlSubstitute(original, edited, options=node.options)]
elif type(node) is Blend:
flattened_subprompts = []
#print(" flattening blend with prompts", node.prompts, "weights", node.weights)
for prompt in node.prompts:
# prompt is a list
flattened_subprompts = flatten_internal(prompt, weight_scale, flattened_subprompts, prefix+'B ')
results += [Blend(prompts=flattened_subprompts, weights=node.weights, normalize_weights=node.normalize_weights)]
elif type(node) is Prompt:
#print(prefix + "about to flatten Prompt with children", node.children)
flattened_prompt = []
for child in node.children:
flattened_prompt = flatten_internal(child, weight_scale, flattened_prompt, prefix+'P ')
results += [FlattenedPrompt(parts=fuse_fragments(flattened_prompt))]
#print(prefix + "after flattening Prompt, results is", results)
else:
raise PromptParser.ParsingException(f"unhandled node type {type(node)} when flattening {node}")
verbose and print(prefix + "-> after flattening", type(node).__name__, "results is", results)
return results
verbose and print("flattening", root)
flattened_parts = []
for part in root.prompts:
flattened_parts += flatten_internal(part, 1.0, [], ' C| ')
verbose and print("flattened to", flattened_parts)
weights = root.weights
return Conjunction(flattened_parts, weights)
def build_parser_syntax(attention_plus_base: float, attention_minus_base: float):
def make_operator_object(x):
#print('making operator for', x)
target = x[0]
operator = x[1]
arguments = x[2]
if operator == '.attend':
weight_raw = arguments[0]
weight = 1.0
if type(weight_raw) is float or type(weight_raw) is int:
weight = weight_raw
elif type(weight_raw) is str:
base = attention_plus_base if weight_raw[0] == '+' else attention_minus_base
weight = pow(base, len(weight_raw))
return Attention(weight=weight, children=list(x[0]))
elif operator == '.swap':
return CrossAttentionControlSubstitute(target, arguments, x.as_dict())
elif operator == '.blend':
prompts = [Prompt(p) for p in x[0]]
weights_raw = x[2]
normalize_weights = True
if len(weights_raw) > 0 and weights_raw[-1][0] == 'no_normalize':
normalize_weights = False
weights_raw = weights_raw[:-1]
weights = [float(w[0]) for w in weights_raw]
return Blend(prompts=prompts, weights=weights, normalize_weights=normalize_weights)
elif operator == '.and' or operator == '.add':
prompts = [Prompt(p) for p in x[0]]
weights = [float(w[0]) for w in x[2]]
return Conjunction(prompts=prompts, weights=weights)
raise PromptParser.UnrecognizedOperatorException(operator)
def parse_fragment_str(x, expression: pp.ParseExpression, in_quotes: bool = False, in_parens: bool = False):
#print(f"parsing fragment string for {x}")
fragment_string = x[0]
if len(fragment_string.strip()) == 0:
return Fragment('')
if in_quotes:
# escape unescaped quotes
fragment_string = fragment_string.replace('"', '\\"')
try:
result = (expression + pp.StringEnd()).parse_string(fragment_string)
#print("parsed to", result)
return result
except pp.ParseException as e:
#print("parse_fragment_str couldn't parse prompt string:", e)
raise
# meaningful symbols
lparen = pp.Literal("(").suppress()
rparen = pp.Literal(")").suppress()
quote = pp.Literal('"').suppress()
comma = pp.Literal(",").suppress()
dot = pp.Literal(".").suppress()
equals = pp.Literal("=").suppress()
escaped_lparen = pp.Literal('\\(')
escaped_rparen = pp.Literal('\\)')
escaped_quote = pp.Literal('\\"')
escaped_comma = pp.Literal('\\,')
escaped_dot = pp.Literal('\\.')
escaped_plus = pp.Literal('\\+')
escaped_minus = pp.Literal('\\-')
escaped_equals = pp.Literal('\\=')
syntactic_symbols = {
'(': escaped_lparen,
')': escaped_rparen,
'"': escaped_quote,
',': escaped_comma,
'.': escaped_dot,
'+': escaped_plus,
'-': escaped_minus,
'=': escaped_equals,
}
syntactic_chars = "".join(syntactic_symbols.keys())
# accepts int or float notation, always maps to float
number = pp.pyparsing_common.real | \
pp.Combine(pp.Optional("-")+pp.Word(pp.nums)).set_parse_action(pp.token_map(float))
# for options
keyword = pp.Word(pp.alphanums + '_')
# a word that absolutely does not contain any meaningful syntax
non_syntax_word = pp.Combine(pp.OneOrMore(pp.MatchFirst([
pp.Or(syntactic_symbols.values()),
pp.one_of(['-', '+']) + pp.NotAny(pp.White() | pp.Char(syntactic_chars) | pp.StringEnd()),
# build character-by-character
pp.CharsNotIn(string.whitespace + syntactic_chars, exact=1)
])))
non_syntax_word.set_parse_action(lambda x: [Fragment(t) for t in x])
non_syntax_word.set_name('non_syntax_word')
non_syntax_word.set_debug(False)
# a word that can contain any character at all - greedily consumes syntax, so use with care
free_word = pp.CharsNotIn(string.whitespace).set_parse_action(lambda x: Fragment(x[0]))
free_word.set_name('free_word')
free_word.set_debug(False)
# ok here we go. forward declare some things..
attention = pp.Forward()
cross_attention_substitute = pp.Forward()
parenthesized_fragment = pp.Forward()
quoted_fragment = pp.Forward()
# the types of things that can go into a fragment, consisting of syntax-full and/or strictly syntax-free components
fragment_part_expressions = [
attention,
cross_attention_substitute,
parenthesized_fragment,
quoted_fragment,
non_syntax_word
]
# a fragment that is permitted to contain commas
fragment_including_commas = pp.ZeroOrMore(pp.MatchFirst(
fragment_part_expressions + [
pp.Literal(',').set_parse_action(lambda x: Fragment(x[0]))
]
))
# a fragment that is not permitted to contain commas
fragment_excluding_commas = pp.ZeroOrMore(pp.MatchFirst(
fragment_part_expressions
))
# a fragment in double quotes (may be nested)
quoted_fragment << pp.QuotedString(quote_char='"', esc_char=None, esc_quote='\\"')
quoted_fragment.set_parse_action(lambda x: parse_fragment_str(x, fragment_including_commas, in_quotes=True))
# a fragment inside parentheses (may be nested)
parenthesized_fragment << (lparen + fragment_including_commas + rparen)
parenthesized_fragment.set_name('parenthesized_fragment')
parenthesized_fragment.set_debug(False)
# a string of the form (<keyword>=<float|keyword> | <float> | <keyword>) where keyword is alphanumeric + '_'
option = pp.Group(pp.MatchFirst([
keyword + equals + (number | keyword), # option=value
number.copy().set_parse_action(pp.token_map(str)), # weight
keyword # flag
]))
# options for an operator, eg "s_start=0.1, 0.3, no_normalize"
options = pp.Dict(pp.Optional(pp.delimited_list(option)))
options.set_name('options')
options.set_debug(False)
# a fragment which can be used as the target for an operator - either quoted or in parentheses, or a bare vanilla word
potential_operator_target = (quoted_fragment | parenthesized_fragment | non_syntax_word)
# a fragment whose weight has been increased or decreased by a given amount
attention_weight_operator = pp.Word('+') | pp.Word('-') | number
attention_explicit = (
pp.Group(potential_operator_target)
+ pp.Literal('.attend')
+ lparen
+ pp.Group(attention_weight_operator)
+ rparen
)
attention_explicit.set_parse_action(make_operator_object)
attention_implicit = (
pp.Group(potential_operator_target)
+ pp.NotAny(pp.White()) # do not permit whitespace between term and operator
+ pp.Group(attention_weight_operator)
)
attention_implicit.set_parse_action(lambda x: make_operator_object([x[0], '.attend', x[1]]))
attention << (attention_explicit | attention_implicit)
attention.set_name('attention')
attention.set_debug(False)
# cross-attention control by swapping one fragment for another
cross_attention_substitute << (
pp.Group(potential_operator_target).set_name('ca-target').set_debug(False)
+ pp.Literal(".swap").set_name('ca-operator').set_debug(False)
+ lparen
+ pp.Group(fragment_excluding_commas).set_name('ca-replacement').set_debug(False)
+ pp.Optional(comma + options).set_name('ca-options').set_debug(False)
+ rparen
)
cross_attention_substitute.set_name('cross_attention_substitute')
cross_attention_substitute.set_debug(False)
cross_attention_substitute.set_parse_action(make_operator_object)
# an entire self-contained prompt, which can be used in a Blend or Conjunction
prompt = pp.ZeroOrMore(pp.MatchFirst([
cross_attention_substitute,
attention,
quoted_fragment,
parenthesized_fragment,
free_word,
pp.White().suppress()
]))
quoted_prompt = quoted_fragment.copy().set_parse_action(lambda x: parse_fragment_str(x, prompt, in_quotes=True))
# a blend/lerp between the feature vectors for two or more prompts
blend = (
lparen
+ pp.Group(pp.delimited_list(pp.Group(potential_operator_target | quoted_prompt), min=1)).set_name('bl-target').set_debug(False)
+ rparen
+ pp.Literal(".blend").set_name('bl-operator').set_debug(False)
+ lparen
+ pp.Group(options).set_name('bl-options').set_debug(False)
+ rparen
)
blend.set_name('blend')
blend.set_debug(False)
blend.set_parse_action(make_operator_object)
# an operator to direct stable diffusion to step multiple times, once for each target, and then add the results together with different weights
explicit_conjunction = (
lparen
+ pp.Group(pp.delimited_list(pp.Group(potential_operator_target | quoted_prompt), min=1)).set_name('cj-target').set_debug(False)
+ rparen
+ pp.one_of([".and", ".add"]).set_name('cj-operator').set_debug(False)
+ lparen
+ pp.Group(options).set_name('cj-options').set_debug(False)
+ rparen
)
explicit_conjunction.set_name('explicit_conjunction')
explicit_conjunction.set_debug(False)
explicit_conjunction.set_parse_action(make_operator_object)
# by default a prompt consists of a Conjunction with a single term
implicit_conjunction = (blend | pp.Group(prompt)) + pp.StringEnd()
implicit_conjunction.set_parse_action(lambda x: Conjunction(x))
conjunction = (explicit_conjunction | implicit_conjunction)
return conjunction, prompt
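# Examples of strings this grammar accepts (illustrative; the exact Fragment and
# weight objects produced depend on parse actions defined elsewhere in this module):
#   "a forest (detailed)1.3"            - implicit attention weighting
#   "a forest (detailed).attend(1.3)"   - explicit attention operator
#   "a cat.swap(dog, s_start=0.1)"      - cross-attention substitution
#   '("a cat", "a dog").blend(1, 0.75)' - blend of two sub-prompts
#   '("a cat", "a dog").and()'          - explicit conjunction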
def split_weighted_subprompts(text, skip_normalize=False)->list:
"""
Legacy blend parsing.
Grabs all text up to the first occurrence of ':', uses the grabbed text as a
sub-prompt, and takes the value following ':' as its weight. If ':' has no
value defined, the weight defaults to 1.0. Repeats until no text remains.
"""
prompt_parser = re.compile("""
(?P<prompt> # capture group for 'prompt'
(?:\\\:|[^:])+ # match one or more non ':' characters or escaped colons '\:'
) # end 'prompt'
(?: # non-capture group
:+ # match one or more ':' characters
(?P<weight> # capture group for 'weight'
-?\d+(?:\.\d+)? # match positive or negative integer or decimal number
)? # end weight capture group, make optional
\s* # strip spaces after weight
| # OR
$ # else, if no ':' then match end of line
) # end non-capture group
""", re.VERBOSE)
parsed_prompts = [(match.group("prompt").replace("\\:", ":"), float(
match.group("weight") or 1)) for match in re.finditer(prompt_parser, text)]
if skip_normalize:
return parsed_prompts
weight_sum = sum(map(lambda x: x[1], parsed_prompts))
if weight_sum == 0:
print(
"* Warning: Subprompt weights add up to zero. Discarding and using even weights instead.")
equal_weight = 1 / max(len(parsed_prompts), 1)
return [(x[0], equal_weight) for x in parsed_prompts]
return [(x[0], x[1] / weight_sum) for x in parsed_prompts]
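# Example of the legacy syntax (illustrative): "mountain:1 man:0.5" parses to
# [("mountain", 1.0), ("man", 0.5)]; after normalization the weights become
# [("mountain", 2/3), ("man", 1/3)]. With skip_normalize=True the raw weights
# are returned unchanged.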

View File

@@ -421,7 +421,6 @@ def do_front_end(args: Namespace):
save_args(args)
try:
print(f"DEBUG: args = {args}")
do_textual_inversion_training(**args)
copy_to_embeddings_folder(args)
except Exception as e:
@@ -454,7 +453,7 @@ def main():
'** Not enough window space for the interface. Please make your window larger and try again.'
)
else:
print(f"** A layout error has occurred: {str(e)}")
print(f"** An error has occurred: {str(e)}")
sys.exit(-1)

View File

@@ -430,7 +430,7 @@ class TextualInversionDataset(Dataset):
placeholder_token="*",
center_crop=False,
):
self.data_root = data_root
self.data_root = Path(data_root)
self.tokenizer = tokenizer
self.learnable_property = learnable_property
self.size = size
@@ -439,9 +439,9 @@ class TextualInversionDataset(Dataset):
self.flip_p = flip_p
self.image_paths = [
os.path.join(self.data_root, file_path)
for file_path in os.listdir(self.data_root)
if os.path.isfile(file_path) and file_path.endswith(('.png','.PNG','.jpg','.JPG','.jpeg','.JPEG','.gif','.GIF'))
self.data_root / file_path
for file_path in self.data_root.iterdir()
if file_path.is_file() and file_path.name.endswith(('.png','.PNG','.jpg','.JPG','.jpeg','.JPEG','.gif','.GIF'))
]
self.num_images = len(self.image_paths)

View File

@@ -1,3 +1,8 @@
# adapted from bloc97's CrossAttentionControl colab
# https://github.com/bloc97/CrossAttentionControl
import enum
import math
from typing import Optional, Callable
@@ -6,35 +11,13 @@ import psutil
import torch
import diffusers
from torch import nn
from compel.cross_attention_control import Arguments
from diffusers.models.unet_2d_condition import UNet2DConditionModel
from diffusers.models.cross_attention import AttnProcessor
from ldm.invoke.devices import torch_dtype
# adapted from bloc97's CrossAttentionControl colab
# https://github.com/bloc97/CrossAttentionControl
class Arguments:
def __init__(self, edited_conditioning: torch.Tensor, edit_opcodes: list[tuple], edit_options: dict):
"""
:param edited_conditioning: if doing cross-attention control, the edited conditioning [1 x 77 x 768]
:param edit_opcodes: if doing cross-attention control, a list of difflib.SequenceMatcher-like opcodes describing how to map original conditioning tokens to edited conditioning tokens (only the 'equal' opcode is required)
:param edit_options: if doing cross-attention control, per-edit options. there should be 1 item in edit_options for each item in edit_opcodes.
"""
# todo: rewrite this to take embedding fragments rather than a single edited_conditioning vector
self.edited_conditioning = edited_conditioning
self.edit_opcodes = edit_opcodes
if edited_conditioning is not None:
assert len(edit_opcodes) == len(edit_options), \
"there must be 1 edit_options dict for each edit_opcodes tuple"
non_none_edit_options = [x for x in edit_options if x is not None]
assert len(non_none_edit_options)>0, "missing edit_options"
if len(non_none_edit_options)>1:
print('warning: cross-attention control options are not working properly for >1 edit')
self.edit_options = non_none_edit_options[0]
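# Illustrative edit_opcodes value (difflib.SequenceMatcher format): swapping one
# original token at position 4 for two edited tokens would look like
# [('equal', 0, 4, 0, 4), ('replace', 4, 5, 4, 6), ('equal', 5, 77, 6, 77)].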
class CrossAttentionType(enum.Enum):
SELF = 1
TOKENS = 2
@@ -319,7 +302,6 @@ def override_cross_attention(model, context: Context, is_running_diffusers = Fal
Inject attention parameters and functions into the passed-in model to enable cross-attention editing.
:param model: The unet model to inject into.
:param cross_attention_control_args: Arguments passed to the CrossAttentionControl implementations
:return: None
"""
@@ -523,7 +505,7 @@ from dataclasses import field, dataclass
import torch
from diffusers.models.cross_attention import CrossAttention, CrossAttnProcessor, SlicedAttnProcessor, AttnProcessor
from diffusers.models.cross_attention import CrossAttention, CrossAttnProcessor, SlicedAttnProcessor
@dataclass

View File

@@ -1,236 +0,0 @@
import math
import torch
from transformers import CLIPTokenizer, CLIPTextModel
from ldm.invoke.devices import torch_dtype
from ldm.modules.textual_inversion_manager import TextualInversionManager
class WeightedPromptFragmentsToEmbeddingsConverter():
def __init__(self,
tokenizer: CLIPTokenizer, # converts strings to lists of int token ids
text_encoder: CLIPTextModel, # convert a list of int token ids to a tensor of embeddings
textual_inversion_manager: TextualInversionManager = None
):
self.tokenizer = tokenizer
self.text_encoder = text_encoder
self.textual_inversion_manager = textual_inversion_manager
@property
def max_length(self):
return self.tokenizer.model_max_length
def get_embeddings_for_weighted_prompt_fragments(self,
text: list[list[str]],
fragment_weights: list[list[float]],
should_return_tokens: bool = False,
device='cpu'
) -> torch.Tensor:
'''
:param text: A list of fragments of text to which different weights are to be applied.
:param fragment_weights: A batch of lists of weights, one for each entry in `fragments`.
:return: A tensor of shape `[1, 77, token_dim]` containing weighted embeddings where token_dim is 768 for SD1
and 1024 for SD2
'''
if len(text) != len(fragment_weights):
raise ValueError(f"lengths of text and fragment_weights lists are not the same ({len(text)} != {len(fragment_weights)})")
batch_z = None
batch_tokens = None
for fragments, weights in zip(text, fragment_weights):
# First, weight tokens in individual fragments by scaling the feature vectors as requested (effectively
# applying a multiplier to the CFG scale on a per-token basis).
# For tokens weighted<1, intuitively we want SD to become not merely *less* interested in the concept
# captured by the fragment but actually *un*interested in it (a 0.01 interest in "red" is still an active
# interest, however small, in redness; what the user probably intends when they attach the number 0.01 to
# "red" is to tell SD that it should almost completely *ignore* redness).
# To do this, the embedding is lerped away from base_embedding in the direction of an embedding for a prompt
# string from which the low-weighted fragment has been simply removed. The closer the weight is to zero, the
# closer the resulting embedding is to an embedding for a prompt that simply lacks this fragment.
# handle weights >=1
tokens, per_token_weights = self.get_token_ids_and_expand_weights(fragments, weights, device=device)
base_embedding = self.build_weighted_embedding_tensor(tokens, per_token_weights)
# this is our starting point
embeddings = base_embedding.unsqueeze(0)
per_embedding_weights = [1.0]
# now handle weights <1
# Do this by building extra embeddings tensors that lack the words being <1 weighted. These will be lerped
# with the embeddings tensors that have the words, such that if the weight of a word is 0.5, the resulting
# embedding will be exactly half-way between the unweighted prompt and the prompt with the <1 weighted words
# removed.
# eg for "mountain:1 man:0.5", intuitively the "man" should be "half-gone". therefore, append an embedding
# for "mountain" (i.e. without "man") to the already-produced embedding for "mountain man", and weight it
# such that the resulting lerped embedding is exactly half-way between "mountain man" and "mountain".
for index, fragment_weight in enumerate(weights):
if fragment_weight < 1:
fragments_without_this = fragments[:index] + fragments[index+1:]
weights_without_this = weights[:index] + weights[index+1:]
tokens, per_token_weights = self.get_token_ids_and_expand_weights(fragments_without_this, weights_without_this, device=device)
embedding_without_this = self.build_weighted_embedding_tensor(tokens, per_token_weights)
embeddings = torch.cat((embeddings, embedding_without_this.unsqueeze(0)), dim=1)
# weight of the embedding *without* this fragment gets *stronger* as its weight approaches 0
# if fragment_weight = 0, basically we want embedding_without_this to completely overwhelm base_embedding
# therefore:
# fragment_weight = 1: we are at base_z => lerp weight 0
# fragment_weight = 0.5: we are halfway between base_z and here => lerp weight 1
# fragment_weight = 0: we're now entirely overriding base_z ==> lerp weight inf
# so let's use tan(), because:
# tan is 0.0 at 0,
# 1.0 at PI/4, and
# inf at PI/2
# -> tan((1-weight)*PI/2) should give us ideal lerp weights
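# e.g. (illustrative values): fragment_weight=0.75 -> tan(0.25*PI/2) ~= 0.414,
# fragment_weight=0.25 -> tan(0.75*PI/2) ~= 2.414; with the base embedding's
# weight fixed at 1.0, apply_embedding_weights() normalizes the pair, so a
# fragment_weight of 0.5 yields per-embedding weights [0.5, 0.5] -- exactly
# half-way, as intended.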
epsilon = 1e-9
fragment_weight = max(epsilon, fragment_weight) # inf is bad
embedding_lerp_weight = math.tan((1.0 - fragment_weight) * math.pi / 2)
# todo handle negative weight?
per_embedding_weights.append(embedding_lerp_weight)
lerped_embeddings = self.apply_embedding_weights(embeddings, per_embedding_weights, normalize=True).squeeze(0)
#print(f"assembled tokens for '{fragments}' into tensor of shape {lerped_embeddings.shape}")
# append to batch
batch_z = lerped_embeddings.unsqueeze(0) if batch_z is None else torch.cat([batch_z, lerped_embeddings.unsqueeze(0)], dim=1)
batch_tokens = tokens.unsqueeze(0) if batch_tokens is None else torch.cat([batch_tokens, tokens.unsqueeze(0)], dim=1)
# should have shape (B, 77, 768)
#print(f"assembled all tokens into tensor of shape {batch_z.shape}")
if should_return_tokens:
return batch_z, batch_tokens
else:
return batch_z
def get_token_ids(self, fragments: list[str], include_start_and_end_markers: bool = True) -> list[list[int]]:
"""
Convert a list of strings like `["a cat", "sitting", "on a mat"]` into a list of lists of token ids like
`[[bos, 0, 1, eos], [bos, 2, eos], [bos, 3, 0, 4, eos]]`. bos/eos markers are skipped if
`include_start_and_end_markers` is `False`. Each list will be restricted to the maximum permitted length
(typically 75 tokens + eos/bos markers).
:param fragments: The strings to convert.
:param include_start_and_end_markers:
:return:
"""
# for args documentation see ENCODE_KWARGS_DOCSTRING in tokenization_utils_base.py (in `transformers` lib)
token_ids_list = self.tokenizer(
fragments,
truncation=True,
max_length=self.max_length,
return_overflowing_tokens=False,
padding='do_not_pad',
return_tensors=None, # just give me lists of ints
)['input_ids']
result = []
for token_ids in token_ids_list:
# trim eos/bos
token_ids = token_ids[1:-1]
# pad for textual inversions with vector length >1
token_ids = self.textual_inversion_manager.expand_textual_inversion_token_ids_if_necessary(token_ids)
# restrict length to max_length-2 (leaving room for bos/eos)
token_ids = token_ids[0:self.max_length - 2]
# add back eos/bos if requested
if include_start_and_end_markers:
token_ids = [self.tokenizer.bos_token_id] + token_ids + [self.tokenizer.eos_token_id]
result.append(token_ids)
return result
@classmethod
def apply_embedding_weights(cls, embeddings: torch.Tensor, per_embedding_weights: list[float], normalize: bool) -> torch.Tensor:
per_embedding_weights = torch.tensor(per_embedding_weights, dtype=embeddings.dtype, device=embeddings.device)
if normalize:
per_embedding_weights = per_embedding_weights / torch.sum(per_embedding_weights)
reshaped_weights = per_embedding_weights.reshape(per_embedding_weights.shape + (1, 1,))
#reshaped_weights = per_embedding_weights.reshape(per_embedding_weights.shape + (1,1,)).expand(embeddings.shape)
return torch.sum(embeddings * reshaped_weights, dim=1)
# lerped embeddings has shape (77, 768)
def get_token_ids_and_expand_weights(self, fragments: list[str], weights: list[float], device: str) -> (torch.Tensor, torch.Tensor):
'''
Given a list of text fragments and corresponding weights: tokenize each fragment, append the token sequences
together and return a padded token sequence starting with the bos marker, ending with the eos marker, and padded
or truncated as appropriate to `self.max_length`. Also return a list of weights expanded from the passed-in
weights to match each token.
:param fragments: Text fragments to tokenize and concatenate. May be empty.
:param weights: Per-fragment weights (i.e. quasi-CFG scaling). Values from 0 to inf are permitted. In practice with SD1.5
values >1.6 tend to produce garbage output. Must have the same length as `fragments`.
:return: A tuple of tensors `(token_ids, weights)`. `token_ids` is ints, `weights` is floats, both have shape `[self.max_length]`.
'''
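# For example (token ids illustrative): fragments=["a cat", "playing"] with
# weights=[1.0, 1.5] expand to token_ids [bos, a, cat, playing, eos, pad, ...]
# and per_token_weights [1.0, 1.0, 1.0, 1.5, 1.0, 1.0, ...]; bos, eos and
# padding tokens always carry weight 1.0.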
if len(fragments) != len(weights):
raise ValueError(f"lengths of fragments and weights lists are not the same ({len(fragments)} != {len(weights)})")
# empty is meaningful
if len(fragments) == 0:
fragments = ['']
weights = [1.0]
per_fragment_token_ids = self.get_token_ids(fragments, include_start_and_end_markers=False)
all_token_ids = []
per_token_weights = []
#print("all fragments:", fragments, weights)
for this_fragment_token_ids, weight in zip(per_fragment_token_ids, weights):
# append
all_token_ids += this_fragment_token_ids
# fill out weights tensor with one float per token
per_token_weights += [float(weight)] * len(this_fragment_token_ids)
# leave room for bos/eos
max_token_count_without_bos_eos_markers = self.max_length - 2
if len(all_token_ids) > max_token_count_without_bos_eos_markers:
excess_token_count = len(all_token_ids) - max_token_count_without_bos_eos_markers
# TODO build nice description string of how the truncation was applied
# this should be done by calling self.tokenizer.convert_ids_to_tokens() then passing the result to
# self.tokenizer.convert_tokens_to_string() for the token_ids on each side of the truncation limit.
print(f">> Prompt is {excess_token_count} token(s) too long and has been truncated")
all_token_ids = all_token_ids[0:max_token_count_without_bos_eos_markers]
per_token_weights = per_token_weights[0:max_token_count_without_bos_eos_markers]
# pad out to a self.max_length-entry array: [bos_token, <prompt tokens>, eos_token, pad_token…]
# (typically self.max_length == 77)
all_token_ids = [self.tokenizer.bos_token_id] + all_token_ids + [self.tokenizer.eos_token_id]
per_token_weights = [1.0] + per_token_weights + [1.0]
pad_length = self.max_length - len(all_token_ids)
all_token_ids += [self.tokenizer.pad_token_id] * pad_length
per_token_weights += [1.0] * pad_length
all_token_ids_tensor = torch.tensor(all_token_ids, dtype=torch.long, device=device)
per_token_weights_tensor = torch.tensor(per_token_weights, dtype=torch_dtype(self.text_encoder.device), device=device)
#print(f"assembled all_token_ids_tensor with shape {all_token_ids_tensor.shape}")
return all_token_ids_tensor, per_token_weights_tensor
def build_weighted_embedding_tensor(self, token_ids: torch.Tensor, per_token_weights: torch.Tensor) -> torch.Tensor:
'''
Build a tensor that embeds the passed-in token IDs and applies the given per_token weights
:param token_ids: A tensor of shape `[self.max_length]` containing token IDs (ints)
:param per_token_weights: A tensor of shape `[self.max_length]` containing weights (floats)
:return: A tensor of shape `[1, self.max_length, token_dim]` representing the requested weighted embeddings
where `token_dim` is 768 for SD1 and 1024 for SD2.
'''
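# Intuition (a sketch of what the code below computes): weighted_z =
# empty_z + weight * (z - empty_z), so a per-token weight of 1.0 reproduces the
# normal embedding, 0.0 collapses that token's embedding to the empty-prompt
# embedding, and 2.0 pushes twice as far away from it.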
#print(f"building weighted embedding tensor for {tokens} with weights {per_token_weights}")
if token_ids.shape != torch.Size([self.max_length]):
raise ValueError(f"token_ids has shape {token_ids.shape} - expected [{self.max_length}]")
z = self.text_encoder(token_ids.unsqueeze(0), return_dict=False)[0]
empty_token_ids = torch.tensor([self.tokenizer.bos_token_id] +
[self.tokenizer.pad_token_id] * (self.max_length-2) +
[self.tokenizer.eos_token_id], dtype=torch.int, device=z.device).unsqueeze(0)
empty_z = self.text_encoder(empty_token_ids).last_hidden_state
batch_weights_expanded = per_token_weights.reshape(per_token_weights.shape + (1,)).expand(z.shape).to(z)
z_delta_from_empty = z - empty_z
weighted_z = empty_z + (z_delta_from_empty * batch_weights_expanded)
return weighted_z
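# Minimal usage sketch (hypothetical names; assumes a loaded diffusers SD
# pipeline `pipe` and a TextualInversionManager `ti_manager`):
#   converter = WeightedPromptFragmentsToEmbeddingsConverter(
#       tokenizer=pipe.tokenizer,
#       text_encoder=pipe.text_encoder,
#       textual_inversion_manager=ti_manager)
#   z = converter.get_embeddings_for_weighted_prompt_fragments(
#       text=[["a mountain", "a man"]], fragment_weights=[[1.0, 0.5]],
#       device="cuda")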

View File

@@ -8,6 +8,7 @@ import torch
from picklescan.scanner import scan_file_path
from transformers import CLIPTextModel, CLIPTokenizer
from compel.embeddings_provider import BaseTextualInversionManager
from ldm.invoke.concepts_lib import HuggingFaceConceptsLibrary
@@ -23,7 +24,7 @@ class TextualInversion:
return self.embedding.shape[0]
class TextualInversionManager:
class TextualInversionManager(BaseTextualInversionManager):
def __init__(
self,
tokenizer: CLIPTokenizer,
@@ -134,7 +135,7 @@ class TextualInversionManager:
def _add_textual_inversion(
self, trigger_str, embedding, defer_injecting_tokens=False
) -> TextualInversion:
) -> Optional[TextualInversion]:
"""
Add a textual inversion to be recognised.
:param trigger_str: The trigger text in the prompt that activates this textual inversion. If unknown to the embedder's tokenizer, will be added.

View File

@@ -306,8 +306,12 @@ def download_with_resume(url: str, dest: Path, access_token: str = None) -> Path
dest/filename
:param access_token: Access token to access this resource
'''
resp = requests.get(url, stream=True)
total = int(resp.headers.get("content-length", 0))
header = {"Authorization": f"Bearer {access_token}"} if access_token else {}
open_mode = "wb"
exist_size = 0
resp = requests.get(url, headers=header, stream=True)
content_length = int(resp.headers.get("content-length", 0))
if dest.is_dir():
try:
@@ -318,41 +322,41 @@ def download_with_resume(url: str, dest: Path, access_token: str = None) -> Path
else:
dest.parent.mkdir(parents=True, exist_ok=True)
print(f'DEBUG: after many manipulations, dest={dest}')
header = {"Authorization": f"Bearer {access_token}"} if access_token else {}
open_mode = "wb"
exist_size = 0
if dest.exists():
exist_size = dest.stat().st_size
header["Range"] = f"bytes={exist_size}-"
open_mode = "ab"
resp = requests.get(url, headers=header, stream=True) # new request with range
if exist_size > content_length:
print('* corrupt existing file found. re-downloading')
os.remove(dest)
exist_size = 0
if (
resp.status_code == 416
): # "range not satisfiable", which means nothing to return
resp.status_code == 416 or exist_size == content_length
):
print(f"* {dest}: complete file found. Skipping.")
return dest
elif resp.status_code == 206 or exist_size > 0:
print(f"* {dest}: partial file found. Resuming...")
elif resp.status_code != 200:
print(f"** An error occurred during downloading {dest}: {resp.reason}")
elif exist_size > 0:
print(f"* {dest}: partial file found. Resuming...")
else:
print(f"* {dest}: Downloading...")
try:
if total < 2000:
if content_length < 2000:
print(f"*** ERROR DOWNLOADING {url}: {resp.text}")
return None
with open(dest, open_mode) as file, tqdm(
desc=str(dest),
initial=exist_size,
total=total + exist_size,
unit="iB",
unit_scale=True,
unit_divisor=1000,
desc=str(dest),
initial=exist_size,
total=content_length,
unit="iB",
unit_scale=True,
unit_divisor=1000,
) as bar:
for data in resp.iter_content(chunk_size=1024):
size = file.write(data)