# Hugging Face Space (page status when captured: Sleeping)
import os | |
import re | |
import warnings | |
import gradio as gr | |
from transformers import pipeline, AutoProcessor | |
from pyctcdecode import build_ctcdecoder | |
from transformers import Wav2Vec2ProcessorWithLM | |
from indictrans import Transliterator | |
# Initialize ASR pipelines.
# Each checkpoint is loaded once. The "-trans" entries point at the same
# pipeline object as their plain counterpart: both use the same model id and
# differ only in whether the caller transliterates the recognised text.
_odia_asr = pipeline(task="automatic-speech-recognition", model="cdactvm/w2v-bert-2.0-odia_v1")
_hindi_asr = pipeline(task="automatic-speech-recognition", model="cdactvm/w2v-bert-2.0-hindi_v1")
asr_models = {
    "Odiya": _odia_asr,
    "Odiya-trans": _odia_asr,
    "Hindi": _hindi_asr,
    "Hindi-trans": _hindi_asr,
    # Add other models similarly
    # "Kannada": pipeline(...),
    # "Telugu": pipeline(...),
    # "Bangala": pipeline(...),
    "Assamese-Model2": pipeline(task="automatic-speech-recognition", model="cdactvm/w2v-assames"),
}
# Initialize the Assamese model with a KenLM language model for decoding
processor = AutoProcessor.from_pretrained("cdactvm/w2v-assames")
vocab_dict = processor.tokenizer.get_vocab()
# Vocabulary ordered by token id with lower-cased keys; the key order becomes
# the decoder's label order below.
sorted_vocab_dict = dict(
    (token.lower(), token_id)
    for token, token_id in sorted(vocab_dict.items(), key=lambda item: item[1])
)
decoder = build_ctcdecoder(
    labels=list(sorted_vocab_dict.keys()),
    kenlm_model_path="lm.binary",
)
processor_with_lm = Wav2Vec2ProcessorWithLM(
    feature_extractor=processor.feature_extractor,
    tokenizer=processor.tokenizer,
    decoder=decoder,
)
# Register the LM-backed pipeline next to the plain Assamese one.
asr_models["Assamese-LM"] = pipeline(
    "automatic-speech-recognition",
    model="cdactvm/w2v-assames",
    tokenizer=processor_with_lm,
    feature_extractor=processor_with_lm.feature_extractor,
    decoder=processor_with_lm.decoder,
)
# Initialize transliterators (source script -> English), keyed by the
# "-trans" entry names used in asr_models
_transliteration_sources = (
    ("Odiya-trans", 'ori'),
    ("Hindi-trans", 'hin'),
    # Add other transliterators similarly
)
transliterators = {
    entry_name: Transliterator(source=source_code, target='eng', build_lookup=True)
    for entry_name, source_code in _transliteration_sources
}
# Strip HTML-style tags from text
def cleanhtml(raw_html):
    """Return *raw_html* with every ``<...>`` span removed.

    The match is non-greedy, so each tag is removed individually. ``.`` does
    not match a newline here, so a ``<`` ... ``>`` pair split across lines is
    left untouched.
    """
    tag_pattern = re.compile(r'<.*?>')
    return tag_pattern.sub('', raw_html)
# Transcribe audio using the selected model
def transcribe(lng, speech, transliterate=False):
    """Run speech recognition for *lng* and return the resulting text.

    Args:
        lng: Key into ``asr_models`` (for example ``"Hindi"`` or
            ``"Hindi-trans"``).
        speech: Audio input handed straight to the ASR pipeline.
        transliterate: When true, the recognised text is transliterated and
            post-processed with ``process_transcription``; otherwise it is
            only stripped of tags with ``cleanhtml``.

    Returns:
        The processed transcription, or a human-readable error string when
        the language, transliterator or an intermediate result is missing.
    """
    model = asr_models.get(lng)
    if not model:
        return f"Unsupported language: {lng}"

    text = model(speech).get("text")
    if text is None:
        return "Error: ASR returned None"

    if not transliterate:
        return cleanhtml(text)

    # The UI passes the "-trans" entry name itself, so appending the suffix
    # unconditionally produced "Hindi-trans-trans" and the lookup always
    # failed. Accept both the bare language name and the "-trans" name.
    trn_key = lng if lng.endswith("-trans") else lng + "-trans"
    trn = transliterators.get(trn_key)
    if not trn:
        return f"Transliterator not available for: {lng}"

    sentence = trn.transform(text)
    if sentence is None:
        return "Error: Transliteration returned None"
    return process_transcription(sentence)
