## OBSERVABILITY
import csv
from io import StringIO
from typing import Any, Dict, List, Optional
from uuid import uuid4

from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from observability import LLMObservabilityManager

router = APIRouter(
    prefix="/observability",
    tags=["observability"]
)


class ObservationResponse(BaseModel):
    """JSON envelope carrying a list of observation records."""

    observations: List[Dict]


def _render_csv(observations: List[Dict]) -> str:
    """Serialize observation dicts to CSV text.

    The header is the union of the keys of all rows, in first-seen order, so
    a row carrying a key the first row lacks no longer makes ``DictWriter``
    raise ``ValueError``. Keys missing from a row are written as empty cells
    (the ``DictWriter`` default ``restval``).

    With no rows the header is empty and the output is a single blank line,
    which is what ``DictWriter.writeheader`` emits for an empty field list.
    """
    fieldnames = list(dict.fromkeys(key for row in observations for key in row))
    buffer = StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(observations)
    return buffer.getvalue()


def create_csv_response(observations: List[Dict]) -> StreamingResponse:
    """Wrap observations in a downloadable ``text/csv`` response.

    Args:
        observations: Rows to export; each dict becomes one CSV line.

    Returns:
        A ``StreamingResponse`` with a ``Content-Disposition`` header naming
        the attachment ``observations.csv``.
    """
    # Render eagerly: a serialization error then surfaces in the caller
    # (where it can be reported) rather than part-way through streaming.
    csv_text = _render_csv(observations)
    headers = {
        'Content-Disposition': 'attachment; filename="observations.csv"'
    }
    return StreamingResponse(iter([csv_text]), media_type="text/csv", headers=headers)


@router.get("/last-observations/{limit}")
async def get_last_observations(limit: int = 10, format: str = "json"):
    """Return up to ``limit`` observations of the most recent conversation.

    The most recent conversation is the ``conversation_id`` of the
    observation with the greatest ``created_at``. Results are ordered by
    ``created_at`` descending.

    Args:
        limit: Maximum number of observations to return; must not be negative.
        format: ``"csv"`` (case-insensitive) for a CSV attachment; any other
            value yields JSON. The name shadows the builtin but is kept
            because it is the public query-parameter name.

    Raises:
        HTTPException: 400 if ``limit`` is negative, 500 if the observations
            cannot be retrieved or serialized.
    """
    # A negative value would be interpreted by the slice below as
    # "drop the last N" instead of "take at most N".
    if limit < 0:
        raise HTTPException(status_code=400, detail="limit must not be negative")

    try:
        observability_manager = LLMObservabilityManager()
        # sorted() leaves the list returned by the manager untouched.
        all_observations = sorted(
            observability_manager.get_observations(),
            key=lambda obs: obs['created_at'],
            reverse=True,
        )

        last_conversation_observations: List[Dict] = []
        if all_observations:
            last_conversation_id = all_observations[0]['conversation_id']
            last_conversation_observations = [
                obs for obs in all_observations
                if obs['conversation_id'] == last_conversation_id
            ][:limit]

        if format.lower() == "csv":
            return create_csv_response(last_conversation_observations)
        return ObservationResponse(observations=last_conversation_observations)
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to retrieve observations: {str(e)}"
        ) from e


@router.get("/all-unique-observations")
async def get_all_unique_observations(limit: Optional[int] = None):
    """Return the manager's unique-conversation observations as JSON.

    Args:
        limit: Forwarded unchanged to
            ``LLMObservabilityManager.get_all_unique_conversation_observations``;
            its exact meaning is defined there.

    Raises:
        HTTPException: 500 if the observations cannot be retrieved.
    """
    try:
        observability_manager = LLMObservabilityManager()
        return ObservationResponse(
            observations=observability_manager.get_all_unique_conversation_observations(limit)
        )
    except Exception as e:
        # Same error shape as the other endpoints of this router.
        raise HTTPException(
            status_code=500,
            detail=f"Failed to retrieve observations: {str(e)}"
        ) from e


class StatisticsResponse(BaseModel):
    """JSON envelope carrying the dashboard statistics mapping."""

    statistics: Dict[str, Any]


@router.get("/export-statistics")
async def get_dashboard_data(
    days: Optional[int] = 30,
    time_series_interval: str = 'day',
):
    """
    Get comprehensive Analytics data including statistics.

    Args:
        days: Number of days to look back for statistics
        time_series_interval: Interval for time series data ('hour', 'day', 'week', 'month')

    Returns:
        A ``StatisticsResponse`` wrapping the result of
        ``LLMObservabilityManager.get_dashboard_statistics``.

    Raises:
        HTTPException: 500 if the statistics cannot be retrieved.
    """
    try:
        observability_manager = LLMObservabilityManager()

        # Get dashboard statistics
        statistics = observability_manager.get_dashboard_statistics(
            days=days,
            time_series_interval=time_series_interval
        )

        return StatisticsResponse(
            statistics=statistics,
        )
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to retrieve dashboard data: {str(e)}"
        ) from e