# Hugging Face Space (status when captured: Sleeping)
import csv
import gc
import io
import json
import os
import re
import tempfile

import av
import gradio as gr
import numpy as np
import spaces
import torch
from transformers import (
    BitsAndBytesConfig,
    LlavaNextVideoForConditionalGeneration,
    LlavaNextVideoProcessor,
)
# Checkpoint identifier passed to both `from_pretrained` calls below.
model_name = "llava-hf/LLaVA-NeXT-Video-7B-DPO-hf"

# Quantization settings: load the weights in 4-bit and compute in float16.
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
)

# The processor and the model are created once at import time and shared by
# every request handled by this module.
processor = LlavaNextVideoProcessor.from_pretrained(model_name)
model = LlavaNextVideoForConditionalGeneration.from_pretrained(
    model_name,
    device_map="auto",
    quantization_config=quantization_config,
)
def read_video_pyav(container, indices):
    """Decode the requested frames of a video with the PyAV decoder.

    Args:
        container: An opened PyAV container; it is rewound with ``seek(0)``
            and its first video stream (``video=0``) is decoded.
        indices: Iterable of frame indices to keep. Order and duplicates do
            not matter; each matching frame is returned once.

    Returns:
        ``np.ndarray`` stacking the selected frames, converted with
        ``to_ndarray(format="rgb24")``, in decode order (frames are stacked
        along a new leading axis).

    Raises:
        ValueError: If ``indices`` is empty or none of the requested frames
            exist in the stream.
    """
    # A set gives O(1) membership per decoded frame; `i in ndarray` would
    # rescan the whole index array for every frame.
    wanted = {int(i) for i in indices}
    if not wanted:
        raise ValueError("indices must contain at least one frame index")
    last_wanted = max(wanted)

    container.seek(0)
    frames = []
    for i, frame in enumerate(container.decode(video=0)):
        # Stop decoding as soon as every requested frame has been passed.
        if i > last_wanted:
            break
        if i in wanted:
            frames.append(frame)

    if not frames:
        raise ValueError("none of the requested frame indices could be decoded")
    return np.stack([f.to_ndarray(format="rgb24") for f in frames])
def process_video(video_file, question):
    """Answer ``question`` about a single video using the loaded model.

    Eight evenly spaced frames are sampled from the video, combined with the
    question in a chat prompt, and passed to ``model.generate``.

    Args:
        video_file: Either a path string or an object whose ``name``
            attribute is the path of the video file.
        question: The question text to ask about the video.

    Returns:
        The decoded generation with everything up to and including the first
        ``"ASSISTANT: "`` marker removed, stripped of surrounding whitespace.

    Raises:
        ValueError: If the video contains no decodable frames.
    """
    num_samples = 8
    # Upload widgets may hand over a temp-file wrapper (with `.name`) or a
    # plain path string; accept both.
    video_path = getattr(video_file, "name", video_file)

    with av.open(video_path) as container:
        total_frames = container.streams.video[0].frames
        if total_frames <= 0:
            # Some containers do not store a frame count (PyAV then reports
            # 0); count by decoding instead. `read_video_pyav` rewinds the
            # container before it decodes again.
            total_frames = sum(1 for _ in container.decode(video=0))
        if total_frames <= 0:
            raise ValueError(f"no video frames found in {video_path!r}")
        indices = np.arange(0, total_frames, total_frames / num_samples).astype(int)
        video_clip = read_video_pyav(container, indices)

    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": f"{question}"},
                {"type": "video"},
            ],
        },
    ]
    prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
    # Named `inputs` (not `input`) so the builtin is not shadowed.
    inputs = processor(
        [prompt], videos=[video_clip], padding=True, return_tensors="pt"
    ).to(model.device)
    generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}

    # Disable gradient calculation during inference
    with torch.no_grad():
        output = model.generate(**inputs, **generate_kwargs)

    generated_text = processor.batch_decode(output, skip_special_tokens=True)[0]
    # Keep only the text after the assistant marker.
    return generated_text.split("ASSISTANT: ", 1)[-1].strip()
