# NOTE: removed Hugging Face Spaces file-viewer residue (status lines, git
# revision hashes, and a line-number gutter) that made this file unparseable.
## Digiyatra
from fastapi import FastAPI, Depends, BackgroundTasks, HTTPException, APIRouter, Query, Header
from pydantic import BaseModel
from typing import List, Dict, Optional, Union, Annotated, Any, get_args
from openai import AsyncOpenAI
from observability import LLMObservabilityManager, log_execution, logger
from aiclient import DatabaseManager, AIClient
from limit_tokens import trim_messages_openai
from prompts import FOLLOWUP_DIGIYATRA_PROMPT
from utils import parse_followup_and_tools
from sse_starlette.sse import EventSourceResponse
##
from fastapi import FastAPI, HTTPException, Depends, Security, BackgroundTasks
from fastapi.security import APIKeyHeader
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import Literal, List, Dict
import os
from functools import lru_cache
from openai import OpenAI
from uuid import uuid4
import tiktoken
import sqlite3
import time
from datetime import datetime, timedelta
import pandas as pd
import requests
import json
import os
from pydantic import BaseModel, Field
import yaml
import json
from yaml.loader import SafeLoader
# Application instance and OpenAPI metadata.
# NOTE(review): the original passed `tags=["chat"]` here, but `tags` is not a
# FastAPI() constructor argument (tags belong on routers/endpoints); it was
# silently swallowed via **extra, so dropping it is behavior-neutral.
app = FastAPI(
    title="Digiyatra Chatbot",
    description="Digiyatra Chatbot",
    version="1.0.0",
    contact={
        "name": "Digiyatra",
        "url": "https://digiyatra.com",
        "email": "contact@digiyatra.com",
    },
)
# Project routers: observability endpoints plus the RAG endpoints.
# query_table and get_db_connection are also reused directly by the
# followup flow further down in this file.
from observability_router import router as observability_router
from rag_routerv2 import router as rag_router, query_table, QueryTableResponse, get_db_connection
app.include_router(observability_router)
app.include_router(rag_router)
# SQLite setup
DB_PATH = '/data/conversations.db'


def init_db():
    """Create the conversations table (and its parent directory) if missing.

    Called once at application startup. Idempotent: uses CREATE TABLE IF NOT
    EXISTS and os.makedirs(..., exist_ok=True), so repeated calls are safe.
    """
    logger.info("Initializing database")
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    try:
        # Connection.execute creates an implicit cursor; no explicit cursor needed.
        conn.execute('''CREATE TABLE IF NOT EXISTS conversations
                     (id INTEGER PRIMARY KEY AUTOINCREMENT,
                      user_id TEXT,
                      conversation_id TEXT,
                      message TEXT,
                      response TEXT,
                      timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)''')
        conn.commit()
    finally:
        # Close even if the DDL raises; the original leaked the connection on error.
        conn.close()
    logger.info("Database initialized successfully")
# In-memory storage for conversations
# conversation_id -> ordered chat messages ({"role": ..., "content": ...}).
# Lost on restart: SQLite persists only the transcript, not this cache.
conversations: Dict[str, List[Dict[str, str]]] = {}
# conversation_id -> time.time() of the last user message (written below,
# never read in this file — presumably for an expiry sweep elsewhere; verify).
last_activity: Dict[str, float] = {}
# NOTE(review): duplicate import — AIClient is already imported at the top of the file.
from aiclient import AIClient
client = AIClient()
def update_db(user_id, conversation_id, message, response):
    """Persist one (message, response) exchange to SQLite.

    Runs as a FastAPI background task after the SSE stream completes.

    Args:
        user_id: caller's identifier.
        conversation_id: conversation this exchange belongs to.
        message: the raw user query (without the injected FAQ context).
        response: the full assistant response text.
    """
    logger.info(f"Updating database for conversation: {conversation_id}")
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute('''INSERT INTO conversations (user_id, conversation_id, message, response)
                     VALUES (?, ?, ?, ?)''', (user_id, conversation_id, message, response))
        conn.commit()
    finally:
        # Close even if the INSERT raises; the original leaked the connection on error.
        conn.close()
    logger.info("Database updated successfully")
# Closed set of OpenRouter-style model identifiers accepted by the API.
# The /models endpoint below derives its response from this Literal via
# typing.get_args(), so adding a model here is the single source of truth.
ModelID = Literal[
    "openai/gpt-4o-mini",
    "openai/chatgpt-4o-latest",
    "deepseek/deepseek-chat",
    "anthropic/claude-3.5-sonnet",
    "anthropic/claude-3-haiku",
    "meta-llama/llama-3.3-70b-instruct",
    "google/gemini-2.0-flash-exp:free",
    "google/gemini-flash-1.5",
    "meta-llama/llama-3-70b-instruct",
    "meta-llama/llama-3-8b-instruct",
]
class FollowupQueryModel(BaseModel):
    """Request body for POST /digiyatra-followup."""
    query: str = Field(..., description="User's query for the followup agent")
    model_id: ModelID = Field(
        default="openai/gpt-4o-mini",
        description="ID of the model to use for response generation"
    )
    # A fresh UUID is generated when the client omits this, which implicitly
    # starts a new conversation history in the in-memory `conversations` map.
    conversation_id: str = Field(default_factory=lambda: str(uuid4()), description="Unique identifier for the conversation")
    user_id: str = Field(..., description="Unique identifier for the user")
    # Defaulted to "digiyatra" inside digiyatra_query_table when left as None.
    table_id: Optional[str] = Field(None, description="Optional knowledge base table identifier for the query")
    class Config:
        # NOTE(review): `schema_extra` is the pydantic v1 spelling; under
        # pydantic v2 this key is silently ignored (renamed to
        # `json_schema_extra`) — confirm which pydantic version is pinned.
        schema_extra = {
            "example": {
                "query": "How can I improve my productivity?",
                "model_id": "openai/gpt-4o-mini",
                "conversation_id": "123e4567-e89b-12d3-a456-426614174000",
                "user_id": "user123",
                "table_id": "digiyatra"
            }
        }
