# sketch-to-BPMN / modules/toWizard.py
# Last commit: "work a bit" (3289919) by BenjiELCA
import xml.etree.ElementTree as ET
from modules.utils import class_dict
from xml.dom import minidom
from modules.utils import error
def rescale(scale, boxes):
    """Scale the first four entries of every box by ``scale``.

    Each entry of ``boxes`` is replaced in place by a new four-element
    list, and the same ``boxes`` container is returned.
    """
    for position, box in enumerate(boxes):
        boxes[position] = [box[k] * scale for k in range(4)]
    return boxes
def create_BPMN_id(data):
    """Write a numbered identifier into ``data['BPMN_id']`` for each element.

    Every entry of ``data['labels']`` is translated to a class name through
    ``class_dict``. The identifier is ``<prefix>_<n>``, where ``n`` counts
    from 1 independently for each prefix.

    Events are named from their ``data['links']`` entry: a set first slot
    with an empty second slot gives ``end_event_<n>``; an empty first slot
    with a set second slot gives ``start_event_<n>``. Events matching
    neither pattern, and classes not listed below, keep whatever
    ``data['BPMN_id']`` already holds at that index.

    Args:
        data: dict with the keys ``'labels'``, ``'links'`` and ``'BPMN_id'``,
            all indexed by element position.

    Returns:
        The same ``data`` dict, with ``data['BPMN_id']`` updated in place.
    """
    # Class name -> identifier prefix. 'dataObject' deliberately shares the
    # 'task' prefix (and therefore the task counter), as it always has here.
    prefix_by_class = {
        'task': 'task',
        'dataObject': 'task',
        'sequenceFlow': 'sequenceFlow',
        'messageFlow': 'messageFlow',
        'messageEvent': 'message_event',
        'exclusiveGateway': 'exclusiveGateway',
        'parallelGateway': 'parallelGateway',
        'dataAssociation': 'dataAssociation',
        'pool': 'pool',
    }
    counters = {}

    def numbered(prefix):
        # One counter per prefix, so a data association can no longer borrow
        # the sequence-flow number and produce duplicate identifiers.
        counters[prefix] = counters.get(prefix, 0) + 1
        return f'{prefix}_{counters[prefix]}'

    # Resolve every label first so an unknown label fails before any id is written.
    class_names = [class_dict[label] for label in data['labels']]

    for idx, class_name in enumerate(class_names):
        if class_name == 'event':
            link = data['links'][idx]
            if link[0] is not None and link[1] is None:
                data['BPMN_id'][idx] = numbered('end_event')
            elif link[0] is None and link[1] is not None:
                data['BPMN_id'][idx] = numbered('start_event')
        elif class_name in prefix_by_class:
            data['BPMN_id'][idx] = numbered(prefix_by_class[class_name])

    return data
def check_end(link):
    """Return ``True`` when the second slot of ``link`` is ``None``."""
    return link[1] is None
def connect(data, text_mapping, i):
    """Look up the text of element ``i`` and of the element(s) it leads to.

    ``data['links'][i][1]`` is followed to an intermediate entry of
    ``data['links']``; that entry's second slot gives the index of the next
    element. When the next element's identifier starts with
    ``exclusiveGateway`` or ``parallelGateway``, the targets are every link
    whose first slot is that gateway index and whose second slot is set.

    Args:
        data: dict with ``'links'`` and ``'BPMN_id'`` lists.
        text_mapping: mapping from BPMN identifier to its text.
        i: index of the element to start from.

    Returns:
        ``(current_text, next_text, next_id)`` where ``next_text`` is a list
        of texts. If the chain cannot be followed, ``error`` is called and
        ``(None, None, None)`` is returned.
    """
    links = data['links']
    invalid_file = 'There is an error with the Vizi file, care when you download it.'

    target_idx = links[i][1]
    if target_idx is None or target_idx >= len(links):
        error(invalid_file)
        # Callers unpack three values, so the failure result must be a triple.
        return None, None, None

    next_idx = links[target_idx][1]
    if next_idx is None:
        # The intermediate link leads nowhere; indexing BPMN_id with None
        # would raise, so report it the same way as an out-of-range index.
        error(invalid_file)
        return None, None, None

    current_text = text_mapping[data['BPMN_id'][i]]
    next_id = data['BPMN_id'][next_idx]

    if next_id.split('_')[0] in ('exclusiveGateway', 'parallelGateway'):
        # Fan out: collect the text behind every link leaving the gateway.
        next_text = [
            text_mapping[data['BPMN_id'][link[1]]]
            for link in links
            if link[0] == next_idx and link[1] is not None
        ]
    else:
        next_text = [text_mapping[next_id]]

    return current_text, next_text, next_id
