# Spaces: Running
import io
import json
import logging
import os

import networkx as nx
import numpy as np
import torch
from azure.ai.vision.imageanalysis import ImageAnalysisClient
from azure.ai.vision.imageanalysis.models import VisualFeatures
from azure.core.credentials import AzureKeyCredential
from transformers import AutoTokenizer, AutoModelForSequenceClassification

from modules.utils import class_dict, proportion_inside
from modules.utils import rescale_boxes as rescale, is_vertical
# Azure Computer Vision credentials are read from the environment; either
# value is None when the corresponding variable is not set.
VISION_KEY = os.environ.get("VISION_KEY")
VISION_ENDPOINT = os.environ.get("VISION_ENDPOINT")

# Alternative for local execution: load the same two values from a JSON file.
# Kept inside a string literal so it stays disabled.
"""with open("VISION_KEY.json", "r") as json_file:
    json_data = json.load(json_file)
VISION_KEY = json_data["VISION_KEY"]
VISION_ENDPOINT = json_data["VISION_ENDPOINT"]"""

# Only report errors from transformers' model-loading code (hides its warnings).
logging.getLogger("transformers.modeling_utils").setLevel(logging.ERROR)
# Build the tokenizer/model pair used for sentiment analysis
def initialize_model():
    """
    Load the pretrained sentiment tokenizer and sequence-classification model.

    Both objects are created from the same Hugging Face checkpoint,
    "cardiffnlp/twitter-roberta-base-sentiment-latest".

    Returns:
    - tuple: (tokenizer, model), in that order.
    """
    checkpoint = "cardiffnlp/twitter-roberta-base-sentiment-latest"
    return (
        AutoTokenizer.from_pretrained(checkpoint),
        AutoModelForSequenceClassification.from_pretrained(checkpoint),
    )
# Load the tokenizer and sentiment model once, at import time, so that
# analyze_sentiment can reuse them through its default arguments.
tokenizer, emotion_model = initialize_model()
# Sentiment analysis restricted to the positive/negative classes
def analyze_sentiment(sentence, tokenizer=tokenizer, model=emotion_model):
    """
    Score a sentence with the sentiment model and report the stronger of the
    'positive' and 'negative' classes.

    The 'neutral' probability is computed but ignored when choosing the result,
    so the returned score is the raw softmax probability of the winning class
    (positive and negative scores do not sum to 1).

    Parameters:
    - sentence (str): The input sentence to analyze.
    - tokenizer (AutoTokenizer): Tokenizer used to encode the sentence
      (defaults to the module-level tokenizer).
    - model (AutoModelForSequenceClassification): Sentiment model
      (defaults to the module-level model).

    Returns:
    - tuple: (emotion, score) where emotion is 'positive' or 'negative' and
      score is that class's probability as a float.
    """
    # Truncate explicitly: RoBERTa-base checkpoints accept at most 512 tokens
    # and raise an indexing error on longer inputs.
    inputs = tokenizer(sentence, return_tensors="pt", truncation=True, max_length=512)

    # Pure inference: skip autograd bookkeeping.
    with torch.no_grad():
        outputs = model(**inputs)

    probs = torch.nn.functional.softmax(outputs.logits, dim=-1).squeeze().tolist()

    # Class order assumed for the logits of this checkpoint.
    labels = ["negative", "neutral", "positive"]
    results = dict(zip(labels, probs))

    # Drop the neutral score and keep the higher of positive / negative.
    relevant_results = {k: results[k] for k in ["positive", "negative"]}
    highest_emotion = max(relevant_results, key=relevant_results.get)
    return highest_emotion, relevant_results[highest_emotion]
