"""
utils.py
This module contains utility functions for:
- Loading and processing images
- Object detection with YOLO
- OCR with EasyOCR / PaddleOCR
- Image annotation and bounding box manipulation
- Captioning / semantic parsing of detected icons
"""
import os
import io
import base64
import time
import json
import sys
import re
from typing import Tuple, List
import torch
import numpy as np
import cv2
from PIL import Image, ImageDraw, ImageFont
from matplotlib import pyplot as plt
import easyocr
from paddleocr import PaddleOCR
import supervision as sv
import torchvision.transforms as T
from torchvision.transforms import ToPILImage
from torchvision.ops import box_convert
# Optional: import AzureOpenAI if used
from openai import AzureOpenAI
# Shared OCR engines, built once at import time and used by check_ocr_box().
# EasyOCR reader restricted to English.
reader = easyocr.Reader(['en'])
# PaddleOCR engine; the inline remarks are carried over from the original author's notes.
paddle_ocr = PaddleOCR(
    lang='en',  # other languages available
    show_log=False,
    use_gpu=False,  # using cuda might conflict with PyTorch in the same process
    use_angle_cls=False,
    use_dilation=True,  # improves accuracy
    det_db_score_mode='slow',  # improves accuracy
    max_batch_size=1024,
    rec_batch_num=1024,
)
def get_caption_model_processor(model_name, model_name_or_path="Salesforce/blip2-opt-2.7b", device=None):
    """
    Loads the captioning model and processor.
    Supports either BLIP2 or Florence-2 models.

    Args:
        model_name: Either "blip2" or "florence2".
        model_name_or_path: Checkpoint id or local path the model weights are
            loaded from. The processor is always loaded from the fixed base
            checkpoint of the selected family, not from this path.
        device: Target device. When falsy, "cuda" is used if available,
            otherwise "cpu".
    Returns:
        Dictionary with keys 'model' (moved to ``device``) and 'processor'.
    Raises:
        ValueError: If ``model_name`` is not one of the supported names.
    """
    # Fail early with a clear message; previously an unknown name fell through
    # both branches and surfaced as an UnboundLocalError at the return.
    if model_name not in ("blip2", "florence2"):
        raise ValueError(
            f"Unsupported model_name {model_name!r}; expected 'blip2' or 'florence2'."
        )
    if not device:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    # Full precision on CPU, half precision everywhere else.
    dtype = torch.float32 if device == 'cpu' else torch.float16
    if model_name == "blip2":
        from transformers import Blip2Processor, Blip2ForConditionalGeneration
        processor = Blip2Processor.from_pretrained("Salesforce/blip2-opt-2.7b")
        model = Blip2ForConditionalGeneration.from_pretrained(
            model_name_or_path, device_map=None, torch_dtype=dtype
        )
    else:
        from transformers import AutoProcessor, AutoModelForCausalLM
        processor = AutoProcessor.from_pretrained("microsoft/Florence-2-base", trust_remote_code=True)
        model = AutoModelForCausalLM.from_pretrained(
            model_name_or_path, torch_dtype=dtype, trust_remote_code=True
        )
    return {'model': model.to(device), 'processor': processor}
def get_yolo_model(model_path):
    """
    Builds an ultralytics YOLO model from the weights at ``model_path``.

    Args:
        model_path: Value passed straight to the ``YOLO`` constructor.
    Returns:
        The constructed ``YOLO`` instance.
    """
    # ultralytics is imported only when a YOLO model is actually requested.
    from ultralytics import YOLO
    return YOLO(model_path)
