"""Streamlit page used to design a PR-ENT codebook and to label event descriptions.

The page lets the user declare event types, run PR-ENT on an event description,
attach the resulting filled templates to event types, accept/reject the resulting
classification, and follow the accuracy of the codebook on the labeled samples.
"""
import datetime
import hashlib
import json
import os
import sys

import pandas as pd
import streamlit as st

# Make the parent directory importable so that ``helpers`` can be found.
current = os.path.dirname(os.path.realpath(__file__))
parent = os.path.dirname(current)
sys.path.append(parent)

from helpers import (  # noqa: E402  (must come after the sys.path tweak above)
    apply_style,
    callback_add_to_multiselect,
    choose_text_menu,
    do_prent,
    find_event_types,
    get_additional_words,
    get_idx_column,
    get_nli_limit,
    get_num_sentences_in_list_text,
    get_top_k,
    initiate_widget_st_state,
    run_prent,
)

# Set constant values
TOP_K = get_top_k()
NLI_LIMIT = get_nli_limit()

### Styling
# Needs to be done first
apply_style()

# Avoid having ellipsis in the multi select options
styl = """ """
st.markdown(styl, unsafe_allow_html=True)

# Set color of multiselect to red
st.markdown(
    """ """,
    unsafe_allow_html=True,
)

# Options shared by the widgets below.
_TEXT_MODES = ["Manual", "From CSV"]
_DECISIONS = ["Ignore", "Accept", "Reject"]
_RELATIONS = ["AND", "OR"]

# Maps (is a true event type, is a predicted event type) to the confusion-matrix cell.
_CONFUSION_CELL = {
    (True, True): "TP",
    (True, False): "FN",
    (False, True): "FP",
    (False, False): "TN",
}

_MANUAL_MARKDOWN = """
#### Manual:
1. Set the list of possible event types
2. Select the input mode of the data (Manual or CSV)
3. If the codebook is empty, write a default template
    - `This event involves [Z].` is a good starting point
4. Write/Select an event description
5. Run PR-ENT
6. Check the event type classification
    - If it is correct then select Accept and return to step 4.
    - If it is wrong then select Reject and populate the codebook with the appropriate filled templates. The classification is updated for each change, when it is correct, click Accept.
7. Return to step 4

#### Tips & Tricks:
- If you start a codebook from scratch, it may be easier to pass a manual text example for each event type to get a first codebook draft
- Current codebook accuracy based on labeled data can be found in the top right
- The approach does not aim for perfect accuracy and some failures can happen, e.g. some event descriptions can produce filled templates that are not satisfactory.
"""

_TEXT_MODE_HELP = """
Choose the text input of the event descriptions. Two choices:
- Manual: One event description can be manually input
- From CSV: If a CSV of event descriptions was provided
"""

_POPULATE_CODEBOOK_MARKDOWN = """
## Set the filled template to each class.
For each class you can select one or more filled templates. When the evaluation will be made, these templates will be compared with the results of PR-ENT.

There are 4 options:
- ALL: If **ALL** of these filled templates are present in the results of PR-ENT then this event type is correct
- ANY: If **ANY** of these filled templates is present in the results of PR-ENT then this event type is correct
- NOT ALL: If **ALL** of these filled templates are present in the results of PR-ENT, then this event type is **not** correct
    - e.g. You may want to remove all *explosions* events from a class *Killings*.
- NOT ANY: If **ANY** of these filled templates is present in the results of PR-ENT, then this event type is **not** correct

Moreover, **ANY/ALL** and **NOT ANY/ NOT ALL** can be made in relation by a **AND / OR** condition.
"""


