Spaces:
Sleeping
Sleeping
from fastapi import APIRouter, HTTPException | |
from pydantic import BaseModel, validator | |
from app.services.speckle_service import SpeckleService | |
from specklepy.objects import Base | |
from datetime import datetime | |
from typing import Any, List | |
import json | |
class GenericPayload(BaseModel):
    """Request body that carries an arbitrary JSON document as a string."""

    # Raw JSON text; decoded on demand by get_data().
    payload: str

    # Without this decorator pydantic never calls the method, so malformed
    # JSON would only be detected later, when get_data() is invoked.
    @validator("payload")
    def validate_payload(cls, value):
        """Reject a ``payload`` that is not parseable JSON.

        Returns the original string unchanged when it parses.

        Raises:
            ValueError: if ``value`` is not valid JSON.
        """
        try:
            json.loads(value)
        except json.JSONDecodeError as exc:
            raise ValueError("payload must be a valid JSON string") from exc
        return value

    def get_data(self):
        """Return the decoded JSON document held in ``payload``."""
        return json.loads(self.payload)
# Module-level APIRouter instance.
# NOTE(review): no route registrations on it are visible in this excerpt.
router = APIRouter()
def extract_value(obj: Any, key: str) -> Any:
    """Recursively search nested dicts and lists for *key*.

    The search is depth-first, following dict/list iteration order: while
    walking a dict, a match nested under an earlier key is returned before a
    direct match on a later key of the same dict.

    Nothing is printed or logged here on purpose: the structures searched by
    callers in this module contain credentials (the ``token`` field).

    Args:
        obj: A decoded JSON-like value (dict, list, or scalar).
        key: The dictionary key to look for.

    Returns:
        The value stored under the first matching key, or ``None`` when the
        key is not found. A nested match whose value is ``None`` is treated
        as "not found" and the search continues; a direct match whose value
        is ``None`` returns ``None``.
    """
    if isinstance(obj, dict):
        for current_key, value in obj.items():
            if current_key == key:
                return value
            if isinstance(value, (dict, list)):
                found = extract_value(value, key)
                if found is not None:
                    return found
    elif isinstance(obj, list):
        for item in obj:
            found = extract_value(item, key)
            if found is not None:
                return found
    return None
async def create_graph(payload: GenericPayload):
    """Commit a placeholder graph object to a Speckle branch and report on it.

    Reads ``speckleInput_StreetNetwork``, ``speckleOutput_Graph`` and ``token``
    from the JSON document carried by *payload* (looked up with
    ``extract_value``), fetches the input branch, commits a dummy object
    holding generated metadata to the output branch, and returns a report
    describing the source and target commits.

    NOTE(review): no route decorator is visible on this function in this
    excerpt; presumably it is registered on ``router`` - confirm.

    Args:
        payload: Request body whose ``payload`` field is a JSON string.

    Returns:
        A dict with the keys ``method``, ``sources`` and ``targets``.

    Raises:
        HTTPException: status 400 when the payload is not valid JSON or one
            of the two Speckle URLs is missing; status 500 when any later
            step fails. An HTTPException raised by a later step propagates
            unchanged.
    """
    # Client errors are reported as 400 instead of being folded into the
    # generic 500 handler below.
    try:
        data = payload.get_data()
    except ValueError as exc:  # json.JSONDecodeError is a ValueError subclass
        raise HTTPException(
            status_code=400, detail="payload must be a valid JSON string"
        ) from exc

    # Extract necessary fields from the payload
    speckle_input_street_network = extract_value(data, 'speckleInput_StreetNetwork')
    speckle_output_graph = extract_value(data, 'speckleOutput_Graph')
    # The token is passed through unchecked; whether SpeckleService accepts a
    # missing token is not visible from this file.
    token = extract_value(data, 'token')

    required_urls = (
        ('speckleInput_StreetNetwork', speckle_input_street_network),
        ('speckleOutput_Graph', speckle_output_graph),
    )
    missing = [name for name, value in required_urls if value is None]
    if missing:
        raise HTTPException(
            status_code=400,
            detail=f"missing required field(s): {', '.join(missing)}",
        )

    try:
        # Step 1: Create the client
        client = SpeckleService.create_client(base_url="https://speckle.xyz", token=token)

        # Step 2: Parse the Speckle URLs to get the components
        input_speckle_url_info = SpeckleService.parse_speckle_url(speckle_input_street_network)
        input_stream_id = input_speckle_url_info["streamID"]
        input_branch_name = input_speckle_url_info["branchName"]

        output_speckle_url_info = SpeckleService.parse_speckle_url(speckle_output_graph)
        output_stream_id = output_speckle_url_info["streamID"]
        output_branch_name = output_speckle_url_info["branchName"]

        # Step 3: Fetch the Speckle branch (only the commit id is used below)
        _, commit_id = SpeckleService.get_speckle_stream(input_stream_id, input_branch_name, client)

        # Step 4: Generate metadata
        metadata = SpeckleService.generate_metadata([speckle_input_street_network], token)

        # Step 5: Create a dummy Speckle object
        dummy_object = Base()
        dummy_object["IWasCreatedBy"] = "createGraph"
        dummy_object["MetaData"] = metadata

        # Step 6: Update the Speckle stream
        new_commit_id = SpeckleService.update_speckle_stream(
            output_stream_id, output_branch_name, client, dummy_object
        )

        # Step 7: Create the report using metadata
        # Naive UTC timestamp (ISO string without an offset suffix); kept
        # as-is so the report format does not change.
        current_time = datetime.utcnow().isoformat()
        sources = [
            {
                "streamID": input_stream_id,
                "branchName": input_branch_name,
                "commitID": commit_id,
                "time": current_time,
            }
        ]
        targets = [
            {
                "streamID": output_stream_id,
                "branchName": output_branch_name,
                "commitID": new_commit_id,
                "time": current_time,
            }
        ]
        return {
            "method": "createGraph",
            "sources": sources,
            "targets": targets,
        }
    except HTTPException:
        # Keep the status code chosen further down instead of rewrapping as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e