Spaces:
Sleeping
Sleeping
import sys | |
from specklepy.api.client import SpeckleClient | |
from specklepy.api.credentials import get_default_account, get_local_accounts | |
from specklepy.transports.server import ServerTransport | |
from specklepy.api import operations | |
from specklepy.objects.geometry import Polyline, Point | |
from specklepy.objects import Base | |
import numpy as np | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
import math | |
import matplotlib | |
import json | |
from notion_client import Client | |
import os | |
from config import landuseColumnName | |
from config import subdomainColumnName | |
from config import sqmPerEmployeeColumnName | |
from config import thresholdsColumnName | |
from config import maxPointsColumnName | |
from config import domainColumnName | |
from config import landuseDatabaseId , streamId, dmBranchName, dmCommitId, luBranchName, luCommitId | |
import speckle_utils | |
import data_utils | |
notionToken = os.getenv('notionToken') | |
#notion = Client(auth=notionToken) | |
speckleToken = os.getenv('speckleToken') | |
# ---------------------------------------------------------------------------------- | |
# query full database | |
def fetch_all_database_pages(client, database_id):
    """
    Collect every page of a Notion database by following the query cursor.

    :param client: Initialized Notion client.
    :param database_id: The ID of the Notion database to query.
    :return: A list with the ``results`` entries of every response, in order.
    """
    pages = []
    cursor = None  # None on the first request: start from the beginning
    more_available = True
    while more_available:
        batch = client.databases.query(
            database_id=database_id,
            start_cursor=cursor,
        )
        pages.extend(batch['results'])
        more_available = batch['has_more']
        if more_available:
            # Resume from where the previous response stopped.
            cursor = batch['next_cursor']
    return pages
def _rich_text_content(text_block):
    """Return the text of one rich-text item, falling back to ``plain_text`` for non-text items."""
    if 'text' in text_block:
        return text_block['text']['content']
    # Items such as mentions or equations carry no 'text' entry.
    return text_block.get('plain_text', '')


def get_property_value(page, property_name):
    """
    Extract the value of one property from a Notion page, based on its type.

    :param page: The Notion page data as retrieved from the API.
    :param property_name: The name of the property whose value is to be fetched.
    :return: Depending on the property type:
        ``title`` / ``rich_text`` -> concatenated string,
        ``number`` -> the stored number (or None),
        ``select`` -> option name (or None when nothing is selected),
        ``multi_select`` -> list of option names,
        ``date`` -> ``(start, end)`` tuple when an end is set, otherwise the
        start value, or None when the date is empty,
        ``relation`` -> list of related page ids,
        ``people`` -> list of names (entries without a name are skipped).
        None is returned for a missing property or an unsupported type.
    """
    properties = page['properties']
    if property_name not in properties:
        return None

    property_data = properties[property_name]
    prop_type = property_data['type']
    payload = property_data.get(prop_type)

    if prop_type in ('title', 'rich_text'):
        return ''.join(_rich_text_content(block) for block in payload or [])
    if prop_type == 'number':
        return payload
    if prop_type == 'select':
        return payload['name'] if payload else None
    if prop_type == 'multi_select':
        return [option['name'] for option in payload or []]
    if prop_type == 'date':
        # An empty date property is delivered as None, not as a dict.
        if not payload:
            return None
        if payload.get('end'):
            return (payload['start'], payload['end'])
        return payload['start']
    if prop_type == 'relation':
        return [relation['id'] for relation in payload or []]
    if prop_type == 'people':
        return [person['name'] for person in payload or [] if 'name' in person]
    # Unsupported property type.
    return None
def get_page_by_id(notion_db_pages, page_id):
    """Return the first page whose ``"id"`` equals *page_id*, or None when no page matches."""
    matching_pages = (
        candidate for candidate in notion_db_pages
        if candidate["id"] == page_id
    )
    return next(matching_pages, None)
