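# ZeroGPU Space entry point: text-driven video editing with the FLATTEN
# pipeline (DDIM inversion plus optical-flow trajectories) on top of
# Stable Diffusion 2.1.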
import os
import time

import cv2
import imageio
import numpy as np
import torch
from PIL import Image
from transformers import CLIPTextModel, CLIPTokenizer
from diffusers import DDIMScheduler, AutoencoderKL, DDIMInverseScheduler

from models.pipeline_flatten import FlattenPipeline
from models.util import sample_trajectories
from models.unet import UNet3DConditionModel
def init_pipeline(device):
    """Assemble the FLATTEN pipeline from Stable Diffusion 2.1 components."""
    dtype = torch.float16
    sd_path = "stabilityai/stable-diffusion-2-1-base"
    unet_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "checkpoints", "unet")
    # Inflate the 2D UNet weights into the 3D (video) UNet used by FLATTEN.
    unet = UNet3DConditionModel.from_pretrained_2d(unet_path, dtype=dtype)
    # Alternatively, inflate directly from the base SD 2.1 UNet:
    # unet = UNet3DConditionModel.from_pretrained_2d(sd_path, subfolder="unet").to(dtype=dtype)
    vae = AutoencoderKL.from_pretrained(sd_path, subfolder="vae").to(dtype=dtype)
    tokenizer = CLIPTokenizer.from_pretrained(sd_path, subfolder="tokenizer")
    text_encoder = CLIPTextModel.from_pretrained(sd_path, subfolder="text_encoder").to(dtype=dtype)
    # DDIM for sampling, plus its inverse for inverting the source video into noise.
    scheduler = DDIMScheduler.from_pretrained(sd_path, subfolder="scheduler")
    inverse = DDIMInverseScheduler.from_pretrained(sd_path, subfolder="scheduler")
    pipe = FlattenPipeline(
        vae=vae, text_encoder=text_encoder, tokenizer=tokenizer,
        unet=unet, scheduler=scheduler, inverse_scheduler=inverse,
    )
    pipe.enable_vae_slicing()
    pipe.to(device)
    return pipe
# Generation settings shared by every request.
height = 512
width = 512
sample_steps = 50
inject_step = 40

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Build the pipeline once at import time; `inference` reuses it across calls.
pipe = init_pipeline(device)
def inference(
    seed: int = 66,
    prompt: str = None,
    neg_prompt: str = "",
    guidance_scale: float = 10.0,
    video_length: int = 16,
    video_path: str = None,
    output_dir: str = None,
    frame_rate: int = 1,
    fps: int = 15,
    old_qk: int = 0,
):
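    """Edit the clip at `video_path` to match `prompt` and return the path
    to the generated .mp4.

    The source frames are DDIM-inverted and re-sampled under the prompt;
    optical-flow trajectories from the source preserve the original motion.
    """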
    generator = torch.Generator(device=device)
    generator.manual_seed(seed)
    # Enable xformers attention per request; this appears to be needed for ZeroGPU.
    pipe.enable_xformers_memory_efficient_attention()

    # Read up to `video_length` frames from the source video and resize them.
    video_reader = imageio.get_reader(video_path, "ffmpeg")
    video = []
    for frame in video_reader:
        if len(video) >= video_length:
            break
        video.append(cv2.resize(frame, (width, height)))
    video_reader.close()
    real_frames = [Image.fromarray(frame) for frame in video]

    # Compute optical flow and sample point trajectories across the frames.
    trajectories = sample_trajectories(torch.tensor(np.array(video)).permute(0, 3, 1, 2), device)
    torch.cuda.empty_cache()
    for k in trajectories:
        trajectories[k] = trajectories[k].to(device)
    # Run the FLATTEN pipeline and convert the output video to uint8 frames.
    sample = (
        pipe(
            prompt,
            video_length=video_length,
            frames=real_frames,
            num_inference_steps=sample_steps,
            generator=generator,
            guidance_scale=guidance_scale,
            negative_prompt=neg_prompt,
            width=width,
            height=height,
            trajs=trajectories,
            output_dir="tmp/",
            inject_step=inject_step,
            old_qk=old_qk,
        )
        .videos[0]
        .permute(1, 2, 3, 0)  # -> (frames, height, width, channels)
        .cpu()
        .numpy()
        * 255
    ).astype(np.uint8)
    # Write the edited frames to a uniquely named file in /tmp.
    temp_video_name = f"/tmp/{prompt}_{neg_prompt}_{guidance_scale}_{time.time()}.mp4".replace(" ", "-")
    video_writer = imageio.get_writer(temp_video_name, fps=fps)
    for frame in sample:
        video_writer.append_data(frame)
    video_writer.close()  # flush and finalize the .mp4
    print(f"Saved video to {temp_video_name}, sample shape: {sample.shape}")
    return temp_video_name
if __name__ == "__main__":
    video_path = "./data/puff.mp4"
    inference(
        video_path=video_path,
        prompt="A Tiger, high quality",
        neg_prompt="a cat with big eyes, deformed",
        guidance_scale=20,
        old_qk=0,
    )
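
# A minimal sketch of how `inference` could be exposed as the Space's UI
# (hypothetical wiring; the actual app may differ). On a ZeroGPU Space the
# GPU-bound function would typically also be decorated with `@spaces.GPU`:
#
#   import gradio as gr
#   demo = gr.Interface(
#       fn=lambda video, prompt: inference(video_path=video, prompt=prompt),
#       inputs=[gr.Video(label="source video"), gr.Textbox(label="prompt")],
#       outputs=gr.Video(label="edited video"),
#   )
#   demo.launch()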