File size: 4,925 Bytes
56de2d4
 
b8466ce
 
 
f2ea5a0
9588460
 
10696ac
53189f9
 
 
f2ea5a0
 
ba82304
2dc6183
f2ea5a0
 
 
a29b529
 
a8a0c5a
 
 
 
a29b529
a8a0c5a
a29b529
a8a0c5a
 
 
 
 
 
a29b529
a6c8793
8805736
 
 
 
 
 
 
 
 
56de2d4
 
 
 
 
 
 
 
 
 
 
2dc6183
 
56de2d4
 
 
 
 
 
f2ea5a0
56de2d4
 
 
 
 
53189f9
a29b529
 
a8a0c5a
53189f9
56de2d4
b8466ce
56de2d4
 
a29b529
56de2d4
 
2dc6183
2c5687c
f2ea5a0
 
 
 
 
56de2d4
 
 
a23243f
56de2d4
 
 
 
b8466ce
56de2d4
b8466ce
f2ea5a0
 
b8466ce
 
56de2d4
b8466ce
f2ea5a0
56de2d4
2dc6183
1bc2256
c0f4f61
 
 
 
 
 
1bc2256
 
 
 
 
 
 
 
 
 
 
 
7d3a330
5acfd40
c0f4f61
 
1bc2256
 
8805736
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import os
from concurrent.futures import ThreadPoolExecutor

import cv2
import gradio as gr
import numpy as np
import torch
from PIL import Image
from transformers import AutoProcessor, AutoModel


# Hugging Face model identifier handed to both from_pretrained() calls below.
MODEL_NAME = "microsoft/xclip-base-patch16-zero-shot"
# Number of frames sampled per video; also selects the grid layout in
# concatenate_frames (which only defines a layout for 32).
CLIP_LEN = 32

# Check if GPU is available and set the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Report the selected device once at startup.
print (device)

# Load model and processor once and move them to the device
# (done at import time so every request reuses the same objects).
processor = AutoProcessor.from_pretrained(MODEL_NAME)
model = AutoModel.from_pretrained(MODEL_NAME).to(device)

def get_video_length(file_path):
    """Return the frame count OpenCV reports for the video at *file_path*.

    The value is ``cv2.CAP_PROP_FRAME_COUNT`` truncated to ``int``. It is the
    container's reported count and is not verified by decoding.
    NOTE(review): OpenCV appears to report 0 when the file cannot be opened;
    callers should treat non-positive values as "unreadable" -- confirm.

    Args:
        file_path: Path of the video file to inspect.

    Returns:
        The reported number of frames as an ``int``.
    """
    cap = cv2.VideoCapture(file_path)
    try:
        return int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        # Always free the capture handle, even if the property query raises.
        cap.release()

def read_video_opencv(file_path, indices):
    """Decode the frames at *indices* from *file_path* using a thread pool.

    Each index is handed to ``get_frame`` on a worker thread. Results are
    collected in the same order as *indices*, and frames that could not be
    read (``None``) are left out, so the result may be shorter than *indices*.

    Args:
        file_path: Path of the video file.
        indices: Iterable of frame positions to fetch.

    Returns:
        A list of the frames that were decoded, in index order.
    """
    with ThreadPoolExecutor() as pool:
        # map() schedules every lookup up front and yields results in order.
        decoded = pool.map(lambda position: get_frame(file_path, position), indices)
        return [image for image in decoded if image is not None]

def get_frame(file_path, index):
    """Read one frame from *file_path* and return it in RGB channel order.

    A fresh ``cv2.VideoCapture`` is opened for every call, so the function can
    be run from several threads without sharing a capture object.

    Args:
        file_path: Path of the video file.
        index: Frame position to seek to (any integer-like value).

    Returns:
        The frame converted from BGR to RGB, or ``None`` when the read fails.
    """
    cap = cv2.VideoCapture(file_path)
    try:
        # Plain int: callers pass numpy integers, OpenCV wants a Python number.
        cap.set(cv2.CAP_PROP_POS_FRAMES, int(index))
        ret, frame = cap.read()
    finally:
        # Release the handle even if seeking or reading raises.
        cap.release()
    if not ret:
        return None
    # OpenCV decodes to BGR; the rest of the pipeline works on RGB arrays.
    return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

