import gc
from math import ceil
from typing import Any, List
import random
import numpy as np
import torch
import logging
from backend.device import is_openvino_device
from backend.lora import load_lora_weight
from backend.controlnet import (
    load_controlnet_adapters,
    update_controlnet_arguments,
)
from backend.models.lcmdiffusion_setting import (
    DiffusionTask,
    LCMDiffusionSetting,
    LCMLora,
)
from backend.openvino.pipelines import (
    get_ov_image_to_image_pipeline,
    get_ov_text_to_image_pipeline,
    ov_load_taesd,
)
from backend.pipelines.lcm import (
    get_image_to_image_pipeline,
    get_lcm_model_pipeline,
    load_taesd,
)
from backend.pipelines.lcm_lora import get_lcm_lora_pipeline
from constants import DEVICE, GGUF_THREADS
from diffusers import LCMScheduler
from image_ops import resize_pil_image
from backend.openvino.flux_pipeline import get_flux_pipeline
from backend.openvino.ov_hc_stablediffusion_pipeline import OvHcLatentConsistency
from backend.gguf.gguf_diffusion import (
    GGUFDiffusion,
    ModelConfig,
    Txt2ImgConfig,
    SampleMethod,
)
from paths import get_app_path
from pprint import pprint

try:
    # Support for token merging; keeping it optional for now
    import tomesd
except ImportError:
    print("tomesd library unavailable; disabling token merging support")
    tomesd = None


class LCMTextToImage:
    def __init__(
        self,
        device: str = "cpu",
    ) -> None:
        self.pipeline = None
        self.use_openvino = False
        self.device = ""
        self.previous_model_id = None
        self.previous_use_tae_sd = False
        self.previous_use_lcm_lora = False
        self.previous_ov_model_id = ""
        self.previous_token_merging = 0.0
        self.previous_safety_checker = False
        self.previous_use_openvino = False
        # Initialize the remaining comparison state referenced in init() so
        # every "previous_*" attribute exists before the first comparison
        self.previous_lcm_lora_base_id = None
        self.previous_lcm_lora_id = None
        self.previous_task_type = None
        self.img_to_img_pipeline = None
        self.is_openvino_init = False
        self.previous_lora = None
        self.task_type = DiffusionTask.text_to_image
        self.previous_use_gguf_model = False
        self.previous_gguf_model = None
        self.torch_data_type = (
            torch.float32 if is_openvino_device() or DEVICE == "mps" else torch.float16
        )
        self.ov_model_id = None
        print(f"Torch datatype : {self.torch_data_type}")

    def _pipeline_to_device(self):
        print(f"Pipeline device : {DEVICE}")
        print(f"Pipeline dtype : {self.torch_data_type}")
        self.pipeline.to(
            torch_device=DEVICE,
            torch_dtype=self.torch_data_type,
        )

    def _add_freeu(self):
        pipeline_class = self.pipeline.__class__.__name__
        if isinstance(self.pipeline.scheduler, LCMScheduler):
            if pipeline_class == "StableDiffusionPipeline":
                print("Add FreeU - SD")
                self.pipeline.enable_freeu(
                    s1=0.9,
                    s2=0.2,
                    b1=1.2,
                    b2=1.4,
                )
            elif pipeline_class == "StableDiffusionXLPipeline":
                print("Add FreeU - SDXL")
                self.pipeline.enable_freeu(
                    s1=0.6,
                    s2=0.4,
                    b1=1.1,
                    b2=1.2,
                )

    def _enable_vae_tiling(self):
        self.pipeline.vae.enable_tiling()

    def _update_lcm_scheduler_params(self):
        if isinstance(self.pipeline.scheduler, LCMScheduler):
            self.pipeline.scheduler = LCMScheduler.from_config(
                self.pipeline.scheduler.config,
                beta_start=0.001,
                beta_end=0.01,
            )

    def _is_hetero_pipeline(self) -> bool:
        return "square" in self.ov_model_id.lower()

    def _load_ov_hetero_pipeline(self):
        print("Loading Heterogeneous Compute pipeline")
        if DEVICE.upper() == "NPU":
            device = ["NPU", "NPU", "NPU"]
            self.pipeline = OvHcLatentConsistency(self.ov_model_id, device)
        else:
            self.pipeline = OvHcLatentConsistency(self.ov_model_id)
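
    # Note (assumption): the three-element device list above presumably maps
    # the pipeline's three compiled stages (e.g. text encoder, UNet, VAE
    # decoder) onto the NPU; OvHcLatentConsistency's constructor defines the
    # exact stage-to-device mapping.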

    def _generate_images_hetero_compute(
        self,
        lcm_diffusion_setting: LCMDiffusionSetting,
    ):
        print("Using OpenVINO")
        if lcm_diffusion_setting.diffusion_task == DiffusionTask.text_to_image.value:
            return [
                self.pipeline.generate(
                    prompt=lcm_diffusion_setting.prompt,
                    neg_prompt=lcm_diffusion_setting.negative_prompt,
                    init_image=None,
                    strength=1.0,
                    num_inference_steps=lcm_diffusion_setting.inference_steps,
                )
            ]
        else:
            return [
                self.pipeline.generate(
                    prompt=lcm_diffusion_setting.prompt,
                    neg_prompt=lcm_diffusion_setting.negative_prompt,
                    init_image=lcm_diffusion_setting.init_image,
                    strength=lcm_diffusion_setting.strength,
                    num_inference_steps=lcm_diffusion_setting.inference_steps,
                )
            ]

    def _is_valid_mode(
        self,
        modes: List,
    ) -> bool:
        # Valid states: exactly one mode enabled, or no mode enabled at all
        return modes.count(True) == 1 or modes.count(False) == 3

    def _validate_mode(
        self,
        modes: List,
    ) -> None:
        if not self._is_valid_mode(modes):
            raise ValueError("Invalid mode; delete configs/settings.yaml and retry!")
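
    # Example (illustrative values for [use_gguf_model, use_openvino, use_lcm_lora]):
    #   [False, False, False] -> valid (plain LCM pipeline)
    #   [False, True,  False] -> valid (OpenVINO only)
    #   [True,  True,  False] -> invalid (two acceleration modes at once)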

    def init(
        self,
        device: str = "cpu",
        lcm_diffusion_setting: LCMDiffusionSetting = LCMDiffusionSetting(),
    ) -> None:
        # Mode validation: either LCM-LoRA, OpenVINO, or GGUF
        modes = [
            lcm_diffusion_setting.use_gguf_model,
            lcm_diffusion_setting.use_openvino,
            lcm_diffusion_setting.use_lcm_lora,
        ]
        self._validate_mode(modes)
        self.device = device
        self.use_openvino = lcm_diffusion_setting.use_openvino
        model_id = lcm_diffusion_setting.lcm_model_id
        use_local_model = lcm_diffusion_setting.use_offline_model
        use_tiny_auto_encoder = lcm_diffusion_setting.use_tiny_auto_encoder
        use_lora = lcm_diffusion_setting.use_lcm_lora
        lcm_lora: LCMLora = lcm_diffusion_setting.lcm_lora
        token_merging = lcm_diffusion_setting.token_merging
        self.ov_model_id = lcm_diffusion_setting.openvino_lcm_model_id

        if lcm_diffusion_setting.diffusion_task == DiffusionTask.image_to_image.value:
            lcm_diffusion_setting.init_image = resize_pil_image(
                lcm_diffusion_setting.init_image,
                lcm_diffusion_setting.image_width,
                lcm_diffusion_setting.image_height,
            )

        if (
            self.pipeline is None
            or self.previous_model_id != model_id
            or self.previous_use_tae_sd != use_tiny_auto_encoder
            or self.previous_lcm_lora_base_id != lcm_lora.base_model_id
            or self.previous_lcm_lora_id != lcm_lora.lcm_lora_id
            or self.previous_use_lcm_lora != use_lora
            or self.previous_ov_model_id != self.ov_model_id
            or self.previous_token_merging != token_merging
            or self.previous_safety_checker != lcm_diffusion_setting.use_safety_checker
            or self.previous_use_openvino != lcm_diffusion_setting.use_openvino
            or self.previous_use_gguf_model != lcm_diffusion_setting.use_gguf_model
            or self.previous_gguf_model != lcm_diffusion_setting.gguf_model
            or (
                self.use_openvino
                and (
                    self.previous_task_type != lcm_diffusion_setting.diffusion_task
                    or self.previous_lora != lcm_diffusion_setting.lora
                )
            )
            or lcm_diffusion_setting.rebuild_pipeline
        ):
            if self.use_openvino and is_openvino_device():
                if self.pipeline:
                    del self.pipeline
                    self.pipeline = None
                    gc.collect()
                self.is_openvino_init = True
                if (
                    lcm_diffusion_setting.diffusion_task
                    == DiffusionTask.text_to_image.value
                ):
                    print(
                        f"***** Init Text to image (OpenVINO) - {self.ov_model_id} *****"
                    )
                    if "flux" in self.ov_model_id.lower():
                        print("Loading OpenVINO Flux pipeline")
                        self.pipeline = get_flux_pipeline(
                            self.ov_model_id,
                            lcm_diffusion_setting.use_tiny_auto_encoder,
                        )
                    elif self._is_hetero_pipeline():
                        self._load_ov_hetero_pipeline()
                    else:
                        self.pipeline = get_ov_text_to_image_pipeline(
                            self.ov_model_id,
                            use_local_model,
                        )
                elif (
                    lcm_diffusion_setting.diffusion_task
                    == DiffusionTask.image_to_image.value
                ):
                    if not self.pipeline and self._is_hetero_pipeline():
                        self._load_ov_hetero_pipeline()
                    else:
                        print(
                            f"***** Image to image (OpenVINO) - {self.ov_model_id} *****"
                        )
                        self.pipeline = get_ov_image_to_image_pipeline(
                            self.ov_model_id,
                            use_local_model,
                        )
            elif lcm_diffusion_setting.use_gguf_model:
                model = lcm_diffusion_setting.gguf_model.diffusion_path
                print(f"***** Init Text to image (GGUF) - {model} *****")
                # if self.pipeline:
                #     self.pipeline.terminate()
                #     del self.pipeline
                #     self.pipeline = None
                self._init_gguf_diffusion(lcm_diffusion_setting)
            else:
                if self.pipeline or self.img_to_img_pipeline:
                    self.pipeline = None
                    self.img_to_img_pipeline = None
                    gc.collect()

                controlnet_args = load_controlnet_adapters(lcm_diffusion_setting)
                if use_lora:
                    print(
                        f"***** Init LCM-LoRA pipeline - {lcm_lora.base_model_id} *****"
                    )
                    self.pipeline = get_lcm_lora_pipeline(
                        lcm_lora.base_model_id,
                        lcm_lora.lcm_lora_id,
                        use_local_model,
                        torch_data_type=self.torch_data_type,
                        pipeline_args=controlnet_args,
                    )
                else:
                    print(f"***** Init LCM Model pipeline - {model_id} *****")
                    self.pipeline = get_lcm_model_pipeline(
                        model_id,
                        use_local_model,
                        controlnet_args,
                    )

                self.img_to_img_pipeline = get_image_to_image_pipeline(self.pipeline)

                if tomesd and token_merging > 0.001:
                    print(f"***** Token Merging: {token_merging} *****")
                    tomesd.apply_patch(self.pipeline, ratio=token_merging)
                    tomesd.apply_patch(self.img_to_img_pipeline, ratio=token_merging)

            if use_tiny_auto_encoder:
                if self.use_openvino and is_openvino_device():
                    if self.pipeline.__class__.__name__ != "OVFluxPipeline":
                        print("Using Tiny Auto Encoder (OpenVINO)")
                        ov_load_taesd(
                            self.pipeline,
                            use_local_model,
                        )
                else:
                    print("Using Tiny Auto Encoder")
                    load_taesd(
                        self.pipeline,
                        use_local_model,
                        self.torch_data_type,
                    )
                    load_taesd(
                        self.img_to_img_pipeline,
                        use_local_model,
                        self.torch_data_type,
                    )

            if not self.use_openvino and not is_openvino_device():
                self._pipeline_to_device()

            if not self._is_hetero_pipeline():
                if (
                    lcm_diffusion_setting.diffusion_task
                    == DiffusionTask.image_to_image.value
                    and lcm_diffusion_setting.use_openvino
                ):
                    self.pipeline.scheduler = LCMScheduler.from_config(
                        self.pipeline.scheduler.config,
                    )
                else:
                    if not lcm_diffusion_setting.use_gguf_model:
                        self._update_lcm_scheduler_params()

            if use_lora:
                self._add_freeu()

            self.previous_model_id = model_id
            self.previous_ov_model_id = self.ov_model_id
            self.previous_use_tae_sd = use_tiny_auto_encoder
            self.previous_lcm_lora_base_id = lcm_lora.base_model_id
            self.previous_lcm_lora_id = lcm_lora.lcm_lora_id
            self.previous_use_lcm_lora = use_lora
            self.previous_token_merging = lcm_diffusion_setting.token_merging
            self.previous_safety_checker = lcm_diffusion_setting.use_safety_checker
            self.previous_use_openvino = lcm_diffusion_setting.use_openvino
            self.previous_task_type = lcm_diffusion_setting.diffusion_task
            self.previous_lora = lcm_diffusion_setting.lora.model_copy(deep=True)
            self.previous_use_gguf_model = lcm_diffusion_setting.use_gguf_model
            self.previous_gguf_model = lcm_diffusion_setting.gguf_model.model_copy(
                deep=True
            )
            lcm_diffusion_setting.rebuild_pipeline = False
            if (
                lcm_diffusion_setting.diffusion_task
                == DiffusionTask.text_to_image.value
            ):
                print(f"Pipeline : {self.pipeline}")
            elif (
                lcm_diffusion_setting.diffusion_task
                == DiffusionTask.image_to_image.value
            ):
                if self.use_openvino and is_openvino_device():
                    print(f"Pipeline : {self.pipeline}")
                else:
                    print(f"Pipeline : {self.img_to_img_pipeline}")

            if self.use_openvino:
                if lcm_diffusion_setting.lora.enabled:
                    print("Warning: LoRA models are not supported in OpenVINO mode")
            elif not lcm_diffusion_setting.use_gguf_model:
                adapters = self.pipeline.get_active_adapters()
                print(f"Active adapters : {adapters}")
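
    # Some LCM scheduler configs carry a single fixed "timesteps" entry;
    # _get_timesteps surfaces it as a one-element list for the text-to-image
    # call in generate(), or returns None so diffusers falls back to its
    # normal step schedule.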
    def _get_timesteps(self):
        time_steps = self.pipeline.scheduler.config.get("timesteps")
        time_steps_value = [int(time_steps)] if time_steps else None
        return time_steps_value

    def generate(
        self,
        lcm_diffusion_setting: LCMDiffusionSetting,
        reshape: bool = False,
    ) -> Any:
        guidance_scale = lcm_diffusion_setting.guidance_scale
        img_to_img_inference_steps = lcm_diffusion_setting.inference_steps
        check_step_value = int(
            lcm_diffusion_setting.inference_steps * lcm_diffusion_setting.strength
        )
        if (
            lcm_diffusion_setting.diffusion_task == DiffusionTask.image_to_image.value
            and check_step_value < 1
        ):
            img_to_img_inference_steps = ceil(1 / lcm_diffusion_setting.strength)
            print(
                f"Strength: {lcm_diffusion_setting.strength}, {img_to_img_inference_steps}"
            )

        pipeline_extra_args = {}

        if lcm_diffusion_setting.use_seed:
            cur_seed = lcm_diffusion_setting.seed
            # For multiple images with a fixed seed, use sequential seeds
            seeds = [
                (cur_seed + i) for i in range(lcm_diffusion_setting.number_of_images)
            ]
        else:
            seeds = [
                random.randint(0, 999999999)
                for i in range(lcm_diffusion_setting.number_of_images)
            ]

        if self.use_openvino:
            # No support for generators; try at least to ensure reproducible
            # results for single images
            np.random.seed(seeds[0])
            if self._is_hetero_pipeline():
                torch.manual_seed(seeds[0])
                lcm_diffusion_setting.seed = seeds[0]
        else:
            pipeline_extra_args["generator"] = [
                torch.Generator(device=self.device).manual_seed(s) for s in seeds
            ]

        is_openvino_pipe = lcm_diffusion_setting.use_openvino and is_openvino_device()
        if is_openvino_pipe and not self._is_hetero_pipeline():
            print("Using OpenVINO")
            if reshape and not self.is_openvino_init:
                print("Reshape and compile")
                self.pipeline.reshape(
                    batch_size=-1,
                    height=lcm_diffusion_setting.image_height,
                    width=lcm_diffusion_setting.image_width,
                    num_images_per_prompt=lcm_diffusion_setting.number_of_images,
                )
                self.pipeline.compile()

            if self.is_openvino_init:
                self.is_openvino_init = False

        if is_openvino_pipe and self._is_hetero_pipeline():
            return self._generate_images_hetero_compute(lcm_diffusion_setting)
        elif lcm_diffusion_setting.use_gguf_model:
            return self._generate_images_gguf(lcm_diffusion_setting)

        if lcm_diffusion_setting.clip_skip > 1:
            # We follow the convention that "CLIP Skip == 2" means "skip
            # the last layer", so "CLIP Skip == 1" means "no skipping"
            pipeline_extra_args["clip_skip"] = lcm_diffusion_setting.clip_skip - 1

        if not lcm_diffusion_setting.use_safety_checker:
            self.pipeline.safety_checker = None
            if (
                lcm_diffusion_setting.diffusion_task
                == DiffusionTask.image_to_image.value
                and not is_openvino_pipe
            ):
                self.img_to_img_pipeline.safety_checker = None

        if (
            not lcm_diffusion_setting.use_lcm_lora
            and not lcm_diffusion_setting.use_openvino
            and lcm_diffusion_setting.guidance_scale != 1.0
        ):
            print("Not using LCM-LoRA, so setting guidance_scale to 1.0")
            guidance_scale = 1.0

        controlnet_args = update_controlnet_arguments(lcm_diffusion_setting)
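
        # Note (assumption): the OpenVINO image-to-image call below multiplies
        # the step count by 3, presumably to offset the strength-based
        # timestep truncation of img2img pipelines (effective steps are
        # roughly steps * strength).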
        if lcm_diffusion_setting.use_openvino:
            if (
                lcm_diffusion_setting.diffusion_task
                == DiffusionTask.text_to_image.value
            ):
                result_images = self.pipeline(
                    prompt=lcm_diffusion_setting.prompt,
                    negative_prompt=lcm_diffusion_setting.negative_prompt,
                    num_inference_steps=lcm_diffusion_setting.inference_steps,
                    guidance_scale=guidance_scale,
                    width=lcm_diffusion_setting.image_width,
                    height=lcm_diffusion_setting.image_height,
                    num_images_per_prompt=lcm_diffusion_setting.number_of_images,
                ).images
            elif (
                lcm_diffusion_setting.diffusion_task
                == DiffusionTask.image_to_image.value
            ):
                result_images = self.pipeline(
                    image=lcm_diffusion_setting.init_image,
                    strength=lcm_diffusion_setting.strength,
                    prompt=lcm_diffusion_setting.prompt,
                    negative_prompt=lcm_diffusion_setting.negative_prompt,
                    num_inference_steps=img_to_img_inference_steps * 3,
                    guidance_scale=guidance_scale,
                    num_images_per_prompt=lcm_diffusion_setting.number_of_images,
                ).images
        else:
            if (
                lcm_diffusion_setting.diffusion_task
                == DiffusionTask.text_to_image.value
            ):
                result_images = self.pipeline(
                    prompt=lcm_diffusion_setting.prompt,
                    negative_prompt=lcm_diffusion_setting.negative_prompt,
                    num_inference_steps=lcm_diffusion_setting.inference_steps,
                    guidance_scale=guidance_scale,
                    width=lcm_diffusion_setting.image_width,
                    height=lcm_diffusion_setting.image_height,
                    num_images_per_prompt=lcm_diffusion_setting.number_of_images,
                    timesteps=self._get_timesteps(),
                    **pipeline_extra_args,
                    **controlnet_args,
                ).images
            elif (
                lcm_diffusion_setting.diffusion_task
                == DiffusionTask.image_to_image.value
            ):
                result_images = self.img_to_img_pipeline(
                    image=lcm_diffusion_setting.init_image,
                    strength=lcm_diffusion_setting.strength,
                    prompt=lcm_diffusion_setting.prompt,
                    negative_prompt=lcm_diffusion_setting.negative_prompt,
                    num_inference_steps=img_to_img_inference_steps,
                    guidance_scale=guidance_scale,
                    width=lcm_diffusion_setting.image_width,
                    height=lcm_diffusion_setting.image_height,
                    num_images_per_prompt=lcm_diffusion_setting.number_of_images,
                    **pipeline_extra_args,
                    **controlnet_args,
                ).images

        for i, seed in enumerate(seeds):
            result_images[i].info["image_seed"] = seed

        return result_images

    def _init_gguf_diffusion(
        self,
        lcm_diffusion_setting: LCMDiffusionSetting,
    ):
        config = ModelConfig()
        config.model_path = lcm_diffusion_setting.gguf_model.diffusion_path
        config.diffusion_model_path = lcm_diffusion_setting.gguf_model.diffusion_path
        config.clip_l_path = lcm_diffusion_setting.gguf_model.clip_path
        config.t5xxl_path = lcm_diffusion_setting.gguf_model.t5xxl_path
        config.vae_path = lcm_diffusion_setting.gguf_model.vae_path
        config.n_threads = GGUF_THREADS
        print(f"GGUF Threads : {GGUF_THREADS}")
        print("GGUF - Model config")
        pprint(lcm_diffusion_setting.gguf_model.model_dump())
        self.pipeline = GGUFDiffusion(
            get_app_path(),  # Place DLL in fastsdcpu folder
            config,
            True,
        )

    def _generate_images_gguf(
        self,
        lcm_diffusion_setting: LCMDiffusionSetting,
    ):
        if lcm_diffusion_setting.diffusion_task == DiffusionTask.text_to_image.value:
            t2iconfig = Txt2ImgConfig()
            t2iconfig.prompt = lcm_diffusion_setting.prompt
            t2iconfig.batch_count = lcm_diffusion_setting.number_of_images
            t2iconfig.cfg_scale = lcm_diffusion_setting.guidance_scale
            t2iconfig.height = lcm_diffusion_setting.image_height
            t2iconfig.width = lcm_diffusion_setting.image_width
            t2iconfig.sample_steps = lcm_diffusion_setting.inference_steps
            t2iconfig.sample_method = SampleMethod.EULER
            if lcm_diffusion_setting.use_seed:
                t2iconfig.seed = lcm_diffusion_setting.seed
            else:
                t2iconfig.seed = -1

            return self.pipeline.generate_text2mg(t2iconfig)
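

# Minimal usage sketch (assumptions: the prompt and output filename are
# illustrative, and the LCMDiffusionSetting() defaults select a usable model):
if __name__ == "__main__":
    setting = LCMDiffusionSetting()
    setting.prompt = "a cat wearing sunglasses"  # hypothetical prompt
    lcm = LCMTextToImage(DEVICE)
    lcm.init(DEVICE, setting)
    images = lcm.generate(setting)
    images[0].save("result.png")  # hypothetical output path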