# File size: 5,585 Bytes
# 7f861c0
# coding: utf-8
"""
Utility functions and classes to handle feature extraction and model loading
"""
import os
import os.path as osp
import torch
from collections import OrderedDict
import psutil
from rich.console import Console
from rich.progress import Progress
from ..modules.spade_generator import SPADEDecoder
from ..modules.warping_network import WarpingNetwork
from ..modules.motion_extractor import MotionExtractor
from ..modules.appearance_feature_extractor import AppearanceFeatureExtractor
from ..modules.stitching_retargeting_network import StitchingRetargetingNetwork
from rich.console import Console
import psutil
# Module-level rich console; show_memory_usage() writes its log lines through it.
console = Console()
def show_memory_usage():
    """
    Log system memory figures (used / total, then available) in GB via rich.

    Values come from ``psutil.virtual_memory()`` and are written with the
    module-level ``console``; nothing is returned.
    """
    bytes_per_gb = 1024 ** 3
    stats = psutil.virtual_memory()

    # Byte counts converted to GB for display.
    total_mem = stats.total / bytes_per_gb
    used_mem = stats.used / bytes_per_gb
    available_mem = stats.available / bytes_per_gb

    usage_line = f"[bold green]Memory Usage:[/bold green] [bold red]{used_mem:.2f} GB[/bold red] used of [bold blue]{total_mem:.2f} GB[/bold blue]"
    available_line = f"[bold green]Available Memory:[/bold green] [bold yellow]{available_mem:.2f} GB[/bold yellow]"
    console.log(usage_line)
    console.log(available_line)
def suffix(filename):
    """Return the text after the last dot, e.g. a.jpg -> jpg ("" when there is no dot)."""
    _, dot, extension = filename.rpartition(".")
    # rpartition yields an empty separator when no dot exists at all.
    return extension if dot else ""
def prefix(filename):
    """Return the text before the last dot, e.g. a.jpg -> a (unchanged when there is no dot)."""
    stem, dot, _ = filename.rpartition(".")
    # Without a dot rpartition puts the whole name in the last slot, so fall back to the input.
    return stem if dot else filename
def basename(filename):
    """Return the final path component without its last extension, e.g. a/b/c.jpg -> c."""
    leaf = osp.basename(filename)
    return prefix(leaf)
def is_video(file_path):
    """
    Return True when ``file_path`` ends with a known video extension
    (case-insensitive) or is an existing directory.
    """
    video_extensions = (".mp4", ".mov", ".avi", ".webm")
    has_video_extension = file_path.lower().endswith(video_extensions)
    # The filesystem is only consulted when the extension check fails.
    return has_video_extension or osp.isdir(file_path)
def is_template(file_path):
    """Return True when ``file_path`` ends with ".pkl" (case-sensitive match)."""
    return file_path.endswith(".pkl")
def mkdir(d, log=False):
    """
    Create directory ``d`` (including parents) when it does not exist and
    return ``d`` unchanged, so the call can be used inline in one-liners.

    Args:
        d: Directory path to create.
        log: Falsy to stay silent. ``True`` (or any truthy non-callable)
            prints a "Make dir" message when the directory is created; a
            callable is invoked with that message instead.

    Returns:
        The same ``d`` that was passed in.
    """
    if not osp.exists(d):
        os.makedirs(d, exist_ok=True)
        if log:
            message = f"Make dir: {d}"
            # `log` is a flag by default; it used to be called unconditionally,
            # which raised TypeError for log=True. Only call it when callable.
            if callable(log):
                log(message)
            else:
                print(message)
    return d
def squeeze_tensor_to_numpy(tensor):
    """
    Drop dimension 0 of ``tensor`` when it has size 1, move the data to the
    CPU and return it as a NumPy array.
    """
    # `.data` is used so the conversion does not go through autograd.
    squeezed = tensor.data.squeeze(0)
    return squeezed.cpu().numpy()
def dct2cpu(dct: dict, device='cpu'):
    """
    Replace every value of ``dct`` with ``torch.tensor(value)`` moved to
    ``device``.

    The dictionary is modified in place and the same object is returned.
    Despite the name, the target is whatever ``device`` is (default 'cpu').
    """
    # Only existing keys are reassigned, so the dict size never changes mid-iteration.
    for name, value in dct.items():
        dct[name] = torch.tensor(value).to(device)
    return dct
def concat_feat(kp_source: torch.Tensor, kp_driving: torch.Tensor) -> torch.Tensor:
    """
    Flatten each sample of both keypoint batches and join them along dim 1.

    kp_source: (bs, k, 3)
    kp_driving: (bs, k, 3)
    Return: (bs, 2k*3)
    """
    n_source, n_driving = kp_source.shape[0], kp_driving.shape[0]
    assert n_source == n_driving, 'batch size must be equal'

    flat_source = kp_source.view(n_source, -1)
    flat_driving = kp_driving.view(n_driving, -1)
    return torch.cat((flat_source, flat_driving), dim=1)
