Spaces:
Runtime error
Runtime error
from __future__ import annotations | |
import os | |
import shlex | |
import subprocess | |
import sys | |
import tempfile | |
# Runs only when the SYSTEM environment variable is 'spaces' (presumably the
# Hugging Face Spaces runtime): re-pins mmcv-full and swaps any installed
# OpenCV wheel for a pinned headless build before cv2 is imported below.
if os.getenv('SYSTEM') == 'spaces':
    import mim

    mim.uninstall('mmcv-full', confirm_yes=True)
    mim.install('mmcv-full==1.5.0', is_yes=True)

    # Order matters: both OpenCV variants are removed first, then the pinned
    # headless wheel is installed. Exit codes are ignored (best effort).
    for _pip_command in (
            'pip uninstall -y opencv-python',
            'pip uninstall -y opencv-python-headless',
            'pip install opencv-python-headless==4.5.5.64',
    ):
        subprocess.call(shlex.split(_pip_command))
import cv2 | |
import huggingface_hub | |
import numpy as np | |
import torch | |
import torch.nn as nn | |
# Put the local 'ViTPose/' directory first on the import path before the
# mmdet/mmpose imports that follow -- presumably so that a package bundled in
# that directory takes precedence over an installed one (TODO confirm).
sys.path.insert(0, 'ViTPose/')
from mmdet.apis import inference_detector, init_detector | |
from mmpose.apis import (inference_top_down_pose_model, init_pose_model, | |
process_mmdet_results, vis_pose_result) | |
# Optional Hugging Face access token read from the environment; None when the
# variable is unset. It is passed to hf_hub_download when fetching checkpoints.
HF_TOKEN = os.environ.get('HF_TOKEN')
class DetModel:
    """Object detector wrapper around MMDetection YOLOX models.

    Exactly one detector is active at a time; ``set_model`` replaces it.
    Images cross the public API as RGB arrays and are flipped to BGR for the
    MMDetection calls.
    """

    # Display name -> MMDetection config path and checkpoint URL.
    MODEL_DICT = {
        'YOLOX-tiny': {
            'config':
            'mmdet_configs/configs/yolox/yolox_tiny_8x8_300e_coco.py',
            'model':
            'https://download.openmmlab.com/mmdetection/v2.0/yolox/yolox_tiny_8x8_300e_coco/yolox_tiny_8x8_300e_coco_20211124_171234-b4047906.pth',
        },
        'YOLOX-s': {
            'config':
            'mmdet_configs/configs/yolox/yolox_s_8x8_300e_coco.py',
            'model':
            'https://download.openmmlab.com/mmdetection/v2.0/yolox/yolox_s_8x8_300e_coco/yolox_s_8x8_300e_coco_20211121_095711-4592a793.pth',
        },
        'YOLOX-l': {
            'config':
            'mmdet_configs/configs/yolox/yolox_l_8x8_300e_coco.py',
            'model':
            'https://download.openmmlab.com/mmdetection/v2.0/yolox/yolox_l_8x8_300e_coco/yolox_l_8x8_300e_coco_20211126_140236-d3bd2b23.pth',
        },
        'YOLOX-x': {
            'config':
            'mmdet_configs/configs/yolox/yolox_x_8x8_300e_coco.py',
            'model':
            'https://download.openmmlab.com/mmdetection/v2.0/yolox/yolox_x_8x8_300e_coco/yolox_x_8x8_300e_coco_20211126_140254-1ef88d67.pth',
        },
    }

    def __init__(self):
        """Select the device, load every listed model once, keep 'YOLOX-l'."""
        self.device = torch.device(
            'cuda:0' if torch.cuda.is_available() else 'cpu')
        # The instances built here are discarded; presumably this is a warm-up
        # so every checkpoint gets fetched up front -- TODO confirm.
        self._load_all_models_once()
        self.model_name = 'YOLOX-l'
        self.model = self._load_model(self.model_name)

    def _load_all_models_once(self) -> None:
        """Instantiate each entry of ``MODEL_DICT`` and drop the result."""
        for name in self.MODEL_DICT:
            self._load_model(name)

    def _load_model(self, name: str) -> nn.Module:
        """Build the detector registered under ``name`` on ``self.device``.

        Raises:
            KeyError: If ``name`` is not a key of ``MODEL_DICT``.
        """
        dic = self.MODEL_DICT[name]
        return init_detector(dic['config'], dic['model'], device=self.device)

    def set_model(self, name: str) -> None:
        """Make ``name`` the active detector; no-op if it already is.

        Raises:
            KeyError: If ``name`` is not a key of ``MODEL_DICT``.
        """
        if name == self.model_name:
            return
        # Load before touching any state: if loading raises, the previous
        # model and its name stay paired, so a retry is not short-circuited by
        # the early return above while a stale model is still active.
        model = self._load_model(name)
        self.model_name = name
        self.model = model

    def detect_and_visualize(
            self, image: np.ndarray,
            score_threshold: float) -> tuple[list[np.ndarray], np.ndarray]:
        """Run detection on an RGB image and render the result.

        Returns:
            The raw detector output and the rendered RGB image.
        """
        out = self.detect(image)
        vis = self.visualize_detection_results(image, out, score_threshold)
        return out, vis

    def detect(self, image: np.ndarray) -> list[np.ndarray]:
        """Run the active detector on an RGB image and return its raw output."""
        image = image[:, :, ::-1]  # RGB -> BGR
        out = inference_detector(self.model, image)
        return out

    def visualize_detection_results(
            self,
            image: np.ndarray,
            detection_results: list[np.ndarray],
            score_threshold: float = 0.3) -> np.ndarray:
        """Draw the first-class detections above ``score_threshold``.

        Args:
            image: RGB image the detections belong to.
            detection_results: Detector output as returned by ``detect``.
            score_threshold: Forwarded to ``show_result`` as ``score_thr``.

        Returns:
            The rendered image in RGB channel order.
        """
        # Keep only entry 0 (treated as the person class) and pad with 79
        # empty (0, 5) arrays -- presumably to preserve the 80-class layout of
        # the COCO configs above; TODO confirm.
        person_det = [detection_results[0]] + [np.array([]).reshape(0, 5)] * 79

        image = image[:, :, ::-1]  # RGB -> BGR
        vis = self.model.show_result(image,
                                     person_det,
                                     score_thr=score_threshold,
                                     bbox_color=None,
                                     text_color=(200, 200, 200),
                                     mask_color=None)
        return vis[:, :, ::-1]  # BGR -> RGB
