from fastapi import FastAPI, Request, File, UploadFile, HTTPException, Form, Response
from fastapi.responses import JSONResponse
import requests
from pydantic import BaseModel
# from IPython.display import display, Markdown
from fastapi.middleware.cors import CORSMiddleware
from langchain_community.vectorstores import Qdrant
from langchain_google_genai import ChatGoogleGenerativeAI
# from langchain_together import Together
from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceInferenceAPIEmbeddings, HuggingFaceEmbeddings
# from langchain_community.embeddings import HuggingFaceInferenceAPIEmbeddings
from langchain.chains.query_constructor.base import AttributeInfo
from langchain_core.prompts import ChatPromptTemplate
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain_core.output_parsers import StrOutputParser
from qdrant_client import QdrantClient
from langchain_core.prompts import PromptTemplate
import os
from dotenv import load_dotenv
import random
from pymongo import MongoClient
import pandas as pd
import io
from langchain.docstore.document import Document
from bson import ObjectId
from datetime import datetime, timedelta
from uuid import uuid4
from cryptography.fernet import Fernet
import base64
import json
# import uvicorn
from langchain_core.runnables import RunnablePassthrough
from transformers import AutoModel, AutoTokenizer
from langchain.globals import set_debug, set_verbose
from operator import itemgetter
from langchain_openai.chat_models import ChatOpenAI
import openai

# NOTE(review): global LangChain debug output is very verbose (chain inputs and
# outputs are printed); consider disabling it outside development.
set_debug(True)
# set_verbose(True)
# load_dotenv()


# API KEYS AND SECRETS LOADED FROM ENV
def _require_env(name):
    """Return environment variable *name*, failing fast with a clear message if unset."""
    value = os.environ.get(name)
    if value is None:
        raise RuntimeError(f"Required environment variable {name} is not set")
    return value


SECRET_KEY = _require_env("SECRET_KEY").encode()
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY")
MONGODB_USER = os.environ.get("MONGODB_USER")
MONGODB_PASSWORD = os.environ.get("MONGODB_PASSWORD")

# Qdrant Cloud cluster used both for reading and for (re)creating collections.
QDRANT_URL = "https://0b1a549b-0a48-4407-83ba-760296707b5d.us-east4-0.gcp.cloud.qdrant.io:6333"

app = FastAPI()

# Fernet instance (keyed by SECRET_KEY) used to encrypt OAuth tokens.
f = Fernet(SECRET_KEY)


class LoginData(BaseModel):
    """Request body of POST /google/oauth/token."""
    client_id: str
    client_secret: str
    code: str
    redirect_uri: str


class RefreshToken(BaseModel):
    """Request body of POST /refreshToken (user_id is the user's shopId)."""
    user_id: str
    client_id: str
    client_secret: str


# Explicit allowlist of local front-end origins permitted to make
# credentialed cross-origin requests.
origins = [
    'http://localhost:3000',
    'http://127.0.0.1:3000',
    'https://localhost:3000',
    'https://127.0.0.1:3000',
    'http://localhost:3001',
    'http://127.0.0.1:3001',
    'https://localhost:3001',
    'https://127.0.0.1:3001',
    'http://127.0.0.1:5500',
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
)

print("Starting the server.........")

# The three loaders below are best-effort: on failure the error is printed and
# the corresponding global (embeddings / qdrantClient / llm) stays undefined.
try:
    print("Waiting for the embedding model to load.............")
    # Load the embedding model
    model_name = "BAAI/bge-large-en"
    model_kwargs = {"device": "cpu"}
    encode_kwargs = {"normalize_embeddings": True}
    embeddings = HuggingFaceBgeEmbeddings(
        model_name=model_name, model_kwargs=model_kwargs, encode_kwargs=encode_kwargs
    )
    print("Embedding model loaded.............")
except Exception as e:
    print(f"Error loading embedding model: {e}")

try:
    print("Loading Qdrant client")
    print("Waiting for the Qdrant embeddings from Docker to load.............")
    qdrantClient = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY)
    print("testing qdrantClient")
    print("all collections:", qdrantClient.get_collections())
    print("Qdrant embeddings from Docker were loaded.............")
except Exception as e:
    print(f"Error loading Qdrant embeddings: {e}")

try:
    print("Waiting for the LLM from Docker to load.............")
    llm = ChatOpenAI(model="gpt-3.5-turbo", api_key=OPENAI_API_KEY)
    print("LLM model loaded.............")
except Exception as e:
    print(f"Error loading LLM: {e}")


# FUNCTIONS TO SETUP THE CONNECTIONS

def connect_mongo():
    """Create a MongoClient for the Atlas cluster from MONGODB_USER/MONGODB_PASSWORD.

    Returns:
        The client, or None if construction raised (the error is printed).
    """
    try:
        # NOTE(review): pymongo requires user/password to be percent-escaped
        # (urllib.parse.quote_plus) if they contain ':', '/', '@' or '%'.
        uri = f"mongodb+srv://{MONGODB_USER}:{MONGODB_PASSWORD}@cluster0.x69zkkf.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"
        mongodbClient = MongoClient(uri, authSource='admin')
        # MongoClient connects lazily, so this only means the client was created.
        print("Connected to MongoDB")
        return mongodbClient
    except Exception as e:
        print(f"Error connecting to MongoDB: {e}")
        return None


# FUNCTIONS TO UPLOAD THE FILES

_PRODUCT_REQUIRED_COLUMNS = ['id', 'title', 'description', 'product_link', 'sale_price']
_REVIEW_REQUIRED_COLUMNS = ['id', 'review_date', 'review_time', 'rating_details']


def _read_uploaded_table(file_content, file_name):
    """Parse uploaded CSV / XLS / XLSX bytes into a DataFrame.

    Raises:
        ValueError: if the file extension is not supported.
    """
    if file_name.endswith('.csv'):
        return pd.read_csv(io.BytesIO(file_content))
    if file_name.endswith(('.xls', '.xlsx')):
        return pd.read_excel(io.BytesIO(file_content))
    print("Unsupported file format")
    raise ValueError("Unsupported file format")


