import os
import gc
import json
import logging
from datetime import datetime
from typing import Any, Dict, List, Tuple

import gradio as gr
import numpy as np
import torch
import spaces
from PIL import Image, PngImagePlugin
from diffusers.models import AutoencoderKL
from diffusers import StableDiffusionXLPipeline, StableDiffusionXLImg2ImgPipeline

import config
import utils

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

DESCRIPTION = "Multidiffusion demo"
if not torch.cuda.is_available():
    # The line breaks that sit inside this literal in the source are written
    # as explicit "\n" escapes so the statement is a valid single-line string.
    DESCRIPTION += "\n\nRunning on CPU 🥶 This demo does not work on CPU.\n"

# Runtime configuration, all overridable through environment variables.
IS_COLAB = utils.is_google_colab() or os.getenv("IS_COLAB") == "1"
HF_TOKEN = os.getenv("HF_TOKEN")
CACHE_EXAMPLES = torch.cuda.is_available() and os.getenv("CACHE_EXAMPLES") == "1"
MIN_IMAGE_SIZE = int(os.getenv("MIN_IMAGE_SIZE", "512"))
MAX_IMAGE_SIZE = int(os.getenv("MAX_IMAGE_SIZE", "2048"))
USE_TORCH_COMPILE = os.getenv("USE_TORCH_COMPILE") == "1"
ENABLE_CPU_OFFLOAD = os.getenv("ENABLE_CPU_OFFLOAD") == "1"
OUTPUT_DIR = os.getenv("OUTPUT_DIR", "./outputs")
MODEL = os.getenv(
    "MODEL",
    "https://huggingface.co/AstraliteHeart/pony-diffusion-v6/blob/main/v6.safetensors",
)

# Maximum number of entries kept in ``generation_history``.
HISTORY_LIMIT = 10

torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


def load_pipeline(model_name):
    """Build the SDXL text-to-image pipeline for ``model_name`` and move it to ``device``.

    Args:
        model_name: Checkpoint reference. A value ending in ``.safetensors``
            is loaded with ``from_single_file``; anything else is loaded with
            ``from_pretrained``.

    Returns:
        The loaded pipeline, using the ``madebyollin/sdxl-vae-fp16-fix`` VAE
        in float16.
    """
    vae = AutoencoderKL.from_pretrained(
        "madebyollin/sdxl-vae-fp16-fix",
        torch_dtype=torch.float16,
    )
    # Decide on the loader from the argument actually being loaded, not from
    # the module-level MODEL default, so other checkpoints load correctly.
    if model_name.endswith(".safetensors"):
        loader = StableDiffusionXLPipeline.from_single_file
    else:
        loader = StableDiffusionXLPipeline.from_pretrained

    pipe = loader(
        model_name,
        vae=vae,
        torch_dtype=torch.float16,
        custom_pipeline="lpw_stable_diffusion_xl",
        use_safetensors=True,
        add_watermarker=False,
        use_auth_token=HF_TOKEN,
        variant="fp16",
    )
    pipe.to(device)
    return pipe


def parse_json_parameters(json_str):
    """Parse a JSON string of generation settings into keyword values.

    The JSON object must contain ``prompt``, ``negative_prompt``,
    ``resolution`` (formatted ``"<width> x <height>"``), ``guidance_scale``,
    ``num_inference_steps``, ``seed`` and ``sampler``. ``use_upscaler`` is
    optional and defaults to ``False``.

    Args:
        json_str: The JSON document to parse.

    Returns:
        A dict with keys ``prompt``, ``negative_prompt``, ``seed``, ``width``,
        ``height``, ``guidance_scale``, ``num_inference_steps``, ``sampler``
        and ``use_upscaler``.

    Raises:
        ValueError: If the text is not valid JSON, a required key is missing,
            or the resolution cannot be parsed.
    """
    required_keys = (
        'prompt',
        'negative_prompt',
        'resolution',
        'guidance_scale',
        'num_inference_steps',
        'seed',
        'sampler',
    )
    try:
        params = json.loads(json_str)
        for key in required_keys:
            if key not in params:
                raise ValueError(f"Missing required key: {key}")

        # Parse resolution
        width, height = map(int, params['resolution'].split(' x '))

        return {
            'prompt': params['prompt'],
            'negative_prompt': params['negative_prompt'],
            'seed': params['seed'],
            'width': width,
            'height': height,
            'guidance_scale': params['guidance_scale'],
            'num_inference_steps': params['num_inference_steps'],
            'sampler': params['sampler'],
            'use_upscaler': params.get('use_upscaler', False),
        }
    except json.JSONDecodeError as e:
        raise ValueError("Invalid JSON format") from e
    except Exception as e:
        # Also wraps the "Missing required key" error raised above, so every
        # failure reaches the caller as a ValueError with this prefix.
        raise ValueError(f"Error parsing JSON: {str(e)}") from e


