from typing import Tuple, TypedDict, Optional
from datetime import datetime, timedelta
import pandas as pd
from next_place_ai.classes import DataPreparation, DatasetManager, AzureScore
from dotenv import load_dotenv
import os
load_dotenv()
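
# Expected environment variables (loaded from a local .env by load_dotenv and
# read in CustomNextPlaceModel.__init__); the values below are placeholders,
# not real credentials:
#
#   REPO_ID=<owner>/<model-repo>   # repo hosting the model files (assumed to be a Hugging Face repo)
#   HF_TOKEN=<hugging-face-access-token>
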
class ProcessedSynapse(TypedDict):
    id: Optional[str]
    nextplace_id: Optional[str]
    property_id: Optional[str]
    listing_id: Optional[str]
    address: Optional[str]
    city: Optional[str]
    state: Optional[str]
    zip_code: Optional[str]
    price: Optional[float]
    beds: Optional[int]
    baths: Optional[float]
    sqft: Optional[int]
    lot_size: Optional[int]
    year_built: Optional[int]
    days_on_market: Optional[int]
    latitude: Optional[float]
    longitude: Optional[float]
    property_type: Optional[str]
    last_sale_date: Optional[str]
    hoa_dues: Optional[float]
    query_date: Optional[str]

class CustomNextPlaceModel:

    def __init__(self):
        self.repo_id = os.getenv('REPO_ID')
        self.hf_token = os.getenv('HF_TOKEN')
        self._load_model()

    def _load_model(self):
        """
        Load all models required by the prediction pipeline.
        """
        try:
            # Model A scoring
            self.score_a = AzureScore(
                repo_id=self.repo_id,
                token=self.hf_token,
                model_filename='A',
                scored_labels='A'
            )
            # Model B scoring (one model per A class)
            self.score_b_1 = AzureScore(
                repo_id=self.repo_id,
                token=self.hf_token,
                model_filename='B_1',
                scored_labels='B'
            )
            self.score_b_2 = AzureScore(
                repo_id=self.repo_id,
                token=self.hf_token,
                model_filename='B_2',
                scored_labels='B'
            )
            self.score_b_3 = AzureScore(
                repo_id=self.repo_id,
                token=self.hf_token,
                model_filename='B_3',
                scored_labels='B'
            )
            # Model C scoring (price models, one per group of B classes)
            self.score_c_models = {
                '1': AzureScore(repo_id=self.repo_id, token=self.hf_token, model_filename='model_[1]', scored_labels='price'),
                '2': AzureScore(repo_id=self.repo_id, token=self.hf_token, model_filename='model_[2]', scored_labels='price'),
                '3_4': AzureScore(repo_id=self.repo_id, token=self.hf_token, model_filename='model_[3, 4]', scored_labels='price'),
                '5_6': AzureScore(repo_id=self.repo_id, token=self.hf_token, model_filename='model_[5, 6]', scored_labels='price'),
                '7': AzureScore(repo_id=self.repo_id, token=self.hf_token, model_filename='model_[7]', scored_labels='price'),
                '8_9': AzureScore(repo_id=self.repo_id, token=self.hf_token, model_filename='model_C_8_9', scored_labels='price')
            }
            # Time-on-market model
            self.score_t_1 = AzureScore(
                repo_id=self.repo_id,
                token=self.hf_token,
                model_filename='model_T_1',
                scored_labels='days'
            )
            # Dataset manager
            self.data_manager = DatasetManager(repo_id=self.repo_id, token=self.hf_token)
        except Exception as e:
            raise ValueError(f"Error loading models: {str(e)}")
    def predict(self, validators_data: pd.DataFrame) -> pd.DataFrame:
        """
        Main prediction pipeline: model A scores the prepared features, the
        A-specific B models refine the classification, the C models predict
        the sale price per group of B classes, and the time model predicts
        days on market.

        Args:
            validators_data (pd.DataFrame): Input data received from validators

        Returns:
            pd.DataFrame: Predicted prices and days on market
        """
        # Ensure input is a DataFrame and has at least one row
        if not isinstance(validators_data, pd.DataFrame) or validators_data.empty:
            raise ValueError("Input must be a non-empty pandas DataFrame")

        # Prepare the input features
        dp = DataPreparation(validators_data)
        dp.prepare_data()

        # Predict A scores
        score_A = self.score_a.predict_proba_dataset(dp.X)

        # Combine the A scores with the prepared features
        combined_dataset = dp.combine_datasets(score_A, dp.X)
        combined_dataset = combined_dataset.drop(columns=['0'])
        combined_dataset, _ = dp.create_convolution_features(combined_dataset, combined_dataset.columns.to_list(), 3)
        # Predict B scores, one model per A class; if a class is absent,
        # fall back to a zero-probability row
        b_scores = {
            '1': self.score_b_1.predict_proba_dataset(combined_dataset[combined_dataset['A'] == 1])
            if not combined_dataset[combined_dataset['A'] == 1].empty else pd.DataFrame(
                {'B_Probability_Class_0': [0], 'B_Probability_Class_1': [0], 'B_Probability_Class_2': [0]}),
            '2': self.score_b_2.predict_proba_dataset(combined_dataset[combined_dataset['A'] == 2])
            if not combined_dataset[combined_dataset['A'] == 2].empty else pd.DataFrame(
                {'B_Probability_Class_0': [0], 'B_Probability_Class_1': [0], 'B_Probability_Class_2': [0]}),
            '3': self.score_b_3.predict_proba_dataset(combined_dataset[combined_dataset['A'] == 3])
            if not combined_dataset[combined_dataset['A'] == 3].empty else pd.DataFrame(
                {'B_Probability_Class_0': [0], 'B_Probability_Class_1': [0], 'B_Probability_Class_2': [0]}),
        }

        # Concatenate the B scores and drop rows with missing values
        df_B = pd.concat([b_scores['1'], b_scores['2'], b_scores['3']], ignore_index=True)
        df_B_ = df_B.dropna()
        # Combine the B scores with the prepared features
        combined_dataset = dp.combine_datasets(df_B_, dp.X)
        combined_dataset = combined_dataset.drop(columns=['0'])
        combined_dataset, _ = dp.create_convolution_features(combined_dataset, combined_dataset.columns.to_list(), 3)

        # Predict C scores (prices), one model per group of B classes; absent
        # groups fall back to a zero-price row
        c_scores = {
            '1': self.score_c_models['1'].predict_dataset(combined_dataset[combined_dataset['B'].isin([1])])
            if not combined_dataset[combined_dataset['B'].isin([1])].empty else pd.DataFrame({'price': [0]}),
            '2': self.score_c_models['2'].predict_dataset(combined_dataset[combined_dataset['B'].isin([2])])
            if not combined_dataset[combined_dataset['B'].isin([2])].empty else pd.DataFrame({'price': [0]}),
            '3_4': self.score_c_models['3_4'].predict_dataset(combined_dataset[combined_dataset['B'].isin([3, 4])])
            if not combined_dataset[combined_dataset['B'].isin([3, 4])].empty else pd.DataFrame({'price': [0]}),
            '5_6': self.score_c_models['5_6'].predict_dataset(combined_dataset[combined_dataset['B'].isin([5, 6])])
            if not combined_dataset[combined_dataset['B'].isin([5, 6])].empty else pd.DataFrame({'price': [0]}),
            '7': self.score_c_models['7'].predict_dataset(combined_dataset[combined_dataset['B'].isin([7])])
            if not combined_dataset[combined_dataset['B'].isin([7])].empty else pd.DataFrame({'price': [0]}),
            '8_9': self.score_c_models['8_9'].predict_dataset(combined_dataset[combined_dataset['B'].isin([8, 9])])
            if not combined_dataset[combined_dataset['B'].isin([8, 9])].empty else pd.DataFrame({'price': [0]})
        }

        # Keep only the price columns of the non-empty C predictions and drop the zero fallbacks
        df_C = pd.concat(
            [c_scores[key][['price']] for key in c_scores
             if isinstance(c_scores[key], pd.DataFrame) and 'price' in c_scores[key].columns and not c_scores[key].empty],
            ignore_index=True
        )
        df_C_ = df_C[df_C['price'] != 0].copy()

        # Attach the predicted prices to the features for the time model
        t_df_ = pd.concat([combined_dataset.reset_index(drop=True), df_C_.reset_index(drop=True)], axis=1)

        # Predict days on market
        score_t_1 = self.score_t_1.predict_dataset(t_df_).astype(int)

        # Final result: predicted price and predicted days on market
        result = pd.concat([df_C_.reset_index(drop=True), score_t_1.reset_index(drop=True)], axis=1)
        return result
    def run_inference(self, input_data: ProcessedSynapse) -> Tuple[float, str]:
        input_data = pd.DataFrame([input_data])
        result = self.predict(input_data)
        # The predicted number of days has to be converted into a date string
        predicted_sale_price, predicted_days = result['price'].iloc[0], result['days'].iloc[0]
        current_days_on_market = input_data['days_on_market'].iloc[0] if 'days_on_market' in input_data else 0
        # Compute the date the property was listed
        date_listed = datetime.now() - timedelta(days=int(current_days_on_market))
        # Compute the predicted sale date
        predicted_sale_date = (date_listed + timedelta(days=int(predicted_days))).strftime('%Y-%m-%d')
        return float(predicted_sale_price), predicted_sale_date
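

# Usage sketch: a minimal, hypothetical example of running the model end to
# end. It assumes a valid .env (REPO_ID / HF_TOKEN), that the next_place_ai
# package and the referenced model files are available, and the field values
# below are made up for illustration only.
if __name__ == '__main__':
    example_synapse: ProcessedSynapse = {
        'id': None,
        'nextplace_id': 'np-123',            # hypothetical identifier
        'property_id': None,
        'listing_id': None,
        'address': '123 Main St',
        'city': 'Seattle',
        'state': 'WA',
        'zip_code': '98101',
        'price': 550000.0,
        'beds': 3,
        'baths': 2.0,
        'sqft': 1600,
        'lot_size': 4000,
        'year_built': 1995,
        'days_on_market': 12,
        'latitude': 47.61,
        'longitude': -122.33,
        'property_type': 'Single Family',
        'last_sale_date': None,
        'hoa_dues': None,
        'query_date': '2024-01-01',
    }

    model = CustomNextPlaceModel()
    sale_price, sale_date = model.run_inference(example_synapse)
    print(f"Predicted sale price: {sale_price}, predicted sale date: {sale_date}")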