@torch.inference_mode()
def get_parsed_content_icon(filtered_boxes, starting_idx, image_source, caption_model_processor, prompt=None, batch_size=32):
    """
    Captions the icon regions of an image with the given caption model.

    Args:
        filtered_boxes: Sequence of boxes as (xmin, ymin, xmax, ymax) given as
            ratios of the image width/height.
        starting_idx: Index of the first box to caption; boxes before it are
            skipped. A falsy value (None or 0) captions every box. Other
            values are used as a plain slice start.
        image_source: Image array indexed as [row, column, channel].
        caption_model_processor: Dictionary with 'model' and 'processor'.
        prompt: Text prompt for the model. When falsy, "<CAPTION>" is used for
            Florence checkpoints and "The image shows" otherwise.
        batch_size: Number of crops per generation call (None means 32).
    Returns:
        List of stripped caption strings, one per captioned box, in box order.
    """
    # Ensure batch_size is an integer
    if batch_size is None:
        batch_size = 32
    to_pil = ToPILImage()
    non_ocr_boxes = filtered_boxes[starting_idx:] if starting_idx else filtered_boxes

    # Boxes are ratios, so scale by the image size before cropping.
    img_h, img_w = image_source.shape[0], image_source.shape[1]
    cropped_pil_images = []
    for coord in non_ocr_boxes:
        xmin, xmax = int(coord[0] * img_w), int(coord[2] * img_w)
        ymin, ymax = int(coord[1] * img_h), int(coord[3] * img_h)
        cropped_pil_images.append(to_pil(image_source[ymin:ymax, xmin:xmax, :]))

    model, processor = caption_model_processor['model'], caption_model_processor['processor']
    is_florence = 'florence' in model.config.name_or_path
    if not prompt:
        # Florence-2 is driven by task tokens; "<CAPTION>" requests a caption.
        prompt = "<CAPTION>" if is_florence else "The image shows"

    generated_texts = []
    device = model.device
    for i in range(0, len(cropped_pil_images), batch_size):
        batch = cropped_pil_images[i:i + batch_size]
        inputs = processor(images=batch, text=[prompt] * len(batch), return_tensors="pt")
        if device.type == 'cuda':
            # On CUDA the inputs are cast to half precision as well as moved.
            inputs = inputs.to(device=device, dtype=torch.float16)
        else:
            inputs = inputs.to(device=device)
        if is_florence:
            generated_ids = model.generate(
                input_ids=inputs["input_ids"],
                pixel_values=inputs["pixel_values"],
                max_new_tokens=100,
                num_beams=3,
                do_sample=False
            )
        else:
            generated_ids = model.generate(
                **inputs,
                max_length=100,
                num_beams=5,
                no_repeat_ngram_size=2,
                early_stopping=True,
                num_return_sequences=1
            )
        generated_text = processor.batch_decode(generated_ids, skip_special_tokens=True)
        generated_texts.extend(gen.strip() for gen in generated_text)
    return generated_texts
def get_parsed_content_icon_phi3v(filtered_boxes, ocr_bbox, image_source, caption_model_processor):
    """
    Captions icon crops with the phi3_v model variant, one sentence per icon.

    Args:
        filtered_boxes: Boxes as (xmin, ymin, xmax, ymax) ratios of the image size.
        ocr_bbox: When truthy, the first ``len(ocr_bbox)`` entries of
            ``filtered_boxes`` are skipped.
        image_source: Image array indexed as [row, column, channel].
        caption_model_processor: Dictionary with 'model' and 'processor'.
    Returns:
        List of stripped caption strings for the remaining boxes, in order.
    """
    pil_from_array = ToPILImage()
    icon_boxes = filtered_boxes[len(ocr_bbox):] if ocr_bbox else filtered_boxes

    # Scale ratio coordinates to pixels and cut out each icon.
    crops = []
    for box in icon_boxes:
        left, right = int(box[0] * image_source.shape[1]), int(box[2] * image_source.shape[1])
        top, bottom = int(box[1] * image_source.shape[0]), int(box[3] * image_source.shape[0])
        crops.append(pil_from_array(image_source[top:bottom, left:right, :]))

    model = caption_model_processor['model']
    processor = caption_model_processor['processor']
    device = model.device
    chat = [{"role": "user", "content": "<|image_1|>\ndescribe the icon in one sentence"}]
    prompt = processor.tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)

    batch_size = 5  # crops per generate() call
    captions = []
    for start in range(0, len(crops), batch_size):
        chunk = crops[start:start + batch_size]
        per_image = [processor.image_processor(img, return_tensors="pt") for img in chunk]

        # Encode every (image, prompt) pair on its own, collecting the pieces per key.
        collected = {'input_ids': [], 'attention_mask': [], 'pixel_values': [], 'image_sizes': []}
        for features in per_image:
            encoded = processor._convert_images_texts_to_inputs(features, prompt, return_tensors="pt")
            for key in collected:
                collected[key].append(encoded[key])

        # Pad on the left so all sequences in the batch share one length.
        longest = max(ids.shape[1] for ids in collected['input_ids'])
        for pos, ids in enumerate(collected['input_ids']):
            missing = longest - ids.shape[1]
            id_pad = processor.tokenizer.pad_token_id * torch.ones(1, missing, dtype=torch.long)
            collected['input_ids'][pos] = torch.cat([id_pad, ids], dim=1)
            mask_pad = torch.zeros(1, missing, dtype=torch.long)
            collected['attention_mask'][pos] = torch.cat([mask_pad, collected['attention_mask'][pos]], dim=1)

        model_inputs = {key: torch.concatenate(parts).to(device) for key, parts in collected.items()}
        output_ids = model.generate(
            **model_inputs,
            eos_token_id=processor.tokenizer.eos_token_id,
            max_new_tokens=25,
            temperature=0.01,
            do_sample=False,
        )
        # Keep only the newly generated tokens, dropping the echoed prompt.
        output_ids = output_ids[:, model_inputs['input_ids'].shape[1]:]
        decoded = processor.batch_decode(output_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)
        captions.extend(text.strip('\n').strip() for text in decoded)
    return captions
