"""Gradio demo for FastSAM in "everything" mode.

The script loads a FastSAM checkpoint through the Ultralytics ``YOLO``
wrapper, runs it on an uploaded image and returns a matplotlib figure in
which every predicted mask is painted with a random translucent colour.
"""
from ultralytics import YOLO
import numpy as np
import matplotlib.pyplot as plt
import gradio as gr
import cv2
import torch

model = YOLO('checkpoints/FastSAM.pt')  # load a custom model


def _draw_prompts(ax, bbox, points, pointlabel):
    """Draw the optional box and point prompts on ``ax``.

    Args:
        ax: Matplotlib axes to draw on.
        bbox: ``(x1, y1, x2, y2)`` box or ``None``.
        points: Sequence of ``(x, y)`` points or ``None``.
        pointlabel: Per-point labels, indexed like ``points``; points
            labelled ``1`` are drawn yellow and points labelled ``0``
            magenta. Only read when ``points`` is not ``None``.
    """
    if bbox is not None:
        x1, y1, x2, y2 = bbox
        ax.add_patch(plt.Rectangle((x1, y1), x2 - x1, y2 - y1,
                                   fill=False, edgecolor='b', linewidth=1))
    if points is not None:
        for wanted_label, colour in ((1, 'y'), (0, 'm')):
            selected = [point for i, point in enumerate(points)
                        if pointlabel[i] == wanted_label]
            ax.scatter([point[0] for point in selected],
                       [point[1] for point in selected],
                       s=20, c=colour)


