shopintelbackend / main.py
arkamaldeen's picture
Upload 5 files
1e0aaab verified
from fastapi import FastAPI, Request, File, UploadFile, HTTPException, Form, Response, Request
from fastapi.responses import JSONResponse
import requests
from pydantic import BaseModel
# from IPython.display import display, Markdown
from fastapi.middleware.cors import CORSMiddleware
from langchain_community.vectorstores import Qdrant
from langchain_google_genai import ChatGoogleGenerativeAI
# from langchain_together import Together
from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceInferenceAPIEmbeddings, HuggingFaceEmbeddings
# from langchain_community.embeddings import HuggingFaceInferenceAPIEmbeddings
from langchain.chains.query_constructor.base import AttributeInfo
from langchain_core.prompts import ChatPromptTemplate
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain_core.output_parsers import StrOutputParser
from qdrant_client import QdrantClient
from langchain_core.prompts import PromptTemplate
import os
from dotenv import load_dotenv
import random
from pymongo import MongoClient
import pandas as pd
import io
from langchain.docstore.document import Document
from bson import ObjectId
from datetime import datetime, timedelta
from uuid import uuid4
from cryptography.fernet import Fernet
import base64
import json
# import uvicorn
from langchain_core.runnables import RunnablePassthrough
from transformers import AutoModel, AutoTokenizer
from langchain.globals import set_debug, set_verbose
from operator import itemgetter
from langchain_openai.chat_models import ChatOpenAI
import openai
from datetime import datetime
# Turn on LangChain's global debug mode (set_debug is imported from langchain.globals).
set_debug(True)
# set_verbose(True)
# load_dotenv()
# API KEYS AND SECRETS LOADED FROM ENV
# SECRET_KEY = os.getenv("SECRET_KEY").encode()
# GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
# OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# QDRANT_API_KEY = os.getenv("QDRANT_API_KEY")
# MONGODB_USER = os.getenv("MONGODB_USER")
# MONGODB_PASSWORD = os.getenv("MONGODB_PASSWORD")
# Secrets are read straight from the process environment (load_dotenv is disabled above).
# NOTE(review): os.environ.get returns None for an unset variable, so the
# .encode() call on SECRET_KEY raises AttributeError at import time when it is missing.
SECRET_KEY = os.environ.get("SECRET_KEY").encode()
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY")
MONGODB_USER = os.environ.get("MONGODB_USER")
MONGODB_PASSWORD = os.environ.get("MONGODB_PASSWORD")
# print(SECRET_KEY, GOOGLE_API_KEY, OPENAI_API_KEY, QDRANT_API_KEY, MONGODB_USER, MONGODB_PASSWORD)
# The FastAPI application object that the route decorators below attach to.
app = FastAPI()
# Fernet cipher keyed with SECRET_KEY; the OAuth routes use it to encrypt and
# decrypt the Google access/refresh tokens.
f = Fernet(SECRET_KEY)
# Request body of POST /google/oauth/token. All four fields are forwarded to
# Google's token endpoint to exchange the authorization code for tokens.
class LoginData(BaseModel):
    # OAuth client id, sent as "client_id" in the token request.
    client_id: str
    # OAuth client secret, sent as "client_secret" in the token request.
    client_secret: str
    # Authorization code, sent as "code" in the token request.
    code: str
    # Redirect URI, sent as "redirect_uri" in the token request.
    redirect_uri: str
# Request body of POST /refreshToken.
class RefreshToken(BaseModel):
    # Matched against the "shopId" field of the stored user document.
    user_id: str
    # OAuth client id, sent as "client_id" in the refresh request to Google.
    client_id: str
    # OAuth client secret, sent as "client_secret" in the refresh request to Google.
    client_secret: str
# CORS allow-list: only these local development origins may call the API
# (this is NOT a wildcard; any other origin is rejected by the middleware).
origins = ['http://localhost:3000', 'http://127.0.0.1:3000',
           'https://localhost:3000', 'https://127.0.0.1:3000',
           'http://localhost:3001', 'http://127.0.0.1:3001',
           'https://localhost:3001', 'https://127.0.0.1:3001', 'http://127.0.0.1:5500']
# Credentials are allowed, methods are restricted to the five listed verbs,
# and any request header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
)
# Import-time initialisation of the three shared resources used by the
# helpers below: `embeddings`, `qdrantClient` and `llm`.
# NOTE(review): each step only prints on failure. When a step fails its name
# is never bound, so later code that reads it (e.g. get_vector_store reads
# `qdrantClient` and `embeddings`) will hit a NameError instead of the
# original error.
print("Starting the server.........")
try:
    print("Waiting for the embedding model to load.............")
    # Load the embedding model
    model_name = "BAAI/bge-large-en"
    model_kwargs = {"device": "cpu"}
    encode_kwargs = {"normalize_embeddings": True}
    embeddings = HuggingFaceBgeEmbeddings(
        model_name=model_name,
        model_kwargs=model_kwargs,
        encode_kwargs=encode_kwargs
    )
    print("Embedding model loaded.............")
except Exception as e:
    print(f"Error loading embedding model: {e}")
try:
    print("Loading Qdrant client")
    # NOTE(review): the log messages mention Docker, but the client below is
    # pointed at a hosted Qdrant cloud URL.
    print("Waiting for the Qdrant embeddings from Docker to load.............")
    qdrantClient = QdrantClient(
        url="https://0b1a549b-0a48-4407-83ba-760296707b5d.us-east4-0.gcp.cloud.qdrant.io:6333",
        api_key=QDRANT_API_KEY,
    )
    # Listing the collections doubles as a connectivity check.
    print("testing qdrantClient")
    print("all collections:", qdrantClient.get_collections())
    print("Qdrant embeddings from Docker were loaded.............")
except Exception as e:
    print(f"Error loading Qdrant embeddings: {e}")
try:
    print("Waiting for the LLM from Docker to load.............")
    # Chat model client for OpenAI's gpt-3.5-turbo, authenticated with OPENAI_API_KEY.
    llm = ChatOpenAI(model="gpt-3.5-turbo", api_key=OPENAI_API_KEY)
    print("LLM model loaded.............")
except Exception as e:
    print(f"Error loading LLM: {e}")
# FUNCTIONS TO SETUP THE CONNECTIONS
# Function to connect to MongoDB
def connect_mongo():
    """Create a MongoClient for the Atlas cluster from the MONGODB_* settings.

    Returns:
        The MongoClient on success, or None when the client could not be
        built (the error is printed rather than raised, as before).
    """
    # Function-scope import so this block does not depend on the file header.
    from urllib.parse import quote_plus

    try:
        if not MONGODB_USER or not MONGODB_PASSWORD:
            raise ValueError("MONGODB_USER and MONGODB_PASSWORD must be set")
        # Credentials embedded in a MongoDB URI must be percent-escaped,
        # otherwise characters such as '@', ':', '/', '%' or '+' corrupt the
        # URI or are decoded to a different password.
        # Assumes the environment holds the raw (unescaped) values.
        user = quote_plus(MONGODB_USER)
        password = quote_plus(MONGODB_PASSWORD)
        uri = f"mongodb+srv://{user}:{password}@cluster0.x69zkkf.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"
        client = MongoClient(uri, authSource='admin')
        print("Connected to MongoDB")
        return client
    except Exception as e:
        print(f"Error connecting to MongoDB: {e}")
        # Explicit: callers receive None when the client could not be created.
        return None