def remove_overlap(boxes, iou_threshold, ocr_bbox=None):
    """
    Filters detected boxes that overlap each other or overlap OCR boxes.

    The overlap score of two boxes is the largest of their IoU and the
    intersection divided by either box's own area. A detected box is dropped
    when it scores above ``iou_threshold`` against another detected box and is
    the larger of the two. With OCR boxes given, a surviving box is also
    dropped when it scores above the threshold against any OCR box without
    having more than 95% of its own area inside that OCR box.

    Args:
        boxes: Tensor of bounding boxes (in xyxy format).
        iou_threshold: Overlap score above which boxes count as overlapping.
        ocr_bbox: Optional list of OCR bounding boxes (xyxy); they are always
            kept and come first in the result.
    Returns:
        Kept boxes as a torch.Tensor.
    """
    assert ocr_bbox is None or isinstance(ocr_bbox, List)

    def area(b):
        return (b[2] - b[0]) * (b[3] - b[1])

    def overlap_area(a, b):
        left, top = max(a[0], b[0]), max(a[1], b[1])
        right, bottom = min(a[2], b[2]), min(a[3], b[3])
        return max(0, right - left) * max(0, bottom - top)

    def overlap_score(a, b):
        inter = overlap_area(a, b)
        union = area(a) + area(b) - inter + 1e-6
        share_a = inter / area(a) if area(a) > 0 else 0
        share_b = inter / area(b) if area(b) > 0 else 0
        return max(inter / union, share_a, share_b)

    def mostly_within(inner, outer):
        return (overlap_area(inner, outer) / area(inner)) > 0.95

    candidates = boxes.tolist()
    kept = list(ocr_bbox) if ocr_bbox else []
    for i, cand in enumerate(candidates):
        # Among overlapping detections only the smaller one survives.
        shadowed = any(
            i != j and overlap_score(cand, other) > iou_threshold and area(cand) > area(other)
            for j, other in enumerate(candidates)
        )
        if shadowed:
            continue
        # Only add the box if it does not clash with any OCR box.
        if ocr_bbox and any(
            overlap_score(cand, text_box) > iou_threshold and not mostly_within(cand, text_box)
            for text_box in ocr_bbox
        ):
            continue
        kept.append(cand)
    return torch.tensor(kept)
