Spaces:
Sleeping
Sleeping
seanpedrickcase
committed on
Commit
·
e9c4101
1
Parent(s):
e1c402a
Added AWS Textract support. Allowed for OCR logs export.
Browse files- app.py +1 -1
- tools/aws_textract.py +151 -0
- tools/custom_image_analyser_engine.py +116 -0
- tools/file_conversion.py +9 -6
- tools/file_redaction.py +294 -111
app.py
CHANGED
@@ -124,7 +124,7 @@ with app:
|
|
124 |
Define redaction settings that affect both document and open text redaction.
|
125 |
""")
|
126 |
with gr.Accordion("Settings for documents", open = True):
|
127 |
-
in_redaction_method = gr.Radio(label="Default document redaction method - text analysis is faster is not useful for image-based PDFs. Imaged-based is slightly less accurate in general.", value = "Text analysis", choices=["Text analysis", "Image analysis"])
|
128 |
with gr.Row():
|
129 |
page_min = gr.Number(precision=0,minimum=0,maximum=9999, label="Lowest page to redact")
|
130 |
page_max = gr.Number(precision=0,minimum=0,maximum=9999, label="Highest page to redact")
|
|
|
124 |
Define redaction settings that affect both document and open text redaction.
|
125 |
""")
|
126 |
with gr.Accordion("Settings for documents", open = True):
|
127 |
+
in_redaction_method = gr.Radio(label="Default document redaction method - text analysis is faster is not useful for image-based PDFs. Imaged-based is slightly less accurate in general.", value = "Text analysis", choices=["Text analysis", "Image analysis", "AWS Textract"])
|
128 |
with gr.Row():
|
129 |
page_min = gr.Number(precision=0,minimum=0,maximum=9999, label="Lowest page to redact")
|
130 |
page_max = gr.Number(precision=0,minimum=0,maximum=9999, label="Highest page to redact")
|
tools/aws_textract.py
ADDED
@@ -0,0 +1,151 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
import boto3
|
2 |
+
from PIL import Image
|
3 |
+
import io
|
4 |
+
import json
|
5 |
+
import pikepdf
|
6 |
+
# Example: converting this single page to an image
|
7 |
+
from pdf2image import convert_from_bytes
|
8 |
+
from tools.custom_image_analyser_engine import OCRResult, CustomImageRecognizerResult
|
9 |
+
|
10 |
+
def analyse_page_with_textract(pdf_page_bytes, json_file_path):
    '''
    Analyse a single PDF page (passed as bytes) with AWS Textract.

    Calls analyze_document with the SIGNATURES feature enabled, writes the
    raw JSON response to json_file_path (used for the OCR log export), and
    returns the list of Textract 'Blocks' from the response.
    '''
    try:
        client = boto3.client('textract')
    except Exception as e:
        # Bug fix: was a bare `except:`, which also swallows SystemExit and
        # KeyboardInterrupt and hides the actual error. Catch Exception and
        # report the cause instead.
        print("Cannot connect to AWS Textract:", e)
        # NOTE(review): on failure this returns a 3-tuple of empty strings
        # while success returns a single list — callers must cope with both
        # shapes. Kept as-is for backward compatibility; confirm callers.
        return "", "", ""

    print("Analysing page with AWS Textract")

    response = client.analyze_document(Document={'Bytes': pdf_page_bytes}, FeatureTypes=["SIGNATURES"])

    text_blocks = response['Blocks']

    # Persist the full response so the OCR output can be audited/exported
    with open(json_file_path, 'w') as json_file:
        json.dump(response, json_file, indent=4)  # indent=4 pretty-prints the JSON

    print("Response has been written to output:", json_file_path)

    return text_blocks
|
39 |
+
|
40 |
+
|
41 |
+
def convert_pike_pdf_page_to_bytes(pdf, page_num):
    '''
    Extract one page (0-based index page_num) from a pikepdf Pdf object and
    return it as the bytes of a new single-page PDF.
    '''
    # Create a new empty PDF to hold just the requested page
    new_pdf = pikepdf.Pdf.new()

    # Bug fix: page_num was previously overwritten with 0
    # ("# Example: first page" debug leftover), so every call returned the
    # first page regardless of the argument. Honour the parameter instead.
    new_pdf.pages.append(pdf.pages[page_num])

    # Save the new single-page PDF into an in-memory buffer
    buffer = io.BytesIO()
    new_pdf.save(buffer)

    # Capture the bytes before closing the buffer
    pdf_bytes = buffer.getvalue()
    buffer.close()

    return pdf_bytes
|
65 |
+
|
66 |
+
|
67 |
+
def json_to_ocrresult(json_data, page_width, page_height):
    '''
    Convert the json response from textract to the OCRResult format used elsewhere in the code.

    Returns a tuple (all_ocr_results, signature_or_handwriting_recogniser_results):
    - all_ocr_results: OCRResult objects (absolute pixel coordinates) for every
      WORD/LINE/SIGNATURE block in the response.
    - signature_or_handwriting_recogniser_results: CustomImageRecognizerResult
      objects for handwriting/signature blocks; these override the default
      behaviour of the PII analyser downstream.
    '''
    all_ocr_results = []
    signature_or_handwriting_recogniser_results = []
    signatures = []
    handwriting = []

    for text_block in json_data:

        is_signature = False
        is_handwriting = False

        if text_block['BlockType'] in ('WORD', 'LINE'):
            text = text_block['Text']

            # Extract BoundingBox details (Textract coordinates are proportional, 0-1)
            bbox = text_block["Geometry"]["BoundingBox"]
            left = bbox["Left"]
            top = bbox["Top"]
            width = bbox["Width"]
            height = bbox["Height"]

            # Convert proportional coordinates to absolute pixel coordinates
            left_abs = int(left * page_width)
            top_abs = int(top * page_height)
            width_abs = int(width * page_width)
            height_abs = int(height * page_height)

            # Create OCRResult with absolute coordinates
            ocr_result = OCRResult(text, left_abs, top_abs, width_abs, height_abs)

            # If handwriting or signature, add to bounding box
            confidence = text_block['Confidence']

            if 'TextType' in text_block:
                text_type = text_block["TextType"]

                if text_type == "HANDWRITING":
                    is_handwriting = True
                    entity_name = "HANDWRITING"
                    word_end = len(entity_name)
                    recogniser_result = CustomImageRecognizerResult(entity_type=entity_name, text=text, score=confidence, start=0, end=word_end, left=left_abs, top=top_abs, width=width_abs, height=height_abs)
                    handwriting.append(recogniser_result)
                    print("Handwriting found:", handwriting[-1])

            all_ocr_results.append(ocr_result)

        elif text_block['BlockType'] == 'SIGNATURE':
            text = "SIGNATURE"

            # Extract BoundingBox details (proportional coordinates, 0-1)
            bbox = text_block["Geometry"]["BoundingBox"]
            left = bbox["Left"]
            top = bbox["Top"]
            width = bbox["Width"]
            height = bbox["Height"]

            # Convert proportional coordinates to absolute pixel coordinates
            left_abs = int(left * page_width)
            top_abs = int(top * page_height)
            width_abs = int(width * page_width)
            height_abs = int(height * page_height)

            # Create OCRResult with absolute coordinates
            ocr_result = OCRResult(text, left_abs, top_abs, width_abs, height_abs)

            # Bug fix: read this block's own Confidence. Previously the value
            # left over from an earlier WORD/LINE iteration was reused here
            # (or a NameError was raised if a SIGNATURE block appeared first).
            confidence = text_block['Confidence']

            is_signature = True
            entity_name = "Signature"
            word_end = len(entity_name)
            recogniser_result = CustomImageRecognizerResult(entity_type=entity_name, text=text, score=confidence, start=0, end=word_end, left=left_abs, top=top_abs, width=width_abs, height=height_abs)
            signatures.append(recogniser_result)
            print("Signature found:", signatures[-1])

            all_ocr_results.append(ocr_result)

        is_signature_or_handwriting = is_signature or is_handwriting

        # If it is signature or handwriting, will overwrite the default behaviour of the PII analyser
        if is_signature_or_handwriting:
            signature_or_handwriting_recogniser_results.append(recogniser_result)

    return all_ocr_results, signature_or_handwriting_recogniser_results
