# sketch-to-BPMN / modules / toWizard.py
# BenjiELCA's picture
# add commentary to all the code
# 64b088f
import xml.etree.ElementTree as ET
from modules.utils import class_dict
from xml.dom import minidom
from modules.utils import error
from modules.OCR import analyze_sentiment
def rescale(scale, boxes):
    """
    Multiply the first four values of every bounding box by ``scale``.

    The input list is modified in place: each entry is replaced by a new
    four-element list, and the same list object is returned.

    Args:
        scale (float): Factor applied to each of the four box values.
        boxes (list): Bounding boxes; each entry must expose indices 0-3.

    Returns:
        list: ``boxes`` itself, now holding the rescaled boxes.
    """
    for position, box in enumerate(boxes):
        # Only indices 0..3 are read, exactly four values are written back.
        boxes[position] = [box[k] * scale for k in range(4)]
    return boxes
def create_BPMN_id(data):
    """
    Assign a numbered BPMN ID to each element according to its type.

    Every label in ``data['labels']`` is translated through ``class_dict``
    and the matching slot of ``data['BPMN_id']`` is overwritten with
    ``<prefix>_<n>``, where ``n`` is a 1-based counter kept separately for
    each prefix.

    Events are classified from their link pair: an incoming link without an
    outgoing one gives ``end_event_<n>``, the opposite gives
    ``start_event_<n>``. Events with both or neither link, and types not
    listed below, keep whatever ID they already had.

    Args:
        data (dict): Must provide ``'labels'``, ``'links'`` and a
            pre-allocated ``'BPMN_id'`` list of the same length.

    Returns:
        dict: The same ``data`` object, with ``data['BPMN_id']`` updated.
    """
    # Type name -> ID prefix. 'dataObject' deliberately shares the 'task'
    # prefix (and therefore the task counter), as in the original code.
    prefixes = {
        'task': 'task',
        'dataObject': 'task',
        'sequenceFlow': 'sequenceFlow',
        'messageFlow': 'messageFlow',
        'messageEvent': 'message_event',
        'exclusiveGateway': 'exclusiveGateway',
        'parallelGateway': 'parallelGateway',
        # Fixed: data associations now use their own counter instead of
        # borrowing the sequence-flow counter, which produced duplicate IDs.
        'dataAssociation': 'dataAssociation',
        'pool': 'pool',
    }
    counters = {}

    def next_id(prefix):
        # Counters start at 1 and advance once per assigned ID.
        counters[prefix] = counters.get(prefix, 0) + 1
        return f'{prefix}_{counters[prefix]}'

    # Resolve all labels first so an unknown label fails before any mutation.
    bpmn_types = [class_dict[label] for label in data['labels']]

    for idx, bpmn_type in enumerate(bpmn_types):
        if bpmn_type == 'event':
            link = data['links'][idx]
            if link[0] is not None and link[1] is None:
                data['BPMN_id'][idx] = next_id('end_event')
            elif link[0] is None and link[1] is not None:
                data['BPMN_id'][idx] = next_id('start_event')
        elif bpmn_type in prefixes:
            data['BPMN_id'][idx] = next_id(prefixes[bpmn_type])

    return data
def check_end(link):
    """
    Tell whether a link pair has no outgoing connection.

    Args:
        link (tuple): Pair of indices; only the second entry is inspected.

    Returns:
        bool: True when ``link[1]`` is None, False otherwise.
    """
    outgoing = link[1]
    return outgoing is None
