"""Assign BPMN element ids and export a diagram as ``methodAndStyleWizard`` XML."""

import xml.etree.ElementTree as ET
from collections import Counter
from xml.dom import minidom

from modules.utils import class_dict, error

# Maps a class name (a value of ``class_dict``) to the prefix of the id that
# ``create_BPMN_id`` generates for it. Classes that share a prefix share one
# counter ('task' and 'dataObject' are numbered together).
_ID_PREFIX_BY_CLASS = {
    'task': 'task',
    'dataObject': 'task',
    'sequenceFlow': 'sequenceFlow',
    'messageFlow': 'messageFlow',
    'messageEvent': 'message_event',
    'exclusiveGateway': 'exclusiveGateway',
    'parallelGateway': 'parallelGateway',
    'dataAssociation': 'dataAssociation',
    'pool': 'pool',
}

_EVENT_KINDS = ('event', 'messageEvent')
_GATEWAY_KINDS = ('exclusiveGateway', 'parallelGateway')

_CONNECT_ERROR = 'There is an error with the Vizi file, care when you download it.'


def _kind(bpmn_id):
    """Return the part of *bpmn_id* that precedes its first underscore."""
    return bpmn_id.split('_')[0]


def rescale(scale, boxes):
    """Multiply the first four values of every box by *scale*, in place.

    Args:
        scale: Factor applied to each of the four values.
        boxes: Mutable sequence of boxes, each holding at least four values.
            Every entry is replaced by a new four-element list.

    Returns:
        The same ``boxes`` object, after modification.
    """
    for i, box in enumerate(boxes):
        boxes[i] = [box[0] * scale, box[1] * scale, box[2] * scale, box[3] * scale]
    return boxes


def create_BPMN_id(data):
    """Write numbered ids into ``data['BPMN_id']`` based on ``data['labels']``.

    Each label is translated through ``class_dict``; the resulting class name
    selects an id prefix and a 1-based counter. Events are named
    ``end_event_N`` when they only have a first link and ``start_event_N``
    when they only have a second link; events with both or neither keep the id
    they already had, as do classes without a known prefix.

    Args:
        data: Mapping with ``'labels'``, ``'links'`` and ``'BPMN_id'``
            sequences indexed by element. ``data['BPMN_id']`` is modified in
            place.

    Returns:
        The same ``data`` object.
    """
    class_names = [class_dict[label] for label in data['labels']]
    counters = {}

    def next_id(prefix):
        counters[prefix] = counters.get(prefix, 0) + 1
        return f'{prefix}_{counters[prefix]}'

    for idx, class_name in enumerate(class_names):
        if class_name == 'event':
            first, second = data['links'][idx][0], data['links'][idx][1]
            if first is not None and second is None:
                data['BPMN_id'][idx] = next_id('end_event')
            elif first is None and second is not None:
                data['BPMN_id'][idx] = next_id('start_event')
            continue

        # 'dataAssociation' now uses its own counter; it previously printed
        # the sequence-flow counter while incrementing a different one.
        prefix = _ID_PREFIX_BY_CLASS.get(class_name)
        if prefix is not None:
            data['BPMN_id'][idx] = next_id(prefix)
    return data


def check_end(link):
    """Return True when the second entry of *link* is None."""
    return link[1] is None


