import gradio as gr import pandas as pd import numpy as np import json from io import StringIO from collections import OrderedDict from notion_client import Client as client_notion import os # Accessing the secret variable notionToken = os.getenv('notionToken') if notionToken is None: raise Exception("Secret token not found. Please check the environment variables.") else: print("Secret token found successfully!") from config import landuseDatabaseId , subdomainAttributesDatabaseId from imports_utils import fetch_all_database_pages from imports_utils import get_property_value from imports_utils import notion landuse_attributes = fetch_all_database_pages(notion, landuseDatabaseId) livability_attributes = fetch_all_database_pages(notion, subdomainAttributesDatabaseId) # fetch the dictionary with landuse - domain pairs landuseMapperDict ={} subdomains_unique = [] for page in landuse_attributes: value_landuse = get_property_value(page, "LANDUSE") value_subdomain = get_property_value(page, "SUBDOMAIN_LIVEABILITY") if value_subdomain and value_landuse: landuseMapperDict[value_landuse] = value_subdomain if value_subdomain != "": subdomains_unique.append(value_subdomain) #subdomains_unique = list(set(subdomains_unique)) # fetch the dictionary with subdomain attribute data attributeMapperDict ={} domains_unique = [] for page in livability_attributes: subdomain = get_property_value(page, "SUBDOMAIN_UNIQUE") sqm_per_employee = get_property_value(page, "SQM PER EMPL") thresholds = get_property_value(page, "MANHATTAN THRESHOLD") max_points = get_property_value(page, "LIVABILITY MAX POINT") domain = get_property_value(page, "DOMAIN") if thresholds: attributeMapperDict[subdomain] = { 'sqmPerEmpl': [sqm_per_employee if sqm_per_employee != "" else 0], 'thresholds': thresholds, 'max_points': max_points, 'domain': [domain if domain != "" else 0] } if domain != "": domains_unique.append(domain) #domains_unique = list(set(domains_unique)) def test(input_json): print("Received input") # Parse the input JSON string try: inputs = json.loads(input_json) except json.JSONDecodeError: inputs = json.loads(input_json.replace("'", '"')) # Accessing input data from Grasshopper matrix = inputs['input']["matrix"] landuses = inputs['input']["landuse_areas"] transport_matrix = inputs['input']["transportMatrix"] #attributeMapperDict = inputs['input']["attributeMapperDict"] #landuseMapperDict = inputs['input']["landuseMapperDict"] alpha = inputs['input']["alpha"] alpha = float(alpha) threshold = inputs['input']["threshold"] threshold = float(threshold) df_matrix = pd.DataFrame(matrix).T df_matrix = df_matrix.round(0).astype(int) df_landuses = pd.DataFrame(landuses).T df_landuses = df_landuses.round(0).astype(int) # List containing the substrings to check against tranportModes = ["DRT", "GMT", "HSR"] # Initialize a dictionary to hold the categorized sub-dictionaries result_dict = {mode: {} for mode in tranportModes} # Iterate over the original dictionary to split into sub-dictionaries based on substrings for key, value in transport_matrix.items(): for mode in tranportModes: if mode in key: # Check if the substring is in the dictionary key result_dict[substring][key] = value art = result_dict["DRT"] df_art_matrix = pd.DataFrame(art).T df_art_matrix = df_art_matrix.round(0).astype(int) if df_art_matrix.empty(): print("transport matrix is empty") # create a mask based on the matrix size and ids, crop activity nodes to the mask mask_connected = df_matrix.index.tolist() valid_indexes = [idx for idx in mask_connected if idx in df_landuses.index] # Identify and report missing indexes missing_indexes = set(mask_connected) - set(valid_indexes) if missing_indexes: print(f"Error: The following indexes were not found in the DataFrame: {missing_indexes}, length: {len(missing_indexes)}") # Apply the filtered mask df_landuses_filtered = df_landuses.loc[valid_indexes] # find a set of unique domains, to which subdomains are aggregated temp = [] for key, values in attributeMapperDict.items(): domain = attributeMapperDict[key]['domain'] for item in domain: if ',' in item: domain_list = item.split(',') attributeMapperDict[key]['domain'] = domain_list for domain in domain_list: temp.append(domain) else: if item != 0: temp.append(item) domainsUnique = list(set(temp)) # find a list of unique subdomains, to which land uses are aggregated temp = [] for key, values in landuseMapperDict.items(): subdomain = str(landuseMapperDict[key]) if subdomain != 0: temp.append(subdomain) subdomainsUnique = list(set(temp)) def landusesToSubdomains(DistanceMatrix, LanduseDf, LanduseToSubdomainDict, UniqueSubdomainsList): df_LivabilitySubdomainsArea = pd.DataFrame(0, index=DistanceMatrix.index, columns=UniqueSubdomainsList) for subdomain in UniqueSubdomainsList: for lu, lu_subdomain in LanduseToSubdomainDict.items(): if lu_subdomain == subdomain: if lu in LanduseDf.columns: df_LivabilitySubdomainsArea[subdomain] = df_LivabilitySubdomainsArea[subdomain].add(LanduseDf[lu], fill_value=0) else: print(f"Warning: Column '{lu}' not found in landuse database") return df_LivabilitySubdomainsArea LivabilitySubdomainsWeights = landusesToSubdomains(df_matrix,df_landuses_filtered,landuseMapperDict,subdomainsUnique) def FindWorkplaces (DistanceMatrix,SubdomainAttributeDict,destinationWeights,UniqueSubdomainsList ): df_LivabilitySubdomainsWorkplaces = pd.DataFrame(0, index=DistanceMatrix.index, columns=['jobs']) for subdomain in UniqueSubdomainsList: for key, value_list in SubdomainAttributeDict.items(): sqm_per_empl = float(SubdomainAttributeDict[subdomain]['sqmPerEmpl'][0]) if key in destinationWeights.columns and key == subdomain: if sqm_per_empl > 0: df_LivabilitySubdomainsWorkplaces['jobs'] += (round(destinationWeights[key] / sqm_per_empl,2)).fillna(0) else: df_LivabilitySubdomainsWorkplaces['jobs'] += 0 return df_LivabilitySubdomainsWorkplaces WorkplacesNumber = FindWorkplaces(df_matrix,attributeMapperDict,LivabilitySubdomainsWeights,subdomainsUnique) # prepare an input weights dataframe for the parameter LivabilitySubdomainsInputs LivabilitySubdomainsInputs =pd.concat([LivabilitySubdomainsWeights, WorkplacesNumber], axis=1) def computeAccessibility (DistanceMatrix, destinationWeights=None,alpha = 0.0038, threshold = 600): decay_factors = np.exp(-alpha * DistanceMatrix) * (DistanceMatrix <= threshold) # for weighted accessibility (e. g. areas) if destinationWeights is not None: #not destinationWeights.empty: subdomainsAccessibility = pd.DataFrame(index=DistanceMatrix.index, columns=destinationWeights.columns) for col in destinationWeights.columns: subdomainsAccessibility[col] = (decay_factors * destinationWeights[col].values).sum(axis=1) # for unweighted accessibility (e. g. points of interest) else: subdomainsAccessibility = pd.DataFrame(index=DistanceMatrix.index, columns=['ART']) for col in subdomainsAccessibility.columns: subdomainsAccessibility[col] = (decay_factors * 1).sum(axis=1) return subdomainsAccessibility subdomainsAccessibility = computeAccessibility(df_matrix,LivabilitySubdomainsInputs,alpha,threshold) transportAccessibility = computeAccessibility(df_art_matrix,None,alpha,threshold) AccessibilityInputs = pd.concat([subdomainsAccessibility, transportAccessibility], axis=1) def remap(value, B_min, B_max, C_min, C_max): return C_min + (((value - B_min) / (B_max - B_min))* (C_max - C_min)) if 'jobs' not in subdomainsAccessibility.columns: print("Error: Column 'jobs' does not exist in the subdomainsAccessibility.") def accessibilityToLivability (DistanceMatrix,AccessibilityInputs, SubdomainAttributeDict,UniqueDomainsList): livability = pd.DataFrame(index=DistanceMatrix.index, columns=AccessibilityInputs.columns) for domain in UniqueDomainsList: livability[domain] = 0 livability.fillna(0, inplace=True) templist = [] # remap accessibility to livability points for key, values in SubdomainAttributeDict.items(): threshold = float(SubdomainAttributeDict[key]['thresholds']) max_livability = float(SubdomainAttributeDict[key]['max_points']) domains = [str(item) for item in SubdomainAttributeDict[key]['domain']] if key in subdomainsAccessibility.columns and key != 'commercial': livability_score = remap(AccessibilityInputs[key], 0, threshold, 0, max_livability) livability.loc[AccessibilityInputs[key] >= threshold, key] = max_livability livability.loc[AccessibilityInputs[key] < threshold, key] = livability_score if any(domains): for domain in domains: if domain != 'Workplaces': livability.loc[AccessibilityInputs[key] >= threshold, domain] += max_livability livability.loc[AccessibilityInputs[key] < threshold, domain] += livability_score elif key == 'commercial': livability_score = remap(AccessibilityInputs['jobs'], 0, threshold, 0, max_livability) livability.loc[AccessibilityInputs['jobs'] >= threshold, domains[0]] = max_livability livability.loc[AccessibilityInputs['jobs'] < threshold, domains[0]] = livability_score return livability livability = accessibilityToLivability(df_matrix,AccessibilityInputs,attributeMapperDict,domainsUnique) livability_dictionary = livability.to_dict('index') LivabilitySubdomainsInputs_dictionary = LivabilitySubdomainsInputs.to_dict('index') subdomainsAccessibility_dictionary = AccessibilityInputs.to_dict('index') # Prepare the output output = { "subdomainsAccessibility_dictionary": subdomainsAccessibility_dictionary, "livability_dictionary": livability_dictionary, "subdomainsWeights_dictionary": LivabilitySubdomainsInputs_dictionary, "luDomainMapper": landuseMapperDict, "attributeMapper": attributeMapperDict } return json.dumps(output) # Define the Gradio interface with a single JSON input iface = gr.Interface( fn=test, inputs=gr.Textbox(label="Input JSON", lines=20, placeholder="Enter JSON with all parameters here..."), outputs=gr.JSON(label="Output JSON"), title="testspace" ) iface.launch()