## Digiyatra
# Standard library
import json
import os
import sqlite3
import time
from datetime import datetime, timedelta
from functools import lru_cache
from typing import Annotated, Any, Dict, List, Literal, Optional, Union, get_args
from uuid import uuid4

# Third-party
import pandas as pd
import requests
import tiktoken
import yaml
from yaml.loader import SafeLoader
from fastapi import FastAPI, APIRouter, BackgroundTasks, Depends, Header, HTTPException, Query, Security
from fastapi.responses import StreamingResponse
from fastapi.security import APIKeyHeader
from openai import AsyncOpenAI, OpenAI
from pydantic import BaseModel, Field
from sse_starlette.sse import EventSourceResponse

# Local modules
from aiclient import AIClient, DatabaseManager
from limit_tokens import trim_messages_openai
from observability import LLMObservabilityManager, log_execution, logger
from prompts import FOLLOWUP_DIGIYATRA_PROMPT
from utils import parse_followup_and_tools
app = FastAPI(
    title="Digiyatra Chatbot",
    description="Digiyatra Chatbot",
    version="1.0.0",
    tags=["chat"],
    contact={
        "name": "Digiyatra",
        "url": "https://digiyatra.com",
        "email": "contact@digiyatra.com"
    }
)
from observability_router import router as observability_router
from rag_routerv2 import router as rag_router, query_table, QueryTableResponse, get_db_connection
app.include_router(observability_router)
app.include_router(rag_router)
# SQLite setup
DB_PATH = '/data/conversations.db'
def init_db():
    logger.info("Initializing database")
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS conversations
                 (id INTEGER PRIMARY KEY AUTOINCREMENT,
                  user_id TEXT,
                  conversation_id TEXT,
                  message TEXT,
                  response TEXT,
                  timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)''')
    conn.commit()
    conn.close()
    logger.info("Database initialized successfully")
# In-memory storage for conversations
conversations: Dict[str, List[Dict[str, str]]] = {}
last_activity: Dict[str, float] = {}
client = AIClient()
def update_db(user_id, conversation_id, message, response):
    logger.info(f"Updating database for conversation: {conversation_id}")
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute('''INSERT INTO conversations (user_id, conversation_id, message, response)
                 VALUES (?, ?, ?, ?)''', (user_id, conversation_id, message, response))
    conn.commit()
    conn.close()
    logger.info("Database updated successfully")
ModelID = Literal[
    "openai/gpt-4o-mini",
    "openai/chatgpt-4o-latest",
    "deepseek/deepseek-chat",
    "anthropic/claude-3.5-sonnet",
    "anthropic/claude-3-haiku",
    "meta-llama/llama-3.3-70b-instruct",
    "google/gemini-2.0-flash-exp:free",
    "google/gemini-flash-1.5",
    "meta-llama/llama-3-70b-instruct",
    "meta-llama/llama-3-8b-instruct",
]
class FollowupQueryModel(BaseModel):
    query: str = Field(..., description="User's query for the followup agent")
    model_id: ModelID = Field(
        default="openai/gpt-4o-mini",
        description="ID of the model to use for response generation"
    )
    conversation_id: str = Field(default_factory=lambda: str(uuid4()), description="Unique identifier for the conversation")
    user_id: str = Field(..., description="Unique identifier for the user")
    table_id: Optional[str] = Field(None, description="Optional knowledge base table identifier for the query")

    class Config:
        schema_extra = {
            "example": {
                "query": "How can I improve my productivity?",
                "model_id": "openai/gpt-4o-mini",
                "conversation_id": "123e4567-e89b-12d3-a456-426614174000",
                "user_id": "user123",
                "table_id": "digiyatra"
            }
        }
async def digiyatra_query_table(query: FollowupQueryModel, db: Annotated[Any, Depends(get_db_connection)], limit: Optional[int] = 5):
    """Query the digiyatra knowledge base table and return the top matching FAQ text."""
    if query.table_id is None:
        query.table_id = "digiyatra"
    response = await query_table(
        table_id=query.table_id,
        query=query.query,
        user_id=query.user_id,
        limit=limit
    )
    return response.results['data'][0]['text']
@app.post("/digiyatra-followup")
async def followup_agent(query: FollowupQueryModel, background_tasks: BackgroundTasks):
    """
    Followup agent endpoint that provides helpful responses or generates clarifying questions based on user queries.
    Requires API Key authentication via X-API-Key header.
    """
    try:
        logger.info(f"Received followup agent query: {query.query}")
        if query.conversation_id not in conversations:
            conversations[query.conversation_id] = [
                {"role": "system", "content": FOLLOWUP_DIGIYATRA_PROMPT}
            ]
        digiyatra_response = await digiyatra_query_table(query, db=get_db_connection(), limit=5)
        user_query_with_context = f"{query.query} \n\n FAQ Context for ANSWERING: {digiyatra_response}"
        conversations[query.conversation_id].append({"role": "user", "content": user_query_with_context})
        last_activity[query.conversation_id] = time.time()
        # Pass the conversation history as-is; trim_messages_openai (imported above)
        # could be applied here to cap the token count before calling the model.
        limited_conversation = conversations[query.conversation_id]
        async def process_response():
            try:
                full_response = ""
                async for content in client.generate_response(limited_conversation, model=query.model_id, conversation_id=query.conversation_id, user=query.user_id):
                    full_response += content
                    yield json.dumps({'type': 'token', 'content': content})
                logger.info(f"LLM RAW response for query: {query.query}: {full_response}")
                response_content, interact, tools = parse_followup_and_tools(full_response)
                result = {
                    "response": response_content,
                    "clarification": interact
                }
                yield json.dumps({'type': 'metadata', 'response_full': result})
                # Add the assistant's response to the conversation history
                conversations[query.conversation_id].append({"role": "assistant", "content": full_response})
                background_tasks.add_task(update_db, query.user_id, query.conversation_id, query.query, full_response)
                logger.info(f"Completed followup agent response for query: {query.query}, sent result: {result}")
            except Exception as e:
                logger.error(f"Error during response processing: {str(e)}")
                yield json.dumps({'type': 'error', 'message': 'An error occurred while processing the response.'})

        return EventSourceResponse(process_response(), media_type="text/event-stream")
    except Exception as e:
        logger.error(f"Error in followup_agent: {str(e)}")
        raise HTTPException(status_code=500, detail="An error occurred while processing the followup agent request.")
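
# Example (illustrative sketch, not part of the service): consuming the
# /digiyatra-followup SSE stream from a client. It assumes the API is reachable
# at http://localhost:8000 and that the `httpx` package is installed; the payload
# fields mirror FollowupQueryModel above. Each SSE `data:` line carries one of the
# JSON events yielded by process_response ('token', 'metadata', or 'error').
#
# import asyncio
# import json as _json
# import httpx
#
# async def _demo_followup():
#     payload = {"query": "How do I register for DigiYatra?", "user_id": "user123"}
#     async with httpx.AsyncClient(timeout=None) as http:
#         async with http.stream("POST", "http://localhost:8000/digiyatra-followup", json=payload) as resp:
#             async for line in resp.aiter_lines():
#                 if line.startswith("data:"):
#                     event = _json.loads(line[len("data:"):].strip())
#                     print(event["type"], event)
#
# asyncio.run(_demo_followup())
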
class ModelsResponse(BaseModel):
    models: List[str]
@app.get("/models", response_model=ModelsResponse)
def get_models():
    """
    Endpoint to return a list of available model IDs dynamically extracted from the ModelID Literal.
    """
    try:
        # Dynamically extract the literal values of ModelID
        model_list = list(get_args(ModelID))
        return ModelsResponse(models=model_list)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred while fetching the models: {str(e)}")
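
# For reference (sketch of the expected shape, derived from ModelsResponse above):
# GET /models returns JSON like {"models": ["openai/gpt-4o-mini", "openai/chatgpt-4o-latest", ...]},
# i.e. every literal value declared in ModelID.
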
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.on_event("startup")
def startup():
    logger.info("Starting up the application")
    init_db()

@app.on_event("shutdown")
def shutdown():
    logger.info("Shutting down the application")
# import uvicorn
# if __name__ == "__main__":
#     uvicorn.run(
#         "app:app",
#         host="0.0.0.0",
#         port=8000,
#         workers=4,
#         reload=False,
#         access_log=False
#     )