|
import os
|
|
import gradio as gr
|
|
import pandas as pd
|
|
|
|
def get_or_create_env_var(var_name, default_value):
    """Return an environment variable's value, setting it to a default if unset.

    Args:
        var_name: Name of the environment variable to look up.
        default_value: Value written to ``os.environ`` and returned when the
            variable is not currently set.

    Returns:
        The existing value when the variable is set (including an empty
        string); otherwise ``default_value``.
    """
    current = os.environ.get(var_name)
    if current is not None:
        return current

    # Not set yet: store the default so later lookups see the same value.
    os.environ[var_name] = default_value
    return default_value
|
|
|
|
|
|
# Resolve the output folder once at import time. The GRADIO_OUTPUT_FOLDER
# environment variable takes precedence; when it is unset it is initialised
# to the default below.
env_var_name = 'GRADIO_OUTPUT_FOLDER'
default_value = 'output/'
output_folder = get_or_create_env_var(env_var_name, default_value)

print(f'The value of {env_var_name} is {output_folder}')
|
|
|
|
def get_file_path_end(file_path):
    """Return the last component of ``file_path`` without its final extension.

    Only the last extension is removed, so ``"a/b/data.tar.gz"`` yields
    ``"data.tar"``.

    Args:
        file_path: Path string to a file.

    Returns:
        The base name of the path with the trailing extension stripped.
    """
    stem, _extension = os.path.splitext(os.path.basename(file_path))
    return stem
|
|
|
|
def detect_file_type(filename):
    """Detect the file type based on its extension.

    Matching is case-insensitive, so ``"DATA.CSV"`` is treated the same as
    ``"data.csv"``.

    Args:
        filename: File name or path whose extension is inspected.

    Returns:
        One of ``'csv'`` (for ``.csv``, ``.csv.gz`` and ``.zip``), ``'xlsx'``,
        ``'parquet'``, ``'pdf'``, ``'jpg'``, ``'jpeg'`` or ``'png'``.

    Raises:
        ValueError: If the extension is not one of the supported types.
    """
    # (accepted suffixes, returned label), checked in order.
    known_types = (
        (('.csv', '.csv.gz', '.zip'), 'csv'),
        (('.xlsx',), 'xlsx'),
        (('.parquet',), 'parquet'),
        (('.pdf',), 'pdf'),
        (('.jpg',), 'jpg'),
        (('.jpeg',), 'jpeg'),
        (('.png',), 'png'),
    )

    # Lower-case once so upper/mixed-case extensions are recognised too.
    lowered = filename.lower()
    for suffixes, label in known_types:
        if lowered.endswith(suffixes):
            return label

    raise ValueError("Unsupported file type.")
|
|
|
|
def read_file(filename):
    """Read a tabular file into a DataFrame, choosing the reader by file type.

    Args:
        filename: Path to the file to load.

    Returns:
        A DataFrame for csv, xlsx and parquet files. For any other type that
        ``detect_file_type`` recognises, no reader applies and ``None`` is
        returned.

    Raises:
        ValueError: Propagated from ``detect_file_type`` for unsupported
            extensions.
    """
    kind = detect_file_type(filename)

    # Map each tabular type to the pandas reader that handles it.
    readers = {
        'csv': lambda path: pd.read_csv(path, low_memory=False),
        'xlsx': lambda path: pd.read_excel(path),
        'parquet': lambda path: pd.read_parquet(path),
    }

    reader = readers.get(kind)
    if reader is None:
        return None
    return reader(filename)
|
|
|
|
def ensure_output_folder_exists(folder_name="output/"):
    """Check whether an output folder exists and create it if it does not.

    Args:
        folder_name: Path of the folder to check. Defaults to ``"output/"``,
            the folder this function originally hard-coded, so existing
            zero-argument calls behave as before.

    Prints a message saying whether the folder was created or was already
    present.
    """
    if os.path.exists(folder_name):
        print(f"The '{folder_name}' folder already exists.")
        return

    # exist_ok=True avoids a FileExistsError if something else creates the
    # folder between the existence check above and this call.
    os.makedirs(folder_name, exist_ok=True)
    print(f"Created the '{folder_name}' folder.")