""" | |
def fetchDomainMapper (luAttributePages): | |
lu_domain_mapper ={} | |
#subdomains_unique = [] | |
for page in luAttributePages: | |
value_landuse = get_property_value(page, landuseColumnName) | |
value_subdomain = get_property_value(page, subdomainColumnName) | |
origin = "false" if not get_property_value(page, "is_origin_mask") else get_property_value(page, "is_origin_mask") | |
if value_subdomain and value_landuse: | |
lu_domain_mapper[value_landuse] = { | |
'subdomain livability': value_subdomain, | |
'is origin': origin | |
} | |
#lu_domain_mapper[value_landuse] = value_subdomain | |
#if value_subdomain != "": | |
#subdomains_unique.append(value_subdomain) | |
#subdomains_unique = list(set(subdomains_unique)) | |
return lu_domain_mapper | |
def fetchSubdomainMapper (livabilityAttributePages): | |
attribute_mapper ={} | |
domains_unique = [] | |
for page in livabilityAttributePages: | |
subdomain = get_property_value(page, subdomainColumnName) | |
sqm_per_employee = get_property_value(page, sqmPerEmployeeColumnName) | |
thresholds = get_property_value(page, thresholdsColumnName) | |
max_points = get_property_value(page, maxPointsColumnName) | |
domain = get_property_value(page, domainColumnName) | |
if thresholds: | |
attribute_mapper[subdomain] = { | |
'sqmPerEmpl': sqm_per_employee if sqm_per_employee != "" else 0, | |
'thresholds': thresholds, | |
'max_points': max_points, | |
'domain': [domain if domain != "" else 0] | |
} | |
if domain != "": | |
domains_unique.append(domain) | |
#domains_unique = list(set(domains_unique)) | |
return attribute_mapper | |
""" | |
# --------------------------------------------------------------------------------------------- # | |
def _follow_path(root, path):
    """Index into *root* with each key of *path* in turn and return the final object."""
    node = root
    for key in path:
        node = node[key]
    return node


def _matrix_to_dataframe(matrix_data):
    """Rebuild one chunked matrix object as a DataFrame (origin UUIDs as index, destination UUIDs as columns)."""
    rows = [
        row["@row"]
        for chunk in matrix_data["@chunks"]
        for row in chunk["@rows"]
    ]
    return pd.DataFrame(
        np.array(rows),
        index=matrix_data["@originUUID"],
        columns=matrix_data["@destinationUUID"],
    )


