# Source: Hugging Face Space file app.py, uploaded by nastasiasnk.
# Commit 1598fe9 (verified), message "Update app.py", file size 11.7 kB.
import gradio as gr
import pandas as pd
import numpy as np
import json
from io import StringIO
from collections import OrderedDict
from notion_client import Client as client_notion
import os
# Read the Notion token from the environment and stop immediately when it is
# absent, since every later step needs it.
notionToken = os.environ.get('notionToken')
if notionToken is not None:
    print("Secret token found successfully!")
else:
    raise Exception("Secret token not found. Please check the environment variables.")
from config import landuseDatabaseId , subdomainAttributesDatabaseId
from imports_utils import fetch_all_database_pages
from imports_utils import get_property_value
from imports_utils import notion
# Download every page of the two Notion databases once, at import time.
landuse_attributes = fetch_all_database_pages(notion, landuseDatabaseId)
livability_attributes = fetch_all_database_pages(notion, subdomainAttributesDatabaseId)

# Land use -> livability subdomain lookup.
# `subdomains_unique` collects every non-"" subdomain value and is NOT
# de-duplicated here.
landuseMapperDict = {}
subdomains_unique = []
for landuse_page in landuse_attributes:
    landuse_name = get_property_value(landuse_page, "LANDUSE")
    subdomain_name = get_property_value(landuse_page, "SUBDOMAIN_LIVEABILITY")
    if subdomain_name and landuse_name:
        landuseMapperDict[landuse_name] = subdomain_name
    if subdomain_name == "":
        continue
    subdomains_unique.append(subdomain_name)

# Subdomain -> attribute record lookup. Only pages with a truthy
# "MANHATTAN THRESHOLD" value get a record; an empty "SQM PER EMPL" or
# "DOMAIN" value is stored as the placeholder 0.
# `domains_unique` collects every non-"" domain value, duplicates included.
attributeMapperDict = {}
domains_unique = []
for attribute_page in livability_attributes:
    subdomain_key = get_property_value(attribute_page, "SUBDOMAIN_UNIQUE")
    sqm_value = get_property_value(attribute_page, "SQM PER EMPL")
    threshold_value = get_property_value(attribute_page, "MANHATTAN THRESHOLD")
    max_points_value = get_property_value(attribute_page, "LIVABILITY MAX POINT")
    domain_value = get_property_value(attribute_page, "DOMAIN")
    if threshold_value:
        sqm_entry = 0 if sqm_value == "" else sqm_value
        domain_entry = 0 if domain_value == "" else domain_value
        attributeMapperDict[subdomain_key] = dict(
            sqmPerEmpl=[sqm_entry],
            thresholds=threshold_value,
            max_points=max_points_value,
            domain=[domain_entry],
        )
    if domain_value == "":
        continue
    domains_unique.append(domain_value)
# Substrings used to sort transport-matrix keys into per-mode groups.
_TRANSPORT_MODES = ("DRT", "GMT", "HSR")


def _split_transport_matrix(transport_matrix, modes=_TRANSPORT_MODES):
    """Group transport-matrix entries by the mode substring found in their key.

    A key containing several mode substrings is placed in every matching group.
    Returns a dict ``{mode: {key: value}}`` with one (possibly empty) entry
    per mode.
    """
    grouped = {mode: {} for mode in modes}
    for key, value in transport_matrix.items():
        for mode in modes:
            if mode in key:
                grouped[mode][key] = value
    return grouped


def _normalize_attribute_domains(attribute_dict):
    """Return a copy of ``attribute_dict`` whose 'domain' entries are lists of str.

    Comma-separated domain strings are split on ','. Non-string placeholders
    (the 0 stored for an empty domain) and empty strings are dropped, so a
    record without a domain ends up with an empty list. The input dict is not
    modified.

    Returns ``(normalized_dict, unique_domains)`` where ``unique_domains``
    keeps first-seen order.
    """
    normalized = {}
    unique_domains = {}
    for subdomain, attrs in attribute_dict.items():
        raw_domains = attrs['domain']
        if isinstance(raw_domains, str):
            raw_domains = [raw_domains]
        domains = []
        for item in raw_domains:
            if not isinstance(item, str) or item == "":
                continue
            domains.extend(item.split(',') if ',' in item else [item])
        normalized[subdomain] = {**attrs, 'domain': domains}
        unique_domains.update(dict.fromkeys(domains))
    return normalized, list(unique_domains)