def remove_ddp_duplicate_key(state_dict):
    """
    Return a new ``OrderedDict`` whose keys have a leading ``module.`` removed.

    Checkpoints saved from a DataParallel / DistributedDataParallel wrapper
    prefix every parameter name with ``module.``. Only that leading prefix is
    stripped: a ``module.`` occurring later in a key (for example
    ``warping_module.weight``) is part of the real parameter name and is
    kept intact.

    Args:
        state_dict: Mapping from parameter name to value.

    Returns:
        OrderedDict with the same values, in the same order, under the
        cleaned keys. The input mapping is not modified.
    """
    ddp_prefix = 'module.'
    cleaned = OrderedDict()
    for key, value in state_dict.items():
        if key.startswith(ddp_prefix):
            key = key[len(ddp_prefix):]
        cleaned[key] = value
    return cleaned
def load_model(ckpt_path, model_config, device, model_type):
    """
    Build the network selected by ``model_type`` and load its weights on CPU.

    Args:
        ckpt_path: Checkpoint file handed to ``torch.load``.
        model_config: Mapping whose ``['model_params'][f'{model_type}_params']``
            entry holds the constructor keyword arguments.
        device: Accepted but not used here; every model is created, loaded
            and kept on ``'cpu'``.
        model_type: One of 'appearance_feature_extractor', 'motion_extractor',
            'warping_module', 'spade_generator' or
            'stitching_retargeting_module'.

    Returns:
        The model in eval mode. For 'stitching_retargeting_module' a dict
        with the keys 'stitching', 'lip' and 'eye', each holding one
        StitchingRetargetingNetwork in eval mode.

    Raises:
        KeyError: ``model_config`` has no ``f'{model_type}_params'`` entry
            (this lookup happens before the type itself is validated).
        ValueError: ``model_type`` is not one of the supported names.
    """
    # NOTE(review): torch.load unpickles the file; only load trusted checkpoints.
    model_params = model_config['model_params'][f'{model_type}_params']

    if model_type == 'stitching_retargeting_module':
        # One checkpoint bundles three sub-networks, each stored under its own key.
        config = model_config['model_params']['stitching_retargeting_module_params']
        checkpoint = torch.load(ckpt_path, map_location='cpu')
        sub_networks = (
            ('stitching', 'retarget_shoulder'),
            ('lip', 'retarget_mouth'),
            ('eye', 'retarget_eye'),
        )
        networks = {}
        for name, ckpt_key in sub_networks:
            net = StitchingRetargetingNetwork(**config.get(name))
            net.load_state_dict(remove_ddp_duplicate_key(checkpoint[ckpt_key]))
            net = net.to('cpu')
            net.eval()
            networks[name] = net
        return networks

    if model_type == 'appearance_feature_extractor':
        model_cls = AppearanceFeatureExtractor
    elif model_type == 'motion_extractor':
        model_cls = MotionExtractor
    elif model_type == 'warping_module':
        model_cls = WarpingNetwork
    elif model_type == 'spade_generator':
        model_cls = SPADEDecoder
    else:
        raise ValueError(f"Unknown model type: {model_type}")

    model = model_cls(**model_params).to('cpu')
    model.load_state_dict(torch.load(ckpt_path, map_location='cpu'))
    model.eval()
    return model
# Get coefficients of Eqn. 7
def calculate_transformation(config, s_kp_info, t_0_kp_info, t_i_kp_info, R_s, R_t_0, R_t_i):
    """
    Combine source and driving keypoint info into new motion parameters.

    With ``config.relative`` set, rotation and expression are the source
    values offset by the change between the first driving frame (``t_0``)
    and the current one (``t_i``); otherwise the current driving rotation
    and expression are returned as-is. Translation and scale are always
    computed relative to the first driving frame.

    Returns:
        Tuple ``(new_rotation, new_expression, new_translation, new_scale)``.
    """
    if not config.relative:
        # Absolute mode: take rotation and expression straight from the driving frame.
        new_rotation, new_expression = R_t_i, t_i_kp_info['exp']
    else:
        rotation_delta = R_t_i @ R_t_0.permute(0, 2, 1)
        new_rotation = rotation_delta @ R_s
        expression_delta = t_i_kp_info['exp'] - t_0_kp_info['exp']
        new_expression = s_kp_info['exp'] + expression_delta

    translation_delta = t_i_kp_info['t'] - t_0_kp_info['t']
    new_translation = s_kp_info['t'] + translation_delta
    # Zero the index-2 (z) component of the freshly computed translation.
    new_translation[..., 2].fill_(0)

    scale_ratio = t_i_kp_info['scale'] / t_0_kp_info['scale']
    new_scale = s_kp_info['scale'] * scale_ratio
    return new_rotation, new_expression, new_translation, new_scale
def load_description(fp):
    """
    Read the text file at ``fp`` as UTF-8 and return its entire content.

    Args:
        fp: Path of the file to read.

    Returns:
        The file content as a single string.
    """
    with open(fp, 'r', encoding='utf-8') as f:
        content = f.read()
    return content