"""Gradio helpers for exploring SDG report data and persisting user input in SQLite.

All persistence goes through the SQLite file named by ``DB_PATH``. The report
CSV is loaded once at import time into the module-level ``data`` frame.
"""

import os
import sqlite3
import time
import uuid
from contextlib import closing
from datetime import datetime

import gradio as gr
import pandas as pd
from tqdm import tqdm

from filter import filter_data, sdg_column_mapping
from tar import sdg_indicators_mapping, sdg_input_mapping, sdg_formulas_mapping, sdg_progress_mapping

# Load your dataset
cwd = os.getcwd()
csv_file_path = os.path.join(cwd, 'Sustainable_Development_Report_2023_(with_indicators)__-2086263501583264136.csv')
data = pd.read_csv(csv_file_path)

# Single SQLite file shared by every function in this module.
DB_PATH = "data_storage.db"

# NOTE(review): the wrapper markup of the scrollable table was lost from the
# original source (only the comment describing it survived). The style values
# below are a reconstruction -- confirm the intended height.
_TABLE_WRAPPER_STYLE = "height: 400px; overflow-y: auto;"


def sign_in(username, password):
    """Check the credentials against ``user_info`` and record a session.

    On a match, a row with a fresh UUID and the current local time is inserted
    into ``user_sessions`` (created on demand).

    Returns:
        ``"Welcome, <username>"`` on success, ``"Invalid username or password"``
        when no row matches, or the database error message as a string.
    """
    # NOTE(review): passwords are stored and compared in plain text. Hashing
    # them would need a migration of existing rows, so it is only flagged here.
    try:
        # closing() guarantees the connection is released even when a
        # statement raises (sqlite3's own context manager does not close).
        with closing(sqlite3.connect(DB_PATH)) as conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS user_sessions (
                    user_id TEXT PRIMARY KEY,
                    timestamp TEXT
                )
                """
            )
            user = conn.execute(
                "SELECT * FROM user_info WHERE username=? AND password=?",
                (username, password),
            ).fetchone()
            if not user:
                return "Invalid username or password"

            session_id = str(uuid.uuid4())
            # Bind an explicit string: the implicit datetime adapter is
            # deprecated since Python 3.12. isoformat(" ") yields the same
            # text the default adapter used to store.
            logged_in_at = datetime.now().isoformat(" ")
            conn.execute(
                "INSERT INTO user_sessions (user_id, timestamp) VALUES (?, ?)",
                (session_id, logged_in_at),
            )
            conn.commit()
        return f"Welcome, {username}"
    except sqlite3.Error as e:
        return str(e)


def sign_up(username, password, email):
    """Insert a new account into ``user_info`` (created on demand).

    Returns:
        ``"Sign up successful"``, or the database error message as a string
        (for example when the username primary key already exists).
    """
    try:
        with closing(sqlite3.connect(DB_PATH)) as conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS user_info (
                    username TEXT PRIMARY KEY,
                    password TEXT,
                    email TEXT
                )
                """
            )
            conn.execute(
                "INSERT INTO user_info (username, password, email) VALUES (?, ?, ?)",
                (username, password, email),
            )
            conn.commit()
        return "Sign up successful"
    except sqlite3.Error as e:
        return str(e)


def save_inputs(indicator, names, inputs, user_id=None, timestamp=None):
    """Append one row to the ``inputs`` table (created on demand).

    ``names`` and ``inputs`` are stored as their ``str()`` representation.
    ``user_id`` is optional so that callers without a session can still save.
    ``timestamp`` is accepted for interface compatibility only: the stored
    value is always the current ``time.time()``.
    """
    stored_at = time.time()
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute(
            '''CREATE TABLE IF NOT EXISTS inputs
               (indicator text, names text, inputs text, user_id, timestamp)'''
        )
        conn.execute(
            "INSERT INTO inputs VALUES (?, ?, ?, ?, ?)",
            (indicator, str(names), str(inputs), user_id, stored_at),
        )
        conn.commit()


def create_database():
    """Create the ``user_data`` table if it does not exist yet."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS user_data (
                user_id TEXT PRIMARY KEY,
                username TEXT,
                country_name TEXT,
                sdg_number INTEGER,
                result TEXT,
                indicator TEXT,
                names TEXT,
                input_first REAL,
                input_2nd REAL,
                input_3rd REAL,
                input_4th REAL,
                timestamp REAL
            )
            """
        )
        conn.commit()


