|
import os |
|
import gradio as gr |
|
import pandas as pd |
|
|
|
def get_or_create_env_var(var_name, default_value):
    """Return the value of an environment variable, creating it if absent.

    Args:
        var_name: Name of the environment variable to look up.
        default_value: Value written to ``os.environ`` and returned when the
            variable is not currently set.

    Returns:
        The existing value of the variable, or ``default_value`` when the
        variable had to be created.
    """
    try:
        return os.environ[var_name]
    except KeyError:
        # Not set yet: register the default so later lookups see it too.
        os.environ[var_name] = default_value
        return default_value
|
|
|
|
|
# Resolve the output folder once at import time: read GRADIO_OUTPUT_FOLDER from
# the environment, falling back to (and registering) 'output/' when it is unset.
env_var_name, default_value = 'GRADIO_OUTPUT_FOLDER', 'output/'
output_folder = get_or_create_env_var(env_var_name, default_value)

print(f'The value of {env_var_name} is {output_folder}')
|
|
|
def get_file_path_with_extension(file_path):
    """Return the final component of ``file_path``, extension included.

    Args:
        file_path: Path to a file.

    Returns:
        Everything after the last path separator. This is an empty string
        when ``file_path`` ends with a separator.
    """
    _directory, file_name = os.path.split(file_path)
    return file_name
|
|
|
def get_file_path_end(file_path):
    """Return the file name of ``file_path`` with its last extension removed.

    Args:
        file_path: Path to a file.

    Returns:
        The base name without its final extension; only the last suffix is
        stripped, so ``'a/b/data.csv.gz'`` yields ``'data.csv'``.
    """
    stem = os.path.splitext(os.path.basename(file_path))[0]
    return stem
|
|
|
def detect_file_type(filename):
    """Detect the file type based on its extension.

    Matching is case-insensitive, so ``'REPORT.CSV'`` and ``'report.csv'``
    are treated alike.

    Args:
        filename: File name or path whose suffix is inspected.

    Returns:
        One of ``'csv'``, ``'xlsx'``, ``'parquet'``, ``'pdf'``, ``'jpg'``,
        ``'jpeg'`` or ``'png'``. Files ending in ``.csv.gz`` or ``.zip`` are
        reported as ``'csv'``.

    Raises:
        ValueError: If the suffix is not one of the supported extensions.
    """
    # Ordered (suffixes, reported type) pairs; the first match wins.
    suffix_to_type = (
        (('.csv', '.csv.gz', '.zip'), 'csv'),
        (('.xlsx',), 'xlsx'),
        (('.parquet',), 'parquet'),
        (('.pdf',), 'pdf'),
        (('.jpg',), 'jpg'),
        (('.jpeg',), 'jpeg'),
        (('.png',), 'png'),
    )

    # Normalise case once so upper- or mixed-case extensions are accepted.
    lowered = filename.lower()
    for suffixes, file_type in suffix_to_type:
        if lowered.endswith(suffixes):
            return file_type

    raise ValueError("Unsupported file type.")
|
|
|
def read_file(filename):
    """Read the file based on its detected type.

    Args:
        filename: Path of the file to load.

    Returns:
        A pandas DataFrame when the detected type is ``'csv'``, ``'xlsx'`` or
        ``'parquet'``; ``None`` for any other detected type.

    Raises:
        ValueError: Propagated from ``detect_file_type`` when the extension
            is not supported.
    """
    # Map each tabular type to the pandas reader that loads it.
    loaders = {
        'csv': lambda path: pd.read_csv(path, low_memory=False),
        'xlsx': pd.read_excel,
        'parquet': pd.read_parquet,
    }

    loader = loaders.get(detect_file_type(filename))
    if loader is None:
        # Recognised but non-tabular types have no reader.
        return None
    return loader(filename)
