Spaces:
Running
on
Zero
Running
on
Zero
import logging | |
import random | |
import warnings | |
import os | |
import gradio as gr | |
import numpy as np | |
import spaces | |
import torch | |
from diffusers import FluxControlNetModel | |
from diffusers.pipelines import FluxControlNetPipeline | |
from gradio_imageslider import ImageSlider | |
from PIL import Image | |
from huggingface_hub import snapshot_download | |
css = """ | |
#col-container { | |
margin: 0 auto; | |
max-width: 512px; | |
} | |
""" | |
# Device and dtype setup | |
if torch.cuda.is_available(): | |
power_device = "GPU" | |
device = "cuda" | |
dtype = torch.bfloat16 | |
else: | |
power_device = "CPU" | |
device = "cpu" | |
dtype = torch.float32 | |
huggingface_token = os.getenv("HUGGINFACE_TOKEN") | |
model_path = snapshot_download( | |
repo_id="black-forest-labs/FLUX.1-dev", | |
repo_type="model", | |
ignore_patterns=["*.md", "*..gitattributes"], | |
local_dir="FLUX.1-dev", | |
token=huggingface_token, | |
) | |
# Load pipeline with memory optimizations | |
controlnet = FluxControlNetModel.from_pretrained( | |
"jasperai/Flux.1-dev-Controlnet-Upscaler", | |
torch_dtype=dtype | |
).to(device) | |
pipe = FluxControlNetPipeline.from_pretrained( | |
model_path, | |
controlnet=controlnet, | |
torch_dtype=dtype | |
) | |
pipe.to(device) | |
# Enable memory optimizations | |
pipe.enable_model_cpu_offload() | |
pipe.enable_attention_slicing() | |
MAX_SEED = 1000000 | |
MAX_PIXEL_BUDGET = 512 * 512 # Reduced from 1024 * 1024 | |
def check_resources(): | |
if torch.cuda.is_available(): | |
gpu_memory = torch.cuda.get_device_properties(0).total_memory | |
memory_allocated = torch.cuda.memory_allocated(0) | |
if memory_allocated/gpu_memory > 0.9: # 90% threshold | |
return False | |
return True | |
def process_input(input_image, upscale_factor, **kwargs): | |
w, h = input_image.size | |
w_original, h_original = w, h | |
aspect_ratio = w / h | |
was_resized = False | |
if w * h * upscale_factor**2 > MAX_PIXEL_BUDGET: | |
warnings.warn( | |
f"Requested output image is too large ({w * upscale_factor}x{h * upscale_factor}). Resizing to ({int(aspect_ratio * MAX_PIXEL_BUDGET ** 0.5 // upscale_factor), int(MAX_PIXEL_BUDGET ** 0.5 // aspect_ratio // upscale_factor)}) pixels." | |
) | |
gr.Info( | |
f"Requested output image is too large ({w * upscale_factor}x{h * upscale_factor}). Resizing input to ({int(aspect_ratio * MAX_PIXEL_BUDGET ** 0.5 // upscale_factor), int(MAX_PIXEL_BUDGET ** 0.5 // aspect_ratio // upscale_factor)}) pixels budget." | |
) | |
input_image = input_image.resize( | |
( | |
int(aspect_ratio * MAX_PIXEL_BUDGET**0.5 // upscale_factor), | |
int(MAX_PIXEL_BUDGET**0.5 // aspect_ratio // upscale_factor), | |
) | |
) | |
was_resized = True | |
# resize to multiple of 8 | |
w, h = input_image.size | |
w = w - w % 8 | |
h = h - h % 8 | |
return input_image.resize((w, h)), w_original, h_original, was_resized | |
def infer( | |
seed, | |
randomize_seed, | |
input_image, | |
num_inference_steps, | |
upscale_factor, | |
controlnet_conditioning_scale, | |
progress=gr.Progress(track_tqdm=True), | |
): | |
try: | |
if not check_resources(): | |
gr.Warning("System resources are running low. Try reducing parameters.") | |
return None | |
if device == "cuda": | |
torch.cuda.empty_cache() | |
if randomize_seed: | |
seed = random.randint(0, MAX_SEED) | |
true_input_image = input_image | |
input_image, w_original, h_original, was_resized = process_input( | |
input_image, upscale_factor | |
) | |
# rescale with upscale factor | |
w, h = input_image.size | |
control_image = input_image.resize((w * upscale_factor, h * upscale_factor)) | |
generator = torch.Generator().manual_seed(seed) | |
gr.Info("Upscaling image...") | |
image = pipe( | |
prompt="", | |
control_image=control_image, | |
controlnet_conditioning_scale=controlnet_conditioning_scale, | |
num_inference_steps=num_inference_steps, | |
guidance_scale=3.5, | |
height=control_image.size[1], | |
width=control_image.size[0], | |
generator=generator, | |
).images[0] | |
if was_resized: | |
gr.Info( | |
f"Resizing output image to targeted {w_original * upscale_factor}x{h_original * upscale_factor} size." | |
) | |
# resize to target desired size | |
image = image.resize((w_original * upscale_factor, h_original * upscale_factor)) | |
image.save("output.jpg") | |
return [true_input_image, image, seed] | |
except RuntimeError as e: | |
if "out of memory" in str(e): | |
gr.Warning("Not enough GPU memory. Try reducing the upscale factor or image size.") | |
return None | |
raise e | |
except Exception as e: | |
gr.Error(f"An error occurred: {str(e)}") | |
return None | |
with gr.Blocks(theme="Yntec/HaleyCH_Theme_Orange", css=css) as demo: | |
with gr.Row(): | |
run_button = gr.Button(value="Run") | |
with gr.Row(): | |
with gr.Column(scale=4): | |
input_im = gr.Image(label="Input Image", type="pil") | |
with gr.Column(scale=1): | |
num_inference_steps = gr.Slider( | |
label="Number of Inference Steps", | |
minimum=8, | |
maximum=50, | |
step=1, | |
value=28, | |
) | |
upscale_factor = gr.Slider( | |
label="Upscale Factor", | |
minimum=1, | |
maximum=2, # Reduced from 4 | |
step=1, | |
value=2, # Reduced default | |
) | |
controlnet_conditioning_scale = gr.Slider( | |
label="Controlnet Conditioning Scale", | |
minimum=0.1, | |
maximum=1.5, | |
step=0.1, | |
value=0.6, | |
) | |
seed = gr.Slider( | |
label="Seed", | |
minimum=0, | |
maximum=MAX_SEED, | |
step=1, | |
value=42, | |
) | |
randomize_seed = gr.Checkbox(label="Randomize seed", value=True) | |
with gr.Row(): | |
result = ImageSlider(label="Input / Output", type="pil", interactive=True) | |
examples = gr.Examples( | |
examples=[ | |
[42, False, "z1.webp", 28, 2, 0.6], # Updated upscale factor | |
[42, False, "z2.webp", 28, 2, 0.6], # Updated upscale factor | |
], | |
inputs=[ | |
seed, | |
randomize_seed, | |
input_im, | |
num_inference_steps, | |
upscale_factor, | |
controlnet_conditioning_scale, | |
], | |
fn=infer, | |
outputs=result, | |
cache_examples="lazy", | |
) | |
gr.on( | |
[run_button.click], | |
fn=infer, | |
inputs=[ | |
seed, | |
randomize_seed, | |
input_im, | |
num_inference_steps, | |
upscale_factor, | |
controlnet_conditioning_scale, | |
], | |
outputs=result, | |
show_api=False, | |
) | |
demo.queue().launch(share=False) |