def filter_and_store_data(username, country_name, sdg_number, result, indicator, names,
                          input_first, input_2nd, input_3rd, input_4th, user_id, timestamp):
    """Render the selected country's SDG columns as HTML and log the request.

    The rows of ``data`` whose ``Name`` equals ``country_name`` are reduced to
    ``Name`` plus the columns mapped to ``sdg_number`` and rendered as an HTML
    table wrapped in a scrollable div. That HTML is stored in the ``result``
    column of ``user_data`` together with the other arguments.

    ``result`` and ``timestamp`` are accepted for interface compatibility but
    not used: the stored result is the generated HTML and the stored timestamp
    is the current local time.

    Returns:
        The HTML string on success, otherwise a human-readable error message.
        Every path returns a single string.
    """
    # Only the conversion can signal "invalid SDG number"; keeping it out of
    # the broad try below stops unrelated ValueErrors being misreported.
    try:
        sdg_number = int(sdg_number)
    except (TypeError, ValueError):
        return "Please select a valid SDG number."

    try:
        selected_columns = ['Name'] + sdg_column_mapping.get(sdg_number, [])
        selected_data = data.loc[data['Name'] == country_name, selected_columns]
        html_table = selected_data.to_html(index=False)
        scrollable_html_table = (
            f'<div style="{_TABLE_WRAPPER_STYLE}">\n{html_table}\n</div>'
        )

        # Explicit string instead of the deprecated implicit datetime adapter;
        # same text format as before.
        stored_at = datetime.now().isoformat(" ")

        with closing(sqlite3.connect(DB_PATH)) as conn:
            conn.execute(
                "INSERT INTO user_data (username, country_name, sdg_number, result, indicator, "
                "names, input_first, input_2nd, input_3rd, input_4th, user_id, timestamp) "
                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                (username, country_name, sdg_number, str(scrollable_html_table), str(indicator),
                 str(names), input_first, input_2nd, input_3rd, input_4th, user_id, stored_at),
            )
            conn.commit()
        return scrollable_html_table
    except Exception as e:
        # UI boundary: surface the failure as text rather than crash the app.
        return f"An error occurred: {e}"


def gradio_app(username, country_name, sdg_number, result, indicator, names,
               input_first, input_2nd, input_3rd, input_4th, user_id, timestamp):
    """Gradio entry point: delegate unchanged to ``filter_and_store_data``."""
    return filter_and_store_data(username, country_name, sdg_number, result, indicator, names,
                                 input_first, input_2nd, input_3rd, input_4th, user_id, timestamp)


