Spaces:
Sleeping
Sleeping
import sys | |
# import other libraries
from specklepy.api.client import SpeckleClient | |
from specklepy.api.credentials import get_default_account, get_local_accounts | |
from specklepy.transports.server import ServerTransport | |
from specklepy.api import operations | |
from specklepy.objects.geometry import Polyline, Point | |
from specklepy.objects import Base | |
import numpy as np | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
#import seaborn as sns | |
import math | |
import matplotlib | |
#from google.colab import files | |
import json | |
from notion_client import Client | |
import os | |
# Credentials are read from environment variables so no secret lives in the source.
notion_token = os.getenv('notionToken')
# Notion API client authenticated with the token read above.
notion = Client(auth=notion_token)
# ----------------------------------------------------------------------------------
speckleToken = os.getenv('speckleToken')
# Stop right away when the Speckle token is missing.
if speckleToken is None:
    raise Exception("Speckle token not found")
print("Speckle token found successfully!")
# Speckle client pointed at https://speckle.xyz/ and authenticated with the token.
CLIENT = SpeckleClient(host="https://speckle.xyz/")
account = get_default_account()
CLIENT.authenticate(token=speckleToken)
# query full database | |
def fetch_all_database_pages(client, database_id):
    """
    Fetches all pages from a specified Notion database.

    The database is queried repeatedly; the cursor returned with each response
    is passed to the next request until the response reports no more data.

    :param client: Initialized Notion client.
    :param database_id: The ID of the Notion database to query.
    :return: A list containing all pages from the database.
    """
    all_pages = []
    start_cursor = None
    while True:
        query = {"database_id": database_id}
        # Only send a cursor once we actually have one: the API documents
        # ``start_cursor`` as an optional string, so it is left out of the
        # first request instead of being sent as None.
        if start_cursor is not None:
            query["start_cursor"] = start_cursor
        response = client.databases.query(**query)
        all_pages.extend(response['results'])
        # Check if there's more data to fetch
        if not response['has_more']:
            break
        start_cursor = response.get('next_cursor')
        if start_cursor is None:
            # "has_more" without a cursor would re-request the first page
            # forever, so stop instead.
            break
    return all_pages
def _rich_text_to_str(rich_text_items):
    """Concatenate the textual content of a list of Notion rich-text objects."""
    parts = []
    for item in rich_text_items or []:
        text = item.get('text')
        if text is not None:
            parts.append(text.get('content') or '')
        else:
            # Items that are not of type "text" (e.g. mentions, equations)
            # have no 'text' entry; fall back to their plain-text rendering.
            parts.append(item.get('plain_text') or '')
    return ''.join(parts)


def get_property_value(page, property_name):
    """
    Extracts the value from a specific property in a Notion page based on its type.

    :param page: The Notion page data as retrieved from the API.
    :param property_name: The name of the property whose value is to be fetched.
    :return: Depends on the property type:
        title / rich_text -> concatenated string,
        number -> the number (or None),
        select -> option name, or None when nothing is selected,
        multi_select -> list of option names,
        date -> start string, a (start, end) tuple when an end is set,
                or None when the date is empty,
        relation -> list of related page ids,
        people -> list of names (entries without a name are skipped).
        None is returned for a missing property or an unsupported type.
    """
    # Check if the property exists in the page
    if property_name not in page['properties']:
        return None
    property_data = page['properties'][property_name]
    prop_type = property_data['type']
    value = property_data.get(prop_type)

    if prop_type in ('title', 'rich_text'):
        return _rich_text_to_str(value)
    if prop_type == 'number':
        return value
    if prop_type == 'select':
        return value['name'] if value else None
    if prop_type == 'multi_select':
        return [option['name'] for option in value]
    if prop_type == 'date':
        # An empty date property carries no value object at all.
        if not value:
            return None
        if value.get('end'):
            return (value['start'], value['end'])
        return value['start']
    if prop_type == 'relation':
        return [relation['id'] for relation in value]
    if prop_type == 'people':
        return [person['name'] for person in value if 'name' in person]
    # Unsupported property types yield None; add more handlers as needed.
    return None
def get_page_by_id(notion_db_pages, page_id):
    """
    Looks up a page by its id in a list of Notion pages.

    :param notion_db_pages: Iterable of page dicts, each with an "id" entry.
    :param page_id: The id to search for.
    :return: The first page whose "id" equals page_id, or None if none matches.
    """
    matches = (candidate for candidate in notion_db_pages if candidate["id"] == page_id)
    return next(matches, None)
