# Spaces: Sleeping
import gradio as gr | |
import pandas as pd | |
import numpy as np | |
import json | |
from io import StringIO | |
from collections import OrderedDict | |
def _isPlaceholder(value):
    """Return True when *value* is a "not mapped" marker (None, 0, "0" or "")."""
    if value is None:
        return True
    if isinstance(value, str):
        return value.strip() in ("", "0")
    return value == 0


def _domainNames(rawDomain):
    """Flatten a mapper ``domain`` entry into a list of domain names.

    Accepts a list/tuple of items or a single item.  Every item may itself
    hold several comma-separated names.  Placeholder items (see
    ``_isPlaceholder``) are dropped.  Duplicates are kept on purpose: a
    domain listed twice receives the points twice, as before.
    """
    items = rawDomain if isinstance(rawDomain, (list, tuple)) else [rawDomain]
    names = []
    for item in items:
        # Test the raw item first: ``',' in 0`` would raise TypeError.
        if _isPlaceholder(item):
            continue
        names.extend(
            part for part in str(item).split(',') if not _isPlaceholder(part)
        )
    return names


def landusesToSubdomains(DistanceMatrix, LanduseDf, LanduseToSubdomainDict, UniqueSubdomainsList):
    """Sum land-use area columns into one column per sub-domain.

    Args:
        DistanceMatrix: DataFrame whose index defines the rows of the result.
        LanduseDf: DataFrame with one column per land use.
        LanduseToSubdomainDict: mapping of land-use name -> sub-domain.
        UniqueSubdomainsList: sub-domain names; the columns of the result.

    Returns:
        DataFrame indexed like ``DistanceMatrix`` with one column per
        sub-domain.  Rows missing from ``LanduseDf`` contribute 0.
    """
    df_LivabilitySubdomainsArea = pd.DataFrame(
        0, index=DistanceMatrix.index, columns=UniqueSubdomainsList
    )
    knownSubdomains = set(UniqueSubdomainsList)
    for lu, lu_subdomain in LanduseToSubdomainDict.items():
        # Column labels are strings, so compare the string form of the
        # mapper value; the raw value may be a number.
        subdomain = str(lu_subdomain)
        if subdomain not in knownSubdomains:
            continue  # unmapped land use (placeholder sub-domain)
        if lu in LanduseDf.columns:
            df_LivabilitySubdomainsArea[subdomain] = (
                df_LivabilitySubdomainsArea[subdomain].add(LanduseDf[lu], fill_value=0)
            )
        else:
            print(f"Warning: Column '{lu}' not found in landuse database")
    return df_LivabilitySubdomainsArea


def computeAccessibility(DistanceMatrix, weightsNames, destinationWeights=None, alpha=0.0038, threshold=600):
    """Compute distance-decayed accessibility for every row of the matrix.

    Each distance ``d`` contributes ``exp(-alpha * d)`` when
    ``d <= threshold`` and 0 otherwise.

    Args:
        DistanceMatrix: DataFrame of distances (rows: origins).
        weightsNames: names of the result columns.
        destinationWeights: optional DataFrame of per-destination weights.
            Its columns are paired with ``weightsNames`` by position, and
            its values are applied to the matrix columns by position.
            NOTE(review): this assumes the rows of ``destinationWeights``
            are ordered like the columns of ``DistanceMatrix`` -- confirm.
            When None or empty, every destination counts as 1.
        alpha: decay coefficient.
        threshold: maximum distance that still contributes.

    Returns:
        DataFrame indexed like ``DistanceMatrix`` with ``weightsNames`` as
        columns.
    """
    decay_factors = np.exp(-alpha * DistanceMatrix) * (DistanceMatrix <= threshold)
    subdomainsAccessibility = pd.DataFrame(index=DistanceMatrix.index, columns=weightsNames)
    if destinationWeights is not None and not destinationWeights.empty:
        # weighted accessibility (e. g. areas)
        for col, columnName in zip(destinationWeights.columns, weightsNames):
            subdomainsAccessibility[columnName] = (
                decay_factors * destinationWeights[col].values
            ).sum(axis=1)
    else:
        # unweighted accessibility (e. g. points of interest)
        unweighted = decay_factors.sum(axis=1)
        for columnName in weightsNames:
            subdomainsAccessibility[columnName] = unweighted
    return subdomainsAccessibility


def remap(value, B_min, B_max, C_min, C_max):
    """Linearly map *value* from the range [B_min, B_max] to [C_min, C_max].

    No clamping is applied.  ``B_max == B_min`` divides by zero.
    """
    return C_min + (((value - B_min) / (B_max - B_min)) * (C_max - C_min))


