"""Dataset utilities for cocktail recipes.

NOTE(review): the file as received was mangled — the whole module had been
collapsed onto four physical lines, and a span in the middle of
``MyDataset.__init__`` was lost entirely.  The code below is a faithful
re-formatting of everything that survived; every place where original code is
missing is explicitly marked with a ``LOST SPAN`` comment and MUST be restored
from version control before this module can run end to end.
"""
from torch.utils.data import Dataset
import pickle

import numpy as np

from src.cocktails.utilities.ingredients_utilities import (
    extract_ingredients,
    ingredient_list,
    ingredient_profiles,
    ingredients_per_type,
)
from src.cocktails.utilities.other_scrubbing_utilities import print_recipe  # kept: may be used by lost code


def get_representation_from_ingredient(ingredients, quantities, max_q_per_ing, index, params):
    """Build the feature vector for one ingredient of a recipe.

    Parameters
    ----------
    ingredients, quantities : parallel sequences describing the recipe.
    max_q_per_ing : mapping ingredient name -> max quantity observed in the data.
    index : position (in ``ingredients``) of the ingredient to encode.
    params : dict; only ``params['ing_keys']`` (profile keys) is read here.

    Returns
    -------
    (rep, indexes_to_normalize) : ``rep`` is a 1-D array made of the profile
    values, the quantity normalized by its per-ingredient max, the relative
    proportion within the recipe, and a one-hot ingredient identity;
    ``indexes_to_normalize`` lists the positions of the profile entries that
    downstream code should normalize.
    """
    assert len(ingredients) == len(quantities)
    ing, q = ingredients[index], quantities[index]
    proportion = q / np.sum(quantities)
    index_ing = ingredient_list.index(ing)
    # add keys of profile
    rep_ingredient = []
    rep_ingredient += [ingredient_profiles[k][index_ing] for k in params['ing_keys']]
    # add category encoding
    # rep_ingredient += list(params['category_encodings'][ingredient_profiles['type'][index_ing]])
    # add quantity and relative quantity
    rep_ingredient += [q / max_q_per_ing[ing], proportion]
    ing_one_hot = np.zeros(len(ingredient_list))
    ing_one_hot[index_ing] = 1
    rep_ingredient += list(ing_one_hot)
    # Only the profile entries (the first len(ing_keys) components) get normalized.
    indexes_to_normalize = list(range(len(params['ing_keys'])))
    # TODO: should we add ing one hot? Or make sure no 2 ing have same embedding
    return np.array(rep_ingredient), indexes_to_normalize


def get_max_n_ingredients(data):
    """Scan the dataset and collect global ingredient statistics.

    Parameters
    ----------
    data : dict with at least ``'names'`` and ``'ingredients_str'`` entries
        (parallel per-recipe lists).

    Returns
    -------
    (max_count, ingredient_set, alcohol_set, liqueur_set) : the maximum number
    of ingredients in any recipe, the set of all ingredients seen, and the
    subsets that are liquors / liqueurs according to ``ingredients_per_type``.
    """
    max_count = 0
    ingredient_set = set()
    alcohol_set = set()
    liqueur_set = set()
    ing_str = np.array(data['ingredients_str'])
    for i in range(len(data['names'])):
        ingredients, quantities = extract_ingredients(ing_str[i])
        max_count = max(max_count, len(ingredients))
        for ing in ingredients:
            ingredient_set.add(ing)
            if ing in ingredients_per_type['liquor']:
                alcohol_set.add(ing)
            if ing in ingredients_per_type['liqueur']:
                liqueur_set.add(ing)
    return max_count, ingredient_set, alcohol_set, liqueur_set


