import gradio as gr import pandas as pd import numpy as np import json from io import StringIO from collections import OrderedDict import os # ---------------------- Accessing data from Notion ---------------------- # from notion_client import Client as client_notion notionToken = os.getenv('notionToken') if notionToken is None: raise Exception("Notion token not found. Please check the environment variables.") else: print("Notion token found successfully!") from config import landuseDatabaseId , subdomainAttributesDatabaseId from imports_utils import fetch_all_database_pages from imports_utils import get_property_value from imports_utils import notion landuse_attributes = fetch_all_database_pages(notion, landuseDatabaseId) livability_attributes = fetch_all_database_pages(notion, subdomainAttributesDatabaseId) # fetch the dictionary with landuse - domain pairs landuseMapperDict ={} subdomains_unique = [] for page in landuse_attributes: value_landuse = get_property_value(page, "LANDUSE") value_subdomain = get_property_value(page, "SUBDOMAIN_LIVEABILITY") if value_subdomain and value_landuse: landuseMapperDict[value_landuse] = value_subdomain if value_subdomain != "": subdomains_unique.append(value_subdomain) # fetch the dictionary with subdomain attribute data attributeMapperDict ={} domains_unique = [] for page in livability_attributes: subdomain = get_property_value(page, "SUBDOMAIN_UNIQUE") sqm_per_employee = get_property_value(page, "SQM PER EMPL") thresholds = get_property_value(page, "MANHATTAN THRESHOLD") max_points = get_property_value(page, "LIVABILITY MAX POINT") domain = get_property_value(page, "DOMAIN") if thresholds: attributeMapperDict[subdomain] = { 'sqmPerEmpl': sqm_per_employee if sqm_per_employee != "" else 0, 'thresholds': thresholds, 'max_points': max_points, 'domain': [domain if domain != "" else 0] } if domain != "": domains_unique.append(domain) # ---------------------- Accessing data from Speckle ---------------------- # from specklepy.api.client import SpeckleClient from specklepy.api.credentials import get_default_account, get_local_accounts from specklepy.transports.server import ServerTransport from specklepy.api import operations from specklepy.objects.geometry import Polyline, Point from specklepy.objects import Base import imports_utils import speckle_utils import data_utils from config import landuseDatabaseId , streamId, dmBranchName, dmCommitId, luBranchName, luCommitId from imports_utils import speckleToken from imports_utils import fetchDistanceMatrices from config import distanceMatrixActivityNodes from config import distanceMatrixTransportStops CLIENT = SpeckleClient(host="https://speckle.xyz/") account = get_default_account() CLIENT.authenticate_with_token(token=speckleToken) streamDistanceMatrices = speckle_utils.getSpeckleStream(streamId,dmBranchName,CLIENT, dmCommitId) matrices = fetchDistanceMatrices (streamDistanceMatrices) streamLanduses = speckle_utils.getSpeckleStream(streamId,luBranchName,CLIENT, luCommitId) streamData = streamLanduses["@Data"]["@{0}"] df_speckle_lu = speckle_utils.get_dataframe(streamData, return_original_df=False) df_lu = df_speckle_lu.copy() # set index column df_lu = df_lu.set_index("ids", drop=False) df_dm = matrices[distanceMatrixActivityNodes] df_dm_transport = matrices[distanceMatrixTransportStops] dm_dictionary = df_dm.to_dict('index') df_dm_transport_dictionary = df_dm_transport.to_dict('index') # filter activity nodes attributes mask_connected = df_dm.index.tolist() lu_columns = [] for name in df_lu.columns: if name.startswith("lu+"): lu_columns.append(name) df_lu_filtered = df_lu[lu_columns].loc[mask_connected] df_lu_filtered.columns = [col.replace('lu+', '') for col in df_lu_filtered.columns] df_lu_filtered_dict = df_lu_filtered.to_dict('index') def test(input_json): print("Received input") # Parse the input JSON string try: inputs = json.loads(input_json) except json.JSONDecodeError: inputs = json.loads(input_json.replace("'", '"')) # ------------------------- Accessing input data from Grasshopper ------------------------- # #if df_dm is None: matrix = inputs['input']["matrix"] #if df_dm_transport is None: matrix_transport = inputs['input']["transportMatrix"] if df_lu_filtered is None: landuses = inputs['input']["landuse_areas"] else: landuses = df_lu_filtered attributeMapperDict_gh = inputs['input']["attributeMapperDict"] landuseMapperDict_gh = inputs['input']["landuseMapperDict"] alpha = inputs['input']["alpha"] alpha = float(alpha) threshold = inputs['input']["threshold"] threshold = float(threshold) df_matrix = pd.DataFrame(matrix).T df_matrix = df_matrix.round(0).astype(int) df_landuses = pd.DataFrame(landuses).T df_landuses = df_landuses.round(0).astype(int) from imports_utils import splitDictByStrFragmentInColumnName # List containing the substrings to check against tranportModes = ["DRT", "GMT", "HSR"] result_dicts = splitDictByStrFragmentInColumnName(df_dm_transport_dictionary, tranportModes) # Accessing each dictionary art_dict = result_dicts["DRT"] gmt_dict = result_dicts["GMT"] df_art_matrix = pd.DataFrame(art_dict).T df_art_matrix = df_art_matrix.round(0).astype(int) df_gmt_matrix = pd.DataFrame(gmt_dict).T df_gmt_matrix = df_art_matrix.round(0).astype(int) # create a mask based on the matrix size and ids, crop activity nodes to the mask mask_connected = df_dm.index.tolist() valid_indexes = [idx for idx in mask_connected if idx in df_landuses.index] # Identify and report missing indexes missing_indexes = set(mask_connected) - set(valid_indexes) if missing_indexes: print(f"Error: The following indexes were not found in the DataFrame: {missing_indexes}, length: {len(missing_indexes)}") # Apply the filtered mask df_landuses_filtered = df_landuses.loc[valid_indexes] # find a set of unique domains, to which subdomains are aggregated temp = [] for key, values in attributeMapperDict.items(): domain = attributeMapperDict[key]['domain'] for item in domain: if ',' in item: domain_list = item.split(',') attributeMapperDict[key]['domain'] = domain_list for domain in domain_list: temp.append(domain) else: if item != 0: temp.append(item) domainsUnique = list(set(temp)) # find a list of unique subdomains, to which land uses are aggregated temp = [] for key, values in landuseMapperDict.items(): subdomain = str(landuseMapperDict[key]) if subdomain != 0: temp.append(subdomain) subdomainsUnique = list(set(temp)) from imports_utils import landusesToSubdomains from imports_utils import FindWorkplacesNumber from imports_utils import computeAccessibility from imports_utils import computeAccessibility_pointOfInterest from imports_utils import remap from imports_utils import accessibilityToLivability LivabilitySubdomainsWeights = landusesToSubdomains(df_dm,df_landuses_filtered,landuseMapperDict,subdomainsUnique) WorkplacesNumber = FindWorkplacesNumber(df_dm,attributeMapperDict,LivabilitySubdomainsWeights,subdomainsUnique) # prepare an input weights dataframe for the parameter LivabilitySubdomainsInputs LivabilitySubdomainsInputs =pd.concat([LivabilitySubdomainsWeights, WorkplacesNumber], axis=1) subdomainsAccessibility = computeAccessibility(df_dm,LivabilitySubdomainsInputs,alpha,threshold) artAccessibility = computeAccessibility_pointOfInterest(df_art_matrix,'ART',alpha,threshold) gmtAccessibility = computeAccessibility_pointOfInterest(df_gmt_matrix,'GMT+HSR',alpha,threshold) AccessibilityInputs = pd.concat([subdomainsAccessibility, artAccessibility,gmtAccessibility], axis=1) if 'jobs' not in subdomainsAccessibility.columns: print("Error: Column 'jobs' does not exist in the subdomainsAccessibility.") livability = accessibilityToLivability(df_dm,AccessibilityInputs,attributeMapperDict,domainsUnique) livability_dictionary = livability.to_dict('index') LivabilitySubdomainsInputs_dictionary = LivabilitySubdomainsInputs.to_dict('index') subdomainsAccessibility_dictionary = AccessibilityInputs.to_dict('index') artmatrix = df_art_matrix.to_dict('index') # Prepare the output output = { "subdomainsAccessibility_dictionary": subdomainsAccessibility_dictionary, "livability_dictionary": livability_dictionary, "subdomainsWeights_dictionary": LivabilitySubdomainsInputs_dictionary, "luDomainMapper": landuseMapperDict, "attributeMapper": attributeMapperDict, "fetchDm": dm_dictionary, "landuses":df_lu_filtered_dict } return json.dumps(output) # Define the Gradio interface with a single JSON input iface = gr.Interface( fn=test, inputs=gr.Textbox(label="Input JSON", lines=20, placeholder="Enter JSON with all parameters here..."), outputs=gr.JSON(label="Output JSON"), title="testspace" ) iface.launch()