def sample_ocr_image_file(image_data):
    """
    Run OCR (the READ feature) on an image with Azure's Image Analysis service.

    A new ImageAnalysisClient is created on every call from the module-level
    VISION_ENDPOINT / VISION_KEY values. The request is synchronous (blocking).

    Parameters:
    - image_data (bytes): The encoded image to analyze.

    Returns:
    - result: The analysis result returned by ImageAnalysisClient.analyze.

    Raises:
    - RuntimeError: If VISION_ENDPOINT or VISION_KEY is not configured.
    """
    endpoint = VISION_ENDPOINT
    key = VISION_KEY

    # os.getenv yields None (it never raises KeyError) for an unset variable,
    # so missing configuration has to be detected explicitly.
    if not endpoint or not key:
        raise RuntimeError(
            "Missing environment variable 'VISION_ENDPOINT' or 'VISION_KEY'. "
            "Set them before running this sample."
        )

    # Create an Image Analysis client
    client = ImageAnalysisClient(
        endpoint=endpoint,
        credential=AzureKeyCredential(key)
    )

    # Extract text (OCR) from the image bytes; this call blocks until done.
    return client.analyze(
        image_data=image_data,
        visual_features=[VisualFeatures.READ]
    )
def text_prediction(image):
    """
    Perform OCR on an image to extract text.

    The image is JPEG-encoded in memory and sent to sample_ocr_image_file.
    No temporary file is written, so concurrent calls cannot overwrite each
    other's data and nothing is left on disk if the OCR request fails.

    Parameters:
    - image: The image to process; must provide save(fp, format=...)
      (presumably a PIL image - confirm against callers).

    Returns:
    - ocr_result: The OCR result.
    """
    # Serialize the image to JPEG bytes without touching the filesystem
    buffer = io.BytesIO()
    image.save(buffer, format='JPEG')
    return sample_ocr_image_file(buffer.getvalue())
def filter_text(ocr_result, threshold=0.5):
    """
    Clean an OCR result: drop noise words and low-confidence words, then
    rebuild each line's text and axis-aligned bounding box from what is left.

    A word is discarded when it is a lone symbol / single character listed
    below, when it contains '+', '<' or '>', when its confidence is not
    strictly greater than the threshold, or when its text is empty.
    Lines with no surviving word are omitted.

    Parameters:
    - ocr_result: The OCR result, read as
      ocr_result['readResult']['blocks'][...]['lines'][...]['words'].
    - threshold (float): The confidence threshold for filtering words.

    Returns:
    - list: [boxes, texts] where boxes[k] is [x_min, y_min, x_max, y_max]
      and texts[k] is the space-joined text of the k-th kept line.
    """
    words_to_cancel = {
        "-", "--", "---", "+", ".", ",", "#", "@", "!", "?", "(", ")", "[", "]",
        "{", "}", "<", ">", "/", "\\", "|", "_", "=", "&", "^", "%", "$", "£",
        "€", "¥", "¢", "¤", "§", "©", "®", "™", "°", "±", "×", "÷", "¶", "∆",
        "∏", "∑", "∞", "√", "∫", "≈", "≠", "≤", "≥", "≡", "∼",
    }
    # Single characters treated as noise: every digit and every letter in
    # both cases, except 'a', 'A' and 'I' (lowercase 'i' IS discarded).
    single_chars = "bcdefghjklmnopqrstuvwxyz1234567890"
    words_to_cancel.update(single_chars, single_chars.upper(), "i")

    # Any word containing one of these characters is discarded
    characters_to_cancel = {"+", "<", ">"}

    boxes = []
    sentences = []
    for block in ocr_result['readResult']['blocks']:
        for line in block['lines']:
            kept_words = []
            word_extents = []
            for word in line['words']:
                token = word['text']
                if token in words_to_cancel:
                    continue
                if any(char in token for char in characters_to_cancel):
                    continue
                if word['confidence'] > threshold and token:
                    kept_words.append(token)
                    polygon = word['boundingPolygon']
                    xs = [point['x'] for point in polygon]
                    ys = [point['y'] for point in polygon]
                    word_extents.append((min(xs), min(ys), max(xs), max(ys)))

            if not kept_words:
                continue  # nothing valid left on this line

            lefts, tops, rights, bottoms = zip(*word_extents)
            boxes.append([min(lefts), min(tops), max(rights), max(bottoms)])
            sentences.append(' '.join(kept_words))

    return [boxes, sentences]
