Spaces:
Running
Running
from fastapi import FastAPI, HTTPException, UploadFile, File,Request,Depends,status,BackgroundTasks | |
from fastapi.security import OAuth2PasswordBearer | |
from pydantic import BaseModel, Json,EmailStr | |
from typing import Optional | |
from pinecone import Pinecone, ServerlessSpec | |
from uuid import uuid4 | |
import os | |
from dotenv import load_dotenv | |
from rag import * | |
from fastapi.responses import StreamingResponse | |
import json | |
from prompt import * | |
from typing import Literal | |
import time | |
from fastapi.middleware.cors import CORSMiddleware | |
import requests | |
import smtplib | |
from email.mime.text import MIMEText | |
load_dotenv() | |
## setup pinecone index | |
pinecone_api_key = os.environ.get("PINECONE_API_KEY") | |
pc = Pinecone(api_key=pinecone_api_key) | |
index_name = os.environ.get("INDEX_NAME") # change if desired | |
existing_indexes = [index_info["name"] for index_info in pc.list_indexes()] | |
if index_name not in existing_indexes: | |
pc.create_index( | |
name=index_name, | |
dimension=1536, | |
metric="cosine", | |
spec=ServerlessSpec(cloud="aws", region="us-east-1"), | |
) | |
while not pc.describe_index(index_name).status["ready"]: | |
time.sleep(1) | |
index = pc.Index(index_name) | |
vector_store = PineconeVectorStore(index=index, embedding=embedding) | |
## setup authorization | |
api_keys = [os.environ.get("FASTAPI_API_KEY")] | |
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") # use token authentication | |
def api_key_auth(api_key: str = Depends(oauth2_scheme)): | |
if api_key not in api_keys: | |
raise HTTPException( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
detail="Forbidden" | |
) | |
dev_mode = os.environ.get("DEV") | |
if dev_mode == "True": | |
app = FastAPI() | |
else: | |
app = FastAPI(dependencies=[Depends(api_key_auth)]) | |
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"]) | |
# Pydantic model for the form data | |
class ContactForm(BaseModel): | |
name: str | |
email: EmailStr | |
message: str | |
def send_simple_message(to,subject,text): | |
api_key = os.getenv("MAILGUN_API_KEY") | |
return requests.post( | |
"https://api.mailgun.net/v3/sandboxafc6970ffdab40ee9566a4e180b117fd.mailgun.org/messages", | |
auth=("api", api_key), | |
data={"from": "Excited User <mailgun@sandboxafc6970ffdab40ee9566a4e180b117fd.mailgun.org>", | |
"to": [to], | |
"subject": subject, | |
"text": text}) | |
# Function to send email | |
def send_email(form_data: ContactForm): | |
# sender_email = os.getenv("SENDER_EMAIL") | |
# sender_password = os.getenv("SENDER_PASSWORD") | |
receiver_email = os.getenv("RECEIVER_EMAIL") # Your email | |
# Setup the message content | |
text = f"Name: {form_data.name}\nEmail: {form_data.email}\nMessage: {form_data.message}" | |
title = "New message from your website!" | |
# Send the email | |
try: | |
send_simple_message(receiver_email,title,text) | |
except Exception as e: | |
print(e) | |
return {"message": "Failed to send email."} | |
# Endpoint to handle form submission | |
async def send_contact_form(form_data: ContactForm, background_tasks: BackgroundTasks): | |
background_tasks.add_task(send_email, form_data) | |
return {"message": "Email sent successfully!"} | |
class UserInput(BaseModel): | |
query: str | |
stream: Optional[bool] = False | |
messages: Optional[list[dict]] = [] | |
class ChunkToDB(BaseModel): | |
message: str | |
title: str | |
async def add_chunk_to_db(chunk: ChunkToDB): | |
try: | |
title = chunk.title | |
message = chunk.message | |
return get_vectorstore(text_chunk=message,index=index,title=title) | |
except Exception as e: | |
return {"message": str(e)} | |
async def list_vectors(): | |
try: | |
return index.list() | |
except Exception as e: | |
return {"message": str(e)} | |
async def generate(user_input: UserInput): | |
try: | |
print(user_input.stream,user_input.query) | |
if user_input.stream: | |
return StreamingResponse(generate_stream(user_input.query,user_input.messages,index_name=index,stream=True,vector_store=vector_store),media_type="application/json") | |
else: | |
return generate_stream(user_input.query,user_input.messages,index_name=index,stream=False,vector_store=vector_store) | |
except Exception as e: | |
return {"message": str(e)} | |
async def retreive_context_response(query: str): | |
try: | |
return retreive_context(index=index,query=query) | |
except Exception as e: | |
return {"message": str(e)} | |
async def delete_vector(filename_id: str): | |
try: | |
return index.delete(ids=[filename_id]) | |
except Exception as e: | |
return {"message": str(e)} | |
async def check_server(): | |
return {"message":"Server is running"} | |
async def read_root(): | |
return {"message":"Welcome to the AI API"} | |