"""Streamlit app for Presidio.""" from json import JSONEncoder from typing import List import pandas as pd import spacy import streamlit as st from annotated_text import annotated_text from presidio_analyzer import AnalyzerEngine, RecognizerResult, RecognizerRegistry from presidio_analyzer.nlp_engine import NlpEngineProvider from presidio_anonymizer import AnonymizerEngine from presidio_anonymizer.entities import OperatorConfig from transformers_rec import ( STANFORD_COFIGURATION, TransformersRecognizer, BERT_DEID_CONFIGURATION, ) # Helper methods @st.cache def analyzer_engine(model_path: str): """Return AnalyzerEngine. :param model_path: Which model to use for NER: "StanfordAIMI/stanford-deidentifier-base", "obi/deid_roberta_i2b2", "en_core_web_lg" """ registry = RecognizerRegistry() registry.load_predefined_recognizers() # Set up NLP Engine according to the model of choice if model_path == "en_core_web_lg": if not spacy.util.is_package("en_core_web_lg"): spacy.cli.download("en_core_web_lg") nlp_configuration = { "nlp_engine_name": "spacy", "models": [{"lang_code": "en", "model_name": "en_core_web_lg"}], } else: if not spacy.util.is_package("en_core_web_sm"): spacy.cli.download("en_core_web_sm") # Using a small spaCy model + a HF NER model transformers_recognizer = TransformersRecognizer(model_path=model_path) if model_path == "StanfordAIMI/stanford-deidentifier-base": transformers_recognizer.load_transformer(**STANFORD_COFIGURATION) elif model_path == "obi/deid_roberta_i2b2": transformers_recognizer.load_transformer(**BERT_DEID_CONFIGURATION) # Use small spaCy model, no need for both spacy and HF models # The transformers model is used here as a recognizer, not as an NlpEngine nlp_configuration = { "nlp_engine_name": "spacy", "models": [{"lang_code": "en", "model_name": "en_core_web_sm"}], } registry.add_recognizer(transformers_recognizer) nlp_engine = NlpEngineProvider(nlp_configuration=nlp_configuration).create_engine() analyzer = AnalyzerEngine(nlp_engine=nlp_engine, registry=registry) return analyzer @st.cache def anonymizer_engine(): """Return AnonymizerEngine.""" return AnonymizerEngine() @st.cache def get_supported_entities(): """Return supported entities from the Analyzer Engine.""" return analyzer_engine(st_model).get_supported_entities() @st.cache def analyze(**kwargs): """Analyze input using Analyzer engine and input arguments (kwargs).""" if "entities" not in kwargs or "All" in kwargs["entities"]: kwargs["entities"] = None return analyzer_engine(st_model).analyze(**kwargs) def anonymize(text: str, analyze_results: List[RecognizerResult]): """Anonymize identified input using Presidio Anonymizer. :param text: Full text :param analyze_results: list of results from presidio analyzer engine """ if st_operator == "mask": operator_config = { "type": "mask", "masking_char": st_mask_char, "chars_to_mask": st_number_of_chars, "from_end": False, } elif st_operator == "encrypt": operator_config = {"key": st_encrypt_key} elif st_operator == "highlight": operator_config = {"lambda": lambda x: x} else: operator_config = None if st_operator == "highlight": operator = "custom" else: operator = st_operator res = anonymizer_engine().anonymize( text, analyze_results, operators={"DEFAULT": OperatorConfig(operator, operator_config)}, ) return res def annotate(text: str, analyze_results: List[RecognizerResult]): """ Highlights every identified entity on top of the text. :param text: full text :param analyze_results: list of analyzer results. """ tokens = [] # Use the anonymizer to resolve overlaps results = anonymize(text, analyze_results) # sort by start index results = sorted(results.items, key=lambda x: x.start) for i, res in enumerate(results): if i == 0: tokens.append(text[: res.start]) # append entity text and entity type tokens.append((text[res.start: res.end], res.entity_type)) # if another entity coming i.e. we're not at the last results element, add text up to next entity if i != len(results) - 1: tokens.append(text[res.end: results[i + 1].start]) # if no more entities coming, add all remaining text else: tokens.append(text[res.end:]) return tokens st.set_page_config(page_title="Presidio demo", layout="wide") # Sidebar st.sidebar.header( """ PII De-Identification with Microsoft Presidio """ ) st.sidebar.info( "Presidio is an open source customizable framework for PII detection and de-identification\n" "[Code](https://aka.ms/presidio) | " "[Tutorial](https://microsoft.github.io/presidio/tutorial/) | " "[Installation](https://microsoft.github.io/presidio/installation/) | " "[FAQ](https://microsoft.github.io/presidio/faq/)", icon="ℹ️", ) st.sidebar.markdown( "[![Pypi Downloads](https://img.shields.io/pypi/dm/presidio-analyzer.svg)](https://img.shields.io/pypi/dm/presidio-analyzer.svg)" "[![MIT license](https://img.shields.io/badge/license-MIT-brightgreen.svg)](http://opensource.org/licenses/MIT)" "![GitHub Repo stars](https://img.shields.io/github/stars/microsoft/presidio?style=social)" ) st_model = st.sidebar.selectbox( "NER model", [ "StanfordAIMI/stanford-deidentifier-base", "obi/deid_roberta_i2b2", "en_core_web_lg", ], index=1, ) st.sidebar.markdown("> Note: Models might take some time to download. ") st_operator = st.sidebar.selectbox( "De-identification approach", ["redact", "replace", "mask", "hash", "encrypt", "highlight"], index=1, ) if st_operator == "mask": st_number_of_chars = st.sidebar.number_input( "number of chars", value=15, min_value=0, max_value=100 ) st_mask_char = st.sidebar.text_input("Mask character", value="*", max_chars=1) elif st_operator == "encrypt": st_encrypt_key = st.sidebar.text_input("AES key", value="WmZq4t7w!z%C&F)J") st_threshold = st.sidebar.slider( label="Acceptance threshold", min_value=0.0, max_value=1.0, value=0.35 ) st_return_decision_process = st.sidebar.checkbox( "Add analysis explanations to findings", value=False ) st_entities = st.sidebar.multiselect( label="Which entities to look for?", options=get_supported_entities(), default=list(get_supported_entities()), ) # Main panel analyzer_load_state = st.info("Starting Presidio analyzer...") engine = analyzer_engine(model_path=st_model) analyzer_load_state.empty() # Read default text with open("demo_text.txt") as f: demo_text = f.readlines() # Create two columns for before and after col1, col2 = st.columns(2) # Before: col1.subheader("Input string:") st_text = col1.text_area( label="Enter text", value="".join(demo_text), height=400, ) st_analyze_results = analyze( text=st_text, entities=st_entities, language="en", score_threshold=st_threshold, return_decision_process=st_return_decision_process, ) # After if st_operator != "highlight": with col2: st.subheader(f"Output") st_anonymize_results = anonymize(st_text, st_analyze_results) st.text_area(label="De-identified", value=st_anonymize_results.text, height=400) else: st.subheader("Highlighted") annotated_tokens = annotate(st_text, st_analyze_results) # annotated_tokens annotated_text(*annotated_tokens) # json result class ToDictEncoder(JSONEncoder): """Encode dict to json.""" def default(self, o): """Encode to JSON using to_dict.""" return o.to_dict() # table result st.subheader( "Findings" if not st_return_decision_process else "Findings with decision factors" ) if st_analyze_results: df = pd.DataFrame.from_records([r.to_dict() for r in st_analyze_results]) df["text"] = [st_text[res.start: res.end] for res in st_analyze_results] df_subset = df[["entity_type", "text", "start", "end", "score"]].rename( { "entity_type": "Entity type", "text": "Text", "start": "Start", "end": "End", "score": "Confidence", }, axis=1, ) df_subset["Text"] = [st_text[res.start: res.end] for res in st_analyze_results] if st_return_decision_process: analysis_explanation_df = pd.DataFrame.from_records( [r.analysis_explanation.to_dict() for r in st_analyze_results] ) df_subset = pd.concat([df_subset, analysis_explanation_df], axis=1) st.dataframe(df_subset.reset_index(drop=True), use_container_width=True) else: st.text("No findings")