diff --git a/invokeai/app/invocations/generate.py b/invokeai/app/invocations/generate.py index 83220d89efa..fc3aca4a195 100644 --- a/invokeai/app/invocations/generate.py +++ b/invokeai/app/invocations/generate.py @@ -21,7 +21,8 @@ import re from ...backend.model_management.lora import ModelPatcher from ...backend.stable_diffusion.diffusers_pipeline import StableDiffusionGeneratorPipeline -from .model import UNetField, ClipField, VaeField +from .model import UNetField, VaeField +from .compel import ConditioningField from contextlib import contextmanager, ExitStack, ContextDecorator SAMPLER_NAME_VALUES = Literal[tuple(InvokeAIGenerator.schedulers())] @@ -63,19 +64,15 @@ class InpaintInvocation(BaseInvocation): type: Literal["inpaint"] = "inpaint" - prompt: Optional[str] = Field(description="The prompt to generate an image from") + positive_conditioning: Optional[ConditioningField] = Field(description="Positive conditioning for generation") + negative_conditioning: Optional[ConditioningField] = Field(description="Negative conditioning for generation") seed: int = Field(ge=0, le=SEED_MAX, description="The seed to use (omit for random)", default_factory=get_random_seed) steps: int = Field(default=30, gt=0, description="The number of steps to use to generate the image") width: int = Field(default=512, multiple_of=8, gt=0, description="The width of the resulting image", ) height: int = Field(default=512, multiple_of=8, gt=0, description="The height of the resulting image", ) cfg_scale: float = Field(default=7.5, ge=1, description="The Classifier-Free Guidance, higher values may result in a result closer to the prompt", ) scheduler: SAMPLER_NAME_VALUES = Field(default="euler", description="The scheduler to use" ) - #model: str = Field(default="", description="The model to use (currently ignored)") - #progress_images: bool = Field(default=False, description="Whether or not to produce progress images during generation", ) - #control_model: Optional[str] = Field(default=None, description="The control model to use") - #control_image: Optional[ImageField] = Field(default=None, description="The processed control image") unet: UNetField = Field(default=None, description="UNet model") - clip: ClipField = Field(default=None, description="Clip model") vae: VaeField = Field(default=None, description="Vae model") # Inputs @@ -151,64 +148,34 @@ def dispatch_progress( source_node_id=source_node_id, ) + def get_conditioning(self, context): + c, extra_conditioning_info = context.services.latents.get(self.positive_conditioning.conditioning_name) + uc, _ = context.services.latents.get(self.negative_conditioning.conditioning_name) + + return (uc, c, extra_conditioning_info) + @contextmanager - def load_model_old_way(self, context): - with ExitStack() as stack: - unet_info = context.services.model_manager.get_model(**self.unet.unet.dict()) - tokenizer_info = context.services.model_manager.get_model(**self.clip.tokenizer.dict()) - text_encoder_info = context.services.model_manager.get_model(**self.clip.text_encoder.dict()) - vae_info = context.services.model_manager.get_model(**self.vae.vae.dict()) - - #unet = stack.enter_context(unet_info) - #tokenizer = stack.enter_context(tokenizer_info) - #text_encoder = stack.enter_context(text_encoder_info) - #vae = stack.enter_context(vae_info) - with vae_info as vae: - device = vae.device - dtype = vae.dtype - - # not load models to gpu as it should be handled by pipeline - unet = unet_info.context.model - tokenizer = tokenizer_info.context.model - text_encoder = 
text_encoder_info.context.model - vae = vae_info.context.model - - scheduler = get_scheduler( - context=context, - scheduler_info=self.unet.scheduler, - scheduler_name=self.scheduler, - ) + def load_model_old_way(self, context, scheduler): + unet_info = context.services.model_manager.get_model(**self.unet.unet.dict()) + vae_info = context.services.model_manager.get_model(**self.vae.vae.dict()) + + #unet = unet_info.context.model + #vae = vae_info.context.model + with ExitStack() as stack: loras = [(stack.enter_context(context.services.model_manager.get_model(**lora.dict(exclude={"weight"}))), lora.weight) for lora in self.unet.loras] - ti_list = [] - for trigger in re.findall(r"<[a-zA-Z0-9., _-]+>", self.prompt): - name = trigger[1:-1] - try: - ti_list.append( - stack.enter_context( - context.services.model_manager.get_model( - model_name=name, - base_model=self.clip.text_encoder.base_model, - model_type=ModelType.TextualInversion, - ) - ) - ) - except Exception: - #print(e) - #import traceback - #print(traceback.format_exc()) - print(f"Warn: trigger: \"{trigger}\" not found") - - - with ModelPatcher.apply_lora_unet(unet, loras),\ - ModelPatcher.apply_lora_text_encoder(text_encoder, loras),\ - ModelPatcher.apply_ti(tokenizer, text_encoder, ti_list) as (ti_tokenizer, ti_manager): + + with vae_info as vae,\ + unet_info as unet,\ + ModelPatcher.apply_lora_unet(unet, loras): + + device = context.services.model_manager.mgr.cache.execution_device + dtype = context.services.model_manager.mgr.cache.precision pipeline = StableDiffusionGeneratorPipeline( - # TODO: ti_manager vae=vae, - text_encoder=text_encoder, - tokenizer=ti_tokenizer, + text_encoder=None, + tokenizer=None, unet=unet, scheduler=scheduler, safety_checker=None, @@ -242,14 +209,22 @@ def invoke(self, context: InvocationContext) -> ImageOutput: ) source_node_id = graph_execution_state.prepared_source_mapping[self.id] - with self.load_model_old_way(context) as model: + conditioning = self.get_conditioning(context) + scheduler = get_scheduler( + context=context, + scheduler_info=self.unet.scheduler, + scheduler_name=self.scheduler, + ) + + with self.load_model_old_way(context, scheduler) as model: outputs = Inpaint(model).generate( - prompt=self.prompt, + conditioning=conditioning, + scheduler=scheduler, init_image=image, mask_image=mask, step_callback=partial(self.dispatch_progress, context, source_node_id), **self.dict( - exclude={"prompt", "image", "mask"} + exclude={"positive_conditioning", "negative_conditioning", "scheduler", "image", "mask"} ), # Shorthand for passing all of the parameters above manually ) diff --git a/invokeai/backend/generator/base.py b/invokeai/backend/generator/base.py index fb293ab5b2d..a379cf63509 100644 --- a/invokeai/backend/generator/base.py +++ b/invokeai/backend/generator/base.py @@ -29,7 +29,6 @@ from ..image_util import configure_model_padding from ..util.util import rand_perlin_2d from ..safety_checker import SafetyChecker -from ..prompting.conditioning import get_uc_and_c_and_ec from ..stable_diffusion.diffusers_pipeline import StableDiffusionGeneratorPipeline from ..stable_diffusion.schedulers import SCHEDULER_MAP @@ -81,13 +80,15 @@ def __init__(self, self.params=params self.kwargs = kwargs - def generate(self, - prompt: str='', - callback: Optional[Callable]=None, - step_callback: Optional[Callable]=None, - iterations: int=1, - **keyword_args, - )->Iterator[InvokeAIGeneratorOutput]: + def generate( + self, + conditioning: tuple, + scheduler, + callback: Optional[Callable]=None, + step_callback: 
Optional[Callable]=None, + iterations: int=1, + **keyword_args, + )->Iterator[InvokeAIGeneratorOutput]: ''' Return an iterator across the indicated number of generations. Each time the iterator is called it will return an InvokeAIGeneratorOutput @@ -116,11 +117,6 @@ def generate(self, model_name = model_info.name model_hash = model_info.hash with model_info.context as model: - scheduler: Scheduler = self.get_scheduler( - model=model, - scheduler_name=generator_args.get('scheduler') - ) - uc, c, extra_conditioning_info = get_uc_and_c_and_ec(prompt,model=model) gen_class = self._generator_class() generator = gen_class(model, self.params.precision, **self.kwargs) if self.params.variation_amount > 0: @@ -143,12 +139,12 @@ def generate(self, iteration_count = range(iterations) if iterations else itertools.count(start=0, step=1) for i in iteration_count: - results = generator.generate(prompt, - conditioning=(uc, c, extra_conditioning_info), - step_callback=step_callback, - sampler=scheduler, - **generator_args, - ) + results = generator.generate( + conditioning=conditioning, + step_callback=step_callback, + sampler=scheduler, + **generator_args, + ) output = InvokeAIGeneratorOutput( image=results[0][0], seed=results[0][1], @@ -170,20 +166,6 @@ def schedulers(self)->List[str]: def load_generator(self, model: StableDiffusionGeneratorPipeline, generator_class: Type[Generator]): return generator_class(model, self.params.precision) - def get_scheduler(self, scheduler_name:str, model: StableDiffusionGeneratorPipeline)->Scheduler: - scheduler_class, scheduler_extra_config = SCHEDULER_MAP.get(scheduler_name, SCHEDULER_MAP['ddim']) - - scheduler_config = model.scheduler.config - if "_backup" in scheduler_config: - scheduler_config = scheduler_config["_backup"] - scheduler_config = {**scheduler_config, **scheduler_extra_config, "_backup": scheduler_config} - scheduler = scheduler_class.from_config(scheduler_config) - - # hack copied over from generate.py - if not hasattr(scheduler, 'uses_inpainting_model'): - scheduler.uses_inpainting_model = lambda: False - return scheduler - @classmethod def _generator_class(cls)->Type[Generator]: ''' @@ -281,7 +263,7 @@ def __init__(self, model: DiffusionPipeline, precision: str, **kwargs): self.model = model self.precision = precision self.seed = None - self.latent_channels = model.channels + self.latent_channels = model.unet.config.in_channels self.downsampling_factor = downsampling # BUG: should come from model or config self.safety_checker = None self.perlin = 0.0 @@ -292,7 +274,7 @@ def __init__(self, model: DiffusionPipeline, precision: str, **kwargs): self.free_gpu_mem = None # this is going to be overridden in img2img.py, txt2img.py and inpaint.py - def get_make_image(self, prompt, **kwargs): + def get_make_image(self, **kwargs): """ Returns a function returning an image derived from the prompt and the initial image Return value depends on the seed at the time you call it @@ -308,7 +290,6 @@ def set_variation(self, seed, variation_amount, with_variations): def generate( self, - prompt, width, height, sampler, @@ -333,7 +314,6 @@ def generate( saver.get_stacked_maps_image() ) make_image = self.get_make_image( - prompt, sampler=sampler, init_image=init_image, width=width, diff --git a/invokeai/backend/generator/embiggen.py b/invokeai/backend/generator/embiggen.py deleted file mode 100644 index 6eae5732b03..00000000000 --- a/invokeai/backend/generator/embiggen.py +++ /dev/null @@ -1,559 +0,0 @@ -""" -invokeai.backend.generator.embiggen descends from .generator 
-and generates with .generator.img2img -""" - -import numpy as np -import torch -from PIL import Image -from tqdm import trange - -import invokeai.backend.util.logging as logger - -from .base import Generator -from .img2img import Img2Img - -class Embiggen(Generator): - def __init__(self, model, precision): - super().__init__(model, precision) - self.init_latent = None - - # Replace generate because Embiggen doesn't need/use most of what it does normallly - def generate( - self, - prompt, - iterations=1, - seed=None, - image_callback=None, - step_callback=None, - **kwargs, - ): - make_image = self.get_make_image(prompt, step_callback=step_callback, **kwargs) - results = [] - seed = seed if seed else self.new_seed() - - # Noise will be generated by the Img2Img generator when called - for _ in trange(iterations, desc="Generating"): - # make_image will call Img2Img which will do the equivalent of get_noise itself - image = make_image() - results.append([image, seed]) - if image_callback is not None: - image_callback(image, seed, prompt_in=prompt) - seed = self.new_seed() - return results - - @torch.no_grad() - def get_make_image( - self, - prompt, - sampler, - steps, - cfg_scale, - ddim_eta, - conditioning, - init_img, - strength, - width, - height, - embiggen, - embiggen_tiles, - step_callback=None, - **kwargs, - ): - """ - Returns a function returning an image derived from the prompt and multi-stage twice-baked potato layering over the img2img on the initial image - Return value depends on the seed at the time you call it - """ - assert ( - not sampler.uses_inpainting_model() - ), "--embiggen is not supported by inpainting models" - - # Construct embiggen arg array, and sanity check arguments - if embiggen == None: # embiggen can also be called with just embiggen_tiles - embiggen = [1.0] # If not specified, assume no scaling - elif embiggen[0] < 0: - embiggen[0] = 1.0 - logger.warning( - "Embiggen scaling factor cannot be negative, fell back to the default of 1.0 !" - ) - if len(embiggen) < 2: - embiggen.append(0.75) - elif embiggen[1] > 1.0 or embiggen[1] < 0: - embiggen[1] = 0.75 - logger.warning( - "Embiggen upscaling strength for ESRGAN must be between 0 and 1, fell back to the default of 0.75 !" - ) - if len(embiggen) < 3: - embiggen.append(0.25) - elif embiggen[2] < 0: - embiggen[2] = 0.25 - logger.warning( - "Overlap size for Embiggen must be a positive ratio between 0 and 1 OR a number of pixels, fell back to the default of 0.25 !" - ) - - # Convert tiles from their user-freindly count-from-one to count-from-zero, because we need to do modulo math - # and then sort them, because... people. - if embiggen_tiles: - embiggen_tiles = list(map(lambda n: n - 1, embiggen_tiles)) - embiggen_tiles.sort() - - if strength >= 0.5: - logger.warning( - f"Embiggen may produce mirror motifs if the strength (-f) is too high (currently {strength}). Try values between 0.35-0.45." 
- ) - - # Prep img2img generator, since we wrap over it - gen_img2img = Img2Img(self.model, self.precision) - - # Open original init image (not a tensor) to manipulate - initsuperimage = Image.open(init_img) - - with Image.open(init_img) as img: - initsuperimage = img.convert("RGB") - - # Size of the target super init image in pixels - initsuperwidth, initsuperheight = initsuperimage.size - - # Increase by scaling factor if not already resized, using ESRGAN as able - if embiggen[0] != 1.0: - initsuperwidth = round(initsuperwidth * embiggen[0]) - initsuperheight = round(initsuperheight * embiggen[0]) - if embiggen[1] > 0: # No point in ESRGAN upscaling if strength is set zero - from ..restoration.realesrgan import ESRGAN - - esrgan = ESRGAN() - logger.info( - f"ESRGAN upscaling init image prior to cutting with Embiggen with strength {embiggen[1]}" - ) - if embiggen[0] > 2: - initsuperimage = esrgan.process( - initsuperimage, - embiggen[1], # upscale strength - self.seed, - 4, # upscale scale - ) - else: - initsuperimage = esrgan.process( - initsuperimage, - embiggen[1], # upscale strength - self.seed, - 2, # upscale scale - ) - # We could keep recursively re-running ESRGAN for a requested embiggen[0] larger than 4x - # but from personal experiance it doesn't greatly improve anything after 4x - # Resize to target scaling factor resolution - initsuperimage = initsuperimage.resize( - (initsuperwidth, initsuperheight), Image.Resampling.LANCZOS - ) - - # Use width and height as tile widths and height - # Determine buffer size in pixels - if embiggen[2] < 1: - if embiggen[2] < 0: - embiggen[2] = 0 - overlap_size_x = round(embiggen[2] * width) - overlap_size_y = round(embiggen[2] * height) - else: - overlap_size_x = round(embiggen[2]) - overlap_size_y = round(embiggen[2]) - - # With overall image width and height known, determine how many tiles we need - def ceildiv(a, b): - return -1 * (-a // b) - - # X and Y needs to be determined independantly (we may have savings on one based on the buffer pixel count) - # (initsuperwidth - width) is the area remaining to the right that we need to layers tiles to fill - # (width - overlap_size_x) is how much new we can fill with a single tile - emb_tiles_x = 1 - emb_tiles_y = 1 - if (initsuperwidth - width) > 0: - emb_tiles_x = ceildiv(initsuperwidth - width, width - overlap_size_x) + 1 - if (initsuperheight - height) > 0: - emb_tiles_y = ceildiv(initsuperheight - height, height - overlap_size_y) + 1 - # Sanity - assert ( - emb_tiles_x > 1 or emb_tiles_y > 1 - ), f"ERROR: Based on the requested dimensions of {initsuperwidth}x{initsuperheight} and tiles of {width}x{height} you don't need to Embiggen! Check your arguments." 
- - # Prep alpha layers -------------- - # https://stackoverflow.com/questions/69321734/how-to-create-different-transparency-like-gradient-with-python-pil - # agradientL is Left-side transparent - agradientL = ( - Image.linear_gradient("L").rotate(90).resize((overlap_size_x, height)) - ) - # agradientT is Top-side transparent - agradientT = Image.linear_gradient("L").resize((width, overlap_size_y)) - # radial corner is the left-top corner, made full circle then cut to just the left-top quadrant - agradientC = Image.new("L", (256, 256)) - for y in range(256): - for x in range(256): - # Find distance to lower right corner (numpy takes arrays) - distanceToLR = np.sqrt([(255 - x) ** 2 + (255 - y) ** 2])[0] - # Clamp values to max 255 - if distanceToLR > 255: - distanceToLR = 255 - # Place the pixel as invert of distance - agradientC.putpixel((x, y), round(255 - distanceToLR)) - - # Create alternative asymmetric diagonal corner to use on "tailing" intersections to prevent hard edges - # Fits for a left-fading gradient on the bottom side and full opacity on the right side. - agradientAsymC = Image.new("L", (256, 256)) - for y in range(256): - for x in range(256): - value = round(max(0, x - (255 - y)) * (255 / max(1, y))) - # Clamp values - value = max(0, value) - value = min(255, value) - agradientAsymC.putpixel((x, y), value) - - # Create alpha layers default fully white - alphaLayerL = Image.new("L", (width, height), 255) - alphaLayerT = Image.new("L", (width, height), 255) - alphaLayerLTC = Image.new("L", (width, height), 255) - # Paste gradients into alpha layers - alphaLayerL.paste(agradientL, (0, 0)) - alphaLayerT.paste(agradientT, (0, 0)) - alphaLayerLTC.paste(agradientL, (0, 0)) - alphaLayerLTC.paste(agradientT, (0, 0)) - alphaLayerLTC.paste(agradientC.resize((overlap_size_x, overlap_size_y)), (0, 0)) - # make masks with an asymmetric upper-right corner so when the curved transparent corner of the next tile - # to its right is placed it doesn't reveal a hard trailing semi-transparent edge in the overlapping space - alphaLayerTaC = alphaLayerT.copy() - alphaLayerTaC.paste( - agradientAsymC.rotate(270).resize((overlap_size_x, overlap_size_y)), - (width - overlap_size_x, 0), - ) - alphaLayerLTaC = alphaLayerLTC.copy() - alphaLayerLTaC.paste( - agradientAsymC.rotate(270).resize((overlap_size_x, overlap_size_y)), - (width - overlap_size_x, 0), - ) - - if embiggen_tiles: - # Individual unconnected sides - alphaLayerR = Image.new("L", (width, height), 255) - alphaLayerR.paste(agradientL.rotate(180), (width - overlap_size_x, 0)) - alphaLayerB = Image.new("L", (width, height), 255) - alphaLayerB.paste(agradientT.rotate(180), (0, height - overlap_size_y)) - alphaLayerTB = Image.new("L", (width, height), 255) - alphaLayerTB.paste(agradientT, (0, 0)) - alphaLayerTB.paste(agradientT.rotate(180), (0, height - overlap_size_y)) - alphaLayerLR = Image.new("L", (width, height), 255) - alphaLayerLR.paste(agradientL, (0, 0)) - alphaLayerLR.paste(agradientL.rotate(180), (width - overlap_size_x, 0)) - - # Sides and corner Layers - alphaLayerRBC = Image.new("L", (width, height), 255) - alphaLayerRBC.paste(agradientL.rotate(180), (width - overlap_size_x, 0)) - alphaLayerRBC.paste(agradientT.rotate(180), (0, height - overlap_size_y)) - alphaLayerRBC.paste( - agradientC.rotate(180).resize((overlap_size_x, overlap_size_y)), - (width - overlap_size_x, height - overlap_size_y), - ) - alphaLayerLBC = Image.new("L", (width, height), 255) - alphaLayerLBC.paste(agradientL, (0, 0)) - 
alphaLayerLBC.paste(agradientT.rotate(180), (0, height - overlap_size_y)) - alphaLayerLBC.paste( - agradientC.rotate(90).resize((overlap_size_x, overlap_size_y)), - (0, height - overlap_size_y), - ) - alphaLayerRTC = Image.new("L", (width, height), 255) - alphaLayerRTC.paste(agradientL.rotate(180), (width - overlap_size_x, 0)) - alphaLayerRTC.paste(agradientT, (0, 0)) - alphaLayerRTC.paste( - agradientC.rotate(270).resize((overlap_size_x, overlap_size_y)), - (width - overlap_size_x, 0), - ) - - # All but X layers - alphaLayerABT = Image.new("L", (width, height), 255) - alphaLayerABT.paste(alphaLayerLBC, (0, 0)) - alphaLayerABT.paste(agradientL.rotate(180), (width - overlap_size_x, 0)) - alphaLayerABT.paste( - agradientC.rotate(180).resize((overlap_size_x, overlap_size_y)), - (width - overlap_size_x, height - overlap_size_y), - ) - alphaLayerABL = Image.new("L", (width, height), 255) - alphaLayerABL.paste(alphaLayerRTC, (0, 0)) - alphaLayerABL.paste(agradientT.rotate(180), (0, height - overlap_size_y)) - alphaLayerABL.paste( - agradientC.rotate(180).resize((overlap_size_x, overlap_size_y)), - (width - overlap_size_x, height - overlap_size_y), - ) - alphaLayerABR = Image.new("L", (width, height), 255) - alphaLayerABR.paste(alphaLayerLBC, (0, 0)) - alphaLayerABR.paste(agradientT, (0, 0)) - alphaLayerABR.paste( - agradientC.resize((overlap_size_x, overlap_size_y)), (0, 0) - ) - alphaLayerABB = Image.new("L", (width, height), 255) - alphaLayerABB.paste(alphaLayerRTC, (0, 0)) - alphaLayerABB.paste(agradientL, (0, 0)) - alphaLayerABB.paste( - agradientC.resize((overlap_size_x, overlap_size_y)), (0, 0) - ) - - # All-around layer - alphaLayerAA = Image.new("L", (width, height), 255) - alphaLayerAA.paste(alphaLayerABT, (0, 0)) - alphaLayerAA.paste(agradientT, (0, 0)) - alphaLayerAA.paste( - agradientC.resize((overlap_size_x, overlap_size_y)), (0, 0) - ) - alphaLayerAA.paste( - agradientC.rotate(270).resize((overlap_size_x, overlap_size_y)), - (width - overlap_size_x, 0), - ) - - # Clean up temporary gradients - del agradientL - del agradientT - del agradientC - - def make_image(): - # Make main tiles ------------------------------------------------- - if embiggen_tiles: - logger.info(f"Making {len(embiggen_tiles)} Embiggen tiles...") - else: - logger.info( - f"Making {(emb_tiles_x * emb_tiles_y)} Embiggen tiles ({emb_tiles_x}x{emb_tiles_y})..." 
- ) - - emb_tile_store = [] - # Although we could use the same seed for every tile for determinism, at higher strengths this may - # produce duplicated structures for each tile and make the tiling effect more obvious - # instead track and iterate a local seed we pass to Img2Img - seed = self.seed - seedintlimit = ( - np.iinfo(np.uint32).max - 1 - ) # only retreive this one from numpy - - for tile in range(emb_tiles_x * emb_tiles_y): - # Don't iterate on first tile - if tile != 0: - if seed < seedintlimit: - seed += 1 - else: - seed = 0 - - # Determine if this is a re-run and replace - if embiggen_tiles and not tile in embiggen_tiles: - continue - # Get row and column entries - emb_row_i = tile // emb_tiles_x - emb_column_i = tile % emb_tiles_x - # Determine bounds to cut up the init image - # Determine upper-left point - if emb_column_i + 1 == emb_tiles_x: - left = initsuperwidth - width - else: - left = round(emb_column_i * (width - overlap_size_x)) - if emb_row_i + 1 == emb_tiles_y: - top = initsuperheight - height - else: - top = round(emb_row_i * (height - overlap_size_y)) - right = left + width - bottom = top + height - - # Cropped image of above dimension (does not modify the original) - newinitimage = initsuperimage.crop((left, top, right, bottom)) - # DEBUG: - # newinitimagepath = init_img[0:-4] + f'_emb_Ti{tile}.png' - # newinitimage.save(newinitimagepath) - - if embiggen_tiles: - logger.debug( - f"Making tile #{tile + 1} ({embiggen_tiles.index(tile) + 1} of {len(embiggen_tiles)} requested)" - ) - else: - logger.debug(f"Starting {tile + 1} of {(emb_tiles_x * emb_tiles_y)} tiles") - - # create a torch tensor from an Image - newinitimage = np.array(newinitimage).astype(np.float32) / 255.0 - newinitimage = newinitimage[None].transpose(0, 3, 1, 2) - newinitimage = torch.from_numpy(newinitimage) - newinitimage = 2.0 * newinitimage - 1.0 - newinitimage = newinitimage.to(self.model.device) - clear_cuda_cache = ( - kwargs["clear_cuda_cache"] if "clear_cuda_cache" in kwargs else None - ) - - tile_results = gen_img2img.generate( - prompt, - iterations=1, - seed=seed, - sampler=sampler, - steps=steps, - cfg_scale=cfg_scale, - conditioning=conditioning, - ddim_eta=ddim_eta, - image_callback=None, # called only after the final image is generated - step_callback=step_callback, # called after each intermediate image is generated - width=width, - height=height, - init_image=newinitimage, # notice that init_image is different from init_img - mask_image=None, - strength=strength, - clear_cuda_cache=clear_cuda_cache, - ) - - emb_tile_store.append(tile_results[0][0]) - # DEBUG (but, also has other uses), worth saving if you want tiles without a transparency overlap to manually composite - # emb_tile_store[-1].save(init_img[0:-4] + f'_emb_To{tile}.png') - del newinitimage - - # Sanity check we have them all - if len(emb_tile_store) == (emb_tiles_x * emb_tiles_y) or ( - embiggen_tiles != [] and len(emb_tile_store) == len(embiggen_tiles) - ): - outputsuperimage = Image.new("RGBA", (initsuperwidth, initsuperheight)) - if embiggen_tiles: - outputsuperimage.alpha_composite( - initsuperimage.convert("RGBA"), (0, 0) - ) - for tile in range(emb_tiles_x * emb_tiles_y): - if embiggen_tiles: - if tile in embiggen_tiles: - intileimage = emb_tile_store.pop(0) - else: - continue - else: - intileimage = emb_tile_store[tile] - intileimage = intileimage.convert("RGBA") - # Get row and column entries - emb_row_i = tile // emb_tiles_x - emb_column_i = tile % emb_tiles_x - if emb_row_i == 0 and emb_column_i == 0 and not 
embiggen_tiles: - left = 0 - top = 0 - else: - # Determine upper-left point - if emb_column_i + 1 == emb_tiles_x: - left = initsuperwidth - width - else: - left = round(emb_column_i * (width - overlap_size_x)) - if emb_row_i + 1 == emb_tiles_y: - top = initsuperheight - height - else: - top = round(emb_row_i * (height - overlap_size_y)) - # Handle gradients for various conditions - # Handle emb_rerun case - if embiggen_tiles: - # top of image - if emb_row_i == 0: - if emb_column_i == 0: - if (tile + 1) in embiggen_tiles: # Look-ahead right - if ( - tile + emb_tiles_x - ) not in embiggen_tiles: # Look-ahead down - intileimage.putalpha(alphaLayerB) - # Otherwise do nothing on this tile - elif ( - tile + emb_tiles_x - ) in embiggen_tiles: # Look-ahead down only - intileimage.putalpha(alphaLayerR) - else: - intileimage.putalpha(alphaLayerRBC) - elif emb_column_i == emb_tiles_x - 1: - if ( - tile + emb_tiles_x - ) in embiggen_tiles: # Look-ahead down - intileimage.putalpha(alphaLayerL) - else: - intileimage.putalpha(alphaLayerLBC) - else: - if (tile + 1) in embiggen_tiles: # Look-ahead right - if ( - tile + emb_tiles_x - ) in embiggen_tiles: # Look-ahead down - intileimage.putalpha(alphaLayerL) - else: - intileimage.putalpha(alphaLayerLBC) - elif ( - tile + emb_tiles_x - ) in embiggen_tiles: # Look-ahead down only - intileimage.putalpha(alphaLayerLR) - else: - intileimage.putalpha(alphaLayerABT) - # bottom of image - elif emb_row_i == emb_tiles_y - 1: - if emb_column_i == 0: - if (tile + 1) in embiggen_tiles: # Look-ahead right - intileimage.putalpha(alphaLayerTaC) - else: - intileimage.putalpha(alphaLayerRTC) - elif emb_column_i == emb_tiles_x - 1: - # No tiles to look ahead to - intileimage.putalpha(alphaLayerLTC) - else: - if (tile + 1) in embiggen_tiles: # Look-ahead right - intileimage.putalpha(alphaLayerLTaC) - else: - intileimage.putalpha(alphaLayerABB) - # vertical middle of image - else: - if emb_column_i == 0: - if (tile + 1) in embiggen_tiles: # Look-ahead right - if ( - tile + emb_tiles_x - ) in embiggen_tiles: # Look-ahead down - intileimage.putalpha(alphaLayerTaC) - else: - intileimage.putalpha(alphaLayerTB) - elif ( - tile + emb_tiles_x - ) in embiggen_tiles: # Look-ahead down only - intileimage.putalpha(alphaLayerRTC) - else: - intileimage.putalpha(alphaLayerABL) - elif emb_column_i == emb_tiles_x - 1: - if ( - tile + emb_tiles_x - ) in embiggen_tiles: # Look-ahead down - intileimage.putalpha(alphaLayerLTC) - else: - intileimage.putalpha(alphaLayerABR) - else: - if (tile + 1) in embiggen_tiles: # Look-ahead right - if ( - tile + emb_tiles_x - ) in embiggen_tiles: # Look-ahead down - intileimage.putalpha(alphaLayerLTaC) - else: - intileimage.putalpha(alphaLayerABR) - elif ( - tile + emb_tiles_x - ) in embiggen_tiles: # Look-ahead down only - intileimage.putalpha(alphaLayerABB) - else: - intileimage.putalpha(alphaLayerAA) - # Handle normal tiling case (much simpler - since we tile left to right, top to bottom) - else: - if emb_row_i == 0 and emb_column_i >= 1: - intileimage.putalpha(alphaLayerL) - elif emb_row_i >= 1 and emb_column_i == 0: - if ( - emb_column_i + 1 == emb_tiles_x - ): # If we don't have anything that can be placed to the right - intileimage.putalpha(alphaLayerT) - else: - intileimage.putalpha(alphaLayerTaC) - else: - if ( - emb_column_i + 1 == emb_tiles_x - ): # If we don't have anything that can be placed to the right - intileimage.putalpha(alphaLayerLTC) - else: - intileimage.putalpha(alphaLayerLTaC) - # Layer tile onto final image - 
outputsuperimage.alpha_composite(intileimage, (left, top)) - else: - logger.error( - "Could not find all Embiggen output tiles in memory? Something must have gone wrong with img2img generation." - ) - - # after internal loops and patching up return Embiggen image - return outputsuperimage - - # end of function declaration - return make_image diff --git a/invokeai/backend/generator/img2img.py b/invokeai/backend/generator/img2img.py index 2c62bec4d6b..1cfbeb66c0f 100644 --- a/invokeai/backend/generator/img2img.py +++ b/invokeai/backend/generator/img2img.py @@ -22,7 +22,6 @@ def __init__(self, model, precision): def get_make_image( self, - prompt, sampler, steps, cfg_scale, diff --git a/invokeai/backend/generator/inpaint.py b/invokeai/backend/generator/inpaint.py index a7fec83eb7d..eaf40471095 100644 --- a/invokeai/backend/generator/inpaint.py +++ b/invokeai/backend/generator/inpaint.py @@ -161,9 +161,7 @@ def seam_paint( im: Image.Image, seam_size: int, seam_blur: int, - prompt, seed, - sampler, steps, cfg_scale, ddim_eta, @@ -177,8 +175,6 @@ def seam_paint( mask = self.mask_edge(hard_mask, seam_size, seam_blur) make_image = self.get_make_image( - prompt, - sampler, steps, cfg_scale, ddim_eta, @@ -203,8 +199,6 @@ def seam_paint( @torch.no_grad() def get_make_image( self, - prompt, - sampler, steps, cfg_scale, ddim_eta, @@ -306,7 +300,6 @@ def get_make_image( # noinspection PyTypeChecker pipeline: StableDiffusionGeneratorPipeline = self.model - pipeline.scheduler = sampler # todo: support cross-attention control uc, c, _ = conditioning @@ -345,9 +338,7 @@ def make_image(x_T: torch.Tensor, seed: int): result, seam_size, seam_blur, - prompt, seed, - sampler, seam_steps, cfg_scale, ddim_eta, @@ -360,8 +351,6 @@ def make_image(x_T: torch.Tensor, seed: int): # Restore original settings self.get_make_image( - prompt, - sampler, steps, cfg_scale, ddim_eta, diff --git a/invokeai/backend/generator/txt2img.py b/invokeai/backend/generator/txt2img.py deleted file mode 100644 index 9ea19bd896c..00000000000 --- a/invokeai/backend/generator/txt2img.py +++ /dev/null @@ -1,125 +0,0 @@ -""" -invokeai.backend.generator.txt2img inherits from invokeai.backend.generator -""" -import PIL.Image -import torch - -from typing import Any, Callable, Dict, List, Optional, Tuple, Union -from diffusers.models.controlnet import ControlNetModel, ControlNetOutput -from diffusers.pipelines.controlnet import MultiControlNetModel - -from ..stable_diffusion import ( - ConditioningData, - PostprocessingSettings, - StableDiffusionGeneratorPipeline, -) -from .base import Generator - - -class Txt2Img(Generator): - def __init__(self, model, precision, - control_model: Optional[Union[ControlNetModel, List[ControlNetModel]]] = None, - **kwargs): - self.control_model = control_model - if isinstance(self.control_model, list): - self.control_model = MultiControlNetModel(self.control_model) - super().__init__(model, precision, **kwargs) - - @torch.no_grad() - def get_make_image( - self, - prompt, - sampler, - steps, - cfg_scale, - ddim_eta, - conditioning, - width, - height, - step_callback=None, - threshold=0.0, - warmup=0.2, - perlin=0.0, - h_symmetry_time_pct=None, - v_symmetry_time_pct=None, - attention_maps_callback=None, - **kwargs, - ): - """ - Returns a function returning an image derived from the prompt and the initial image - Return value depends on the seed at the time you call it - kwargs are 'width' and 'height' - """ - self.perlin = perlin - control_image = kwargs.get("control_image", None) - do_classifier_free_guidance = 
cfg_scale > 1.0 - - # noinspection PyTypeChecker - pipeline: StableDiffusionGeneratorPipeline = self.model - pipeline.control_model = self.control_model - pipeline.scheduler = sampler - - uc, c, extra_conditioning_info = conditioning - conditioning_data = ConditioningData( - uc, - c, - cfg_scale, - extra_conditioning_info, - postprocessing_settings=PostprocessingSettings( - threshold=threshold, - warmup=warmup, - h_symmetry_time_pct=h_symmetry_time_pct, - v_symmetry_time_pct=v_symmetry_time_pct, - ), - ).add_scheduler_args_if_applicable(pipeline.scheduler, eta=ddim_eta) - - # FIXME: still need to test with different widths, heights, devices, dtypes - # and add in batch_size, num_images_per_prompt? - if control_image is not None: - if isinstance(self.control_model, ControlNetModel): - control_image = pipeline.prepare_control_image( - image=control_image, - do_classifier_free_guidance=do_classifier_free_guidance, - width=width, - height=height, - # batch_size=batch_size * num_images_per_prompt, - # num_images_per_prompt=num_images_per_prompt, - device=self.control_model.device, - dtype=self.control_model.dtype, - ) - elif isinstance(self.control_model, MultiControlNetModel): - images = [] - for image_ in control_image: - image_ = pipeline.prepare_control_image( - image=image_, - do_classifier_free_guidance=do_classifier_free_guidance, - width=width, - height=height, - # batch_size=batch_size * num_images_per_prompt, - # num_images_per_prompt=num_images_per_prompt, - device=self.control_model.device, - dtype=self.control_model.dtype, - ) - images.append(image_) - control_image = images - kwargs["control_image"] = control_image - - def make_image(x_T: torch.Tensor, _: int) -> PIL.Image.Image: - pipeline_output = pipeline.image_from_embeddings( - latents=torch.zeros_like(x_T, dtype=self.torch_dtype()), - noise=x_T, - num_inference_steps=steps, - conditioning_data=conditioning_data, - callback=step_callback, - **kwargs, - ) - - if ( - pipeline_output.attention_map_saver is not None - and attention_maps_callback is not None - ): - attention_maps_callback(pipeline_output.attention_map_saver) - - return pipeline.numpy_to_pil(pipeline_output.images)[0] - - return make_image diff --git a/invokeai/backend/generator/txt2img2img.py b/invokeai/backend/generator/txt2img2img.py deleted file mode 100644 index 1257a44fb15..00000000000 --- a/invokeai/backend/generator/txt2img2img.py +++ /dev/null @@ -1,209 +0,0 @@ -""" -invokeai.backend.generator.txt2img inherits from invokeai.backend.generator -""" - -import math -from typing import Callable, Optional - -import torch -from diffusers.utils.logging import get_verbosity, set_verbosity, set_verbosity_error - -from ..stable_diffusion import PostprocessingSettings -from .base import Generator -from ..stable_diffusion.diffusers_pipeline import StableDiffusionGeneratorPipeline -from ..stable_diffusion.diffusers_pipeline import ConditioningData -from ..stable_diffusion.diffusers_pipeline import trim_to_multiple_of - -import invokeai.backend.util.logging as logger - -class Txt2Img2Img(Generator): - def __init__(self, model, precision): - super().__init__(model, precision) - self.init_latent = None # for get_noise() - - def get_make_image( - self, - prompt: str, - sampler, - steps: int, - cfg_scale: float, - ddim_eta, - conditioning, - width: int, - height: int, - strength: float, - step_callback: Optional[Callable] = None, - threshold=0.0, - warmup=0.2, - perlin=0.0, - h_symmetry_time_pct=None, - v_symmetry_time_pct=None, - attention_maps_callback=None, - **kwargs, - 
): - """ - Returns a function returning an image derived from the prompt and the initial image - Return value depends on the seed at the time you call it - kwargs are 'width' and 'height' - """ - self.perlin = perlin - - # noinspection PyTypeChecker - pipeline: StableDiffusionGeneratorPipeline = self.model - pipeline.scheduler = sampler - - uc, c, extra_conditioning_info = conditioning - conditioning_data = ConditioningData( - uc, - c, - cfg_scale, - extra_conditioning_info, - postprocessing_settings=PostprocessingSettings( - threshold=threshold, - warmup=0.2, - h_symmetry_time_pct=h_symmetry_time_pct, - v_symmetry_time_pct=v_symmetry_time_pct, - ), - ).add_scheduler_args_if_applicable(pipeline.scheduler, eta=ddim_eta) - - def make_image(x_T: torch.Tensor, _: int): - first_pass_latent_output, _ = pipeline.latents_from_embeddings( - latents=torch.zeros_like(x_T), - num_inference_steps=steps, - conditioning_data=conditioning_data, - noise=x_T, - callback=step_callback, - ) - - # Get our initial generation width and height directly from the latent output so - # the message below is accurate. - init_width = first_pass_latent_output.size()[3] * self.downsampling_factor - init_height = first_pass_latent_output.size()[2] * self.downsampling_factor - logger.info( - f"Interpolating from {init_width}x{init_height} to {width}x{height} using DDIM sampling" - ) - - # resizing - resized_latents = torch.nn.functional.interpolate( - first_pass_latent_output, - size=( - height // self.downsampling_factor, - width // self.downsampling_factor, - ), - mode="bilinear", - ) - - # Free up memory from the last generation. - clear_cuda_cache = kwargs["clear_cuda_cache"] or None - if clear_cuda_cache is not None: - clear_cuda_cache() - - second_pass_noise = self.get_noise_like( - resized_latents, override_perlin=True - ) - - # Clear symmetry for the second pass - from dataclasses import replace - - new_postprocessing_settings = replace( - conditioning_data.postprocessing_settings, h_symmetry_time_pct=None - ) - new_postprocessing_settings = replace( - new_postprocessing_settings, v_symmetry_time_pct=None - ) - new_conditioning_data = replace( - conditioning_data, postprocessing_settings=new_postprocessing_settings - ) - - verbosity = get_verbosity() - set_verbosity_error() - pipeline_output = pipeline.img2img_from_latents_and_embeddings( - resized_latents, - num_inference_steps=steps, - conditioning_data=new_conditioning_data, - strength=strength, - noise=second_pass_noise, - callback=step_callback, - ) - set_verbosity(verbosity) - - if ( - pipeline_output.attention_map_saver is not None - and attention_maps_callback is not None - ): - attention_maps_callback(pipeline_output.attention_map_saver) - - return pipeline.numpy_to_pil(pipeline_output.images)[0] - - # FIXME: do we really need something entirely different for the inpainting model? - - # in the case of the inpainting model being loaded, the trick of - # providing an interpolated latent doesn't work, so we transiently - # create a 512x512 PIL image, upscale it, and run the inpainting - # over it in img2img mode. 
Because the inpaing model is so conservative - # it doesn't change the image (much) - - return make_image - - def get_noise_like(self, like: torch.Tensor, override_perlin: bool = False): - device = like.device - if device.type == "mps": - x = torch.randn_like(like, device="cpu", dtype=self.torch_dtype()).to( - device - ) - else: - x = torch.randn_like(like, device=device, dtype=self.torch_dtype()) - if self.perlin > 0.0 and override_perlin == False: - shape = like.shape - x = (1 - self.perlin) * x + self.perlin * self.get_perlin_noise( - shape[3], shape[2] - ) - return x - - # returns a tensor filled with random numbers from a normal distribution - def get_noise(self, width, height, scale=True): - # print(f"Get noise: {width}x{height}") - if scale: - # Scale the input width and height for the initial generation - # Make their area equivalent to the model's resolution area (e.g. 512*512 = 262144), - # while keeping the minimum dimension at least 0.5 * resolution (e.g. 512*0.5 = 256) - - aspect = width / height - dimension = self.model.unet.config.sample_size * self.model.vae_scale_factor - min_dimension = math.floor(dimension * 0.5) - model_area = ( - dimension * dimension - ) # hardcoded for now since all models are trained on square images - - if aspect > 1.0: - init_height = max(min_dimension, math.sqrt(model_area / aspect)) - init_width = init_height * aspect - else: - init_width = max(min_dimension, math.sqrt(model_area * aspect)) - init_height = init_width / aspect - - scaled_width, scaled_height = trim_to_multiple_of( - math.floor(init_width), math.floor(init_height) - ) - - else: - scaled_width = width - scaled_height = height - - device = self.model.device - channels = self.latent_channels - if channels == 9: - channels = 4 # we don't really want noise for all the mask channels - shape = ( - 1, - channels, - scaled_height // self.downsampling_factor, - scaled_width // self.downsampling_factor, - ) - if self.use_mps_noise or device.type == "mps": - tensor = torch.empty(size=shape, device="cpu") - tensor = self.get_noise_like(like=tensor).to(device) - else: - tensor = torch.empty(size=shape, device=device) - tensor = self.get_noise_like(like=tensor) - return tensor diff --git a/invokeai/backend/model_management/lora.py b/invokeai/backend/model_management/lora.py index 46638878aaf..c351a76590f 100644 --- a/invokeai/backend/model_management/lora.py +++ b/invokeai/backend/model_management/lora.py @@ -556,8 +556,8 @@ def apply_ti( new_tokens_added = None try: - ti_manager = TextualInversionManager() ti_tokenizer = copy.deepcopy(tokenizer) + ti_manager = TextualInversionManager(ti_tokenizer) init_tokens_count = text_encoder.resize_token_embeddings(None).num_embeddings def _get_trigger(ti, index): @@ -650,22 +650,24 @@ def from_checkpoint( class TextualInversionManager(BaseTextualInversionManager): pad_tokens: Dict[int, List[int]] + tokenizer: CLIPTokenizer - def __init__(self): + def __init__(self, tokenizer: CLIPTokenizer): self.pad_tokens = dict() + self.tokenizer = tokenizer def expand_textual_inversion_token_ids_if_necessary( self, token_ids: list[int] ) -> list[int]: - #if token_ids[0] == self.tokenizer.bos_token_id: - # raise ValueError("token_ids must not start with bos_token_id") - #if token_ids[-1] == self.tokenizer.eos_token_id: - # raise ValueError("token_ids must not end with eos_token_id") - if len(self.pad_tokens) == 0: return token_ids + if token_ids[0] == self.tokenizer.bos_token_id: + raise ValueError("token_ids must not start with bos_token_id") + if token_ids[-1] == 
self.tokenizer.eos_token_id: + raise ValueError("token_ids must not end with eos_token_id") + new_token_ids = [] for token_id in token_ids: new_token_ids.append(token_id) diff --git a/invokeai/backend/model_management/models/textual_inversion.py b/invokeai/backend/model_management/models/textual_inversion.py index e8c96ff31e0..66847f53ebd 100644 --- a/invokeai/backend/model_management/models/textual_inversion.py +++ b/invokeai/backend/model_management/models/textual_inversion.py @@ -1,3 +1,4 @@ +import os import torch from typing import Optional from .base import ( diff --git a/invokeai/backend/prompting/__init__.py b/invokeai/backend/prompting/__init__.py deleted file mode 100644 index b52206dd944..00000000000 --- a/invokeai/backend/prompting/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -""" -Initialization file for invokeai.backend.prompting -""" -from .conditioning import ( - get_prompt_structure, - get_tokens_for_prompt_object, - get_uc_and_c_and_ec, - split_weighted_subprompts, -) diff --git a/invokeai/backend/prompting/conditioning.py b/invokeai/backend/prompting/conditioning.py deleted file mode 100644 index d0703427941..00000000000 --- a/invokeai/backend/prompting/conditioning.py +++ /dev/null @@ -1,297 +0,0 @@ -""" -This module handles the generation of the conditioning tensors. - -Useful function exports: - -get_uc_and_c_and_ec() get the conditioned and unconditioned latent, and edited conditioning if we're doing cross-attention control - -""" -import re -import torch -from typing import Optional, Union - -from compel import Compel -from compel.prompt_parser import ( - Blend, - CrossAttentionControlSubstitute, - FlattenedPrompt, - Fragment, - PromptParser, - Conjunction, -) - -import invokeai.backend.util.logging as logger - -from invokeai.app.services.config import InvokeAIAppConfig -from ..stable_diffusion import InvokeAIDiffuserComponent -from ..util import torch_dtype - -config = InvokeAIAppConfig.get_config() - -def get_uc_and_c_and_ec(prompt_string, - model: InvokeAIDiffuserComponent, - log_tokens=False, skip_normalize_legacy_blend=False): - # lazy-load any deferred textual inversions. - # this might take a couple of seconds the first time a textual inversion is used. 
- model.textual_inversion_manager.create_deferred_token_ids_for_any_trigger_terms(prompt_string) - - compel = Compel(tokenizer=model.tokenizer, - text_encoder=model.text_encoder, - textual_inversion_manager=model.textual_inversion_manager, - dtype_for_device_getter=torch_dtype, - truncate_long_prompts=False, - ) - - # get rid of any newline characters - prompt_string = prompt_string.replace("\n", " ") - positive_prompt_string, negative_prompt_string = split_prompt_to_positive_and_negative(prompt_string) - - legacy_blend = try_parse_legacy_blend(positive_prompt_string, skip_normalize_legacy_blend) - positive_conjunction: Conjunction - if legacy_blend is not None: - positive_conjunction = legacy_blend - else: - positive_conjunction = Compel.parse_prompt_string(positive_prompt_string) - positive_prompt = positive_conjunction.prompts[0] - - negative_conjunction = Compel.parse_prompt_string(negative_prompt_string) - negative_prompt: FlattenedPrompt | Blend = negative_conjunction.prompts[0] - - tokens_count = get_max_token_count(model.tokenizer, positive_prompt) - if log_tokens or config.log_tokenization: - log_tokenization(positive_prompt, negative_prompt, tokenizer=model.tokenizer) - - c, options = compel.build_conditioning_tensor_for_prompt_object(positive_prompt) - uc, _ = compel.build_conditioning_tensor_for_prompt_object(negative_prompt) - [c, uc] = compel.pad_conditioning_tensors_to_same_length([c, uc]) - - ec = InvokeAIDiffuserComponent.ExtraConditioningInfo(tokens_count_including_eos_bos=tokens_count, - cross_attention_control_args=options.get( - 'cross_attention_control', None)) - return uc, c, ec - -def get_prompt_structure( - prompt_string, skip_normalize_legacy_blend: bool = False -) -> (Union[FlattenedPrompt, Blend], FlattenedPrompt): - ( - positive_prompt_string, - negative_prompt_string, - ) = split_prompt_to_positive_and_negative(prompt_string) - legacy_blend = try_parse_legacy_blend( - positive_prompt_string, skip_normalize_legacy_blend - ) - positive_prompt: Conjunction - if legacy_blend is not None: - positive_conjunction = legacy_blend - else: - positive_conjunction = Compel.parse_prompt_string(positive_prompt_string) - positive_prompt = positive_conjunction.prompts[0] - negative_conjunction = Compel.parse_prompt_string(negative_prompt_string) - negative_prompt: FlattenedPrompt|Blend = negative_conjunction.prompts[0] - - return positive_prompt, negative_prompt - -def get_max_token_count( - tokenizer, prompt: Union[FlattenedPrompt, Blend], truncate_if_too_long=False -) -> int: - if type(prompt) is Blend: - blend: Blend = prompt - return max( - [ - get_max_token_count(tokenizer, c, truncate_if_too_long) - for c in blend.prompts - ] - ) - else: - return len( - get_tokens_for_prompt_object(tokenizer, prompt, truncate_if_too_long) - ) - - -def get_tokens_for_prompt_object( - tokenizer, parsed_prompt: FlattenedPrompt, truncate_if_too_long=True -) -> [str]: - if type(parsed_prompt) is Blend: - raise ValueError( - "Blend is not supported here - you need to get tokens for each of its .children" - ) - - text_fragments = [ - x.text - if type(x) is Fragment - else ( - " ".join([f.text for f in x.original]) - if type(x) is CrossAttentionControlSubstitute - else str(x) - ) - for x in parsed_prompt.children - ] - text = " ".join(text_fragments) - tokens = tokenizer.tokenize(text) - if truncate_if_too_long: - max_tokens_length = tokenizer.model_max_length - 2 # typically 75 - tokens = tokens[0:max_tokens_length] - return tokens - - -def 
split_prompt_to_positive_and_negative(prompt_string_uncleaned: str): - unconditioned_words = "" - unconditional_regex = r"\[(.*?)\]" - unconditionals = re.findall(unconditional_regex, prompt_string_uncleaned) - if len(unconditionals) > 0: - unconditioned_words = " ".join(unconditionals) - - # Remove Unconditioned Words From Prompt - unconditional_regex_compile = re.compile(unconditional_regex) - clean_prompt = unconditional_regex_compile.sub(" ", prompt_string_uncleaned) - prompt_string_cleaned = re.sub(" +", " ", clean_prompt) - else: - prompt_string_cleaned = prompt_string_uncleaned - return prompt_string_cleaned, unconditioned_words - - -def log_tokenization( - positive_prompt: Union[Blend, FlattenedPrompt], - negative_prompt: Union[Blend, FlattenedPrompt], - tokenizer, -): - logger.info(f"[TOKENLOG] Parsed Prompt: {positive_prompt}") - logger.info(f"[TOKENLOG] Parsed Negative Prompt: {negative_prompt}") - - log_tokenization_for_prompt_object(positive_prompt, tokenizer) - log_tokenization_for_prompt_object( - negative_prompt, tokenizer, display_label_prefix="(negative prompt)" - ) - - -def log_tokenization_for_prompt_object( - p: Union[Blend, FlattenedPrompt], tokenizer, display_label_prefix=None -): - display_label_prefix = display_label_prefix or "" - if type(p) is Blend: - blend: Blend = p - for i, c in enumerate(blend.prompts): - log_tokenization_for_prompt_object( - c, - tokenizer, - display_label_prefix=f"{display_label_prefix}(blend part {i + 1}, weight={blend.weights[i]})", - ) - elif type(p) is FlattenedPrompt: - flattened_prompt: FlattenedPrompt = p - if flattened_prompt.wants_cross_attention_control: - original_fragments = [] - edited_fragments = [] - for f in flattened_prompt.children: - if type(f) is CrossAttentionControlSubstitute: - original_fragments += f.original - edited_fragments += f.edited - else: - original_fragments.append(f) - edited_fragments.append(f) - - original_text = " ".join([x.text for x in original_fragments]) - log_tokenization_for_text( - original_text, - tokenizer, - display_label=f"{display_label_prefix}(.swap originals)", - ) - edited_text = " ".join([x.text for x in edited_fragments]) - log_tokenization_for_text( - edited_text, - tokenizer, - display_label=f"{display_label_prefix}(.swap replacements)", - ) - else: - text = " ".join([x.text for x in flattened_prompt.children]) - log_tokenization_for_text( - text, tokenizer, display_label=display_label_prefix - ) - - -def log_tokenization_for_text(text, tokenizer, display_label=None, truncate_if_too_long=False): - """shows how the prompt is tokenized - # usually tokens have '' to indicate end-of-word, - # but for readability it has been replaced with ' ' - """ - tokens = tokenizer.tokenize(text) - tokenized = "" - discarded = "" - usedTokens = 0 - totalTokens = len(tokens) - - for i in range(0, totalTokens): - token = tokens[i].replace("", " ") - # alternate color - s = (usedTokens % 6) + 1 - if truncate_if_too_long and i >= tokenizer.model_max_length: - discarded = discarded + f"\x1b[0;3{s};40m{token}" - else: - tokenized = tokenized + f"\x1b[0;3{s};40m{token}" - usedTokens += 1 - - if usedTokens > 0: - logger.info(f'[TOKENLOG] Tokens {display_label or ""} ({usedTokens}):') - logger.debug(f"{tokenized}\x1b[0m") - - if discarded != "": - logger.info(f"[TOKENLOG] Tokens Discarded ({totalTokens - usedTokens}):") - logger.debug(f"{discarded}\x1b[0m") - -def try_parse_legacy_blend(text: str, skip_normalize: bool = False) -> Optional[Conjunction]: - weighted_subprompts = split_weighted_subprompts(text, 
skip_normalize=skip_normalize) - if len(weighted_subprompts) <= 1: - return None - strings = [x[0] for x in weighted_subprompts] - - pp = PromptParser() - parsed_conjunctions = [pp.parse_conjunction(x) for x in strings] - flattened_prompts = [] - weights = [] - for i, x in enumerate(parsed_conjunctions): - if len(x.prompts)>0: - flattened_prompts.append(x.prompts[0]) - weights.append(weighted_subprompts[i][1]) - return Conjunction([Blend(prompts=flattened_prompts, weights=weights, normalize_weights=not skip_normalize)]) - -def split_weighted_subprompts(text, skip_normalize=False) -> list: - """ - Legacy blend parsing. - - grabs all text up to the first occurrence of ':' - uses the grabbed text as a sub-prompt, and takes the value following ':' as weight - if ':' has no value defined, defaults to 1.0 - repeats until no text remaining - """ - prompt_parser = re.compile( - """ - (?P # capture group for 'prompt' - (?:\\\:|[^:])+ # match one or more non ':' characters or escaped colons '\:' - ) # end 'prompt' - (?: # non-capture group - :+ # match one or more ':' characters - (?P # capture group for 'weight' - -?\d+(?:\.\d+)? # match positive or negative integer or decimal number - )? # end weight capture group, make optional - \s* # strip spaces after weight - | # OR - $ # else, if no ':' then match end of line - ) # end non-capture group - """, - re.VERBOSE, - ) - parsed_prompts = [ - (match.group("prompt").replace("\\:", ":"), float(match.group("weight") or 1)) - for match in re.finditer(prompt_parser, text) - ] - if len(parsed_prompts) == 0: - return [] - if skip_normalize: - return parsed_prompts - weight_sum = sum(map(lambda x: x[1], parsed_prompts)) - if weight_sum == 0: - logger.warning( - "Subprompt weights add up to zero. Discarding and using even weights instead." 
- ) - equal_weight = 1 / max(len(parsed_prompts), 1) - return [(x[0], equal_weight) for x in parsed_prompts] - return [(x[0], x[1] / weight_sum) for x in parsed_prompts] diff --git a/invokeai/backend/stable_diffusion/__init__.py b/invokeai/backend/stable_diffusion/__init__.py index 55333d35898..ff47bd5f653 100644 --- a/invokeai/backend/stable_diffusion/__init__.py +++ b/invokeai/backend/stable_diffusion/__init__.py @@ -10,4 +10,3 @@ from .diffusion import InvokeAIDiffuserComponent from .diffusion.cross_attention_map_saving import AttentionMapSaver from .diffusion.shared_invokeai_diffusion import PostprocessingSettings -from .textual_inversion_manager import TextualInversionManager diff --git a/invokeai/backend/stable_diffusion/diffusers_pipeline.py b/invokeai/backend/stable_diffusion/diffusers_pipeline.py index 2922238af91..798f398ed63 100644 --- a/invokeai/backend/stable_diffusion/diffusers_pipeline.py +++ b/invokeai/backend/stable_diffusion/diffusers_pipeline.py @@ -16,7 +16,6 @@ import psutil import torch import torchvision.transforms as T -from compel import EmbeddingsProvider from diffusers.models import AutoencoderKL, UNet2DConditionModel from diffusers.models.controlnet import ControlNetModel, ControlNetOutput from diffusers.pipelines.stable_diffusion import StableDiffusionPipelineOutput @@ -48,7 +47,6 @@ PostprocessingSettings, ) from .offloading import FullyLoadedModelGroup, LazilyLoadedModelGroup, ModelGroup -from .textual_inversion_manager import TextualInversionManager @dataclass class PipelineIntermediateState: @@ -344,18 +342,6 @@ def __init__( self.invokeai_diffuser = InvokeAIDiffuserComponent( self.unet, self._unet_forward, is_running_diffusers=True ) - use_full_precision = precision == "float32" or precision == "autocast" - self.textual_inversion_manager = TextualInversionManager( - tokenizer=self.tokenizer, - text_encoder=self.text_encoder, - full_precision=use_full_precision, - ) - # InvokeAI's interface for text embeddings and whatnot - self.embeddings_provider = EmbeddingsProvider( - tokenizer=self.tokenizer, - text_encoder=self.text_encoder, - textual_inversion_manager=self.textual_inversion_manager, - ) self._model_group = FullyLoadedModelGroup(execution_device or self.unet.device) self._model_group.install(*self._submodels) @@ -405,50 +391,6 @@ def _adjust_memory_efficient_attention(self, latents: torch.Tensor): else: self.disable_attention_slicing() - def enable_offload_submodels(self, device: torch.device): - """ - Offload each submodel when it's not in use. - - Useful for low-vRAM situations where the size of the model in memory is a big chunk of - the total available resource, and you want to free up as much for inference as possible. - - This requires more moving parts and may add some delay as the U-Net is swapped out for the - VAE and vice-versa. - """ - models = self._submodels - if self._model_group is not None: - self._model_group.uninstall(*models) - group = LazilyLoadedModelGroup(device) - group.install(*models) - self._model_group = group - - def disable_offload_submodels(self): - """ - Leave all submodels loaded. - - Appropriate for cases where the size of the model in memory is small compared to the memory - required for inference. Avoids the delay and complexity of shuffling the submodels to and - from the GPU. 
- """ - models = self._submodels - if self._model_group is not None: - self._model_group.uninstall(*models) - group = FullyLoadedModelGroup(self._model_group.execution_device) - group.install(*models) - self._model_group = group - - def offload_all(self): - """Offload all this pipeline's models to CPU.""" - self._model_group.offload_current() - - def ready(self): - """ - Ready this pipeline's models. - - i.e. preload them to the GPU if appropriate. - """ - self._model_group.ready() - def to(self, torch_device: Optional[Union[str, torch.device]] = None, silence_dtype_warnings=False): # overridden method; types match the superclass. if torch_device is None: @@ -992,25 +934,6 @@ def run_safety_checker(self, image, device=None, dtype=None): device = self._model_group.device_for(self.safety_checker) return super().run_safety_checker(image, device, dtype) - @torch.inference_mode() - def get_learned_conditioning( - self, c: List[List[str]], *, return_tokens=True, fragment_weights=None - ): - """ - Compatibility function for invokeai.models.diffusion.ddpm.LatentDiffusion. - """ - return self.embeddings_provider.get_embeddings_for_weighted_prompt_fragments( - text_batch=c, - fragment_weights_batch=fragment_weights, - should_return_tokens=return_tokens, - device=self._model_group.device_for(self.unet), - ) - - @property - def channels(self) -> int: - """Compatible with DiffusionWrapper""" - return self.unet.config.in_channels - def decode_latents(self, latents): # Explicit call to get the vae loaded, since `decode` isn't the forward method. self._model_group.load(self.vae) diff --git a/invokeai/backend/stable_diffusion/textual_inversion_manager.py b/invokeai/backend/stable_diffusion/textual_inversion_manager.py deleted file mode 100644 index 9476c12dc51..00000000000 --- a/invokeai/backend/stable_diffusion/textual_inversion_manager.py +++ /dev/null @@ -1,429 +0,0 @@ -import traceback -from dataclasses import dataclass -from pathlib import Path -from typing import Optional, Union, List - -import safetensors.torch -import torch - -from compel.embeddings_provider import BaseTextualInversionManager -from picklescan.scanner import scan_file_path -from transformers import CLIPTextModel, CLIPTokenizer - -import invokeai.backend.util.logging as logger -from .concepts_lib import HuggingFaceConceptsLibrary - -@dataclass -class EmbeddingInfo: - name: str - embedding: torch.Tensor - num_vectors_per_token: int - token_dim: int - trained_steps: int = None - trained_model_name: str = None - trained_model_checksum: str = None - -@dataclass -class TextualInversion: - trigger_string: str - embedding: torch.Tensor - trigger_token_id: Optional[int] = None - pad_token_ids: Optional[list[int]] = None - - @property - def embedding_vector_length(self) -> int: - return self.embedding.shape[0] - - -class TextualInversionManager(BaseTextualInversionManager): - def __init__( - self, - tokenizer: CLIPTokenizer, - text_encoder: CLIPTextModel, - full_precision: bool = True, - ): - self.tokenizer = tokenizer - self.text_encoder = text_encoder - self.full_precision = full_precision - self.hf_concepts_library = HuggingFaceConceptsLibrary() - self.trigger_to_sourcefile = dict() - default_textual_inversions: list[TextualInversion] = [] - self.textual_inversions = default_textual_inversions - - def load_huggingface_concepts(self, concepts: list[str]): - for concept_name in concepts: - if concept_name in self.hf_concepts_library.concepts_loaded: - continue - trigger = self.hf_concepts_library.concept_to_trigger(concept_name) - if ( - 
self.has_textual_inversion_for_trigger_string(trigger) - or self.has_textual_inversion_for_trigger_string(concept_name) - or self.has_textual_inversion_for_trigger_string(f"<{concept_name}>") - ): # in case a token with literal angle brackets encountered - logger.info(f"Loaded local embedding for trigger {concept_name}") - continue - bin_file = self.hf_concepts_library.get_concept_model_path(concept_name) - if not bin_file: - continue - logger.info(f"Loaded remote embedding for trigger {concept_name}") - self.load_textual_inversion(bin_file) - self.hf_concepts_library.concepts_loaded[concept_name] = True - - def get_all_trigger_strings(self) -> list[str]: - return [ti.trigger_string for ti in self.textual_inversions] - - def load_textual_inversion( - self, ckpt_path: Union[str, Path], defer_injecting_tokens: bool = False - ): - ckpt_path = Path(ckpt_path) - - if not ckpt_path.is_file(): - return - - if str(ckpt_path).endswith(".DS_Store"): - return - - embedding_list = self._parse_embedding(str(ckpt_path)) - for embedding_info in embedding_list: - if (self.text_encoder.get_input_embeddings().weight.data[0].shape[0] != embedding_info.token_dim): - logger.warning( - f"Notice: {ckpt_path.parents[0].name}/{ckpt_path.name} was trained on a model with an incompatible token dimension: {self.text_encoder.get_input_embeddings().weight.data[0].shape[0]} vs {embedding_info.token_dim}." - ) - continue - - # Resolve the situation in which an earlier embedding has claimed the same - # trigger string. We replace the trigger with '', as we used to. - trigger_str = embedding_info.name - sourcefile = ( - f"{ckpt_path.parent.name}/{ckpt_path.name}" - if ckpt_path.name == "learned_embeds.bin" - else ckpt_path.name - ) - - if trigger_str in self.trigger_to_sourcefile: - replacement_trigger_str = ( - f"<{ckpt_path.parent.name}>" - if ckpt_path.name == "learned_embeds.bin" - else f"<{ckpt_path.stem}>" - ) - logger.info( - f"{sourcefile}: Trigger token '{trigger_str}' is already claimed by '{self.trigger_to_sourcefile[trigger_str]}'. Trigger this concept with {replacement_trigger_str}" - ) - trigger_str = replacement_trigger_str - - try: - self._add_textual_inversion( - trigger_str, - embedding_info.embedding, - defer_injecting_tokens=defer_injecting_tokens, - ) - # remember which source file claims this trigger - self.trigger_to_sourcefile[trigger_str] = sourcefile - - except ValueError as e: - logger.debug(f'Ignoring incompatible embedding {embedding_info["name"]}') - logger.debug(f"The error was {str(e)}") - - def _add_textual_inversion( - self, trigger_str, embedding, defer_injecting_tokens=False - ) -> Optional[TextualInversion]: - """ - Add a textual inversion to be recognised. - :param trigger_str: The trigger text in the prompt that activates this textual inversion. If unknown to the embedder's tokenizer, will be added. - :param embedding: The actual embedding data that will be inserted into the conditioning at the point where the token_str appears. - :return: The token id for the added embedding, either existing or newly-added. 
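Within the deleted manager, load_textual_inversion resolved trigger-name collisions by falling back to a trigger derived from the embedding's file or folder name. That renaming rule, in isolation and with illustrative names (not the deleted implementation), looks roughly like this:

from pathlib import Path

def resolve_trigger(trigger: str, ckpt_path: Path, claimed: dict[str, str]) -> str:
    """Return a unique trigger string for an embedding file, renaming on collision."""
    if trigger not in claimed:
        return trigger
    # diffusers-style folders store the tensor as learned_embeds.bin, so the
    # parent directory name is the meaningful one; otherwise use the file stem.
    if ckpt_path.name == "learned_embeds.bin":
        return f"<{ckpt_path.parent.name}>"
    return f"<{ckpt_path.stem}>"

# resolve_trigger("easynegative", Path("embeddings/easynegative.safetensors"),
#                 claimed={"easynegative": "other_file.pt"}) -> "<easynegative>"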
- """ - if trigger_str in [ti.trigger_string for ti in self.textual_inversions]: - logger.warning( - f"TextualInversionManager refusing to overwrite already-loaded token '{trigger_str}'" - ) - return - if not self.full_precision: - embedding = embedding.half() - if len(embedding.shape) == 1: - embedding = embedding.unsqueeze(0) - elif len(embedding.shape) > 2: - raise ValueError( - f"** TextualInversionManager cannot add {trigger_str} because the embedding shape {embedding.shape} is incorrect. The embedding must have shape [token_dim] or [V, token_dim] where V is vector length and token_dim is 768 for SD1 or 1280 for SD2." - ) - - try: - ti = TextualInversion(trigger_string=trigger_str, embedding=embedding) - if not defer_injecting_tokens: - self._inject_tokens_and_assign_embeddings(ti) - self.textual_inversions.append(ti) - return ti - - except ValueError as e: - if str(e).startswith("Warning"): - logger.warning(f"{str(e)}") - else: - traceback.print_exc() - logger.error( - f"TextualInversionManager was unable to add a textual inversion with trigger string {trigger_str}." - ) - raise - - def _inject_tokens_and_assign_embeddings(self, ti: TextualInversion) -> int: - if ti.trigger_token_id is not None: - raise ValueError( - f"Tokens already injected for textual inversion with trigger '{ti.trigger_string}'" - ) - - trigger_token_id = self._get_or_create_token_id_and_assign_embedding( - ti.trigger_string, ti.embedding[0] - ) - - if ti.embedding_vector_length > 1: - # for embeddings with vector length > 1 - pad_token_strings = [ - ti.trigger_string + "-!pad-" + str(pad_index) - for pad_index in range(1, ti.embedding_vector_length) - ] - # todo: batched UI for faster loading when vector length >2 - pad_token_ids = [ - self._get_or_create_token_id_and_assign_embedding( - pad_token_str, ti.embedding[1 + i] - ) - for (i, pad_token_str) in enumerate(pad_token_strings) - ] - else: - pad_token_ids = [] - - ti.trigger_token_id = trigger_token_id - ti.pad_token_ids = pad_token_ids - return ti.trigger_token_id - - def has_textual_inversion_for_trigger_string(self, trigger_string: str) -> bool: - try: - ti = self.get_textual_inversion_for_trigger_string(trigger_string) - return ti is not None - except StopIteration: - return False - - def get_textual_inversion_for_trigger_string( - self, trigger_string: str - ) -> TextualInversion: - return next( - ti for ti in self.textual_inversions if ti.trigger_string == trigger_string - ) - - def get_textual_inversion_for_token_id(self, token_id: int) -> TextualInversion: - return next( - ti for ti in self.textual_inversions if ti.trigger_token_id == token_id - ) - - def create_deferred_token_ids_for_any_trigger_terms( - self, prompt_string: str - ) -> list[int]: - injected_token_ids = [] - for ti in self.textual_inversions: - if ti.trigger_token_id is None and ti.trigger_string in prompt_string: - if ti.embedding_vector_length > 1: - logger.info( - f"Preparing tokens for textual inversion {ti.trigger_string}..." 
- ) - try: - self._inject_tokens_and_assign_embeddings(ti) - except ValueError as e: - logger.debug( - f"Ignoring incompatible embedding trigger {ti.trigger_string}" - ) - logger.debug(f"The error was {str(e)}") - continue - injected_token_ids.append(ti.trigger_token_id) - injected_token_ids.extend(ti.pad_token_ids) - return injected_token_ids - - def expand_textual_inversion_token_ids_if_necessary( - self, prompt_token_ids: list[int] - ) -> list[int]: - """ - Insert padding tokens as necessary into the passed-in list of token ids to match any textual inversions it includes. - - :param prompt_token_ids: The prompt as a list of token ids (`int`s). Should not include bos and eos markers. - :return: The prompt token ids with any necessary padding to account for textual inversions inserted. May be too - long - caller is responsible for prepending/appending eos and bos token ids, and truncating if necessary. - """ - if len(prompt_token_ids) == 0: - return prompt_token_ids - - if prompt_token_ids[0] == self.tokenizer.bos_token_id: - raise ValueError("prompt_token_ids must not start with bos_token_id") - if prompt_token_ids[-1] == self.tokenizer.eos_token_id: - raise ValueError("prompt_token_ids must not end with eos_token_id") - textual_inversion_trigger_token_ids = [ - ti.trigger_token_id for ti in self.textual_inversions - ] - prompt_token_ids = prompt_token_ids.copy() - for i, token_id in reversed(list(enumerate(prompt_token_ids))): - if token_id in textual_inversion_trigger_token_ids: - textual_inversion = next( - ti - for ti in self.textual_inversions - if ti.trigger_token_id == token_id - ) - for pad_idx in range(0, textual_inversion.embedding_vector_length - 1): - prompt_token_ids.insert( - i + pad_idx + 1, textual_inversion.pad_token_ids[pad_idx] - ) - - return prompt_token_ids - - def _get_or_create_token_id_and_assign_embedding( - self, token_str: str, embedding: torch.Tensor - ) -> int: - if len(embedding.shape) != 1: - raise ValueError( - "Embedding has incorrect shape - must be [token_dim] where token_dim is 768 for SD1 or 1280 for SD2" - ) - existing_token_id = self.tokenizer.convert_tokens_to_ids(token_str) - if existing_token_id == self.tokenizer.unk_token_id: - num_tokens_added = self.tokenizer.add_tokens(token_str) - current_embeddings = self.text_encoder.resize_token_embeddings(None) - current_token_count = current_embeddings.num_embeddings - new_token_count = current_token_count + num_tokens_added - # the following call is slow - todo make batched for better performance with vector length >1 - self.text_encoder.resize_token_embeddings(new_token_count) - - token_id = self.tokenizer.convert_tokens_to_ids(token_str) - if token_id == self.tokenizer.unk_token_id: - raise RuntimeError(f"Unable to find token id for token '{token_str}'") - if ( - self.text_encoder.get_input_embeddings().weight.data[token_id].shape - != embedding.shape - ): - raise ValueError( - f"Warning. Cannot load embedding for {token_str}. It was trained on a model with token dimension {embedding.shape[0]}, but the current model has token dimension {self.text_encoder.get_input_embeddings().weight.data[token_id].shape[0]}." 
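expand_textual_inversion_token_ids_if_necessary, deleted above, was the hook compel called to splice pad token ids in directly after each multi-vector trigger token. A minimal stand-alone sketch of that expansion, using a plain dict in place of the manager's TextualInversion records (names and ids are illustrative):

def expand_token_ids(prompt_token_ids: list[int], pads_by_trigger_id: dict[int, list[int]]) -> list[int]:
    """Insert the pad token ids of any multi-vector trigger directly after that trigger."""
    expanded: list[int] = []
    for token_id in prompt_token_ids:
        expanded.append(token_id)
        expanded.extend(pads_by_trigger_id.get(token_id, []))
    return expanded

# With trigger id 49408 carrying two pad ids:
# expand_token_ids([320, 49408, 525], {49408: [49409, 49410]})
# -> [320, 49408, 49409, 49410, 525]

The deleted method built the same result by in-place insertion over a reversed index scan; the effect is identical.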
- ) - self.text_encoder.get_input_embeddings().weight.data[token_id] = embedding - - return token_id - - - def _parse_embedding(self, embedding_file: str)->List[EmbeddingInfo]: - suffix = Path(embedding_file).suffix - try: - if suffix in [".pt",".ckpt",".bin"]: - scan_result = scan_file_path(embedding_file) - if scan_result.infected_files > 0: - logger.critical( - f"Security Issues Found in Model: {scan_result.issues_count}" - ) - logger.critical("For your safety, InvokeAI will not load this embed.") - return list() - ckpt = torch.load(embedding_file,map_location="cpu") - else: - ckpt = safetensors.torch.load_file(embedding_file) - except Exception as e: - logger.warning(f"Notice: unrecognized embedding file format: {embedding_file}: {e}") - return list() - - # try to figure out what kind of embedding file it is and parse accordingly - keys = list(ckpt.keys()) - if all(x in keys for x in ['string_to_token','string_to_param','name','step']): - return self._parse_embedding_v1(ckpt, embedding_file) # example rem_rezero.pt - - elif all(x in keys for x in ['string_to_token','string_to_param']): - return self._parse_embedding_v2(ckpt, embedding_file) # example midj-strong.pt - - elif 'emb_params' in keys: - return self._parse_embedding_v3(ckpt, embedding_file) # example easynegative.safetensors - - else: - return self._parse_embedding_v4(ckpt, embedding_file) # usually a '.bin' file - - def _parse_embedding_v1(self, embedding_ckpt: dict, file_path: str)->List[EmbeddingInfo]: - basename = Path(file_path).stem - logger.debug(f'Loading v1 embedding file: {basename}') - - embeddings = list() - token_counter = -1 - for token,embedding in embedding_ckpt["string_to_param"].items(): - if token_counter < 0: - trigger = embedding_ckpt["name"] - elif token_counter == 0: - trigger = '' - else: - trigger = f'<{basename}-{int(token_counter:=token_counter)}>' - token_counter += 1 - embedding_info = EmbeddingInfo( - name = trigger, - embedding = embedding, - num_vectors_per_token = embedding.size()[0], - token_dim = embedding.size()[1], - trained_steps = embedding_ckpt["step"], - trained_model_name = embedding_ckpt["sd_checkpoint_name"], - trained_model_checksum = embedding_ckpt["sd_checkpoint"] - ) - embeddings.append(embedding_info) - return embeddings - - def _parse_embedding_v2 ( - self, embedding_ckpt: dict, file_path: str - ) -> List[EmbeddingInfo]: - """ - This handles embedding .pt file variant #2. - """ - basename = Path(file_path).stem - logger.debug(f'Loading v2 embedding file: {basename}') - embeddings = list() - - if isinstance( - list(embedding_ckpt["string_to_token"].values())[0], torch.Tensor - ): - token_counter = 0 - for token,embedding in embedding_ckpt["string_to_param"].items(): - trigger = token if token != '*' \ - else f'<{basename}>' if token_counter == 0 \ - else f'<{basename}-{int(token_counter:=token_counter+1)}>' - embedding_info = EmbeddingInfo( - name = trigger, - embedding = embedding, - num_vectors_per_token = embedding.size()[0], - token_dim = embedding.size()[1], - ) - embeddings.append(embedding_info) - else: - logger.warning(f"{basename}: Unrecognized embedding format") - - return embeddings - - def _parse_embedding_v3(self, embedding_ckpt: dict, file_path: str)->List[EmbeddingInfo]: - """ - Parse 'version 3' of the .pt textual inversion embedding files. 
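The deleted _parse_embedding dispatcher chose a parser purely from the checkpoint's top-level keys. That detection logic can be summarised as follows (a sketch of the rule, not the removed code; the returned labels are illustrative):

def detect_embedding_format(state_dict: dict) -> str:
    keys = set(state_dict.keys())
    if {"string_to_token", "string_to_param", "name", "step"} <= keys:
        return "v1"   # e.g. rem_rezero.pt
    if {"string_to_token", "string_to_param"} <= keys:
        return "v2"   # e.g. midj-strong.pt
    if "emb_params" in keys:
        return "v3"   # e.g. easynegative.safetensors
    return "v4"       # usually a diffusers-trained .bin: {trigger: tensor}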
- """ - basename = Path(file_path).stem - logger.debug(f'Loading v3 embedding file: {basename}') - embedding = embedding_ckpt['emb_params'] - embedding_info = EmbeddingInfo( - name = f'<{basename}>', - embedding = embedding, - num_vectors_per_token = embedding.size()[0], - token_dim = embedding.size()[1], - ) - return [embedding_info] - - def _parse_embedding_v4(self, embedding_ckpt: dict, filepath: str)->List[EmbeddingInfo]: - """ - Parse 'version 4' of the textual inversion embedding files. This one - is usually associated with .bin files trained by HuggingFace diffusers. - """ - basename = Path(filepath).stem - short_path = Path(filepath).parents[0].name+'/'+Path(filepath).name - - logger.debug(f'Loading v4 embedding file: {short_path}') - - embeddings = list() - if list(embedding_ckpt.keys()) == 0: - logger.warning(f"Invalid embeddings file: {short_path}") - else: - for token,embedding in embedding_ckpt.items(): - embedding_info = EmbeddingInfo( - name = token or f"<{basename}>", - embedding = embedding, - num_vectors_per_token = 1, # All Concepts seem to default to 1 - token_dim = embedding.size()[0], - ) - embeddings.append(embedding_info) - return embeddings