def validated_metric_per_event_types(validated_dataset):
    """Compute the accuracy metrics of the validated dataset for each event type.

    Only samples whose decision is ``"Accept"`` are counted. For each of them and
    for every event type of the codebook, the sample is counted as True Positive,
    False Negative, False Positive or True Negative by comparing the event types
    stored with the sample against the ones derived from its filled templates.

    :param validated_dataset: Dictionary containing results of PRENT validated by the user
    :type validated_dataset: dict
    :return: Dictionary mapping each event type to its ``TP``/``FN``/``FP``/``TN``
        counts and ``Accuracy``; empty if no sample was accepted
    :rtype: dict
    """
    event_types = list(st.session_state.codebook["events"].keys())
    dict_acc = {}

    for val in validated_dataset.values():
        # Compute only accuracy for accepted samples
        if val["decision"] != "Accept":
            continue

        # Compute the event types based on the computed templates of PRENT
        pred_event_types = find_event_types(
            st.session_state.codebook, val["filled_templates"]
        )
        true_event_types = val["event_types"]

        for event_type in event_types:
            counts = dict_acc.setdefault(
                event_type, {"TP": 0, "FN": 0, "FP": 0, "TN": 0}
            )
            cell = _CONFUSION_CELL[
                (event_type in true_event_types, event_type in pred_event_types)
            ]
            counts[cell] += 1

    # Normalize metrics. Every entry was created while counting an accepted
    # sample, so the denominator is always at least one.
    for counts in dict_acc.values():
        total = counts["TP"] + counts["TN"] + counts["FP"] + counts["FN"]
        counts["Accuracy"] = (counts["TP"] + counts["TN"]) / total

    return dict_acc


def store_validated_data(
    text,
    decision,
    text_idx,
    templates,
    additional_words,
    list_event_type,
    prent_params=(TOP_K, NLI_LIMIT),
    filled_templates=None,
):
    """Store the results of PRENT for one text in the Streamlit session state.

    The entry is written to ``st.session_state["validated_data"]`` under the CSV
    index of the text, or under ``manual_<8 digit hash>`` for a manual text.

    :param text: Event description
    :type text: string
    :param decision: Decision of the user (Accept/Reject/Ignore)
    :type decision: string
    :param text_idx: Index of the event, ``None`` for a manual text
    :type text_idx: int
    :param templates: List of template used
    :type templates: list
    :param additional_words: List of additional words used
    :type additional_words: list
    :param list_event_type: List of event type found by PRENT and Codebook
    :type list_event_type: list
    :param prent_params: Parameters of PRENT, defaults to (TOP_K, NLI_LIMIT)
    :type prent_params: tuple, optional
    :param filled_templates: Filled templates produced by PRENT for this text. When
        omitted, the module-level ``list_filled_templates`` is used (the historical
        behaviour), or an empty list if it is not defined.
    :type filled_templates: list, optional
    """
    if "validated_data" not in st.session_state:
        st.session_state["validated_data"] = {}

    if filled_templates is None:
        filled_templates = globals().get("list_filled_templates", [])

    # Generate an index if the text is not coming from a csv.
    # ``is None`` on purpose: row index 0 is a perfectly valid CSV index.
    if text_idx is None:
        # Create a hash of 8 digits of the text to put as index
        digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
        data_idx = "manual_{}".format(int(digest, 16) % 10**8)
    else:
        data_idx = str(text_idx)

    entry = st.session_state["validated_data"].setdefault(data_idx, {})
    entry["text"] = text
    entry["templates"] = [template.replace("{}", "[Z]") for template in templates]
    entry["additional_words"] = additional_words
    entry["event_types"] = list_event_type
    entry["filled_templates"] = list(filled_templates)
    entry["decision"] = decision
    entry["prent_params"] = prent_params


def reset_computation_results():
    """Reset cached values in session state related to computations"""
    st.session_state.res = {}
    st.session_state.recompute_all_templates = True
    st.session_state["accept_reject_text_perm"] = "Ignore"
    st.session_state.rerun = True


def get_all_filled_templates(results):
    """Create the filled templates from PRENT results.

    Merging template with mask with the entailed tokens.

    :param results: Dictionary containing PRENT results; ``None`` or an empty
        dictionary (no computation done yet) yields an empty list
    :type results: dict
    :return: List of all entailed templates
    :rtype: list
    """
    if not results:
        return []

    filled_templates = []
    for template in results.get("templates_used", []):
        # Results are keyed by the ``str.format`` version of the template.
        prompt = template.replace("[Z]", "{}")
        filled_templates.extend(prompt.format(token) for token in results[prompt])
    return filled_templates


