import sys
sys.path.append('./extern/dust3r')
from dust3r.inference import inference, load_model
from dust3r.utils.image import load_images
from dust3r.image_pairs import make_pairs
from dust3r.cloud_opt import global_aligner, GlobalAlignerMode
from dust3r.utils.device import to_numpy
import trimesh
import torch
import numpy as np
import torchvision
import os
import copy
import cv2
from PIL import Image
import pytorch3d
print(pytorch3d.__version__)
from pytorch3d.structures import Pointclouds
from torchvision.utils import save_image
import torch.nn.functional as F
import torchvision.transforms as transforms
from PIL import Image
# NOTE(review): names such as setup_renderer, world_point_to_obj,
# generate_traj_*, generate_candidate_poses, get_input_dict, save_video,
# save_pointcloud_with_normals and imageio are not imported explicitly in this
# file; presumably they come from this star import -- confirm.
from utils.pvd_utils import *
from omegaconf import OmegaConf
from pytorch_lightning import seed_everything
from utils.diffusion_utils import instantiate_from_config,load_model_checkpoint,image_guided_synthesis
from pathlib import Path
from torchvision.utils import save_image


class ViewCrafter:
    """Novel-view synthesis pipeline: DUSt3R reconstruction -> point-cloud render -> video diffusion.

    All configuration is read from ``opts``; several methods also write back
    to it (``opts.elevation``, ``opts.center_scale``, ``opts.ddim_steps``).
    """

    def __init__(self, opts, gradio = False):
        """Load both models and, unless running under gradio, reconstruct the input image.

        Args:
            opts: Option object; ``opts.device`` and ``opts.image_dir`` are
                read here, the rest by the setup/run methods.
            gradio: When True, skip loading ``opts.image_dir``; images are
                supplied later through ``run_traj``.
        """
        self.opts = opts
        self.device = opts.device
        self.setup_dust3r()
        self.setup_diffusion()
        # FIXME (kept from original): initialize ref images and point cloud.
        if not gradio:
            self.images, self.img_ori = self.load_initial_images(image_dir=self.opts.image_dir)
            self.run_dust3r(input_images=self.images)

    def run_dust3r(self, input_images,clean_pc = False):
        """Run DUSt3R on ``input_images`` and store the aligned scene in ``self.scene``.

        Args:
            input_images: List of image dicts as produced by ``load_images`` /
                ``get_input_dict``.
            clean_pc: When True, store ``scene.clean_pointcloud()`` instead of
                the raw aligned scene.
        """
        pairs = make_pairs(input_images, scene_graph='complete', prefilter=None, symmetrize=True)
        output = inference(pairs, self.dust3r, self.device, batch_size=self.opts.batch_size)

        # The original left a PairViewer fallback commented out; the
        # optimizer mode is always used.
        mode = GlobalAlignerMode.PointCloudOptimizer
        scene = global_aligner(output, device=self.device, mode=mode)
        if mode == GlobalAlignerMode.PointCloudOptimizer:
            scene.compute_global_alignment(
                init='mst',
                niter=self.opts.niter,
                schedule=self.opts.schedule,
                lr=self.opts.lr,
            )

        if clean_pc:
            self.scene = scene.clean_pointcloud()
        else:
            self.scene = scene

    def render_pcd(self,pts3d,imgs,masks,views,renderer,device, return_mask=False):
        """Render a colored point cloud from ``views`` cameras.

        Args:
            pts3d: Per-image 3D points (converted with ``to_numpy``).
            imgs: Per-image colors aligned with ``pts3d``.
            masks: Optional per-image boolean masks selecting which points to
                keep; ``None`` keeps every point.
            views: Number of views; the point cloud is replicated this many
                times via ``Pointclouds.extend``.
            renderer: Callable renderer taking a ``Pointclouds`` batch.
            device: Device the point/colour tensors are moved to.
            return_mask: When True, additionally render the same points with
                all-ones features and return that as the view mask.

        Returns:
            ``(images, view_masks)``; ``view_masks`` is ``None`` unless
            ``return_mask`` is True.
        """
        imgs = to_numpy(imgs)
        pts3d = to_numpy(pts3d)

        # Identity check: `masks == None` on an ndarray compares elementwise
        # and makes the `if` raise on an ambiguous truth value.
        if masks is None:
            pts = torch.from_numpy(np.concatenate([p for p in pts3d])).view(-1, 3).to(device)
            col = torch.from_numpy(np.concatenate([p for p in imgs])).view(-1, 3).to(device)
        else:
            pts = torch.from_numpy(np.concatenate([p[m] for p, m in zip(pts3d, masks)])).to(device)
            col = torch.from_numpy(np.concatenate([p[m] for p, m in zip(imgs, masks)])).to(device)

        point_cloud = Pointclouds(points=[pts], features=[col]).extend(views)
        images = renderer(point_cloud)

        view_masks = None
        if return_mask:
            # Same geometry, constant features: the render shows where the
            # point cloud covers each view.
            coverage = torch.ones_like(col)
            point_cloud_mask = Pointclouds(points=[pts], features=[coverage]).extend(views)
            view_masks = renderer(point_cloud_mask)

        return images, view_masks

    def run_render(self, pcd, imgs,masks, H, W, camera_traj,num_views,use_cpu=False, return_mask=False):
        """Build a renderer for ``camera_traj`` and render the point cloud.

        Args:
            pcd, imgs, masks: Forwarded to ``render_pcd``.
            H, W: Render size passed to ``setup_renderer`` as ``image_size``.
            camera_traj: Camera trajectory passed to ``setup_renderer``.
            num_views: Number of views to render.
            use_cpu: Put the point tensors on CPU instead of ``self.device``.
            return_mask: Forwarded to ``render_pcd``.

        Returns:
            ``(render_results, viewmask)`` exactly as returned by ``render_pcd``.
        """
        device = torch.device("cpu") if use_cpu else self.device
        render_setup = setup_renderer(camera_traj, image_size=(H,W))
        renderer = render_setup['renderer']
        render_results, viewmask = self.render_pcd(
            pcd, imgs, masks, num_views, renderer, device, return_mask=return_mask
        )
        return render_results, viewmask

    def run_diffusion(self, renderings):
        """Run the video diffusion model conditioned on rendered frames.

        Args:
            renderings: Frames indexed (t, h, w, c); the code rescales them
                with ``* 2 - 1``, so values are expected in [0, 1].

        Returns:
            The first sample, permuted back to (t, h, w, c) and clamped to
            [-1, 1].
        """
        prompts = [self.opts.prompt]
        videos = (renderings * 2. - 1.).permute(3,0,1,2).unsqueeze(0).to(self.device)
        condition_index = [0]
        with torch.no_grad(), torch.cuda.amp.autocast():
            # Original comment: batch_samples is [1,1,c,t,h,w].
            batch_samples = image_guided_synthesis(
                self.diffusion, prompts, videos, self.noise_shape,
                self.opts.n_samples, self.opts.ddim_steps, self.opts.ddim_eta,
                self.opts.unconditional_guidance_scale, self.opts.cfg_img,
                self.opts.frame_stride, self.opts.text_input,
                self.opts.multiple_cond_cfg, self.opts.timestep_spacing,
                self.opts.guidance_rescale, condition_index,
            )

        # Original comment: torch.Size([1, 3, 25, 576, 1024]) in [-1,1].
        return torch.clamp(batch_samples[0][0].permute(1,2,3,0), -1., 1.)

    def _resize_renders(self, render_results):
        """Bilinearly resize (n, h, w, c) renders to the fixed 576x1024 working size."""
        resized = F.interpolate(
            render_results.permute(0,3,1,2),
            size=(576, 1024),
            mode='bilinear',
            align_corners=False,
        )
        return resized.permute(0,2,3,1)

    def _plan_nbv_trajectory(self, pcd, imgs, masks, H, W, c2ws, focals,
                             principal_points, num_candidates, step):
        """Pick the next-best-view among candidates and build a trajectory towards it.

        Renders a view mask for every candidate pose, selects the candidate
        whose summed mask is smallest, saves the masks as
        ``candidate_mask{step}_nbv{id}.png`` and subtracts the chosen theta
        from ``self.opts.elevation``.

        In nbv mode ``opts.d_theta[0]`` / ``opts.d_phi[0]`` are the grid
        spacing of the search space; the sign of ``d_phi[0]`` selects the
        direction (left or right).

        Returns:
            ``(camera_traj, num_views)`` from ``generate_traj_specified``.
        """
        candidate_poses, thetas, phis = generate_candidate_poses(
            c2ws, H, W, focals, principal_points,
            self.opts.d_theta[0], self.opts.d_phi[0], num_candidates, self.device,
        )
        # The mask must be requested explicitly; without it run_render
        # returns None and the selection below cannot work.
        _, viewmask = self.run_render(
            pcd, imgs, masks, H, W, candidate_poses, num_candidates,
            use_cpu=False, return_mask=True,
        )
        nbv_id = torch.argmin(viewmask.sum(dim=[1,2,3])).item()
        save_image(
            viewmask.permute(0,3,1,2),
            os.path.join(self.opts.save_dir, f"candidate_mask{step}_nbv{nbv_id}.png"),
            normalize=True,
            value_range=(0, 1),
        )
        theta_nbv = thetas[nbv_id]
        phi_nbv = phis[nbv_id]
        # Camera trajectory from the current pose to the selected view.
        camera_traj, num_views = generate_traj_specified(
            c2ws, H, W, focals, principal_points,
            theta_nbv, phi_nbv, self.opts.d_r[0], self.opts.video_length, self.device,
        )
        # Reset elevation for the following iteration.
        self.opts.elevation -= theta_nbv
        return camera_traj, num_views

    def nvs_single_view(self, gradio=False):
        """Render the point cloud of the single input image along a camera trajectory.

        Supported ``opts.mode`` values: ``single_view_nbv``,
        ``single_view_target`` and ``single_view_txt``.

        Args:
            gradio: In ``single_view_txt`` mode, read the trajectory from
                ``self.gradio_traj`` instead of the ``opts.traj_txt`` file.

        Returns:
            Rendered frames resized to 576x1024, with frame 0 replaced by
            ``self.img_ori`` (original comment: [25, 576, 1024, 3] in [0,1]).
            The diffusion pass is NOT run here.

        Raises:
            KeyError: If ``opts.mode`` is not one of the supported modes.
        """
        # The last view is the zero (reference) pose, so the first entry is dropped.
        c2ws = self.scene.get_im_poses().detach()[1:]
        principal_points = self.scene.get_principal_points().detach()[1:]  # cx, cy
        focals = self.scene.get_focals().detach()[1:]
        shape = self.images[0]['true_shape']
        H, W = int(shape[0][0]), int(shape[0][1])
        pcd = [i.detach() for i in self.scene.get_pts3d(clip_thred=self.opts.dpt_trd)]
        depth = [i.detach() for i in self.scene.get_depthmaps()]
        # Depth (z) at the image centre defines the centre of the sphere the
        # camera rotates around; center_scale rescales that radius.
        depth_avg = depth[-1][H//2,W//2]
        radius = depth_avg*self.opts.center_scale

        # Change coordinates.
        c2ws,pcd = world_point_to_obj(poses=c2ws, points=torch.stack(pcd), k=-1, r=radius, elevation=self.opts.elevation, device=self.device)

        imgs = np.array(self.scene.imgs)
        masks = None
        mode = self.opts.mode

        if mode == 'single_view_nbv':
            # FIXME (kept from original): hard-coded number of candidate
            # views. Taking "left" as an example, the first iteration chooses
            # from [left, upper-left]; later iterations may choose from
            # [left, upper-left, lower-left].
            num_candidates = 2
            camera_traj,num_views = self._plan_nbv_trajectory(
                [pcd[-1]], [imgs[-1]], masks, H, W,
                c2ws, focals, principal_points, num_candidates, step=0,
            )
        elif mode == 'single_view_target':
            camera_traj,num_views = generate_traj_specified(
                c2ws, H, W, focals, principal_points,
                self.opts.d_theta[0], self.opts.d_phi[0], self.opts.d_r[0],
                self.opts.video_length, self.device,
            )
        elif mode == 'single_view_txt':
            if not gradio:
                # File layout: line 0 = phi values, line 1 = theta, line 2 = r.
                with open(self.opts.traj_txt, 'r') as file:
                    lines = file.readlines()
                phi = [float(i) for i in lines[0].split()]
                theta = [float(i) for i in lines[1].split()]
                r = [float(i) for i in lines[2].split()]
            else:
                phi, theta, r = self.gradio_traj
            camera_traj,num_views = generate_traj_txt(
                c2ws, H, W, focals, principal_points, phi, theta, r,
                self.opts.video_length, self.device,
                viz_traj=True, save_dir = self.opts.save_dir,
            )
        else:
            raise KeyError(f"Invalid Mode: {self.opts.mode}")

        render_results, _ = self.run_render([pcd[-1]], [imgs[-1]],masks, H, W, camera_traj,num_views,use_cpu=False)
        render_results = render_results.to(self.device)
        render_results = self._resize_renders(render_results)
        render_results[0] = self.img_ori
        # A text trajectory that ends back at the origin also ends on the input image.
        if mode == 'single_view_txt' and phi[-1]==0. and theta[-1]==0. and r[-1]==0.:
            render_results[-1] = self.img_ori

        return render_results

    def nvs_sparse_view(self,iter):
        """Render from the multi-image reconstruction, then run diffusion for iteration ``iter``.

        Supported ``opts.mode`` values: ``single_view_ref_iterative``,
        ``single_view_1drc_iterative`` and ``single_view_nbv``. Writes
        ``render{iter}.mp4``, ``pcd{iter}.ply`` and ``diffusion{iter}.mp4``
        into ``opts.save_dir``.

        Args:
            iter: Iteration index; also indexes ``opts.d_theta`` /
                ``opts.d_phi`` / ``opts.d_r`` in the two iterative modes.

        Returns:
            Diffusion output from ``run_diffusion`` (original comment:
            torch.Size([25, 576, 1024, 3])).

        Raises:
            KeyError: If ``opts.mode`` is not one of the supported modes.
        """
        c2ws = self.scene.get_im_poses().detach()
        principal_points = self.scene.get_principal_points().detach()
        focals = self.scene.get_focals().detach()
        shape = self.images[0]['true_shape']
        H, W = int(shape[0][0]), int(shape[0][1])
        pcd = [i.detach() for i in self.scene.get_pts3d(clip_thred=self.opts.dpt_trd)]
        depth = [i.detach() for i in self.scene.get_depthmaps()]
        # Depth (z) at the centre of the ref image defines the rotation
        # sphere; center_scale rescales its radius.
        depth_avg = depth[0][H//2,W//2]
        radius = depth_avg*self.opts.center_scale

        # Masks for a cleaner point cloud: confidence mask combined with a
        # depth-threshold mask computed on rows 40..-40.
        self.scene.min_conf_thr = float(self.scene.conf_trf(torch.tensor(self.opts.min_conf_thr)))
        masks = self.scene.get_masks()
        depth = self.scene.get_depthmaps()
        bgs_mask = [dpt > self.opts.bg_trd*(torch.max(dpt[40:-40,:])+torch.min(dpt[40:-40,:])) for dpt in depth]
        masks_new = [m+mb for m, mb in zip(masks,bgs_mask)]
        masks = to_numpy(masks_new)

        imgs = np.array(self.scene.imgs)
        mode = self.opts.mode

        if mode == 'single_view_ref_iterative':
            # Render starting from c2ws[0], i.e. the camera of the ref image.
            c2ws,pcd = world_point_to_obj(poses=c2ws, points=torch.stack(pcd), k=0, r=radius, elevation=self.opts.elevation, device=self.device)
            camera_traj,num_views = generate_traj_specified(
                c2ws[0:1], H, W, focals[0:1], principal_points[0:1],
                self.opts.d_theta[iter], self.opts.d_phi[iter], self.opts.d_r[iter],
                self.opts.video_length, self.device,
            )
            use_ref_frame = True
        elif mode == 'single_view_1drc_iterative':
            self.opts.elevation -= self.opts.d_theta[iter-1]
            c2ws,pcd = world_point_to_obj(poses=c2ws, points=torch.stack(pcd), k=-1, r=radius, elevation=self.opts.elevation, device=self.device)
            camera_traj,num_views = generate_traj_specified(
                c2ws[-1:], H, W, focals[-1:], principal_points[-1:],
                self.opts.d_theta[iter], self.opts.d_phi[iter], self.opts.d_r[iter],
                self.opts.video_length, self.device,
            )
            use_ref_frame = False
        elif mode == 'single_view_nbv':
            c2ws,pcd = world_point_to_obj(poses=c2ws, points=torch.stack(pcd), k=-1, r=radius, elevation=self.opts.elevation, device=self.device)
            # FIXME (kept from original): hard-coded number of candidate
            # views; from the second iteration on the choice is among
            # [left, upper-left, lower-left] (taking "left" as an example).
            num_candidates = 3
            camera_traj,num_views = self._plan_nbv_trajectory(
                pcd, imgs, masks, H, W,
                c2ws[-1:], focals[-1:], principal_points[-1:], num_candidates, step=iter,
            )
            use_ref_frame = False
        else:
            raise KeyError(f"Invalid Mode: {self.opts.mode}")

        render_results, _ = self.run_render(pcd, imgs,masks, H, W, camera_traj,num_views)
        render_results = self._resize_renders(render_results)
        # Frame 0 is replaced by a real image: the reference image when the
        # trajectory starts there, otherwise the most recently added image.
        if use_ref_frame:
            render_results[0] = self.img_ori
        else:
            render_results[0] = (self.images[-1]['img_ori'].squeeze(0).permute(1,2,0)+1.)/2.

        save_video(render_results, os.path.join(self.opts.save_dir, f'render{iter}.mp4'))
        save_pointcloud_with_normals(imgs, pcd, msk=masks, save_path=os.path.join(self.opts.save_dir, f'pcd{iter}.ply') , mask_pc=True, reduce_pc=False)

        diffusion_results = self.run_diffusion(render_results)
        save_video((diffusion_results + 1.0) / 2.0, os.path.join(self.opts.save_dir, f'diffusion{iter}.mp4'))
        return diffusion_results

    def _nvs_iterative(self, num_iters, sample_rate=6):
        """Shared loop for the iterative modes.

        Iteration 0 keeps only the first input image and calls
        ``nvs_single_view``. Every later iteration appends every
        ``sample_rate``-th frame of the previous result to ``self.images``,
        re-runs DUSt3R with point-cloud cleaning and calls
        ``nvs_sparse_view``.

        Returns:
            List with one tensor per iteration, each permuted from
            (n, h, w, c) to (n, c, h, w).
        """
        all_results = []
        idx = 1  # self.images initially holds one ref image (idx 0)
        previous = None
        for itr in range(num_iters):
            if itr == 0:
                # Drop the trailing duplicate copy of the input image.
                self.images = [self.images[0]]
                current = self.nvs_single_view()
            else:
                for i in range(sample_rate, previous.shape[0], sample_rate):
                    self.images.append(get_input_dict(previous[i:i+1,...],idx,dtype = torch.float32))
                    idx += 1
                self.run_dust3r(input_images=self.images, clean_pc=True)
                current = self.nvs_sparse_view(itr)
            previous = current.permute(0,3,1,2)
            all_results.append(previous)
        return all_results

    def nvs_single_view_ref_iterative(self):
        """Run the iterative pipeline for ``len(opts.d_phi)`` iterations; returns the per-iteration results."""
        return self._nvs_iterative(len(self.opts.d_phi))

    def nvs_single_view_1drc_iterative(self):
        """Run the iterative pipeline for ``len(opts.d_phi)`` iterations; returns the per-iteration results."""
        return self._nvs_iterative(len(self.opts.d_phi))

    def nvs_single_view_nbv(self):
        """Run the iterative pipeline for a fixed 3 iterations; returns the per-iteration results.

        ``opts.d_theta`` / ``opts.d_phi`` are the vertex spacing of the
        search space (left and right directions).
        """
        # FIXME (kept from original): hard-coded iteration count.
        max_itr = 3
        return self._nvs_iterative(max_itr)

    def setup_diffusion(self):
        """Seed RNGs, build the diffusion model from ``opts.config`` and load ``opts.ckpt_path``.

        Sets ``self.diffusion`` and ``self.noise_shape``.
        """
        seed_everything(self.opts.seed)

        config = OmegaConf.load(self.opts.config)
        model_config = config.pop("model", OmegaConf.create())

        # use_checkpoint is forced to False: with deepspeed it triggers the
        # error "deepspeed backend not set".
        model_config['params']['unet_config']['params']['use_checkpoint'] = False
        model = instantiate_from_config(model_config)
        model = model.to(self.device)
        model.cond_stage_model.device = self.device
        model.perframe_ae = self.opts.perframe_ae
        assert os.path.exists(self.opts.ckpt_path), "Error: checkpoint Not Found!"
        model = load_model_checkpoint(model, self.opts.ckpt_path)
        model.eval()
        self.diffusion = model

        # Noise is shaped at 1/8 of the output height and width.
        h, w = self.opts.height // 8, self.opts.width // 8
        channels = model.model.diffusion_model.out_channels
        n_frames = self.opts.video_length
        self.noise_shape = [self.opts.bs, channels, n_frames, h, w]

    def setup_dust3r(self):
        """Load the DUSt3R model from ``opts.model_path`` onto ``self.device``."""
        self.dust3r = load_model(self.opts.model_path, self.device)

    def load_initial_images(self, image_dir):
        """Load the input image and derive the [0, 1] reference frame.

        Args:
            image_dir: Path handed to ``load_images`` (wrapped in a list).

        Returns:
            ``(images, img_ori)``. ``images`` is the list of image dicts
            (original comment lists keys 'img', 'true_shape', 'idx',
            'instance', 'img_ori', as tensors); a single image is duplicated
            so that there are two entries. ``img_ori`` is the first image
            rescaled from [-1, 1] to [0, 1] and permuted to (h, w, c)
            (original comment: [576,1024,3]).
        """
        images = load_images([image_dir], size=512,force_1024 = True)
        img_ori = (images[0]['img_ori'].squeeze(0).permute(1,2,0)+1.)/2.

        if len(images) == 1:
            # Duplicate the single image to get two entries for pair-making.
            images = [images[0], copy.deepcopy(images[0])]
            images[1]['idx'] = 1

        return images, img_ori

    def run_traj(self,i2v_input_image, i2v_elevation, i2v_center_scale, i2v_d_phi, i2v_d_theta, i2v_d_r):
        """Gradio step 1: reconstruct the uploaded image and render the camera trajectory.

        Args:
            i2v_input_image: Image as an (h, w, c) numpy array with values
                scaled by 255 in the code below.
            i2v_elevation, i2v_center_scale: Stored as floats into ``opts``.
            i2v_d_phi, i2v_d_theta, i2v_d_r: Whitespace-separated numbers,
                parsed into ``self.gradio_traj``.

        Returns:
            Path of ``viz_traj.mp4`` inside ``opts.save_dir``. The render
            itself is written to ``render0.mp4``.
        """
        self.opts.elevation = float(i2v_elevation)
        self.opts.center_scale = float(i2v_center_scale)
        self.gradio_traj = (
            [float(i) for i in i2v_d_phi.split()],
            [float(i) for i in i2v_d_theta.split()],
            [float(i) for i in i2v_d_r.split()],
        )
        transform = transforms.Compose([
            transforms.Resize(576),
            transforms.CenterCrop((576,1024)),
        ])
        torch.cuda.empty_cache()
        img_tensor = torch.from_numpy(i2v_input_image).permute(2, 0, 1).unsqueeze(0).float().to(self.device)
        img_tensor = (img_tensor / 255. - 0.5) * 2  # [0, 255] -> [-1, 1]
        image_tensor_resized = transform(img_tensor)  # 1,3,h,w
        images = get_input_dict(image_tensor_resized,idx = 0,dtype = torch.float32)
        # Two entries are needed, so the single image is duplicated.
        images = [images, copy.deepcopy(images)]
        images[1]['idx'] = 1
        self.images = images
        # Original comments: self.images is [1, 3, 288, 512] in [-1,1];
        # self.img_ori is [576, 1024, 3] in [0,1].
        self.img_ori = (image_tensor_resized.squeeze(0).permute(1,2,0) + 1.)/2.

        self.run_dust3r(input_images=self.images)
        render_results = self.nvs_single_view(gradio=True)
        save_video(render_results, os.path.join(self.opts.save_dir, 'render0.mp4'))
        traj_dir = os.path.join(self.opts.save_dir, "viz_traj.mp4")
        return traj_dir

    def run_gen(self,i2v_steps, i2v_seed):
        """Gradio step 2: run diffusion on the previously saved ``render0.mp4``.

        Args:
            i2v_steps: Stored into ``opts.ddim_steps``.
            i2v_seed: Passed to ``seed_everything``.

        Returns:
            Path of the written ``diffusion0.mp4`` inside ``opts.save_dir``.
        """
        self.opts.ddim_steps = i2v_steps
        seed_everything(i2v_seed)
        render_dir = os.path.join(self.opts.save_dir, 'render0.mp4')
        video = imageio.get_reader(render_dir, 'ffmpeg')
        try:
            frames = np.array([frame / 255.0 for frame in video])
        finally:
            # Release the reader even if decoding fails.
            video.close()
        # Original comment: torch.Size([25, 576, 1024, 3]).
        render_results = torch.from_numpy(frames).to(self.device).half()

        gen_dir = os.path.join(self.opts.save_dir, "diffusion0.mp4")
        diffusion_results = self.run_diffusion(render_results)
        save_video((diffusion_results + 1.0) / 2.0, gen_dir)
        return gen_dir