def _build_metadata(
    prompt,
    negative_prompt,
    width,
    height,
    guidance_scale,
    num_inference_steps,
    seed,
    sampler,
    use_upscaler,
    upscaler_strength,
    upscale_by,
):
    """Return the metadata dict describing one generation request."""
    metadata = {
        "prompt": prompt,
        "negative_prompt": negative_prompt,
        "resolution": f"{width} x {height}",
        "guidance_scale": guidance_scale,
        "num_inference_steps": num_inference_steps,
        "seed": seed,
        "sampler": sampler,
    }
    if use_upscaler:
        new_width = int(width * upscale_by)
        new_height = int(height * upscale_by)
        metadata["use_upscaler"] = {
            "upscale_method": "nearest-exact",
            "upscaler_strength": upscaler_strength,
            "upscale_by": upscale_by,
            "new_resolution": f"{new_width} x {new_height}",
        }
    else:
        metadata["use_upscaler"] = None
    return metadata


@spaces.GPU
def generate(
    prompt: str,
    negative_prompt: str = "",
    seed: int = 0,
    custom_width: int = 1024,
    custom_height: int = 1024,
    guidance_scale: float = 7.0,
    num_inference_steps: int = 30,
    sampler: str = "DPM++ 2M SDE Karras",
    aspect_ratio_selector: str = "1024 x 1024",
    use_upscaler: bool = False,
    upscaler_strength: float = 0.55,
    upscale_by: float = 1.5,
    json_params: str = "",
    progress=gr.Progress(track_tqdm=True),
) -> Tuple[List[Image.Image], Dict[str, Any]]:
    """Generate images with the global pipeline and return them with metadata.

    When ``json_params`` is non-empty it is parsed with
    ``parse_json_parameters`` and its values replace ``prompt``,
    ``negative_prompt``, ``seed``, ``custom_width``, ``custom_height``,
    ``guidance_scale``, ``num_inference_steps``, ``sampler`` and
    ``use_upscaler``.

    With ``use_upscaler`` set, the base pipeline produces latents, they are
    upscaled by ``upscale_by`` using ``utils.upscale`` ("nearest-exact"), and
    an img2img pipeline built from the same components refines them with
    ``strength=upscaler_strength``.

    The pipeline's scheduler is replaced for the duration of the call and
    always restored afterwards; ``utils.free_memory()`` is always called on
    exit.

    Returns:
        ``(images, metadata)`` where ``images`` is the pipeline's ``.images``
        output (requested as ``output_type="pil"``) and ``metadata`` is the
        dict of settings used.

    Raises:
        gr.Error: If ``json_params`` cannot be parsed, or no pipeline is
            loaded.
    """
    if json_params:
        try:
            params = parse_json_parameters(json_params)
        except ValueError as e:
            raise gr.Error(str(e)) from e
        prompt = params['prompt']
        negative_prompt = params['negative_prompt']
        seed = params['seed']
        custom_width = params['width']
        custom_height = params['height']
        guidance_scale = params['guidance_scale']
        num_inference_steps = params['num_inference_steps']
        sampler = params['sampler']
        use_upscaler = params['use_upscaler']

    generator = utils.seed_everything(seed)

    width, height = utils.aspect_ratio_handler(
        aspect_ratio_selector,
        custom_width,
        custom_height,
    )
    width, height = utils.preprocess_image_dimensions(width, height)

    if pipe is None:
        # The module sets ``pipe`` to None when CUDA is unavailable; fail with
        # a readable UI error instead of an AttributeError on ``None``.
        raise gr.Error("The pipeline is not loaded; this demo requires a CUDA GPU.")

    backup_scheduler = pipe.scheduler
    upscaler_pipe = None

    try:
        # Everything that touches shared pipeline state lives inside the try
        # so the ``finally`` clause restores the scheduler on any failure.
        pipe.scheduler = utils.get_scheduler(backup_scheduler.config, sampler)

        if use_upscaler:
            upscaler_pipe = StableDiffusionXLImg2ImgPipeline(**pipe.components)

        metadata = _build_metadata(
            prompt,
            negative_prompt,
            width,
            height,
            guidance_scale,
            num_inference_steps,
            seed,
            sampler,
            use_upscaler,
            upscaler_strength,
            upscale_by,
        )
        logger.info(json.dumps(metadata, indent=4))

        if use_upscaler:
            latents = pipe(
                prompt=prompt,
                negative_prompt=negative_prompt,
                width=width,
                height=height,
                guidance_scale=guidance_scale,
                num_inference_steps=num_inference_steps,
                generator=generator,
                output_type="latent",
            ).images
            upscaled_latents = utils.upscale(latents, "nearest-exact", upscale_by)
            images = upscaler_pipe(
                prompt=prompt,
                negative_prompt=negative_prompt,
                image=upscaled_latents,
                guidance_scale=guidance_scale,
                num_inference_steps=num_inference_steps,
                strength=upscaler_strength,
                generator=generator,
                output_type="pil",
            ).images
        else:
            images = pipe(
                prompt=prompt,
                negative_prompt=negative_prompt,
                width=width,
                height=height,
                guidance_scale=guidance_scale,
                num_inference_steps=num_inference_steps,
                generator=generator,
                output_type="pil",
            ).images

        if images and IS_COLAB:
            for image in images:
                filepath = utils.save_image(image, metadata, OUTPUT_DIR)
                logger.info(f"Image saved as {filepath} with metadata")

        return images, metadata
    except Exception as e:
        logger.exception(f"An error occurred: {e}")
        raise
    finally:
        # Drop the temporary img2img pipeline reference before freeing memory.
        upscaler_pipe = None
        pipe.scheduler = backup_scheduler
        utils.free_memory()


