Spaces:
Sleeping
Sleeping
## OBSERVABILITY
from uuid import uuid4 | |
import csv | |
from io import StringIO | |
from fastapi import APIRouter, HTTPException | |
from fastapi.responses import StreamingResponse | |
from pydantic import BaseModel | |
from typing import List, Dict, Optional, Any | |
from observability import LLMObservabilityManager | |
# Router for this module: paths are registered under the "/observability"
# prefix and tagged "observability".
router = APIRouter(prefix="/observability", tags=["observability"])
class ObservationResponse(BaseModel):
    """Response body wrapping a list of observation records."""

    # Each observation is passed through as a plain dict; this model does not
    # constrain the keys or value types of the individual records.
    observations: List[Dict]
def create_csv_response(observations: List[Dict]) -> StreamingResponse:
    """Serialize observation dicts to CSV and return them as a file download.

    The CSV header is the union of the keys of *all* rows, in first-seen
    order. Using only the first row's keys would make ``csv.DictWriter``
    raise ``ValueError`` as soon as a later row carries an extra key; with the
    union, rows that lack a column simply get an empty cell. For rows that all
    share the same keys the output is unchanged.

    Args:
        observations: Rows to export, one dict per CSV line. May be empty, in
            which case only an (empty) header line is produced.

    Returns:
        A ``StreamingResponse`` with media type ``text/csv`` and a
        ``Content-Disposition`` header naming the file ``observations.csv``.
    """
    # dict.fromkeys de-duplicates while preserving insertion order, so the
    # first row's columns come first, followed by any keys seen only later.
    fieldnames = list(
        dict.fromkeys(key for row in observations for key in row)
    )

    def iter_csv(data):
        # The whole document is rendered into one in-memory buffer and
        # emitted as a single chunk.
        output = StringIO()
        writer = csv.DictWriter(output, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)
        yield output.getvalue()

    headers = {
        'Content-Disposition': 'attachment; filename="observations.csv"'
    }
    return StreamingResponse(
        iter_csv(observations),
        media_type="text/csv",
        headers=headers,
    )
async def get_last_observations(limit: int = 10, format: str = "json"):
    """Return observations belonging to the most recent conversation.

    All observations are fetched from the manager and ordered newest-first by
    ``created_at``. The ``conversation_id`` of the newest observation selects
    the conversation, and at most ``limit`` of its observations are returned
    (still newest-first).

    Args:
        limit: Slice bound applied to the selected conversation's observations.
        format: ``"csv"`` (case-insensitive) produces a CSV download; any
            other value produces the JSON ``ObservationResponse``.

    Raises:
        HTTPException: Status 500 when retrieving, sorting, filtering or
            building the response fails for any reason.
    """
    manager = LLMObservabilityManager()
    try:
        # Newest first; the list returned by the manager is sorted in place.
        records = manager.get_observations()
        records.sort(key=lambda record: record['created_at'], reverse=True)

        # With no observations at all, the result is simply empty.
        selected = []
        if records:
            newest_conversation_id = records[0]['conversation_id']
            same_conversation = [
                record for record in records
                if record['conversation_id'] == newest_conversation_id
            ]
            selected = same_conversation[:limit]

        if format.lower() == "csv":
            return create_csv_response(selected)
        return ObservationResponse(observations=selected)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to retrieve observations: {str(e)}")
async def get_all_unique_observations(limit: Optional[int] = None):
    """Return the manager's unique-conversation observations.

    Args:
        limit: Forwarded unchanged to
            ``LLMObservabilityManager.get_all_unique_conversation_observations``;
            ``None`` by default.

    Returns:
        An ``ObservationResponse`` wrapping whatever the manager returns.
    """
    manager = LLMObservabilityManager()
    unique_observations = manager.get_all_unique_conversation_observations(limit)
    return ObservationResponse(observations=unique_observations)
class StatisticsResponse(BaseModel):
    """Response body wrapping the dashboard statistics mapping."""

    # String-keyed mapping with arbitrary values, as produced by the
    # observability manager's dashboard statistics call.
    statistics: Dict[str, Any]
async def get_dashboard_data(
    days: Optional[int] = 30,
    time_series_interval: str = 'day',
):
    """
    Get comprehensive analytics data, including statistics.

    Args:
        days: Number of days to look back for statistics.
        time_series_interval: Interval for time series data
            ('hour', 'day', 'week', 'month').

    Returns:
        A ``StatisticsResponse`` wrapping the statistics computed by
        ``LLMObservabilityManager.get_dashboard_statistics``.

    Raises:
        HTTPException: Status 500 when creating the manager, computing the
            statistics, or building the response fails for any reason.
    """
    try:
        # The manager is created inside the try block so that a construction
        # failure is also reported as a 500 response.
        return StatisticsResponse(
            statistics=LLMObservabilityManager().get_dashboard_statistics(
                days=days,
                time_series_interval=time_series_interval,
            ),
        )
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to retrieve dashboard data: {str(e)}"
        )