import os
import re
import gradio as gr
import pandas as pd
import unicodedata
from gradio_image_annotation import image_annotator

def reset_state_vars():
    '''
    Reset the session state values to their defaults and return a fresh, non-interactive redaction annotator component.
    '''
    return [], [], pd.DataFrame(), pd.DataFrame(), 0, "", image_annotator(
        label="Modify redaction boxes",
        label_list=["Redaction"],
        label_colors=[(0, 0, 0)],
        show_label=False,
        sources=None,
        show_clear_button=False,
        show_share_button=False,
        show_remove_button=False,
        interactive=False
    )

def get_or_create_env_var(var_name, default_value):
    '''
    Return the value of an environment variable, setting it to the given default if it is not already defined.
    '''
    # Get the environment variable if it exists
    value = os.environ.get(var_name)

    # If it doesn't exist, set it to the default value
    if value is None:
        os.environ[var_name] = default_value
        value = default_value

    return value

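# Illustrative usage (hypothetical variable name):
#   poppler_folder = get_or_create_env_var('POPPLER_FOLDER', 'poppler/')
#   # Returns the existing value of POPPLER_FOLDER, or sets it to 'poppler/' and returns that default.
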
# Labels for the text extraction options offered in the app
text_ocr_option = "Simple text analysis - docs with selectable text"
tesseract_ocr_option = "OCR analysis for documents without selectable text - best for typed text"
textract_option = "Complex image analysis - docs with handwriting/signatures (AWS Textract)"

# Labels for the PII detection options offered in the app
local_pii_detector = "Local"
aws_pii_detector = "AWS Comprehend"

# Default output folder, configurable via the GRADIO_OUTPUT_FOLDER environment variable
env_var_name = 'GRADIO_OUTPUT_FOLDER'
default_value = 'output/'

output_folder = get_or_create_env_var(env_var_name, default_value)
print(f'The value of {env_var_name} is {output_folder}')

def load_in_default_allow_list(allow_list_file_path):
    '''
    Wrap a single allow list file path in a list so that downstream functions always receive a list of paths.
    '''
    if isinstance(allow_list_file_path, str):
        allow_list_file_path = [allow_list_file_path]
    return allow_list_file_path

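# Illustrative example (hypothetical path):
#   load_in_default_allow_list("config/default_allow_list.csv")
#   # -> ["config/default_allow_list.csv"]
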
def get_file_path_end(file_path):
    '''
    Return the file name from a path, without its directory or extension.
    '''
    # First get the basename of the file (e.g. "example.txt" from "/path/to/example.txt")
    basename = os.path.basename(file_path)

    # Then split the basename into its name and extension
    filename_without_extension, _ = os.path.splitext(basename)

    return filename_without_extension

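# Illustrative example (hypothetical path):
#   get_file_path_end("/tmp/uploads/annual_report.pdf")  # -> "annual_report"
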
def detect_file_type(filename):
    """Detect the file type based on its extension."""
    if filename.endswith('.csv') or filename.endswith('.csv.gz') or filename.endswith('.zip'):
        return 'csv'
    elif filename.endswith('.xlsx'):
        return 'xlsx'
    elif filename.endswith('.parquet'):
        return 'parquet'
    elif filename.endswith('.pdf'):
        return 'pdf'
    elif filename.endswith('.jpg'):
        return 'jpg'
    elif filename.endswith('.jpeg'):
        return 'jpeg'
    elif filename.endswith('.png'):
        return 'png'
    else:
        raise ValueError("Unsupported file type.")

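# Illustrative examples (hypothetical file names):
#   detect_file_type("report.pdf")   # -> 'pdf'
#   detect_file_type("data.csv.gz")  # -> 'csv'
#   detect_file_type("notes.txt")    # raises ValueError("Unsupported file type.")
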
def read_file(filename):
    """Read the file based on its detected type."""
    file_type = detect_file_type(filename)

    if file_type == 'csv':
        return pd.read_csv(filename, low_memory=False)
    elif file_type == 'xlsx':
        return pd.read_excel(filename)
    elif file_type == 'parquet':
        return pd.read_parquet(filename)
    # Note: other detected types (pdf, jpg, jpeg, png) are not read here, so the function returns None for them.

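# Illustrative usage (hypothetical file name):
#   df = read_file("allow_list.csv")  # returns a pandas DataFrame read with low_memory=False
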
def ensure_output_folder_exists():
    """Check if the 'output/' folder exists and create it if not."""

    folder_name = "output/"

    if not os.path.exists(folder_name):
        # Create the folder if it doesn't exist
        os.makedirs(folder_name)
        print("Created the 'output/' folder.")
    else:
        print("The 'output/' folder already exists.")

def custom_regex_load(in_file):
    '''
    When a file is loaded, update the column dropdown choices and write to the relevant data states.
    '''
    custom_regex = pd.DataFrame()

    if in_file:
        file_list = [string.name for string in in_file]

        regex_file_names = [string for string in file_list if "csv" in string.lower()]
        if regex_file_names:
            regex_file_name = regex_file_names[0]
            custom_regex = pd.read_csv(regex_file_name, low_memory=False, header=None)

            output_text = "Allow list file loaded."
            print(output_text)
        else:
            # No CSV file found among the uploads; report this rather than failing on an undefined variable
            output_text = "No allow list file found."
            print(output_text)
    else:
        error = "No allow list file provided."
        print(error)
        output_text = error
        return error, custom_regex

    return output_text, custom_regex

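# Illustrative usage (assumes 'uploaded_files' is the value of a gr.File component):
#   message, allow_list_df = custom_regex_load(uploaded_files)
#   # The first uploaded CSV is read with header=None, i.e. one allow-list term per row.
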
def put_columns_in_df(in_file):
    '''
    When files are loaded, gather their column names (and sheet names for Excel files) to populate the column and sheet dropdowns.
    '''
    new_choices = []
    concat_choices = []
    all_sheet_names = []
    number_of_excel_files = 0

    for file in in_file:
        file_name = file.name
        file_type = detect_file_type(file_name)
        print("File type is:", file_type)

        if file_type == 'xlsx':
            number_of_excel_files += 1
            new_choices = []
            print("Running through all xlsx sheets")
            anon_xlsx = pd.ExcelFile(file_name)
            new_sheet_names = anon_xlsx.sheet_names

            # Iterate through the sheet names and gather column names from each sheet
            for sheet_name in new_sheet_names:
                df = pd.read_excel(file_name, sheet_name=sheet_name)

                print(f"Sheet Name: {sheet_name}")
                print(df.head())

                new_choices.extend(list(df.columns))

            all_sheet_names.extend(new_sheet_names)

        else:
            df = read_file(file_name)
            new_choices = list(df.columns)

        concat_choices.extend(new_choices)

    # Drop duplicate column names
    concat_choices = list(set(concat_choices))

    if number_of_excel_files > 0:
        return gr.Dropdown(choices=concat_choices, value=concat_choices), gr.Dropdown(choices=all_sheet_names, value=all_sheet_names, visible=True)
    else:
        return gr.Dropdown(choices=concat_choices, value=concat_choices), gr.Dropdown(visible=False)

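# Illustrative Gradio wiring (hypothetical component names):
#   in_data_files.change(fn=put_columns_in_df, inputs=[in_data_files],
#                        outputs=[in_colnames_dropdown, in_excel_sheets_dropdown])
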
def add_folder_to_path(folder_path: str):
    '''
    Check if a folder exists on the system. If so, get its absolute path and add it to the PATH environment variable if it is not already present.
    '''
    if os.path.exists(folder_path) and os.path.isdir(folder_path):
        print(folder_path, "folder exists.")

        absolute_path = os.path.abspath(folder_path)

        current_path = os.environ['PATH']
        if absolute_path not in current_path.split(os.pathsep):
            full_path_extension = absolute_path + os.pathsep + current_path
            os.environ['PATH'] = full_path_extension
        else:
            print(f"Directory {folder_path} already exists in PATH.")
    else:
        print(f"Folder not found at {folder_path} - not added to PATH")

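# Illustrative usage (hypothetical local install folders, e.g. for Tesseract or Poppler):
#   add_folder_to_path("tesseract/")
#   add_folder_to_path("poppler/bin/")
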
def reveal_feedback_buttons():
    '''
    Return updated feedback components with visible=True so that the feedback section appears in the UI.
    '''
    return gr.Radio(visible=True, label="Please give some feedback about the results of the redaction. A reminder that the app is only expected to identify about 60% of personally identifiable information in a given (typed) document."), gr.Textbox(visible=True), gr.Button(visible=True), gr.Markdown(visible=True)

def wipe_logs(feedback_logs_loc, usage_logs_loc):
    '''
    Delete the feedback and usage log files, printing a warning if either cannot be removed.
    '''
    try:
        os.remove(feedback_logs_loc)
    except Exception as e:
        print("Could not remove feedback logs file", e)
    try:
        os.remove(usage_logs_loc)
    except Exception as e:
        print("Could not remove usage logs file", e)

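# Illustrative usage (hypothetical log file locations):
#   wipe_logs("feedback/log.csv", "usage/log.csv")
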
# Optional custom header name/value pair; if both are set, incoming requests must supply a matching header (checked in get_connection_params)
CUSTOM_HEADER = get_or_create_env_var('CUSTOM_HEADER', '')
print('CUSTOM_HEADER found')

CUSTOM_HEADER_VALUE = get_or_create_env_var('CUSTOM_HEADER_VALUE', '')
print('CUSTOM_HEADER_VALUE found')

async def get_connection_params(request: gr.Request):
    '''
    Inspect the incoming Gradio request to identify the user (custom header check, username, Cognito ID, or session hash) and derive a per-session output folder.
    '''
    base_folder = ""

    print("Request headers dictionary:", request.headers)
    print("All host elements", request.client)
    print("IP address:", request.client.host)
    print("Query parameters:", dict(request.query_params))
    print("Session hash:", request.session_hash)

    # If a custom header name and value are configured, require the incoming request to supply a matching header
    if CUSTOM_HEADER and CUSTOM_HEADER_VALUE:
        if CUSTOM_HEADER in request.headers:
            supplied_custom_header_value = request.headers[CUSTOM_HEADER]
            if supplied_custom_header_value == CUSTOM_HEADER_VALUE:
                print("Custom header supplied and matches CUSTOM_HEADER_VALUE")
            else:
                print("Custom header value does not match expected value.")
                raise ValueError("Custom header value does not match expected value.")
        else:
            print("Custom header value not found.")
            raise ValueError("Custom header value not found.")

    # Prefer an authenticated username, then a Cognito ID header, and fall back to the Gradio session hash
    if request.username:
        out_session_hash = request.username
        base_folder = "user-files/"
        print("Request username found:", out_session_hash)

    elif 'x-cognito-id' in request.headers:
        out_session_hash = request.headers['x-cognito-id']
        base_folder = "user-files/"
        print("Cognito ID found:", out_session_hash)

    else:
        out_session_hash = request.session_hash
        base_folder = "temp-files/"

    output_folder = base_folder + out_session_hash + "/"

    return out_session_hash, output_folder, out_session_hash

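# Illustrative Gradio wiring (hypothetical component/state names): run on app load so the
# session identifier and per-session output folder are available to later events.
#   app.load(get_connection_params, inputs=None,
#            outputs=[session_hash_state, output_folder_state, session_hash_textbox])
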
def clean_unicode_text(text):
    '''
    Normalise text to NFKC form, replace common typographic characters with ASCII equivalents, and strip any remaining non-ASCII characters.
    '''
    # Normalize unicode characters to a standard (NFKC) form
    normalized_text = unicodedata.normalize('NFKC', text)

    # Replace smart quotes, dashes, ellipses and bullets with plain ASCII equivalents
    replacements = {
        '‘': "'", '’': "'", '“': '"', '”': '"',
        '–': '-', '—': '-', '…': '...', '•': '*',
    }

    for old_char, new_char in replacements.items():
        normalized_text = normalized_text.replace(old_char, new_char)

    # Remove any remaining non-ASCII characters
    cleaned_text = re.sub(r'[^\x00-\x7F]+', '', normalized_text)

    return cleaned_text

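# Illustrative example (hypothetical input with curly quotes, an em dash and an ellipsis):
#   clean_unicode_text("\u2018Hello\u2019 \u2014 world\u2026")  # -> "'Hello' - world..."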