def gradios(username, country_name, sdg_number, indicator, names,
            input_first, input_2nd, input_3rd, input_4th):
    """Evaluate the formula registered for ``indicator`` and store the outcome.

    Returns:
        ``"Result: <value>"``.

    Raises:
        KeyError: if ``indicator`` has no entry in ``sdg_formulas_mapping``.
    """
    formula = sdg_formulas_mapping[indicator]
    result = formula(input_first, input_2nd, input_3rd, input_4th)

    with closing(sqlite3.connect(DB_PATH)) as conn:
        # Eleven columns need eleven values: the timestamp was listed in the
        # column list but never supplied, which made this INSERT always fail.
        conn.execute(
            """
            INSERT INTO user_data (username, country_name, sdg_number, result, indicator, names,
                                   input_first, input_2nd, input_3rd, input_4th, timestamp)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            # str(names): a multiselect value is a list, which sqlite3 cannot bind.
            (username, country_name, sdg_number, result, indicator, str(names),
             input_first, input_2nd, input_3rd, input_4th, time.time()),
        )
        conn.commit()
    return f"Result: {result}"


def _latest_username():
    """Return the username of the most recently inserted ``user_data`` row, or None."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        # rowid grows with insertion order; ordering by username (as before)
        # returned the alphabetically last user, not the latest one.
        df = pd.read_sql_query(
            "SELECT username FROM user_data ORDER BY rowid DESC LIMIT 1", conn
        )
    if df.empty:
        return None
    return str(df['username'].values[0])


def user_latest_data():
    """Return the username of the newest ``user_data`` row.

    Raises:
        IndexError: if ``user_data`` holds no rows. Use ``user_late_data`` for
            a variant that returns a placeholder instead.
    """
    username = _latest_username()
    if username is None:
        raise IndexError("user_data contains no rows")
    return username


def user_late_data():
    """Return the newest ``user_data`` username, or ``"No username available"``."""
    username = _latest_username()
    if username is None:
        return "No username available"
    return username


def get_nam(indicator):
    """Return the input names configured for ``indicator``.

    Returns:
        A list with the ``'name'`` of every entry in
        ``sdg_input_mapping[indicator]``, or ``None`` for an unknown indicator.
    """
    entries = sdg_input_mapping.get(indicator)
    if entries is None:
        return None
    return [item['name'] for item in entries]


def update_nam_dropdown(indicator):
    """Gradio change handler: the input names for the chosen indicator."""
    return get_nam(indicator)


def add_indicator(sdg_number):
    """Build the indicator-input panel and return its enclosing ``gr.Row``.

    The panel holds an indicator dropdown, a dropdown listing that indicator's
    input names, a slider choosing how many numeric inputs are visible, the
    numeric inputs themselves and a "Save" button that persists them through
    ``save_inputs``. ``sdg_number`` is currently unused.
    """
    max_inputs = 5

    def variable_output2(k):
        # Clamp so the returned list always has exactly max_inputs entries.
        k = min(max(int(k), 0), max_inputs)
        return [gr.Number(visible=True)] * k + [gr.Number(visible=False)] * (max_inputs - k)

    def save_inputs_wrapper(indicator, names, *values):
        # Gradio delivers plain values (float, int or None). getattr keeps
        # supporting objects that expose `.value`, without failing on the rest.
        return save_inputs(indicator, names, [str(getattr(v, "value", v)) for v in values])

    indicator_choices = list(sdg_input_mapping.keys())
    # Seed the names dropdown from the first indicator; a flat list of names,
    # and safe when the mapping is empty.
    initial_names = (get_nam(indicator_choices[0]) if indicator_choices else None) or []

    # NOTE(review): the original indentation was lost; nesting the column
    # inside the row is a reconstruction -- confirm the intended layout.
    with gr.Row(variant='panel') as row:
        indicator_dropdown2 = gr.Dropdown(choices=indicator_choices, interactive=True,
                                          type="value", label="Please choose an Indicator")
        names_dropdown = gr.Dropdown(choices=initial_names, allow_custom_value=True,
                                     multiselect=True, interactive=True, type="value",
                                     label="The indicators Inputs needed respectively")
        indicator_dropdown2.change(fn=update_nam_dropdown, inputs=indicator_dropdown2,
                                   outputs=names_dropdown)
        with gr.Column(variant='panel'):
            u = gr.Slider(1, max_inputs, value=max_inputs, step=1, label="How many inputs to fill:")
            textboxes = [
                gr.Number(label=f"Please fill with the input number {i}", interactive=True)
                for i in range(max_inputs)
            ]
            u.change(variable_output2, u, textboxes)
            save_button = gr.Button("Save")
            save_button.click(save_inputs_wrapper,
                              inputs=[indicator_dropdown2, names_dropdown, *textboxes],
                              outputs=[])
    return row


max_indicators = 15


def variable_outputs(k):
    """Return ``max_indicators`` textboxes, the first N of them visible.

    N is ``sdg_progress_mapping`` looked up by ``int(k)`` (0 when missing),
    clamped to ``0..max_indicators`` so the list length never varies.
    """
    k = int(k)
    num_rows = min(max(sdg_progress_mapping.get(k, 0), 0), max_indicators)
    return [gr.Textbox(visible=True)] * num_rows + [gr.Textbox(visible=False)] * (max_indicators - num_rows)


def get_new_ind(sdg_number):
    """Return the indicators mapped to ``sdg_number`` as a list, or ``None`` if unmapped."""
    if sdg_number not in sdg_indicators_mapping:
        return None
    return list(sdg_indicators_mapping[sdg_number])


def update_indicator_dropdown(sdg_number):
    """Gradio change handler: the indicators for the chosen SDG number."""
    return get_new_ind(sdg_number)


def _latest_row_values(conn, table, columns):
    """Return ``columns`` of the newest row in ``table`` as strings ('' per column if empty)."""
    # Identifiers come from constants in this module, never from user input.
    query = f"SELECT {', '.join(columns)} FROM {table} ORDER BY rowid DESC LIMIT 1"
    frame = pd.read_sql_query(query, conn)
    if frame.empty:
        return [""] * len(columns)
    return [str(frame[column].values[0]) for column in columns]


def fetch_all_data():
    """Collect the newest record from ``user_info``, ``user_data`` and ``inputs``.

    Returns:
        A list of seven strings: username, country_name, sdg_number, result,
        indicator, names, inputs. Fields of an empty table are ``""``.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        user = _latest_row_values(conn, "user_info", ["username"])
        user_data_values = _latest_row_values(
            conn, "user_data", ["country_name", "sdg_number", "result"]
        )
        input_data = _latest_row_values(conn, "inputs", ["indicator", "names", "inputs"])
    return user + user_data_values + input_data


def calculate_indicator(user_id, timestamp):
    """Re-run the stored indicator formula for one ``user_data`` row.

    The row is looked up by exact ``user_id`` and ``timestamp``; the first
    match is used. The formula receives the four stored inputs, matching how
    ``gradios`` calls the same mapping.

    Returns:
        ``"Result for <indicator>: <value>"``, or a message describing why no
        result could be produced.
    """
    try:
        with closing(sqlite3.connect(DB_PATH)) as conn:
            user_data_df = pd.read_sql_query(
                "SELECT * FROM user_data WHERE user_id=? AND timestamp=?",
                conn,
                params=(user_id, timestamp),
            )
        if user_data_df.empty:
            return "No data found for the specified user_id and timestamp"

        indicator = user_data_df['indicator'].values[0]
        if indicator not in sdg_formulas_mapping:
            return f"Indicator {indicator} not found in the dictionary"

        formula = sdg_formulas_mapping[indicator]
        stored_inputs = [
            user_data_df[column].values[0]
            for column in ('input_first', 'input_2nd', 'input_3rd', 'input_4th')
        ]
        result = formula(*stored_inputs)
        return f"Result for {indicator}: {result}"
    except Exception as e:
        # UI boundary: report the failure as text instead of raising.
        return f"An error occurred: {e}"