# advanced-rag/custom_utils.py
import os

import openai
import pandas as pd
from datasets import load_dataset
from document_model import Listing, SearchResultItem
from pydantic import ValidationError
from pymongo.collection import Collection
from pymongo.errors import OperationFailure
from pymongo.operations import SearchIndexModel
from pymongo.mongo_client import MongoClient
DB_NAME = "airbnb_dataset"
COLLECTION_NAME = "listings_reviews"
def connect_to_database():
    """Connect to MongoDB Atlas and return the database and collection handles."""
    MONGODB_ATLAS_CLUSTER_URI = os.environ["MONGODB_ATLAS_CLUSTER_URI"]
mongo_client = MongoClient(MONGODB_ATLAS_CLUSTER_URI, appname="advanced-rag")
db = mongo_client.get_database(DB_NAME)
collection = db.get_collection(COLLECTION_NAME)
return db, collection
def rag_ingestion(collection):
    """Load the pre-embedded Airbnb dataset and replace the collection's contents with it."""
    dataset = load_dataset("MongoDB/airbnb_embeddings", streaming=True, split="train")
dataset_df = pd.DataFrame(dataset)
listings = process_records(dataset_df)
collection.delete_many({})
collection.insert_many(listings)
return "Manually create a vector search index (in free tier, this feature is not available via SDK)"
def rag_retrieval(openai_api_key, prompt, db, collection, stages=None, vector_index="vector_index"):
    # vector_search returns a list of listing dicts matching the SearchResultItem model
    get_knowledge = vector_search(openai_api_key, prompt, db, collection, stages, vector_index)
# Check if there are any results
    if not get_knowledge or isinstance(get_knowledge, str):
return "No results found.", "No source information available."
# Convert search results into a list of SearchResultItem models
search_results_models = [
SearchResultItem(**result)
for result in get_knowledge
]
    # Convert the validated models into a DataFrame for tabular display
    search_results_df = pd.DataFrame([item.dict() for item in search_results_models])
print("###")
print(search_results_df)
print("###")
return search_results_df
def rag_inference(openai_api_key, prompt, search_results):
    """Answer the user's question with GPT-4o, grounded in the retrieved listings."""
    openai.api_key = openai_api_key
    # Combine the question and the retrieved context into a single user message
    content = f"Answer this user question: {prompt} with the following context:\n{search_results}"
completion = openai.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": "You are an AirBnB listing recommendation system."},
{
"role": "user",
"content": content
}
]
)
completion_result = completion.choices[0].message.content
print("###")
print(completion_result)
print("###")
return completion_result
def process_records(data_frame):
    """Convert DataFrame rows to dicts, replace NaN/NaT with None, and validate each as a Listing."""
    records = data_frame.to_dict(orient="records")
    # Handle potential NaN/NaT values, which Pydantic cannot validate
for record in records:
for key, value in record.items():
# List values
if isinstance(value, list):
processed_list = [None if pd.isnull(v) else v for v in value]
record[key] = processed_list
# Scalar values
else:
if pd.isnull(value):
record[key] = None
try:
# Convert each dictionary to a Listing instance
return [Listing(**record).dict() for record in records]
except ValidationError as e:
print("Validation error:", e)
return []
def vector_search(openai_api_key, user_query, db, collection, additional_stages=None, vector_index="vector_index_text"):
    """
    Perform a vector search in the MongoDB collection based on the user query.
    Args:
        openai_api_key (str): OpenAI API key used to embed the query.
        user_query (str): The user's query string.
        db (MongoClient.database): The database object.
        collection (MongoCollection): The MongoDB collection to search.
        additional_stages (list): Additional aggregation stages to include in the pipeline.
        vector_index (str): Name of the Atlas vector search index to use.
    Returns:
        list: A list of matching documents.
    """
# Generate embedding for the user query
query_embedding = get_embedding(openai_api_key, user_query)
if query_embedding is None:
return "Invalid query or embedding generation failed."
# Define the vector search stage
vector_search_stage = {
"$vectorSearch": {
"index": vector_index, # specifies the index to use for the search
"queryVector": query_embedding, # the vector representing the query
"path": "text_embeddings", # field in the documents containing the vectors to search against
"numCandidates": 150, # number of candidate matches to consider
"limit": 20, # return top 20 matches
}
}
# Define the aggregate pipeline with the vector search stage and additional stages
    pipeline = [vector_search_stage] + (additional_stages or [])
# Execute the search
results = collection.aggregate(pipeline)
explain_query_execution = db.command( # sends a database command directly to the MongoDB server
'explain', { # return information about how MongoDB executes a query or command without actually running it
'aggregate': collection.name, # specifies the name of the collection on which the aggregation is performed
'pipeline': pipeline, # the aggregation pipeline to analyze
'cursor': {} # indicates that default cursor behavior should be used
},
verbosity='executionStats') # detailed statistics about the execution of each stage of the aggregation pipeline
vector_search_explain = explain_query_execution['stages'][0]['$vectorSearch']
#millis_elapsed = vector_search_explain['explain']['collectStats']['millisElapsed']
print(vector_search_explain)
#print(f"Total time for the execution to complete on the database server: {millis_elapsed} milliseconds")
return list(results)
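# Example (sketch) of an `additional_stages` entry a caller might pass: a
# $project stage that trims each hit to a few fields and surfaces the
# similarity score. The projected field names are illustrative assumptions,
# not taken from this module:
PROJECT_STAGE_EXAMPLE = {
    "$project": {
        "_id": 0,
        "name": 1,
        "summary": 1,
        "score": {"$meta": "vectorSearchScore"},  # score computed by $vectorSearch
    }
}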
def vector_search_with_filter(openai_api_key, user_query, db, collection, additional_stages=None, vector_index="vector_index_2"):
    """
    Perform a vector search in the MongoDB collection based on the user query,
    pre-filtered on the accommodates and bedrooms fields.
    Args:
        openai_api_key (str): OpenAI API key used to embed the query.
        user_query (str): The user's query string.
        db (MongoClient.database): The database object.
        collection (MongoCollection): The MongoDB collection to search.
        additional_stages (list): Additional aggregation stages to include in the pipeline.
        vector_index (str): Name of the Atlas vector search index to use.
    Returns:
        list: A list of matching documents.
    """
    # Generate embedding for the user query
    query_embedding = get_embedding(openai_api_key, user_query)
if query_embedding is None:
return "Invalid query or embedding generation failed."
# Define the vector search stage
vector_search_stage = {
"$vectorSearch": {
"index": vector_index, # specifies the index to use for the search
"queryVector": query_embedding, # the vector representing the query
"path": "text_embeddings", # field in the documents containing the vectors to search against
"numCandidates": 150, # number of candidate matches to consider
"limit": 20, # return top 20 matches
"filter": {
"$and": [
{"accommodates": {"$gte": 2}},
{"bedrooms": {"$lte": 7}}
]
},
}
}
# Define the aggregate pipeline with the vector search stage and additional stages
    pipeline = [vector_search_stage] + (additional_stages or [])
# Execute the search
results = collection.aggregate(pipeline)
explain_query_execution = db.command( # sends a database command directly to the MongoDB server
'explain', { # return information about how MongoDB executes a query or command without actually running it
'aggregate': collection.name, # specifies the name of the collection on which the aggregation is performed
'pipeline': pipeline, # the aggregation pipeline to analyze
'cursor': {} # indicates that default cursor behavior should be used
},
verbosity='executionStats') # detailed statistics about the execution of each stage of the aggregation pipeline
vector_search_explain = explain_query_execution['stages'][0]['$vectorSearch']
millis_elapsed = vector_search_explain['explain']['collectStats']['millisElapsed']
print(f"Total time for the execution to complete on the database server: {millis_elapsed} milliseconds")
return list(results)
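# For the "filter" clause above to work, accommodates and bedrooms must be
# declared as filter fields in the Atlas index definition. A sketch of such a
# definition (the vector field settings mirror this module; the rest is an
# assumption):
FILTERED_INDEX_DEFINITION_SKETCH = {
    "fields": [
        {"type": "vector", "path": "text_embeddings", "numDimensions": 1536, "similarity": "cosine"},
        {"type": "filter", "path": "accommodates"},
        {"type": "filter", "path": "bedrooms"},
    ]
}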
def get_embedding(openai_api_key, text):
"""Generate an embedding for the given text using OpenAI's API."""
# Check for valid input
if not text or not isinstance(text, str):
return None
openai.api_key = openai_api_key
try:
        embedding = openai.embeddings.create(
            input=text,
            model="text-embedding-3-small",
            dimensions=1536,
        ).data[0].embedding
return embedding
except Exception as e:
print(f"Error in get_embedding: {e}")
return None
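if __name__ == "__main__":
    # End-to-end smoke-test sketch, not part of the original module. Assumes
    # MONGODB_ATLAS_CLUSTER_URI and OPENAI_API_KEY (an assumed variable name)
    # are set in the environment and the vector search index already exists.
    api_key = os.environ["OPENAI_API_KEY"]
    db, collection = connect_to_database()
    question = "Recommend a quiet listing that sleeps two."
    results = rag_retrieval(api_key, question, db, collection)
    rag_inference(api_key, question, results)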