import argparse
import base64
import os
import tempfile
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Tuple

from hyvideo.utils.file_utils import save_videos_grid
from hyvideo.inference import HunyuanVideoSampler
from hyvideo.config import parse_args
from hyvideo.constants import NEGATIVE_PROMPT

# Request-level defaults used by EndpointHandler.__call__ when a field is absent.
DEFAULT_RESOLUTION = "1280x720"
DEFAULT_VIDEO_LENGTH = 129
DEFAULT_INFER_STEPS = 50
DEFAULT_GUIDANCE_SCALE = 1.0
DEFAULT_FLOW_SHIFT = 7.0
DEFAULT_EMBEDDED_GUIDANCE_SCALE = 6.0
# Frame rate used when encoding the generated frames to MP4.
OUTPUT_FPS = 24


def get_default_args():
    """Build the default argument namespace without reading the command line.

    Returns:
        argparse.Namespace holding the defaults declared below. Dashes in
        option names become underscores (e.g. ``--model-base`` ->
        ``args.model_base``).
    """
    parser = argparse.ArgumentParser()

    # Subset of the original CLI options, each with its default value.
    parser.add_argument("--model", type=str, default="HYVideo-T/2")
    parser.add_argument("--model-resolution", type=str, default="720p", choices=["540p", "720p"])
    parser.add_argument("--latent-channels", type=int, default=4)
    parser.add_argument("--precision", type=str, default="bf16", choices=["bf16", "fp32", "fp16"])
    parser.add_argument("--batch-size", type=int, default=1)
    parser.add_argument("--infer-steps", type=int, default=50)
    parser.add_argument("--model-base", type=str, default=None)
    parser.add_argument("--save-path", type=str, default="outputs")
    parser.add_argument("--video-length", type=int, default=129)  # ~5 seconds at 24 fps

    # Parse an explicit empty list so sys.argv is never consulted.
    return parser.parse_args([])


def _parse_resolution(resolution: Any) -> Tuple[int, int]:
    """Parse a ``"<width>x<height>"`` string into a ``(width, height)`` tuple.

    Raises:
        ValueError: If the value is not two positive integers joined by "x".
    """
    parts = str(resolution).lower().split("x")
    if len(parts) != 2:
        raise ValueError(
            f"Invalid resolution {resolution!r}: expected '<width>x<height>', e.g. '1280x720'"
        )
    try:
        width, height = (int(part) for part in parts)
    except ValueError as err:
        raise ValueError(
            f"Invalid resolution {resolution!r}: width and height must be integers"
        ) from err
    if width <= 0 or height <= 0:
        raise ValueError(
            f"Invalid resolution {resolution!r}: width and height must be positive"
        )
    return width, height


class EndpointHandler:
    """Request handler that wraps a HunyuanVideoSampler for text-to-video inference."""

    def __init__(self, path: str = ""):
        """Load the model from ``path`` using the default configuration.

        Args:
            path: Root directory of the model files. It is also stored on the
                argument namespace as ``model_base``.

        Raises:
            ValueError: If ``path`` does not exist on disk.
        """
        # Use default args instead of parsing from the command line.
        self.args = get_default_args()
        self.args.model_base = path

        models_root_path = Path(path)
        if not models_root_path.exists():
            raise ValueError(f"`models_root` not exists: {models_root_path}")

        self.model = HunyuanVideoSampler.from_pretrained(models_root_path, args=self.args)

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Process a single generation request.

        Args:
            data: Dictionary containing:
                - inputs (str): The prompt text
                - resolution (str, optional): Video resolution like "1280x720"
                - video_length (int, optional): Number of frames
                - num_inference_steps (int, optional): Number of inference steps
                - seed (int, optional): Random seed (-1 or None for random)
                - guidance_scale (float, optional): Guidance scale value
                - flow_shift (float, optional): Flow shift value
                - embedded_guidance_scale (float, optional): Embedded guidance scale

        Returns:
            Dictionary with keys ``video_base64`` (the MP4 file encoded as a
            base64 string), ``seed`` and ``prompt`` (the first entries of the
            sampler's reported seeds and prompts).

        Raises:
            ValueError: If no prompt is provided or the resolution is malformed.
        """
        # Read with .get() so the caller's request dict is left untouched.
        prompt = data.get("inputs")
        if prompt is None:
            raise ValueError("No prompt provided in the 'inputs' field")

        width, height = _parse_resolution(data.get("resolution", DEFAULT_RESOLUTION))

        video_length = int(data.get("video_length", DEFAULT_VIDEO_LENGTH))
        num_inference_steps = int(data.get("num_inference_steps", DEFAULT_INFER_STEPS))
        guidance_scale = float(data.get("guidance_scale", DEFAULT_GUIDANCE_SCALE))
        flow_shift = float(data.get("flow_shift", DEFAULT_FLOW_SHIFT))
        embedded_guidance_scale = float(
            data.get("embedded_guidance_scale", DEFAULT_EMBEDDED_GUIDANCE_SCALE)
        )

        # Convert before comparing so "-1" (string) is also treated as "random";
        # an explicit None means random as well.
        seed = data.get("seed", -1)
        if seed is not None:
            seed = int(seed)
            if seed == -1:
                seed = None

        outputs = self.model.predict(
            prompt=prompt,
            height=height,
            width=width,
            video_length=video_length,
            seed=seed,
            negative_prompt="",
            infer_steps=num_inference_steps,
            guidance_scale=guidance_scale,
            num_videos_per_prompt=1,
            flow_shift=flow_shift,
            batch_size=1,
            embedded_guidance_scale=embedded_guidance_scale,
        )

        # Keep only the first sample and add a leading dimension back
        # (presumably the batch axis save_videos_grid expects - TODO confirm).
        sample = outputs["samples"][0].unsqueeze(0)

        # A per-request temporary directory avoids collisions between
        # concurrent requests and is removed even if encoding or reading fails.
        with tempfile.TemporaryDirectory() as tmp_dir:
            video_path = os.path.join(tmp_dir, "video.mp4")
            save_videos_grid(sample, video_path, fps=OUTPUT_FPS)
            with open(video_path, "rb") as video_file:
                video_bytes = video_file.read()

        video_base64 = base64.b64encode(video_bytes).decode()

        return {
            "video_base64": video_base64,
            "seed": outputs["seeds"][0],
            "prompt": outputs["prompts"][0],
        }