# Most-recent-first list of past generations; each entry is a dict with the
# keys "prompt", "timestamp", "image" and "metadata".
generation_history = []


def update_history_list():
    """Return the images of all history entries, newest first."""
    return [item["image"] for item in generation_history]


def handle_image_click(evt: gr.SelectData):
    """Return the image and pretty-printed metadata of the selected history entry."""
    selected = generation_history[evt.index]
    return selected["image"], json.dumps(selected["metadata"], indent=2)


def generate_and_update_history(*args, **kwargs):
    """Run ``generate`` and record its first image in the history.

    All arguments are forwarded to ``generate`` unchanged.

    Returns:
        A tuple of the first generated image, the metadata as an indented
        JSON string, and the updated list of history images.
    """
    images, metadata = generate(*args, **kwargs)
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    generation_history.insert(0, {
        "prompt": metadata["prompt"],
        "timestamp": timestamp,
        "image": images[0],
        "metadata": metadata,
    })
    # Keep only the newest HISTORY_LIMIT entries.
    del generation_history[HISTORY_LIMIT:]
    return images[0], json.dumps(metadata, indent=2), update_history_list()


if torch.cuda.is_available():
    pipe = load_pipeline(MODEL)
logger.info("Loaded on Device!") else: pipe = None with gr.Blocks(css="style.css") as demo: title = gr.HTML( f"""