# Provenance: downloaded from a Hugging Face repository file viewer
# (upload "Upload 11 files", commit 1a6d961, ~5.3 kB). The viewer chrome
# that preceded this header was not part of the original module.
# aiclient.py
import os
import time
import json
from typing import List, Dict, Optional, Union, AsyncGenerator
from openai import AsyncOpenAI
from starlette.responses import StreamingResponse
from observability import log_execution ,LLMObservabilityManager
import psycopg2
import requests
from functools import lru_cache
import logging
import pandas as pd
logger = logging.getLogger(__name__)
@lru_cache(maxsize=1)
def get_model_info():
    """Fetch OpenRouter model metadata and return it as a DataFrame.

    Tries the live API first and persists the response to model_info.json
    so later runs can work offline; on any failure it falls back to that
    file. Returns None only when both the API call and the file fallback
    fail.

    NOTE: lru_cache(maxsize=1) also caches a None failure result for the
    life of the process; call get_model_info.cache_clear() to retry.
    """
    try:
        model_info_dict = requests.get(
            'https://openrouter.ai/api/v1/models',
            headers={'accept': 'application/json'},
            timeout=10,  # fail fast instead of hanging the first caller indefinitely
        ).json()["data"]
        # Persist the successful response on disk for offline fallback.
        with open('model_info.json', 'w') as json_file:
            json.dump(model_info_dict, json_file, indent=4)
    except Exception as e:
        logger.error(f"Failed to fetch model info: {e}. Loading from file.")
        if os.path.exists('model_info.json'):
            with open('model_info.json', 'r') as json_file:
                model_info_dict = json.load(json_file)
        else:
            logger.error("No model info file found")
            return None
    # Single construction point for both the live and file-loaded data.
    return pd.DataFrame(model_info_dict)
class AIClient:
    """Streams chat completions from OpenRouter and records usage/cost
    observations via LLMObservabilityManager."""

    def __init__(self):
        # Requires OPENROUTER_API_KEY in the environment; raises KeyError otherwise.
        self.client = AsyncOpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=os.environ['OPENROUTER_API_KEY']
        )
        self.observability_manager = LLMObservabilityManager()
        # May be None if both the API fetch and the file fallback failed.
        self.model_info = get_model_info()

    #@log_execution
    async def generate_response(
        self,
        messages: List[Dict[str, str]],
        model: str = "openai/gpt-4o-mini",
        max_tokens: int = 32000,
        conversation_id: Optional[str] = None,
        user: str = "anonymous"
    ) -> AsyncGenerator[str, None]:
        """Stream a chat completion, yielding content deltas as they arrive.

        On error the exception text becomes the logged response and the
        observation is recorded with status="error"; nothing further is
        yielded. An empty `messages` list yields nothing and logs nothing.
        """
        if not messages:
            return
        start_time = time.time()
        full_response = ""
        usage = {"completion_tokens": 0, "prompt_tokens": 0, "total_tokens": 0}
        status = "success"
        latency = 0.0  # defined up front so `finally` never sees it unbound
        try:
            response = await self.client.chat.completions.create(
                model=model,
                messages=messages,
                max_tokens=max_tokens,
                stream=True,
                stream_options={"include_usage": True}
            )
            # Time to stream start (not total stream duration), matching the
            # original measurement point.
            latency = time.time() - start_time
            async for chunk in response:
                # Bug fix: with include_usage the final usage chunk arrives
                # with an EMPTY `choices` list, so guard before indexing.
                if chunk.choices and chunk.choices[0].delta.content:
                    content = chunk.choices[0].delta.content
                    yield content
                    full_response += content
                if chunk.usage:
                    # Use the canonical model id the router actually served.
                    model = chunk.model
                    usage["completion_tokens"] = chunk.usage.completion_tokens
                    usage["prompt_tokens"] = chunk.usage.prompt_tokens
                    usage["total_tokens"] = chunk.usage.total_tokens
                    logger.debug("usage=%s model=%s", usage, model)
        except Exception as e:
            status = "error"
            full_response = str(e)
            latency = time.time() - start_time
            logger.error(f"Error in generate_response: {e}")
        finally:
            # Log the observation; never let observability failures mask the
            # streamed response or the original error.
            try:
                cost = 0.0
                # Guard: model_info can be None, and the served model id may
                # be absent from the pricing table.
                if self.model_info is not None:
                    rows = self.model_info[self.model_info.id == model]
                    if len(rows) > 0:
                        pricing_data = rows["pricing"].values[0]
                        cost = (float(pricing_data["completion"]) * float(usage["completion_tokens"])
                                + float(pricing_data["prompt"]) * float(usage["prompt_tokens"]))
                self.observability_manager.insert_observation(
                    response=full_response,
                    model=model,
                    completion_tokens=usage["completion_tokens"],
                    prompt_tokens=usage["prompt_tokens"],
                    total_tokens=usage["total_tokens"],
                    cost=cost,
                    conversation_id=conversation_id or "default",
                    status=status,
                    # System prompts are excluded from the logged request.
                    request=json.dumps([msg for msg in messages if msg.get('role') != 'system']),
                    latency=latency,
                    user=user
                )
            except Exception as obs_error:
                logger.error(f"Error logging observation: {obs_error}")
class DatabaseManager:
    """Manages database operations against the Supabase Postgres instance."""

    def __init__(self):
        # Credentials come from the environment; raises KeyError if unset.
        self.db_params = {
            "dbname": "postgres",
            "user": os.environ['SUPABASE_USER'],
            "password": os.environ['SUPABASE_PASSWORD'],
            "host": "aws-0-us-west-1.pooler.supabase.com",
            "port": "5432"
        }

    @log_execution
    def update_database(self, user_id: str, user_query: str, response: str) -> None:
        """Insert one (user_id, user_query, response) row into
        ai_document_generator.

        Raises psycopg2.Error on connection or execution failure.
        """
        # Bug fix: psycopg2's `with connection` block commits/rolls back the
        # transaction but does NOT close the connection, so the original
        # leaked one open connection per call. Close explicitly.
        conn = psycopg2.connect(**self.db_params)
        try:
            with conn:  # commits on success, rolls back on exception
                with conn.cursor() as cur:
                    insert_query = """
                    INSERT INTO ai_document_generator (user_id, user_query, response)
                    VALUES (%s, %s, %s);
                    """
                    # Parameterized query: psycopg2 escapes the values.
                    cur.execute(insert_query, (user_id, user_query, response))
        finally:
            conn.close()