import os
import random
import time
import uuid
import zipfile
from typing import Any

import numpy as np
import torch
import hydra
from hydra import compose, initialize
from omegaconf import DictConfig, OmegaConf
from huggingface_hub import hf_hub_download

from utils.misc import compute_model_dim
from datasets.base import create_dataset
from datasets.misc import collate_fn_general, collate_fn_squeeze_pcd_batch
from models.base import create_model
from models.visualizer import create_visualizer
from models.environment import create_enviroment


def pretrain_pointtrans_weight_path():
    return hf_hub_download('SceneDiffuser/SceneDiffuser', 'weights/POINTTRANS_C_32768/model.pth')


def model_weight_path(task, has_observation=False):
    if task == 'pose_gen':
        return hf_hub_download('SceneDiffuser/SceneDiffuser', 'weights/2022-11-09_11-22-52_PoseGen_ddm4_lr1e-4_ep100/ckpts/model.pth')
    elif task == 'motion_gen' and has_observation:
        return hf_hub_download('SceneDiffuser/SceneDiffuser', 'weights/2022-11-09_14-28-12_MotionGen_ddm_T200_lr1e-4_ep300_obser/ckpts/model.pth')
    elif task == 'motion_gen' and not has_observation:
        return hf_hub_download('SceneDiffuser/SceneDiffuser', 'weights/2022-11-09_12-54-50_MotionGen_ddm_T200_lr1e-4_ep300/ckpts/model.pth')
    elif task == 'path_planning':
        return hf_hub_download('SceneDiffuser/SceneDiffuser', 'weights/2022-11-25_20-57-28_Path_ddm4_LR1e-4_E100_REL/ckpts/model.pth')
    else:
        raise Exception('Unexpected task.')
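
## Example (illustrative): resolving the checkpoint for motion generation
## conditioned on an observed start pose.
##   ckpt_path = model_weight_path('motion_gen', has_observation=True)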


def pose_motion_data_path():
    zip_path = hf_hub_download('SceneDiffuser/SceneDiffuser', 'hf_data/pose_motion.zip')
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(os.path.dirname(zip_path))
    rpath = os.path.join(os.path.dirname(zip_path), 'pose_motion')
    return (
        os.path.join(rpath, 'PROXD_temp'),
        os.path.join(rpath, 'models_smplx_v1_1/models/'),
        os.path.join(rpath, 'PROX'),
        os.path.join(rpath, 'PROX/V02_05')
    )
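
## Note: the archive is re-extracted on every call. A minimal guard (a sketch,
## assuming the extracted directory name matches the archive stem) could skip
## the redundant work:
##   if not os.path.isdir(rpath):
##       with zipfile.ZipFile(zip_path, 'r') as zip_ref:
##           zip_ref.extractall(os.path.dirname(zip_path))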


def path_planning_data_path():
    zip_path = hf_hub_download('SceneDiffuser/SceneDiffuser', 'hf_data/path_planning.zip')
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(os.path.dirname(zip_path))
    return os.path.join(os.path.dirname(zip_path), 'path_planning')


def load_ckpt(model: torch.nn.Module, path: str) -> None:
    """ Load a checkpoint into the current model.

    Args:
        model: current model
        path: path to the saved checkpoint
    """
    assert os.path.exists(path), 'Can\'t find provided ckpt.'
    ## map to CPU first so checkpoints saved on GPU also load on CPU-only hosts
    saved_state_dict = torch.load(path, map_location='cpu')['model']
    model_state_dict = model.state_dict()
    for key in model_state_dict:
        if key in saved_state_dict:
            model_state_dict[key] = saved_state_dict[key]
        ## a model trained with DistributedDataParallel saves keys with a 'module.' prefix
        if 'module.' + key in saved_state_dict:
            model_state_dict[key] = saved_state_dict['module.' + key]
    model.load_state_dict(model_state_dict)
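
## Usage sketch (the config and device are illustrative):
##   model = create_model(cfg, slurm=False, device='cpu')
##   load_ckpt(model, path=model_weight_path('pose_gen'))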


def _sampling(cfg: DictConfig, scene: str) -> Any:
    """Sample results for the given scene and render them with the task's visualizer."""
    ## compute modeling dimension according to task
    cfg.model.d_x = compute_model_dim(cfg.task)
    if cfg.gpu is not None:
        device = f'cuda:{cfg.gpu}'
    else:
        device = 'cpu'

    dataset = create_dataset(cfg.task.dataset, 'test', cfg.slurm, case_only=True, specific_scene=scene)

    if cfg.model.scene_model.name == 'PointTransformer':
        collate_fn = collate_fn_squeeze_pcd_batch
    else:
        collate_fn = collate_fn_general

    dataloader = dataset.get_dataloader(
        batch_size=1,
        collate_fn=collate_fn,
        shuffle=True,
    )

    ## create model and load ckpt
    model = create_model(cfg, slurm=cfg.slurm, device=device)
    model.to(device=device)
    load_ckpt(model, path=model_weight_path(cfg.task.name, cfg.task.has_observation if 'has_observation' in cfg.task else False))

    ## create visualizer and visualize
    visualizer = create_visualizer(cfg.task.visualizer)
    results = visualizer.visualize(model, dataloader)
    return results


def _planning(cfg: DictConfig, scene: str) -> Any:
    """Run the planning environment for the given scene and return its results."""
    ## compute modeling dimension according to task
    cfg.model.d_x = compute_model_dim(cfg.task)
    if cfg.gpu is not None:
        device = f'cuda:{cfg.gpu}'
    else:
        device = 'cpu'

    dataset = create_dataset(cfg.task.dataset, 'test', cfg.slurm, case_only=True, specific_scene=scene)

    if cfg.model.scene_model.name == 'PointTransformer':
        collate_fn = collate_fn_squeeze_pcd_batch
    else:
        collate_fn = collate_fn_general

    dataloader = dataset.get_dataloader(
        batch_size=1,
        collate_fn=collate_fn,
        shuffle=True,
    )

    ## create model and load ckpt
    model = create_model(cfg, slurm=cfg.slurm, device=device)
    model.to(device=device)
    load_ckpt(model, path=model_weight_path(cfg.task.name, cfg.task.has_observation if 'has_observation' in cfg.task else False))

    ## create environment for planning task and run
    ## ('create_enviroment' matches the name exported by models.environment)
    env = create_enviroment(cfg.task.env)
    results = env.run(model, dataloader)
    return results
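
## Both helpers expect a composed Hydra config plus a scene name. Illustrative
## sketch (the scene id is hypothetical; pick one from the released data):
##   with initialize(config_path="./scenediffuser/configs", version_base=None):
##       cfg = compose(config_name="default", overrides=["task=path_planning"])
##   results = _planning(cfg, 'scene0603_00')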


## Interfaces for the five tasks. The following run as real-time models:
##   - pose generation
##   - motion generation
##   - path planning
def pose_generation(scene, count, seed, opt, scale) -> Any:
    """Sample `count` poses in the given scene and return the visualizer output."""
    scene_model_weight_path = pretrain_pointtrans_weight_path()
    data_dir, smpl_dir, prox_dir, vposer_dir = pose_motion_data_path()

    override_config = [
        "diffuser=ddpm",
        "model=unet",
        f"model.scene_model.pretrained_weights={scene_model_weight_path}",
        "task=pose_gen",
        "task.visualizer.name=PoseGenVisualizerHF",
        f"task.visualizer.ksample={count}",
        f"task.dataset.data_dir={data_dir}",
        f"task.dataset.smpl_dir={smpl_dir}",
        f"task.dataset.prox_dir={prox_dir}",
        f"task.dataset.vposer_dir={vposer_dir}",
    ]
    if opt:
        override_config += [
            "optimizer=pose_in_scene",
            "optimizer.scale_type=div_var",
            f"optimizer.scale={scale}",
            "optimizer.vposer=false",
            "optimizer.contact_weight=0.02",
            "optimizer.collision_weight=1.0"
        ]
    initialize(config_path="./scenediffuser/configs", version_base=None)
    config = compose(config_name="default", overrides=override_config)

    ## seed all RNGs for reproducible sampling
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

    res = _sampling(config, scene)
    hydra.core.global_hydra.GlobalHydra.instance().clear()
    return res
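
## Example (illustrative; 'MPH16' stands in for a PROX scene name):
##   img = pose_generation(scene='MPH16', count=4, seed=42, opt=True, scale=1.1)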


def motion_generation(scene, count, seed, withstart, opt, scale) -> Any:
    """Sample `count` motion sequences and save each one as a .gif file."""
    scene_model_weight_path = pretrain_pointtrans_weight_path()
    data_dir, smpl_dir, prox_dir, vposer_dir = pose_motion_data_path()

    override_config = [
        "diffuser=ddpm",
        "diffuser.steps=200",
        "model=unet",
        "model.use_position_embedding=true",
        f"model.scene_model.pretrained_weights={scene_model_weight_path}",
        "task=motion_gen",
        f"task.has_observation={withstart}",
        "task.dataset.repr_type=absolute",
        "task.dataset.frame_interval_test=20",
        "task.visualizer.name=MotionGenVisualizerHF",
        f"task.visualizer.ksample={count}",
        f"task.dataset.data_dir={data_dir}",
        f"task.dataset.smpl_dir={smpl_dir}",
        f"task.dataset.prox_dir={prox_dir}",
        f"task.dataset.vposer_dir={vposer_dir}",
    ]
    if opt:
        override_config += [
            "optimizer=motion_in_scene",
            "optimizer.scale_type=div_var",
            f"optimizer.scale={scale}",
            "optimizer.vposer=false",
            "optimizer.contact_weight=0.02",
            "optimizer.collision_weight=1.0",
            "optimizer.smoothness_weight=0.001",
            "optimizer.frame_interval=1",
        ]
    initialize(config_path="./scenediffuser/configs", version_base=None)
    config = compose(config_name="default", overrides=override_config)

    ## seed all RNGs for reproducible sampling
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

    res_gifs = _sampling(config, scene)

    ## save each sampled motion as a .gif file
    datestr = time.strftime("%Y-%m-%d", time.localtime(time.time()))
    target_dir = os.path.join('./results/motion_generation/', f'd-{datestr}')
    os.makedirs(target_dir, exist_ok=True)
    res = []
    uuid_str = uuid.uuid4()
    for i, imgs in enumerate(res_gifs):
        target_path = os.path.join(target_dir, f'{uuid_str}--{i}.gif')
        imgs = [im.resize((720, 405)) for im in imgs]  ## downscale frames to save space
        img, *img_rest = imgs
        img.save(fp=target_path, format='GIF', append_images=img_rest, save_all=True, duration=33.33, loop=0)
        res.append(target_path)
    hydra.core.global_hydra.GlobalHydra.instance().clear()
    return res
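
## Example (illustrative; returns a list of .gif paths):
##   gifs = motion_generation(scene='MPH16', count=2, seed=0, withstart=False, opt=False, scale=1.0)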


def grasp_generation(case_id):
    """Return a precomputed grasp result (.glb); downloads the archive on first use."""
    assert isinstance(case_id, str)
    res = f"./results/grasp_generation/results/{case_id}/{random.randint(0, 19)}.glb"
    if not os.path.exists(res):
        results_path = hf_hub_download('SceneDiffuser/SceneDiffuser', 'results/grasp_generation/results.zip')
        os.makedirs('./results/grasp_generation/', exist_ok=True)
        with zipfile.ZipFile(results_path, 'r') as zip_ref:
            zip_ref.extractall('./results/grasp_generation/')
    return res
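
## Example (with a hypothetical case id; valid ids are the folder names inside
## the downloaded results archive):
##   glb_path = grasp_generation('contactdb+apple')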


def path_planning(scene, mode, count, seed, opt, scale_opt, pla, scale_pla):
    """Sample or plan paths in the given scene; returns a (visualization, step-count) pair."""
    scene_model_weight_path = pretrain_pointtrans_weight_path()
    data_dir = path_planning_data_path()

    override_config = [
        "diffuser=ddpm",
        "model=unet",
        "model.use_position_embedding=true",
        f"model.scene_model.pretrained_weights={scene_model_weight_path}",
        "task=path_planning",
        "task.visualizer.name=PathPlanningRenderingVisualizerHF",
        f"task.visualizer.ksample={count}",
        f"task.dataset.data_dir={data_dir}",
        "task.dataset.repr_type=relative",
        "task.env.name=PathPlanningEnvWrapperHF",
        "task.env.inpainting_horizon=16",
        "task.env.robot_top=3.0",
        "task.env.env_adaption=false"
    ]
    if opt:
        override_config += [
            "optimizer=path_in_scene",
            "optimizer.scale_type=div_var",
            "optimizer.continuity=false",
            f"optimizer.scale={scale_opt}",
        ]
    if pla:
        override_config += [
            "planner=greedy_path_planning",
            f"planner.scale={scale_pla}",
            "planner.scale_type=div_var",
            "planner.greedy_type=all_frame_exp"
        ]
    initialize(config_path="./scenediffuser/configs", version_base=None)
    config = compose(config_name="default", overrides=override_config)

    ## seed all RNGs for reproducible sampling
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

    if mode == 'Sampling':
        img = _sampling(config, scene)
        res = (img, 0)
    elif mode == 'Planning':
        res = _planning(config, scene)
    else:
        res = (None, 0)
    hydra.core.global_hydra.GlobalHydra.instance().clear()
    return res
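

if __name__ == '__main__':
    ## Minimal smoke test (a sketch; the scene id is hypothetical, pick one
    ## from the released path-planning data).
    image, steps = path_planning(
        scene='scene0603_00', mode='Sampling', count=1, seed=0,
        opt=False, scale_opt=1.0, pla=False, scale_pla=1.0,
    )
    print(image, steps)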