File size: 4,259 Bytes
28d0c5f
 
74a35d9
 
 
3fdcb38
74a35d9
3fdcb38
74a35d9
28d0c5f
 
e8a1983
28d0c5f
 
 
 
 
 
3fdcb38
 
 
28d0c5f
 
 
 
3fdcb38
 
 
 
 
 
 
 
 
 
 
 
 
28d0c5f
74a35d9
28d0c5f
 
 
3fdcb38
 
28d0c5f
 
 
 
 
 
 
 
 
 
 
 
3fdcb38
 
 
 
28d0c5f
3fdcb38
023235e
3fdcb38
bafb40b
023235e
 
3fdcb38
023235e
3fdcb38
 
 
 
 
28d0c5f
 
 
 
bafb40b
 
 
 
 
 
 
28d0c5f
 
 
3fdcb38
e8a1983
3fdcb38
 
 
7aaf29c
 
 
 
 
 
 
3fdcb38
7aaf29c
3fdcb38
7aaf29c
3fdcb38
7aaf29c
 
3fdcb38
 
7aaf29c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import json
import pickle
from pathlib import Path

import epitran
import pandas as pd

from aip_trainer import PROJECT_ROOT_FOLDER, app_logger
from aip_trainer.models import RuleBasedModels


class TextDataset:
    """Sentence table wrapper giving indexed and random access to sentences.

    The wrapped ``table`` is used like a pandas DataFrame: it must support
    ``len()``, column lookup by name, boolean-mask row selection and
    ``.sample(n=1)``. Sentences are read from the ``"<language>_sentence"``
    column and categories from the ``"<language>_category"`` column.
    """

    def __init__(self, table, language='-'):
        """Store the table, its row count and the language code.

        Args:
            table: the sentence table (a pandas DataFrame in this module).
            language: language code used as column prefix; ``'-'`` means the
                table has a plain ``'sentence'`` column (see ``__getitem__``).
        """
        self.table_dataframe = table
        self.number_of_samples = len(table)
        self.language = language

    def __getitem__(self, idx):
        """Return the sentence at positional index ``idx`` as a one-element list."""
        if self.language != '-':
            sentence_column = f"{self.language}_sentence"
        else:
            sentence_column = 'sentence'
        return [self.table_dataframe[sentence_column].iloc[idx]]

    def __len__(self):
        """Return the number of rows the table had at construction time."""
        return self.number_of_samples

    def get_category_from_df_by_language(self, language: str, category_value: int):
        """Return the rows whose ``"<language>_category"`` equals ``category_value``."""
        selector = self.table_dataframe[f"{language}_category"] == category_value
        return self.table_dataframe[selector]

    def get_random_sample_from_df(self, language: str, category_value: int):
        """Return one randomly chosen sentence as a one-element list.

        Args:
            language: language code selecting the sentence/category columns.
            category_value: category to restrict the draw to; ``0`` disables
                the filter and samples from the whole table.

        Returns:
            ``[sentence]`` taken from the ``"<language>_sentence"`` column.

        Note:
            ``.sample(n=1)`` is called on the candidate rows; pandas is
            expected to raise when there are no candidate rows.
        """
        app_logger.info(f"language={language}, category_value={category_value}.")
        # Draw exactly once, from the right population: previously the whole
        # table was sampled first and that draw was thrown away whenever a
        # category filter was requested.
        if category_value != 0:
            candidates = self.get_category_from_df_by_language(language, category_value)
        else:
            candidates = self.table_dataframe
        choice = candidates.sample(n=1)
        return [choice[f"{language}_sentence"].iloc[0]]


# Folder that holds the sentence data files used by the lambdas.
sample_folder = Path(PROJECT_ROOT_FOLDER / "aip_trainer" / "lambdas")
# Per-language registries, filled below for the language codes 'de' and 'en'.
lambda_database = {}
lambda_ipa_converter = {}

# Read the table with an explicit encoding so loading the (German/English)
# sentences does not depend on the platform's default locale encoding.
with open(sample_folder / 'data_de_en_with_categories.json', 'r', encoding='utf-8') as src:
    df = pd.read_json(src)

# Both datasets share the same table; the language code selects the columns.
lambda_database['de'] = TextDataset(df, 'de')
lambda_database['en'] = TextDataset(df, 'en')
# NOTE(review): this flag is not read anywhere in the visible part of the file.
lambda_translate_new_sample = False
lambda_ipa_converter['de'] = RuleBasedModels.EpitranPhonemConverter(
    epitran.Epitran('deu-Latn'))
