# Ayushnangia's picture
# Update app.py
# ca57650
import cv2
import time
import numpy as np
import onnx
import onnxruntime
import os
# Force-reinstall onnxruntime through the shell. This executes every time the
# module is imported.
# NOTE(review): `import onnxruntime` has already run above by the time this
# line executes — confirm the reinstall is still required and that this
# ordering is intended.
os.system('pip install --upgrade --force-reinstall onnxruntime')
# Ref: https://github.com/liruoteng/OpticalFlowToolkit/blob/5cf87b947a0032f58c922bbc22c0afb30b90c418/lib/flowlib.py#L249
import numpy as np
# Flow components whose absolute value exceeds this threshold are treated as
# unknown by flow_to_image: they are zeroed before normalisation and the
# corresponding output pixels are set to 0.
UNKNOWN_FLOW_THRESH = 1e7
def make_color_wheel():
    """
    Build the Middlebury colour wheel used to colour-code optical flow.

    The wheel is a sequence of six hue transitions. Within each transition one
    RGB channel is held at 255 while a second channel ramps up (or down) in
    equal floor-rounded steps; the third channel stays at 0.

    :return: float array of shape (55, 3) with RGB values in the range 0-255
    """
    # (segment length, channel held at 255, ramping channel, ramp rises?)
    segments = (
        (15, 0, 1, True),    # RY: red -> yellow
        (6, 1, 0, False),    # YG: yellow -> green
        (4, 1, 2, True),     # GC: green -> cyan
        (11, 2, 1, False),   # CB: cyan -> blue
        (13, 2, 0, True),    # BM: blue -> magenta
        (6, 0, 2, False),    # MR: magenta -> red
    )

    total = sum(length for length, _, _, _ in segments)
    wheel = np.zeros([total, 3])

    start = 0
    for length, full_channel, ramp_channel, rising in segments:
        stop = start + length
        ramp = np.floor(255 * np.arange(0, length) / length)
        wheel[start:stop, full_channel] = 255
        wheel[start:stop, ramp_channel] = ramp if rising else 255 - ramp
        start = stop
    return wheel
# Build the colour wheel once at import time; compute_color reads this
# module-level table on every call.
colorwheel = make_color_wheel()
def compute_color(u, v):
    """
    Colour-code a flow field using the module-level colour wheel.

    The flow direction selects a (linearly interpolated) hue from the wheel.
    For magnitudes <= 1 the colour is blended towards white as the magnitude
    shrinks; for magnitudes > 1 the colour is scaled by 0.75.

    Note: NaN entries of ``u`` and ``v`` are overwritten with 0 in place, and
    the corresponding output pixels are set to 0.

    :param u: optical flow horizontal map (2-D array)
    :param v: optical flow vertical map (2-D array, same shape as ``u``)
    :return: float array of shape (h, w, 3) holding integer values 0-255
    """
    rows, cols = u.shape
    rgb = np.zeros([rows, cols, 3])

    # Neutralise NaNs up front so the trigonometry below stays finite.
    invalid = np.isnan(u) | np.isnan(v)
    u[invalid] = 0
    v[invalid] = 0

    wheel_size = np.size(colorwheel, 0)
    magnitude = np.sqrt(u**2 + v**2)
    angle = np.arctan2(-v, -u) / np.pi

    # Map the angle from [-1, 1] onto a fractional, 1-based wheel position.
    position = (angle + 1) / 2 * (wheel_size - 1) + 1
    lower = np.floor(position).astype(int)
    upper = lower + 1
    upper[upper == wheel_size + 1] = 1  # wrap past the last wheel entry
    weight = position - lower

    # These masks do not depend on the channel, so compute them once.
    inside = magnitude <= 1
    outside = np.logical_not(inside)
    keep = 1 - invalid

    for channel in range(0, np.size(colorwheel, 1)):
        column = colorwheel[:, channel]
        low_colour = column[lower - 1] / 255
        high_colour = column[upper - 1] / 255
        blended = (1 - weight) * low_colour + weight * high_colour

        blended[inside] = 1 - magnitude[inside] * (1 - blended[inside])
        blended[outside] *= 0.75

        rgb[:, :, channel] = np.uint8(np.floor(255 * blended * keep))
    return rgb
def flow_to_image(flow):
    """
    Convert flow into middlebury color code image

    Components whose absolute value exceeds UNKNOWN_FLOW_THRESH are treated as
    unknown: they are excluded from the normalisation and their pixels are set
    to 0 in the result. The remaining flow is divided by the largest magnitude
    in the field before being colour-coded by compute_color.

    The input array is not modified.

    :param flow: optical flow map, indexed as flow[:, :, 0] (horizontal) and
                 flow[:, :, 1] (vertical)
    :return: optical flow image in middlebury color, uint8 array of shape (h, w, 3)
    """
    # Slicing yields views into `flow`; copy so that masking the unknown
    # values below does not overwrite the caller's array.
    u = flow[:, :, 0].copy()
    v = flow[:, :, 1].copy()

    unknown = (abs(u) > UNKNOWN_FLOW_THRESH) | (abs(v) > UNKNOWN_FLOW_THRESH)
    u[unknown] = 0
    v[unknown] = 0

    # Normalise by the largest magnitude; eps guards against division by zero
    # when the whole field is zero.
    rad = np.sqrt(u ** 2 + v ** 2)
    maxrad = max(-1, np.max(rad))
    scale = maxrad + np.finfo(float).eps

    img = compute_color(u / scale, v / scale)

    # A 2-D boolean mask on an (h, w, 3) array clears all three channels.
    img[unknown] = 0
    return np.uint8(img)
