# NOTE(review): stray non-Python text captured at the top of the file
# (apparently a Hugging Face "Spaces: Sleeping" status banner) — commented
# out so the module parses. Original text preserved below:
# Spaces:
# Sleeping
# Sleeping
from pprint import pprint | |
import json | |
from presidio_analyzer import AnalyzerEngine, RecognizerRegistry | |
from presidio_analyzer.nlp_engine import NlpEngineProvider, NlpArtifacts | |
from presidio_analyzer import PatternRecognizer | |
from presidio_analyzer import Pattern, PatternRecognizer | |
from presidio_analyzer.predefined_recognizers import SpacyRecognizer | |
from presidio_analyzer.predefined_recognizers import IbanRecognizer, EmailRecognizer, IpRecognizer,\ | |
EmailRecognizer, PhoneRecognizer, UrlRecognizer, DateRecognizer | |
from presidio_anonymizer import AnonymizerEngine | |
from presidio_anonymizer.entities import OperatorConfig | |
import logging | |
from typing import Optional, List, Tuple, Set | |
from presidio_analyzer import ( | |
RecognizerResult, | |
EntityRecognizer, | |
AnalysisExplanation, | |
) | |
from flair.data import Sentence | |
from flair.models import SequenceTagger | |
### Creating FlairRecognizer class for NER(names, location)
class FlairRecognizer(EntityRecognizer):
    """Expose a Flair sequence tagger to Presidio as an ``EntityRecognizer``.

    Wraps a ``flair.models.SequenceTagger`` so that Flair NER detections
    (person, location, organization) are returned as Presidio
    ``RecognizerResult`` objects.
    """

    # Presidio entity names this recognizer can emit.
    ENTITIES = [
        "LOCATION",
        "PERSON",
        "ORGANIZATION",
        # "MISCELLANEOUS"  # - There are no direct correlation with Presidio entities.
    ]

    DEFAULT_EXPLANATION = "Identified as {} by Flair's Named Entity Recognition"

    # Pairs of ({presidio entity names}, {flair label names}) used to decide
    # whether a Flair label satisfies a requested Presidio entity.
    CHECK_LABEL_GROUPS = [
        ({"LOCATION"}, {"LOC", "LOCATION"}),
        ({"PERSON"}, {"PER", "PERSON"}),
        ({"ORGANIZATION"}, {"ORG"}),
        # ({"MISCELLANEOUS"}, {"MISC"}),  # Probably not PII
    ]

    # Language code -> Flair model name. NOTE(review): only "de" is enabled,
    # but __init__ defaults to supported_language="en"; calling with the
    # default would pass None to SequenceTagger.load — confirm callers always
    # pass "de" (PIIService does).
    MODEL_LANGUAGES = {
        # "en": "flair/ner-english-large",
        # "es": "flair/ner-spanish-large",
        "de": "flair/ner-german-large",
        # "nl": "flair/ner-dutch-large",
    }

    # Flair tag -> Presidio entity name.
    PRESIDIO_EQUIVALENCES = {
        "PER": "PERSON",
        "LOC": "LOCATION",
        "ORG": "ORGANIZATION",
        # 'MISC': 'MISCELLANEOUS'  # - Probably not PII
    }

    def __init__(
        self,
        supported_language: str = "en",
        supported_entities: Optional[List[str]] = None,
        check_label_groups: Optional[Tuple[Set, Set]] = None,
        model: SequenceTagger = None,
    ):
        """Create the recognizer.

        :param supported_language: Language code; must be a key of
            MODEL_LANGUAGES unless ``model`` is injected.
        :param supported_entities: Entities to report (defaults to ENTITIES).
        :param check_label_groups: Entity/label matching groups (defaults to
            CHECK_LABEL_GROUPS).
        :param model: Pre-loaded SequenceTagger; when given, no model is
            downloaded/loaded here (useful for tests).
        """
        self.check_label_groups = (
            check_label_groups if check_label_groups else self.CHECK_LABEL_GROUPS
        )
        supported_entities = supported_entities if supported_entities else self.ENTITIES
        # Load the per-language Flair model unless one was injected.
        self.model = (
            model
            if model
            else SequenceTagger.load(self.MODEL_LANGUAGES.get(supported_language))
        )
        super().__init__(
            supported_entities=supported_entities,
            supported_language=supported_language,
            name="Flair Analytics",
        )
        print("Flair class initialized")

    def load(self) -> None:
        """Load the model, not used. Model is loaded during initialization."""
        pass

    def get_supported_entities(self) -> List[str]:
        """
        Return supported entities by this model.

        :return: List of the supported entities.
        """
        return self.supported_entities

    def analyze(
        self, text: str, entities: List[str], nlp_artifacts: NlpArtifacts = None
    ) -> List[RecognizerResult]:
        """
        Analyze text using the Flair tagger.

        :param text: The text for analysis.
        :param entities: Presidio entities to look for; falls back to all
            supported entities when empty.
        :param nlp_artifacts: Not used by this recognizer.
        :return: The list of Presidio RecognizerResult constructed from the
            recognized Flair detections.
        """
        results = []
        sentence = Sentence(text)
        self.model.predict(sentence)

        # If there is no specific list of entities, look for all supported ones.
        if not entities:
            entities = self.supported_entities

        # Predict once, then reuse the spans for every requested entity.
        spans = sentence.get_spans("ner")
        for entity in entities:
            if entity not in self.supported_entities:
                continue
            for ent in spans:
                if not self.__check_label(
                    entity, ent.labels[0].value, self.check_label_groups
                ):
                    continue
                textual_explanation = self.DEFAULT_EXPLANATION.format(
                    ent.labels[0].value
                )
                explanation = self.build_flair_explanation(
                    round(ent.score, 2), textual_explanation
                )
                results.append(self._convert_to_recognizer_result(ent, explanation))
        return results

    def _convert_to_recognizer_result(self, entity, explanation) -> RecognizerResult:
        """Map one Flair span to a Presidio RecognizerResult."""
        # Translate the Flair tag to its Presidio equivalent; fall back to the
        # raw tag when no mapping exists.
        entity_type = self.PRESIDIO_EQUIVALENCES.get(entity.tag, entity.tag)
        flair_score = round(entity.score, 2)
        return RecognizerResult(
            entity_type=entity_type,
            start=entity.start_position,
            end=entity.end_position,
            score=flair_score,
            analysis_explanation=explanation,
        )

    def build_flair_explanation(
        self, original_score: float, explanation: str
    ) -> AnalysisExplanation:
        """
        Create explanation for why this result was detected.

        :param original_score: Score given by this recognizer
        :param explanation: Explanation string
        :return: AnalysisExplanation carrying the score and text.
        """
        return AnalysisExplanation(
            recognizer=self.__class__.__name__,
            original_score=original_score,
            textual_explanation=explanation,
        )

    # BUG FIX: this was an instance method without a `self` parameter but was
    # called as self.__check_label(entity, label, groups), which bound `self`
    # to `entity` and raised TypeError (4 args passed to a 3-arg function).
    # Declaring it @staticmethod restores the intended call signature.
    @staticmethod
    def __check_label(
        entity: str, label: str, check_label_groups: Tuple[Set, Set]
    ) -> bool:
        """Return True when `label` satisfies `entity` per the label groups."""
        return any(
            entity in egrp and label in lgrp for egrp, lgrp in check_label_groups
        )