def check_start(val):
    """Return ``True`` when the first slot of ``val`` is ``None``."""
    return val[0] is None
def find_merge(bpmn_id, links):
    """Flag every link that reaches a parallel gateway shared with another link.

    For each entry of ``links`` the second slot is followed to another
    entry of ``links``, whose second slot in turn indexes ``bpmn_id``. If
    that identifier starts with ``parallelGateway`` it is recorded for the
    entry. The result holds ``True`` at every position whose recorded
    gateway was recorded more than once, and ``False`` everywhere else.
    """
    def gateway_reached(link):
        # Two hops: link -> intermediate link -> identifier index.
        hop = link[1]
        if hop is None:
            return None
        destination = links[hop][1]
        if destination is None:
            return None
        identifier = bpmn_id[destination]
        if identifier.split('_')[0] == 'parallelGateway':
            return identifier
        return None

    reached = [gateway_reached(link) for link in links]
    return [
        gateway is not None and reached.count(gateway) > 1
        for gateway in reached
    ]
# Identifier prefix of an event element -> value written to 'eventType'.
_EVENT_TYPES = {'event': 'None', 'messageEvent': 'Message'}


def _connect_triple(data, text_mapping, idx):
    """Call ``connect`` and always hand back exactly three values.

    Short results are padded with ``None`` so callers can unpack
    ``(current_text, next_text, next_id)`` unconditionally.
    """
    result = tuple(connect(data, text_mapping, idx))
    return (result + (None, None, None))[:3]