def connect(data, text_mapping, i):
    """Resolve the text of element *i* and of the element(s) that follow it.

    ``data['links'][i][1]`` is followed to an intermediate entry, whose own
    second link gives the next element. When that next element is an exclusive
    or parallel gateway, the texts of every element reached from a link whose
    first entry is the gateway are collected instead of the gateway's text.

    Args:
        data: Mapping with ``'links'`` and ``'BPMN_id'`` sequences.
        text_mapping: Mapping from BPMN id to display text.
        i: Index of the element to start from.

    Returns:
        ``(current_text, next_text, next_id)`` where ``next_text`` is a list
        of texts. When the chain cannot be followed, ``error`` is called and
        ``(None, None, None)`` is returned so that callers can always unpack
        three values.
    """
    links = data['links']
    flow_idx = links[i][1]
    if flow_idx is None or flow_idx >= len(links):
        error(_CONNECT_ERROR)
        return None, None, None

    next_idx = links[flow_idx][1]
    if next_idx is None:
        # The intermediate entry leads nowhere; report it like the other
        # broken-link case instead of failing on ``data['BPMN_id'][None]``.
        error(_CONNECT_ERROR)
        return None, None, None

    current_text = text_mapping[data['BPMN_id'][i]]
    next_id = data['BPMN_id'][next_idx]

    if _kind(next_id) in _GATEWAY_KINDS:
        next_text = [
            text_mapping[data['BPMN_id'][link[1]]]
            for link in links
            if link[0] == next_idx and link[1] is not None
        ]
    else:
        next_text = [text_mapping[next_id]]
    return current_text, next_text, next_id


def check_start(val):
    """Return True when the first entry of *val* is None."""
    return val[0] is None


def find_merge(bpmn_id, links):
    """Flag entries whose successor is a parallel gateway shared with others.

    For every entry of *links* the successor is looked up two hops away
    (``links[link[1]][1]``). An entry is flagged when that successor's id
    starts with ``parallelGateway`` and at least one other entry resolves to
    the same gateway id.

    Args:
        bpmn_id: Sequence of BPMN ids, indexed like *links*.
        links: Sequence of two-item links.

    Returns:
        A list of booleans with one item per entry of *links*.
    """
    gateways = []
    for link in links:
        flow_idx = link[1]
        successor = None
        # Out-of-range indices are treated like missing links, matching the
        # bounds check performed by ``connect``.
        if flow_idx is not None and flow_idx < len(links):
            successor = links[flow_idx][1]
        if successor is not None and _kind(bpmn_id[successor]) == 'parallelGateway':
            gateways.append(bpmn_id[successor])
        else:
            gateways.append(None)

    occurrences = Counter(gateway for gateway in gateways if gateway is not None)
    return [gateway is not None and occurrences[gateway] > 1 for gateway in gateways]


def _append_activities(root, data, text_mapping):
    """Add the ``activities`` element with one ``activity`` per task id."""
    activities = ET.SubElement(root, 'activities')
    link_count = len(data['links'])

    for idx, bpmn_id in enumerate(data['BPMN_id']):
        if not bpmn_id.startswith('task'):
            continue

        activity = ET.SubElement(
            activities,
            'activity',
            attrib={'name': text_mapping.get(bpmn_id, bpmn_id), 'performer': ''},
        )
        end_states = ET.SubElement(activity, 'endStates')

        next_text, next_id = None, None
        # Elements without a links entry are skipped by the other loops of
        # ``create_wizard_file``; here they simply get no end state.
        if idx < link_count:
            _, next_text, next_id = connect(data, text_mapping, idx)

        # A single successor is the regular end state. Behind an exclusive
        # gateway the first branch is regular and the others are not.
        if next_text and (len(next_text) == 1 or _kind(next_id) == 'exclusiveGateway'):
            for position, name in enumerate(next_text):
                ET.SubElement(
                    end_states,
                    'endState',
                    attrib={'name': name, 'isRegular': 'True' if position == 0 else 'False'},
                )

        ET.SubElement(activity, 'subActivities')
        ET.SubElement(activity, 'subActivityFlows')
        ET.SubElement(activity, 'messageFlows')