def connect(data, text_mapping, i):
    """
    Resolve the text of element ``i`` and of the element(s) it leads to.

    The element's outgoing link (``data['links'][i][1]``) is followed to a
    flow, and that flow's second entry gives the next element. When the next
    element is an exclusive or parallel gateway, the texts of all elements
    reached by the gateway's outgoing flows are returned; otherwise the text
    of the next element itself is returned.

    Args:
        data (dict): Provides ``'links'`` and ``'BPMN_id'``.
        text_mapping (dict): Mapping of BPMN IDs to their text descriptions.
        i (int): Index of the current element.

    Returns:
        tuple: ``(current_text, next_texts, next_id)``. All three are None
        (after reporting through ``error``) when the outgoing flow is
        missing, out of range, or does not lead to a valid element.
    """
    links = data['links']
    bpmn_ids = data['BPMN_id']

    target_idx = links[i][1]
    # The element must have an outgoing flow that exists in the link table.
    if target_idx is None or target_idx >= len(links):
        error('There may be an error with the Vizi file, care when you download it.')
        return None, None, None

    current_id = bpmn_ids[i]
    current_text = text_mapping[current_id]

    next_idx = links[target_idx][1]
    # A flow without a (valid) target used to crash on the ID lookup below;
    # report it the same way as an invalid outgoing flow instead.
    if next_idx is None or next_idx >= len(bpmn_ids):
        error('There may be an error with the Vizi file, care when you download it.')
        return None, None, None
    next_id = bpmn_ids[next_idx]

    if next_id.split('_')[0] in ('exclusiveGateway', 'parallelGateway'):
        # Collect the targets of every flow that starts at the gateway.
        next_text = [
            text_mapping[bpmn_ids[link[1]]]
            for link in links
            if link[0] == next_idx and link[1] is not None
        ]
    else:
        next_text = [text_mapping[next_id]]

    return current_text, next_text, next_id
def check_start(val):
    """
    Tell whether a link pair has no incoming connection.

    Args:
        val (tuple): Pair of indices; only the first entry is inspected.

    Returns:
        bool: True when ``val[0]`` is None, False otherwise.
    """
    incoming = val[0]
    return incoming is None
def find_merge(bpmn_id, links):
    """
    Flag the elements whose outgoing flow joins a shared parallel gateway.

    For every entry of ``links`` the outgoing flow is followed to the element
    it targets. If that element's ID starts with ``parallelGateway`` the
    gateway ID is remembered. An entry is flagged True when the gateway it
    leads to is also reached from at least one other entry.

    Missing (None) or out-of-range indices are treated as "no gateway"
    rather than raising.

    Args:
        bpmn_id (list): List of BPMN IDs.
        links (list): List of link pairs, index-aligned with ``bpmn_id``.

    Returns:
        list: One bool per entry of ``links``.
    """
    gateway_after = []
    for link in links:
        gateway = None
        flow_idx = link[1]
        if flow_idx is not None and flow_idx < len(links):
            object_idx = links[flow_idx][1]
            if (
                object_idx is not None
                and object_idx < len(bpmn_id)
                and bpmn_id[object_idx].split('_')[0] == 'parallelGateway'
            ):
                gateway = bpmn_id[object_idx]
        gateway_after.append(gateway)

    # Count each gateway once up front instead of calling list.count per entry.
    occurrences = {}
    for gateway in gateway_after:
        if gateway is not None:
            occurrences[gateway] = occurrences.get(gateway, 0) + 1

    return [
        gateway is not None and occurrences[gateway] > 1
        for gateway in gateway_after
    ]
def find_positive_end(bpmn_ids, links, text_mapping):
    """
    Pick the end element whose text scores best in sentiment analysis.

    Candidates are elements that have an entry in ``links``, have no outgoing
    link according to ``check_end``, and whose ID prefix (text before the
    first underscore) is ``event`` or ``message``. Each candidate's text is
    passed to ``analyze_sentiment``.

    Args:
        bpmn_ids (list): List of BPMN IDs.
        links (list): List of link pairs, index-aligned with ``bpmn_ids``.
        text_mapping (dict): Mapping of BPMN IDs to their text descriptions.

    Returns:
        str: BPMN ID of the preferred end element, or None when there is no
        candidate.
    """
    end_prefixes = ['event', 'message']
    scored = []
    for position, candidate in enumerate(bpmn_ids):
        # IDs beyond the end of the link table cannot be classified.
        if position < len(links):
            if check_end(links[position]) and candidate.split('_')[0] in end_prefixes:
                emotion, score = analyze_sentiment(text_mapping[candidate])
                scored.append((candidate, emotion, score))

    def rank(entry):
        # 'positive' labels sort first, then higher scores before lower ones.
        return (entry[1] != 'positive', -entry[2])

    scored.sort(key=rank)
    if len(scored) > 0:
        return scored[0][0]
    return None