class Raft():
    """
    Thin ONNX Runtime wrapper around a two-input optical-flow model.

    Calling an instance with two BGR images runs the model and returns the
    flow map; draw_flow() then renders the most recent flow map as a BGR
    colour image at the resolution of the last image that was prepared.
    """

    def __init__(self, model_path):
        # Load the model and cache its input/output metadata.
        self.initialize_model(model_path)

    def __call__(self, img1, img2):
        return self.estimate_flow(img1, img2)

    def initialize_model(self, model_path):
        """Create the inference session and read the model's I/O details."""
        providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
        self.session = onnxruntime.InferenceSession(model_path, providers=providers)

        # Get model info
        self.get_input_details()
        self.get_output_details()

    def estimate_flow(self, img1, img2):
        """Run the model on an image pair; store and return the flow map."""
        first_tensor = self.prepare_input(img1)
        second_tensor = self.prepare_input(img2)

        raw_outputs = self.inference(first_tensor, second_tensor)
        self.flow_map = self.process_output(raw_outputs)
        return self.flow_map

    def prepare_input(self, img):
        """
        Convert a BGR image into a float32 tensor of shape (1, C, H, W).

        Side effect: records the original image size in img_height/img_width,
        which draw_flow() later uses as the output resolution.
        """
        rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        self.img_height, self.img_width = rgb.shape[:2]

        resized = cv2.resize(rgb, (self.input_width, self.input_height))

        # HWC -> CHW, then add a leading batch axis. Pixel values are passed
        # through unscaled (no division by 255).
        batched = resized.transpose(2, 0, 1)[np.newaxis, :, :, :]
        return batched.astype(np.float32)

    def inference(self, input_tensor1, input_tensor2):
        """Feed both tensors to the session and return all model outputs."""
        feed = {
            self.input_names[0]: input_tensor1,
            self.input_names[1]: input_tensor2,
        }
        return self.session.run(self.output_names, feed)

    def process_output(self, output):
        """
        Extract the flow map from the raw model outputs.

        Takes the first batch element of the second output and moves its
        leading axis to the end, i.e. (C, H, W) -> (H, W, C).
        """
        # NOTE(review): output[1] is presumably the final refined flow —
        # confirm against the model export.
        return output[1][0].transpose(1, 2, 0)

    def draw_flow(self):
        """Render the last flow map as a BGR image at the source image size."""
        # Colour-code the flow, then switch from RGB to OpenCV's BGR order.
        coloured = cv2.cvtColor(flow_to_image(self.flow_map), cv2.COLOR_RGB2BGR)

        # Resize the flow image to match the input image shape
        return cv2.resize(coloured, (self.img_width, self.img_height))

    def get_input_details(self):
        """Cache the model's input names and the first input's shape."""
        inputs = self.session.get_inputs()
        self.input_names = [node.name for node in inputs]

        self.input_shape = inputs[0].shape
        self.input_height = self.input_shape[2]
        self.input_width = self.input_shape[3]

    def get_output_details(self):
        """Cache the model's output names and the first output's shape."""
        outputs = self.session.get_outputs()
        self.output_names = [node.name for node in outputs]

        self.output_shape = outputs[0].shape
        self.output_height = self.output_shape[2]
        self.output_width = self.output_shape[3]
# Standalone check, executed only when this file is run as a script: load the
# model, estimate flow between two RAFT demo frames fetched by URL, and build a
# side-by-side image of both frames and the coloured flow.
if __name__ == '__main__':
    from imread_from_url import imread_from_url
    # Initialize model
    # NOTE(review): process_video loads 'models/raft_small_iter10_240x320.onnx';
    # this path has no 'models/' prefix — confirm which location is correct.
    model_path='raft_small_iter10_240x320.onnx'
    flow_estimator = Raft(model_path)
    # Read inference image
    img1 = imread_from_url("https://github.com/princeton-vl/RAFT/blob/master/demo-frames/frame_0016.png?raw=true")
    img2 = imread_from_url("https://github.com/princeton-vl/RAFT/blob/master/demo-frames/frame_0025.png?raw=true")
    # Estimate flow and colorize it
    flow_map = flow_estimator(img1, img2)
    flow_img = flow_estimator.draw_flow()
    # Frame 1 | frame 2 | flow, stacked horizontally. The display calls below
    # are disabled, so combined_img is currently not shown or saved.
    combined_img = np.hstack((img1, img2, flow_img))
    #cv2.namedWindow("Estimated flow", cv2.WINDOW_NORMAL)
    #cv2.imshow("Estimated flow", combined_img)
    #cv2.waitKey(0)
