adit94 committed on
Commit
637b0c6
1 Parent(s): 083168f

Upload 4 files

Browse files
helpers/common.py ADDED
File without changes
helpers/openai_service.py ADDED
File without changes
helpers/pii_anonymize.py ADDED
File without changes
helpers/pii_id.py ADDED
@@ -0,0 +1,263 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from presidio_analyzer import AnalyzerEngine, RecognizerRegistry
2
+ from presidio_analyzer.nlp_engine import NlpEngineProvider, NlpArtifacts
3
+ from presidio_analyzer import PatternRecognizer
4
+ from presidio_analyzer import Pattern, PatternRecognizer
5
+ from presidio_analyzer.predefined_recognizers import SpacyRecognizer
6
+ from presidio_analyzer.predefined_recognizers import IbanRecognizer, EmailRecognizer, IpRecognizer,\
7
+ EmailRecognizer, PhoneRecognizer, UrlRecognizer, DateRecognizer
8
+
9
+ import logging
10
+ from typing import Optional, List, Tuple, Set
11
+ from presidio_analyzer import (
12
+ RecognizerResult,
13
+ EntityRecognizer,
14
+ AnalysisExplanation,
15
+ )
16
+
17
+ from flair.data import Sentence
18
+ from flair.models import SequenceTagger
19
+
20
+ ### Creating FlairRecognizer class for NER(names, location)
21
+
22
+ class FlairRecognizer(EntityRecognizer):
23
+
24
+ ENTITIES = [
25
+ "LOCATION",
26
+ "PERSON",
27
+ "ORGANIZATION",
28
+ # "MISCELLANEOUS" # - There are no direct correlation with Presidio entities.
29
+ ]
30
+
31
+ DEFAULT_EXPLANATION = "Identified as {} by Flair's Named Entity Recognition"
32
+
33
+ CHECK_LABEL_GROUPS = [
34
+ ({"LOCATION"}, {"LOC", "LOCATION"}),
35
+ ({"PERSON"}, {"PER", "PERSON"}),
36
+ ({"ORGANIZATION"}, {"ORG"}),
37
+ # ({"MISCELLANEOUS"}, {"MISC"}), # Probably not PII
38
+ ]
39
+
40
+ MODEL_LANGUAGES = {
41
+ "en": "flair/ner-english-large",
42
+ "es": "flair/ner-spanish-large",
43
+ "de": "flair/ner-german-large",
44
+ "nl": "flair/ner-dutch-large",
45
+ }
46
+
47
+ PRESIDIO_EQUIVALENCES = {
48
+ "PER": "PERSON",
49
+ "LOC": "LOCATION",
50
+ "ORG": "ORGANIZATION",
51
+ # 'MISC': 'MISCELLANEOUS' # - Probably not PII
52
+ }
53
+
54
+ def __init__(
55
+ self,
56
+ supported_language: str = "en",
57
+ supported_entities: Optional[List[str]] = None,
58
+ check_label_groups: Optional[Tuple[Set, Set]] = None,
59
+ model: SequenceTagger = None,
60
+ ):
61
+ self.check_label_groups = (
62
+ check_label_groups if check_label_groups else self.CHECK_LABEL_GROUPS
63
+ )
64
+
65
+ supported_entities = supported_entities if supported_entities else self.ENTITIES
66
+ self.model = (
67
+ model
68
+ if model
69
+ else SequenceTagger.load(self.MODEL_LANGUAGES.get(supported_language))
70
+ )
71
+
72
+ super().__init__(
73
+ supported_entities=supported_entities,
74
+ supported_language=supported_language,
75
+ name="Flair Analytics",
76
+ )
77
+
78
+ def load(self) -> None:
79
+ """Load the model, not used. Model is loaded during initialization."""
80
+ pass
81
+
82
+ def get_supported_entities(self) -> List[str]:
83
+ """
84
+ Return supported entities by this model.
85
+
86
+ :return: List of the supported entities.
87
+ """
88
+ return self.supported_entities
89
+
90
+ # Class to use Flair with Presidio as an external recognizer.
91
+ def analyze(
92
+ self, text: str, entities: List[str], nlp_artifacts: NlpArtifacts = None
93
+ ) -> List[RecognizerResult]:
94
+ """
95
+ Analyze text using Text Analytics.
96
+
97
+ :param text: The text for analysis.
98
+ :param entities: Not working properly for this recognizer.
99
+ :param nlp_artifacts: Not used by this recognizer.
100
+ :param language: Text language. Supported languages in MODEL_LANGUAGES
101
+ :return: The list of Presidio RecognizerResult constructed from the recognized
102
+ Flair detections.
103
+ """
104
+
105
+ results = []
106
+
107
+ sentences = Sentence(text)
108
+ self.model.predict(sentences)
109
+
110
+ # If there are no specific list of entities, we will look for all of it.
111
+ if not entities:
112
+ entities = self.supported_entities
113
+
114
+ for entity in entities:
115
+ if entity not in self.supported_entities:
116
+ continue
117
+
118
+ for ent in sentences.get_spans("ner"):
119
+ if not self.__check_label(
120
+ entity, ent.labels[0].value, self.check_label_groups
121
+ ):
122
+ continue
123
+ textual_explanation = self.DEFAULT_EXPLANATION.format(
124
+ ent.labels[0].value
125
+ )
126
+ explanation = self.build_flair_explanation(
127
+ round(ent.score, 2), textual_explanation
128
+ )
129
+ flair_result = self._convert_to_recognizer_result(ent, explanation)
130
+
131
+ results.append(flair_result)
132
+
133
+ return results
134
+
135
+ def _convert_to_recognizer_result(self, entity, explanation) -> RecognizerResult:
136
+
137
+ entity_type = self.PRESIDIO_EQUIVALENCES.get(entity.tag, entity.tag)
138
+ flair_score = round(entity.score, 2)
139
+
140
+ flair_results = RecognizerResult(
141
+ entity_type=entity_type,
142
+ start=entity.start_position,
143
+ end=entity.end_position,
144
+ score=flair_score,
145
+ analysis_explanation=explanation,
146
+ )
147
+
148
+ return flair_results
149
+
150
+ def build_flair_explanation(
151
+ self, original_score: float, explanation: str
152
+ ) -> AnalysisExplanation:
153
+ """
154
+ Create explanation for why this result was detected.
155
+
156
+ :param original_score: Score given by this recognizer
157
+ :param explanation: Explanation string
158
+ :return:
159
+ """
160
+ explanation = AnalysisExplanation(
161
+ recognizer=self.__class__.__name__,
162
+ original_score=original_score,
163
+ textual_explanation=explanation,
164
+ )
165
+ return explanation
166
+
167
+ @staticmethod
168
+ def __check_label(
169
+ entity: str, label: str, check_label_groups: Tuple[Set, Set]
170
+ ) -> bool:
171
+ return any(
172
+ [entity in egrp and label in lgrp for egrp, lgrp in check_label_groups]
173
+ )
174
+
175
+
176
+ class PII_IDENTIFIER:
177
+ def __init__(self):
178
+
179
+ configuration = {
180
+ "nlp_engine_name": "spacy",
181
+ "models": [
182
+ {"lang_code": "de", "model_name": "de_core_news_sm"}
183
+ ],
184
+ }
185
+
186
+ # Create NLP engine based on configuration
187
+ provider = NlpEngineProvider(nlp_configuration=configuration)
188
+ nlp_engine = provider.create_engine()
189
+
190
+ ## Creating regex for PatternRecognizers - SWIFT, vehicle number, zipcode, ssn
191
+ swift_regex = r"\b[A-Z]{4}DE[A-Z0-9]{2}(?:[A-Z0-9]{3})?"
192
+ vehicle_number_with_hyphen_regex = r"\b[A-ZÄÖÜ]{1,3}-[A-ZÄÖÜ]{1,2}-[0-9]{1,4}"
193
+ vehicle_number_without_hyphen_regex = r"\b[A-ZÄÖÜ]{1,3}[A-ZÄÖÜ]{1,2}[0-9]{1,4}"
194
+ german_zipcode_regex = r"\b((?:0[1-46-9]\d{3})|(?:[1-357-9]\d{4})|(?:[4][0-24-9]\d{3})|(?:[6][013-9]\d{3}))\b(?![\d/])"
195
+ german_ssn_regex = r"\b\d{2}\s?\d{6}\s?[A-Z]\s?\d{3}\b"
196
+ # Creating Presidio pattern object
197
+ vehicle_numbers_pattern1 = Pattern(name="vehicle_pattern", regex=vehicle_number_without_hyphen_regex, score=1)
198
+ vehicle_numbers_pattern2 = Pattern(name="vehicle_pattern", regex=vehicle_number_with_hyphen_regex, score=1)
199
+ swift_pattern = Pattern(name="bank_swift_pattern", regex=swift_regex, score=1)
200
+ germanzipcode_pattern = Pattern(name="german_zip_pattern",regex=german_zipcode_regex, score=1)
201
+ german_ssn_pattern = Pattern(name="german_ssn_pattern",regex=german_ssn_regex, score=1)
202
+
203
+ # Define the recognizer
204
+ swift_recognizer = PatternRecognizer(supported_entity="SWIFT", supported_language="de",patterns=[swift_pattern])
205
+ vehicle_number_recognizer = PatternRecognizer(supported_entity="VEHICLE_NUMBER", supported_language="de",patterns=[vehicle_numbers_pattern1,vehicle_numbers_pattern2])
206
+ germanzip_recognizer = PatternRecognizer(supported_entity="GERMAN_ZIP", supported_language="de",patterns=[germanzipcode_pattern])
207
+ germanssn_recognizer = PatternRecognizer(supported_entity="GERMAN_SSN", supported_language="de",patterns=[german_ssn_pattern])
208
+
209
+ ## Lading flair entity model for person, location ID
210
+ print("Loading flair")
211
+ flair_recognizer = FlairRecognizer(supported_language="de")
212
+
213
+ registry = RecognizerRegistry()
214
+ #registry.load_predefined_recognizers()
215
+ #registry.add_recognizer(SpacyRecognizer(supported_language="de"))
216
+ #registry.add_recognizer(SpacyRecognizer(supported_language="en"))
217
+
218
+ registry.remove_recognizer("SpacyRecognizer")
219
+ registry.add_recognizer(flair_recognizer)
220
+
221
+ registry.add_recognizer(swift_recognizer)
222
+ registry.add_recognizer(vehicle_number_recognizer)
223
+ registry.add_recognizer(germanzip_recognizer)
224
+ registry.add_recognizer(germanssn_recognizer)
225
+
226
+ ## Adding predefined recognizers
227
+ registry.add_recognizer(IbanRecognizer(supported_language="de"))
228
+ registry.add_recognizer(DateRecognizer(supported_language="de"))
229
+ registry.add_recognizer(EmailRecognizer(supported_language="de"))
230
+ registry.add_recognizer(IpRecognizer(supported_language="de"))
231
+ registry.add_recognizer(PhoneRecognizer(supported_language="de"))
232
+ registry.add_recognizer(UrlRecognizer(supported_language="de"))
233
+ #registry.add_recognizer(PhoneRecognizer(supported_language="de"))
234
+
235
+ self.analyzer = AnalyzerEngine(registry=registry, nlp_engine=nlp_engine, supported_languages=["de", "en"])
236
+
237
+ print(f"Type of recognizers ::\n {self.analyzer.registry.recognizers}")
238
+ print("PII initialized")
239
+
240
+ def identify(self, text):
241
+ results_de = self.analyzer.analyze(
242
+ text,
243
+ language='de'
244
+ )
245
+ entities = []
246
+
247
+ for result in results_de:
248
+ result_dict = result.to_dict()
249
+ temp_entity = {
250
+ "start":result_dict['start'],
251
+ "end":result_dict['end'],
252
+ "entity_type":result_dict['entity_type'],
253
+ "score":result_dict['score'],
254
+ "word":text[result_dict['start']:result_dict['end']]
255
+ }
256
+ print(result.analysis_explanation)
257
+ entities.append(temp_entity)
258
+
259
+ return {"entities":entities, "text":text}
260
+
261
+ def remove_overlapping_entities(entities):
262
+
263
+ return