def find_best_direction(texts_list):
    """
    Pick the text that scores best in sentiment analysis.

    Every text is passed to ``analyze_sentiment``; texts labelled
    ``'positive'`` are preferred, and within the same group the higher score
    wins.

    Args:
        texts_list (list): List of texts to analyze.

    Returns:
        str: The preferred text, or None when ``texts_list`` is empty.
    """
    scored = []
    for candidate in texts_list:
        emotion, score = analyze_sentiment(candidate)
        scored.append((candidate, emotion, score))

    def rank(entry):
        # 'positive' labels sort first, then higher scores before lower ones.
        return (entry[1] != 'positive', -entry[2])

    scored.sort(key=rank)
    if len(scored) > 0:
        return scored[0][0]
    return None
def create_wizard_file(data, text_mapping):
    """
    Create a wizard file for BPMN modeling based on the provided data and text mappings.

    The document root is ``methodAndStyleWizard`` and is filled, in order,
    with: model name and author, one ``processName`` per pool, start events,
    end states, activities (tasks) with their own end states, and the
    activity flows linking start events and tasks to what follows them.

    Side effects: ``text_mapping`` is modified in place (empty names are
    replaced by placeholders) and the selected "best" end / direction are
    printed to stdout.

    Args:
        data (dict): Must provide ``'BPMN_id'``, ``'links'`` and
            ``'pool_dict'``.
        text_mapping (dict): Mapping of BPMN IDs to their text descriptions.

    Returns:
        str: Pretty-printed XML string of the wizard file.
    """
    bpmn_ids = data['BPMN_id']
    links = data['links']
    event_prefixes = ('event', 'message')

    # Give every unnamed element a placeholder name, except pools and flows.
    not_change = ['pool', 'sequenceFlow', 'messageFlow', 'dataAssociation']
    for key, text in text_mapping.items():
        if text == '' and key.split('_')[0] not in not_change:
            text_mapping[key] = f'unnamed_{key}'

    root = ET.Element('methodAndStyleWizard')
    ET.SubElement(root, 'modelName').text = 'My Diagram'
    ET.SubElement(root, 'author').text = 'sketch-to-BPMN'

    # One processName element per pool.
    for pool_index in data['pool_dict']:
        ET.SubElement(root, 'processName').text = text_mapping[pool_index]

    ET.SubElement(root, 'processDescription')

    # Start events: event/message elements without an incoming link.
    for idx, bpmn_id in enumerate(bpmn_ids):
        element_type = bpmn_id.split('_')[0]
        if idx >= len(links):
            continue
        if check_start(links[idx]) and element_type in event_prefixes:
            if text_mapping[bpmn_id] == '':
                text_mapping[bpmn_id] = 'start'
            event_type = 'Message' if element_type == 'message' else 'None'
            ET.SubElement(root, 'startEvent', attrib={
                'name': text_mapping[bpmn_id],
                'eventType': event_type,
                'isRegular': 'True',
            })

    ET.SubElement(root, 'requestMessage')
    ET.SubElement(root, 'requester')

    # End states: event/message elements without an outgoing link. Only the
    # one chosen by sentiment analysis is marked as regular.
    end_states = ET.SubElement(root, 'endStates')
    positive_end = find_positive_end(bpmn_ids, links, text_mapping)
    if positive_end is not None:
        print("Best end is: ", text_mapping[positive_end])

    for idx, bpmn_id in enumerate(bpmn_ids):
        if idx >= len(links):
            continue
        if check_end(links[idx]) and bpmn_id.split('_')[0] in event_prefixes:
            if text_mapping[bpmn_id] == '':
                text_mapping[bpmn_id] = '(unnamed)'
            ET.SubElement(end_states, 'endState', attrib={
                'name': text_mapping[bpmn_id],
                'eventType': 'None',
                'isRegular': str(bpmn_id == positive_end),
            })

    # Activities: one per task, with the end states the task can lead to.
    activities = ET.SubElement(root, 'activities')
    for idx, activity_name in enumerate(bpmn_ids):
        if not activity_name.startswith('task'):
            continue
        activity = ET.SubElement(activities, 'activity', attrib={
            'name': text_mapping.get(activity_name, activity_name),
            'performer': '',
        })
        activity_end_states = ET.SubElement(activity, 'endStates')

        # Same guard as the event loops: a task without a link entry simply
        # gets no end states instead of raising IndexError inside connect().
        if idx < len(links):
            _, next_text, next_id = connect(data, text_mapping, idx)
        else:
            next_text, next_id = None, None

        if next_text is not None:
            if len(next_text) == 1:
                ET.SubElement(activity_end_states, 'endState',
                              attrib={'name': next_text[0], 'isRegular': 'True'})
            elif len(next_text) >= 2 and next_id.split('_')[0] == 'exclusiveGateway':
                best_direction = find_best_direction(next_text)
                if best_direction is not None:
                    print("Best direction is: ", best_direction)
                for branch in next_text:
                    ET.SubElement(activity_end_states, 'endState', attrib={
                        'name': branch,
                        'isRegular': str(branch == best_direction),
                    })

        ET.SubElement(activity, 'subActivities')
        ET.SubElement(activity, 'subActivityFlows')
        ET.SubElement(activity, 'messageFlows')

    merge_object = find_merge(bpmn_ids, links)

    # Activity flows: connect start events and tasks to what follows them.
    activity_flows = ET.SubElement(root, 'activityFlows')
    for i, link in enumerate(links):
        # Both kinds of flow require an outgoing link.
        if link[1] is None:
            continue
        element_type = bpmn_ids[i].split('_')[0]

        if link[0] is None:
            # Flow leaving a start event.
            if element_type in event_prefixes:
                current_text, next_text, _ = connect(data, text_mapping, i)
                # An empty target list (gateway without outgoing flows) has
                # nothing to point to.
                if current_text is None or not next_text:
                    continue
                ET.SubElement(activity_flows, 'activityFlow', attrib={
                    'startEvent': current_text,
                    'endState': '---',
                    'target': next_text[0],
                    'isMerging': 'False',
                    'isPredefined': 'True',
                })
            continue

        if element_type != 'task':
            continue

        # Flow leaving a task.
        current_text, next_text, next_id = connect(data, text_mapping, i)
        if current_text is None or next_text is None:
            continue
        merge = 'True' if merge_object[i] else 'False'
        next_type = next_id.split('_')[0]

        if next_type == 'exclusiveGateway' and len(next_text) >= 2:
            # One flow per branch, named after the branch it leads to.
            # Previously only exactly two branches were handled; three or
            # more fell through to a branch that stored the whole list as an
            # attribute value, which ElementTree cannot serialise.
            for branch in next_text:
                ET.SubElement(activity_flows, 'activityFlow', attrib={
                    'activity': current_text,
                    'endState': branch,
                    'target': branch,
                    'isMerging': 'False',
                    'isPredefined': 'True',
                })
        elif next_type == 'parallelGateway' and len(next_text) > 1:
            for branch in next_text:
                ET.SubElement(activity_flows, 'activityFlow', attrib={
                    'activity': current_text,
                    'endState': '---',
                    'target': branch,
                    'isMerging': merge,
                    'isPredefined': 'True',
                })
        elif len(next_text) == 1:
            ET.SubElement(activity_flows, 'activityFlow', attrib={
                'activity': current_text,
                'endState': '---',
                'target': next_text[0],
                'isMerging': merge,
                'isPredefined': 'True',
            })
        # Otherwise next_text is empty: there is no target to connect to.

    ET.SubElement(root, 'participants')

    # Pretty print the XML
    pwm_str = ET.tostring(root, encoding='utf-8', method='xml')
    pretty_pwm_str = minidom.parseString(pwm_str).toprettyxml(indent=" ")
    return pretty_pwm_str