# FUNCTIONS TO UPLOAD THE FILES
# check upload file validity
def check_file_validation(type, file_content, file_name):
    """Check that an uploaded spreadsheet has the columns its upload type needs.

    Args:
        type: Upload kind. "productsDB" selects the product columns; any
            other value is validated against the review columns.
        file_content: Raw bytes of the uploaded file.
        file_name: Name of the uploaded file; its extension (matched
            case-insensitively) selects the CSV or Excel parser.

    Returns:
        True when every required column is present.

    Raises:
        ValueError: "Unsupported file format" for an extension other than
            .csv/.xls/.xlsx, or "Missing required columns".
    """
    if type == "productsDB":
        required_columns = ['id', 'title', 'description', 'product_link', 'sale_price']
    else:
        required_columns = ['id', 'review_date', 'review_time', 'rating_details']

    # Compare on a lowered name so "DATA.CSV" is treated like "data.csv".
    lowered_name = file_name.lower()
    if lowered_name.endswith('.csv'):
        df = pd.read_csv(io.BytesIO(file_content))
    elif lowered_name.endswith(('.xls', '.xlsx')):
        df = pd.read_excel(io.BytesIO(file_content))
    else:
        print("Unsupported file format")
        raise ValueError("Unsupported file format")

    if not set(required_columns).issubset(df.columns):
        print("Missing required columns")
        raise ValueError("Missing required columns")
    print("All required columns are present")
    return True
# Function to save data to MongoDB
def save_to_mongodb(file_content, file_name, collection_name, operation,db, type):
    """Parse an uploaded CSV/Excel file and write its rows to a MongoDB collection.

    Args:
        file_content: Raw bytes of the uploaded file.
        file_name: Name of the uploaded file; the extension (matched
            case-insensitively) selects the parser.
        collection_name: Target collection inside ``db``.
        operation: "replace" (drop and re-insert; collection must exist),
            "append" (upsert each row by its ``id``; collection must exist)
            or "new" (create the collection and insert). Any other value is
            a no-op, as before.
        db: Database handle exposing list_collection_names(),
            create_collection() and item access to collections.
        type: "productsDB" or "reviewsDB"; selects which columns hold Python
            literals that need to be parsed.

    Raises:
        HTTPException: 400 wrapping any error raised while parsing or saving.
    """
    # Function-scope import so this block does not depend on the file header.
    import ast

    try:
        lowered_name = file_name.lower()
        if lowered_name.endswith('.csv'):
            df = pd.read_csv(io.BytesIO(file_content))
        elif lowered_name.endswith(('.xls', '.xlsx')):
            df = pd.read_excel(io.BytesIO(file_content))
        else:
            print("Unsupported file format")
            raise ValueError("Unsupported file format")

        if type == "productsDB":
            literal_columns = ("all_specifications", "alt_images")
        elif type == "reviewsDB":
            literal_columns = ("review_images",)
        else:
            literal_columns = ()
        for column in literal_columns:
            if column in df.columns:
                print(f"Parsing literal column {column}")
                # SECURITY: this used to be eval() on cells of a
                # user-uploaded file, which executes arbitrary code.
                # ast.literal_eval only accepts Python literals
                # (dict/list/str/number/...), which is what these columns hold.
                df[column] = df[column].apply(ast.literal_eval)

        uploaded_file = df.to_dict(orient="records")

        if operation == "replace":
            if collection_name not in db.list_collection_names():
                print(f"Collection {collection_name} does not exist")
                raise ValueError(f"Collection {collection_name} does not exist")
            # Refuse before dropping: an empty upload would otherwise delete
            # the existing data and then fail on insert_many.
            if not uploaded_file:
                raise ValueError("Uploaded file contains no rows")
            db[collection_name].drop()
            print("Dropped existing database")
            db.create_collection(collection_name)
            db[collection_name].insert_many(uploaded_file)
            print("Inserted new database after dropping the existing one")
        elif operation == "append":
            if collection_name not in db.list_collection_names():
                print(f"Collection {collection_name} does not exist")
                raise ValueError(f"Collection {collection_name} does not exist")
            print("Collection exists")
            collection = db[collection_name]
            for record in uploaded_file:
                # Rows are keyed by the lowercase 'id' column that upload
                # validation requires (the old code looked up 'Id' and raised
                # KeyError). upsert=True updates a matching row or inserts a
                # new one, which also covers an existing-but-empty collection.
                collection.update_one({'id': record['id']}, {'$set': record}, upsert=True)
        elif operation == "new":
            if not uploaded_file:
                raise ValueError("Uploaded file contains no rows")
            db.create_collection(collection_name)
            db[collection_name].insert_many(uploaded_file)
            print("Inserted new database")
    except Exception as e:
        print(f"Error saving to MongoDB: {e}")
        raise HTTPException(status_code=400, detail=f"Error saving to MongoDB: {e}")
# FUNCTIONS TO FETCH DATA FOR THE DASHBOARD INGESTION
# Function to retrieve data from MongoDB
def get_data_from_mongodb(collection_name, db, skip=0, limit=10, ):
    """Return one page of documents from a collection, without their ``_id``.

    Args:
        collection_name: Collection to read from.
        db: Database handle that owns the collection.
        skip: Number of documents to skip before the page starts.
        limit: Maximum number of documents in the page.

    Returns:
        A list of dicts (each document minus ``_id``); an empty list when the
        collection does not exist.

    Raises:
        HTTPException: 400 wrapping any error raised while reading.
    """
    try:
        if collection_name not in db.list_collection_names():
            return []
        page = []
        for document in db[collection_name].find().skip(skip).limit(limit):
            # Copy the document and drop the "_id" field from the copy.
            row = dict(document)
            row.pop('_id', None)
            page.append(row)
        return page
    except Exception as e:
        print(f"Error getting data from MongoDB: {e}")
        raise HTTPException(status_code=400, detail=f"Error getting data from MongoDB: {e}")
# Function to get data count from MongoDB
def get_data_count_from_mongodb(collection_name, db):
    """Return how many documents a collection holds.

    Args:
        collection_name: Collection to count.
        db: Database handle that owns the collection.

    Returns:
        The document count, or 0 when the collection does not exist.

    Raises:
        HTTPException: 400 wrapping any error raised while counting.
    """
    try:
        if collection_name not in db.list_collection_names():
            return 0
        # Count on the server instead of pulling every document into memory
        # just to take len(); count_documents is already used elsewhere in
        # this file.
        return db[collection_name].count_documents({})
    except Exception as e:
        print(f"Error getting data count from MongoDB: {e}")
        raise HTTPException(status_code=400, detail=f"Error getting data count from MongoDB: {e}")
