CapeX / app.py
matanru's picture
Update app.py
85fbaf5 verified
import spaces
import argparse
import random
import os
os.system('python setup.py develop')
import gradio as gr
import matplotlib
import numpy as np
import torch
from PIL import ImageDraw, Image
from matplotlib import pyplot as plt
from mmcv import Config
import json
# def replace_line(file_name, line_num, text):
# lines = open(file_name, 'r').readlines()
# lines[line_num] = text
# out = open(file_name, 'w')
# out.writelines(lines)
# out.close()
# def read_lines(file_name):
# lines = open(file_name, 'r').readlines()
# print(lines)
# replace_line("/usr/local/lib/python3.10/site-packages/mmcv/parallel/distributed.py", 7, "from mmengine import print_log\n")
# replace_line("/usr/local/lib/python3.10/site-packages/mmcv/parallel/distributed.py", 8, "from mmengine.utils.dl_utils import TORCH_VERSION\nfrom mmengine.utils import digit_version\n")
# replace_line("/usr/local/lib/python3.10/site-packages/mmcv/parallel/registry.py", 3, 'from mmengine.registry import Registry\n')
# replace_line("/usr/local/lib/python3.10/site-packages/mmcv/fileio/io.py", 5, "from mmengine.utils import is_list_of\n")
# replace_line("/usr/local/lib/python3.10/site-packages/mmcv/runner/checkpoint.py", 23, "from mmengine.utils import digit_version, mkdir_or_exist\nfrom mmengine.utils.dl_utils import load_url\n")
# replace_line("/usr/local/lib/python3.10/site-packages/mmcv/runner/hooks/hook.py", 1, "from mmengine.registry import Registry\nfrom mmengine.utils import is_method_overridden\n")
# replace_line("/usr/local/lib/python3.10/site-packages/mmcv/runner/hooks/evaluation.py",11, "from mmengine.utils import is_seq_of\n")
# replace_line("/usr/local/lib/python3.10/site-packages/mmcv/runner/hooks/logger/mlflow.py", 3, "from mmengine.utils.dl_utils import TORCH_VERSION\n")
# replace_line("/usr/local/lib/python3.10/site-packages/mmcv/runner/hooks/logger/tensorboard.py", 4, "from mmengine.utils.dl_utils import TORCH_VERSION\nfrom mmengine.utils import digit_version\n")
# replace_line("/usr/local/lib/python3.10/site-packages/mmcv/runner/hooks/logger/text.py", 12, "from mmengine.utils import is_tuple_of, scandir\n")
# replace_line("/usr/local/lib/python3.10/site-packages/mmcv/runner/hooks/logger/wandb.py", 5, "from mmengine.utils import scandir\n")
# replace_line("/usr/local/lib/python3.10/site-packages/mmcv/runner/hooks/optimizer.py", 11, "from mmengine.utils.dl_utils import TORCH_VERSION\nfrom mmcv.utils import IS_NPU_AVAILABLE\nfrom mmengine.utils.dl_utils.parrots_wrapper import _BatchNorm\n")
# replace_line("/usr/local/lib/python3.10/site-packages/mmcv/runner/hooks/optimizer.py", 14, "from mmengine.utils import digit_version\n")
# replace_line("/usr/local/lib/python3.10/site-packages/mmcv/runner/fp16_utils.py", 12, "from mmcv.utils import IS_NPU_AVAILABLE\nfrom mmengine.utils.dl_utils import TORCH_VERSION\nfrom mmengine.utils import digit_version\n")
# replace_line("/usr/local/lib/python3.10/site-packages/mmcv/runner/builder.py", 4, "from mmengine.registry import Registry\n")
# replace_line("/usr/local/lib/python3.10/site-packages/mmcv/runner/optimizer/builder.py", 7, "from mmcv.utils import IS_NPU_AVAILABLE\nfrom mmengine.registry import Registry, build_from_cfg\n")
# replace_line("/usr/local/lib/python3.10/site-packages/mmcv/runner/optimizer/default_constructor.py", 8, "from mmengine.utils.dl_utils.parrots_wrapper import _BatchNorm, _InstanceNorm\nfrom mmengine.registry import build_from_cfg\nfrom mmengine.utils import is_list_of\n")
# def is_ipu_available() -> bool:
# try:
# import poptorch
# return poptorch.ipuHardwareIsAvailable()
# except ImportError:
# return False
# IS_IPU_AVAILABLE = str(is_ipu_available())
# replace_line("/usr/local/lib/python3.10/site-packages/mmcv/device/ipu/__init__.py", 1, f'IS_IPU_AVAILABLE = {IS_IPU_AVAILABLE}\n')
# replace_line("/usr/local/lib/python3.10/site-packages/mmcv/device/scatter_gather.py", 4, "from mmengine.utils import deprecated_api_warning\n")
# replace_line("/usr/local/lib/python3.10/site-packages/mmcv/device/_functions.py", 5, "from mmengine.utils import deprecated_api_warning\n")
# replace_line("/usr/local/lib/python3.10/site-packages/mmpose/__init__.py", 1, "from mmengine.utils import digit_version\nfrom mmcv import parse_version_info\n")
# replace_line("/usr/local/lib/python3.10/site-packages/mmpose/__init__.py", 21, "import mmcv\nmmcv_version = digit_version(mmcv.__version__)\n")
# replace_line("/usr/local/lib/python3.10/site-packages/mmpose/core/optimizers/builder.py", 3, "from mmengine.registry import Registry, build_from_cfg")
from mmcv.runner import load_checkpoint
from mmpose.core import wrap_fp16_model
from mmpose.models import build_posenet
from torchvision import transforms
from demo_text import Resize_Pad
from models import *
import networkx as nx
import matplotlib.pyplot as plt
import ast
import cv2
import matplotlib
# matplotlib.use('agg')
def edges_prompt_to_list(prompt):
if prompt[0] != "[":
prompt = "[" + prompt
if prompt[-1] != "]":
prompt += "]"
return ast.literal_eval(prompt)
def descriptions_prompt_to_list(prompt):
return prompt.split(',')
# Function to visualize the graph
def visualize_graph(node_descriptions, edges):
plt.close('all')
node_descriptions = descriptions_prompt_to_list(node_descriptions)
edges = edges_prompt_to_list(edges)
# Create an empty graph
G = nx.Graph()
G.clear()
# Add nodes with descriptions
for i, desc in enumerate(node_descriptions):
G.add_node(i, label=f'{i}:{desc}')
# Add edges
for edge in edges:
G.add_edge(edge[0], edge[1])
# Draw the graph
pos = nx.spring_layout(G) # Define layout
labels = nx.get_node_attributes(G, 'label') # Get labels
nx.draw(G, pos, with_labels=True, labels=labels, node_size=1500, node_color='skyblue', font_size=10, font_weight='bold', font_color='black') # Draw nodes with labels
nx.draw_networkx_edges(G, pos, width=2, edge_color='gray') # Draw edges
plt.title("Graph Visualization") # Set title
plt.axis('off') # Turn off axis
# plt.show() # Show plot
# Image from plot
fig = plt.gcf()
# fig.tight_layout(pad=0)
# To remove the huge white borders
# plt.margins(0)
fig.canvas.draw()
image_from_plot = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
image_from_plot = image_from_plot.reshape(fig.canvas.get_width_height()[::-1] + (3,))
plt.clf()
return image_from_plot
checkpoint_path = ''
def plot_query_results(query_img, query_w, skeleton, prediction, radius=6):
h, w, c = query_img.shape
prediction = prediction[-1].cpu().numpy() * h
# prediction = prediction.cpu().numpy() * h
query_img = (query_img - np.min(query_img)) / (
np.max(query_img) - np.min(query_img))
for id, (img, w, keypoint) in enumerate(zip([query_img],
[query_w],
[prediction])):
f, axes = plt.subplots()
plt.imshow(img)
for k in range(keypoint.shape[0]):
if w[k] > 0:
kp = keypoint[k, :2]
c = (1, 0, 0, 0.75) if w[k] == 1 else (0, 0, 1, 0.6)
patch = plt.Circle(kp, radius, color=c)
axes.add_patch(patch)
axes.text(kp[0], kp[1], k)
plt.draw()
for l, limb in enumerate(skeleton):
kp = keypoint[:, :2]
if l > len(COLORS) - 1:
c = [x / 255 for x in random.sample(range(0, 255), 3)]
else:
c = [x / 255 for x in COLORS[l]]
if w[limb[0]] > 0 and w[limb[1]] > 0:
patch = plt.Line2D([kp[limb[0], 0], kp[limb[1], 0]],
[kp[limb[0], 1], kp[limb[1], 1]],
linewidth=6, color=c, alpha=0.6)
axes.add_artist(patch)
plt.axis('off') # command for hiding the axis.
plt.subplots_adjust(0, 0, 1, 1, 0, 0)
plt.margins(0)
fig = plt.gcf()
fig.tight_layout(pad=0)
return plt
COLORS = [
[255, 85, 0], [255, 170, 0], [255, 255, 0], [170, 255, 0],
[85, 255, 0], [0, 255, 0], [0, 255, 85], [0, 255, 170], [0, 255, 255],
[0, 170, 255], [0, 85, 255], [0, 0, 255], [85, 0, 255], [170, 0, 255],
[255, 0, 255], [255, 0, 170], [255, 0, 85], [255, 0, 0]
]
model = None
# @spaces.GPU(duration=30)
# def estimate(model, data):
# with torch.no_grad():
# model_device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# data["img_q"].to(device=model_device)
# data['target_weight_s'][0].to(device=model_device)
# print(f'img type: {data["img_q"].dtype}, target_weight type: {data["target_weight_s"][0].dtype}')
# model.to(model_device)
# model.eval()
# # return model(**data)
# return model(str(data))
# @spaces.GPU(duration=30)
def estimate(data):
global model
with torch.no_grad():
# model_device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# data["img_q"].to(device=model_device)
# data['target_weight_s'][0].to(device=model_device)
return model(data)
# Custom JSON encoder to handle non-serializable objects
class CustomEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.ndarray):
return obj.tolist()
return super().default(obj)
def process(query_img, node_descriptions, edges,
cfg_path='configs/1shot-swin-gte/graph_split1_config.py'):
global model
node_descriptions = descriptions_prompt_to_list(node_descriptions)
edges = edges_prompt_to_list(edges)
cfg = Config.fromfile(cfg_path)
kp_src_tensor = torch.zeros((len(node_descriptions), 2))
preprocess = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
Resize_Pad(cfg.model.encoder_config.img_size,
cfg.model.encoder_config.img_size)])
if len(edges) == 0:
edges = [(0, 0)]
#model_device = "cuda" if torch.cuda.is_available() else "cpu"
np_query = np.array(query_img)[:, :, ::-1].copy()
q_img = preprocess(np_query).flip(0)[None] #.to(model_device)
# Create heatmap from keypoints
genHeatMap = TopDownGenerateTargetFewShot()
data_cfg = cfg.data_cfg
data_cfg['image_size'] = np.array([cfg.model.encoder_config.img_size,
cfg.model.encoder_config.img_size])
data_cfg['joint_weights'] = None
data_cfg['use_different_joint_weights'] = False
kp_src_3d = torch.cat(
(kp_src_tensor, torch.zeros(kp_src_tensor.shape[0], 1)), dim=-1)
kp_src_3d_weight = torch.cat(
(torch.ones_like(kp_src_tensor),
torch.zeros(kp_src_tensor.shape[0], 1)), dim=-1)
target_s, target_weight_s = genHeatMap._msra_generate_target(data_cfg,
kp_src_3d,
kp_src_3d_weight,
sigma=1)
target_s = torch.tensor(target_s).float()[None]
target_weight_s = torch.ones_like(
torch.tensor(target_weight_s).float()[None]) #.to(model_device)
data = {
'img_s': [0],
'img_q': q_img,
'target_s': [target_s],
'target_weight_s': [target_weight_s],
'target_q': None,
'target_weight_q': None,
'return_loss': False,
'img_metas': [{'sample_skeleton': [edges],
'query_skeleton': edges,
# 'sample_point_descriptions': np.array([node_descriptions]),
'sample_point_descriptions': node_descriptions,
'sample_joints_3d': [kp_src_3d],
'query_joints_3d': kp_src_3d,
'sample_center': [kp_src_tensor.mean(dim=0)],
'query_center': kp_src_tensor.mean(dim=0),
'sample_scale': [
kp_src_tensor.max(dim=0)[0] -
kp_src_tensor.min(dim=0)[0]],
'query_scale': kp_src_tensor.max(dim=0)[0] -
kp_src_tensor.min(dim=0)[0],
'sample_rotation': [0],
'query_rotation': 0,
'sample_bbox_score': [1],
'query_bbox_score': 1,
'query_image_file': '',
'sample_image_file': [''],
}]
}
# Load model
model = build_posenet(cfg.model)
fp16_cfg = cfg.get('fp16', None)
if fp16_cfg is not None:
wrap_fp16_model(model)
load_checkpoint(model, checkpoint_path, map_location='cpu')
#model.to(model_device)
#model.eval()
# with torch.no_grad():
# outputs = model(**data)
data["img_q"] = data["img_q"].cpu().numpy().tolist()
data['target_weight_s'][0] = data['target_weight_s'][0].cpu().numpy().tolist()
data['target_s'][0] = data['target_s'][0].cpu().numpy().tolist()
data['img_metas'][0]['sample_joints_3d'][0] = data['img_metas'][0]['sample_joints_3d'][0].cpu().tolist()
data['img_metas'][0]['query_joints_3d'] = data['img_metas'][0]['query_joints_3d'].cpu().tolist()
data['img_metas'][0]['sample_center'][0] = data['img_metas'][0]['sample_center'][0].cpu().tolist()
data['img_metas'][0]['query_center'] = data['img_metas'][0]['query_center'].cpu().tolist()
data['img_metas'][0]['sample_scale'][0] = data['img_metas'][0]['sample_scale'][0].cpu().tolist()
data['img_metas'][0]['query_scale'] = data['img_metas'][0]['query_scale'].cpu().tolist()
# # data['img_metas'][0]['sample_point_descriptions'] = data['img_metas'][0]['sample_point_descriptions'].tolist()
#model.cuda()
model.eval()
# return model(**data)
# with torch.no_grad():
# outputs = model(**data)
str_data = json.dumps(data, cls=CustomEncoder)
outputs = estimate(str_data)
#outputs = estimate(**data)
# visualize results
vis_q_weight = target_weight_s[0]
vis_q_image = q_img[0].detach().cpu().numpy().transpose(1, 2, 0)
out = plot_query_results(vis_q_image, vis_q_weight, edges, torch.tensor(outputs['points']).squeeze(0))
return out
def update_examples(query_img, node_descriptions, edges):
return query_img, node_descriptions, edges
with gr.Blocks() as demo:
state = gr.State({
'kp_src': [],
'skeleton': [],
'count': 0,
'color_idx': 0,
'prev_pt': None,
'prev_pt_idx': None,
'prev_clicked': None,
'point_descriptions': None,
})
gr.Markdown('''
# CapeX Demo
We present a novel category agnostic pose estimation approach that utilizes support text-graphs
(graphs with text on its nodes), instead of the conventional techniques that use support images.
By leveraging the abstraction power of text-graphs, CapeX showcases SOTA results on MP100 while dropping the need
of providing an annotated support image.
### [Paper](https://arxiv.org/pdf/2406.00384) | [GitHub](https://github.com/matanr/capex)
## Instructions
1. Explain using text the desired keypoints. Please refer to the example for the correct format.
2. Optionally provide a graph representing the connections between the keypoints. Please refer to the example for the right format.
3. Upload an image of the object you want to pose to the query image.
4. Click **Evaluate** to pose the query image.
''')
with gr.Row():
# Input block for node descriptions
node_descriptions = gr.Textbox(label="Node Descriptions (String separated by commas)", lines=5, type="text",
# value="left eye, right eye, nose, neck, root of tail, left shoulder, left elbow, "
# "left front paw, right shoulder, right elbow, right front paw, left hip, "
# "left knee, left back paw, right hip, right knee, right back paw"
value="left eye, nose, right eye"
)
# Input block for edges
edges = gr.Textbox(label="Edges (List of 2-valued lists representing connections)", lines=5, type="text",
# value="[[0, 1], [0, 2], [1, 2], [2, 3], [3, 4], [3, 5], [5, 6], [6, 7], [3, 8], "
# "[8, 9], [9, 10], [4, 11], [11, 12], [12, 13], [4, 14], [14, 15], [15, 16]]"
value="[[0,1], [1,2]]"
)
def set_initial_text_graph():
text_graph = visualize_graph("left eye, nose, right eye", "[[0,1], [1,2]]")
return text_graph
text_graph = gr.Image(label="Text-graph visualization",
value=set_initial_text_graph,
type="pil", height=400, width=400)
with gr.Row():
query_img = gr.Image(label="Query Image",
type="pil", height=400, width=400)
with gr.Row():
eval_btn = gr.Button(value="Evaluate")
with gr.Row():
output_img = gr.Plot(label="Output Image")
with gr.Row():
gr.Markdown("## Examples")
with gr.Row():
gr.Examples(
examples=[
['examples/animal.png',
"left eye, right eye, nose, neck, root of tail, left shoulder, left elbow, "
"left front paw, right shoulder, right elbow, right front paw, left hip, "
"left knee, left back paw, right hip, right knee, right back paw",
"[[0, 1], [0, 2], [1, 2], [2, 3], [3, 4], [3, 5], [5, 6], [6, 7], [3, 8], [8, 9],"
"[9, 10], [4, 11], [11, 12], [12, 13], [4, 14], [14, 15], [15, 16]]"
],
['examples/person.png',
"nose, left eye, right eye, left ear, right ear, left shoulder, right shoulder, left elbow, "
"right elbow, left wrist, right wrist, left hip, right hip, left knee, right knee, left ankle, "
"right ankle",
"[[15, 13], [13, 11], [16, 14], [14, 12], [11, 12], [5, 11], [6, 12], [5, 6], [5, 7],"
"[6, 8], [7, 9], [8, 10], [1, 2], [0, 1], [0, 2], [1, 3], [2, 4], [3, 5], [4, 6]]"
],
['examples/chair.png',
"left and front leg, right and front leg, right and back leg, left and back leg, "
"left and front side of the seat, right and front side of the seat, right and back side of the seat, "
"left and back side of the seat, top left side of the backseat, top right side of the backseat",
"[[0, 4], [3, 7], [1, 5], [2, 6], [4, 5], [5, 6], [6, 7], [7, 4], [6, 7], [7, 8],[8, 9], [9, 6]]",
],
['examples/car.png',
"front and right wheel, front and left wheel, rear and right wheel, rear and left wheel, "
"right headlight, left headlight, right taillight, left taillight, "
"front and right side of the top, front and left side of the top, rear and right side of the top, "
"rear and left side of the top",
"[[0, 2], [1, 3], [0, 1], [2, 3], [8, 10], [9, 11], [8, 9], [10, 11], [4, 0], "
"[4, 8], [4, 5], [5, 1], [5, 9], [6, 2], [6, 10], [7, 3], [7, 11], [6, 7]]"
]
],
inputs=[query_img, node_descriptions, edges],
outputs=[query_img, node_descriptions, edges],
fn=update_examples,
run_on_click=True,
)
eval_btn.click(fn=process,
inputs=[query_img, node_descriptions, edges],
outputs=[output_img])
node_descriptions.change(visualize_graph, inputs=[node_descriptions, edges], outputs=[text_graph])
edges.input(visualize_graph, inputs=[node_descriptions, edges], outputs=[text_graph])
# visualize_button.click(fn=visualize_graph,
# inputs=[node_descriptions, edges, state],
# outputs=[text_graph, state])
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='CapeX Demo')
parser.add_argument('--checkpoint',
help='checkpoint path',
default='swin-gte-split1.pth')
args = parser.parse_args()
checkpoint_path = args.checkpoint
demo.launch()