|
tools/custom_image_analyser_engine.py
ADDED
@@ -0,0 +1,116 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
import pytesseract
|
2 |
+
from PIL import Image
|
3 |
+
import numpy as np
|
4 |
+
from presidio_analyzer import AnalyzerEngine, RecognizerResult
|
5 |
+
from typing import List, Dict, Optional, Union, Tuple
|
6 |
+
from dataclasses import dataclass
|
7 |
+
|
8 |
+
@dataclass
class OCRResult:
    """A single OCR-recognised text span and its bounding box.

    Coordinates are absolute pixels with the origin at the top-left of the
    page image (as produced by pytesseract and by the Textract converter).
    """
    text: str    # the recognised text content
    left: int    # x coordinate of the left edge
    top: int     # y coordinate of the top edge
    width: int   # box width in pixels
    height: int  # box height in pixels
|
15 |
+
|
16 |
+
@dataclass
class CustomImageRecognizerResult:
    """A PII (or signature/handwriting) detection mapped onto the page image.

    Combines the analyzer's entity information (character offsets into the
    combined OCR text, plus a confidence score) with the pixel bounding box
    of the OCR word it was matched to.
    """
    entity_type: str  # e.g. "PERSON", "HANDWRITING", "Signature"
    start: int        # start offset of the entity in the combined OCR text
    end: int          # end offset of the entity in the combined OCR text
    score: float      # detection confidence
    left: int         # x coordinate of the left edge (pixels)
    top: int          # y coordinate of the top edge (pixels)
    width: int        # box width in pixels
    height: int       # box height in pixels
    text: str         # the OCR word text covered by this box
|
27 |
+
|
28 |
+
class CustomImageAnalyzerEngine:
    """Run OCR (pytesseract) over an image and map Presidio analyzer hits
    back onto the OCR word bounding boxes."""

    def __init__(
        self,
        analyzer_engine: Optional[AnalyzerEngine] = None,
        tesseract_config: Optional[str] = None
    ):
        # Fall back to a default Presidio AnalyzerEngine when none is supplied
        if not analyzer_engine:
            analyzer_engine = AnalyzerEngine()
        self.analyzer_engine = analyzer_engine
        # --oem 3: default OCR engine mode; --psm 11: sparse text page segmentation
        self.tesseract_config = tesseract_config or '--oem 3 --psm 11'

    def perform_ocr(self, image: Union[str, Image.Image, np.ndarray]) -> List[OCRResult]:
        """Run tesseract over *image* (path, PIL image, or numpy array) and
        return one OCRResult per recognised word, in reading order."""
        # Ensure image is a PIL Image
        if isinstance(image, str):
            image = Image.open(image)
        elif isinstance(image, np.ndarray):
            image = Image.fromarray(image)

        ocr_data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT, config=self.tesseract_config)

        # Filter out empty strings and low confidence results
        # (conf <= 0 covers tesseract's -1 "no confidence" entries)
        valid_indices = [i for i, text in enumerate(ocr_data['text']) if text.strip() and int(ocr_data['conf'][i]) > 0]

        return [
            OCRResult(
                text=ocr_data['text'][i],
                left=ocr_data['left'][i],
                top=ocr_data['top'][i],
                width=ocr_data['width'][i],
                height=ocr_data['height'][i]
            )
            for i in valid_indices
        ]

    def analyze_text(
        self,
        ocr_results: List[OCRResult],
        **text_analyzer_kwargs
    ) -> List[CustomImageRecognizerResult]:
        """Analyse the combined OCR text for PII and return detections mapped
        to word bounding boxes. Extra kwargs are passed to AnalyzerEngine.analyze."""
        # Combine all OCR text; words are joined with single spaces, so the
        # character offsets below assume exactly one space between words
        full_text = ' '.join([result.text for result in ocr_results])

        # Define English as default language, if not specified
        if "language" not in text_analyzer_kwargs:
            text_analyzer_kwargs["language"] = "en"

        analyzer_result = self.analyzer_engine.analyze(
            text=full_text, **text_analyzer_kwargs
        )

        # allow_list is also forwarded to analyze() above via the kwargs;
        # it is read again here for the box-filtering step
        allow_list = text_analyzer_kwargs.get('allow_list', [])

        return self.map_analyzer_results_to_bounding_boxes(
            analyzer_result, ocr_results, full_text, allow_list
        )

    @staticmethod
    def map_analyzer_results_to_bounding_boxes(
        text_analyzer_results: List[RecognizerResult],
        ocr_results: List[OCRResult],
        full_text: str,
        allow_list: List[str],
    ) -> List[CustomImageRecognizerResult]:
        """For each OCR word, emit a CustomImageRecognizerResult for the first
        analyzer hit whose character span overlaps the word's span in full_text.

        Words listed in allow_list are skipped. A multi-word entity therefore
        produces one box per overlapping word (one hit max per word due to the
        break below)."""
        pii_bboxes = []
        # Running character offset of the current word within full_text
        text_position = 0

        for ocr_result in ocr_results:
            word_end = text_position + len(ocr_result.text)

            for result in text_analyzer_results:
                # Half-open interval overlap test between the word span
                # [text_position, word_end) and the entity span [start, end)
                if (max(text_position, result.start) < min(word_end, result.end)) and (ocr_result.text not in allow_list):
                    pii_bboxes.append(
                        CustomImageRecognizerResult(
                            entity_type=result.entity_type,
                            start=result.start,
                            end=result.end,
                            score=result.score,
                            left=ocr_result.left,
                            top=ocr_result.top,
                            width=ocr_result.width,
                            height=ocr_result.height,
                            text=ocr_result.text
                        )
                    )
                    break

            text_position = word_end + 1  # +1 for the space between words

        return pii_bboxes
