import os
import time

import openai
import pandas as pd
from datasets import load_dataset
from document_model import Listing, SearchResultItem
from pydantic import ValidationError
from pymongo.collection import Collection
from pymongo.errors import OperationFailure
from pymongo.operations import SearchIndexModel
from pymongo.mongo_client import MongoClient

DB_NAME = "airbnb_dataset"
COLLECTION_NAME = "listings_reviews"


def connect_to_database():
    MONGODB_ATLAS_CLUSTER_URI = os.environ["MONGODB_ATLAS_CLUSTER_URI"]
    mongo_client = MongoClient(MONGODB_ATLAS_CLUSTER_URI, appname="advanced-rag")
    db = mongo_client.get_database(DB_NAME)
    collection = db.get_collection(COLLECTION_NAME)
    return db, collection


def rag_ingestion(collection):
    dataset = load_dataset("MongoDB/airbnb_embeddings", streaming=True, split="train")
    dataset_df = pd.DataFrame(dataset)
    listings = process_records(dataset_df)
    collection.delete_many({})
    collection.insert_many(listings)
    return "Manually create a vector search index (in free tier, this feature is not available via SDK)"


def rag_retrieval(openai_api_key, prompt, db, collection, stages=None, vector_index="vector_index"):
    get_knowledge = vector_search(openai_api_key, prompt, db, collection, stages, vector_index)
    if not get_knowledge:
        return "No results found.", "No source information available."
    search_results_models = [SearchResultItem(**result) for result in get_knowledge]
    search_results_df = pd.DataFrame([item.dict() for item in search_results_models])
    print("###")
    print(search_results_df)
    print("###")
    return search_results_df


def rag_inference(openai_api_key, prompt, search_results):
    openai.api_key = openai_api_key
    content = f"Answer this user question: {prompt} with the following context:\n{search_results}"
    completion = openai.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "You are an AirBnB listing recommendation system."},
            {"role": "user", "content": content},
        ],
    )
    return completion.choices[0].message.content


def process_records(data_frame):
    records = data_frame.to_dict(orient="records")
    # Replace NaN/NaT values with None so the records validate cleanly
    for record in records:
        for key, value in record.items():
            # List values: sanitize each element individually
            if isinstance(value, list):
                record[key] = [None if pd.isnull(v) else v for v in value]
            # Scalar values
            elif pd.isnull(value):
                record[key] = None
    try:
        # Convert each dictionary to a Listing instance
        return [Listing(**record).dict() for record in records]
    except ValidationError as e:
        print("Validation error:", e)
        return []


def vector_search(openai_api_key, user_query, db, collection, additional_stages=None, vector_index="vector_index"):
    query_embedding = get_text_embedding(openai_api_key, user_query)
    if query_embedding is None:
        # Return an empty list rather than a string so callers that check
        # `if not get_knowledge` handle the failure correctly
        print("Invalid query or embedding generation failed.")
        return []
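    # The $vectorSearch stage below performs approximate nearest neighbor
    # search: Atlas scores `numCandidates` candidates and returns the top
    # `limit` matches, with `filter` applied as a pre-filter. Since index
    # creation is manual here, this is a sketch of a plausible index
    # definition (the cosine similarity choice is an assumption); the filter
    # fields must be declared alongside the vector field, e.g.:
    # {
    #   "fields": [
    #     {"type": "vector", "path": "text_embeddings", "numDimensions": 1536, "similarity": "cosine"},
    #     {"type": "filter", "path": "accommodates"},
    #     {"type": "filter", "path": "bedrooms"}
    #   ]
    # }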
    vector_search_stage = {
        "$vectorSearch": {
            "index": vector_index,
            "queryVector": query_embedding,
            "path": "text_embeddings",
            "numCandidates": 150,
            "limit": 20,
            # Pre-filter to listings that sleep two guests in one bedroom
            "filter": {
                "$and": [
                    {"accommodates": {"$eq": 2}},
                    {"bedrooms": {"$eq": 1}},
                ]
            },
        }
    }
    pipeline = [vector_search_stage] + (additional_stages or [])
    results = collection.aggregate(pipeline)

    # Run the same pipeline through explain to surface execution statistics
    explain_query_execution = db.command(
        "explain",
        {"aggregate": collection.name, "pipeline": pipeline, "cursor": {}},
        verbosity="executionStats",
    )
    vector_search_explain = explain_query_execution["stages"][0]["$vectorSearch"]
    millis_elapsed = vector_search_explain["explain"]["collectStats"]["millisElapsed"]
    print(f"Query execution time: {millis_elapsed} milliseconds")
    return list(results)


def get_text_embedding(openai_api_key, text):
    if not text or not isinstance(text, str):
        return None
    openai.api_key = openai_api_key
    try:
        embedding = openai.embeddings.create(
            input=text,
            model="text-embedding-3-small",
            dimensions=1536,
        ).data[0].embedding
        return embedding
    except Exception as e:
        print(f"Error in get_embedding: {e}")
        return None
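

# A minimal end-to-end usage sketch (an illustrative addition, not part of the
# original module). It assumes OPENAI_API_KEY is set in the environment and
# that the "vector_index" search index has already been created manually, as
# noted in rag_ingestion; the prompt below is only an example.
if __name__ == "__main__":
    openai_api_key = os.environ["OPENAI_API_KEY"]
    db, collection = connect_to_database()

    # One-time ingestion: load the dataset, validate records, and insert them.
    # print(rag_ingestion(collection))

    prompt = "Recommend a quiet one-bedroom apartment for two people."
    search_results = rag_retrieval(openai_api_key, prompt, db, collection)
    answer = rag_inference(openai_api_key, prompt, search_results)
    print(answer)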