lambda_ipa_converter['en'] = RuleBasedModels.EngPhonemConverter()


def lambda_handler(event, context):
    """Return a transcript and its IPA transcription as a JSON string.

    Args:
        event: mapping whose ``'body'`` entry is a JSON document with a
            required ``'language'`` key and optional ``'category'`` and
            ``'transcript'`` keys.
        context: unused.

    Returns:
        A JSON string with the keys ``'real_transcript'``,
        ``'ipa_transcript'`` and ``'transcript_translation'`` (always ``""``).

    Raises:
        KeyError: if ``'body'`` or ``'language'`` is missing, or if the
            language has no entry in ``lambda_ipa_converter``.
    """
    body = json.loads(event['body'])

    # A missing *or null* category means "no category filter" (0); a JSON
    # null used to crash in int(None).
    raw_category = body.get('category')
    category = 0 if raw_category is None else int(raw_category)
    language = body['language']

    # A missing *or null* transcript triggers a random selection; a JSON null
    # used to be turned into the literal text "None".
    raw_transcript = body.get('transcript')
    if raw_transcript is None:
        current_transcript = get_random_selection(language, category, is_gradio_output=False)
    else:
        current_transcript = str(raw_transcript)
    # With is_gradio_output=False the random selection comes back as a
    # one-element list, so unwrap it to a plain string.
    if not isinstance(current_transcript, str):
        current_transcript = current_transcript[0]
    current_ipa = lambda_ipa_converter[language].convertToPhonem(current_transcript)

    app_logger.info(f"real_transcript='{current_transcript}', ipa_transcript='{current_ipa}'.")
    result = {
        'real_transcript': current_transcript,
        'ipa_transcript': current_ipa,
        'transcript_translation': ""
    }

    return json.dumps(result)


def get_random_selection(language: str, category: int, is_gradio_output=True):
    """Draw a random sentence for ``language`` from ``lambda_database``.

    Args:
        language: key into ``lambda_database``; also forwarded to the dataset.
        category: category value forwarded to ``get_random_sample_from_df``.
        is_gradio_output: when true return the first element of the sampled
            result, otherwise return the sampled result unchanged.

    Returns:
        The first element of the sampled result, or the whole result as
        returned by the dataset, depending on ``is_gradio_output``.
    """
    dataset = lambda_database[language]
    sampled = dataset.get_random_sample_from_df(language, category)
    app_logger.info(f"category={category}, language={language}, current_transcript={sampled}.")
    if is_gradio_output:
        return sampled[0]
    return sampled


def getSentenceCategory(sentence) -> int:
    number_of_words = len(sentence.split())
    categories_word_limits = [0, 8, 20, 100000]
    for category in range(len(categories_word_limits) - 1):
        if categories_word_limits[category] < number_of_words <= categories_word_limits[category + 1]:
            return category + 1


def get_pickle2json_dataframe(
        custom_pickle_filename_no_ext: Path | str = 'data_de_en_2',
        custom_folder: Path = sample_folder
    ):
    """Convert a pickled sentence table to JSON, adding category columns.

    Loads ``<custom_folder>/<name>.pickle``, adds the ``de_category`` and
    ``en_category`` columns computed by ``getSentenceCategory`` from the
    ``de_sentence`` and ``en_sentence`` columns, and writes the result to
    ``<custom_folder>/<name>.json``.

    Args:
        custom_pickle_filename_no_ext: file name (without extension) shared
            by the input pickle and the output JSON.
        custom_folder: folder containing the pickle; the JSON is written
            next to it.

    Returns:
        None. Progress is reported with ``print``.
    """
    custom_folder = Path(custom_folder)
    pickle_path = custom_folder / f'{custom_pickle_filename_no_ext}.pickle'
    json_path = custom_folder / f'{custom_pickle_filename_no_ext}.json'

    # SECURITY: unpickling can execute arbitrary code - only point this
    # function at pickle files from a trusted source.
    with open(pickle_path, 'rb') as handle:
        df2 = pickle.load(handle)

    # The file handle is no longer needed once the table is loaded.
    df2["de_category"] = df2["de_sentence"].apply(getSentenceCategory)
    print("de_category added")
    df2["en_category"] = df2["en_sentence"].apply(getSentenceCategory)
    print("en_category added")

    df_json = df2.to_json()
    with open(json_path, 'w', encoding='utf-8') as dst:
        dst.write(df_json)
    # Report the file that was actually written (the message used to be a
    # fixed name unrelated to the chosen output file).
    print(f"{json_path.name} written")