File size: 11,684 Bytes
641ff3e
eea5c07
7810536
 
eea5c07
ec98119
641ff3e
8235bbb
ec98119
 
 
 
 
 
 
 
 
 
 
8235bbb
12224f5
 
 
 
 
 
 
 
 
 
 
e2aae24
 
 
 
 
 
 
 
 
 
12224f5
 
 
 
 
 
 
390bef2
 
 
 
 
 
641ff3e
 
 
 
 
 
 
 
 
 
2a4b347
7810536
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2a4b347
 
 
 
 
 
 
 
 
 
 
 
6ea0852
 
 
 
 
 
 
390bef2
 
 
6ea0852
390bef2
 
 
 
 
6ea0852
390bef2
 
6ea0852
 
 
 
 
 
 
 
7810536
 
 
01c88c0
 
7810536
 
01c88c0
 
 
7810536
01c88c0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7810536
01c88c0
 
7810536
 
01c88c0
 
 
 
 
2a4b347
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
01c88c0
2a4b347
 
 
 
bc22fc4
8c33828
 
eea5c07
8c33828
 
 
 
 
 
 
 
 
 
 
bc22fc4
275c820
 
8235bbb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bc22fc4
8235bbb
 
 
 
bc22fc4
8235bbb
 
 
bc22fc4
8235bbb
eea5c07
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
import os
import re
import gradio as gr
import pandas as pd
import unicodedata
from gradio_image_annotation import image_annotator

def reset_state_vars():
    """Return a fresh set of default values for the app's state components.

    The returned tuple holds, in order: two empty lists, two empty
    DataFrames, the integer 0, an empty string, and a non-interactive
    image annotator configured with a single black "Redaction" label.
    """
    blank_annotator = image_annotator(
        label="Modify redaction boxes",
        label_list=["Redaction"],
        label_colors=[(0, 0, 0)],
        show_label=False,
        sources=None,  # previously ["upload"]
        show_clear_button=False,
        show_share_button=False,
        show_remove_button=False,
        interactive=False,
    )

    return [], [], pd.DataFrame(), pd.DataFrame(), 0, "", blank_annotator

def get_or_create_env_var(var_name, default_value):
    """Return the value of an environment variable, creating it if absent.

    Args:
        var_name: Name of the environment variable to look up.
        default_value: Value stored in the environment (and returned) when
            the variable is not already set.

    Returns:
        The existing value if the variable is set (even to an empty string),
        otherwise ``default_value``.
    """
    # An already-set variable always wins, including an empty string.
    if var_name in os.environ:
        return os.environ[var_name]

    # Not set yet: persist the default so later lookups see the same value.
    os.environ[var_name] = default_value
    return default_value


# Names for options labels
# These strings are shared constants; the wording suggests they are shown as
# choices for the text-extraction method (presumably in the UI - confirm
# against the callers that import them).
text_ocr_option = "Simple text analysis - docs with selectable text"
tesseract_ocr_option = "OCR analysis for documents without selectable text - best for typed text"
textract_option = "Complex image analysis - docs with handwriting/signatures (AWS Textract)"

# Labels for the two PII detection choices (local vs. AWS Comprehend).
local_pii_detector = "Local"
aws_pii_detector  = "AWS Comprehend"


# Retrieving or setting output folder
# The folder is read from the GRADIO_OUTPUT_FOLDER environment variable; if
# the variable is unset it is created with the default 'output/'.
env_var_name = 'GRADIO_OUTPUT_FOLDER'
default_value = 'output/'

# NOTE: this lookup and the print below run at import time.
output_folder = get_or_create_env_var(env_var_name, default_value)
print(f'The value of {env_var_name} is {output_folder}')

def load_in_default_allow_list(allow_list_file_path):
    """Wrap a single path string in a list; pass any other value through.

    Args:
        allow_list_file_path: Either one path as a string, or any other
            value (e.g. an existing list of paths).

    Returns:
        ``[allow_list_file_path]`` when given a string, otherwise the
        argument unchanged.
    """
    is_single_path = isinstance(allow_list_file_path, str)
    return [allow_list_file_path] if is_single_path else allow_list_file_path