def remove_overlap_new(boxes, iou_threshold, ocr_bbox=None):
    """
    Removes overlapping boxes with OCR priority.

    A detected box is dropped when its overlap score (the largest of IoU and
    the intersection divided by either box's area) against another detected
    box exceeds ``iou_threshold`` and it is the larger of the two. Surviving
    boxes are then reconciled with the OCR boxes:

    - an OCR box with more than 80% of its area inside the detected box is
      replaced by a 'text' entry that carries the detected box's bbox and the
      OCR content;
    - a detected box with more than 80% of its area inside an OCR box is
      dropped in favour of that OCR box;
    - otherwise the detected box is kept as an 'icon' entry with no content.

    Args:
        boxes: List of dictionaries, each with keys: 'type', 'bbox', 'interactivity', 'content'.
        iou_threshold: Overlap score threshold for removal.
        ocr_bbox: List of OCR box dictionaries (same keys), or None.
    Returns:
        A list of filtered box dictionaries, OCR entries first.
    """
    assert ocr_bbox is None or isinstance(ocr_bbox, list)

    def box_area(box):
        return (box[2] - box[0]) * (box[3] - box[1])

    def intersection_area(box1, box2):
        x1 = max(box1[0], box2[0])
        y1 = max(box1[1], box2[1])
        x2 = min(box1[2], box2[2])
        y2 = min(box1[3], box2[3])
        return max(0, x2 - x1) * max(0, y2 - y1)

    def IoU(box1, box2):
        inter = intersection_area(box1, box2)
        union = box_area(box1) + box_area(box2) - inter + 1e-6
        ratio1 = inter / box_area(box1) if box_area(box1) > 0 else 0
        ratio2 = inter / box_area(box2) if box_area(box2) > 0 else 0
        return max(inter / union, ratio1, ratio2)

    def is_inside(box1, box2):
        area = box_area(box1)
        # A zero-area box cannot be "mostly inside" anything; this also
        # avoids dividing by zero.
        if area <= 0:
            return False
        return (intersection_area(box1, box2) / area) > 0.80

    filtered_boxes = []
    if ocr_bbox:
        filtered_boxes.extend(ocr_bbox)
    for i, box1_elem in enumerate(boxes):
        box1 = box1_elem['bbox']
        is_valid_box = True
        for j, box2_elem in enumerate(boxes):
            box2 = box2_elem['bbox']
            if i != j and IoU(box1, box2) > iou_threshold and box_area(box1) > box_area(box2):
                is_valid_box = False
                break
        if not is_valid_box:
            continue
        if not ocr_bbox:
            # Keep the element dictionary (not the bare bbox) so the result
            # stays a list of dictionaries as documented.
            filtered_boxes.append(box1_elem)
            continue
        box_added = False
        for box3_elem in ocr_bbox:
            box3 = box3_elem['bbox']
            if is_inside(box3, box1):
                # OCR text sits inside the detected box: carry the text over
                # onto the detected box and retire the OCR entry.
                # NOTE(review): box_added is not set here, so the same bbox is
                # also appended below as a content-less icon - confirm that
                # emitting both entries is intended.
                filtered_boxes.append({
                    'type': 'text',
                    'bbox': box1_elem['bbox'],
                    'interactivity': True,
                    'content': box3_elem['content']
                })
                try:
                    filtered_boxes.remove(box3_elem)
                except ValueError:
                    # Already removed on behalf of an earlier detected box.
                    pass
            elif is_inside(box1, box3):
                # Detected box sits inside OCR text: the OCR entry wins.
                box_added = True
                break
        if not box_added:
            filtered_boxes.append({
                'type': 'icon',
                'bbox': box1_elem['bbox'],
                'interactivity': True,
                'content': None
            })
    return filtered_boxes
def load_image(image_path: str) -> Tuple[np.ndarray, torch.Tensor]:
    """
    Loads an image and applies transformations.

    The transform resizes the shorter edge to 800 pixels (capping the longer
    edge at 1333), converts to a tensor and normalises with the mean/std
    values listed below.

    Args:
        image_path: Path of the image file to open.
    Returns:
        image: Original RGB image as a NumPy array.
        image_transformed: Transformed tensor.
    """
    # `T` is torchvision.transforms in this module: it has no
    # RandomResize([800], max_size=...) and its Compose takes a single
    # argument, so Resize and a one-argument call are used here.
    transform = T.Compose([
        T.Resize(800, max_size=1333),
        T.ToTensor(),
        T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ])
    # convert() returns a new image, so the file handle can be closed right away.
    with Image.open(image_path) as opened:
        image_source = opened.convert("RGB")
    image = np.asarray(image_source)
    image_transformed = transform(image_source)
    return image, image_transformed