|
|
|
|
def custom_regex_load(in_file):
    '''
    Load a custom allow list from the first CSV among the supplied files.

    Args:
        in_file: Iterable of file objects exposing a ``.name`` attribute that
            holds the file path. ``None`` or an empty iterable is treated as
            "no file provided".

    Returns:
        A ``(message, dataframe)`` tuple. When a CSV is found, the message is
        "Allow list file loaded." and the DataFrame holds the file contents
        read without a header row. Otherwise the message is
        "No allow list file provided." and the DataFrame is empty.
    '''
    no_file_message = "No allow list file provided."

    # Callers may pass None when nothing has been selected; treat that the
    # same as an empty list instead of failing on iteration.
    file_paths = [uploaded.name for uploaded in (in_file or [])]

    # NOTE(review): this matches "csv" anywhere in the path (case-insensitive),
    # not only as the extension. Kept as-is to preserve which files are picked.
    csv_paths = [path for path in file_paths if "csv" in path.lower()]

    if not csv_paths:
        print(no_file_message)
        return no_file_message, pd.DataFrame()

    # Only the first matching file is used.
    custom_regex = pd.read_csv(csv_paths[0], low_memory=False, header=None)

    output_text = "Allow list file loaded."
    print(output_text)
    return output_text, custom_regex
|
|
|
|
def put_columns_in_df(in_file):
    """Build dropdown updates listing the columns (and Excel sheets) of the given files.

    Args:
        in_file: Iterable of file objects exposing a ``.name`` attribute that
            holds the file path. ``None`` or an empty iterable yields empty
            dropdown choices.

    Returns:
        A tuple of two ``gr.Dropdown`` objects. The first has the unique
        column names found across all files as both its choices and its
        value. The second lists every Excel sheet name and is visible when at
        least one xlsx file was supplied; otherwise it is a hidden dropdown.

    Raises:
        ValueError: Propagated from ``detect_file_type`` for unsupported
            file extensions.
    """
    concat_choices = []
    all_sheet_names = []
    number_of_excel_files = 0

    for file in in_file or []:
        file_name = file.name
        file_type = detect_file_type(file_name)
        print("File type is:", file_type)

        if file_type == 'xlsx':
            number_of_excel_files += 1
            new_choices = []
            print("Running through all xlsx sheets")

            # Open the workbook once, reuse it for every sheet, and make sure
            # the file handle is closed when done.
            with pd.ExcelFile(file_name) as anon_xlsx:
                new_sheet_names = anon_xlsx.sheet_names

                for sheet_name in new_sheet_names:
                    df = pd.read_excel(anon_xlsx, sheet_name=sheet_name)

                    print(f"Sheet Name: {sheet_name}")
                    print(df.head())

                    new_choices.extend(list(df.columns))

            all_sheet_names.extend(new_sheet_names)

        else:
            df = read_file(file_name)
            # read_file returns None for recognised non-tabular types
            # (pdf/jpg/jpeg/png); those contribute no columns.
            new_choices = [] if df is None else list(df.columns)

        concat_choices.extend(new_choices)

    # Drop duplicate column names while keeping first-seen order, so the
    # dropdown contents are stable between runs.
    concat_choices = list(dict.fromkeys(concat_choices))

    if number_of_excel_files > 0:
        return (
            gr.Dropdown(choices=concat_choices, value=concat_choices),
            gr.Dropdown(choices=all_sheet_names, value=all_sheet_names, visible=True),
        )

    return gr.Dropdown(choices=concat_choices, value=concat_choices), gr.Dropdown(visible=False)
|
|
|
|
|
|
def add_folder_to_path(folder_path: str):
    '''
    Prepend a folder's absolute path to the PATH environment variable.

    Nothing is changed when the folder does not exist (or is not a
    directory), or when its absolute path is already one of the PATH entries.
    A message is printed in each of those cases.
    '''
    if not os.path.exists(folder_path) or not os.path.isdir(folder_path):
        print(f"Folder not found at {folder_path} - not added to PATH")
        return

    print(folder_path, "folder exists.")

    resolved_folder = os.path.abspath(folder_path)
    existing_path = os.environ['PATH']

    # Compare against individual entries, not a raw substring of PATH.
    if resolved_folder in existing_path.split(os.pathsep):
        print(f"Directory {folder_path} already exists in PATH.")
        return

    # Put the new folder first so it takes precedence over existing entries.
    os.environ['PATH'] = resolved_folder + os.pathsep + existing_path
