diff --git a/examples/prompt2im_hypernet.py b/examples/prompt2im_hypernet.py index c580d70..af8450b 100755 --- a/examples/prompt2im_hypernet.py +++ b/examples/prompt2im_hypernet.py @@ -10,8 +10,8 @@ path = "./models/icb_diffusers_final/" path_hypernet = './models/hypernetworks/digitalben.pt' -pipe = Prompt2ImPipe(path, lpw=True, scheduler='DPMSolverMultistepScheduler') -pipe.setup(width=512, height=512, guidance_scale=5.5, clip_skip=1) +pipe = Prompt2ImPipe(path, lpw=True) +pipe.setup(width=512, height=512, guidance_scale=5.5, clip_skip=1, scheduler='DPMSolverMultistepScheduler') pipe.add_hypernet(path_hypernet, 0.65) # pipe.clear_hypernets() diff --git a/multigen/pipes.py b/multigen/pipes.py index 5ec8e51..3f9b9ef 100755 --- a/multigen/pipes.py +++ b/multigen/pipes.py @@ -1,4 +1,6 @@ import importlib + +import PIL import torch from PIL import Image @@ -124,6 +126,7 @@ def setup(self, steps=50, **args): # TODO? add scheduler to config? self.try_set_scheduler(dict(scheduler=args['scheduler'])) + class Prompt2ImPipe(BasePipe): def __init__(self, model_id: str, @@ -143,8 +146,8 @@ def setup(self, width=768, height=768, guidance_scale=7.5, **args): "guidance_scale": guidance_scale }) - def gen(self, inputs): - inputs = {**inputs} + def gen(self, inputs: dict) -> PIL.Image.Image: + inputs = inputs.copy() inputs.update(self.pipe_params) # allow for scheduler overwrite self.try_set_scheduler(inputs) @@ -158,29 +161,28 @@ def __init__(self, model_id, pipe: Optional[StableDiffusionImg2ImgPipeline] = No super().__init__(model_id=model_id, sd_pipe_class=StableDiffusionImg2ImgPipeline, pipe=pipe, **args) self._input_image = None - def setup(self, fimage, image=None, strength=0.75, gscale=7.5, scale=None, **args): + def setup(self, img_path, image=None, strength=0.75, gscale=7.5, scale=None, **args): super().setup(**args) - self.fname = fimage - self._input_image = Image.open(fimage).convert("RGB") if image is None else image + self.img_path = img_path + self._input_image = Image.open(img_path).convert("RGB") if image is None else image if scale is not None: if not isinstance(scale, list): scale = [8 * (int(self._input_image.size[i] * scale) // 8) for i in range(2)] self._input_image = self._input_image.resize((scale[0], scale[1])) self.pipe_params.update({ "strength": strength, - "guidance_scale": gscale - }) + "guidance_scale": gscale}) def get_config(self): cfg = super().get_config() cfg.update({ - "source_image": self.fname, + "source_image": self.img_path, }) cfg.update(self.pipe_params) return cfg - def gen(self, inputs): - inputs = {**inputs} + def gen(self, inputs: dict) -> PIL.Image.Image: + inputs = inputs.copy() inputs.update(self.pipe_params) inputs.update({"image": self._input_image}) self.try_set_scheduler(inputs) @@ -189,6 +191,9 @@ def gen(self, inputs): class Cond2ImPipe(BasePipe): + """ + Base class for ControlNet pipelines + """ # TODO: set path cpath = "./models-cn/" @@ -201,7 +206,7 @@ class Cond2ImPipe(BasePipe): "depth": "control_v11f1p_sd15_depth", "inpaint": "control_v11p_sd15_inpaint" } - cscalem = { + default_cs = { "canny": 0.75, "pose": 1.0, "ip2p": 0.5, @@ -215,23 +220,23 @@ def __init__(self, model_id, pipe: Optional[StableDiffusionControlNetPipeline] = ctypes=["soft"], **args): if not isinstance(ctypes, list): ctypes = [ctypes] - self.ctypes = ctypes + self.control_types = ctypes self._condition_image = None dtype = torch.float16 if 'torch_type' not in args else args['torch_type'] - cnets = [ControlNetModel.from_pretrained(CIm2ImPipe.cpath+CIm2ImPipe.cmodels[c], torch_dtype=dtype) for c in ctypes] + cnets = [ControlNetModel.from_pretrained(self.cpath+self.cmodels[c], torch_dtype=dtype) for c in ctypes] super().__init__(sd_pipe_class=StableDiffusionControlNetPipeline, model_id=model_id, pipe=pipe, controlnet=cnets, **args) # FIXME: do we need to setup this specific scheduler here? # should we pass its name in setup to super? self.pipe.scheduler = UniPCMultistepScheduler.from_config(self.pipe.scheduler.config) - def setup(self, fimage, width=None, height=None, image=None, cscales=None, guess_mode=False, **args): + def setup(self, img_path, width=None, height=None, image=None, cscales=None, guess_mode=False, **args): super().setup(**args) # TODO: allow multiple input images for multiple control nets - self.fname = fimage - image = Image.open(fimage) if image is None else image + self.img_path = img_path + image = Image.open(img_path) if image is None else image self._condition_image = [image] if cscales is None: - cscales = [CIm2ImPipe.cscalem[c] for c in self.ctypes] + cscales = [self.default_cs[c] for c in self.control_types] self.pipe_params.update({ "width": image.size[0] if width is None else width, "height": image.size[1] if height is None else height, @@ -243,8 +248,8 @@ def setup(self, fimage, width=None, height=None, image=None, cscales=None, guess def get_config(self): cfg = super().get_config() cfg.update({ - "source_image": self.fname, - "control_type": self.ctypes + "source_image": self.img_path, + "control_type": self.control_types }) cfg.update(self.pipe_params) return cfg @@ -258,7 +263,9 @@ def gen(self, inputs): class CIm2ImPipe(Cond2ImPipe): - + """ + ControlNet pipeline with conditional image generation + """ def __init__(self, model_id, pipe: Optional[StableDiffusionControlNetPipeline] = None, ctypes=["soft"], **args): super().__init__(model_id=model_id, pipe=pipe, ctypes=ctypes, **args) @@ -280,15 +287,15 @@ def __init__(self, model_id, pipe: Optional[StableDiffusionControlNetPipeline] = self.dprocessor = DPTImageProcessor.from_pretrained("./models-other/dpt-large") self.dmodel = DPTForDepthEstimation.from_pretrained("./models-other/dpt-large") - def setup(self, fimage, width=None, height=None, image=None, cscales=None, guess_mode=False, **args): - super().setup(fimage, width, height, image, cscales, guess_mode, **args) + def setup(self, img_path, width=None, height=None, image=None, cscales=None, guess_mode=False, **args): + super().setup(img_path, width, height, image, cscales, guess_mode, **args) # Additionally process the input image # REM: CIm2ImPipe expects only one image, which can be the base for multiple control images self._condition_image = self._proc_cimg(np.asarray(self._condition_image[0])) def _proc_cimg(self, oriImg): condition_image = [] - for c in self.ctypes: + for c in self.control_types: if c == "canny": image = canny_processor(oriImg) condition_image += [Image.fromarray(image)] @@ -336,11 +343,11 @@ def __init__(self, model_id, pipe: Optional[StableDiffusionControlNetPipeline] = # should we pass its name in setup to super? self.pipe.scheduler = DDIMScheduler.from_config(self.pipe.scheduler.config) - def setup(self, fimage, mask_image, image=None, **args): + def setup(self, img_path, mask_image, image=None, **args): super().setup(**args) # TODO: allow multiple input images for multiple control nets - self.fname = fimage - self._init_image = Image.open(fimage) if image is None else image + self.img_path = img_path + self._init_image = Image.open(img_path) if image is None else image self._mask_image = mask_image self._control_image = self._make_inpaint_condition(self._init_image, mask_image) # self._condition_image = [image] diff --git a/multigen/sessions.py b/multigen/sessions.py index 540667f..67dcb71 100755 --- a/multigen/sessions.py +++ b/multigen/sessions.py @@ -3,15 +3,16 @@ import json from . import util from .prompting import Cfgen +from .pipes import BasePipe class GenSession: - def __init__(self, session_dir, pipe, config: Cfgen, name_prefix=""): + def __init__(self, session_dir: str, pipe: BasePipe, config: Cfgen, name_prefix=""): self.session_dir = session_dir - self.pipe = pipe - self.model_id = pipe._model_id - self.confg = config + self.pipe: BasePipe = pipe + self.model_id: str = pipe._model_id + self.confg: Cfgen = config self.last_conf = None self.name_prefix = name_prefix