# Copyright (c) 2018-present, Facebook, Inc.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
#
import hashlib
import os
import pathlib
import shutil
import sys
import time

import cv2
import numpy as np
import torch


def add_path():
    Alphapose_path = os.path.abspath('joints_detectors/Alphapose')
    hrnet_path = os.path.abspath('joints_detectors/hrnet')
    trackers_path = os.path.abspath('pose_trackers')

    paths = filter(lambda p: p not in sys.path, [Alphapose_path, hrnet_path, trackers_path])
    sys.path.extend(paths)


def wrap(func, *args, unsqueeze=False):
    """
    Wrap a torch function so it can be called with NumPy arrays.
    Input and return types are seamlessly converted.
    """
    # Convert input types where applicable
    args = list(args)
    for i, arg in enumerate(args):
        if isinstance(arg, np.ndarray):
            args[i] = torch.from_numpy(arg)
            if unsqueeze:
                args[i] = args[i].unsqueeze(0)

    result = func(*args)

    # Convert output types where applicable
    if isinstance(result, tuple):
        result = list(result)
        for i, res in enumerate(result):
            if isinstance(res, torch.Tensor):
                if unsqueeze:
                    res = res.squeeze(0)
                result[i] = res.numpy()
        return tuple(result)
    elif isinstance(result, torch.Tensor):
        if unsqueeze:
            result = result.squeeze(0)
        return result.numpy()
    else:
        return result
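
# Illustrative usage of `wrap` (a sketch, not part of the pipeline; `torch.relu`
# stands in for any single-tensor torch function):
#   wrap(torch.relu, np.array([-1.0, 2.0]))              # -> array([0., 2.])
#   wrap(torch.relu, np.zeros((17, 2)), unsqueeze=True)  # batch dim added, then removed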


def deterministic_random(min_value, max_value, data):
    """Return a deterministic pseudo-random integer in [min_value, max_value), derived from the SHA-256 hash of `data`."""
    digest = hashlib.sha256(data.encode()).digest()
    raw_value = int.from_bytes(digest[:4], byteorder='little', signed=False)
    return int(raw_value / (2 ** 32 - 1) * (max_value - min_value)) + min_value
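
# Illustrative only: the same string always maps to the same value, so a
# reproducible subset can be picked without touching the global RNG state:
#   deterministic_random(0, 100, 'S1/Walking')  # identical result on every run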


def alpha_map(prediction):
    """Linearly rescale `prediction` so that its values span [-0.8, 0.8]."""
    p_min, p_max = prediction.min(), prediction.max()

    k = 1.6 / (p_max - p_min)
    b = 0.8 - k * p_max

    prediction = k * prediction + b
    return prediction
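
# Worked example (sketch): an input spanning [0, 2] gives k = 1.6 / 2 = 0.8 and
# b = 0.8 - 0.8 * 2 = -0.8, so the values land in [-0.8, 0.8]:
#   alpha_map(np.array([0., 1., 2.]))  # -> array([-0.8,  0. ,  0.8])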


def change_score(prediction, detectron_detection_path):
    """Replace the confidence scores in `prediction` with the ones stored in a saved Detectron detection file."""
    detectron_predictions = np.load(detectron_detection_path, allow_pickle=True)['positions_2d'].item()
    pose = detectron_predictions['S1']['Directions 1']
    prediction[..., 2] = pose[..., 2]
    return prediction


class Timer:
    def __init__(self, message, show=True):
        self.message = message
        self.elapsed = 0
        self.show = show

    def __enter__(self):
        self.start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.elapsed = time.perf_counter() - self.start
        if self.show:
            print(f'{self.message} --- elapsed time: {self.elapsed} s')
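
# Typical usage (illustrative):
#   with Timer('2D keypoint detection') as t:
#       ...  # timed block
# prints '2D keypoint detection --- elapsed time: <seconds> s'; the duration
# also remains available as t.elapsed.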


def calculate_area(data):
    """
    Get the area of the bounding rectangle of keypoints.
    :param data: AlphaPose json keypoint format ([x, y, score, ..., x, y, score]) or AlphaPose result keypoint format ([[x, y], ..., [x, y]])
    :return: bounding-rectangle area
    """
    data = np.array(data)

    if len(data.shape) == 1:
        data = np.reshape(data, (-1, 3))

    width = max(data[:, 0]) - min(data[:, 0])
    height = max(data[:, 1]) - min(data[:, 1])

    return width * height
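
# Quick check (illustrative): a point set spanning a 2 x 3 rectangle has area 6.
#   calculate_area([[0, 0], [2, 3], [1, 1]])  # -> 6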


def read_video(filename, fps=None, skip=0, limit=-1):
    """Yield RGB frames from a video file, skipping the first `skip` frames and stopping after `limit` frames (-1 means no limit). `fps` is currently unused."""
    stream = cv2.VideoCapture(filename)

    i = 0
    while True:
        grabbed, frame = stream.read()
        # if the `grabbed` boolean is `False`, then we have
        # reached the end of the video file
        if not grabbed:
            print('===========================> This video has ' + str(i) + ' frames in total.')
            sys.stdout.flush()
            break

        i += 1
        if i > skip:
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            yield np.array(frame)

        if i == limit:
            break

    stream.release()
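
# Illustrative usage (assumes a video exists at the given path):
#   for frame in read_video('outputs/kobe.mp4', skip=10, limit=100):
#       ...  # frame is an RGB np.ndarray of shape (H, W, 3)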


def split_video(video_path):
    """Split a video into per-frame PNG images and return the directory they were saved in."""
    stream = cv2.VideoCapture(video_path)

    output_dir = os.path.dirname(video_path)
    video_name = os.path.basename(video_path)
    video_name = video_name[:video_name.rfind('.')]

    save_folder = pathlib.Path(f'./{output_dir}/alpha_pose_{video_name}/split_image/')
    shutil.rmtree(str(save_folder), ignore_errors=True)
    save_folder.mkdir(parents=True, exist_ok=True)

    total_frames = int(stream.get(cv2.CAP_PROP_FRAME_COUNT))
    length = len(str(total_frames)) + 1

    i = 1
    while True:
        grabbed, frame = stream.read()

        if not grabbed:
            print(f'Split {i - 1} images from the video in total.')
            break

        save_path = f'{save_folder}/output{str(i).zfill(length)}.png'
        cv2.imwrite(save_path, frame)

        i += 1

    saved_path = os.path.dirname(save_path)
    print(f'Split images saved in {saved_path}')

    return saved_path
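
# Illustrative usage (mirrors the __main__ block below):
#   image_dir = split_video('outputs/kobe.mp4')
#   # -> 'outputs/alpha_pose_kobe/split_image'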


def evaluate(test_generator, model_pos, action=None, return_predictions=False):
    """
    Infer 3D joint positions from 2D keypoints.
    :type test_generator: UnchunkedGenerator
    :param test_generator: generator yielding batches of 2D keypoints
    :param model_pos: 3D pose model
    :param return_predictions: return the predictions if True
    :return:
    """
    joints_left, joints_right = [4, 5, 6, 11, 12, 13], [1, 2, 3, 14, 15, 16]
    with torch.no_grad():
        model_pos.eval()
        for _, batch, batch_2d in test_generator.next_epoch():
            # Inputs stay on the CPU (the .cuda() transfer was removed for the CPU-only version)
            inputs_2d = torch.from_numpy(batch_2d.astype('float32'))

            # Positional model
            predicted_3d_pos = model_pos(inputs_2d)

            if test_generator.augment_enabled():
                # Undo flipping and take average with non-flipped version
                predicted_3d_pos[1, :, :, 0] *= -1
                predicted_3d_pos[1, :, joints_left + joints_right] = predicted_3d_pos[1, :, joints_right + joints_left]
                predicted_3d_pos = torch.mean(predicted_3d_pos, dim=0, keepdim=True)

            if return_predictions:
                return predicted_3d_pos.squeeze(0).cpu().numpy()
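
# Illustrative call (a sketch: `gen` is an UnchunkedGenerator built elsewhere in
# the pipeline, `model_pos` a loaded 3D pose network):
#   prediction = evaluate(gen, model_pos, return_predictions=True)
#   # -> np.ndarray of shape (n_frames, n_joints, 3)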


if __name__ == '__main__':
    os.chdir('..')

    split_video('outputs/kobe.mp4')