def getDataFromSpeckle(
        speckleClient,
        streamID,
        matrixBranchName,
        landuseBranchName,
        matrixComitID="",
        landuseComitID="",
        pathToData=("@Data", "@{0}"),
        uuidColumn="uuid",
        landuseColumns="lu+"
        ):
    """
    Load the land-use table and the distance matrices from a Speckle stream.

    The arguments are now actually honoured; previously the function read the
    stream id, branch names and commit ids from the ``config`` globals and
    ignored what the caller passed.

    :param speckleClient: Speckle client handed on to ``speckle_utils.getSpeckleStream``.
    :param streamID: Id of the stream to read from.
    :param matrixBranchName: Branch holding the matrices; falsy skips loading them.
    :param landuseBranchName: Branch holding the land-use table; falsy skips loading it.
    :param matrixComitID: Commit of the matrix branch; when empty the configured
        ``dmCommitId`` is used (the value the function always used before).
    :param landuseComitID: Commit of the land-use branch; when empty the configured
        ``luCommitId`` is used.
    :param pathToData: Keys followed from the received object down to the data.
    :param uuidColumn: Land-use column used as DataFrame index (kept as a column too).
    :param landuseColumns: Either a name prefix (str) selecting the land-use
        columns, or an explicit iterable of column names.
    :return: ``(dfLanduses_filtered, matrices)``. The first item is None when no
        land-use branch was requested; the second is a dict mapping matrix
        name -> DataFrame (empty when no matrix branch was requested).
    """
    dfLanduses_filtered = None
    matrices = {}

    if landuseBranchName:
        streamLanduses = speckle_utils.getSpeckleStream(
            streamID, landuseBranchName, speckleClient, landuseComitID or luCommitId
        )
        streamData = _follow_path(streamLanduses, pathToData)
        dfLanduses = speckle_utils.get_dataframe(streamData, return_original_df=False)
        dfLanduses = dfLanduses.set_index(uuidColumn, drop=False)

        if isinstance(landuseColumns, str):
            # A string is treated as a column-name prefix.
            landuse_columns = [
                name for name in dfLanduses.columns
                if name.startswith(landuseColumns)
            ]
        else:
            # Anything else is taken as an explicit collection of column names.
            landuse_columns = list(landuseColumns)

        # Copy so that renaming the columns never touches dfLanduses itself.
        dfLanduses_filtered = dfLanduses[landuse_columns].copy()
        dfLanduses_filtered.columns = [
            col.replace('lu+', '') for col in dfLanduses_filtered.columns
        ]

    if matrixBranchName:
        streamObj = speckle_utils.getSpeckleStream(
            streamID, matrixBranchName, speckleClient, matrixComitID or dmCommitId
        )
        try:
            # Layout 1: the data path leads to a sequence of objects, each
            # carrying its matrix under an attribute whose name contains "matrix".
            for matrix in _follow_path(streamObj, pathToData):
                matrix_name = next(
                    (attr for attr in dir(matrix) if "matrix" in attr), None
                )
                if not matrix_name:
                    continue
                matrices[matrix_name] = _matrix_to_dataframe(
                    getattr(matrix, matrix_name)
                )
        except KeyError:
            # Layout 2: the matrices are attributes of the parent object itself.
            data_part = _follow_path(streamObj, pathToData[:-1]).__dict__
            print(data_part.keys())
            for name, value in data_part.items():
                if "matrix" in name:
                    matrices[name] = _matrix_to_dataframe(value)

    return dfLanduses_filtered, matrices
def getDataFromNotion(
        notion,
        notionToken,
        landuseDatabaseID,
        subdomainDatabaseID,
        landuseColumnName="LANDUSE",
        subdomainColumnName="SUBDOMAIN_LIVABILITY",
        sqmPerEmployeeColumnName="SQM PER EMPL",
        thresholdsColumnName="MANHATTAN THRESHOLD",
        maxPointsColumnName="LIVABILITY MAX POINT",
        domainColumnName="DOMAIN_LIVABILITY"
        ):
    """
    Build the land-use and livability mapper dictionaries from two Notion databases.

    :param notion: Initialized Notion client used for both queries.
    :param notionToken: Not used inside this function.
    :param landuseDatabaseID: Database whose pages describe land uses.
    :param subdomainDatabaseID: Database whose pages describe livability subdomains.
    :param landuseColumnName: Property holding the land-use name.
    :param subdomainColumnName: Property holding the subdomain name (read in both databases).
    :param sqmPerEmployeeColumnName: Property holding the area per employee.
    :param thresholdsColumnName: Property holding the threshold.
    :param maxPointsColumnName: Property holding the maximum livability points.
    :param domainColumnName: Property holding the domain.
    :return: ``(landuseMapperDict, livabilityMapperDict)``.
    """
    landuse_pages = fetch_all_database_pages(notion, landuseDatabaseID)
    livability_pages = fetch_all_database_pages(notion, subdomainDatabaseID)

    # land use -> its subdomain and origin flag
    landuseMapperDict = {}
    for landuse_page in landuse_pages:
        landuse_name = get_property_value(landuse_page, landuseColumnName)
        subdomain_name = get_property_value(landuse_page, subdomainColumnName)
        origin_flag = get_property_value(landuse_page, "is_origin_mask")
        if not origin_flag:
            origin_flag = "false"
        # Pages lacking either name are left out of the mapper.
        if not (subdomain_name and landuse_name):
            continue
        landuseMapperDict[landuse_name] = {
            'subdomain livability': subdomain_name,
            'is origin': origin_flag
        }

    # subdomain -> scoring attributes
    livabilityMapperDict = {}
    for livability_page in livability_pages:
        subdomain_name = get_property_value(livability_page, subdomainColumnName)
        area_per_employee = get_property_value(livability_page, sqmPerEmployeeColumnName)
        threshold_value = get_property_value(livability_page, thresholdsColumnName)
        points_cap = get_property_value(livability_page, maxPointsColumnName)
        domain_name = get_property_value(livability_page, domainColumnName)
        # Only subdomains with a threshold are kept.
        if not threshold_value:
            continue
        if area_per_employee == "":
            area_per_employee = 0
        if domain_name == "":
            domain_name = 0
        livabilityMapperDict[subdomain_name] = {
            'sqmPerEmpl': area_per_employee,
            'thresholds': threshold_value,
            'max_points': points_cap,
            'domain': [domain_name]
        }

    return landuseMapperDict, livabilityMapperDict