def sample_uniform_frame_indices(clip_len, seg_len):
    """Pick *clip_len* frame indices from a video of *seg_len* frames.

    * ``seg_len < clip_len``: the video is looped, i.e. indices cycle through
      ``0 .. seg_len - 1`` until *clip_len* entries are produced.
    * otherwise: index ``i`` is ``i * seg_len // clip_len``, which spreads the
      samples over the whole video. When *seg_len* is a multiple of
      *clip_len* this equals a fixed integer stride; when it is not, the
      samples still reach the end of the video instead of covering only a
      prefix of it.

    Args:
        clip_len: Number of indices to return; must be positive.
        seg_len: Number of frames available; must be positive.

    Returns:
        A 1-D ``np.int64`` array of length *clip_len* with values in
        ``[0, seg_len)``.

    Raises:
        ValueError: If *clip_len* or *seg_len* is not positive.
    """
    if clip_len <= 0 or seg_len <= 0:
        raise ValueError(
            f"clip_len and seg_len must be positive, got clip_len={clip_len}, seg_len={seg_len}"
        )
    positions = np.arange(clip_len, dtype=np.int64)
    if seg_len < clip_len:
        # Too few frames: wrap around so the clip still has clip_len entries.
        indices = positions % seg_len
    else:
        # Largest value is (clip_len - 1) * seg_len // clip_len < seg_len.
        indices = (positions * seg_len) // clip_len
    return indices.astype(np.int64)

def concatenate_frames(frames, clip_len):
    """Tile *frames* into a single contact-sheet image.

    A clip length of 32 uses the fixed 4 x 8 (rows x columns) grid. Any other
    clip length gets a near-square grid large enough for *clip_len* cells.
    Frames are pasted left to right, top to bottom. Every cell has the size of
    the first frame. If fewer frames than cells are supplied, the remaining
    cells stay black; surplus frames are ignored.

    Args:
        frames: Non-empty sequence of image arrays; ``shape[0]`` is used as
            height and ``shape[1]`` as width.
        clip_len: Nominal number of frames, used to choose the grid.

    Returns:
        A ``PIL.Image`` in RGB mode containing the tiled frames.

    Raises:
        ValueError: If *frames* is empty.
    """
    if len(frames) == 0:
        raise ValueError("concatenate_frames needs at least one frame")

    layout = {32: (4, 8)}
    if clip_len in layout:
        rows, cols = layout[clip_len]
    else:
        # Fallback for clip lengths without a hand-picked layout.
        cells = max(int(clip_len), 1)
        cols = int(np.ceil(np.sqrt(cells)))
        rows = int(np.ceil(cells / cols))

    frame_height, frame_width = frames[0].shape[0], frames[0].shape[1]
    combined_image = Image.new('RGB', (frame_width * cols, frame_height * rows))
    # Iterate over the frames that exist rather than over grid cells, so a
    # short frame list cannot exhaust an iterator mid-grid.
    for position, frame in enumerate(frames[:rows * cols]):
        row, col = divmod(position, cols)
        combined_image.paste(
            Image.fromarray(frame),
            (col * frame_width, row * frame_height),
        )
    return combined_image

