Spaces:
Sleeping
Sleeping
from fastapi import APIRouter, HTTPException | |
from pydantic import BaseModel, validator | |
from app.services.speckle_service import SpeckleService | |
from specklepy.objects import Base | |
from datetime import datetime | |
from typing import Any, List | |
import json | |
class GenericPayload(BaseModel): | |
payload: str | |
def validate_payload(cls, value): | |
# Parse the JSON string to ensure it's valid | |
try: | |
parsed = json.loads(value) | |
# You can add additional validation here if needed | |
except json.JSONDecodeError: | |
raise ValueError("payload must be a valid JSON string") | |
return value | |
def get_data(self): | |
return json.loads(self.payload) | |
router = APIRouter() | |
def extract_value(obj: Any, key: str) -> Any: | |
"""Recursively search for the value of a given key in nested objects""" | |
print(f"Extracting value for key: {key} from object: {obj}") | |
if isinstance(obj, dict): | |
for k, v in obj.items(): | |
print(f"Checking key: {k}, value: {v}") | |
if k == key: | |
print(f"Found key: {k}, returning value: {v}") | |
return v | |
elif isinstance(v, (dict, list)): | |
result = extract_value(v, key) | |
if result is not None: | |
return result | |
elif isinstance(obj, list): | |
for item in obj: | |
result = extract_value(item, key) | |
if result is not None: | |
return result | |
return None | |
async def patronage_analysis(payload: GenericPayload): | |
try: | |
data = payload.get_data() | |
# Log the received data | |
print(f"Received data: {data}") | |
# Log starting extraction | |
print("Starting extracting values") | |
# Extract necessary fields from the payload | |
try: | |
landuse_columns = extract_value(data, 'landuseColumns') | |
distance_threshold = extract_value(data, 'distanceThreshold') | |
normalise_results = extract_value(data, 'normaliseResults') | |
speckle_input_distance_matrix = extract_value(data, 'speckleInput_distanceMatrix') | |
speckle_input_buildings = extract_value(data, 'speckleInput_buildings') | |
speckle_output_patronage_result = extract_value(data, 'speckleOutput_patronageResult') | |
token = extract_value(data, 'token') | |
# Check if any of the extracted values are None | |
if None in [landuse_columns, distance_threshold, normalise_results, speckle_input_distance_matrix, speckle_input_buildings, speckle_output_patronage_result, token]: | |
raise ValueError("One or more keys not found in payload.") | |
except ValueError as e: | |
print(f"Error extracting values: {str(e)}") | |
raise HTTPException(status_code=400, detail=f"Key not found in payload: {str(e)}") | |
# Log extracted values | |
print(f"Extracted values: landuse_columns={landuse_columns}, distance_threshold={distance_threshold}, normalise_results={normalise_results}, speckle_input_distance_matrix={speckle_input_distance_matrix}, speckle_input_buildings={speckle_input_buildings}, speckle_output_patronage_result={speckle_output_patronage_result}, token={token}") | |
# Step 1: Create the client | |
client = SpeckleService.create_client(base_url="https://speckle.xyz", token=token) | |
# Step 2: Parse the Speckle URLs to get the components | |
distance_matrix_url_info = SpeckleService.parse_speckle_url(speckle_input_distance_matrix) | |
distance_matrix_stream_id = distance_matrix_url_info["streamID"] | |
distance_matrix_branch_name = distance_matrix_url_info["branchName"] | |
buildings_url_info = SpeckleService.parse_speckle_url(speckle_input_buildings) | |
buildings_stream_id = buildings_url_info["streamID"] | |
buildings_branch_name = buildings_url_info["branchName"] | |
output_url_info = SpeckleService.parse_speckle_url(speckle_output_patronage_result) | |
output_stream_id = output_url_info["streamID"] | |
output_branch_name = output_url_info["branchName"] | |
# Step 3: Fetch the Speckle branches | |
distance_matrix_data, distance_matrix_commit_id = SpeckleService.get_speckle_stream(distance_matrix_stream_id, distance_matrix_branch_name, client) | |
buildings_data, buildings_commit_id = SpeckleService.get_speckle_stream(buildings_stream_id, buildings_branch_name, client) | |
# Step 4: Generate metadata | |
metadata = SpeckleService.generate_metadata([speckle_input_distance_matrix, speckle_input_buildings], token) | |
# Step 5: Create a dummy Speckle object | |
dummy_object = Base() | |
dummy_object["IWasCreatedBy"] = "patronageAnalysis" | |
dummy_object["LanduseColumns"] = landuse_columns | |
dummy_object["DistanceThreshold"] = distance_threshold | |
dummy_object["NormaliseResults"] = normalise_results | |
dummy_object["MetaData"] = metadata | |
# Step 6: Update the Speckle stream | |
new_commit_id = SpeckleService.update_speckle_stream(output_stream_id, output_branch_name, client, dummy_object) | |
# Step 7: Create the report using metadata | |
current_time = datetime.utcnow().isoformat() | |
sources = [ | |
{ | |
"streamID": distance_matrix_stream_id, | |
"branchName": distance_matrix_branch_name, | |
"commitID": distance_matrix_commit_id, | |
"time": current_time | |
}, | |
{ | |
"streamID": buildings_stream_id, | |
"branchName": buildings_branch_name, | |
"commitID": buildings_commit_id, | |
"time": current_time | |
} | |
] | |
targets = [ | |
{ | |
"streamID": output_stream_id, | |
"branchName": output_branch_name, | |
"commitID": new_commit_id, | |
"time": current_time | |
} | |
] | |
report = { | |
"method": "patronageAnalysis", | |
"sources": sources, | |
"targets": targets | |
} | |
return report | |
except Exception as e: | |
print(f"Error occurred: {str(e)}") | |
raise HTTPException(status_code=500, detail=str(e)) | |