def _grasshopper_default(name, fallback):
    """Return the module-level constant *name* when this module defines it, otherwise *fallback*."""
    return globals().get(name, fallback)


def _grasshopper_table_to_int_frame(table, inf_replacement):
    """Turn a nested JSON table into a transposed, integer-valued DataFrame."""
    frame = pd.DataFrame(table).T
    frame = frame.apply(pd.to_numeric, errors='coerce')
    # Infinite cells become *inf_replacement*; unparsable or missing cells become 0.
    frame = frame.replace([np.inf, -np.inf], inf_replacement).fillna(0)
    return frame.round(0).astype(int)


def _grasshopper_float_setting(inputs, input_name, default):
    """Read one numeric setting; a None name, a missing key or a null value yields *default*."""
    if input_name is None:
        return default
    raw_value = inputs.get(input_name)
    if raw_value is None:
        return default
    return float(raw_value)


def getDataFromGrasshopper(
        inputJson,
        inputNameMatrix,
        inputNameLanduse,
        inputNameAttributeMapper,
        inputNameLanduseMapper,
        inputNameAlpha="alpha",
        inputNameThreshold="threshold"
        ):
    """
    Unpack the analysis inputs from a Grasshopper JSON payload.

    Everything is read from ``inputJson['input']``. Passing None as an input
    name skips that input (None is returned for it; alpha and threshold fall
    back to their defaults instead).

    :param inputJson: Payload with an ``'input'`` mapping.
    :param inputNameMatrix: Key of the distance matrix table.
    :param inputNameLanduse: Key of the land-use table.
    :param inputNameAttributeMapper: Key of the attribute mapper dictionary.
    :param inputNameLanduseMapper: Key of the land-use mapper dictionary.
    :param inputNameAlpha: Key of the alpha value.
    :param inputNameThreshold: Key of the threshold value.
    :return: ``(dfMatrix_gh, dfLanduses_gh, attributeMapperDict_gh,
        landuseMapperDict_gh, alpha, threshold)``.
    """
    inputs = inputJson['input']

    # Defaults: a module-level alphaDefault / thresholdDefault wins when one is
    # defined; otherwise use the default values of computeAccessibility.
    alpha_default = _grasshopper_default('alphaDefault', 0.0038)
    threshold_default = _grasshopper_default('thresholdDefault', 600)

    dfMatrix_gh = None
    if inputNameMatrix is not None:
        dfMatrix_gh = _grasshopper_table_to_int_frame(inputs[inputNameMatrix], 10000)

    dfLanduses_gh = None
    if inputNameLanduse is not None:
        dfLanduses_gh = _grasshopper_table_to_int_frame(inputs[inputNameLanduse], 0)
        if dfMatrix_gh is not None:
            # Keep only the land-use rows that also appear in the matrix.
            mask_connected = dfMatrix_gh.index.tolist()
            valid_indexes = [idx for idx in mask_connected if idx in dfLanduses_gh.index]
            missing_indexes = set(mask_connected) - set(valid_indexes)
            if missing_indexes:
                print(f"Error: The following indexes were not found in the DataFrame: {missing_indexes}, length: {len(missing_indexes)}")
            dfLanduses_gh = dfLanduses_gh.loc[valid_indexes]

    attributeMapperDict_gh = None
    if inputNameAttributeMapper is not None:
        attributeMapperDict_gh = inputs[inputNameAttributeMapper]

    landuseMapperDict_gh = None
    if inputNameLanduseMapper is not None:
        landuseMapperDict_gh = inputs[inputNameLanduseMapper]

    # The None check has to come before float(): float(None) raises TypeError.
    alpha = _grasshopper_float_setting(inputs, inputNameAlpha, alpha_default)
    threshold = _grasshopper_float_setting(inputs, inputNameThreshold, threshold_default)

    return dfMatrix_gh, dfLanduses_gh, attributeMapperDict_gh, landuseMapperDict_gh, alpha, threshold