# Post-process a transliterated sentence
def process_transcription(sentence):
    """Apply the correction pipeline to *sentence* and return the result.

    The stages run in this order: ``replace_words`` (spelling fixes),
    ``process_doubles`` ("double X" expansion), ``convert_to_text``.
    """
    return convert_to_text(process_doubles(replace_words(sentence)))
# Replace mis-transliterated number words with their English spelling
def replace_words(sentence):
    """Return *sentence* with known misspellings swapped for number words.

    Each variant is matched as a whole word (``\\b`` on both sides) and is
    case-sensitive. The table is grouped by the corrected spelling.
    """
    corrections = (
        ('zero', ('jiro', 'jero')),
        ('one', ('nn', 'n', 'van', 'na', 'ek')),
        ('two', ('tu', 'too', 'do')),
        ('three', ('thiri', 'tiri')),
        ('double three', ('dubalathri',)),
        ('three', ('tin',)),
        ('four', ('for', 'fore')),
        ('five', ('fib', 'paanch')),
        ('six', ('chha', 'chhah', 'chau')),
        ('double seven', ('dublseven',)),
        ('seven', ('sath',)),
        ('eight', ('aath',)),
        ('nine', ('nau',)),
        ('ten', ('das',)),
        ('nine eight', ('nineeit',)),
        ('five eight', ('fipeit',)),
        ('double', ('dubal',)),
        ('seven two', ('sevenatu',)),
    )
    for corrected, variants in corrections:
        for heard in variants:
            sentence = re.sub(rf'\b{heard}\b', corrected, sentence)
    return sentence
# Expand "double X" into "X X"
def process_doubles(sentence):
    """Return *sentence* with each "double"/"dubal" marker expanded.

    The marker and the token after it are replaced by that token twice. A
    marker that is the final token has nothing to duplicate and is kept
    as-is. Tokens are split on whitespace and re-joined with single spaces.
    """
    markers = ("double", "dubal")
    words = iter(sentence.split())
    expanded = []
    for word in words:
        if word not in markers:
            expanded.append(word)
            continue
        # Consume the token after the marker, if any.
        following = next(words, None)
        if following is None:
            expanded.append(word)
        else:
            expanded.extend((following, following))
    return ' '.join(expanded)
# Round-trip a sentence through phonetic codes and number conversion
def convert_to_text(input_sentence):
    """Encode *input_sentence*, convert number codes, and decode it again.

    The sentence is turned into a code transcript by
    ``sentence_to_transcript`` (which also fills a word -> code map), passed
    through ``text2int``, and mapped back to words with the inverted map via
    ``transcript_to_sentence``. An error string is returned if either
    intermediate step yields ``None``.
    """
    codes_by_word = {}
    transcript = sentence_to_transcript(input_sentence, codes_by_word)
    if transcript is None:
        return "Error: Transcript conversion returned None"

    numbers = text2int(transcript)
    if numbers is None:
        return "Error: Text to number conversion returned None"

    # Invert the map; when several words share a code the last one wins.
    words_by_code = dict((code, word) for word, code in codes_by_word.items())
    return transcript_to_sentence(numbers, words_by_code)
