|
|
|
|
|
from diffusers import AutoencoderKL, DDPMScheduler, DDIMScheduler |
|
from transformers import CLIPTextModel, CLIPTokenizer |
|
from omegaconf import OmegaConf |
|
from huggingface_hub import hf_hub_download |
|
|
|
|
|
from diffusers.utils.import_utils import is_xformers_available |
|
from typing import Any |
|
import torch |
|
from einops import rearrange |
|
|
|
from animatediff.models.unet import UNet3DConditionModel |
|
from animatediff.pipelines.pipeline_animation import AnimationPipeline |
|
from animatediff.utils.util import save_videos_grid |
|
from animatediff.utils.util import load_weights |
|
|
|
|
|
class EndpointHandler(): |
|
def __init__(self, model_path: str = "bluestarburst/AnimateDiff-SceneFusion"): |
|
|
|
|
|
|
|
inference_config = {'unet_additional_kwargs': {'unet_use_cross_frame_attention': False, 'unet_use_temporal_attention': False, 'use_motion_module': True, 'motion_module_resolutions': [1, 2, 4, 8], 'motion_module_mid_block': False, 'motion_module_decoder_only': False, 'motion_module_type': 'Vanilla', 'motion_module_kwargs': {'num_attention_heads': 8, 'num_transformer_block': 1, 'attention_block_types': ['Temporal_Self', 'Temporal_Self'], 'temporal_position_encoding': True, 'temporal_position_encoding_max_len': 24, 'temporal_attention_dim_div': 1}}, 'noise_scheduler_kwargs': {'DDIMScheduler': {'num_train_timesteps': 1000, 'beta_start': 0.00085, 'beta_end': 0.012, 'beta_schedule': 'linear', 'steps_offset': 1, 'clip_sample': False}, 'EulerAncestralDiscreteScheduler': {'num_train_timesteps': 1000, 'beta_start': 0.00085, 'beta_end': 0.012, 'beta_schedule': 'linear'}, 'KDPM2AncestralDiscreteScheduler': {'num_train_timesteps': 1000, 'beta_start': 0.00085, 'beta_end': 0.012, 'beta_schedule': 'linear'}}} |
|
|
|
|
|
tokenizer = CLIPTokenizer.from_pretrained(model_path, subfolder="models/StableDiffusion/tokenizer") |
|
text_encoder = CLIPTextModel.from_pretrained(model_path, subfolder="models/StableDiffusion/text_encoder") |
|
vae = AutoencoderKL.from_pretrained(model_path, subfolder="models/StableDiffusion/vae") |
|
unet = UNet3DConditionModel.from_pretrained_2d(model_path, subfolder="models/StableDiffusion/unet", unet_additional_kwargs=OmegaConf.to_container(inference_config.unet_additional_kwargs)) |
|
|
|
if is_xformers_available(): unet.enable_xformers_memory_efficient_attention() |
|
else: assert False |
|
|
|
self.pipeline = AnimationPipeline( |
|
vae=vae, text_encoder=text_encoder, tokenizer=tokenizer, unet=unet, |
|
scheduler=DDIMScheduler(**OmegaConf.to_container(inference_config['noise_scheduler_kwargs']['DDIMScheduler']'])), |
|
).to("cuda") |
|
|
|
# huggingface download motion module from bluestarburst/AnimateDiff-SceneFusion/models/Motion_Module/mm_sd_v15.ckpt |
|
|
|
motion_module = "models/Motion_Module/mm_sd_v15.ckpt" |
|
hf_hub_download(repo_id="bluestarburst/AnimateDiff-SceneFusion", filename="models/Motion_Module/mm_sd_v15.ckpt", output_dir="models/Motion_Module") |
|
|
|
|
|
self.pipeline = load_weights( |
|
self.pipeline, |
|
# motion module |
|
motion_module_path = motion_module, |
|
motion_module_lora_configs = [], |
|
# image layers |
|
dreambooth_model_path = "", |
|
lora_model_path = "", |
|
lora_alpha = 0.8, |
|
).to("cuda") |
|
|
|
def __call__(self, prompt, negative_prompt, steps, guidance_scale): |
|
""" |
|
__call__ method will be called once per request. This can be used to |
|
run inference. |
|
""" |
|
vids = self.pipeline( |
|
prompt=prompt, |
|
negative_prompt=negative_prompt, |
|
num_inference_steps=steps, |
|
guidance_scale=guidance_scale, |
|
width= 256, |
|
height= 256, |
|
video_length= 5, |
|
).videos |
|
|
|
videos = rearrange(vids, "b c t h w -> t b c h w") |
|
n_rows=6 |
|
fps=1 |
|
loop = True |
|
rescale=False |
|
outputs = [] |
|
for x in videos: |
|
x = torchvision.utils.make_grid(x, nrow=n_rows) |
|
x = x.transpose(0, 1).transpose(1, 2).squeeze(-1) |
|
if rescale: |
|
x = (x + 1.0) / 2.0 # -1,1 -> 0,1 |
|
x = (x * 255).numpy().astype(np.uint8) |
|
outputs.append(x) |
|
|
|
# imageio.mimsave(path, outputs, fps=fps) |
|
|
|
# return a gif file as bytes |
|
return outputs |
|
|
|
|
|
# This is the entry point for the serverless function. |
|
# This function will be called during inference time. |
|
|
|
|
|
# new_handler = EndpointHandler() |
|
|