def splitDictByStrFragmentInColumnName(original_dict, substrings):
    """
    Split a dict of dicts into one dict of dicts per substring.

    For every substring, the result holds the outer keys whose inner dict has
    at least one inner key containing that substring, restricted to those
    inner keys. Substrings that match nothing map to an empty dict.

    :param original_dict: Mapping of outer key -> {inner key: value}.
    :param substrings: Fragments looked for inside the inner keys.
    :return: ``{substring: {outer key: {matching inner key: value}}}``.
    """
    buckets = {fragment: {} for fragment in substrings}
    for outer_key, inner in original_dict.items():
        for column, cell in inner.items():
            for fragment in substrings:
                if fragment not in column:
                    continue
                buckets[fragment].setdefault(outer_key, {})[column] = cell
    return buckets
def landusesToSubdomains(DistanceMatrix, LanduseDf, LanduseToSubdomainDict, UniqueSubdomainsList):
    """
    Sum land-use columns into one column per livability subdomain.

    :param DistanceMatrix: Only its index is used, as index of the result.
    :param LanduseDf: DataFrame with one column per land use.
    :param LanduseToSubdomainDict: ``{land use: {"subdomain livability": subdomain, ...}}``.
    :param UniqueSubdomainsList: Subdomains that become the result columns.
    :return: DataFrame (rows = DistanceMatrix index, columns = subdomains), starting at 0.
    """
    subdomain_areas = pd.DataFrame(0, index=DistanceMatrix.index, columns=UniqueSubdomainsList)
    for subdomain in UniqueSubdomainsList:
        for lu, attributes in LanduseToSubdomainDict.items():
            if attributes["subdomain livability"] != subdomain:
                continue
            if lu not in LanduseDf.columns:
                print(f"Warning: Column '{lu}' not found in landuse database")
                continue
            landuse_values = LanduseDf[lu]
            # Columns consisting only of missing values contribute nothing.
            if landuse_values.notna().any():
                subdomain_areas[subdomain] = subdomain_areas[subdomain].add(landuse_values, fill_value=0)
    return subdomain_areas
def FindWorkplacesNumber(DistanceMatrix, livabilityMapperDict, destinationWeights, UniqueSubdomainsList):
    """
    Estimate the number of jobs per row from subdomain areas and area-per-employee values.

    For every subdomain that has a positive ``'sqmPerEmpl'`` entry in
    *livabilityMapperDict* and a column in *destinationWeights*, the column is
    divided by that value, rounded to two decimals and added to ``'jobs'``.
    Subdomains without a mapper entry, without a usable ``'sqmPerEmpl'`` or
    without a weights column contribute nothing; previously such a subdomain
    could abort the whole computation with a KeyError or TypeError.

    :param DistanceMatrix: Only its index is used, as index of the result.
    :param livabilityMapperDict: ``{subdomain: {'sqmPerEmpl': ..., ...}}``.
    :param destinationWeights: DataFrame with one column per subdomain.
    :param UniqueSubdomainsList: Subdomains to consider.
    :return: DataFrame with the single column ``'jobs'``.
    """
    df_LivabilitySubdomainsWorkplaces = pd.DataFrame(0, index=DistanceMatrix.index, columns=['jobs'])
    for subdomain in UniqueSubdomainsList:
        # Look the subdomain up directly instead of scanning the whole mapper.
        attributes = livabilityMapperDict.get(subdomain) if subdomain else None
        if not attributes or not attributes.get('sqmPerEmpl'):
            continue
        if subdomain not in destinationWeights.columns:
            continue
        sqm_per_empl = float(attributes['sqmPerEmpl'])
        if sqm_per_empl > 0:
            jobs = (destinationWeights[subdomain] / sqm_per_empl).round(2).fillna(0)
            df_LivabilitySubdomainsWorkplaces['jobs'] += jobs
    return df_LivabilitySubdomainsWorkplaces