class PoseModel:
    """Top-down pose estimator wrapper around ViTPose checkpoints.

    Exactly one pose model is active at a time; ``set_model`` replaces it.
    Images cross the public API as RGB arrays and are flipped to BGR for the
    mmpose calls.
    """

    # Display name -> mmpose config path and checkpoint filename inside the
    # 'hysts/ViTPose' Hub repository.
    MODEL_DICT = {
        'ViTPose-B (single-task train)': {
            'config':
            'ViTPose/configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/ViTPose_base_coco_256x192.py',
            'model': 'models/vitpose-b.pth',
        },
        'ViTPose-L (single-task train)': {
            'config':
            'ViTPose/configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/ViTPose_large_coco_256x192.py',
            'model': 'models/vitpose-l.pth',
        },
        'ViTPose-B (multi-task train, COCO)': {
            'config':
            'ViTPose/configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/ViTPose_base_coco_256x192.py',
            'model': 'models/vitpose-b-multi-coco.pth',
        },
        'ViTPose-L (multi-task train, COCO)': {
            'config':
            'ViTPose/configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/ViTPose_large_coco_256x192.py',
            'model': 'models/vitpose-l-multi-coco.pth',
        },
    }

    def __init__(self):
        """Select the device and load the default multi-task ViTPose-B model."""
        self.device = torch.device(
            'cuda:0' if torch.cuda.is_available() else 'cpu')
        self.model_name = 'ViTPose-B (multi-task train, COCO)'
        self.model = self._load_model(self.model_name)

    def _load_all_models_once(self) -> None:
        """Instantiate each entry of ``MODEL_DICT`` and drop the result.

        Not called from within this class.
        """
        for name in self.MODEL_DICT:
            self._load_model(name)

    def _load_model(self, name: str) -> nn.Module:
        """Fetch the checkpoint for ``name`` and build the pose model.

        Raises:
            KeyError: If ``name`` is not a key of ``MODEL_DICT``.
        """
        dic = self.MODEL_DICT[name]
        # NOTE(review): newer huggingface_hub releases prefer `token=` over
        # `use_auth_token=` -- confirm against the pinned version.
        ckpt_path = huggingface_hub.hf_hub_download('hysts/ViTPose',
                                                    dic['model'],
                                                    use_auth_token=HF_TOKEN)
        model = init_pose_model(dic['config'], ckpt_path, device=self.device)
        return model

    def set_model(self, name: str) -> None:
        """Make ``name`` the active pose model; no-op if it already is.

        Raises:
            KeyError: If ``name`` is not a key of ``MODEL_DICT``.
        """
        if name == self.model_name:
            return
        # Load before touching any state: if the download or model build
        # raises, the previous model and its name stay paired, so a retry is
        # not short-circuited by the early return above.
        model = self._load_model(name)
        self.model_name = name
        self.model = model

    def predict_pose_and_visualize(
        self,
        image: np.ndarray,
        det_results: list[np.ndarray],
        box_score_threshold: float,
        kpt_score_threshold: float,
        vis_dot_radius: int,
        vis_line_thickness: int,
    ) -> tuple[list[dict[str, np.ndarray]], np.ndarray]:
        """Estimate poses for the detections and render them on the image.

        Returns:
            The pose predictions and the rendered RGB image.
        """
        out = self.predict_pose(image, det_results, box_score_threshold)
        vis = self.visualize_pose_results(image, out, kpt_score_threshold,
                                          vis_dot_radius, vis_line_thickness)
        return out, vis

    def predict_pose(
            self,
            image: np.ndarray,
            det_results: list[np.ndarray],
            box_score_threshold: float = 0.5) -> list[dict[str, np.ndarray]]:
        """Run top-down pose estimation on an RGB image.

        Args:
            image: RGB image.
            det_results: Detector output for the same image.
            box_score_threshold: Forwarded as ``bbox_thr``.

        Returns:
            The pose results returned by ``inference_top_down_pose_model``.
        """
        image = image[:, :, ::-1]  # RGB -> BGR
        # Category id 1 selects one class from the detector output --
        # presumably "person", per the variable name; TODO confirm.
        person_results = process_mmdet_results(det_results, 1)
        out, _ = inference_top_down_pose_model(self.model,
                                               image,
                                               person_results=person_results,
                                               bbox_thr=box_score_threshold,
                                               format='xyxy')
        return out

    def visualize_pose_results(self,
                               image: np.ndarray,
                               pose_results: list[dict[str, np.ndarray]],
                               kpt_score_threshold: float = 0.3,
                               vis_dot_radius: int = 4,
                               vis_line_thickness: int = 1) -> np.ndarray:
        """Draw ``pose_results`` on an RGB image and return an RGB image.

        Args:
            image: RGB image the poses belong to.
            pose_results: Output of ``predict_pose``.
            kpt_score_threshold: Forwarded as ``kpt_score_thr``.
            vis_dot_radius: Forwarded as ``radius``.
            vis_line_thickness: Forwarded as ``thickness``.
        """
        image = image[:, :, ::-1]  # RGB -> BGR
        vis = vis_pose_result(self.model,
                              image,
                              pose_results,
                              kpt_score_thr=kpt_score_threshold,
                              radius=vis_dot_radius,
                              thickness=vis_line_thickness)
        return vis[:, :, ::-1]  # BGR -> RGB