def get_box_points(box):
    """
    List the eight reference points of a box: its four corners followed by
    the midpoints of its four edges.

    Parameters:
    - box (array): Bounding box coordinates [xmin, ymin, xmax, ymax].

    Returns:
    - numpy.array: Array of shape (8, 2) with one (x, y) row per point.
    """
    x_lo, y_lo, x_hi, y_hi = box
    x_mid = (x_lo + x_hi) / 2
    y_mid = (y_lo + y_hi) / 2

    corners = [
        [x_lo, y_lo],  # (xmin, ymin) corner
        [x_hi, y_lo],  # (xmax, ymin) corner
        [x_lo, y_hi],  # (xmin, ymax) corner
        [x_hi, y_hi],  # (xmax, ymax) corner
    ]
    edge_midpoints = [
        [x_mid, y_lo],  # middle of the ymin edge
        [x_mid, y_hi],  # middle of the ymax edge
        [x_lo, y_mid],  # middle of the xmin edge
        [x_hi, y_mid],  # middle of the xmax edge
    ]
    return np.array(corners + edge_midpoints)
def min_distance_between_boxes(box1, box2):
    """
    Smallest Euclidean distance between any reference point of one box and
    any reference point of the other (points come from get_box_points).

    Parameters:
    - box1 (array): First bounding box coordinates.
    - box2 (array): Second bounding box coordinates.

    Returns:
    - float: The minimum point-to-point distance between the two boxes.
    """
    anchors_a = get_box_points(box1)
    anchors_b = get_box_points(box2)

    # Distances of every (point of box1, point of box2) pair
    pair_distances = [np.linalg.norm(a - b) for a in anchors_a for b in anchors_b]

    shortest = float('inf')
    for candidate in pair_distances:
        if candidate < shortest:
            shortest = candidate
    return shortest
def are_close(box1, box2, threshold=50):
    """
    Tell whether two boxes are close: True as soon as any corner or edge
    midpoint of one box lies strictly less than `threshold` away from any
    corner or edge midpoint of the other.

    Parameters:
    - box1 (array): First bounding box coordinates [xmin, ymin, xmax, ymax].
    - box2 (array): Second bounding box coordinates [xmin, ymin, xmax, ymax].
    - threshold (int): Distance threshold for determining closeness.

    Returns:
    - bool: True if boxes are close, otherwise False.
    """
    def reference_points(box):
        # Four corners, then the midpoints of the four edges
        x_lo, y_lo, x_hi, y_hi = box[0], box[1], box[2], box[3]
        x_mid = (x_lo + x_hi) / 2
        y_mid = (y_lo + y_hi) / 2
        return np.array([
            [x_lo, y_lo], [x_lo, y_hi], [x_hi, y_lo], [x_hi, y_hi],
            [x_mid, y_lo], [x_mid, y_hi],
            [x_lo, y_mid], [x_hi, y_mid],
        ])

    points_a = reference_points(box1)
    points_b = reference_points(box2)
    return any(
        np.linalg.norm(a - b) < threshold
        for a in points_a
        for b in points_b
    )
def find_closest_box(text_box, all_boxes, labels, threshold, iou_threshold=0.5):
    """
    Find the box a text belongs to.

    A sequenceFlow box that contains more than `iou_threshold` of the text box
    (as measured by proportion_inside) wins immediately. Otherwise the box
    whose center is nearest to the text center is returned, provided that
    center-to-center distance is below `threshold`.

    Parameters:
    - text_box (array): The text box coordinates [xmin, ymin, xmax, ymax].
    - all_boxes (list): List of all bounding boxes.
    - labels (list): List of labels corresponding to the boxes.
    - threshold (float): Distance threshold for determining closeness.
    - iou_threshold (float): Minimum proportion_inside value for a text to be
      considered inside a sequenceFlow.

    Returns:
    - int or None: Index of the selected box or None if no box is close enough.
    """
    if len(all_boxes) == 0:
        return None

    # Loop invariant: resolve the class index once instead of once per box.
    sequence_flow_label = list(class_dict.values()).index('sequenceFlow')

    # Check if the text is inside a sequenceFlow
    for j, box in enumerate(all_boxes):
        if proportion_inside(text_box, box) > iou_threshold and labels[j] == sequence_flow_label:
            return j

    # The text center does not depend on the candidate box either.
    center_text = np.array([(text_box[0] + text_box[2]) / 2, (text_box[1] + text_box[3]) / 2])

    min_distance = float('inf')
    closest_index = None
    for i, box in enumerate(all_boxes):
        center_box = np.array([(box[0] + box[2]) / 2, (box[1] + box[3]) / 2])
        # Euclidean distance between centers
        distance = np.linalg.norm(center_text - center_box)
        if distance < min_distance:
            min_distance = distance
            closest_index = i

    # Accept the nearest box only if it is within the acceptable threshold
    if min_distance < threshold:
        return closest_index
    return None
