File size: 11,684 Bytes
641ff3e eea5c07 7810536 eea5c07 ec98119 641ff3e 8235bbb ec98119 8235bbb 12224f5 e2aae24 12224f5 390bef2 641ff3e 2a4b347 7810536 2a4b347 6ea0852 390bef2 6ea0852 390bef2 6ea0852 390bef2 6ea0852 7810536 01c88c0 7810536 01c88c0 7810536 01c88c0 7810536 01c88c0 7810536 01c88c0 2a4b347 01c88c0 2a4b347 bc22fc4 8c33828 eea5c07 8c33828 bc22fc4 275c820 8235bbb bc22fc4 8235bbb bc22fc4 8235bbb bc22fc4 8235bbb eea5c07 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 |
import os
import re
import gradio as gr
import pandas as pd
import unicodedata
from gradio_image_annotation import image_annotator
def reset_state_vars():
    """Return fresh default values for the app's state variables.

    The returned tuple holds, in order: two empty lists, two empty
    DataFrames, the integer 0, an empty string, and a non-interactive
    image annotator component with a single black "Redaction" label.
    """
    blank_annotator = image_annotator(
        label="Modify redaction boxes",
        label_list=["Redaction"],
        label_colors=[(0, 0, 0)],
        show_label=False,
        sources=None,  # alternative: ["upload"]
        show_clear_button=False,
        show_share_button=False,
        show_remove_button=False,
        interactive=False,
    )
    return [], [], pd.DataFrame(), pd.DataFrame(), 0, "", blank_annotator
def get_or_create_env_var(var_name, default_value):
    """Return the value of the environment variable ``var_name``.

    When the variable is not set, it is created in ``os.environ`` with
    ``default_value`` and that default is returned. An existing value
    (including an empty string) is returned untouched.
    """
    # setdefault only writes when the key is missing, then hands back
    # whichever value ends up stored under the key.
    return os.environ.setdefault(var_name, default_value)
# Display labels for the selectable options.
# Text extraction choices:
text_ocr_option = "Simple text analysis - docs with selectable text"
tesseract_ocr_option = "OCR analysis for documents without selectable text - best for typed text"
textract_option = "Complex image analysis - docs with handwriting/signatures (AWS Textract)"
# PII detector choices:
local_pii_detector = "Local"
aws_pii_detector = "AWS Comprehend"
# Resolve the output folder from the GRADIO_OUTPUT_FOLDER environment variable.
# get_or_create_env_var also writes the default ('output/') into the
# environment when the variable is not set.
env_var_name = 'GRADIO_OUTPUT_FOLDER'
default_value = 'output/'
output_folder = get_or_create_env_var(env_var_name, default_value)
# Runs when the module is loaded, so the effective folder shows up in the logs.
print(f'The value of {env_var_name} is {output_folder}')
def load_in_default_allow_list(allow_list_file_path):
    """Wrap a single path string in a list; return any other input unchanged."""
    if not isinstance(allow_list_file_path, str):
        return allow_list_file_path
    return [allow_list_file_path]
def get_file_path_end(file_path):
    """Return the last component of ``file_path`` with its extension removed.

    For example, "/path/to/example.txt" gives "example". Only the final
    extension is dropped, so "archive.tar.gz" gives "archive.tar".
    """
    # Strip the directories first, then peel the extension off what is left.
    return os.path.splitext(os.path.basename(file_path))[0]
def detect_file_type(filename):
    """Detect the file type based on its extension.

    The extension is matched case-insensitively, so "REPORT.PDF" is
    recognised the same way as "report.pdf".

    Args:
        filename: File name or path to inspect.

    Returns:
        One of 'csv' (for .csv, .csv.gz and .zip), 'xlsx', 'parquet',
        'pdf', 'jpg', 'jpeg' or 'png'.

    Raises:
        ValueError: If the extension is not one of the supported types.
    """
    lowered_name = filename.lower()

    # Checked in order; str.endswith accepts a tuple of candidate suffixes.
    suffix_groups = (
        (('.csv', '.csv.gz', '.zip'), 'csv'),
        (('.xlsx',), 'xlsx'),
        (('.parquet',), 'parquet'),
        (('.pdf',), 'pdf'),
        (('.jpg',), 'jpg'),
        (('.jpeg',), 'jpeg'),
        (('.png',), 'png'),
    )
    for suffixes, file_type in suffix_groups:
        if lowered_name.endswith(suffixes):
            return file_type

    raise ValueError("Unsupported file type.")
def read_file(filename):
    """Load a tabular file into a DataFrame according to its detected type.

    'csv' files are read with ``pd.read_csv(..., low_memory=False)``,
    'xlsx' with ``pd.read_excel`` and 'parquet' with ``pd.read_parquet``.
    Any other type that ``detect_file_type`` accepts yields ``None``.
    """
    loaders = {
        'csv': lambda path: pd.read_csv(path, low_memory=False),
        'xlsx': pd.read_excel,
        'parquet': pd.read_parquet,
    }
    loader = loaders.get(detect_file_type(filename))
    if loader is None:
        return None
    return loader(filename)
