# hkust_bnb_visualiser.py
# This module provides the main visualization for the HKUST BNB+ platform.
# It handles database connections, data retrieval, search relevance calculation, and map visualization
# for BNB listings across different neighborhoods in Hong Kong. The class integrates with traffic data
# to provide eco-friendly discount calculations based on traffic conditions.
# Key capabilities:
# - Text search functionality using sentence transformers
# - Traffic spot integration for eco-friendly discount calculations
# Author: Gordon Li (20317033)
# Date: March 2025
import oracledb
import pandas as pd
import folium
from html import escape
from sentence_transformers import SentenceTransformer, util
from geopy.distance import geodesic
import logging
from visualiser.td_traffic_spot_visualiser import TrafficSpotManager
from constant.hkust_bnb_constant import (
GET_ALL_NEIGHBORHOODS,
GET_NEIGHBORHOOD_LISTINGS,
GET_LISTING_REVIEWS,
GET_LISTING_REVIEWS_FOR_SEARCH,
DISCOUNT_INFO_TEMPLATE,
TRAFFIC_SPOT_INFO_TEMPLATE,
RELEVANCE_INFO_TEMPLATE,
POPUP_CONTENT_TEMPLATE,
MAP_SCRIPT
)
class HKUSTBNBVisualiser:
# Main class for BNB data visualization and management.
# Handles database connections, data retrieval, and rendering of interactive maps.
# Initializes the BNB visualizer with database connection, traffic spot manager, and NLP model.
# Sets up connection pool, loads traffic data, initializes sentence transformer model,
# and prepares neighborhood data with caching structures.
def __init__(self):
self.connection_params = {
'user': 'slliac',
'password': '7033',
'dsn': 'imz409.ust.hk:1521/imz409'
}
        # create_pool replaces the deprecated SessionPool API in python-oracledb;
        # POOL_GETMODE_WAIT likewise replaces the deprecated SPOOL_ATTRVAL_WAIT.
        self.pool = oracledb.create_pool(
            user=self.connection_params['user'],
            password=self.connection_params['password'],
            dsn=self.connection_params['dsn'],
            min=2,
            max=5,
            increment=1,
            getmode=oracledb.POOL_GETMODE_WAIT
        )
self.traffic_manager = TrafficSpotManager(self.connection_params)
logging.info(f"Traffic spots initialized, {len(self.traffic_manager.traffic_spots)} spots loaded")
        try:
            model_name = "sentence-transformers/all-MiniLM-L6-v2"
            self.model = SentenceTransformer(model_name)
            logging.info(f"Loaded Sentence Transformer model: {model_name}")
        except Exception as e:
            logging.error(f"Error loading model: {str(e)}")
            self.model = None
try:
self.neighborhoods = self.get_all_neighborhoods()
self.cached_listings = {}
self.cached_embeddings = {}
        except Exception as e:
            logging.error(f"Initialization error: {str(e)}")
self.neighborhoods = []
self.cached_listings = {}
self.cached_embeddings = {}
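    # Usage sketch (hypothetical; assumes the imz409 database and the traffic
    # spot tables are reachable from this environment):
    #   vis = HKUSTBNBVisualiser()
    #   print(vis.neighborhoods)   # e.g. ['Sha Tin', ...]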
# Finds the nearest traffic spot to a given BNB listing location.
# Parameters:
# airbnb_lat: The latitude of the BNB listing
# airbnb_lng: The longitude of the BNB listing
# max_distance_km: Maximum distance in kilometers to consider a traffic spot (default: 0.7)
# Returns:
# Tuple containing (nearest_traffic_spot, distance_in_km) or (None, None) if no spot is found
def find_nearest_traffic_spot(self, airbnb_lat, airbnb_lng, max_distance_km=0.7):
nearest_spot = None
min_distance = float('inf')
for spot in self.traffic_manager.traffic_spots:
if not spot.is_valid():
continue
distance = geodesic(
(airbnb_lat, airbnb_lng),
(spot.latitude, spot.longitude)
).kilometers
if distance < min_distance and distance <= max_distance_km:
min_distance = distance
nearest_spot = spot
if nearest_spot:
return nearest_spot, min_distance
else:
return None, None
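    # Usage sketch (illustrative coordinates near HKUST; the result depends on
    # which traffic spots were actually loaded):
    #   spot, dist_km = vis.find_nearest_traffic_spot(22.3364, 114.2655)
    #   if spot is not None:
    #       print(f"Nearest traffic spot {spot.key} is {dist_km:.2f} km away")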
# Retrieves all available neighborhoods from the database.
# Returns:
# List of neighborhood names as strings
def get_all_neighborhoods(self):
connection = self.pool.acquire()
try:
cursor = connection.cursor()
cursor.prefetchrows = 50
cursor.arraysize = 50
cursor.execute(GET_ALL_NEIGHBORHOODS)
neighborhoods = [row[0] for row in cursor.fetchall()]
return neighborhoods
        except Exception as e:
            logging.error(f"Database error getting neighborhoods: {str(e)}")
return []
finally:
self.pool.release(connection)
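    # Usage sketch (hypothetical):
    #   for name in vis.get_all_neighborhoods():
    #       print(name)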
# Retrieves BNB listings for a specific neighborhood with caching.
# Parameters:
# neighborhood: The neighborhood name to retrieve listings for
# limit: Maximum number of listings to retrieve (default: 10)
# Returns:
# List of listing data rows from the database
def get_neighborhood_listings(self, neighborhood, limit=10):
if limit not in [10, 20, 30, 40, 50]:
limit = 10
if neighborhood in self.cached_listings and limit in self.cached_listings[neighborhood]:
return self.cached_listings[neighborhood][limit]
if neighborhood not in self.cached_listings:
self.cached_listings[neighborhood] = {}
connection = self.pool.acquire()
try:
cursor = connection.cursor()
cursor.prefetchrows = 50
cursor.arraysize = 50
cursor.execute(
GET_NEIGHBORHOOD_LISTINGS,
neighborhood=neighborhood,
limit=limit
)
listings = cursor.fetchall()
self.cached_listings[neighborhood][limit] = listings
return listings
        except Exception as e:
            logging.error(f"Database error fetching neighborhood listings: {str(e)}")
return []
finally:
self.pool.release(connection)
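    # Usage sketch (hypothetical; the second call is served from the in-memory
    # cache, so the same list object is returned):
    #   rows = vis.get_neighborhood_listings("Sha Tin", limit=20)
    #   assert rows is vis.get_neighborhood_listings("Sha Tin", limit=20)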
# Retrieves reviews for a specific listing ID.
# Parameters:
# listing_id: The ID of the listing to get reviews for
# Returns:
# List of tuples containing (review_date, reviewer_name, comments)
def get_listing_reviews(self, listing_id):
connection = self.pool.acquire()
try:
cursor = connection.cursor()
cursor.execute(
GET_LISTING_REVIEWS,
listing_id=int(listing_id)
)
reviews = cursor.fetchall()
formatted_reviews = []
for review in reviews:
review_date, reviewer_name, comments = review
formatted_review = (
str(review_date) if review_date else '',
str(reviewer_name) if reviewer_name else '',
str(comments) if comments else ''
)
formatted_reviews.append(formatted_review)
return formatted_reviews
        except Exception as e:
            logging.error(f"Error fetching reviews: {str(e)}")
return []
finally:
self.pool.release(connection)
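    # Usage sketch (hypothetical listing id):
    #   for review_date, reviewer, comments in vis.get_listing_reviews(12345):
    #       print(review_date, reviewer, comments[:60])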
# Retrieves review content for search functionality.
# Parameters:
# listing_id: The ID of the listing to get reviews for
# Returns:
# List of review comment strings for semantic search
def get_listing_reviews_for_search(self, listing_id):
connection = self.pool.acquire()
try:
cursor = connection.cursor()
cursor.execute(
GET_LISTING_REVIEWS_FOR_SEARCH,
listing_id=int(listing_id)
)
reviews = cursor.fetchall()
formatted_reviews = []
for review in reviews:
if review[0] is not None:
if hasattr(review[0], 'read'):
formatted_reviews.append(review[0].read())
else:
formatted_reviews.append(str(review[0]))
return formatted_reviews
        except Exception as e:
            logging.error(f"Error fetching reviews for search: {str(e)}")
return []
finally:
self.pool.release(connection)
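    # Usage sketch (hypothetical listing id; LOB columns are materialised to
    # plain strings via .read() above):
    #   review_texts = vis.get_listing_reviews_for_search(12345)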
# Computes cosine similarity between two embeddings.
# Parameters:
# query_embedding: Embedding tensor for the search query
# target_embedding: Embedding tensor for the target text
# Returns:
# Float similarity score (typically 0.0-1.0 for natural-language text, though cosine similarity can be negative in general)
def compute_similarity(self, query_embedding, target_embedding):
if query_embedding is None or target_embedding is None:
return 0.0
        try:
            # util.cos_sim is the current name for the cosine-similarity helper
            # (pytorch_cos_sim is its deprecated alias).
            similarity = util.cos_sim(query_embedding, target_embedding).item()
            return similarity
        except Exception as e:
            logging.error(f"Error computing similarity: {str(e)}")
            return 0.0
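    # Usage sketch (assumes the sentence-transformer model loaded; phrases are
    # illustrative):
    #   a = vis.model.encode("sea view apartment", convert_to_tensor=True)
    #   b = vis.model.encode("flat overlooking the harbour", convert_to_tensor=True)
    #   print(vis.compute_similarity(a, b))   # closer to 1.0 for related phrases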
# Computes relevance scores for listings based on search query.
# Parameters:
# df: DataFrame containing listing data
# search_query: User's search query string
# Returns:
# List of relevance scores for each listing in the DataFrame
def compute_search_scores(self, df, search_query):
if not search_query or self.model is None:
return [0.0] * len(df)
try:
query_key = f"query_{search_query}"
if query_key not in self.cached_embeddings:
self.cached_embeddings[query_key] = self.model.encode(search_query, convert_to_tensor=True)
query_embedding = self.cached_embeddings[query_key]
scores = []
for idx, row in df.iterrows():
title = str(row['name'])
reviews = self.get_listing_reviews_for_search(row['id'])
title_key = f"title_{row['id']}"
review_key = f"review_{row['id']}"
if title_key not in self.cached_embeddings:
title_embedding = self.model.encode(title, convert_to_tensor=True)
self.cached_embeddings[title_key] = title_embedding
else:
title_embedding = self.cached_embeddings[title_key]
review_embedding = None
                if reviews:
if review_key not in self.cached_embeddings:
review_text = " ".join(reviews[:5])
review_embedding = self.model.encode(review_text, convert_to_tensor=True)
self.cached_embeddings[review_key] = review_embedding
else:
review_embedding = self.cached_embeddings[review_key]
title_similarity = self.compute_similarity(query_embedding, title_embedding)
review_similarity = 0.0
if review_embedding is not None:
review_similarity = self.compute_similarity(query_embedding, review_embedding)
final_score = title_similarity * 0.7 + review_similarity * 0.3 if review_embedding is not None else title_similarity
scores.append(final_score)
return scores
        except Exception as e:
            logging.error(f"Error in search scoring: {str(e)}")
return [0.0] * len(df)
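    # Worked example of the blended score: with title_similarity = 0.80 and
    # review_similarity = 0.40, the final score is 0.80 * 0.7 + 0.40 * 0.3 = 0.68;
    # a listing with no reviews falls back to the title similarity alone.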
# Sorts a DataFrame of listings by their relevance to a search query.
# Parameters:
# df: DataFrame containing listing data
# search_query: User's search query string
# Returns:
# DataFrame sorted by relevance to the search query
    def sort_by_relevance(self, df, search_query):
        if not search_query:
            return df
        df = df.copy()  # avoid mutating the caller's DataFrame
        scores = self.compute_search_scores(df, search_query)
        df['relevance_score'] = scores
        df['relevance_percentage'] = df['relevance_score'] * 100
        return df.sort_values('relevance_score', ascending=False)
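    # Usage sketch (hypothetical DataFrame with the columns built in
    # create_map_and_data):
    #   ranked = vis.sort_by_relevance(df, "quiet room near campus")
    #   print(ranked[['name', 'relevance_percentage']].head())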
# Creates an interactive map and DataFrame for display in the UI.
# Parameters:
# neighborhood: The neighborhood to display listings for (default: "Sha Tin")
# show_traffic: Whether to show traffic spots on the map (default: True)
# center_lat: Latitude to center the map on (default: None, will use mean of listings)
# center_lng: Longitude to center the map on (default: None, will use mean of listings)
# selected_id: ID of the currently selected listing (default: None)
# search_query: User's search query string (default: None)
# current_page: Current page number for pagination (default: 1)
# items_per_page: Number of items to show per page (default: 3)
# listings_limit: Maximum number of listings to retrieve (default: 10)
# Returns:
# Tuple containing (folium_map, listings_dataframe)
def create_map_and_data(self, neighborhood="Sha Tin", show_traffic=True, center_lat=None, center_lng=None,
selected_id=None, search_query=None, current_page=1, items_per_page=3, listings_limit=10):
if listings_limit not in [10, 20, 30, 40, 50]:
listings_limit = 10
listings = self.get_neighborhood_listings(neighborhood, listings_limit)
if not listings:
return None, None
df = pd.DataFrame(listings, columns=[
'id', 'name', 'host_name', 'neighbourhood',
'latitude', 'longitude', 'room_type', 'price',
'number_of_reviews', 'reviews_per_month',
'minimum_nights', 'availability_365'
])
numeric_cols = ['latitude', 'longitude', 'price', 'number_of_reviews',
'minimum_nights', 'availability_365', 'reviews_per_month']
for col in numeric_cols:
df[col] = pd.to_numeric(df[col], errors='coerce')
if search_query:
df = self.sort_by_relevance(df, search_query)
if df.empty:
return None, None
        has_custom_center = center_lat is not None and center_lng is not None
        if not has_custom_center:
            center_lat = df['latitude'].mean()
            center_lng = df['longitude'].mean()
        m = folium.Map(
            location=[center_lat, center_lng],
            # Zoom in further only when an explicit centre was requested; after the
            # defaulting above, checking the coordinates directly would always be true.
            zoom_start=16 if has_custom_center else 14,
            tiles='OpenStreetMap'
        )
all_traffic_spots_to_display = set()
all_nearest_traffic_spots = {}
for idx, row in df.iterrows():
nearest_spot, distance = self.find_nearest_traffic_spot(row['latitude'], row['longitude'])
if nearest_spot:
all_nearest_traffic_spots[row['id']] = (nearest_spot, distance)
all_traffic_spots_to_display.add(nearest_spot.key)
lines_group = folium.FeatureGroup(name="Connection Lines")
m.add_child(lines_group)
if show_traffic and all_traffic_spots_to_display:
self.traffic_manager.add_spots_to_map(m, all_traffic_spots_to_display)
for idx, row in df.iterrows():
marker_id = f"marker_{row['id']}"
traffic_spot_info = ""
discount_info = ""
discounted_price = row['price']
if row['id'] in all_nearest_traffic_spots:
nearest_spot, distance = all_nearest_traffic_spots[row['id']]
discount_rate = nearest_spot.get_discount_rate()
if discount_rate > 0:
discounted_price = row['price'] * (1 - discount_rate)
discount_percentage = int(discount_rate * 100)
discount_info = DISCOUNT_INFO_TEMPLATE.format(
discount_percentage=discount_percentage,
original_price=row['price'],
discounted_price=discounted_price,
avg_vehicle_count=nearest_spot.avg_vehicle_count,
observation_count=len(nearest_spot.dataset_rows)
)
distance_str = f"{distance:.2f} km" if distance >= 0.1 else f"{distance * 1000:.0f} meters"
traffic_spot_info = TRAFFIC_SPOT_INFO_TEMPLATE.format(
spot_key=escape(str(nearest_spot.key)),
distance_str=distance_str
)
folium.PolyLine(
locations=[
[row['latitude'], row['longitude']],
[nearest_spot.latitude, nearest_spot.longitude]
],
color='blue',
weight=2,
opacity=0.7,
dash_array='5',
tooltip=f"Distance: {distance_str}"
).add_to(lines_group)
relevance_info = ""
if search_query and 'relevance_percentage' in row and 'relevance_features' in row:
relevance_info = RELEVANCE_INFO_TEMPLATE.format(
relevance_percentage=row['relevance_percentage'],
relevance_features=row['relevance_features'],
matching_features=row['matching_features']
)
price_display = f"Price: ${row['price']:.0f}"
if discount_info:
price_display = (f"Price: "
f"${row['price']:.0f} "
f"${discounted_price:.0f}")
popup_content = POPUP_CONTENT_TEMPLATE.format(
listing_name=escape(str(row['name'])),
host_name=escape(str(row['host_name'])),
room_type=escape(str(row['room_type'])),
price_display=price_display,
review_count=row['number_of_reviews'],
discount_info=discount_info,
traffic_spot_info=traffic_spot_info,
relevance_info=relevance_info
)
marker_color = 'green' if selected_id == row['id'] else 'red'
marker = folium.Marker(
location=[row['latitude'], row['longitude']],
popup=popup_content,
icon=folium.Icon(color=marker_color, icon='home'),
)
marker.add_to(m)
            if selected_id is not None and row['id'] == selected_id:
                # folium exposes no public API for naming a marker; setting the
                # private _name attribute lets MAP_SCRIPT find the selected marker.
                marker._name = marker_id
folium.Element(MAP_SCRIPT).add_to(m)
folium.LayerControl().add_to(m)
return m, df
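# Minimal smoke-test sketch (assumptions: the imz409 database is reachable, the
# visualiser/constant packages are importable, and the output filename is
# illustrative). Running the module directly renders a sample neighborhood map.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    visualiser = HKUSTBNBVisualiser()
    bnb_map, listings_df = visualiser.create_map_and_data(
        neighborhood="Sha Tin",
        search_query="quiet room near campus",
        listings_limit=10
    )
    if bnb_map is not None:
        bnb_map.save("hkust_bnb_map.html")
        print(f"Rendered {len(listings_df)} listings to hkust_bnb_map.html")
    else:
        print("No listings found for the selected neighborhood")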