# Digiyatra chatbot FastAPI service
# Standard library
import json
import os
import sqlite3
import time
from contextlib import closing
from datetime import datetime, timedelta
from functools import lru_cache
from typing import Annotated, Any, Dict, List, Literal, Optional, Union, get_args
from uuid import uuid4

# Third-party
import pandas as pd
import requests
import tiktoken
import yaml
from yaml.loader import SafeLoader
from fastapi import FastAPI, Depends, BackgroundTasks, HTTPException, APIRouter, Query, Header, Security
from fastapi.responses import StreamingResponse
from fastapi.security import APIKeyHeader
from openai import AsyncOpenAI, OpenAI
from pydantic import BaseModel, Field
from sse_starlette.sse import EventSourceResponse

# Application-local
from aiclient import DatabaseManager, AIClient
from limit_tokens import trim_messages_openai
from observability import LLMObservabilityManager, log_execution, logger
from prompts import FOLLOWUP_DIGIYATRA_PROMPT
from utils import parse_followup_and_tools
app = FastAPI(
    title="Digiyatra Chatbot",
    description="Digiyatra Chatbot",
    version="1.0.0",
    # Fix: `tags` is not a FastAPI() parameter (it silently lands in **extra);
    # `openapi_tags` is the documented way to declare tag metadata.
    openapi_tags=[{"name": "chat"}],
    contact={
        "name": "Digiyatra",
        "url": "https://digiyatra.com",
        "email": "contact@digiyatra.com"
    }
)

# Routers are imported after app creation, matching the original module order.
from observability_router import router as observability_router
from rag_routerv2 import router as rag_router, query_table, QueryTableResponse, get_db_connection

app.include_router(observability_router)
app.include_router(rag_router)

# SQLite setup — conversation history is persisted here by update_db().
DB_PATH = '/data/conversations.db'
def init_db():
    """Create the SQLite `conversations` table if it does not already exist.

    Ensures the parent directory of DB_PATH exists, then runs the DDL.
    Safe to call repeatedly (CREATE TABLE IF NOT EXISTS).
    """
    logger.info("Initializing database")
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    # closing() guarantees the connection is released even if the DDL raises
    # (the original leaked the connection on error).
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute('''CREATE TABLE IF NOT EXISTS conversations
                     (id INTEGER PRIMARY KEY AUTOINCREMENT,
                      user_id TEXT,
                      conversation_id TEXT,
                      message TEXT,
                      response TEXT,
                      timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)''')
        conn.commit()
    logger.info("Database initialized successfully")
# In-memory conversation history, keyed by conversation_id; each entry is a
# list of {"role", "content"} chat messages.
conversations: Dict[str, List[Dict[str, str]]] = {}
# Epoch timestamp of the most recent message per conversation_id.
last_activity: Dict[str, float] = {}

from aiclient import AIClient  # redundant re-import (already imported above); kept as-is

# Single shared LLM client used by the streaming endpoint.
client = AIClient()
def update_db(user_id, conversation_id, message, response):
    """Persist one message/response exchange to the conversations table.

    Runs as a background task from the followup endpoint; parameters are the
    raw user query and the full LLM response text.
    """
    logger.info(f"Updating database for conversation: {conversation_id}")
    # closing() releases the connection even if the INSERT raises
    # (the original leaked the connection on error).
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute('''INSERT INTO conversations (user_id, conversation_id, message, response)
                     VALUES (?, ?, ?, ?)''', (user_id, conversation_id, message, response))
        conn.commit()
    logger.info("Database updated successfully")
# Closed set of model identifiers the API accepts. get_models() surfaces this
# list via typing.get_args, so keep the declaration order stable.
ModelID = Literal[
    "openai/gpt-4o-mini",
    "openai/chatgpt-4o-latest",
    "deepseek/deepseek-chat",
    "anthropic/claude-3.5-sonnet",
    "anthropic/claude-3-haiku",
    "meta-llama/llama-3.3-70b-instruct",
    "google/gemini-2.0-flash-exp:free",
    "google/gemini-flash-1.5",
    "meta-llama/llama-3-70b-instruct",
    "meta-llama/llama-3-8b-instruct",
]
class FollowupQueryModel(BaseModel):
    # No class docstring on purpose: pydantic would surface it as the schema
    # description and change the generated OpenAPI output.
    query: str = Field(..., description="User's query for the followup agent")
    model_id: ModelID = Field(
        default="openai/gpt-4o-mini",
        description="ID of the model to use for response generation",
    )
    # A fresh UUID is generated per request unless the caller supplies one.
    conversation_id: str = Field(
        default_factory=lambda: str(uuid4()),
        description="Unique identifier for the conversation",
    )
    user_id: str = Field(..., description="Unique identifier for the user")
    table_id: Optional[str] = Field(
        None, description="Optional knowledge base table identifier for the query"
    )

    class Config:
        # Example payload shown in the generated OpenAPI docs (pydantic v1
        # style; NOTE(review): pydantic v2 would use json_schema_extra).
        schema_extra = {
            "example": {
                "query": "How can I improve my productivity?",
                "model_id": "openai/gpt-4o-mini",
                "conversation_id": "123e4567-e89b-12d3-a456-426614174000",
                "user_id": "user123",
                "table_id": "digiyatra",
            }
        }