def ensure_output_folder_exists(folder_name="output/"):
    """Check that an output folder exists, creating it if not.

    Args:
        folder_name: Folder to check or create. Defaults to "output/",
            which is what this function always used before the parameter
            was added.
    """
    if os.path.exists(folder_name):
        print(f"The '{folder_name}' folder already exists.")
        return

    # exist_ok=True avoids a failure if the folder is created by something
    # else between the check above and this call.
    os.makedirs(folder_name, exist_ok=True)
    print(f"Created the '{folder_name}' folder.")
def custom_regex_load(in_file):
    '''
    Load an allow list from the first uploaded file whose path contains "csv".

    Args:
        in_file: Iterable of uploaded-file objects exposing the file path as
            a ``.name`` attribute, or a falsy value when nothing was uploaded.

    Returns:
        A ``(message, dataframe)`` tuple. The dataframe is read with
        ``header=None`` (so the first row is treated as data) and is empty
        when no allow list could be loaded.
    '''
    no_file_message = "No allow list file provided."
    custom_regex = pd.DataFrame()

    if in_file:
        file_paths = [uploaded.name for uploaded in in_file]
        regex_file_names = [path for path in file_paths if "csv" in path.lower()]

        if regex_file_names:
            custom_regex = pd.read_csv(regex_file_names[0], low_memory=False, header=None)
            output_text = "Allow list file loaded."
            print(output_text)
            return output_text, custom_regex

    # Reached when nothing was uploaded, or when none of the uploads is a csv.
    # The second case used to fail with an unbound 'output_text'.
    print(no_file_message)
    return no_file_message, custom_regex
def put_columns_in_df(in_file):
    '''
    Build dropdown updates listing the columns (and Excel sheet names) found
    in the uploaded files.

    Args:
        in_file: Iterable of uploaded-file objects exposing the file path as
            a ``.name`` attribute. ``None`` is treated as no files.

    Returns:
        A pair of ``gr.Dropdown`` updates. The first lists every distinct
        column name, all selected, in first-seen order. The second lists the
        Excel sheet names and is visible only when at least one xlsx file
        was supplied.

    Raises:
        ValueError: From ``detect_file_type`` when a file has an unsupported
            extension.
    '''
    concat_choices = []
    all_sheet_names = []
    number_of_excel_files = 0

    for file in in_file or []:
        file_name = file.name
        file_type = detect_file_type(file_name)
        print("File type is:", file_type)

        if file_type == 'xlsx':
            number_of_excel_files += 1
            new_choices = []
            print("Running through all xlsx sheets")

            # Open the workbook once for all sheets and close it afterwards.
            with pd.ExcelFile(file_name) as anon_xlsx:
                new_sheet_names = anon_xlsx.sheet_names

                for sheet_name in new_sheet_names:
                    df = anon_xlsx.parse(sheet_name=sheet_name)

                    print(f"Sheet Name: {sheet_name}")
                    print(df.head())  # Print the first few rows

                    new_choices.extend(list(df.columns))

            all_sheet_names.extend(new_sheet_names)

        else:
            df = read_file(file_name)
            # read_file returns None for non-tabular types (pdf, images),
            # which contribute no columns.
            new_choices = list(df.columns) if df is not None else []

        concat_choices.extend(new_choices)

    # Drop duplicate columns; dict.fromkeys keeps first-seen order, unlike set().
    concat_choices = list(dict.fromkeys(concat_choices))

    if number_of_excel_files > 0:
        return (
            gr.Dropdown(choices=concat_choices, value=concat_choices),
            gr.Dropdown(choices=all_sheet_names, value=all_sheet_names, visible=True),
        )

    return gr.Dropdown(choices=concat_choices, value=concat_choices), gr.Dropdown(visible=False)
# Only relevant for locally-created executables built from this app: pyinstaller
# creates an _internal folder containing tesseract and poppler, and these need
# to be on the system path for the app to run.
def add_folder_to_path(folder_path: str):
    '''
    Prepend the absolute form of ``folder_path`` to the PATH environment
    variable, provided the folder exists and is not already listed there.
    '''
    # isdir is already False for a path that does not exist.
    if not os.path.isdir(folder_path):
        print(f"Folder not found at {folder_path} - not added to PATH")
        return

    print(folder_path, "folder exists.")

    # Resolve a relative path before comparing against the PATH entries.
    absolute_path = os.path.abspath(folder_path)
    existing_path = os.environ['PATH']
    path_entries = existing_path.split(os.pathsep)

    if absolute_path in path_entries:
        print(f"Directory {folder_path} already exists in PATH.")
    else:
        os.environ['PATH'] = os.pathsep.join([absolute_path, existing_path])
