|
import gradio as gr |
|
|
|
""" |
|
===================================================== |
|
Optical Flow: Predicting movement with the RAFT model |
|
===================================================== |
|
|
|
Optical flow is the task of predicting movement between two images, usually two |
|
consecutive frames of a video. Optical flow models take two images as input, and |
|
predict a flow: the flow indicates the displacement of every single pixel in the |
|
first image, and maps it to its corresponding pixel in the second image. Flows |
|
are (2, H, W)-dimensional tensors, where the first axis corresponds to the |
|
predicted horizontal and vertical displacements. |
|
|
|
The following example illustrates how torchvision can be used to predict flows |
|
using our implementation of the RAFT model. We will also see how to convert the |
|
predicted flows to RGB images for visualization. |
|
""" |
|
|
|
import cv2 |
|
import numpy as np |
|
import torch |
|
import matplotlib.pyplot as plt |
|
import torchvision.transforms.functional as F |
|
from torchvision.io import read_video |
|
from torchvision.models.optical_flow import Raft_Large_Weights |
|
from torchvision.models.optical_flow import raft_large |
|
from torchvision.io import write_jpeg |
|
|
|
import tempfile |
|
from pathlib import Path |
|
from urllib.request import urlretrieve |
|
|
|
def write_flo(flow, filename): |
|
""" |
|
Write optical flow in Middlebury .flo format |
|
|
|
:param flow: optical flow map |
|
:param filename: optical flow file path to be saved |
|
:return: None |
|
|
|
from https://github.com/liruoteng/OpticalFlowToolkit/ |
|
|
|
""" |
|
|
|
flow = flow.cpu().data.numpy() |
|
flow = flow.astype(np.float32) |
|
f = open(filename, 'wb') |
|
magic = np.array([202021.25], dtype=np.float32) |
|
(height, width) = flow.shape[0:2] |
|
w = np.array([width], dtype=np.int32) |
|
h = np.array([height], dtype=np.int32) |
|
magic.tofile(f) |
|
w.tofile(f) |
|
h.tofile(f) |
|
flow.tofile(f) |
|
f.close() |
|
|
|
def warp_flow(img, flow): |
|
h, w = flow.shape[:2] |
|
flow = flow.copy() |
|
flow[:, :, 0] += np.arange(w) |
|
flow[:, :, 1] += np.arange(h)[:, np.newaxis] |
|
res = cv2.remap(img, flow, None, cv2.INTER_LINEAR) |
|
return res |
|
|
|
def read_flo_file(filename): |
|
""" |
|
Read from Middlebury .flo file |
|
:param flow_file: name of the flow file |
|
:return: optical flow data in matrix |
|
""" |
|
f = open(filename, 'rb') |
|
magic = np.fromfile(f, np.float32, count=1) |
|
data2d = None |
|
|
|
if 202021.25 != magic: |
|
print('Magic number incorrect. Invalid .flo file') |
|
else: |
|
w = np.fromfile(f, np.int32, count=1) |
|
h = np.fromfile(f, np.int32, count=1) |
|
print("Reading %d x %d flow file in .flo format" % (h, w)) |
|
data2d = np.fromfile(f, np.float32, count=2 * w * h) |
|
|
|
data2d = np.resize(data2d, (h[0], w[0], 2)) |
|
f.close() |
|
return data2d |
|
|
|
def warp(frame1, frame2, flo_path, blend=0.5, weights_path=None): |
|
|
|
flow21 = flo_path.cpu().data.numpy() |
|
frame1pil = np.array(frame1) |
|
frame1_warped21 = warp_flow(frame1pil, flow21) |
|
|
|
frame2pil = np.array(frame2) |
|
|
|
if weights_path: |
|
forward_weights = load_cc(weights_path) |
|
blended_w = frame2pil*(1-blend) + blend*(frame1_warped21*forward_weights+frame2pil*(1-forward_weights)) |
|
else: blended_w = frame2pil*(1-blend) + frame1_warped21*(blend) |
|
|
|
return PIL.Image.fromarray(blended_w.astype('uint8')) |
|
|
|
|
|
def infer(): |
|
video_url = "https://download.pytorch.org/tutorial/pexelscom_pavel_danilyuk_basketball_hd.mp4" |
|
video_path = Path(tempfile.mkdtemp()) / "basketball.mp4" |
|
_ = urlretrieve(video_url, video_path) |
|
|
|
|
|
|
|
|
|
frames, _, _ = read_video(str(video_path), output_format="TCHW") |
|
|
|
img1_batch = torch.stack([frames[100]]) |
|
img2_batch = torch.stack([frames[101]]) |
|
|
|
|
|
|
|
weights = Raft_Large_Weights.DEFAULT |
|
transforms = weights.transforms() |
|
|
|
|
|
def preprocess(img1_batch, img2_batch): |
|
img1_batch = F.resize(img1_batch, size=[520, 960]) |
|
img2_batch = F.resize(img2_batch, size=[520, 960]) |
|
return transforms(img1_batch, img2_batch) |
|
|
|
|
|
img1_batch, img2_batch = preprocess(img1_batch, img2_batch) |
|
|
|
print(f"shape = {img1_batch.shape}, dtype = {img1_batch.dtype}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
|
|
model = raft_large(weights=Raft_Large_Weights.DEFAULT, progress=False).to(device) |
|
model = model.eval() |
|
|
|
list_of_flows = model(img1_batch.to(device), img2_batch.to(device)) |
|
print(f"type = {type(list_of_flows)}") |
|
print(f"length = {len(list_of_flows)} = number of iterations of the model") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
predicted_flows = list_of_flows[-1] |
|
print(f"dtype = {predicted_flows.dtype}") |
|
print(f"shape = {predicted_flows.shape} = (N, 2, H, W)") |
|
print(f"min = {predicted_flows.min()}, max = {predicted_flows.max()}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from torchvision.utils import flow_to_image |
|
|
|
|
|
|
|
|
|
|
|
predicted_flow = list_of_flows[-1][0] |
|
flow_img = flow_to_image(predicted_flow).to("cpu") |
|
|
|
write_jpeg(flow_img, f"predicted_flow.jpg") |
|
flo_file = write_flo(predicted_flow, "flofile.flo") |
|
|
|
warp_res = warp(img1_batch, img2_batch, predicted_flow, blend=0.5) |
|
return "done", "predicted_flow.jpg", ["flofile.flo"], warp_res |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
gr.Interface(fn=infer, inputs=[], outputs=[gr.Textbox(), gr.Image(), gr.Files(), gr.Image(type="pil")]).launch() |