def load_demo(
    codebook_path="codebook_demo.json",
    validated_data_path="validated_data_demo.json",
    csv_data_path="data_demo.csv",
):
    """Load demonstration files from disk

    :param codebook_path: path to codebook, defaults to "codebook_demo.json"
    :type codebook_path: str, optional
    :param validated_data_path: path to validated dataset, defaults to "validated_data_demo.json"
    :type validated_data_path: str, optional
    :param csv_data_path: path to raw data, defaults to "data_demo.csv"
    :type csv_data_path: str, optional
    """
    with open(codebook_path, encoding="utf-8") as codebook_file:
        st.session_state.codebook = json.load(codebook_file)
    with open(validated_data_path, encoding="utf-8") as validated_file:
        st.session_state.validated_data = json.load(validated_file)

    st.session_state.data = pd.read_csv(csv_data_path, delimiter=";")
    st.session_state.filtered_df = st.session_state.data
    st.session_state.text_column_design_perm = "Event Descriptions"
    st.session_state["multiselect_classes"] = list(
        st.session_state.codebook["events"].keys()
    )

    demo_text = "On 23 August, a group attacked a village, abducting 6 people."
    st.session_state.text_idx = 0
    st.session_state.text = demo_text
    st.session_state.text_display = demo_text
    st.session_state["text_options_valid_perm"] = "From CSV"
    st.session_state["text_options_valid"] = "From CSV"


def clear_all():
    """Clear the whole session state and rerun the page"""
    # Snapshot the keys first: deleting while iterating the state is unsafe.
    for key in list(st.session_state.keys()):
        del st.session_state[key]
    st.experimental_rerun()


def _timestamp_now():
    """Return the current local time formatted for the "Saved on" labels."""
    return datetime.datetime.now().astimezone().strftime("%Y-%m-%d %H:%M:%S %z")


def update_codebook_save_time():
    """Remember when the codebook was last downloaded."""
    st.session_state.save_codebook_time = _timestamp_now()


def update_validated_save_time():
    """Remember when the labeled data was last downloaded."""
    st.session_state.save_validated_time = _timestamp_now()


def callback_radio_text_choice():
    """Forget the current text when the text input mode changes."""
    st.session_state.text = ""
    st.session_state.text_display = ""


def callback_function(mod, key):
    """Copy the value of the widget ``key`` into the permanent state entry ``mod``."""
    st.session_state[mod] = st.session_state[key]


def declare_ms_event_templates(
    widget_key, widget_display, codebook_key, event_type, candidate_templates
):
    """Show a multiselect bound to one filled-template list of an event type.

    The selection is written back to
    ``codebook["events"][event_type][codebook_key]``.

    :param widget_key: Session-state key of the multiselect
    :type widget_key: str
    :param widget_display: Label of the multiselect
    :type widget_display: str
    :param codebook_key: Key of the list in the event type entry (e.g. ``"all"``)
    :type codebook_key: str
    :param event_type: Event type being edited
    :type event_type: str
    :param candidate_templates: Filled templates of the current PRENT results
    :type candidate_templates: list
    """
    event_entry = st.session_state.codebook["events"][event_type]
    stored = event_entry.setdefault(codebook_key, [])
    if widget_key not in st.session_state:
        st.session_state[widget_key] = stored

    # Sorted so the options keep a stable order from one run to the next.
    options = sorted(set(candidate_templates + stored))
    event_entry[codebook_key] = st.multiselect(
        widget_display,
        options,
        st.session_state[widget_key],
        key=widget_key,
    )


def _declare_relation_selectbox(event_type, codebook_key, widget_key):
    """Show the AND/OR selectbox stored under ``codebook_key`` of an event type."""
    event_entry = st.session_state.codebook["events"][event_type]
    event_entry[codebook_key] = st.selectbox(
        "Relation",
        _RELATIONS,
        index=get_idx_column(event_entry.setdefault(codebook_key, "OR"), _RELATIONS),
        key=widget_key,
    )


def _render_accuracy_row(placeholder, acc_df):
    """Show the ``Accuracy`` row of ``acc_df`` in ``placeholder``.

    The placeholder is cleared when there is nothing to show (no accepted
    sample yet), instead of slicing a frame that has no ``Accuracy`` row.
    """
    if acc_df.empty or "Accuracy" not in acc_df.index:
        placeholder.empty()
        return
    placeholder.table(acc_df.loc["Accuracy":"Accuracy"].style.format("{:.2}"))