def _landuses_to_subdomains(distance_matrix, landuse_df, landuse_to_subdomain, unique_subdomains):
    """Sum land-use columns of ``landuse_df`` into one column per subdomain.

    The result is indexed like ``distance_matrix``; rows absent from
    ``landuse_df`` contribute 0. Land uses without a column in ``landuse_df``
    are reported with a warning and skipped.
    """
    areas = pd.DataFrame(0, index=distance_matrix.index, columns=unique_subdomains)
    for landuse, subdomain in landuse_to_subdomain.items():
        subdomain = str(subdomain)
        if subdomain not in areas.columns:
            continue
        if landuse not in landuse_df.columns:
            print(f"Warning: Column '{landuse}' not found in landuse database")
            continue
        areas[subdomain] = areas[subdomain].add(landuse_df[landuse], fill_value=0)
    return areas


def _find_workplaces(distance_matrix, attribute_dict, destination_weights, unique_subdomains):
    """Accumulate a 'jobs' column: subdomain weight divided by its sqm-per-employee.

    Subdomains without an attribute record, without a column in
    ``destination_weights``, or with a non-positive 'sqmPerEmpl' value
    contribute nothing.
    """
    workplaces = pd.DataFrame(0, index=distance_matrix.index, columns=['jobs'])
    for subdomain in unique_subdomains:
        attrs = attribute_dict.get(subdomain)
        if attrs is None or subdomain not in destination_weights.columns:
            continue
        sqm_per_empl = float(attrs['sqmPerEmpl'][0])
        if sqm_per_empl > 0:
            jobs = (destination_weights[subdomain] / sqm_per_empl).round(2).fillna(0)
            workplaces['jobs'] += jobs
    return workplaces


def _compute_accessibility(distance_matrix, destination_weights=None, alpha=0.0038, threshold=600):
    """Sum ``exp(-alpha * d)`` over every matrix entry with ``d <= threshold``.

    With ``destination_weights`` each destination's decay factor is multiplied
    by that destination's weight, giving one output column per weight column.
    The weights are applied positionally, so their row order must match the
    matrix column order. Without weights a single unweighted 'ART' column is
    returned.
    """
    decay_factors = np.exp(-alpha * distance_matrix) * (distance_matrix <= threshold)
    if destination_weights is None:
        return pd.DataFrame({'ART': decay_factors.sum(axis=1)}, index=distance_matrix.index)
    return pd.DataFrame(
        {
            col: (decay_factors * destination_weights[col].values).sum(axis=1)
            for col in destination_weights.columns
        },
        index=distance_matrix.index,
    )


def _remap(value, b_min, b_max, c_min, c_max):
    """Linearly map ``value`` from the range [b_min, b_max] onto [c_min, c_max]."""
    return c_min + (((value - b_min) / (b_max - b_min)) * (c_max - c_min))


def _accessibility_to_livability(distance_matrix, accessibility_inputs, attribute_dict,
                                 unique_domains, scored_columns):
    """Convert accessibility values into livability points.

    For every attribute record whose key is in ``scored_columns`` the
    accessibility is scaled linearly from [0, threshold] to [0, max_points]
    and capped at ``max_points``. The points are written to the key's own
    column and added to each of its domain columns except 'Workplaces'.
    The 'commercial' record is scored from the 'jobs' column instead and
    written (not added) to its first domain column.

    Rows whose accessibility is NaN keep their current value.
    """
    # Start from a float frame so that writing float scores never has to
    # change a column's dtype.
    columns = list(dict.fromkeys([*accessibility_inputs.columns, *unique_domains]))
    livability = pd.DataFrame(0.0, index=distance_matrix.index, columns=columns)

    for key, attrs in attribute_dict.items():
        threshold = float(attrs['thresholds'])
        max_livability = float(attrs['max_points'])
        domains = [str(item) for item in attrs['domain']]

        if key in scored_columns and key != 'commercial':
            accessibility = accessibility_inputs[key]
            livability_score = _remap(accessibility, 0, threshold, 0, max_livability)
            reached = accessibility >= threshold
            below = accessibility < threshold
            livability.loc[reached, key] = max_livability
            livability.loc[below, key] = livability_score
            for domain in domains:
                if domain != 'Workplaces':
                    livability.loc[reached, domain] += max_livability
                    livability.loc[below, domain] += livability_score
        elif key == 'commercial' and domains:
            accessibility = accessibility_inputs['jobs']
            livability_score = _remap(accessibility, 0, threshold, 0, max_livability)
            livability.loc[accessibility >= threshold, domains[0]] = max_livability
            livability.loc[accessibility < threshold, domains[0]] = livability_score
    return livability