def _cluster_close_texts(indices, text_boxes, threshold=None):
    """Cluster text indices: two non-vertical texts are linked when are_close says so; clusters are the connected components."""
    graph = nx.Graph()
    for i in indices:
        graph.add_node(i)
        for j in indices:
            if i == j:
                continue
            if threshold is None:
                close = are_close(text_boxes[i], text_boxes[j])
            else:
                close = are_close(text_boxes[i], text_boxes[j], threshold)
            if close and not is_vertical(text_boxes[i]) and not is_vertical(text_boxes[j]):
                graph.add_edge(i, j)
    return [list(component) for component in nx.connected_components(graph)]


def _merge_text_group(group, text_boxes, texts, padding=0):
    """Merge one cluster of texts into (sentence, [xmin, ymin, xmax, ymax]), reading lines top-to-bottom and left-to-right."""
    # Bucket the fragments into visual lines. A line is keyed by the y-center
    # of its first fragment; a fragment joins a line when its own y-center is
    # within half of its own height of that key.
    lines = {}
    for idx in group:
        box = text_boxes[idx]
        y_center = (box[1] + box[3]) / 2
        half_height = (box[3] - box[1]) / 2
        for line_y in lines:
            if abs(y_center - line_y) < half_height:
                lines[line_y].append(idx)
                break
        else:
            lines[y_center] = [idx]

    line_texts = []
    min_x = min_y = float('inf')
    max_x = max_y = -float('inf')
    for line_y in sorted(lines):
        ordered = sorted(lines[line_y], key=lambda idx: text_boxes[idx][0])
        line_texts.append(' '.join(texts[idx] for idx in ordered))
        for idx in ordered:
            box = text_boxes[idx]
            min_x = min(min_x, box[0])
            min_y = min(min_y, box[1])
            max_x = max(max_x, box[2])
            max_y = max(max_y, box[3])

    bounding_box = [min_x, min_y, max_x, max_y]
    if padding:
        # Pad exactly once, after the extents of the whole group are known.
        bounding_box = [min_x - padding, min_y - padding, max_x + padding, max_y + padding]
    return ' '.join(line_texts), bounding_box


