seanpedrickcase committed
Commit a748df6
1 Parent(s): 8652429

Generally improved OCR text recognition; corrected the UK postcode regex

tools/custom_image_analyser_engine.py CHANGED
@@ -420,7 +420,7 @@ class CustomImageAnalyzerEngine:
         # block_size=11
         # )
         image_preprocessor = ContrastSegmentedImageEnhancer()
-        print(image_preprocessor)
+        #print(image_preprocessor)
         self.image_preprocessor = image_preprocessor

     def perform_ocr(self, image: Union[str, Image.Image, np.ndarray]) -> List[OCRResult]:
@@ -461,6 +461,7 @@ class CustomImageAnalyzerEngine:
     def analyze_text(
         self,
         ocr_results: List[OCRResult],
+        ocr_results_with_children: Dict[str, Dict],
         **text_analyzer_kwargs
     ) -> List[CustomImageRecognizerResult]:
         # Define English as default language, if not specified
@@ -468,8 +469,8 @@ class CustomImageAnalyzerEngine:
             text_analyzer_kwargs["language"] = "en"

         allow_list = text_analyzer_kwargs.get('allow_list', [])
-        combined_results = []

+        combined_results = []
         for ocr_result in ocr_results:
             # Analyze each OCR result (line) individually
             analyzer_result = self.analyzer_engine.analyze(
@@ -480,18 +481,42 @@ class CustomImageAnalyzerEngine:
                 # Extract the relevant portion of text based on start and end
                 relevant_text = ocr_result.text[result.start:result.end]

-                # Create a new OCRResult with the relevant text and adjusted position
-                relevant_ocr_result = OCRResult(
-                    text=relevant_text,
-                    left=ocr_result.left + self.estimate_x_offset(ocr_result.text, result.start),
-                    top=ocr_result.top,
-                    width=self.estimate_width(ocr_result=ocr_result, start=result.start, end=result.end),
-                    height=ocr_result.height
-                )
+                # Find the corresponding entry in ocr_results_with_children
+                child_info = ocr_results_with_children.get(ocr_result.text)
+                if child_info:
+                    # Calculate left and width based on child words
+                    #print("Found in ocr_results_with_children")
+                    child_words = child_info['words']
+                    start_word = child_words[0]
+                    end_word = child_words[-1]
+                    left = start_word['bounding_box'][0]
+                    width = end_word['bounding_box'][2] - left
+
+                    relevant_ocr_result = OCRResult(
+                        text=relevant_text,
+                        left=left,
+                        top=ocr_result.top,
+                        width=width,
+                        height=ocr_result.height
+                    )
+                else:
+                    # Fallback to previous method if not found in ocr_results_with_children
+                    #print("Couldn't find result in ocr_results_with_children")
+                    relevant_ocr_result = OCRResult(
+                        text=relevant_text,
+                        left=ocr_result.left + self.estimate_x_offset(relevant_text, result.start),
+                        top=ocr_result.top,
+                        width=self.estimate_width(ocr_result=ocr_result, start=result.start, end=result.end),
+                        height=ocr_result.height
+                    )
+
+                result_mod = result
+                result.start = 0
+                result.end = len(relevant_text)

                 # Map the analyzer results to bounding boxes for this line
                 line_results = self.map_analyzer_results_to_bounding_boxes(
-                    [result], [relevant_ocr_result], relevant_text, allow_list
+                    [result_mod], [relevant_ocr_result], ocr_result.text, allow_list, ocr_results_with_children
                 )

                 combined_results.extend(line_results)
@@ -504,33 +529,95 @@ class CustomImageAnalyzerEngine:
         ocr_results: List[OCRResult],
         full_text: str,
         allow_list: List[str],
+        ocr_results_with_children: Dict[str, Dict]
     ) -> List[CustomImageRecognizerResult]:
         pii_bboxes = []
         text_position = 0

         for ocr_result in ocr_results:
             word_end = text_position + len(ocr_result.text)
+
+            #print("Checking relevant OCR result:", ocr_result)

             for result in text_analyzer_results:
-                if (max(text_position, result.start) < min(word_end, result.end)) and (ocr_result.text not in allow_list):
+                max_of_current_text_pos_or_result_start_pos = max(text_position, result.start)
+                min_of_result_end_pos_or_results_end = min(word_end, result.end)
+
+                #print("max_of_current_text_pos_or_result_start_pos", str(max_of_current_text_pos_or_result_start_pos))
+                #print("min_of_result_end_pos_or_results_end", str(min_of_result_end_pos_or_results_end))
+
+                if (max_of_current_text_pos_or_result_start_pos < min_of_result_end_pos_or_results_end) and (ocr_result.text not in allow_list):
+                    print("result", result, "made it through if statement")
+
+                    # Find the corresponding entry in ocr_results_with_children
+                    child_info = ocr_results_with_children.get(full_text)
+                    if child_info:
+                        # Use the bounding box from ocr_results_with_children
+                        bbox = child_info['bounding_box']
+                        left, top, right, bottom = bbox
+                        width = right - left
+                        height = bottom - top
+                    else:
+                        # Fallback to ocr_result if not found
+                        left = ocr_result.left
+                        top = ocr_result.top
+                        width = ocr_result.width
+                        height = ocr_result.height
+
                     pii_bboxes.append(
                         CustomImageRecognizerResult(
                             entity_type=result.entity_type,
                             start=result.start,
                             end=result.end,
                             score=result.score,
-                            left=ocr_result.left,
-                            top=ocr_result.top,
-                            width=ocr_result.width,
-                            height=ocr_result.height,
+                            left=left,
+                            top=top,
+                            width=width,
+                            height=height,
                             text=ocr_result.text
                         )
                     )
-                    break

             text_position = word_end + 1 # +1 for the space between words

         return pii_bboxes
+
+    # @staticmethod
+    # def map_analyzer_results_to_bounding_boxes(
+    #     text_analyzer_results: List[RecognizerResult],
+    #     ocr_results: List[OCRResult],
+    #     full_text: str,
+    #     allow_list: List[str],
+    # ) -> List[CustomImageRecognizerResult]:
+    #     pii_bboxes = []
+    #     text_position = 0
+
+    #     for ocr_result in ocr_results:
+    #         word_end = text_position + len(ocr_result.text)
+
+    #         print("Checking relevant OCR result:", ocr_result)
+
+    #         for result in text_analyzer_results:
+    #             if (max(text_position, result.start) < min(word_end, result.end)) and (ocr_result.text not in allow_list):
+    #                 print("result", result, "made it through if statement")
+
+    #                 pii_bboxes.append(
+    #                     CustomImageRecognizerResult(
+    #                         entity_type=result.entity_type,
+    #                         start=result.start,
+    #                         end=result.end,
+    #                         score=result.score,
+    #                         left=ocr_result.left,
+    #                         top=ocr_result.top,
+    #                         width=ocr_result.width,
+    #                         height=ocr_result.height,
+    #                         text=ocr_result.text
+    #                     )
+    #                 )
+
+    #             text_position = word_end + 1 # +1 for the space between words
+
+    #         return pii_bboxes

     @staticmethod
     def remove_space_boxes(ocr_result: dict) -> dict:
@@ -676,17 +763,33 @@ class CustomImageAnalyzerEngine:


 # Function to combine OCR results into line-level results
-def combine_ocr_results(ocr_results, x_threshold=20, y_threshold=3):
-    # Sort OCR results by 'top' to ensure line order
-    ocr_results = sorted(ocr_results, key=lambda x: (x.top, x.left))
-
+def combine_ocr_results(ocr_results, x_threshold=50, y_threshold=12):
+    # Group OCR results into lines based on y_threshold
+    lines = []
+    current_line = []
+    for result in sorted(ocr_results, key=lambda x: x.top):
+        if not current_line or abs(result.top - current_line[0].top) <= y_threshold:
+            current_line.append(result)
+        else:
+            lines.append(current_line)
+            current_line = [result]
+    if current_line:
+        lines.append(current_line)
+
+    # Sort each line by left position
+    for line in lines:
+        line.sort(key=lambda x: x.left)
+
+    # Flatten the sorted lines back into a single list
+    sorted_results = [result for line in lines for result in line]
+
     combined_results = []
     new_format_results = {}
     current_line = []
     current_bbox = None
     line_counter = 1

-    for result in ocr_results:
+    for result in sorted_results:
         if not current_line:
             # Start a new line
             current_line.append(result)
 
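Note on the combine_ocr_results change above: instead of a single sort by (top, left), words are now first grouped into lines (tops within y_threshold pixels of the line's first word) and each line is then ordered left to right. A minimal sketch of that grouping, using a hypothetical Word tuple in place of the repo's OCRResult class:

from collections import namedtuple

Word = namedtuple("Word", ["text", "left", "top"])

def group_into_lines(words, y_threshold=12):
    # Group words whose tops sit within y_threshold of the line's first word,
    # then order each line left-to-right to recover reading order.
    lines = []
    current_line = []
    for word in sorted(words, key=lambda w: w.top):
        if not current_line or abs(word.top - current_line[0].top) <= y_threshold:
            current_line.append(word)
        else:
            lines.append(current_line)
            current_line = [word]
    if current_line:
        lines.append(current_line)
    for line in lines:
        line.sort(key=lambda w: w.left)
    return lines

words = [Word("world", 120, 49), Word("Hello", 10, 50), Word("Next", 10, 90)]
print([[w.text for w in line] for line in group_into_lines(words)])
# [['Hello', 'world'], ['Next']]

A plain (top, left) sort would have ordered "world" before "Hello" because its top is one pixel higher, which is how slightly skewed scans produced out-of-order OCR text.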
tools/file_redaction.py CHANGED
@@ -9,156 +9,7 @@ import pandas as pd

 #from presidio_image_redactor.entities import ImageRecognizerResult
 from pdfminer.high_level import extract_pages
-from pdfminer.layout import LTTextContainer, LTChar, LTTextLine #, LTAnno
-from pikepdf import Pdf, Dictionary, Name
-import gradio as gr
-from gradio import Progress
-
-from collections import defaultdict # For efficient grouping
-
-from tools.custom_image_analyser_engine import CustomImageAnalyzerEngine, OCRResult, combine_ocr_results, CustomImageRecognizerResult
-from tools.file_conversion import process_file
-from tools.load_spacy_model_custom_recognisers import nlp_analyser, score_threshold
-from tools.helper_functions import get_file_path_end, output_folder
-from tools.file_conversion import process_file, is_pdf, convert_text_pdf_to_img_pdf, is_pdf_or_image
-from tools.data_anonymise import generate_decision_process_output
-from tools.aws_textract import analyse_page_with_textract, convert_pike_pdf_page_to_bytes, json_to_ocrresult
-
-def sum_numbers_before_seconds(string:str):
-    """Extracts numbers that precede the word 'seconds' from a string and adds them up.
-
-    Args:
-        string: The input string.
-
-    Returns:
-        The sum of all numbers before 'seconds' in the string.
-    """
-
-    # Extract numbers before 'seconds' using regular expression
-    numbers = re.findall(r'(\d+\.\d+)?\s*seconds', string)
-
-    # Extract the numbers from the matches
-    numbers = [float(num.split()[0]) for num in numbers]
-
-    # Sum up the extracted numbers
-    sum_of_numbers = round(sum(numbers),1)
-
-    return sum_of_numbers
-
-def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], language:str, chosen_redact_entities:List[str], in_redact_method:str, in_allow_list:List[List[str]]=None, latest_file_completed:int=0, out_message:list=[], out_file_paths:list=[], log_files_output_paths:list=[], first_loop_state:bool=False, page_min:int=0, page_max:int=999, estimated_time_taken_state:float=0.0, handwrite_signature_checkbox:List[str]=["Redact all identified handwriting", "Redact all identified signatures"], all_request_metadata_str:str = "", progress=gr.Progress(track_tqdm=True)):
-    '''
-    Based on the type of redaction selected, pass the document file content onto the relevant function and return a redacted document plus processing logs.
-    '''
-
-    tic = time.perf_counter()
-    all_request_metadata = all_request_metadata_str.split('\n') if all_request_metadata_str else []
-
-    # If this is the first time around, set variables to 0/blank
-    if first_loop_state==True:
-        latest_file_completed = 0
-        #out_message = []
-        out_file_paths = []
-
-    # If out message is string or out_file_paths are blank, change to a list so it can be appended to
-    if isinstance(out_message, str):
-        out_message = [out_message]
-
-    if not out_file_paths:
-        out_file_paths = []
-
-    latest_file_completed = int(latest_file_completed)
-
-    # If we have already redacted the last file, return the input out_message and file list to the relevant components
-    if latest_file_completed >= len(file_paths):
-        print("Last file reached")
-        # Set to a very high number so as not to mix up with subsequent file processing by the user
-        latest_file_completed = 99
-        final_out_message = '\n'.join(out_message)
-        #final_out_message = final_out_message + "\n\nGo to to the Redaction settings tab to see redaction logs. Please give feedback on the results below to help improve this app."
-
-        estimate_total_processing_time = sum_numbers_before_seconds(final_out_message)
-        print("Estimated total processing time:", str(estimate_total_processing_time))
-
-        return final_out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimate_total_processing_time, all_request_metadata_str
-
-    file_paths_loop = [file_paths[int(latest_file_completed)]]
-
-    if not in_allow_list.empty:
-        in_allow_list_flat = in_allow_list[0].tolist()
-        print("In allow list:", in_allow_list_flat)
-    else:
-        in_allow_list_flat = []
-
-    for file in progress.tqdm(file_paths_loop, desc="Redacting files", unit = "files"):
-        file_path = file.name
-
-        if file_path:
-            file_path_without_ext = get_file_path_end(file_path)
-            is_a_pdf = is_pdf(file_path) == True
-            if is_a_pdf == False:
-                # If user has not submitted a pdf, assume it's an image
-                print("File is not a pdf, assuming that image analysis needs to be used.")
-                in_redact_method = "Quick image analysis - typed text"
-        else:
-            out_message = "No file selected"
-            print(out_message)
-            return out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state, all_request_metadata_str
-
-        if in_redact_method == "Quick image analysis - typed text" or in_redact_method == "Complex image analysis - AWS Textract, handwriting/signatures":
-            #Analyse and redact image-based pdf or image
-            if is_pdf_or_image(file_path) == False:
-                out_message = "Please upload a PDF file or image file (JPG, PNG) for image analysis."
-                return out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state, all_request_metadata_str
-
-            print("Redacting file " + file_path_without_ext + " as an image-based file")
-            pdf_images, output_logs, logging_file_paths, new_request_metadata = redact_image_pdf(file_path, image_paths, language, chosen_redact_entities, in_allow_list_flat, is_a_pdf, page_min, page_max, in_redact_method, handwrite_signature_checkbox)
-
-            # Save file
-            out_image_file_path = output_folder + file_path_without_ext + "_redacted_as_img.pdf"
-            pdf_images[0].save(out_image_file_path, "PDF" ,resolution=100.0, save_all=True, append_images=pdf_images[1:])
-
-            out_file_paths.append(out_image_file_path)
-            if logging_file_paths:
-                log_files_output_paths.extend(logging_file_paths)
-
-            out_message.append("File '" + file_path_without_ext + "' successfully redacted")
-
-            # Save decision making process
-            output_logs_str = str(output_logs)
-            logs_output_file_name = out_image_file_path + "_decision_process_output.txt"
-            with open(logs_output_file_name, "w") as f:
-                f.write(output_logs_str)
-            log_files_output_paths.append(logs_output_file_name)
-
-            # Save Textract request metadata (if exists)
-            if new_request_metadata:
-                print("Request metadata:", new_request_metadata)
-                all_request_metadata.append(new_request_metadata)
-
-            # Increase latest file completed count unless we are at the last file
-            if latest_file_completed != len(file_paths):
-                print("Completed file number:", str(latest_file_completed))
-                latest_file_completed += 1
-
-        elif in_redact_method == "Simple text analysis - PDFs with selectable text":
-
-            if is_pdf(file_path) == False:
-                return "Please upload a PDF file for text analysis. If you have an image, select 'Image analysis'.", None, None
-
-            # Analyse text-based pdf
-            print('Redacting file as text-based PDF')
-import time
-import re
-import json
-import io
-import os
-from PIL import Image, ImageChops, ImageDraw
-from typing import List, Dict
-import pandas as pd
-
-#from presidio_image_redactor.entities import ImageRecognizerResult
-from pdfminer.high_level import extract_pages
-from pdfminer.layout import LTTextContainer, LTChar, LTTextLine #, LTAnno
+from pdfminer.layout import LTTextContainer, LTChar, LTTextLine, LTTextLineHorizontal #, LTAnno
 from pikepdf import Pdf, Dictionary, Name
 import gradio as gr
 from gradio import Progress
@@ -349,8 +200,6 @@ def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], languag

     return out_message_out, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state, all_request_metadata_str

-
-
 def bounding_boxes_overlap(box1, box2):
     """Check if two bounding boxes overlap."""
     return (box1[0] < box2[2] and box2[0] < box1[2] and
@@ -385,11 +234,11 @@ def merge_img_bboxes(bboxes, combined_results: Dict, signature_recogniser_result
                 current_char = 0
                 for word in line_info['words']:
                     word_end = current_char + len(word['text'])
-                    if current_char <= start_char < word_end or current_char < end_char <= word_end:
+                    if current_char <= start_char < word_end or current_char < end_char <= word_end or (start_char <= current_char and word_end <= end_char):
                         relevant_words.append(word)
                         if word_end >= end_char:
                             break
-                    current_char = word_end # +1 for space
+                    current_char = word_end
                     if not word['text'].endswith(' '):
                         current_char += 1 # +1 for space if the word doesn't already end with a space

@@ -400,7 +249,7 @@ def merge_img_bboxes(bboxes, combined_results: Dict, signature_recogniser_result
                     right = max(word['bounding_box'][2] for word in relevant_words)
                     bottom = max(word['bounding_box'][3] for word in relevant_words)

-                    # Combine the text of the relevant words
+                    # Combine the text of all relevant words
                    combined_text = " ".join(word['text'] for word in relevant_words)

                    reconstructed_bbox = CustomImageRecognizerResult(
@@ -551,6 +400,8 @@ def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_
         # Combine OCR results
         ocr_results, ocr_results_with_children = combine_ocr_results(ocr_results)

+        #print("ocr_results after:", ocr_results)
+
         # Save decision making process
         ocr_results_with_children_str = str(ocr_results_with_children)
         logs_output_file_name = output_folder + "ocr_with_children.txt"
@@ -589,6 +440,7 @@ def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_
         # Step 2: Analyze text and identify PII
         bboxes = image_analyser.analyze_text(
             ocr_results,
+            ocr_results_with_children,
             language=language,
             entities=chosen_redact_entities,
             allow_list=allow_list,
@@ -650,59 +502,81 @@ def analyze_text_container(text_container, language, chosen_redact_entities, sco
                                             score_threshold=score_threshold,
                                             return_decision_process=True,
                                             allow_list=allow_list)
+
+        #print("\ntext_container:", text_container)
         characters = [char
                      for line in text_container
-                      if isinstance(line, LTTextLine)
+                      if isinstance(line, LTTextLine) or isinstance(line, LTTextLineHorizontal)
                      for char in line]

         return analyzer_results, characters
     return [], []

-# Inside the loop where you process analyzer_results, merge bounding boxes that are right next to each other:
 def merge_bounding_boxes(analyzer_results, characters, combine_pixel_dist, vertical_padding=2):
     '''
     Merge identified bounding boxes containing PII that are very close to one another
     '''
     analyzed_bounding_boxes = []
     if len(analyzer_results) > 0 and len(characters) > 0:
+        # Extract bounding box coordinates for sorting
+        bounding_boxes = []
+        for result in analyzer_results:
+            char_boxes = [char.bbox for char in characters[result.start:result.end] if isinstance(char, LTChar)]
+            if char_boxes:
+                # Calculate the bounding box that encompasses all characters
+                left = min(box[0] for box in char_boxes)
+                bottom = min(box[1] for box in char_boxes)
+                right = max(box[2] for box in char_boxes)
+                top = max(box[3] for box in char_boxes) + vertical_padding
+                bounding_boxes.append((bottom, left, result, [left, bottom, right, top])) # (y, x, result, bbox)
+
+        # Sort the results by y-coordinate and then by x-coordinate
+        bounding_boxes.sort()
+
         merged_bounding_boxes = []
         current_box = None
         current_y = None
+        current_result = None

-        for i, result in enumerate(analyzer_results):
-            print("Considering result", str(i))
-            for char in characters[result.start : result.end]:
-                if isinstance(char, LTChar):
-                    char_box = list(char.bbox)
-                    # Add vertical padding to the top of the box
-                    char_box[3] += vertical_padding
-
-                    if current_y is None or current_box is None:
-                        current_box = char_box
-                        current_y = char_box[1]
-                    else:
-                        vertical_diff_bboxes = abs(char_box[1] - current_y)
-                        horizontal_diff_bboxes = abs(char_box[0] - current_box[2])
-
-                        if (
-                            vertical_diff_bboxes <= 5
-                            and horizontal_diff_bboxes <= combine_pixel_dist
-                        ):
-                            current_box[2] = char_box[2] # Extend the current box horizontally
-                            current_box[3] = max(current_box[3], char_box[3]) # Ensure the top is the highest
-                        else:
-                            merged_bounding_boxes.append(
-                                {"boundingBox": current_box, "result": result})
-
-                            # Reset current_box and current_y after appending
-                            current_box = char_box
-                            current_y = char_box[1]
-
-            # After finishing with the current result, add the last box for this result
-            if current_box:
-                merged_bounding_boxes.append({"boundingBox": current_box, "result": result})
-                current_box = None
-                current_y = None # Reset for the next result
+        for y, x, result, char_box in bounding_boxes:
+            print(f"Considering result: {result}")
+            print(f"Character box: {char_box}")
+
+            if current_y is None or current_box is None:
+                current_box = char_box
+                current_y = char_box[1]
+                current_result = result
+                print(f"Starting new box: {current_box}")
+            else:
+                vertical_diff_bboxes = abs(char_box[1] - current_y)
+                horizontal_diff_bboxes = abs(char_box[0] - current_box[2])
+
+                print(f"Comparing boxes: current_box={current_box}, char_box={char_box}")
+                print(f"Vertical diff: {vertical_diff_bboxes}, Horizontal diff: {horizontal_diff_bboxes}")
+
+                if (
+                    vertical_diff_bboxes <= 5
+                    and horizontal_diff_bboxes <= combine_pixel_dist
+                ):
+                    current_box[2] = char_box[2] # Extend the current box horizontally
+                    current_box[3] = max(current_box[3], char_box[3]) # Ensure the top is the highest
+                    current_result.end = max(current_result.end, result.end) # Extend the text range
+                    print(f"Extended current box: {current_box}")
+                else:
+                    merged_bounding_boxes.append(
+                        {"boundingBox": current_box, "result": current_result})
+                    print(f"Appending merged box: {current_box}")
+
+                    # Reset current_box and current_y after appending
+                    current_box = char_box
+                    current_y = char_box[1]
+                    current_result = result
+                    print(f"Starting new box: {current_box}")
+
+        # After finishing with the current result, add the last box for this result
+        if current_box:
+            merged_bounding_boxes.append({"boundingBox": current_box, "result": current_result})
+            print(f"Appending final box for result: {current_box}")

         if not merged_bounding_boxes:
             analyzed_bounding_boxes.extend(
@@ -714,104 +588,10 @@ def merge_bounding_boxes(analyzer_results, characters, combine_pixel_dist, verti
         else:
             analyzed_bounding_boxes.extend(merged_bounding_boxes)

-    print("analysed_bounding_boxes:\n\n", analyzed_bounding_boxes)
+    print("Analyzed bounding boxes:\n\n", analyzed_bounding_boxes)

     return analyzed_bounding_boxes

-# def merge_bounding_boxes(analyzer_results, characters, combine_pixel_dist, vertical_padding=2, signature_bounding_boxes=None):
-#     '''
-#     Merge identified bounding boxes containing PII or signatures that are very close to one another.
-#     '''
-#     analyzed_bounding_boxes = []
-#     merged_bounding_boxes = []
-#     current_box = None
-#     current_y = None
-
-#     # Handle PII and text bounding boxes first
-#     if len(analyzer_results) > 0 and len(characters) > 0:
-#         for i, result in enumerate(analyzer_results):
-#             #print("Considering result", str(i))
-#             #print("Result:", result)
-#             #print("Characters:", characters)
-
-#             for char in characters[result.start: result.end]:
-#                 if isinstance(char, LTChar):
-#                     char_box = list(char.bbox)
-#                     # Add vertical padding to the top of the box
-#                     char_box[3] += vertical_padding
-
-#                     if current_y is None or current_box is None:
-#                         current_box = char_box
-#                         current_y = char_box[1]
-#                     else:
-#                         vertical_diff_bboxes = abs(char_box[1] - current_y)
-#                         horizontal_diff_bboxes = abs(char_box[0] - current_box[2])
-
-#                         if (
-#                             vertical_diff_bboxes <= 5
-#                             and horizontal_diff_bboxes <= combine_pixel_dist
-#                         ):
-#                             current_box[2] = char_box[2] # Extend the current box horizontally
-#                             current_box[3] = max(current_box[3], char_box[3]) # Ensure the top is the highest
-#                         else:
-#                             merged_bounding_boxes.append(
-#                                 {"boundingBox": current_box, "result": result})
-
-#                             # Reset current_box and current_y after appending
-#                             current_box = char_box
-#                             current_y = char_box[1]
-
-#             # After finishing with the current result, add the last box for this result
-#             if current_box:
-#                 merged_bounding_boxes.append({"boundingBox": current_box, "result": result})
-#                 current_box = None
-#                 current_y = None # Reset for the next result
-
-#     # Handle signature bounding boxes (without specific characters)
-#     if signature_bounding_boxes is not None:
-#         for sig_box in signature_bounding_boxes:
-#             sig_box = list(sig_box) # Ensure it's a list to modify the values
-#             if current_y is None or current_box is None:
-#                 current_box = sig_box
-#                 current_y = sig_box[1]
-#             else:
-#                 vertical_diff_bboxes = abs(sig_box[1] - current_y)
-#                 horizontal_diff_bboxes = abs(sig_box[0] - current_box[2])
-
-#                 if (
-#                     vertical_diff_bboxes <= 5
-#                     and horizontal_diff_bboxes <= combine_pixel_dist
-#                 ):
-#                     current_box[2] = sig_box[2] # Extend the current box horizontally
-#                     current_box[3] = max(current_box[3], sig_box[3]) # Ensure the top is the highest
-#                 else:
-#                     merged_bounding_boxes.append({"boundingBox": current_box, "type": "signature"})
-
-#                     # Reset current_box and current_y after appending
-#                     current_box = sig_box
-#                     current_y = sig_box[1]
-
-#         # Add the last bounding box for the signature
-#         if current_box:
-#             merged_bounding_boxes.append({"boundingBox": current_box, "type": "signature"})
-#             current_box = None
-#             current_y = None
-
-#     # If no bounding boxes were merged, add individual character bounding boxes
-#     if not merged_bounding_boxes:
-#         analyzed_bounding_boxes.extend(
-#             {"boundingBox": char.bbox, "result": result}
-#             for result in analyzer_results
-#             for char in characters[result.start:result.end]
-#             if isinstance(char, LTChar)
-#         )
-#     else:
-#         analyzed_bounding_boxes.extend(merged_bounding_boxes)
-
-#     #print("analysed_bounding_boxes:\n\n", analyzed_bounding_boxes)
-
-#     return analyzed_bounding_boxes
-
 def create_text_redaction_process_results(analyzer_results, analyzed_bounding_boxes, page_num):
     decision_process_table = pd.DataFrame()

@@ -857,7 +637,7 @@ def redact_text_pdf(filename:str, language:str, chosen_redact_entities:List[str]
     annotations_all_pages = []
     decision_process_table_all_pages = []

-    combine_pixel_dist = 200 # Horizontal distance between PII bounding boxes under/equal they are combined into one
+    combine_pixel_dist = 100 # Horizontal distance between PII bounding boxes under/equal they are combined into one

     pdf = Pdf.open(filename)
     page_num = 0
@@ -883,9 +663,9 @@ def redact_text_pdf(filename:str, language:str, chosen_redact_entities:List[str]
         print("Page number is:", page_no)

         # The /MediaBox in a PDF specifies the size of the page [left, bottom, right, top]
-        media_box = page.MediaBox
-        page_width = media_box[2] - media_box[0]
-        page_height = media_box[3] - media_box[1]
+        #media_box = page.MediaBox
+        #page_width = media_box[2] - media_box[0]
+        #page_height = media_box[3] - media_box[1]


         annotations_on_page = []
@@ -905,8 +685,14 @@ def redact_text_pdf(filename:str, language:str, chosen_redact_entities:List[str]
                 text_container_analyzer_results, characters = analyze_text_container(text_container, language, chosen_redact_entities, score_threshold, allow_list)

                 # Merge bounding boxes if very close together
+                print("\n\ntext_container_analyzer_results:", text_container_analyzer_results)
+
+                #print("\n\ncharacters:", characters)
+
                 text_container_analyzed_bounding_boxes = merge_bounding_boxes(text_container_analyzer_results, characters, combine_pixel_dist, vertical_padding = 2)

+                print("\n\ntext_container_analyzed_bounding_boxes:", text_container_analyzed_bounding_boxes)
+

                 page_analyzed_bounding_boxes.extend(text_container_analyzed_bounding_boxes)
                 page_analyzer_results.extend(text_container_analyzer_results)
@@ -915,7 +701,7 @@ def redact_text_pdf(filename:str, language:str, chosen_redact_entities:List[str]
         decision_process_table_on_page = create_text_redaction_process_results(page_analyzer_results, page_analyzed_bounding_boxes, page_num)

         annotations_on_page = create_annotations_for_bounding_boxes(page_analyzed_bounding_boxes)
-        #print('\n\nannotations_on_page:', annotations_on_page)
+        #print('\n\nAnnotations_on_page:', annotations_on_page)

         # Make page annotations
         page.Annots = pdf.make_indirect(annotations_on_page)
 
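Note on the merge_img_bboxes condition above: the two original clauses only caught the word containing the entity's start and the word containing its end, so a word lying entirely inside a multi-word entity was dropped from the redaction box. The added third clause covers that containment case. A small self-contained check (hypothetical character offsets, not the repo's data structures):

# Entity "John Albert Smith" spans characters [3, 20) of "Mr John Albert Smith".
entity_start, entity_end = 3, 20
words = [("Mr", 0, 2), ("John", 3, 7), ("Albert", 8, 14), ("Smith", 15, 20)]

for text, word_start, word_end in words:
    old = word_start <= entity_start < word_end or word_start < entity_end <= word_end
    new = old or (entity_start <= word_start and word_end <= entity_end)
    print(f"{text}: old={old}, new={new}")
# Mr: old=False, new=False
# John: old=True, new=True
# Albert: old=False, new=True   <- the middle word was missed before this fix
# Smith: old=True, new=True

The same test can be written as the standard interval-overlap check max(word_start, entity_start) < min(word_end, entity_end), which is the form map_analyzer_results_to_bounding_boxes uses in custom_image_analyser_engine.py.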
tools/load_spacy_model_custom_recognisers.py CHANGED
@@ -16,7 +16,6 @@ score_threshold = 0.001

 # %%
 # Custom title recogniser
-import re
 titles_list = ["Sir", "Ma'am", "Madam", "Mr", "Mr.", "Mrs", "Mrs.", "Ms", "Ms.", "Miss", "Dr", "Dr.", "Professor"]
 titles_regex = '\\b' + '\\b|\\b'.join(rf"{re.escape(title)}" for title in titles_list) + '\\b'
 titles_pattern = Pattern(name="titles_pattern",regex=titles_regex, score = 1)
@@ -26,7 +25,11 @@ titles_recogniser = PatternRecognizer(supported_entity="TITLES", patterns = [tit
 # Custom postcode recogniser

 # Define the regex pattern in a Presidio `Pattern` object:
-ukpostcode_pattern = Pattern(name="ukpostcode_pattern",regex="\b([A-Z][A-HJ-Y]?[0-9][0-9A-Z]? ?[0-9][A-Z]{2}|GIR ?0AA)\b", score = 1)
+ukpostcode_pattern = Pattern(
+    name="ukpostcode_pattern",
+    regex=r"\b([A-Z]{1,2}\d[A-Z\d]? ?\d[A-Z]{2}|GIR ?0AA)\b",
+    score=1
+)

 # Define the recognizer with one or more patterns
 ukpostcode_recogniser = PatternRecognizer(supported_entity="UKPOSTCODE", patterns = [ukpostcode_pattern])
@@ -77,10 +80,9 @@ def extract_street_name(text:str) -> str:
         street_name = match.group('street_name').strip()
         start_pos = match.start()
         end_pos = match.end()
-        print(f"Start: {start_pos}, End: {end_pos}")
-        print(f"Preceding words: {preceding_word}")
-        print(f"Street name: {street_name}")
-        print()
+        #print(f"Start: {start_pos}, End: {end_pos}")
+        #print(f"Preceding words: {preceding_word}")
+        #print(f"Street name: {street_name}")

         start_positions.append(start_pos)
         end_positions.append(end_pos)
@@ -158,7 +160,7 @@ loaded_nlp_engine = LoadedSpacyNlpEngine(loaded_spacy_model = nlp)
 nlp_analyser = AnalyzerEngine(nlp_engine=loaded_nlp_engine,
                               default_score_threshold=score_threshold,
                               supported_languages=["en"],
-                              log_decision_process=True,
+                              log_decision_process=False,
                               )

 # %%
 
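Note on the postcode fix above: the original pattern was written in a plain (non-raw) string, so each "\b" was interpreted by Python as a backspace character (\x08) rather than a regex word boundary, which is presumably why the recogniser never matched real text. The corrected pattern uses a raw string and a broader postcode body. A quick check (sample postcodes are illustrative):

import re

old_pattern = "\b([A-Z][A-HJ-Y]?[0-9][0-9A-Z]? ?[0-9][A-Z]{2}|GIR ?0AA)\b"  # "\b" here is backspace
new_pattern = r"\b([A-Z]{1,2}\d[A-Z\d]? ?\d[A-Z]{2}|GIR ?0AA)\b"            # raw string: real word boundary

for text in ["SW1A 1AA", "M1 1AE", "GIR 0AA", "Flat 2, EC1A 1BB"]:
    print(text, "| old:", bool(re.search(old_pattern, text)),
                "| new:", bool(re.search(new_pattern, text)))
# old is False for every example (it demands literal backspace characters);
# new matches the postcode in each string.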