def _append_activity_flows(root, data, text_mapping):
    """Add the ``activityFlows`` element for start events and tasks."""
    is_merging = find_merge(data['BPMN_id'], data['links'])
    flows = ET.SubElement(root, 'activityFlows')

    def add_flow(source_key, source, end_state, target, merging):
        ET.SubElement(
            flows,
            'activityFlow',
            attrib={
                source_key: source,
                'endState': end_state,
                'target': target,
                'isMerging': merging,
                'isPredefined': 'True',
            },
        )

    for i, link in enumerate(data['links']):
        if link[1] is None:
            continue
        kind = _kind(data['BPMN_id'][i])

        if link[0] is None:
            # Flow leaving a start event.
            if kind not in _EVENT_KINDS:
                continue
            current_text, next_text, _ = connect(data, text_mapping, i)
            if current_text is None or not next_text:
                continue
            add_flow('startEvent', current_text, '---', next_text[0], 'False')
            continue

        # Flow leaving a task.
        if kind != 'task':
            continue
        current_text, next_text, next_id = connect(data, text_mapping, i)
        if current_text is None or next_text is None:
            continue

        merging = 'True' if is_merging[i] else 'False'
        if len(next_text) == 2 and _kind(next_id) == 'exclusiveGateway':
            add_flow('activity', current_text, next_text[0] + '?', next_text[0], 'False')
            add_flow('activity', current_text, '---', next_text[1], 'False')
        else:
            # One flow per target. This also covers gateways with zero or
            # more than two branches, which previously wrote the whole
            # Python list into the 'target' attribute.
            for target_text in next_text:
                add_flow('activity', current_text, '---', target_text, merging)


def create_wizard_file(data, text_mapping):
    """Serialise the diagram as a pretty-printed ``methodAndStyleWizard`` XML.

    Args:
        data: Mapping with ``'pool_dict'`` (a dict whose keys are looked up in
            *text_mapping*), ``'BPMN_id'`` and ``'links'``.
        text_mapping: Mapping from id to display text. Empty texts of end
            events are replaced by ``'(unnamed)'`` in place.

    Returns:
        The XML document as a string.
    """
    # NOTE(review): the ids produced by ``create_BPMN_id`` for events start
    # with 'start', 'end' or 'message', which the 'event'/'messageEvent'
    # checks below never match - presumably the ids handled here come from a
    # different generator; confirm against the caller.
    root = ET.Element('methodAndStyleWizard')
    ET.SubElement(root, 'modelName').text = 'My Diagram'
    ET.SubElement(root, 'author').text = 'Benjamin'

    for pool_index in data['pool_dict']:
        ET.SubElement(root, 'processName').text = text_mapping[pool_index]
    ET.SubElement(root, 'processDescription')

    link_count = len(data['links'])

    # Start events: events whose first link is missing.
    for idx, bpmn_id in enumerate(data['BPMN_id']):
        if idx >= link_count:
            continue
        kind = _kind(bpmn_id)
        if kind in _EVENT_KINDS and check_start(data['links'][idx]):
            ET.SubElement(
                root,
                'startEvent',
                attrib={
                    'name': text_mapping[bpmn_id],
                    'eventType': 'Message' if kind == 'messageEvent' else 'None',
                    'isRegular': 'False',
                },
            )

    ET.SubElement(root, 'requestMessage')
    ET.SubElement(root, 'requester')

    # End states: events whose second link is missing.
    end_states = ET.SubElement(root, 'endStates')
    for idx, bpmn_id in enumerate(data['BPMN_id']):
        if idx >= link_count:
            continue
        if check_end(data['links'][idx]) and _kind(bpmn_id) in _EVENT_KINDS:
            # The placeholder is stored back so that later lookups of this id
            # see the same name.
            if text_mapping[bpmn_id] == '':
                text_mapping[bpmn_id] = '(unnamed)'
            ET.SubElement(
                end_states,
                'endState',
                attrib={'name': text_mapping[bpmn_id], 'eventType': 'None', 'isRegular': 'False'},
            )

    _append_activities(root, data, text_mapping)
    _append_activity_flows(root, data, text_mapping)
    ET.SubElement(root, 'participants')

    # Pretty print the XML
    xml_bytes = ET.tostring(root, encoding='utf-8', method='xml')
    return minidom.parseString(xml_bytes).toprettyxml(indent=" ")