|
from pprint import pprint |
|
import json |
|
|
|
from presidio_analyzer import AnalyzerEngine, RecognizerRegistry |
|
from presidio_analyzer.nlp_engine import NlpEngineProvider, NlpArtifacts |
|
from presidio_analyzer import PatternRecognizer |
|
from presidio_analyzer import Pattern, PatternRecognizer |
|
from presidio_analyzer.predefined_recognizers import SpacyRecognizer |
|
from presidio_analyzer.predefined_recognizers import IbanRecognizer, EmailRecognizer, IpRecognizer,\ |
|
EmailRecognizer, PhoneRecognizer, UrlRecognizer, DateRecognizer |
|
|
|
from presidio_anonymizer import AnonymizerEngine |
|
from presidio_anonymizer.entities import OperatorConfig |
|
|
|
import logging |
|
from typing import Optional, List, Tuple, Set |
|
from presidio_analyzer import ( |
|
RecognizerResult, |
|
EntityRecognizer, |
|
AnalysisExplanation, |
|
) |
|
|
|
from flair.data import Sentence |
|
from flair.models import SequenceTagger |
|
|
|
|
|
|
|
class FlairRecognizer(EntityRecognizer):
    """
    Presidio recognizer that delegates named-entity detection to a Flair model.

    A Flair ``SequenceTagger`` is run over the input text and the spans it tags
    are converted into Presidio ``RecognizerResult`` objects.
    """

    # Presidio entity types reported when the caller does not narrow them down.
    ENTITIES = [
        "LOCATION",
        "PERSON",
        "ORGANIZATION",
    ]

    DEFAULT_EXPLANATION = "Identified as {} by Flair's Named Entity Recognition"

    # Each pair is (Presidio entity names, Flair labels accepted for them).
    CHECK_LABEL_GROUPS = [
        ({"LOCATION"}, {"LOC", "LOCATION"}),
        ({"PERSON"}, {"PER", "PERSON"}),
        ({"ORGANIZATION"}, {"ORG"}),
    ]

    # Flair model identifier to load for each supported language code.
    MODEL_LANGUAGES = {
        "de": "flair/ner-german-large",
    }

    # Translation from Flair labels to Presidio entity type names.
    PRESIDIO_EQUIVALENCES = {
        "PER": "PERSON",
        "LOC": "LOCATION",
        "ORG": "ORGANIZATION",
    }

    def __init__(
        self,
        supported_language: str = "de",
        supported_entities: Optional[List[str]] = None,
        check_label_groups: Optional[List[Tuple[Set, Set]]] = None,
        model: Optional[SequenceTagger] = None,
    ):
        """
        Set up the recognizer and load the Flair model if none is supplied.

        :param supported_language: Language code; must be a key of
            MODEL_LANGUAGES unless ``model`` is given.
        :param supported_entities: Entity types to report. Defaults to ENTITIES.
        :param check_label_groups: (entity names, Flair labels) pairs used to
            match Flair labels to requested entities. Defaults to
            CHECK_LABEL_GROUPS.
        :param model: Pre-loaded Flair tagger. When omitted, the model
            configured for ``supported_language`` is loaded.
        :raises ValueError: If no model is given and the language has no entry
            in MODEL_LANGUAGES.
        """
        self.check_label_groups = (
            check_label_groups if check_label_groups else self.CHECK_LABEL_GROUPS
        )

        supported_entities = supported_entities if supported_entities else self.ENTITIES

        if model:
            self.model = model
        else:
            model_name = self.MODEL_LANGUAGES.get(supported_language)
            if model_name is None:
                # Fail with a clear message instead of handing None to Flair.
                raise ValueError(
                    f"No Flair model configured for language "
                    f"'{supported_language}'. "
                    f"Supported languages: {sorted(self.MODEL_LANGUAGES)}"
                )
            self.model = SequenceTagger.load(model_name)

        super().__init__(
            supported_entities=supported_entities,
            supported_language=supported_language,
            name="Flair Analytics",
        )
        print("Flair class initialized")

    def load(self) -> None:
        """Load the model, not used. Model is loaded during initialization."""
        pass

    def get_supported_entities(self) -> List[str]:
        """
        Return supported entities by this model.

        :return: List of the supported entities.
        """
        return self.supported_entities

    def analyze(
        self,
        text: str,
        entities: List[str],
        nlp_artifacts: Optional[NlpArtifacts] = None,
    ) -> List[RecognizerResult]:
        """
        Analyze text using the Flair model.

        :param text: The text for analysis.
        :param entities: Entity types to look for. When empty, all supported
            entities are used; names that are not supported are skipped.
        :param nlp_artifacts: Not used by this recognizer.
        :return: The list of Presidio RecognizerResult constructed from the
            recognized Flair detections.
        """
        results = []

        sentence = Sentence(text)
        self.model.predict(sentence)
        # The tagged spans do not depend on the requested entity, so they are
        # fetched once rather than once per entity.
        spans = sentence.get_spans("ner")

        if not entities:
            entities = self.supported_entities

        for entity in entities:
            if entity not in self.supported_entities:
                continue

            for ent in spans:
                label = ent.labels[0].value
                if not self.__check_label(entity, label, self.check_label_groups):
                    continue

                textual_explanation = self.DEFAULT_EXPLANATION.format(label)
                explanation = self.build_flair_explanation(
                    round(ent.score, 2), textual_explanation
                )
                results.append(self._convert_to_recognizer_result(ent, explanation))

        return results

    def _convert_to_recognizer_result(self, entity, explanation) -> RecognizerResult:
        """
        Build a Presidio result from a Flair span.

        :param entity: Flair span providing ``tag``, ``score``,
            ``start_position`` and ``end_position``.
        :param explanation: AnalysisExplanation to attach to the result.
        :return: RecognizerResult whose entity type is the Presidio equivalent
            of the Flair tag (or the tag itself when no equivalent is listed)
            and whose score is rounded to two decimals.
        """
        entity_type = self.PRESIDIO_EQUIVALENCES.get(entity.tag, entity.tag)
        flair_score = round(entity.score, 2)

        return RecognizerResult(
            entity_type=entity_type,
            start=entity.start_position,
            end=entity.end_position,
            score=flair_score,
            analysis_explanation=explanation,
        )

    def build_flair_explanation(
        self, original_score: float, explanation: str
    ) -> AnalysisExplanation:
        """
        Create explanation for why this result was detected.

        :param original_score: Score given by this recognizer
        :param explanation: Explanation string
        :return: AnalysisExplanation naming this recognizer class.
        """
        return AnalysisExplanation(
            recognizer=self.__class__.__name__,
            original_score=original_score,
            textual_explanation=explanation,
        )

    @staticmethod
    def __check_label(
        entity: str, label: str, check_label_groups: List[Tuple[Set, Set]]
    ) -> bool:
        """
        Tell whether a Flair label counts as the requested entity.

        :param entity: Requested Presidio entity name.
        :param label: Label produced by the Flair model.
        :param check_label_groups: (entity names, Flair labels) pairs.
        :return: True if some pair contains both the entity and the label.
        """
        return any(
            entity in egrp and label in lgrp for egrp, lgrp in check_label_groups
        )