def analyze_videos(video_files, selected_questions):
    """Run the selected questions on every video and report the answers.

    Each answer is reduced to ``"true"`` when it contains the standalone word
    "yes" (case-insensitive) and ``"false"`` otherwise.

    Args:
        video_files: Iterable of uploaded videos (path strings or objects
            with a ``name`` attribute). ``None`` is treated as no videos.
        selected_questions: Iterable of question keys to apply; each must be
            one of the keys of the ``questions`` table below. ``None`` is
            treated as no questions.

    Returns:
        A ``(json_output, csv_content)`` tuple: the results as an indented
        JSON string keyed by video base name, and the same results as CSV
        text with one column per known question (blank when not selected).

    Raises:
        KeyError: If a selected question key is not in the table.
    """
    questions = {
        "hands_free": "Is the subject's hand in the video free or not?",
        "standing": "Is the subject in the video sitting or standing?",
        "interaction_with_background": "Assess the surroundings behind the subject in the video. Do they seem to interact with any visible screens, such as laptops, TVs, or digital billboards? If yes, then they are interacting with a screen. If not, they are not interacting with a screen.",
        "indoors": "Consider the broader environmental context shown in the video’s background. Are there signs of an open-air space, like greenery, structures, or people passing by? If so, it’s an outdoor setting. If the setting looks confined with furniture, walls, or home decorations, it’s an indoor environment.",
    }

    all_results = {}
    # The UI passes None when nothing was uploaded; treat that as empty.
    for video_file in video_files or []:
        video_path = getattr(video_file, "name", video_file)
        video_name = os.path.basename(video_path)
        per_video = all_results[video_name] = {}
        for question_key in selected_questions or []:
            answer = process_video(video_file, questions[question_key])
            # Whole-word match: a plain substring test would also fire on
            # words such as "eyes".
            said_yes = re.search(r"\byes\b", answer, flags=re.IGNORECASE) is not None
            per_video[question_key] = "true" if said_yes else "false"
        # Clear cache and collect garbage after each video
        gc.collect()
        torch.cuda.empty_cache()

    # Create CSV content
    csv_output = io.StringIO()
    writer = csv.writer(csv_output)
    writer.writerow(["Video File", *questions])
    for video_name, results in all_results.items():
        writer.writerow([video_name, *(results.get(key, "") for key in questions)])
    csv_content = csv_output.getvalue()

    # Return both JSON and CSV
    json_output = json.dumps(all_results, indent=4)
    return json_output, csv_content
def download_csv(csv_content):
    """Write the CSV text to a temporary file and return its path.

    The returned path is meant to be used as the value of a ``gr.File``
    output component, which offers the file for download.

    Args:
        csv_content: The CSV text to save.

    Returns:
        Path of a file named ``video_analysis.csv`` inside a fresh temporary
        directory, or ``None`` when there is no content to save.
    """
    if not csv_content:
        return None
    out_dir = tempfile.mkdtemp(prefix="video_analysis_")
    csv_path = os.path.join(out_dir, "video_analysis.csv")
    # newline="" keeps the line endings of the CSV text exactly as given.
    with open(csv_path, "w", encoding="utf-8", newline="") as fh:
        fh.write(csv_content)
    return csv_path


# Define Gradio interface
with gr.Blocks() as iface:
    with gr.Row():
        file_input = gr.File(label="Upload Videos", file_count="multiple")
        question_input = gr.CheckboxGroup(
            ["hands_free", "standing", "interaction_with_background", "indoors"],
            label="Select Questions to Apply",
        )
    process_button = gr.Button("Process Videos")
    with gr.Row():
        json_output = gr.JSON(label="Analysis Results (JSON)")
        csv_output = gr.Textbox(label="CSV Results", lines=15)
    download_button = gr.Button("Download CSV")
    # A button cannot carry a file; the generated CSV is offered through this
    # dedicated file component instead.
    csv_file = gr.File(label="CSV File", interactive=False)

    # Link buttons to their respective functions
    process_button.click(
        analyze_videos,
        inputs=[file_input, question_input],
        outputs=[json_output, csv_output],
    )
    download_button.click(download_csv, inputs=csv_output, outputs=csv_file)

if __name__ == "__main__":
    iface.launch(debug=True)