rag_chat_with_analytics / observability_router.py
pvanand's picture
Update observability_router.py
040b5e1 verified
## OBSERVABILITY
from uuid import uuid4
import csv
from io import StringIO
from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import List, Dict, Optional, Any
from observability import LLMObservabilityManager
router = APIRouter(
prefix="/observability",
tags=["observability"]
)
class ObservationResponse(BaseModel):
observations: List[Dict]
def create_csv_response(observations: List[Dict]) -> StreamingResponse:
def iter_csv(data):
output = StringIO()
writer = csv.DictWriter(output, fieldnames=data[0].keys() if data else [])
writer.writeheader()
for row in data:
writer.writerow(row)
output.seek(0)
yield output.read()
headers = {
'Content-Disposition': 'attachment; filename="observations.csv"'
}
return StreamingResponse(iter_csv(observations), media_type="text/csv", headers=headers)
@router.get("/last-observations/{limit}")
async def get_last_observations(limit: int = 10, format: str = "json"):
observability_manager = LLMObservabilityManager()
try:
# Get all observations, sorted by created_at in descending order
all_observations = observability_manager.get_observations()
all_observations.sort(key=lambda x: x['created_at'], reverse=True)
# Get the last conversation_id
if all_observations:
last_conversation_id = all_observations[0]['conversation_id']
# Filter observations for the last conversation
last_conversation_observations = [
obs for obs in all_observations
if obs['conversation_id'] == last_conversation_id
][:limit]
if format.lower() == "csv":
return create_csv_response(last_conversation_observations)
else:
return ObservationResponse(observations=last_conversation_observations)
else:
if format.lower() == "csv":
return create_csv_response([])
else:
return ObservationResponse(observations=[])
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to retrieve observations: {str(e)}")
@router.get("/all-unique-observations")
async def get_all_unique_observations(limit: Optional[int] = None):
observability_manager = LLMObservabilityManager()
return ObservationResponse(observations=observability_manager.get_all_unique_conversation_observations(limit))
class StatisticsResponse(BaseModel):
statistics: Dict[str, Any]
@router.get("/export-statistics")
async def get_dashboard_data(
days: Optional[int] = 30,
time_series_interval: str = 'day',
):
"""
Get comprehensive Analytics data including statistics.
Args:
days: Number of days to look back for statistics
time_series_interval: Interval for time series data ('hour', 'day', 'week', 'month')
"""
try:
observability_manager = LLMObservabilityManager()
# Get dashboard statistics
statistics = observability_manager.get_dashboard_statistics(
days=days,
time_series_interval=time_series_interval
)
return StatisticsResponse(
statistics=statistics,
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to retrieve dashboard data: {str(e)}"
)