def computeAccessibility(DistanceMatrix, destinationWeights=None, alpha=0.0038, threshold=600):
    """
    Compute distance-decayed, weighted accessibility for every row of the matrix.

    Each distance ``d`` is turned into ``exp(-alpha * d)`` and set to 0 where
    ``d > threshold``. For every column of *destinationWeights* the decay
    factors are multiplied by that column's values (matched to the matrix
    columns by position) and summed per row.

    :param DistanceMatrix: DataFrame of distances.
    :param destinationWeights: DataFrame with one weight column per output column.
    :param alpha: Decay coefficient.
    :param threshold: Distances above this value do not contribute.
    :return: DataFrame indexed like *DistanceMatrix* with the columns of
        *destinationWeights*. When *destinationWeights* is None a message is
        printed and a DataFrame without columns is returned (previously this
        path failed with an unbound local variable).
    """
    if destinationWeights is None:
        print("Destination weights parameter is None")
        return pd.DataFrame(index=DistanceMatrix.index)

    decay_factors = np.exp(-alpha * DistanceMatrix) * (DistanceMatrix <= threshold)
    subdomainsAccessibility = pd.DataFrame(index=DistanceMatrix.index, columns=destinationWeights.columns)
    for col in destinationWeights.columns:
        # .values drops the weights' index so they broadcast across the matrix columns.
        subdomainsAccessibility[col] = (decay_factors * destinationWeights[col].values).sum(axis=1)
    return subdomainsAccessibility
def computeAccessibility_pointOfInterest(DistanceMatrix, columnName, alpha=0.0038, threshold=600):
    """
    Compute unweighted, distance-decayed accessibility for every row of the matrix.

    :param DistanceMatrix: DataFrame of distances.
    :param columnName: Name of the single result column.
    :param alpha: Decay coefficient used in ``exp(-alpha * distance)``.
    :param threshold: Distances above this value do not contribute.
    :return: One-column DataFrame indexed like *DistanceMatrix*.
    """
    within_threshold = DistanceMatrix <= threshold
    decay = np.exp(-alpha * DistanceMatrix) * within_threshold
    accessibility = pd.DataFrame(index=DistanceMatrix.index, columns=[columnName])
    # Every destination counts with weight 1.
    accessibility[columnName] = (decay * 1).sum(axis=1)
    return accessibility
def remap(value, B_min, B_max, C_min, C_max):
    """Linearly map *value* from the range [B_min, B_max] onto [C_min, C_max], without clamping."""
    source_span = B_max - B_min
    fraction = (value - B_min) / source_span
    return C_min + (fraction * (C_max - C_min))