# FUNCTIONS FOR QDRANT EMBEDDINGS
# Function to get the vector store
def get_vector_store(shopId):
    """Wrap the Qdrant collection named after ``shopId`` as a vector store.

    Uses the module-level ``qdrantClient`` and ``embeddings``.

    Args:
        shopId: Name of the Qdrant collection to open.

    Returns:
        A Qdrant vector store bound to that collection.

    Raises:
        HTTPException: 400 wrapping any error raised while building the store.
    """
    try:
        print("shopId:", shopId)
        # Debug listing of every collection known to the client.
        print("all collections in get_vector_store:", qdrantClient.get_collections())
        store_options = {
            "client": qdrantClient,
            "collection_name": shopId,
            "embeddings": embeddings,
        }
        return Qdrant(**store_options)
    except Exception as error:
        failure = f"failed to load the vector store, {error}"
        print(failure)
        raise HTTPException(status_code=400, detail=failure)
# Function to retrieve data from MongoDB for embeddings
def get_data_from_mongodb_for_embeddings(collection_name, db):
    """Load every document of a collection, without ``_id``, for embedding.

    Args:
        collection_name: Collection to read in full.
        db: Database handle that owns the collection.

    Returns:
        A list of dicts, one per document, each without its ``_id`` field.

    Raises:
        HTTPException: 400 when the collection does not exist or reading
            fails. The "does not exist" error is raised inside the try and is
            therefore re-wrapped by the generic handler, as in the original.
    """
    try:
        if collection_name not in db.list_collection_names():
            print(f"Collection {collection_name} does not exist in data")
            raise HTTPException(status_code=400, detail=f"Collection {collection_name} does not exist in data")
        print("collection found... in data")
        records = []
        for document in db[collection_name].find():
            records.append({key: value for key, value in document.items() if key != '_id'})
        print("Length in get_data_from_mongodb:", len(records))
        return records
    except Exception as error:
        print(f"Error getting data from MongoDB: {error}")
        raise HTTPException(status_code=400, detail=f"Error getting data from MongoDB: {error}")
# filtering the product for embedding
def transform_products(product_list):
    """Build the metadata dict stored alongside each product embedding.

    For every product: drops ``_id``, ``source``, ``alt_images`` and the raw
    ``reviews`` list, stringifies ``all_specifications``, and adds a
    ``reviews`` entry holding a JSON array of the ``rating_details`` of the
    first 15 reviews.

    Args:
        product_list: Iterable of product dicts; ``reviews`` (optional) is a
            list of dicts that each contain ``rating_details``.

    Returns:
        A list of metadata dicts in the same order as ``product_list``.
    """
    skipped_keys = {'_id', 'source', 'alt_images', 'reviews'}
    prepared = []
    for product in product_list:
        metadata = {
            key: (str(value) if key == "all_specifications" else value)
            for key, value in product.items()
            if key not in skipped_keys
        }
        # Only the first 15 reviews are kept.
        review_texts = [review['rating_details'] for review in product.get('reviews', [])[:15]]
        metadata['reviews'] = json.dumps(review_texts)
        prepared.append(metadata)
    return prepared
# Function to create embeddings and save in Qdrant cloud
def create_embeddings(mapped_data_collection, embedding_collection, db):
    """Embed the merged product documents and (re)create their Qdrant collection.

    Reads every document of ``mapped_data_collection``, uses
    "title\\ndescription" as the text to embed, attaches the output of
    transform_products() as metadata under the "source" key, and writes the
    result to ``embedding_collection`` with ``force_recreate=True``, i.e. an
    existing Qdrant collection of that name is replaced.

    Args:
        mapped_data_collection: MongoDB collection holding the merged
            product+review documents.
        embedding_collection: Name of the Qdrant collection to (re)create.
        db: MongoDB database handle that owns ``mapped_data_collection``.

    Raises:
        HTTPException: 400. Errors from the MongoDB read propagate with their
            own detail; anything else is wrapped as "Error creating embeddings".
    """
    try:
        print("mapped_data_collection:", mapped_data_collection)
        # The read is no longer wrapped in its own swallow-and-continue
        # handler: previously a failed read was only printed and the next
        # line crashed with an unrelated NameError on `product_list`.
        product_list = get_data_from_mongodb_for_embeddings(mapped_data_collection, db)

        texts = [str(item['title']) + "\n" + str(item['description']) for item in product_list]
        metadatas = transform_products(product_list)
        docs = [Document(page_content=txt, metadata={"source": meta}) for txt, meta in zip(texts, metadatas)]
        print("New Data loaded.........")
        print("Creating embeddings in qdrant .......")
        Qdrant.from_documents(
            docs,
            embeddings,
            url="https://0b1a549b-0a48-4407-83ba-760296707b5d.us-east4-0.gcp.cloud.qdrant.io:6333",
            api_key=QDRANT_API_KEY,
            collection_name=embedding_collection,
            force_recreate=True
        )
        print("New Vector store loaded.........")
    except HTTPException:
        # Already a well-formed API error (e.g. missing collection); keep it.
        raise
    except Exception as e:
        print(f"Error creating embeddings: {e}")
        raise HTTPException(status_code=400, detail=f"Error creating embeddings: {e}")
# function to retrieve products from qdrant
def retrieve_product(user_input, vector_store, k=10):
    """Return the ``k`` best matches for ``user_input`` from the vector store.

    Args:
        user_input: Query text.
        vector_store: Object exposing ``similarity_search_with_score``.
        k: Number of results to request.

    Returns:
        Whatever ``similarity_search_with_score(query=..., k=...)`` returns.
    """
    return vector_store.similarity_search_with_score(query=user_input, k=k)
# function to create context from user query
def create_context(user_input, vector_store):
    """Render the products retrieved for a query as a text context block.

    Each hit from retrieve_product() is a pair whose first element carries a
    ``metadata["source"]`` dict; six of its fields are formatted into a
    numbered bullet list.

    NOTE(review): the keys read here ("name", "discount_price",
    "product_desc", "about_this_item", "ratings", "overall_review") differ
    from the keys create_embeddings() stores ("title", "sale_price",
    "description", ...); this presumably targets an older dataset - confirm
    before reuse.

    Args:
        user_input: Query text.
        vector_store: Vector store passed through to retrieve_product().

    Returns:
        The concatenated context string (empty when there are no hits).
    """
    sections = []
    for number, hit in enumerate(retrieve_product(user_input, vector_store), start=1):
        details = hit[0].metadata["source"]
        sections.append(f"""
* Product {number} -
- Product name : {details["name"]}
- Product price: {details["discount_price"]}
- Brief description of the product: {details["product_desc"]}
- Detailed description of the product: {details["about_this_item"]}
- Rating value (1.0 - 5.0): {details["ratings"]}
- Overall review: {details["overall_review"]}
""")
    return "".join(sections)
# Define the aggregation pipeline for merging mongoDB collections
def create_map_data_pipeline(reviews_collection_name, merged_collection_name):
    """Build the aggregation pipeline that merges products with their reviews.

    Stages, in order: a ``$lookup`` joining reviews on the shared ``id``
    field, a ``$project`` keeping the listed product fields plus a trimmed
    copy of each review, and an ``$out`` writing to ``merged_collection_name``.

    Args:
        reviews_collection_name: Collection to join reviews from.
        merged_collection_name: Collection the pipeline writes to.

    Returns:
        The pipeline as a list of three stage dicts.
    """
    product_fields = (
        "_id", "id", "title", "description", "product_link", "sale_price",
        "all_specifications", "overall_rating", "image", "alt_images",
        "source", "domain",
    )
    # review_images is intentionally not copied into the merged documents.
    review_fields = (
        "_id", "rating", "rating_title", "rating_details",
        "review_date", "review_time", "source",
    )

    join_reviews = {
        "$lookup": {
            "from": reviews_collection_name,
            "localField": "id",
            "foreignField": "id",
            "as": "reviews",
        }
    }
    projection = {field: 1 for field in product_fields}
    projection["reviews"] = {
        "$map": {
            "input": "$reviews",
            "as": "review",
            "in": {field: f"$$review.{field}" for field in review_fields},
        }
    }
    return [join_reviews, {"$project": projection}, {"$out": merged_collection_name}]