""" | |
def streamMatrices (speckleToken, stream_id, branch_name_dm, commit_id): | |
#stream_id="ebcfc50abe" | |
stream_distance_matrices = speckle_utils.getSpeckleStream(stream_id, | |
branch_name_dm, | |
CLIENT, | |
commit_id = commit_id_dm) | |
return stream_distance_matrices | |
""" | |
def fetchDomainMapper(luAttributePages):
    """
    Builds a mapping from land use to livability subdomain.

    Each page contributes one entry when both its "LANDUSE" and its
    "SUBDOMAIN_LIVEABILITY" property have a truthy value; later pages with the
    same land use overwrite earlier ones.

    :param luAttributePages: Iterable of Notion pages holding the land-use attributes.
    :return: Dict mapping the "LANDUSE" value to the "SUBDOMAIN_LIVEABILITY" value.
    """
    lu_domain_mapper = {}
    # Iterate the pages that were passed in, not a module-level variable.
    for page in luAttributePages:
        value_landuse = get_property_value(page, "LANDUSE")
        value_subdomain = get_property_value(page, "SUBDOMAIN_LIVEABILITY")
        if value_subdomain and value_landuse:
            lu_domain_mapper[value_landuse] = value_subdomain
    return lu_domain_mapper
def fetchSubdomainMapper(livability_attributes):
    """
    Builds a mapping from subdomain to its livability attributes.

    Only pages with a truthy "MANHATTAN THRESHOLD" value are included. Each
    entry is keyed by the page's "SUBDOMAIN_UNIQUE" value.

    :param livability_attributes: Iterable of Notion pages holding the subdomain attributes.
    :return: Dict of the form
        {subdomain: {'sqmPerEmpl': [value or 0], 'thresholds': ...,
                     'max_points': ..., 'domain': [value or 0]}}
        where 'sqmPerEmpl' and 'domain' are single-element lists and an empty
        string is replaced by 0.
    """
    attribute_mapper = {}
    # Iterate the pages that were passed in, not a module-level variable.
    for page in livability_attributes:
        subdomain = get_property_value(page, "SUBDOMAIN_UNIQUE")
        sqm_per_employee = get_property_value(page, "SQM PER EMPL")
        thresholds = get_property_value(page, "MANHATTAN THRESHOLD")
        max_points = get_property_value(page, "LIVABILITY MAX POINT")
        domain = get_property_value(page, "DOMAIN")
        if thresholds:
            attribute_mapper[subdomain] = {
                'sqmPerEmpl': [sqm_per_employee if sqm_per_employee != "" else 0],
                'thresholds': thresholds,
                'max_points': max_points,
                'domain': [domain if domain != "" else 0],
            }
    return attribute_mapper
def fetchDistanceMatrices(stream_distance_matrices):
    """
    Collects the distance matrices stored on a Speckle stream as DataFrames.

    Every object under ``stream_distance_matrices["@Data"]["@{0}"]`` is scanned
    for attributes whose name, split on "+", has a second part starting with
    "distance_matrix". The attribute value is parsed as JSON and is expected to
    contain "origin_uuid", "destination_uuid" and "matrix".

    :param stream_distance_matrices: Received stream data, indexable by "@Data" and "@{0}".
    :return: Dict mapping the attribute name to a DataFrame with the origin
        ids as index and the destination ids as columns. Attributes that
        cannot be parsed into such a DataFrame are skipped.
    """
    distance_matrices = {}
    # navigate to list with speckle objects of interest
    for distM in stream_distance_matrices["@Data"]['@{0}']:
        for kk in distM.__dict__:
            name_parts = kk.split("+")
            if len(name_parts) < 2 or not name_parts[1].startswith("distance_matrix"):
                continue
            try:
                distance_matrix_dict = json.loads(distM[kk])
                # Convert the distance matrix to a DataFrame labelled with the ids
                df_distances = pd.DataFrame(
                    distance_matrix_dict["matrix"],
                    index=distance_matrix_dict["origin_uuid"],
                    columns=distance_matrix_dict["destination_uuid"],
                )
            except (KeyError, TypeError, ValueError):
                # Best effort: skip payloads that are not valid JSON, lack a
                # required entry, or whose ids do not fit the matrix shape.
                continue
            distance_matrices[kk] = df_distances
    return distance_matrices
def splitDictByStrFragmentInColumnName(original_dict, substrings):
    """
    Splits a nested dict into one nested dict per substring.

    :param original_dict: Dict of the form {key: {subkey: value}}.
    :param substrings: Fragments to look for inside the subkeys.
    :return: Dict of the form {substring: {key: {subkey: value}}} that holds,
        for every substring, the entries whose subkey contains it. A substring
        without any match maps to an empty dict.
    """
    buckets = {fragment: {} for fragment in substrings}
    for outer_key, columns in original_dict.items():
        for column_name, column_value in columns.items():
            for fragment in substrings:
                if fragment not in column_name:
                    continue
                # Create the per-key dict on first use, then store the entry.
                buckets[fragment].setdefault(outer_key, {})[column_name] = column_value
    return buckets