def check_file_validation(type, file_content, file_name):
    """Check that an uploaded table parses and contains the required columns.

    Args:
        type: "productsDB" selects the product schema; any other value selects
            the review schema.
        file_content: raw file bytes.
        file_name: original file name (its extension selects the parser).

    Returns:
        True when every required column is present.

    Raises:
        ValueError: unsupported format, or missing required columns.
    """
    df = _read_uploaded_table(file_content, file_name)
    required_columns = _PRODUCT_REQUIRED_COLUMNS if type == "productsDB" else _REVIEW_REQUIRED_COLUMNS
    missing = [column for column in required_columns if column not in df.columns]
    if not missing:
        print("All required columns are present")
        return True
    print("Missing required columns")
    raise ValueError(f"Missing required columns: {missing}")


def save_to_mongodb(file_content, file_name, collection_name, operation, db, type):
    """Load an uploaded table into the MongoDB collection *collection_name*.

    Args:
        operation: "replace" drops the (existing) collection and inserts all rows;
            "append" updates rows whose ``id`` already exists and inserts the rest
            (collection must exist); "new" creates the collection and inserts all rows.
        type: "productsDB" or "reviewsDB"; selects which stringified list/dict
            columns are parsed back into Python objects.

    Raises:
        HTTPException(400): wraps any parsing, validation or database error.
    """
    try:
        df = _read_uploaded_table(file_content, file_name)

        # SECURITY(review): eval() executes arbitrary Python from user-uploaded
        # cells. ast.literal_eval would parse the same list/dict literals safely.
        if type == "productsDB":
            if "all_specifications" in df.columns:
                print("All Specifications columns present")
                df['all_specifications'] = df["all_specifications"].apply(eval)
            if "alt_images" in df.columns:
                print("Alt Images columns present")
                df['alt_images'] = df["alt_images"].apply(eval)
        elif type == "reviewsDB":
            if "review_images" in df.columns:
                print("image columns present")
                df['review_images'] = df["review_images"].apply(eval)

        uploaded_file = df.to_dict(orient="records")
        # Reject empty uploads up front: insert_many([]) raises, and for
        # "replace" that used to happen only after the old data was dropped.
        if not uploaded_file:
            raise ValueError("Uploaded file contains no rows")

        collection_exists = collection_name in db.list_collection_names()

        if operation == "replace":
            if not collection_exists:
                print(f"Collection {collection_name} does not exist")
                raise ValueError(f"Collection {collection_name} does not exist")
            db[collection_name].drop()
            print("Dropped existing database")
            db.create_collection(collection_name)
            db[collection_name].insert_many(uploaded_file)
            print("Inserted new database after dropping the existing one")

        elif operation == "append":
            if not collection_exists:
                print(f"Collection {collection_name} does not exist")
                raise ValueError(f"Collection {collection_name} does not exist")
            print("Collection exists")
            collection = db[collection_name]
            # Rows are keyed on 'id', the column check_file_validation requires.
            # (Previously 'Id' was used, which raised KeyError, and rows were
            # silently skipped when the collection was empty.)
            for record in uploaded_file:
                if collection.find_one({'id': record['id']}):
                    collection.update_one({'id': record['id']}, {'$set': record})
                    print(f"Record with id {record['id']} already exists in the collection and updated")
                else:
                    collection.insert_one(record)

        elif operation == "new":
            db.create_collection(collection_name)
            db[collection_name].insert_many(uploaded_file)
            print("Inserted new database")

        else:
            raise ValueError(f"Unsupported operation: {operation}")
    except Exception as e:
        print(f"Error saving to MongoDB: {e}")
        raise HTTPException(status_code=400, detail=f"Error saving to MongoDB: {e}")


# FUNCTIONS TO FETCH DATA FOR THE DASHBOARD INGESTION

def get_data_from_mongodb(collection_name, db, skip=0, limit=10):
    """Return one page of documents (without ``_id``) from *collection_name*.

    Returns [] when the collection does not exist.

    Raises:
        HTTPException(400): on any database error.
    """
    try:
        if collection_name not in db.list_collection_names():
            return []
        # Exclude the ObjectId `_id`, which is not JSON-serialisable.
        cursor = db[collection_name].find({}, {"_id": 0}).skip(skip).limit(limit)
        return list(cursor)
    except Exception as e:
        print(f"Error getting data from MongoDB: {e}")
        raise HTTPException(status_code=400, detail=f"Error getting data from MongoDB: {e}")


def get_data_count_from_mongodb(collection_name, db):
    """Return the number of documents in *collection_name* (0 if it does not exist).

    Raises:
        HTTPException(400): on any database error.
    """
    try:
        if collection_name not in db.list_collection_names():
            return 0
        # Count server-side instead of pulling every document over the wire.
        return db[collection_name].count_documents({})
    except Exception as e:
        print(f"Error getting data count from MongoDB: {e}")
        raise HTTPException(status_code=400, detail=f"Error getting data count from MongoDB: {e}")


# FUNCTIONS FOR QDRANT EMBEDDINGS

def get_vector_store(shopId):
    """Wrap the Qdrant collection named *shopId* as a LangChain vector store.

    Raises:
        HTTPException(400): if the store cannot be constructed.
    """
    try:
        print("shopId:", shopId)
        print("all collections in get_vector_store:", qdrantClient.get_collections())
        return Qdrant(
            client=qdrantClient,
            collection_name=shopId,
            embeddings=embeddings,
        )
    except Exception as e:
        print(f"failed to load the vector store, {e}")
        raise HTTPException(status_code=400, detail=f"failed to load the vector store, {e}")