def parse_datetime(date_str, time_str):
    """Combine a "MM/DD/YYYY" date and an "HH:MM" time into a datetime.

    Raises:
        ValueError: When the combined text does not match "%m/%d/%Y %H:%M".
    """
    combined = f"{date_str} {time_str}"
    return datetime.strptime(combined, "%m/%d/%Y %H:%M")
# COMMON GLOBAL VARIABLES
# prompt template for the mistral model
# template = """
# You are a friendly, conversational AI ecommerce assistant. The context includes 5 ecommerce products.
# Use only the following context, to find the answer to the questions from the customer.
# Its very important that you follow the below instructions.
# -Dont use general knowledge to answer the question
# -If you dont find the answer from the context or the question is not related to the context, just say that you don't know the answer.
# -By any chance the customer should not know you are referring to a context.
# Context:
# {context}
# Question:
# {question}
# Helpful Answer:
# """
# Attribute descriptions for the product metadata fields, built from
# AttributeInfo (imported from langchain's query_constructor module).
# NOTE(review): "sale_price", "all_specifications" and "overall_rating" each
# appear twice with different types (string and integer). Confirm that the
# consumer of this list accepts duplicate attribute names; otherwise keep one
# entry per field.
metadata_field_info = [
    AttributeInfo(
        name="title",
        description="The Name of the product",
        type="string",
    ),
    AttributeInfo(
        name="product_link",
        description="The link of the product",
        type="string",
    ),
    AttributeInfo(
        name="sale_price",
        description="The actual price of the product",
        type="string",
    ),
    # Duplicate of "sale_price" above, declared as integer.
    AttributeInfo(
        name="sale_price",
        description="The actual price of the product",
        type="integer",
    ),
    AttributeInfo(
        name="description",
        description="The description of the product",
        type="string",
    ),
    AttributeInfo(
        name="all_specifications",
        description="The Specification ,Hardware and Software configuration of the product like ram/RAM , storage/ROM ",
        type="string",
    ),
    # Duplicate of "all_specifications" above, declared as integer.
    AttributeInfo(
        name="all_specifications",
        description="The build of the product like ram/RAM , storage/ROM/memory , processors ",
        type="integer",
    ),
    # AttributeInfo(
    # name="all_specifications",
    # description="A dictionary or an object with key value pairs of specifications of the product like display:6.1 inches, battery: 5000 mAh, camera: 48MP",
    # type="dictionary or object",
    # ),
    AttributeInfo(
        name="domain",
        description="The domain of the product cellphone, smartphone, mobilephone, phone, mobile",
        type="string",
    ),
    AttributeInfo(
        name="image",
        description="The link to the image of the product",
        type="string",
    ),
    AttributeInfo(
        name="overall_rating",
        description="The overall rating of the product",
        type="string",
    ),
    # Duplicate of "overall_rating" above, declared as integer.
    AttributeInfo(
        name="overall_rating",
        description="The overall rating of the product",
        type="integer",
    ),
    # transform_products() stores reviews as a JSON-encoded string, hence type "string".
    AttributeInfo(
        name="reviews",
        description="A list of reviews of the product from the customers.",
        type="string"
    ),
    # AttributeInfo(
    # name="reviews",
    # description="A list of strings which are reviews of the product from the customers.",
    # type="list[string]"
    # ),
]
# One-line description of the document contents that accompanies metadata_field_info.
document_content_description = "A review of the ecommerce product data."
# ECOMMERCE_TEMPLATE = """\
# You are an AI assistant named ShopIntel, with expertise in electronic products, especially smartphones, laptops, displays, and electronics ecommerce gadgets and products. Your role is to assist users by providing accurate product information, answering their questions about electronics products in proper headings and bullet points, and making their shopping experience enjoyable and efficient.
# YOU ARE BEST AT ANSWERING THESE BELOW TASKS:
# (use this only for product-related queries):
# - When a user asks you a product-related query, answer them in a more concise and accurate manner with all the specifications and product links(source link) from the self_query_retriever(context).
# - When a user asks to compare between products, then give the response output in tabular format with all the specifications and source link(source link) from the self_query_retriever(context).
# (use this for conversational and greetings related queries):
# - When a user greets you, just greet them and give your introduction and what you are capable of, and make their conversation more engaging.\
# Also, suggest them top 5 potential questions related to ecommerce products asked by the users.\
# (use this when user asks some irrelevant queries out of context):
# - When a user asks you about some product which you do not have the answer, just politely deny them and suggest some questions what you can answer them and you are built for.
# - When a user asks you an out-of-context query, just say, "Oops!Looks like your question took a detour from our shopping adventure.\
# My expertise lies in assisting with queries such as:" and give some suggestions related to smartphones, laptops, and other electronics gadgets to ask.\
# Also suggest most trending questions asked by the users or the user himself based on their past shopping experience.
# <question>
# Question:{question}
# <question>
# """
# prompt = ChatPromptTemplate.from_template(ECOMMERCE_TEMPLATE)
# ECOMMERCE_TEMPLATE = """\
# You are an AI assistant named ShopIntel, with expertise in electronic products, especially smartphones, laptops, displays, and electronics ecommerce gadgets and products. Your role is to assist users by providing accurate product information, answering their questions about electronics products in proper headings and bullet points, and making their shopping experience enjoyable and efficient.
# YOU ARE BEST AT ANSWERING THESE BELOW TASKS:
# (use this only for product-related queries):
# - When a user asks you a product-related query, answer them in a more concise and accurate manner with all the specifications and product links(source link) as well.
# - When a user asks to compare between products, then give the response output in tabular format with all the specifications and source link(source link) as well.
# (use this for conversational and greetings related queries):
# - When a user greets you, just greet them and give your introduction and what you are capable of, and make their conversation more engaging.\
# Also, suggest them top 5 potential questions related to ecommerce products asked by the users.\
# (use this when user asks some irrelevant queries out of context):
# - When a user asks you about some product which you do not have the answer, just politely deny them and suggest some questions what you can answer them and you are built for.
# - When a user asks you an out-of-context query, just say, "Oops!Looks like your question took a detour from our shopping adventure.\
# My expertise lies in assisting with queries such as:" and give some suggestions related to smartphones, laptops, and other electronics gadgets to ask.\
# Also suggest most trending questions asked by the users or the user himself based on their past shopping experience.
# Query:{question}
# Context:{context}
# """
# rag_prompt = ChatPromptTemplate.from_template(ECOMMERCE_TEMPLATE)
# Active system prompt for the ShopIntel assistant. It has two placeholders,
# {question} and {context}, which must be supplied when the prompt is
# formatted. The backslash after the opening quotes suppresses the leading
# newline of the string.
ECOMMERCE_TEMPLATE = """\
You are an AI assistant named ShopIntel, with expertise in electronic products, especially smartphones, laptops, displays, and electronics ecommerce gadgets and products. Your role is to assist users by providing accurate product information, answering their questions about electronics products in proper headings and bullet points, and making their shopping experience enjoyable and efficient.
YOU ARE BEST AT ANSWERING THESE BELOW TASKS:
(use this only for product-related queries from the self_query_retriever):
- If the product doesnt match with the context product, skip those products.
- When a user asks you a product-related query, answer them in a more concise and accurate manner with all the specifications and product links(source link) as well.
- When a user asks to compare between products, then give the response output in an accurate manner comparing all the specifications and source link(source link) as well.
(use this for conversational and greetings related queries):
- When a user greets you, just greet them and give your introduction and what you are capable of, and make their conversation more engaging. Also, suggest them some potential questions related to ecommerce products to buy.
(use this when user asks some irrelevant queries out of context):
- When a user asks you an out-of-context query, just say, "Oops!Looks like your question took a detour from our shopping adventure. My expertise lies in assisting with queries such as:" and give them 5 suggestions related to smartphones, laptops, and other electronics gadgets to ask.
Query:{question}
Context:{context}
"""
# Chat prompt built from the template above.
prompt = ChatPromptTemplate.from_template(ECOMMERCE_TEMPLATE)
# Create the shared MongoDB client used by the route handlers below. No
# database or collection is created here; handlers pick theirs by name.
# NOTE(review): connect_mongo() only prints on failure, so this can be None.
mongodbClient = connect_mongo()
# qdrantClient = connect_qdrant()
# APP ROUTES
# Function to login user and get access token from google
@app.post("/google/oauth/token")
async def google_oauth(login: LoginData, response: Response, request: Request):
    """Exchange a Google authorization code for tokens and sign the user in.

    Steps: trade ``login.code`` for tokens at Google's token endpoint, fetch
    the user's profile with the access token, encrypt the tokens with the
    module-level Fernet ``f`` (then base64-encode them), and create or update
    the user document in ``ShopIntel_Users.users_collection``.

    Args:
        login: OAuth client credentials, authorization code and redirect URI.
        response: Unused; kept for interface compatibility.
        request: Unused; kept for interface compatibility.

    Returns:
        JSONResponse 200 with ``message``, the encrypted ``access_token``,
        ``expires_in``, ``shopId``, ``name``, ``email`` and ``profile``; or
        JSONResponse 400 ``{"message": "Login failed"}`` on any failure.
    """
    try:
        print("Inside google oauth")
        users = mongodbClient["ShopIntel_Users"]["users_collection"]

        # Step 1: exchange the authorization code for tokens.
        try:
            token_res = requests.post(
                "https://oauth2.googleapis.com/token",
                # Access-Control-Allow-Origin is a response header; it was
                # meaningless on this outbound request and has been dropped.
                headers={"Content-Type": "application/x-www-form-urlencoded"},
                data={
                    "client_id": login.client_id,
                    "client_secret": login.client_secret,
                    "code": login.code,
                    "grant_type": "authorization_code",
                    "redirect_uri": login.redirect_uri,
                },
            )
            token_data = token_res.json()
        except Exception as e:
            print(f"Failed to fetch OAuth token: {e}")
            raise HTTPException(status_code=500, detail="Failed to fetch OAuth token")

        expires_in = token_data.get("expires_in")
        access_token = token_data.get("access_token")
        # Google may omit refresh_token on repeat sign-ins, so it can be None.
        refresh_token = token_data.get("refresh_token")
        if not access_token:
            print(f"Token exchange returned no access token: {token_data.get('error')}")
            raise HTTPException(status_code=400, detail="Failed to fetch OAuth token")

        # Step 2: fetch the user's profile.
        try:
            userinfo_res = requests.get(
                "https://www.googleapis.com/oauth2/v1/userinfo",
                headers={"Authorization": f"Bearer {access_token}"},
            )
        except Exception as e:
            # The response object does not exist here, so a fixed status is
            # used (the old handler read userinfo_res and hit a NameError).
            print(f"Failed to fetch user information: {e}")
            raise HTTPException(status_code=502, detail=f"Failed to fetch user information {e}")
        if userinfo_res.status_code != 200:
            print(f"Failed to fetch user information: {userinfo_res.text}")
            raise HTTPException(status_code=userinfo_res.status_code, detail=userinfo_res.text)
        userinfo = userinfo_res.json()
        email = userinfo.get("email")
        name = userinfo.get("name")
        profile = userinfo.get("picture")
        if not email:
            raise HTTPException(status_code=400, detail="Google profile has no email")

        # Step 3: encrypt the tokens; the Fernet output is base64-encoded
        # again so the stored/returned format stays what /refreshToken decodes.
        encrypted_access_token_base64 = base64.urlsafe_b64encode(
            f.encrypt(access_token.encode())
        ).decode()
        encrypted_refresh_token_base64 = None
        if refresh_token:
            encrypted_refresh_token_base64 = base64.urlsafe_b64encode(
                f.encrypt(refresh_token.encode())
            ).decode()

        # Step 4: create or update the user. insert_one creates the
        # collection on first use, so no explicit existence checks are needed.
        try:
            existing_user = users.find_one({"email": email})
            if existing_user:
                user = existing_user["shopId"]
                print("Shop id:", user)
                # Keep the stored refresh token when Google sent no new one.
                if encrypted_refresh_token_base64 is not None:
                    print("A user exists with the same email. Updating refresh token.")
                    users.update_one(
                        {"email": email},
                        {"$set": {"refresh_token": encrypted_refresh_token_base64}}
                    )
            else:
                print("No user exist with the same email. So creating a new user")
                user = str(uuid4())[:8]
                print("User id:", user)
                document = {"shopId": user, "email": email, "name": name, "profile": profile}
                if encrypted_refresh_token_base64 is not None:
                    document["refresh_token"] = encrypted_refresh_token_base64
                users.insert_one(document)
        except Exception as e:
            print("Error while saving to mongodb", e)
            raise HTTPException(status_code=500, detail="Error while saving to mongodb")

        return JSONResponse(content={"message": "Login successful","access_token": encrypted_access_token_base64, "expires_in": expires_in, "shopId": user, "name": name, "email": email, "profile": profile}, status_code=200)
    except Exception as e:
        # Every failure, including the HTTPExceptions raised above, is
        # reported to the client as the same generic 400 (unchanged contract).
        print("Error:", e)
        return JSONResponse(content={"message": "Login failed"}, status_code=400)