def test(input_json):
    """Compute accessibility and livability tables from a JSON request.

    ``input_json`` is a JSON string (single-quoted pseudo-JSON is retried with
    the quotes swapped) holding an ``"input"`` object with the keys
    ``matrix``, ``landuse_areas``, ``transportMatrix``, ``alpha`` and
    ``threshold``. The land-use and attribute lookups come from the
    module-level ``landuseMapperDict`` and ``attributeMapperDict``.

    Returns a JSON string with the keys ``subdomainsAccessibility_dictionary``,
    ``livability_dictionary``, ``subdomainsWeights_dictionary``,
    ``luDomainMapper`` and ``attributeMapper``.

    Raises ``KeyError`` when a required input key is missing and
    ``json.JSONDecodeError`` when the payload cannot be parsed at all.
    """
    print("Received input")

    # Parse the input JSON string; fall back to swapping single quotes.
    try:
        inputs = json.loads(input_json)
    except json.JSONDecodeError:
        inputs = json.loads(input_json.replace("'", '"'))

    # Accessing input data from Grasshopper
    payload = inputs['input']
    matrix = payload["matrix"]
    landuses = payload["landuse_areas"]
    transport_matrix = payload["transportMatrix"]
    alpha = float(payload["alpha"])
    threshold = float(payload["threshold"])

    df_matrix = pd.DataFrame(matrix).T.round(0).astype(int)
    df_landuses = pd.DataFrame(landuses).T.round(0).astype(int)

    # Only the "DRT" group of the transport matrix is used below.
    art = _split_transport_matrix(transport_matrix)["DRT"]
    df_art_matrix = pd.DataFrame(art).T.round(0).astype(int)
    if df_art_matrix.empty:  # `empty` is a property, not a method
        print("transport matrix is empty")

    # Create a mask based on the matrix ids and crop the land-use rows to it.
    mask_connected = df_matrix.index.tolist()
    valid_indexes = [idx for idx in mask_connected if idx in df_landuses.index]

    # Identify and report missing indexes
    missing_indexes = set(mask_connected) - set(valid_indexes)
    if missing_indexes:
        print(f"Error: The following indexes were not found in the DataFrame: {missing_indexes}, length: {len(missing_indexes)}")

    df_landuses_filtered = df_landuses.loc[valid_indexes]

    # Work on a per-request copy so the module-level attribute dict is never
    # mutated; domains are split into lists and collected in first-seen order.
    attribute_mapper, domainsUnique = _normalize_attribute_domains(attributeMapperDict)

    # Unique subdomains to which land uses are aggregated (first-seen order).
    subdomainsUnique = list(dict.fromkeys(str(value) for value in landuseMapperDict.values()))

    LivabilitySubdomainsWeights = _landuses_to_subdomains(
        df_matrix, df_landuses_filtered, landuseMapperDict, subdomainsUnique
    )
    WorkplacesNumber = _find_workplaces(
        df_matrix, attribute_mapper, LivabilitySubdomainsWeights, subdomainsUnique
    )

    # Destination weights: subdomain areas plus the derived 'jobs' column.
    LivabilitySubdomainsInputs = pd.concat([LivabilitySubdomainsWeights, WorkplacesNumber], axis=1)

    subdomainsAccessibility = _compute_accessibility(df_matrix, LivabilitySubdomainsInputs, alpha, threshold)
    transportAccessibility = _compute_accessibility(df_art_matrix, None, alpha, threshold)
    AccessibilityInputs = pd.concat([subdomainsAccessibility, transportAccessibility], axis=1)

    if 'jobs' not in subdomainsAccessibility.columns:
        print("Error: Column 'jobs' does not exist in the subdomainsAccessibility.")

    # Only the weighted subdomain columns are eligible for scoring; the
    # transport column is carried through in the output tables.
    livability = _accessibility_to_livability(
        df_matrix,
        AccessibilityInputs,
        attribute_mapper,
        domainsUnique,
        subdomainsAccessibility.columns,
    )

    # Prepare the output
    output = {
        "subdomainsAccessibility_dictionary": AccessibilityInputs.to_dict('index'),
        "livability_dictionary": livability.to_dict('index'),
        "subdomainsWeights_dictionary": LivabilitySubdomainsInputs.to_dict('index'),
        "luDomainMapper": landuseMapperDict,
        "attributeMapper": attribute_mapper,
    }
    return json.dumps(output)
# Gradio front end: a single multi-line textbox carries the JSON request in,
# and the result of `test` is shown in a JSON viewer.
_json_input = gr.Textbox(
    label="Input JSON",
    lines=20,
    placeholder="Enter JSON with all parameters here...",
)
_json_output = gr.JSON(label="Output JSON")
iface = gr.Interface(fn=test, inputs=_json_input, outputs=_json_output, title="testspace")
iface.launch()