|
tools/file_conversion.py
CHANGED
@@ -49,7 +49,7 @@ def convert_pdf_to_images(pdf_path:str, page_min:int = 0, progress=Progress(trac
|
|
49 |
#for page_num in progress.tqdm(range(0,page_count), total=page_count, unit="pages", desc="Converting pages"):
|
50 |
for page_num in range(page_min,page_count): #progress.tqdm(range(0,page_count), total=page_count, unit="pages", desc="Converting pages"):
|
51 |
|
52 |
-
|
53 |
|
54 |
# Convert one page to image
|
55 |
image = convert_from_path(pdf_path, first_page=page_num+1, last_page=page_num+1, dpi=300, use_cropbox=True, use_pdftocairo=False)
|
@@ -128,8 +128,8 @@ def prepare_image_or_text_pdf(
|
|
128 |
tic = time.perf_counter()
|
129 |
|
130 |
# If out message or out_file_paths are blank, change to a list so it can be appended to
|
131 |
-
|
132 |
-
|
133 |
|
134 |
# If this is the first time around, set variables to 0/blank
|
135 |
if first_loop_state==True:
|
@@ -150,8 +150,11 @@ def prepare_image_or_text_pdf(
|
|
150 |
# If we have already redacted the last file, return the input out_message and file list to the relevant components
|
151 |
if latest_file_completed >= len(file_paths):
|
152 |
print("Last file reached, returning files:", str(latest_file_completed))
|
153 |
-
|
154 |
-
|
|
|
|
|
|
|
155 |
|
156 |
#in_allow_list_flat = [item for sublist in in_allow_list for item in sublist]
|
157 |
|
@@ -178,7 +181,7 @@ def prepare_image_or_text_pdf(
|
|
178 |
print(out_message)
|
179 |
return out_message, out_file_paths
|
180 |
|
181 |
-
if in_redact_method == "Image analysis":
|
182 |
# Analyse and redact image-based pdf or image
|
183 |
if is_pdf_or_image(file_path) == False:
|
184 |
out_message = "Please upload a PDF file or image file (JPG, PNG) for image analysis."
|
|
|
49 |
#for page_num in progress.tqdm(range(0,page_count), total=page_count, unit="pages", desc="Converting pages"):
|
50 |
for page_num in range(page_min,page_count): #progress.tqdm(range(0,page_count), total=page_count, unit="pages", desc="Converting pages"):
|
51 |
|
52 |
+
print("Converting page: ", str(page_num + 1))
|
53 |
|
54 |
# Convert one page to image
|
55 |
image = convert_from_path(pdf_path, first_page=page_num+1, last_page=page_num+1, dpi=300, use_cropbox=True, use_pdftocairo=False)
|
|
|
128 |
tic = time.perf_counter()
|
129 |
|
130 |
# If out message or out_file_paths are blank, change to a list so it can be appended to
|
131 |
+
if isinstance(out_message, str):
|
132 |
+
out_message = [out_message]
|
133 |
|
134 |
# If this is the first time around, set variables to 0/blank
|
135 |
if first_loop_state==True:
|
|
|
150 |
# If we have already redacted the last file, return the input out_message and file list to the relevant components
|
151 |
if latest_file_completed >= len(file_paths):
|
152 |
print("Last file reached, returning files:", str(latest_file_completed))
|
153 |
+
if isinstance(out_message, list):
|
154 |
+
final_out_message = '\n'.join(out_message)
|
155 |
+
else:
|
156 |
+
final_out_message = out_message
|
157 |
+
return final_out_message, out_file_paths
|
158 |
|
159 |
#in_allow_list_flat = [item for sublist in in_allow_list for item in sublist]
|
160 |
|
|
|
181 |
print(out_message)
|
182 |
return out_message, out_file_paths
|
183 |
|
184 |
+
if in_redact_method == "Image analysis" or in_redact_method == "AWS Textract":
|
185 |
# Analyse and redact image-based pdf or image
|
186 |
if is_pdf_or_image(file_path) == False:
|
187 |
out_message = "Please upload a PDF file or image file (JPG, PNG) for image analysis."
|
tools/file_redaction.py
CHANGED
@@ -1,23 +1,28 @@
|
|
|
|
|
|
|
|
|
|
|
|
1 |
from PIL import Image, ImageChops, ImageDraw
|
2 |
from typing import List
|
3 |
import pandas as pd
|
4 |
-
|
5 |
from presidio_image_redactor.entities import ImageRecognizerResult
|
6 |
from pdfminer.high_level import extract_pages
|
7 |
-
from tools.file_conversion import process_file
|
8 |
from pdfminer.layout import LTTextContainer, LTChar, LTTextLine #, LTAnno
|
9 |
from pikepdf import Pdf, Dictionary, Name
|
|
|
10 |
from gradio import Progress
|
11 |
-
|
12 |
-
import re
|
13 |
from collections import defaultdict # For efficient grouping
|
14 |
|
|
|
|
|
15 |
from tools.load_spacy_model_custom_recognisers import nlp_analyser, score_threshold
|
16 |
from tools.helper_functions import get_file_path_end, output_folder
|
17 |
from tools.file_conversion import process_file, is_pdf, convert_text_pdf_to_img_pdf
|
18 |
from tools.data_anonymise import generate_decision_process_output
|
19 |
-
import
|
20 |
-
|
21 |
|
22 |
def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], language:str, chosen_redact_entities:List[str], in_redact_method:str, in_allow_list:List[List[str]]=None, latest_file_completed:int=0, out_message:list=[], out_file_paths:list=[], log_files_output_paths:list=[], first_loop_state:bool=False, page_min:int=0, page_max:int=999, estimated_time_taken_state:float=0.0, progress=gr.Progress(track_tqdm=True)):
|
23 |
|
@@ -93,17 +98,20 @@ def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], languag
|
|
93 |
print(out_message)
|
94 |
return out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state
|
95 |
|
96 |
-
if in_redact_method == "Image analysis":
|
97 |
# Analyse and redact image-based pdf or image
|
98 |
# if is_pdf_or_image(file_path) == False:
|
99 |
# return "Please upload a PDF file or image file (JPG, PNG) for image analysis.", None
|
100 |
|
101 |
-
print("Redacting file as image-based file")
|
102 |
-
pdf_images, output_logs = redact_image_pdf(file_path, image_paths, language, chosen_redact_entities, in_allow_list_flat, is_a_pdf, page_min, page_max)
|
103 |
out_image_file_path = output_folder + file_path_without_ext + "_redacted_as_img.pdf"
|
104 |
pdf_images[0].save(out_image_file_path, "PDF" ,resolution=100.0, save_all=True, append_images=pdf_images[1:])
|
105 |
|
106 |
out_file_paths.append(out_image_file_path)
|
|
|
|
|
|
|
107 |
out_message.append("File '" + file_path_without_ext + "' successfully redacted")
|
108 |
|
109 |
output_logs_str = str(output_logs)
|
@@ -118,16 +126,15 @@ def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], languag
|
|
118 |
latest_file_completed += 1
|
119 |
|
120 |
elif in_redact_method == "Text analysis":
|
|
|
121 |
if is_pdf(file_path) == False:
|
122 |
return "Please upload a PDF file for text analysis. If you have an image, select 'Image analysis'.", None, None
|
123 |
-
|
124 |
# Analyse text-based pdf
|
125 |
print('Redacting file as text-based PDF')
|
126 |
-
pdf_text, output_logs = redact_text_pdf(file_path, language, chosen_redact_entities, in_allow_list_flat, page_min, page_max)
|
127 |
out_text_file_path = output_folder + file_path_without_ext + "_text_redacted.pdf"
|
128 |
-
pdf_text.save(out_text_file_path)
|
129 |
-
|
130 |
-
|
131 |
|
132 |
# Convert message
|
133 |
convert_message="Converting PDF to image-based PDF to embed redactions."
|
@@ -170,55 +177,60 @@ def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], languag
|
|
170 |
|
171 |
return out_message_out, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state
|
172 |
|
173 |
-
def merge_img_bboxes(bboxes, horizontal_threshold=150, vertical_threshold=25):
|
174 |
-
|
175 |
-
|
176 |
-
|
177 |
-
|
178 |
-
|
179 |
-
|
180 |
-
|
181 |
-
|
182 |
-
|
183 |
-
|
184 |
-
|
185 |
-
|
186 |
-
|
187 |
-
|
188 |
-
|
189 |
-
|
190 |
-
|
191 |
-
|
192 |
-
|
193 |
-
|
194 |
-
|
195 |
-
|
196 |
-
|
197 |
-
|
198 |
-
|
199 |
-
|
200 |
-
|
201 |
-
|
202 |
-
|
203 |
-
|
204 |
-
|
|
|
|
|
|
|
|
|
|
|
205 |
'''
|
206 |
Take an path for an image of a document, then run this image through the Presidio ImageAnalyzer and PIL to get a redacted page back. Adapted from Presidio ImageRedactorEngine.
|
207 |
'''
|
208 |
-
|
209 |
-
|
|
|
|
|
210 |
decision_process_output_str = ""
|
|
|
|
|
211 |
|
212 |
if not image_paths:
|
213 |
out_message = "PDF does not exist as images. Converting pages to image"
|
214 |
print(out_message)
|
215 |
-
#progress(0, desc=out_message)
|
216 |
|
217 |
image_paths = process_file(file_path)
|
218 |
|
219 |
-
print("image_paths:", image_paths)
|
220 |
-
|
221 |
-
|
222 |
if not isinstance(image_paths, list):
|
223 |
print("Converting image_paths to list")
|
224 |
image_paths = [image_paths]
|
@@ -235,84 +247,142 @@ def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_
|
|
235 |
# Check that page_min and page_max are within expected ranges
|
236 |
if page_max > number_of_pages or page_max == 0:
|
237 |
page_max = number_of_pages
|
238 |
-
#else:
|
239 |
-
# page_max = page_max - 1
|
240 |
|
241 |
if page_min <= 0:
|
242 |
page_min = 0
|
243 |
else:
|
244 |
page_min = page_min - 1
|
245 |
|
246 |
-
print("Page range:", str(page_min), "to", str(page_max))
|
247 |
|
248 |
#for i in progress.tqdm(range(0,number_of_pages), total=number_of_pages, unit="pages", desc="Redacting pages"):
|
249 |
|
250 |
-
images = []
|
251 |
-
|
252 |
for n in range(0, number_of_pages):
|
|
|
253 |
|
254 |
try:
|
255 |
image = image_paths[0][n]#.copy()
|
256 |
print("Skipping page", str(n))
|
257 |
#print("image:", image)
|
258 |
except Exception as e:
|
259 |
-
print("Could not redact page:", str(
|
260 |
print(e)
|
261 |
continue
|
262 |
|
263 |
-
if n >= page_min and n
|
264 |
-
#for i in range(page_min, page_max):
|
265 |
|
266 |
i = n
|
267 |
|
268 |
-
|
269 |
|
270 |
-
|
271 |
-
#print("image_paths:", image_paths)
|
272 |
-
|
273 |
-
#image = ImageChops.duplicate(image_paths[i])
|
274 |
-
#print("Image paths i:", image_paths[0])
|
275 |
|
276 |
# Assuming image_paths[i] is your PIL image object
|
277 |
try:
|
278 |
image = image_paths[0][i]#.copy()
|
279 |
#print("image:", image)
|
280 |
except Exception as e:
|
281 |
-
print("Could not redact page:",
|
282 |
print(e)
|
283 |
continue
|
284 |
|
285 |
# %%
|
286 |
-
image_analyser = ImageAnalyzerEngine(nlp_analyser)
|
287 |
-
engine = ImageRedactorEngine(image_analyser)
|
288 |
|
289 |
if language == 'en':
|
290 |
ocr_lang = 'eng'
|
291 |
else: ocr_lang = language
|
292 |
|
293 |
-
bboxes = image_analyser.analyze(image,
|
294 |
-
|
295 |
-
|
296 |
-
|
297 |
-
|
298 |
-
|
299 |
-
|
300 |
-
|
301 |
-
|
302 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
303 |
if bboxes:
|
|
|
|
|
304 |
decision_process_output_str = str(bboxes)
|
305 |
print("Decision process:", decision_process_output_str)
|
306 |
-
|
307 |
-
#print("For page: ", str(i), "Bounding boxes: ", bboxes)
|
308 |
|
309 |
-
|
310 |
-
|
311 |
-
merged_bboxes = merge_img_bboxes(bboxes)
|
312 |
|
313 |
#print("For page:", str(i), "Merged bounding boxes:", merged_bboxes)
|
|
|
|
|
|
|
|
|
|
|
314 |
|
315 |
-
# 3. Draw the merged boxes (unchanged)
|
316 |
for box in merged_bboxes:
|
317 |
x0 = box.left
|
318 |
y0 = box.top
|
@@ -322,7 +392,7 @@ def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_
|
|
322 |
|
323 |
images.append(image)
|
324 |
|
325 |
-
return images, decision_process_output_str
|
326 |
|
327 |
def analyze_text_container(text_container, language, chosen_redact_entities, score_threshold, allow_list):
|
328 |
if isinstance(text_container, LTTextContainer):
|
@@ -343,16 +413,82 @@ def analyze_text_container(text_container, language, chosen_redact_entities, sco
|
|
343 |
return [], []
|
344 |
|
345 |
# Inside the loop where you process analyzer_results, merge bounding boxes that are right next to each other:
|
346 |
-
def merge_bounding_boxes(analyzer_results, characters, combine_pixel_dist, vertical_padding=2):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
347 |
analyzed_bounding_boxes = []
|
348 |
-
|
349 |
-
|
350 |
-
|
351 |
-
current_y = None
|
352 |
|
|
|
|
|
353 |
for i, result in enumerate(analyzer_results):
|
354 |
-
print("Considering result", str(i))
|
355 |
-
|
|
|
|
|
|
|
356 |
if isinstance(char, LTChar):
|
357 |
char_box = list(char.bbox)
|
358 |
# Add vertical padding to the top of the box
|
@@ -378,24 +514,55 @@ def merge_bounding_boxes(analyzer_results, characters, combine_pixel_dist, verti
|
|
378 |
# Reset current_box and current_y after appending
|
379 |
current_box = char_box
|
380 |
current_y = char_box[1]
|
381 |
-
|
382 |
# After finishing with the current result, add the last box for this result
|
383 |
if current_box:
|
384 |
merged_bounding_boxes.append({"boundingBox": current_box, "result": result})
|
385 |
current_box = None
|
386 |
current_y = None # Reset for the next result
|
387 |
|
388 |
-
|
389 |
-
|
390 |
-
|
391 |
-
|
392 |
-
|
393 |
-
|
394 |
-
|
395 |
-
|
396 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
397 |
|
398 |
-
|
399 |
|
400 |
return analyzed_bounding_boxes
|
401 |
|
@@ -437,7 +604,7 @@ def create_annotations_for_bounding_boxes(analyzed_bounding_boxes):
|
|
437 |
annotations_on_page.append(annotation)
|
438 |
return annotations_on_page
|
439 |
|
440 |
-
def redact_text_pdf(filename:str, language:str, chosen_redact_entities:List[str], allow_list:List[str]=None, page_min:int=0, page_max:int=999, progress=Progress(track_tqdm=True)):
|
441 |
'''
|
442 |
Redact chosen entities from a pdf that is made up of multiple pages that are not images.
|
443 |
'''
|
@@ -469,6 +636,12 @@ def redact_text_pdf(filename:str, language:str, chosen_redact_entities:List[str]
|
|
469 |
|
470 |
print("Page number is:", page_no)
|
471 |
|
|
|
|
|
|
|
|
|
|
|
|
|
472 |
annotations_on_page = []
|
473 |
decision_process_table_on_page = []
|
474 |
|
@@ -480,13 +653,23 @@ def redact_text_pdf(filename:str, language:str, chosen_redact_entities:List[str]
|
|
480 |
text_container_analyzed_bounding_boxes = []
|
481 |
characters = []
|
482 |
|
483 |
-
|
484 |
-
|
485 |
-
|
486 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
487 |
|
488 |
-
|
489 |
-
|
490 |
|
491 |
decision_process_table_on_page = create_text_redaction_process_results(page_analyzer_results, page_analyzed_bounding_boxes, page_num)
|
492 |
|
|
|
1 |
+
import time
|
2 |
+
import re
|
3 |
+
import json
|
4 |
+
import io
|
5 |
+
import os
|
6 |
from PIL import Image, ImageChops, ImageDraw
|
7 |
from typing import List
|
8 |
import pandas as pd
|
9 |
+
|
10 |
from presidio_image_redactor.entities import ImageRecognizerResult
|
11 |
from pdfminer.high_level import extract_pages
|
|
|
12 |
from pdfminer.layout import LTTextContainer, LTChar, LTTextLine #, LTAnno
|
13 |
from pikepdf import Pdf, Dictionary, Name
|
14 |
+
import gradio as gr
|
15 |
from gradio import Progress
|
16 |
+
|
|
|
17 |
from collections import defaultdict # For efficient grouping
|
18 |
|
19 |
+
from tools.custom_image_analyser_engine import CustomImageAnalyzerEngine, OCRResult
|
20 |
+
from tools.file_conversion import process_file
|
21 |
from tools.load_spacy_model_custom_recognisers import nlp_analyser, score_threshold
|
22 |
from tools.helper_functions import get_file_path_end, output_folder
|
23 |
from tools.file_conversion import process_file, is_pdf, convert_text_pdf_to_img_pdf
|
24 |
from tools.data_anonymise import generate_decision_process_output
|
25 |
+
from tools.aws_textract import analyse_page_with_textract, convert_pike_pdf_page_to_bytes, json_to_ocrresult
|
|
|
26 |
|
27 |
def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], language:str, chosen_redact_entities:List[str], in_redact_method:str, in_allow_list:List[List[str]]=None, latest_file_completed:int=0, out_message:list=[], out_file_paths:list=[], log_files_output_paths:list=[], first_loop_state:bool=False, page_min:int=0, page_max:int=999, estimated_time_taken_state:float=0.0, progress=gr.Progress(track_tqdm=True)):
|
28 |
|
|
|
98 |
print(out_message)
|
99 |
return out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state
|
100 |
|
101 |
+
if in_redact_method == "Image analysis" or in_redact_method == "AWS Textract":
|
102 |
# Analyse and redact image-based pdf or image
|
103 |
# if is_pdf_or_image(file_path) == False:
|
104 |
# return "Please upload a PDF file or image file (JPG, PNG) for image analysis.", None
|
105 |
|
106 |
+
print("Redacting file" + file_path_without_ext + "as an image-based file")
|
107 |
+
pdf_images, output_logs, logging_file_paths = redact_image_pdf(file_path, image_paths, language, chosen_redact_entities, in_allow_list_flat, is_a_pdf, page_min, page_max, in_redact_method)
|
108 |
out_image_file_path = output_folder + file_path_without_ext + "_redacted_as_img.pdf"
|
109 |
pdf_images[0].save(out_image_file_path, "PDF" ,resolution=100.0, save_all=True, append_images=pdf_images[1:])
|
110 |
|
111 |
out_file_paths.append(out_image_file_path)
|
112 |
+
if logging_file_paths:
|
113 |
+
log_files_output_paths.extend(logging_file_paths)
|
114 |
+
|
115 |
out_message.append("File '" + file_path_without_ext + "' successfully redacted")
|
116 |
|
117 |
output_logs_str = str(output_logs)
|
|
|
126 |
latest_file_completed += 1
|
127 |
|
128 |
elif in_redact_method == "Text analysis":
|
129 |
+
|
130 |
if is_pdf(file_path) == False:
|
131 |
return "Please upload a PDF file for text analysis. If you have an image, select 'Image analysis'.", None, None
|
132 |
+
|
133 |
# Analyse text-based pdf
|
134 |
print('Redacting file as text-based PDF')
|
135 |
+
pdf_text, output_logs = redact_text_pdf(file_path, language, chosen_redact_entities, in_allow_list_flat, page_min, page_max, "Text analysis")
|
136 |
out_text_file_path = output_folder + file_path_without_ext + "_text_redacted.pdf"
|
137 |
+
pdf_text.save(out_text_file_path)
|
|
|
|
|
138 |
|
139 |
# Convert message
|
140 |
convert_message="Converting PDF to image-based PDF to embed redactions."
|
|
|
177 |
|
178 |
return out_message_out, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state
|
179 |
|
180 |
+
def merge_img_bboxes(bboxes, handwriting_or_signature_boxes = [], horizontal_threshold=150, vertical_threshold=25):
    '''
    Merge bounding boxes that sit on (approximately) the same line and are
    horizontally close, so adjacent redaction boxes become one.

    Boxes are grouped by rounded vertical position (vertical_threshold pixels
    per band), sorted left-to-right within each band, and merged whenever the
    horizontal gap to the previous box is <= horizontal_threshold.
    handwriting_or_signature_boxes are included in the merge as-is.
    '''
    merged_bboxes = []
    grouped_bboxes = defaultdict(list)

    # Bug fix: previously bboxes.extend(...) mutated the caller's list AND the
    # shared mutable default argument. Work on a local copy instead.
    all_bboxes = list(bboxes)
    if handwriting_or_signature_boxes:
        print("Handwriting or signature boxes exist at merge:", handwriting_or_signature_boxes)
        all_bboxes.extend(handwriting_or_signature_boxes)

    # 1. Group by approximate vertical proximity
    for box in all_bboxes:
        grouped_bboxes[round(box.top / vertical_threshold)].append(box)

    # 2. Merge within each group
    for _, group in grouped_bboxes.items():
        group.sort(key=lambda box: box.left)

        merged_box = group[0]
        for next_box in group[1:]:
            # Merge when the horizontal gap between boxes is small enough
            if next_box.left - (merged_box.left + merged_box.width) <= horizontal_threshold:
                # Calculate new dimensions for the merged box
                # (debug print of every intermediate merged box removed)
                new_left = min(merged_box.left, next_box.left)
                new_top = min(merged_box.top, next_box.top)
                new_width = max(merged_box.left + merged_box.width, next_box.left + next_box.width) - new_left
                new_height = max(merged_box.top + merged_box.height, next_box.top + next_box.height) - new_top
                merged_box = ImageRecognizerResult(
                    merged_box.entity_type, merged_box.start, merged_box.end, merged_box.score, new_left, new_top, new_width, new_height
                )
            else:
                merged_bboxes.append(merged_box)
                merged_box = next_box

        merged_bboxes.append(merged_box)
    return merged_bboxes
|
215 |
+
|
216 |
+
def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_redact_entities:List[str], allow_list:List[str]=None, is_a_pdf:bool=True, page_min:int=0, page_max:int=999, analysis_type:str="Image analysis", progress=Progress(track_tqdm=True)):
|
217 |
'''
|
218 |
Take an path for an image of a document, then run this image through the Presidio ImageAnalyzer and PIL to get a redacted page back. Adapted from Presidio ImageRedactorEngine.
|
219 |
'''
|
220 |
+
# json_file_path is for AWS Textract outputs
|
221 |
+
logging_file_paths = []
|
222 |
+
file_name = get_file_path_end(file_path)
|
223 |
+
fill = (0, 0, 0) # Fill colour
|
224 |
decision_process_output_str = ""
|
225 |
+
images = []
|
226 |
+
image_analyser = CustomImageAnalyzerEngine(nlp_analyser)
|
227 |
|
228 |
if not image_paths:
|
229 |
out_message = "PDF does not exist as images. Converting pages to image"
|
230 |
print(out_message)
|
|
|
231 |
|
232 |
image_paths = process_file(file_path)
|
233 |
|
|
|
|
|
|
|
234 |
if not isinstance(image_paths, list):
|
235 |
print("Converting image_paths to list")
|
236 |
image_paths = [image_paths]
|
|
|
247 |
# Check that page_min and page_max are within expected ranges
|
248 |
if page_max > number_of_pages or page_max == 0:
|
249 |
page_max = number_of_pages
|
|
|
|
|
250 |
|
251 |
if page_min <= 0:
|
252 |
page_min = 0
|
253 |
else:
|
254 |
page_min = page_min - 1
|
255 |
|
256 |
+
print("Page range:", str(page_min + 1), "to", str(page_max))
|
257 |
|
258 |
#for i in progress.tqdm(range(0,number_of_pages), total=number_of_pages, unit="pages", desc="Redacting pages"):
|
259 |
|
|
|
|
|
260 |
for n in range(0, number_of_pages):
|
261 |
+
handwriting_or_signature_boxes = []
|
262 |
|
263 |
try:
|
264 |
image = image_paths[0][n]#.copy()
|
265 |
print("Skipping page", str(n))
|
266 |
#print("image:", image)
|
267 |
except Exception as e:
|
268 |
+
print("Could not redact page:", str(n), "due to:")
|
269 |
print(e)
|
270 |
continue
|
271 |
|
272 |
+
if n >= page_min and n < page_max:
|
|
|
273 |
|
274 |
i = n
|
275 |
|
276 |
+
reported_page_number = str(i + 1)
|
277 |
|
278 |
+
print("Redacting page", reported_page_number)
|
|
|
|
|
|
|
|
|
279 |
|
280 |
# Assuming image_paths[i] is your PIL image object
|
281 |
try:
|
282 |
image = image_paths[0][i]#.copy()
|
283 |
#print("image:", image)
|
284 |
except Exception as e:
|
285 |
+
print("Could not redact page:", reported_page_number, "due to:")
|
286 |
print(e)
|
287 |
continue
|
288 |
|
289 |
# %%
|
290 |
+
# image_analyser = ImageAnalyzerEngine(nlp_analyser)
|
291 |
+
# engine = ImageRedactorEngine(image_analyser)
|
292 |
|
293 |
if language == 'en':
|
294 |
ocr_lang = 'eng'
|
295 |
else: ocr_lang = language
|
296 |
|
297 |
+
# bboxes = image_analyser.analyze(image,
|
298 |
+
# ocr_kwargs={"lang": ocr_lang},
|
299 |
+
# **{
|
300 |
+
# "allow_list": allow_list,
|
301 |
+
# "language": language,
|
302 |
+
# "entities": chosen_redact_entities,
|
303 |
+
# "score_threshold": score_threshold,
|
304 |
+
# "return_decision_process":True,
|
305 |
+
# })
|
306 |
+
|
307 |
+
# Step 1: Perform OCR. Either with Tesseract, or with AWS Textract
|
308 |
+
if analysis_type == "Image analysis":
|
309 |
+
ocr_results = image_analyser.perform_ocr(image)
|
310 |
+
|
311 |
+
# Process all OCR text with bounding boxes
|
312 |
+
#print("OCR results:", ocr_results)
|
313 |
+
ocr_results_str = str(ocr_results)
|
314 |
+
ocr_results_file_path = output_folder + "ocr_results_" + file_name + "_page_" + reported_page_number + ".txt"
|
315 |
+
with open(ocr_results_file_path, "w") as f:
|
316 |
+
f.write(ocr_results_str)
|
317 |
+
logging_file_paths.append(ocr_results_file_path)
|
318 |
+
|
319 |
+
# Import results from json and convert
|
320 |
+
if analysis_type == "AWS Textract":
|
321 |
+
|
322 |
+
# Ensure image is a PIL Image object
|
323 |
+
# if isinstance(image, str):
|
324 |
+
# image = Image.open(image)
|
325 |
+
# elif not isinstance(image, Image.Image):
|
326 |
+
# print(f"Unexpected image type on page {i}: {type(image)}")
|
327 |
+
# continue
|
328 |
+
|
329 |
+
# Convert the image to bytes using an in-memory buffer
|
330 |
+
image_buffer = io.BytesIO()
|
331 |
+
image.save(image_buffer, format='PNG') # Save as PNG, or adjust format if needed
|
332 |
+
pdf_page_as_bytes = image_buffer.getvalue()
|
333 |
+
|
334 |
+
json_file_path = output_folder + file_name + "_page_" + reported_page_number + "_textract.json"
|
335 |
+
|
336 |
+
if not os.path.exists(json_file_path):
|
337 |
+
text_blocks = analyse_page_with_textract(pdf_page_as_bytes, json_file_path) # Analyse page with Textract
|
338 |
+
logging_file_paths.append(json_file_path)
|
339 |
+
else:
|
340 |
+
# Open the file and load the JSON data
|
341 |
+
print("Found existing Textract json results file for this page.")
|
342 |
+
with open(json_file_path, 'r') as json_file:
|
343 |
+
text_blocks = json.load(json_file)
|
344 |
+
text_blocks = text_blocks['Blocks']
|
345 |
+
|
346 |
+
|
347 |
+
# Need image size to convert textract OCR outputs to the correct sizes
|
348 |
+
#print("Image size:", image.size)
|
349 |
+
page_width, page_height = image.size
|
350 |
+
|
351 |
+
ocr_results, handwriting_or_signature_boxes = json_to_ocrresult(text_blocks, page_width, page_height)
|
352 |
+
|
353 |
+
#print("OCR results:", ocr_results)
|
354 |
+
ocr_results_str = str(ocr_results)
|
355 |
+
textract_ocr_results_file_path = output_folder + "ocr_results_" + file_name + "_page_" + reported_page_number + "_textract.txt"
|
356 |
+
with open(textract_ocr_results_file_path, "w") as f:
|
357 |
+
f.write(ocr_results_str)
|
358 |
+
logging_file_paths.append(textract_ocr_results_file_path)
|
359 |
+
|
360 |
+
# Step 2: Analyze text and identify PII
|
361 |
+
bboxes = image_analyser.analyze_text(
|
362 |
+
ocr_results,
|
363 |
+
language=language,
|
364 |
+
entities=chosen_redact_entities,
|
365 |
+
allow_list=allow_list,
|
366 |
+
score_threshold=score_threshold,
|
367 |
+
)
|
368 |
+
|
369 |
+
# Process the bboxes (PII entities)
|
370 |
if bboxes:
|
371 |
+
for bbox in bboxes:
|
372 |
+
print(f"Entity: {bbox.entity_type}, Text: {bbox.text}, Bbox: ({bbox.left}, {bbox.top}, {bbox.width}, {bbox.height})")
|
373 |
decision_process_output_str = str(bboxes)
|
374 |
print("Decision process:", decision_process_output_str)
|
|
|
|
|
375 |
|
376 |
+
# Merge close bounding boxes
|
377 |
+
merged_bboxes = merge_img_bboxes(bboxes, handwriting_or_signature_boxes)
|
|
|
378 |
|
379 |
#print("For page:", str(i), "Merged bounding boxes:", merged_bboxes)
|
380 |
+
#from PIL import Image
|
381 |
+
#image_object = Image.open(image)
|
382 |
+
|
383 |
+
# 3. Draw the merged boxes
|
384 |
+
draw = ImageDraw.Draw(image)
|
385 |
|
|
|
386 |
for box in merged_bboxes:
|
387 |
x0 = box.left
|
388 |
y0 = box.top
|
|
|
392 |
|
393 |
images.append(image)
|
394 |
|
395 |
+
return images, decision_process_output_str, logging_file_paths
|
396 |
|
397 |
def analyze_text_container(text_container, language, chosen_redact_entities, score_threshold, allow_list):
|
398 |
if isinstance(text_container, LTTextContainer):
|
|
|
413 |
return [], []
|
414 |
|
415 |
# Inside the loop where you process analyzer_results, merge bounding boxes that are right next to each other:
|
416 |
+
# def merge_bounding_boxes(analyzer_results, characters, combine_pixel_dist, vertical_padding=2):
|
417 |
+
# '''
|
418 |
+
# Merge identified bounding boxes containing PII that are very close to one another
|
419 |
+
# '''
|
420 |
+
# analyzed_bounding_boxes = []
|
421 |
+
# if len(analyzer_results) > 0 and len(characters) > 0:
|
422 |
+
# merged_bounding_boxes = []
|
423 |
+
# current_box = None
|
424 |
+
# current_y = None
|
425 |
+
|
426 |
+
# for i, result in enumerate(analyzer_results):
|
427 |
+
# print("Considering result", str(i))
|
428 |
+
# for char in characters[result.start : result.end]:
|
429 |
+
# if isinstance(char, LTChar):
|
430 |
+
# char_box = list(char.bbox)
|
431 |
+
# # Add vertical padding to the top of the box
|
432 |
+
# char_box[3] += vertical_padding
|
433 |
+
|
434 |
+
# if current_y is None or current_box is None:
|
435 |
+
# current_box = char_box
|
436 |
+
# current_y = char_box[1]
|
437 |
+
# else:
|
438 |
+
# vertical_diff_bboxes = abs(char_box[1] - current_y)
|
439 |
+
# horizontal_diff_bboxes = abs(char_box[0] - current_box[2])
|
440 |
+
|
441 |
+
# if (
|
442 |
+
# vertical_diff_bboxes <= 5
|
443 |
+
# and horizontal_diff_bboxes <= combine_pixel_dist
|
444 |
+
# ):
|
445 |
+
# current_box[2] = char_box[2] # Extend the current box horizontally
|
446 |
+
# current_box[3] = max(current_box[3], char_box[3]) # Ensure the top is the highest
|
447 |
+
# else:
|
448 |
+
# merged_bounding_boxes.append(
|
449 |
+
# {"boundingBox": current_box, "result": result})
|
450 |
+
|
451 |
+
# # Reset current_box and current_y after appending
|
452 |
+
# current_box = char_box
|
453 |
+
# current_y = char_box[1]
|
454 |
+
|
455 |
+
# # After finishing with the current result, add the last box for this result
|
456 |
+
# if current_box:
|
457 |
+
# merged_bounding_boxes.append({"boundingBox": current_box, "result": result})
|
458 |
+
# current_box = None
|
459 |
+
# current_y = None # Reset for the next result
|
460 |
+
|
461 |
+
# if not merged_bounding_boxes:
|
462 |
+
# analyzed_bounding_boxes.extend(
|
463 |
+
# {"boundingBox": char.bbox, "result": result}
|
464 |
+
# for result in analyzer_results
|
465 |
+
# for char in characters[result.start:result.end]
|
466 |
+
# if isinstance(char, LTChar)
|
467 |
+
# )
|
468 |
+
# else:
|
469 |
+
# analyzed_bounding_boxes.extend(merged_bounding_boxes)
|
470 |
+
|
471 |
+
# print("analysed_bounding_boxes:\n\n", analyzed_bounding_boxes)
|
472 |
+
|
473 |
+
# return analyzed_bounding_boxes
|
474 |
+
|
475 |
+
def merge_bounding_boxes(analyzer_results, characters, combine_pixel_dist, vertical_padding=2, signature_bounding_boxes=None):
|
476 |
+
'''
|
477 |
+
Merge identified bounding boxes containing PII or signatures that are very close to one another.
|
478 |
+
'''
|
479 |
analyzed_bounding_boxes = []
|
480 |
+
merged_bounding_boxes = []
|
481 |
+
current_box = None
|
482 |
+
current_y = None
|
|
|
483 |
|
484 |
+
# Handle PII and text bounding boxes first
|
485 |
+
if len(analyzer_results) > 0 and len(characters) > 0:
|
486 |
for i, result in enumerate(analyzer_results):
|
487 |
+
#print("Considering result", str(i))
|
488 |
+
#print("Result:", result)
|
489 |
+
#print("Characters:", characters)
|
490 |
+
|
491 |
+
for char in characters[result.start: result.end]:
|
492 |
if isinstance(char, LTChar):
|
493 |
char_box = list(char.bbox)
|
494 |
# Add vertical padding to the top of the box
|
|
|
514 |
# Reset current_box and current_y after appending
|
515 |
current_box = char_box
|
516 |
current_y = char_box[1]
|
517 |
+
|
518 |
# After finishing with the current result, add the last box for this result
|
519 |
if current_box:
|
520 |
merged_bounding_boxes.append({"boundingBox": current_box, "result": result})
|
521 |
current_box = None
|
522 |
current_y = None # Reset for the next result
|
523 |
|
524 |
+
# Handle signature bounding boxes (without specific characters)
|
525 |
+
if signature_bounding_boxes is not None:
|
526 |
+
for sig_box in signature_bounding_boxes:
|
527 |
+
sig_box = list(sig_box) # Ensure it's a list to modify the values
|
528 |
+
if current_y is None or current_box is None:
|
529 |
+
current_box = sig_box
|
530 |
+
current_y = sig_box[1]
|
531 |
+
else:
|
532 |
+
vertical_diff_bboxes = abs(sig_box[1] - current_y)
|
533 |
+
horizontal_diff_bboxes = abs(sig_box[0] - current_box[2])
|
534 |
+
|
535 |
+
if (
|
536 |
+
vertical_diff_bboxes <= 5
|
537 |
+
and horizontal_diff_bboxes <= combine_pixel_dist
|
538 |
+
):
|
539 |
+
current_box[2] = sig_box[2] # Extend the current box horizontally
|
540 |
+
current_box[3] = max(current_box[3], sig_box[3]) # Ensure the top is the highest
|
541 |
+
else:
|
542 |
+
merged_bounding_boxes.append({"boundingBox": current_box, "type": "signature"})
|
543 |
+
|
544 |
+
# Reset current_box and current_y after appending
|
545 |
+
current_box = sig_box
|
546 |
+
current_y = sig_box[1]
|
547 |
+
|
548 |
+
# Add the last bounding box for the signature
|
549 |
+
if current_box:
|
550 |
+
merged_bounding_boxes.append({"boundingBox": current_box, "type": "signature"})
|
551 |
+
current_box = None
|
552 |
+
current_y = None
|
553 |
+
|
554 |
+
# If no bounding boxes were merged, add individual character bounding boxes
|
555 |
+
if not merged_bounding_boxes:
|
556 |
+
analyzed_bounding_boxes.extend(
|
557 |
+
{"boundingBox": char.bbox, "result": result}
|
558 |
+
for result in analyzer_results
|
559 |
+
for char in characters[result.start:result.end]
|
560 |
+
if isinstance(char, LTChar)
|
561 |
+
)
|
562 |
+
else:
|
563 |
+
analyzed_bounding_boxes.extend(merged_bounding_boxes)
|
564 |
|
565 |
+
#print("analysed_bounding_boxes:\n\n", analyzed_bounding_boxes)
|
566 |
|
567 |
return analyzed_bounding_boxes
|
568 |
|
|
|
604 |
annotations_on_page.append(annotation)
|
605 |
return annotations_on_page
|
606 |
|
607 |
+
def redact_text_pdf(filename:str, language:str, chosen_redact_entities:List[str], allow_list:List[str]=None, page_min:int=0, page_max:int=999, analysis_type:str = "Text analysis", progress=Progress(track_tqdm=True)):
|
608 |
'''
|
609 |
Redact chosen entities from a pdf that is made up of multiple pages that are not images.
|
610 |
'''
|
|
|
636 |
|
637 |
print("Page number is:", page_no)
|
638 |
|
639 |
+
# The /MediaBox in a PDF specifies the size of the page [left, bottom, right, top]
|
640 |
+
media_box = page.MediaBox
|
641 |
+
page_width = media_box[2] - media_box[0]
|
642 |
+
page_height = media_box[3] - media_box[1]
|
643 |
+
|
644 |
+
|
645 |
annotations_on_page = []
|
646 |
decision_process_table_on_page = []
|
647 |
|
|
|
653 |
text_container_analyzed_bounding_boxes = []
|
654 |
characters = []
|
655 |
|
656 |
+
if analysis_type == "Text analysis":
|
657 |
+
for i, text_container in enumerate(page_layout):
|
658 |
+
|
659 |
+
text_container_analyzer_results, characters = analyze_text_container(text_container, language, chosen_redact_entities, score_threshold, allow_list)
|
660 |
+
|
661 |
+
# Merge bounding boxes if very close together
|
662 |
+
text_container_analyzed_bounding_boxes = merge_bounding_boxes(text_container_analyzer_results, characters, combine_pixel_dist, vertical_padding = 2)
|
663 |
+
|
664 |
+
|
665 |
+
page_analyzed_bounding_boxes.extend(text_container_analyzed_bounding_boxes)
|
666 |
+
page_analyzer_results.extend(text_container_analyzer_results)
|
667 |
+
|
668 |
+
# Merge bounding boxes if very close together
|
669 |
+
text_container_analyzed_bounding_boxes = merge_bounding_boxes(text_container_analyzer_results, characters, combine_pixel_dist, vertical_padding = 2)
|
670 |
|
671 |
+
page_analyzed_bounding_boxes.extend(text_container_analyzed_bounding_boxes)
|
672 |
+
page_analyzer_results.extend(text_container_analyzer_results)
|
673 |
|
674 |
decision_process_table_on_page = create_text_redaction_process_results(page_analyzer_results, page_analyzed_bounding_boxes, page_num)
|
675 |
|