def get_data_from_mongodb_for_embeddings(collection_name, db):
    """Return every document (without ``_id``) from *collection_name*.

    Raises:
        HTTPException(400): if the collection does not exist or the query fails.
    """
    try:
        if collection_name not in db.list_collection_names():
            print(f"Collection {collection_name} does not exist in data")
            raise HTTPException(status_code=400, detail=f"Collection {collection_name} does not exist in data")
        print("collection found... in data")
        data = list(db[collection_name].find({}, {"_id": 0}))
        print("Length in get_data_from_mongodb:", len(data))
        return data
    except HTTPException:
        # Already a client-facing error; don't wrap it a second time.
        raise
    except Exception as e:
        print(f"Error getting data from MongoDB: {e}")
        raise HTTPException(status_code=400, detail=f"Error getting data from MongoDB: {e}")


def transform_products(product_list):
    """Build Qdrant metadata dicts from merged product documents.

    Drops ``_id``, ``source`` and ``alt_images``; stringifies
    ``all_specifications``; replaces ``reviews`` with a JSON string holding the
    ``rating_details`` of (at most) the first 15 reviews.
    """
    skipped_keys = {'_id', 'source', 'alt_images', 'reviews'}
    transformed_products = []
    for product in product_list:
        transformed = {
            key: str(value) if key == "all_specifications" else value
            for key, value in product.items()
            if key not in skipped_keys
        }
        transformed['reviews'] = json.dumps(
            [review['rating_details'] for review in product.get('reviews', [])[:15]]
        )
        transformed_products.append(transformed)
    return transformed_products


def create_embeddings(mapped_data_collection, embedding_collection, db):
    """Embed every product of *mapped_data_collection* into a fresh Qdrant collection.

    Each document's text is ``title + "\\n" + description``; its metadata is the
    transform_products() output stored under the ``"source"`` key. The Qdrant
    collection *embedding_collection* is recreated (force_recreate=True).

    Returns:
        The resulting Qdrant vector store.

    Raises:
        HTTPException(400): if loading the data or creating the embeddings fails.
    """
    try:
        print("mapped_data_collection:", mapped_data_collection)
        # Let lookup failures propagate: previously they were swallowed here and
        # surfaced later as a confusing NameError on `product_list`.
        product_list = get_data_from_mongodb_for_embeddings(mapped_data_collection, db)
        if not product_list:
            raise HTTPException(status_code=400, detail=f"No products found in {mapped_data_collection}")

        texts = [str(item['title']) + "\n" + str(item['description']) for item in product_list]
        metadatas = transform_products(product_list)
        docs = [Document(page_content=txt, metadata={"source": meta}) for txt, meta in zip(texts, metadatas)]
        print("New Data loaded.........")

        print("Creating embeddings in qdrant .......")
        vector_store = Qdrant.from_documents(
            docs,
            embeddings,
            url=QDRANT_URL,
            api_key=QDRANT_API_KEY,
            collection_name=embedding_collection,
            force_recreate=True,
        )
        print("New Vector store loaded.........")
        return vector_store
    except HTTPException:
        raise
    except Exception as e:
        print(f"Error creating embeddings: {e}")
        raise HTTPException(status_code=400, detail=f"Error creating embeddings: {e}")


def retrieve_product(user_input, vector_store, k=10):
    """Return the top-*k* (Document, score) pairs for *user_input*."""
    return vector_store.similarity_search_with_score(query=user_input, k=k)


def create_context(user_input, vector_store):
    """Format the top retrieved products for *user_input* as a bullet-list prompt context.

    NOTE(review): this reads metadata keys (name, discount_price, product_desc,
    about_this_item, ratings, overall_review) that transform_products() does
    not produce for the current product schema (title, sale_price, description,
    ...). Only the commented-out /message endpoint calls it — confirm before reuse.
    """
    context = ""
    for index, (document, _score) in enumerate(retrieve_product(user_input, vector_store)):
        # Metadata is stored under the "source" key by create_embeddings().
        product_metadata = document.metadata["source"]
        context += f"""
        * Product {index + 1} -
        - Product name : {product_metadata["name"]}
        - Product price: {product_metadata["discount_price"]}
        - Brief description of the product: {product_metadata["product_desc"]}
        - Detailed description of the product: {product_metadata["about_this_item"]}
        - Rating value (1.0 - 5.0): {product_metadata["ratings"]}
        - Overall review: {product_metadata["overall_review"]}
        """
    return context


def create_map_data_pipeline(reviews_collection_name, merged_collection_name):
    """Build an aggregation pipeline that joins reviews onto products by ``id``.

    Each product gets a ``reviews`` array (selected review fields) and the
    result is written with ``$out`` to *merged_collection_name*.
    """
    return [
        {
            "$lookup": {
                "from": reviews_collection_name,
                "localField": "id",
                "foreignField": "id",
                "as": "reviews",
            }
        },
        {
            "$project": {
                "_id": 1,
                "id": 1,
                "title": 1,
                "description": 1,
                "product_link": 1,
                "sale_price": 1,
                "all_specifications": 1,
                "overall_rating": 1,
                "image": 1,
                "alt_images": 1,
                "source": 1,
                "domain": 1,
                "reviews": {
                    "$map": {
                        "input": "$reviews",
                        "as": "review",
                        "in": {
                            "_id": "$$review._id",
                            "rating": "$$review.rating",
                            "rating_title": "$$review.rating_title",
                            "rating_details": "$$review.rating_details",
                            "review_date": "$$review.review_date",
                            "review_time": "$$review.review_time",
                            # "review_images": "$$review.review_images",
                            "source": "$$review.source",
                        },
                    }
                },
            }
        },
        {"$out": merged_collection_name},
    ]


def parse_datetime(date_str, time_str):
    """Parse a "MM/DD/YYYY" date and "HH:MM" time into a naive datetime."""
    return datetime.strptime(f"{date_str} {time_str}", "%m/%d/%Y %H:%M")


# COMMON GLOBAL VARIABLES

