""" utils.py This module contains utility functions for: - Loading and processing images - Object detection with YOLO - OCR with EasyOCR / PaddleOCR - Image annotation and bounding box manipulation - Captioning / semantic parsing of detected icons """ import os import io import base64 import time import json import sys import re from typing import Tuple, List import torch import numpy as np import cv2 from PIL import Image, ImageDraw, ImageFont from matplotlib import pyplot as plt import easyocr from paddleocr import PaddleOCR import supervision as sv import torchvision.transforms as T from torchvision.transforms import ToPILImage from torchvision.ops import box_convert # Optional: import AzureOpenAI if used from openai import AzureOpenAI # Initialize OCR readers reader = easyocr.Reader(['en']) paddle_ocr = PaddleOCR( lang='en', # other languages available use_angle_cls=False, use_gpu=False, # using cuda might conflict with PyTorch in the same process show_log=False, max_batch_size=1024, use_dilation=True, # improves accuracy det_db_score_mode='slow', # improves accuracy rec_batch_num=1024 ) def get_caption_model_processor(model_name, model_name_or_path="Salesforce/blip2-opt-2.7b", device=None): """ Loads the captioning model and processor. Supports either BLIP2 or Florence-2 models. """ if not device: device = "cuda" if torch.cuda.is_available() else "cpu" if model_name == "blip2": from transformers import Blip2Processor, Blip2ForConditionalGeneration processor = Blip2Processor.from_pretrained("Salesforce/blip2-opt-2.7b") if device == 'cpu': model = Blip2ForConditionalGeneration.from_pretrained( model_name_or_path, device_map=None, torch_dtype=torch.float32 ) else: model = Blip2ForConditionalGeneration.from_pretrained( model_name_or_path, device_map=None, torch_dtype=torch.float16 ).to(device) elif model_name == "florence2": from transformers import AutoProcessor, AutoModelForCausalLM processor = AutoProcessor.from_pretrained("microsoft/Florence-2-base", trust_remote_code=True) if device == 'cpu': model = AutoModelForCausalLM.from_pretrained( model_name_or_path, torch_dtype=torch.float32, trust_remote_code=True ) else: model = AutoModelForCausalLM.from_pretrained( model_name_or_path, torch_dtype=torch.float16, trust_remote_code=True ).to(device) return {'model': model.to(device), 'processor': processor} def get_yolo_model(model_path): """ Loads a YOLO model from a given model_path using ultralytics. """ from ultralytics import YOLO model = YOLO(model_path) return model @torch.inference_mode() def get_parsed_content_icon(filtered_boxes, starting_idx, image_source, caption_model_processor, prompt=None, batch_size=32): # Ensure batch_size is an integer if batch_size is None: batch_size = 32 to_pil = ToPILImage() if starting_idx: non_ocr_boxes = filtered_boxes[starting_idx:] else: non_ocr_boxes = filtered_boxes cropped_pil_images = [] for coord in non_ocr_boxes: xmin, xmax = int(coord[0] * image_source.shape[1]), int(coord[2] * image_source.shape[1]) ymin, ymax = int(coord[1] * image_source.shape[0]), int(coord[3] * image_source.shape[0]) cropped_image = image_source[ymin:ymax, xmin:xmax, :] cropped_pil_images.append(to_pil(cropped_image)) model, processor = caption_model_processor['model'], caption_model_processor['processor'] if not prompt: if 'florence' in model.config.name_or_path: prompt = "" else: prompt = "The image shows" generated_texts = [] device = model.device for i in range(0, len(cropped_pil_images), batch_size): batch = cropped_pil_images[i:i + batch_size] if model.device.type == 'cuda': inputs = processor(images=batch, text=[prompt] * len(batch), return_tensors="pt").to(device=device, dtype=torch.float16) else: inputs = processor(images=batch, text=[prompt] * len(batch), return_tensors="pt").to(device=device) if 'florence' in model.config.name_or_path: generated_ids = model.generate( input_ids=inputs["input_ids"], pixel_values=inputs["pixel_values"], max_new_tokens=100, num_beams=3, do_sample=False ) else: generated_ids = model.generate( **inputs, max_length=100, num_beams=5, no_repeat_ngram_size=2, early_stopping=True, num_return_sequences=1 ) generated_text = processor.batch_decode(generated_ids, skip_special_tokens=True) generated_text = [gen.strip() for gen in generated_text] generated_texts.extend(generated_text) return generated_texts def get_parsed_content_icon_phi3v(filtered_boxes, ocr_bbox, image_source, caption_model_processor): """ Generates parsed textual content for detected icons using the phi3_v model variant. """ to_pil = ToPILImage() if ocr_bbox: non_ocr_boxes = filtered_boxes[len(ocr_bbox):] else: non_ocr_boxes = filtered_boxes cropped_pil_images = [] for coord in non_ocr_boxes: xmin, xmax = int(coord[0] * image_source.shape[1]), int(coord[2] * image_source.shape[1]) ymin, ymax = int(coord[1] * image_source.shape[0]), int(coord[3] * image_source.shape[0]) cropped_image = image_source[ymin:ymax, xmin:xmax, :] cropped_pil_images.append(to_pil(cropped_image)) model, processor = caption_model_processor['model'], caption_model_processor['processor'] device = model.device messages = [{"role": "user", "content": "<|image_1|>\ndescribe the icon in one sentence"}] prompt = processor.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) batch_size = 5 # Number of samples per batch generated_texts = [] for i in range(0, len(cropped_pil_images), batch_size): images = cropped_pil_images[i:i+batch_size] image_inputs = [processor.image_processor(x, return_tensors="pt") for x in images] inputs = {'input_ids': [], 'attention_mask': [], 'pixel_values': [], 'image_sizes': []} texts = [prompt] * len(images) for idx, txt in enumerate(texts): inp = processor._convert_images_texts_to_inputs(image_inputs[idx], txt, return_tensors="pt") inputs['input_ids'].append(inp['input_ids']) inputs['attention_mask'].append(inp['attention_mask']) inputs['pixel_values'].append(inp['pixel_values']) inputs['image_sizes'].append(inp['image_sizes']) max_len = max(x.shape[1] for x in inputs['input_ids']) for idx, v in enumerate(inputs['input_ids']): pad_tensor = processor.tokenizer.pad_token_id * torch.ones(1, max_len - v.shape[1], dtype=torch.long) inputs['input_ids'][idx] = torch.cat([pad_tensor, v], dim=1) pad_att = torch.zeros(1, max_len - v.shape[1], dtype=torch.long) inputs['attention_mask'][idx] = torch.cat([pad_att, inputs['attention_mask'][idx]], dim=1) inputs_cat = {k: torch.concatenate(v).to(device) for k, v in inputs.items()} generation_args = { "max_new_tokens": 25, "temperature": 0.01, "do_sample": False, } generate_ids = model.generate(**inputs_cat, eos_token_id=processor.tokenizer.eos_token_id, **generation_args) # Remove input tokens from the generated sequence generate_ids = generate_ids[:, inputs_cat['input_ids'].shape[1]:] response = processor.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False) response = [res.strip('\n').strip() for res in response] generated_texts.extend(response) return generated_texts def remove_overlap(boxes, iou_threshold, ocr_bbox=None): """ Removes overlapping bounding boxes based on IoU and optionally considers OCR boxes. Args: boxes: Tensor of bounding boxes (in xyxy format). iou_threshold: IoU threshold to determine overlaps. ocr_bbox: Optional list of OCR bounding boxes. Returns: Filtered boxes as a torch.Tensor. """ assert ocr_bbox is None or isinstance(ocr_bbox, List) def box_area(box): return (box[2] - box[0]) * (box[3] - box[1]) def intersection_area(box1, box2): x1 = max(box1[0], box2[0]) y1 = max(box1[1], box2[1]) x2 = min(box1[2], box2[2]) y2 = min(box1[3], box2[3]) return max(0, x2 - x1) * max(0, y2 - y1) def IoU(box1, box2): inter = intersection_area(box1, box2) union = box_area(box1) + box_area(box2) - inter + 1e-6 ratio1 = inter / box_area(box1) if box_area(box1) > 0 else 0 ratio2 = inter / box_area(box2) if box_area(box2) > 0 else 0 return max(inter / union, ratio1, ratio2) def is_inside(box1, box2): inter = intersection_area(box1, box2) return (inter / box_area(box1)) > 0.95 boxes = boxes.tolist() filtered_boxes = [] if ocr_bbox: filtered_boxes.extend(ocr_bbox) for i, box1 in enumerate(boxes): is_valid_box = True for j, box2 in enumerate(boxes): if i != j and IoU(box1, box2) > iou_threshold and box_area(box1) > box_area(box2): is_valid_box = False break if is_valid_box: if ocr_bbox: # Only add the box if it does not overlap with any OCR box if not any(IoU(box1, box3) > iou_threshold and not is_inside(box1, box3) for box3 in ocr_bbox): filtered_boxes.append(box1) else: filtered_boxes.append(box1) return torch.tensor(filtered_boxes) def remove_overlap_new(boxes, iou_threshold, ocr_bbox=None): """ Removes overlapping boxes with OCR priority. Args: boxes: List of dictionaries, each with keys: 'type', 'bbox', 'interactivity', 'content'. iou_threshold: IoU threshold for removal. ocr_bbox: List of OCR box dictionaries. Returns: A list of filtered box dictionaries. """ assert ocr_bbox is None or isinstance(ocr_bbox, List) def box_area(box): return (box[2] - box[0]) * (box[3] - box[1]) def intersection_area(box1, box2): x1 = max(box1[0], box2[0]) y1 = max(box1[1], box2[1]) x2 = min(box1[2], box2[2]) y2 = min(box1[3], box2[3]) return max(0, x2 - x1) * max(0, y2 - y1) def IoU(box1, box2): inter = intersection_area(box1, box2) union = box_area(box1) + box_area(box2) - inter + 1e-6 ratio1 = inter / box_area(box1) if box_area(box1) > 0 else 0 ratio2 = inter / box_area(box2) if box_area(box2) > 0 else 0 return max(inter / union, ratio1, ratio2) def is_inside(box1, box2): inter = intersection_area(box1, box2) return (inter / box_area(box1)) > 0.80 filtered_boxes = [] if ocr_bbox: filtered_boxes.extend(ocr_bbox) for i, box1_elem in enumerate(boxes): box1 = box1_elem['bbox'] is_valid_box = True for j, box2_elem in enumerate(boxes): box2 = box2_elem['bbox'] if i != j and IoU(box1, box2) > iou_threshold and box_area(box1) > box_area(box2): is_valid_box = False break if is_valid_box: if ocr_bbox: box_added = False for box3_elem in ocr_bbox: box3 = box3_elem['bbox'] if is_inside(box3, box1): try: filtered_boxes.append({ 'type': 'text', 'bbox': box1_elem['bbox'], 'interactivity': True, 'content': box3_elem['content'] }) filtered_boxes.remove(box3_elem) except Exception: continue elif is_inside(box1, box3): box_added = True break if not box_added: filtered_boxes.append({ 'type': 'icon', 'bbox': box1_elem['bbox'], 'interactivity': True, 'content': None }) else: filtered_boxes.append(box1) return filtered_boxes # Optionally, you could return torch.tensor(filtered_boxes) if needed def load_image(image_path: str) -> Tuple[np.array, torch.Tensor]: """ Loads an image and applies transformations. Returns: image: Original image as a NumPy array. image_transformed: Transformed tensor. """ transform = T.Compose([ T.RandomResize([800], max_size=1333), T.ToTensor(), T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), ]) image_source = Image.open(image_path).convert("RGB") image = np.asarray(image_source) image_transformed, _ = transform(image_source, None) return image, image_transformed def annotate(image_source: np.ndarray, boxes: torch.Tensor, logits: torch.Tensor, phrases: List[str], text_scale: float, text_padding=5, text_thickness=2, thickness=3) -> Tuple[np.ndarray, dict]: """ Annotates an image with bounding boxes and labels. """ # Validate phrases input phrases = [str(phrase) if not isinstance(phrase, str) else phrase for phrase in phrases] h, w, _ = image_source.shape boxes = boxes * torch.Tensor([w, h, w, h]) xyxy = box_convert(boxes=boxes, in_fmt="cxcywh", out_fmt="xyxy").numpy() xywh = box_convert(boxes=boxes, in_fmt="cxcywh", out_fmt="xywh").numpy() detections = sv.Detections(xyxy=xyxy) labels = [f"{phrase}" for phrase in phrases] from util.box_annotator import BoxAnnotator box_annotator = BoxAnnotator(text_scale=text_scale, text_padding=text_padding, text_thickness=text_thickness, thickness=thickness) annotated_frame = image_source.copy() annotated_frame = box_annotator.annotate(scene=annotated_frame, detections=detections, labels=labels, image_size=(w, h)) label_coordinates = {f"{phrase}": v for phrase, v in zip(phrases, xywh)} return annotated_frame, label_coordinates def predict(model, image, caption, box_threshold, text_threshold): """ Uses a Hugging Face model to perform grounded object detection. Args: model: Dictionary with 'model' and 'processor'. image: Input PIL image. caption: Caption text. box_threshold: Confidence threshold for boxes. text_threshold: Threshold for text detection. Returns: boxes, logits, phrases from the detection. """ model_obj, processor = model['model'], model['processor'] device = model_obj.device inputs = processor(images=image, text=caption, return_tensors="pt").to(device) with torch.no_grad(): outputs = model_obj(**inputs) results = processor.post_process_grounded_object_detection( outputs, inputs.input_ids, box_threshold=box_threshold, text_threshold=text_threshold, target_sizes=[image.size[::-1]] )[0] boxes, logits, phrases = results["boxes"], results["scores"], results["labels"] return boxes, logits, phrases def predict_yolo(model, image_path, box_threshold, imgsz, scale_img, iou_threshold=0.7): """ Uses a YOLO model for object detection. Args: model: YOLO model instance. image_path: Path to the image. box_threshold: Confidence threshold. imgsz: Image size for scaling (if scale_img is True). scale_img: Boolean flag to scale the image. iou_threshold: IoU threshold for non-max suppression. Returns: Bounding boxes, confidence scores, and placeholder phrases. """ kwargs = { 'conf': box_threshold, # Confidence threshold 'iou': iou_threshold, # IoU threshold 'verbose': False } if scale_img: kwargs['imgsz'] = imgsz results = model.predict(image_path, **kwargs) boxes = results[0].boxes.xyxy conf = results[0].boxes.conf return boxes, conf, [str(i) for i in range(len(boxes))] def get_som_labeled_img(img_path, model=None, BOX_TRESHOLD=0.01, output_coord_in_ratio=False, ocr_bbox=None, text_scale=0.4, text_padding=5, draw_bbox_config=None, caption_model_processor=None, ocr_text=[], use_local_semantics=True, iou_threshold=0.9, prompt=None, scale_img=False, imgsz=None, batch_size=None): """ Processes an image to generate semantic (SOM) labels. Args: img_path: Path to the image. model: YOLO model for detection. BOX_TRESHOLD: Confidence threshold for box prediction. output_coord_in_ratio: If True, output coordinates in ratio. ocr_bbox: OCR bounding boxes. text_scale, text_padding: Parameters for drawing annotations. draw_bbox_config: Custom configuration for bounding box drawing. caption_model_processor: Dictionary with caption model and processor. ocr_text: List of OCR-detected texts. use_local_semantics: Whether to use local semantic processing. iou_threshold: IoU threshold for filtering overlaps. prompt: Optional caption prompt. scale_img: Whether to scale the image. imgsz: Image size for YOLO. batch_size: Batch size for captioning. Returns: Encoded annotated image, label coordinates, and filtered boxes. """ image_source = Image.open(img_path).convert("RGB") w, h = image_source.size if not imgsz: imgsz = (h, w) # Run YOLO detection xyxy, logits, phrases = predict_yolo( model=model, image_path=img_path, box_threshold=BOX_TRESHOLD, imgsz=imgsz, scale_img=scale_img, iou_threshold=0.1 ) xyxy = xyxy / torch.Tensor([w, h, w, h]).to(xyxy.device) image_source_np = np.asarray(image_source) phrases = [str(i) for i in range(len(phrases))] # Process OCR bounding boxes (if any) if ocr_bbox: ocr_bbox = torch.tensor(ocr_bbox) / torch.Tensor([w, h, w, h]) ocr_bbox = ocr_bbox.tolist() else: print('no ocr bbox!!!') ocr_bbox = None ocr_bbox_elem = [{'type': 'text', 'bbox': box, 'interactivity': False, 'content': txt} for box, txt in zip(ocr_bbox, ocr_text)] xyxy_elem = [{'type': 'icon', 'bbox': box, 'interactivity': True, 'content': None} for box in xyxy.tolist()] filtered_boxes = remove_overlap_new(boxes=xyxy_elem, iou_threshold=iou_threshold, ocr_bbox=ocr_bbox_elem) # Sort filtered boxes so that boxes with 'content' == None are at the end filtered_boxes_elem = sorted(filtered_boxes, key=lambda x: x['content'] is None) starting_idx = next((i for i, box in enumerate(filtered_boxes_elem) if box['content'] is None), -1) filtered_boxes_tensor = torch.tensor([box['bbox'] for box in filtered_boxes_elem]) if batch_size is None: batch_size = 32 # Generate parsed icon semantics if required if use_local_semantics: caption_model = caption_model_processor['model'] if 'phi3_v' in caption_model.config.model_type: parsed_content_icon = get_parsed_content_icon_phi3v(filtered_boxes_tensor, ocr_bbox, image_source_np, caption_model_processor) else: parsed_content_icon = get_parsed_content_icon(filtered_boxes_tensor, starting_idx, image_source_np, caption_model_processor, prompt=prompt, batch_size=batch_size) ocr_text = [f"Text Box ID {i}: {txt}" for i, txt in enumerate(ocr_text)] icon_start = len(ocr_text) parsed_content_icon_ls = [] # Fill boxes with no OCR content with parsed icon content for box in filtered_boxes_elem: if box['content'] is None and parsed_content_icon: box['content'] = parsed_content_icon.pop(0) for i, txt in enumerate(parsed_content_icon): parsed_content_icon_ls.append(f"Icon Box ID {str(i+icon_start)}: {txt}") parsed_content_merged = ocr_text + parsed_content_icon_ls else: ocr_text = [f"Text Box ID {i}: {txt}" for i, txt in enumerate(ocr_text)] parsed_content_merged = ocr_text filtered_boxes_cxcywh = box_convert(boxes=filtered_boxes_tensor, in_fmt="xyxy", out_fmt="cxcywh") phrases = [i for i in range(len(filtered_boxes_cxcywh))] # Annotate image with bounding boxes and labels if draw_bbox_config: annotated_frame, label_coordinates = annotate( image_source=image_source_np, boxes=filtered_boxes_cxcywh, logits=logits, phrases=phrases, **draw_bbox_config ) else: annotated_frame, label_coordinates = annotate( image_source=image_source_np, boxes=filtered_boxes_cxcywh, logits=logits, phrases=phrases, text_scale=text_scale, text_padding=text_padding ) pil_img = Image.fromarray(annotated_frame) buffered = io.BytesIO() pil_img.save(buffered, format="PNG") encoded_image = base64.b64encode(buffered.getvalue()).decode('ascii') if output_coord_in_ratio: label_coordinates = {k: [v[0] / w, v[1] / h, v[2] / w, v[3] / h] for k, v in label_coordinates.items()} assert w == annotated_frame.shape[1] and h == annotated_frame.shape[0] return encoded_image, label_coordinates, filtered_boxes_elem def get_xywh(input): """ Converts a bounding box from a list of two points into (x, y, width, height). """ x, y = input[0][0], input[0][1] w = input[2][0] - input[0][0] h = input[2][1] - input[0][1] return int(x), int(y), int(w), int(h) def get_xyxy(input): """ Converts a bounding box from a list of two points into (x, y, x2, y2). """ x, y = input[0][0], input[0][1] x2, y2 = input[2][0], input[2][1] return int(x), int(y), int(x2), int(y2) def get_xywh_yolo(input): """ Converts a YOLO-style bounding box (x1, y1, x2, y2) into (x, y, width, height). """ x, y = input[0], input[1] w = input[2] - input[0] h = input[3] - input[1] return int(x), int(y), int(w), int(h) def check_ocr_box(image_path, display_img=True, output_bb_format='xywh', goal_filtering=None, easyocr_args=None, use_paddleocr=False): """ Runs OCR on the given image using PaddleOCR or EasyOCR and optionally displays annotated results. Returns: A tuple containing: - A tuple (text, bounding boxes) - The goal_filtering parameter (unchanged) """ if use_paddleocr: text_threshold = 0.5 if easyocr_args is None else easyocr_args.get('text_threshold', 0.5) result = paddle_ocr.ocr(image_path, cls=False)[0] conf = [item[1] for item in result] coord = [item[0] for item in result if item[1][1] > text_threshold] text = [item[1][0] for item in result if item[1][1] > text_threshold] else: # EasyOCR if easyocr_args is None: easyocr_args = {} result = reader.readtext(image_path, **easyocr_args) coord = [item[0] for item in result] text = [item[1] for item in result] if display_img: opencv_img = cv2.imread(image_path) opencv_img = cv2.cvtColor(opencv_img, cv2.COLOR_RGB2BGR) bb = [] for item in coord: x, y, a, b = get_xywh(item) bb.append((x, y, a, b)) cv2.rectangle(opencv_img, (x, y), (x + a, y + b), (0, 255, 0), 2) plt.imshow(opencv_img) else: if output_bb_format == 'xywh': bb = [get_xywh(item) for item in coord] elif output_bb_format == 'xyxy': bb = [get_xyxy(item) for item in coord] return (text, bb), goal_filtering