"""Inference API status monitor.

A FastAPI application that, every ten minutes, sends a tiny chat completion
request to each model in ``models`` and records whether the call succeeded.
Results are stored both in a SQLite database (``DB_FILE``) and in a JSON
file (``LOG_FILE``), and are exposed through a handful of read-only HTTP
endpoints.
"""

import json
import os
import random
import sqlite3
import string
from contextlib import closing
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI, Query
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from huggingface_hub import AsyncInferenceClient
from pydantic import BaseModel

app = FastAPI()

# Configuration
models = [
    "meta-llama/Llama-3.1-8B-Instruct",
    "meta-llama/Llama-3.1-70B-Instruct",
    "meta-llama/Meta-Llama-3-8B-Instruct",
    "meta-llama/Meta-Llama-3-70B-Instruct",
    "meta-llama/Llama-Guard-3-8B",
    "meta-llama/Llama-2-7b-chat-hf",
    "meta-llama/Llama-2-13b-chat-hf",
    "deepseek-ai/DeepSeek-Coder-V2-Instruct",
    "mistralai/Mistral-7B-Instruct-v0.3",
    "mistralai/Mixtral-8x7B-Instruct-v0.1",
]
LOG_FILE = "/data/api_logs.json"
DB_FILE = "/data/api_logs.db"

client = AsyncInferenceClient(token=os.environ["HF_INFERENCE_API_TOKEN"])

# Ensure log file exists
if not os.path.exists(LOG_FILE):
    with open(LOG_FILE, "w") as f:
        json.dump([], f)


