Spaces:
Sleeping
Sleeping
import os | |
import random | |
import torch | |
import hydra | |
import numpy as np | |
import zipfile | |
from typing import Any | |
from hydra import compose, initialize | |
from omegaconf import DictConfig, OmegaConf | |
from huggingface_hub import hf_hub_download | |
from utils.misc import compute_model_dim | |
from datasets.base import create_dataset | |
from datasets.misc import collate_fn_general, collate_fn_squeeze_pcd_batch | |
from models.base import create_model | |
from models.visualizer import create_visualizer | |
from models.environment import create_enviroment | |
def pretrain_pointtrans_weight_path(): | |
return hf_hub_download('SceneDiffuser/SceneDiffuser', 'weights/POINTTRANS_C_32768/model.pth') | |
def model_weight_path(task, has_observation=False): | |
if task == 'pose_gen': | |
return hf_hub_download('SceneDiffuser/SceneDiffuser', 'weights/2022-11-09_11-22-52_PoseGen_ddm4_lr1e-4_ep100/ckpts/model.pth') | |
elif task == 'motion_gen' and has_observation == True: | |
return hf_hub_download('SceneDiffuser/SceneDiffuser', 'weights//ckpts/model.pth') | |
elif task == 'motion_gen' and has_observation == False: | |
return hf_hub_download('SceneDiffuser/SceneDiffuser', 'weights//ckpts/model.pth') | |
elif task == 'path_planning': | |
return hf_hub_download('SceneDiffuser/SceneDiffuser', 'weights/2022-11-25_20-57-28_Path_ddm4_LR1e-4_E100_REL/ckpts/model.pth') | |
else: | |
raise Exception('Unexcepted task.') | |
def pose_motion_data_path(): | |
zip_path = hf_hub_download('SceneDiffuser/SceneDiffuser', 'hf_data/pose_motion.zip') | |
with zipfile.ZipFile(zip_path, 'r') as zip_ref: | |
zip_ref.extractall(os.path.dirname(zip_path)) | |
rpath = os.path.join(os.path.dirname(zip_path), 'pose_motion') | |
return ( | |
os.path.join(rpath, 'PROXD_temp'), | |
os.path.join(rpath, 'models_smplx_v1_1/models/'), | |
os.path.join(rpath, 'PROX'), | |
os.path.join(rpath, 'PROX/V02_05') | |
) | |
def path_planning_data_path(): | |
zip_path = hf_hub_download('SceneDiffuser/SceneDiffuser', 'hf_data/path_planning.zip') | |
with zipfile.ZipFile(zip_path, 'r') as zip_ref: | |
zip_ref.extractall(os.path.dirname(zip_path)) | |
return os.path.join(os.path.dirname(zip_path), 'path_planning') | |
def load_ckpt(model: torch.nn.Module, path: str) -> None: | |
""" load ckpt for current model | |
Args: | |
model: current model | |
path: save path | |
""" | |
assert os.path.exists(path), 'Can\'t find provided ckpt.' | |
saved_state_dict = torch.load(path)['model'] | |
model_state_dict = model.state_dict() | |
for key in model_state_dict: | |
if key in saved_state_dict: | |
model_state_dict[key] = saved_state_dict[key] | |
## model is trained with ddm | |
if 'module.'+key in saved_state_dict: | |
model_state_dict[key] = saved_state_dict['module.'+key] | |
model.load_state_dict(model_state_dict) | |
def _sampling(cfg: DictConfig, scene: str) -> Any: | |
## compute modeling dimension according to task | |
cfg.model.d_x = compute_model_dim(cfg.task) | |
if cfg.gpu is not None: | |
device = f'cuda:{cfg.gpu}' | |
else: | |
device = 'cpu' | |
dataset = create_dataset(cfg.task.dataset, 'test', cfg.slurm, case_only=True, specific_scene=scene) | |
if cfg.model.scene_model.name == 'PointTransformer': | |
collate_fn = collate_fn_squeeze_pcd_batch | |
else: | |
collate_fn = collate_fn_general | |
dataloader = dataset.get_dataloader( | |
batch_size=1, | |
collate_fn=collate_fn, | |
shuffle=True, | |
) | |
## create model and load ckpt | |
model = create_model(cfg, slurm=cfg.slurm, device=device) | |
model.to(device=device) | |
load_ckpt(model, path=model_weight_path(cfg.task.name, cfg.task.has_observation if 'has_observation' in cfg.task else False)) | |
## create visualizer and visualize | |
visualizer = create_visualizer(cfg.task.visualizer) | |
results = visualizer.visualize(model, dataloader) | |
return results | |
def _planning(cfg: DictConfig, scene: str) -> Any: | |
## compute modeling dimension according to task | |
cfg.model.d_x = compute_model_dim(cfg.task) | |
if cfg.gpu is not None: | |
device = f'cuda:{cfg.gpu}' | |
else: | |
device = 'cpu' | |
dataset = create_dataset(cfg.task.dataset, 'test', cfg.slurm, case_only=True, specific_scene=scene) | |
if cfg.model.scene_model.name == 'PointTransformer': | |
collate_fn = collate_fn_squeeze_pcd_batch | |
else: | |
collate_fn = collate_fn_general | |
dataloader = dataset.get_dataloader( | |
batch_size=1, | |
collate_fn=collate_fn, | |
shuffle=True, | |
) | |
## create model and load ckpt | |
model = create_model(cfg, slurm=cfg.slurm, device=device) | |
model.to(device=device) | |
load_ckpt(model, path=model_weight_path(cfg.task.name, cfg.task.has_observation if 'has_observation' in cfg.task else False)) | |
## create environment for planning task and run | |
env = create_enviroment(cfg.task.env) | |
results = env.run(model, dataloader) | |
return results | |
## interface for five task | |
## real-time model: pose generation, path planning | |
def pose_generation(scene, count, seed, opt, scale) -> Any: | |
scene_model_weight_path = pretrain_pointtrans_weight_path() | |
data_dir, smpl_dir, prox_dir, vposer_dir = pose_motion_data_path() | |
override_config = [ | |
"diffuser=ddpm", | |
"model=unet", | |
f"model.scene_model.pretrained_weights={scene_model_weight_path}", | |
"task=pose_gen", | |
"task.visualizer.name=PoseGenVisualizerHF", | |
f"task.visualizer.ksample={count}", | |
f"task.dataset.data_dir={data_dir}", | |
f"task.dataset.smpl_dir={smpl_dir}", | |
f"task.dataset.prox_dir={prox_dir}", | |
f"task.dataset.vposer_dir={vposer_dir}", | |
] | |
if opt == True: | |
override_config += [ | |
"optimizer=pose_in_scene", | |
"optimizer.scale_type=div_var", | |
f"optimizer.scale={scale}", | |
"optimizer.vposer=false", | |
"optimizer.contact_weight=0.02", | |
"optimizer.collision_weight=1.0" | |
] | |
initialize(config_path="./scenediffuser/configs", version_base=None) | |
config = compose(config_name="default", overrides=override_config) | |
random.seed(seed) | |
np.random.seed(seed) | |
torch.manual_seed(seed) | |
torch.cuda.manual_seed(seed) | |
torch.cuda.manual_seed_all(seed) | |
res = _sampling(config, scene) | |
hydra.core.global_hydra.GlobalHydra.instance().clear() | |
return res | |
def motion_generation(scene): | |
assert isinstance(scene, str) | |
cnt = { | |
'MPH1Library': 3, | |
'MPH16': 6, | |
'N0SittingBooth': 7, | |
'N3OpenArea': 5 | |
}[scene] | |
res = f"./results/motion_generation/results/{scene}/{random.randint(0, cnt-1)}.gif" | |
if not os.path.exists(res): | |
results_path = hf_hub_download('SceneDiffuser/SceneDiffuser', 'results/motion_generation/results.zip') | |
os.makedirs('./results/motion_generation/', exist_ok=True) | |
with zipfile.ZipFile(results_path, 'r') as zip_ref: | |
zip_ref.extractall('./results/motion_generation/') | |
return res | |
def grasp_generation(case_id): | |
assert isinstance(case_id, str) | |
res = f"./results/grasp_generation/results/{case_id}/{random.randint(0, 19)}.glb" | |
if not os.path.exists(res): | |
results_path = hf_hub_download('SceneDiffuser/SceneDiffuser', 'results/grasp_generation/results.zip') | |
os.makedirs('./results/grasp_generation/', exist_ok=True) | |
with zipfile.ZipFile(results_path, 'r') as zip_ref: | |
zip_ref.extractall('./results/grasp_generation/') | |
return res | |
def path_planning(scene, mode, count, seed, opt, scale_opt, pla, scale_pla): | |
scene_model_weight_path = pretrain_pointtrans_weight_path() | |
data_dir = path_planning_data_path() | |
override_config = [ | |
"diffuser=ddpm", | |
"model=unet", | |
"model.use_position_embedding=true", | |
f"model.scene_model.pretrained_weights={scene_model_weight_path}", | |
"task=path_planning", | |
"task.visualizer.name=PathPlanningRenderingVisualizerHF", | |
f"task.visualizer.ksample={count}", | |
f"task.dataset.data_dir={data_dir}", | |
"task.dataset.repr_type=relative", | |
"task.env.name=PathPlanningEnvWrapperHF", | |
"task.env.inpainting_horizon=16", | |
"task.env.robot_top=3.0", | |
"task.env.env_adaption=false" | |
] | |
if opt == True: | |
override_config += [ | |
"optimizer=path_in_scene", | |
"optimizer.scale_type=div_var", | |
"optimizer.continuity=false", | |
f"optimizer.scale={scale_opt}", | |
] | |
if pla == True: | |
override_config += [ | |
"planner=greedy_path_planning", | |
f"planner.scale={scale_pla}", | |
"planner.scale_type=div_var", | |
"planner.greedy_type=all_frame_exp" | |
] | |
initialize(config_path="./scenediffuser/configs", version_base=None) | |
config = compose(config_name="default", overrides=override_config) | |
random.seed(seed) | |
np.random.seed(seed) | |
torch.manual_seed(seed) | |
torch.cuda.manual_seed(seed) | |
torch.cuda.manual_seed_all(seed) | |
if mode == 'Sampling': | |
img = _sampling(config, scene) | |
res = (img, 0) | |
elif mode == 'Planning': | |
res = _planning(config, scene) | |
else: | |
res = (None, 0) | |
hydra.core.global_hydra.GlobalHydra.instance().clear() | |
return res | |