|
from typing import Tuple, TypedDict, Optional |
|
import datetime |
|
from datetime import datetime, timedelta |
|
import pandas as pd |
|
from next_place_ai.classes import DataPreparation, DatasetManager, AzureScore |
|
from dotenv import load_dotenv |
|
import os |
|
|
|
# Load environment variables (REPO_ID, HF_TOKEN) from a local .env file, if present.
load_dotenv()
|
|
|
|
|
class ProcessedSynapse(TypedDict):
    """One property-listing record as received from a validator.

    This is the single-record input shape consumed by
    CustomNextPlaceModel.run_inference. Every field is Optional: any key
    may carry None in the raw payload.
    """

    # Identifiers
    id: Optional[str]
    nextplace_id: Optional[str]
    property_id: Optional[str]
    listing_id: Optional[str]
    # Location
    address: Optional[str]
    city: Optional[str]
    state: Optional[str]
    zip_code: Optional[str]
    latitude: Optional[float]
    longitude: Optional[float]
    # Listing attributes
    price: Optional[float]
    beds: Optional[int]
    baths: Optional[float]
    sqft: Optional[int]
    lot_size: Optional[int]
    year_built: Optional[int]
    days_on_market: Optional[int]
    property_type: Optional[str]
    last_sale_date: Optional[str]  # presumably an ISO-formatted date string — TODO confirm
    hoa_dues: Optional[float]
    query_date: Optional[str]  # presumably an ISO-formatted date string — TODO confirm
