## OBSERVABILITY
import csv
from io import StringIO
from typing import List, Dict, Optional
from uuid import uuid4

from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from observability import LLMObservabilityManager

router = APIRouter(
    prefix="/observability",
    tags=["observability"]
)


class ObservationResponse(BaseModel):
    """JSON response body wrapping a list of observation dicts."""

    observations: List[Dict]


def create_csv_response(observations: List[Dict]) -> StreamingResponse:
    """Serialize observations to CSV and return them as a file download.

    The header row is the union of the keys of all rows, in first-seen
    order, so a row that carries a key the first row lacks no longer makes
    ``csv.DictWriter`` raise ``ValueError`` while the response is being
    streamed. Keys missing from a row are written as empty cells.

    Args:
        observations: Rows to serialize; each dict maps column name to value.

    Returns:
        A ``text/csv`` StreamingResponse with a ``Content-Disposition``
        header naming the attachment ``observations.csv``. For an empty
        list the body is a single empty header line.
    """
    # dict.fromkeys de-duplicates while preserving insertion order.
    fieldnames = list(dict.fromkeys(key for row in observations for key in row))

    def iter_csv(data):
        output = StringIO()
        writer = csv.DictWriter(output, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)
        yield output.getvalue()

    headers = {
        'Content-Disposition': 'attachment; filename="observations.csv"'
    }
    return StreamingResponse(iter_csv(observations), media_type="text/csv", headers=headers)


@router.get("/last-observations/{limit}")
async def get_last_observations(limit: int = 10, format: str = "json"):
    """Return up to ``limit`` observations of the most recent conversation.

    The most recent conversation is the ``conversation_id`` of the
    observation with the greatest ``created_at``. Its observations are
    returned newest first.

    Args:
        limit: Maximum number of observations to return; must be >= 0.
            NOTE(review): as a path parameter this is always supplied by
            the URL, so the default should only matter for direct calls.
        format: ``"csv"`` (case-insensitive) for a CSV download; any other
            value yields the JSON ``ObservationResponse``.

    Raises:
        HTTPException: 400 if ``limit`` is negative; 500 if retrieving or
            processing the observations fails.
    """
    # A negative value would turn the slice below into "all but the last N",
    # silently dropping rows, so reject it up front.
    if limit < 0:
        raise HTTPException(status_code=400, detail="limit must be a non-negative integer")

    observability_manager = LLMObservabilityManager()
    try:
        # sorted() works on a copy, so the list owned by the manager is not
        # reordered as a side effect of serving this request.
        all_observations = sorted(
            observability_manager.get_observations(),
            key=lambda obs: obs['created_at'],
            reverse=True,
        )

        if all_observations:
            # The newest observation identifies the last conversation.
            last_conversation_id = all_observations[0]['conversation_id']
            selected = [
                obs for obs in all_observations
                if obs['conversation_id'] == last_conversation_id
            ][:limit]
        else:
            selected = []

        if format.lower() == "csv":
            return create_csv_response(selected)
        return ObservationResponse(observations=selected)
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to retrieve observations: {str(e)}",
        ) from e


@router.get("/all-unique-observations")
async def get_all_unique_observations(limit: Optional[int] = None):
    """Return the manager's unique-conversation observations as JSON.

    Args:
        limit: Forwarded unchanged to
            ``LLMObservabilityManager.get_all_unique_conversation_observations``;
            its exact meaning is defined by that method.

    Returns:
        An ``ObservationResponse`` wrapping whatever the manager returns.
    """
    observability_manager = LLMObservabilityManager()
    return ObservationResponse(
        observations=observability_manager.get_all_unique_conversation_observations(limit)
    )