import torch from xora.models.autoencoders.causal_video_autoencoder import CausalVideoAutoencoder from xora.models.transformers.transformer3d import Transformer3DModel from xora.models.transformers.symmetric_patchifier import SymmetricPatchifier from xora.schedulers.rf import RectifiedFlowScheduler from xora.pipelines.pipeline_xora_video import XoraVideoPipeline from pathlib import Path from transformers import T5EncoderModel, T5Tokenizer import safetensors.torch import json import argparse from xora.utils.conditioning_method import ConditioningMethod import os import numpy as np import cv2 from PIL import Image import random RECOMMENDED_RESOLUTIONS = [ (704, 1216, 41), (704, 1088, 49), (640, 1056, 57), (608, 992, 65), (608, 896, 73), (544, 896, 81), (544, 832, 89), (512, 800, 97), (512, 768, 97), (480, 800, 105), (480, 736, 113), (480, 704, 121), (448, 704, 129), (448, 672, 137), (416, 640, 153), (384, 672, 161), (384, 640, 169), (384, 608, 177), (384, 576, 185), (352, 608, 193), (352, 576, 201), (352, 544, 209), (352, 512, 225), (352, 512, 233), (320, 544, 241), (320, 512, 249), (320, 512, 257), ] def load_vae(vae_dir): vae_ckpt_path = vae_dir / "vae_diffusion_pytorch_model.safetensors" vae_config_path = vae_dir / "config.json" with open(vae_config_path, "r") as f: vae_config = json.load(f) vae = CausalVideoAutoencoder.from_config(vae_config) vae_state_dict = safetensors.torch.load_file(vae_ckpt_path) vae.load_state_dict(vae_state_dict) return vae.cuda().to(torch.bfloat16) def load_unet(unet_dir): unet_ckpt_path = unet_dir / "unet_diffusion_pytorch_model.safetensors" unet_config_path = unet_dir / "config.json" transformer_config = Transformer3DModel.load_config(unet_config_path) transformer = Transformer3DModel.from_config(transformer_config) unet_state_dict = safetensors.torch.load_file(unet_ckpt_path) transformer.load_state_dict(unet_state_dict, strict=True) return transformer.cuda() def load_scheduler(scheduler_dir): scheduler_config_path = scheduler_dir / "scheduler_config.json" scheduler_config = RectifiedFlowScheduler.load_config(scheduler_config_path) return RectifiedFlowScheduler.from_config(scheduler_config) def center_crop_and_resize(frame, target_height, target_width): h, w, _ = frame.shape aspect_ratio_target = target_width / target_height aspect_ratio_frame = w / h if aspect_ratio_frame > aspect_ratio_target: new_width = int(h * aspect_ratio_target) x_start = (w - new_width) // 2 frame_cropped = frame[:, x_start : x_start + new_width] else: new_height = int(w / aspect_ratio_target) y_start = (h - new_height) // 2 frame_cropped = frame[y_start : y_start + new_height, :] frame_resized = cv2.resize(frame_cropped, (target_width, target_height)) return frame_resized def load_video_to_tensor_with_resize(video_path, target_height, target_width): cap = cv2.VideoCapture(video_path) frames = [] while True: ret, frame = cap.read() if not ret: break frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) if target_height is not None: frame_resized = center_crop_and_resize( frame_rgb, target_height, target_width ) else: frame_resized = frame_rgb frames.append(frame_resized) cap.release() video_np = (np.array(frames) / 127.5) - 1.0 video_tensor = torch.tensor(video_np).permute(3, 0, 1, 2).float() return video_tensor def load_image_to_tensor_with_resize(image_path, target_height=512, target_width=768): image = Image.open(image_path).convert("RGB") image_np = np.array(image) frame_resized = center_crop_and_resize(image_np, target_height, target_width) frame_tensor = torch.tensor(frame_resized).permute(2, 0, 1).float() frame_tensor = (frame_tensor / 127.5) - 1.0 # Create 5D tensor: (batch_size=1, channels=3, num_frames=1, height, width) return frame_tensor.unsqueeze(0).unsqueeze(2) def main(): parser = argparse.ArgumentParser( description="Load models from separate directories and run the pipeline." ) # Directories parser.add_argument( "--ckpt_dir", type=str, required=True, help="Path to the directory containing unet, vae, and scheduler subdirectories", ) parser.add_argument( "--input_video_path", type=str, help="Path to the input video file (first frame used)", ) parser.add_argument( "--input_image_path", type=str, help="Path to the input image file" ) parser.add_argument( "--output_path", type=str, default=None, help="Path to save output video, if None will save in working directory.", ) parser.add_argument("--seed", type=int, default="171198") # Pipeline parameters parser.add_argument( "--num_inference_steps", type=int, default=40, help="Number of inference steps" ) parser.add_argument( "--num_images_per_prompt", type=int, default=1, help="Number of images per prompt", ) parser.add_argument( "--guidance_scale", type=float, default=3, help="Guidance scale for the pipeline", ) parser.add_argument( "--height", type=int, default=None, help="Height of the output video frames. Optional if an input image provided.", ) parser.add_argument( "--width", type=int, default=None, help="Width of the output video frames. If None will infer from input image.", ) parser.add_argument( "--num_frames", type=int, default=121, help="Number of frames to generate in the output video", ) parser.add_argument( "--frame_rate", type=int, default=25, help="Frame rate for the output video" ) parser.add_argument( "--bfloat16", action="store_true", help="Denoise in bfloat16", ) # Prompts parser.add_argument( "--prompt", type=str, help="Text prompt to guide generation", ) parser.add_argument( "--negative_prompt", type=str, default="worst quality, inconsistent motion, blurry, jittery, distorted", help="Negative prompt for undesired features", ) parser.add_argument( "--custom_resolution", action="store_true", default=False, help="Enable custom resolution (not in recommneded resolutions) if specified (default: False)", ) args = parser.parse_args() if args.input_image_path is None and args.input_video_path is None: assert ( args.height is not None and args.width is not None ), "Must enter height and width for text to image generation." # Load media (video or image) if args.input_video_path: media_items = load_video_to_tensor_with_resize( args.input_video_path, args.height, args.width ).unsqueeze(0) elif args.input_image_path: media_items = load_image_to_tensor_with_resize( args.input_image_path, args.height, args.width ) else: media_items = None height = args.height if args.height else media_items.shape[-2] width = args.width if args.width else media_items.shape[-1] assert height % 32 == 0, f"Height ({height}) should be divisible by 32." assert width % 32 == 0, f"Width ({width}) should be divisible by 32." assert ( height, width, args.num_frames, ) in RECOMMENDED_RESOLUTIONS or args.custom_resolution, f"The selected resolution + num frames combination is not supported, results would be suboptimal. Supported (h,w,f) are: {RECOMMENDED_RESOLUTIONS}. Use --custom_resolution to enable working with this resolution." # Paths for the separate mode directories ckpt_dir = Path(args.ckpt_dir) unet_dir = ckpt_dir / "unet" vae_dir = ckpt_dir / "vae" scheduler_dir = ckpt_dir / "scheduler" # Load models vae = load_vae(vae_dir) unet = load_unet(unet_dir) scheduler = load_scheduler(scheduler_dir) patchifier = SymmetricPatchifier(patch_size=1) text_encoder = T5EncoderModel.from_pretrained( "PixArt-alpha/PixArt-XL-2-1024-MS", subfolder="text_encoder" ).to("cuda") tokenizer = T5Tokenizer.from_pretrained( "PixArt-alpha/PixArt-XL-2-1024-MS", subfolder="tokenizer" ) if args.bfloat16 and unet.dtype != torch.bfloat16: unet = unet.to(torch.bfloat16) # Use submodels for the pipeline submodel_dict = { "transformer": unet, "patchifier": patchifier, "text_encoder": text_encoder, "tokenizer": tokenizer, "scheduler": scheduler, "vae": vae, } pipeline = XoraVideoPipeline(**submodel_dict).to("cuda") # Prepare input for the pipeline sample = { "prompt": args.prompt, "prompt_attention_mask": None, "negative_prompt": args.negative_prompt, "negative_prompt_attention_mask": None, "media_items": media_items, } random.seed(args.seed) np.random.seed(args.seed) torch.manual_seed(args.seed) torch.cuda.manual_seed(args.seed) generator = torch.Generator(device="cuda").manual_seed(args.seed) images = pipeline( num_inference_steps=args.num_inference_steps, num_images_per_prompt=args.num_images_per_prompt, guidance_scale=args.guidance_scale, generator=generator, output_type="pt", callback_on_step_end=None, height=height, width=width, num_frames=args.num_frames, frame_rate=args.frame_rate, **sample, is_video=True, vae_per_channel_normalize=True, conditioning_method=( ConditioningMethod.FIRST_FRAME if media_items is not None else ConditioningMethod.UNCONDITIONAL ), mixed_precision=not args.bfloat16, ).images # Save output video def get_unique_filename(base, ext, dir=".", index_range=1000): for i in range(index_range): filename = os.path.join(dir, f"{base}_{i}{ext}") if not os.path.exists(filename): return filename raise FileExistsError( f"Could not find a unique filename after {index_range} attempts." ) for i in range(images.shape[0]): video_np = images.squeeze(0).permute(1, 2, 3, 0).cpu().float().numpy() video_np = (video_np * 255).astype(np.uint8) fps = args.frame_rate height, width = video_np.shape[1:3] if video_np.shape[0] == 1: output_filename = ( args.output_path if args.output_path is not None else get_unique_filename(f"image_output_{i}", ".png", ".") ) cv2.imwrite( output_filename, video_np[0][..., ::-1] ) # Save single frame as image else: output_filename = ( args.output_path if args.output_path is not None else get_unique_filename(f"video_output_{i}", ".mp4", ".") ) out = cv2.VideoWriter( output_filename, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height) ) for frame in video_np[..., ::-1]: out.write(frame) out.release() if __name__ == "__main__": main()