import os
import cv2
import gradio as gr
import yt_dlp
def download_youtube_video(youtube_url, output_filename):
    """
    Download a video with yt_dlp and save it to ``output_filename``.

    The format selector asks for an MP4 video stream plus M4A audio first,
    then falls back to the best single MP4, then to the best available format.

    :param youtube_url: URL of the video to download
    :param output_filename: output path template handed to yt_dlp
    """
    options = dict(
        format='bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
        outtmpl=output_filename,
    )
    with yt_dlp.YoutubeDL(options) as downloader:
        downloader.download([youtube_url])
def process_video(youtube_url, start_time, flow_frame_offset):
    """
    Download a video and write a copy with optical flow blended over it.

    Starting at ``start_time`` seconds, each output frame is a 50/50 blend of
    a source frame and the colour-coded flow between that frame and the frame
    ``flow_frame_offset`` frames later. Because of this look-ahead, the last
    ``flow_frame_offset`` frames of the video produce no output.

    :param youtube_url: URL passed to download_youtube_video
    :param start_time: position, in seconds, at which processing starts
    :param flow_frame_offset: frame distance between the two images of each
                              flow pair
    :return: path of the processed video, or an error message string if the
             downloaded file could not be opened
    """
    model_path = 'models/raft_small_iter10_240x320.onnx'
    flow_estimator = Raft(model_path)
    output_filename = 'downloaded_video.mp4'
    processed_output = 'processed_video.mp4'

    # Download video. A file left over from a previous request is removed
    # first (presumably so the downloader does not skip the new download —
    # TODO confirm against yt_dlp's overwrite behaviour).
    if os.path.exists(output_filename):
        os.remove(output_filename)
    download_youtube_video(youtube_url, output_filename)

    cap = cv2.VideoCapture(output_filename)
    out = None
    try:
        if not cap.isOpened():
            return "Error: Could not open video."

        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)

        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        out = cv2.VideoWriter(processed_output, fourcc, fps, (frame_width, frame_height))

        # Seek to the requested start position (seconds -> frame index).
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_time * fps)

        alpha = 0.5
        # Sliding window holding the current frame and its look-ahead frames.
        frame_list = []
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_list.append(frame)
            # Keep buffering until the window spans flow_frame_offset frames.
            if len(frame_list) <= flow_frame_offset:
                continue

            # The call stores the flow map on the estimator; draw_flow()
            # renders that stored map.
            flow_estimator(frame_list[0], frame_list[-1])
            flow_img = flow_estimator.draw_flow()

            combined_img = cv2.addWeighted(frame_list[0], alpha, flow_img, (1 - alpha), 0)
            out.write(combined_img)

            # Slide the window forward by one frame.
            frame_list.pop(0)
    finally:
        # Release the capture and writer even if inference or encoding raised,
        # so file handles are not leaked and the output is finalised.
        cap.release()
        if out is not None:
            out.release()

    return processed_output
# Example rows offered in the UI: YouTube URL, start time (seconds), flow frame
# offset, and an output file name.
# NOTE(review): gr.Examples below is wired to only three input components, so
# the fourth value in each row has no matching input — confirm it is
# intentionally unused.
examples = [
    ["https://www.youtube.com/watch?v=is38pqgbj6A", 5, 50, "output_1.mp4"],
    ["https://www.youtube.com/watch?v=AdbrfoxiAtk", 0, 60, "output_2.mp4"],
    ["https://www.youtube.com/watch?v=vWGg0iPmI8k", 13, 70, "output_3.mp4"],
]
# Gradio UI: the inputs (URL, start time, frame offset, button) sit in the left
# column and the resulting video in the right column.
with gr.Blocks() as app:
    gr.Markdown("### Optical Flow Video Processing\n"
                "Enter a YouTube URL, set the start time and flow frame offset, "
                "then click 'Process Video' to see the optical flow processing.")
    with gr.Row():
        with gr.Column():
            youtube_url = gr.Textbox(label="YouTube URL", placeholder="Enter YouTube Video URL Here")
            start_time = gr.Slider(minimum=0, maximum=60, label="Start Time (seconds)", step=1)
            flow_frame_offset = gr.Slider(minimum=1, maximum=100, label="Flow Frame Offset", step=1)
            submit_button = gr.Button("Process Video")
        with gr.Column():
            output_video = gr.Video(label="Processed Video")
    # Clicking the button runs process_video on the three inputs and shows the
    # returned value in the video component.
    submit_button.click(
        fn=process_video,
        inputs=[youtube_url, start_time, flow_frame_offset],
        outputs=output_video
    )
    # Examples are registered with cache_examples=False, i.e. their outputs are
    # not precomputed.
    gr.Examples(examples=examples,
                inputs=[youtube_url, start_time, flow_frame_offset],
                fn=process_video,
                outputs=output_video,
                cache_examples=False)
# Start the web app as soon as the module is executed.
app.launch()