# Copyright (c) 2018-present, Facebook, Inc.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
#

import hashlib
import os
import pathlib
import shutil
import sys
import time

import cv2
import numpy as np
import torch

def add_path():
    Alphapose_path = os.path.abspath('joints_detectors/Alphapose')
    hrnet_path = os.path.abspath('joints_detectors/hrnet')
    trackers_path = os.path.abspath('pose_trackers')

    paths = filter(lambda p: p not in sys.path, [Alphapose_path, hrnet_path, trackers_path])
    sys.path.extend(paths)

def wrap(func, *args, unsqueeze=False):
    """
    Wrap a torch function so it can be called with NumPy arrays.
    Input and return types are seamlessly converted.
    """
    # Convert input types where applicable
    args = list(args)
    for i, arg in enumerate(args):
        if type(arg) == np.ndarray:
            args[i] = torch.from_numpy(arg)
            if unsqueeze:
                args[i] = args[i].unsqueeze(0)

    result = func(*args)

    # Convert output types where applicable
    if isinstance(result, tuple):
        result = list(result)
        for i, res in enumerate(result):
            if type(res) == torch.Tensor:
                if unsqueeze:
                    res = res.squeeze(0)
                result[i] = res.numpy()
        return tuple(result)
    elif type(result) == torch.Tensor:
        if unsqueeze:
            result = result.squeeze(0)
        return result.numpy()
    else:
        return result

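# Hypothetical usage sketch (not part of the original file): call a torch op on a
# NumPy array and get a NumPy array back; array arguments are converted, plain
# Python arguments such as the `dim` below are passed through unchanged.
#   x = np.arange(6, dtype=np.float32).reshape(2, 3)
#   y = wrap(torch.nn.functional.softmax, x, 1)  # y is an np.ndarray of shape (2, 3)
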
def deterministic_random(min_value, max_value, data):
    # Derive a pseudo-random integer in [min_value, max_value) from a SHA-256 hash
    # of `data`, so the same input string always yields the same value.
    digest = hashlib.sha256(data.encode()).digest()
    raw_value = int.from_bytes(digest[:4], byteorder='little', signed=False)
    return int(raw_value / (2 ** 32 - 1) * (max_value - min_value)) + min_value

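# Hypothetical usage sketch: pick a reproducible offset keyed by a string label.
#   start = deterministic_random(0, 100, 'S1_Walking')  # same label -> same value
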
def alpha_map(prediction):
    # Linearly rescale the prediction values so they span the range [-0.8, 0.8].
    p_min, p_max = prediction.min(), prediction.max()

    k = 1.6 / (p_max - p_min)
    b = 0.8 - k * p_max

    prediction = k * prediction + b
    return prediction

def change_score(prediction, detectron_detection_path):
    # Replace the confidence scores in `prediction` with those from a saved
    # Detectron 2D detection file (Human3.6M 'S1' / 'Directions 1' layout).
    detectron_predictions = np.load(detectron_detection_path, allow_pickle=True)['positions_2d'].item()
    pose = detectron_predictions['S1']['Directions 1']
    prediction[..., 2] = pose[..., 2]
    return prediction

class Timer:
    def __init__(self, message, show=True):
        self.message = message
        self.elapsed = 0
        self.show = show

    def __enter__(self):
        self.start = time.perf_counter()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.show:
            print(f'{self.message} --- elapsed time: {time.perf_counter() - self.start} s')

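# Hypothetical usage sketch: time any block of code and print the elapsed seconds.
#   with Timer('2D keypoint detection'):
#       run_detector()  # placeholder for any expensive call
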
def calculate_area(data):
    """
    Get the bounding-rectangle area of a set of keypoints.
    :param data: AlphaPose json keypoint format ([x, y, score, ..., x, y, score])
                 or AlphaPose result keypoint format ([[x, y], ..., [x, y]])
    :return: area of the axis-aligned bounding box
    """
    data = np.array(data)

    if len(data.shape) == 1:
        data = np.reshape(data, (-1, 3))

    width = min(data[:, 0]) - max(data[:, 0])
    height = min(data[:, 1]) - max(data[:, 1])

    return np.abs(width * height)

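# Hypothetical usage sketch: flat AlphaPose-style [x, y, score, ...] keypoints.
#   kps = [10, 20, 0.9, 30, 60, 0.8, 50, 40, 0.7]
#   calculate_area(kps)  # -> 1600.0, i.e. (50 - 10) * (60 - 20)
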
def read_video(filename, fps=None, skip=0, limit=-1):
    stream = cv2.VideoCapture(filename)

    i = 0
    while True:
        grabbed, frame = stream.read()
        # if the `grabbed` boolean is `False`, then we have
        # reached the end of the video file
        if not grabbed:
            print(f'===========================> This video has {i} frames in total.')
            sys.stdout.flush()
            break

        i += 1
        if i > skip:
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            yield np.array(frame)

        if i == limit:
            break

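# Hypothetical usage sketch: iterate over the RGB frames of a clip.
#   for frame in read_video('outputs/kobe.mp4', skip=0, limit=100):
#       ...  # frame is an H x W x 3 RGB np.ndarray
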
def split_video(video_path):
    stream = cv2.VideoCapture(video_path)

    output_dir = os.path.dirname(video_path)
    video_name = os.path.basename(video_path)
    video_name = video_name[:video_name.rfind('.')]

    save_folder = pathlib.Path(f'./{output_dir}/alpha_pose_{video_name}/split_image/')
    shutil.rmtree(str(save_folder), ignore_errors=True)
    save_folder.mkdir(parents=True, exist_ok=True)

    total_frames = int(stream.get(cv2.CAP_PROP_FRAME_COUNT))
    length = len(str(total_frames)) + 1

    i = 1
    while True:
        grabbed, frame = stream.read()

        if not grabbed:
            print(f'Split {i - 1} images from the video in total.')
            break

        save_path = f'{save_folder}/output{str(i).zfill(length)}.png'
        cv2.imwrite(save_path, frame)

        i += 1

    saved_path = os.path.dirname(save_path)
    print(f'Split images saved in {saved_path}')

    return saved_path

def evaluate(test_generator, model_pos, action=None, return_predictions=False):
    """
    Infer 3D joint positions from 2D keypoints.
    :type test_generator: UnchunkedGenerator
    :param test_generator: generator yielding batches of 2D keypoints
    :param model_pos: 3D pose model
    :param return_predictions: return the predictions if True
    :return:
    """
    joints_left, joints_right = list([4, 5, 6, 11, 12, 13]), list([1, 2, 3, 14, 15, 16])
    with torch.no_grad():
        model_pos.eval()
        N = 0
        for _, batch, batch_2d in test_generator.next_epoch():
            inputs_2d = torch.from_numpy(batch_2d.astype('float32'))
            if torch.cuda.is_available():
                inputs_2d = inputs_2d.cuda()

            # Positional model
            predicted_3d_pos = model_pos(inputs_2d)

            if test_generator.augment_enabled():
                # Undo flipping and take average with non-flipped version
                predicted_3d_pos[1, :, :, 0] *= -1
                predicted_3d_pos[1, :, joints_left + joints_right] = predicted_3d_pos[1, :, joints_right + joints_left]
                predicted_3d_pos = torch.mean(predicted_3d_pos, dim=0, keepdim=True)

            if return_predictions:
                return predicted_3d_pos.squeeze(0).cpu().numpy()

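# Hypothetical usage sketch: UnchunkedGenerator and the trained model come from the
# VideoPose3D code base referenced in the docstring; the exact constructor arguments
# below are assumptions, not part of this file.
#   gen = UnchunkedGenerator(None, None, [input_keypoints], pad=pad, augment=False)
#   prediction_3d = evaluate(gen, model_pos, return_predictions=True)
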
if __name__ == '__main__':
    os.chdir('..')

    split_video('outputs/kobe.mp4')