import gradio as gr
import pandas as pd
import os
import sqlite3
from filter import filter_data, sdg_column_mapping
from tar import sdg_indicators_mapping, sdg_input_mapping ,sdg_formulas_mapping, sdg_progress_mapping
from tqdm import tqdm
import time
import uuid
from datetime import datetime
# Load your dataset
cwd = os.getcwd()
csv_file_path = os.path.join(cwd,'Sustainable_Development_Report_2023_(with_indicators)__-2086263501583264136.csv')
data = pd.read_csv(csv_file_path)
# Function to create a SQLite database and table if it doesn't exist
def sign_in(username, password):
try:
conn = sqlite3.connect('data_storage.db')
c = conn.cursor()
# Check if the user_sessions table exists
c.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='user_sessions'")
if c.fetchone() is None:
# If the table doesn't exist, create it
c.execute("""
CREATE TABLE user_sessions (
user_id TEXT PRIMARY KEY,
timestamp TEXT
)
""")
c.execute("SELECT * FROM user_info WHERE username=? AND password=?", (username, password))
user = c.fetchone()
conn.close()
if user:
user_id = str(uuid.uuid4())
timestamp = datetime.now()
conn = sqlite3.connect('data_storage.db')
c = conn.cursor()
c.execute("INSERT INTO user_sessions (user_id, timestamp) VALUES (?, ?)", (user_id, timestamp))
conn.commit()
conn.close()
return "Welcome, " + username
else:
return "Invalid username or password"
except Exception as e:
return str(e)
def sign_up(username, password, email):
try:
conn = sqlite3.connect('data_storage.db')
c = conn.cursor()
# Check if the user_info table exists
c.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='user_info'")
if c.fetchone() is None:
# If the table doesn't exist, create it
c.execute("""
CREATE TABLE user_info (
username TEXT PRIMARY KEY,
password TEXT,
email TEXT
)
""")
c.execute("INSERT INTO user_info (username, password, email) VALUES (?, ?, ?)", (username, password, email))
conn.commit()
conn.close()
return "Sign up successful"
except Exception as e:
return str(e)
def save_inputs(indicator, names, inputs,user_id, timestamp):
# Connect to the SQLite database
conn = sqlite3.connect('data_storage.db')
c = conn.cursor()
timestamp = time.time()
# Create table if it doesn't exist
c.execute('''CREATE TABLE IF NOT EXISTS inputs
(indicator text, names text, inputs text, user_id, timestamp)''')
# Insert a row of data
c.execute("INSERT INTO inputs VALUES (?, ?, ?, ?,?)", (indicator, str(names), str(inputs), user_id, timestamp))
# Save (commit) the changes
conn.commit()
# Close the connection
conn.close()
def create_database():
conn = sqlite3.connect("data_storage.db")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS user_data (
user_id TEXT PRIMARY KEY,
username TEXT,
country_name TEXT,
sdg_number INTEGER,
result TEXT,
indicator TEXT,
names TEXT,
input_first REAL,
input_2nd REAL,
input_3rd REAL,
input_4th REAL,
timestamp REAL
)
""")
conn.commit()
conn.close()
def filter_and_store_data(username, country_name, sdg_number, result, indicator, names, input_first, input_2nd, input_3rd, input_4th, user_id, timestamp):
try:
# Convert the selected SDG number to an integer
sdg_number = int(sdg_number)
# Get the list of columns based on the selected SDG number
selected_columns = ['Name'] + sdg_column_mapping.get(sdg_number, [])
# Filter data based on country and selected columns
selected_data = data[(data['Name'] == country_name)][selected_columns]
# Convert the selected data to an HTML table with more rows
html_table = selected_data.to_html(index=False)
# Wrap the HTML table in a div with a fixed height and scrollable overflow
scrollable_html_table = f'
{html_table}
'
# Get the current timestamp
timestamp = time.time()
timestamp = datetime.now()
# Store the data in the SQLite database
# Store the data in the SQLite database
conn = sqlite3.connect("data_storage.db")
cursor = conn.cursor()
cursor.execute("INSERT INTO user_data (username, country_name, sdg_number, result, indicator, names, input_first, input_2nd, input_3rd, input_4th, user_id, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(username, country_name, sdg_number, str(scrollable_html_table), str(indicator), str(names), input_first, input_2nd, input_3rd, input_4th, user_id, timestamp))
conn.commit()
conn.close()
return scrollable_html_table
except ValueError:
return "Please select a valid SDG number.", []
except Exception as e:
return f"An error occurred: {e}", []
def gradio_app(username, country_name, sdg_number, result, indicator, names, input_first, input_2nd, input_3rd, input_4th, user_id, timestamp):
# Call the filter_and_store_data function
return filter_and_store_data(username, country_name, sdg_number, result, indicator, names, input_first, input_2nd, input_3rd, input_4th, user_id, timestamp)
def gradios(username, country_name, sdg_number, indicator, names, input_first, input_2nd, input_3rd, input_4th):
# Get the formula for the selected indicator
formula = sdg_formulas_mapping[indicator]
# Calculate the result using the formula
result = formula(input_first, input_2nd, input_3rd, input_4th)
# Store the result in the database
conn = sqlite3.connect("data_storage.db")
cursor = conn.cursor()
cursor.execute("""
INSERT INTO user_data (username, country_name, sdg_number, result, indicator, names, input_first, input_2nd, input_3rd, input_4th, timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (username, country_name, sdg_number, result, indicator, names, input_first, input_2nd, input_3rd, input_4th))
conn.commit()
conn.close()
return f"Result: {result}"
def user_latest_data():
conn = sqlite3.connect("data_storage.db")
df = pd.read_sql_query("SELECT * FROM user_data ORDER BY username DESC LIMIT 1", conn)
conn.close()
# Convert numeric values to strings
return str(df['username'].values[0])
def user_late_data():
conn = sqlite3.connect("data_storage.db")
df = pd.read_sql_query("SELECT * FROM user_data ORDER BY username DESC LIMIT 1", conn)
conn.close()
# Check if the DataFrame is not empty and contains the 'username' column
if not df.empty and 'username' in df.columns and not df['username'].empty:
return str(df['username'].values[0])
else:
return "No username available"
def get_nam(indicator):
if indicator in sdg_input_mapping:
names = [item['name'] for item in sdg_input_mapping[indicator]]
return names
def update_nam_dropdown(indicator):
return get_nam(indicator)
def add_indicator(sdg_number):
max_inputs = 5
def variable_output2(k):
k = int(k)
return [gr.Number(visible=True)]*k + [gr.Number(visible=False)]*(max_inputs-k)
with gr.Row(variant='panel') as row:
indicator_dropdown2 = gr.Dropdown(choices=list(sdg_input_mapping.keys()), interactive=True, type="value", label="Please choose an Indicator")
names_dropdown = gr.Dropdown(choices=[update_nam_dropdown(indicator_dropdown2.choices[0])], allow_custom_value=True,multiselect=True, interactive=True, type="value", label="The indicators Inputs needed respectively")
indicator_dropdown2.change(fn=update_nam_dropdown, inputs=indicator_dropdown2, outputs=names_dropdown)
with gr.Column(variant='panel'):
u = gr.Slider(1, max_inputs, value=max_inputs, step=1, label="How many inputs to fill:")
textboxes = []
for i in range(max_inputs):
t = gr.Number(label=f"Please fill with the input number {i}", interactive=True)
textboxes.append(t)
u.change(variable_output2, u, textboxes)
save_button = gr.Button("Save")
#save_button.click(save_inputs, inputs=[indicator_dropdown2, names_dropdown, t], outputs=[])
def save_inputs_wrapper(indicator, names, *textboxes):
return save_inputs(indicator, names, [str(t) if isinstance(t, float) else str(t.value) for t in textboxes])
save_button.click(save_inputs_wrapper, inputs=[indicator_dropdown2, names_dropdown, *textboxes], outputs=[])
return row
max_indicators = 15
def variable_outputs(k):
k = int(k)
num_rows = sdg_progress_mapping.get(k, 0)
return [gr.Textbox(visible=True)]*num_rows + [gr.Textbox(visible=False)]*(max_indicators-num_rows)
def get_new_ind(sdg_number):
if sdg_number in sdg_indicators_mapping:
ind = sdg_indicators_mapping.get(sdg_number, [])
return list(ind)
def update_indicator_dropdown(sdg_number):
return get_new_ind(sdg_number)
def fetch_all_data():
conn = sqlite3.connect("data_storage.db")
# Fetch user data
user_df = pd.read_sql_query("SELECT * FROM user_info", conn)
latest_user_row = user_df.iloc[[-1]]
user_data = str(latest_user_row['username'].values[0])
# Fetch user data
user_data_df = pd.read_sql_query("SELECT * FROM user_data", conn)
latest_user_data_row = user_data_df.iloc[[-1]]
user_data_values = (
str(latest_user_data_row['country_name'].values[0]),
str(latest_user_data_row['sdg_number'].values[0]),
str(latest_user_data_row['result'].values[0])
)
# Fetch input data
input_df = pd.read_sql_query("SELECT * FROM inputs", conn)
latest_input_row = input_df.iloc[[-1]]
input_data = (
str(latest_input_row['indicator'].values[0]),
str(latest_input_row['names'].values[0]),
str(latest_input_row['inputs'].values[0])
)
conn.close()
return [user_data] + list(user_data_values) + list(input_data)
def calculate_indicator(user_id, timestamp):
try:
conn = sqlite3.connect("data_storage.db")
# Fetch user data based on user_id and timestamp
user_data_df = pd.read_sql_query("SELECT * FROM user_data WHERE user_id=? AND timestamp=?", conn, params=(user_id, timestamp))
if user_data_df.empty:
return "No data found for the specified user_id and timestamp"
# Extract relevant information
indicator = user_data_df['indicator'].values[0]
names = user_data_df['names'].values[0]
input_first = user_data_df['input_first'].values[0]
# Perform the calculation
if indicator in sdg_formulas_mapping:
formula = sdg_formulas_mapping[indicator]
result = formula(input_first)
return f"Result for {indicator}: {result}"
else:
return f"Indicator {indicator} not found in the dictionary"
except Exception as e:
return f"An error occurred: {e}"