def create_wizard_file(data, text_mapping):
    """Serialise the diagram as a ``methodAndStyleWizard`` XML document.

    The document contains, in order: model name and author, one
    ``processName`` per key of ``data['pool_dict']``, an empty
    ``processDescription``, the start events, empty ``requestMessage`` and
    ``requester`` elements, the ``endStates``, the ``activities`` (one per
    identifier starting with ``task``), the ``activityFlows`` and an empty
    ``participants`` element.

    Args:
        data: dict with the keys ``'pool_dict'``, ``'BPMN_id'`` and
            ``'links'``.
        text_mapping: mapping from identifier to text. Empty texts of end
            events are replaced by ``'(unnamed)'`` in this mapping.

    Returns:
        The pretty-printed XML document as a string.
    """
    bpmn_ids = data['BPMN_id']
    links = data['links']

    root = ET.Element('methodAndStyleWizard')
    ET.SubElement(root, 'modelName').text = 'My Diagram'
    ET.SubElement(root, 'author').text = 'Benjamin'

    # One process name per pool.
    for pool_index in data['pool_dict']:
        ET.SubElement(root, 'processName').text = text_mapping[pool_index]

    ET.SubElement(root, 'processDescription')

    # Start events: event elements whose link has an empty first slot.
    # zip() stops at the shorter list, so ids without a link are skipped.
    for bpmn_id, link in zip(bpmn_ids, links):
        kind = bpmn_id.split('_')[0]
        if kind in _EVENT_TYPES and check_start(link):
            ET.SubElement(root, 'startEvent', attrib={'name': text_mapping[bpmn_id], 'eventType': _EVENT_TYPES[kind], 'isRegular': 'False'})

    ET.SubElement(root, 'requestMessage')
    ET.SubElement(root, 'requester')

    # End states: event elements whose link has an empty second slot.
    end_states = ET.SubElement(root, 'endStates')
    for bpmn_id, link in zip(bpmn_ids, links):
        if check_end(link) and bpmn_id.split('_')[0] in _EVENT_TYPES:
            if text_mapping[bpmn_id] == '':
                # Stored back into the mapping so later lookups of this
                # event see the same placeholder name.
                text_mapping[bpmn_id] = '(unnamed)'
            ET.SubElement(end_states, 'endState', attrib={'name': text_mapping[bpmn_id], 'eventType': 'None', 'isRegular': 'False'})

    # Activities, each with the end states derived from what follows it.
    activities = ET.SubElement(root, 'activities')
    for idx, bpmn_id in enumerate(bpmn_ids):
        if not bpmn_id.startswith('task'):
            continue
        activity = ET.SubElement(activities, 'activity', attrib={'name': text_mapping.get(bpmn_id, bpmn_id), 'performer': ''})
        activity_end_states = ET.SubElement(activity, 'endStates')
        _, next_text, next_id = _connect_triple(data, text_mapping, idx)
        if next_text is not None:
            if len(next_text) == 1:
                ET.SubElement(activity_end_states, 'endState', attrib={'name': next_text[0], 'isRegular': 'True'})
            elif len(next_text) >= 2 and next_id.split('_')[0] == 'exclusiveGateway':
                # First branch is the regular outcome, the others are not.
                ET.SubElement(activity_end_states, 'endState', attrib={'name': next_text[0], 'isRegular': 'True'})
                for branch_text in next_text[1:]:
                    ET.SubElement(activity_end_states, 'endState', attrib={'name': branch_text, 'isRegular': 'False'})
        ET.SubElement(activity, 'subActivities')
        ET.SubElement(activity, 'subActivityFlows')
        ET.SubElement(activity, 'messageFlows')

    merge_object = find_merge(bpmn_ids, links)

    activity_flows = ET.SubElement(root, 'activityFlows')
    # The loop index is never modified inside the body: enumerate() already
    # advances it, and changing it mid-iteration would pair a link with the
    # wrong identifier.
    for idx, link in enumerate(links):
        if link[1] is None:
            continue
        kind = bpmn_ids[idx].split('_')[0]

        if link[0] is None:
            # Flow leaving a start event.
            if kind not in _EVENT_TYPES:
                continue
            current_text, next_text, _ = _connect_triple(data, text_mapping, idx)
            if current_text is None or not next_text:
                # Nothing to point at (lookup failed or no target found).
                continue
            ET.SubElement(activity_flows, 'activityFlow', attrib={'startEvent': current_text, 'endState': '---', 'target': next_text[0], 'isMerging': 'False', 'isPredefined': 'True'})
            continue

        # Flow leaving a task.
        if kind != 'task':
            continue
        current_text, next_text, next_id = _connect_triple(data, text_mapping, idx)
        if current_text is None or next_text is None:
            continue
        merge = 'True' if merge_object[idx] else 'False'
        next_kind = next_id.split('_')[0]

        if len(next_text) == 2 and next_kind == 'exclusiveGateway':
            ET.SubElement(activity_flows, 'activityFlow', attrib={'activity': current_text, 'endState': next_text[0]+'?', 'target': next_text[0], 'isMerging': 'False', 'isPredefined': 'True'})
            ET.SubElement(activity_flows, 'activityFlow', attrib={'activity': current_text, 'endState': '---', 'target': next_text[1], 'isMerging': 'False', 'isPredefined': 'True'})
        elif len(next_text) > 1 and next_kind == 'parallelGateway':
            for target_text in next_text:
                ET.SubElement(activity_flows, 'activityFlow', attrib={'activity': current_text, 'endState': '---', 'target': target_text, 'isMerging': merge, 'isPredefined': 'True'})
        elif len(next_text) == 1:
            ET.SubElement(activity_flows, 'activityFlow', attrib={'activity': current_text, 'endState': '---', 'target': next_text[0], 'isMerging': merge, 'isPredefined': 'True'})
        else:
            # NOTE(review): next_text is a list here (empty, or three or more
            # exclusive-gateway branches), so 'target' is written as the
            # list's text form. Kept as-is; the intended output for these
            # cases needs to be confirmed.
            ET.SubElement(activity_flows, 'activityFlow', attrib={'activity': current_text, 'endState': '---', 'target': next_text, 'isMerging': merge, 'isPredefined': 'True'})

    ET.SubElement(root, 'participants')

    # Pretty print the XML
    xml_str = ET.tostring(root, encoding='utf-8', method='xml')
    return minidom.parseString(xml_str).toprettyxml(indent=" ")