# prompt template for the mistral model
# template = """
# You are a friendly, conversational AI ecommerce assistant. The context includes 5 ecommerce products.
# Use only the following context, to find the answer to the questions from the customer.
# Its very important that you follow the below instructions.
# -Dont use general knowledge to answer the question
# -If you dont find the answer from the context or the question is not related to the context, just say that you don't know the answer.
# -By any chance the customer should not know you are referring to a context.
# Context:
# {context}
# Question:
# {question}
# Helpful Answer:
# """

# Attribute descriptions handed to the self-query retriever, listed as
# (name, description, type) triples. Several names intentionally appear twice
# with a "string" and an "integer" variant.
metadata_field_info = [
    AttributeInfo(name=field_name, description=field_description, type=field_type)
    for field_name, field_description, field_type in (
        ("title", "The Name of the product", "string"),
        ("product_link", "The link of the product", "string"),
        ("sale_price", "The actual price of the product", "string"),
        ("sale_price", "The actual price of the product", "integer"),
        ("description", "The description of the product", "string"),
        ("all_specifications", "The Specification ,Hardware and Software configuration of the product like ram/RAM , storage/ROM ", "string"),
        ("all_specifications", "The build of the product like ram/RAM , storage/ROM/memory , processors ", "integer"),
        ("domain", "The domain of the product cellphone, smartphone, mobilephone, phone, mobile", "string"),
        ("image", "The link to the image of the product", "string"),
        ("overall_rating", "The overall rating of the product", "string"),
        ("overall_rating", "The overall rating of the product", "integer"),
        ("reviews", "A list of reviews of the product from the customers.", "string"),
    )
]