async def digiyatra_query_table(query: FollowupQueryModel, db: Annotated[Any, Depends(get_db_connection)], limit: Optional[int] = 5):
    """Query the knowledge-base table and return the top matching text chunk.

    Args:
        query: the followup request; its table_id defaults to "digiyatra".
        db: database dependency (NOTE(review): accepted but never used here —
            query_table presumably acquires its own connection; verify).
        limit: maximum number of rows to retrieve from the table.

    Returns:
        The text of the best-matching row.

    Raises:
        HTTPException: 404 when the search yields no rows. (The original
            raised an opaque IndexError/KeyError via `['data'][0]` instead.)
    """
    # Preserve the original in-place default so downstream readers of the
    # model see the resolved table id too.
    if query.table_id is None:
        query.table_id = "digiyatra"
    response = await query_table(
        table_id=query.table_id,
        query=query.query,
        user_id=query.user_id,
        limit=limit,
    )
    # NOTE(review): assumes response.results == {'data': [{'text': ...}, ...]},
    # matching the original indexing — verify against rag_routerv2.
    rows = response.results.get('data') or []
    if not rows:
        raise HTTPException(status_code=404, detail="No matching knowledge-base entries found.")
    return rows[0]['text']
@app.post("/digiyatra-followup")
async def followup_agent(query: FollowupQueryModel, background_tasks: BackgroundTasks):
"""
Followup agent endpoint that provides helpful responses or generates clarifying questions based on user queries.
Requires API Key authentication via X-API-Key header.
"""
try:
logger.info(f"Received followup agent query: {query.query}")
if query.conversation_id not in conversations:
conversations[query.conversation_id] = [
{"role": "system", "content": FOLLOWUP_DIGIYATRA_PROMPT}
]
digiyatra_response = await digiyatra_query_table(query, db=get_db_connection(), limit=5)
user_query_with_context = f"{query.query} \n\n FAQ Context for ANSWERING: {digiyatra_response}"
conversations[query.conversation_id].append({"role": "user", "content": user_query_with_context})
last_activity[query.conversation_id] = time.time()
# Limit tokens in the conversation history
limited_conversation = conversations[query.conversation_id]
async def process_response():
try:
full_response = ""
async for content in client.generate_response(limited_conversation, model=query.model_id, conversation_id=query.conversation_id, user=query.user_id):
full_response += content
yield f"{json.dumps({'type': 'token', 'content': content})}"
logger.info(f"LLM RAW response for query: {query.query}: {full_response}")
response_content, interact, tools = parse_followup_and_tools(full_response)
result = {
"response": response_content,
"clarification": interact
}
yield f"{json.dumps({'type': 'metadata', 'response_full': result})}"
# Add the assistant's response to the conversation history
conversations[query.conversation_id].append({"role": "assistant", "content": full_response})
background_tasks.add_task(update_db, query.user_id, query.conversation_id, query.query, full_response)
logger.info(f"Completed followup agent response for query: {query.query}, send result: {result}")
except Exception as e:
logger.error(f"Error during response processing: {str(e)}")
yield f"{json.dumps({'type': 'error', 'message': 'An error occurred while processing the response.'})}"
return EventSourceResponse(process_response(), media_type="text/event-stream")
except Exception as e:
logger.error(f"Error in followup_agent: {str(e)}")
raise HTTPException(status_code=500, detail="An error occurred while processing the followup agent request.")
class ModelsResponse(BaseModel):
    """Response schema for GET /models: the list of selectable model IDs."""
    models: List[str]
@app.get("/models", response_model=ModelsResponse)
def get_models():
"""
Endpoint to return a list of available model IDs dynamically extracted from the ModelID Literal.
"""
try:
# Dynamically extract the literal values of ModelID
model_list = list(get_args(ModelID))
return ModelsResponse(models=model_list)
except Exception as e:
raise HTTPException(status_code=500, detail=f"An error occurred while fetching the models: {str(e)}")
# CORS: wide open, development-style configuration.
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# contradictory under the CORS spec — browsers reject wildcard origins on
# credentialed requests — so tighten the origin list before production.
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.on_event("startup")
def startup():
logger.info("Starting up the application")
init_db()
@app.on_event("shutdown")
def shutdown():
logger.info("Shutting down the application")
# import uvicorn
# if __name__ == "__main__":
# uvicorn.run(
# "app:app",
# host="0.0.0.0",
# port=8000,
# workers=4,
# reload=False,
# access_log=False
# ) |