import av
import numpy as np
import torch
from huggingface_hub import hf_hub_download
from transformers import LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor


def read_video_pyav(container, indices):
    """
    Decode selected frames of a video with the PyAV decoder.

    Args:
        container (`av.container.input.InputContainer`): PyAV container. It is
            rewound with `seek(0)` before decoding starts.
        indices (`List[int]`): Frame indices to decode. They may be given in
            any order (a list or a NumPy integer array both work); repeated
            indices are decoded once.

    Returns:
        result (np.ndarray): np array of the decoded RGB frames, in decode
            order, of shape (num_frames, height, width, 3).

    Raises:
        IndexError: If `indices` is empty.
        ValueError: Raised by `np.stack` if none of the requested indices
            exist in the video stream.
    """
    # A set gives O(1) membership tests inside the per-frame loop and makes
    # the function independent of the order in which indices were supplied.
    wanted = {int(i) for i in indices}
    if not wanted:
        raise IndexError("`indices` must contain at least one frame index")
    last_wanted = max(wanted)

    container.seek(0)
    frames = []
    for i, frame in enumerate(container.decode(video=0)):
        if i in wanted:
            frames.append(frame.to_ndarray(format="rgb24"))
        # Stop as soon as the last requested frame has been seen, so no
        # further frame is decoded needlessly.
        if i >= last_wanted:
            break
    return np.stack(frames)



# Load the model in half precision; device_map="auto" leaves device placement
# of the weights to the library.
model = LlavaNextVideoForConditionalGeneration.from_pretrained(
    "llava-hf/LLaVA-NeXT-Video-7B-hf",
    torch_dtype=torch.float16,
    device_map="auto",
)
processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf")

# Download the demo clip from the Hub and sample 8 frames spread uniformly
# across it (more frames can be sampled for longer videos).
video_path = hf_hub_download(
    repo_id="raushan-testing-hf/videos-test",
    filename="sample_demo_1.mp4",
    repo_type="dataset",
)
# The context manager closes the container once the frames have been copied
# out into a NumPy array, so the file handle is not leaked.
with av.open(video_path) as container:
    total_frames = container.streams.video[0].frames
    if total_frames <= 0:
        # A frame count of 0 would give np.arange a zero step and fail with
        # an obscure error; report the actual problem instead.
        raise ValueError(
            f"Cannot sample frames: {video_path!r} reports {total_frames} video frames"
        )
    indices = np.arange(0, total_frames, total_frames / 8).astype(int)
    video = read_video_pyav(container, indices)

# Single-turn conversation: one user message holding a text question and a
# placeholder for the video.
conversation = [
    {
        "role": "user",
        "content": [
            {"type": "text", "text": "Why is this video funny?"},
            {"type": "video"},
        ],
    },
]

prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
# Put the processed inputs on the same device as the model before generating.
inputs = processor(text=prompt, videos=video, return_tensors="pt").to(model.device)

out = model.generate(**inputs, max_new_tokens=60)
# Keep and show the decoded text; as a bare expression it would be discarded
# when this file runs as a script.
decoded = processor.batch_decode(out, skip_special_tokens=True, clean_up_tokenization_spaces=True)
print(decoded[0])