Spaces:
Running
Running
import pickle
import random
from datetime import datetime, timedelta
from typing import List, Optional

import pandas as pd
from sqlalchemy import func
from sqlalchemy.orm import Session
from surprise import Dataset, Reader, SVD
from surprise.model_selection import train_test_split

from orders.models import Order, Meal, RecommendationModel
from users.models import User
class MealRecommender:
    """Recommend meals to a user from order history with a Surprise SVD model.

    On construction the newest ``RecommendationModel`` row is unpickled if it
    is younger than ``retrain_interval``; otherwise a new model is trained on
    all orders (ordered quantity is used as the rating) and persisted. The
    model is only refreshed when a new ``MealRecommender`` is constructed.
    """

    # Rows fetched per query while paging through the orders table.
    BATCH_SIZE = 1000
    # Default rating scale; widened at training time if quantities exceed it.
    RATING_SCALE = (1, 5)
    # Number of best-predicted meals that are re-ranked by user preferences.
    CANDIDATE_POOL_SIZE = 20
    # Number of meals returned to the caller.
    TOP_N = 5

    def __init__(self, db: Session):
        """Store the session and load (or train) the model.

        Args:
            db: SQLAlchemy session used for every query and for persisting
                newly trained models.
        """
        self.db = db
        self.retrain_interval = timedelta(days=1)
        self.algo = self.load_or_train_model()

    def fetch_data(self) -> pd.DataFrame:
        """Return all orders as a DataFrame of user_id, meal_id and quantity.

        Rows are read in pages of ``BATCH_SIZE`` to bound the size of each
        individual query result.
        """
        columns = (Order.user_id, Order.meal_id, Order.quantity)
        # OFFSET/LIMIT paging is only well defined with an ORDER BY. Ordering
        # by every selected column means rows that tie are identical, so the
        # paged result has the same content whichever order the ties come in.
        query = self.db.query(*columns).order_by(*columns)

        rows = []
        offset = 0
        while True:
            batch = query.offset(offset).limit(self.BATCH_SIZE).all()
            rows.extend(tuple(row) for row in batch)
            if len(batch) < self.BATCH_SIZE:
                # A short (or empty) page is the last one.
                break
            offset += self.BATCH_SIZE

        return pd.DataFrame(rows, columns=['user_id', 'meal_id', 'quantity'])

    def train_model(self) -> Optional[SVD]:
        """Train an SVD model on all orders and persist it.

        Returns:
            The fitted model, or ``None`` when there are no usable orders.
        """
        data = self.fetch_data().dropna(subset=['user_id', 'meal_id', 'quantity'])
        if data.empty:
            return None

        # Surprise clips predictions to the rating scale, so the scale must
        # cover the observed quantities; otherwise large orders all tie at
        # the upper bound. For data already within 1..5 this is still (1, 5).
        low, high = self.RATING_SCALE
        scale = (
            min(low, float(data['quantity'].min())),
            max(high, float(data['quantity'].max())),
        )
        reader = Reader(rating_scale=scale)
        dataset = Dataset.load_from_df(data[['user_id', 'meal_id', 'quantity']], reader)
        algo = SVD()
        algo.fit(dataset.build_full_trainset())

        # Save model to database
        record = RecommendationModel(model=pickle.dumps(algo), created_at=datetime.now())
        self.db.add(record)
        self.db.commit()
        return algo

    def load_or_train_model(self) -> Optional[SVD]:
        """Return the stored model if it is fresh enough, else train a new one.

        Returns:
            A fitted model, or ``None`` when training found no order data.
        """
        latest = (
            self.db.query(RecommendationModel)
            .order_by(RecommendationModel.created_at.desc())
            .first()
        )
        # NOTE(review): assumes created_at is a naive local datetime, matching
        # the datetime.now() written by train_model -- confirm the column type.
        if latest is not None and datetime.now() - latest.created_at <= self.retrain_interval:
            # SECURITY: pickle.loads can execute arbitrary code. This is only
            # safe while the model table is writable by trusted code alone.
            return pickle.loads(latest.model)
        return self.train_model()

    def get_recommendations(self, user: User) -> List[Meal]:
        """Return up to ``TOP_N`` meals recommended for ``user``.

        The ``CANDIDATE_POOL_SIZE`` meals with the highest predicted score are
        re-ranked by the user's stated preferences; ties keep the predicted
        order. Without a trained model, random meals are returned instead.
        """
        if self.algo is None:
            return self.get_random_recommendations()

        meals = self.db.query(Meal).all()
        candidates = self._rank_by_prediction(user.id, meals)[: self.CANDIDATE_POOL_SIZE]
        return self.adjust_for_preferences(user, candidates)[: self.TOP_N]

    def _rank_by_prediction(self, user_id, meals: List[Meal]) -> List[Meal]:
        """Return ``meals`` sorted by predicted score for ``user_id``, best first."""
        # Ids are passed exactly as stored: the model was trained on raw ids
        # from the DataFrame, and Surprise treats an id of a different type
        # (e.g. "1" instead of 1) as unknown.
        scored = [(self.algo.predict(user_id, meal.id).est, meal) for meal in meals]
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [meal for _, meal in scored]

    def adjust_for_preferences(self, user: User, recommendations: List[Meal]) -> List[Meal]:
        """Re-rank ``recommendations`` by how many user preferences they match.

        A preference matches a meal when it occurs, case-insensitively, in the
        meal's name or description. The sort is stable, so meals with equal
        match counts keep their incoming order.

        Args:
            user: Object whose ``preferences`` attribute is read.
                NOTE(review): assumed to be an iterable of strings (or
                None/empty) -- confirm against the User model.
            recommendations: Meals to re-rank; not modified.

        Returns:
            A new list with the same meals, most matches first.
        """
        wanted = [pref.lower() for pref in (user.preferences or [])]

        def match_count(meal) -> int:
            # name/description may be NULL in the database; treat as empty.
            name = (meal.name or "").lower()
            description = (meal.description or "").lower()
            return sum(1 for pref in wanted if pref in name or pref in description)

        return sorted(recommendations, key=match_count, reverse=True)

    def get_random_recommendations(self) -> List[Meal]:
        """Return up to ``TOP_N`` distinct meals chosen uniformly at random."""
        meals = self.db.query(Meal).all()
        return random.sample(meals, min(self.TOP_N, len(meals)))