# tools/file_redaction.py
from PIL import Image, ImageChops, ImageDraw
from typing import List
import pandas as pd
from presidio_image_redactor import ImageAnalyzerEngine
from presidio_image_redactor.entities import ImageRecognizerResult
from pdfminer.high_level import extract_pages
from pdfminer.layout import LTTextContainer, LTChar, LTTextLine
from pikepdf import Pdf, Dictionary, Name
from gradio import Progress
import time
from collections import defaultdict  # For efficient grouping of bounding boxes
from tools.load_spacy_model_custom_recognisers import nlp_analyser, score_threshold
from tools.helper_functions import get_file_path_end, output_folder
from tools.file_conversion import process_file, is_pdf, convert_text_pdf_to_img_pdf
import gradio as gr
def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], language:str, chosen_redact_entities:List[str], in_redact_method:str, in_allow_list:List[List[str]]=None, progress=gr.Progress(track_tqdm=True)):
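    '''
    Redact each uploaded file using the chosen method ("Image analysis" or
    "Text analysis") and save the results to the output folder. Non-PDF uploads
    are assumed to be images and are redacted with image analysis.

    Returns a summary message and the list of output file paths (the list is
    returned twice, matching the interface's two file output components).
    '''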
    tic = time.perf_counter()

    out_message = []
    out_file_paths = []

    # Flatten the allow list if one was provided; default to an empty list so the
    # redaction functions below always receive a defined value
    in_allow_list_flat = []
    if in_allow_list:
        in_allow_list_flat = [item for sublist in in_allow_list for item in sublist]
print("File paths:", file_paths)
    for file in progress.tqdm(file_paths, desc="Redacting files", unit="files"):
        file_path = file.name

        if file_path:
            file_path_without_ext = get_file_path_end(file_path)
            if not is_pdf(file_path):
                # If the user has not submitted a PDF, assume it is an image
                print("File is not a PDF, so image analysis will be used.")
                in_redact_method = "Image analysis"
        else:
            out_message = "No file selected"
            print(out_message)
            return out_message, out_file_paths, out_file_paths
if in_redact_method == "Image analysis":
# Analyse and redact image-based pdf or image
# if is_pdf_or_image(file_path) == False:
# return "Please upload a PDF file or image file (JPG, PNG) for image analysis.", None
print("Redacting file as image-based pdf")
pdf_images = redact_image_pdf(file_path, image_paths, language, chosen_redact_entities, in_allow_list_flat)
out_image_file_path = output_folder + file_path_without_ext + "_redacted_as_img.pdf"
pdf_images[0].save(out_image_file_path, "PDF" ,resolution=100.0, save_all=True, append_images=pdf_images[1:])
out_file_paths.append(out_image_file_path)
out_message.append("File '" + file_path_without_ext + "' successfully redacted and saved to file.")
        elif in_redact_method == "Text analysis":
            if not is_pdf(file_path):
                return "Please upload a PDF file for text analysis. If you have an image, select 'Image analysis'.", None, None

            # Analyse the text-based PDF
            print('Redacting file as a text-based PDF')
            pdf_text = redact_text_pdf(file_path, language, chosen_redact_entities, in_allow_list_flat)
            out_text_file_path = output_folder + file_path_without_ext + "_text_redacted.pdf"
            pdf_text.save(out_text_file_path)

            out_message_new = "File '" + file_path_without_ext + "' successfully redacted."
            out_message.append(out_message_new)

            # Convert the redacted document to an image-based PDF so the redactions
            # cannot be removed by deleting the annotation layer
            convert_message = "Converting PDF to an image-based PDF to embed redactions."
            print(convert_message)

            img_output_summary, img_output_file_path = convert_text_pdf_to_img_pdf(file_path, [out_text_file_path])
            out_file_paths.extend(img_output_file_path)
        else:
            out_message = "No redaction method selected"
            print(out_message)
            return out_message, out_file_paths, out_file_paths
toc = time.perf_counter()
out_time = f"Time taken: {toc - tic:0.1f} seconds."
print(out_time)
out_message_out = '\n'.join(out_message)
out_message_out = out_message_out + "\n\n" + out_time
return out_message_out, out_file_paths, out_file_paths
def merge_img_bboxes(bboxes, horizontal_threshold=150, vertical_threshold=25):
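    '''
    Merge character/word-level ImageRecognizerResult boxes into larger boxes so
    that one rectangle can cover a whole span of text. Boxes are first grouped
    into rough rows by rounding box.top / vertical_threshold, then boxes within
    a row are merged whenever the horizontal gap between them is at most
    horizontal_threshold pixels.
    '''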
merged_bboxes = []
grouped_bboxes = defaultdict(list)
# 1. Group by approximate vertical proximity
for box in bboxes:
grouped_bboxes[round(box.top / vertical_threshold)].append(box)
# 2. Merge within each group
for _, group in grouped_bboxes.items():
group.sort(key=lambda box: box.left)
merged_box = group[0]
for next_box in group[1:]:
if next_box.left - (merged_box.left + merged_box.width) <= horizontal_threshold:
print("Merging a box")
# Calculate new dimensions for the merged box
new_left = min(merged_box.left, next_box.left)
new_top = min(merged_box.top, next_box.top)
new_width = max(merged_box.left + merged_box.width, next_box.left + next_box.width) - new_left
new_height = max(merged_box.top + merged_box.height, next_box.top + next_box.height) - new_top
merged_box = ImageRecognizerResult(
merged_box.entity_type, merged_box.start, merged_box.end, merged_box.score, new_left, new_top, new_width, new_height
)
else:
merged_bboxes.append(merged_box)
merged_box = next_box
merged_bboxes.append(merged_box)
return merged_bboxes
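

# A worked example of the merge heuristic above, with two hypothetical boxes on
# the same row: one at left=10 with width=50 and one at left=100 with width=40.
# The gap is 100 - (10 + 50) = 40 pixels, under the default horizontal_threshold
# of 150, so the boxes merge into one box spanning left=10 to left=140.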
def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_redact_entities:List[str], allow_list:List[str]=None, progress=Progress(track_tqdm=True)):
    '''
    Take a path to an image of a document, run the image through the Presidio
    ImageAnalyzerEngine, and draw filled rectangles over the detected entities
    with PIL to produce a redacted page. Adapted from the Presidio ImageRedactorEngine.
    '''
    fill = (0, 0, 0)  # Black fill for redaction boxes
    if not image_paths:
        out_message = "PDF does not yet exist as images. Converting pages to images."
        print(out_message)
        image_paths = process_file(file_path)

    images = []
    number_of_pages = len(image_paths)

    out_message = "Redacting pages"
    print(out_message)
    for i in range(0, number_of_pages):
        print("Redacting page", str(i + 1))

        # Get the image to redact and work on a copy, leaving the original untouched
        image = image_paths[i]
        image = ImageChops.duplicate(image)

        image_analyser = ImageAnalyzerEngine(nlp_analyser)

        # Tesseract expects an ISO 639-2 code ('eng') rather than Presidio's ISO 639-1 ('en')
        if language == 'en':
            ocr_lang = 'eng'
        else:
            ocr_lang = language
        bboxes = image_analyser.analyze(
            image,
            ocr_kwargs={"lang": ocr_lang},
            **{
                "allow_list": allow_list,
                "language": language,
                "entities": chosen_redact_entities,
                "score_threshold": score_threshold
            }
        )
#print("For page: ", str(i), "Bounding boxes: ", bboxes)
draw = ImageDraw.Draw(image)
merged_bboxes = merge_img_bboxes(bboxes)
print("For page: ", str(i), "Merged bounding boxes: ", merged_bboxes)
# 3. Draw the merged boxes (unchanged)
for box in merged_bboxes:
x0 = box.left
y0 = box.top
x1 = x0 + box.width
y1 = y0 + box.height
draw.rectangle([x0, y0, x1, y1], fill=fill)
images.append(image)
return images
def redact_text_pdf(filename:str, language:str, chosen_redact_entities:List[str], allow_list:List[str]=None, progress=Progress(track_tqdm=True)):
    '''
    Redact chosen entities from a PDF that is made up of text-based (non-image) pages.
    Entities are found with the Presidio analyser, per-character bounding boxes are
    merged into spans, and each span is covered with a filled Square annotation via pikepdf.
    '''
    combined_analyzer_results = []
    annotations_all_pages = []
    analyzed_bounding_boxes_df = pd.DataFrame()

    # Maximum horizontal distance (in points) between PII bounding boxes for them
    # to be combined into a single box
    combine_pixel_dist = 100

    pdf = Pdf.open(filename)
    page_num = 0
    for page in pdf.pages:
        print("Page number is:", page_num + 1)
        annotations_on_page = []
        analyzed_bounding_boxes = []

        for page_layout in extract_pages(filename, page_numbers=[page_num], maxpages=1):
            analyzer_results = []

            for text_container in page_layout:
                if isinstance(text_container, LTTextContainer):
                    text_to_analyze = text_container.get_text()

                    analyzer_results = nlp_analyser.analyze(text=text_to_analyze,
                                                            language=language,
                                                            entities=chosen_redact_entities,
                                                            score_threshold=score_threshold,
                                                            return_decision_process=False,
                                                            allow_list=allow_list)

                    # Collect the characters of every text line so that analyser
                    # result offsets can be mapped back to page coordinates
                    characters = [char
                                  for line in text_container
                                  if isinstance(line, LTTextLine)
                                  for char in line]
                    # Merge the per-character bounding boxes of each detected entity
                    # into line-level boxes, joining characters on the same line that
                    # sit within combine_pixel_dist of each other horizontally
                    if len(analyzer_results) > 0 and len(characters) > 0:
                        merged_bounding_boxes = []
                        current_box = None
                        current_y = None
                        for result in analyzer_results:
                            for char in characters[result.start: result.end]:
                                if isinstance(char, LTChar):
                                    char_box = list(char.bbox)

                                    if current_y is None or current_box is None:
                                        # First character for this result: start a new box
                                        current_box = char_box
                                        current_y = char_box[1]
                                    else:
                                        vertical_diff_bboxes = abs(char_box[1] - current_y)
                                        horizontal_diff_bboxes = abs(char_box[0] - current_box[2])

                                        if (
                                            vertical_diff_bboxes <= 5
                                            and horizontal_diff_bboxes <= combine_pixel_dist
                                        ):
                                            # Same line and close enough: extend the current box to the right
                                            current_box[2] = char_box[2]
                                        else:
                                            # Too far apart: close the current box and start a new one
                                            merged_bounding_boxes.append(
                                                {"boundingBox": current_box, "result": result})
                                            current_box = char_box
                                            current_y = char_box[1]

                            # Add the final box for this result, then reset so a box is
                            # not carried over (and appended again) for the next result
                            if current_box:
                                merged_bounding_boxes.append({"boundingBox": current_box, "result": result})
                                current_box = None
                                current_y = None
                        if not merged_bounding_boxes:
                            # Fall back to one box per character if nothing was merged
                            analyzed_bounding_boxes.extend(
                                {"boundingBox": char.bbox, "result": result}
                                for result in analyzer_results
                                for char in characters[result.start:result.end]
                                if isinstance(char, LTChar))
                        else:
                            analyzed_bounding_boxes.extend(merged_bounding_boxes)

                        combined_analyzer_results.extend(analyzer_results)
if len(analyzer_results) > 0:
# Create summary df of annotations to be made
analyzed_bounding_boxes_df_new = pd.DataFrame(analyzed_bounding_boxes)
analyzed_bounding_boxes_df_text = analyzed_bounding_boxes_df_new['result'].astype(str).str.split(",",expand=True).replace(".*: ", "", regex=True)
analyzed_bounding_boxes_df_text.columns = ["type", "start", "end", "score"]
analyzed_bounding_boxes_df_new = pd.concat([analyzed_bounding_boxes_df_new, analyzed_bounding_boxes_df_text], axis = 1)
analyzed_bounding_boxes_df_new['page'] = page_num + 1
analyzed_bounding_boxes_df = pd.concat([analyzed_bounding_boxes_df, analyzed_bounding_boxes_df_new], axis = 0)
        for analyzed_bounding_box in analyzed_bounding_boxes:
            bounding_box = analyzed_bounding_box["boundingBox"]
            # QuadPoints run top-left, top-right, bottom-left, bottom-right;
            # pdfminer bboxes are (x0, y0, x1, y1) with the origin at bottom-left
            annotation = Dictionary(
                Type=Name.Annot,
                Subtype=Name.Square,
                QuadPoints=[bounding_box[0], bounding_box[3], bounding_box[2], bounding_box[3],
                            bounding_box[0], bounding_box[1], bounding_box[2], bounding_box[1]],
                Rect=[bounding_box[0], bounding_box[1], bounding_box[2], bounding_box[3]],
                C=[0, 0, 0],   # Border colour: black
                IC=[0, 0, 0],  # Interior (fill) colour: black
                CA=1,          # Opacity: 1 is fully opaque
                T=analyzed_bounding_box["result"].entity_type,
                BS=Dictionary(
                    W=0,       # Border width: 0 points
                    S=Name.S   # Border style: solid
                )
            )
            annotations_on_page.append(annotation)
        annotations_all_pages.append(annotations_on_page)
        print("For page number:", page_num, "there are", len(annotations_all_pages[page_num]), "annotations")

        page.Annots = pdf.make_indirect(annotations_on_page)
        page_num += 1
analyzed_bounding_boxes_df.to_csv(output_folder + "annotations_made.csv")
return pdf
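

# Minimal usage sketch (not called by the Gradio app), assuming a local
# text-based PDF at 'example.pdf' - a hypothetical path - and entity names
# supported by nlp_analyser.
if __name__ == "__main__":
    redacted_pdf = redact_text_pdf("example.pdf", "en", ["PERSON", "EMAIL_ADDRESS"])
    redacted_pdf.save(output_folder + "example_text_redacted.pdf")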