"""
Helper methods for the Presidio Streamlit app
"""
from typing import List, Optional | |
import spacy | |
import streamlit as st | |
from presidio_analyzer import AnalyzerEngine, RecognizerResult, RecognizerRegistry | |
from presidio_analyzer.nlp_engine import NlpEngineProvider | |
from presidio_anonymizer import AnonymizerEngine | |
from presidio_anonymizer.entities import OperatorConfig | |
from flair_recognizer import FlairRecognizer | |
from openai_fake_data_generator import ( | |
set_openai_key, | |
call_completion_model, | |
create_prompt, | |
) | |
from transformers_rec import ( | |
STANFORD_COFIGURATION, | |
TransformersRecognizer, | |
BERT_DEID_CONFIGURATION, | |
) | |
def analyzer_engine(model_path: str):
    """Return an AnalyzerEngine configured for the requested NER model.

    The NLP engine is always a spaCy pipeline. For ``en_core_web_lg`` the
    predefined recognizers are used as loaded. For every other ``model_path``
    the small ``en_core_web_sm`` pipeline backs the NLP engine, the
    ``SpacyRecognizer`` is removed from the registry, and a Flair or
    transformers recognizer is registered instead.

    :param model_path: Which model to use for NER:
        "StanfordAIMI/stanford-deidentifier-base",
        "obi/deid_roberta_i2b2",
        "flair/ner-english-large",
        "en_core_web_lg"
    :return: AnalyzerEngine built from the selected NLP engine and registry
    """
    registry = RecognizerRegistry()
    registry.load_predefined_recognizers()

    # Only the pure-spaCy option needs the large pipeline; the other options
    # use a small spaCy model alongside a separate NER recognizer.
    if model_path == "en_core_web_lg":
        spacy_model = "en_core_web_lg"
    else:
        spacy_model = "en_core_web_sm"

    # Make sure the spaCy package is installed for every option, including
    # the Flair one (which previously skipped this check).
    if not spacy.util.is_package(spacy_model):
        spacy.cli.download(spacy_model)

    if model_path == "en_core_web_lg":
        # Nothing to swap in the registry for this option.
        pass
    elif model_path == "flair/ner-english-large":
        flair_recognizer = FlairRecognizer()
        registry.add_recognizer(flair_recognizer)
        registry.remove_recognizer("SpacyRecognizer")
    else:
        # The transformers model is used here as a recognizer, not as an
        # NlpEngine, so there is no need for spaCy's own recognizer as well.
        transformers_recognizer = TransformersRecognizer(model_path=model_path)
        registry.remove_recognizer("SpacyRecognizer")
        if model_path == "StanfordAIMI/stanford-deidentifier-base":
            transformers_recognizer.load_transformer(**STANFORD_COFIGURATION)
        elif model_path == "obi/deid_roberta_i2b2":
            transformers_recognizer.load_transformer(**BERT_DEID_CONFIGURATION)
        # NOTE(review): any other model_path is registered without an explicit
        # load_transformer() call - confirm TransformersRecognizer copes.
        registry.add_recognizer(transformers_recognizer)

    nlp_configuration = {
        "nlp_engine_name": "spacy",
        "models": [{"lang_code": "en", "model_name": spacy_model}],
    }
    nlp_engine = NlpEngineProvider(nlp_configuration=nlp_configuration).create_engine()
    return AnalyzerEngine(nlp_engine=nlp_engine, registry=registry)
def anonymizer_engine():
    """Create and return a new AnonymizerEngine instance."""
    engine = AnonymizerEngine()
    return engine
def get_supported_entities(st_model: str):
    """Return the supported entities reported by the Analyzer Engine.

    :param st_model: Model identifier forwarded to ``analyzer_engine``
    """
    engine = analyzer_engine(st_model)
    return engine.get_supported_entities()
def analyze(st_model: str, **kwargs):
    """Analyze input using Analyzer engine and input arguments (kwargs).

    :param st_model: Model identifier forwarded to ``analyzer_engine``
    :param kwargs: Keyword arguments forwarded to the engine's ``analyze``
        call. If ``entities`` is missing, ``None``, or contains "All", it is
        passed on as ``None``.
    :return: Whatever the engine's ``analyze`` call returns
    """
    entities = kwargs.get("entities")
    # An explicit entities=None must not reach the membership test below,
    # which would raise TypeError on None.
    if entities is None or "All" in entities:
        kwargs["entities"] = None
    return analyzer_engine(st_model).analyze(**kwargs)