# function to logout user and remove refresh token
@app.post("/logout")
async def logout(request: Request):
    """Log a user out by removing their stored refresh token.

    Expects a JSON body with ``shopId``.

    Returns:
        JSONResponse 200 ``{"message": "Logout successful"}`` on success;
        JSONResponse 400 when ``shopId`` is missing or anything fails.
        Previously failures fell through and produced HTTP 200 with a null body.
    """
    try:
        data = await request.json()
        shopId = data.get("shopId")
        if not shopId:
            # A None filter value would also match documents lacking the field.
            return JSONResponse(content={"message": "shopId is required"}, status_code=400)
        db = mongodbClient["ShopIntel_Users"]
        print("shopId", shopId)
        db["users_collection"].update_one(
            {"shopId": shopId},
            {"$unset": {"refresh_token": ""}}
        )
        return JSONResponse(content={"message": "Logout successful"}, status_code=200)
    except Exception as e:
        print("Error:", e)
        # Mirrors the failure shape of the login handler.
        return JSONResponse(content={"message": "Logout failed"}, status_code=400)
# endpoint to refresh access token
@app.post("/refreshToken")
async def refreshToken(refreshToken: RefreshToken, request: Request):
    """Issue a new encrypted access token from the user's stored refresh token.

    Looks the user up by ``shopId``, decrypts the stored refresh token with
    the module-level Fernet ``f``, asks Google for a new access token, and
    returns that token encrypted and base64-encoded.

    Args:
        refreshToken: ``user_id`` (the shopId) plus the OAuth client credentials.
        request: Unused; kept for interface compatibility.

    Returns:
        JSONResponse 200 with ``access_token`` and ``expires_in``.

    Raises:
        HTTPException: 404 for an unknown user, 401 when no refresh token is
            stored, Google's status code when the refresh request is
            rejected, 500 for anything unexpected.
    """
    try:
        print("Inside refresh token")
        print("user_id", refreshToken.user_id)
        # The client secret and decrypted refresh token are deliberately not
        # logged any more.

        db = mongodbClient["ShopIntel_Users"]
        existing_user = db["users_collection"].find_one({"shopId": refreshToken.user_id})
        if not existing_user:
            print("Invalid user id")
            # status_code is a required argument of HTTPException; the old
            # call omitted it and raised TypeError instead.
            raise HTTPException(status_code=404, detail="Invalid user id")

        # /logout unsets this field, so it may legitimately be absent.
        encrypted_refresh_token_base64 = existing_user.get("refresh_token")
        if not encrypted_refresh_token_base64:
            raise HTTPException(status_code=401, detail="No refresh token stored for this user")
        print("A user exists, getting the refresh token")
        # Undo the outer base64 layer, then decrypt.
        encrypted_refresh_token_bytes = base64.urlsafe_b64decode(encrypted_refresh_token_base64)
        refresh_token = f.decrypt(encrypted_refresh_token_bytes).decode()

        res = requests.post(
            "https://oauth2.googleapis.com/token",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={
                "client_id": refreshToken.client_id,
                "client_secret": refreshToken.client_secret,
                "refresh_token": refresh_token,
                "grant_type": "refresh_token",
            },
        )
        if res.status_code != 200:
            print("Failed to fetch a new access token using the existing refresh token: ", res.text)
            raise HTTPException(status_code=res.status_code, detail="Failed to fetch a new access token using the existing refresh token")

        data = res.json()
        expires_in = data.get("expires_in")
        access_token = data.get("access_token")
        # Encrypt, then base64-encode so the value is JSON-serialisable.
        encrypted_access_token_bytes = f.encrypt(access_token.encode())
        encrypted_access_token_base64 = base64.urlsafe_b64encode(encrypted_access_token_bytes).decode()
        return JSONResponse(content={"access_token": encrypted_access_token_base64, "expires_in": expires_in}, status_code=200)
    except HTTPException:
        # Keep the specific status codes chosen above.
        raise
    except Exception as e:
        print("Failed to fetch refresh token: ", e)
        raise HTTPException(status_code=500, detail=f"Failed to fetch refresh token: {e}")