def fast_process(annotations, image, high_quality, device):
    """Render ``annotations`` on top of ``image`` and return the figure.

    Args:
        annotations: Stack of masks indexed as ``(mask, row, column)`` (a
            ``torch.Tensor`` or ``numpy.ndarray``), or a sequence of dicts
            holding each mask under the ``'segmentation'`` key. ``None`` or
            an empty stack yields a figure showing only the image.
        image: Image exposing ``.height`` and ``.width`` (``predict`` passes
            the PIL image it receives from Gradio).
        high_quality: When truthy, masks are cleaned with morphological
            close/open operations and their contours are drawn.
        device: ``'cpu'`` selects the NumPy overlay; any other value
            selects the torch overlay.

    Returns:
        The ``matplotlib.figure.Figure`` holding the rendering.
    """
    original_h = image.height
    original_w = image.width

    # NOTE(review): figures created through pyplot stay registered until
    # they are closed; in a long-running server this probably accumulates.
    # Confirm how the caller consumes the figure before closing it here.
    fig = plt.figure(figsize=(10, 10))
    ax = plt.gca()
    ax.imshow(image)

    # Nothing was segmented: show the bare image instead of failing on
    # ``annotations[0]`` below.
    if annotations is None or len(annotations) == 0:
        plt.axis('off')
        plt.tight_layout()
        return fig

    if isinstance(annotations[0], dict):
        annotations = [annotation['segmentation'] for annotation in annotations]

    if high_quality:
        if isinstance(annotations[0], torch.Tensor):
            # np.array copies, so the in-place edits below never touch the
            # caller's tensor.
            annotations = np.array(annotations.cpu())
        close_kernel = np.ones((3, 3), np.uint8)
        open_kernel = np.ones((8, 8), np.uint8)
        for i, mask in enumerate(annotations):
            mask = cv2.morphologyEx(mask.astype(np.uint8), cv2.MORPH_CLOSE, close_kernel)
            annotations[i] = cv2.morphologyEx(mask.astype(np.uint8), cv2.MORPH_OPEN, open_kernel)

    if device == 'cpu':
        annotations = np.array(annotations)
        fast_show_mask(annotations, ax,
                       bbox=None, points=None, pointlabel=None,
                       retinamask=True,
                       target_height=original_h, target_width=original_w)
    else:
        if isinstance(annotations[0], np.ndarray):
            # np.asarray also accepts the list built from dict annotations.
            annotations = torch.from_numpy(np.asarray(annotations))
        fast_show_mask_gpu(annotations, ax,
                           bbox=None, points=None, pointlabel=None)

    if isinstance(annotations, torch.Tensor):
        annotations = annotations.cpu().numpy()

    if high_quality:
        contour_all = []
        temp = np.zeros((original_h, original_w, 1))
        for mask in annotations:
            if isinstance(mask, dict):
                mask = mask['segmentation']
            annotation = mask.astype(np.uint8)
            contours, _ = cv2.findContours(annotation, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
            contour_all.extend(contours)
        cv2.drawContours(temp, contour_all, -1, (255, 255, 255), 2)
        color = np.array([0 / 255, 0 / 255, 255 / 255, 0.8])
        # Contours are drawn with value 255, so dividing by 255 maps them to
        # exactly 1.0 before the RGBA colour is applied.
        contour_mask = temp / 255 * color.reshape(1, 1, -1)
        ax.imshow(contour_mask)

    plt.axis('off')
    plt.tight_layout()
    return fig


# CPU post process
def fast_show_mask(annotation, ax, bbox=None, points=None, pointlabel=None,
                   retinamask=True, target_height=960, target_width=960):
    """Paint a stack of masks onto ``ax`` using NumPy.

    Args:
        annotation: ``numpy.ndarray`` indexed as ``(mask, row, column)``;
            non-zero entries belong to the mask.
        ax: Matplotlib axes that receives the overlay.
        bbox: Optional ``(x1, y1, x2, y2)`` box to outline.
        points: Optional sequence of ``(x, y)`` points to mark.
        pointlabel: Labels for ``points`` (``1`` yellow, ``0`` magenta).
        retinamask: When falsy, the overlay is resized to
            ``(target_width, target_height)`` before being shown.
        target_height: Output height used when ``retinamask`` is falsy.
        target_width: Output width used when ``retinamask`` is falsy.
    """
    mask_count = annotation.shape[0]
    height = annotation.shape[1]
    width = annotation.shape[2]

    if mask_count == 0:
        # No masks: a fully transparent overlay.
        show = np.zeros((height, width, 4))
    else:
        # Sort the masks by area, smallest first.
        areas = np.sum(annotation, axis=(1, 2))
        annotation = annotation[np.argsort(areas)]

        # For every pixel, the index of the first (i.e. smallest) mask that
        # covers it; 0 when no mask covers it.
        index = (annotation != 0).argmax(axis=0)

        color = np.random.random((mask_count, 1, 1, 3))
        transparency = np.ones((mask_count, 1, 1, 1)) * 0.6
        visual = np.concatenate([color, transparency], axis=-1)
        mask_image = np.expand_dims(annotation, -1) * visual

        # Vectorised gather: pick, per pixel, the RGBA value of the chosen
        # mask. Uncovered pixels pick mask 0, whose value there is zero.
        rows = np.arange(height)[:, None]
        cols = np.arange(width)[None, :]
        show = mask_image[index, rows, cols]

    _draw_prompts(ax, bbox, points, pointlabel)

    if not retinamask:
        show = cv2.resize(show, (target_width, target_height),
                          interpolation=cv2.INTER_NEAREST)
    ax.imshow(show)


def fast_show_mask_gpu(annotation, ax, bbox=None, points=None, pointlabel=None):
    """Paint a stack of masks onto ``ax`` using torch.

    The overlay is computed on ``annotation.device`` and copied to the CPU
    only for the final ``imshow`` call.

    Args:
        annotation: ``torch.Tensor`` indexed as ``(mask, row, column)``;
            non-zero entries belong to the mask.
        ax: Matplotlib axes that receives the overlay.
        bbox: Optional ``(x1, y1, x2, y2)`` box to outline.
        points: Optional sequence of ``(x, y)`` points to mark.
        pointlabel: Labels for ``points`` (``1`` yellow, ``0`` magenta).
    """
    mask_count = annotation.shape[0]
    height = annotation.shape[1]
    width = annotation.shape[2]
    target = annotation.device

    if mask_count == 0:
        # No masks: a fully transparent overlay.
        show = torch.zeros((height, width, 4), device=target)
    else:
        # Sort the masks by area, smallest first.
        areas = torch.sum(annotation, dim=(1, 2))
        annotation = annotation[torch.argsort(areas, descending=False)]

        # Index of the first non-zero value at every pixel position.
        index = (annotation != 0).to(torch.long).argmax(dim=0)

        color = torch.rand((mask_count, 1, 1, 3), device=target)
        transparency = torch.ones((mask_count, 1, 1, 1), device=target) * 0.6
        visual = torch.cat([color, transparency], dim=-1)
        mask_image = torch.unsqueeze(annotation, -1) * visual

        # ``index`` says, for every pixel, which mask to read from; gather
        # those RGBA values into a single image with broadcast indexing.
        rows = torch.arange(height, device=target).unsqueeze(1)
        cols = torch.arange(width, device=target).unsqueeze(0)
        show = mask_image[index, rows, cols]

    show_cpu = show.cpu().numpy()

    _draw_prompts(ax, bbox, points, pointlabel)
    ax.imshow(show_cpu)


device = 'cuda' if torch.cuda.is_available() else 'cpu'


def predict(input, input_size=512, high_visual_quality=False):
    """Segment ``input`` with FastSAM and return the rendered figure.

    Args:
        input: Image to segment (the Gradio interface below supplies a PIL
            image).
        input_size: Inference size forwarded as ``imgsz``; cast to ``int``
            because the slider may deliver a float.
        high_visual_quality: Forwarded to ``fast_process`` as
            ``high_quality``.

    Returns:
        The matplotlib figure produced by ``fast_process``.
    """
    input_size = int(input_size)  # make sure imgsz is an integer
    results = model(input, device=device, retina_masks=True,
                    iou=0.7, conf=0.25, imgsz=input_size)
    # ``masks`` can be None when the model finds nothing; fast_process then
    # returns the plain image.
    masks = results[0].masks
    annotations = None if masks is None else masks.data
    fig = fast_process(annotations=annotations, image=input,
                       high_quality=high_visual_quality, device=device)
    return fig


app_interface = gr.Interface(
    fn=predict,
    inputs=[
        gr.components.Image(type='pil'),
        gr.components.Slider(minimum=512, maximum=1024, value=1024,
                             step=64, label='input_size'),
        gr.components.Checkbox(value=False, label='high_visual_quality'),
    ],
    outputs=['plot'],
    examples=[["assets/sa_8776.jpg", 1024, True]],
    cache_examples=True,
    title="Fast Segment Anything (Everything mode)",
)

# Requests go through Gradio's own queue: one worker at a time, at most 20
# requests waiting.
app_interface.queue(concurrency_count=1, max_size=20)
app_interface.launch()