def annotate(image_source: np.ndarray, boxes: torch.Tensor, logits: torch.Tensor, phrases: List[str],
             text_scale: float, text_padding=5, text_thickness=2, thickness=3) -> Tuple[np.ndarray, dict]:
    """
    Draws labelled bounding boxes onto a copy of the image.

    Args:
        image_source: Image array of shape (height, width, channels).
        boxes: Boxes in cxcywh format as ratios of the image size.
        logits: Not used by this function.
        phrases: One label per box; non-string entries are converted with str().
        text_scale, text_padding, text_thickness, thickness: Passed to BoxAnnotator.
    Returns:
        The annotated image and a dict mapping each label to its box in
        pixel xywh format.
    """
    # Validate phrases input
    phrases = [p if isinstance(p, str) else str(p) for p in phrases]

    height, width, _ = image_source.shape
    # Scale the ratio boxes up to pixel units.
    pixel_boxes = boxes * torch.Tensor([width, height, width, height])
    corners = box_convert(boxes=pixel_boxes, in_fmt="cxcywh", out_fmt="xyxy").numpy()
    origin_and_size = box_convert(boxes=pixel_boxes, in_fmt="cxcywh", out_fmt="xywh").numpy()

    detections = sv.Detections(xyxy=corners)
    labels = [f"{p}" for p in phrases]

    from util.box_annotator import BoxAnnotator
    painter = BoxAnnotator(text_scale=text_scale, text_padding=text_padding,
                           text_thickness=text_thickness, thickness=thickness)
    canvas = painter.annotate(scene=image_source.copy(), detections=detections,
                              labels=labels, image_size=(width, height))

    label_coordinates = {f"{p}": coords for p, coords in zip(phrases, origin_and_size)}
    return canvas, label_coordinates
def predict(model, image, caption, box_threshold, text_threshold):
    """
    Runs grounded object detection with a model/processor pair.

    Args:
        model: Dictionary with 'model' and 'processor'.
        image: Input PIL image.
        caption: Caption text.
        box_threshold: Confidence threshold for boxes.
        text_threshold: Threshold for text detection.
    Returns:
        The "boxes", "scores" and "labels" entries of the first
        post-processed result, in that order.
    """
    detector, processor = model['model'], model['processor']
    target_device = detector.device
    encoded = processor(images=image, text=caption, return_tensors="pt").to(target_device)
    with torch.no_grad():
        raw_outputs = detector(**encoded)
    # PIL reports (width, height); post-processing is given (height, width).
    per_image = processor.post_process_grounded_object_detection(
        raw_outputs,
        encoded.input_ids,
        box_threshold=box_threshold,
        text_threshold=text_threshold,
        target_sizes=[image.size[::-1]]
    )
    first = per_image[0]
    return first["boxes"], first["scores"], first["labels"]
def predict_yolo(model, image_path, box_threshold, imgsz, scale_img, iou_threshold=0.7):
    """
    Runs a YOLO model on one image.

    Args:
        model: YOLO model instance.
        image_path: Path to the image.
        box_threshold: Confidence threshold.
        imgsz: Image size, forwarded only when scale_img is truthy.
        scale_img: Boolean flag to scale the image.
        iou_threshold: IoU threshold forwarded to the model.
    Returns:
        xyxy boxes and confidences of the first result, plus one
        placeholder phrase ("0", "1", ...) per box.
    """
    options = {'conf': box_threshold, 'iou': iou_threshold, 'verbose': False}
    if scale_img:
        options['imgsz'] = imgsz
    first_result = model.predict(image_path, **options)[0]
    boxes = first_result.boxes.xyxy
    conf = first_result.boxes.conf
    placeholder_phrases = [str(idx) for idx in range(len(boxes))]
    return boxes, conf, placeholder_phrases