|
|
|
|
|
class PII_IDENTIFIER:
    """
    Find and anonymize PII in German text.

    Wraps a Presidio AnalyzerEngine (Flair NER, custom pattern recognizers and
    a set of predefined recognizers, all registered for "de") together with a
    Presidio AnonymizerEngine used without custom operators.
    """

    def __init__(self):
        """Build the German analyzer (``self.analyzer``) and the anonymizer (``self.anonymizer``)."""
        # spaCy NLP engine for German, passed to the AnalyzerEngine below.
        configuration = {
            "nlp_engine_name": "spacy",
            "models": [
                {"lang_code": "de", "model_name": "de_core_news_sm"}
            ],
        }

        provider = NlpEngineProvider(nlp_configuration=configuration)
        nlp_engine = provider.create_engine()

        # Regular expressions behind the custom pattern recognizers.
        # Four letters, "DE", two alphanumerics, optionally three more.
        swift_regex = r"\b[A-Z]{4}DE[A-Z0-9]{2}(?:[A-Z0-9]{3})?"
        # 1-3 letters, 1-2 letters, 1-4 digits, with and without hyphens.
        vehicle_number_with_hyphen_regex = r"\b[A-ZÄÖÜ]{1,3}-[A-ZÄÖÜ]{1,2}-[0-9]{1,4}"
        vehicle_number_without_hyphen_regex = r"\b[A-ZÄÖÜ]{1,3}[A-ZÄÖÜ]{1,2}[0-9]{1,4}"
        # Five-digit codes restricted to certain leading-digit ranges.
        german_zipcode_regex = r"\b((?:0[1-46-9]\d{3})|(?:[1-357-9]\d{4})|(?:[4][0-24-9]\d{3})|(?:[6][013-9]\d{3}))\b(?![\d/])"
        # Two digits, six digits, one capital letter, three digits.
        german_ssn_regex = r"\b\d{2}\s?\d{6}\s?[A-Z]\s?\d{3}\b"

        # Every custom pattern is given the maximum score of 1.
        vehicle_numbers_pattern1 = Pattern(
            name="vehicle_pattern", regex=vehicle_number_without_hyphen_regex, score=1
        )
        vehicle_numbers_pattern2 = Pattern(
            name="vehicle_pattern", regex=vehicle_number_with_hyphen_regex, score=1
        )
        swift_pattern = Pattern(name="bank_swift_pattern", regex=swift_regex, score=1)
        germanzipcode_pattern = Pattern(
            name="german_zip_pattern", regex=german_zipcode_regex, score=1
        )
        german_ssn_pattern = Pattern(
            name="german_ssn_pattern", regex=german_ssn_regex, score=1
        )

        swift_recognizer = PatternRecognizer(
            supported_entity="SWIFT",
            supported_language="de",
            patterns=[swift_pattern],
        )
        vehicle_number_recognizer = PatternRecognizer(
            supported_entity="VEHICLE_NUMBER",
            supported_language="de",
            patterns=[vehicle_numbers_pattern1, vehicle_numbers_pattern2],
        )
        germanzip_recognizer = PatternRecognizer(
            supported_entity="GERMAN_ZIP",
            supported_language="de",
            patterns=[germanzipcode_pattern],
        )
        germanssn_recognizer = PatternRecognizer(
            supported_entity="GERMAN_SSN",
            supported_language="de",
            patterns=[german_ssn_pattern],
        )

        print("Loading flair")
        flair_recognizer = FlairRecognizer(supported_language="de")
        print("Flair loaded")

        registry = RecognizerRegistry(supported_languages=["de"])

        # NOTE(review): the registry is created just above and nothing else is
        # loaded into it here, so this removal presumably finds nothing —
        # confirm before deleting it.
        registry.remove_recognizer("SpacyRecognizer")
        registry.add_recognizer(flair_recognizer)

        registry.add_recognizer(swift_recognizer)
        registry.add_recognizer(vehicle_number_recognizer)
        registry.add_recognizer(germanzip_recognizer)
        registry.add_recognizer(germanssn_recognizer)

        # Predefined recognizers, each registered once for German.
        registry.add_recognizer(IbanRecognizer(supported_language="de"))
        registry.add_recognizer(DateRecognizer(supported_language="de"))
        registry.add_recognizer(EmailRecognizer(supported_language="de"))
        registry.add_recognizer(IpRecognizer(supported_language="de"))
        registry.add_recognizer(PhoneRecognizer(supported_language="de"))
        registry.add_recognizer(UrlRecognizer(supported_language="de"))
        print("Recognizer registry loaded")

        self.analyzer = AnalyzerEngine(
            registry=registry, nlp_engine=nlp_engine, supported_languages=["de"]
        )

        print("PII initialized")

        self.anonymizer = AnonymizerEngine()

    def identify(self, text):
        """
        Detect PII in ``text`` and return the findings with an anonymized copy.

        :param text: German text to analyze.
        :return: Dict with keys ``"entities"`` (one dict per finding holding
            ``start``, ``end``, ``entity_type``, ``score`` and the matched
            ``word``), ``"text"`` (the input) and ``"anonymized_text"``.
        """
        results_de = self.analyzer.analyze(
            text,
            language='de'
        )

        # Read the spans before anonymizing: the anonymizer receives the same
        # result objects and may adjust them while resolving overlaps, which
        # would otherwise leak into the reported entities.
        entities = []
        for result in results_de:
            result_dict = result.to_dict()
            start = result_dict['start']
            end = result_dict['end']
            entities.append(
                {
                    "start": start,
                    "end": end,
                    "entity_type": result_dict['entity_type'],
                    "score": result_dict['score'],
                    "word": text[start:end],
                }
            )

        anonymized_results = self.anonymize(results_de, text)

        return {
            "entities": entities,
            "text": text,
            "anonymized_text": anonymized_results['text'],
        }

    def anonymize(self, entities, text):
        """
        Anonymize ``text`` based on analyzer findings.

        :param entities: Analyzer results describing the spans to anonymize.
        :param text: The text those results refer to.
        :return: The anonymizer's result, parsed from its JSON form into
            plain Python objects.
        """
        anonymized_results = self.anonymizer.anonymize(
            text=text,
            analyzer_results=entities,
        )

        return json.loads(anonymized_results.to_json())
|
|
|
def remove_overlapping_entities(entities):
    """
    Placeholder for overlap removal; no logic is implemented yet.

    :param entities: Currently ignored.
    :return: Always ``None``.
    """
    # TODO: implement; until then the input is left untouched.
    return None