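# NOTE(review): the text "Spaces: / Runtime error / Runtime error" preceded this module when it was
# captured; it appears to be status-banner residue from the hosting page, not part of the source.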
from torch.utils.data import Dataset | |
import pickle | |
from src.cocktails.utilities.ingredients_utilities import extract_ingredients, ingredient_list, ingredient_profiles, ingredients_per_type | |
from src.cocktails.utilities.other_scrubbing_utilities import print_recipe | |
import numpy as np | |
def get_representation_from_ingredient(ingredients, quantities, max_q_per_ing, index, params):
    """Build the feature vector of one ingredient of a recipe.

    The vector is the concatenation, in this order, of:
      1. the ingredient's value in ``ingredient_profiles[k]`` for every key
         ``k`` of ``params['ing_keys']``;
      2. its quantity divided by ``max_q_per_ing[<ingredient>]``;
      3. its quantity divided by the sum of all ``quantities`` of the recipe;
      4. a one-hot vector over ``ingredient_list`` marking the ingredient.

    Args:
        ingredients: ingredient names of the recipe.
        quantities: quantities aligned with ``ingredients`` (same length).
        max_q_per_ing: mapping from ingredient name to the quantity used as
            normalizer for item 2 above.
        index: position, within ``ingredients``, of the ingredient to encode.
        params: dict; only ``params['ing_keys']`` is read here.

    Returns:
        A tuple ``(representation, indexes_to_normalize)`` where
        ``representation`` is a numpy array and ``indexes_to_normalize`` is
        the list of positions occupied by the profile values (item 1).

    Raises:
        AssertionError: if ``ingredients`` and ``quantities`` differ in length.
        ValueError: if the ingredient is not in ``ingredient_list``.
    """
    assert len(ingredients) == len(quantities)
    name = ingredients[index]
    amount = quantities[index]
    share_of_recipe = amount / np.sum(quantities)
    position = ingredient_list.index(name)

    profile_keys = params['ing_keys']
    # profile features (category encoding is intentionally not included)
    profile_part = [ingredient_profiles[key][position] for key in profile_keys]
    # quantity relative to the per-ingredient maximum, and relative to the recipe total
    quantity_part = [amount / max_q_per_ing[name], share_of_recipe]
    # one-hot identity, so that two ingredients never share the same embedding
    identity = np.zeros(len(ingredient_list))
    identity[position] = 1

    features = profile_part + quantity_part + list(identity)
    return np.array(features), list(range(len(profile_keys)))
def get_max_n_ingredients(data):
    """Scan every recipe and summarize which ingredients occur.

    Args:
        data: column-indexable container read for ``data['ingredients_str']``
            (one recipe string per row) and ``data['names']`` (used only for
            the number of rows).

    Returns:
        A 4-tuple ``(max_count, ingredient_set, alcohol_set, liqueur_set)``:
        the largest number of ingredients found in a single recipe, the set of
        all ingredients seen, the subset found in
        ``ingredients_per_type['liquor']`` and the subset found in
        ``ingredients_per_type['liqueur']``.
    """
    recipe_strings = np.array(data['ingredients_str'])
    longest = 0
    seen = set()
    seen_liquors = set()
    seen_liqueurs = set()
    for row in range(len(data['names'])):
        names, _ = extract_ingredients(recipe_strings[row])
        longest = max(longest, len(names))
        seen.update(names)
        seen_liquors.update(n for n in names if n in ingredients_per_type['liquor'])
        seen_liqueurs.update(n for n in names if n in ingredients_per_type['liqueur'])
    return longest, seen, seen_liquors, seen_liqueurs
# Add your custom dataset class here | |
class MyDataset(Dataset):
    """Cocktail dataset: padded per-ingredient inputs plus several prediction targets.

    Everything is computed eagerly in ``__init__`` from ``params['raw_data']``
    and then restricted to the rows of the requested split. ``__getitem__``
    returns a six-element list (see its docstring).
    """

    _SPLITS = ('train', 'test', 'all')

    def __init__(self, split, params):
        """Build every array of the dataset and keep the rows of ``split``.

        Args:
            split: ``'train'`` (first 90% of the rows), ``'test'`` (remaining
                rows) or ``'all'``.
            params: dict read for the keys ``'raw_data'``,
                ``'dim_rep_ingredient'``, ``'indexes_ing_to_normalize'``,
                ``'cocktail_reps'``, ``'auxiliaries_dict'`` and, for the train
                split only, ``'save_path'``. It is also forwarded to
                ``get_representation_from_ingredient``.
                ``params['raw_data']`` is indexed by the columns ``'names'``,
                ``'ingredients_str'``, ``'category'``, ``'subcategory'``,
                ``'glass'``, ``'end volume'`` and ``'taste_rep'``.

        Raises:
            ValueError: if ``split`` is not one of the three values above.
            AssertionError: if the keys of ``params['auxiliaries_dict']`` do
                not match the auxiliary targets built here, or if the built
                arrays disagree on the number of rows.

        Side effects:
            Prints the class frequencies of subcategory, glass and category.
            When ``split == 'train'``, writes four ``*.txt`` statistics files
            and ``normalization_funcs.pickle`` at ``params['save_path'] + <name>``.
        """
        # Fail before doing any work or writing any file.
        if split not in self._SPLITS:
            raise ValueError(f"unknown split {split!r}; expected one of {self._SPLITS}")

        data = params['raw_data']
        self.dim_rep_ingredient = params['dim_rep_ingredient']
        n_data = len(data["names"])
        preparation_list = sorted(set(data['category']))
        categories_list = sorted(set(data['subcategory']))
        glasses_list = sorted(set(data['glass']))
        max_ingredients, ingredient_set, _, _ = get_max_n_ingredients(data)
        ingredient_set = sorted(ingredient_set)
        self.ingredient_set = ingredient_set
        # O(1) column lookup instead of repeated list.index() calls
        column_of = {ing: col for col, ing in enumerate(ingredient_set)}

        # Parse each recipe once and reuse the result below.
        # NOTE(review): assumes extract_ingredients is deterministic -- confirm.
        ingr_strs = np.array(data['ingredients_str'])
        recipes = [extract_ingredients(ingr_strs[i]) for i in range(n_data)]

        #########
        # Ingredient quantities (OUTPUTS): one row per recipe, one column per ingredient
        #########
        ingredient_quantities = np.zeros([n_data, len(ingredient_set)])
        for row, (ingredients, quantities) in enumerate(recipes):
            for ing, q in zip(ingredients, quantities):
                ingredient_quantities[row, column_of[ing]] = q

        # `np.int` was removed in NumPy 1.24; the builtin `int` is the same dtype.
        ingredients_presence = (ingredient_quantities > 0).astype(int)
        min_ing_quantities = np.min(ingredient_quantities, axis=0)
        max_ing_quantities = np.max(ingredient_quantities, axis=0)

        # smallest quantity of each ingredient among the recipes that use it
        ing_q_when_present = ingredient_quantities.copy()
        ing_q_when_present[ing_q_when_present == 0] = np.nan
        self.min_when_present_ing_quantities = np.nanmin(ing_q_when_present, axis=0)

        def filter_decoder_output(output):
            """Un-normalize ``output`` and zero every quantity below the per-ingredient minimum."""
            output_unnormalized = output * max_ing_quantities
            output_unnormalized[output_unnormalized < self.min_when_present_ing_quantities] = 0
            return output_unnormalized.copy()
        self.filter_decoder_output = filter_decoder_output

        self.max_ing_quantities = max_ing_quantities
        self.mean_ing_quantities = np.mean(ingredient_quantities, axis=0)
        self.std_ing_quantities = np.std(ingredient_quantities, axis=0)
        if split == 'train':
            np.savetxt(params['save_path'] + 'min_when_present_ing_quantities.txt', self.min_when_present_ing_quantities)
            np.savetxt(params['save_path'] + 'max_ing_quantities.txt', max_ing_quantities)
            np.savetxt(params['save_path'] + 'mean_ing_quantities.txt', self.mean_ing_quantities)
            np.savetxt(params['save_path'] + 'std_ing_quantities.txt', self.std_ing_quantities)

        # targets are scaled by the per-ingredient maximum (not min-max normalized)
        ingredient_quantities = ingredient_quantities / max_ing_quantities
        max_q_per_ing = dict(zip(ingredient_set, max_ing_quantities))

        #########
        # Input: list of ingredient representations per recipe
        #########
        input_data = []  # input of ingredient encoders
        all_ing_reps = []
        for ingredients, quantities in recipes:
            cocktail_rep = [
                get_representation_from_ingredient(ingredients, quantities, max_q_per_ing, index=j, params=params)[0]
                for j in range(len(ingredients))
            ]
            all_ing_reps.extend(cocktail_rep)
            input_data.append(cocktail_rep)
        all_ing_reps = np.array(all_ing_reps)

        norm_idx = params['indexes_ing_to_normalize']
        min_ing_reps = np.min(all_ing_reps[:, norm_idx], axis=0)
        max_ing_reps = np.max(all_ing_reps[:, norm_idx], axis=0)

        def normalize_ing_reps(ing_reps):
            """Min-max normalize the selected columns; always returns a 2-D copy."""
            if ing_reps.ndim == 1:
                ing_reps = ing_reps.reshape(1, -1)
            out = ing_reps.copy()
            out[:, norm_idx] = (out[:, norm_idx] - min_ing_reps) / (max_ing_reps - min_ing_reps)
            return out

        # put everything in a big matrix; unused ingredient slots stay NaN
        dim = self.dim_rep_ingredient
        padded_inputs = np.full([len(input_data), max_ingredients * dim], np.nan)
        for row, reps in enumerate(input_data):
            for slot, rep in enumerate(reps):
                padded_inputs[row, slot * dim:(slot + 1) * dim] = normalize_ing_reps(rep)[0]
        nb_ingredients = np.array([len(reps) for reps in input_data])
        input_data = padded_inputs

        #########
        # Output: final volume, min-max normalized
        #########
        volumes = np.array(data['end volume'])
        min_vol = volumes.min()
        max_vol = volumes.max()
        volumes = (volumes - min_vol) / (max_vol - min_vol)

        # computed cocktail representation
        computed_cocktail_reps = params['cocktail_reps']
        self.dim_rep = computed_cocktail_reps.shape[1]

        #########
        # Output: sub categories, glass type, preparation type
        #########
        categories, counts, self.categories_weights = self._encode_labels(data['subcategory'], categories_list)
        self.categories = categories_list
        print(counts)

        glasses, counts, self.glasses_weights = self._encode_labels(data['glass'], glasses_list)
        self.glasses = glasses_list
        print(counts)

        prep_type, counts, self.prep_types_weights = self._encode_labels(data['category'], preparation_list)
        self.prep_types = preparation_list
        print(counts)

        #########
        # Output: taste representation (when available)
        #########
        taste_rep_ground_truth, taste_rep_valid = self._parse_taste_reps(list(data['taste_rep']))

        auxiliary_data = dict(categories=categories,
                              glasses=glasses,
                              prep_type=prep_type,
                              cocktail_reps=computed_cocktail_reps,
                              ingredients_presence=ingredients_presence,
                              taste_reps=taste_rep_ground_truth,
                              volume=volumes,
                              ingredients_quantities=ingredient_quantities)
        self.auxiliary_keys = sorted(params['auxiliaries_dict'].keys())
        assert self.auxiliary_keys == sorted(auxiliary_data.keys())

        # Despite the file name, only the min/max statistics are pickled.
        data_preprocessing = dict(min_max_ing_quantities=(min_ing_quantities, max_ing_quantities),
                                  min_max_ing_reps=(min_ing_reps, max_ing_reps),
                                  min_max_vol=(min_vol, max_vol))
        if split == 'train':
            with open(params['save_path'] + 'normalization_funcs.pickle', 'wb') as f:
                pickle.dump(data_preprocessing, f)

        n_data = len(input_data)
        assert len(ingredient_quantities) == n_data
        for aux in self.auxiliary_keys:
            assert len(auxiliary_data[aux]) == n_data

        # deterministic 90/10 split on row order (no shuffling)
        cut = int(0.9 * n_data)
        if split == 'train':
            indexes = np.arange(cut)
        elif split == 'test':
            indexes = np.arange(cut, n_data)
        else:  # 'all'
            indexes = np.arange(n_data)

        self.taste_rep_valid = taste_rep_valid[indexes]
        self.input_ingredients = input_data[indexes]
        self.ingredient_quantities = ingredient_quantities[indexes]
        self.computed_cocktail_reps = computed_cocktail_reps[indexes]
        self.auxiliaries = {aux: auxiliary_data[aux][indexes] for aux in self.auxiliary_keys}
        self.nb_ingredients = nb_ingredients[indexes]

    @staticmethod
    def _encode_labels(labels, classes):
        """Encode ``labels`` as indices into ``classes`` and compute balancing weights.

        Args:
            labels: iterable of labels with a length, one per row.
            classes: list of the distinct labels; its order defines the indices.

        Returns:
            ``(indices, frequencies, weights)``: an integer array with the
            position of each label in ``classes``; a dict mapping each class to
            its relative frequency in ``labels``; and a list, ordered like
            ``classes``, of ``1 / len(classes) / frequency``.
        """
        position = {c: i for i, c in enumerate(classes)}
        indices = np.array([position[label] for label in labels])
        frequencies = dict.fromkeys(classes, 0)
        for label in labels:
            frequencies[label] += 1
        total = len(labels)
        for c in frequencies:
            frequencies[c] /= total
        weights = [1 / len(classes) / frequencies[c] for c in classes]
        return indices, frequencies, weights

    @staticmethod
    def _parse_taste_reps(taste_reps):
        """Parse taste strings such as ``'[3, 7]'`` into a float array divided by 10.

        Strings of length <= 2 (e.g. ``'[]'``) are treated as missing: their
        row is ``[nan, nan]`` and their validity flag is ``False``.

        Returns:
            ``(values, valid)``: an ``(n, 2)`` float array and a boolean array.
        """
        values = []
        valid = []
        for tr in taste_reps:
            if len(tr) > 2:
                inner = tr.split('[', 1)[1].split(']', 1)[0]
                parts = inner.split(',')
                # float() ignores surrounding whitespace, so '[3,7]' and '[3, 7]' both parse
                values.append([float(parts[0]), float(parts[1])])
                valid.append(True)
            else:
                values.append([np.nan, np.nan])
                valid.append(False)
        values = np.array(values, dtype=float)
        values /= 10
        return values, np.array(valid)

    def __len__(self):
        """Return the number of rows kept for this split."""
        return len(self.input_ingredients)

    def get_auxiliary_data(self, idx):
        """Return a dict mapping every auxiliary key to its target at row ``idx``."""
        return {aux: self.auxiliaries[aux][idx] for aux in self.auxiliary_keys}

    def __getitem__(self, idx):
        """Return the sample at ``idx``.

        Returns:
            ``[nb_ingredients, input_ingredients, ingredient_quantities,
            computed_cocktail_rep, auxiliary_dict, taste_rep_valid]``.

        Raises:
            AssertionError: if the number of non-NaN input entries does not
                equal ``nb_ingredients * dim_rep_ingredient``.
        """
        row = self.input_ingredients[idx]
        # padding is NaN, so the filled part must match the ingredient count
        assert self.nb_ingredients[idx] == np.count_nonzero(~np.isnan(row)) / self.dim_rep_ingredient
        return [self.nb_ingredients[idx], row, self.ingredient_quantities[idx], self.computed_cocktail_reps[idx],
                self.get_auxiliary_data(idx), self.taste_rep_valid[idx]]