# Called after a process has run, to show the feedback controls
def reveal_feedback_buttons():
    """Return updates that make the feedback radio, textbox, button and markdown visible."""
    feedback_prompt = "Please give some feedback about the results of the redaction. A reminder that the app is only expected to identify about 60% of personally identifiable information in a given (typed) document."

    radio_update = gr.Radio(visible=True, label=feedback_prompt)
    textbox_update = gr.Textbox(visible=True)
    button_update = gr.Button(visible=True)
    markdown_update = gr.Markdown(visible=True)

    return radio_update, textbox_update, button_update, markdown_update
def wipe_logs(feedback_logs_loc, usage_logs_loc):
    """Delete the feedback and usage log files.

    A failure to remove either file is printed rather than raised, and the
    second file is still attempted when the first one fails.
    """
    removal_targets = (
        (feedback_logs_loc, "Could not remove feedback logs file"),
        (usage_logs_loc, "Could not remove usage logs file"),
    )
    for log_path, failure_message in removal_targets:
        try:
            os.remove(log_path)
        except Exception as e:
            print(failure_message, e)
async def get_connection_params(request: gr.Request):
    """Work out the session identifier and output folder for a request.

    The identifier comes from the first of these that is available:
    1. ``request.username`` (direct Cognito login),
    2. the ``x-cognito-id`` request header (passed through a Lambda authenticator),
    3. ``request.session_hash``.
    The first two are stored under "user-files/", the last under "temp-files/".

    If both the CUSTOM_CLOUDFRONT_HEADER and CUSTOM_CLOUDFRONT_HEADER_VALUE
    environment variables are non-empty and the request carries that header,
    its value must match the expected value.

    Args:
        request: The incoming Gradio request.

    Returns:
        A tuple ``(out_session_hash, output_folder, out_session_hash)``, where
        ``output_folder`` is ``<base folder><out_session_hash>/``.

    Raises:
        ValueError: If the configured CloudFront header is present on the
            request but its value does not match the expected value.
    """
    base_folder = ""

    # To get the underlying FastAPI items you would need to use await and a
    # live query: https://fastapi.tiangolo.com/vi/reference/request/
    print("Session hash:", request.session_hash)

    # Both lookups default to '' (and store that default) when unset.
    cloudfront_header_name = get_or_create_env_var('CUSTOM_CLOUDFRONT_HEADER', '')
    cloudfront_header_value = get_or_create_env_var('CUSTOM_CLOUDFRONT_HEADER_VALUE', '')

    if cloudfront_header_name and cloudfront_header_value:
        # NOTE(review): a request that omits the header entirely is still
        # accepted here - confirm that this is intended.
        if cloudfront_header_name in request.headers:
            supplied_cloudfront_custom_value = request.headers[cloudfront_header_name]

            if supplied_cloudfront_custom_value != cloudfront_header_value:
                # Must be an exception instance: raising a (class, message)
                # tuple fails with TypeError instead of the intended error.
                raise ValueError("Custom Cloudfront header value does not match expected value.")

            print("Custom Cloudfront header found:", supplied_cloudfront_custom_value)

    # NOTE(review): the username / header value is used as a folder name
    # without sanitising - confirm upstream guarantees it is path-safe.
    if request.username:
        out_session_hash = request.username
        base_folder = "user-files/"
        print("Request username found:", out_session_hash)

    elif 'x-cognito-id' in request.headers:
        out_session_hash = request.headers['x-cognito-id']
        base_folder = "user-files/"
        print("Cognito ID found:", out_session_hash)

    else:
        out_session_hash = request.session_hash
        base_folder = "temp-files/"

    output_folder = base_folder + out_session_hash + "/"

    return out_session_hash, output_folder, out_session_hash
def clean_unicode_text(text):
    """Convert text to plain ASCII, keeping common punctuation readable.

    Args:
        text: The string to clean.

    Returns:
        The text after NFKC normalisation, with curly quotes, en/em dashes,
        the ellipsis and the bullet replaced by ASCII equivalents, and every
        remaining non-ASCII character removed.
    """
    # Step 1: NFKC folds compatibility forms (ligatures, full-width
    # characters, ...) into their standard composed equivalents.
    normalized_text = unicodedata.normalize('NFKC', text)

    # Step 2: Replace smart quotes and special punctuation with ASCII
    # equivalents. Keys are written as escapes so the mapping cannot be
    # corrupted by a wrong file encoding.
    replacements = {
        '\u2018': "'", '\u2019': "'",   # left / right single quotation marks
        '\u201c': '"', '\u201d': '"',   # left / right double quotation marks
        '\u2013': '-', '\u2014': '-',   # en dash / em dash
        '\u2026': '...',                # horizontal ellipsis
        '\u2022': '*',                  # bullet
    }

    # All keys are single characters, so one translate pass applies them all.
    normalized_text = normalized_text.translate(str.maketrans(replacements))

    # Step 3: Remove any remaining non-ASCII characters.
    # Drop this substitution if all Unicode characters should be kept.
    cleaned_text = re.sub(r'[^\x00-\x7F]+', '', normalized_text)

    return cleaned_text