def group_texts(task_boxes, text_boxes, texts, min_dist=50, iou_threshold=0.8, percentage_thresh=0.8):
    """
    Maps text boxes to task boxes and groups texts into sentences based on proximity.

    Each text is assigned to the first task box containing more than
    `iou_threshold` of it (per proportion_inside); texts assigned to no task
    are "information" texts. Inside each task, and among the information
    texts, nearby non-vertical texts are clustered and every cluster is merged
    into one sentence with one bounding box.

    Task sentence boxes are enlarged by 5 units on every side, regardless of
    how many fragments the sentence has; information boxes are not padded.

    Parameters:
    - task_boxes (list): List of task bounding boxes.
    - text_boxes (list): List of text bounding boxes.
    - texts (list): List of texts corresponding to the text boxes.
    - min_dist (float): Base distance for grouping information texts (scaled by
      percentage_thresh). Task texts use are_close's default threshold instead.
    - iou_threshold (float): Minimum proportion_inside value for a text to be
      considered inside a task box.
    - percentage_thresh (float): Fraction of min_dist used as the closeness
      threshold between information texts.

    Returns:
    - tuple: Grouped task-related texts, their bounding boxes, grouped information texts, and their bounding boxes.
    """
    # Assign each text to the first task box that contains it
    task_to_texts = {i: [] for i in range(len(task_boxes))}
    information_texts = []  # Texts not inside any task box
    for idx, text_box in enumerate(text_boxes):
        for jdx, task_box in enumerate(task_boxes):
            if proportion_inside(text_box, task_box) > iou_threshold:
                task_to_texts[jdx].append(idx)
                break
        else:
            information_texts.append(idx)

    # Build one or more sentences per task
    all_grouped_texts = []
    sentence_boxes = []
    for task_texts in task_to_texts.values():
        for group in _cluster_close_texts(task_texts, text_boxes):
            sentence, box = _merge_text_group(group, text_boxes, texts, padding=5)
            all_grouped_texts.append(sentence)
            sentence_boxes.append(box)

    # Group information texts
    information_grouped_texts = []
    info_sentence_boxes = []
    info_threshold = percentage_thresh * min_dist
    for group in _cluster_close_texts(information_texts, text_boxes, info_threshold):
        sentence, box = _merge_text_group(group, text_boxes, texts)
        information_grouped_texts.append(sentence)
        info_sentence_boxes.append(box)

    return all_grouped_texts, sentence_boxes, information_grouped_texts, info_sentence_boxes
def _append_text(text_mapping, bpmn_id, text):
    """Append text to the label of bpmn_id, separated by a space, or create the entry if it does not exist."""
    if bpmn_id in text_mapping:
        text_mapping[bpmn_id] += " " + text
    else:
        text_mapping[bpmn_id] = text


