import re
import os
import json
from enum import Enum
from uuid import uuid4
import base64
import requests
from io import BytesIO
import time
import asyncio
import logging
import sqlite3
import tiktoken
from functools import lru_cache
from typing import Optional, List, Dict, Literal
from fastapi import FastAPI, HTTPException, Depends, Security, BackgroundTasks, Query, Header
from fastapi.security import APIKeyHeader
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel, Field
from openai import OpenAI
from prompts import *
# ============================================================================
# Configuration and Setup
# ============================================================================

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("app.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# FastAPI app setup
app = FastAPI()

# API key configuration
API_KEY_NAME = "X-API-Key"
PEXELS_API_KEY = os.environ["PEXELS_API_KEY"]
API_KEY = os.environ.get("CHAT_AUTH_KEY", "default_secret_key")
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)

# Model definitions
ModelID = Literal[
    "openai/gpt-4o-mini",
    "meta-llama/llama-3-70b-instruct",
    "anthropic/claude-3.5-sonnet",
    "deepseek/deepseek-coder",
    "anthropic/claude-3-haiku",
    "openai/gpt-3.5-turbo-instruct",
    "qwen/qwen-72b-chat",
    "google/gemma-2-27b-it"
]
# Pydantic models
class LLMAgentQueryModel(BaseModel):
    prompt: str = Field(..., description="User's query or prompt")
    system_message: Optional[str] = Field(None, description="Custom system message for the conversation")
    model_id: ModelID = Field(
        default="openai/gpt-4o-mini",
        description="ID of the model to use for response generation"
    )
    conversation_id: Optional[str] = Field(None, description="Unique identifier for the conversation")
    user_id: str = Field(..., description="Unique identifier for the user")

    class Config:
        schema_extra = {
            "example": {
                "prompt": "How do I implement a binary search in Python?",
                "system_message": "You are a helpful coding assistant.",
                "model_id": "meta-llama/llama-3-70b-instruct",
                "conversation_id": "123e4567-e89b-12d3-a456-426614174000",
                "user_id": "user123"
            }
        }
# API key and client setup
def get_api_keys():
    logger.info("Loading API keys")
    return {
        "OPENROUTER_API_KEY": f"sk-or-v1-{os.environ['OPENROUTER_API_KEY']}",
        # "OPENAI_API_KEY": f"sk-or-v1-{os.environ['OPENAI_API_KEY']}",
    }

api_keys = get_api_keys()
or_client = OpenAI(api_key=api_keys["OPENROUTER_API_KEY"], base_url="https://openrouter.ai/api/v1")

# In-memory storage for conversations
conversations: Dict[str, List[Dict[str, str]]] = {}
last_activity: Dict[str, float] = {}

# Token encoding
encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")
# ============================================================================
# Database Functions
# ============================================================================

DB_PATH = '/app/data/conversations.db'

def init_db():
    logger.info("Initializing database")
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS conversations
                 (id INTEGER PRIMARY KEY AUTOINCREMENT,
                  user_id TEXT,
                  conversation_id TEXT,
                  message TEXT,
                  response TEXT,
                  timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)''')
    conn.commit()
    conn.close()
    logger.info("Database initialized successfully")

def update_db(user_id, conversation_id, message, response):
    logger.info(f"Updating database for conversation: {conversation_id}")
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute('''INSERT INTO conversations (user_id, conversation_id, message, response)
                 VALUES (?, ?, ?, ?)''', (user_id, conversation_id, message, response))
    conn.commit()
    conn.close()
    logger.info("Database updated successfully")
# ============================================================================
# Utility Functions
# ============================================================================

def extract_data_from_tag(input_string, tag, invert=False):
    """
    Return the combined contents of every <tag>...</tag> block; if the tag
    is not found, return "".
    With invert=True, return the input with all <tag> blocks removed; if the
    tag is not found, return the input string unchanged.
    """
    pattern = f'<{tag}.*?>(.*?)</{tag}>'
    matches = re.findall(pattern, input_string, re.DOTALL)
    if invert:
        if matches:
            out = re.sub(pattern, '', input_string, flags=re.DOTALL)
            return out.strip()
        else:
            return input_string.strip()
    else:
        if matches:
            return '\n'.join(match.strip() for match in matches)
        else:
            return ""
def calculate_tokens(msgs):
    return sum(len(encoding.encode(str(m))) for m in msgs)

def limit_conversation_history(conversation: List[Dict[str, str]], max_tokens: int = 7500) -> List[Dict[str, str]]:
    """Limit the conversation history to a maximum number of tokens."""
    limited_conversation = []
    current_tokens = 0

    for message in reversed(conversation):
        message_tokens = calculate_tokens([message])
        if current_tokens + message_tokens > max_tokens:
            break
        limited_conversation.insert(0, message)
        current_tokens += message_tokens

    return limited_conversation
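
# Example (illustrative): messages are dropped from the oldest end until the
# remainder fits in the token budget, so the newest messages always survive.
# Note the system message at index 0 can itself be dropped on long histories.
# >>> history = [{"role": "system", "content": "You are helpful."}] + 200 * [
# ...     {"role": "user", "content": "a fairly long message " * 20}]
# >>> trimmed = limit_conversation_history(history, max_tokens=7500)
# >>> trimmed[-1] == history[-1]   # newest message is kept
# True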
async def verify_api_key(api_key: str = Security(api_key_header)):
    if api_key != API_KEY:
        logger.warning("Invalid API key used")
        raise HTTPException(status_code=403, detail="Could not validate credentials")
    return api_key
# ============================================================================
# LLM Interaction Functions
# ============================================================================

def chat_with_llama_stream(messages, model="meta-llama/llama-3-70b-instruct", max_output_tokens=2500):
    logger.info(f"Received chat request: {messages}")
    logger.info(f"Starting chat with model: {model}")
    try:
        response = or_client.chat.completions.create(
            model=model,
            messages=messages,
            max_tokens=max_output_tokens,
            stream=True
        )
        full_response = ""
        for chunk in response:
            if chunk.choices[0].delta.content is not None:
                content = chunk.choices[0].delta.content
                full_response += content
                yield content
        # After streaming, add the full response to the conversation history
        messages.append({"role": "assistant", "content": full_response})
        logger.info("Chat completed successfully")
    except Exception as e:
        logger.error(f"Error in model response: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error in model response: {str(e)}")
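
# Example (illustrative): consuming the generator outside FastAPI. Left
# commented out because it performs a real OpenRouter API call.
# messages = [{"role": "user", "content": "Say hello"}]
# for token in chat_with_llama_stream(messages):
#     print(token, end="", flush=True)
# # After iteration, `messages` ends with the full assistant reply.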
# ============================================================================
# Background Tasks
# ============================================================================

async def clear_inactive_conversations():
    while True:
        logger.info("Clearing inactive conversations")
        current_time = time.time()
        inactive_convos = [conv_id for conv_id, last_time in last_activity.items()
                           if current_time - last_time > 1800]  # 30 minutes
        for conv_id in inactive_convos:
            if conv_id in conversations:
                del conversations[conv_id]
            if conv_id in last_activity:
                del last_activity[conv_id]
        await asyncio.sleep(600)  # Check every 10 minutes
# ============================================================================
# FastAPI Events and Endpoints
# ============================================================================

@app.on_event("startup")
async def startup_event():
    logger.info("Starting up the application")
    init_db()
    asyncio.create_task(clear_inactive_conversations())
# Route path assumed for illustration; the original decorator is not shown.
@app.post("/llm-agent")
async def llm_agent(query: LLMAgentQueryModel, background_tasks: BackgroundTasks, api_key: str = Depends(verify_api_key)):
    """
    LLM agent endpoint that provides responses based on user queries, maintaining conversation history.
    Accepts custom system messages and allows selection of different models.
    Requires API Key authentication via X-API-Key header.
    """
    logger.info(f"Received LLM agent query: {query.prompt}")

    # Generate a new conversation ID if not provided
    if not query.conversation_id:
        query.conversation_id = str(uuid4())

    # Initialize or retrieve conversation history
    if query.conversation_id not in conversations:
        system_message = query.system_message or "You are a helpful assistant."
        conversations[query.conversation_id] = [
            {"role": "system", "content": system_message}
        ]
    elif query.system_message:
        # Update system message if provided
        conversations[query.conversation_id][0] = {"role": "system", "content": query.system_message}

    # Add user's prompt to conversation history
    conversations[query.conversation_id].append({"role": "user", "content": query.prompt})
    last_activity[query.conversation_id] = time.time()

    # Limit tokens in the conversation history
    limited_conversation = limit_conversation_history(conversations[query.conversation_id])

    def process_response():
        full_response = ""
        for content in chat_with_llama_stream(limited_conversation, model=query.model_id):
            full_response += content
            yield content
        # Add the assistant's response to the conversation history
        conversations[query.conversation_id].append({"role": "assistant", "content": full_response})
        background_tasks.add_task(update_db, query.user_id, query.conversation_id, query.prompt, full_response)
        logger.info(f"Completed LLM agent response for query: {query.prompt}")

    return StreamingResponse(process_response(), media_type="text/event-stream")
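
# Example request (illustrative; the /llm-agent path is an assumption, per the
# decorator note above):
# curl -N -X POST http://localhost:8000/llm-agent \
#   -H "Content-Type: application/json" -H "X-API-Key: $CHAT_AUTH_KEY" \
#   -d '{"prompt": "Hello", "user_id": "user123"}'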
# Route path assumed for illustration; the original decorator is not shown.
@app.post("/v2/llm-agent")
async def llm_agent_v2(query: LLMAgentQueryModel, background_tasks: BackgroundTasks, api_key: str = Depends(verify_api_key)):
    """
    LLM agent endpoint that provides responses based on user queries, maintaining conversation history.
    Accepts custom system messages and allows selection of different models.
    Streams newline-delimited JSON events instead of raw text.
    Requires API Key authentication via X-API-Key header.
    """
    logger.info(f"Received LLM agent query: {query.prompt}")

    # Generate a new conversation ID if not provided
    if not query.conversation_id:
        query.conversation_id = str(uuid4())

    # Initialize or retrieve conversation history
    if query.conversation_id not in conversations:
        system_message = query.system_message or "You are a helpful assistant."
        conversations[query.conversation_id] = [
            {"role": "system", "content": system_message}
        ]
    elif query.system_message:
        # Update system message if provided
        conversations[query.conversation_id][0] = {"role": "system", "content": query.system_message}

    # Add user's prompt to conversation history
    conversations[query.conversation_id].append({"role": "user", "content": query.prompt})
    last_activity[query.conversation_id] = time.time()

    # Limit tokens in the conversation history
    limited_conversation = limit_conversation_history(conversations[query.conversation_id])

    def process_response():
        full_response = ""
        for content in chat_with_llama_stream(limited_conversation, model=query.model_id):
            full_response += content
            yield json.dumps({"type": "response", "content": content}) + "\n"
        # Add the assistant's response to the conversation history
        conversations[query.conversation_id].append({"role": "assistant", "content": full_response})
        background_tasks.add_task(update_db, query.user_id, query.conversation_id, query.prompt, full_response)
        logger.info(f"Completed LLM agent response for query: {query.prompt}")

    return StreamingResponse(process_response(), media_type="text/event-stream")
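
# Example client sketch (illustrative): v2 streams newline-delimited JSON, so
# a client can parse each line independently. Assumes the `requests` package
# and the hypothetical /v2/llm-agent path from the decorator above.
# with requests.post(url, json=payload, headers=headers, stream=True) as r:
#     for line in r.iter_lines():
#         if line:
#             event = json.loads(line)
#             if event["type"] == "response":
#                 print(event["content"], end="")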
import edge_tts
import io

# Route path assumed for illustration; the original decorator is not shown.
@app.get("/tts")
async def text_to_speech(
    text: str = Query(..., description="Text to convert to speech"),
    voice: str = Query(default="en-GB-SoniaNeural", description="Voice to use for speech")
):
    communicate = edge_tts.Communicate(text, voice)

    async def generate():
        async for chunk in communicate.stream():
            if chunk["type"] == "audio":
                yield chunk["data"]

    return StreamingResponse(generate(), media_type="audio/mpeg")
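
# Example request (illustrative; the /tts path is an assumption):
# curl -o speech.mp3 "http://localhost:8000/tts?text=Hello%20world&voice=en-GB-SoniaNeural"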
# ============================================================================
# PPT AGENT
# ============================================================================

class PresentationChatModel(BaseModel):
    prompt: str = Field(..., description="User's query or prompt")
    model_id: ModelID = Field(
        default="openai/gpt-4o-mini",
        description="ID of the model to use for response generation"
    )
    conversation_id: Optional[str] = Field(None, description="Unique identifier for the conversation")
    user_id: str = Field(..., description="Unique identifier for the user")

    class Config:
        schema_extra = {
            "example": {
                "prompt": "Help me create a presentation for my healthy snacks startup",
                "model_id": "openai/gpt-4o-mini",
                "conversation_id": "123e4567-e89b-12d3-a456-426614174000",
                "user_id": "user123"
            }
        }

# Enum for output formats
class OutputFormatEnum(str, Enum):
    html = "html"
    pdf = "pdf"
    pptx = "pptx"

# Class model for presentation data
class PresentationModel(BaseModel):
    markdown: str
    output_format: OutputFormatEnum = OutputFormatEnum.html
def get_pexels_image(query):
    default_img_url = "https://images.pexels.com/photos/593158/pexels-photo-593158.jpeg?auto=compress&cs=tinysrgb&w=1260&h=750&dpr=2"
    url = f"https://api.pexels.com/v1/search?query={query}&per_page=1"
    headers = {"Authorization": PEXELS_API_KEY}
    try:
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            data = response.json()
            logger.info(f"PEXELS API RESPONSE: {data}")
            if data["total_results"] > 0:
                return data["photos"][0]["src"]["medium"]
            # No results for this keyword: fall back to the default image
            return default_img_url
        else:
            logger.error(f"PEXELS API ERROR: {response.status_code} {response.text}")
            return default_img_url
    except Exception as e:
        logger.error(f"An error occurred for Pexels API: {e}")
        return default_img_url
def replace_image_keywords(text):
    def replace_match(match):
        bg_params = match.group(1)
        keyword = re.sub(r'[^\w\s]', ' ', match.group(2)).strip()
        logger.info(f"Extracted keywords for pexels : {keyword}")
        image_url = get_pexels_image(keyword)
        return f"![bg {bg_params}]({image_url})"

    pattern = r'!\[bg (.*?)\]\((.*?)\)'
    return re.sub(pattern, replace_match, text)
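
# Example (illustrative): a Marp background-image directive whose "URL" slot
# holds a search keyword gets rewritten to a real Pexels photo URL.
#   before: ![bg right:40%](healthy snacks)
#   after:  ![bg right:40%](https://images.pexels.com/photos/.../pexels-photo-....jpeg)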
def convert_markdown_marp(markdown, output_format='html'):
    API_URL = "https://pvanand-marpit-backend.hf.space/convert"
    if output_format not in ['html', 'pdf', 'pptx']:
        raise ValueError("Invalid output format. Supported formats are: html, pdf, pptx")

    data = {
        "markdown": markdown,
        "outputFormat": output_format,
        "options": []
    }

    try:
        response = requests.post(API_URL, json=data, timeout=30)
        response.raise_for_status()
        return response.content
    except requests.exceptions.RequestException as e:
        logger.error(f"An error occurred while connecting to the API: {e}")
        return None
# Route path assumed for illustration; the original decorator is not shown.
@app.post("/create-presentation")
async def create_presentation(data: PresentationModel):
    if not data.markdown:
        raise HTTPException(status_code=400, detail="Please provide Markdown text.")

    markdown = data.markdown
    output_format = data.output_format
    markdown_with_images = replace_image_keywords(markdown)
    logger.info(f"INPUT MD: {markdown_with_images} OUTPUT FORMAT: {output_format}")
    result = convert_markdown_marp(markdown_with_images, output_format)

    if result:
        if output_format == 'html':
            return {"html": result.decode()}
        elif output_format == 'pdf':
            return StreamingResponse(BytesIO(result), media_type="application/pdf")
        elif output_format == 'pptx':
            return StreamingResponse(BytesIO(result), media_type="application/vnd.openxmlformats-officedocument.presentationml.presentation")
    else:
        raise HTTPException(status_code=500, detail="Failed to create presentation.")
# Route path assumed for illustration; the original decorator is not shown.
@app.post("/presentation-chat")
async def presentation_chat(query: PresentationChatModel, background_tasks: BackgroundTasks, api_key: str = Depends(verify_api_key)):
    """
    Presentation chat endpoint that generates a presentation based on user queries.
    Uses the llm_agent function and returns both markdown and HTML output.
    Requires API Key authentication via X-API-Key header.
    """
    logger.info(f"Received presentation chat query: {query.prompt}")

    # Create a new LLMAgentQueryModel with a specific system message for presentation generation
    llm_query = LLMAgentQueryModel(
        prompt=query.prompt,
        conversation_id=query.conversation_id,
        system_message=PRESENTATION_SYSTEM_PROMPT,
        model_id=query.model_id,
        user_id=query.user_id
    )

    # Use the llm_agent function to generate the presentation content
    response_stream = await llm_agent(llm_query, background_tasks, api_key)

    # Collect the entire response
    full_response = ""
    html_content = ""
    marp_content_with_images = ""
    async for chunk in response_stream.body_iterator:
        full_response += chunk

    logger.info(f"####------LLM RESPONSE-------#####\n{full_response}")

    # Extract the Marp presentation content
    marp_content = extract_data_from_tag(full_response, "marp_presentation")
    if marp_content:
        # Strip any code-fence backticks, then replace image keywords
        marp_content_with_images = replace_image_keywords(marp_content.strip("`"))
        # Convert Markdown to HTML
        html_content = convert_markdown_marp(marp_content_with_images, 'html')

    return JSONResponse({
        "response": extract_data_from_tag(full_response, "marp_presentation", invert=True),
        "markdown_presentation": marp_content_with_images,
        "html_presentation": html_content.decode() if isinstance(html_content, bytes) else html_content
    })
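
# Illustrative response shape (field values abbreviated):
# {
#   "response": "<assistant text outside the marp_presentation tags>",
#   "markdown_presentation": "---\nmarp: true\n...",
#   "html_presentation": "<!DOCTYPE html>..."
# }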
# ============================================================================
# AUDIO ENDPOINTS
# ============================================================================

openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

class OpenaiTTSModels:
    class ModelType(str, Enum):
        tts_1_hd = "tts-1-hd"
        tts_1 = "tts-1"

    class VoiceType(str, Enum):
        alloy = "alloy"
        echo = "echo"
        fable = "fable"
        onyx = "onyx"
        nova = "nova"
        shimmer = "shimmer"

    class OutputFormat(str, Enum):
        mp3 = "mp3"
        opus = "opus"
        aac = "aac"
        flac = "flac"
        wav = "wav"
        pcm = "pcm"

class AudioAPI:
    class TTSRequest(BaseModel):
        model: OpenaiTTSModels.ModelType = Field(..., description="The TTS model to use")
        voice: OpenaiTTSModels.VoiceType = Field(..., description="The voice type for speech synthesis")
        input: str = Field(..., description="The text to convert to speech")
        output_format: OpenaiTTSModels.OutputFormat = Field(default=OpenaiTTSModels.OutputFormat.mp3, description="The audio output format")
# Route path assumed for illustration; the original decorator is not shown.
@app.post("/v2/tts")
async def text_to_speech_v2(request: AudioAPI.TTSRequest, api_key: str = Depends(verify_api_key)):
    """
    Convert text to speech using OpenAI's TTS API with real-time audio streaming.
    Requires API Key authentication via X-API-Key header.
    """
    try:
        response = openai_client.audio.speech.create(
            model=request.model,
            voice=request.voice,
            input=request.input,
            response_format="mp3"  # Always set to MP3
        )
        return StreamingResponse(io.BytesIO(response.content), media_type="audio/mp3")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    # Alternative that honours the requested output format:
    # try:
    #     response = openai_client.audio.speech.create(
    #         model=request.model,
    #         voice=request.voice,
    #         input=request.input,
    #         response_format=request.output_format
    #     )
    #     content_type = f"audio/{request.output_format.value}"
    #     if request.output_format == OpenaiTTSModels.OutputFormat.pcm:
    #         content_type = "audio/pcm"
    #     return StreamingResponse(io.BytesIO(response.content), media_type=content_type)
    # except Exception as e:
    #     raise HTTPException(status_code=500, detail=str(e))
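
# Example request (illustrative; the /v2/tts path is an assumption):
# curl -X POST http://localhost:8000/v2/tts \
#   -H "Content-Type: application/json" -H "X-API-Key: $CHAT_AUTH_KEY" \
#   -d '{"model": "tts-1", "voice": "alloy", "input": "Hello world"}' \
#   -o speech.mp3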
# ============================================================================
# Main Execution
# ============================================================================

from fastapi.middleware.cors import CORSMiddleware

# CORS middleware setup (origins must not carry a trailing slash, or they will
# never match the browser's Origin header)
app.add_middleware(
    CORSMiddleware,
    #allow_origins=["*"],
    allow_origins=[
        "http://127.0.0.1:5501",
        "http://localhost:5501",
        "http://localhost:3000",
        "https://www.elevaticsai.com",
        "https://www.elevatics.cloud",
        "https://www.elevatics.online",
        "https://www.elevatics.ai",
        "https://elevaticsai.com",
        "https://elevatics.cloud",
        "https://elevatics.online",
        "https://elevatics.ai",
        "https://pvanand-specialized-agents.hf.space",
        "https://pvanand-audio-chat.hf.space"
    ],
    allow_credentials=True,
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)