# Hugging Face Space status header (page-extraction residue): "Spaces: Running"
import openai, os, time | |
#import pandas as pd | |
from datasets import load_dataset | |
#from pydantic import ValidationError | |
from pymongo.collection import Collection | |
from pymongo.errors import OperationFailure | |
from pymongo.mongo_client import MongoClient | |
from pymongo.operations import SearchIndexModel | |
# MongoDB Atlas database / collection holding the pre-embedded Airbnb listings.
DB_NAME = "airbnb_dataset"
COLLECTION_NAME = "listings_reviews"
def connect_to_database():
    """Connect to MongoDB Atlas and return the demo database and collection.

    Reads the cluster URI from the ``MONGODB_ATLAS_CLUSTER_URI`` environment
    variable (raises ``KeyError`` when unset).

    Returns:
        tuple: ``(database, collection)`` for ``DB_NAME`` / ``COLLECTION_NAME``.
    """
    atlas_uri = os.environ["MONGODB_ATLAS_CLUSTER_URI"]
    client = MongoClient(atlas_uri, appname="advanced-rag")
    database = client.get_database(DB_NAME)
    return database, database.get_collection(COLLECTION_NAME)
def rag_ingestion(collection):
    """Reload the pre-embedded Airbnb dataset into *collection*.

    Streams the San Francisco listings dataset (embeddings included) from the
    Hugging Face Hub, wipes the collection, and inserts every record.

    Args:
        collection: target MongoDB collection.

    Returns:
        str: reminder that the vector search index must be created by hand.
    """
    listings = load_dataset(
        "bstraehle/airbnb-san-francisco-202403-embed",
        streaming=True,
        split="train",
    )
    collection.delete_many({})
    collection.insert_many(listings)
    return "Manually create a vector search index (in free tier, this feature is not available via SDK)"
def rag_retrieval(openai_api_key,
                  prompt,
                  accomodates,
                  bedrooms,
                  db,
                  collection,
                  vector_index="vector_index"):
    """Retrieve listings relevant to *prompt* via Atlas vector search.

    Args:
        openai_api_key: OpenAI key used to embed the prompt.
        prompt: free-text user query.
        accomodates: required guest capacity (spelling kept for the
            caller-facing interface); forwarded to the search-stage filter.
        bedrooms: required bedroom count; forwarded to the filter.
        db: MongoDB database handle (forwarded; used only for diagnostics
            downstream).
        collection: collection holding the embedded listings.
        vector_index: name of the Atlas vector search index to query.

    Returns:
        The list of matching documents on success, or the pair
        ("No results found.", "No source information available.") when the
        search produced nothing.
    """
    # Pre-retrieval filtering happens inside the $vectorSearch stage itself
    # (see vector_search); no extra post-retrieval pipeline stages are used.
    additional_stages = []
    get_knowledge = vector_search(
        openai_api_key,
        prompt,
        accomodates,
        bedrooms,
        db,
        collection,
        additional_stages,
        vector_index)
    if not get_knowledge:
        return "No results found.", "No source information available."
    # Debug trace of the raw search results (kept from the original flow).
    print("###")
    print(get_knowledge)
    print("###")
    return get_knowledge
def rag_inference(openai_api_key,
                  prompt,
                  search_results):
    """Answer *prompt* with GPT-4o, grounded in the retrieved listings.

    Args:
        openai_api_key: OpenAI key used for the chat completion call.
        prompt: the user's question.
        search_results: retrieved context injected into the user message.

    Returns:
        str: the model's answer text.
    """
    openai.api_key = openai_api_key
    user_message = f"Answer this user question: {prompt} with the following context:\n{search_results}"
    system_message = "You are an AirBnB listing recommendation system."
    response = openai.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_message},
        ],
    )
    return response.choices[0].message.content
def vector_search(openai_api_key,
                  user_query,
                  accommodates,
                  bedrooms,
                  db,
                  collection,
                  additional_stages=None,
                  vector_index="vector_index"):
    """Run an Atlas ``$vectorSearch`` over the embedded listings.

    The query is embedded with OpenAI, then matched against the
    ``description_embedding`` field, pre-filtered on exact ``accommodates``
    and ``bedrooms`` values.

    Args:
        openai_api_key: OpenAI key used to embed *user_query*.
        user_query: free-text query to embed and search with.
        accommodates: exact guest capacity required by the index filter.
        bedrooms: exact bedroom count required by the index filter.
        db: MongoDB database handle (unused here; kept for interface
            compatibility / diagnostics).
        collection: collection to aggregate over.
        additional_stages: optional extra pipeline stages. NOTE(review):
            currently NOT appended to the pipeline — this matches the
            original (deliberately disabled) behavior.
        vector_index: name of the Atlas vector search index.

    Returns:
        list of result documents (embeddings stripped), or an error string
        when the query embedding could not be generated.
    """
    # Avoid the shared mutable-default-argument pitfall.
    additional_stages = [] if additional_stages is None else additional_stages
    query_embedding = get_text_embedding(openai_api_key, user_query)
    if query_embedding is None:
        return "Invalid query or embedding generation failed."
    vector_search_stage = {
        "$vectorSearch": {
            "index": vector_index,
            "queryVector": query_embedding,
            "path": "description_embedding",
            "numCandidates": 150,
            "limit": 10,
            "filter": {
                "$and": [
                    {"accommodates": {"$eq": accommodates}},
                    {"bedrooms": {"$eq": bedrooms}}
                ]
            },
        }
    }
    # Drop the bulky embedding vector from the returned documents.
    remove_embedding_stage = {
        "$unset": "description_embedding"
    }
    # additional_stages intentionally excluded (disabled in the original).
    pipeline = [vector_search_stage, remove_embedding_stage]
    results = collection.aggregate(pipeline)
    return list(results)
def get_text_embedding(openai_api_key, text):
    """Embed *text* with OpenAI's ``text-embedding-3-small`` (1536 dims).

    Args:
        openai_api_key: OpenAI key; assigned to ``openai.api_key``.
        text: the string to embed.

    Returns:
        list[float] embedding vector, or ``None`` when *text* is empty /
        not a string or when the API call fails (best-effort: the error is
        printed, not raised).
    """
    # Guard before touching the API: reject None, non-strings, and "".
    if not text or not isinstance(text, str):
        return None
    openai.api_key = openai_api_key
    try:
        embedding = openai.embeddings.create(
            input=text,
            model="text-embedding-3-small", dimensions=1536).data[0].embedding
        return embedding
    except Exception as e:
        # Best-effort by design; fixed message to name this function.
        print(f"Error in get_text_embedding: {e}")
        return None