"""FastAPI service exposing RAG endpoints backed by a Pinecone index.

Besides the vector-store endpoints, the module offers a contact-form
endpoint that forwards submissions through the Mailgun HTTP API.
"""

# NOTE: the import order below is deliberately kept as it was. The two
# star-imports can shadow or be shadowed by neighbouring names, and `rag`
# is imported before `load_dotenv()` runs, so reordering could change
# behavior.
from fastapi import FastAPI, HTTPException, UploadFile, File, Request, Depends, status, BackgroundTasks
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel, Json, EmailStr
from typing import Optional
from pinecone import Pinecone, ServerlessSpec
from uuid import uuid4
import os
from dotenv import load_dotenv
from rag import *
from fastapi.responses import StreamingResponse
import json
from prompt import *
from typing import Literal
import time
from fastapi.middleware.cors import CORSMiddleware
import requests
import smtplib
from email.mime.text import MIMEText

load_dotenv()

## setup pinecone index
pinecone_api_key = os.environ.get("PINECONE_API_KEY")
pc = Pinecone(api_key=pinecone_api_key)
index_name = os.environ.get("INDEX_NAME")  # change if desired

existing_indexes = [index_info["name"] for index_info in pc.list_indexes()]

if index_name not in existing_indexes:
    pc.create_index(
        name=index_name,
        dimension=1536,
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )
    # Block until the freshly created index reports that it is ready.
    while not pc.describe_index(index_name).status["ready"]:
        time.sleep(1)

index = pc.Index(index_name)
# `PineconeVectorStore` and `embedding` are not imported by name here;
# presumably they come from one of the star-imports (`rag` / `prompt`).
vector_store = PineconeVectorStore(index=index, embedding=embedding)

## setup authorization
api_keys = [os.environ.get("FASTAPI_API_KEY")]
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")  # use token authentication


def api_key_auth(api_key: str = Depends(oauth2_scheme)):
    """Reject the request with HTTP 401 unless the bearer token is a known API key."""
    if api_key not in api_keys:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Forbidden"
        )


# When DEV is exactly "True" the app is created without the API-key dependency.
dev_mode = os.environ.get("DEV")
if dev_mode == "True":
    app = FastAPI()
else:
    app = FastAPI(dependencies=[Depends(api_key_auth)])

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mailgun endpoint used for outgoing contact-form notifications.
MAILGUN_MESSAGES_URL = (
    "https://api.mailgun.net/v3/sandboxafc6970ffdab40ee9566a4e180b117fd.mailgun.org/messages"
)
# Upper bound (seconds) on a Mailgun request so a stalled connection cannot
# hang the worker that runs the background task.
MAILGUN_TIMEOUT_SECONDS = 10


# Pydantic model for the form data
class ContactForm(BaseModel):
    """Payload of the website contact form."""

    name: str
    email: EmailStr
    message: str


def send_simple_message(to, subject, text, timeout=MAILGUN_TIMEOUT_SECONDS):
    """Post a plain-text email to the Mailgun messages endpoint.

    Args:
        to: Recipient address.
        subject: Subject line.
        text: Plain-text body.
        timeout: Seconds to wait for Mailgun before giving up.

    Returns:
        The ``requests.Response`` returned by Mailgun. The status code is
        not checked here; callers decide how to treat error responses.

    Raises:
        requests.RequestException: On connection problems or timeout.
    """
    api_key = os.getenv("MAILGUN_API_KEY")
    return requests.post(
        MAILGUN_MESSAGES_URL,
        auth=("api", api_key),
        # NOTE(review): the "from" value carries no address part; it looks
        # like a "<user@domain>" suffix may have been lost - confirm.
        data={"from": "Excited User ", "to": [to], "subject": subject, "text": text},
        timeout=timeout,
    )


# Function to send email
def send_email(form_data: ContactForm):
    """Forward a contact-form submission to the address in RECEIVER_EMAIL.

    Failures are printed and reported through the return value instead of
    being raised.

    Returns:
        ``None`` on success, or ``{"message": "Failed to send email."}``
        when the email could not be delivered to Mailgun.
    """
    receiver_email = os.getenv("RECEIVER_EMAIL")  # Your email
    if not receiver_email:
        # Without a recipient the Mailgun call cannot succeed; fail early
        # with an explicit log line.
        print("RECEIVER_EMAIL is not set")
        return {"message": "Failed to send email."}

    # Setup the message content
    text = f"Name: {form_data.name}\nEmail: {form_data.email}\nMessage: {form_data.message}"
    title = "New message from your website!"

    # Send the email
    try:
        response = send_simple_message(receiver_email, title, text)
        # Treat 4xx/5xx replies from Mailgun as failures so they are logged
        # rather than silently ignored.
        response.raise_for_status()
    except Exception as e:
        print(e)
        return {"message": "Failed to send email."}


# Endpoint to handle form submission
@app.post("/send_email")
async def send_contact_form(form_data: ContactForm, background_tasks: BackgroundTasks):
    """Queue the contact-form email as a background task and reply at once."""
    # NOTE(review): the reply is produced before the email is actually
    # sent, so it does not reflect the delivery outcome.
    background_tasks.add_task(send_email, form_data)
    return {"message": "Email sent successfully!"}


class UserInput(BaseModel):
    """Request body for the /generate endpoint."""

    query: str
    stream: Optional[bool] = False
    messages: Optional[list[dict]] = []


class ChunkToDB(BaseModel):
    """Request body for the /add_chunk_to_db endpoint."""

    message: str
    title: str


@app.post("/add_chunk_to_db")
async def add_chunk_to_db(chunk: ChunkToDB):
    """Pass the chunk text and title to ``get_vectorstore`` and return its result."""
    try:
        title = chunk.title
        message = chunk.message
        return get_vectorstore(text_chunk=message, index=index, title=title)
    except Exception as e:
        return {"message": str(e)}


@app.get("/list_vectors")
async def list_vectors():
    """Return the result of ``index.list()``."""
    try:
        return index.list()
    except Exception as e:
        return {"message": str(e)}


@app.post("/generate")
async def generate(user_input: UserInput):
    """Call ``generate_stream`` for the query, streamed or not per ``stream``."""
    try:
        print(user_input.stream, user_input.query)
        # NOTE(review): `index_name` receives the index object, not the
        # name string - confirm this is what `generate_stream` expects.
        if user_input.stream:
            return StreamingResponse(
                generate_stream(
                    user_input.query,
                    user_input.messages,
                    index_name=index,
                    stream=True,
                    vector_store=vector_store,
                ),
                media_type="application/json",
            )
        else:
            return generate_stream(
                user_input.query,
                user_input.messages,
                index_name=index,
                stream=False,
                vector_store=vector_store,
            )
    except Exception as e:
        return {"message": str(e)}


@app.post("/retreive_context")
async def retreive_context_response(query: str):
    """Return the result of ``retreive_context`` for the given query."""
    try:
        return retreive_context(index=index, query=query)
    except Exception as e:
        return {"message": str(e)}


@app.delete("/delete_vector")
async def delete_vector(filename_id: str):
    """Delete the vector whose id is ``filename_id`` from the index."""
    try:
        return index.delete(ids=[filename_id])
    except Exception as e:
        return {"message": str(e)}


@app.get("/check_server")
async def check_server():
    """Liveness probe."""
    return {"message": "Server is running"}


@app.get("/")
async def read_root():
    """Root endpoint returning a welcome message."""
    return {"message": "Welcome to the AI API"}