def get_file_path_end(file_path):
    """Return the final path component of ``file_path`` without its extension.

    For example ``"/path/to/example.txt"`` yields ``"example"``. Only the last
    extension is removed, so ``"archive.tar.gz"`` yields ``"archive.tar"``.
    """
    # basename drops the directories; splitext()[0] drops the last extension.
    return os.path.splitext(os.path.basename(file_path))[0]

def detect_file_type(filename):
    """Detect the file type based on its extension.

    Matching is case-insensitive, so ``REPORT.PDF`` and ``report.pdf`` are
    treated alike.

    Args:
        filename: File name or path to inspect.

    Returns:
        One of ``'csv'``, ``'xlsx'``, ``'parquet'``, ``'pdf'``, ``'jpg'``,
        ``'jpeg'`` or ``'png'``. Files ending in ``.csv.gz`` or ``.zip`` are
        reported as ``'csv'``.

    Raises:
        ValueError: If the extension is not one of the supported types.
    """
    # Each entry pairs the accepted suffixes with the type label returned.
    extension_map = (
        (('.csv', '.csv.gz', '.zip'), 'csv'),
        (('.xlsx',), 'xlsx'),
        (('.parquet',), 'parquet'),
        (('.pdf',), 'pdf'),
        (('.jpg',), 'jpg'),
        (('.jpeg',), 'jpeg'),
        (('.png',), 'png'),
    )

    # Compare against a lower-cased copy so upper/mixed-case extensions match.
    lowered_name = filename.lower()

    for suffixes, file_type in extension_map:
        if lowered_name.endswith(suffixes):
            return file_type

    raise ValueError("Unsupported file type.")

def read_file(filename):
    """Load a tabular file into a DataFrame according to its detected type.

    Returns:
        A DataFrame for csv, xlsx and parquet files. For any other type that
        ``detect_file_type`` recognises (e.g. pdf or images) nothing is read
        and ``None`` is returned.
    """
    # Map each tabular type label to the pandas reader that handles it.
    loaders = {
        'csv': lambda path: pd.read_csv(path, low_memory=False),
        'xlsx': pd.read_excel,
        'parquet': pd.read_parquet,
    }

    loader = loaders.get(detect_file_type(filename))
    if loader is None:
        # Recognised but non-tabular type: there is no DataFrame to return.
        return None

    return loader(filename)

def ensure_output_folder_exists(folder_name="output/"):
    """Create ``folder_name`` (and any missing parents) if it does not exist.

    Args:
        folder_name: Folder to check for and create. Defaults to
            ``"output/"``, which matches the previously hard-coded behaviour.

    A short status message is printed in either case.
    """
    if os.path.exists(folder_name):
        print(f"The '{folder_name}' folder already exists.")
        return

    # exist_ok guards against the folder being created by another process
    # between the existence check above and this call.
    os.makedirs(folder_name, exist_ok=True)
    print(f"Created the '{folder_name}' folder.")

def custom_regex_load(in_file):
    '''
    Load an allow list from the first uploaded file whose path contains "csv".

    Args:
        in_file: Iterable of uploaded file objects, each exposing the file
            path via a ``.name`` attribute. May be ``None`` or empty.

    Returns:
        A ``(message, dataframe)`` tuple. The DataFrame holds the CSV contents
        read without a header row. It is empty when no file was provided or
        none of the provided files is a CSV.
    '''

    custom_regex = pd.DataFrame()
    no_file_message = "No allow list file provided."

    file_list = [file.name for file in in_file] if in_file else []
    regex_file_names = [name for name in file_list if "csv" in name.lower()]

    # Covers both "nothing uploaded" and "files uploaded but none is a CSV".
    # The latter previously fell through to the return statement with the
    # status message never assigned, raising UnboundLocalError.
    if not regex_file_names:
        print(no_file_message)
        return no_file_message, custom_regex

    custom_regex = pd.read_csv(regex_file_names[0], low_memory=False, header=None)

    output_text = "Allow list file loaded."
    print(output_text)

    return output_text, custom_regex