def anonymize(
    text: str,
    operator: str,
    analyze_results: List[RecognizerResult],
    mask_char: Optional[str] = None,
    number_of_chars: Optional[int] = None,
    encrypt_key: Optional[str] = None,
):
    """Anonymize identified input using Presidio Anonymizer.

    :param text: Full text
    :param operator: Operator name. "highlight" is run as the "custom"
        operator and "synthesize" as "replace"; any other name is passed
        through unchanged.
    :param analyze_results: list of results from presidio analyzer engine
    :param mask_char: Mask char (for mask operator)
    :param number_of_chars: Number of characters to mask (for mask operator)
    :param encrypt_key: Encryption key (for encrypt operator)
    :return: Result of the anonymizer engine's ``anonymize`` call
    """
    # Define operator config
    if operator == "mask":
        operator_config = {
            "type": "mask",
            "masking_char": mask_char,
            "chars_to_mask": number_of_chars,
            "from_end": False,
        }
    elif operator == "encrypt":
        operator_config = {"key": encrypt_key}
    elif operator == "highlight":
        # Identity function: each entity is replaced by itself.
        operator_config = {"lambda": lambda x: x}
    else:
        operator_config = None

    # Translate app-level operator names into the names handed to the
    # anonymizer; names without an alias are used as-is.
    operator_aliases = {"highlight": "custom", "synthesize": "replace"}
    operator_name = operator_aliases.get(operator, operator)

    return anonymizer_engine().anonymize(
        text,
        analyze_results,
        operators={"DEFAULT": OperatorConfig(operator_name, operator_config)},
    )
def annotate(text: str, analyze_results: List[RecognizerResult]):
    """Highlight the identified PII entities on the original text.

    :param text: Full text
    :param analyze_results: list of results from presidio analyzer engine
    :return: list alternating plain-text segments (``str``) with
        ``(entity_text, entity_type)`` tuples, in text order. When no entity
        is found the list holds the whole text as a single segment.
    """
    # Use the anonymizer to resolve overlaps. The "highlight" operator maps
    # each entity to itself, so the returned offsets are presumably still
    # valid for the original text.
    results = anonymize(
        text=text,
        operator="highlight",
        analyze_results=analyze_results,
    )
    # sort by start index
    spans = sorted(results.items, key=lambda item: item.start)

    tokens = []
    cursor = 0  # end offset of the previously emitted entity
    for span in spans:
        # plain text between the previous entity (or text start) and this one
        tokens.append(text[cursor : span.start])
        # entity text and entity type
        tokens.append((text[span.start : span.end], span.entity_type))
        cursor = span.end
    # all remaining text after the last entity (the full text if none found)
    tokens.append(text[cursor:])
    return tokens
def create_fake_data(
    text: str,
    analyze_results: List[RecognizerResult],
    openai_key: str,
    openai_model_name: str,
):
    """Creates a synthetic version of the text using OpenAI APIs.

    :param text: Full text
    :param analyze_results: list of results from presidio analyzer engine
    :param openai_key: OpenAI key; when empty, a message asking for the key
        is returned instead of calling the API
    :param openai_model_name: Model name passed to the completion call
    :return: Output of the completion model, or the "missing key" message
    """
    if openai_key:
        # Replace the detected entities first, then build the prompt from
        # the replaced text.
        replaced = anonymize(
            text=text, operator="replace", analyze_results=analyze_results
        )
        set_openai_key(openai_key)
        return call_openai_api(create_prompt(replaced.text), openai_model_name)
    return "Please provide your OpenAI key"
def call_openai_api(prompt: str, openai_model_name: str) -> str:
    """Send the prompt to the completion model and return its output.

    :param prompt: Prompt text passed to ``call_completion_model``
    :param openai_model_name: Model name passed as ``model``
    """
    return call_completion_model(prompt, model=openai_model_name)