def _recompute_missing_templates(validated_data, prog_bar):
    """Run PRENT on labeled samples for the codebook templates they are missing.

    This happens when templates are added a posteriori. A sample is recomputed
    only when it lacks at least one codebook template; additional words alone do
    not trigger a recomputation, but the missing ones are passed along when a
    recomputation happens.

    :param validated_data: Labeled samples, updated in place
    :type validated_data: dict
    :param prog_bar: Streamlit progress element updated after each sample
    """
    codebook_templates = st.session_state.codebook.get("templates", [])
    codebook_add_words = st.session_state.codebook.get("add_words", [])
    total = len(validated_data)

    for position, datapoint in enumerate(validated_data.values(), start=1):
        # Recomputed from scratch for every sample so nothing leaks from the
        # previous iteration.
        known_templates = set(datapoint["templates"])
        missing_templates = [
            template
            for template in codebook_templates
            if template not in known_templates
        ]
        known_words = set(datapoint.get("additional_words") or [])
        missing_add_words = [
            word for word in codebook_add_words if word not in known_words
        ] or None

        if missing_templates:
            res, _ = run_prent(
                datapoint["text"],
                missing_templates,
                missing_add_words,
                progress=False,
            )
            datapoint["filled_templates"].extend(get_all_filled_templates(res))
            datapoint["templates"].extend(missing_templates)

        prog_bar.progress(position / total)


### Initialize session state variables
if "codebook" not in st.session_state:
    st.session_state.codebook = {}
# Also covers a codebook loaded from disk that lacks one of the sections.
st.session_state.codebook.setdefault("events", {})
st.session_state.codebook.setdefault("templates", [])

if "validated_data" not in st.session_state:
    st.session_state["validated_data"] = {}