def accessibilityToLivability(DistanceMatrix, subdomainsAccessibility, SubdomainAttributeDict, UniqueDomainsList):
    """Convert accessibility values into livability points.

    For every sub-domain present in both ``SubdomainAttributeDict`` and
    ``subdomainsAccessibility``, accessibility is remapped linearly from
    [0, thresholds] to [0, max_points] and capped at ``max_points`` once the
    threshold is reached.  Rows with missing accessibility score 0.  The
    points are also added to each domain listed for the sub-domain.

    Args:
        DistanceMatrix: DataFrame whose index defines the rows of the result.
        subdomainsAccessibility: DataFrame with one column per sub-domain.
        SubdomainAttributeDict: mapping of sub-domain -> dict with the keys
            ``domain``, ``thresholds`` and ``max_points``.
        UniqueDomainsList: domain names; added as extra columns.

    Returns:
        Float DataFrame with the sub-domain columns followed by the domain
        columns.
    """
    columns = list(dict.fromkeys([*subdomainsAccessibility.columns, *UniqueDomainsList]))
    # Start from float zeros so that writing points never has to change the
    # dtype of a column.
    livability = pd.DataFrame(0.0, index=DistanceMatrix.index, columns=columns)

    for key, attributes in SubdomainAttributeDict.items():
        if key not in subdomainsAccessibility.columns:
            continue
        threshold = float(attributes['thresholds'])
        max_livability = float(attributes['max_points'])
        accessibility = subdomainsAccessibility[key]

        remapped = remap(accessibility, 0, threshold, 0, max_livability)
        # Below the threshold: remapped points; missing values: 0.
        score = remapped.where(accessibility < threshold, 0.0)
        # At or above the threshold: full points.
        score = score.mask(accessibility >= threshold, max_livability)

        livability[key] = score
        for domain in _domainNames(attributes['domain']):
            livability[domain] += score
    return livability


def test(input_json):
    """Compute sub-domain areas, accessibility and livability from a JSON request.

    Args:
        input_json: JSON text whose top-level ``input`` object provides
            ``matrix`` (distances), ``landuse_areas``,
            ``attributeMapperDict``, ``landuseMapperDict``, ``alpha`` and
            ``threshold``.  Other keys are ignored.  If the text is not
            valid JSON, it is parsed again with single quotes replaced by
            double quotes.

    Returns:
        JSON string with the keys ``subdomainsAccessibility_dictionary``,
        ``livability_dictionary`` and ``subdomainsArea_dictionary``, each a
        mapping of row id -> {column: value}.

    Raises:
        json.JSONDecodeError: if the text cannot be parsed either way.
        KeyError: if a required key is missing.
    """
    print("Received input")

    # Parse the input JSON string
    try:
        inputs = json.loads(input_json)
    except json.JSONDecodeError:
        # Best-effort fallback for payloads written with single quotes.
        inputs = json.loads(input_json.replace("'", '"'))

    # Accessing input data
    payload = inputs['input']
    matrix = payload["matrix"]
    landuses = payload["landuse_areas"]
    attributeMapperDict = payload["attributeMapperDict"]
    landuseMapperDict = payload["landuseMapperDict"]
    alpha = float(payload["alpha"])
    threshold = float(payload["threshold"])

    df_matrix = pd.DataFrame(matrix).T.round(0).astype(int)
    # A land use that is absent for a row counts as zero area.
    df_landuses = pd.DataFrame(landuses).T.fillna(0).round(0).astype(int)

    # create a mask based on the matrix size and ids, crop activity nodes to the mask
    mask_connected = df_matrix.index.tolist()
    valid_indexes = [idx for idx in mask_connected if idx in df_landuses.index]

    # Identify and report missing indexes
    missing_indexes = set(mask_connected) - set(valid_indexes)
    if missing_indexes:
        print(f"Error: The following indexes were not found in the DataFrame: {missing_indexes}")

    # Apply the filtered mask
    df_landuses_filtered = df_landuses.loc[valid_indexes]

    # find the unique domains, to which subdomains are aggregated
    allDomains = []
    for attributes in attributeMapperDict.values():
        attributes['domain'] = _domainNames(attributes['domain'])
        allDomains.extend(attributes['domain'])
    # dict.fromkeys removes duplicates while keeping a stable order.
    domainsUnique = list(dict.fromkeys(allDomains))

    # find the unique subdomains, to which land uses are aggregated
    subdomainsUnique = list(dict.fromkeys(
        str(subdomain)
        for subdomain in landuseMapperDict.values()
        if not _isPlaceholder(subdomain)
    ))

    LivabilitySubdomainsWeights = landusesToSubdomains(
        df_matrix, df_landuses_filtered, landuseMapperDict, subdomainsUnique
    )
    subdomainsAccessibility = computeAccessibility(
        df_matrix, subdomainsUnique, LivabilitySubdomainsWeights, alpha, threshold
    )
    livability = accessibilityToLivability(
        df_matrix, subdomainsAccessibility, attributeMapperDict, domainsUnique
    )

    # Prepare the output: dictionaries to consume in grasshopper / etc
    output = {
        "subdomainsAccessibility_dictionary": subdomainsAccessibility.to_dict('index'),
        "livability_dictionary": livability.to_dict('index'),
        "subdomainsArea_dictionary": LivabilitySubdomainsWeights.to_dict('index'),
    }
    return json.dumps(output)


# The name starts with "test", so pytest would otherwise try to collect this
# entry point as a test case.
test.__test__ = False
# Gradio front end: a multi-line text box receives the JSON request and a
# JSON viewer shows what `test` returns.
_request_box = gr.Textbox(
    label="Input JSON",
    lines=20,
    placeholder="Enter JSON with all parameters here...",
)
_response_view = gr.JSON(label="Output JSON")

iface = gr.Interface(
    fn=test,
    inputs=_request_box,
    outputs=_response_view,
    title="testspace",
)
iface.launch()