def get_som_labeled_img(img_path, model=None, BOX_TRESHOLD=0.01, output_coord_in_ratio=False, ocr_bbox=None,
                        text_scale=0.4, text_padding=5, draw_bbox_config=None, caption_model_processor=None,
                        ocr_text=None, use_local_semantics=True, iou_threshold=0.9, prompt=None, scale_img=False,
                        imgsz=None, batch_size=None):
    """
    Processes an image to generate semantic (SOM) labels.

    Runs YOLO detection, merges the detections with the OCR boxes, optionally
    captions the boxes that have no text, and draws numbered boxes.

    Args:
        img_path: Path to the image.
        model: YOLO model for detection.
        BOX_TRESHOLD: Confidence threshold for box prediction.
        output_coord_in_ratio: If True, output coordinates in ratio.
        ocr_bbox: OCR bounding boxes in pixel xyxy format (may be None/empty).
        text_scale, text_padding: Parameters for drawing annotations.
        draw_bbox_config: Custom configuration for bounding box drawing; when
            truthy it replaces text_scale/text_padding.
        caption_model_processor: Dictionary with caption model and processor.
        ocr_text: List of OCR-detected texts, parallel to ocr_bbox.
        use_local_semantics: Whether to caption boxes that have no text.
        iou_threshold: Overlap threshold for filtering overlaps.
        prompt: Optional caption prompt.
        scale_img: Whether to scale the image.
        imgsz: Image size for YOLO; defaults to (height, width) of the image.
        batch_size: Batch size for captioning (None means 32).
    Returns:
        Base64-encoded PNG of the annotated image, label coordinates (xywh,
        keyed by box index as a string), and the filtered box dictionaries
        with text boxes first.
    """
    if ocr_text is None:
        ocr_text = []
    with Image.open(img_path) as opened:
        image_source = opened.convert("RGB")
    w, h = image_source.size
    if not imgsz:
        imgsz = (h, w)

    # Run YOLO detection and express the boxes as ratios of the image size.
    xyxy, logits, _ = predict_yolo(
        model=model, image_path=img_path, box_threshold=BOX_TRESHOLD,
        imgsz=imgsz, scale_img=scale_img, iou_threshold=0.1
    )
    xyxy = xyxy / torch.Tensor([w, h, w, h]).to(xyxy.device)
    image_source_np = np.asarray(image_source)

    # Process OCR bounding boxes (if any)
    if ocr_bbox:
        ocr_bbox = (torch.tensor(ocr_bbox) / torch.Tensor([w, h, w, h])).tolist()
    else:
        print('no ocr bbox!!!')
        # An empty list (rather than None) keeps the zip below valid.
        ocr_bbox = []
    ocr_bbox_elem = [{'type': 'text', 'bbox': box, 'interactivity': False, 'content': txt}
                     for box, txt in zip(ocr_bbox, ocr_text)]
    xyxy_elem = [{'type': 'icon', 'bbox': box, 'interactivity': True, 'content': None}
                 for box in xyxy.tolist()]
    filtered_boxes = remove_overlap_new(boxes=xyxy_elem, iou_threshold=iou_threshold, ocr_bbox=ocr_bbox_elem)
    # Guard against bare bbox lists in the result so the dictionary access
    # below cannot fail.
    filtered_boxes = [
        box if isinstance(box, dict)
        else {'type': 'icon', 'bbox': box, 'interactivity': True, 'content': None}
        for box in filtered_boxes
    ]

    # Sort filtered boxes so that boxes with 'content' == None are at the end
    filtered_boxes_elem = sorted(filtered_boxes, key=lambda x: x['content'] is None)
    # Index of the first box without content; -1 when every box has content.
    starting_idx = next((i for i, box in enumerate(filtered_boxes_elem) if box['content'] is None), -1)
    # reshape keeps an (N, 4) layout even when there are no boxes at all.
    filtered_boxes_tensor = torch.tensor([box['bbox'] for box in filtered_boxes_elem]).reshape(-1, 4)
    if batch_size is None:
        batch_size = 32

    # Caption the content-less boxes. Skipped when there are none, because
    # -1 used as a slice start would select the last text box instead.
    if use_local_semantics and starting_idx != -1:
        caption_model = caption_model_processor['model']
        if 'phi3_v' in caption_model.config.model_type:
            # Hand over exactly the content-less boxes; nothing left to skip.
            parsed_content_icon = get_parsed_content_icon_phi3v(
                filtered_boxes_tensor[starting_idx:], None, image_source_np, caption_model_processor
            )
        else:
            parsed_content_icon = get_parsed_content_icon(
                filtered_boxes_tensor, starting_idx, image_source_np, caption_model_processor,
                prompt=prompt, batch_size=batch_size
            )
        # Fill boxes with no OCR content with parsed icon content
        for box, caption in zip(filtered_boxes_elem[starting_idx:], parsed_content_icon):
            box['content'] = caption

    filtered_boxes_cxcywh = box_convert(boxes=filtered_boxes_tensor, in_fmt="xyxy", out_fmt="cxcywh")
    phrases = list(range(len(filtered_boxes_cxcywh)))

    # Annotate image with bounding boxes and labels
    if draw_bbox_config:
        annotated_frame, label_coordinates = annotate(
            image_source=image_source_np, boxes=filtered_boxes_cxcywh, logits=logits, phrases=phrases,
            **draw_bbox_config
        )
    else:
        annotated_frame, label_coordinates = annotate(
            image_source=image_source_np, boxes=filtered_boxes_cxcywh, logits=logits, phrases=phrases,
            text_scale=text_scale, text_padding=text_padding
        )

    pil_img = Image.fromarray(annotated_frame)
    buffered = io.BytesIO()
    pil_img.save(buffered, format="PNG")
    encoded_image = base64.b64encode(buffered.getvalue()).decode('ascii')
    if output_coord_in_ratio:
        label_coordinates = {k: [v[0] / w, v[1] / h, v[2] / w, v[3] / h] for k, v in label_coordinates.items()}
        # Sanity check: annotation must not have changed the image size.
        assert w == annotated_frame.shape[1] and h == annotated_frame.shape[0]
    return encoded_image, label_coordinates, filtered_boxes_elem
