from typing import Optional, Tuple

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import open3d as o3d
import pandas as pd
import plotly.express as px
import requests
import supervision as sv
import torch
from PIL import Image
from segment_anything import SamAutomaticMaskGenerator, sam_model_registry, SamPredictor
from transformers import DPTImageProcessor, DPTForDepthEstimation


def remove_outliers(point_cloud, threshold=3.0):
    """Drop points lying more than `threshold` standard deviations from the
    per-axis mean of the cloud.

    Args:
        point_cloud (numpy.ndarray): Array of shape (N, 3).
        threshold (float): Number of standard deviations to keep.

    Returns:
        numpy.ndarray: The filtered (M, 3) point cloud, M <= N.
    """
    mean = np.mean(point_cloud, axis=0)
    std = np.std(point_cloud, axis=0)

    lower_bounds = mean - threshold * std
    upper_bounds = mean + threshold * std

    # Keep only points inside the bounds on every axis.
    mask = np.all((point_cloud >= lower_bounds) & (point_cloud <= upper_bounds), axis=1)
    return point_cloud[mask]
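
# A minimal sketch of 'remove_outliers' on synthetic data, assuming nothing
# beyond numpy: a far-away point injected into a Gaussian cloud is dropped.
#
#   cloud = np.random.randn(1000, 3)
#   cloud[0] = (50.0, 50.0, 50.0)      # inject an outlier
#   filtered = remove_outliers(cloud)  # no longer contains cloud[0]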


def map_image_range(depth, min_value, max_value):
    """
    Inverts a DPT depth prediction and maps it to a specified range.

    Args:
        depth (numpy.ndarray): Input depth array (any range).
        min_value (float): Minimum value of the new range.
        max_value (float): Maximum value of the new range.

    Returns:
        numpy.ndarray: Depth array with values mapped to the specified range.
    """
    # Open3D expects a float32 depth image.
    depth = np.asarray(depth, dtype=np.float32)

    # Normalize to [0, 1] and invert: DPT predicts inverse depth, so large
    # raw values are near surfaces and should become small depths.
    depth = (depth - depth.min()) / (depth.max() - depth.min())
    depth = 1 - depth

    # Linearly rescale [0, 1] to [min_value, max_value].
    return depth * (max_value - min_value) + min_value
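
# A minimal sketch of 'map_image_range', assuming a raw DPT prediction
# ('depth_predictor' and 'pil_image' are placeholders, not defined here):
#
#   raw = depth_predictor.predict(pil_image)   # (H, W) floats, any range
#   depth = map_image_range(raw, 0.1, 10.0)    # depth.min() == 0.1,
#                                              # depth.max() == 10.0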


def PCL(mask, depth):
    """Back-project a binary mask into 3D and return only the masked points."""
    assert mask.shape == depth.shape
    assert type(mask) == np.ndarray
    assert type(depth) == np.ndarray
    # Paint the masked pixels red so they can be recovered from the
    # point-cloud colors after projection.
    rgb_mask = np.zeros((mask.shape[0], mask.shape[1], 3)).astype("uint8")
    rgb_mask[mask] = (255, 0, 0)
    depth_o3d = o3d.geometry.Image(depth)
    image_o3d = o3d.geometry.Image(rgb_mask)

    rgbd_image = o3d.geometry.RGBDImage.create_from_color_and_depth(
        image_o3d, depth_o3d, convert_rgb_to_intensity=False
    )

    pcd = o3d.geometry.PointCloud.create_from_rgbd_image(
        rgbd_image,
        o3d.camera.PinholeCameraIntrinsic(
            o3d.camera.PinholeCameraIntrinsicParameters.PrimeSenseDefault
        ),
    )

    points = np.asarray(pcd.points)
    colors = np.asarray(pcd.colors)
    # Keep only the points whose red channel marks them as masked.
    keep = colors[:, 0] == 1.0
    return points[keep], colors[keep]


def PCL_rgb(rgb, depth):
    """Build a colored point cloud from an RGB image and a depth map."""
    assert type(rgb) == np.ndarray
    assert type(depth) == np.ndarray
    depth_o3d = o3d.geometry.Image(depth)
    image_o3d = o3d.geometry.Image(rgb)
    rgbd_image = o3d.geometry.RGBDImage.create_from_color_and_depth(
        image_o3d, depth_o3d, convert_rgb_to_intensity=False
    )

    pcd = o3d.geometry.PointCloud.create_from_rgbd_image(
        rgbd_image,
        o3d.camera.PinholeCameraIntrinsic(
            o3d.camera.PinholeCameraIntrinsicParameters.PrimeSenseDefault
        ),
    )

    points = np.asarray(pcd.points)
    colors = np.asarray(pcd.colors)
    return points, colors


class DepthPredictor:
    """Monocular depth estimation with Intel's DPT-Large model."""

    def __init__(self):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.feature_extractor = DPTImageProcessor.from_pretrained("Intel/dpt-large")
        self.model = DPTForDepthEstimation.from_pretrained("Intel/dpt-large")
        self.model.to(self.device)
        self.model.eval()

    def predict(self, image):
        """Return an (H, W) float32 depth map for a PIL image."""
        encoding = self.feature_extractor(image, return_tensors="pt").to(self.device)

        with torch.no_grad():
            outputs = self.model(**encoding)
            predicted_depth = outputs.predicted_depth

        # Upsample the low-resolution prediction back to the input image size.
        prediction = torch.nn.functional.interpolate(
            predicted_depth.unsqueeze(1),
            size=image.size[::-1],
            mode="bicubic",
            align_corners=False,
        ).squeeze()

        return prediction.cpu().numpy()
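
    # A minimal sketch of 'predict', assuming a PIL image ("example.jpg" is a
    # placeholder path, not part of this module):
    #
    #   depth = DepthPredictor().predict(Image.open("example.jpg"))
    #   depth.shape  # (H, W); DPT predicts relative inverse depth, so
    #                # larger values correspond to nearer surfaces.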

    def generate_pcl(self, image):
        """Predict depth and back-project the RGB image into a point cloud."""
        depth = self.predict(image)

        depth_o3d = o3d.geometry.Image(depth)
        image_o3d = o3d.geometry.Image(np.array(image))
        rgbd_image = o3d.geometry.RGBDImage.create_from_color_and_depth(
            image_o3d, depth_o3d, convert_rgb_to_intensity=False
        )

        pcd = o3d.geometry.PointCloud.create_from_rgbd_image(
            rgbd_image,
            o3d.camera.PinholeCameraIntrinsic(
                o3d.camera.PinholeCameraIntrinsicParameters.PrimeSenseDefault
            ),
        )

        points = np.asarray(pcd.points)
        colors = np.asarray(pcd.colors)
        return points, colors

    def generate_fig(self, image):
        """Render the point cloud as an interactive Plotly 3D scatter."""
        points, colors = self.generate_pcl(image)
        data = {
            "x": points[:, 0],
            "y": points[:, 1],
            "z": points[:, 2],
            "red": colors[:, 0],
            "green": colors[:, 1],
            "blue": colors[:, 2],
        }
        df = pd.DataFrame(data)
        size = np.full(len(df), 0.01)

        fig = px.scatter_3d(df, x="x", y="y", z="z", color="red", size=size)
        return fig

    def generate_fig2(self, image):
        """Render the point cloud with Matplotlib's 3D scatter."""
        points, colors = self.generate_pcl(image)

        fig = plt.figure()
        ax = fig.add_subplot(111, projection="3d")
        ax.scatter(
            points[:, 0], points[:, 1], points[:, 2], s=0.01, c=colors, marker="o"
        )
        return fig

    def generate_obj_rgb(self, image, n_samples, cube_size, max_depth, min_depth):
        """Write an OBJ mesh of colored cubes sampled from the RGB point cloud."""
        depth = self.predict(image)
        image = np.array(image)
        depth = map_image_range(depth, min_depth, max_depth)
        point_cloud, color_array = PCL_rgb(image, depth)
        # Subsample the cloud; one cube per sampled point keeps the mesh small.
        idxs = np.random.choice(len(point_cloud), int(n_samples))
        point_cloud = point_cloud[idxs]
        color_array = color_array[idxs]

        mesh = o3d.geometry.TriangleMesh()
        for point, color in zip(point_cloud, color_array):
            cube = o3d.geometry.TriangleMesh.create_box(
                width=cube_size, height=cube_size, depth=cube_size
            )
            cube.translate(-point)
            cube.paint_uniform_color(color)
            mesh += cube

        output_file = "./cloud.obj"
        o3d.io.write_triangle_mesh(output_file, mesh)
        return output_file

    def generate_obj_masks(self, image, n_samples, masks, cube_size):
        """Write an OBJ mesh of cubes, one color per SAM mask."""
        point_cloud, color_array = self.generate_pcl(image)
        mesh = o3d.geometry.TriangleMesh()

        # paint_uniform_color expects RGB in [0, 1].
        cs = [(1, 0, 0), (0, 1, 0), (0, 0, 1)]
        for c, (mask, _) in zip(cs, masks):
            # Note: this indexing assumes the cloud has one point per pixel.
            mask = mask.ravel()
            point_cloud_subset, color_array_subset = (
                point_cloud[mask],
                color_array[mask],
            )
            idxs = np.random.choice(len(point_cloud_subset), int(n_samples))
            point_cloud_subset = point_cloud_subset[idxs]
            for point in point_cloud_subset:
                cube = o3d.geometry.TriangleMesh.create_box(
                    width=cube_size, height=cube_size, depth=cube_size
                )
                cube.translate(-point)
                cube.paint_uniform_color(c)
                mesh += cube

        output_file = "./cloud.obj"
        o3d.io.write_triangle_mesh(output_file, mesh)
        return output_file

    def generate_obj_masks2(
        self, image, masks, cube_size, n_samples, min_depth, max_depth
    ):
        """Write an OBJ mesh of cubes built from masked, depth-remapped,
        outlier-filtered point clouds (one color per mask)."""
        depth = self.predict(image)
        depth = map_image_range(depth, min_depth, max_depth)
        image = np.array(image)
        mesh = o3d.geometry.TriangleMesh()

        cs = [(1, 0, 0), (0, 1, 0), (0, 0, 1)]
        for c, (mask, _) in zip(cs, masks):
            points, _ = PCL(mask, depth)
            idxs = np.random.choice(len(points), int(n_samples))
            points = points[idxs]
            points = remove_outliers(points)
            for point in points:
                cube = o3d.geometry.TriangleMesh.create_box(
                    width=cube_size, height=cube_size, depth=cube_size
                )
                cube.translate(-point)
                cube.paint_uniform_color(c)
                mesh += cube

        output_file = "./cloud.obj"
        o3d.io.write_triangle_mesh(output_file, mesh)
        return output_file


class CustomSamPredictor(SamPredictor):
    """SamPredictor variant that decouples image encoding from prompt
    decoding, so an embedding can be computed once and reused across
    many prompts (e.g. as Gradio session state)."""

    def __init__(
        self,
        sam_model,
    ) -> None:
        super().__init__(sam_model)

    def encode_image(
        self,
        image: np.ndarray,
        image_format: str = "RGB",
    ) -> torch.Tensor:
        """
        Calculates the image embeddings for the provided image and returns
        them, allowing masks to be predicted later with 'decode_and_predict'.

        Arguments:
          image (np.ndarray): The image for calculating masks. Expects an
            image in HWC uint8 format, with pixel values in [0, 255].
          image_format (str): The color format of the image, in ['RGB', 'BGR'].
        """
        assert image_format in [
            "RGB",
            "BGR",
        ], f"image_format must be in ['RGB', 'BGR'], is {image_format}."
        if image_format != self.model.image_format:
            image = image[..., ::-1]

        # Resize, convert to a BCHW tensor, and run the image encoder.
        input_image = self.transform.apply_image(image)
        input_image_torch = torch.as_tensor(input_image, device=self.device)
        input_image_torch = input_image_torch.permute(2, 0, 1).contiguous()[
            None, :, :, :
        ]
        self.set_torch_image(input_image_torch, image.shape[:2])
        return self.get_image_embedding()

    def decode_and_predict(
        self,
        embedding: torch.Tensor,
        point_coords: Optional[np.ndarray] = None,
        point_labels: Optional[np.ndarray] = None,
        box: Optional[np.ndarray] = None,
        mask_input: Optional[np.ndarray] = None,
        multimask_output: bool = True,
        return_logits: bool = False,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Decodes the provided image embedding and makes mask predictions based on prompts.

        Arguments:
          embedding (torch.Tensor): The image embedding to decode.
          ... (other arguments as in 'predict')

        Returns:
          (np.ndarray): The output masks in CxHxW format.
          (np.ndarray): An array of quality predictions for each mask.
          (np.ndarray): Low resolution mask logits for subsequent iterations.
        """
        # Install the precomputed embedding so that 'predict' believes an
        # image has already been set.
        self.features = embedding
        self.is_image_set = True
        return self.predict(
            point_coords=point_coords,
            point_labels=point_labels,
            box=box,
            mask_input=mask_input,
            multimask_output=multimask_output,
            return_logits=return_logits,
        )
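
    # A minimal sketch of the intended encode-once / decode-many use, assuming
    # 'predictor = CustomSamPredictor(sam)' and an HWC uint8 'image':
    #
    #   emb = predictor.encode_image(image)
    #   masks, scores, logits = predictor.decode_and_predict(
    #       emb,
    #       point_coords=np.array([[100, 200]]),
    #       point_labels=np.array([1]),
    #   )
    #
    # For an embedding computed elsewhere, call 'dummy_set_image' first so the
    # original/input sizes are populated without re-running the encoder.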

    def dummy_set_torch_image(
        self,
        transformed_image: torch.Tensor,
        original_image_size: Tuple[int, ...],
    ) -> None:
        """
        Records the input and original image sizes for an already-transformed
        image without running the image encoder, so that a precomputed
        embedding can be used via 'decode_and_predict'.

        Arguments:
          transformed_image (torch.Tensor): The input image, with shape
            1x3xHxW, which has been transformed with ResizeLongestSide.
          original_image_size (tuple(int, int)): The size of the image
            before transformation, in (H, W) format.
        """
        assert (
            len(transformed_image.shape) == 4
            and transformed_image.shape[1] == 3
            and max(*transformed_image.shape[2:]) == self.model.image_encoder.img_size
        ), f"set_torch_image input must be BCHW with long side {self.model.image_encoder.img_size}."
        self.reset_image()

        self.original_size = original_image_size
        self.input_size = tuple(transformed_image.shape[-2:])
        # Unlike 'set_torch_image', the encoder is deliberately not run here;
        # 'self.features' is expected to be supplied separately.
        self.is_image_set = True

    def dummy_set_image(
        self,
        image: np.ndarray,
        image_format: str = "RGB",
    ) -> None:
        """
        Prepares the predictor for a precomputed embedding: transforms the
        image and records its sizes, but does not compute new embeddings.

        Arguments:
          image (np.ndarray): The image the embedding was computed from.
            Expects an image in HWC uint8 format, with pixel values in [0, 255].
          image_format (str): The color format of the image, in ['RGB', 'BGR'].
        """
        assert image_format in [
            "RGB",
            "BGR",
        ], f"image_format must be in ['RGB', 'BGR'], is {image_format}."
        if image_format != self.model.image_format:
            image = image[..., ::-1]

        input_image = self.transform.apply_image(image)
        input_image_torch = torch.as_tensor(input_image, device=self.device)
        input_image_torch = input_image_torch.permute(2, 0, 1).contiguous()[
            None, :, :, :
        ]

        self.dummy_set_torch_image(input_image_torch, image.shape[:2])


class SegmentPredictor:
    """Wraps Segment Anything (SAM) for both automatic ('everything') and
    prompt-conditioned mask generation."""

    def __init__(self, device=None):
        MODEL_TYPE = "vit_h"
        checkpoint = "sam_vit_h_4b8939.pth"
        sam = sam_model_registry[MODEL_TYPE](checkpoint=checkpoint)

        if device is None:
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
        else:
            self.device = device
        sam.to(device=self.device)
        self.mask_generator = SamAutomaticMaskGenerator(sam)
        self.conditioned_pred = CustomSamPredictor(sam)

    def encode(self, image):
        image = np.array(image)
        return self.conditioned_pred.encode_image(image)

    def dummy_encode(self, image):
        image = np.array(image)
        self.conditioned_pred.dummy_set_image(image)

    def cond_pred(self, embedding, pts, lbls):
        """Predict masks for point prompts against a precomputed embedding,
        returned largest-first as (mask, label) pairs for Gradio."""
        lbls = np.array(lbls)
        pts = np.array(pts)
        masks, _, _ = self.conditioned_pred.decode_and_predict(
            embedding, point_coords=pts, point_labels=lbls, multimask_output=True
        )
        # Sort masks by area, largest first.
        idxs = np.argsort(-masks.sum(axis=(1, 2)))
        sam_masks = []
        for n, i in enumerate(idxs):
            sam_masks.append((masks[i], str(n)))
        return sam_masks

    def segment_everything(self, image):
        """Run SAM's automatic mask generator and return (mask, label) pairs."""
        image = np.array(image)
        sam_result = self.mask_generator.generate(image)
        sam_masks = []
        for i, mask in enumerate(sam_result):
            sam_masks.append((mask["segmentation"], str(i)))
        return sam_masks
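

# A minimal, hedged usage sketch of the full pipeline. The SAM checkpoint
# 'sam_vit_h_4b8939.pth' must be downloaded separately, and 'example.jpg' is
# a placeholder path; neither is provided by this module.
if __name__ == "__main__":
    image = Image.open("example.jpg")

    depth_predictor = DepthPredictor()
    segment_predictor = SegmentPredictor()

    # Automatic segmentation, then cubes for three of the masks.
    masks = segment_predictor.segment_everything(image)
    obj_path = depth_predictor.generate_obj_masks2(
        image,
        masks[:3],
        cube_size=0.01,
        n_samples=1000,
        min_depth=0.1,
        max_depth=10.0,
    )
    print(f"Wrote {obj_path}")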