## Digiyatra
import os
import json
import sqlite3
import time
from datetime import datetime, timedelta
from functools import lru_cache
from uuid import uuid4
from typing import List, Dict, Optional, Union, Literal, Annotated, Any, get_args

import pandas as pd
import requests
import tiktoken
import yaml
from yaml.loader import SafeLoader

from fastapi import FastAPI, Depends, BackgroundTasks, HTTPException, APIRouter, Query, Header, Security
from fastapi.security import APIKeyHeader
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from openai import OpenAI, AsyncOpenAI
from sse_starlette.sse import EventSourceResponse

from observability import LLMObservabilityManager, log_execution, logger
from aiclient import DatabaseManager, AIClient
from limit_tokens import trim_messages_openai
from prompts import FOLLOWUP_DIGIYATRA_PROMPT
from utils import parse_followup_and_tools

app = FastAPI(
    title="Digiyatra Chatbot",
    description="Digiyatra Chatbot",
    version="1.0.0",
    tags=["chat"],
    contact={
        "name": "Digiyatra",
        "url": "https://digiyatra.com",
        "email": "contact@digiyatra.com"
    }
)
from observability_router import router as observability_router
from rag_routerv2 import router as rag_router, query_table, QueryTableResponse, get_db_connection
app.include_router(observability_router)
app.include_router(rag_router)
# SQLite setup
DB_PATH = '/data/conversations.db'

def init_db():
    logger.info("Initializing database")
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS conversations
                 (id INTEGER PRIMARY KEY AUTOINCREMENT,
                  user_id TEXT,
                  conversation_id TEXT,
                  message TEXT,
                  response TEXT,
                  timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)''')
    conn.commit()
    conn.close()
    logger.info("Database initialized successfully")


# In-memory storage for conversations
conversations: Dict[str, List[Dict[str, str]]] = {}
last_activity: Dict[str, float] = {}

client = AIClient()

def update_db(user_id, conversation_id, message, response):
    logger.info(f"Updating database for conversation: {conversation_id}")
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute('''INSERT INTO conversations (user_id, conversation_id, message, response)
                 VALUES (?, ?, ?, ?)''', (user_id, conversation_id, message, response))
    conn.commit()
    conn.close()
    logger.info("Database updated successfully")


ModelID = Literal[
    "openai/gpt-4o-mini",
    "openai/chatgpt-4o-latest",
    "deepseek/deepseek-chat",
    "anthropic/claude-3.5-sonnet",
    "anthropic/claude-3-haiku",
    "meta-llama/llama-3.3-70b-instruct",
    "google/gemini-2.0-flash-exp:free",
    "google/gemini-flash-1.5",
    "meta-llama/llama-3-70b-instruct",
    "meta-llama/llama-3-8b-instruct",

]
class FollowupQueryModel(BaseModel):
    query: str = Field(..., description="User's query for the followup agent")
    model_id: ModelID = Field(
        default="openai/gpt-4o-mini",
        description="ID of the model to use for response generation"
    )
    conversation_id: str = Field(default_factory=lambda: str(uuid4()), description="Unique identifier for the conversation")
    user_id: str = Field(..., description="Unique identifier for the user")
    table_id: Optional[str] = Field(None, description="Optional knowledge base table identifier for the query")

    class Config:
        schema_extra = {
            "example": {
                "query": "How can I improve my productivity?",
                "model_id": "openai/gpt-4o-mini",
                "conversation_id": "123e4567-e89b-12d3-a456-426614174000",
                "user_id": "user123",
                "table_id":"digiyatra"
            }
        }


async def digiyatra_query_table(query: FollowupQueryModel, db: Annotated[Any, Depends(get_db_connection)], limit: Optional[int] = 5):
    """Query the Digiyatra knowledge-base table and return the text of the first result."""
    if query.table_id is None:
        query.table_id = "digiyatra"

    response = await query_table(
        table_id=query.table_id,
        query=query.query,
        user_id=query.user_id,
        limit=limit
    )
    return response.results['data'][0]['text']

@app.post("/digiyatra-followup")
async def followup_agent(query: FollowupQueryModel, background_tasks: BackgroundTasks):
    """
    Followup agent endpoint that provides helpful responses or generates clarifying questions based on user queries.
    Requires API Key authentication via X-API-Key header.
    """
    try:
        logger.info(f"Received followup agent query: {query.query}")

        if query.conversation_id not in conversations:
            conversations[query.conversation_id] = [
                {"role": "system", "content": FOLLOWUP_DIGIYATRA_PROMPT}
            ]

        digiyatra_response = await digiyatra_query_table(query, db=get_db_connection(), limit=5)
        user_query_with_context = f"{query.query} \n\n FAQ Context for ANSWERING: {digiyatra_response}"
        conversations[query.conversation_id].append({"role": "user", "content": user_query_with_context})
        last_activity[query.conversation_id] = time.time()
        
        # Pass the conversation history as-is; trim_messages_openai (imported above) could be
        # applied here to cap the token count before sending the history to the model.
        limited_conversation = conversations[query.conversation_id]

        async def process_response():
            try:
                full_response = ""
                async for content in client.generate_response(limited_conversation, model=query.model_id, conversation_id=query.conversation_id, user=query.user_id):
                    full_response += content
                    yield f"{json.dumps({'type': 'token', 'content': content})}"

                logger.info(f"LLM RAW response for query: {query.query}: {full_response}")
                response_content, interact, tools = parse_followup_and_tools(full_response)
                
                result = {
                    "response": response_content,
                    "clarification": interact
                }
                
                yield f"{json.dumps({'type': 'metadata', 'response_full': result})}"
                
                # Add the assistant's response to the conversation history
                conversations[query.conversation_id].append({"role": "assistant", "content": full_response})
                
                background_tasks.add_task(update_db, query.user_id, query.conversation_id, query.query, full_response)
                logger.info(f"Completed followup agent response for query: {query.query}, send result: {result}")
            except Exception as e:
                logger.error(f"Error during response processing: {str(e)}")
                yield f"{json.dumps({'type': 'error', 'message': 'An error occurred while processing the response.'})}"

        return EventSourceResponse(process_response(), media_type="text/event-stream")
    except Exception as e:
        logger.error(f"Error in followup_agent: {str(e)}")
        raise HTTPException(status_code=500, detail="An error occurred while processing the followup agent request.")
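
# A minimal sketch of how a client might consume the /digiyatra-followup SSE stream, assuming
# the service runs locally on port 8000; httpx and the example payload are used for illustration
# only. Each SSE "data:" line carries a JSON object whose "type" is 'token', 'metadata', or 'error'.
#
# import httpx, json
#
# payload = {"query": "How do I enroll in Digiyatra?", "user_id": "user123"}
# with httpx.stream("POST", "http://localhost:8000/digiyatra-followup", json=payload, timeout=None) as r:
#     for line in r.iter_lines():
#         if not line.startswith("data:"):
#             continue  # skip blank/keep-alive lines between events
#         event = json.loads(line[len("data:"):].strip())
#         if event["type"] == "token":
#             print(event["content"], end="", flush=True)  # stream tokens as they arrive
#         elif event["type"] == "metadata":
#             print("\n", event["response_full"])          # final parsed response + clarification flag
#         elif event["type"] == "error":
#             print("\n", event["message"])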

class ModelsResponse(BaseModel):
    models: List[str]

@app.get("/models", response_model=ModelsResponse)
def get_models():
    """
    Endpoint to return a list of available model IDs dynamically extracted from the ModelID Literal.
    """
    try:
        # Dynamically extract the literal values of ModelID
        model_list = list(get_args(ModelID))
        return ModelsResponse(models=model_list)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred while fetching the models: {str(e)}")

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.on_event("startup")
def startup():
    logger.info("Starting up the application")
    init_db()

@app.on_event("shutdown")
def shutdown():
    logger.info("Shutting down the application")



# import uvicorn

# if __name__ == "__main__":
#     uvicorn.run(
#         "app:app",
#         host="0.0.0.0",
#         port=8000,
#         workers=4,
#         reload=False,
#         access_log=False
#     )