# One-line description of the indexed documents for the self-query retriever.
document_content_description = "A review of the ecommerce product data."
# ECOMMERCE_TEMPLATE = """\ # You are an AI assistant named ShopIntel, with expertise in electronic products, especially smartphones, laptops, displays, and electronics ecommerce gadgets and products. Your role is to assist users by providing accurate product information, answering their questions about electronics products in proper headings and bullet points, and making their shopping experience enjoyable and efficient. # YOU ARE BEST AT ANSWERING THESE BELOW TASKS: # (use this only for product-related queries): # - When a user asks you a product-related query, answer them in a more concise and accurate manner with all the specifications and product links(source link) from the self_query_retriever(context). # - When a user asks to compare between products, then give the response output in tabular format with all the specifications and source link(source link) from the self_query_retriever(context). # (use this for conversational and greetings related queries): # - When a user greets you, just greet them and give your introduction and what you are capable of, and make their conversation more engaging.\ # Also, suggest them top 5 potential questions related to ecommerce products asked by the users.\ # (use this when user asks some irrelevant queries out of context): # - When a user asks you about some product which you do not have the answer, just politely deny them and suggest some questions what you can answer them and you are built for. # - When a user asks you an out-of-context query, just say, "Oops!Looks like your question took a detour from our shopping adventure.\ # My expertise lies in assisting with queries such as:" and give some suggestions related to smartphones, laptops, and other electronics gadgets to ask.\ # Also suggest most trending questions asked by the users or the user himself based on their past shopping experience. 
# # Question:{question} # # """ # prompt = ChatPromptTemplate.from_template(ECOMMERCE_TEMPLATE) # ECOMMERCE_TEMPLATE = """\ # You are an AI assistant named ShopIntel, with expertise in electronic products, especially smartphones, laptops, displays, and electronics ecommerce gadgets and products. Your role is to assist users by providing accurate product information, answering their questions about electronics products in proper headings and bullet points, and making their shopping experience enjoyable and efficient. # YOU ARE BEST AT ANSWERING THESE BELOW TASKS: # (use this only for product-related queries): # - When a user asks you a product-related query, answer them in a more concise and accurate manner with all the specifications and product links(source link) as well. # - When a user asks to compare between products, then give the response output in tabular format with all the specifications and source link(source link) as well. # (use this for conversational and greetings related queries): # - When a user greets you, just greet them and give your introduction and what you are capable of, and make their conversation more engaging.\ # Also, suggest them top 5 potential questions related to ecommerce products asked by the users.\ # (use this when user asks some irrelevant queries out of context): # - When a user asks you about some product which you do not have the answer, just politely deny them and suggest some questions what you can answer them and you are built for. # - When a user asks you an out-of-context query, just say, "Oops!Looks like your question took a detour from our shopping adventure.\ # My expertise lies in assisting with queries such as:" and give some suggestions related to smartphones, laptops, and other electronics gadgets to ask.\ # Also suggest most trending questions asked by the users or the user himself based on their past shopping experience. 
# Query:{question} # Context:{context} # """ # rag_prompt = ChatPromptTemplate.from_template(ECOMMERCE_TEMPLATE) ECOMMERCE_TEMPLATE = """\ You are an AI assistant named ShopIntel, with expertise in electronic products, especially smartphones, laptops, displays, and electronics ecommerce gadgets and products. Your role is to assist users by providing accurate product information, answering their questions about electronics products in proper headings and bullet points, and making their shopping experience enjoyable and efficient. YOU ARE BEST AT ANSWERING THESE BELOW TASKS: (use this only for product-related queries from the self_query_retriever): - If the product doesnt match with the context product, skip those products. - When a user asks you a product-related query, answer them in a more concise and accurate manner with all the specifications and product links(source link) as well. - When a user asks to compare between products, then give the response output in an accurate manner comparing all the specifications and source link(source link) as well. (use this for conversational and greetings related queries): - When a user greets you, just greet them and give your introduction and what you are capable of, and make their conversation more engaging. Also, suggest them some potential questions related to ecommerce products to buy. (use this when user asks some irrelevant queries out of context): - When a user asks you an out-of-context query, just say, "Oops!Looks like your question took a detour from our shopping adventure. My expertise lies in assisting with queries such as:" and give them 5 suggestions related to smartphones, laptops, and other electronics gadgets to ask. 
Query:{question} Context:{context} """ prompt = ChatPromptTemplate.from_template(ECOMMERCE_TEMPLATE) # Connect to MongoDB database and creating a collection 'ShopIntelCollection' mongodbClient = connect_mongo() # qdrantClient = connect_qdrant() # APP ROUTES # Function to login user and get access token from google @app.post("/google/oauth/token") async def google_oauth(login: LoginData, response: Response, request: Request): global f try : print("Inside google oauth") # print("login client id", login.client_id, type(login.client_id)) # print("login client secret", login.client_secret, type(login.client_secret)) # print("login code", login.code, type(login.code)) # print("login redirect uri", login.redirect_uri, type(login.redirect_uri)) db = mongodbClient["ShopIntel_Users"] # setting the properites to get the access token try: token_url = "https://oauth2.googleapis.com/token" headers = { "Content-Type": "application/x-www-form-urlencoded", "Access-Control-Allow-Origin": "http://localhost:3000", } payload = { "client_id": login.client_id, "client_secret": login.client_secret, "code": login.code, "grant_type": "authorization_code", "redirect_uri": login.redirect_uri, } # using the token from react-oauth to get the access token res = requests.post(token_url, headers=headers, data=payload) res = res.json() # print("res", res) # Extracting the expires_in to setup the cookie for expiretoken expires_in = res.get("expires_in") access_token = res.get("access_token") refresh_token = res.get("refresh_token") except Exception as e: print(f"Failed to fetch OAuth token: {e}") raise HTTPException(status_code=500, detail="Failed to fetch OAuth token") # print("access_token", access_token) # print("refresh_token", refresh_token) # Retrieve user information using the access token try : userinfo_url = "https://www.googleapis.com/oauth2/v1/userinfo" userinfo_headers = { "Authorization": f"Bearer {access_token}" } userinfo_res = requests.get(userinfo_url, headers=userinfo_headers) if 
userinfo_res.status_code != 200: raise HTTPException(status_code=userinfo_res.status_code, detail=userinfo_res.text) userinfo = userinfo_res.json() # print(userinfo) # Extract email, name, profile or any other required information email = userinfo.get("email") name = userinfo.get("name") profile = userinfo.get("picture") except Exception as e: print(f"Failed to fetch user information: {e}") raise HTTPException(status_code=userinfo_res.status_code, detail=f"Failed to fetch user information {e}") # encrypt the refresh token and access token and converting the bytes to base64 encrypted_refresh_token_bytes = f.encrypt(refresh_token.encode()) encrypted_access_token_bytes = f.encrypt(access_token.encode()) encrypted_refresh_token_base64 = base64.urlsafe_b64encode(encrypted_refresh_token_bytes).decode() encrypted_access_token_base64 = base64.urlsafe_b64encode(encrypted_access_token_bytes).decode() # print("encrypted_access_token_base64", encrypted_access_token_base64, type(encrypted_access_token_base64)) # if (encrypted_access_token_base64 == access_token): # print("Warning They match !") # else : # print("Success they do not match !") # if (base64.urlsafe_b64decode(encrypted_access_token_base64) == encrypted_access_token_bytes): # print("Success Was decoded back to bytes and they match !") # else : # print("Warning Was not decoded back to bytes and they do not match !") # decrypted_refresh_token = f.decrypt(encrypted_refresh_token).decode() # print("decrypted_refresh_token", decrypted_refresh_token, type(decrypted_refresh_token)) # Check if the user already exists in the database try: if email: # Check if the collection exists if db.list_collection_names(): print("Collection exists") # Check if the collection is empty if db["users_collection"].count_documents({}) > 0: print("Collection not empty") existing_user = db["users_collection"].find_one({"email": email}) # Extracting the user _id from mongodb database if existing_user: print("A user exists with the same email. 
Updating refresh token.") db["users_collection"].update_one( {"email": email}, {"$set": {"refresh_token": encrypted_refresh_token_base64}} ) user = existing_user["shopId"] print("Shop id:", user) else: print("No user exist with the same email. So creating a new user") user = str(uuid4())[:8] print("User id:", user) document = {"shopId": user, "email": email, "name": name, "profile": profile, "refresh_token": encrypted_refresh_token_base64} db["users_collection"].insert_one(document) else: print("Collection is empty") user = str(uuid4())[:8] document = {"shopId": user, "email": email, "name": name, "profile": profile, "refresh_token": encrypted_refresh_token_base64} db["users_collection"].insert_one(document) else: print("Collection does not exist") db.create_collection("users_collection") user = str(uuid4())[:8] document = {"shopId": user, "email": email, "name": name, "profile": profile, "refresh_token": encrypted_refresh_token_base64} db["users_collection"].insert_one(document) except Exception as e: print("Error while saving to mongodb", e) raise HTTPException(status_code=500, detail="Error while saving to mongodb") return JSONResponse(content={"message": "Login successful","access_token": encrypted_access_token_base64, "expires_in": expires_in, "shopId": user, "name": name, "email": email, "profile": profile}, status_code=200) except Exception as e: print("Error:", e) return JSONResponse(content={"message": "Login failed"}, status_code=400) # function to logout user and remove refresh token @app.post("/logout") async def logout(request: Request): try: data = await request.json() shopId = data.get("shopId") db = mongodbClient["ShopIntel_Users"] print("shopId", shopId) db["users_collection"].update_one( {"shopId": shopId}, {"$unset": {"refresh_token": ""}} ) return JSONResponse(content={"message": "Logout successful"}, status_code=200) except Exception as e: print("Error:", e) # endpoint to refresh access token @app.post("/refreshToken") async def 
refreshToken(refreshToken: RefreshToken, request: Request): global f try: print("Inside refresh token") print("user_id", refreshToken.user_id) print("client_id", refreshToken.client_id) print("client_secret", refreshToken.client_secret) try : db = mongodbClient["ShopIntel_Users"] existing_user = db["users_collection"].find_one({"shopId": refreshToken.user_id}) if existing_user: print("A user exists, getting the refresh token") # decoding to bytes and decrypting encrypted_refresh_token_base64 = existing_user["refresh_token"] encrypted_refresh_token_bytes = base64.urlsafe_b64decode(encrypted_refresh_token_base64) refresh_token = f.decrypt(encrypted_refresh_token_bytes).decode() print("refresh_token", refresh_token) else: print("Invalid user id") raise HTTPException(detail="Invalid user id") except Exception as e: print("Failed to fetch refresh token from mongodb: ", e) raise HTTPException(detail=f"Failed to fetch refresh token from mongodb, error: {e}") try : token_url = "https://oauth2.googleapis.com/token" headers = { "Content-Type": "application/x-www-form-urlencoded" } payload = { "client_id": refreshToken.client_id, "client_secret": refreshToken.client_secret, "refresh_token": refresh_token, "grant_type": "refresh_token" } res = requests.post(token_url, headers=headers, data=payload) if res.status_code != 200: raise HTTPException(status_code=res.status_code, detail=res.text) data = res.json() expires_in = data.get("expires_in") access_token = data.get("access_token") # encrypt the access_token and convert the bytes to string for JSON serialization encrypted_access_token_bytes = f.encrypt(access_token.encode()) encrypted_access_token_base64 = base64.urlsafe_b64encode(encrypted_access_token_bytes).decode() return JSONResponse(content={"access_token": encrypted_access_token_base64, "expires_in": expires_in}, status_code=200) except Exception as e: print("Failed to fetch a new access token using the existing refresh token: ", e) raise 
HTTPException(status_code=res.status_code, detail=f"Failed to fetch a new access token using the existing refresh token") except Exception as e: print("Failed to fetch refresh token: ", e) raise HTTPException(status_code=500, detail=f"Failed to fetch refresh token: {e}") # # message endpoint to chatbot # @app.get("/message") # async def message(request: Request): # data = await request.json() # message = data["message"] # try: # context = create_context(message, vector_store) # prompt = PromptTemplate( # template=template, input_variables=["context", "question"] # ) # prompt_formatted_str = prompt.format(context=context, question=message) # output = llm.invoke(prompt_formatted_str) # print(output) # return JSONResponse(content={"response": output}) # except Exception as e: # print("Error:", e) # error_responses = [ # "We apologize for the inconvenience, but an error occurred while processing your request.", # "We're sorry, but it seems there was a technical issue. Please try again later.", # "We've encountered an unexpected issue. Let's explore alternative options.", # "We regret to inform you that our system is experiencing technical difficulties. Thank you for your patience.", # "Unfortunately, we've encountered a problem on our end. We're investigating and will provide an update soon.", # "It appears there's a technical glitch in our system. We're working diligently to resolve it.", # "Regrettably, an unexpected error occurred while processing your request. Please stand by.", # "Our servers are experiencing some instability. Rest assured, our team is actively addressing the issue.", # "We're aware of the issue and actively working to resolve it. 
#         Thank you for your understanding.",
#     ]
#     error_message = random.choice(error_responses)
#     print(error_message)
#     return JSONResponse(content={"response": error_message})


# fetch data from the end point
@app.get("/fetch-data")
async def fetch_data(request: Request):
    """Return document counts, or one page of products and reviews, for a shop.

    Query parameters:
        shopId: shop identifier; selects the ``shopID_<shopId>`` database and the
            ``products-<shopId>`` / ``reviews-<shopId>`` collections.
        getDataCount: if non-empty, return only ``products_count`` and ``reviews_count``.
        getDataBase: if non-empty (and getDataCount is not), return a page of
            products and reviews plus both counts. Requires integer ``page``
            (1-based) and ``pageSize``.

    Raises:
        HTTPException(400): if neither mode flag is given, or if reading from
            MongoDB or parsing the paging parameters fails.
    """
    try:
        user = request.query_params.get("shopId")
        db = mongodbClient[f"shopID_{user}"]
        products_collection = f"products-{user}"
        reviews_collection = f"reviews-{user}"

        if request.query_params.get("getDataCount"):
            print("Fetching data count")
            products_count = get_data_count_from_mongodb(collection_name=products_collection, db=db)
            reviews_count = get_data_count_from_mongodb(collection_name=reviews_collection, db=db)
            return JSONResponse(content={"products_count": products_count, "reviews_count": reviews_count})

        if request.query_params.get("getDataBase"):
            print("Fetching data base")
            page = int(request.query_params.get("page"))
            page_size = int(request.query_params.get("pageSize"))
            skip = (page - 1) * page_size
            products = get_data_from_mongodb(collection_name=products_collection, skip=skip, limit=page_size, db=db)
            products_count = get_data_count_from_mongodb(collection_name=products_collection, db=db)
            reviews = get_data_from_mongodb(collection_name=reviews_collection, skip=skip, limit=page_size, db=db)
            reviews_count = get_data_count_from_mongodb(collection_name=reviews_collection, db=db)
            return JSONResponse(content={
                "products": products,
                "products_count": products_count,
                "reviews": reviews,
                "reviews_count": reviews_count,
            })
    except Exception as e:
        print(f"An error occurred in fetching data: {e}")
        raise HTTPException(status_code=400, detail=f"An error occurred in fetching data: {e}") from e

    # Neither mode flag was supplied. Without this, the handler would return None,
    # i.e. HTTP 200 with a ``null`` body, hiding the malformed request.
    raise HTTPException(status_code=400, detail="An error occurred in fetching data: specify getDataCount or getDataBase")


# upload file end point
@app.post("/upload-file")
async def upload_file(file: UploadFile = File(...), uploadType: str = Form(...), uploadDatabase: str = Form(...), shopId: str = Form(...)):
    """Validate an uploaded products/reviews file and store it in the shop's database.

    Form fields:
        file: the uploaded file.
        uploadType: ``"append"``, ``"replace"`` or ``"new"``; passed to
            ``save_to_mongodb`` as ``operation``.
        uploadDatabase: ``"productsDB"`` or ``"reviewsDB"``; selects the target
            collection ``products-<shopId>`` or ``reviews-<shopId>``. It is also
            passed as ``type`` to ``check_file_validation`` and ``save_to_mongodb``.
        shopId: shop identifier; selects the ``shopID_<shopId>`` database.

    Raises:
        HTTPException(400): missing file, unknown uploadDatabase or uploadType,
            failed validation, or any other error while saving.
    """
    try:
        db = mongodbClient[f"shopID_{shopId}"]
        if not file:
            raise HTTPException(status_code=400, detail="No file provided")
        print("file", file, type(file))
        print("upload_type", uploadType, type(uploadType))
        print("upload_database", uploadDatabase, type(uploadDatabase))

        # Target collection for each accepted uploadDatabase value.
        collection_name = {
            "productsDB": f"products-{shopId}",
            "reviewsDB": f"reviews-{shopId}",
        }.get(uploadDatabase)
        if collection_name is None:
            # Reject early. Otherwise collection_name stays unbound and the request
            # fails later with an opaque UnboundLocalError.
            raise HTTPException(status_code=400, detail=f"Invalid upload database: {uploadDatabase}")
        if uploadType not in ("append", "replace", "new"):
            raise HTTPException(status_code=400, detail=f"Invalid upload type: {uploadType}")

        file_content = await file.read()
        file_name = file.filename
        try:
            check_file_validation(type=uploadDatabase, file_content=file_content, file_name=file_name)
        except ValueError as e:
            print(f"File validation failed: {e}")
            raise HTTPException(status_code=400, detail=f"File validation failed: {e}") from e
        print("valid file")

        save_to_mongodb(file_content=file_content, file_name=file_name, collection_name=collection_name, operation=uploadType, db=db, type=uploadDatabase)
        print("Successfully uploaded file")
        return JSONResponse(content={"message": "The file was successfully uploaded."}, status_code=200)
    except HTTPException:
        # Already carries the intended status code and detail.
        raise
    except ValueError as value_error:
        print(f"Error validating file: {value_error}")
        raise HTTPException(status_code=400, detail=f"Error validating file: {value_error}") from value_error
    except Exception as e:
        print(f"An error occurred in uploading file: {e}")
        raise HTTPException(status_code=400, detail=f"An error occurred in uploading file: {e}") from e


# embedding generation endpoint
@app.post("/generate-embeddings")
async def generate_embeddings(request: Request):
    """Rebuild the shop's merged ``map_data-<shopId>`` collection and generate embeddings.

    Expects a JSON body with ``shopId``. Any existing ``map_data-<shopId>``
    collection is dropped first. ``products-<shopId>`` is then aggregated with
    the pipeline from ``create_map_data_pipeline``, and ``create_embeddings`` is
    called with the merged collection and ``embedding_collection=shopId``.

    Returns:
        JSONResponse 200 on success. On any failure, JSONResponse 400 with a
        ``message``; errors are returned, not raised.
    """
    try:
        data = await request.json()
        shopId = data.get("shopId")
        print(data)
        print(shopId)
        merged_collection_name = f"map_data-{shopId}"
        reviews_collection_name = f"reviews-{shopId}"
        products_collection_name = f"products-{shopId}"
        db = mongodbClient[f"shopID_{shopId}"]

        # Drop the previous merged collection before re-aggregating.
        # NOTE(review): presumably the pipeline writes into merged_collection_name
        # (e.g. via $out/$merge); confirm in create_map_data_pipeline.
        if merged_collection_name in db.list_collection_names():
            print("map_data collection exist")
            db[merged_collection_name].drop()
            print("Dropped the existing map_data collection")
        else:
            print("map_data collection does not exist")

        try:
            pipeline = create_map_data_pipeline(reviews_collection_name=reviews_collection_name, merged_collection_name=merged_collection_name)
            db[products_collection_name].aggregate(pipeline)
        except Exception as e:
            print(f"An error occurred in generating embeddings collection in mongodb: {e}")
            raise HTTPException(status_code=400, detail=f"An error occurred in generating embeddings collection in mongodb: {e}") from e

        try:
            create_embeddings(mapped_data_collection=merged_collection_name, embedding_collection=shopId, db=db)
        except Exception as e:
            print(f"An error occurred in creating embeddings: {e}")
            raise HTTPException(status_code=400, detail=f"An error occurred in creating embeddings: {e}") from e

        return JSONResponse(content={"message": "Embedding generation successful"}, status_code=200)
    except Exception as e:
        print(f"An error occurred in generating embeddings: {e}")
        return JSONResponse(content={"message": f"An error occurred in generating embeddings: {e}"}, status_code=400)


# ROUTES FOR CHATBOT
# endpoint to get the productsInformation
@app.post("/productInformation")
async def productInformation(request: Request):
    """Return one product from ``map_data-<shopId>`` with its reviews summarised.

    Expects a JSON body with ``shopId`` and ``productId``; the product ID is
    matched against the document's ``id`` field. ``_id`` is removed. If the
    document has ``reviews``, that list is replaced by:
      * ``top_reviews``: up to 3 reviews rated >= 4, highest rating first;
      * ``top_critical_reviews``: up to 3 reviews rated < 4, lowest rating first.
    Reviews are first sorted newest-first, and Python sorts are stable, so equal
    ratings are ordered by recency.

    Returns:
        JSONResponse 200 ``{"productsInformation": <document>}``. Otherwise,
        JSONResponse 400 with a ``message`` if the collection or product is
        missing or any other error occurs.
    """
    try:
        data = await request.json()
        shopId = data.get("shopId")
        print("shopId", shopId)
        product = data.get("productId")
        print("product", product)

        db = mongodbClient[f"shopID_{shopId}"]
        collection_name = f"map_data-{shopId}"
        if collection_name not in db.list_collection_names():
            print(f"Collection {collection_name} does not exist in data")
            raise HTTPException(status_code=400, detail=f"Collection {collection_name} does not exist in data")
        print("Collection found... in data")

        document = db[collection_name].find_one({"id": product})
        if not document:
            print(f"product {product} not found")
            raise HTTPException(status_code=400, detail=f"product {product} not found")
        print(f"Product found and returning the data for {product}")

        document.pop('_id', None)
        if 'reviews' in document:
            reviews = document.pop('reviews')
            # Newest first, so the stable rating sorts below break ties by recency.
            reviews.sort(key=lambda x: parse_datetime(x['review_date'], x['review_time']), reverse=True)
            top_reviews = sorted(
                (review for review in reviews if review['rating'] >= 4),
                key=lambda x: -x['rating'],
            )[:3]
            top_critical_reviews = sorted(
                (review for review in reviews if review['rating'] < 4),
                key=lambda x: x['rating'],
            )[:3]
            # ObjectId values are not JSON-serialisable.
            for review in (*top_reviews, *top_critical_reviews):
                review.pop('_id', None)
            document['top_reviews'] = top_reviews
            document['top_critical_reviews'] = top_critical_reviews

        return JSONResponse(content={"productsInformation": document}, status_code=200)
    except HTTPException as http_error:
        print(f"An error occurred in getting productInformation: {http_error.detail}")
        return JSONResponse(content={"message": f"An error occurred in getting productInformation: {http_error.detail}"}, status_code=http_error.status_code)
    except Exception as e:
        print(f"An error occurred in getting productInformation: {e}")
        return JSONResponse(content={"message": f"An error occurred in getting productInformation: {e}"}, status_code=400)


# Fallback chatbot replies used when answering a message fails; one is picked at random.
_CHATBOT_ERROR_RESPONSES = (
    "We apologize for the inconvenience, but an error occurred while processing your request.",
    "We're sorry, but it seems there was a technical issue. Please try again later.",
    "We've encountered an unexpected issue. Let's explore alternative options.",
    "We regret to inform you that our system is experiencing technical difficulties. Thank you for your patience.",
    "Unfortunately, we've encountered a problem on our end. We're investigating and will provide an update soon.",
    "It appears there's a technical glitch in our system. We're working diligently to resolve it.",
    "Regrettably, an unexpected error occurred while processing your request. Please stand by.",
    "Our servers are experiencing some instability. Rest assured, our team is actively addressing the issue.",
    "We're aware of the issue and actively working to resolve it. Thank you for your understanding.",
)


# endpoint to message
@app.post("/message")
async def message(request: Request):
    """Answer a chatbot message using self-query retrieval over the shop's Qdrant collection.

    Expects a JSON body with ``shopId``, ``userId``, ``userIP``, ``sessionId``
    and ``message``.

    Side effects:
        * inserts the user into ``chatBotUsers-<shopId>`` if no document with
          that ``userId`` exists;
        * inserts the message/response pair into ``Interactions-<shopId>``.

    Returns:
        JSONResponse 200 ``{"response": <answer>}``. On any failure it still
        returns 200, with a randomly chosen apology in ``response``; the error
        is only printed.
    """
    try:
        data = await request.json()
        shopId = data.get("shopId")
        userId = data.get("userId")
        userIP = data.get("userIP")
        sessionId = data.get("sessionId")
        user_message = data.get("message")
        print("shopId", shopId)
        print("message", user_message)
        print("userId", userId)
        print("userIP", userIP)
        print("sessionId", sessionId)

        try:
            db = mongodbClient[f"shopID_{shopId}"]
            users = db[f"chatBotUsers-{shopId}"]
            # MongoDB creates the collection on first insert. Skipping the explicit
            # create_collection() avoids a "collection exists" race between
            # concurrent first requests.
            if users.find_one({"userId": userId}):
                print("User found ")
            else:
                print(f"User {userId} not found")
                users.insert_one({
                    "userId": userId,
                    "email": "",
                    "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    "ip_address": userIP,
                })
        except Exception as e:
            print(f"Error saving user to mongodb: {e}")
            raise HTTPException(status_code=400, detail=f"Error saving user to mongodb: {e}") from e

        try:
            print("shopId in message:", shopId)
            # Each shop's embeddings live in a Qdrant collection named after the shopId.
            vector_store = Qdrant(
                client=qdrantClient,
                collection_name=shopId,
                embeddings=embeddings,
            )
        except Exception as e:
            print(f"{e}")
            raise HTTPException(status_code=400, detail=f"{e}") from e

        try:
            print("Setting up the self query retriever")
            self_query_retriever = SelfQueryRetriever.from_llm(
                llm,
                vector_store,
                document_content_description,
                metadata_field_info,
                verbose=True,
            )
        except Exception as e:
            print("Error in setting up the self query retriever:", e)
            raise HTTPException(status_code=400, detail=f"Error in setting up the self query retriever: {e}") from e

        try:
            chain = (
                {"context": self_query_retriever, "question": RunnablePassthrough()}
                | prompt
                | llm
                | StrOutputParser()
            )
            # The first step passes the raw input to both the retriever and the
            # passthrough, so invoke with the plain query string. A dict would reach
            # the retriever and the prompt's {question} as its repr.
            result = chain.invoke(user_message)
            print(type(result))
            print(result)
        except Exception as e:
            print("Error in generating answer:", e)
            raise HTTPException(status_code=400, detail=f"Error in generating answer: {e}") from e

        try:
            interaction = {
                "interactionId": str(uuid4())[:8],
                "userId": userId,
                "ip_address": userIP,
                "sessionId": sessionId,
                "email": "",
                "message": user_message,
                "response": result,
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }
            mongodbClient[f"shopID_{shopId}"][f"Interactions-{shopId}"].insert_one(interaction)
        except Exception as e:
            print(f"Error saving interaction to mongodb: {e}")
            raise HTTPException(status_code=400, detail=f"Error saving interaction to mongodb: {e}") from e

        return JSONResponse(content={"response": result}, status_code=200)
    except Exception as e:
        print("Error:", e)
        error_message = random.choice(_CHATBOT_ERROR_RESPONSES)
        print(error_message)
        return JSONResponse(content={"response": error_message})