# Initialize SQLite database
def init_db():
    """Create the ``api_logs`` table in ``DB_FILE`` if it does not exist."""
    # closing() guarantees the connection is released even if execute raises.
    with closing(sqlite3.connect(DB_FILE)) as conn:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS api_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                model TEXT,
                success BOOLEAN,
                timestamp TEXT,
                failure_message TEXT,
                response_data TEXT
            )
        ''')
        conn.commit()


init_db()


class LogEntry(BaseModel):
    """One recorded probe of a model endpoint."""

    model: str
    success: bool
    timestamp: str
    failure_message: str
    # None when the probe failed or no response payload was stored.
    response_data: Optional[Dict[str, Any]] = None


def random_string(length=10):
    """Return ``length`` random ASCII letters and digits."""
    characters = string.ascii_letters + string.digits
    return ''.join(random.choice(characters) for _ in range(length))


def log_to_sqlite(entry: LogEntry):
    """Insert ``entry`` as a new row of the ``api_logs`` table.

    ``response_data`` is stored as a JSON string, or NULL when it is empty
    or ``None``.
    """
    with closing(sqlite3.connect(DB_FILE)) as conn:
        conn.execute('''
            INSERT INTO api_logs (model, success, timestamp, failure_message, response_data)
            VALUES (?, ?, ?, ?, ?)
        ''', (
            entry.model,
            entry.success,
            entry.timestamp,
            entry.failure_message,
            json.dumps(entry.response_data) if entry.response_data else None,
        ))
        conn.commit()


def _fetch_all(query, params=()):
    """Run a read-only query against ``DB_FILE`` and return all rows."""
    with closing(sqlite3.connect(DB_FILE)) as conn:
        return conn.execute(query, params).fetchall()


def _append_to_json_log(entries):
    """Append ``entries`` (a list of dicts) to the JSON array in ``LOG_FILE``.

    A missing or unparsable log file is treated as an empty log so that one
    bad file does not stop all future logging.
    """
    try:
        with open(LOG_FILE, "r") as f:
            logs = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        logs = []
    logs.extend(entries)
    with open(LOG_FILE, "w") as f:
        json.dump(logs, f, indent=2)


async def check_apis():
    """Probe every configured model once and record the outcome.

    For each name in ``models`` a short chat completion is requested. The
    result (success flag, UTC timestamp, error text, response payload) is
    written to SQLite immediately and appended to the JSON log once all
    models have been probed.
    """
    results = []
    for model in models:
        failure_message = ""
        response_data = None
        try:
            response = await client.chat_completion(
                messages=[{"role": "user", "content": f"{random_string()}\nWhat is the capital of France?"}],
                # Target the model under test; without this the client
                # falls back to its default model for every iteration.
                model=model,
                max_tokens=10,
            )
        except Exception as exc:
            # Any failure counts as "endpoint down". The message is captured
            # here because the name bound by `as` is cleared when the except
            # clause ends.
            print(exc)
            success = False
            failure_message = str(exc)
        else:
            success = True
            response_data = dict(response)

        log_entry = LogEntry(
            model=model,
            success=success,
            timestamp=datetime.now(timezone.utc).isoformat(),
            failure_message=failure_message,
            response_data=response_data,
        )
        results.append(log_entry)
        log_to_sqlite(log_entry)

    _append_to_json_log([result.dict() for result in results])


@app.on_event("startup")
async def start_scheduler():
    """Start a scheduler that runs ``check_apis`` every ten minutes."""
    scheduler = AsyncIOScheduler()
    scheduler.add_job(check_apis, 'interval', minutes=10)
    scheduler.start()


@app.get("/api/models")
async def get_models():
    """Return the list of monitored model names."""
    return models


@app.get("/")
async def index():
    """Serve the front-end page."""
    return FileResponse("static/index.html")


@app.get("/api/logs", response_model=List[LogEntry])
async def get_logs(
    model: Optional[str] = Query(None, description="Filter by model name"),
    start: Optional[str] = Query(None, description="Start time for filtering (ISO format)"),
    end: Optional[str] = Query(None, description="End time for filtering (ISO format)")
):
    """Return up to 500 log entries, newest first, optionally filtered.

    Each supplied filter adds one parameterized condition; ``start`` and
    ``end`` are compared against the stored timestamp strings.
    """
    conditions = []
    params = []
    if model:
        conditions.append("model = ?")
        params.append(model)
    if start:
        conditions.append("timestamp >= ?")
        params.append(start)
    if end:
        conditions.append("timestamp <= ?")
        params.append(end)

    query = "SELECT * FROM api_logs"
    if conditions:
        query += " WHERE " + " AND ".join(conditions)
    query += " ORDER BY timestamp DESC LIMIT 500"
    print(query, params)

    rows = _fetch_all(query, params)
    # Column order follows the CREATE TABLE statement; index 0 is the row id.
    return [
        LogEntry(
            model=row[1],
            success=row[2],
            timestamp=row[3],
            failure_message=row[4],
            response_data=json.loads(row[5]) if row[5] else None,
        )
        for row in rows
    ]


@app.get("/api/db-logs")
async def get_db_logs():
    """Return the 100 most recent rows of ``api_logs`` as plain dicts."""
    rows = _fetch_all("SELECT * FROM api_logs ORDER BY timestamp DESC LIMIT 100")
    return [
        {
            "id": row[0],
            "model": row[1],
            "success": row[2],
            "timestamp": row[3],
            "failure_message": row[4],
            "response_data": json.loads(row[5]) if row[5] else None,
        }
        for row in rows
    ]


@app.get("/api/chart-data", response_model=Dict[str, Dict[str, Dict[str, List]]])
async def get_chart_data():
    """Return per-model success/failure series for charting.

    The result maps each model name to ``{'success': {...}, 'failure': {...}}``
    where each inner dict holds parallel ``x`` (timestamps) and ``y`` (always
    1) lists, in ascending timestamp order.
    """
    rows = _fetch_all("SELECT model, success, timestamp FROM api_logs ORDER BY timestamp")

    chart_data = {}
    for model, success, timestamp in rows:
        series = chart_data.setdefault(model, {
            'success': {'x': [], 'y': []},
            'failure': {'x': [], 'y': []},
        })
        status = 'success' if success else 'failure'
        series[status]['x'].append(timestamp)
        series[status]['y'].append(1)

    return chart_data


# Mount the static files directory
app.mount("/static", StaticFiles(directory="static"), name="static")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)