import gradio as gr
import torch
import numpy as np
from transformers import AutoProcessor, AutoModel
from PIL import Image
import cv2

MODEL_NAME = "microsoft/xclip-base-patch16-zero-shot"
CLIP_LEN = 32

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

processor = AutoProcessor.from_pretrained(MODEL_NAME)
model = AutoModel.from_pretrained(MODEL_NAME).to(device)
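# The X-CLIP zero-shot checkpoint above works on fixed-length clips (32 frames per video,
# which is why CLIP_LEN is set to 32); the processor handles resizing and normalizing the
# raw RGB frames before they reach the model.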

def get_video_length(file_path):
    """Return the total number of frames in the video at file_path."""
    cap = cv2.VideoCapture(file_path)
    length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    cap.release()
    return length

def read_video_opencv(file_path, indices):
    """Read the frames at the given indices from the video, returned as RGB numpy arrays."""
    frames = []
    failed_indices = []
    
    cap = cv2.VideoCapture(file_path)
    if not cap.isOpened():
        print(f"Error opening video file: {file_path}")
        return frames

    max_index = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) - 1
    for idx in indices:
        if idx <= max_index:
            frame = get_frame_with_opened_cap(cap, idx)
            if frame is not None:
                frames.append(frame)
            else:
                failed_indices.append(idx)
        else:
            failed_indices.append(idx)
    cap.release()

    if failed_indices:
        print(f"Failed to extract frames at indices: {failed_indices}")
    return frames

def get_frame_with_opened_cap(cap, index):
    """Seek to a frame index on an already-opened capture and return it as an RGB array, or None on failure."""
    cap.set(cv2.CAP_PROP_POS_FRAMES, index)
    ret, frame = cap.read()
    if ret:
        return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    return None
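# Note: seeking with CAP_PROP_POS_FRAMES does a random-access seek per sampled index, which
# is simple but can be slow and, for some codecs, not frame-exact; reading the video
# sequentially and keeping only the sampled frames is a common alternative.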

def sample_uniform_frame_indices(clip_len, seg_len):
    """Pick clip_len evenly spaced frame indices from a video of seg_len frames,
    repeating indices when the video has fewer frames than clip_len."""
    if seg_len < clip_len:
        repeat_factor = np.ceil(clip_len / seg_len).astype(int)
        indices = np.arange(seg_len).tolist() * repeat_factor
        indices = indices[:clip_len]
    else:
        spacing = seg_len // clip_len
        indices = [i * spacing for i in range(clip_len)]
    return np.array(indices).astype(np.int64)
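# Example: with clip_len = 4 and seg_len = 10, spacing = 10 // 4 = 2 and the sampled indices
# are [0, 2, 4, 6]; with seg_len = 3 (shorter than the clip), the indices wrap to [0, 1, 2, 0].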

def concatenate_frames(frames, clip_len):
    """Tile the sampled frames into a single preview image (a 4 x 8 grid for 32 frames)."""
    if len(frames) < clip_len:
        raise ValueError(f"Expected {clip_len} frames but only {len(frames)} could be read from the video")
    layout = {32: (4, 8)}
    rows, cols = layout[clip_len]
    combined_image = Image.new('RGB', (frames[0].shape[1] * cols, frames[0].shape[0] * rows))
    frame_iter = iter(frames)
    y_offset = 0
    for i in range(rows):
        x_offset = 0
        for j in range(cols):
            img = Image.fromarray(next(frame_iter))
            combined_image.paste(img, (x_offset, y_offset))
            x_offset += frames[0].shape[1]
        y_offset += frames[0].shape[0]
    return combined_image

def model_interface(uploaded_video, activity):
    """Run zero-shot classification of the uploaded video against the given activity label vs. 'other'."""
    video_length = get_video_length(uploaded_video)
    indices = sample_uniform_frame_indices(CLIP_LEN, seg_len=video_length)
    video = read_video_opencv(uploaded_video, indices)
    concatenated_image = concatenate_frames(video, CLIP_LEN)

    # Zero-shot setup: the user-supplied label is contrasted against a generic "other" class.
    activities_list = [activity, "other"]
    inputs = processor(
        text=activities_list,
        videos=list(video),
        return_tensors="pt",
        padding=True,
    )

    for key, value in inputs.items():
        if isinstance(value, torch.Tensor):
            inputs[key] = value.to(device)

    with torch.no_grad():
        outputs = model(**inputs)

    # Softmax is taken over the two candidate labels only, so the probabilities are
    # relative to this label set rather than absolute confidences.
    logits_per_video = outputs.logits_per_video
    probs = logits_per_video.softmax(dim=1)

    results_probs = []
    results_logits = []
    max_prob_index = torch.argmax(probs[0]).item()
    for i, current_activity in enumerate(activities_list):
        prob = float(probs[0][i].cpu())
        logit = float(logits_per_video[0][i].cpu())
        results_probs.append((current_activity, f"Probability: {prob * 100:.2f}%"))
        results_logits.append((current_activity, f"Raw Score: {logit:.2f}"))

    likely_label = activities_list[max_prob_index]
    likely_probability = float(probs[0][max_prob_index].cpu()) * 100

    # Return a formatted string for the Textbox rather than a raw [label, probability] list.
    return concatenated_image, results_probs, results_logits, f"{likely_label} ({likely_probability:.2f}%)"

iface = gr.Interface(
    fn=model_interface,
    inputs=[
        gr.Video(label="Upload a Video"),
        gr.Textbox(label="Activity to Detect")
    ],
    outputs=[
        gr.Image(label="Concatenated Frames"),
        gr.Dataframe(headers=["Activity", "Probability"], label="Probabilities"),
        gr.Dataframe(headers=["Activity", "Raw Score"], label="Raw Scores"),
        gr.Textbox(label="Most Likely Activity")
    ],
    title="Video Activity Classifier",
    description="""
    **Instructions:**

    1. **Upload a Video**: Select a video file to upload. 
    2. **Enter Activity Label**: Specify the activity you want to detect in the video.
    3. **View Results**: 
       - The concatenated frames from the video will be displayed.
       - Probabilities and raw scores for the specified activity and the "other" category will be shown.
       - The most likely activity detected in the video will be displayed.
    """
)

if __name__ == "__main__":
    iface.launch()