async def digiyatra_query_table(query: FollowupQueryModel, db: Annotated[Any, Depends(get_db_connection)], limit: Optional[int] = 5):
    """Query the digiyatra knowledge-base table and return the top hit's text.

    Defaults `table_id` to "digiyatra" when the caller did not set one.
    Raises HTTPException(404) when the table returns no rows (the original
    raised a bare IndexError, which surfaced as an opaque 500).
    NOTE(review): the `db` dependency is accepted but never used — confirm
    whether query_table manages its own connection.
    """
    if query.table_id is None:
        query.table_id = "digiyatra"
    response = await query_table(
        table_id=query.table_id,
        query=query.query,
        user_id=query.user_id,
        limit=limit,
    )
    # Guard the empty-result case instead of letting [0] raise IndexError.
    data = response.results['data']
    if not data:
        raise HTTPException(status_code=404, detail="No matching context found in the knowledge base.")
    return data[0]['text']
async def followup_agent(query: FollowupQueryModel, background_tasks: BackgroundTasks):
    """
    Followup agent endpoint that provides helpful responses or generates clarifying questions based on user queries.
    Requires API Key authentication via X-API-Key header.
    """
    try:
        logger.info(f"Received followup agent query: {query.query}")
        # Seed a brand-new history with the system prompt for unseen conversations.
        if query.conversation_id not in conversations:
            conversations[query.conversation_id] = [
                {"role": "system", "content": FOLLOWUP_DIGIYATRA_PROMPT}
            ]
        # Pull FAQ context from the knowledge base and fold it into the user turn.
        digiyatra_response = await digiyatra_query_table(query, db=get_db_connection(), limit=5)
        user_query_with_context = f"{query.query} \n\n FAQ Context for ANSWERING: {digiyatra_response}"
        history = conversations[query.conversation_id]
        history.append({"role": "user", "content": user_query_with_context})
        last_activity[query.conversation_id] = time.time()
        # NOTE(review): trim_messages_openai is imported at module level but
        # never applied — the history is passed through untrimmed; confirm intent.
        limited_conversation = history

        async def process_response():
            try:
                streamed_text = ""
                # Stream tokens to the client as SSE events while accumulating
                # the full response for parsing and persistence.
                async for chunk in client.generate_response(limited_conversation, model=query.model_id, conversation_id=query.conversation_id, user=query.user_id):
                    streamed_text += chunk
                    yield f"{json.dumps({'type': 'token', 'content': chunk})}"
                logger.info(f"LLM RAW response for query: {query.query}: {streamed_text}")
                answer, clarification, tools = parse_followup_and_tools(streamed_text)
                payload = {
                    "response": answer,
                    "clarification": clarification
                }
                yield f"{json.dumps({'type': 'metadata', 'response_full': payload})}"
                # Record the assistant turn, then persist off the request path.
                conversations[query.conversation_id].append({"role": "assistant", "content": streamed_text})
                background_tasks.add_task(update_db, query.user_id, query.conversation_id, query.query, streamed_text)
                logger.info(f"Completed followup agent response for query: {query.query}, send result: {payload}")
            except Exception as e:
                logger.error(f"Error during response processing: {str(e)}")
                yield f"{json.dumps({'type': 'error', 'message': 'An error occurred while processing the response.'})}"

        return EventSourceResponse(process_response(), media_type="text/event-stream")
    except Exception as e:
        logger.error(f"Error in followup_agent: {str(e)}")
        raise HTTPException(status_code=500, detail="An error occurred while processing the followup agent request.")
class ModelsResponse(BaseModel):
    # Response schema for the model-listing endpoint; no docstring on purpose
    # (pydantic would surface it as the OpenAPI schema description).
    models: List[str]
def get_models():
    """
    Endpoint to return a list of available model IDs dynamically extracted from the ModelID Literal.
    """
    try:
        # The Literal's arguments are the canonical list of supported models.
        available = list(get_args(ModelID))
        return ModelsResponse(models=available)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred while fetching the models: {str(e)}")
from fastapi.middleware.cors import CORSMiddleware

# Open CORS policy so any web origin can reach the API.
# NOTE(review): browsers reject allow_credentials=True combined with
# allow_origins=["*"] for credentialed requests — confirm intended origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.on_event("startup")
def startup():
    """Application startup hook: ensure the SQLite schema exists.

    Fix: the function was defined but never registered with the app, so
    init_db() never ran. Registered via @app.on_event (lifespan handlers are
    the newer alternative, but this matches the file's style).
    """
    logger.info("Starting up the application")
    init_db()
@app.on_event("shutdown")
def shutdown():
    """Application shutdown hook; currently only logs.

    Fix: the function was defined but never registered with the app, so it
    never ran on shutdown.
    """
    logger.info("Shutting down the application")
# import uvicorn
# if __name__ == "__main__":
#     uvicorn.run(
#         "app:app",
#         host="0.0.0.0",
#         port=8000,
#         workers=4,
#         reload=False,
#         access_log=False
#     )