for _state_key, _state_default in (
    ("text", ""),
    ("text_display", ""),
    ("text_idx", None),
    ("res", None),
    ("accept_reject_text_perm", None),
    ("time_comput", 20),
    ("rerun", False),
    ("recompute_all_templates", False),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default

# Split streamlit dashboard
col_intro_left, col_intro_righter = st.columns([8, 8])

with col_intro_left:
    st.markdown(""" # Codebook Design """)

# Add two buttons in the sidebar to load and clear the demo
with st.sidebar:
    if st.button("Load Demo"):
        load_demo()
    if st.button("Clear Demo"):
        clear_all()
    st.write("********")

with st.sidebar:
    if st.download_button(
        label="Download codebook as JSON",
        data=json.dumps(st.session_state.codebook, indent=3).encode("ASCII"),
        file_name="codebook.json",
        mime="application/json",
    ):
        update_codebook_save_time()
    if "save_codebook_time" in st.session_state:
        st.write("Saved on: " + st.session_state.save_codebook_time)

with st.sidebar:
    if st.download_button(
        label="Download labeled data",
        data=json.dumps(st.session_state["validated_data"], indent=3).encode("ASCII"),
        file_name="validated_data.json",
        mime="application/json",
    ):
        update_validated_save_time()
    if "save_validated_time" in st.session_state:
        st.write("Saved on: " + st.session_state.save_validated_time)

# Add text to sidebar
with st.sidebar:
    st.write("********")
    st.markdown(_MANUAL_MARKDOWN)

# Add accuracy table
with col_intro_righter:
    accuracy = st.empty()
    # We fill the table with the last acc to avoid having it disappearing each time
    if "acc_df" in st.session_state:
        _render_accuracy_row(accuracy, st.session_state.acc_df)
    performance_container = st.expander("Detailed Performances")

st.write("*********")
col_left, col_right = st.columns(2)

# Add widgets to add event type and choose text input
with col_intro_left:
    with st.expander("Event Types List"):
        st.markdown(""" ## Select Event Types. """)
        if "class_list_perm" not in st.session_state:
            st.session_state["class_list_perm"] = []

        # Text field + button to add new event types to multiselect
        new_class = st.text_input(
            "Add a new event type", "", key="new_class_text_input"
        )
        st.button(
            "Add Class",
            on_click=callback_add_to_multiselect,
            args=(
                new_class,
                "multiselect_classes",
                "new_class_text_input",
                "class_list_perm",
            ),
        )

        # Multiselect to choose event types
        if "multiselect_classes" not in st.session_state:
            st.session_state["multiselect_classes"] = list(
                st.session_state.codebook["events"].keys()
            )
        class_list = st.multiselect(
            "Event Type List",
            sorted(
                set(
                    st.session_state["class_list_perm"]
                    + list(st.session_state.codebook["events"].keys())
                )
            ),
            st.session_state["multiselect_classes"],
            key="multiselect_classes",
        )
        st.session_state["class_list_perm"] = class_list

    with st.expander("Select Text Input Mode (Manual, CSV)"):
        st.write(_TEXT_MODE_HELP)

        initiate_widget_st_state(
            "text_options_valid", "text_options_valid_perm", "Manual"
        )
        st.session_state["text_options_valid_perm"] = st.radio(
            "Choose text input",
            _TEXT_MODES,
            index=get_idx_column(st.session_state["text_options_valid"], _TEXT_MODES),
            key="text_options_valid",
            on_change=callback_radio_text_choice,
            horizontal=True,
        )

with col_left:
    if st.session_state["text_options_valid_perm"] == "Manual":
        text = choose_text_menu("")
        # Reset all computations if text has changed
        if text != st.session_state.text:
            reset_computation_results()
        # A manually typed text never has a CSV index.
        st.session_state.text_idx = None
        st.session_state.text = text
        st.session_state.text_display = text

    elif st.session_state["text_options_valid_perm"] == "From CSV":
        if st.button("Select Random Text"):
            if "filtered_df" not in st.session_state:
                st.warning("No CSV data loaded.")
            else:
                sample = st.session_state.filtered_df.sample(n=1).iloc[0]
                text = sample[st.session_state["text_column_design_perm"]]
                idx = sample.name
                if text != st.session_state.text:
                    reset_computation_results()
                st.session_state.text = text
                st.session_state.text_idx = idx
        st.session_state.text_display = st.session_state.text

    expected_time = st.session_state.time_comput * get_num_sentences_in_list_text(
        [st.session_state.text]
    )
    if st.button("Run PR-ENT / Expected time: {}sec".format(expected_time)):
        templates = st.session_state.codebook.get("templates", [])
        if not templates:
            st.warning("No template in codebook. Please add one.")
        additional_words = get_additional_words()
        st.session_state.res = {}
        res, time_comput = run_prent(st.session_state.text, templates, additional_words)
        st.session_state.res = res

    st.write("**Event Descriptions:** {}".format(st.session_state.text_display))
    ev_desc = st.empty()
    radio_empty = st.empty()

    if st.session_state.res:
        list_filled_templates = get_all_filled_templates(st.session_state.res)
        list_event_type = find_event_types(
            st.session_state.codebook, list_filled_templates
        )
        event_type_text = ev_desc.markdown(
            "**Current Event Types Classification**: {}".format(
                "; ".join(list_event_type)
            )
        )

        if "accept_reject_text_perm" not in st.session_state:
            st.session_state["accept_reject_text_perm"] = "Ignore"

        radio_empty.radio(
            "Accept or Reject Coding",
            _DECISIONS,
            key="accept_reject_text",
            on_change=callback_function,
            args=(
                "accept_reject_text_perm",
                "accept_reject_text",
            ),
            index=get_idx_column(
                st.session_state["accept_reject_text_perm"],
                _DECISIONS,
            ),
            horizontal=True,
        )

        decision = st.session_state["accept_reject_text_perm"]
        text_idx = st.session_state.text_idx
        text = st.session_state.text
        store_validated_data(
            text,
            decision,
            text_idx,
            st.session_state.res["templates_used"],
            st.session_state.res["additional_words_used"],
            list_event_type,
            prent_params=(TOP_K, NLI_LIMIT),
            filled_templates=list_filled_templates,
        )

with col_right:
    if (
        st.session_state["accept_reject_text_perm"] == "Reject"
    ) or not st.session_state.codebook["templates"]:
        with st.expander("Add Templates + Explanation"):
            st.markdown(""" ## Add Templates """)
            st.markdown(
                """ For each template added. PR-ENT will be run on the selected text. """
            )
            if "templates" not in st.session_state.codebook:
                st.session_state.codebook["templates"] = []

            template = st.text_input(
                "Template with a mask [Z].", "This event involves [Z]."
            )
            if st.button("Add template"):
                if template not in st.session_state.codebook["templates"]:
                    ## Add template to codebook
                    st.session_state.codebook["templates"].append(template)
                    additional_words = get_additional_words()
                    prompt = template.replace("[Z]", "{}")
                    results_nli, _ = do_prent(
                        st.session_state.text,
                        prompt,
                        TOP_K,
                        NLI_LIMIT,
                        additional_words,
                    )
                    tokens_nli = [x[0] for x in results_nli]

                    # Update result table with new template
                    if not st.session_state["res"]:
                        st.session_state.res = {}
                        st.session_state.res["additional_words_used"] = additional_words
                        st.session_state.res["templates_used"] = []
                    st.session_state.res[prompt] = tokens_nli
                    st.session_state.res["templates_used"].append(template)
                    st.write("Template '{}' added.".format(template))
                else:
                    st.write("Template '{}' already added.".format(template))

    if st.session_state.codebook["templates"]:
        with st.expander("Populate Codebook Explanation"):
            st.markdown(_POPULATE_CODEBOOK_MARKDOWN)

        st.write("***************")
        st.write("### Populate Codebook")
        if not class_list:
            st.warning("No event type in codebook.")

        tokens_list = get_all_filled_templates(st.session_state.res)
        for event_type in class_list:
            st.session_state.codebook["events"].setdefault(event_type, {})
            with st.expander(event_type):
                declare_ms_event_templates(
                    "ms_all_{}".format(event_type),
                    "ALL",
                    "all",
                    event_type,
                    tokens_list,
                )
                _declare_relation_selectbox(
                    event_type,
                    "all_any_rel",
                    "select_relation_any_all_{}".format(event_type),
                )
                declare_ms_event_templates(
                    "ms_any_{}".format(event_type),
                    "ANY",
                    "any",
                    event_type,
                    tokens_list,
                )
                declare_ms_event_templates(
                    "ms_not_all_{}".format(event_type),
                    "NOT ALL",
                    "not_all",
                    event_type,
                    tokens_list,
                )
                _declare_relation_selectbox(
                    event_type,
                    "not_all_any_rel",
                    "select_relation_not_any_all_{}".format(event_type),
                )
                declare_ms_event_templates(
                    "ms_not_any_{}".format(event_type),
                    "NOT ANY",
                    "not_any",
                    event_type,
                    tokens_list,
                )

# Workaround to avoid the expanders closing after first modification
# I have no explanation for the bug
if st.session_state.rerun:
    st.session_state.rerun = False
    st.experimental_rerun()

if "validated_data" in st.session_state:
    performance_container.markdown(
        "If a new template is added, the previous labeled samples needs to be recomputed with it. The next button allows that, however it can take some time depending on the number of samples."
    )
    if performance_container.button(
        "Recompute Missing Templates", key="recompute_temp"
    ):
        prog_bar = performance_container.progress(0)
        _recompute_missing_templates(st.session_state["validated_data"], prog_bar)

    st.session_state.acc_df = pd.DataFrame(
        validated_metric_per_event_types(st.session_state["validated_data"])
    )
    _render_accuracy_row(accuracy, st.session_state.acc_df)
    performance_container.markdown("### Performances on labeled dataset")
    performance_container.dataframe(st.session_state.acc_df.style.format("{:.3}"))

# Refresh the classification: the codebook may have been edited further down
# the page after it was first displayed.
if st.session_state.res:
    list_filled_templates = get_all_filled_templates(st.session_state.res)
    list_event_type = find_event_types(st.session_state.codebook, list_filled_templates)
    ev_desc.markdown(
        "**Current Event Types Classification**: {}".format("; ".join(list_event_type))
    )