alessandro trinca tornidor committed
Commit 0700cb3 · 1 Parent(s): 2f5403b

doc: add/update docstring and typing hints

AIModels.py CHANGED
@@ -8,22 +8,50 @@ class NeuralASR(ModelInterfaces.IASRModel):
     audio_transcript = None
 
     def __init__(self, model: torch.nn.Module, decoder) -> None:
+        """
+        Initialize the NeuralASR (Audio Speech Recognition) model.
+
+        Args:
+            model (torch.nn.Module): The neural network model for ASR.
+            decoder: The decoder to convert CTC outputs to transcripts.
+        """
         super().__init__()
         self.model = model
         self.decoder = decoder  # Decoder from CTC-outputs to transcripts
 
     def getTranscript(self) -> str:
-        """Get the transcripts of the process audio"""
+        """
+        Get the transcript of the processed audio.
+
+        Returns:
+            str: The audio transcript.
+
+        Raises:
+            AssertionError: If the audio has not been processed.
+        """
         assert self.audio_transcript is not None, 'Can get audio transcripts without having processed the audio'
         return self.audio_transcript
 
     def getWordLocations(self) -> list:
-        """Get the pair of words location from audio"""
+        """
+        Get the word locations from the processed audio.
+
+        Returns:
+            list: A list of word locations in samples.
+
+        Raises:
+            AssertionError: If the audio has not been processed.
+        """
        assert self.word_locations_in_samples is not None, 'Can get word locations without having processed the audio'
        return self.word_locations_in_samples
 
-    def processAudio(self, audio: torch.Tensor):
-        """Process the audio"""
+    def processAudio(self, audio: torch.Tensor) -> None:
+        """
+        Process the audio to generate transcripts and word locations.
+
+        Args:
+            audio (torch.Tensor): The input audio tensor.
+        """
         audio_length_in_samples = audio.shape[1]
         with torch.inference_mode():
             nn_output = self.model(audio)
@@ -34,11 +62,27 @@ class NeuralASR(ModelInterfaces.IASRModel):
 
 class NeuralTTS(ModelInterfaces.ITextToSpeechModel):
     def __init__(self, model: torch.nn.Module, sampling_rate: int) -> None:
+        """
+        Initialize the NeuralTTS (Text to Speech) model.
+
+        Args:
+            model (torch.nn.Module): The neural network model for TTS.
+            sampling_rate (int): The sampling rate for the audio.
+        """
         super().__init__()
         self.model = model
         self.sampling_rate = sampling_rate
 
     def getAudioFromSentence(self, sentence: str) -> np.array:
+        """
+        Generate audio from a given sentence.
+
+        Args:
+            sentence (str): The input sentence.
+
+        Returns:
+            np.array: The generated audio as a numpy array.
+        """
         with torch.inference_mode():
             audio_transcript = self.model.apply_tts(texts=[sentence],
                                                     sample_rate=self.sampling_rate)[0]
@@ -48,12 +92,27 @@ class NeuralTTS(ModelInterfaces.ITextToSpeechModel):
 
 class NeuralTranslator(ModelInterfaces.ITranslationModel):
     def __init__(self, model: torch.nn.Module, tokenizer) -> None:
+        """
+        Initialize the NeuralTranslator model.
+
+        Args:
+            model (torch.nn.Module): The neural network model for translation.
+            tokenizer: The tokenizer for text processing.
+        """
         super().__init__()
         self.model = model
         self.tokenizer = tokenizer
 
     def translateSentence(self, sentence: str) -> str:
-        """Get the transcripts of the process audio"""
+        """
+        Translate a given sentence to the target language.
+
+        Args:
+            sentence (str): The input sentence.
+
+        Returns:
+            str: The translated sentence.
+        """
         tokenized_text = self.tokenizer(sentence, return_tensors='pt')
         translation = self.model.generate(**tokenized_text)
         translated_text = self.tokenizer.batch_decode(
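A minimal usage sketch of the NeuralASR interface documented above; the model/decoder loading helper is hypothetical (not part of this repository) and is shown only to illustrate the call order (processAudio before the getters, which otherwise raise AssertionError):

    import torch

    from AIModels import NeuralASR

    # Hypothetical assets: any CTC acoustic model plus a matching decoder would do.
    model, decoder = load_ctc_model_and_decoder()  # placeholder helper, not a real function in this repo

    asr = NeuralASR(model, decoder)
    audio = torch.randn(1, 16000)  # placeholder waveform, shape (channels, samples)
    asr.processAudio(audio)        # must run first
    print(asr.getTranscript())
    print(asr.getWordLocations())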
ModelInterfaces.py CHANGED
@@ -4,6 +4,7 @@ import numpy as np
 
 
 class IASRModel(metaclass=abc.ABCMeta):
+    """Automatic Speech Recognition Model Interface"""
     @classmethod
     def __subclasshook__(cls, subclass):
         return (hasattr(subclass, 'getTranscript') and
@@ -30,6 +31,7 @@ class IASRModel(metaclass=abc.ABCMeta):
 
 
 class ITranslationModel(metaclass=abc.ABCMeta):
+    """Translation model"""
     @classmethod
     def __subclasshook__(cls, subclass):
         return (hasattr(subclass, 'translateSentence') and
@@ -42,6 +44,7 @@ class ITranslationModel(metaclass=abc.ABCMeta):
 
 
 class ITextToSpeechModel(metaclass=abc.ABCMeta):
+    """Text to Speech model"""
     @classmethod
     def __subclasshook__(cls, subclass):
         return (hasattr(subclass, 'getAudioFromSentence') and
@@ -54,6 +57,7 @@ class ITextToSpeechModel(metaclass=abc.ABCMeta):
 
 
 class ITextToPhonemModel(metaclass=abc.ABCMeta):
+    """Text to Phonem model, needed to evaluate the correctness of speech pronunciation"""
     @classmethod
     def __subclasshook__(cls, subclass):
         return (hasattr(subclass, 'convertToPhonem') and
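These interfaces rely on __subclasshook__ duck typing rather than explicit inheritance, so any class exposing the expected methods counts as an implementation. A small sketch, assuming the hook only checks for a callable convertToPhonem as the truncated lines above suggest:

    from ModelInterfaces import ITextToPhonemModel

    class DummyPhonemConverter:
        # no explicit inheritance: the __subclasshook__ check is structural
        def convertToPhonem(self, sentence: str) -> str:
            return sentence.lower()

    print(issubclass(DummyPhonemConverter, ITextToPhonemModel))  # expected: True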
RuleBasedModels.py CHANGED
@@ -4,8 +4,22 @@ import numpy as np
 import epitran
 import eng_to_ipa
 
+from constants import app_logger
+
 
 def get_phonem_converter(language: str):
+    """
+    Get the phoneme converter for the specified language.
+
+    Args:
+        language (str): The language code (e.g., 'de' for German, 'en' for English).
+
+    Returns:
+        ModelInterfaces.ITextToPhonemModel: The phoneme converter for the specified language.
+
+    Raises:
+        ValueError: If the language is not implemented.
+    """
     if language == 'de':
         phonem_converter = EpitranPhonemConverter(
             epitran.Epitran('deu-Latn'))
@@ -17,24 +31,61 @@ def get_phonem_converter(language: str):
     return phonem_converter
 
 class EpitranPhonemConverter(ModelInterfaces.ITextToPhonemModel):
+    """
+    A phoneme converter using the Epitran library for transliteration.
+    """
     word_locations_in_samples = None
     audio_transcript = None
 
     def __init__(self, epitran_model) -> None:
+        """
+        Initialize the EpitranPhonemConverter with an Epitran model.
+
+        Args:
+            epitran_model: The Epitran model for transliteration.
+        """
         super().__init__()
         self.epitran_model = epitran_model
 
     def convertToPhonem(self, sentence: str) -> str:
+        """
+        Convert a sentence to its phoneme representation.
+
+        Args:
+            sentence (str): The input sentence.
+
+        Returns:
+            str: The phoneme representation of the sentence.
+        """
+        app_logger.debug(f'starting EpitranPhonemConverter.convertToPhonem for sentence/token "{sentence}"...')
         phonem_representation = self.epitran_model.transliterate(sentence)
+        app_logger.debug(f'EpitranPhonemConverter: got phonem_representation for sentence/token "{sentence}"!')
         return phonem_representation
 
 
 class EngPhonemConverter(ModelInterfaces.ITextToPhonemModel):
+    """
+    A phoneme converter for English using the eng_to_ipa library.
+    """
 
     def __init__(self,) -> None:
+        """
+        Initialize the EngPhonemConverter.
+        """
         super().__init__()
 
     def convertToPhonem(self, sentence: str) -> str:
+        """
+        Convert a sentence to its phoneme representation.
+
+        Args:
+            sentence (str): The input sentence.
+
+        Returns:
+            str: The phoneme representation of the sentence.
+        """
+        app_logger.debug(f'starting EngPhonemConverter.convertToPhonem for sentence/token "{sentence}"...')
         phonem_representation = eng_to_ipa.convert(sentence)
         phonem_representation = phonem_representation.replace('*','')
+        app_logger.debug(f'EngPhonemConverter: got phonem_representation for sentence/token "{sentence}"!')
         return phonem_representation
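A short usage sketch of the factory and converters touched above; the exact IPA output depends on the installed eng_to_ipa and epitran versions, so the printed strings are only indicative:

    from RuleBasedModels import get_phonem_converter

    converter_en = get_phonem_converter('en')   # EngPhonemConverter
    print(converter_en.convertToPhonem('hello world'))   # IPA string, roughly like 'hɛˈloʊ wɜrld'

    converter_de = get_phonem_converter('de')   # EpitranPhonemConverter wrapping epitran.Epitran('deu-Latn')
    print(converter_de.convertToPhonem('Hallo Welt'))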
WordMatching.py CHANGED
@@ -14,6 +14,16 @@ TIME_THRESHOLD_MAPPING = 5.0
 
 
 def get_word_distance_matrix(words_estimated: list, words_real: list) -> np.ndarray:
+    """
+    Calculate the word distance matrix using Levenshtein distance.
+
+    Args:
+        words_estimated (list): List of estimated words.
+        words_real (list): List of real words.
+
+    Returns:
+        np.ndarray: The word distance matrix.
+    """
     number_of_real_words = len(words_real)
     number_of_estimated_words = len(words_estimated)
 
@@ -32,6 +42,15 @@ def get_word_distance_matrix(words_estimated: list, words_real: list) -> np.ndar
 
 
 def get_best_path_from_distance_matrix(word_distance_matrix):
+    """
+    Get the best path from the word distance matrix using constraint programming.
+
+    Args:
+        word_distance_matrix (np.ndarray): The word distance matrix.
+
+    Returns:
+        np.ndarray: The best path indices.
+    """
     modelCpp = cp_model.CpModel()
 
     number_of_real_words = word_distance_matrix.shape[1]
@@ -86,6 +105,17 @@ def get_best_path_from_distance_matrix(word_distance_matrix):
 
 
 def get_resulting_string(mapped_indices: np.ndarray, words_estimated: list, words_real: list) -> Tuple[List, List]:
+    """
+    Get the resulting string and indices from the mapped indices.
+
+    Args:
+        mapped_indices (np.ndarray): The mapped indices.
+        words_estimated (list): List of estimated words.
+        words_real (list): List of real words.
+
+    Returns:
+        Tuple[List, List]: The mapped words and their indices.
+    """
     mapped_words = []
     mapped_words_indices = []
     WORD_NOT_FOUND_TOKEN = '-'
@@ -128,6 +158,17 @@ def get_resulting_string(mapped_indices: np.ndarray, words_estimated: list, word
 
 
 def get_best_mapped_words(words_estimated: list | str, words_real: list | str, use_dtw:bool = False) -> tuple[list, list]:
+    """
+    Get the best mapped words using either DTW or constraint programming.
+
+    Args:
+        words_estimated (list | str): List of estimated words or a single estimated word.
+        words_real (list | str): List of real words or a single real word.
+        use_dtw (bool, optional): Whether to use DTW for mapping. Defaults to False.
+
+    Returns:
+        tuple[list, list]: The mapped words and their indices.
+    """
     app_logger.info(f"words_estimated: '{words_estimated}', words_real: '{words_real}', use_dtw:{use_dtw}.")
     word_distance_matrix = get_word_distance_matrix(
         words_estimated, words_real)
@@ -175,7 +216,17 @@ def get_best_mapped_words(words_estimated: list | str, words_real: list | str, u
     # return mapped_words, mapped_words_indices
 
 
-def getWhichLettersWereTranscribedCorrectly(real_word, transcribed_word):
+def getWhichLettersWereTranscribedCorrectly(real_word: str, transcribed_word: list) -> list:
+    """
+    Determine which letters were transcribed correctly.
+
+    Args:
+        real_word (str): The real word.
+        transcribed_word (str): The transcribed word.
+
+    Returns:
+        list: A list indicating whether each letter was transcribed correctly (1 for correct, 0 for incorrect).
+    """
     is_leter_correct = [None] * len(real_word)
     for idx, letter in enumerate(real_word):
         letter = letter.lower()
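A sketch of how the word-mapping entry point documented above can be exercised; the exact alignment depends on the Levenshtein distances and on the chosen backend (constraint programming by default, DTW when use_dtw=True), so the commented output is an assumption rather than a guaranteed result:

    from WordMatching import get_best_mapped_words

    words_real = ['the', 'quick', 'brown', 'fox']
    words_estimated = ['the', 'quick', 'fox']

    mapped_words, mapped_indices = get_best_mapped_words(words_estimated, words_real)
    # mapped_words pairs an estimated word (or the '-' not-found token) with each real word
    print(list(zip(words_real, mapped_words)))
    print(mapped_indices)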
WordMetrics.py CHANGED
@@ -1,10 +1,12 @@
 import numpy as np
 
+
 # ref from https://gitlab.com/-/snippets/1948157
 # For some variants, look here https://en.wikibooks.org/wiki/Algorithm_Implementation/Strings/Levenshtein_distance#Python
 
-# Pure python
-def edit_distance_python2(a, b):
+# Pure/numpy python
+def edit_distance_python2(a: str, b: str) -> np.ndarray | int:
+    """A pure python levenshtein distance implementation"""
     # This version is commutative, so as an optimization we force |a|>=|b|
     if len(a) < len(b):
         return edit_distance_python(b, a)
@@ -12,44 +14,54 @@ def edit_distance_python2(a, b):
         return len(a)
     # Only two rows are really needed: the one currently filled in, and the previous
     distances = []
-    distances.append([i for i in range(len(b)+1)])
-    distances.append([0 for _ in range(len(b)+1)])
+    distances.append([i for i in range(len(b) + 1)])
+    distances.append([0 for _ in range(len(b) + 1)])
     # We can prefill the first row:
     costs = [0 for _ in range(3)]
     for i, a_token in enumerate(a, start=1):
         distances[1][0] += 1  # Deals with the first column.
         for j, b_token in enumerate(b, start=1):
-            costs[0] = distances[1][j-1] + 1
+            costs[0] = distances[1][j - 1] + 1
             costs[1] = distances[0][j] + 1
-            costs[2] = distances[0][j-1] + (0 if a_token == b_token else 1)
+            costs[2] = distances[0][j - 1] + (0 if a_token == b_token else 1)
             distances[1][j] = min(costs)
         # Move to the next row:
         distances[0][:] = distances[1][:]
     return distances[1][len(b)]
 
+
 #https://stackabuse.com/levenshtein-distance-and-text-similarity-in-python/
-def edit_distance_python(seq1, seq2):
+def edit_distance_python(seq1: str, seq2: str) -> np.ndarray:
+    """A levenshtein distance implementation.
+
+    Args:
+        seq1 (str): First sequence.
+        seq2 (str): Second sequence.
+
+    Returns:
+        np.ndarray: The levenshtein distance between the two sequences.
+    """
     size_x = len(seq1) + 1
     size_y = len(seq2) + 1
-    matrix = np.zeros ((size_x, size_y))
+    matrix = np.zeros((size_x, size_y))
     for x in range(size_x):
-        matrix [x, 0] = x
+        matrix[x, 0] = x
     for y in range(size_y):
-        matrix [0, y] = y
+        matrix[0, y] = y
 
     for x in range(1, size_x):
         for y in range(1, size_y):
-            if seq1[x-1] == seq2[y-1]:
-                matrix [x,y] = min(
-                    matrix[x-1, y] + 1,
-                    matrix[x-1, y-1],
-                    matrix[x, y-1] + 1
+            if seq1[x - 1] == seq2[y - 1]:
+                matrix[x, y] = min(
+                    matrix[x - 1, y] + 1,
+                    matrix[x - 1, y - 1],
+                    matrix[x, y - 1] + 1
                 )
             else:
-                matrix [x,y] = min(
-                    matrix[x-1,y] + 1,
-                    matrix[x-1,y-1] + 1,
-                    matrix[x,y-1] + 1
+                matrix[x, y] = min(
+                    matrix[x - 1, y] + 1,
+                    matrix[x - 1, y - 1] + 1,
+                    matrix[x, y - 1] + 1
                 )
     #print (matrix)
     return matrix[size_x - 1, size_y - 1]
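Both functions compute the plain Levenshtein distance; a quick check of the two variants (edit_distance_python returns a numpy scalar, so it is compared as a float here):

    from WordMetrics import edit_distance_python, edit_distance_python2

    # one substitution: 'e' -> 'a'
    assert float(edit_distance_python('hello', 'hallo')) == 1.0
    # one substitution: 'k' -> 's'
    assert edit_distance_python2('kitten', 'sitten') == 1
    print(edit_distance_python('flaw', 'lawn'))  # expected 2.0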
faster_whisper_wrapper.py CHANGED
@@ -3,10 +3,12 @@ from typing import Union
 import numpy as np
 import onnxruntime
 import torch
-from faster_whisper import WhisperModel
+from faster_whisper import WhisperModel, transcribe
 
 from ModelInterfaces import IASRModel
 from constants import sample_rate_resample, app_logger, IS_TESTING, DEVICE
+from typing_hints import ParsedWordInfo
+
 
 device = onnxruntime.get_device()
 device = "cpu" if IS_TESTING or device.lower() == DEVICE.lower() else device
@@ -15,7 +17,16 @@ device_compute = "int8_float16" if device == "cuda" else "int8"
 app_logger.info(f"device: {device}, device_compute: {device_compute} #")
 
 
-def parse_word_info(word_info, sample_rate):
+def parse_word_info(word_info: transcribe.Word, sample_rate: int) -> ParsedWordInfo:
+    """Parse a word info object from WhisperModel into a dictionary with start and end timestamps.
+
+    Args:
+        word_info (transcribe.Word): Word object from WhisperModel.transcribe module
+        sample_rate (int): Sample rate of the audio
+
+    Returns:
+        ParsedWordInfo: Dictionary with the current single word, start_ts and end_ts keys
+    """
     start_ts = float(word_info.start) * sample_rate
     end_ts = float(word_info.end) * sample_rate
     word = word_info.word
@@ -23,14 +34,24 @@ def parse_word_info(word_info, sample_rate):
 
 
 class FasterWhisperASRModel(IASRModel):
-    def __init__(self, model_name="base", language=None):
+    """Faster Whisper ASR model wrapper class. This class is used to transcribe audio and store the transcript and word locations."""
+    def __init__(self, model_name:str="base", language:str=None):
         self.asr = WhisperModel(model_name, device=device, compute_type=device_compute)
         self._transcript = ""
         self._word_locations = []
         self.sample_rate = sample_rate_resample
         self.language = language
 
-    def processAudio(self, audio:Union[np.ndarray, torch.Tensor]):
+    def processAudio(self, audio:Union[np.ndarray, torch.Tensor]) -> None:
+        """Transcribe audio and store the transcript and word locations updating self._transcript and self._word_locations,
+        get these values using getTranscript() and getWordLocations() respectively.
+
+        Args:
+            audio (np.ndarray or torch.Tensor): Audio samples to transcribe.
+
+        Returns:
+            None
+        """
         # 'audio' can be a path to a file or a numpy array of audio samples.
         if isinstance(audio, torch.Tensor):
             audio = audio.detach().cpu().numpy()
@@ -50,7 +71,9 @@ class FasterWhisperASRModel(IASRModel):
         self._transcript = " ".join(transcript)
 
     def getTranscript(self) -> str:
+        """Get the transcript of the audio."""
         return self._transcript
 
-    def getWordLocations(self) -> list:
+    def getWordLocations(self) -> list[ParsedWordInfo]:
+        """Get a list of ParsedWordInfo"""
         return self._word_locations
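A hedged usage sketch of the wrapper above; the first call downloads the faster-whisper weights, and the silent placeholder array is only there to show the expected shape and dtype of the input:

    import numpy as np

    from faster_whisper_wrapper import FasterWhisperASRModel

    asr = FasterWhisperASRModel(model_name="base", language="en")
    audio = np.zeros(16000, dtype=np.float32)  # placeholder: one second of silence
    asr.processAudio(audio)
    print(asr.getTranscript())
    print(asr.getWordLocations())  # list of ParsedWordInfo dicts: word, start_ts, end_ts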
lambdaChangeModel.py CHANGED
@@ -1,4 +1,5 @@
 import json
+from typing import Any
 
 import pronunciationTrainer
 
@@ -6,7 +7,8 @@ import pronunciationTrainer
 trainer_SST_lambda = {'de': pronunciationTrainer.getTrainer("de"), 'en': pronunciationTrainer.getTrainer("en")}
 
 
-def lambda_handler(event, context):
+def lambda_handler(event: dict[str], context: Any) -> str:
+    """Lambda handler to change the model used by the pronunciation trainer (Currently not used)"""
     data = json.loads(event['body'])
     model_name = data['modelName']
     trainer_SST_lambda["de"] = pronunciationTrainer.getTrainer("de", model_name=model_name)
lambdaGetSample.py CHANGED
@@ -5,26 +5,44 @@ import pandas as pd
 
 import RuleBasedModels
 from constants import app_logger
+from typing_hints import Category
 
 
 class TextDataset:
-    def __init__(self, table, language):
+    """Sentences dataset."""
+    def __init__(self, table: pd.DataFrame, language: str):
         self.table_dataframe = table
         self.language = language
 
-    def __getitem__(self, idx):
+    def __getitem__(self, idx) -> list[str]:
         line = [self.table_dataframe['sentence'].iloc[idx]]
         return line
 
-    def __len__(self):
+    def __len__(self) -> int:
         return len(self.table_dataframe)
 
-    def get_category_from_df(self, category_value:int):
+    def get_category_from_df(self, category_value:Category) -> pd.DataFrame:
+        """Filter the sentence dataframe by category returning
+
+        Args:
+            category_value (int): The category value to filter the dataframe.
+
+        Returns:
+            pd.DataFrame: The filtered dataframe.
+        """
         selector = self.table_dataframe["category"] == category_value
         df_by_category = self.table_dataframe[selector]
         return df_by_category
 
-    def get_random_sample_from_df(self, category_value:int):
+    def get_random_sample_from_df(self, category_value:Category) -> list[str]:
+        """Get a random sentence from the category filtered dataframe.
+
+        Args:
+            category_value (int): The category value to filter the dataframe.
+
+        Returns:
+            list: A list with the selected sentence.
+        """
         app_logger.info(f"language={self.language}, category_value={category_value}.")
         choice = self.table_dataframe.sample(n=1)
         if category_value !=0:
@@ -49,11 +67,11 @@ for lang in available_languages:
 lambda_translate_new_sample = False
 
 
-def lambda_handler(event, context):
+def lambda_handler(event: dict[str], context) -> str:
     """
     lambda handler to return a random text sample from the dataset.
 
-    Parameters:
+    Args:
         event (dict): The event data passed to the Lambda function.
         context (dict): The context in which the Lambda function is called.
 
@@ -87,20 +105,20 @@ def lambda_handler(event, context):
         raise ex
 
 
-def get_random_selection(language: str, category: int) -> str:
+def get_random_selection(language: str, category_value: Category) -> str:
     """
     Get a random text sample from the dataset.
 
-    Parameters:
+    Args:
         language (str): The language code.
-        category (int): The category value to filter the dataset.
+        category_value (int): The category value to filter the dataset.
 
     Returns:
         str: The selected text sample.
     """
     lambda_df_lang = lambda_database[language]
-    current_transcript = lambda_df_lang.get_random_sample_from_df(category)
-    app_logger.info(f"category={category}, language={language}, current_transcript={current_transcript}.")
+    current_transcript = lambda_df_lang.get_random_sample_from_df(category_value)
+    app_logger.info(f"category_value={category_value}, language={language}, current_transcript={current_transcript}.")
     return current_transcript[0]
 
 
@@ -121,7 +139,7 @@ def get_enriched_dataframe_csv(
     """
     Read a csv dataframe adding a 'category' column.
 
-    Parameters:
+    Args:
         language (str): The language code (e.g. "de" for German).
         custom_dataframe_csv_filename_no_ext (str): The csv dataframe without extension.
         custom_folder (Path): The folder containing the csv dataframe.
lambdaSpeechToScore.py CHANGED
@@ -24,16 +24,16 @@ trainer_SST_lambda = {'de': pronunciationTrainer.getTrainer("de"), 'en': pronunc
 transform = Resample(orig_freq=sample_rate_start, new_freq=sample_rate_resample)
 
 
-def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
+def lambda_handler(event: dict[str], context: Any) -> str:
     """
     Lambda handler for speech-to-score.
 
-    Parameters:
+    Args:
         event (Dict[str, Any]): The event data containing the request body.
         context (Any): The context in which the lambda function is executed.
 
     Returns:
-        Dict[str, Any]: The response containing the speech-to-score results.
+        str: The json response containing the speech-to-score results.
     """
     body = event['body']
     data = json.loads(body)
@@ -67,7 +67,7 @@ def get_speech_to_score_dict(
     """
     Process the audio file and return a dictionary with speech-to-score results.
 
-    Parameters:
+    Args:
         use_dtw:
         real_text (str): The text to be matched with the audio.
         file_bytes_or_audiotmpfile (str | bytes | dict): The audio file in bytes or a temporary file.
@@ -184,7 +184,7 @@ def get_speech_to_score_tuple(real_text: str, file_bytes_or_audiotmpfile: str |
     """
     Process the audio file and return a tuple with speech-to-score results.
 
-    Parameters:
+    Args:
         real_text (str): The text to be matched with the audio.
         file_bytes_or_audiotmpfile (str | dict): The audio file in bytes or a temporary file.
         language (str): The language of the audio.
@@ -227,7 +227,7 @@ def soundfile_write(audiofile: str | Path, data: np.ndarray, samplerate: int) ->
     """
     Write audio data to a file using soundfile.
 
-    Parameters:
+    Args:
         audiofile (str | Path): The path to the audio file.
         data (np.ndarray): The audio data to write.
         samplerate (int): The sample rate of the audio data.
@@ -243,7 +243,7 @@ def get_selected_word(idx_recorded_word: int, raw_json_output: str) -> tuple[str
     """
     Get the selected word, its audio file, and duration from the recognition output.
 
-    Parameters:
+    Args:
         idx_recorded_word (int): The index of the recorded word.
         raw_json_output (str): The JSON output from the recognition process.
 
@@ -267,7 +267,7 @@ def get_splitted_audio_file(audiotmpfile: str | Path, start_time: list[float], e
     """
     Split the audio file into segments based on start and end times.
 
-    Parameters:
+    Args:
         audiotmpfile (str | Path): The path to the audio file.
         start_time (list[float]): The start times of the segments.
         end_time (list[float]): The end times of the segments.
@@ -296,7 +296,7 @@ def get_file_with_custom_suffix(basefile: str | Path, custom_suffix: str) -> Pat
     """
     Generate a file path with a custom suffix.
 
-    Parameters:
+    Args:
         basefile (str | Path): The base file path.
         custom_suffix (str): The custom suffix to add to the file name.
 
@@ -315,7 +315,7 @@ def calc_start_end(sr_native: int, time_position: float, n_channels: int) -> int
     """
     Calculate the start or end position in samples.
 
-    Parameters:
+    Args:
         sr_native (int): The native sample rate.
         time_position (float): The time position in seconds.
         n_channels (int): The number of audio channels.
@@ -330,7 +330,7 @@ def soundfile_load(path: str | Path, offset: float = 0.0, duration: float = None
     """
    Load an audio buffer using soundfile.
 
-    Parameters:
+    Args:
         path (str | Path): The path to the audio file.
         offset (float): The offset in seconds to start reading the file.
         duration (float): The duration in seconds to read from the file.
@@ -369,7 +369,7 @@ def audioread_load(path: str | Path, offset: float = 0.0, duration: float = None
     """
     This loads one block at a time, and then concatenates the results.
 
-    Parameters:
+    Args:
         path (str | Path): The path to the audio file.
         offset (float): The offset in seconds to start reading the file.
         duration (float): The duration in seconds to read from the file.
models.py CHANGED
@@ -19,6 +19,16 @@ default_speaker_dict = {
 
 
 def getASRModel(language: str, model_name: str = MODEL_NAME_DEFAULT) -> IASRModel:
+    """Wrapper function to get the ASR model based on the model name and language.
+    Currently supported models are 'whisper', 'faster_whisper', and 'silero'.
+
+    Args:
+        language: str: The language of the model.
+        model_name: str: The name of the model to use. Default is 'whisper'.
+
+    Returns:
+        IASRModel: The ASR model instance.
+    """
     models_dict = {
         "whisper": __get_model_whisper,
         "faster_whisper": __get_model_faster_whisper,
@@ -63,6 +73,7 @@ def __eval_apply_neural_asr(model: nn.Module, decoder: Decoder, language: str):
 
 
 def getTranslationModel(language: str) -> nn.Module:
+    """Wrapper function to get the translation model based on the language."""
    from transformers import AutoTokenizer
    from transformers import AutoModelForSeq2SeqLM
    if language == 'de':
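A short sketch of the factory documented above; weights are downloaded on first use, and the isinstance check is expected to pass through the __subclasshook__ duck typing of IASRModel:

    from ModelInterfaces import IASRModel
    from models import getASRModel

    # any of 'whisper', 'faster_whisper' or 'silero' per the new docstring
    asr_model = getASRModel('en', model_name='faster_whisper')
    print(isinstance(asr_model, IASRModel))  # expected: True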
pronunciationTrainer.py CHANGED
@@ -13,26 +13,25 @@ import models as mo
 from constants import app_logger, MODEL_NAME_DEFAULT, sample_rate_resample
 
 
-def getTrainer(language: str, model_name: str = MODEL_NAME_DEFAULT):
-    asr_model = mo.getASRModel(language, model_name=model_name)
-    if language == 'de':
-        phonem_converter = RuleBasedModels.EpitranPhonemConverter(epitran.Epitran('deu-Latn'))
-    elif language == 'en':
-        phonem_converter = RuleBasedModels.EngPhonemConverter()
-    else:
-        raise ValueError(f"Language '{language}' not implemented")
-    trainer = PronunciationTrainer(asr_model, phonem_converter)
-
-    return trainer
-
-
 def preprocessAudioStandalone(audio: torch.tensor) -> torch.tensor:
+    """
+    Preprocess the audio by normalizing it.
+
+    Args:
+        audio (torch.tensor): The input audio tensor.
+
+    Returns:
+        torch.tensor: The normalized audio tensor.
+    """
     audio = audio-torch.mean(audio)
     audio = audio/torch.max(torch.abs(audio))
     return audio
 
 
 class PronunciationTrainer:
+    """
+    A class to train and evaluate pronunciation accuracy using ASR and phoneme conversion models.
+    """
     current_transcript: str
     current_ipa: str
 
@@ -46,11 +45,26 @@ class PronunciationTrainer:
     sampling_rate = sample_rate_resample
 
     def __init__(self, asr_model: mi.IASRModel, word_to_ipa_coverter: mi.ITextToPhonemModel) -> None:
+        """
+        Initialize the PronunciationTrainer with ASR and phoneme conversion models.
+
+        Args:
+            asr_model (mi.IASRModel): The ASR model to use.
+            word_to_ipa_coverter (mi.ITextToPhonemModel): The phoneme conversion model to use.
+        """
         self.asr_model = asr_model
         self.ipa_converter = word_to_ipa_coverter
 
-    def getTranscriptAndWordsLocations(self, audio_length_in_samples: int):
+    def getTranscriptAndWordsLocations(self, audio_length_in_samples: int) -> tuple[str, list]:
+        """
+        Get the transcript and word locations from the ASR model.
+
+        Args:
+            audio_length_in_samples (int): The length of the audio in samples.
+
+        Returns:
+            tuple: A tuple containing the audio transcript and word locations in samples.
+        """
         audio_transcript = self.asr_model.getTranscript()
         word_locations_in_samples = self.asr_model.getWordLocations()
 
@@ -77,8 +91,17 @@ class PronunciationTrainer:
 
     ##################### ASR Functions ###########################
 
-    def processAudioForGivenText(self, recordedAudio: torch.Tensor = None, real_text=None):
+    def processAudioForGivenText(self, recordedAudio: torch.Tensor = None, real_text=None) -> dict:
+        """
+        Process the recorded audio and evaluate pronunciation accuracy.
+
+        Args:
+            recordedAudio (torch.Tensor, optional): The recorded audio tensor. Defaults to None.
+            real_text (str, optional): The real text to compare against. Defaults to None.
+
+        Returns:
+            dict: A dictionary containing the evaluation results.
+        """
         start = time.time()
         recording_transcript, recording_ipa, word_locations = self.getAudioTranscript(
             recordedAudio)
@@ -108,7 +131,16 @@ class PronunciationTrainer:
 
         return result
 
-    def getAudioTranscript(self, recordedAudio: torch.Tensor = None):
+    def getAudioTranscript(self, recordedAudio: torch.Tensor = None) -> tuple[str | list]:
+        """
+        Get the transcript and IPA representation of the recorded audio.
+
+        Args:
+            recordedAudio (torch.Tensor, optional): The recorded audio tensor. Defaults to None.
+
+        Returns:
+            tuple: A tuple containing the transcript, IPA representation, and word locations.
+        """
         current_recorded_audio = recordedAudio
 
         current_recorded_audio = self.preprocessAudio(
@@ -124,6 +156,16 @@ class PronunciationTrainer:
         return current_recorded_transcript, current_recorded_ipa, current_recorded_word_locations
 
     def getWordLocationsFromRecordInSeconds(self, word_locations, mapped_words_indices) -> list:
+        """
+        Get the start and end times of words in the recorded audio in seconds.
+
+        Args:
+            word_locations (list): The word locations in samples.
+            mapped_words_indices (list): The indices of the mapped words.
+
+        Returns:
+            list: A list containing the start and end times of words in seconds.
+        """
         app_logger.info(f"len_list: word_locations:{len(word_locations)}, mapped_words_indices:{len(mapped_words_indices)}, {len(word_locations) == len(mapped_words_indices)}...")
         start_time = []
         end_time = []
@@ -138,6 +180,16 @@ class PronunciationTrainer:
 
     ##################### Evaluation Functions ###########################
     def matchSampleAndRecordedWords(self, real_text, recorded_transcript):
+        """
+        Match the real text with the recorded transcript and get the IPA representations.
+
+        Args:
+            real_text (str): The real text to compare against.
+            recorded_transcript (str): The recorded transcript.
+
+        Returns:
+            tuple: A tuple containing the matched words, IPA representations, and mapped word indices.
+        """
         words_estimated = recorded_transcript.split()
 
         try:
@@ -160,6 +212,15 @@ class PronunciationTrainer:
         return real_and_transcribed_words, real_and_transcribed_words_ipa, mapped_words_indices
 
     def getPronunciationAccuracy(self, real_and_transcribed_words_ipa) -> float:
+        """
+        Calculate the pronunciation accuracy based on the IPA representations.
+
+        Args:
+            real_and_transcribed_words_ipa (list): A list of tuples containing the real and transcribed IPA representations.
+
+        Returns:
+            float: The percentage of correct pronunciations.
+        """
         total_mismatches = 0.
         number_of_phonemes = 0.
         current_words_pronunciation_accuracy = []
@@ -181,9 +242,27 @@ class PronunciationTrainer:
         return np.round(percentage_of_correct_pronunciations), current_words_pronunciation_accuracy
 
     def removePunctuation(self, word: str) -> str:
+        """
+        Remove punctuation from a word.
+
+        Args:
+            word (str): The input word.
+
+        Returns:
+            str: The word without punctuation.
+        """
         return ''.join([char for char in word if char not in punctuation])
 
     def getWordsPronunciationCategory(self, accuracies) -> list:
+        """
+        Get the pronunciation category for each word based on accuracy.
+
+        Args:
+            accuracies (list): A list of pronunciation accuracies.
+
+        Returns:
+            list: A list of pronunciation categories.
+        """
         categories = []
 
         for accuracy in accuracies:
@@ -193,7 +272,48 @@ class PronunciationTrainer:
         return categories
 
     def getPronunciationCategoryFromAccuracy(self, accuracy) -> int:
+        """
+        Get the pronunciation category based on accuracy.
+
+        Args:
+            accuracy (float): The pronunciation accuracy.
+
+        Returns:
+            int: The pronunciation category.
+        """
         return np.argmin(abs(self.categories_thresholds-accuracy))
 
     def preprocessAudio(self, audio: torch.tensor) -> torch.tensor:
+        """
+        Preprocess the audio by normalizing it.
+
+        Args:
+            audio (torch.tensor): The input audio tensor.
+
+        Returns:
+            torch.tensor: The normalized audio tensor.
+        """
         return preprocessAudioStandalone(audio)
+
+
+def getTrainer(language: str, model_name: str = MODEL_NAME_DEFAULT) -> PronunciationTrainer:
+    """
+    Get a PronunciationTrainer instance for the specified language and model.
+
+    Args:
+        language (str): The language of the model.
+        model_name (str, optional): The name of the model. Defaults to MODEL_NAME_DEFAULT.
+
+    Returns:
+        PronunciationTrainer: An instance of PronunciationTrainer.
+    """
+    asr_model = mo.getASRModel(language, model_name=model_name)
+    if language == 'de':
+        phonem_converter = RuleBasedModels.EpitranPhonemConverter(epitran.Epitran('deu-Latn'))
+    elif language == 'en':
+        phonem_converter = RuleBasedModels.EngPhonemConverter()
+    else:
+        raise ValueError(f"Language '{language}' not implemented")
+    trainer = PronunciationTrainer(asr_model, phonem_converter)
+
+    return trainer
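An end-to-end sketch tying the relocated getTrainer factory to the trainer API documented above; model weights are downloaded on first use, and the random waveform is only a placeholder for a real recording:

    import torch

    from pronunciationTrainer import getTrainer

    trainer = getTrainer('en')
    audio = torch.randn(1, 16000)  # placeholder waveform, shape (channels, samples)
    result = trainer.processAudioForGivenText(audio, real_text='hello world')
    print(result)  # evaluation results dictionary (see the processAudioForGivenText docstring)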
typing_hints.py ADDED
@@ -0,0 +1,13 @@
+from typing import Annotated, Optional, TypeAlias, TypedDict
+
+import annotated_types
+
+
+Category: TypeAlias = Annotated[int, annotated_types.Ge(0), annotated_types.Le(4)]
+PositiveFloat: TypeAlias = Annotated[float, annotated_types.Ge(0)]
+
+
+class ParsedWordInfo(TypedDict):
+    word: str
+    start_ts: PositiveFloat
+    end_ts: PositiveFloat
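The new typing_hints module is annotations only, so a ParsedWordInfo is still a plain dict at runtime; a quick illustration of how the aliases can be consumed (the Annotated bounds are enforced only by static checkers or extra tooling, not at runtime):

    from typing_hints import Category, ParsedWordInfo

    word_info: ParsedWordInfo = {"word": "hello", "start_ts": 0.0, "end_ts": 4800.0}
    category: Category = 2  # integer between 0 and 4 per the Annotated bounds

    print(word_info["word"], word_info["end_ts"] - word_info["start_ts"])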
utilsFileIO.py CHANGED
@@ -1,5 +1,6 @@
 import string
 import random
+from flask import Response
 
 from constants import ALLOWED_ORIGIN
 
@@ -11,14 +12,13 @@ headers = {
 }
 
 
-def generateRandomString(str_length: int = 20):
+def generateRandomString(str_length: int = 20) -> str:
     # printing lowercase
     letters = string.ascii_lowercase
     return ''.join(random.choice(letters) for i in range(str_length))
 
 
-def return_response(body, mimetype="application/json", status=200):
-    from flask import Response
+def return_response(body, mimetype="application/json", status=200) -> Response:
     return Response(
         response=body,
         status=status,
@@ -27,5 +27,5 @@ def return_response(body, mimetype="application/json", status=200):
     )
 
 
-def return_response_ok(body, mimetype="application/json"):
+def return_response_ok(body, mimetype="application/json") -> Response:
     return return_response(body, mimetype, 200)
whisper_wrapper.py CHANGED
@@ -8,7 +8,12 @@ from ModelInterfaces import IASRModel
 from constants import sample_rate_resample, app_logger
 
 
-def parse_word_info(word_info, sample_rate):
+def parse_word_info(word_info: dict, sample_rate: int) -> dict:
+    """Parse a word info object from WhisperModel into a dictionary with start and end timestamps.
+
+    Args:
+        word_info (dict): Word dictionary object
+    """
     word = word_info["word"]
     start_ts = float(word_info["start"]) * sample_rate
     end_ts = float(word_info["end"]) * sample_rate
@@ -16,6 +21,7 @@ def parse_word_info(word_info, sample_rate):
 
 
 class WhisperASRModel(IASRModel):
+    """Whisper ASR model wrapper class. This class is used to transcribe audio and store the transcript and word locations."""
     def __init__(self, model_name="base", language=None):
         self.asr = whisper.load_model(model_name)
         self._transcript = ""
@@ -24,6 +30,15 @@ class WhisperASRModel(IASRModel):
         self.language = language
 
     def processAudio(self, audio:Union[np.ndarray, torch.Tensor]):
+        """Transcribe audio and store the transcript and word locations updating self._transcript and self._word_locations,
+        get these values using getTranscript() and getWordLocations() respectively.
+
+        Args:
+            audio (np.ndarray or torch.Tensor): Audio samples to transcribe.
+
+        Returns:
+            None
+        """
         # 'audio' can be a path to a file or a numpy array of audio samples.
         if isinstance(audio, torch.Tensor):
             audio = audio.detach().cpu().numpy()
@@ -41,7 +56,9 @@ class WhisperASRModel(IASRModel):
         app_logger.info(f"elaborated segment {segment['id']}/{len_segments-1}: type={type(segment)}, len(words):{len(words)}, text:{segment['text']} #")
 
     def getTranscript(self) -> str:
+        """Get the transcript of the audio."""
         return self._transcript
 
-    def getWordLocations(self) -> list:
+    def getWordLocations(self) -> list[dict]:
+        """Get the word locations of the audio."""
         return self._word_locations