import os
import random
import zipfile
from typing import Any

import hydra
import numpy as np
import torch
from hydra import compose, initialize
from omegaconf import DictConfig, OmegaConf
from huggingface_hub import hf_hub_download

from utils.misc import compute_model_dim
from datasets.base import create_dataset
from datasets.misc import collate_fn_general, collate_fn_squeeze_pcd_batch
from models.base import create_model
from models.visualizer import create_visualizer
from models.environment import create_enviroment

def pretrain_pointtrans_weight_path():
    return hf_hub_download('SceneDiffuser/SceneDiffuser', 'weights/POINTTRANS_C_32768/model.pth')

def model_weight_path(task, has_observation=False):
    if task == 'pose_gen':
        return hf_hub_download('SceneDiffuser/SceneDiffuser', 'weights/2022-11-09_11-22-52_PoseGen_ddm4_lr1e-4_ep100/ckpts/model.pth')
    elif task == 'motion_gen' and has_observation:
        return hf_hub_download('SceneDiffuser/SceneDiffuser', 'weights//ckpts/model.pth')
    elif task == 'motion_gen' and not has_observation:
        return hf_hub_download('SceneDiffuser/SceneDiffuser', 'weights//ckpts/model.pth')
    elif task == 'path_planning':
        return hf_hub_download('SceneDiffuser/SceneDiffuser', 'weights/2022-11-25_20-57-28_Path_ddm4_LR1e-4_E100_REL/ckpts/model.pth')
    else:
        raise ValueError(f'Unexpected task: {task}')

def pose_motion_data_path():
    zip_path = hf_hub_download('SceneDiffuser/SceneDiffuser', 'hf_data/pose_motion.zip')
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(os.path.dirname(zip_path))
    rpath = os.path.join(os.path.dirname(zip_path), 'pose_motion')
    return (
        os.path.join(rpath, 'PROXD_temp'),
        os.path.join(rpath, 'models_smplx_v1_1/models/'),
        os.path.join(rpath, 'PROX'),
        os.path.join(rpath, 'PROX/V02_05')
    )

def path_planning_data_path():
    zip_path = hf_hub_download('SceneDiffuser/SceneDiffuser', 'hf_data/path_planning.zip')
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(os.path.dirname(zip_path))
    return os.path.join(os.path.dirname(zip_path), 'path_planning')

def load_ckpt(model: torch.nn.Module, path: str) -> None:
    """ Load a saved checkpoint into the given model.

    Args:
        model: current model
        path: path of the saved checkpoint
    """
    assert os.path.exists(path), "Can't find provided ckpt."
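    ## the checkpoint is a dict whose 'model' entry holds the state dict; the
    ## load below goes through the current model's state dict, so keys absent
    ## from the checkpoint keep their initialized values (a partial,
    ## name-matched load)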
    saved_state_dict = torch.load(path)['model']
    model_state_dict = model.state_dict()
    for key in model_state_dict:
        if key in saved_state_dict:
            model_state_dict[key] = saved_state_dict[key]
        ## keys saved from a DDP-wrapped model carry a 'module.' prefix
        if 'module.' + key in saved_state_dict:
            model_state_dict[key] = saved_state_dict['module.' + key]
    model.load_state_dict(model_state_dict)

def _sampling(cfg: DictConfig, scene: str) -> Any:
    ## compute modeling dimension according to task
    cfg.model.d_x = compute_model_dim(cfg.task)

    if cfg.gpu is not None:
        device = f'cuda:{cfg.gpu}'
    else:
        device = 'cpu'

    dataset = create_dataset(cfg.task.dataset, 'test', cfg.slurm, case_only=True, specific_scene=scene)

    if cfg.model.scene_model.name == 'PointTransformer':
        collate_fn = collate_fn_squeeze_pcd_batch
    else:
        collate_fn = collate_fn_general

    dataloader = dataset.get_dataloader(
        batch_size=1,
        collate_fn=collate_fn,
        shuffle=True,
    )

    ## create model and load ckpt
    model = create_model(cfg, slurm=cfg.slurm, device=device)
    model.to(device=device)
    load_ckpt(model, path=model_weight_path(cfg.task.name, cfg.task.has_observation if 'has_observation' in cfg.task else False))

    ## create visualizer and visualize
    visualizer = create_visualizer(cfg.task.visualizer)
    results = visualizer.visualize(model, dataloader)
    return results

def _planning(cfg: DictConfig, scene: str) -> Any:
    ## compute modeling dimension according to task
    cfg.model.d_x = compute_model_dim(cfg.task)

    if cfg.gpu is not None:
        device = f'cuda:{cfg.gpu}'
    else:
        device = 'cpu'

    dataset = create_dataset(cfg.task.dataset, 'test', cfg.slurm, case_only=True, specific_scene=scene)

    if cfg.model.scene_model.name == 'PointTransformer':
        collate_fn = collate_fn_squeeze_pcd_batch
    else:
        collate_fn = collate_fn_general

    dataloader = dataset.get_dataloader(
        batch_size=1,
        collate_fn=collate_fn,
        shuffle=True,
    )

    ## create model and load ckpt
    model = create_model(cfg, slurm=cfg.slurm, device=device)
    model.to(device=device)
    load_ckpt(model, path=model_weight_path(cfg.task.name, cfg.task.has_observation if 'has_observation' in cfg.task else False))

    ## create environment for planning task and run
    env = create_enviroment(cfg.task.env)
    results = env.run(model, dataloader)
    return results

## interfaces for the five tasks
## real-time models: pose generation, path planning
def pose_generation(scene, count, seed, opt, scale) -> Any:
    scene_model_weight_path = pretrain_pointtrans_weight_path()
    data_dir, smpl_dir, prox_dir, vposer_dir = pose_motion_data_path()

    override_config = [
        "diffuser=ddpm",
        "model=unet",
        f"model.scene_model.pretrained_weights={scene_model_weight_path}",
        "task=pose_gen",
        "task.visualizer.name=PoseGenVisualizerHF",
        f"task.visualizer.ksample={count}",
        f"task.dataset.data_dir={data_dir}",
        f"task.dataset.smpl_dir={smpl_dir}",
        f"task.dataset.prox_dir={prox_dir}",
        f"task.dataset.vposer_dir={vposer_dir}",
    ]
    if opt:
        override_config += [
            "optimizer=pose_in_scene",
            "optimizer.scale_type=div_var",
            f"optimizer.scale={scale}",
            "optimizer.vposer=false",
            "optimizer.contact_weight=0.02",
            "optimizer.collision_weight=1.0"
        ]

    initialize(config_path="./scenediffuser/configs", version_base=None)
    config = compose(config_name="default", overrides=override_config)

    ## seed every RNG so repeated calls with the same seed are reproducible
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

    res = _sampling(config, scene)
    ## clear Hydra's global state so initialize() can be called again later
    hydra.core.global_hydra.GlobalHydra.instance().clear()
    return res
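## Example usage (a sketch, not from the original source): assumes the Hub
## assets above are downloadable and that 'MPH16' is a scene present in the
## extracted PROX data.
##
##     res = pose_generation(scene='MPH16', count=4, seed=2023, opt=True, scale=1.1)
##     # `res` is the visualization produced by PoseGenVisualizerHF for 4 samples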
f"./results/motion_generation/results/{scene}/{random.randint(0, cnt-1)}.gif" if not os.path.exists(res): results_path = hf_hub_download('SceneDiffuser/SceneDiffuser', 'results/motion_generation/results.zip') os.makedirs('./results/motion_generation/', exist_ok=True) with zipfile.ZipFile(results_path, 'r') as zip_ref: zip_ref.extractall('./results/motion_generation/') return res def grasp_generation(case_id): assert isinstance(case_id, str) res = f"./results/grasp_generation/results/{case_id}/{random.randint(0, 19)}.glb" if not os.path.exists(res): results_path = hf_hub_download('SceneDiffuser/SceneDiffuser', 'results/grasp_generation/results.zip') os.makedirs('./results/grasp_generation/', exist_ok=True) with zipfile.ZipFile(results_path, 'r') as zip_ref: zip_ref.extractall('./results/grasp_generation/') return res def path_planning(scene, mode, count, seed, opt, scale_opt, pla, scale_pla): scene_model_weight_path = pretrain_pointtrans_weight_path() data_dir = path_planning_data_path() override_config = [ "diffuser=ddpm", "model=unet", "model.use_position_embedding=true", f"model.scene_model.pretrained_weights={scene_model_weight_path}", "task=path_planning", "task.visualizer.name=PathPlanningRenderingVisualizerHF", f"task.visualizer.ksample={count}", f"task.dataset.data_dir={data_dir}", "task.dataset.repr_type=relative", "task.env.name=PathPlanningEnvWrapperHF", "task.env.inpainting_horizon=16", "task.env.robot_top=3.0", "task.env.env_adaption=false" ] if opt == True: override_config += [ "optimizer=path_in_scene", "optimizer.scale_type=div_var", "optimizer.continuity=false", f"optimizer.scale={scale_opt}", ] if pla == True: override_config += [ "planner=greedy_path_planning", f"planner.scale={scale_pla}", "planner.scale_type=div_var", "planner.greedy_type=all_frame_exp" ] initialize(config_path="./scenediffuser/configs", version_base=None) config = compose(config_name="default", overrides=override_config) random.seed(seed) np.random.seed(seed) torch.manual_seed(seed) torch.cuda.manual_seed(seed) torch.cuda.manual_seed_all(seed) if mode == 'Sampling': img = _sampling(config, scene) res = (img, 0) elif mode == 'Planning': res = _planning(config, scene) else: res = (None, 0) hydra.core.global_hydra.GlobalHydra.instance().clear() return res