|
|
|
|
|
|
def reveal_feedback_buttons():
    """Return visible Radio, Textbox, Button and Markdown components, in that order."""
    shown_radio = gr.Radio(visible=True)
    shown_textbox = gr.Textbox(visible=True)
    shown_button = gr.Button(visible=True)
    shown_markdown = gr.Markdown(visible=True)
    return shown_radio, shown_textbox, shown_button, shown_markdown
|
|
|
|
def wipe_logs(feedback_logs_loc, usage_logs_loc):
    """Delete the feedback and usage log files on a best-effort basis.

    Args:
        feedback_logs_loc: Path of the feedback log file to remove.
        usage_logs_loc: Path of the usage log file to remove.

    A failure to remove either file is printed and otherwise ignored, so a
    problem with one file does not prevent the attempt on the other.
    """
    removal_targets = (
        (feedback_logs_loc, "Could not remove feedback logs file"),
        (usage_logs_loc, "Could not remove usage logs file"),
    )

    for log_path, failure_message in removal_targets:
        try:
            os.remove(log_path)
        except Exception as err:
            print(failure_message, err)
|
|
|
|
async def get_connection_params(request: gr.Request):
    """Derive a session identifier and output folder from the incoming request.

    If both the ``CUSTOM_CLOUDFRONT_HEADER`` and
    ``CUSTOM_CLOUDFRONT_HEADER_VALUE`` environment variables are non-empty
    and the named header is present on the request, its value must equal the
    expected value.

    The session identifier is chosen in this order:
      1. ``request.username`` (folder prefix ``"user-files/"``),
      2. the ``x-cognito-id`` request header (folder prefix ``"user-files/"``),
      3. ``request.session_hash`` (folder prefix ``"temp-files/"``).

    Args:
        request: The Gradio request for the current session.

    Returns:
        ``(session_id, output_folder, session_id)`` where ``output_folder``
        is ``<prefix><session_id>/``. When ``request`` is falsy, a 2-tuple of
        empty strings is returned instead.

    Raises:
        ValueError: If the configured custom header is present on the request
            but its value does not match the expected value.
    """
    base_folder = ""

    if request:
        print("Session hash:", request.session_hash)

        # Both values default to '' (disabled) when not set in the environment.
        CUSTOM_CLOUDFRONT_HEADER_var = get_or_create_env_var('CUSTOM_CLOUDFRONT_HEADER', '')
        CUSTOM_CLOUDFRONT_HEADER_VALUE_var = get_or_create_env_var('CUSTOM_CLOUDFRONT_HEADER_VALUE', '')

        if CUSTOM_CLOUDFRONT_HEADER_var and CUSTOM_CLOUDFRONT_HEADER_VALUE_var:
            # NOTE(review): a request that omits the header entirely is let
            # through unchecked - confirm whether that is intended.
            if CUSTOM_CLOUDFRONT_HEADER_var in request.headers:
                supplied_cloudfront_custom_value = request.headers[CUSTOM_CLOUDFRONT_HEADER_var]
                if supplied_cloudfront_custom_value == CUSTOM_CLOUDFRONT_HEADER_VALUE_var:
                    print("Custom Cloudfront header found:", supplied_cloudfront_custom_value)
                else:
                    # Raise a real exception instance; raising a
                    # (class, message) tuple produces a TypeError instead.
                    raise ValueError("Custom Cloudfront header value does not match expected value.")

        # Pick the identifier used to name the per-session output folder.
        if request.username:
            out_session_hash = request.username
            base_folder = "user-files/"
            print("Request username found:", out_session_hash)

        elif 'x-cognito-id' in request.headers:
            out_session_hash = request.headers['x-cognito-id']
            base_folder = "user-files/"
            print("Cognito ID found:", out_session_hash)

        else:
            out_session_hash = request.session_hash
            base_folder = "temp-files/"

        output_folder = base_folder + out_session_hash + "/"

        return out_session_hash, output_folder, out_session_hash
    else:
        print("No session parameters found.")
        # NOTE(review): this branch returns two values while the success path
        # returns three; left unchanged to preserve the existing return shape.
        return "", ""