from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any, Optional
import uuid
from lcm.lcm_scheduler import LCMScheduler
from lcm.lcm_pipeline import LatentConsistencyModelPipeline
from lcm.lcm_i2i_pipeline import LatentConsistencyModelImg2ImgPipeline, LCMSchedulerWithTimestamp
from diffusers.image_processor import PipelineImageInput
# import modules.scripts as scripts
# import modules.shared
# from modules import script_callbacks
import os
import random
import time
import numpy as np
import gradio as gr
from PIL import Image, PngImagePlugin
import torch
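
# The scheduler and txt2img pipeline are loaded once at import time; the
# per-request callbacks below rebuild task-specific pipelines from these
# shared components (VAE, text encoder, tokenizer, UNet).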
scheduler = LCMScheduler.from_pretrained(
    "SimianLuo/LCM_Dreamshaper_v7", subfolder="scheduler")
pipe = LatentConsistencyModelPipeline.from_pretrained(
    "SimianLuo/LCM_Dreamshaper_v7", scheduler=scheduler, safety_checker=None)

DESCRIPTION = '''# Latent Consistency Model
Running [LCM_Dreamshaper_v7](https://huggingface.co/SimianLuo/LCM_Dreamshaper_v7) | [Project Page](https://latent-consistency-models.github.io) | [Extension Page](https://github.com/0xbitches/sd-webui-lcm)
'''

MAX_SEED = np.iinfo(np.int32).max
MAX_IMAGE_SIZE = int(os.getenv("MAX_IMAGE_SIZE", "768"))

def randomize_seed_fn(seed: int, randomize_seed: bool) -> int:
    if randomize_seed:
        seed = random.randint(0, MAX_SEED)
    return seed
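
# Save a single PIL image to ./outputs/LCM-txt2img/ with the generation
# parameters embedded as PNG text chunks.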
def save_image(img, metadata: dict):
    save_dir = './outputs/LCM-txt2img/'
    Path(save_dir).mkdir(exist_ok=True, parents=True)
    seed = metadata["seed"]
    unique_id = uuid.uuid4()
    filename = save_dir + f"{unique_id}-{seed}" + ".png"

    meta_tuples = [(k, str(v)) for k, v in metadata.items()]
    png_info = PngImagePlugin.PngInfo()
    for k, v in meta_tuples:
        png_info.add_text(k, v)
    img.save(filename, pnginfo=png_info)

    return filename
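
# Save a batch of images concurrently, reusing save_image for each one.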
def save_images(image_array, metadata: dict):
    paths = []
    with ThreadPoolExecutor() as executor:
        paths = list(executor.map(save_image, image_array,
                                  [metadata] * len(image_array)))
    return paths
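
# txt2img: rebuilds the LCM pipeline from the shared components, moves it to
# the selected device/dtype, runs inference, and saves every image with its
# metadata. Returns the saved file paths and the seed actually used.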
def generate(
    prompt: str,
    seed: int = 0,
    width: int = 512,
    height: int = 512,
    guidance_scale: float = 8.0,
    num_inference_steps: int = 4,
    num_images: int = 4,
    randomize_seed: bool = False,
    use_fp16: bool = True,
    use_torch_compile: bool = False,
    use_cpu: bool = False,
    progress=gr.Progress(track_tqdm=True)
) -> Image.Image:
    seed = randomize_seed_fn(seed, randomize_seed)
    torch.manual_seed(seed)

    selected_device = 'cuda'
    if use_cpu:
        selected_device = "cpu"
        if use_fp16:
            use_fp16 = False
            print("LCM warning: running on CPU, overrode FP16 with FP32")

    global pipe, scheduler
    pipe = LatentConsistencyModelPipeline(
        vae=pipe.vae,
        text_encoder=pipe.text_encoder,
        tokenizer=pipe.tokenizer,
        unet=pipe.unet,
        scheduler=scheduler,
        safety_checker=pipe.safety_checker,
        feature_extractor=pipe.feature_extractor,
    )
    # pipe = LatentConsistencyModelPipeline.from_pretrained(
    #     "SimianLuo/LCM_Dreamshaper_v7", scheduler=scheduler, safety_checker=None)

    if use_fp16:
        pipe.to(torch_device=selected_device, torch_dtype=torch.float16)
    else:
        pipe.to(torch_device=selected_device, torch_dtype=torch.float32)

    # Windows does not support torch.compile for now
    if os.name != 'nt' and use_torch_compile:
        pipe.unet = torch.compile(pipe.unet, mode='max-autotune')

    start_time = time.time()
    result = pipe(
        prompt=prompt,
        width=width,
        height=height,
        guidance_scale=guidance_scale,
        num_inference_steps=num_inference_steps,
        num_images_per_prompt=num_images,
        original_inference_steps=50,
        output_type="pil",
        device=selected_device,
    ).images
    paths = save_images(result, metadata={"prompt": prompt, "seed": seed, "width": width,
                                          "height": height, "guidance_scale": guidance_scale,
                                          "num_inference_steps": num_inference_steps})
    elapsed_time = time.time() - start_time
    print("LCM inference time:", elapsed_time, "seconds")
    return paths, seed
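
# img2img: same flow as generate(), but wraps the shared components in the
# img2img pipeline and derives width/height from the uploaded image.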
def generate_i2i(
    prompt: str,
    image: PipelineImageInput = None,
    strength: float = 0.8,
    seed: int = 0,
    guidance_scale: float = 8.0,
    num_inference_steps: int = 4,
    num_images: int = 4,
    randomize_seed: bool = False,
    use_fp16: bool = True,
    use_torch_compile: bool = False,
    use_cpu: bool = False,
    progress=gr.Progress(track_tqdm=True),
    width: Optional[int] = 512,
    height: Optional[int] = 512,
) -> Image.Image:
    seed = randomize_seed_fn(seed, randomize_seed)
    torch.manual_seed(seed)

    selected_device = 'cuda'
    if use_cpu:
        selected_device = "cpu"
        if use_fp16:
            use_fp16 = False
            print("LCM warning: running on CPU, overrode FP16 with FP32")

    global pipe, scheduler
    pipe = LatentConsistencyModelImg2ImgPipeline(
        vae=pipe.vae,
        text_encoder=pipe.text_encoder,
        tokenizer=pipe.tokenizer,
        unet=pipe.unet,
        scheduler=None,  # scheduler
        safety_checker=pipe.safety_checker,
        feature_extractor=pipe.feature_extractor,
        requires_safety_checker=False,
    )
    # pipe = LatentConsistencyModelImg2ImgPipeline.from_pretrained(
    #     "SimianLuo/LCM_Dreamshaper_v7", safety_checker=None)

    if use_fp16:
        pipe.to(torch_device=selected_device, torch_dtype=torch.float16)
    else:
        pipe.to(torch_device=selected_device, torch_dtype=torch.float32)

    # Windows does not support torch.compile for now
    if os.name != 'nt' and use_torch_compile:
        pipe.unet = torch.compile(pipe.unet, mode='max-autotune')

    width, height = image.size

    start_time = time.time()
    result = pipe(
        prompt=prompt,
        image=image,
        strength=strength,
        width=width,
        height=height,
        guidance_scale=guidance_scale,
        num_inference_steps=num_inference_steps,
        num_images_per_prompt=num_images,
        original_inference_steps=50,
        output_type="pil",
        device=selected_device,
    ).images
    paths = save_images(result, metadata={"prompt": prompt, "seed": seed, "width": width,
                                          "height": height, "guidance_scale": guidance_scale,
                                          "num_inference_steps": num_inference_steps})
    elapsed_time = time.time() - start_time
    print("LCM inference time:", elapsed_time, "seconds")
    return paths, seed
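
# Video helpers: OpenCV splits the source video into RGB PIL frames and
# reassembles processed frames into an mp4 clip.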
import cv2


def video_to_frames(video_path):
    # Open the video file
    cap = cv2.VideoCapture(video_path)

    # Check if the video opened successfully
    if not cap.isOpened():
        print("Error: LCM could not open video.")
        return

    # Read frames from the video
    pil_images = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Convert BGR to RGB (OpenCV uses BGR by default)
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Convert numpy array to PIL Image
        pil_image = Image.fromarray(rgb_frame)

        # Append the PIL Image to the list
        pil_images.append(pil_image)

    # Release the video capture object
    cap.release()

    return pil_images
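
# Encode a list of RGB PIL frames back into an mp4 video at the given FPS.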
def frames_to_video(pil_images, output_path, fps):
    if not pil_images:
        print("Error: No images to convert.")
        return

    img_array = []
    for pil_image in pil_images:
        img_array.append(np.array(pil_image))

    height, width, layers = img_array[0].shape
    size = (width, height)

    out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, size)
    for i in range(len(img_array)):
        out.write(cv2.cvtColor(img_array[i], cv2.COLOR_RGB2BGR))
    out.release()
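
# vid2vid: runs the img2img pipeline frame by frame over the decoded video,
# optionally saving intermediate frames, then re-encodes the results at the
# requested FPS. Returns the path of the written video file.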
def generate_v2v(
    prompt: str,
    video: Any = None,
    strength: float = 0.8,
    seed: int = 0,
    guidance_scale: float = 8.0,
    num_inference_steps: int = 4,
    randomize_seed: bool = False,
    use_fp16: bool = True,
    use_torch_compile: bool = False,
    use_cpu: bool = False,
    fps: int = 10,
    save_frames: bool = False,
    # progress=gr.Progress(track_tqdm=True),
    width: Optional[int] = 512,
    height: Optional[int] = 512,
    num_images: Optional[int] = 1,
) -> Image.Image:
    seed = randomize_seed_fn(seed, randomize_seed)
    torch.manual_seed(seed)

    selected_device = 'cuda'
    if use_cpu:
        selected_device = "cpu"
        if use_fp16:
            use_fp16 = False
            print("LCM warning: running on CPU, overrode FP16 with FP32")

    global pipe, scheduler
    pipe = LatentConsistencyModelImg2ImgPipeline(
        vae=pipe.vae,
        text_encoder=pipe.text_encoder,
        tokenizer=pipe.tokenizer,
        unet=pipe.unet,
        scheduler=None,
        safety_checker=pipe.safety_checker,
        feature_extractor=pipe.feature_extractor,
        requires_safety_checker=False,
    )
    # pipe = LatentConsistencyModelImg2ImgPipeline.from_pretrained(
    #     "SimianLuo/LCM_Dreamshaper_v7", safety_checker=None)

    if use_fp16:
        pipe.to(torch_device=selected_device, torch_dtype=torch.float16)
    else:
        pipe.to(torch_device=selected_device, torch_dtype=torch.float32)

    # Windows does not support torch.compile for now
    if os.name != 'nt' and use_torch_compile:
        pipe.unet = torch.compile(pipe.unet, mode='max-autotune')

    frames = video_to_frames(video)
    if frames is None:
        print("Error: LCM could not convert video.")
        return
    width, height = frames[0].size

    start_time = time.time()
    results = []
    for frame in frames:
        result = pipe(
            prompt=prompt,
            image=frame,
            strength=strength,
            width=width,
            height=height,
            guidance_scale=guidance_scale,
            num_inference_steps=num_inference_steps,
            num_images_per_prompt=1,
            original_inference_steps=50,
            output_type="pil",
            device=selected_device,
        ).images
        if save_frames:
            paths = save_images(result, metadata={"prompt": prompt, "seed": seed, "width": width,
                                                  "height": height, "guidance_scale": guidance_scale,
                                                  "num_inference_steps": num_inference_steps})
        results.extend(result)
    elapsed_time = time.time() - start_time
    print("LCM vid2vid inference complete! Processing", len(frames), "frames took", elapsed_time, "seconds")

    save_dir = './outputs/LCM-vid2vid/'
    Path(save_dir).mkdir(exist_ok=True, parents=True)
    unique_id = uuid.uuid4()
    _, input_ext = os.path.splitext(video)
    output_path = save_dir + f"{unique_id}-{seed}" + f"{input_ext}"
    frames_to_video(results, output_path, fps)
    return output_path

examples = [
    "portrait photo of a girl, photograph, highly detailed face, depth of field, moody light, golden hour, style by Dan Winters, Russell James, Steve McCurry, centered, extremely detailed, Nikon D850, award winning photography",
    "Self-portrait oil painting, a beautiful cyborg with golden hair, 8k",
    "Astronaut in a jungle, cold color palette, muted colors, detailed, 8k",
    "A photo of beautiful mountain with realistic sunset and blue lake, highly detailed, masterpiece",
]
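
# Gradio UI: three tabs (txt2img, img2img, vid2vid) that share the same
# advanced options layout and dispatch to the generate* callbacks above.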
with gr.Blocks() as lcm:
    with gr.Tab("LCM txt2img"):
        gr.Markdown("Latent Consistency Models: Synthesizing High-Resolution Images with Few-Step Inference")
        gr.Markdown("Try the guide on Colab's free tier [![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/R3gm/InsightSolver-Colab/blob/main/Latent_Consistency_Models.ipynb)")
        with gr.Row():
            prompt = gr.Textbox(label="Prompt",
                                show_label=False,
                                lines=3,
                                placeholder="Prompt",
                                elem_classes=["prompt"])
            run_button = gr.Button("Run", scale=0)
        with gr.Row():
            result = gr.Gallery(
                label="Generated images", show_label=False, elem_id="gallery", grid=[2], preview=True
            )
        with gr.Accordion("Advanced options", open=False):
            seed = gr.Slider(
                label="Seed",
                minimum=0,
                maximum=MAX_SEED,
                step=1,
                value=0,
                randomize=True
            )
            randomize_seed = gr.Checkbox(
                label="Randomize seed across runs", value=True)
            use_fp16 = gr.Checkbox(
                label="Run LCM in fp16 (for lower VRAM)", value=False)
            use_torch_compile = gr.Checkbox(
                label="Run LCM with torch.compile (currently not supported on Windows)", value=False)
            use_cpu = gr.Checkbox(label="Run LCM on CPU", value=True)
            with gr.Row():
                width = gr.Slider(
                    label="Width",
                    minimum=256,
                    maximum=MAX_IMAGE_SIZE,
                    step=32,
                    value=512,
                )
                height = gr.Slider(
                    label="Height",
                    minimum=256,
                    maximum=MAX_IMAGE_SIZE,
                    step=32,
                    value=512,
                )
            with gr.Row():
                guidance_scale = gr.Slider(
                    label="Guidance scale for base",
                    minimum=2,
                    maximum=14,
                    step=0.1,
                    value=8.0,
                )
                num_inference_steps = gr.Slider(
                    label="Number of inference steps for base",
                    minimum=1,
                    maximum=8,
                    step=1,
                    value=4,
                )
            with gr.Row():
                num_images = gr.Slider(
                    label="Number of images (batch count)",
                    minimum=1,
                    maximum=int(os.getenv("MAX_NUM_IMAGES", "4")),  # fallback of 4 assumed when the env var is unset
                    step=1,
                    value=1,
                )
        gr.Examples(
            examples=examples,
            inputs=prompt,
            outputs=result,
            fn=generate
        )

        run_button.click(
            fn=generate,
            inputs=[
                prompt,
                seed,
                width,
                height,
                guidance_scale,
                num_inference_steps,
                num_images,
                randomize_seed,
                use_fp16,
                use_torch_compile,
                use_cpu
            ],
            outputs=[result, seed],
        )

    with gr.Tab("LCM img2img"):
        with gr.Row():
            prompt = gr.Textbox(label="Prompt",
                                show_label=False,
                                lines=3,
                                placeholder="Prompt",
                                elem_classes=["prompt"])
            run_i2i_button = gr.Button("Run", scale=0)
        with gr.Row():
            image_input = gr.Image(label="Upload your Image", type="pil")
            result = gr.Gallery(
                label="Generated images",
                show_label=False,
                elem_id="gallery",
                preview=True
            )
        with gr.Accordion("Advanced options", open=False):
            seed = gr.Slider(
                label="Seed",
                minimum=0,
                maximum=MAX_SEED,
                step=1,
                value=0,
                randomize=True
            )
            randomize_seed = gr.Checkbox(
                label="Randomize seed across runs", value=True)
            use_fp16 = gr.Checkbox(
                label="Run LCM in fp16 (for lower VRAM)", value=False)
            use_torch_compile = gr.Checkbox(
                label="Run LCM with torch.compile (currently not supported on Windows)", value=False)
            use_cpu = gr.Checkbox(label="Run LCM on CPU", value=True)
            with gr.Row():
                guidance_scale = gr.Slider(
                    label="Guidance scale for base",
                    minimum=2,
                    maximum=14,
                    step=0.1,
                    value=8.0,
                )
                num_inference_steps = gr.Slider(
                    label="Number of inference steps for base",
                    minimum=1,
                    maximum=8,
                    step=1,
                    value=4,
                )
            with gr.Row():
                num_images = gr.Slider(
                    label="Number of images (batch count)",
                    minimum=1,
                    maximum=int(os.getenv("MAX_NUM_IMAGES", "4")),  # fallback of 4 assumed when the env var is unset
                    step=1,
                    value=1,
                )
                strength = gr.Slider(
                    label="Prompt Strength",
                    minimum=0.1,
                    maximum=1.0,
                    step=0.1,
                    value=0.5,
                )

        run_i2i_button.click(
            fn=generate_i2i,
            inputs=[
                prompt,
                image_input,
                strength,
                seed,
                guidance_scale,
                num_inference_steps,
                num_images,
                randomize_seed,
                use_fp16,
                use_torch_compile,
                use_cpu
            ],
            outputs=[result, seed],
        )

    with gr.Tab("LCM vid2vid"):
        show_v2v = os.getenv("SHOW_VID2VID") != "NO"
        gr.Markdown("Not recommended for use with CPU. Duplicate the space and modify SHOW_VID2VID to enable it. 🚫💻")
        with gr.Tabs(visible=show_v2v) as tabs:
            # with gr.Tab("", visible=show_v2v):
            with gr.Row():
                prompt = gr.Textbox(label="Prompt",
                                    show_label=False,
                                    lines=3,
                                    placeholder="Prompt",
                                    elem_classes=["prompt"])
                run_v2v_button = gr.Button("Run", scale=0)
            with gr.Row():
                video_input = gr.Video(label="Source Video")
                video_output = gr.Video(label="Generated Video")
            with gr.Accordion("Advanced options", open=False):
                seed = gr.Slider(
                    label="Seed",
                    minimum=0,
                    maximum=MAX_SEED,
                    step=1,
                    value=0,
                    randomize=True
                )
                randomize_seed = gr.Checkbox(
                    label="Randomize seed across runs", value=True)
                use_fp16 = gr.Checkbox(
                    label="Run LCM in fp16 (for lower VRAM)", value=False)
                use_torch_compile = gr.Checkbox(
                    label="Run LCM with torch.compile (currently not supported on Windows)", value=False)
                use_cpu = gr.Checkbox(label="Run LCM on CPU", value=True)
                save_frames = gr.Checkbox(label="Save intermediate frames", value=False)
                with gr.Row():
                    guidance_scale = gr.Slider(
                        label="Guidance scale for base",
                        minimum=2,
                        maximum=14,
                        step=0.1,
                        value=8.0,
                    )
                    num_inference_steps = gr.Slider(
                        label="Number of inference steps for base",
                        minimum=1,
                        maximum=8,
                        step=1,
                        value=4,
                    )
                with gr.Row():
                    fps = gr.Slider(
                        label="Output FPS",
                        minimum=1,
                        maximum=200,
                        step=1,
                        value=10,
                    )
                    strength = gr.Slider(
                        label="Prompt Strength",
                        minimum=0.1,
                        maximum=1.0,
                        step=0.05,
                        value=0.5,
                    )

            run_v2v_button.click(
                fn=generate_v2v,
                inputs=[
                    prompt,
                    video_input,
                    strength,
                    seed,
                    guidance_scale,
                    num_inference_steps,
                    randomize_seed,
                    use_fp16,
                    use_torch_compile,
                    use_cpu,
                    fps,
                    save_frames
                ],
                outputs=video_output,
            )

if __name__ == "__main__":
    lcm.queue().launch()
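
# Note: launch() is called with its defaults here. When running locally one
# could pass standard Gradio arguments instead, e.g.
# lcm.queue().launch(server_name="0.0.0.0") to listen on all interfaces
# (assumption: stock Gradio launch() parameters, not specific to this app).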