seanpedrickcase
committed on
Commit
•
93ac94f
1
Parent(s):
8c33828
Updated decision making output files, log locations
Browse files- app.py +13 -7
- tools/data_anonymise.py +33 -27
- tools/file_conversion.py +1 -6
- tools/file_redaction.py +142 -117
app.py
CHANGED
@@ -12,6 +12,9 @@ from tools.auth import authenticate_user
|
|
12 |
#from tools.aws_functions import load_data_from_aws
|
13 |
import gradio as gr
|
14 |
|
|
|
|
|
|
|
15 |
add_folder_to_path("tesseract/")
|
16 |
add_folder_to_path("poppler/poppler-24.02.0/Library/bin/")
|
17 |
|
@@ -21,6 +24,9 @@ chosen_redact_entities = ["TITLES", "PERSON", "PHONE_NUMBER", "EMAIL_ADDRESS", "
|
|
21 |
full_entity_list = ["TITLES", "PERSON", "PHONE_NUMBER", "EMAIL_ADDRESS", "STREETNAME", "UKPOSTCODE", 'CREDIT_CARD', 'CRYPTO', 'DATE_TIME', 'IBAN_CODE', 'IP_ADDRESS', 'NRP', 'LOCATION', 'MEDICAL_LICENSE', 'URL', 'UK_NHS']
|
22 |
language = 'en'
|
23 |
|
|
|
|
|
|
|
24 |
# Create the gradio interface
|
25 |
app = gr.Blocks(theme = gr.themes.Base())
|
26 |
|
@@ -35,10 +41,10 @@ with app:
|
|
35 |
|
36 |
session_hash_state = gr.State()
|
37 |
s3_output_folder_state = gr.State()
|
38 |
-
feedback_logs_state = gr.State('
|
39 |
-
feedback_s3_logs_loc_state = gr.State(
|
40 |
-
usage_logs_state = gr.State('
|
41 |
-
usage_s3_logs_loc_state = gr.State(
|
42 |
|
43 |
gr.Markdown(
|
44 |
"""
|
@@ -162,18 +168,18 @@ with app:
|
|
162 |
|
163 |
# Log usernames and times of access to file (to know who is using the app when running on AWS)
|
164 |
callback = gr.CSVLogger()
|
165 |
-
callback.setup([session_hash_textbox],
|
166 |
session_hash_textbox.change(lambda *args: callback.flag(list(args)), [session_hash_textbox], None, preprocess=False)
|
167 |
|
168 |
# User submitted feedback for pdf redactions
|
169 |
pdf_callback = gr.CSVLogger()
|
170 |
-
pdf_callback.setup([pdf_feedback_radio, pdf_further_details_text],
|
171 |
pdf_submit_feedback_btn.click(lambda *args: pdf_callback.flag(list(args)), [pdf_feedback_radio, pdf_further_details_text], None, preprocess=False).\
|
172 |
then(fn = upload_file_to_s3, inputs=[feedback_logs_state, feedback_s3_logs_loc_state], outputs=[s3_logs_output_textbox])
|
173 |
|
174 |
# User submitted feedback for data redactions
|
175 |
data_callback = gr.CSVLogger()
|
176 |
-
data_callback.setup([data_feedback_radio, data_further_details_text],
|
177 |
data_submit_feedback_btn.click(lambda *args: data_callback.flag(list(args)), [data_feedback_radio, data_further_details_text], None, preprocess=False).\
|
178 |
then(fn = upload_file_to_s3, inputs=[feedback_logs_state, feedback_s3_logs_loc_state], outputs=[s3_logs_output_textbox])
|
179 |
|
|
|
12 |
#from tools.aws_functions import load_data_from_aws
|
13 |
import gradio as gr
|
14 |
|
15 |
+
from datetime import datetime
|
16 |
+
today_rev = datetime.now().strftime("%Y%m%d")
|
17 |
+
|
18 |
add_folder_to_path("tesseract/")
|
19 |
add_folder_to_path("poppler/poppler-24.02.0/Library/bin/")
|
20 |
|
|
|
24 |
full_entity_list = ["TITLES", "PERSON", "PHONE_NUMBER", "EMAIL_ADDRESS", "STREETNAME", "UKPOSTCODE", 'CREDIT_CARD', 'CRYPTO', 'DATE_TIME', 'IBAN_CODE', 'IP_ADDRESS', 'NRP', 'LOCATION', 'MEDICAL_LICENSE', 'URL', 'UK_NHS']
|
25 |
language = 'en'
|
26 |
|
27 |
+
feedback_data_folder = 'feedback/' + today_rev + '/'
|
28 |
+
logs_data_folder = 'logs/' + today_rev + '/'
|
29 |
+
|
30 |
# Create the gradio interface
|
31 |
app = gr.Blocks(theme = gr.themes.Base())
|
32 |
|
|
|
41 |
|
42 |
session_hash_state = gr.State()
|
43 |
s3_output_folder_state = gr.State()
|
44 |
+
feedback_logs_state = gr.State(feedback_data_folder + 'log.csv')
|
45 |
+
feedback_s3_logs_loc_state = gr.State(feedback_data_folder)
|
46 |
+
usage_logs_state = gr.State(logs_data_folder + 'log.csv')
|
47 |
+
usage_s3_logs_loc_state = gr.State(logs_data_folder)
|
48 |
|
49 |
gr.Markdown(
|
50 |
"""
|
|
|
168 |
|
169 |
# Log usernames and times of access to file (to know who is using the app when running on AWS)
|
170 |
callback = gr.CSVLogger()
|
171 |
+
callback.setup([session_hash_textbox], logs_data_folder)
|
172 |
session_hash_textbox.change(lambda *args: callback.flag(list(args)), [session_hash_textbox], None, preprocess=False)
|
173 |
|
174 |
# User submitted feedback for pdf redactions
|
175 |
pdf_callback = gr.CSVLogger()
|
176 |
+
pdf_callback.setup([pdf_feedback_radio, pdf_further_details_text], feedback_data_folder)
|
177 |
pdf_submit_feedback_btn.click(lambda *args: pdf_callback.flag(list(args)), [pdf_feedback_radio, pdf_further_details_text], None, preprocess=False).\
|
178 |
then(fn = upload_file_to_s3, inputs=[feedback_logs_state, feedback_s3_logs_loc_state], outputs=[s3_logs_output_textbox])
|
179 |
|
180 |
# User submitted feedback for data redactions
|
181 |
data_callback = gr.CSVLogger()
|
182 |
+
data_callback.setup([data_feedback_radio, data_further_details_text], feedback_data_folder)
|
183 |
data_submit_feedback_btn.click(lambda *args: data_callback.flag(list(args)), [data_feedback_radio, data_further_details_text], None, preprocess=False).\
|
184 |
then(fn = upload_file_to_s3, inputs=[feedback_logs_state, feedback_s3_logs_loc_state], outputs=[s3_logs_output_textbox])
|
185 |
|
tools/data_anonymise.py
CHANGED
@@ -23,27 +23,7 @@ fake = Faker("en_UK")
|
|
23 |
def fake_first_name(x):
|
24 |
return fake.first_name()
|
25 |
|
26 |
-
|
27 |
-
def generate_decision_process_output(analyzer_results: List[DictAnalyzerResult], df_dict: Dict[str, List[Any]]) -> str:
|
28 |
-
"""
|
29 |
-
Generate a detailed output of the decision process for entity recognition.
|
30 |
-
|
31 |
-
This function takes the results from the analyzer and the original data dictionary,
|
32 |
-
and produces a string output detailing the decision process for each recognized entity.
|
33 |
-
It includes information such as entity type, position, confidence score, and the context
|
34 |
-
in which the entity was found.
|
35 |
-
|
36 |
-
Args:
|
37 |
-
analyzer_results (List[DictAnalyzerResult]): The results from the entity analyzer.
|
38 |
-
df_dict (Dict[str, List[Any]]): The original data in dictionary format.
|
39 |
-
|
40 |
-
Returns:
|
41 |
-
str: A string containing the detailed decision process output.
|
42 |
-
"""
|
43 |
-
decision_process_output = []
|
44 |
-
keys_to_keep = ['entity_type', 'start', 'end']
|
45 |
-
|
46 |
-
def process_recognizer_result(result, recognizer_result, data_row, dictionary_key, df_dict, keys_to_keep):
|
47 |
output = []
|
48 |
|
49 |
if hasattr(result, 'value'):
|
@@ -66,29 +46,53 @@ def generate_decision_process_output(analyzer_results: List[DictAnalyzerResult],
|
|
66 |
output.append(str(analysis_explanation))
|
67 |
|
68 |
return output
|
69 |
-
|
70 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
71 |
|
72 |
# Run through each column to analyse for PII
|
73 |
for i, result in enumerate(analyzer_results):
|
74 |
print("Looking at result:", str(i))
|
|
|
75 |
|
76 |
# If a single result
|
77 |
if isinstance(result, RecognizerResult):
|
|
|
78 |
decision_process_output.extend(process_recognizer_result(result, result, 0, i, df_dict, keys_to_keep))
|
79 |
|
80 |
# If a list of results
|
81 |
-
elif isinstance(result,
|
82 |
-
for x, recognizer_result in enumerate(result.recognizer_results):
|
|
|
83 |
decision_process_output.extend(process_recognizer_result(result, recognizer_result, x, i, df_dict, keys_to_keep))
|
84 |
|
85 |
else:
|
86 |
try:
|
|
|
87 |
decision_process_output.extend(process_recognizer_result(result, result, 0, i, df_dict, keys_to_keep))
|
88 |
except Exception as e:
|
89 |
print(e)
|
90 |
|
91 |
decision_process_output_str = '\n'.join(decision_process_output)
|
|
|
|
|
92 |
|
93 |
|
94 |
return decision_process_output_str
|
@@ -220,6 +224,8 @@ def anonymise_script(df, anon_strat, language:str, chosen_redact_entities:List[s
|
|
220 |
# Usage in the main function:
|
221 |
decision_process_output_str = generate_decision_process_output(analyzer_results, df_dict)
|
222 |
|
|
|
|
|
223 |
analyse_toc = time.perf_counter()
|
224 |
analyse_time_out = f"Analysing the text took {analyse_toc - analyse_tic:0.1f} seconds."
|
225 |
print(analyse_time_out)
|
@@ -325,12 +331,12 @@ def anon_wrapper_func(anon_file, anon_df, chosen_cols, out_file_paths, out_file_
|
|
325 |
# Write each DataFrame to a different worksheet.
|
326 |
anon_df_out.to_excel(writer, sheet_name=excel_sheet_name, index=None)
|
327 |
|
328 |
-
decision_process_log_output_file = anon_xlsx_export_file_name + "
|
329 |
with open(decision_process_log_output_file, "w") as f:
|
330 |
f.write(decision_process_output_str)
|
331 |
|
332 |
else:
|
333 |
-
anon_export_file_name = output_folder + out_file_part + "
|
334 |
anon_df_out.to_csv(anon_export_file_name, index = None)
|
335 |
|
336 |
decision_process_log_output_file = anon_export_file_name + "_decision_process_output.txt"
|
|
|
23 |
def fake_first_name(x):
|
24 |
return fake.first_name()
|
25 |
|
26 |
+
def process_recognizer_result(result, recognizer_result, data_row, dictionary_key, df_dict, keys_to_keep):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
27 |
output = []
|
28 |
|
29 |
if hasattr(result, 'value'):
|
|
|
46 |
output.append(str(analysis_explanation))
|
47 |
|
48 |
return output
|
49 |
+
|
50 |
+
# Writing decision making process to file
|
51 |
+
def generate_decision_process_output(analyzer_results: List[DictAnalyzerResult], df_dict: Dict[str, List[Any]]) -> str:
|
52 |
+
"""
|
53 |
+
Generate a detailed output of the decision process for entity recognition.
|
54 |
+
|
55 |
+
This function takes the results from the analyzer and the original data dictionary,
|
56 |
+
and produces a string output detailing the decision process for each recognized entity.
|
57 |
+
It includes information such as entity type, position, confidence score, and the context
|
58 |
+
in which the entity was found.
|
59 |
+
|
60 |
+
Args:
|
61 |
+
analyzer_results (List[DictAnalyzerResult]): The results from the entity analyzer.
|
62 |
+
df_dict (Dict[str, List[Any]]): The original data in dictionary format.
|
63 |
+
|
64 |
+
Returns:
|
65 |
+
str: A string containing the detailed decision process output.
|
66 |
+
"""
|
67 |
+
decision_process_output = []
|
68 |
+
keys_to_keep = ['entity_type', 'start', 'end']
|
69 |
|
70 |
# Run through each column to analyse for PII
|
71 |
for i, result in enumerate(analyzer_results):
|
72 |
print("Looking at result:", str(i))
|
73 |
+
print("result:\n\n", result)
|
74 |
|
75 |
# If a single result
|
76 |
if isinstance(result, RecognizerResult):
|
77 |
+
print("Processing recogniser result as RecognizerResult:", str(i))
|
78 |
decision_process_output.extend(process_recognizer_result(result, result, 0, i, df_dict, keys_to_keep))
|
79 |
|
80 |
# If a list of results
|
81 |
+
elif isinstance(result, list) or isinstance(result, DictAnalyzerResult):
|
82 |
+
for x, recognizer_result in enumerate(result.recognizer_results):
|
83 |
+
print("Processing recogniser result as List:", str(i))
|
84 |
decision_process_output.extend(process_recognizer_result(result, recognizer_result, x, i, df_dict, keys_to_keep))
|
85 |
|
86 |
else:
|
87 |
try:
|
88 |
+
print("Processing recogniser result in other:", str(i))
|
89 |
decision_process_output.extend(process_recognizer_result(result, result, 0, i, df_dict, keys_to_keep))
|
90 |
except Exception as e:
|
91 |
print(e)
|
92 |
|
93 |
decision_process_output_str = '\n'.join(decision_process_output)
|
94 |
+
|
95 |
+
print("decision_process_output_str:\n\n", decision_process_output_str)
|
96 |
|
97 |
|
98 |
return decision_process_output_str
|
|
|
224 |
# Usage in the main function:
|
225 |
decision_process_output_str = generate_decision_process_output(analyzer_results, df_dict)
|
226 |
|
227 |
+
#print("decision_process_output_str:\n\n", decision_process_output_str)
|
228 |
+
|
229 |
analyse_toc = time.perf_counter()
|
230 |
analyse_time_out = f"Analysing the text took {analyse_toc - analyse_tic:0.1f} seconds."
|
231 |
print(analyse_time_out)
|
|
|
331 |
# Write each DataFrame to a different worksheet.
|
332 |
anon_df_out.to_excel(writer, sheet_name=excel_sheet_name, index=None)
|
333 |
|
334 |
+
decision_process_log_output_file = anon_xlsx_export_file_name + "_" + excel_sheet_name + "_decision_process_output.txt"
|
335 |
with open(decision_process_log_output_file, "w") as f:
|
336 |
f.write(decision_process_output_str)
|
337 |
|
338 |
else:
|
339 |
+
anon_export_file_name = output_folder + out_file_part + "_anon_" + anon_strat_txt + ".csv"
|
340 |
anon_df_out.to_csv(anon_export_file_name, index = None)
|
341 |
|
342 |
decision_process_log_output_file = anon_export_file_name + "_decision_process_output.txt"
|
tools/file_conversion.py
CHANGED
@@ -91,8 +91,6 @@ def process_file(file_path):
|
|
91 |
|
92 |
return img_object
|
93 |
|
94 |
-
|
95 |
-
|
96 |
def prepare_image_or_text_pdf(
|
97 |
file_paths: List[str],
|
98 |
in_redact_method: str,
|
@@ -123,9 +121,7 @@ def prepare_image_or_text_pdf(
|
|
123 |
|
124 |
# If out message or out_file_paths are blank, change to a list so it can be appended to
|
125 |
#if isinstance(out_message, str):
|
126 |
-
# out_message = [out_message]
|
127 |
-
|
128 |
-
|
129 |
|
130 |
# If this is the first time around, set variables to 0/blank
|
131 |
if first_loop_state==True:
|
@@ -189,7 +185,6 @@ def prepare_image_or_text_pdf(
|
|
189 |
|
190 |
return out_message, out_file_paths
|
191 |
|
192 |
-
|
193 |
def convert_text_pdf_to_img_pdf(in_file_path:str, out_text_file_path:List[str]):
|
194 |
file_path_without_ext = get_file_path_end(in_file_path)
|
195 |
|
|
|
91 |
|
92 |
return img_object
|
93 |
|
|
|
|
|
94 |
def prepare_image_or_text_pdf(
|
95 |
file_paths: List[str],
|
96 |
in_redact_method: str,
|
|
|
121 |
|
122 |
# If out message or out_file_paths are blank, change to a list so it can be appended to
|
123 |
#if isinstance(out_message, str):
|
124 |
+
# out_message = [out_message]
|
|
|
|
|
125 |
|
126 |
# If this is the first time around, set variables to 0/blank
|
127 |
if first_loop_state==True:
|
|
|
185 |
|
186 |
return out_message, out_file_paths
|
187 |
|
|
|
188 |
def convert_text_pdf_to_img_pdf(in_file_path:str, out_text_file_path:List[str]):
|
189 |
file_path_without_ext = get_file_path_end(in_file_path)
|
190 |
|
tools/file_redaction.py
CHANGED
@@ -247,142 +247,167 @@ def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_
|
|
247 |
|
248 |
return images, decision_process_output_str
|
249 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
250 |
def redact_text_pdf(filename:str, language:str, chosen_redact_entities:List[str], allow_list:List[str]=None, progress=Progress(track_tqdm=True)):
|
251 |
'''
|
252 |
Redact chosen entities from a pdf that is made up of multiple pages that are not images.
|
253 |
'''
|
254 |
-
|
255 |
-
combined_analyzer_results = []
|
256 |
-
analyser_explanations = []
|
257 |
annotations_all_pages = []
|
258 |
-
|
259 |
-
|
260 |
-
# Horizontal distance between PII bounding boxes under/equal they are combined into one
|
261 |
-
combine_pixel_dist = 100
|
262 |
|
263 |
pdf = Pdf.open(filename)
|
264 |
-
|
265 |
page_num = 0
|
266 |
|
267 |
-
#for page in progress.tqdm(pdf.pages, total=len(pdf.pages), unit="pages", desc="Redacting pages"):
|
268 |
for page in pdf.pages:
|
269 |
print("Page number is:", page_num + 1)
|
270 |
|
271 |
annotations_on_page = []
|
272 |
-
|
273 |
|
274 |
for page_layout in extract_pages(filename, page_numbers = [page_num], maxpages=1):
|
275 |
-
|
|
|
|
|
|
|
|
|
|
|
276 |
|
277 |
for text_container in page_layout:
|
278 |
-
|
279 |
-
|
280 |
-
|
281 |
-
analyzer_results = []
|
282 |
-
characters = []
|
283 |
-
|
284 |
-
analyzer_results = nlp_analyser.analyze(text=text_to_analyze,
|
285 |
-
language=language,
|
286 |
-
entities=chosen_redact_entities,
|
287 |
-
score_threshold=score_threshold,
|
288 |
-
return_decision_process=True,
|
289 |
-
allow_list=allow_list)
|
290 |
-
|
291 |
-
|
292 |
-
|
293 |
-
|
294 |
-
characters = [char # This is what we want to include in the list
|
295 |
-
for line in text_container # Loop through each line in text_container
|
296 |
-
if isinstance(line, LTTextLine) # Check if the line is an instance of LTTextLine
|
297 |
-
for char in line] # Loop through each character in the line
|
298 |
-
#if isinstance(char, LTChar)] # Check if the character is not an instance of LTAnno #isinstance(char, LTChar) or
|
299 |
-
|
300 |
-
|
301 |
-
# if len(analyzer_results) > 0 and len(characters) > 0:
|
302 |
-
# analyzed_bounding_boxes.extend({"boundingBox": char.bbox, "result": result} for result in analyzer_results for char in characters[result.start:result.end] if isinstance(char, LTChar))
|
303 |
-
# combined_analyzer_results.extend(analyzer_results)
|
304 |
-
|
305 |
-
# Inside the loop where you process analyzer_results:
|
306 |
-
if len(analyzer_results) > 0 and len(characters) > 0:
|
307 |
-
merged_bounding_boxes = []
|
308 |
-
current_box = None
|
309 |
-
current_y = None
|
310 |
-
|
311 |
-
for result in analyzer_results:
|
312 |
-
for char in characters[result.start : result.end]:
|
313 |
-
if isinstance(char, LTChar):
|
314 |
-
char_box = list(char.bbox)
|
315 |
-
|
316 |
-
# Fix: Check if either current_y or current_box are None
|
317 |
-
if current_y is None or current_box is None:
|
318 |
-
# This is the first character, so initialize current_box and current_y
|
319 |
-
current_box = char_box
|
320 |
-
current_y = char_box[1]
|
321 |
-
else: # Now we have previous values to compare
|
322 |
-
#print("Comparing values")
|
323 |
-
vertical_diff_bboxes = abs(char_box[1] - current_y)
|
324 |
-
horizontal_diff_bboxes = abs(char_box[0] - current_box[2])
|
325 |
-
#print("Vertical distance with last bbox: ", str(vertical_diff_bboxes), "Horizontal distance: ", str(horizontal_diff_bboxes), "For result: ", result)
|
326 |
-
|
327 |
-
if (
|
328 |
-
vertical_diff_bboxes <= 5
|
329 |
-
and horizontal_diff_bboxes <= combine_pixel_dist
|
330 |
-
):
|
331 |
-
old_right_pos = current_box[2]
|
332 |
-
current_box[2] = char_box[2]
|
333 |
-
else:
|
334 |
-
merged_bounding_boxes.append(
|
335 |
-
{"boundingBox": current_box, "result": result})
|
336 |
-
|
337 |
-
current_box = char_box
|
338 |
-
current_y = char_box[1]
|
339 |
-
# Add the last box
|
340 |
-
if current_box:
|
341 |
-
merged_bounding_boxes.append({"boundingBox": current_box, "result": result})
|
342 |
-
|
343 |
-
if not merged_bounding_boxes:
|
344 |
-
analyzed_bounding_boxes.extend({"boundingBox": char.bbox, "result": result} for result in analyzer_results for char in characters[result.start:result.end] if isinstance(char, LTChar))
|
345 |
-
else:
|
346 |
-
analyzed_bounding_boxes.extend(merged_bounding_boxes)
|
347 |
-
|
348 |
-
combined_analyzer_results.extend(analyzer_results)
|
349 |
-
|
350 |
-
if len(analyzer_results) > 0:
|
351 |
-
#decision_process_output_str = generate_decision_process_output(analyzer_results, {'text':text_to_analyze})
|
352 |
-
#print("Decision process:", decision_process_output_str)
|
353 |
-
# Create summary df of annotations to be made
|
354 |
-
analyzed_bounding_boxes_df_new = pd.DataFrame(analyzed_bounding_boxes)
|
355 |
-
analyzed_bounding_boxes_df_text = analyzed_bounding_boxes_df_new['result'].astype(str).str.split(",",expand=True).replace(".*: ", "", regex=True)
|
356 |
-
analyzed_bounding_boxes_df_text.columns = ["type", "start", "end", "score"]
|
357 |
-
analyzed_bounding_boxes_df_new = pd.concat([analyzed_bounding_boxes_df_new, analyzed_bounding_boxes_df_text], axis = 1)
|
358 |
-
analyzed_bounding_boxes_df_new['page'] = page_num + 1
|
359 |
-
analyzed_bounding_boxes_df = pd.concat([analyzed_bounding_boxes_df, analyzed_bounding_boxes_df_new], axis = 0).drop('result', axis=1)
|
360 |
-
|
361 |
-
print('analyzed_bounding_boxes_df:', analyzed_bounding_boxes_df)
|
362 |
-
|
363 |
-
for analyzed_bounding_box in analyzed_bounding_boxes:
|
364 |
-
bounding_box = analyzed_bounding_box["boundingBox"]
|
365 |
-
annotation = Dictionary(
|
366 |
-
Type=Name.Annot,
|
367 |
-
Subtype=Name.Square, #Name.Highlight,
|
368 |
-
QuadPoints=[bounding_box[0], bounding_box[3], bounding_box[2], bounding_box[3], bounding_box[0], bounding_box[1], bounding_box[2], bounding_box[1]],
|
369 |
-
Rect=[bounding_box[0], bounding_box[1], bounding_box[2], bounding_box[3]],
|
370 |
-
C=[0, 0, 0],
|
371 |
-
IC=[0, 0, 0],
|
372 |
-
CA=1, # Transparency
|
373 |
-
T=analyzed_bounding_box["result"].entity_type,
|
374 |
-
BS=Dictionary(
|
375 |
-
W=0, # Border width: 1 point
|
376 |
-
S=Name.S # Border style: solid
|
377 |
-
)
|
378 |
-
)
|
379 |
-
annotations_on_page.append(annotation)
|
380 |
|
381 |
-
|
382 |
-
|
383 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
384 |
page.Annots = pdf.make_indirect(annotations_on_page)
|
385 |
|
|
|
|
|
|
|
|
|
|
|
386 |
page_num += 1
|
387 |
|
388 |
-
return pdf,
|
|
|
247 |
|
248 |
return images, decision_process_output_str
|
249 |
|
250 |
+
def analyze_text_container(text_container, language, chosen_redact_entities, score_threshold, allow_list):
|
251 |
+
if isinstance(text_container, LTTextContainer):
|
252 |
+
text_to_analyze = text_container.get_text()
|
253 |
+
|
254 |
+
analyzer_results = nlp_analyser.analyze(text=text_to_analyze,
|
255 |
+
language=language,
|
256 |
+
entities=chosen_redact_entities,
|
257 |
+
score_threshold=score_threshold,
|
258 |
+
return_decision_process=True,
|
259 |
+
allow_list=allow_list)
|
260 |
+
characters = [char
|
261 |
+
for line in text_container
|
262 |
+
if isinstance(line, LTTextLine)
|
263 |
+
for char in line]
|
264 |
+
|
265 |
+
return analyzer_results, characters
|
266 |
+
return [], []
|
267 |
+
|
268 |
+
# Inside the loop where you process analyzer_results, merge bounding boxes that are right next to each other:
|
269 |
+
def merge_bounding_boxes(analyzer_results, characters, combine_pixel_dist):
|
270 |
+
analyzed_bounding_boxes = []
|
271 |
+
if len(analyzer_results) > 0 and len(characters) > 0:
|
272 |
+
merged_bounding_boxes = []
|
273 |
+
current_box = None
|
274 |
+
current_y = None
|
275 |
+
|
276 |
+
for i, result in enumerate(analyzer_results):
|
277 |
+
print("Considering result", str(i))
|
278 |
+
for char in characters[result.start : result.end]:
|
279 |
+
if isinstance(char, LTChar):
|
280 |
+
char_box = list(char.bbox)
|
281 |
+
|
282 |
+
if current_y is None or current_box is None:
|
283 |
+
current_box = char_box
|
284 |
+
current_y = char_box[1]
|
285 |
+
else:
|
286 |
+
vertical_diff_bboxes = abs(char_box[1] - current_y)
|
287 |
+
horizontal_diff_bboxes = abs(char_box[0] - current_box[2])
|
288 |
+
|
289 |
+
if (
|
290 |
+
vertical_diff_bboxes <= 5
|
291 |
+
and horizontal_diff_bboxes <= combine_pixel_dist
|
292 |
+
):
|
293 |
+
current_box[2] = char_box[2] # Extend the current box horizontally
|
294 |
+
else:
|
295 |
+
merged_bounding_boxes.append(
|
296 |
+
{"boundingBox": current_box, "result": result})
|
297 |
+
|
298 |
+
# Reset current_box and current_y after appending
|
299 |
+
current_box = char_box
|
300 |
+
current_y = char_box[1]
|
301 |
+
|
302 |
+
# After finishing with the current result, add the last box for this result
|
303 |
+
if current_box:
|
304 |
+
merged_bounding_boxes.append({"boundingBox": current_box, "result": result})
|
305 |
+
current_box = None
|
306 |
+
current_y = None # Reset for the next result
|
307 |
+
|
308 |
+
if not merged_bounding_boxes:
|
309 |
+
analyzed_bounding_boxes.extend(
|
310 |
+
{"boundingBox": char.bbox, "result": result}
|
311 |
+
for result in analyzer_results
|
312 |
+
for char in characters[result.start:result.end]
|
313 |
+
if isinstance(char, LTChar)
|
314 |
+
)
|
315 |
+
else:
|
316 |
+
analyzed_bounding_boxes.extend(merged_bounding_boxes)
|
317 |
+
|
318 |
+
print("analysed_bounding_boxes:\n\n", analyzed_bounding_boxes)
|
319 |
+
|
320 |
+
return analyzed_bounding_boxes
|
321 |
+
|
322 |
+
def create_text_redaction_process_results(analyzer_results, analyzed_bounding_boxes, page_num):
|
323 |
+
decision_process_table = pd.DataFrame()
|
324 |
+
|
325 |
+
if len(analyzer_results) > 0:
|
326 |
+
# Create summary df of annotations to be made
|
327 |
+
analyzed_bounding_boxes_df_new = pd.DataFrame(analyzed_bounding_boxes)
|
328 |
+
analyzed_bounding_boxes_df_text = analyzed_bounding_boxes_df_new['result'].astype(str).str.split(",",expand=True).replace(".*: ", "", regex=True)
|
329 |
+
analyzed_bounding_boxes_df_text.columns = ["type", "start", "end", "score"]
|
330 |
+
analyzed_bounding_boxes_df_new = pd.concat([analyzed_bounding_boxes_df_new, analyzed_bounding_boxes_df_text], axis = 1)
|
331 |
+
analyzed_bounding_boxes_df_new['page'] = page_num + 1
|
332 |
+
decision_process_table = pd.concat([decision_process_table, analyzed_bounding_boxes_df_new], axis = 0).drop('result', axis=1)
|
333 |
+
|
334 |
+
print('\n\ndecision_process_table:\n\n', decision_process_table)
|
335 |
+
|
336 |
+
return decision_process_table
|
337 |
+
|
338 |
+
def create_annotations_for_bounding_boxes(analyzed_bounding_boxes):
|
339 |
+
annotations_on_page = []
|
340 |
+
for analyzed_bounding_box in analyzed_bounding_boxes:
|
341 |
+
bounding_box = analyzed_bounding_box["boundingBox"]
|
342 |
+
annotation = Dictionary(
|
343 |
+
Type=Name.Annot,
|
344 |
+
Subtype=Name.Square, #Name.Highlight,
|
345 |
+
QuadPoints=[bounding_box[0], bounding_box[3], bounding_box[2], bounding_box[3],
|
346 |
+
bounding_box[0], bounding_box[1], bounding_box[2], bounding_box[1]],
|
347 |
+
Rect=[bounding_box[0], bounding_box[1], bounding_box[2], bounding_box[3]],
|
348 |
+
C=[0, 0, 0],
|
349 |
+
IC=[0, 0, 0],
|
350 |
+
CA=1, # Transparency
|
351 |
+
T=analyzed_bounding_box["result"].entity_type,
|
352 |
+
BS=Dictionary(
|
353 |
+
W=0, # Border width: 1 point
|
354 |
+
S=Name.S # Border style: solid
|
355 |
+
)
|
356 |
+
)
|
357 |
+
annotations_on_page.append(annotation)
|
358 |
+
return annotations_on_page
|
359 |
+
|
360 |
def redact_text_pdf(filename:str, language:str, chosen_redact_entities:List[str], allow_list:List[str]=None, progress=Progress(track_tqdm=True)):
|
361 |
'''
|
362 |
Redact chosen entities from a pdf that is made up of multiple pages that are not images.
|
363 |
'''
|
|
|
|
|
|
|
364 |
annotations_all_pages = []
|
365 |
+
decision_process_table_all_pages = []
|
366 |
+
|
367 |
+
combine_pixel_dist = 100 # Horizontal distance between PII bounding boxes under/equal they are combined into one
|
|
|
368 |
|
369 |
pdf = Pdf.open(filename)
|
|
|
370 |
page_num = 0
|
371 |
|
|
|
372 |
for page in pdf.pages:
|
373 |
print("Page number is:", page_num + 1)
|
374 |
|
375 |
annotations_on_page = []
|
376 |
+
decision_process_table_on_page = []
|
377 |
|
378 |
for page_layout in extract_pages(filename, page_numbers = [page_num], maxpages=1):
|
379 |
+
|
380 |
+
page_analyzer_results = []
|
381 |
+
page_analyzed_bounding_boxes = []
|
382 |
+
text_container_analyzer_results = []
|
383 |
+
text_container_analyzed_bounding_boxes = []
|
384 |
+
characters = []
|
385 |
|
386 |
for text_container in page_layout:
|
387 |
+
text_container_analyzer_results, characters = analyze_text_container(text_container, language, chosen_redact_entities, score_threshold, allow_list)
|
388 |
+
# Merge bounding boxes if very close together
|
389 |
+
text_container_analyzed_bounding_boxes = merge_bounding_boxes(text_container_analyzer_results, characters, combine_pixel_dist)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
390 |
|
391 |
+
print("\n\nanalyzed_bounding_boxes_in_loop:", text_container_analyzed_bounding_boxes)
|
392 |
+
|
393 |
+
page_analyzed_bounding_boxes.extend(text_container_analyzed_bounding_boxes)
|
394 |
+
page_analyzer_results.extend(text_container_analyzer_results)
|
395 |
+
|
396 |
+
print("analyzed_bounding_boxes_out_loop:\n\n", page_analyzed_bounding_boxes)
|
397 |
+
|
398 |
+
decision_process_table_on_page = create_text_redaction_process_results(page_analyzer_results, page_analyzed_bounding_boxes, page_num)
|
399 |
+
|
400 |
+
annotations_on_page = create_annotations_for_bounding_boxes(page_analyzed_bounding_boxes)
|
401 |
+
#print('\n\nannotations_on_page:', annotations_on_page)
|
402 |
+
|
403 |
+
# Make page annotations
|
404 |
page.Annots = pdf.make_indirect(annotations_on_page)
|
405 |
|
406 |
+
annotations_all_pages.extend([annotations_on_page])
|
407 |
+
decision_process_table_all_pages.extend([decision_process_table_on_page])
|
408 |
+
|
409 |
+
print("For page number:", page_num, "there are", len(annotations_all_pages[page_num]), "annotations")
|
410 |
+
|
411 |
page_num += 1
|
412 |
|
413 |
+
return pdf, decision_process_table_all_pages
|