# advanced-rag / custom_utils.py
import openai, os
from datasets import load_dataset
from pymongo.mongo_client import MongoClient
DB_NAME = "airbnb_dataset"
COLLECTION_NAME = "listings_reviews"
def connect_to_database():
MONGODB_ATLAS_CLUSTER_URI = os.environ["MONGODB_ATLAS_CLUSTER_URI"]
mongo_client = MongoClient(MONGODB_ATLAS_CLUSTER_URI, appname="advanced-rag")
db = mongo_client.get_database(DB_NAME)
collection = db.get_collection(COLLECTION_NAME)
return db, collection
def rag_ingestion(collection):
    # Load the pre-embedded listings dataset and replace the collection contents with it
    dataset = load_dataset("bstraehle/airbnb-san-francisco-202403-embed", streaming=True, split="train")
    collection.delete_many({})
    collection.insert_many(dataset)
    return "Manually create a vector search index (the free tier does not support creating it via the SDK)"
def rag_retrieval_naive(openai_api_key,
                        prompt,
                        accommodates,
                        bedrooms,
                        db,
                        collection,
                        vector_index="vector_index"):
    # Naive RAG: semantic search, then a post-retrieval $match filter on accommodates and bedrooms
    retrieval_result = vector_search_naive(
        openai_api_key,
        prompt,
        accommodates,
        bedrooms,
        db,
        collection,
        vector_index
    )
if not retrieval_result:
return "No results found."
#print(retrieval_result)
return retrieval_result
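# A minimal usage sketch of the naive path; the API key, prompt, and
# guest/bedroom counts below are illustrative placeholders, not values taken
# from this module:
#
#   db, collection = connect_to_database()
#   listings = rag_retrieval_naive(
#       openai_api_key, "cozy apartment near Golden Gate Park", 2, 1, db, collection
#   )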
def rag_retrieval_advanced(openai_api_key,
                           prompt,
                           accommodates,
                           bedrooms,
                           db,
                           collection,
                           vector_index="vector_index"):
    # Advanced RAG: semantic search plus...
    # 1) Pre-retrieval processing: $vectorSearch index filter on accommodates and bedrooms
    # 2) Post-retrieval processing: weighted average review, sorted in descending order
    additional_stages = [
        get_stage_average_review_and_review_count(),
        get_stage_weighting(),
        get_stage_sorting()
    ]
    retrieval_result = vector_search_advanced(
        openai_api_key,
        prompt,
        accommodates,
        bedrooms,
        db,
        collection,
        additional_stages,
        vector_index
    )
if not retrieval_result:
return "No results found."
#print(retrieval_result)
return retrieval_result
def inference(openai_api_key, prompt):
content = (
"Answer the question.\n"
"If you don't know the answer, just say that you don't know, don't try to make up an answer.\n"
"Keep the answer as concise as possible.\n\n"
f"Question: {prompt}\n"
"Helpful Answer: "
)
return invoke_llm(openai_api_key, content)
def rag_inference(openai_api_key, prompt, retrieval_result):
content = (
"Use the following pieces of context to answer the question at the end.\n"
"If you don't know the answer, just say that you don't know, don't try to make up an answer.\n"
"Keep the answer as concise as possible.\n\n"
f"{retrieval_result}\n\n"
f"Question: {prompt}\n"
"Helpful Answer: "
)
return invoke_llm(openai_api_key, content)
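# A hedged end-to-end sketch combining the advanced retrieval with RAG inference.
# It is not called anywhere in this module, and the default prompt and the
# accommodates/bedrooms values (2 guests, 1 bedroom) are illustrative only.
def example_advanced_rag(openai_api_key, prompt="Quiet one-bedroom for two guests near the Mission"):
    db, collection = connect_to_database()
    retrieval_result = rag_retrieval_advanced(openai_api_key, prompt, 2, 1, db, collection)
    return rag_inference(openai_api_key, prompt, retrieval_result)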
def invoke_llm(openai_api_key, content):
openai.api_key = openai_api_key
completion = openai.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": "You are an AirBnB listing recommendation system."},
{
"role": "user",
"content": content
}
],
temperature=0.01
)
return completion.choices[0].message.content
def vector_search_naive(openai_api_key,
                        prompt,
                        accommodates,
                        bedrooms,
                        db,
                        collection,
                        vector_index="vector_index"):
query_embedding = get_text_embedding(openai_api_key, prompt)
if query_embedding is None:
return "Invalid query or embedding generation failed."
vector_search_stage = {
"$vectorSearch": {
"index": vector_index,
"queryVector": query_embedding,
"path": "description_embedding",
"numCandidates": 150,
"limit": 25,
}
}
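    # Note: in this naive path the accommodates/bedrooms filter runs as a $match
    # *after* the top 25 vector search results are returned, so fewer than 25
    # documents (possibly none) may survive the filter.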
pipeline = [
vector_search_stage,
get_stage_include_fields(),
        get_stage_filter_result(accommodates, bedrooms)
]
return invoke_search(db, collection, pipeline)
def vector_search_advanced(openai_api_key,
prompt,
accommodates,
bedrooms,
db,
collection,
additional_stages=[],
vector_index="vector_index"):
query_embedding = get_text_embedding(openai_api_key, prompt)
if query_embedding is None:
return "Invalid query or embedding generation failed."
vector_search_and_filter_stage = {
"$vectorSearch": {
"index": vector_index,
"queryVector": query_embedding,
"path": "description_embedding",
"numCandidates": 150,
"limit": 25,
"filter": {
"$and": [
{"accommodates": {"$eq": accommodates}},
{"bedrooms": {"$eq": bedrooms}}
]
},
}
}
pipeline = [
vector_search_and_filter_stage,
get_stage_include_fields()
] + additional_stages
return invoke_search(db, collection, pipeline)
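# For reference, when called from rag_retrieval_advanced the pipeline assembled
# above runs in this order:
#   1. $vectorSearch - semantic search, pre-filtered on accommodates and bedrooms
#   2. $project      - keep the listed listing fields (description_embedding is not among them)
#   3. $addFields    - averageReview and reviewCount
#   4. $addFields    - weightedAverageReview
#   5. $sort         - weightedAverageReview, descending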
def get_stage_exclude_fields():
    # Alternative to get_stage_include_fields: keep every field and drop only the
    # embedding. Not referenced elsewhere in this module.
    return {
        "$unset": "description_embedding"
    }
def get_stage_include_fields():
return {
"$project": {
"id": 1,
"listing_url": 1,
"name": 1,
"description": 1,
"neighborhood_overview": 1,
"picture_url": 1,
"host_id": 1,
"host_url": 1,
"host_name": 1,
"host_since": 1,
"host_location": 1,
"host_about": 1,
"host_response_time": 1,
"host_response_rate": 1,
"host_acceptance_rate": 1,
"host_is_superhost": 1,
"host_thumbnail_url": 1,
"host_picture_url": 1,
"host_neighbourhood": 1,
"host_listings_count": 1,
"host_total_listings_count": 1,
"host_verifications": 1,
"host_has_profile_pic": 1,
"host_identity_verified": 1,
"neighbourhood": 1,
"neighbourhood_cleansed": 1,
"neighbourhood_group_cleansed": 1,
"latitude": 1,
"longitude": 1,
"property_type": 1,
"room_type": 1,
"accommodates": 1,
"bathrooms": 1,
"bathrooms_text": 1,
"bedrooms": 1,
"beds": 1,
"amenities": 1,
"price": 1,
"minimum_nights": 1,
"maximum_nights": 1,
"minimum_minimum_nights": 1,
"maximum_minimum_nights": 1,
"minimum_maximum_nights": 1,
"maximum_maximum_nights": 1,
"minimum_nights_avg_ntm": 1,
"maximum_nights_avg_ntm": 1,
"calendar_updated": 1,
"has_availability": 1,
"availability_30": 1,
"availability_60": 1,
"availability_90": 1,
"availability_365": 1,
"number_of_reviews": 1,
"number_of_reviews_ltm": 1,
"number_of_reviews_l30d": 1,
"first_review": 1,
"last_review": 1,
"review_scores_rating": 1,
"review_scores_accuracy": 1,
"review_scores_cleanliness": 1,
"review_scores_checkin": 1,
"review_scores_communication": 1,
"review_scores_location": 1,
"review_scores_value": 1,
"license": 1,
"instant_bookable": 1,
"calculated_host_listings_count": 1,
"calculated_host_listings_count_entire_homes": 1,
"calculated_host_listings_count_private_rooms": 1,
"calculated_host_listings_count_shared_rooms": 1,
"reviews_per_month": 1,
}
}
def get_stage_filter_result(accommodates, bedrooms):
    return {
        "$match": {
            "accommodates": {"$eq": accommodates},
            "bedrooms": {"$eq": bedrooms}
        }
    }
def get_stage_average_review_and_review_count():
return {
"$addFields": {
"averageReview": {
"$divide": [
{
"$add": [
"$review_scores_rating",
"$review_scores_accuracy",
"$review_scores_cleanliness",
"$review_scores_checkin",
"$review_scores_communication",
"$review_scores_location",
"$review_scores_value",
]
},
7
]
},
"reviewCount": "$number_of_reviews"
}
}
def get_stage_weighting():
return {
"$addFields": {
"weightedAverageReview": {
"$add": [
{"$multiply": ["$averageReview", 0.9]},
{"$multiply": ["$reviewCount", 0.1]},
]
}
}
}
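# Worked example of the weighting above: a listing with averageReview 4.8 and
# 120 reviews gets 0.9 * 4.8 + 0.1 * 120 = 4.32 + 12.0 = 16.32, so review count
# can dominate the ranking for heavily reviewed listings.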
def get_stage_sorting():
return {
"$sort": {"weightedAverageReview": -1}
}
def invoke_search(db, collection, pipeline):
results = collection.aggregate(pipeline)
print(f"Vector search millis elapsed: {get_millis_elapsed(db, collection, pipeline)}")
return list(results)
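# The explain parsing below depends on the Atlas Vector Search explain output
# shape ("stages" -> "$vectorSearch" -> "explain" -> "collectStats"), which is
# not a stable public contract; note also that the executionStats explain issues
# a second command for every search just to read the timing.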
def get_millis_elapsed(db, collection, pipeline):
explain_query_execution = db.command(
"explain", {
"aggregate": collection.name,
"pipeline": pipeline,
"cursor": {}
},
verbosity="executionStats")
explain_vector_search = explain_query_execution["stages"][0]["$vectorSearch"]
return explain_vector_search["explain"]["collectStats"]["allCollectorStats"]["millisElapsed"]
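# The query embedding must come from the same model and dimensionality as the
# stored description_embedding vectors (text-embedding-3-small with 1536
# dimensions here); mixing embedding models would make the similarity scores meaningless.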
def get_text_embedding(openai_api_key, text):
if not text or not isinstance(text, str):
return None
openai.api_key = openai_api_key
try:
return openai.embeddings.create(
input=text,
model="text-embedding-3-small", dimensions=1536
).data[0].embedding
except Exception as e:
print(f"Error in get_embedding: {e}")
return None