from typing import Tuple, TypedDict, Optional
from datetime import datetime, timedelta

import os

import pandas as pd
from dotenv import load_dotenv

from next_place_ai.classes import DataPreparation, DatasetManager, AzureScore

load_dotenv()


class ProcessedSynapse(TypedDict):
    id: Optional[str]
    nextplace_id: Optional[str]
    property_id: Optional[str]
    listing_id: Optional[str]
    address: Optional[str]
    city: Optional[str]
    state: Optional[str]
    zip_code: Optional[str]
    price: Optional[float]
    beds: Optional[int]
    baths: Optional[float]
    sqft: Optional[int]
    lot_size: Optional[int]
    year_built: Optional[int]
    days_on_market: Optional[int]
    latitude: Optional[float]
    longitude: Optional[float]
    property_type: Optional[str]
    last_sale_date: Optional[str]
    hoa_dues: Optional[float]
    query_date: Optional[str]


class CustomNextPlaceModel:
    def __init__(self):
        self.repo_id = os.getenv('REPO_ID')
        self.hf_token = os.getenv('HF_TOKEN')
        self._load_model()

    def _load_model(self):
        """Load all required models for the prediction pipeline."""
        try:
            # Model A scoring
            self.score_a = AzureScore(
                repo_id=self.repo_id,
                token=self.hf_token,
                model_filename='A',
                scored_labels='A'
            )

            # Model B scorings
            self.score_b_1 = AzureScore(
                repo_id=self.repo_id,
                token=self.hf_token,
                model_filename='B_1',
                scored_labels='B'
            )
            self.score_b_2 = AzureScore(
                repo_id=self.repo_id,
                token=self.hf_token,
                model_filename='B_2',
                scored_labels='B'
            )
            self.score_b_3 = AzureScore(
                repo_id=self.repo_id,
                token=self.hf_token,
                model_filename='B_3',
                scored_labels='B'
            )

            # Model C scorings (price models, one per group of B categories)
            self.score_c_models = {
                '1': AzureScore(repo_id=self.repo_id, token=self.hf_token,
                                model_filename='model_[1]', scored_labels='price'),
                '2': AzureScore(repo_id=self.repo_id, token=self.hf_token,
                                model_filename='model_[2]', scored_labels='price'),
                '3_4': AzureScore(repo_id=self.repo_id, token=self.hf_token,
                                  model_filename='model_[3, 4]', scored_labels='price'),
                '5_6': AzureScore(repo_id=self.repo_id, token=self.hf_token,
                                  model_filename='model_[5, 6]', scored_labels='price'),
                '7': AzureScore(repo_id=self.repo_id, token=self.hf_token,
                                model_filename='model_[7]', scored_labels='price'),
                '8_9': AzureScore(repo_id=self.repo_id, token=self.hf_token,
                                  model_filename='model_C_8_9', scored_labels='price'),
            }

            # Time model (days on market)
            self.score_t_1 = AzureScore(
                repo_id=self.repo_id,
                token=self.hf_token,
                model_filename='model_T_1',
                scored_labels='days'
            )

            # Data preparation module
            self.data_manager = DatasetManager(repo_id=self.repo_id, token=self.hf_token)
        except Exception as e:
            raise ValueError(f"Error loading models: {str(e)}")

    def predict(self, validators_data: pd.DataFrame) -> pd.DataFrame:
        """
        Main prediction pipeline for processing input data.

        Args:
            validators_data (pd.DataFrame): Input validation dataset

        Returns:
            pd.DataFrame: Processed prediction results
        """
        # Ensure input is a DataFrame and has at least one row
        if not isinstance(validators_data, pd.DataFrame) or validators_data.empty:
            raise ValueError("Input must be a non-empty pandas DataFrame")

        # Prepare data preparation instance
        dp = DataPreparation(validators_data)

        # Prepare initial dataset
        dp.prepare_data()

        # Predict A scores
        score_A = self.score_a.predict_proba_dataset(dp.X)

        # Combine datasets and build convolution features
        combined_dataset = dp.combine_datasets(score_A, dp.X)
        combined_dataset = combined_dataset.drop(columns=['0'])
        combined_dataset, _ = dp.create_convolution_features(
            combined_dataset, combined_dataset.columns.to_list(), 3
        )

        # Predict B scores per A category; empty categories fall back to zero-probability rows
        b_scores = {
            '1': self.score_b_1.predict_proba_dataset(combined_dataset[combined_dataset['A'] == 1])
            if not combined_dataset[combined_dataset['A'] == 1].empty
            else pd.DataFrame({'B_Probability_Class_0': [0], 'B_Probability_Class_1': [0], 'B_Probability_Class_2': [0]}),
            '2': self.score_b_2.predict_proba_dataset(combined_dataset[combined_dataset['A'] == 2])
            if not combined_dataset[combined_dataset['A'] == 2].empty
            else pd.DataFrame({'B_Probability_Class_0': [0], 'B_Probability_Class_1': [0], 'B_Probability_Class_2': [0]}),
            '3': self.score_b_3.predict_proba_dataset(combined_dataset[combined_dataset['A'] == 3])
            if not combined_dataset[combined_dataset['A'] == 3].empty
            else pd.DataFrame({'B_Probability_Class_0': [0], 'B_Probability_Class_1': [0], 'B_Probability_Class_2': [0]}),
        }

        # Concatenate B scores and drop rows with missing values
        df_B = pd.concat([b_scores['1'], b_scores['2'], b_scores['3']], ignore_index=True)
        df_B_ = df_B.dropna()

        # Combine B scores with the prepared features and rebuild convolution features
        combined_dataset = dp.combine_datasets(df_B_, dp.X)
        combined_dataset = combined_dataset.drop(columns=['0'])
        combined_dataset, _ = dp.create_convolution_features(
            combined_dataset, combined_dataset.columns.to_list(), 3
        )

        # Predict C (price) scores per B category group; empty groups fall back to a zero price
        c_scores = {
            '1': self.score_c_models['1'].predict_dataset(combined_dataset[combined_dataset['B'].isin([1])])
            if not combined_dataset[combined_dataset['B'].isin([1])].empty
            else pd.DataFrame({'price': [0]}),
            '2': self.score_c_models['2'].predict_dataset(combined_dataset[combined_dataset['B'].isin([2])])
            if not combined_dataset[combined_dataset['B'].isin([2])].empty
            else pd.DataFrame({'price': [0]}),
            '3_4': self.score_c_models['3_4'].predict_dataset(combined_dataset[combined_dataset['B'].isin([3, 4])])
            if not combined_dataset[combined_dataset['B'].isin([3, 4])].empty
            else pd.DataFrame({'price': [0]}),
            '5_6': self.score_c_models['5_6'].predict_dataset(combined_dataset[combined_dataset['B'].isin([5, 6])])
            if not combined_dataset[combined_dataset['B'].isin([5, 6])].empty
            else pd.DataFrame({'price': [0]}),
            '7': self.score_c_models['7'].predict_dataset(combined_dataset[combined_dataset['B'].isin([7])])
            if not combined_dataset[combined_dataset['B'].isin([7])].empty
            else pd.DataFrame({'price': [0]}),
            '8_9': self.score_c_models['8_9'].predict_dataset(combined_dataset[combined_dataset['B'].isin([8, 9])])
            if not combined_dataset[combined_dataset['B'].isin([8, 9])].empty
            else pd.DataFrame({'price': [0]}),
        }

        # Keep only non-empty price frames and drop the zero-price placeholders
        df_C = pd.concat(
            [c_scores[key][['price']] for key in c_scores
             if isinstance(c_scores[key], pd.DataFrame)
             and 'price' in c_scores[key].columns
             and not c_scores[key].empty],
            ignore_index=True
        )
        df_C_ = df_C[df_C['price'] != 0].copy()

        # Combine features with predicted prices as input for the time model
        t_df_ = pd.concat([combined_dataset.reset_index(drop=True), df_C_.reset_index(drop=True)], axis=1)

        # Predict time on market (days)
        score_t_1 = self.score_t_1.predict_dataset(t_df_).astype(int)

        # Final result: predicted price and predicted days on market
        result = pd.concat([df_C_.reset_index(drop=True), score_t_1.reset_index(drop=True)], axis=1)

        return result

    def run_inference(self, input_data: ProcessedSynapse) -> Tuple[float, str]:
        """Run the pipeline for a single listing and return (price, sale-date string)."""
        input_data = pd.DataFrame([input_data])
        result = self.predict(input_data)
        predicted_sale_price, predicted_days = result['price'].iloc[0], result['days'].iloc[0]

        # Convert the predicted number of days on market into a sale-date string
        current_days_on_market = input_data['days_on_market'].iloc[0] if 'days_on_market' in input_data.columns else 0

        # Compute the date the property was listed on the market
        date_listed = datetime.now() - timedelta(days=int(current_days_on_market))

        # Compute the predicted sale date
        predicted_sale_date = (date_listed + timedelta(days=int(predicted_days))).strftime('%Y-%m-%d')

        return float(predicted_sale_price), predicted_sale_date
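

# The usage sketch below is an illustrative assumption, not part of the original module:
# it assumes REPO_ID and HF_TOKEN are available in the environment (or a .env file) and
# that the next_place_ai classes can download their models. All field values are
# hypothetical placeholders for a single listing.
if __name__ == "__main__":
    sample_input: ProcessedSynapse = {
        'id': None,
        'nextplace_id': None,
        'property_id': None,
        'listing_id': None,
        'address': '123 Example St',   # hypothetical listing data
        'city': 'Austin',
        'state': 'TX',
        'zip_code': '78701',
        'price': 450000.0,
        'beds': 3,
        'baths': 2.0,
        'sqft': 1800,
        'lot_size': 6000,
        'year_built': 2005,
        'days_on_market': 12,
        'latitude': 30.2672,
        'longitude': -97.7431,
        'property_type': 'Single Family',
        'last_sale_date': None,
        'hoa_dues': None,
        'query_date': None,
    }

    model = CustomNextPlaceModel()
    sale_price, sale_date = model.run_inference(sample_input)
    print(f"Predicted sale price: {sale_price:.2f}, predicted sale date: {sale_date}")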