# Convert number-word codes in a transcript to digits
def text2int(textnum, numwords=None):
    """Collapse runs of number tokens in *textnum* into digit strings.

    Tokens are whitespace-separated (hyphens count as separators). A token is
    a number token when it is an integer literal (commas allowed), one of the
    code strings in the ``units`` / ``tens`` / ``scales`` tables below, or
    one of the English ordinal words in ``ordinal_words``. Consecutive unit
    tokens are emitted digit by digit ("O500 T000 T600" -> "123"); tens and
    scales combine arithmetically ("T537 O500" -> "21").

    NOTE(review): the table entries look like Soundex codes (letter + three
    digits). Confirm that whatever produces the transcript emits the same
    code scheme, otherwise no token will ever match these tables.

    Args:
        textnum: The transcript to convert.
        numwords: Optional token -> ``(scale, increment)`` table. An empty
            dict is filled in place with the built-in table; ``None`` (the
            default) uses a fresh built-in table for this call.

    Returns:
        The converted string. Every non-number token is followed by a single
        space, so the result may end with a trailing space.
    """
    units = ['Z600', 'O500', 'T000', 'T600', 'F600', 'F100', 'S220', 'S150', 'E300', 'N500',
             'T500', 'E415', 'T410', 'T635', 'F635', 'F135', 'S235', 'S153', 'E235', 'N535']
    # The two leading '' entries are placeholders for 0 and 10.
    tens = ['', '', 'T537', 'T637', 'F637', 'F137', 'S230', 'S153', 'E230', 'N530']
    scales = ['H536', 'T253', 'M450', 'C600']
    # Ordinals carry their numeric value. The previous table stored code
    # strings here, which raised TypeError as soon as one was added to the
    # running integer total.
    ordinal_words = {'oh': 0, 'first': 1, 'second': 2, 'third': 3, 'fourth': 4, 'fifth': 5,
                     'sixth': 6, 'seventh': 7, 'eighth': 8, 'ninth': 9, 'twelfth': 12}
    ordinal_endings = [('ieth', 'y'), ('th', '')]

    # A None default avoids sharing one mutable dict between all calls.
    if numwords is None:
        numwords = {}
    if not numwords:
        numwords['and'] = (1, 0)
        for idx, word in enumerate(units):
            numwords[word] = (1, idx)
        # 'S153' appears in both units and tens; the tens value is assigned
        # later and therefore wins.
        for idx, word in enumerate(tens):
            if word:  # skip placeholders so '' is never a number word
                numwords[word] = (1, idx * 10)
        for idx, word in enumerate(scales):
            numwords[word] = (10 ** (idx * 3 or 2), 0)

    textnum = textnum.replace('-', ' ')
    current = result = 0
    curstring = ''
    onnumber = False
    lastunit = False
    lastscale = False

    def as_int(token):
        """Return *token* as an int (commas ignored), or None if it is not one."""
        try:
            return int(token.replace(',', ''))
        except ValueError:
            return None

    def is_numword(x):
        return as_int(x) is not None or x in numwords

    def from_numword(x):
        value = as_int(x)
        if value is not None:
            # Scale 0 makes a literal replace, not extend, the running value.
            return 0, value
        return numwords[x]

    for word in textnum.split():
        if word in ordinal_words:
            current += ordinal_words[word]
            lastunit = True
            lastscale = False
            onnumber = True
            continue

        # Strip an ordinal suffix only when what remains is a number token,
        # so ordinary words ending in "th" are left intact.
        for ending, replacement in ordinal_endings:
            if word.endswith(ending):
                candidate = word[:-len(ending)] + replacement
                if is_numword(candidate):
                    word = candidate

        if not is_numword(word) or (word == 'and' and not lastscale):
            # Plain word: flush any pending number, then emit the word.
            if onnumber:
                curstring += str(result + current) + " "
            curstring += word + " "
            result = current = 0
            onnumber = False
            lastunit = False
            lastscale = False
        else:
            scale, increment = from_numword(word)
            onnumber = True
            # Two units in a row are separate digits, not a sum; emit the
            # first one without a separator.
            if lastunit and (word in units or word in ordinal_words):
                curstring += str(result + current)
                result = current = 0
            if scale > 1:
                current = max(1, current)
            current = current * scale + increment
            if scale > 100:
                result += current
                current = 0
            lastunit = word in units
            lastscale = word in scales

    if onnumber:
        curstring += str(result + current)
    return curstring
# Check whether a string parses as a number
def is_number(s):
    """Return True if *s*, with commas removed, is accepted by ``float()``."""
    without_commas = s.replace(',', '')
    try:
        float(without_commas)
    except ValueError:
        return False
    return True
