pvanand commited on
Commit
b66bfbf
1 Parent(s): dd91c47

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +55 -95
main.py CHANGED
@@ -1,3 +1,4 @@
 
1
  from fastapi import FastAPI, HTTPException, Depends, Security, BackgroundTasks
2
  from fastapi.security import APIKeyHeader
3
  from fastapi.responses import StreamingResponse
@@ -13,11 +14,15 @@ import time
13
  from datetime import datetime, timedelta
14
  import asyncio
15
  import requests
16
- from prompts import CODING_ASSISTANT_PROMPT, NEWS_ASSISTANT_PROMPT, generate_news_prompt, SEARCH_ASSISTANT_PROMPT, generate_search_prompt
17
  from fastapi_cache import FastAPICache
18
  from fastapi_cache.backends.inmemory import InMemoryBackend
19
  from fastapi_cache.decorator import cache
20
 
 
 
 
 
21
  app = FastAPI()
22
 
23
  API_KEY_NAME = "X-API-Key"
@@ -69,6 +74,7 @@ class NewsQueryModel(BaseModel):
69
 
70
  @lru_cache()
71
  def get_api_keys():
 
72
  return {
73
  "OPENROUTER_API_KEY": f"sk-or-v1-{os.environ['OPENROUTER_API_KEY']}",
74
  "BRAVE_API_KEY": os.environ['BRAVE_API_KEY']
@@ -85,12 +91,16 @@ last_activity: Dict[str, float] = {}
85
  encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")
86
 
87
  def limit_tokens(input_string, token_limit=6000):
 
88
  return encoding.decode(encoding.encode(input_string)[:token_limit])
89
 
90
  def calculate_tokens(msgs):
91
- return sum(len(encoding.encode(str(m))) for m in msgs)
 
 
92
 
93
  def chat_with_llama_stream(messages, model="gpt-3.5-turbo", max_llm_history=4, max_output_tokens=2500):
 
94
  while calculate_tokens(messages) > (8000 - max_output_tokens):
95
  if len(messages) > max_llm_history:
96
  messages = [messages[0]] + messages[-max_llm_history:]
@@ -98,9 +108,11 @@ def chat_with_llama_stream(messages, model="gpt-3.5-turbo", max_llm_history=4, m
98
  max_llm_history -= 1
99
  if max_llm_history < 2:
100
  error_message = "Token limit exceeded. Please shorten your input or start a new conversation."
 
101
  raise HTTPException(status_code=400, detail=error_message)
102
 
103
  try:
 
104
  response = or_client.chat.completions.create(
105
  model=model,
106
  messages=messages,
@@ -115,20 +127,25 @@ def chat_with_llama_stream(messages, model="gpt-3.5-turbo", max_llm_history=4, m
115
  full_response += content
116
  yield content
117
 
 
118
  # After streaming, add the full response to the conversation history
119
  messages.append({"role": "assistant", "content": full_response})
120
  except Exception as e:
 
121
  raise HTTPException(status_code=500, detail=f"Error in model response: {str(e)}")
122
 
123
  async def verify_api_key(api_key: str = Security(api_key_header)):
124
  if api_key != API_KEY:
 
125
  raise HTTPException(status_code=403, detail="Could not validate credentials")
 
126
  return api_key
127
 
128
  # SQLite setup
129
  DB_PATH = '/app/data/conversations.db'
130
 
131
  def init_db():
 
132
  os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
133
  conn = sqlite3.connect(DB_PATH)
134
  c = conn.cursor()
@@ -141,18 +158,22 @@ def init_db():
141
  timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)''')
142
  conn.commit()
143
  conn.close()
 
144
 
145
  init_db()
146
 
147
  def update_db(user_id, conversation_id, message, response):
 
148
  conn = sqlite3.connect(DB_PATH)
149
  c = conn.cursor()
150
  c.execute('''INSERT INTO conversations (user_id, conversation_id, message, response)
151
  VALUES (?, ?, ?, ?)''', (user_id, conversation_id, message, response))
152
  conn.commit()
153
  conn.close()
 
154
 
155
  async def clear_inactive_conversations():
 
156
  while True:
157
  current_time = time.time()
158
  inactive_convos = [conv_id for conv_id, last_time in last_activity.items()
@@ -162,27 +183,18 @@ async def clear_inactive_conversations():
162
  del conversations[conv_id]
163
  if conv_id in last_activity:
164
  del last_activity[conv_id]
 
165
  await asyncio.sleep(60) # Check every minute
166
 
167
  @app.on_event("startup")
168
  async def startup_event():
 
169
  FastAPICache.init(InMemoryBackend(), prefix="fastapi-cache")
170
  asyncio.create_task(clear_inactive_conversations())
171
 
172
  @app.post("/coding-assistant")
173
  async def coding_assistant(query: QueryModel, background_tasks: BackgroundTasks, api_key: str = Depends(verify_api_key)):
174
- """
175
- Coding assistant endpoint that provides programming help based on user queries.
176
- Available models:
177
- - meta-llama/llama-3-70b-instruct (default)
178
- - anthropic/claude-3.5-sonnet
179
- - deepseek/deepseek-coder
180
- - anthropic/claude-3-haiku
181
- - openai/gpt-3.5-turbo-instruct
182
- - qwen/qwen-72b-chat
183
- - google/gemma-2-27b-it
184
- Requires API Key authentication via X-API-Key header.
185
- """
186
  if query.conversation_id not in conversations:
187
  conversations[query.conversation_id] = [
188
  {"role": "system", "content": "You are a helpful assistant proficient in coding tasks. Help the user in understanding and writing code."}
@@ -199,18 +211,16 @@ async def coding_assistant(query: QueryModel, background_tasks: BackgroundTasks,
199
  for content in chat_with_llama_stream(limited_conversation, model=query.model_id):
200
  full_response += content
201
  yield content
 
202
  background_tasks.add_task(update_db, query.user_id, query.conversation_id, query.user_query, full_response)
203
 
204
  return StreamingResponse(process_response(), media_type="text/event-stream")
205
 
206
  # New functions for news assistant
207
 
208
- def internet_search(query, type = "web", num_results=20):
209
- if type == "web":
210
- url = "https://api.search.brave.com/res/v1/web/search"
211
- else:
212
- url = "https://api.search.brave.com/res/v1/news/search"
213
-
214
  headers = {
215
  "Accept": "application/json",
216
  "Accept-Encoding": "gzip",
@@ -220,37 +230,33 @@ def internet_search(query, type = "web", num_results=20):
220
 
221
  response = requests.get(url, headers=headers, params=params)
222
 
223
- if response.status_code != 200:
224
- return []
225
-
226
- if type == "web":
227
- search_data = response.json()["web"]["results"]
 
 
 
 
 
 
 
228
  else:
229
- search_data = response.json()["results"]
230
- processed_results = []
231
-
232
- for item in search_data:
233
- if not item.get("extra_snippets"):
234
- continue
235
-
236
- result = {
237
- "title": item["title"],
238
- "snippet": item["extra_snippets"][0],
239
- "last_updated": item.get("age", "")
240
- }
241
- processed_results.append(result)
242
-
243
- return processed_results[:num_results]
244
 
245
  @lru_cache(maxsize=100)
246
- def cached_internet_search(query: str):
247
- return internet_search(query, type = "news")
248
-
249
 
250
  def analyze_news(query):
251
- news_data = cached_internet_search(query)
 
252
 
253
  if not news_data:
 
254
  return "Failed to fetch news data.", []
255
 
256
  # Prepare the prompt for the AI
@@ -262,72 +268,26 @@ def analyze_news(query):
262
  {"role": "user", "content": prompt}
263
  ]
264
 
 
265
  return messages
266
 
267
  @app.post("/news-assistant")
268
  async def news_assistant(query: NewsQueryModel, api_key: str = Depends(verify_api_key)):
269
- """
270
- News assistant endpoint that provides summaries and analysis of recent news based on user queries.
271
- Requires API Key authentication via X-API-Key header.
272
- """
273
  messages = analyze_news(query.query)
274
 
275
  if not messages:
 
276
  raise HTTPException(status_code=500, detail="Failed to fetch news data")
277
 
278
  def process_response():
279
  for content in chat_with_llama_stream(messages, model=query.model_id):
280
  yield content
281
- #meta-llama/llama-3-70b-instruct google/gemini-pro-1.5
282
- return StreamingResponse(process_response(), media_type="text/event-stream")
283
-
284
- class SearchQueryModel(BaseModel):
285
- query: str = Field(..., description="Search query")
286
- model_id: ModelID = Field(
287
- default="meta-llama/llama-3-70b-instruct",
288
- description="ID of the model to use for response generation"
289
- )
290
- class Config:
291
- schema_extra = {
292
- "example": {
293
- "query": "What are the latest advancements in quantum computing?",
294
- "model_id": "meta-llama/llama-3-70b-instruct"
295
- }
296
- }
297
-
298
- def analyze_search_results(query):
299
- search_data = internet_search(query, type="web")
300
-
301
- if not search_data:
302
- return "Failed to fetch search data.", []
303
-
304
- # Prepare the prompt for the AI
305
- prompt = generate_search_prompt(query, search_data)
306
-
307
- messages = [
308
- {"role": "system", "content": SEARCH_ASSISTANT_PROMPT},
309
- {"role": "user", "content": prompt}
310
- ]
311
-
312
- return messages
313
-
314
- @app.post("/search-assistant")
315
- async def search_assistant(query: SearchQueryModel, api_key: str = Depends(verify_api_key)):
316
- """
317
- Search assistant endpoint that provides summaries and analysis of web search results based on user queries.
318
- Requires API Key authentication via X-API-Key header.
319
- """
320
- messages = analyze_search_results(query.query)
321
-
322
- if not messages:
323
- raise HTTPException(status_code=500, detail="Failed to fetch search data")
324
-
325
- def process_response():
326
- for content in chat_with_llama_stream(messages, model=query.model_id):
327
- yield content
328
 
 
329
  return StreamingResponse(process_response(), media_type="text/event-stream")
330
 
331
  if __name__ == "__main__":
332
  import uvicorn
 
333
  uvicorn.run(app, host="0.0.0.0", port=7860)
 
1
+ import logging
2
  from fastapi import FastAPI, HTTPException, Depends, Security, BackgroundTasks
3
  from fastapi.security import APIKeyHeader
4
  from fastapi.responses import StreamingResponse
 
14
  from datetime import datetime, timedelta
15
  import asyncio
16
  import requests
17
+ from prompts import CODING_ASSISTANT_PROMPT, NEWS_ASSISTANT_PROMPT, generate_news_prompt
18
  from fastapi_cache import FastAPICache
19
  from fastapi_cache.backends.inmemory import InMemoryBackend
20
  from fastapi_cache.decorator import cache
21
 
22
+ # Set up logging
23
+ logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
24
+ logger = logging.getLogger(__name__)
25
+
26
  app = FastAPI()
27
 
28
  API_KEY_NAME = "X-API-Key"
 
74
 
75
  @lru_cache()
76
  def get_api_keys():
77
+ logger.debug("Fetching API keys")
78
  return {
79
  "OPENROUTER_API_KEY": f"sk-or-v1-{os.environ['OPENROUTER_API_KEY']}",
80
  "BRAVE_API_KEY": os.environ['BRAVE_API_KEY']
 
91
  encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")
92
 
93
  def limit_tokens(input_string, token_limit=6000):
94
+ logger.debug(f"Limiting tokens for input string, token limit: {token_limit}")
95
  return encoding.decode(encoding.encode(input_string)[:token_limit])
96
 
97
  def calculate_tokens(msgs):
98
+ token_count = sum(len(encoding.encode(str(m))) for m in msgs)
99
+ logger.debug(f"Calculated token count: {token_count}")
100
+ return token_count
101
 
102
  def chat_with_llama_stream(messages, model="gpt-3.5-turbo", max_llm_history=4, max_output_tokens=2500):
103
+ logger.info(f"Starting chat with model: {model}")
104
  while calculate_tokens(messages) > (8000 - max_output_tokens):
105
  if len(messages) > max_llm_history:
106
  messages = [messages[0]] + messages[-max_llm_history:]
 
108
  max_llm_history -= 1
109
  if max_llm_history < 2:
110
  error_message = "Token limit exceeded. Please shorten your input or start a new conversation."
111
+ logger.error(error_message)
112
  raise HTTPException(status_code=400, detail=error_message)
113
 
114
  try:
115
+ logger.debug("Sending request to OpenAI")
116
  response = or_client.chat.completions.create(
117
  model=model,
118
  messages=messages,
 
127
  full_response += content
128
  yield content
129
 
130
+ logger.debug("Finished streaming response")
131
  # After streaming, add the full response to the conversation history
132
  messages.append({"role": "assistant", "content": full_response})
133
  except Exception as e:
134
+ logger.error(f"Error in model response: {str(e)}")
135
  raise HTTPException(status_code=500, detail=f"Error in model response: {str(e)}")
136
 
137
  async def verify_api_key(api_key: str = Security(api_key_header)):
138
  if api_key != API_KEY:
139
+ logger.warning("Invalid API key attempt")
140
  raise HTTPException(status_code=403, detail="Could not validate credentials")
141
+ logger.debug("API key verified successfully")
142
  return api_key
143
 
144
  # SQLite setup
145
  DB_PATH = '/app/data/conversations.db'
146
 
147
  def init_db():
148
+ logger.info("Initializing database")
149
  os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
150
  conn = sqlite3.connect(DB_PATH)
151
  c = conn.cursor()
 
158
  timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)''')
159
  conn.commit()
160
  conn.close()
161
+ logger.debug("Database initialized")
162
 
163
  init_db()
164
 
165
  def update_db(user_id, conversation_id, message, response):
166
+ logger.debug(f"Updating database for conversation {conversation_id}")
167
  conn = sqlite3.connect(DB_PATH)
168
  c = conn.cursor()
169
  c.execute('''INSERT INTO conversations (user_id, conversation_id, message, response)
170
  VALUES (?, ?, ?, ?)''', (user_id, conversation_id, message, response))
171
  conn.commit()
172
  conn.close()
173
+ logger.debug("Database updated successfully")
174
 
175
  async def clear_inactive_conversations():
176
+ logger.info("Starting inactive conversation cleanup task")
177
  while True:
178
  current_time = time.time()
179
  inactive_convos = [conv_id for conv_id, last_time in last_activity.items()
 
183
  del conversations[conv_id]
184
  if conv_id in last_activity:
185
  del last_activity[conv_id]
186
+ logger.debug(f"Cleared {len(inactive_convos)} inactive conversations")
187
  await asyncio.sleep(60) # Check every minute
188
 
189
  @app.on_event("startup")
190
  async def startup_event():
191
+ logger.info("Starting up FastAPI application")
192
  FastAPICache.init(InMemoryBackend(), prefix="fastapi-cache")
193
  asyncio.create_task(clear_inactive_conversations())
194
 
195
  @app.post("/coding-assistant")
196
  async def coding_assistant(query: QueryModel, background_tasks: BackgroundTasks, api_key: str = Depends(verify_api_key)):
197
+ logger.info(f"Received coding assistant request for user {query.user_id}")
 
 
 
 
 
 
 
 
 
 
 
198
  if query.conversation_id not in conversations:
199
  conversations[query.conversation_id] = [
200
  {"role": "system", "content": "You are a helpful assistant proficient in coding tasks. Help the user in understanding and writing code."}
 
211
  for content in chat_with_llama_stream(limited_conversation, model=query.model_id):
212
  full_response += content
213
  yield content
214
+ logger.debug(f"Finished processing response for conversation {query.conversation_id}")
215
  background_tasks.add_task(update_db, query.user_id, query.conversation_id, query.user_query, full_response)
216
 
217
  return StreamingResponse(process_response(), media_type="text/event-stream")
218
 
219
  # New functions for news assistant
220
 
221
+ def fetch_news(query, num_results=20):
222
+ logger.info(f"Fetching news for query: {query}")
223
+ url = "https://api.search.brave.com/res/v1/news/search"
 
 
 
224
  headers = {
225
  "Accept": "application/json",
226
  "Accept-Encoding": "gzip",
 
230
 
231
  response = requests.get(url, headers=headers, params=params)
232
 
233
+ if response.status_code == 200:
234
+ news_data = response.json()
235
+ logger.debug(f"Fetched {len(news_data['results'])} news items")
236
+ return [
237
+ {
238
+ "title": item["title"],
239
+ "snippet": item["extra_snippets"][0] if "extra_snippets" in item and item["extra_snippets"] else "",
240
+ "last_updated": item.get("age", ""),
241
+ }
242
+ for item in news_data['results']
243
+ if "extra_snippets" in item and item["extra_snippets"]
244
+ ][:num_results]
245
  else:
246
+ logger.error(f"Failed to fetch news. Status code: {response.status_code}")
247
+ return []
 
 
 
 
 
 
 
 
 
 
 
 
 
248
 
249
  @lru_cache(maxsize=100)
250
+ def cached_fetch_news(query: str):
251
+ logger.debug(f"Fetching cached news for query: {query}")
252
+ return fetch_news(query)
253
 
254
  def analyze_news(query):
255
+ logger.info(f"Analyzing news for query: {query}")
256
+ news_data = cached_fetch_news(query)
257
 
258
  if not news_data:
259
+ logger.warning("No news data fetched")
260
  return "Failed to fetch news data.", []
261
 
262
  # Prepare the prompt for the AI
 
268
  {"role": "user", "content": prompt}
269
  ]
270
 
271
+ logger.debug("News analysis prompt prepared")
272
  return messages
273
 
274
  @app.post("/news-assistant")
275
  async def news_assistant(query: NewsQueryModel, api_key: str = Depends(verify_api_key)):
276
+ logger.info(f"Received news assistant request for query: {query.query}")
 
 
 
277
  messages = analyze_news(query.query)
278
 
279
  if not messages:
280
+ logger.error("Failed to fetch news data")
281
  raise HTTPException(status_code=500, detail="Failed to fetch news data")
282
 
283
  def process_response():
284
  for content in chat_with_llama_stream(messages, model=query.model_id):
285
  yield content
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
286
 
287
+ logger.debug("Starting to stream news assistant response")
288
  return StreamingResponse(process_response(), media_type="text/event-stream")
289
 
290
  if __name__ == "__main__":
291
  import uvicorn
292
+ logger.info("Starting uvicorn server")
293
  uvicorn.run(app, host="0.0.0.0", port=7860)