class MyDataset(Dataset):
    """Torch dataset of cocktail recipes with per-ingredient quantity targets
    and auxiliary prediction targets (category, glass, taste, ...).

    NOTE(review): part of ``__init__`` was lost when the source file was
    truncated — see the LOST SPAN markers below.  Until that code is restored,
    constructing this dataset will fail with a ``NameError``.
    """

    def __init__(self, split, params):
        """Build the split ('train' = first 90%, 'test' = last 10%, 'all').

        params must provide: 'raw_data', 'dim_rep_ingredient',
        'auxiliaries_dict', 'save_path'.
        """
        data = params['raw_data']
        self.dim_rep_ingredient = params['dim_rep_ingredient']
        n_data = len(data["names"])

        # NOTE(review): these four values are unused in the surviving code;
        # they were presumably consumed by the lost span further down.
        preparation_list = sorted(set(data['category']))
        categories_list = sorted(set(data['subcategory']))
        glasses_list = sorted(set(data['glass']))
        max_ingredients, ingredient_set, liquor_set, liqueur_set = get_max_n_ingredients(data)

        ingredient_set = sorted(ingredient_set)
        self.ingredient_set = ingredient_set
        # Precompute name -> column index once instead of list.index per item
        # (same result, avoids O(n) lookup inside the per-recipe loop).
        ing_index = {ing: i for i, ing in enumerate(ingredient_set)}

        ingredient_quantities = []  # output of our network
        ingr_strs = np.array(data['ingredients_str'])
        for i in range(n_data):
            ingredients, quantities = extract_ingredients(ingr_strs[i])
            # get ingredient presence and quantity
            ingredient_q_rep = np.zeros([len(ingredient_set)])
            for ing, q in zip(ingredients, quantities):
                ingredient_q_rep[ing_index[ing]] = q
            ingredient_quantities.append(ingredient_q_rep)

        # take care of ingredient quantities (OUTPUTS)
        ingredient_quantities = np.array(ingredient_quantities)
        # np.int was removed in NumPy 1.24; plain int is the documented replacement.
        ingredients_presence = (ingredient_quantities > 0).astype(int)
        min_ing_quantities = np.min(ingredient_quantities, axis=0)
        max_ing_quantities = np.max(ingredient_quantities, axis=0)

        # NOTE(review): divides by (max - min); an ingredient with constant
        # quantity across the dataset would divide by zero — confirm upstream
        # data guarantees this cannot happen.
        def normalize_ing_quantities(ing_quantities):
            return ((ing_quantities - min_ing_quantities)
                    / (max_ing_quantities - min_ing_quantities)).copy()

        def denormalize_ing_quantities(normalized_ing_quantities):
            return (normalized_ing_quantities
                    * (max_ing_quantities - min_ing_quantities)
                    + min_ing_quantities).copy()

        # Smallest non-zero quantity per ingredient (zeros masked as NaN).
        ing_q_when_present = ingredient_quantities.copy()
        for i in range(len(ing_q_when_present)):
            ing_q_when_present[i, np.where(ing_q_when_present[i, :] == 0)] = np.nan
        self.min_when_present_ing_quantities = np.nanmin(ing_q_when_present, axis=0)

        def filter_decoder_output(output):
            # LOST SPAN -----------------------------------------------------
            # The body of this helper was truncated in the received source.
            # Visible fragment before the cut:
            #     output_unnormalized = output * max_ing_quantities
            #     if output.ndim == 1:
            #         output_unnormalized[np.where(output_unnormalized ...
            # Restore the original implementation from version control.
            # ---------------------------------------------------------------
            raise NotImplementedError('filter_decoder_output was lost when the source was truncated')

        # LOST SPAN ---------------------------------------------------------
        # Between filter_decoder_output and the taste-representation loop the
        # original code defined (at least): input_data, nb_ingredients,
        # computed_cocktail_reps, min_ing_reps, max_ing_reps, volumes,
        # min_vol, max_vol, categories, glasses, prep_type.  They are
        # referenced below but never defined here — the constructor will raise
        # NameError until the lost code is restored from version control.
        # -------------------------------------------------------------------

        taste_rep_valid = []
        taste_rep_ground_truth = []
        # NOTE(review): only "... 2:" of the loop header survived the
        # truncation; the two lines below are a best-effort reconstruction
        # (each taste rep looks like the string "[x, y]") — confirm against
        # version control.
        for tr in data['taste_rep']:
            if len(tr) > 2:
                taste_rep_valid.append(True)
                taste_rep_ground_truth.append([float(tr.split('[')[1].split(',')[0]),
                                               float(tr.split(']')[0].split(',')[1][1:])])
            else:
                taste_rep_valid.append(False)
                taste_rep_ground_truth.append([np.nan, np.nan])
        taste_rep_ground_truth = np.array(taste_rep_ground_truth)
        taste_rep_valid = np.array(taste_rep_valid)
        taste_rep_ground_truth /= 10

        auxiliary_data = dict(categories=categories,
                              glasses=glasses,
                              prep_type=prep_type,
                              cocktail_reps=computed_cocktail_reps,
                              ingredients_presence=ingredients_presence,
                              taste_reps=taste_rep_ground_truth,
                              volume=volumes,
                              ingredients_quantities=ingredient_quantities)
        self.auxiliary_keys = sorted(params['auxiliaries_dict'].keys())
        assert self.auxiliary_keys == sorted(auxiliary_data.keys())

        data_preprocessing = dict(min_max_ing_quantities=(min_ing_quantities, max_ing_quantities),
                                  min_max_ing_reps=(min_ing_reps, max_ing_reps),
                                  min_max_vol=(min_vol, max_vol))
        if split == 'train':
            # Persist the train-split normalization stats so other splits /
            # later runs reuse the same scaling.
            with open(params['save_path'] + 'normalization_funcs.pickle', 'wb') as f:
                pickle.dump(data_preprocessing, f)

        n_data = len(input_data)
        assert len(ingredient_quantities) == n_data
        for aux in self.auxiliary_keys:
            assert len(auxiliary_data[aux]) == n_data

        # Deterministic 90/10 split over the (unshuffled) recipe order.
        if split == 'train':
            indexes = np.arange(int(0.9 * n_data))
        elif split == 'test':
            indexes = np.arange(int(0.9 * n_data), n_data)
        elif split == 'all':
            indexes = np.arange(n_data)
        else:
            raise ValueError(f'unknown split: {split!r} (expected train, test or all)')
        # np.random.shuffle(indexes)

        self.taste_rep_valid = taste_rep_valid[indexes]
        self.input_ingredients = input_data[indexes]
        self.ingredient_quantities = ingredient_quantities[indexes]
        self.computed_cocktail_reps = computed_cocktail_reps[indexes]
        self.auxiliaries = dict()
        for aux in self.auxiliary_keys:
            self.auxiliaries[aux] = auxiliary_data[aux][indexes]
        self.nb_ingredients = nb_ingredients[indexes]

    def __len__(self):
        """Number of recipes in this split."""
        return len(self.input_ingredients)

    def get_auxiliary_data(self, idx):
        """Return the dict of auxiliary targets for recipe ``idx``."""
        return {aux: self.auxiliaries[aux][idx] for aux in self.auxiliary_keys}

    def __getitem__(self, idx):
        """Return one sample: [n_ingredients, ingredient reps, quantities,
        cocktail rep, auxiliary targets, taste-rep validity flag]."""
        # Sanity check: the non-NaN entries of the padded input must account
        # for exactly nb_ingredients representations of size dim_rep_ingredient.
        assert self.nb_ingredients[idx] == \
            np.argwhere(~np.isnan(self.input_ingredients[idx])).flatten().size / self.dim_rep_ingredient
        return [self.nb_ingredients[idx],
                self.input_ingredients[idx],
                self.ingredient_quantities[idx],
                self.computed_cocktail_reps[idx],
                self.get_auxiliary_data(idx),
                self.taste_rep_valid[idx]]