# Convert a sentence to a transcript of Double Metaphone codes
def sentence_to_transcript(sentence, word_to_code_map):
    """Encode each word of *sentence* and return the space-joined codes.

    Args:
        sentence: Text to encode; split on whitespace.
        word_to_code_map: Dict filled in place with ``word -> code`` for
            every word that produced a code.

    Returns:
        The codes joined by single spaces. Words for which
        ``doublemetaphone`` yields no non-empty code are skipped.
    """
    # Imported lazily with warnings suppressed, so anything the package warns
    # about at import time stays out of the output.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        from metaphone import doublemetaphone

    transcript = []
    for word in sentence.split():
        # Use the first non-empty code doublemetaphone returns.
        word_code = next((code for code in doublemetaphone(word) if code), None)
        if not word_code:
            continue
        # The map is keyed by word, so test the word. The old check looked
        # the code up among the words and skipped the entry whenever a code
        # happened to equal an earlier word.
        if word not in word_to_code_map:
            word_to_code_map[word] = word_code
        transcript.append(word_code)
    return ' '.join(transcript)
# Convert a code transcript back to a sentence
def transcript_to_sentence(transcript, code_to_word_map):
    """Map each token of *transcript* back to its word.

    Args:
        transcript: Whitespace-separated tokens.
        code_to_word_map: Dict of ``code -> word``.

    Returns:
        The decoded words joined by single spaces. A token with no entry in
        the map is kept unchanged; previously such tokens were dropped, which
        discarded the digit strings produced by number conversion. A code
        mapped to an empty string is still omitted.
    """
    sentence = []
    for code in transcript.split():
        word = code_to_word_map.get(code, code)
        if word:
            sentence.append(word)
    return ' '.join(sentence)
# Colour palette for the Gradio interface (hex RGB strings)
theme_colors = dict(
    bg_color="#0E1117",
    bg_secondary_color="#161A25",
    input_color="#161A25",
    input_text_color="#C0C0BF",
    button_color="#4A6AF2",
    button_primary_text_color="#FFFFFF",
    button_secondary_color="#A0A0A0",
    button_secondary_text_color="#000000",
)
# Apply theme to Gradio blocks
def apply_theme(demo):
    """Apply the ``theme_colors`` palette to *demo* when it supports it.

    Calls ``demo.set_theme(...)`` with the palette. If *demo* has no callable
    ``set_theme`` attribute, a ``RuntimeWarning`` is issued and the function
    returns without touching *demo*, so the app still starts with default
    colours instead of failing with ``AttributeError``.

    NOTE(review): ``set_theme`` does not appear in the documented
    ``gr.Blocks`` API as far as I can tell; the supported route is the
    ``theme=`` argument of ``gr.Blocks``. Confirm against the installed
    Gradio version.
    """
    set_theme = getattr(demo, "set_theme", None)
    if not callable(set_theme):
        warnings.warn(
            "Blocks object has no set_theme(); custom theme colors were not applied.",
            RuntimeWarning,
            stacklevel=2,
        )
        return
    set_theme({
        "background_color": theme_colors["bg_color"],
        "secondary_background_color": theme_colors["bg_secondary_color"],
        "input_background_color": theme_colors["input_color"],
        "input_text_color": theme_colors["input_text_color"],
        "button_primary_background_color": theme_colors["button_color"],
        "button_primary_text_color": theme_colors["button_primary_text_color"],
        "button_secondary_background_color": theme_colors["button_secondary_color"],
        "button_secondary_text_color": theme_colors["button_secondary_text_color"],
    })
# Create Gradio interface
# NOTE(review): the captured source lost its indentation. The nesting below
# (dropdown and microphone inside the row, output box and button beneath it)
# is a best guess - confirm against the running app.
with gr.Blocks() as demo:
    apply_theme(demo)
    gr.Markdown("<h1><center>Test</center></h1>")

    with gr.Row():
        language = gr.Dropdown(list(asr_models.keys()), label="Language", value="Hindi")
        speech_input = gr.Audio(source="microphone", type="filepath", label="Speech")

    text_output = gr.Textbox(label="Output")
    submit_btn = gr.Button("Submit")

    def process_audio(lang, speech):
        """Transcribe *speech*; entries named "...-trans" are also transliterated."""
        wants_transliteration = lang.endswith("-trans")
        return transcribe(lang, speech, wants_transliteration)

    submit_btn.click(process_audio, inputs=[language, speech_input], outputs=text_output)

# Launch the Gradio app on a different port
demo.launch(server_port=7861)