nastasiasnk's picture
Update app.py
850974d verified
raw
history blame
7.25 kB
import gradio as gr
import pandas as pd
import numpy as np
import json
from io import StringIO
from collections import OrderedDict
def test(input_json):
print("Received input")
# Parse the input JSON string
try:
inputs = json.loads(input_json)
except json.JSONDecodeError:
inputs = json.loads(input_json.replace("'", '"'))
# Accessing input data
ids_index = inputs['input']['ids_matrix']
weightsNames = inputs['input']["weights_names"]
matrix = inputs['input']["matrix"]
weights = inputs['input']["weights"]
landuses = inputs['input']["landuse_areas"]
attributeMapperDict = inputs['input']["attributeMapperDict"]
landuseMapperDict = inputs['input']["landuseMapperDict"]
alpha = inputs['input']["alpha"]
alpha = float(alpha)
threshold = inputs['input']["threshold"]
threshold = float(threshold)
df_matrix = pd.DataFrame(matrix).T
df_weights = pd.DataFrame(weights).T
df_landuses = pd.DataFrame(landuses).T
df_matrix = df_matrix.round(0).astype(int)
df_weights = df_weights.round(0).astype(int)
df_landuses = df_landuses.round(0).astype(int)
"""
if len(A) == len(B):
B.index = A
else:
print("The lengths do not match.")
"""
# create a mask based on the matrix size and ids, crop activity nodes to the mask
mask_connected = df_matrix.index.tolist()
valid_indexes = [idx for idx in mask_connected if idx in df_landuses.index]
# Identify and report missing indexes
missing_indexes = set(mask_connected) - set(valid_indexes)
if missing_indexes:
print(f"Error: The following indexes were not found in the DataFrame: {missing_indexes}")
# Apply the filtered mask
df_landuses_filtered = df_landuses.loc[valid_indexes]
# find a set of unique domains, to which subdomains are aggregated
temp = []
for key, values in attributeMapperDict.items():
domain = attributeMapperDict[key]['domain']
for item in domain:
if ',' in item:
domain_list = item.split(',')
attributeMapperDict[key]['domain'] = domain_list
for domain in domain_list:
temp.append(domain)
else:
if item != 0:
temp.append(item)
domainsUnique = list(set(temp))
# find a list of unique subdomains, to which land uses are aggregated
temp = []
for key, values in landuseMapperDict.items():
subdomain = str(landuseMapperDict[key])
if subdomain != 0:
temp.append(subdomain)
subdomainsUnique = list(set(temp))
def landusesToSubdomains(DistanceMatrix, LanduseDf, LanduseToSubdomainDict, UniqueSubdomainsList):
df_LivabilitySubdomainsArea = pd.DataFrame(0, index=DistanceMatrix.index, columns=UniqueSubdomainsList)
for subdomain in UniqueSubdomainsList:
for lu, lu_subdomain in LanduseToSubdomainDict.items():
if lu_subdomain == subdomain:
if lu in LanduseDf.columns:
df_LivabilitySubdomainsArea[subdomain] = df_LivabilitySubdomainsArea[subdomain].add(LanduseDf[lu], fill_value=0)
else:
print(f"Warning: Column '{lu}' not found in landuse database")
return df_LivabilitySubdomainsArea
LivabilitySubdomainsWeights = landusesToSubdomains(df_matrix,df_landuses_filtered,landuseMapperDict,subdomainsUnique)
# make a dictionary to output in grasshopper / etc
LivabilitySubdomainsWeights_dictionary = LivabilitySubdomainsWeights.to_dict('index')
def computeAccessibility (DistanceMatrix,weightsNames, destinationWeights=None,alpha = 0.0038, threshold = 600):
decay_factors = np.exp(-alpha * DistanceMatrix) * (DistanceMatrix <= threshold)
subdomainsAccessibility = pd.DataFrame(index=DistanceMatrix.index, columns=weightsNames) #destinationWeights.columns)
# for weighted accessibility (e. g. areas)
if not destinationWeights.empty:
for col,columnName in zip(destinationWeights.columns, weightsNames):
subdomainsAccessibility[columnName] = (decay_factors * destinationWeights[col].values).sum(axis=1)
# for unweighted accessibility (e. g. points of interest)
else:
for columnName in weightsNames:
subdomainsAccessibility[columnName] = (decay_factors * 1).sum(axis=1)
return subdomainsAccessibility
subdomainsAccessibility = computeAccessibility(df_matrix,subdomainsUnique,LivabilitySubdomainsWeights,alpha,threshold)
# make a dictionary to output in grasshopper / etc
subdomainsAccessibility_dictionary = subdomainsAccessibility.to_dict('index')
def remap(value, B_min, B_max, C_min, C_max):
return C_min + (((value - B_min) / (B_max - B_min))* (C_max - C_min))
def accessibilityToLivability (DistanceMatrix,subdomainsAccessibility, SubdomainAttributeDict,UniqueDomainsList):
livability = pd.DataFrame(index=DistanceMatrix.index, columns=subdomainsAccessibility.columns)
livability.fillna(0, inplace=True)
for domain in UniqueDomainsList:
livability[domain] = 0
# remap accessibility to livability points
for key, values in SubdomainAttributeDict.items():
if key in subdomainsAccessibility.columns:
domain = [str(item) for item in SubdomainAttributeDict[key]['domain']]
threshold = float(SubdomainAttributeDict[key]['thresholds'])
max_livability = float(SubdomainAttributeDict[key]['max_points'])
sqm_per_employee = SubdomainAttributeDict[key]['sqmPerEmpl']
livability_score = remap(subdomainsAccessibility[key], 0, threshold, 0, max_livability)
livability.loc[subdomainsAccessibility[key] >= threshold, key] = max_livability
livability.loc[subdomainsAccessibility[key] < threshold, key] = livability_score
if any(domain):
for item in domain:
livability.loc[subdomainsAccessibility[key] >= threshold, item] += max_livability
livability.loc[subdomainsAccessibility[key] < threshold, item] += livability_score
return livability
livability = accessibilityToLivability(df_matrix,subdomainsAccessibility,attributeMapperDict,domainsUnique)
livability_dictionary = livability.to_dict('index')
# Prepare the output
output = {
"subdomainsAccessibility_dictionary": subdomainsAccessibility_dictionary,
"livability_dictionary": livability_dictionary,
"subdomainsArea_dictionary": LivabilitySubdomainsWeights_dictionary
}
return json.dumps(output)
# Define the Gradio interface with a single JSON input
iface = gr.Interface(
fn=test,
inputs=gr.Textbox(label="Input JSON", lines=20, placeholder="Enter JSON with all parameters here..."),
outputs=gr.JSON(label="Output JSON"),
title="testspace"
)
iface.launch()