|
|
|
def ensure_output_folder_exists(folder_name="output/"):
    """Check whether a folder exists and create it if it does not.

    Args:
        folder_name: Folder to check or create. Defaults to ``"output/"``, so
            calls without arguments behave as before.

    Prints a message saying whether the folder was created or already existed.
    """
    if os.path.exists(folder_name):
        print(f"The '{folder_name}' folder already exists.")
        return

    # exist_ok avoids a crash if something else creates the folder between
    # the existence check above and this call.
    os.makedirs(folder_name, exist_ok=True)
    print(f"Created the '{folder_name}' folder.")
|
|
|
def put_columns_in_df(in_file):
    """Collect column names (and Excel sheet names) from the given files.

    Args:
        in_file: Iterable of file objects exposing a ``.name`` attribute that
            holds the file path. ``None`` or an empty iterable is treated as
            "no files".

    Returns:
        A 3-tuple of:
        - a ``gr.Dropdown`` whose choices are the de-duplicated column names
          across all files (first-seen order), with the first one selected,
          or no selection when there are no columns;
        - a visible ``gr.Dropdown`` of all sheet names when at least one
          xlsx file was given, otherwise a hidden ``gr.Dropdown``;
        - the base name (with extension) of the last file processed, or
          ``""`` when no files were given.

    Raises:
        ValueError: Propagated from ``detect_file_type`` for unsupported
            file extensions.
    """
    concat_choices = []
    all_sheet_names = []
    number_of_excel_files = 0
    # Defined up front so an empty input does not leave it unbound.
    file_end = ""

    for file in in_file or []:
        file_name = file.name
        file_type = detect_file_type(file_name)
        file_end = get_file_path_with_extension(file_name)

        if file_type == 'xlsx':
            number_of_excel_files += 1
            new_choices = []
            print("Running through all xlsx sheets")

            # Open the workbook once, parse every sheet from the same handle,
            # and let the context manager close it afterwards.
            with pd.ExcelFile(file_name) as anon_xlsx:
                new_sheet_names = anon_xlsx.sheet_names

                for sheet_name in new_sheet_names:
                    df = anon_xlsx.parse(sheet_name=sheet_name)

                    print(f"Sheet Name: {sheet_name}")
                    print(df.head())

                    new_choices.extend(list(df.columns))

            all_sheet_names.extend(new_sheet_names)

        else:
            df = read_file(file_name)
            # read_file yields None for recognised non-tabular types
            # (pdf / images); those contribute no columns.
            new_choices = list(df.columns) if df is not None else []

        concat_choices.extend(new_choices)

    # De-duplicate while keeping first-seen order so the dropdown contents
    # and the default selection are deterministic.
    concat_choices = list(dict.fromkeys(concat_choices))
    default_column = concat_choices[0] if concat_choices else None
    column_dropdown = gr.Dropdown(choices=concat_choices, value=default_column)

    if number_of_excel_files > 0:
        default_sheet = all_sheet_names[0] if all_sheet_names else None
        sheet_dropdown = gr.Dropdown(choices=all_sheet_names, value=default_sheet, visible=True)
    else:
        sheet_dropdown = gr.Dropdown(visible=False)

    return column_dropdown, sheet_dropdown, file_end
|
|
|
|
|
def add_folder_to_path(folder_path: str):
    '''
    Check if a folder exists on your system. If so, get the absolute path and then add it to the system Path variable if it doesn't already exist.

    Args:
        folder_path: Folder to add. If it is not an existing directory, a
            message is printed and PATH is left untouched.

    The absolute path is prepended to ``os.environ['PATH']`` for the current
    process. An unset PATH is treated as empty.
    '''
    # isdir() is already False for paths that do not exist.
    if not os.path.isdir(folder_path):
        print(f"Folder not found at {folder_path} - not added to PATH")
        return

    print(folder_path, "folder exists.")

    absolute_path = os.path.abspath(folder_path)

    # PATH can be missing in stripped-down environments; fall back to empty
    # instead of raising KeyError.
    current_path = os.environ.get('PATH', '')
    path_entries = current_path.split(os.pathsep) if current_path else []

    if absolute_path in path_entries:
        print(f"Directory {folder_path} already exists in PATH.")
        return

    # Prepend; joining the entries avoids a dangling separator when PATH
    # was empty.
    os.environ['PATH'] = os.pathsep.join([absolute_path, *path_entries])
