import os import time from common.arguments import parse_args from common.camera import * from common.generators import * from common.loss import * from common.model import * from common.utils import Timer, evaluate, add_path from common.inference_3d import * from model.block.refine import refine from model.stmo import Model import HPE2keyframes as Hk from datetime import datetime import pytz # from joints_detectors.openpose.main import generate_kpts as open_pose os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" # see issue #152 os.environ["CUDA_VISIBLE_DEVICES"] = "0" metadata = {'layout_name': 'coco', 'num_joints': 17, 'keypoints_symmetry': [[1, 3, 5, 7, 9, 11, 13, 15], [2, 4, 6, 8, 10, 12, 14, 16]]} add_path() # record time def ckpt_time(ckpt=None): if not ckpt: return time.time() else: return time.time() - float(ckpt), time.time() time0 = ckpt_time() def get_detector_2d(detector_name): def get_alpha_pose(): from joints_detectors.Alphapose.gene_npz import generate_kpts as alpha_pose return alpha_pose detector_map = { 'alpha_pose': get_alpha_pose, # 'hr_pose': get_hr_pose, # 'open_pose': open_pose } assert detector_name in detector_map, f'2D detector: {detector_name} not implemented yet!' return detector_map[detector_name]() class Skeleton: def parents(self): return np.array([-1, 0, 1, 2, 0, 4, 5, 0, 7, 8, 9, 8, 11, 12, 8, 14, 15]) def joints_right(self): return [1, 2, 3, 14, 15, 16] def main(args, progress): detector_2d = get_detector_2d(args.detector_2d) assert detector_2d, 'detector_2d should be in ({alpha, hr, open}_pose)' # 2D kpts loads or generate #args.input_npz = './outputs/alpha_pose_skiing_cut/skiing_cut.npz' if not args.input_npz: video_name = args.viz_video keypoints = detector_2d(video_name, progress) else: npz = np.load(args.input_npz) keypoints = npz['kpts'] # (N, 17, 2) keypoints_symmetry = metadata['keypoints_symmetry'] kps_left, kps_right = list(keypoints_symmetry[0]), list(keypoints_symmetry[1]) joints_left, joints_right = list([4, 5, 6, 11, 12, 13]), list([1, 2, 3, 14, 15, 16]) # normlization keypoints Suppose using the camera parameter keypoints = normalize_screen_coordinates(keypoints[..., :2], w=1000, h=1002) # model_pos = TemporalModel(17, 2, 17, filter_widths=[3, 3, 3, 3, 3], causal=args.causal, dropout=args.dropout, channels=args.channels, # dense=args.dense) model = {} model['trans'] = Model(args) # if torch.cuda.is_available(): # model_pos = model_pos ckpt, time1 = ckpt_time(time0) print('-------------- load data spends {:.2f} seconds'.format(ckpt)) # load trained model # chk_filename = os.path.join(args.checkpoint, args.resume if args.resume else args.evaluate) # print('Loading checkpoint', chk_filename) # checkpoint = torch.load(chk_filename, map_location=lambda storage, loc: storage) # 把loc映射到storage # model_pos.load_state_dict(checkpoint['model_pos']) model_dict = model['trans'].state_dict() no_refine_path = "checkpoint/PSTMOS_no_refine_48_5137_in_the_wild.pth" pre_dict = torch.load(no_refine_path, map_location=torch.device('cpu')) for key, value in pre_dict.items(): name = key[7:] model_dict[name] = pre_dict[key] model['trans'].load_state_dict(model_dict) ckpt, time2 = ckpt_time(time1) print('-------------- load 3D model spends {:.2f} seconds'.format(ckpt)) # Receptive field: 243 frames for args.arc [3, 3, 3, 3, 3] receptive_field = args.frames pad = (receptive_field - 1) // 2 # Padding on each side causal_shift = 0 print('Rendering...') input_keypoints = keypoints.copy() print(input_keypoints.shape) # gen = UnchunkedGenerator(None, None, [input_keypoints], # pad=pad, causal_shift=causal_shift, augment=args.test_time_augmentation, # kps_left=kps_left, kps_right=kps_right, joints_left=joints_left, joints_right=joints_right) # test_data = Fusion(opt=args, train=False, dataset=dataset, root_path=root_path, MAE=opt.MAE) # test_dataloader = torch.utils.data.DataLoader(test_data, batch_size=1, # shuffle=False, num_workers=0, pin_memory=True) #prediction = evaluate(gen, model_pos, return_predictions=True) gen = Evaluate_Generator(128, None, None, [input_keypoints], args.stride, pad=pad, causal_shift=causal_shift, augment=args.test_time_augmentation, shuffle=False, kps_left=kps_left, kps_right=kps_right, joints_left=joints_left, joints_right=joints_right) prediction = val(args, gen, model, progress) # save 3D joint points # np.save(f'outputs/test_3d_{args.video_name}_output.npy', prediction, allow_pickle=True) rot = np.array([0.14070565, -0.15007018, -0.7552408, 0.62232804], dtype=np.float32) prediction = camera_to_world(prediction, R=rot, t=0) # We don't have the trajectory, but at least we can rebase the height prediction[:, :, 2] -= np.min(prediction[:, :, 2]) output_dir_dict = {} npy_filename = f'output_3Dpose_npy/{args.video_name}.npy' output_dir_dict['npy'] = npy_filename np.save(npy_filename, prediction, allow_pickle=True) anim_output = {'Skeleton': prediction} input_keypoints = image_coordinates(input_keypoints[..., :2], w=1000, h=1002) ckpt, time3 = ckpt_time(time2) print('-------------- generate reconstruction 3D data spends {:.2f} seconds'.format(ckpt)) if not args.viz_output: args.viz_output = 'outputs/alpha_result.mp4' from common.visualization import render_animation render_animation(input_keypoints, anim_output, Skeleton(), 25, args.viz_bitrate, np.array(70., dtype=np.float32), args.viz_output, progress, limit=args.viz_limit, downsample=args.viz_downsample, size=args.viz_size, input_video_path=args.viz_video, viewport=(1000, 1002), input_video_skip=args.viz_skip) ckpt, time4 = ckpt_time(time3) print('total spend {:2f} second'.format(ckpt)) return output_dir_dict def inference_video(video_path, detector_2d, progress): """ Do image -> 2d points -> 3d points to video. :param detector_2d: used 2d joints detector. Can be {alpha_pose, hr_pose} :param video_path: relative to outputs :return: None """ args = parse_args() args.detector_2d = detector_2d dir_name = os.path.dirname(video_path) basename = os.path.basename(video_path) args.video_name = basename[:basename.rfind('.')] args.viz_video = video_path args.viz_output = f'output_videos/{args.video_name}.mp4' args.evaluate = 'pretrained_h36m_detectron_coco.bin' with Timer(video_path): output_dir_dict = main(args, progress) output_dir_dict["output_videos"] = args.viz_output output_dir_dict["video_name"] = args.video_name return output_dir_dict def gr_video2mc(video_path, progress): print("\n>>>>> One video uploaded <<<<<\n") china_tz = pytz.timezone('Asia/Shanghai') current_time = datetime.now(china_tz) formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S') print(f"Start Time: {formatted_time}\n") if not os.path.exists('output_3Dpose_npy'): os.makedirs('output_3Dpose_npy') if not os.path.exists('output_alphapose'): os.makedirs('output_alphapose') if not os.path.exists('output_miframes'): os.makedirs('output_miframes') if not os.path.exists('output_videos'): os.makedirs('output_videos') FPS_mine_imator = 30 output_dir_dict = inference_video(video_path, 'alpha_pose', progress) Hk.hpe2keyframes(output_dir_dict['npy'], FPS_mine_imator, f"output_miframes/{output_dir_dict['video_name']}.miframes") path1 = os.path.abspath(f"output_miframes/{output_dir_dict['video_name']}.miframes") path2 = os.path.abspath(f"output_videos/{output_dir_dict['video_name']}.mp4") print("\n----- One video processed -----\n") china_tz = pytz.timezone('Asia/Shanghai') current_time = datetime.now(china_tz) formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S') print(f"Finished Time: {formatted_time}\n") return path1, path2 if __name__ == '__main__': files = os.listdir('./input_videos') FPS_mine_imator = 30 for file in files: output_dir_dict = inference_video(os.path.join('input_videos', file), 'alpha_pose') Hk.hpe2keyframes(output_dir_dict['npy'], FPS_mine_imator, f"output_miframes/{output_dir_dict['video_name']}.miframes")