def mapping_text(full_pred, text_pred, print_sentences=False, percentage_thresh=0.6, scale=1.0, iou_threshold=0.5):
    """
    Map the extracted texts to the predicted bounding boxes.

    Texts are consumed in this order, each text being used at most once:
    1. sentences located inside a task box become that task's label;
    2. information texts inside a pool that has no elements become its label;
    3. vertical information texts overlapping a pool are appended to the pool;
    4. horizontal information texts overlapping or near an event, messageEvent,
       timerEvent or dataObject are appended to it;
    5. remaining horizontal texts are appended to the closest box when that box
       is a sequenceFlow or a messageFlow.

    Every BPMN id starts with an empty label, so labels built by appending
    (steps 3-5) begin with a space.

    Side effect: text_pred[0] is overwritten with the rescaled text boxes.

    Parameters:
    - full_pred (dict): Full prediction dictionary containing boxes, labels, BPMN IDs, and pool dictionary.
    - text_pred (list): List containing the text bounding boxes (index 0) and the texts (index 1).
    - print_sentences (bool): Whether to print the sentences and their bounding boxes.
    - percentage_thresh (float): Fraction of the minimum element distance used as closeness threshold.
    - scale (float): Scale factor for rescaling bounding boxes.
    - iou_threshold (float): Minimum proportion_inside value for a text to be considered inside a box.

    Returns:
    - dict: Text mapping for BPMN elements (BPMN id -> text).
    """
    # Resolve every class index once instead of rebuilding the list in each loop.
    class_names = list(class_dict.values())
    pool_label = class_names.index('pool')
    lane_label = class_names.index('lane')
    sequence_flow_label = class_names.index('sequenceFlow')
    message_flow_label = class_names.index('messageFlow')
    data_association_label = class_names.index('dataAssociation')
    task_label = class_names.index('task')
    object_labels = (
        class_names.index('event'),
        class_names.index('messageEvent'),
        class_names.index('timerEvent'),
        class_names.index('dataObject'),
    )
    flow_labels = (sequence_flow_label, message_flow_label)
    avoid = [pool_label, lane_label, sequence_flow_label, message_flow_label, data_association_label]

    boxes = rescale(scale, full_pred['boxes'])
    labels = full_pred['labels']
    bpmn_ids = full_pred['BPMN_id']

    # Smallest distance between two elements (containers and flows excluded),
    # capped at 200; it serves as the scale for the proximity thresholds below.
    min_dist = 200
    for i in range(len(boxes)):
        if labels[i] in avoid:
            continue
        for j in range(i + 1, len(boxes)):
            if labels[j] in avoid:
                continue
            min_dist = min(min_dist, min_distance_between_boxes(boxes[i], boxes[j]))

    text_pred[0] = rescale(scale, text_pred[0])
    task_boxes = [box for i, box in enumerate(boxes) if labels[i] == task_label]
    grouped_sentences, sentence_bounding_boxes, info_texts, info_boxes = group_texts(
        task_boxes, text_pred[0], text_pred[1], min_dist=min_dist)

    # One entry per distinct BPMN id
    text_mapping = {bpmn_id: '' for bpmn_id in set(bpmn_ids)}

    if print_sentences:
        for sentence, box in zip(grouped_sentences, sentence_bounding_boxes):
            print("Task-related Text:", sentence)
            print("Bounding Box:", box)
        print("Information Texts:", info_texts)
        print("Information Bounding Boxes:", info_boxes)

    # Map the grouped sentences to the corresponding task
    for sentence, sentence_box in zip(grouped_sentences, sentence_bounding_boxes):
        for j in range(len(boxes)):
            if proportion_inside(sentence_box, boxes[j]) > iou_threshold and labels[j] == task_label:
                text_mapping[bpmn_ids[j]] = sentence

    # Map information texts to pools that contain no element
    if len(info_boxes) > 0:
        for key, elements in full_pred['pool_dict'].items():
            if len(elements) > 0:
                continue
            # Position of the pool among the predicted boxes
            position = list(bpmn_ids).index(key)
            for i in range(len(info_boxes)):
                if info_texts[i] == '':
                    continue  # Already consumed; do not overwrite a label with ''
                if proportion_inside(info_boxes[i], boxes[position]) > iou_threshold:
                    text_mapping[key] = info_texts[i]
                    info_texts[i] = ''  # Clear the text to avoid re-use

    # Map vertical texts to the pool they overlap
    for i in range(len(info_boxes)):
        if info_texts[i] == '' or not is_vertical(info_boxes[i]):
            continue
        for j in range(len(boxes)):
            if proportion_inside(info_boxes[i], boxes[j]) > 0 and labels[j] == pool_label:
                print("Text:", info_texts[i], "associate with ", bpmn_ids[j])
                _append_text(text_mapping, bpmn_ids[j], info_texts[i])
                info_texts[i] = ''  # Clear the text to avoid re-use
                break  # The text is consumed; later pools must not get a stray " "

    # Map the grouped sentences to the corresponding object
    for i in range(len(info_boxes)):
        if info_texts[i] == '' or is_vertical(info_boxes[i]):
            continue  # Skip consumed and vertical texts
        for j in range(len(boxes)):
            overlaps_or_near = (
                proportion_inside(info_boxes[i], boxes[j]) > 0
                or are_close(info_boxes[i], boxes[j], threshold=percentage_thresh * min_dist)
            )
            if overlaps_or_near and labels[j] in object_labels:
                _append_text(text_mapping, bpmn_ids[j], info_texts[i])
                info_texts[i] = ''  # Clear the text to avoid re-use
                break

    # Map the grouped sentences to the corresponding flow
    for i in range(len(info_boxes)):
        if info_texts[i] == '' or is_vertical(info_boxes[i]):
            continue  # Skip consumed and vertical texts
        # Find the closest box within the defined threshold
        closest_index = find_closest_box(info_boxes[i], boxes, labels, threshold=4 * min_dist)
        if closest_index is not None and labels[closest_index] in flow_labels:
            _append_text(text_mapping, bpmn_ids[closest_index], info_texts[i])
            info_texts[i] = ''  # Clear the text to avoid re-use

    if print_sentences:
        print("Text Mapping:", text_mapping)
        print("Information Texts left:", info_texts)

    return text_mapping