Spaces:
Sleeping
Sleeping
import cv2 | |
import numpy as np | |
from gradio_utils.flow_utils import bivariate_Gaussian | |
OBJECT_MOTION_MODE = ["Provided Trajectory", "Custom Trajectory"] | |
PROVIDED_TRAJS = { | |
"horizon_1": "examples/trajectories/horizon_2.txt", | |
"swaying_1": "examples/trajectories/shake_1.txt", | |
"swaying_2": "examples/trajectories/shake_2.txt", | |
"swaying_3": "examples/trajectories/shaking_10.txt", | |
"curve_1": "examples/trajectories/curve_1.txt", | |
"curve_2": "examples/trajectories/curve_2.txt", | |
"curve_3": "examples/trajectories/curve_3.txt", | |
"curve_4": "examples/trajectories/curve_4.txt", | |
} | |
def read_points(file, video_len=16, reverse=False): | |
with open(file, 'r') as f: | |
lines = f.readlines() | |
points = [] | |
for line in lines: | |
x, y = line.strip().split(',') | |
points.append((int(x), int(y))) | |
if reverse: | |
points = points[::-1] | |
if len(points) > video_len: | |
skip = len(points) // video_len | |
points = points[::skip] | |
points = points[:video_len] | |
return points | |
def get_provided_traj(traj_name): | |
traj = read_points(PROVIDED_TRAJS[traj_name]) | |
# xrange from 256 to 1024 | |
traj = [[int(1024*x/256), int(1024*y/256)] for x,y in traj] | |
return traj | |
blur_kernel = bivariate_Gaussian(99, 10, 10, 0, grid=None, isotropic=True) | |
def process_points(points): | |
frames = 16 | |
defualt_points = [[512,512]]*16 | |
if len(points) < 2: | |
return defualt_points | |
elif len(points) >= frames: | |
skip = len(points)//frames | |
return points[::skip][:15] + points[-1:] | |
else: | |
insert_num = frames - len(points) | |
insert_num_dict = {} | |
interval = len(points) - 1 | |
n = insert_num // interval | |
m = insert_num % interval | |
for i in range(interval): | |
insert_num_dict[i] = n | |
for i in range(m): | |
insert_num_dict[i] += 1 | |
res = [] | |
for i in range(interval): | |
insert_points = [] | |
x0,y0 = points[i] | |
x1,y1 = points[i+1] | |
delta_x = x1 - x0 | |
delta_y = y1 - y0 | |
for j in range(insert_num_dict[i]): | |
x = x0 + (j+1)/(insert_num_dict[i]+1)*delta_x | |
y = y0 + (j+1)/(insert_num_dict[i]+1)*delta_y | |
insert_points.append([int(x), int(y)]) | |
res += points[i:i+1] + insert_points | |
res += points[-1:] | |
return res | |
def get_flow(points, video_len=16): | |
optical_flow = np.zeros((video_len, 256, 256, 2), dtype=np.float32) | |
for i in range(video_len-1): | |
p = points[i] | |
p1 = points[i+1] | |
optical_flow[i+1, p[1], p[0], 0] = p1[0] - p[0] | |
optical_flow[i+1, p[1], p[0], 1] = p1[1] - p[1] | |
for i in range(1, video_len): | |
optical_flow[i] = cv2.filter2D(optical_flow[i], -1, blur_kernel) | |
return optical_flow | |
def process_traj(points, device='cpu'): | |
xy_range = 1024 | |
points = process_points(points) | |
points = [[int(256*x/xy_range), int(256*y/xy_range)] for x,y in points] | |
optical_flow = get_flow(points) | |
# optical_flow = torch.tensor(optical_flow).to(device) | |
return optical_flow |