# Hugging Face Space (status shown on the hosting page: "Running on Zero").
import os

import av
import gradio as gr
import numpy as np
import spaces
import torch
from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor

# Load the weights in 4-bit precision; float16 is used as the compute dtype.
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
)

# Checkpoint shared by the processor and the model below.
model_name = 'llava-hf/LLaVA-NeXT-Video-7B-DPO-hf'

# Both objects are created once at import time and reused by every request.
processor = LlavaNextVideoProcessor.from_pretrained(model_name)
model = LlavaNextVideoForConditionalGeneration.from_pretrained(
    model_name,
    quantization_config=quantization_config,
    device_map='auto',
)

def read_video_pyav(container, indices):
    """
    Decode selected frames of a video with the PyAV decoder.

    Frames are read sequentially from the start of video stream 0, and
    decoding stops as soon as the highest requested index has been passed.

    Args:
        container (av.container.input.InputContainer): PyAV container.
        indices (List[int]): Frame indices to decode. Order does not matter
            and duplicate indices are decoded only once.

    Returns:
        np.ndarray: np array of decoded frames of shape
        (num_frames, height, width, 3), in decode order.

    Raises:
        IndexError: If `indices` is empty.
        ValueError: If none of the requested frames could be decoded.
    """
    # A set of plain ints gives O(1) membership tests per decoded frame
    # instead of scanning the whole index array each time.
    wanted = {int(index) for index in indices}
    if not wanted:
        raise IndexError("indices must contain at least one frame index")
    last_wanted = max(wanted)

    frames = []
    container.seek(0)
    for i, frame in enumerate(container.decode(video=0)):
        if i > last_wanted:
            break
        if i in wanted:
            frames.append(frame)

    if not frames:
        # np.stack([]) would fail with a message unrelated to the real cause.
        raise ValueError("none of the requested frame indices could be decoded")
    return np.stack([frame.to_ndarray(format="rgb24") for frame in frames])

def process_video(video_file, question, num_frames=8):
    """
    Answer a question about a single video.

    Samples frames at an even stride across the video, builds a one-turn chat
    prompt containing the question and the video, and generates a reply with
    the module-level `model` / `processor`.

    Args:
        video_file: Uploaded file object exposing its path as `.name`, or a
            plain path string.
        question (str): Question to ask about the video.
        num_frames (int): Number of sampling steps across the video
            (default 8). Videos shorter than this yield fewer distinct frames.

    Returns:
        str: The decoded output after the first "ASSISTANT: " marker,
        stripped of surrounding whitespace.

    Raises:
        ValueError: If `num_frames` is less than 1 or the video has no
            decodable frames.
    """
    if num_frames < 1:
        raise ValueError("num_frames must be at least 1")

    # Accept both upload objects (path in `.name`) and plain path strings.
    video_path = getattr(video_file, "name", video_file)

    # Open video and sample frames
    with av.open(video_path) as container:
        total_frames = container.streams.video[0].frames
        if total_frames <= 0:
            # The stream reported no frame count, which would make the
            # sampling step zero; count frames by decoding instead.
            # read_video_pyav seeks back to the start before reading.
            total_frames = sum(1 for _ in container.decode(video=0))
        if total_frames <= 0:
            raise ValueError(f"No decodable video frames found in {video_path!r}")
        indices = np.arange(0, total_frames, total_frames / num_frames).astype(int)
        video_clip = read_video_pyav(container, indices)

    # Prepare conversation
    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": f"{question}"},
                {"type": "video"},
            ],
        },
    ]
    prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)

    # Prepare inputs for the model (named `inputs` to avoid shadowing the builtin `input`)
    inputs = processor([prompt], videos=[video_clip], padding=True, return_tensors="pt").to(model.device)

    # Sampling is disabled, so no sampling-only options such as `top_p` are passed.
    generate_kwargs = {"max_new_tokens": 500, "do_sample": False}

    # NOTE(review): this file imports `spaces` but never uses it. If the app is
    # deployed on ZeroGPU hardware, this function (or its caller) presumably
    # needs the `@spaces.GPU` decorator - confirm against the deployment.
    output = model.generate(**inputs, **generate_kwargs)
    generated_text = processor.batch_decode(output, skip_special_tokens=True)[0]

    # The decoded text contains the prompt as well; keep only the reply.
    return generated_text.split("ASSISTANT: ", 1)[-1].strip()

def process_videos(video_files, question):
    """Process multiple videos and answer a single question for each.

    Args:
        video_files: Iterable of uploaded file objects (path in `.name`) or
            plain path strings. May be None or empty.
        question (str): Question asked about every video.

    Returns:
        str: One "**Video: <file name>**" section per video followed by its
        answer, with sections separated by "---" lines. Empty string when no
        videos were given.
    """
    # Guard against None (e.g. nothing uploaded) as well as an empty list.
    if not video_files:
        return ""

    answers = []
    for video_file in video_files:
        video_name = os.path.basename(getattr(video_file, "name", video_file))
        answer = process_video(video_file, question)
        answers.append(f"**Video: {video_name}**\n{answer}\n")
    return "\n---\n".join(answers)

# Define Gradio interface for multiple videos
def gradio_interface(videos, question):
    """Gradio callback: answer `question` for each uploaded video.

    Args:
        videos: Files received from the "Upload Videos" input.
        question (str): Text received from the "Enter Your Question" input.

    Returns:
        str: The combined answers produced by `process_videos`.
    """
    return process_videos(videos, question)

# UI definition: a multi-file upload plus a question box, answered as plain text.
iface = gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.File(label="Upload Videos", file_count="multiple"),
        gr.Textbox(label="Enter Your Question"),
    ],
    outputs=gr.Textbox(label="Generated Answers"),
    title="Video Question Answering",
    description="Upload multiple videos and ask a single question to receive answers tailored to each video.",
)

# Start the app only when the file is run as a script.
if __name__ == "__main__":
    iface.launch(debug=True)