# Source: n8nSpeckle / app/endpoints/patronageAnalysis.py
# Revision 126f2a6 (verified) - "Update app/endpoints/patronageAnalysis.py" (serJD)
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, validator
from app.services.speckle_service import SpeckleService
from specklepy.objects import Base
from datetime import datetime
from typing import Any, List
import json
# Request body model: a single ``payload`` field holding a JSON document
# serialized as a string. The string is checked at validation time and
# decoded on demand by ``get_data``.
class GenericPayload(BaseModel):
    payload: str

    @validator("payload")
    def validate_payload(cls, value):
        """Accept ``value`` only if it decodes as JSON; return it unchanged."""
        try:
            # Only the success of the decode matters here; the decoded
            # document is discarded and re-created by get_data().
            json.loads(value)
        except json.JSONDecodeError:
            raise ValueError("payload must be a valid JSON string")
        else:
            return value

    def get_data(self):
        """Return the document obtained by decoding ``payload`` as JSON."""
        decoded = json.loads(self.payload)
        return decoded
# Router on which this module's endpoints are registered (see the
# @router.post decorator below).
router = APIRouter()
def extract_value(obj: Any, key: str) -> Any:
    """Recursively search nested dicts/lists for ``key`` and return its value.

    The search is depth-first in iteration order: within a dict, entries are
    visited in order, and a nested dict/list value is searched completely
    before later sibling keys are looked at.

    Nothing is logged here on purpose: the searched object may hold secrets
    (for example an API token), and echoing it at every recursion level would
    both leak them and repeat nested content once per level.

    Args:
        obj: The structure to search. Anything that is not a dict or a list
            yields ``None``.
        key: The dictionary key to look for.

    Returns:
        The value stored under the first match, or ``None`` when the key is
        absent. A match whose value is ``None`` is indistinguishable from a
        missing key, so callers see it as "not found".
    """
    if isinstance(obj, dict):
        for current_key, current_value in obj.items():
            if current_key == key:
                return current_value
            if isinstance(current_value, (dict, list)):
                found = extract_value(current_value, key)
                if found is not None:
                    return found
    elif isinstance(obj, list):
        for item in obj:
            found = extract_value(item, key)
            if found is not None:
                return found
    return None
@router.post("/patronageAnalysis")
async def patronage_analysis(payload: GenericPayload):
    """Run the patronage-analysis step and commit its result to Speckle.

    The decoded payload is searched, at any nesting depth, for the keys
    ``landuseColumns``, ``distanceThreshold``, ``normaliseResults``,
    ``speckleInput_distanceMatrix``, ``speckleInput_buildings``,
    ``speckleOutput_patronageResult`` and ``token``. The two input branches
    are fetched, a placeholder object carrying the parameters and generated
    metadata is committed to the output branch, and a report listing the
    source and target commits is returned.

    Raises:
        HTTPException: 400 when a required key is missing (or null) in the
            payload; 500 for any other failure.
    """
    required_keys = (
        "landuseColumns",
        "distanceThreshold",
        "normaliseResults",
        "speckleInput_distanceMatrix",
        "speckleInput_buildings",
        "speckleOutput_patronageResult",
        "token",
    )
    try:
        data = payload.get_data()

        # The raw payload is not logged: it contains the access token.
        print("Starting extracting values")
        extracted = {key: extract_value(data, key) for key in required_keys}
        missing = [key for key, value in extracted.items() if value is None]
        if missing:
            print(f"Error extracting values: keys not found: {missing}")
            raise HTTPException(
                status_code=400,
                detail=f"Key not found in payload: {', '.join(missing)}",
            )

        landuse_columns = extracted["landuseColumns"]
        distance_threshold = extracted["distanceThreshold"]
        normalise_results = extracted["normaliseResults"]
        speckle_input_distance_matrix = extracted["speckleInput_distanceMatrix"]
        speckle_input_buildings = extracted["speckleInput_buildings"]
        speckle_output_patronage_result = extracted["speckleOutput_patronageResult"]
        token = extracted["token"]

        # Log extracted values (the token is intentionally left out)
        print(
            f"Extracted values: landuse_columns={landuse_columns}, "
            f"distance_threshold={distance_threshold}, "
            f"normalise_results={normalise_results}, "
            f"speckle_input_distance_matrix={speckle_input_distance_matrix}, "
            f"speckle_input_buildings={speckle_input_buildings}, "
            f"speckle_output_patronage_result={speckle_output_patronage_result}"
        )

        # Step 1: Create the client
        client = SpeckleService.create_client(base_url="https://speckle.xyz", token=token)

        # Step 2: Parse the Speckle URLs to get the components
        distance_matrix_url_info = SpeckleService.parse_speckle_url(speckle_input_distance_matrix)
        distance_matrix_stream_id = distance_matrix_url_info["streamID"]
        distance_matrix_branch_name = distance_matrix_url_info["branchName"]

        buildings_url_info = SpeckleService.parse_speckle_url(speckle_input_buildings)
        buildings_stream_id = buildings_url_info["streamID"]
        buildings_branch_name = buildings_url_info["branchName"]

        output_url_info = SpeckleService.parse_speckle_url(speckle_output_patronage_result)
        output_stream_id = output_url_info["streamID"]
        output_branch_name = output_url_info["branchName"]

        # Step 3: Fetch the Speckle branches. Only the commit ids are used
        # below; the fetched data itself is not consumed by this endpoint yet.
        _distance_matrix_data, distance_matrix_commit_id = SpeckleService.get_speckle_stream(
            distance_matrix_stream_id, distance_matrix_branch_name, client
        )
        _buildings_data, buildings_commit_id = SpeckleService.get_speckle_stream(
            buildings_stream_id, buildings_branch_name, client
        )

        # Step 4: Generate metadata
        metadata = SpeckleService.generate_metadata(
            [speckle_input_distance_matrix, speckle_input_buildings], token
        )

        # Step 5: Create a dummy Speckle object
        dummy_object = Base()
        dummy_object["IWasCreatedBy"] = "patronageAnalysis"
        dummy_object["LanduseColumns"] = landuse_columns
        dummy_object["DistanceThreshold"] = distance_threshold
        dummy_object["NormaliseResults"] = normalise_results
        dummy_object["MetaData"] = metadata

        # Step 6: Update the Speckle stream
        new_commit_id = SpeckleService.update_speckle_stream(
            output_stream_id, output_branch_name, client, dummy_object
        )

        # Step 7: Create the report. One timestamp (naive UTC, ISO 8601) is
        # shared by every entry.
        current_time = datetime.utcnow().isoformat()
        sources = [
            {
                "streamID": distance_matrix_stream_id,
                "branchName": distance_matrix_branch_name,
                "commitID": distance_matrix_commit_id,
                "time": current_time,
            },
            {
                "streamID": buildings_stream_id,
                "branchName": buildings_branch_name,
                "commitID": buildings_commit_id,
                "time": current_time,
            },
        ]
        targets = [
            {
                "streamID": output_stream_id,
                "branchName": output_branch_name,
                "commitID": new_commit_id,
                "time": current_time,
            }
        ]
        report = {
            "method": "patronageAnalysis",
            "sources": sources,
            "targets": targets,
        }
        return report
    except HTTPException:
        # HTTPException is itself an Exception; without this clause the 400
        # raised above would be caught below and turned into a 500.
        raise
    except Exception as e:
        print(f"Error occurred: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e