ImgCleaner / app.py
yizhangliu's picture
Update app.py
72b7fc6
raw
history blame
18 kB
import gradio as gr
import PIL
from PIL import Image
import numpy as np
import os
import uuid
import torch
from torch import autocast
import cv2
from io import BytesIO
import requests
import PIL
from PIL import Image
import numpy as np
import os
import uuid
import torch
from torch import autocast
import cv2
from matplotlib import pyplot as plt
from torchvision import transforms
from diffusers import DiffusionPipeline
import io
import logging
import multiprocessing
import random
import time
import imghdr
from pathlib import Path
from typing import Union
from loguru import logger
from lama_cleaner.model_manager import ModelManager
from lama_cleaner.schema import Config
try:
torch._C._jit_override_can_fuse_on_cpu(False)
torch._C._jit_override_can_fuse_on_gpu(False)
torch._C._jit_set_texpr_fuser_enabled(False)
torch._C._jit_set_nvfuser_enabled(False)
except:
pass
from lama_cleaner.helper import (
load_img,
numpy_to_bytes,
resize_max_size,
)
NUM_THREADS = str(multiprocessing.cpu_count())
# fix libomp problem on windows https://github.com/Sanster/lama-cleaner/issues/56
os.environ["KMP_DUPLICATE_LIB_OK"] = "True"
os.environ["OMP_NUM_THREADS"] = NUM_THREADS
os.environ["OPENBLAS_NUM_THREADS"] = NUM_THREADS
os.environ["MKL_NUM_THREADS"] = NUM_THREADS
os.environ["VECLIB_MAXIMUM_THREADS"] = NUM_THREADS
os.environ["NUMEXPR_NUM_THREADS"] = NUM_THREADS
if os.environ.get("CACHE_DIR"):
os.environ["TORCH_HOME"] = os.environ["CACHE_DIR"]
os.environ["TORCH_HOME"] = './'
BUILD_DIR = os.environ.get("LAMA_CLEANER_BUILD_DIR", "app/build")
from share_btn import community_icon_html, loading_icon_html, share_js
HF_TOKEN_SD = os.environ.get('HF_TOKEN_SD')
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f'device = {device}')
def get_image_ext(img_bytes):
w = imghdr.what("", img_bytes)
if w is None:
w = "jpeg"
return w
def diffuser_callback(i, t, latents):
pass
def preprocess_image(image):
w, h = image.size
w, h = map(lambda x: x - x % 32, (w, h)) # resize to integer multiple of 32
image = image.resize((w, h), resample=PIL.Image.LANCZOS)
image = np.array(image).astype(np.float32) / 255.0
image = image[None].transpose(0, 3, 1, 2)
image = torch.from_numpy(image)
return 2.0 * image - 1.0
def preprocess_mask(mask):
mask = mask.convert("L")
w, h = mask.size
w, h = map(lambda x: x - x % 32, (w, h)) # resize to integer multiple of 32
mask = mask.resize((w // 8, h // 8), resample=PIL.Image.NEAREST)
mask = np.array(mask).astype(np.float32) / 255.0
mask = np.tile(mask, (4, 1, 1))
mask = mask[None].transpose(0, 1, 2, 3) # what does this step do?
mask = 1 - mask # repaint white, keep black
mask = torch.from_numpy(mask)
return mask
def load_img_1_(nparr, gray: bool = False):
# alpha_channel = None
# nparr = np.frombuffer(img_bytes, np.uint8)
if gray:
np_img = cv2.imdecode(nparr, cv2.IMREAD_GRAYSCALE)
else:
np_img = cv2.imdecode(nparr, cv2.IMREAD_UNCHANGED)
if len(np_img.shape) == 3 and np_img.shape[2] == 4:
alpha_channel = np_img[:, :, -1]
np_img = cv2.cvtColor(np_img, cv2.COLOR_BGRA2RGB)
else:
np_img = cv2.cvtColor(np_img, cv2.COLOR_BGR2RGB)
return np_img, alpha_channel
model = None
def model_process_pil(input):
global model
# input = request.files
# RGB
# origin_image_bytes = input["image"].read()
image_pil = input['image']
mask_pil = input['mask']
image = np.array(image_pil)
mask = np.array(mask_pil.convert("L"))
# print(f'image_pil_ = {type(image_pil)}')
# print(f'mask_pil_ = {type(mask_pil)}')
# mask_pil.save(f'./mask_pil.png')
#image, alpha_channel = load_img(image)
# Origin image shape: (512, 512, 3)
alpha_channel = (np.ones((image.shape[0],image.shape[1]))*255).astype(np.uint8)
original_shape = image.shape
interpolation = cv2.INTER_CUBIC
# form = request.form
print(f'liuyz_3_here_', original_shape, alpha_channel, image.dtype, mask.dtype)
size_limit = "Original" # image.shape[1] # : Union[int, str] = form.get("sizeLimit", "1080")
if size_limit == "Original":
size_limit = max(image.shape)
else:
size_limit = int(size_limit)
config = Config(
ldm_steps=25,
ldm_sampler='plms',
zits_wireframe=True,
hd_strategy='Original',
hd_strategy_crop_margin=196,
hd_strategy_crop_trigger_size=1280,
hd_strategy_resize_limit=2048,
prompt='',
use_croper=False,
croper_x=0,
croper_y=0,
croper_height=512,
croper_width=512,
sd_mask_blur=5,
sd_strength=0.75,
sd_steps=50,
sd_guidance_scale=7.5,
sd_sampler='ddim',
sd_seed=42,
cv2_flag='INPAINT_NS',
cv2_radius=5,
)
# print(f'config = {config}')
print(f'config/alpha_channel/size_limit = {config} / {alpha_channel} / {size_limit}')
if config.sd_seed == -1:
config.sd_seed = random.randint(1, 999999999)
# logger.info(f"Origin image shape: {original_shape}")
print(f"Origin image shape: {original_shape} / {image[250][250]}")
image = resize_max_size(image, size_limit=size_limit, interpolation=interpolation)
# logger.info(f"Resized image shape: {image.shape}")
print(f"Resized image shape: {image.shape} / {image[250][250]} / {image.dtype}")
# mask, _ = load_img(mask, gray=True)
#mask = np.array(mask_pil)
mask = resize_max_size(mask, size_limit=size_limit, interpolation=interpolation)
print(f"mask image shape: {mask.shape} / {type(mask)} / {mask[250][250]} / {mask.dtype}")
if model is None:
return None
start = time.time()
res_np_img = model(image, mask, config)
logger.info(f"process time: {(time.time() - start) * 1000}ms, {res_np_img.shape}")
print(f"process time_1_: {(time.time() - start) * 1000}ms, {alpha_channel.shape}, {res_np_img.shape} / {res_np_img[250][250]} / {res_np_img.dtype}")
torch.cuda.empty_cache()
if alpha_channel is not None:
print(f"liuyz_here_10_: {alpha_channel.shape} / {alpha_channel.dtype} / {res_np_img.dtype}")
if alpha_channel.shape[:2] != res_np_img.shape[:2]:
print(f"liuyz_here_20_: {alpha_channel.shape} / {res_np_img.shape}")
alpha_channel = cv2.resize(
alpha_channel, dsize=(res_np_img.shape[1], res_np_img.shape[0])
)
print(f"liuyz_here_30_: {alpha_channel.shape} / {res_np_img.shape} / {alpha_channel.dtype} / {res_np_img.dtype}")
res_np_img = np.concatenate(
(res_np_img, alpha_channel[:, :, np.newaxis]), axis=-1
)
print(f"liuyz_here_40_: {alpha_channel.shape} / {res_np_img.shape} / {alpha_channel.dtype} / {res_np_img.dtype}")
print(f"process time_2_: {(time.time() - start) * 1000}ms, {alpha_channel.shape}, {res_np_img.shape} / {res_np_img[250][250]} / {res_np_img.dtype}")
ext = 'png'
image = Image.open(io.BytesIO(numpy_to_bytes(res_np_img, ext)))
image.save(f'./result_image.png')
return image # res_np_img.astype(np.uint8) # image
'''
ext = get_image_ext(origin_image_bytes)
return ext
'''
def model_process_filepath(input): #image, mask):
global model
# {'image': '/tmp/tmp8mn9xw93.png', 'mask': '/tmp/tmpn5ars4te.png'}
# input = request.files
# RGB
origin_image_bytes = read_content(input["image"])
print(f'origin_image_bytes = ', type(origin_image_bytes), len(origin_image_bytes))
image, alpha_channel = load_img(origin_image_bytes)
alpha_channel = (np.ones((image.shape[0],image.shape[1]))*255).astype(np.uint8)
original_shape = image.shape
interpolation = cv2.INTER_CUBIC
image_pil = Image.fromarray(image)
# mask_pil = Image.fromarray(mask).convert("L")
# form = request.form
# print(f'size_limit_1_ = ', form["sizeLimit"], type(input["image"]))
size_limit = "Original" #: Union[int, str] = form.get("sizeLimit", "1080")
print(f'size_limit_2_ = {size_limit}')
if size_limit == "Original":
size_limit = max(image.shape)
else:
size_limit = int(size_limit)
print(f'size_limit_3_ = {size_limit}')
config = Config(
ldm_steps=25,
ldm_sampler='plms',
zits_wireframe=True,
hd_strategy='Original',
hd_strategy_crop_margin=196,
hd_strategy_crop_trigger_size=1280,
hd_strategy_resize_limit=2048,
prompt='',
use_croper=False,
croper_x=0,
croper_y=0,
croper_height=512,
croper_width=512,
sd_mask_blur=5,
sd_strength=0.75,
sd_steps=50,
sd_guidance_scale=7.5,
sd_sampler='ddim',
sd_seed=42,
cv2_flag='INPAINT_NS',
cv2_radius=5,
)
print(f'config/alpha_channel/size_limit = {config} / {alpha_channel} / {size_limit}')
if config.sd_seed == -1:
config.sd_seed = random.randint(1, 999999999)
logger.info(f"Origin image shape: {original_shape}")
print(f"Origin image shape: {original_shape} / {image[250][250]}")
image = resize_max_size(image, size_limit=size_limit, interpolation=interpolation)
logger.info(f"Resized image shape: {image.shape} / {type(image)}")
print(f"Resized image shape: {image.shape} / {image[250][250]}")
mask, _ = load_img(read_content(input["mask"]), gray=True)
mask = resize_max_size(mask, size_limit=size_limit, interpolation=interpolation)
print(f"mask image shape: {mask.shape} / {type(mask)} / {mask[250][250]} / {alpha_channel}")
if model is None:
return None
start = time.time()
res_np_img = model(image, mask, config)
logger.info(f"process time: {(time.time() - start) * 1000}ms, {res_np_img.shape}")
print(f"process time_1_: {(time.time() - start) * 1000}ms, {alpha_channel.shape}, {res_np_img.shape} / {res_np_img[250][250]} / {res_np_img.dtype}")
torch.cuda.empty_cache()
if alpha_channel is not None:
print(f"liuyz_here_10_: {alpha_channel.shape} / {alpha_channel.dtype} / {res_np_img.dtype}")
if alpha_channel.shape[:2] != res_np_img.shape[:2]:
print(f"liuyz_here_20_: {alpha_channel.shape} / {res_np_img.shape}")
alpha_channel = cv2.resize(
alpha_channel, dsize=(res_np_img.shape[1], res_np_img.shape[0])
)
print(f"liuyz_here_30_: {alpha_channel.shape} / {res_np_img.shape} / {alpha_channel.dtype} / {res_np_img.dtype}")
res_np_img = np.concatenate(
(res_np_img, alpha_channel[:, :, np.newaxis]), axis=-1
)
print(f"liuyz_here_40_: {alpha_channel.shape} / {res_np_img.shape} / {alpha_channel.dtype} / {res_np_img.dtype}")
ext = get_image_ext(origin_image_bytes)
print(f"process time_2_: {(time.time() - start) * 1000}ms, {alpha_channel.shape}, {res_np_img.shape} / {res_np_img[250][250]} / {res_np_img.dtype} /{ext}")
image = Image.open(io.BytesIO(numpy_to_bytes(res_np_img, ext)))
image.save(f'./result_image.png')
return image # image
'''
ext = get_image_ext(origin_image_bytes)
response = make_response(
send_file(
io.BytesIO(numpy_to_bytes(res_np_img, ext)),
mimetype=f"image/{ext}",
)
)
response.headers["X-Seed"] = str(config.sd_seed)
return response
'''
model = ModelManager(
name='lama',
device=device,
# hf_access_token=HF_TOKEN_SD,
# sd_disable_nsfw=False,
# sd_cpu_textencoder=True,
# sd_run_local=True,
# callback=diffuser_callback,
)
'''
pipe = DiffusionPipeline.from_pretrained("runwayml/stable-diffusion-inpainting", dtype=torch.float16, revision="fp16", use_auth_token=auth_token).to(device)
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
transforms.Resize((512, 512)),
])
'''
def read_content(file_path):
"""read the content of target file
"""
with open(file_path, 'rb') as f:
content = f.read()
return content
image_type = 'pil' #'filepath' #'pil'
def predict(input):
print(f'liuyz_0_', input)
'''
image_np = np.array(input["image"])
print(f'image_np = {image_np.shape}')
mask_np = np.array(input["mask"])
print(f'mask_np = {mask_np.shape}')
'''
'''
image = dict["image"] # .convert("RGB") #.resize((512, 512))
# target_size = (init_image.shape[0], init_image.shape[1])
print(f'liuyz_1_', image.shape)
print(f'liuyz_2_', image.convert("RGB").shape)
print(f'liuyz_3_', image.convert("RGB").resize((512, 512)).shape)
# mask = dict["mask"] # .convert("RGB") #.resize((512, 512))
'''
if image_type == 'filepath':
output = model_process_filepath(input) # dict["image"], dict["mask"])
elif image_type == 'pil':
output = model_process_pil(input)
# output = mask #output.images[0]
# output = pipe(prompt = prompt, image=init_image, mask_image=mask,guidance_scale=7.5)
# output = input["mask"]
# output = None
return output #, gr.update(visible=True), gr.update(visible=True), gr.update(visible=True)
print(f'liuyz_500_here_')
css = '''
.container {max-width: 1150px;margin: auto;padding-top: 1.5rem}
#image_upload{min-height:512px}
#image_upload [data-testid="image"], #image_upload [data-testid="image"] > div{min-height: 512px}
#mask_radio .gr-form{background:transparent; border: none}
#word_mask{margin-top: .75em !important}
#word_mask textarea:disabled{opacity: 0.3}
.footer {margin-bottom: 45px;margin-top: 35px;text-align: center;border-bottom: 1px solid #e5e5e5}
.footer>p {font-size: .8rem; display: inline-block; padding: 0 10px;transform: translateY(10px);background: white}
.dark .footer {border-color: #303030}
.dark .footer>p {background: #0b0f19}
.acknowledgments h4{margin: 1.25em 0 .25em 0;font-weight: bold;font-size: 115%}
#image_upload .touch-none{display: flex}
@keyframes spin {
from {
transform: rotate(0deg);
}
to {
transform: rotate(360deg);
}
}
#share-btn-container {
display: flex; padding-left: 0.5rem !important; padding-right: 0.5rem !important; background-color: #000000; justify-content: center; align-items: center; border-radius: 9999px !important; width: 13rem;
}
#share-btn {
all: initial; color: #ffffff;font-weight: 600; cursor:pointer; font-family: 'IBM Plex Sans', sans-serif; margin-left: 0.5rem !important; padding-top: 0.25rem !important; padding-bottom: 0.25rem !important;
}
#share-btn * {
all: unset;
}
#share-btn-container div:nth-child(-n+2){
width: auto !important;
min-height: 0px !important;
}
#share-btn-container .wrap {
display: none !important;
}
'''
'''
sketchpad = Sketchpad()
imageupload = ImageUplaod()
interface = gr.Interface(fn=predict, inputs="image", outputs="image", sketchpad, imageupload)
interface.launch(share=True)
'''
'''
# gr.Interface(fn=predict, inputs="image", outputs="image").launch(share=True)
image = gr.Image(source='upload', tool='sketch', type="pil", label="Upload")# .style(height=400)
image_blocks = gr.Interface(
fn=predict,
inputs=image,
outputs=image,
# examples=[["cheetah.jpg"]],
)
image_blocks.launch(inline=True)
import gradio as gr
def greet(dict, name, is_morning, temperature):
image = dict['image']
target_size = (image.shape[0], image.shape[1])
print(f'liuyz_1_', target_size)
salutation = "Good morning" if is_morning else "Good evening"
greeting = f"{salutation} {name}. It is {temperature} degrees today"
celsius = (temperature - 32) * 5 / 9
return image, greeting, round(celsius, 2)
image = gr.Image(source='upload', tool='sketch', label="上传")# .style(height=400)
demo = gr.Interface(
fn=greet,
inputs=[image, "text", "checkbox", gr.Slider(0, 100)],
outputs=['image', "text", "number"],
)
demo.launch()
'''
image_blocks = gr.Blocks(css=css)
with image_blocks as demo:
# gr.HTML(read_content("header.html"))
with gr.Group():
with gr.Box():
with gr.Row():
with gr.Column():
image = gr.Image(source='upload', tool='sketch',type=f'{image_type}', label="Upload").style(height=512)
with gr.Row(elem_id="prompt-container").style(mobile_collapse=False, equal_height=True):
# prompt = gr.Textbox(placeholder = 'Your prompt (what you want in place of what is erased)', show_label=False, elem_id="input-text")
btn = gr.Button("Done!").style(
margin=True,
rounded=(True, True, True, True),
full_width=True,
)
with gr.Column():
image_out = gr.Image(label="Output").style(height=512)
'''
with gr.Group(elem_id="share-btn-container"):
community_icon = gr.HTML(community_icon_html, visible=False)
loading_icon = gr.HTML(loading_icon_html, visible=False)
share_button = gr.Button("Share to community", elem_id="share-btn", visible=False)
'''
# btn.click(fn=predict, inputs=[image, prompt], outputs=[image_out, community_icon, loading_icon, share_button])
btn.click(fn=predict, inputs=[image], outputs=[image_out]) #, community_icon, loading_icon, share_button])
#share_button.click(None, [], [], _js=share_js)
image_blocks.launch()