def get_xywh(input):
    """
    Converts a corner-point box into integer (x, y, width, height).

    Only input[0] and input[2] are read: input[0] is the origin and input[2]
    the opposite corner. Each returned value is truncated with int().
    """
    origin = input[0]
    opposite = input[2]
    width = opposite[0] - origin[0]
    height = opposite[1] - origin[1]
    return int(origin[0]), int(origin[1]), int(width), int(height)
def get_xyxy(input):
    """
    Converts a corner-point box into integer (x, y, x2, y2).

    Only input[0] and input[2] are read: input[0] supplies (x, y) and
    input[2] supplies (x2, y2). Each value is truncated with int().
    """
    first_corner, second_corner = input[0], input[2]
    return (
        int(first_corner[0]),
        int(first_corner[1]),
        int(second_corner[0]),
        int(second_corner[1]),
    )
def get_xywh_yolo(input):
    """
    Converts a flat (x1, y1, x2, y2) box into integer (x, y, width, height).

    Width and height are the coordinate differences; each returned value is
    truncated with int().
    """
    left, top = input[0], input[1]
    width = input[2] - left
    height = input[3] - top
    return int(left), int(top), int(width), int(height)
def check_ocr_box(image_path, display_img=True, output_bb_format='xywh', goal_filtering=None, easyocr_args=None, use_paddleocr=False):
    """
    Runs OCR on the given image using PaddleOCR or EasyOCR and optionally displays annotated results.

    Args:
        image_path: Path of the image to read.
        display_img: If True, draw the detected boxes and show them with matplotlib.
        output_bb_format: 'xywh' or 'xyxy'; format of the returned boxes.
        goal_filtering: Returned unchanged.
        easyocr_args: Keyword arguments for EasyOCR's readtext. With
            PaddleOCR only its 'text_threshold' entry is used (default 0.5)
            as the minimum recognition score.
        use_paddleocr: Use PaddleOCR instead of EasyOCR.
    Returns:
        A tuple containing:
        - A tuple (text, bounding boxes)
        - The goal_filtering parameter (unchanged)
    Raises:
        ValueError: If output_bb_format is not 'xywh' or 'xyxy'.
    """
    if output_bb_format not in ('xywh', 'xyxy'):
        raise ValueError(
            f"Unsupported output_bb_format {output_bb_format!r}; expected 'xywh' or 'xyxy'."
        )

    if use_paddleocr:
        text_threshold = 0.5 if easyocr_args is None else easyocr_args.get('text_threshold', 0.5)
        # PaddleOCR can report None for an image with no detections.
        result = paddle_ocr.ocr(image_path, cls=False)[0] or []
        confident = [item for item in result if item[1][1] > text_threshold]
        coord = [item[0] for item in confident]
        text = [item[1][0] for item in confident]
    else:  # EasyOCR
        if easyocr_args is None:
            easyocr_args = {}
        result = reader.readtext(image_path, **easyocr_args)
        coord = [item[0] for item in result]
        text = [item[1] for item in result]

    if display_img:
        opencv_img = cv2.imread(image_path)
        # OpenCV loads BGR while matplotlib expects RGB.
        opencv_img = cv2.cvtColor(opencv_img, cv2.COLOR_BGR2RGB)
        for item in coord:
            x, y, a, b = get_xywh(item)
            cv2.rectangle(opencv_img, (x, y), (x + a, y + b), (0, 255, 0), 2)
        plt.imshow(opencv_img)

    # The requested box format is honoured whether or not the image is displayed.
    to_box = get_xywh if output_bb_format == 'xywh' else get_xyxy
    bb = [to_box(item) for item in coord]
    return (text, bb), goal_filtering