|
|
|
|
|
def reveal_feedback_buttons():
    """Build the four feedback components with visibility switched on.

    Returns:
        A 4-tuple ``(gr.Radio, gr.Textbox, gr.Button, gr.Markdown)``, each
        constructed with ``visible=True``.
    """
    radio = gr.Radio(visible=True)
    textbox = gr.Textbox(visible=True)
    button = gr.Button(visible=True)
    markdown = gr.Markdown(visible=True)
    return radio, textbox, button, markdown
|
|
|
def wipe_logs(feedback_logs_loc, usage_logs_loc):
    """Delete the feedback and usage log files, best-effort.

    Args:
        feedback_logs_loc: Path of the feedback log file to remove.
        usage_logs_loc: Path of the usage log file to remove.

    A failure to remove either file is printed and does not stop the other
    removal from being attempted; nothing is raised to the caller.
    """
    targets = (
        (feedback_logs_loc, "Could not remove feedback logs file"),
        (usage_logs_loc, "Could not remove usage logs file"),
    )

    for log_path, failure_message in targets:
        try:
            os.remove(log_path)
        except Exception as e:
            print(failure_message, e)
|
|
|
|
|
|
|
|
|
async def get_connection_params(request: gr.Request):
    """Derive a session identifier and per-session output folder from a request.

    The identifier is chosen in this order:
    1. ``request.username`` when set (folder prefix ``user-files/``);
    2. the ``x-cognito-id`` request header when present (``user-files/``);
    3. ``request.session_hash`` otherwise (``temp-files/``).

    If both the ``CUSTOM_CLOUDFRONT_HEADER`` and
    ``CUSTOM_CLOUDFRONT_HEADER_VALUE`` environment variables are non-empty and
    the request carries that header, its value must equal the configured one.

    Args:
        request: The incoming Gradio request, or a falsy value when none is
            available.

    Returns:
        A 3-tuple ``(out_session_hash, output_folder, out_session_hash)``
        where ``output_folder`` is ``<prefix><identifier>/``. When ``request``
        is falsy, ``("", "", "")`` is returned so the arity matches the
        success path.

    Raises:
        ValueError: If the configured custom header is present on the request
            but its value does not match the expected value.
    """
    if not request:
        print("No session parameters found.")
        # Same number of values as the success path, so callers that unpack
        # three results keep working.
        return "", "", ""

    print("Session hash:", request.session_hash)

    CUSTOM_CLOUDFRONT_HEADER_var = get_or_create_env_var('CUSTOM_CLOUDFRONT_HEADER', '')
    CUSTOM_CLOUDFRONT_HEADER_VALUE_var = get_or_create_env_var('CUSTOM_CLOUDFRONT_HEADER_VALUE', '')

    if CUSTOM_CLOUDFRONT_HEADER_var and CUSTOM_CLOUDFRONT_HEADER_VALUE_var:
        # NOTE(review): a request that lacks the header entirely passes this
        # check untouched - confirm that is intended.
        if CUSTOM_CLOUDFRONT_HEADER_var in request.headers:
            supplied_cloudfront_custom_value = request.headers[CUSTOM_CLOUDFRONT_HEADER_var]
            if supplied_cloudfront_custom_value == CUSTOM_CLOUDFRONT_HEADER_VALUE_var:
                print("Custom Cloudfront header found:", supplied_cloudfront_custom_value)
            else:
                # Raise a real exception instance; raising a
                # (class, message) tuple is itself a TypeError.
                raise ValueError("Custom Cloudfront header value does not match expected value.")

    if request.username:
        out_session_hash = request.username
        base_folder = "user-files/"
        print("Request username found:", out_session_hash)

    elif 'x-cognito-id' in request.headers:
        out_session_hash = request.headers['x-cognito-id']
        base_folder = "user-files/"
        print("Cognito ID found:", out_session_hash)

    else:
        out_session_hash = request.session_hash
        base_folder = "temp-files/"

    output_folder = base_folder + out_session_hash + "/"

    return out_session_hash, output_folder, out_session_hash