def accessibilityToLivability (DistanceMatrix,accessibilityInputs, SubdomainAttributeDict,UniqueDomainsList):
    """
    Convert accessibility values into livability points per subdomain and domain.

    :param DistanceMatrix: Only its index is used, as index of the result.
    :param accessibilityInputs: DataFrame with one accessibility column per
        subdomain; a 'jobs' column is read when the mapper contains 'commercial'.
    :param SubdomainAttributeDict: ``{subdomain: {'thresholds': ..., 'max_points': ...,
        'domain': [...]}}``.
    :param UniqueDomainsList: Domain names; each gets its own column, starting at 0.
    :return: DataFrame with the columns of *accessibilityInputs* plus one column
        per domain.
    """
    livability = pd.DataFrame(index=DistanceMatrix.index, columns=accessibilityInputs.columns)
    for domain in UniqueDomainsList:
        livability[domain] = 0
    # The subdomain columns were created empty; start every cell at 0.
    livability.fillna(0, inplace=True)
    templist = []
    # remap accessibility to livability points
    for key, values in SubdomainAttributeDict.items():
        threshold = float(SubdomainAttributeDict[key]['thresholds'])
        max_livability = float(SubdomainAttributeDict[key]['max_points'])
        domains = [str(item) for item in SubdomainAttributeDict[key]['domain']]
        if key in accessibilityInputs.columns and key != 'commercial':
            # Rows at or above the threshold get the full points; rows below get
            # the linearly remapped share of them.
            livability_score = remap(accessibilityInputs[key], 0, threshold, 0, max_livability)
            livability.loc[accessibilityInputs[key] >= threshold, key] = max_livability
            livability.loc[accessibilityInputs[key] < threshold, key] = livability_score
            if any(domains):
                for domain in domains:
                    # The same points are accumulated into every domain of the
                    # subdomain, except 'Workplaces'.
                    if domain != 'Workplaces':
                        livability.loc[accessibilityInputs[key] >= threshold, domain] += max_livability
                        livability.loc[accessibilityInputs[key] < threshold, domain] += livability_score
        elif key == 'commercial':
            # 'commercial' is scored from the 'jobs' column and assigned (not
            # added) to the first domain listed for it.
            livability_score = remap(accessibilityInputs['jobs'], 0, threshold, 0, max_livability)
            livability.loc[accessibilityInputs['jobs'] >= threshold, domains[0]] = max_livability
            livability.loc[accessibilityInputs['jobs'] < threshold, domains[0]] = livability_score
    return livability
def findUniqueDomains(livabilityMapperDict):
    """
    Collect the unique domain names to which subdomains are aggregated.

    A domain entry containing commas is split into its parts, and the split
    list replaces the ``'domain'`` value of that subdomain in
    *livabilityMapperDict* (the mapper is modified in place). Non-string
    entries such as the ``0`` placeholder or None are skipped; previously they
    raised a TypeError on the comma test.

    :param livabilityMapperDict: ``{subdomain: {'domain': [...], ...}}``.
    :return: List of unique domain names in order of first appearance.
    """
    collected = []
    for attributes in livabilityMapperDict.values():
        for item in attributes['domain']:
            if not isinstance(item, str):
                # Placeholder for "no domain".
                continue
            if ',' in item:
                domain_list = item.split(',')
                attributes['domain'] = domain_list
                collected.extend(domain_list)
            else:
                collected.append(item)
    # dict.fromkeys removes duplicates while keeping a deterministic order.
    return list(dict.fromkeys(collected))
def findUniqueSubdomains(landuseMapperDict):
    """
    Collect the unique subdomain names to which land uses are aggregated.

    Entries whose ``"subdomain livability"`` is None, 0 or an empty string are
    skipped. Previously the value was converted to a string before being
    compared with 0, so the filter never applied and ``'None'`` / ``'0'``
    ended up in the result.

    :param landuseMapperDict: ``{land use: {"subdomain livability": subdomain, ...}}``.
    :return: List of unique subdomain names (as strings) in order of first appearance.
    """
    collected = []
    for attributes in landuseMapperDict.values():
        raw_subdomain = attributes["subdomain livability"]
        # Test the raw value: after str() a missing value is no longer recognisable.
        if raw_subdomain is None or raw_subdomain == 0 or raw_subdomain == "":
            continue
        collected.append(str(raw_subdomain))
    # dict.fromkeys removes duplicates while keeping a deterministic order.
    return list(dict.fromkeys(collected))