# # message endpoint to chatbot
# @app.get("/message")
# async def message(request: Request):
# data = await request.json()
# message = data["message"]
# try:
# context = create_context(message, vector_store)
# prompt = PromptTemplate(
# template=template, input_variables=["context", "question"]
# )
# prompt_formatted_str = prompt.format(context=context, question=message)
# output = llm.invoke(prompt_formatted_str)
# print(output)
# return JSONResponse(content={"response": output})
# except Exception as e:
# print("Error:", e)
# error_responses = [
# "We apologize for the inconvenience, but an error occurred while processing your request.",
# "We're sorry, but it seems there was a technical issue. Please try again later.",
# "We've encountered an unexpected issue. Let's explore alternative options.",
# "We regret to inform you that our system is experiencing technical difficulties. Thank you for your patience.",
# "Unfortunately, we've encountered a problem on our end. We're investigating and will provide an update soon.",
# "It appears there's a technical glitch in our system. We're working diligently to resolve it.",
# "Regrettably, an unexpected error occurred while processing your request. Please stand by.",
# "Our servers are experiencing some instability. Rest assured, our team is actively addressing the issue.",
# "We're aware of the issue and actively working to resolve it. Thank you for your understanding.",
# ]
# error_message = random.choice(error_responses)
# print(error_message)
# return JSONResponse(content={"response": error_message})
# fetch data from the end point
@app.get("/fetch-data")
async def fetch_data(request: Request):
    """Return a shop's product/review data, or only the document counts.

    Query parameters:
        shopId: identifies the shop; selects the ``shopID_<shopId>`` database
            and the ``products-<shopId>`` / ``reviews-<shopId>`` collections.
        getDataCount: when present and non-empty, only the two counts are
            returned. Any non-empty value (even ``"false"``) enables it.
        getDataBase: when present and non-empty (and ``getDataCount`` is not),
            one page of products and reviews plus both counts is returned.
        page, pageSize: 1-based page number and page length; required with
            ``getDataBase``.

    Raises:
        HTTPException: 400 when ``shopId`` is missing, when neither mode flag
            is given, when the paging values are invalid, or when any
            downstream call fails.
    """
    try:
        params = request.query_params
        user = params.get("shopId")
        if not user:
            # Without this guard the lookup would silently target "shopID_None".
            raise HTTPException(status_code=400, detail="Missing required query parameter: shopId")

        db = mongodbClient[f"shopID_{user}"]
        products_collection = f"products-{user}"
        reviews_collection = f"reviews-{user}"

        if params.get("getDataCount"):
            print("Fetching data count")
            products_count = get_data_count_from_mongodb(collection_name=products_collection, db=db)
            reviews_count = get_data_count_from_mongodb(collection_name=reviews_collection, db=db)
            return JSONResponse(content={"products_count": products_count, "reviews_count": reviews_count})

        if params.get("getDataBase"):
            print("Fetching data base")
            try:
                page = int(params.get("page"))
                page_size = int(params.get("pageSize"))
            except (TypeError, ValueError):
                # TypeError: parameter absent (None); ValueError: not a number.
                raise HTTPException(status_code=400, detail="page and pageSize must be integers")
            if page < 1 or page_size < 0:
                # page < 1 would produce a negative skip value.
                # NOTE(review): pageSize=0 is still accepted because its meaning
                # depends on get_data_from_mongodb - confirm the intended behaviour.
                raise HTTPException(status_code=400, detail="page must be >= 1 and pageSize must be >= 0")

            skip = (page - 1) * page_size
            products = get_data_from_mongodb(collection_name=products_collection, skip=skip, limit=page_size, db=db)
            products_count = get_data_count_from_mongodb(collection_name=products_collection, db=db)
            reviews = get_data_from_mongodb(collection_name=reviews_collection, skip=skip, limit=page_size, db=db)
            reviews_count = get_data_count_from_mongodb(collection_name=reviews_collection, db=db)
            return JSONResponse(content={"products": products, "products_count": products_count, "reviews": reviews, "reviews_count": reviews_count})

        # Previously the handler fell through here and answered `null` with 200.
        raise HTTPException(status_code=400, detail="Specify either getDataCount or getDataBase")
    except HTTPException:
        # Already a well-formed client error; do not re-wrap it below.
        raise
    except Exception as e:
        print(f"An error occurred in fetching data: {e}")
        raise HTTPException(status_code=400, detail=f"An error occurred in fetching data: {e}") from e
