seanpedrickcase committed
Commit 6ea0852 · 1 Parent(s): e9c4101

Improved allow list, handwriting/signature identification, logging

app.py CHANGED
@@ -4,7 +4,7 @@ import socket
 # By default TLDExtract will try to pull files from the internet. I have instead downloaded this file locally to avoid the requirement for an internet connection.
 os.environ['TLDEXTRACT_CACHE'] = 'tld/.tld_set_snapshot'
 
-from tools.helper_functions import ensure_output_folder_exists, add_folder_to_path, put_columns_in_df, get_connection_params, output_folder, get_or_create_env_var, reveal_feedback_buttons, wipe_logs
+from tools.helper_functions import ensure_output_folder_exists, add_folder_to_path, put_columns_in_df, get_connection_params, output_folder, get_or_create_env_var, reveal_feedback_buttons, wipe_logs, custom_regex_load
 from tools.aws_functions import upload_file_to_s3
 from tools.file_redaction import choose_and_run_redactor
 from tools.file_conversion import prepare_image_or_text_pdf
@@ -12,6 +12,7 @@ from tools.data_anonymise import anonymise_data_files
 from tools.auth import authenticate_user
 #from tools.aws_functions import load_data_from_aws
 import gradio as gr
+import pandas as pd
 
 from datetime import datetime
 today_rev = datetime.now().strftime("%Y%m%d")
@@ -44,6 +45,8 @@ with app:
     first_loop_state = gr.State(True)
     second_loop_state = gr.State(False)
 
+    in_allow_list_state = gr.State(pd.DataFrame())
+
     session_hash_state = gr.State()
     s3_output_folder_state = gr.State()
 
@@ -69,8 +72,8 @@ with app:
     with gr.Tab("PDFs/images"):
 
         with gr.Accordion("Redact document", open = True):
-            in_file = gr.File(label="Choose document/image files (PDF, JPG, PNG)", file_count= "multiple", file_types=['.pdf', '.jpg', '.png'])
-            redact_btn = gr.Button("Redact document(s)", variant="primary")
+            in_file = gr.File(label="Choose document/image files (PDF, JPG, PNG)", file_count= "multiple", file_types=['.pdf', '.jpg', '.png', '.json'])
+            document_redact_btn = gr.Button("Redact document(s)", variant="primary")
 
         with gr.Row():
             output_summary = gr.Textbox(label="Output summary")
@@ -128,6 +131,8 @@ with app:
             with gr.Row():
                 page_min = gr.Number(precision=0,minimum=0,maximum=9999, label="Lowest page to redact")
                 page_max = gr.Number(precision=0,minimum=0,maximum=9999, label="Highest page to redact")
+            with gr.Row():
+                handwrite_signature_checkbox = gr.CheckboxGroup(choices=["Redact all identified handwriting", "Redact all identified signatures"], value=["Redact all identified handwriting", "Redact all identified signatures"])
         with gr.Accordion("Settings for open text or xlsx/csv files", open = True):
             anon_strat = gr.Radio(choices=["replace with <REDACTED>", "replace with <ENTITY_NAME>", "redact", "hash", "mask", "encrypt", "fake_first_name"], label="Select an anonymisation method.", value = "replace with <REDACTED>")
@@ -135,11 +140,16 @@ with app:
             in_redact_entities = gr.Dropdown(value=chosen_redact_entities, choices=full_entity_list, multiselect=True, label="Entities to redact (click close to down arrow for full list)")
             with gr.Row():
                 in_redact_language = gr.Dropdown(value = "en", choices = ["en"], label="Redaction language (only English currently supported)", multiselect=False)
-                in_allow_list = gr.Dataframe(label="Allow list - enter a new term to ignore for redaction on each row e.g. Lambeth -> add new row -> Lambeth 2030", headers=["Allow list"], row_count=1, col_count=(1, 'fixed'), value=[[""]], type="array", column_widths=["100px"], datatype='str')
+                #in_allow_list = gr.Dataframe(label="Allow list - enter a new term to ignore for redaction on each row e.g. Lambeth -> add new row -> Lambeth 2030", headers=["Allow list"], row_count=1, col_count=(1, 'fixed'), value=[[""]], type="array", column_widths=["100px"], datatype='str')
+            with gr.Row():
+                in_allow_list = gr.UploadButton(label="Import allow list file.", file_count="multiple")
+                gr.Markdown("""Import allow list file - csv table with one column of a different word/phrase on each row (case sensitive). Terms in this file will not be redacted.""")
+            in_allow_list_text = gr.Textbox(label="Custom allow list load status")
             log_files_output = gr.File(label="Log file output", interactive=False)
 
-            # Invisible text box to hold the session hash/username just for logging purposes
-            session_hash_textbox = gr.Textbox(value="", visible=False)
+            # Invisible text box to hold the session hash/username and Textract request metadata just for logging purposes
+            session_hash_textbox = gr.Textbox(value="", visible=False)
+            textract_metadata_textbox = gr.Textbox(value="", visible=False)
 
     # AWS options - placeholder for possibility of storing data on s3
     # with gr.Tab(label="Advanced options"):
@@ -153,16 +163,19 @@ with app:
 
     # ### Loading AWS data ###
     # load_aws_data_button.click(fn=load_data_from_aws, inputs=[in_aws_file, aws_password_box], outputs=[in_file, aws_log_box])
+
+    # If a custom allow list is uploaded
+    in_allow_list.upload(fn=custom_regex_load, inputs=[in_allow_list], outputs=[in_allow_list_text, in_allow_list_state])
 
     # Document redaction
-    redact_btn.click(fn = prepare_image_or_text_pdf, inputs=[in_file, in_redaction_method, in_allow_list, text_documents_done, output_summary, first_loop_state], outputs=[output_summary, prepared_pdf_state], api_name="prepare").\
-    then(fn = choose_and_run_redactor, inputs=[in_file, prepared_pdf_state, in_redact_language, in_redact_entities, in_redaction_method, in_allow_list, text_documents_done, output_summary, output_file_list_state, log_files_output_list_state, first_loop_state, page_min, page_max, estimated_time_taken_number],
-    outputs=[output_summary, output_file, output_file_list_state, text_documents_done, log_files_output, log_files_output_list_state, estimated_time_taken_number], api_name="redact_doc")
+    document_redact_btn.click(fn = prepare_image_or_text_pdf, inputs=[in_file, in_redaction_method, in_allow_list, text_documents_done, output_summary, first_loop_state], outputs=[output_summary, prepared_pdf_state], api_name="prepare").\
+    then(fn = choose_and_run_redactor, inputs=[in_file, prepared_pdf_state, in_redact_language, in_redact_entities, in_redaction_method, in_allow_list_state, text_documents_done, output_summary, output_file_list_state, log_files_output_list_state, first_loop_state, page_min, page_max, estimated_time_taken_number, handwrite_signature_checkbox],
+    outputs=[output_summary, output_file, output_file_list_state, text_documents_done, log_files_output, log_files_output_list_state, estimated_time_taken_number, textract_metadata_textbox], api_name="redact_doc")
 
     # If the output file count text box changes, keep going with redacting each document until done
     text_documents_done.change(fn = prepare_image_or_text_pdf, inputs=[in_file, in_redaction_method, in_allow_list, text_documents_done, output_summary, second_loop_state], outputs=[output_summary, prepared_pdf_state]).\
-    then(fn = choose_and_run_redactor, inputs=[in_file, prepared_pdf_state, in_redact_language, in_redact_entities, in_redaction_method, in_allow_list, text_documents_done, output_summary, output_file_list_state, log_files_output_list_state, second_loop_state, page_min, page_max, estimated_time_taken_number],
-    outputs=[output_summary, output_file, output_file_list_state, text_documents_done, log_files_output, log_files_output_list_state, estimated_time_taken_number]).\
+    then(fn = choose_and_run_redactor, inputs=[in_file, prepared_pdf_state, in_redact_language, in_redact_entities, in_redaction_method, in_allow_list_state, text_documents_done, output_summary, output_file_list_state, log_files_output_list_state, second_loop_state, page_min, page_max, estimated_time_taken_number, handwrite_signature_checkbox],
+    outputs=[output_summary, output_file, output_file_list_state, text_documents_done, log_files_output, log_files_output_list_state, estimated_time_taken_number, textract_metadata_textbox]).\
    then(fn = reveal_feedback_buttons, outputs=[pdf_feedback_radio, pdf_further_details_text, pdf_submit_feedback_btn, pdf_feedback_title])
 
    # Tabular data redaction
@@ -197,8 +210,8 @@ with app:
 
    # Log processing time/token usage when making a query
    usage_callback = gr.CSVLogger()
-   usage_callback.setup([session_hash_textbox, in_data_files, estimated_time_taken_number], usage_logs_folder)
-   estimated_time_taken_number.change(lambda *args: usage_callback.flag(list(args)), [session_hash_textbox, in_data_files, estimated_time_taken_number], None, preprocess=False).\
+   usage_callback.setup([session_hash_textbox, in_data_files, estimated_time_taken_number, textract_metadata_textbox], usage_logs_folder)
+   estimated_time_taken_number.change(lambda *args: usage_callback.flag(list(args)), [session_hash_textbox, in_data_files, estimated_time_taken_number, textract_metadata_textbox], None, preprocess=False).\
    then(fn = upload_file_to_s3, inputs=[usage_logs_state, usage_s3_logs_loc_state], outputs=[s3_logs_output_textbox])
 
 # Launch the Gradio app
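
Note on the new allow-list flow: the in-app dataframe is replaced by a CSV upload (one term per row, first column, no header, case sensitive), held in in_allow_list_state as a pandas DataFrame. A minimal sketch of preparing such a file and of the flattening the redactor later applies (pandas assumed; allow_list.csv is a hypothetical file name):

import pandas as pd

# One allow-listed term per row, single unnamed column, case sensitive
pd.DataFrame(["Lambeth", "Lambeth 2030"]).to_csv("allow_list.csv", index=False, header=False)

# custom_regex_load reads it back with header=None, so the terms sit in column 0
allow_list_df = pd.read_csv("allow_list.csv", header=None)
allow_list_flat = allow_list_df[0].tolist()  # ['Lambeth', 'Lambeth 2030']
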
tools/aws_functions.py CHANGED
@@ -14,6 +14,10 @@ aws_var_default = "0"
14
  aws_var_val = get_or_create_env_var(aws_var, aws_var_default)
15
  print(f'The value of {aws_var} is {aws_var_val}')
16
 
 
 
 
 
17
  if aws_var_val == "1":
18
  try:
19
  bucket_name = os.environ['DOCUMENT_REDACTION_BUCKET']
@@ -22,7 +26,8 @@ if aws_var_val == "1":
22
  print(e)
23
 
24
  def get_assumed_role_info():
25
- sts = boto3.client('sts', region_name='eu-west-2', endpoint_url='https://sts.eu-west-2.amazonaws.com')
 
26
  response = sts.get_caller_identity()
27
 
28
  # Extract ARN of the assumed role
 
14
  aws_var_val = get_or_create_env_var(aws_var, aws_var_default)
15
  print(f'The value of {aws_var} is {aws_var_val}')
16
 
17
+ # Launch the Gradio app
18
+ AWS_REGION = get_or_create_env_var('AWS_REGION', 'eu-west-2')
19
+ print(f'The value of AWS_REGION is {AWS_REGION}')
20
+
21
  if aws_var_val == "1":
22
  try:
23
  bucket_name = os.environ['DOCUMENT_REDACTION_BUCKET']
 
26
  print(e)
27
 
28
  def get_assumed_role_info():
29
+ sts_endpoint = 'https://sts.' + AWS_REGION + '.amazonaws.com'
30
+ sts = boto3.client('sts', region_name=AWS_REGION, endpoint_url=sts_endpoint)
31
  response = sts.get_caller_identity()
32
 
33
  # Extract ARN of the assumed role
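
The STS endpoint is now derived from a configurable region instead of a hard-coded eu-west-2. A standalone sketch of the same pattern, including the ARN extraction this hunk truncates (boto3 assumed; the 'Arn' and 'Account' keys are standard in get_caller_identity responses):

import boto3

AWS_REGION = 'eu-west-2'  # normally read via get_or_create_env_var('AWS_REGION', ...)

def get_assumed_role_info():
    # Build the regional STS endpoint rather than hard-coding one region
    sts_endpoint = 'https://sts.' + AWS_REGION + '.amazonaws.com'
    sts = boto3.client('sts', region_name=AWS_REGION, endpoint_url=sts_endpoint)
    response = sts.get_caller_identity()
    return response['Arn'], response['Account']
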
tools/aws_textract.py CHANGED
@@ -7,6 +7,22 @@ import pikepdf
 from pdf2image import convert_from_bytes
 from tools.custom_image_analyser_engine import OCRResult, CustomImageRecognizerResult
 
+def extract_textract_metadata(response):
+    """Extracts metadata from an AWS Textract response."""
+
+    print("Document metadata:", response['DocumentMetadata'])
+
+    request_id = response['ResponseMetadata']['RequestId']
+    pages = response['DocumentMetadata']['Pages']
+    #number_of_pages = response['DocumentMetadata']['NumberOfPages']
+
+    return str({
+        'RequestId': request_id,
+        'Pages': pages
+        #,
+        #'NumberOfPages': number_of_pages
+    })
+
 def analyse_page_with_textract(pdf_page_bytes, json_file_path):
     '''
     Analyse page with AWS Textract
@@ -27,7 +43,8 @@ def analyse_page_with_textract(pdf_page_bytes, json_file_path):
     #response = client.detect_document_text(Document={'Bytes': image_bytes})
     response = client.analyze_document(Document={'Bytes': pdf_page_bytes}, FeatureTypes=["SIGNATURES"])
 
-    text_blocks = response['Blocks']
+    text_blocks = response['Blocks']
+    request_metadata = extract_textract_metadata(response)
 
     # Write the response to a JSON file
     with open(json_file_path, 'w') as json_file:
@@ -35,7 +52,7 @@ def analyse_page_with_textract(pdf_page_bytes, json_file_path):
 
     print("Response has been written to output:", json_file_path)
 
-    return text_blocks
+    return text_blocks, request_metadata
 
 
 def convert_pike_pdf_page_to_bytes(pdf, page_num):
@@ -66,10 +83,12 @@ def convert_pike_pdf_page_to_bytes(pdf, page_num):
 
 def json_to_ocrresult(json_data, page_width, page_height):
     '''
-    Convert the json response from textract to the OCRResult format used elsewhere in the code.
+    Convert the json response from textract to the OCRResult format used elsewhere in the code. Looks for lines, words, and signatures. Handwriting and signatures are set aside especially for later in case the user wants to override the default behaviour and redact all handwriting/signatures.
     '''
     all_ocr_results = []
     signature_or_handwriting_recogniser_results = []
+    signature_recogniser_results = []
+    handwriting_recogniser_results = []
     signatures = []
     handwriting = []
 
@@ -78,30 +97,40 @@ def json_to_ocrresult(json_data, page_width, page_height):
         is_signature = False
         is_handwriting = False
 
-        if (text_block['BlockType'] == 'WORD') | (text_block['BlockType'] == 'LINE'):
-            text = text_block['Text']
-
-            # Extract BoundingBox details
-            bbox = text_block["Geometry"]["BoundingBox"]
-            left = bbox["Left"]
-            top = bbox["Top"]
-            width = bbox["Width"]
-            height = bbox["Height"]
-
-            # Convert proportional coordinates to absolute coordinates
-            left_abs = int(left * page_width)
-            top_abs = int(top * page_height)
-            width_abs = int(width * page_width)
-            height_abs = int(height * page_height)
-
-            # Create OCRResult with absolute coordinates
-            ocr_result = OCRResult(text, left_abs, top_abs, width_abs, height_abs)
-
-            # If handwriting or signature, add to bounding box
-            confidence = text_block['Confidence']
-
-            if 'TextType' in text_block:
-                text_type = text_block["TextType"]
+        if (text_block['BlockType'] == 'LINE') | (text_block['BlockType'] == 'SIGNATURE'): # (text_block['BlockType'] == 'WORD') |
+
+            if (text_block['BlockType'] == 'LINE'):
+
+                # If a line, pull out the text type and confidence from the child words and get text, bounding box
+
+                if 'Text' in text_block:
+                    text = text_block['Text']
+
+                if 'Relationships' in text_block:
+                    for relationship in text_block['Relationships']:
+                        if relationship['Type'] == 'CHILD':
+                            for child_id in relationship['Ids']:
+                                child_block = next((block for block in json_data if block['Id'] == child_id), None)
+                                if child_block and 'TextType' in child_block:
+                                    text_type = child_block['TextType']
+                                    confidence = text_block['Confidence']
+                                    break
+                            break
+
+                # Extract BoundingBox details
+                bbox = text_block["Geometry"]["BoundingBox"]
+                left = bbox["Left"]
+                top = bbox["Top"]
+                width = bbox["Width"]
+                height = bbox["Height"]
+
+                # Convert proportional coordinates to absolute coordinates
+                left_abs = int(left * page_width)
+                top_abs = int(top * page_height)
+                width_abs = int(width * page_width)
+                height_abs = int(height * page_height)
+
+                # If handwriting or signature, add to bounding box
 
                 if text_type == "HANDWRITING":
                     is_handwriting = True
@@ -110,42 +139,43 @@
                     recogniser_result = CustomImageRecognizerResult(entity_type=entity_name, text= text, score= confidence, start=0, end=word_end, left=left_abs, top=top_abs, width=width_abs, height=height_abs)
                     handwriting.append(recogniser_result)
                     print("Handwriting found:", handwriting[-1])
-
-            all_ocr_results.append(ocr_result)
 
-        elif (text_block['BlockType'] == 'SIGNATURE'):
-            text = "SIGNATURE"
-
-            # Extract BoundingBox details
-            bbox = text_block["Geometry"]["BoundingBox"]
-            left = bbox["Left"]
-            top = bbox["Top"]
-            width = bbox["Width"]
-            height = bbox["Height"]
-
-            # Convert proportional coordinates to absolute coordinates
-            left_abs = int(left * page_width)
-            top_abs = int(top * page_height)
-            width_abs = int(width * page_width)
-            height_abs = int(height * page_height)
-
-            # Create OCRResult with absolute coordinates
-            ocr_result = OCRResult(text, left_abs, top_abs, width_abs, height_abs)
-
-            is_signature = True
-            entity_name = "Signature"
-            word_end = len(entity_name)
-            recogniser_result = CustomImageRecognizerResult(entity_type=entity_name, text= text, score= confidence, start=0, end=word_end, left=left_abs, top=top_abs, width=width_abs, height=height_abs)
-            signatures.append(recogniser_result)
-            print("Signature found:", signatures[-1])
+            elif (text_block['BlockType'] == 'SIGNATURE'):
+                text = "SIGNATURE"
+
+                is_signature = True
+                entity_name = "SIGNATURE"
+                confidence = text_block['Confidence']
+                word_end = len(entity_name)
+
+                # Extract BoundingBox details
+                bbox = text_block["Geometry"]["BoundingBox"]
+                left = bbox["Left"]
+                top = bbox["Top"]
+                width = bbox["Width"]
+                height = bbox["Height"]
+
+                # Convert proportional coordinates to absolute coordinates
+                left_abs = int(left * page_width)
+                top_abs = int(top * page_height)
+                width_abs = int(width * page_width)
+                height_abs = int(height * page_height)
+
+                recogniser_result = CustomImageRecognizerResult(entity_type=entity_name, text= text, score= confidence, start=0, end=word_end, left=left_abs, top=top_abs, width=width_abs, height=height_abs)
+                signatures.append(recogniser_result)
+                print("Signature found:", signatures[-1])
 
+            # Create OCRResult with absolute coordinates
+            ocr_result = OCRResult(text, left_abs, top_abs, width_abs, height_abs)
             all_ocr_results.append(ocr_result)
 
-    is_signature_or_handwriting = is_signature | is_handwriting
+            is_signature_or_handwriting = is_signature | is_handwriting
 
-    # If it is signature or handwriting, will overwrite the default behaviour of the PII analyser
-    if is_signature_or_handwriting:
-        signature_or_handwriting_recogniser_results.append(recogniser_result)
+            # If it is signature or handwriting, will overwrite the default behaviour of the PII analyser
+            if is_signature_or_handwriting:
+                signature_or_handwriting_recogniser_results.append(recogniser_result)
+
+            if is_signature: signature_recogniser_results.append(recogniser_result)
+            if is_handwriting: handwriting_recogniser_results.append(recogniser_result)
 
-    return all_ocr_results, signature_or_handwriting_recogniser_results
+    return all_ocr_results, signature_or_handwriting_recogniser_results, signature_recogniser_results, handwriting_recogniser_results
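
The LINE handling above classifies a whole line by the TextType of its first CHILD word. A self-contained sketch of that traversal over a Textract-style block list (the sample blocks are illustrative, not a real response):

# Sample blocks mimicking Textract's documented LINE/WORD structure
blocks = [
    {'Id': 'line-1', 'BlockType': 'LINE', 'Text': 'Signed by John', 'Confidence': 99.0,
     'Relationships': [{'Type': 'CHILD', 'Ids': ['word-1', 'word-2', 'word-3']}]},
    {'Id': 'word-1', 'BlockType': 'WORD', 'Text': 'Signed', 'TextType': 'HANDWRITING'},
    {'Id': 'word-2', 'BlockType': 'WORD', 'Text': 'by', 'TextType': 'HANDWRITING'},
    {'Id': 'word-3', 'BlockType': 'WORD', 'Text': 'John', 'TextType': 'HANDWRITING'},
]

for block in blocks:
    if block['BlockType'] == 'LINE':
        for relationship in block.get('Relationships', []):
            if relationship['Type'] == 'CHILD':
                # Classify the line from its first child word's TextType
                child = next(b for b in blocks if b['Id'] == relationship['Ids'][0])
                print(block['Text'], '->', child.get('TextType', 'PRINTED'))
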
tools/custom_image_analyser_engine.py CHANGED
@@ -64,22 +64,40 @@ class CustomImageAnalyzerEngine:
         ocr_results: List[OCRResult],
         **text_analyzer_kwargs
     ) -> List[CustomImageRecognizerResult]:
-        # Combine all OCR text
-        full_text = ' '.join([result.text for result in ocr_results])
-
         # Define English as default language, if not specified
         if "language" not in text_analyzer_kwargs:
             text_analyzer_kwargs["language"] = "en"
 
-        analyzer_result = self.analyzer_engine.analyze(
-            text=full_text, **text_analyzer_kwargs
-        )
-
         allow_list = text_analyzer_kwargs.get('allow_list', [])
-
-        return self.map_analyzer_results_to_bounding_boxes(
-            analyzer_result, ocr_results, full_text, allow_list
-        )
+        combined_results = []
+
+        for ocr_result in ocr_results:
+            # Analyze each OCR result (line) individually
+            analyzer_result = self.analyzer_engine.analyze(
+                text=ocr_result.text, **text_analyzer_kwargs
+            )
+
+            for result in analyzer_result:
+                # Extract the relevant portion of text based on start and end
+                relevant_text = ocr_result.text[result.start:result.end]
+
+                # Create a new OCRResult with the relevant text and adjusted position
+                relevant_ocr_result = OCRResult(
+                    text=relevant_text,
+                    left=ocr_result.left + self.estimate_x_offset(ocr_result.text, result.start),
+                    top=ocr_result.top,
+                    width=self.estimate_width(ocr_result, result.start, result.end),
+                    height=ocr_result.height
+                )
+
+                # Map the analyzer results to bounding boxes for this line
+                line_results = self.map_analyzer_results_to_bounding_boxes(
+                    [result], [relevant_ocr_result], relevant_text, allow_list
+                )
+
+                combined_results.extend(line_results)
+
+        return combined_results
 
     @staticmethod
     def map_analyzer_results_to_bounding_boxes(
@@ -113,4 +131,58 @@ class CustomImageAnalyzerEngine:
 
             text_position = word_end + 1 # +1 for the space between words
 
         return pii_bboxes
+
+    @staticmethod
+    def estimate_x_offset(full_text: str, start: int) -> int:
+        # Estimate the x-offset based on character position
+        # This is a simple estimation and might need refinement for variable-width fonts
+        return int(start / len(full_text) * len(full_text))
+
+    @staticmethod
+    def estimate_width(ocr_result: OCRResult, start: int, end: int) -> int:
+        # Estimate the width of the relevant text portion
+        full_width = ocr_result.width
+        full_length = len(ocr_result.text)
+        return int((end - start) / full_length * full_width)
+
+# Function to combine OCR results into line-level results
+def combine_ocr_results(ocr_results, x_threshold = 20, y_threshold = 10):
+    # Sort OCR results by 'top' to ensure line order
+    ocr_results = sorted(ocr_results, key=lambda x: (x.top, x.left))
+
+    combined_results = []
+    current_line = []
+    current_bbox = None
+
+    for result in ocr_results:
+        if not current_line:
+            # Start a new line
+            current_line.append(result)
+            current_bbox = result
+        else:
+            # Check if the result is on the same line (y-axis) and close horizontally (x-axis)
+            last_result = current_line[-1]
+            if abs(result.top - last_result.top) <= y_threshold and \
+               (result.left - (last_result.left + last_result.width)) <= x_threshold:
+                # Update the bounding box to include the new word
+                new_right = max(current_bbox.left + current_bbox.width, result.left + result.width)
+                current_bbox = OCRResult(
+                    text=f"{current_bbox.text} {result.text}",
+                    left=current_bbox.left,
+                    top=current_bbox.top,
+                    width=new_right - current_bbox.left,
+                    height=max(current_bbox.height, result.height)
+                )
+                current_line.append(result)
+            else:
+                # Commit the current line and start a new one
+                combined_results.append(current_bbox)
+                current_line = [result]
+                current_bbox = result
+
+    # Append the last line
+    if current_bbox:
+        combined_results.append(current_bbox)
+
+    return combined_results
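
A quick usage sketch of the new word-to-line grouping; OCRResult here is a stand-in dataclass with the same fields the project version exposes:

from dataclasses import dataclass

@dataclass
class OCRResult:  # stand-in for tools.custom_image_analyser_engine.OCRResult
    text: str
    left: int
    top: int
    width: int
    height: int

words = [
    OCRResult('John', left=10, top=100, width=40, height=12),
    OCRResult('Smith', left=55, top=102, width=50, height=12),    # close in x and y: same line
    OCRResult('Lambeth', left=10, top=130, width=70, height=12),  # y gap > threshold: new line
]

lines = combine_ocr_results(words)
print([line.text for line in lines])  # ['John Smith', 'Lambeth']
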
tools/file_conversion.py CHANGED
@@ -3,6 +3,7 @@ from tools.helper_functions import get_file_path_end, output_folder, detect_file
 from PIL import Image
 import os
 import time
+import json
 from gradio import Progress
 from typing import List, Optional
 
@@ -174,6 +175,15 @@ def prepare_image_or_text_pdf(
     if file_extension in ['.jpg', '.jpeg', '.png']:
         in_redact_method = "Image analysis"
 
+    # If the file loaded in is json, assume this is a textract response object. Save this to the output folder so it can be found later during redaction and go to the next file.
+    if file_extension in ['.json']:
+        json_contents = json.load(file_path)
+        # Write the response to a JSON file
+        out_folder = output_folder + file_path
+        with open(file_path, 'w') as json_file:
+            json.dump(json_contents, out_folder, indent=4) # indent=4 makes the JSON file pretty-printed
+        continue
+
     #if file_path:
     #    file_path_without_ext = get_file_path_end(file_path)
     if not file_path:
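
As committed, the JSON branch hands json.load a path string and json.dump a folder string, so the pass-through would raise at runtime. A corrected sketch of the apparent intent, using file objects in both calls (the loop wrapper and output_folder value are assumptions standing in for the surrounding function):

import json
import os

output_folder = 'output/'  # assumed, mirroring tools/helper_functions

for file_path in ['example_textract.json']:  # stand-in for the uploaded-file loop
    file_extension = os.path.splitext(file_path)[1]
    if file_extension == '.json':
        with open(file_path, 'r') as json_file:
            json_contents = json.load(json_file)   # load from a file object, not a path
        out_path = output_folder + os.path.basename(file_path)
        with open(out_path, 'w') as json_file:
            json.dump(json_contents, json_file, indent=4)  # dump to the file object
        continue  # move on to the next uploaded file
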
tools/file_redaction.py CHANGED
@@ -16,7 +16,7 @@ from gradio import Progress
 
 from collections import defaultdict # For efficient grouping
 
-from tools.custom_image_analyser_engine import CustomImageAnalyzerEngine, OCRResult
+from tools.custom_image_analyser_engine import CustomImageAnalyzerEngine, OCRResult, combine_ocr_results, CustomImageRecognizerResult
 from tools.file_conversion import process_file
 from tools.load_spacy_model_custom_recognisers import nlp_analyser, score_threshold
 from tools.helper_functions import get_file_path_end, output_folder
@@ -24,9 +24,11 @@ from tools.file_conversion import process_file, is_pdf, convert_text_pdf_to_img_
 from tools.data_anonymise import generate_decision_process_output
 from tools.aws_textract import analyse_page_with_textract, convert_pike_pdf_page_to_bytes, json_to_ocrresult
 
-def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], language:str, chosen_redact_entities:List[str], in_redact_method:str, in_allow_list:List[List[str]]=None, latest_file_completed:int=0, out_message:list=[], out_file_paths:list=[], log_files_output_paths:list=[], first_loop_state:bool=False, page_min:int=0, page_max:int=999, estimated_time_taken_state:float=0.0, progress=gr.Progress(track_tqdm=True)):
+def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], language:str, chosen_redact_entities:List[str], in_redact_method:str, in_allow_list:List[List[str]]=None, latest_file_completed:int=0, out_message:list=[], out_file_paths:list=[], log_files_output_paths:list=[], first_loop_state:bool=False, page_min:int=0, page_max:int=999, estimated_time_taken_state:float=0.0, handwrite_signature_checkbox:List[str]=["Redact all identified handwriting", "Redact all identified signatures"], progress=gr.Progress(track_tqdm=True)):
 
     tic = time.perf_counter()
+    all_request_metadata = []
+    all_request_metadata_str = ""
 
     # If this is the first time around, set variables to 0/blank
     if first_loop_state==True:
@@ -75,12 +77,15 @@ def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], languag
         estimate_total_processing_time = sum_numbers_before_seconds(final_out_message)
         print("Estimated total processing time:", str(estimate_total_processing_time))
 
-        return final_out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimate_total_processing_time
+        return final_out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimate_total_processing_time, all_request_metadata
 
     file_paths_loop = [file_paths[int(latest_file_completed)]]
 
-    if in_allow_list:
-        in_allow_list_flat = [item for sublist in in_allow_list for item in sublist]
+    if not in_allow_list.empty:
+        in_allow_list_flat = in_allow_list[0].tolist()
+        print("In allow list:", in_allow_list_flat)
+    else:
+        in_allow_list_flat = []
 
     for file in progress.tqdm(file_paths_loop, desc="Redacting files", unit = "files"):
@@ -96,7 +101,7 @@ def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], languag
         else:
             out_message = "No file selected"
             print(out_message)
-            return out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state
+            return out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state, all_request_metadata
 
         if in_redact_method == "Image analysis" or in_redact_method == "AWS Textract":
             # Analyse and redact image-based pdf or image
@@ -104,7 +109,9 @@ def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], languag
             #     return "Please upload a PDF file or image file (JPG, PNG) for image analysis.", None
 
             print("Redacting file" + file_path_without_ext + "as an image-based file")
-            pdf_images, output_logs, logging_file_paths = redact_image_pdf(file_path, image_paths, language, chosen_redact_entities, in_allow_list_flat, is_a_pdf, page_min, page_max, in_redact_method)
+            pdf_images, output_logs, logging_file_paths, request_metadata = redact_image_pdf(file_path, image_paths, language, chosen_redact_entities, in_allow_list_flat, is_a_pdf, page_min, page_max, in_redact_method, handwrite_signature_checkbox)
+
+            # Save file
             out_image_file_path = output_folder + file_path_without_ext + "_redacted_as_img.pdf"
             pdf_images[0].save(out_image_file_path, "PDF" ,resolution=100.0, save_all=True, append_images=pdf_images[1:])
@@ -114,12 +121,18 @@ def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], languag
 
             out_message.append("File '" + file_path_without_ext + "' successfully redacted")
 
+            # Save decision making process
             output_logs_str = str(output_logs)
             logs_output_file_name = out_image_file_path + "_decision_process_output.txt"
             with open(logs_output_file_name, "w") as f:
                 f.write(output_logs_str)
             log_files_output_paths.append(logs_output_file_name)
 
+            # Save Textract request metadata (if exists)
+            if request_metadata:
+                print("Request metadata:", all_request_metadata)
+                all_request_metadata.append(request_metadata)
+
             # Increase latest file completed count unless we are at the last file
             if latest_file_completed != len(file_paths):
                 print("Completed file number:", str(latest_file_completed))
@@ -165,7 +178,7 @@ def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], languag
         else:
             out_message = "No redaction method selected"
             print(out_message)
-            return out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state
+            return out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state, all_request_metadata
 
 
     toc = time.perf_counter()
@@ -175,15 +188,33 @@ def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], languag
     out_message_out = '\n'.join(out_message)
     out_message_out = out_message_out + " " + out_time
 
-    return out_message_out, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state
+    # If textract requests made, write to logging file
+    if all_request_metadata:
+        all_request_metadata_str = '\n'.join(all_request_metadata)
+
+        print("all_request_metadata_file_path")
+        all_request_metadata_file_path = output_folder + "textract_request_metadata.txt"
 
-def merge_img_bboxes(bboxes, handwriting_or_signature_boxes = [], horizontal_threshold=150, vertical_threshold=25):
+        with open(all_request_metadata_file_path, "w") as f:
+            f.write(all_request_metadata_str)
+        log_files_output_paths.append(all_request_metadata_file_path)
+
+    return out_message_out, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state, all_request_metadata_str
+
+def merge_img_bboxes(bboxes, signature_recogniser_results = [], handwriting_recogniser_results = [], handwrite_signature_checkbox:List[str]=["Redact all identified handwriting", "Redact all identified signatures"], horizontal_threshold=150, vertical_threshold=25):
     merged_bboxes = []
     grouped_bboxes = defaultdict(list)
 
-    if handwriting_or_signature_boxes:
-        print("Handwriting or signature boxes exist at merge:", handwriting_or_signature_boxes)
-        bboxes.extend(handwriting_or_signature_boxes)
+    if signature_recogniser_results or handwriting_recogniser_results:
+
+        if "Redact all identified handwriting" in handwrite_signature_checkbox:
+            print("Handwriting boxes exist at merge:", handwriting_recogniser_results)
+            bboxes.extend(handwriting_recogniser_results)
+
+        if "Redact all identified signatures" in handwrite_signature_checkbox:
+            print("Signature boxes exist at merge:", signature_recogniser_results)
+            bboxes.extend(signature_recogniser_results)
 
     # 1. Group by approximate vertical proximity
     for box in bboxes:
@@ -198,13 +229,18 @@ def merge_img_bboxes(bboxes, handwriting_or_signature_boxes = [], horizontal_thr
             if next_box.left - (merged_box.left + merged_box.width) <= horizontal_threshold:
                 #print("Merging a box")
                 # Calculate new dimensions for the merged box
-                print("Merged box:", merged_box)
+                #print("Merged box:", merged_box)
+                if merged_box.text == next_box.text:
+                    new_text = merged_box.text
+                else:
+                    new_text = merged_box.text + " " + next_box.text
+
                 new_left = min(merged_box.left, next_box.left)
                 new_top = min(merged_box.top, next_box.top)
                 new_width = max(merged_box.left + merged_box.width, next_box.left + next_box.width) - new_left
                 new_height = max(merged_box.top + merged_box.height, next_box.top + next_box.height) - new_top
-                merged_box = ImageRecognizerResult(
-                    merged_box.entity_type, merged_box.start, merged_box.end, merged_box.score, new_left, new_top, new_width, new_height
+                merged_box = CustomImageRecognizerResult(
+                    merged_box.entity_type, merged_box.start, merged_box.end, merged_box.score, new_left, new_top, new_width, new_height, new_text
                 )
             else:
                 merged_bboxes.append(merged_box)
@@ -213,7 +249,7 @@ def merge_img_bboxes(bboxes, handwriting_or_signature_boxes = [], horizontal_thr
             merged_bboxes.append(merged_box)
     return merged_bboxes
 
-def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_redact_entities:List[str], allow_list:List[str]=None, is_a_pdf:bool=True, page_min:int=0, page_max:int=999, analysis_type:str="Image analysis", progress=Progress(track_tqdm=True)):
+def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_redact_entities:List[str], allow_list:List[str]=None, is_a_pdf:bool=True, page_min:int=0, page_max:int=999, analysis_type:str="Image analysis", handwrite_signature_checkbox:List[str]=["Redact all identified handwriting", "Redact all identified signatures"], progress=Progress(track_tqdm=True)):
     '''
     Take a path for an image of a document, then run this image through the Presidio ImageAnalyzer and PIL to get a redacted page back. Adapted from Presidio ImageRedactorEngine.
     '''
@@ -223,6 +259,7 @@ def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_
     fill = (0, 0, 0) # Fill colour
     decision_process_output_str = ""
     images = []
+    request_metadata = {}
     image_analyser = CustomImageAnalyzerEngine(nlp_analyser)
 
     if not image_paths:
@@ -256,6 +293,12 @@ def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_
     print("Page range:", str(page_min + 1), "to", str(page_max))
 
     #for i in progress.tqdm(range(0,number_of_pages), total=number_of_pages, unit="pages", desc="Redacting pages"):
+
+    all_ocr_results = []
+    all_decision_process = []
+
+    if analysis_type == "Image analysis": ocr_results_file_path = output_folder + "ocr_results_" + file_name + "_pages_" + str(page_min + 1) + "_" + str(page_max) + ".txt"
+    elif analysis_type == "AWS Textract": ocr_results_file_path = output_folder + "ocr_results_" + file_name + "_pages_" + str(page_min + 1) + "_" + str(page_max) + "_textract.txt"
 
     for n in range(0, number_of_pages):
         handwriting_or_signature_boxes = []
@@ -277,6 +320,7 @@ def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_
 
         print("Redacting page", reported_page_number)
 
+
         # Assuming image_paths[i] is your PIL image object
         try:
             image = image_paths[0][i]#.copy()
@@ -286,45 +330,25 @@ def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_
             print(e)
             continue
 
-        # %%
-        # image_analyser = ImageAnalyzerEngine(nlp_analyser)
-        # engine = ImageRedactorEngine(image_analyser)
+        # Need image size to convert textract OCR outputs to the correct sizes
+        page_width, page_height = image.size
 
+        # Possibility to use different languages
         if language == 'en':
             ocr_lang = 'eng'
         else: ocr_lang = language
 
-        # bboxes = image_analyser.analyze(image,
-        #     ocr_kwargs={"lang": ocr_lang},
-        #     **{
-        #         "allow_list": allow_list,
-        #         "language": language,
-        #         "entities": chosen_redact_entities,
-        #         "score_threshold": score_threshold,
-        #         "return_decision_process":True,
-        #     })
-
         # Step 1: Perform OCR. Either with Tesseract, or with AWS Textract
         if analysis_type == "Image analysis":
+
             ocr_results = image_analyser.perform_ocr(image)
 
-            # Process all OCR text with bounding boxes
-            #print("OCR results:", ocr_results)
-            ocr_results_str = str(ocr_results)
-            ocr_results_file_path = output_folder + "ocr_results_" + file_name + "_page_" + reported_page_number + ".txt"
-            with open(ocr_results_file_path, "w") as f:
-                f.write(ocr_results_str)
-            logging_file_paths.append(ocr_results_file_path)
-
+            # Combine OCR results
+            ocr_results = combine_ocr_results(ocr_results)
+
         # Import results from json and convert
         if analysis_type == "AWS Textract":
-
-            # Ensure image is a PIL Image object
-            # if isinstance(image, str):
-            #     image = Image.open(image)
-            # elif not isinstance(image, Image.Image):
-            #     print(f"Unexpected image type on page {i}: {type(image)}")
-            #     continue
 
             # Convert the image to bytes using an in-memory buffer
             image_buffer = io.BytesIO()
@@ -334,7 +358,7 @@ def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_
            json_file_path = output_folder + file_name + "_page_" + reported_page_number + "_textract.json"
 
            if not os.path.exists(json_file_path):
-               text_blocks = analyse_page_with_textract(pdf_page_as_bytes, json_file_path) # Analyse page with Textract
+               text_blocks, request_metadata = analyse_page_with_textract(pdf_page_as_bytes, json_file_path) # Analyse page with Textract
               logging_file_paths.append(json_file_path)
            else:
               # Open the file and load the JSON data
@@ -343,19 +367,7 @@ def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_
                text_blocks = json.load(json_file)
                text_blocks = text_blocks['Blocks']
 
-            # Need image size to convert textract OCR outputs to the correct sizes
-            #print("Image size:", image.size)
-            page_width, page_height = image.size
-
-            ocr_results, handwriting_or_signature_boxes = json_to_ocrresult(text_blocks, page_width, page_height)
-
-            #print("OCR results:", ocr_results)
-            ocr_results_str = str(ocr_results)
-            textract_ocr_results_file_path = output_folder + "ocr_results_" + file_name + "_page_" + reported_page_number + "_textract.txt"
-            with open(textract_ocr_results_file_path, "w") as f:
-                f.write(ocr_results_str)
-            logging_file_paths.append(textract_ocr_results_file_path)
+            ocr_results, handwriting_or_signature_boxes, signature_recogniser_results, handwriting_recogniser_results = json_to_ocrresult(text_blocks, page_width, page_height)
 
        # Step 2: Analyze text and identify PII
        bboxes = image_analyser.analyze_text(
@@ -364,21 +376,19 @@ def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_
            entities=chosen_redact_entities,
            allow_list=allow_list,
            score_threshold=score_threshold,
        )
 
-        # Process the bboxes (PII entities)
-        if bboxes:
-            for bbox in bboxes:
-                print(f"Entity: {bbox.entity_type}, Text: {bbox.text}, Bbox: ({bbox.left}, {bbox.top}, {bbox.width}, {bbox.height})")
-            decision_process_output_str = str(bboxes)
-            print("Decision process:", decision_process_output_str)
-
        # Merge close bounding boxes
-        merged_bboxes = merge_img_bboxes(bboxes, handwriting_or_signature_boxes)
+        merged_bboxes = merge_img_bboxes(bboxes, signature_recogniser_results, handwriting_recogniser_results, handwrite_signature_checkbox)
 
-        #print("For page:", str(i), "Merged bounding boxes:", merged_bboxes)
-        #from PIL import Image
-        #image_object = Image.open(image)
+        # Export the decision making process
+        if merged_bboxes:
+            for bbox in merged_bboxes:
+                print(f"Entity: {bbox.entity_type}, Text: {bbox.text}, Bbox: ({bbox.left}, {bbox.top}, {bbox.width}, {bbox.height})")
+
+            decision_process_output_str = "Page " + reported_page_number + ":\n" + str(merged_bboxes)
+            all_decision_process.append(decision_process_output_str)
 
        # 3. Draw the merged boxes
        draw = ImageDraw.Draw(image)
@@ -390,9 +400,20 @@ def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_
            y1 = y0 + box.height
            draw.rectangle([x0, y0, x1, y1], fill=fill)
 
+        ocr_results_str = "Page:" + reported_page_number + "\n" + str(ocr_results)
+        all_ocr_results.append(ocr_results_str)
+
        images.append(image)
 
-    return images, decision_process_output_str, logging_file_paths
+    # Write OCR results as a log file
+    ocr_results_out = "\n".join(all_ocr_results)
+    with open(ocr_results_file_path, "w") as f:
+        f.write(ocr_results_out)
+    logging_file_paths.append(ocr_results_file_path)
+
+    all_decision_process_str = "\n".join(all_decision_process)
+
+    return images, all_decision_process_str, logging_file_paths, request_metadata
 
 def analyze_text_container(text_container, language, chosen_redact_entities, score_threshold, allow_list):
     if isinstance(text_container, LTTextContainer):
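
The handwriting/signature override arrives as a plain list of selected checkbox labels, so the gating in merge_img_bboxes reduces to membership tests. A minimal illustration with stand-in boxes (the real lists hold CustomImageRecognizerResult objects):

handwrite_signature_checkbox = ['Redact all identified handwriting']  # signatures unticked

bboxes = ['pii_box']                         # stand-ins for recogniser results
handwriting_recogniser_results = ['hw_box']
signature_recogniser_results = ['sig_box']

if 'Redact all identified handwriting' in handwrite_signature_checkbox:
    bboxes.extend(handwriting_recogniser_results)
if 'Redact all identified signatures' in handwrite_signature_checkbox:
    bboxes.extend(signature_recogniser_results)

print(bboxes)  # ['pii_box', 'hw_box'], signature boxes stay out of the redaction set
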
tools/helper_functions.py CHANGED
@@ -73,6 +73,31 @@ def ensure_output_folder_exists():
     else:
         print(f"The 'output/' folder already exists.")
 
+def custom_regex_load(in_file):
+    '''
+    When file is loaded, update the column dropdown choices and write to relevant data states.
+    '''
+
+    custom_regex = pd.DataFrame()
+
+    file_list = [string.name for string in in_file]
+
+    regex_file_names = [string for string in file_list if "csv" in string.lower()]
+    if regex_file_names:
+        regex_file_name = regex_file_names[0]
+        custom_regex = pd.read_csv(regex_file_name, low_memory=False, header=None)
+        #regex_file_name_no_ext = get_file_path_end(regex_file_name)
+
+        output_text = "Allow list file loaded."
+        print(output_text)
+    else:
+        error = "No allow list file provided."
+        print(error)
+        output_text = error
+        return error, custom_regex
+
+    return output_text, custom_regex
+
 def put_columns_in_df(in_file):
     new_choices = []
     concat_choices = []
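
custom_regex_load expects the wrappers Gradio's UploadButton passes to callbacks, each exposing a .name path; pandas (pd) is assumed imported at the top of this module. A stand-in call (SimpleNamespace is a hypothetical substitute for the Gradio file wrapper, and allow_list.csv must already exist):

from types import SimpleNamespace

uploaded = [SimpleNamespace(name='allow_list.csv')]

status, allow_list_df = custom_regex_load(uploaded)
print(status)                      # 'Allow list file loaded.'
print(allow_list_df[0].tolist())   # flat list of allow-listed terms
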
tools/load_spacy_model_custom_recognisers.py CHANGED
@@ -26,7 +26,7 @@ titles_recogniser = PatternRecognizer(supported_entity="TITLES", patterns = [tit
26
  # Custom postcode recogniser
27
 
28
  # Define the regex pattern in a Presidio `Pattern` object:
29
- ukpostcode_pattern = Pattern(name="ukpostcode_pattern",regex="\\b(?:[A-Z][A-HJ-Y]?[0-9][0-9A-Z]? ?[0-9][A-Z]{2}|GIR ?0A{2})\\b|(?:[A-Z][A-HJ-Y]?[0-9][0-9A-Z]? ?[0-9]{1}?)$|\\b(?:[A-Z][A-HJ-Y]?[0-9][0-9A-Z]?)\\b", score = 1)
30
 
31
  # Define the recognizer with one or more patterns
32
  ukpostcode_recogniser = PatternRecognizer(supported_entity="UKPOSTCODE", patterns = [ukpostcode_pattern])
 
26
  # Custom postcode recogniser
27
 
28
  # Define the regex pattern in a Presidio `Pattern` object:
29
+ ukpostcode_pattern = Pattern(name="ukpostcode_pattern",regex="\b([A-Z][A-HJ-Y]?[0-9][0-9A-Z]? ?[0-9][A-Z]{2}|GIR ?0AA)\b", score = 1)
30
 
31
  # Define the recognizer with one or more patterns
32
  ukpostcode_recogniser = PatternRecognizer(supported_entity="UKPOSTCODE", patterns = [ukpostcode_pattern])