class AppModel:
    """Run detection and pose estimation over a video and render the result."""

    def __init__(self):
        """Create the detector and the pose estimator used by the app."""
        self.det_model = DetModel()
        self.pose_model = PoseModel()

    @staticmethod
    def _open_video_io(video_path: str) -> tuple:
        """Open ``video_path`` for reading plus a matching temporary MP4 writer.

        Returns:
            ``(cap, writer, out_path)``: the ``cv2.VideoCapture``, a
            ``cv2.VideoWriter`` using the input's size and frame rate, and the
            path of the temporary ``.mp4`` file the writer targets.
        """
        cap = cv2.VideoCapture(video_path)
        try:
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            fps = cap.get(cv2.CAP_PROP_FPS)
            # Only the path is needed: close our own handle right away so the
            # writer is the sole owner of the file. delete=False keeps the
            # file on disk for the caller.
            with tempfile.NamedTemporaryFile(suffix='.mp4',
                                             delete=False) as out_file:
                out_path = out_file.name
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            writer = cv2.VideoWriter(out_path, fourcc, fps, (width, height))
        except Exception:
            # Do not leave the capture open if the writer setup fails.
            cap.release()
            raise
        return cap, writer, out_path

    def run(
        self, video_path: str, det_model_name: str, pose_model_name: str,
        box_score_threshold: float, max_num_frames: int,
        kpt_score_threshold: float, vis_dot_radius: int,
        vis_line_thickness: int
    ) -> tuple[str, list[list[dict[str, np.ndarray]]]] | None:
        """Detect and estimate poses on up to ``max_num_frames`` frames.

        Args:
            video_path: Input video path; ``None`` makes the call a no-op.
            det_model_name: Key of ``DetModel.MODEL_DICT`` to activate.
            pose_model_name: Key of ``PoseModel.MODEL_DICT`` to activate.
            box_score_threshold: Detection score cut-off for pose estimation.
            max_num_frames: Upper bound on the number of frames processed.
            kpt_score_threshold: Keypoint score cut-off for drawing.
            vis_dot_radius: Keypoint dot radius for drawing.
            vis_line_thickness: Skeleton line thickness for drawing.

        Returns:
            ``None`` when ``video_path`` is ``None``; otherwise the path of the
            rendered MP4 and the per-frame pose predictions.
        """
        if video_path is None:
            return None

        self.det_model.set_model(det_model_name)
        self.pose_model.set_model(pose_model_name)

        cap, writer, out_path = self._open_video_io(video_path)
        preds_all = []
        try:
            for _ in range(max_num_frames):
                ok, frame = cap.read()
                if not ok:
                    break
                rgb_frame = frame[:, :, ::-1]
                det_preds = self.det_model.detect(rgb_frame)
                preds, vis = self.pose_model.predict_pose_and_visualize(
                    rgb_frame, det_preds, box_score_threshold,
                    kpt_score_threshold, vis_dot_radius, vis_line_thickness)
                preds_all.append(preds)
                writer.write(vis[:, :, ::-1])
        finally:
            # Release on every exit path so a failure during inference does
            # not leave the capture or the writer open.
            cap.release()
            writer.release()
        return out_path, preds_all

    def visualize_pose_results(self, video_path: str,
                               pose_preds_all: list[list[dict[str,
                                                              np.ndarray]]],
                               kpt_score_threshold: float, vis_dot_radius: int,
                               vis_line_thickness: int) -> str | None:
        """Re-render stored pose predictions onto the video.

        Frames are read in order and paired with ``pose_preds_all``; rendering
        stops at whichever runs out first.

        Args:
            video_path: Input video path.
            pose_preds_all: Per-frame pose predictions, as returned by ``run``.
            kpt_score_threshold: Keypoint score cut-off for drawing.
            vis_dot_radius: Keypoint dot radius for drawing.
            vis_line_thickness: Skeleton line thickness for drawing.

        Returns:
            ``None`` when either ``video_path`` or ``pose_preds_all`` is
            ``None``; otherwise the path of the rendered MP4.
        """
        if video_path is None or pose_preds_all is None:
            return None

        cap, writer, out_path = self._open_video_io(video_path)
        try:
            for pose_preds in pose_preds_all:
                ok, frame = cap.read()
                if not ok:
                    break
                rgb_frame = frame[:, :, ::-1]
                vis = self.pose_model.visualize_pose_results(
                    rgb_frame, pose_preds, kpt_score_threshold,
                    vis_dot_radius, vis_line_thickness)
                writer.write(vis[:, :, ::-1])
        finally:
            # Release on every exit path so a drawing failure does not leave
            # the capture or the writer open.
            cap.release()
            writer.release()
        return out_path