# advanced-rag / custom_utils.py

import os

import openai
import pandas as pd
from datasets import load_dataset
from document_model import Listing, SearchResultItem
from pydantic import ValidationError
from pymongo.mongo_client import MongoClient

DB_NAME = "airbnb_dataset"
COLLECTION_NAME = "listings_reviews"


def connect_to_database():
    # Read the Atlas connection string from the environment and return
    # handles to the database and the listings collection.
    MONGODB_ATLAS_CLUSTER_URI = os.environ["MONGODB_ATLAS_CLUSTER_URI"]
    mongo_client = MongoClient(MONGODB_ATLAS_CLUSTER_URI, appname="advanced-rag")
    db = mongo_client.get_database(DB_NAME)
    collection = db.get_collection(COLLECTION_NAME)
    return db, collection


def rag_ingestion(collection):
    # Load the pre-embedded Airbnb dataset, validate each record against
    # the Listing model, and replace the collection contents with it.
    dataset = load_dataset("MongoDB/airbnb_embeddings", streaming=True, split="train")
    dataset_df = pd.DataFrame(dataset)
    listings = process_records(dataset_df)
    collection.delete_many({})
    collection.insert_many(listings)
    return "Manually create a vector search index (in free tier, this feature is not available via SDK)"


def rag_retrieval(openai_api_key, prompt, db, collection, stages=None, vector_index="vector_index"):
    # Run the vector search and surface the hits as a DataFrame that the
    # inference step can stringify into the prompt context.
    # (stages defaults to None rather than a mutable [] default.)
    get_knowledge = vector_search(openai_api_key, prompt, db, collection, stages or [], vector_index)
    if not get_knowledge:
        return "No results found.", "No source information available."
    # Validate each raw result against the SearchResultItem model.
    search_results_models = [
        SearchResultItem(**result)
        for result in get_knowledge
    ]
    search_results_df = pd.DataFrame([item.dict() for item in search_results_models])
    print("###")
    print(search_results_df)
    print("###")
    return search_results_df


def rag_inference(openai_api_key, prompt, search_results):
    openai.api_key = openai_api_key
    # Inline the retrieved listings as context for the chat completion.
    content = f"Answer this user question: {prompt} with the following context:\n{search_results}"
    completion = openai.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": "You are an AirBnB listing recommendation system."
            },
            {
                "role": "user",
                "content": content
            }
        ]
    )
    return completion.choices[0].message.content


def process_records(data_frame):
    records = data_frame.to_dict(orient="records")
    # Replace NaN/NaT placeholders with None so the records validate and
    # serialize cleanly.
    for record in records:
        for key, value in record.items():
            if isinstance(value, list):
                # Clean NaN-like entries inside list values.
                record[key] = [None if pd.isnull(v) else v for v in value]
            elif pd.isnull(value):
                # Scalar values
                record[key] = None
    try:
        # Convert each dictionary to a validated Listing instance.
        return [Listing(**record).dict() for record in records]
    except ValidationError as e:
        print("Validation error:", e)
        return []


def vector_search(openai_api_key, user_query, db, collection, additional_stages=None, vector_index="vector_index"):
    query_embedding = get_text_embedding(openai_api_key, user_query)
    if query_embedding is None:
        # Return an empty list so callers that truth-test the result
        # (e.g. rag_retrieval) handle the failure gracefully.
        print("Invalid query or embedding generation failed.")
        return []
    # Atlas $vectorSearch stage: return the 20 nearest neighbours out of
    # 150 candidates, pre-filtered to listings for 2 guests with 1 bedroom.
    vector_search_stage = {
        "$vectorSearch": {
            "index": vector_index,
            "queryVector": query_embedding,
            "path": "text_embeddings",
            "numCandidates": 150,
            "limit": 20,
            "filter": {
                "$and": [
                    {"accommodates": {"$eq": 2}},
                    {"bedrooms": {"$eq": 1}}
                ]
            },
        }
    }
    pipeline = [vector_search_stage] + (additional_stages or [])
    results = collection.aggregate(pipeline)
    # Explain the pipeline with executionStats to report how long the
    # vector search itself took.
    explain_query_execution = db.command(
        "explain", {
            "aggregate": collection.name,
            "pipeline": pipeline,
            "cursor": {}
        },
        verbosity="executionStats")
    vector_search_explain = explain_query_execution["stages"][0]["$vectorSearch"]
    millis_elapsed = vector_search_explain["explain"]["collectStats"]["millisElapsed"]
    print(f"Query execution time: {millis_elapsed} milliseconds")
    return list(results)
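
# Illustrative only: additional_stages lets callers append pipeline stages
# after the search, e.g. a $project that trims each hit to the fields the
# SearchResultItem model expects. The field names below are assumptions for
# illustration, not taken from the original source:
#
# projection_stage = {
#     "$project": {
#         "_id": 0,
#         "name": 1,
#         "description": 1,
#         "score": {"$meta": "vectorSearchScore"}
#     }
# }
# results = vector_search(key, "2-person loft", db, collection, [projection_stage])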


def get_text_embedding(openai_api_key, text):
    # Embed the query with text-embedding-3-small, truncated to 1536
    # dimensions to match the stored listing embeddings.
    if not text or not isinstance(text, str):
        return None
    openai.api_key = openai_api_key
    try:
        return openai.embeddings.create(
            input=text,
            model="text-embedding-3-small",
            dimensions=1536
        ).data[0].embedding
    except Exception as e:
        print(f"Error in get_text_embedding: {e}")
        return None
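

# A minimal end-to-end sketch of how these helpers compose, assuming the
# MONGODB_ATLAS_CLUSTER_URI and OPENAI_API_KEY environment variables are set
# and the "vector_index" search index already exists in Atlas:
if __name__ == "__main__":
    openai_api_key = os.environ["OPENAI_API_KEY"]
    db, collection = connect_to_database()
    # rag_ingestion(collection)  # one-time: load and insert the dataset
    question = "Recommend a one-bedroom listing for two people."
    search_results = rag_retrieval(openai_api_key, question, db, collection)
    print(rag_inference(openai_api_key, question, search_results))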