def model_interface(uploaded_video, activity):
    """Score how well *uploaded_video* matches *activity* versus "other".

    Steps: sample ``CLIP_LEN`` frame indices, decode them, build a preview
    grid, run the frames and the two text labels through the module-level
    ``processor`` and ``model``, then softmax ``logits_per_video`` over the
    two labels.

    Args:
        uploaded_video: Path of the video file to analyse.
        activity: Free-text label to test for.

    Returns:
        A 4-tuple ``(image, probabilities, raw_scores, top)`` where
        ``image`` is the grid of sampled frames, ``probabilities`` and
        ``raw_scores`` are lists of ``(label, text)`` tuples for the two
        labels, and ``top`` is ``[label, probability_percent]`` for the
        highest-probability label.

    Raises:
        ValueError: If no video was supplied, the video reports no frames, or
            none of the sampled frames could be decoded.
    """
    if not uploaded_video:
        raise ValueError("No video was provided.")

    video_length = get_video_length(uploaded_video)
    if video_length <= 0:
        raise ValueError(f"Could not read any frames from video: {uploaded_video!r}")

    indices = sample_uniform_frame_indices(CLIP_LEN, seg_len=video_length)
    video = list(read_video_opencv(uploaded_video, indices))
    if not video:
        raise ValueError(f"None of the sampled frames could be decoded: {uploaded_video!r}")
    if len(video) < CLIP_LEN:
        # read_video_opencv drops frames that fail to decode. Repeat the last
        # good frame so the grid and the model still get CLIP_LEN frames.
        video.extend([video[-1]] * (CLIP_LEN - len(video)))

    concatenated_image = concatenate_frames(video, CLIP_LEN)

    activities_list = [activity, "other"]
    inputs = processor(
        text=activities_list,
        videos=video,
        return_tensors="pt",
        padding=True,
    )

    # Move the tensors to the same device as the model
    model_inputs = {
        key: value.to(device) if isinstance(value, torch.Tensor) else value
        for key, value in inputs.items()
    }

    with torch.no_grad():
        outputs = model(**model_inputs)

    logits_per_video = outputs.logits_per_video
    probs = logits_per_video.softmax(dim=1)

    # Only one video is submitted, so row 0 holds all scores. Bring them to
    # the CPU as plain floats once.
    video_probs = [float(value) for value in probs[0].cpu()]
    video_logits = [float(value) for value in logits_per_video[0].cpu()]

    results_probs = []
    results_logits = []
    for current_activity, prob, logit in zip(activities_list, video_probs, video_logits):
        results_probs.append((current_activity, f"Probability: {prob * 100:.2f}%"))
        results_logits.append((current_activity, f"Raw Score: {logit:.2f}"))

    max_prob_index = torch.argmax(probs[0]).item()
    likely_label = activities_list[max_prob_index]
    likely_probability = video_probs[max_prob_index] * 100

    return concatenated_image, results_probs, results_logits, [likely_label, likely_probability]

video_folder = "Action Detection Samples"
# Collect the bundled sample clips (mp4 only). A missing folder simply means
# "no examples" instead of aborting start-up, and sorting keeps the order of
# the examples stable between runs.
if os.path.isdir(video_folder):
    video_files = sorted(
        os.path.join(video_folder, file)
        for file in os.listdir(video_folder)
        if file.endswith('.mp4')
    )
else:
    video_files = []

# Create examples: assuming every video is about 'dancing'
# Each example is a list with one value per input component (video, activity).
# None (rather than an empty list) tells Gradio that there are no examples.
examples = [[video, "dancing"] for video in video_files] or None

# Build the web UI: a video plus a free-text activity go in; the sampled-frame
# grid and three text summaries produced by model_interface come out.
iface = gr.Interface(
    fn=model_interface,
    inputs=[
        gr.components.Video(label="Upload a video file"),
        # `value` sets the initial text for gr.components classes; the older
        # `default` keyword is not applied by them.
        gr.components.Textbox(value="dancing", label="Desired Activity to Recognize"),
    ],
    outputs=[
        gr.components.Image(type="pil", label="Sampled Frames"),
        gr.components.Textbox(type="text", label="Probabilities"),
        gr.components.Textbox(type="text", label="Raw Scores"),
        gr.components.Textbox(type="text", label="Top Prediction")
    ],
    title="Engagify's Advanced Image Recognition Suite",
    description="[[V0.5.1] Video Action Recognition - Copyright Engajify 2023] [Author: Ibrahim Ali] [Method: XCLIP ZERO SHOT / SAMPLED FRAMES = 32]",
    live=False,  # run only when the user submits, not on every input change
    examples=examples  # Add examples to the interface
)

# Start the Gradio server.
iface.launch()