# upload file end point
@app.post("/upload-file")
async def upload_file(file: UploadFile = File(...), uploadType: str = Form(...), uploadDatabase: str = Form(...), shopId: str = Form(...)):
    """Validate an uploaded file and store its rows in the shop's MongoDB.

    Form fields:
        file: the uploaded file.
        uploadType: one of ``"append"``, ``"replace"`` or ``"new"``; passed to
            ``save_to_mongodb`` as ``operation``.
        uploadDatabase: ``"productsDB"`` or ``"reviewsDB"``; selects the
            ``products-<shopId>`` or ``reviews-<shopId>`` collection and is
            passed to the validator/saver as ``type``.
        shopId: identifies the shop; selects the ``shopID_<shopId>`` database.

    Returns:
        A 200 JSON response with a success message.

    Raises:
        HTTPException: 400 for an unknown upload type or database, a file that
            fails validation, or any other failure while saving.
    """
    try:
        if not file:
            raise HTTPException(status_code=400, detail="No file provided")
        print("file", file, type(file))
        print("upload_type", uploadType, type(uploadType))
        print("upload_database", uploadDatabase, type(uploadDatabase))

        # Reject bad form values before reading or parsing the file.
        if uploadType not in ("append", "replace", "new"):
            raise HTTPException(status_code=400, detail=f"Invalid upload type: {uploadType}")

        collection_prefixes = {"productsDB": "products", "reviewsDB": "reviews"}
        prefix = collection_prefixes.get(uploadDatabase)
        if prefix is None:
            # Previously an unknown value left `collection_name` unbound and the
            # request failed later with a misleading generic error.
            raise HTTPException(status_code=400, detail=f"Invalid upload database: {uploadDatabase}")
        collection_name = f"{prefix}-{shopId}"

        db = mongodbClient[f"shopID_{shopId}"]
        file_content = await file.read()
        file_name = file.filename

        try:
            check_file_validation(type=uploadDatabase, file_content=file_content, file_name=file_name)
        except ValueError as e:
            print(f"File validation failed: {e}")
            raise HTTPException(status_code=400, detail=f"File validation failed: {e}") from e
        print("valid file")

        save_to_mongodb(file_content=file_content, file_name=file_name, collection_name=collection_name, operation=uploadType, db=db, type=uploadDatabase)
        print("Successfully uploaded file")
        return JSONResponse(content={"message": "The file was successfully uploaded."}, status_code=200)
    except HTTPException:
        # Pass client errors through unchanged.
        raise
    except ValueError as value_error:
        # A ValueError raised outside the validation step (e.g. while saving).
        print(f"Error validating file: {value_error}")
        raise HTTPException(status_code=400, detail=f"Error validating file: {value_error}") from value_error
    except Exception as e:
        # Any other unexpected failure.
        print(f"An error occurred in uploading file: {e}")
        raise HTTPException(status_code=400, detail=f"An error occurred in uploading file: {e}") from e
# embedding generation endpoint
@app.post("/generate-embeddings")
async def generate_embeddings(request: Request):
    """Rebuild a shop's merged product/review collection and its embeddings.

    Expects a JSON body containing ``shopId``. The handler:
      1. drops ``map_data-<shopId>`` if it already exists,
      2. runs the pipeline from ``create_map_data_pipeline`` on
         ``products-<shopId>``,
      3. calls ``create_embeddings`` with the merged collection name and
         ``shopId`` as the embedding collection.

    Returns:
        200 with ``{"message": "Embedding generation successful"}``, or 400
        with ``{"message": <reason>}`` when any step fails.
    """
    try:
        data = await request.json()
        shopId = data.get("shopId")
        print(data)
        print(shopId)
        if not shopId:
            # Without this guard the handler would drop/create "map_data-None"
            # and build an embedding collection named None.
            raise HTTPException(status_code=400, detail="Missing required field: shopId")

        merged_collection_name = f"map_data-{shopId}"
        reviews_collection_name = f"reviews-{shopId}"
        products_collection_name = f"products-{shopId}"
        db = mongodbClient[f"shopID_{shopId}"]

        # Start from a clean merged collection on every run.
        if merged_collection_name in db.list_collection_names():
            print("map_data collection exist")
            db[merged_collection_name].drop()
            print("Dropped the existing map_data collection")
        else:
            print("map_data collection does not exist")

        try:
            # Build the pipeline and run it against the products collection.
            pipeline = create_map_data_pipeline(reviews_collection_name=reviews_collection_name, merged_collection_name=merged_collection_name)
            db[products_collection_name].aggregate(pipeline)
        except Exception as e:
            print(f"An error occurred in generating embeddings collection in mongodb: {e}")
            raise HTTPException(status_code=400, detail=f"An error occurred in generating embeddings collection in mongodb: {e}") from e

        try:
            create_embeddings(mapped_data_collection=merged_collection_name, embedding_collection=shopId, db=db)
        except Exception as e:
            print(f"An error occurred in creating embeddings: {e}")
            raise HTTPException(status_code=400, detail=f"An error occurred in creating embeddings: {e}") from e

        return JSONResponse(content={"message": "Embedding generation successful"}, status_code=200)
    except HTTPException as http_error:
        # Keep the {"message": ...} response shape, but report the step's own
        # detail instead of wrapping the stringified exception a second time.
        print(f"An error occurred in generating embeddings: {http_error.detail}")
        return JSONResponse(content={"message": http_error.detail}, status_code=http_error.status_code)
    except Exception as e:
        print(f"An error occurred in generating embeddings: {e}")
        return JSONResponse(content={"message": f"An error occurred in generating embeddings: {e}"}, status_code=400)