|
|
|
|
|
class CustomNextPlaceModel:
    """Hierarchical prediction pipeline for property sale price and days to sale.

    Stages (all model artifacts are fetched via AzureScore from a Hugging
    Face repo identified by the REPO_ID / HF_TOKEN environment variables):
      A - top-level segment classifier over the prepared features.
      B - three sub-classifiers, one per A class, refining the segment.
      C - per-B-segment price regressors producing a 'price' column.
      T - regressor predicting 'days' (days to sale) from features + price.
    """

    # Placeholder probabilities emitted when a B sub-model's input slice is
    # empty, so the later concat/dropna still sees the expected columns.
    _EMPTY_B_SCORES = {
        'B_Probability_Class_0': [0],
        'B_Probability_Class_1': [0],
        'B_Probability_Class_2': [0],
    }

    def __init__(self):
        # Repo location and credentials come from the environment;
        # load_dotenv() at module import makes a local .env visible here.
        self.repo_id = os.getenv('REPO_ID')
        self.hf_token = os.getenv('HF_TOKEN')
        self._load_model()

    def _load_model(self):
        """Load every model artifact used by the prediction pipeline.

        Raises:
            ValueError: if any artifact fails to load; the original exception
                is chained as the cause so its traceback is preserved.
        """
        try:
            # Stage A classifier.
            self.score_a = AzureScore(
                repo_id=self.repo_id,
                token=self.hf_token,
                model_filename='A',
                scored_labels='A'
            )

            # Stage B sub-classifiers, one per A class.
            self.score_b_1 = AzureScore(
                repo_id=self.repo_id,
                token=self.hf_token,
                model_filename='B_1',
                scored_labels='B'
            )
            self.score_b_2 = AzureScore(
                repo_id=self.repo_id,
                token=self.hf_token,
                model_filename='B_2',
                scored_labels='B'
            )
            self.score_b_3 = AzureScore(
                repo_id=self.repo_id,
                token=self.hf_token,
                model_filename='B_3',
                scored_labels='B'
            )

            # Stage C price regressors, keyed by the B classes they cover.
            self.score_c_models = {
                '1': AzureScore(repo_id=self.repo_id, token=self.hf_token, model_filename='model_[1]', scored_labels='price'),
                '2': AzureScore(repo_id=self.repo_id, token=self.hf_token, model_filename='model_[2]', scored_labels='price'),
                '3_4': AzureScore(repo_id=self.repo_id, token=self.hf_token, model_filename='model_[3, 4]', scored_labels='price'),
                '5_6': AzureScore(repo_id=self.repo_id, token=self.hf_token, model_filename='model_[5, 6]', scored_labels='price'),
                '7': AzureScore(repo_id=self.repo_id, token=self.hf_token, model_filename='model_[7]', scored_labels='price'),
                '8_9': AzureScore(repo_id=self.repo_id, token=self.hf_token, model_filename='model_C_8_9', scored_labels='price')
            }

            # Stage T days-on-market regressor.
            self.score_t_1 = AzureScore(
                repo_id=self.repo_id,
                token=self.hf_token,
                model_filename='model_T_1',
                scored_labels='days'
            )

            self.data_manager = DatasetManager(repo_id=self.repo_id, token=self.hf_token)

        except Exception as e:
            # 'from e' keeps the causing traceback attached to the ValueError.
            raise ValueError(f"Error loading models: {e}") from e

    def predict(self, validators_data: pd.DataFrame) -> pd.DataFrame:
        """Run the full A -> B -> C -> T pipeline on a batch of listings.

        Args:
            validators_data (pd.DataFrame): Non-empty input validation dataset.

        Returns:
            pd.DataFrame: Frame with the stage-C 'price' column and the
            stage-T 'days' column, concatenated positionally.

        Raises:
            ValueError: if the input is not a non-empty pandas DataFrame.
        """
        if not isinstance(validators_data, pd.DataFrame) or validators_data.empty:
            raise ValueError("Input must be a non-empty pandas DataFrame")

        dp = DataPreparation(validators_data)
        dp.prepare_data()

        # Stage A: class probabilities joined back onto the features, then
        # convolution features recomputed over the widened frame.
        score_A = self.score_a.predict_proba_dataset(dp.X)
        combined_dataset = dp.combine_datasets(score_A, dp.X)
        combined_dataset = combined_dataset.drop(columns=['0'])
        combined_dataset, _ = dp.create_convolution_features(
            combined_dataset, combined_dataset.columns.to_list(), 3)

        # Stage B: score each A-class slice with its own sub-model. Each
        # slice is computed once (the original evaluated every mask twice).
        b_models = {'1': self.score_b_1, '2': self.score_b_2, '3': self.score_b_3}
        b_scores = {}
        for a_class, model in b_models.items():
            subset = combined_dataset[combined_dataset['A'] == int(a_class)]
            b_scores[a_class] = (
                model.predict_proba_dataset(subset)
                if not subset.empty
                else pd.DataFrame(self._EMPTY_B_SCORES)
            )

        df_B = pd.concat([b_scores['1'], b_scores['2'], b_scores['3']], ignore_index=True)
        df_B_ = df_B.dropna()

        # Rebuild the feature frame around the B probabilities.
        combined_dataset = dp.combine_datasets(df_B_, dp.X)
        combined_dataset = combined_dataset.drop(columns=['0'])
        combined_dataset, _ = dp.create_convolution_features(
            combined_dataset, combined_dataset.columns.to_list(), 3)

        # Stage C: one price regressor per bucket of B classes. Empty buckets
        # yield a single zero-price sentinel row, filtered out below.
        c_buckets = {'1': [1], '2': [2], '3_4': [3, 4], '5_6': [5, 6], '7': [7], '8_9': [8, 9]}
        c_scores = {}
        for key, b_classes in c_buckets.items():
            subset = combined_dataset[combined_dataset['B'].isin(b_classes)]
            c_scores[key] = (
                self.score_c_models[key].predict_dataset(subset)
                if not subset.empty
                else pd.DataFrame({'price': [0]})
            )

        df_C = pd.concat(
            [frame[['price']] for frame in c_scores.values()
             if isinstance(frame, pd.DataFrame) and 'price' in frame.columns and not frame.empty],
            ignore_index=True
        )

        # Drop the zero sentinels so only real predictions remain.
        df_C_ = df_C[df_C['price'] != 0].copy()

        # Stage T: days-on-market conditioned on features plus predicted price.
        # NOTE(review): alignment here is purely positional after reset_index;
        # assumes combined_dataset and df_C_ rows correspond 1:1 — confirm.
        t_df_ = pd.concat([combined_dataset.reset_index(drop=True),
                           df_C_.reset_index(drop=True)], axis=1)
        score_t_1 = self.score_t_1.predict_dataset(t_df_).astype(int)

        result = pd.concat([df_C_.reset_index(drop=True),
                            score_t_1.reset_index(drop=True)], axis=1)

        return result

    def run_inference(self, input_data: ProcessedSynapse) -> Tuple[float, str]:
        """Predict (sale_price, sale_date) for a single listing.

        Args:
            input_data (ProcessedSynapse): One listing record.

        Returns:
            Tuple[float, str]: Predicted sale price and predicted sale date
            formatted as 'YYYY-MM-DD'.
        """
        # Keep a dedicated local instead of rebinding the typed parameter.
        frame = pd.DataFrame([input_data])
        result = self.predict(frame)
        predicted_sale_price = result['price'].iloc[0]
        predicted_days = result['days'].iloc[0]

        # Reconstruct the listing date as today minus days already on market,
        # then project forward by the predicted days-to-sale.
        days_on_market = frame['days_on_market'].iloc[0] if 'days_on_market' in frame else 0
        date_listed = datetime.now() - timedelta(days=int(days_on_market))

        predicted_sale_date = (date_listed + timedelta(days=int(predicted_days))).strftime('%Y-%m-%d')

        return float(predicted_sale_price), predicted_sale_date
|
|