def put_columns_in_df(in_file):
    """Collect column names (and Excel sheet names) from the uploaded files.

    Args:
        in_file: Iterable of uploaded file objects, each exposing the file
            path via a ``.name`` attribute. ``None`` is treated as no files.

    Returns:
        A pair of ``gr.Dropdown`` components. The first lists every distinct
        column name found, in first-seen order, with all of them selected.
        The second lists all Excel sheet names and is visible only when at
        least one xlsx file was supplied.

    Raises:
        ValueError: Propagated from ``detect_file_type`` for unsupported
            file extensions.
    """
    concat_choices = []
    all_sheet_names = []
    number_of_excel_files = 0

    for file in in_file or []:
        file_name = file.name
        file_type = detect_file_type(file_name)
        print("File type is:", file_type)

        new_choices = []

        if file_type == 'xlsx':
            number_of_excel_files += 1
            print("Running through all xlsx sheets")

            # Open the workbook once and make sure the handle is released;
            # every sheet is read from the same open workbook.
            with pd.ExcelFile(file_name) as anon_xlsx:
                new_sheet_names = anon_xlsx.sheet_names

                for sheet_name in new_sheet_names:
                    df = pd.read_excel(anon_xlsx, sheet_name=sheet_name)

                    print(f"Sheet Name: {sheet_name}")
                    # NOTE(review): this writes raw data rows to stdout;
                    # confirm that is acceptable for the documents handled.
                    print(df.head())

                    new_choices.extend(list(df.columns))

            all_sheet_names.extend(new_sheet_names)

        else:
            df = read_file(file_name)
            # read_file returns None for recognised non-tabular types
            # (pdf/images); those simply contribute no columns.
            if df is not None:
                new_choices = list(df.columns)

        concat_choices.extend(new_choices)

    # Drop duplicate columns while keeping a stable, first-seen order.
    concat_choices = list(dict.fromkeys(concat_choices))

    if number_of_excel_files > 0:
        return gr.Dropdown(choices=concat_choices, value=concat_choices), gr.Dropdown(choices=all_sheet_names, value=all_sheet_names, visible=True)
    else:
        return gr.Dropdown(choices=concat_choices, value=concat_choices), gr.Dropdown(visible=False)

# Only relevant for locally-created executables built from this app: pyinstaller
# creates an _internal folder containing tesseract and poppler, and those
# folders need to be on the system path for the app to run.
def add_folder_to_path(folder_path: str):
    '''
    Prepend a folder's absolute path to the PATH environment variable.

    Nothing is changed when the folder does not exist (or is not a directory)
    or when its absolute path is already one of the PATH entries. A status
    message is printed for each outcome except a successful update.
    '''

    folder_is_present = os.path.exists(folder_path) and os.path.isdir(folder_path)
    if not folder_is_present:
        print(f"Folder not found at {folder_path} - not added to PATH")
        return

    print(folder_path, "folder exists.")

    # PATH entries are compared against the absolute form of the folder.
    resolved_folder = os.path.abspath(folder_path)
    existing_path = os.environ['PATH']

    if resolved_folder in existing_path.split(os.pathsep):
        print(f"Directory {folder_path} already exists in PATH.")
        return

    # Put the new folder first so it takes precedence over existing entries.
    os.environ['PATH'] = os.pathsep.join((resolved_folder, existing_path))

# Once a process has been run, the feedback controls are made visible
def reveal_feedback_buttons():
    """Return visible versions of the four feedback components.

    The components are, in order: a Radio carrying the feedback prompt as its
    label, a Textbox, a Button and a Markdown element.
    """
    feedback_prompt = "Please give some feedback about the results of the redaction. A reminder that the app is only expected to identify about 60% of personally identifiable information in a given (typed) document."

    feedback_radio = gr.Radio(visible=True, label=feedback_prompt)
    feedback_textbox = gr.Textbox(visible=True)
    feedback_button = gr.Button(visible=True)
    feedback_markdown = gr.Markdown(visible=True)

    return feedback_radio, feedback_textbox, feedback_button, feedback_markdown

def wipe_logs(feedback_logs_loc, usage_logs_loc):
    """Delete the feedback and usage log files on a best-effort basis.

    A failure to remove either file is reported with a printed message and
    does not stop the other file from being attempted.
    """
    removal_targets = (
        (feedback_logs_loc, "Could not remove feedback logs file"),
        (usage_logs_loc, "Could not remove usage logs file"),
    )

    for log_location, failure_message in removal_targets:
        try:
            os.remove(log_location)
        except Exception as e:
            print(failure_message, e)
    
