import cv2
import numpy as np
from insightface import model_zoo
# NOTE(review): scipy.ndimage.filters was deprecated and later removed;
# gaussian_filter has lived directly in scipy.ndimage for all modern releases.
from scipy.ndimage import gaussian_filter

from dofaker.utils import download_file, get_model_url
from .pose_utils import _get_keypoints, _pad_image
class PoseEstimator:
    """OpenPose-style 2D body pose estimator backed by an ONNX model.

    The model is expected to emit exactly two outputs per forward pass
    (asserted in ``__init__``); downstream decoding treats them as
    part-affinity fields (PAFs, 38 channels) and part heatmaps
    (19 channels), which are decoded into per-person keypoints.
    """

    # PAF channel-index pairs for each limb; values are offset by the 19
    # heatmap channels (hence the ``x - 19`` when slicing ``paf_avg``).
    _MAP_IDX = [[31, 32], [39, 40], [33, 34], [35, 36], [41, 42], [43, 44],
                [19, 20], [21, 22], [23, 24], [25, 26], [27, 28], [29, 30],
                [47, 48], [49, 50], [53, 54], [51, 52], [55, 56], [37, 38],
                [45, 46]]
    # 1-based keypoint-index pairs forming each limb.
    _LIMB_SEQ = [[2, 3], [2, 6], [3, 4], [4, 5], [6, 7], [7, 8], [2, 9],
                 [9, 10], [10, 11], [2, 12], [12, 13], [13, 14], [2, 1],
                 [1, 15], [15, 17], [1, 16], [16, 18], [3, 17], [6, 18]]

    def __init__(self, name='openpose_body', root='weights/models'):
        """Download (if not cached) and load the pose model.

        Args:
            name: model key resolved through ``get_model_url``.
            root: directory where model weights are cached.
        """
        _, model_file = download_file(get_model_url(name),
                                      save_dir=root,
                                      overwrite=False)
        providers = model_zoo.model_zoo.get_default_providers()
        self.session = model_zoo.model_zoo.PickableInferenceSession(
            model_file, providers=providers)
        # Input blob normalization: (pixel - mean) / std.
        self.input_mean = 127.5
        self.input_std = 255.0
        self.input_names = [inp.name for inp in self.session.get_inputs()]
        self.output_names = [out.name for out in self.session.get_outputs()]
        assert len(
            self.output_names
        ) == 2, "The output number of PoseEstimator model should be 2, but got {}, please check your model.".format(
            len(self.output_names))
        self.input_shape = self.session.get_inputs()[0].shape
        print('pose estimator shape:', self.input_shape)

    def forward(self, image, image_format='rgb'):
        """Estimate body keypoints for every person in ``image``.

        Args:
            image: path to an image file, or an HxWx3 ``np.ndarray``.
            image_format: 'rgb' or 'bgr'; ignored when ``image`` is a path
                (``cv2.imread`` already yields BGR).

        Returns:
            The result of ``_get_keypoints(candidate, subset)``.

        Raises:
            UserWarning: on an unsupported input type or image format.
        """
        image = self._as_bgr(image, image_format)
        heatmap_avg, paf_avg = self._infer_maps(image)
        all_peaks = self._find_peaks(heatmap_avg, thresh=0.1)
        all_connections, skipped_limbs = self._find_connections(
            all_peaks, paf_avg, image.shape[0])
        candidate, subset = self._assemble_people(all_peaks, all_connections,
                                                  skipped_limbs)
        return _get_keypoints(candidate, subset)

    def _as_bgr(self, image, image_format):
        """Normalize the input to a BGR ``np.ndarray``."""
        if isinstance(image, str):
            return cv2.imread(image, 1)
        if isinstance(image, np.ndarray):
            if image_format == 'bgr':
                return image
            if image_format == 'rgb':
                return cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
            raise UserWarning(
                "PoseEstimator not support image format {}".format(
                    image_format))
        raise UserWarning(
            "PoseEstimator input must be str or np.ndarray, but got {}.".
            format(type(image)))

    def _infer_maps(self, image):
        """Run the network at each scale and average heatmaps and PAFs.

        Returns:
            (heatmap_avg, paf_avg): HxWx19 and HxWx38 float arrays at the
            original image resolution.
        """
        scales = [0.5]
        stride = 8
        bboxsize = 368
        padvalue = 128
        multipliers = [scale * bboxsize / image.shape[0] for scale in scales]
        heatmap_avg = np.zeros((image.shape[0], image.shape[1], 19))
        paf_avg = np.zeros((image.shape[0], image.shape[1], 38))
        for scale in multipliers:
            image_scaled = cv2.resize(image, (0, 0),
                                      fx=scale,
                                      fy=scale,
                                      interpolation=cv2.INTER_CUBIC)
            image_padded, pads = _pad_image(image_scaled, stride, padvalue)
            # NCHW blob normalized to roughly [-0.5, 0.5].
            tensor = np.expand_dims(np.transpose(image_padded, (2, 0, 1)), 0)
            blob = (np.float32(tensor) - self.input_mean) / self.input_std
            pred = self.session.run(self.output_names,
                                    {self.input_names[0]: blob})
            # Output order follows self.output_names: pred[0] is consumed as
            # the PAF branch, pred[1] as the heatmap branch (model-dependent;
            # matches the original Mconv7_stage6_L1/L2 naming).
            paf = self._restore_map(pred[0], stride, image_padded, pads,
                                    image)
            heatmap = self._restore_map(pred[1], stride, image_padded, pads,
                                        image)
            heatmap_avg += heatmap / len(multipliers)
            paf_avg += paf / len(multipliers)
        return heatmap_avg, paf_avg

    def _restore_map(self, raw, stride, image_padded, pads, image):
        """Upsample a 1xCxHxW network output back to original image size."""
        out = np.transpose(np.squeeze(raw), (1, 2, 0))
        out = cv2.resize(out, (0, 0),
                         fx=stride,
                         fy=stride,
                         interpolation=cv2.INTER_CUBIC)
        # Crop the padding added by _pad_image before resizing back.
        out = out[:image_padded.shape[0] - pads[3], :image_padded.shape[1] -
                  pads[2], :]
        return cv2.resize(out, (image.shape[1], image.shape[0]),
                          interpolation=cv2.INTER_CUBIC)

    def _find_peaks(self, heatmap_avg, thresh):
        """Non-maximum suppression over the 18 part heatmaps.

        Returns:
            A list with one entry per part; each entry is a list of peaks
            as ``(x, y, score, global_id)`` tuples, where ``global_id`` is
            unique across all parts.
        """
        all_peaks = []
        num_peaks = 0
        for part in range(18):
            map_orig = heatmap_avg[:, :, part]
            map_filt = gaussian_filter(map_orig, sigma=3)
            # Shifted copies implement a 4-neighbour local-maximum test.
            map_L = np.zeros_like(map_filt)
            map_T = np.zeros_like(map_filt)
            map_R = np.zeros_like(map_filt)
            map_B = np.zeros_like(map_filt)
            map_L[1:, :] = map_filt[:-1, :]
            map_T[:, 1:] = map_filt[:, :-1]
            map_R[:-1, :] = map_filt[1:, :]
            map_B[:, :-1] = map_filt[:, 1:]
            peaks_binary = np.logical_and.reduce(
                (map_filt >= map_L, map_filt >= map_T, map_filt >= map_R,
                 map_filt >= map_B, map_filt > thresh))
            ys, xs = np.nonzero(peaks_binary)
            # Score is read from the UNfiltered map at the peak location.
            entries = [(x, y, map_orig[y, x], num_peaks + i)
                       for i, (x, y) in enumerate(zip(xs, ys))]
            all_peaks.append(entries)
            num_peaks += len(entries)
        return all_peaks

    def _find_connections(self, all_peaks, paf_avg, image_height):
        """Score candidate limb connections along the part-affinity fields.

        Returns:
            (all_connections, skipped_limbs): per-limb arrays of accepted
            connections ``[id_A, id_B, score, i, j]``, and the indices of
            limbs skipped because one endpoint had no peaks.
        """
        thresh = 0.05
        mid_n = 10  # number of points sampled along each candidate limb
        all_connections = []
        skipped_limbs = []
        for k in range(len(self._MAP_IDX)):
            score_mid = paf_avg[:, :, [x - 19 for x in self._MAP_IDX[k]]]
            candidate_A = all_peaks[self._LIMB_SEQ[k][0] - 1]
            candidate_B = all_peaks[self._LIMB_SEQ[k][1] - 1]
            n_A = len(candidate_A)
            n_B = len(candidate_B)
            if n_A == 0 or n_B == 0:
                skipped_limbs.append(k)
                all_connections.append([])
                continue
            connection_candidates = []
            for i in range(n_A):
                for j in range(n_B):
                    v = np.subtract(candidate_B[j][:2], candidate_A[i][:2])
                    norm = np.sqrt(v[0] * v[0] + v[1] * v[1])
                    if norm == 0:
                        # Coincident peaks: limb direction is undefined. The
                        # original code divided by zero here and discarded
                        # the pair via NaN comparisons; skip it explicitly.
                        continue
                    v = np.divide(v, norm)
                    samples = list(
                        zip(
                            np.linspace(candidate_A[i][0],
                                        candidate_B[j][0],
                                        num=mid_n),
                            np.linspace(candidate_A[i][1],
                                        candidate_B[j][1],
                                        num=mid_n)))
                    paf_x = np.array([
                        score_mid[int(round(p[1])),
                                  int(round(p[0])), 0] for p in samples
                    ])
                    paf_y = np.array([
                        score_mid[int(round(p[1])),
                                  int(round(p[0])), 1] for p in samples
                    ])
                    # Alignment of the PAF with the limb direction at each
                    # sampled point.
                    score_midpoints = paf_x * v[0] + paf_y * v[1]
                    # Mean alignment, penalized when the limb is longer than
                    # half the image height.
                    score_with_dist_prior = (
                        sum(score_midpoints) / len(score_midpoints) +
                        min(0.5 * image_height / norm - 1, 0))
                    enough_aligned = len(
                        np.nonzero(score_midpoints > thresh)
                        [0]) > 0.8 * len(score_midpoints)
                    if enough_aligned and score_with_dist_prior > 0:
                        connection_candidates.append([
                            i, j, score_with_dist_prior,
                            score_with_dist_prior + candidate_A[i][2] +
                            candidate_B[j][2]
                        ])
            # Greedy matching: best-scoring pairs first, each peak used once.
            connection_candidates.sort(key=lambda c: c[2], reverse=True)
            connection = np.zeros((0, 5))
            for cand in connection_candidates:
                i, j, s = cand[0:3]
                if i not in connection[:, 3] and j not in connection[:, 4]:
                    connection = np.vstack([
                        connection,
                        [candidate_A[i][3], candidate_B[j][3], s, i, j]
                    ])
                    if len(connection) >= min(n_A, n_B):
                        break
            all_connections.append(connection)
        return all_connections, skipped_limbs

    def _assemble_people(self, all_peaks, all_connections, skipped_limbs):
        """Merge limb connections into per-person keypoint sets.

        Each ``subset`` row has 20 columns: 18 global peak ids (or -1),
        the person's total score, and its part count.

        Returns:
            (candidate, subset) suitable for ``_get_keypoints``.
        """
        candidate = np.array(
            [item for sublist in all_peaks for item in sublist])
        subset = np.ones((0, 20)) * -1
        for k in range(len(self._MAP_IDX)):
            if k in skipped_limbs:
                continue
            part_As = all_connections[k][:, 0]
            part_Bs = all_connections[k][:, 1]
            index_A, index_B = np.array(self._LIMB_SEQ[k]) - 1
            for i in range(len(all_connections[k])):
                found = 0
                subset_idx = [-1, -1]
                for j in range(len(subset)):
                    if subset[j][index_A] == part_As[i] or subset[j][
                            index_B] == part_Bs[i]:
                        subset_idx[found] = j
                        found += 1
                if found == 1:
                    # Extend an existing person with the B endpoint.
                    j = subset_idx[0]
                    if subset[j][index_B] != part_Bs[i]:
                        subset[j][index_B] = part_Bs[i]
                        subset[j][-1] += 1
                        subset[j][-2] += candidate[
                            part_Bs[i].astype(int),
                            2] + all_connections[k][i][2]
                elif found == 2:
                    j1, j2 = subset_idx
                    membership = ((subset[j1] >= 0).astype(int) +
                                  (subset[j2] >= 0).astype(int))[:-2]
                    if len(np.nonzero(membership == 2)[0]) == 0:
                        # Disjoint halves of the same person: merge rows.
                        subset[j1][:-2] += (subset[j2][:-2] + 1)
                        subset[j1][-2:] += subset[j2][-2:]
                        subset[j1][-2] += all_connections[k][i][2]
                        subset = np.delete(subset, j2, 0)
                    else:
                        subset[j1][index_B] = part_Bs[i]
                        subset[j1][-1] += 1
                        subset[j1][-2] += candidate[
                            part_Bs[i].astype(int),
                            2] + all_connections[k][i][2]
                elif not found and k < 17:
                    # Start a new person from this limb (first 17 limbs only).
                    row = np.ones(20) * -1
                    row[index_A] = part_As[i]
                    row[index_B] = part_Bs[i]
                    row[-1] = 2
                    row[-2] = sum(
                        candidate[all_connections[k][i, :2].astype(int),
                                  2]) + all_connections[k][i][2]
                    subset = np.vstack([subset, row])
        # Prune people with fewer than 4 parts or a low average score.
        del_idx = [
            i for i in range(len(subset))
            if subset[i][-1] < 4 or subset[i][-2] / subset[i][-1] < 0.4
        ]
        subset = np.delete(subset, del_idx, axis=0)
        return candidate, subset

    def get(self, image, image_format='rgb'):
        """Alias for :meth:`forward`."""
        return self.forward(image, image_format)