ViTPose / model.py
Spidartist
new files
a030099
from __future__ import annotations
import os
import shlex
import subprocess
import sys
import tempfile
if os.getenv('SYSTEM') == 'spaces':
import mim
mim.uninstall('mmcv-full', confirm_yes=True)
mim.install('mmcv-full==1.5.0', is_yes=True)
subprocess.call(shlex.split('pip uninstall -y opencv-python'))
subprocess.call(shlex.split('pip uninstall -y opencv-python-headless'))
subprocess.call(
shlex.split('pip install opencv-python-headless==4.5.5.64'))
import cv2
import huggingface_hub
import numpy as np
import torch
import torch.nn as nn
sys.path.insert(0, 'ViTPose/')
from mmdet.apis import inference_detector, init_detector
from mmpose.apis import (inference_top_down_pose_model, init_pose_model,
process_mmdet_results, vis_pose_result)
HF_TOKEN = os.getenv('HF_TOKEN')
class DetModel:
MODEL_DICT = {
'YOLOX-tiny': {
'config':
'mmdet_configs/configs/yolox/yolox_tiny_8x8_300e_coco.py',
'model':
'https://download.openmmlab.com/mmdetection/v2.0/yolox/yolox_tiny_8x8_300e_coco/yolox_tiny_8x8_300e_coco_20211124_171234-b4047906.pth',
},
'YOLOX-s': {
'config':
'mmdet_configs/configs/yolox/yolox_s_8x8_300e_coco.py',
'model':
'https://download.openmmlab.com/mmdetection/v2.0/yolox/yolox_s_8x8_300e_coco/yolox_s_8x8_300e_coco_20211121_095711-4592a793.pth',
},
'YOLOX-l': {
'config':
'mmdet_configs/configs/yolox/yolox_l_8x8_300e_coco.py',
'model':
'https://download.openmmlab.com/mmdetection/v2.0/yolox/yolox_l_8x8_300e_coco/yolox_l_8x8_300e_coco_20211126_140236-d3bd2b23.pth',
},
'YOLOX-x': {
'config':
'mmdet_configs/configs/yolox/yolox_x_8x8_300e_coco.py',
'model':
'https://download.openmmlab.com/mmdetection/v2.0/yolox/yolox_x_8x8_300e_coco/yolox_x_8x8_300e_coco_20211126_140254-1ef88d67.pth',
},
}
def __init__(self):
self.device = torch.device(
'cuda:0' if torch.cuda.is_available() else 'cpu')
self._load_all_models_once()
self.model_name = 'YOLOX-l'
self.model = self._load_model(self.model_name)
def _load_all_models_once(self) -> None:
for name in self.MODEL_DICT:
self._load_model(name)
def _load_model(self, name: str) -> nn.Module:
dic = self.MODEL_DICT[name]
return init_detector(dic['config'], dic['model'], device=self.device)
def set_model(self, name: str) -> None:
if name == self.model_name:
return
self.model_name = name
self.model = self._load_model(name)
def detect_and_visualize(
self, image: np.ndarray,
score_threshold: float) -> tuple[list[np.ndarray], np.ndarray]:
out = self.detect(image)
vis = self.visualize_detection_results(image, out, score_threshold)
return out, vis
def detect(self, image: np.ndarray) -> list[np.ndarray]:
image = image[:, :, ::-1] # RGB -> BGR
out = inference_detector(self.model, image)
return out
def visualize_detection_results(
self,
image: np.ndarray,
detection_results: list[np.ndarray],
score_threshold: float = 0.3) -> np.ndarray:
person_det = [detection_results[0]] + [np.array([]).reshape(0, 5)] * 79
image = image[:, :, ::-1] # RGB -> BGR
vis = self.model.show_result(image,
person_det,
score_thr=score_threshold,
bbox_color=None,
text_color=(200, 200, 200),
mask_color=None)
return vis[:, :, ::-1] # BGR -> RGB
class PoseModel:
MODEL_DICT = {
'ViTPose-B (single-task train)': {
'config':
'ViTPose/configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/ViTPose_base_coco_256x192.py',
'model': 'models/vitpose-b.pth',
},
'ViTPose-L (single-task train)': {
'config':
'ViTPose/configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/ViTPose_large_coco_256x192.py',
'model': 'models/vitpose-l.pth',
},
'ViTPose-B (multi-task train, COCO)': {
'config':
'ViTPose/configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/ViTPose_base_coco_256x192.py',
'model': 'models/vitpose-b-multi-coco.pth',
},
'ViTPose-L (multi-task train, COCO)': {
'config':
'ViTPose/configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/ViTPose_large_coco_256x192.py',
'model': 'models/vitpose-l-multi-coco.pth',
},
}
def __init__(self):
self.device = torch.device(
'cuda:0' if torch.cuda.is_available() else 'cpu')
self.model_name = 'ViTPose-B (multi-task train, COCO)'
self.model = self._load_model(self.model_name)
def _load_all_models_once(self) -> None:
for name in self.MODEL_DICT:
self._load_model(name)
def _load_model(self, name: str) -> nn.Module:
dic = self.MODEL_DICT[name]
ckpt_path = huggingface_hub.hf_hub_download('hysts/ViTPose',
dic['model'],
use_auth_token=HF_TOKEN)
model = init_pose_model(dic['config'], ckpt_path, device=self.device)
return model
def set_model(self, name: str) -> None:
if name == self.model_name:
return
self.model_name = name
self.model = self._load_model(name)
def predict_pose_and_visualize(
self,
image: np.ndarray,
det_results: list[np.ndarray],
box_score_threshold: float,
kpt_score_threshold: float,
vis_dot_radius: int,
vis_line_thickness: int,
) -> tuple[list[dict[str, np.ndarray]], np.ndarray]:
out = self.predict_pose(image, det_results, box_score_threshold)
vis = self.visualize_pose_results(image, out, kpt_score_threshold,
vis_dot_radius, vis_line_thickness)
return out, vis
def predict_pose(
self,
image: np.ndarray,
det_results: list[np.ndarray],
box_score_threshold: float = 0.5) -> list[dict[str, np.ndarray]]:
image = image[:, :, ::-1] # RGB -> BGR
person_results = process_mmdet_results(det_results, 1)
out, _ = inference_top_down_pose_model(self.model,
image,
person_results=person_results,
bbox_thr=box_score_threshold,
format='xyxy')
return out
def visualize_pose_results(self,
image: np.ndarray,
pose_results: list[dict[str, np.ndarray]],
kpt_score_threshold: float = 0.3,
vis_dot_radius: int = 4,
vis_line_thickness: int = 1) -> np.ndarray:
image = image[:, :, ::-1] # RGB -> BGR
vis = vis_pose_result(self.model,
image,
pose_results,
kpt_score_thr=kpt_score_threshold,
radius=vis_dot_radius,
thickness=vis_line_thickness)
return vis[:, :, ::-1] # BGR -> RGB
class AppModel:
def __init__(self):
self.det_model = DetModel()
self.pose_model = PoseModel()
def run(
self, video_path: str, det_model_name: str, pose_model_name: str,
box_score_threshold: float, max_num_frames: int,
kpt_score_threshold: float, vis_dot_radius: int,
vis_line_thickness: int
) -> tuple[str, list[list[dict[str, np.ndarray]]]]:
if video_path is None:
return
self.det_model.set_model(det_model_name)
self.pose_model.set_model(pose_model_name)
cap = cv2.VideoCapture(video_path)
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
fps = cap.get(cv2.CAP_PROP_FPS)
preds_all = []
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out_file = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False)
writer = cv2.VideoWriter(out_file.name, fourcc, fps, (width, height))
for _ in range(max_num_frames):
ok, frame = cap.read()
if not ok:
break
rgb_frame = frame[:, :, ::-1]
det_preds = self.det_model.detect(rgb_frame)
preds, vis = self.pose_model.predict_pose_and_visualize(
rgb_frame, det_preds, box_score_threshold, kpt_score_threshold,
vis_dot_radius, vis_line_thickness)
preds_all.append(preds)
writer.write(vis[:, :, ::-1])
cap.release()
writer.release()
return out_file.name, preds_all
def visualize_pose_results(self, video_path: str,
pose_preds_all: list[list[dict[str,
np.ndarray]]],
kpt_score_threshold: float, vis_dot_radius: int,
vis_line_thickness: int) -> str:
if video_path is None or pose_preds_all is None:
return
cap = cv2.VideoCapture(video_path)
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
fps = cap.get(cv2.CAP_PROP_FPS)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out_file = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False)
writer = cv2.VideoWriter(out_file.name, fourcc, fps, (width, height))
for pose_preds in pose_preds_all:
ok, frame = cap.read()
if not ok:
break
rgb_frame = frame[:, :, ::-1]
vis = self.pose_model.visualize_pose_results(
rgb_frame, pose_preds, kpt_score_threshold, vis_dot_radius,
vis_line_thickness)
writer.write(vis[:, :, ::-1])
cap.release()
writer.release()
return out_file.name