async def get_connection_params(request: gr.Request):
    """Derive the session identifier and output folder for a request.

    If both the CUSTOM_CLOUDFRONT_HEADER and CUSTOM_CLOUDFRONT_HEADER_VALUE
    environment variables are non-empty and the request carries that header,
    its value must match the expected one.

    The identifier is chosen from, in order of preference:
      1. ``request.username`` (folder prefix ``user-files/``),
      2. the ``x-cognito-id`` request header (folder prefix ``user-files/``),
      3. ``request.session_hash`` (folder prefix ``temp-files/``).

    Args:
        request: The incoming Gradio request.

    Returns:
        A tuple ``(identifier, output_folder, identifier)`` where
        ``output_folder`` is ``<prefix><identifier>/``.

    Raises:
        ValueError: If the configured custom header is present on the request
            but its value does not match the expected value.
    """
    base_folder = ""

    print("Session hash:", request.session_hash)

    # Retrieving or setting CUSTOM_CLOUDFRONT_HEADER
    CUSTOM_CLOUDFRONT_HEADER_var = get_or_create_env_var('CUSTOM_CLOUDFRONT_HEADER', '')

    # Retrieving or setting CUSTOM_CLOUDFRONT_HEADER_VALUE
    CUSTOM_CLOUDFRONT_HEADER_VALUE_var = get_or_create_env_var('CUSTOM_CLOUDFRONT_HEADER_VALUE', '')

    if CUSTOM_CLOUDFRONT_HEADER_var and CUSTOM_CLOUDFRONT_HEADER_VALUE_var:
        # NOTE(review): a request that omits the header entirely is let
        # through unchecked - confirm this is the intended policy.
        if CUSTOM_CLOUDFRONT_HEADER_var in request.headers:
            supplied_cloudfront_custom_value = request.headers[CUSTOM_CLOUDFRONT_HEADER_var]
            if supplied_cloudfront_custom_value == CUSTOM_CLOUDFRONT_HEADER_VALUE_var:
                print("Custom Cloudfront header found:", supplied_cloudfront_custom_value)
            else:
                # Instantiate the exception: raising a (type, message) tuple
                # is itself a TypeError in Python 3.
                raise ValueError("Custom Cloudfront header value does not match expected value.")

    # Get output save folder from 1 - username passed in from direct Cognito login, 2 - Cognito ID header passed through a Lambda authenticator, 3 - the session hash.

    if request.username:
        out_session_hash = request.username
        base_folder = "user-files/"
        print("Request username found:", out_session_hash)

    elif 'x-cognito-id' in request.headers:
        out_session_hash = request.headers['x-cognito-id']
        base_folder = "user-files/"
        print("Cognito ID found:", out_session_hash)

    else:
        out_session_hash = request.session_hash
        base_folder = "temp-files/"

    # NOTE(review): the identifier is used verbatim as a folder name; confirm
    # upstream components guarantee it cannot contain path separators.
    output_folder = base_folder + out_session_hash + "/"

    return out_session_hash, output_folder, out_session_hash
    

# Typographic punctuation mapped to plain ASCII. The keys are written as
# explicit code-point escapes so the table cannot be corrupted by the file
# being saved or read with the wrong encoding.
_ASCII_PUNCTUATION_TABLE = str.maketrans({
    '\u2018': "'",    # left single quotation mark
    '\u2019': "'",    # right single quotation mark
    '\u201c': '"',    # left double quotation mark
    '\u201d': '"',    # right double quotation mark
    '\u2013': '-',    # en dash
    '\u2014': '-',    # em dash
    '\u2026': '...',  # horizontal ellipsis
    '\u2022': '*',    # bullet
})


def clean_unicode_text(text):
    """Convert text to plain ASCII, keeping common punctuation readable.

    Args:
        text: The string to clean.

    Returns:
        The text after NFKC normalisation, with smart quotes, dashes,
        ellipses and bullets replaced by ASCII equivalents and every
        remaining non-ASCII character removed.
    """
    # Step 1: Normalize to NFKC (compatibility characters are folded into
    # their standard composed forms).
    normalized_text = unicodedata.normalize('NFKC', text)

    # Step 2: Replace smart quotes and special punctuation with standard
    # ASCII equivalents in a single pass.
    normalized_text = normalized_text.translate(_ASCII_PUNCTUATION_TABLE)

    # Step 3: Remove any remaining non-ASCII characters.
    # Drop this substitution if all Unicode characters should be kept.
    cleaned_text = re.sub(r'[^\x00-\x7F]+', '', normalized_text)

    return cleaned_text