import gradio as gr
import torch
import numpy as np
import cv2
from PIL import Image
from transformers import AutoProcessor, AutoModel

MODEL_NAME = "microsoft/xclip-base-patch16-zero-shot"
CLIP_LEN = 32  # this X-CLIP checkpoint expects 32 frames per clip

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
processor = AutoProcessor.from_pretrained(MODEL_NAME)
model = AutoModel.from_pretrained(MODEL_NAME).to(device)


def get_video_length(file_path):
    """Return the total number of frames in the video."""
    cap = cv2.VideoCapture(file_path)
    length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    cap.release()
    return length


def get_frame_with_opened_cap(cap, index):
    """Seek to `index` and return that frame as an RGB array, or None on failure."""
    cap.set(cv2.CAP_PROP_POS_FRAMES, index)
    ret, frame = cap.read()
    if ret:
        return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    return None


def read_video_opencv(file_path, indices):
    """Extract the frames at `indices` from the video as RGB numpy arrays."""
    frames = []
    failed_indices = []
    cap = cv2.VideoCapture(file_path)
    if not cap.isOpened():
        print(f"Error opening video file: {file_path}")
        return frames
    max_index = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) - 1
    for idx in indices:
        if idx <= max_index:
            frame = get_frame_with_opened_cap(cap, idx)
            if frame is not None:
                frames.append(frame)
            else:
                failed_indices.append(idx)
        else:
            failed_indices.append(idx)
    cap.release()
    if failed_indices:
        print(f"Failed to extract frames at indices: {failed_indices}")
    return frames


def sample_uniform_frame_indices(clip_len, seg_len):
    """Pick `clip_len` evenly spaced frame indices from a video of `seg_len` frames.

    Videos shorter than `clip_len` are looped so the clip always has `clip_len` indices.
    """
    if seg_len < clip_len:
        repeat_factor = int(np.ceil(clip_len / seg_len))
        indices = np.arange(seg_len).tolist() * repeat_factor
        indices = indices[:clip_len]
    else:
        spacing = seg_len // clip_len
        indices = [i * spacing for i in range(clip_len)]
    return np.array(indices).astype(np.int64)


def concatenate_frames(frames, clip_len):
    """Tile the sampled frames into a single grid image for display."""
    layout = {32: (4, 8)}  # rows x cols for the supported clip length
    rows, cols = layout[clip_len]
    frame_h, frame_w = frames[0].shape[0], frames[0].shape[1]
    combined_image = Image.new("RGB", (frame_w * cols, frame_h * rows))
    frame_iter = iter(frames)
    y_offset = 0
    for _ in range(rows):
        x_offset = 0
        for _ in range(cols):
            img = Image.fromarray(next(frame_iter))
            combined_image.paste(img, (x_offset, y_offset))
            x_offset += frame_w
        y_offset += frame_h
    return combined_image


def model_interface(uploaded_video, activity):
    video_length = get_video_length(uploaded_video)
    if video_length <= 0:
        raise gr.Error("Could not read the uploaded video.")

    indices = sample_uniform_frame_indices(CLIP_LEN, seg_len=video_length)
    frames = read_video_opencv(uploaded_video, indices)
    if not frames:
        raise gr.Error("Could not extract any frames from the uploaded video.")
    # Pad by repeating the last frame if some reads failed, so the grid and
    # the model always receive exactly CLIP_LEN frames.
    while len(frames) < CLIP_LEN:
        frames.append(frames[-1])

    concatenated_image = concatenate_frames(frames, CLIP_LEN)

    # Zero-shot classification: score the user-supplied label against "other".
    activities_list = [activity, "other"]
    inputs = processor(
        text=activities_list,
        videos=list(frames),
        return_tensors="pt",
        padding=True,
    )
    for key, value in inputs.items():
        if isinstance(value, torch.Tensor):
            inputs[key] = value.to(device)

    with torch.no_grad():
        outputs = model(**inputs)

    logits_per_video = outputs.logits_per_video
    probs = logits_per_video.softmax(dim=1)

    results_probs = []
    results_logits = []
    max_prob_index = torch.argmax(probs[0]).item()
    for i, current_activity in enumerate(activities_list):
        prob = float(probs[0][i].cpu())
        logit = float(logits_per_video[0][i].cpu())
        results_probs.append((current_activity, f"{prob * 100:.2f}%"))
        results_logits.append((current_activity, f"{logit:.2f}"))

    likely_label = activities_list[max_prob_index]
    likely_probability = float(probs[0][max_prob_index].cpu()) * 100

    return (
        concatenated_image,
        results_probs,
        results_logits,
        f"{likely_label} ({likely_probability:.2f}%)",
    )


iface = gr.Interface(
    fn=model_interface,
    inputs=[
        gr.Video(label="Upload a Video"),
        gr.Textbox(label="Activity to Detect"),
    ],
    outputs=[
        gr.Image(label="Concatenated Frames"),
        gr.Dataframe(headers=["Activity", "Probability"], label="Probabilities"),
        gr.Dataframe(headers=["Activity", "Raw Score"], label="Raw Scores"),
        gr.Textbox(label="Most Likely Activity"),
    ],
    title="Video Activity Classifier",
    description="""
**Instructions:**

1. **Upload a Video**: Select a video file to upload.
2. **Enter Activity Label**: Specify the activity you want to detect in the video.
3. **View Results**:
   - The frames sampled from the video are displayed as a single grid image.
   - Probabilities and raw scores are shown for the specified activity and the "other" category.
   - The most likely activity detected in the video is displayed.
""",
)

if __name__ == "__main__":
    iface.launch()