Spaces:
Running
on
A10G
Running
on
A10G
import os | |
import copy | |
import json | |
import time | |
import torch | |
import argparse | |
from PIL import Image | |
import numpy as np | |
import soundfile as sf | |
from tqdm import tqdm | |
from diffusers import DDPMScheduler | |
from models import build_pretrained_models, AudioDiffusion | |
from transformers import AutoProcessor, ClapModel | |
import torchaudio | |
import tools.torch_tools as torch_tools | |
from datasets import load_dataset | |
# Check if CUDA is available and set the device | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
print("Using device:", device) | |
class dotdict(dict): | |
"""dot.notation access to dictionary attributes""" | |
__getattr__ = dict.get | |
__setattr__ = dict.__setitem__ | |
__delattr__ = dict.__delitem__ | |
def chunks(lst, n): | |
"""Yield successive n-sized chunks from lst.""" | |
for i in range(0, len(lst), n): | |
yield lst[i:i + n] | |
def parse_args(): | |
parser = argparse.ArgumentParser(description="Inference for text to audio generation task.") | |
parser.add_argument( | |
"--original_args", type=str, default=None, | |
help="Path for summary jsonl file saved during training." | |
) | |
parser.add_argument( | |
"--model", type=str, default=None, | |
help="Path for saved model bin file." | |
) | |
parser.add_argument( | |
"--vae_model", type=str, default="audioldm-s-full", | |
help="Path for saved model bin file." | |
) | |
parser.add_argument( | |
"--num_steps", type=int, default=200, | |
help="How many denoising steps for generation.", | |
) | |
parser.add_argument( | |
"--guidance", type=float, default=3, | |
help="Guidance scale for classifier free guidance." | |
) | |
parser.add_argument( | |
"--batch_size", type=int, default=1, | |
help="Batch size for generation.", | |
) | |
parser.add_argument( | |
"--num_samples", type=int, default=1, | |
help="How many samples per prompt.", | |
) | |
parser.add_argument( | |
"--num_test_instances", type=int, default=-1, | |
help="How many test instances to evaluate.", | |
) | |
parser.add_argument( | |
"--sample_rate", type=int, default=16000, | |
help="Sample rate for audio output.", | |
) | |
parser.add_argument( | |
"--max_duration", type=int, default=10, | |
help="Maximum length duration for generated audio." | |
) | |
parser.add_argument( | |
"--save_dir", type=str, default="./outputs/tmp", | |
help="output save dir" | |
) | |
parser.add_argument( | |
"--data_path", type=str, default="data/video_processed/video_gt_augment", | |
help="inference data path" | |
) | |
args = parser.parse_args() | |
return args | |
def main(): | |
args = parse_args() | |
train_args = dotdict(json.loads(open(args.original_args).readlines()[0])) | |
if "hf_model" not in train_args: | |
train_args["hf_model"] = None | |
# Load Models # | |
name = train_args.vae_model | |
vae, stft = build_pretrained_models(name) | |
vae, stft = vae.to(device), stft.to(device) # Ensure models are on the correct device | |
model_class = AudioDiffusion | |
if train_args.ib: | |
print("*****USING MODEL IMAGEBIND*****") | |
from models_imagebind import AudioDiffusion_IB | |
model_class = AudioDiffusion_IB | |
elif train_args.lb: | |
print("*****USING MODEL LANGUAGEBIND*****") | |
from models_languagebind import AudioDiffusion_LB | |
model_class = AudioDiffusion_LB | |
elif train_args.jepa: | |
print("*****USING MODEL JEPA*****") | |
from models_vjepa import AudioDiffusion_JEPA | |
model_class = AudioDiffusion_JEPA | |
model = model_class( | |
train_args.fea_encoder_name, | |
train_args.scheduler_name, | |
train_args.unet_model_name, | |
train_args.unet_model_config, | |
train_args.snr_gamma, | |
train_args.freeze_text_encoder, | |
train_args.uncondition, | |
train_args.img_pretrained_model_path, | |
train_args.task, | |
train_args.embedding_dim, | |
train_args.pe | |
) | |
model.eval() | |
# Load Trained Weight # | |
try: | |
if args.model.endswith(".pt") or args.model.endswith(".bin"): | |
model.load_state_dict(torch.load(args.model, map_location=device), strict=False) | |
else: | |
from safetensors.torch import load_model | |
load_model(model, args.model, strict=False) | |
except OSError as e: | |
print(f"Error loading model with safetensors: {e}") | |
print("Falling back to torch.load") | |
model.load_state_dict(torch.load(args.model, map_location=device), strict=False) | |
model.to(device) | |
scheduler = DDPMScheduler.from_pretrained(train_args.scheduler_name, subfolder="scheduler") | |
sample_rate = args.sample_rate | |
# Define max_len_in_seconds globally for consistency | |
max_len_in_seconds = args.max_duration | |
def audio_text_matching(waveforms, text, sample_freq=24000, max_len_in_seconds=max_len_in_seconds): | |
new_freq = 48000 | |
resampled = [] | |
for wav in waveforms: | |
x = torchaudio.functional.resample(torch.tensor(wav, dtype=torch.float).reshape(1, -1), orig_freq=sample_freq, new_freq=new_freq)[0].numpy() | |
resampled.append(x[:new_freq*max_len_in_seconds]) | |
inputs = clap_processor(text=text, audios=resampled, return_tensors="pt", padding=True, sampling_rate=48000) | |
inputs = {k: v.to(device) for k, v in inputs.items()} | |
with torch.no_grad(): | |
outputs = clap(**inputs) | |
logits_per_audio = outputs.logits_per_audio | |
ranks = torch.argsort(logits_per_audio.flatten(), descending=True).cpu().numpy() | |
return ranks | |
# Load Data # | |
if train_args.prefix: | |
prefix = train_args.prefix | |
else: | |
prefix = "" | |
data_path = args.data_path | |
wavname = [f"{name.split('.')[0]}.wav" for name in os.listdir(data_path)] | |
video_features = [] | |
for video_file in os.listdir(data_path): | |
video_path = os.path.join(data_path, video_file) | |
video_feature = torch_tools.load_video(video_path, frame_rate=2, size=224) | |
print(video_feature.shape) | |
video_features.append(video_feature.to(device)) # Move to device | |
# Generate # | |
num_steps, guidance, batch_size, num_samples = args.num_steps, args.guidance, args.batch_size, args.num_samples | |
all_outputs = [] | |
for k in tqdm(range(0, len(wavname), batch_size)): | |
with torch.no_grad(): | |
prompt = video_features[k: k+batch_size] | |
latents = model.inference(scheduler, None, prompt, None, num_steps, guidance, num_samples, disable_progress=True, device=device) | |
mel = vae.decode_first_stage(latents) | |
wave = vae.decode_to_waveform(mel) | |
# Ensure the waveform is exactly 8 seconds long | |
num_samples_n_seconds = sample_rate * max_len_in_seconds | |
wave = [wav[:num_samples_n_seconds] for wav in wave] | |
all_outputs += [item for item in wave] | |
# Save # | |
exp_id = str(int(time.time())) | |
if not os.path.exists("outputs"): | |
os.makedirs("outputs") | |
if num_samples == 1: | |
output_dir = "{}/{}_{}_steps_{}_guidance_{}_sampleRate_{}_augment".format(args.save_dir, exp_id, "_".join(args.model.split("/")[1:-1]), num_steps, guidance, sample_rate) | |
os.makedirs(output_dir, exist_ok=True) | |
for j, wav in enumerate(all_outputs): | |
sf.write("{}/{}".format(output_dir, wavname[j]), wav, samplerate=sample_rate) | |
else: | |
for i in range(num_samples): | |
output_dir = "{}/{}_{}_steps_{}_guidance_{}_sampleRate_{}/rank_{}".format(args.save_dir, exp_id, "_".join(args.model.split("/")[1:-1]), num_steps, guidance, sample_rate, i+1) | |
os.makedirs(output_dir, exist_ok=True) | |
groups = list(chunks(all_outputs, num_samples)) | |
for k in tqdm(range(len(groups))): | |
wavs_for_text = groups[k] | |
rank = audio_text_matching(wavs_for_text, text_prompts[k]) | |
ranked_wavs_for_text = [wavs_for_text[r] for r in rank] | |
for i, wav in enumerate(ranked_wavs_for_text): | |
output_dir = "{}/{}_{}_steps_{}_guidance_{}_sampleRate_{}/rank_{}".format(args.save_dir, exp_id, "_".join(args.model.split("/")[1:-1]), num_steps, guidance, sample_rate, i+1) | |
sf.write("{}/{}".format(output_dir, wavname[k]), wav, samplerate=sample_rate) | |
if __name__ == "__main__": | |
main() |