## Digiyatra from fastapi import FastAPI, Depends, BackgroundTasks, HTTPException, APIRouter, Query, Header from pydantic import BaseModel from typing import List, Dict, Optional, Union, Annotated, Any, get_args from openai import AsyncOpenAI from observability import LLMObservabilityManager, log_execution, logger from aiclient import DatabaseManager, AIClient from limit_tokens import trim_messages_openai from prompts import FOLLOWUP_DIGIYATRA_PROMPT from utils import parse_followup_and_tools from sse_starlette.sse import EventSourceResponse ## from fastapi import FastAPI, HTTPException, Depends, Security, BackgroundTasks from fastapi.security import APIKeyHeader from fastapi.responses import StreamingResponse from pydantic import BaseModel, Field from typing import Literal, List, Dict import os from functools import lru_cache from openai import OpenAI from uuid import uuid4 import tiktoken import sqlite3 import time from datetime import datetime, timedelta import pandas as pd import requests import json import os from pydantic import BaseModel, Field import yaml import json from yaml.loader import SafeLoader app = FastAPI( title="Digiyatra Chatbot", description="Digiyatra Chatbot", version="1.0.0", tags=["chat"], contact={ "name": "Digiyatra", "url": "https://digiyatra.com", "email": "contact@digiyatra.com" } ) from observability_router import router as observability_router from rag_routerv2 import router as rag_router, query_table, QueryTableResponse, get_db_connection app.include_router(observability_router) app.include_router(rag_router) # SQLite setup DB_PATH = '/data/conversations.db' def init_db(): logger.info("Initializing database") os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute('''CREATE TABLE IF NOT EXISTS conversations (id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT, conversation_id TEXT, message TEXT, response TEXT, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)''') conn.commit() conn.close() logger.info("Database initialized successfully") # In-memory storage for conversations conversations: Dict[str, List[Dict[str, str]]] = {} last_activity: Dict[str, float] = {} from aiclient import AIClient client = AIClient() def update_db(user_id, conversation_id, message, response): logger.info(f"Updating database for conversation: {conversation_id}") conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute('''INSERT INTO conversations (user_id, conversation_id, message, response) VALUES (?, ?, ?, ?)''', (user_id, conversation_id, message, response)) conn.commit() conn.close() logger.info("Database updated successfully") ModelID = Literal[ "openai/gpt-4o-mini", "openai/chatgpt-4o-latest", "deepseek/deepseek-chat", "anthropic/claude-3.5-sonnet", "anthropic/claude-3-haiku", "meta-llama/llama-3.3-70b-instruct", "google/gemini-2.0-flash-exp:free", "google/gemini-flash-1.5", "meta-llama/llama-3-70b-instruct", "meta-llama/llama-3-8b-instruct", ] class FollowupQueryModel(BaseModel): query: str = Field(..., description="User's query for the followup agent") model_id: ModelID = Field( default="openai/gpt-4o-mini", description="ID of the model to use for response generation" ) conversation_id: str = Field(default_factory=lambda: str(uuid4()), description="Unique identifier for the conversation") user_id: str = Field(..., description="Unique identifier for the user") table_id: Optional[str] = Field(None, description="Optional knowledge base table identifier for the query") class Config: schema_extra = { "example": { "query": "How can I improve my productivity?", "model_id": "openai/gpt-4o-mini", "conversation_id": "123e4567-e89b-12d3-a456-426614174000", "user_id": "user123", "table_id":"digiyatra" } } async def digiyatra_query_table(query: FollowupQueryModel, db: Annotated[Any, Depends(get_db_connection)], limit: Optional[int] = 5): """Query the digiyatra table.""" if query.table_id is None: query.table_id = "digiyatra" response = await query_table( table_id=query.table_id, query=query.query, user_id=query.user_id, limit=limit ) return response.results['data'][0]['text'] @app.post("/digiyatra-followup") async def followup_agent(query: FollowupQueryModel, background_tasks: BackgroundTasks): """ Followup agent endpoint that provides helpful responses or generates clarifying questions based on user queries. Requires API Key authentication via X-API-Key header. """ try: logger.info(f"Received followup agent query: {query.query}") if query.conversation_id not in conversations: conversations[query.conversation_id] = [ {"role": "system", "content": FOLLOWUP_DIGIYATRA_PROMPT} ] digiyatra_response = await digiyatra_query_table(query, db=get_db_connection(), limit=5) user_query_with_context = f"{query.query} \n\n FAQ Context for ANSWERING: {digiyatra_response}" conversations[query.conversation_id].append({"role": "user", "content": user_query_with_context}) last_activity[query.conversation_id] = time.time() # Limit tokens in the conversation history limited_conversation = conversations[query.conversation_id] async def process_response(): try: full_response = "" async for content in client.generate_response(limited_conversation, model=query.model_id, conversation_id=query.conversation_id, user=query.user_id): full_response += content yield f"{json.dumps({'type': 'token', 'content': content})}" logger.info(f"LLM RAW response for query: {query.query}: {full_response}") response_content, interact, tools = parse_followup_and_tools(full_response) result = { "response": response_content, "clarification": interact } yield f"{json.dumps({'type': 'metadata', 'response_full': result})}" # Add the assistant's response to the conversation history conversations[query.conversation_id].append({"role": "assistant", "content": full_response}) background_tasks.add_task(update_db, query.user_id, query.conversation_id, query.query, full_response) logger.info(f"Completed followup agent response for query: {query.query}, send result: {result}") except Exception as e: logger.error(f"Error during response processing: {str(e)}") yield f"{json.dumps({'type': 'error', 'message': 'An error occurred while processing the response.'})}" return EventSourceResponse(process_response(), media_type="text/event-stream") except Exception as e: logger.error(f"Error in followup_agent: {str(e)}") raise HTTPException(status_code=500, detail="An error occurred while processing the followup agent request.") class ModelsResponse(BaseModel): models: List[str] @app.get("/models", response_model=ModelsResponse) def get_models(): """ Endpoint to return a list of available model IDs dynamically extracted from the ModelID Literal. """ try: # Dynamically extract the literal values of ModelID model_list = list(get_args(ModelID)) return ModelsResponse(models=model_list) except Exception as e: raise HTTPException(status_code=500, detail=f"An error occurred while fetching the models: {str(e)}") from fastapi.middleware.cors import CORSMiddleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.on_event("startup") def startup(): logger.info("Starting up the application") init_db() @app.on_event("shutdown") def shutdown(): logger.info("Shutting down the application") # import uvicorn # if __name__ == "__main__": # uvicorn.run( # "app:app", # host="0.0.0.0", # port=8000, # workers=4, # reload=False, # access_log=False # )