class PIIService:
    """German-language PII detection service built on Presidio + Flair.

    Builds an AnalyzerEngine with a spaCy NLP engine, a Flair NER recognizer,
    custom pattern recognizers (SWIFT, vehicle number, German zip, German SSN)
    and a set of predefined recognizers, all registered for language "de".
    """

    def __init__(self):
        # spaCy engine used by Presidio for tokenization/lemmas (NER itself
        # comes from the Flair recognizer below).
        configuration = {
            "nlp_engine_name": "spacy",
            "models": [
                {"lang_code": "de", "model_name": "de_core_news_sm"}
            ],
        }
        # Create NLP engine based on configuration
        provider = NlpEngineProvider(nlp_configuration=configuration)
        nlp_engine = provider.create_engine()

        ## Creating regex for PatternRecognizers - SWIFT, vehicle number, zipcode, ssn
        swift_regex = r"\b[A-Z]{4}DE[A-Z0-9]{2}(?:[A-Z0-9]{3})?"
        vehicle_number_with_hyphen_regex = r"\b[A-ZÄÖÜ]{1,3}-[A-ZÄÖÜ]{1,2}-[0-9]{1,4}"
        vehicle_number_without_hyphen_regex = r"\b[A-ZÄÖÜ]{1,3}[A-ZÄÖÜ]{1,2}[0-9]{1,4}"
        german_zipcode_regex = r"\b((?:0[1-46-9]\d{3})|(?:[1-357-9]\d{4})|(?:[4][0-24-9]\d{3})|(?:[6][013-9]\d{3}))\b(?![\d/])"
        german_ssn_regex = r"\b\d{2}\s?\d{6}\s?[A-Z]\s?\d{3}\b"

        # Creating Presidio pattern objects (score=1 -> full confidence on match).
        vehicle_numbers_pattern1 = Pattern(name="vehicle_pattern", regex=vehicle_number_without_hyphen_regex, score=1)
        vehicle_numbers_pattern2 = Pattern(name="vehicle_pattern", regex=vehicle_number_with_hyphen_regex, score=1)
        swift_pattern = Pattern(name="bank_swift_pattern", regex=swift_regex, score=1)
        germanzipcode_pattern = Pattern(name="german_zip_pattern", regex=german_zipcode_regex, score=1)
        german_ssn_pattern = Pattern(name="german_ssn_pattern", regex=german_ssn_regex, score=1)

        # Define the pattern recognizers.
        swift_recognizer = PatternRecognizer(supported_entity="SWIFT", supported_language="de", patterns=[swift_pattern])
        vehicle_number_recognizer = PatternRecognizer(supported_entity="VEHICLE_NUMBER", supported_language="de", patterns=[vehicle_numbers_pattern1, vehicle_numbers_pattern2])
        germanzip_recognizer = PatternRecognizer(supported_entity="GERMAN_ZIP", supported_language="de", patterns=[germanzipcode_pattern])
        germanssn_recognizer = PatternRecognizer(supported_entity="GERMAN_SSN", supported_language="de", patterns=[german_ssn_pattern])

        ## Loading flair entity model for person, location ID
        print("Loading flair")
        flair_recognizer = FlairRecognizer(supported_language="de")
        print("Flair loaded")

        registry = RecognizerRegistry()
        # NOTE(review): predefined recognizers are not loaded, so this remove
        # is a no-op guard kept from an earlier setup — confirm it is still
        # wanted if load_predefined_recognizers() is re-enabled.
        registry.remove_recognizer("SpacyRecognizer")
        registry.add_recognizer(flair_recognizer)
        registry.add_recognizer(swift_recognizer)
        registry.add_recognizer(vehicle_number_recognizer)
        registry.add_recognizer(germanzip_recognizer)
        registry.add_recognizer(germanssn_recognizer)

        ## Adding predefined recognizers (PhoneRecognizer was registered twice
        ## in the original; the duplicate is removed here).
        registry.add_recognizer(IbanRecognizer(supported_language="de"))
        registry.add_recognizer(DateRecognizer(supported_language="de"))
        registry.add_recognizer(EmailRecognizer(supported_language="de"))
        registry.add_recognizer(IpRecognizer(supported_language="de"))
        registry.add_recognizer(PhoneRecognizer(supported_language="de"))
        registry.add_recognizer(UrlRecognizer(supported_language="de"))
        print("Recognizer registry loaded")

        self.analyzer = AnalyzerEngine(registry=registry, nlp_engine=nlp_engine, supported_languages=["de"])
        print("PII initialized")
        self.anonymizer = AnonymizerEngine()

    def identify(self, text):
        """Run the analyzer on `text` and return the detected entities.

        :param text: Text to scan for PII.
        :return: dict with "entities" (list of start/end/entity_type/score/word
            dicts) and the original "text".
        """
        results_de = self.analyzer.analyze(
            text,
            language='de'
        )
        entities = []
        for result in results_de:
            result_dict = result.to_dict()
            entities.append({
                "start": result_dict['start'],
                "end": result_dict['end'],
                "entity_type": result_dict['entity_type'],
                "score": result_dict['score'],
                # Slice the matched surface form out of the input text.
                "word": text[result_dict['start']:result_dict['end']]
            })
        return {"entities": entities, "text": text}

    def add_mask(self, data):
        """Attach a per-type mask label (e.g. "PERSON_1") to each entity.

        Mutates the entity dicts in `data['entities']` in place (adds a
        "mask" key) and returns them as a list.

        :param data: Output of identify() — dict with an "entities" list.
        :return: The list of entity dicts, each with a "mask" key added.
        """
        masked_data = []
        entity_count = {}
        for item in data['entities']:
            entity_type = item['entity_type']
            # Number entities of the same type consecutively: TYPE_1, TYPE_2, ...
            suffix = entity_count.get(entity_type, 0) + 1
            entity_count[entity_type] = suffix
            item['mask'] = f"{entity_type}_{suffix}"
            masked_data.append(item)
        return masked_data

    def anonymize(self, entities, text):
        """Replace each entity span in `text` with its mask label.

        :param entities: Entity dicts with 'start', 'end' and 'mask' keys.
        :param text: The original text the offsets refer to.
        :return: Text with each span replaced by " <mask> ".
        """
        print("anonymyzing")
        # BUG FIX: replacements were applied in input order, so each
        # replacement shifted the offsets of all later entities and corrupted
        # the output whenever more than one entity was present. Replacing from
        # the highest start offset backwards keeps every remaining offset valid.
        updated_text = text
        for ent in sorted(entities, key=lambda e: e['start'], reverse=True):
            updated_text = updated_text[:ent['start']] + " " + ent['mask'] + " " + updated_text[ent['end']:]
        return updated_text
def remove_overlapping_entities(entities):
    """Drop overlapping entity spans, keeping the higher-scoring one.

    The original function was an unimplemented stub that returned None; this
    implements the behavior its name promises for the entity dicts produced
    by PIIService.identify ('start', 'end', optional 'score' keys).

    :param entities: Iterable of entity dicts with 'start'/'end' offsets.
    :return: List of non-overlapping entity dicts, sorted by start offset.
    """
    if not entities:
        return []
    # Consider higher-scoring entities first so they win any overlap.
    ordered = sorted(entities, key=lambda e: (-e.get('score', 0), e['start']))
    kept = []
    for ent in ordered:
        # Keep only if it does not overlap any already-kept span.
        if all(ent['end'] <= k['start'] or ent['start'] >= k['end'] for k in kept):
            kept.append(ent)
    kept.sort(key=lambda e: e['start'])
    return kept