# ROUTES FOR CHATBOT
# endpoint to get the productsInformation
@app.post("/productInformation")
async def productInformation(request: Request):
    """Return one product's merged document with its reviews summarised.

    Expects a JSON body with ``shopId`` and ``productId``. The product is
    looked up by its ``id`` field in ``map_data-<shopId>``. If the document has
    a ``reviews`` list, it is replaced by:
      * ``top_reviews``: up to three reviews rated >= 4, highest rating first;
      * ``top_critical_reviews``: up to three reviews rated < 4, lowest first.
    Within the same rating, newer reviews come first, because the list is
    sorted by date/time beforehand and the rating sorts are stable.

    Returns:
        200 with ``{"productsInformation": <document>}``, or 400 with
        ``{"message": <reason>}`` when the collection or product is missing or
        any other error occurs.
    """
    try:
        data = await request.json()
        shopId = data.get("shopId")
        print("shopId", shopId)
        product = data.get("productId")
        print("product", product)

        db = mongodbClient[f"shopID_{shopId}"]
        collection_name = f"map_data-{shopId}"
        if collection_name not in db.list_collection_names():
            print(f"Collection {collection_name} does not exist in data")
            raise HTTPException(status_code=400, detail=f"Collection {collection_name} does not exist in data")
        print("Collection found... in data")

        document = db[collection_name].find_one({"id": product})
        if not document:
            print(f"product {product} not found")
            raise HTTPException(status_code=400, detail=f"product {product} not found")
        print(f"Product found and returning the data for {product}")

        # Drop the MongoDB-generated _id of the main document.
        document.pop('_id', None)

        if 'reviews' in document:
            reviews = document['reviews']
            # Newest first; the stable rating sorts below keep this order
            # among reviews that share a rating.
            reviews.sort(key=lambda x: parse_datetime(x['review_date'], x['review_time']), reverse=True)
            top_reviews = sorted(
                [review for review in reviews if review['rating'] >= 4],
                key=lambda x: -x['rating'],
            )[:3]
            top_critical_reviews = sorted(
                [review for review in reviews if review['rating'] < 4],
                key=lambda x: x['rating'],
            )[:3]
            # Drop the _id of each selected review.
            for review in top_reviews + top_critical_reviews:
                review.pop('_id', None)
            document['top_reviews'] = top_reviews
            document['top_critical_reviews'] = top_critical_reviews
            # The full review list is not part of the response.
            del document['reviews']

        return JSONResponse(content={"productsInformation": document}, status_code=200)
    except HTTPException as http_error:
        # Previously these were swallowed by an inner handler and the client
        # received an unrelated unbound-variable error instead of the cause.
        print(f"Error in getting product Information from mongodb: {http_error.detail}")
        return JSONResponse(content={"message": f"An error occurred in getting productInformation: {http_error.detail}"}, status_code=http_error.status_code)
    except Exception as e:
        print(f"An error occurred in getting productInformation: {e}")
        return JSONResponse(content={"message": f"An error occurred in getting productInformation: {e}"}, status_code=400)
# endpoint that answers a chat message and records the user and the interaction
# Apologies returned to the chat client whenever any step of /message fails.
_CHAT_ERROR_RESPONSES = (
    "We apologize for the inconvenience, but an error occurred while processing your request.",
    "We're sorry, but it seems there was a technical issue. Please try again later.",
    "We've encountered an unexpected issue. Let's explore alternative options.",
    "We regret to inform you that our system is experiencing technical difficulties. Thank you for your patience.",
    "Unfortunately, we've encountered a problem on our end. We're investigating and will provide an update soon.",
    "It appears there's a technical glitch in our system. We're working diligently to resolve it.",
    "Regrettably, an unexpected error occurred while processing your request. Please stand by.",
    "Our servers are experiencing some instability. Rest assured, our team is actively addressing the issue.",
    "We're aware of the issue and actively working to resolve it. Thank you for your understanding.",
)


def _chat_timestamp():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def _ensure_chatbot_user(db, shopId, userId, userIP):
    """Insert a ``chatBotUsers-<shopId>`` record for ``userId`` if none exists."""
    users = db[f"chatBotUsers-{shopId}"]
    if users.find_one({"userId": userId}):
        print("User found ")
        return
    print(f"User {userId} not found")
    # MongoDB creates the collection on first insert, so no explicit
    # create_collection call is needed. Every new record has the same fields
    # (previously email/ip_address were only stored for a shop's first user).
    users.insert_one({"userId": userId, "email": "", "created_at": _chat_timestamp(), "ip_address": userIP})


def _record_interaction(db, shopId, userId, userIP, sessionId, message, response):
    """Store one question/answer exchange in ``Interactions-<shopId>``."""
    interactionId = str(uuid4())[:8]
    db[f"Interactions-{shopId}"].insert_one({
        "interactionId": interactionId,
        "userId": userId,
        "ip_address": userIP,
        "sessionId": sessionId,
        "email": "",
        "message": message,
        "response": response,
        "timestamp": _chat_timestamp(),
    })


@app.post("/message")
async def message(request: Request):
    """Answer a chat message using the shop's vector store and log the exchange.

    Expects a JSON body with ``shopId``, ``userId``, ``userIP``, ``sessionId``
    and ``message``. The handler registers the user if unknown, opens the
    Qdrant collection named after ``shopId``, builds a self-query retriever,
    runs the retriever -> prompt -> LLM chain on the message, and stores the
    exchange.

    Returns:
        ``{"response": <answer>}`` with status 200 on success. If any step
        fails, ``{"response": <random apology>}`` is returned with the
        JSONResponse default status, so the chat client always gets a reply.
    """
    try:
        data = await request.json()
        shopId = data.get("shopId")
        userId = data.get("userId")
        userIP = data.get("userIP")
        sessionId = data.get("sessionId")
        message = data.get("message")
        print("shopId", shopId)
        print("message", message)
        print("userId", userId)
        print("userIP", userIP)
        print("sessionId", sessionId)

        db = mongodbClient[f"shopID_{shopId}"]

        try:
            _ensure_chatbot_user(db, shopId, userId, userIP)
        except Exception as e:
            print(f"Error saving user to mongodb: {e}")
            raise HTTPException(status_code=400, detail=f"Error saving user to mongodb: {e}") from e

        try:
            print("shopId in message:", shopId)
            print("all collections in message:", qdrantClient.get_collections())
            vector_store = Qdrant(
                client=qdrantClient,
                collection_name=shopId,
                embeddings=embeddings,
            )
        except Exception as e:
            print(f"{e}")
            raise HTTPException(status_code=400, detail=f"{e}") from e

        try:
            print("Setting up the self query retriever")
            self_query_retriever = SelfQueryRetriever.from_llm(
                llm,
                vector_store,
                document_content_description,
                metadata_field_info,
                verbose=True,
            )
        except Exception as e:
            print("Error in setting up the self query retriever:", e)
            raise HTTPException(status_code=400, detail=f"Error in setting up the self query retriever: {e}") from e

        try:
            chain = (
                {"context": self_query_retriever, "question": RunnablePassthrough()}
                | prompt
                | llm
                | StrOutputParser()  # reduce the model output to plain text
            )
            # The first step hands its input unchanged to both the retriever
            # and the passthrough, so the input must be the question string
            # itself. Passing {"question": ...} made both receive the dict.
            result = chain.invoke(message)
            print(type(result))
            print(result)
        except Exception as e:
            print("Error in generating answer:", e)
            raise HTTPException(status_code=400, detail=f"Error in generating answer: {e}") from e

        try:
            _record_interaction(db, shopId, userId, userIP, sessionId, message, result)
        except Exception as e:
            print(f"Error saving interaction to mongodb: {e}")
            raise HTTPException(status_code=400, detail=f"Error saving interaction to mongodb: {e}") from e

        return JSONResponse(content={"response": result}, status_code=200)
    except Exception as e:
        # Deliberate catch-all: log the real error, reply with a polite apology.
        print("Error:", e)
        error_message = random.choice(_CHAT_ERROR_RESPONSES)
        print(error_message)
        return JSONResponse(content={"response": error_message})