Spaces:
Running
Running
import os | |
import json | |
import gradio as gr | |
import shutil | |
import subprocess | |
import requests | |
import tarfile | |
from pathlib import Path | |
import soundfile as sf | |
import sherpa_onnx | |
from deep_translator import GoogleTranslator | |
import numpy as np | |
from iso639 import Lang | |
import pycountry | |
# Load model JSON | |
MODEL_JSON_URL = "https://github.com/willwade/tts-wrapper/blob/main/tts_wrapper/engines/sherpaonnx/merged_models.json" | |
MODEL_JSON_PATH = "./models.json" | |
# Load models | |
if not os.path.exists(MODEL_JSON_PATH): | |
response = requests.get(MODEL_JSON_URL.replace("/blob/", "/raw/")) | |
with open(MODEL_JSON_PATH, "w") as f: | |
f.write(response.text) | |
with open(MODEL_JSON_PATH, "r") as f: | |
models = json.load(f) | |
def get_model_display_info(model_info): | |
"""Create a display string for a model.""" | |
# Get language info | |
lang_info = model_info.get('language', [{}])[0] | |
lang_name = lang_info.get('language_name', lang_info.get('Language Name', 'Unknown')) | |
lang_code = lang_info.get('lang_code', lang_info.get('Iso Code', 'Unknown')) | |
# Get model info | |
voice_name = model_info.get('name', model_info.get('id', 'Unknown')) | |
developer = model_info.get('developer', '') | |
quality = model_info.get('quality', 'MMS' if 'mms' in voice_name.lower() else '') | |
# Create display name | |
model_display = f"{voice_name} ({developer}" | |
if quality: | |
model_display += f" - {quality}" | |
model_display += ")" | |
# Combine language and model info | |
return f"{lang_name} ({lang_code}) | {model_display}" | |
# Group models by language | |
models_by_lang = {} | |
for model_id, model_info in models.items(): | |
# Get language info from the first language in the list | |
lang_info = model_info.get('language', [{}])[0] | |
lang_name = lang_info.get('language_name', lang_info.get('Language Name', 'Unknown')) | |
lang_code = lang_info.get('lang_code', lang_info.get('Iso Code', 'Unknown')) | |
group_key = f"{lang_name} ({lang_code})" | |
if group_key not in models_by_lang: | |
models_by_lang[group_key] = [] | |
# Add model to language group | |
models_by_lang[group_key].append((get_model_display_info(model_info), model_id)) | |
# Create dropdown choices with model IDs as values | |
dropdown_choices = [] | |
models_by_display = {} # Map display names to model IDs | |
for lang, model_list in sorted(models_by_lang.items()): | |
# Add all models in this language group | |
for display_name, model_id in sorted(model_list): | |
dropdown_choices.append(display_name) | |
models_by_display[display_name] = model_id | |
def get_language_code(model_info): | |
"""Get the language code.""" | |
if not model_info.get("language"): | |
return None | |
lang_info = model_info["language"][0] | |
# Try both key formats for language code | |
lang_code = lang_info.get("lang_code", lang_info.get("Iso Code", "")).lower() | |
return lang_code | |
# Special cases for codes not in ISO standard | |
SPECIAL_CODES = { | |
"cmn": "zh", # Mandarin Chinese | |
"yue": "zh", # Cantonese | |
"pi": "el", # Pali (using Greek for this model) | |
"guj": "gu", # Gujarati | |
} | |
def get_translate_code(iso_code): | |
"""Convert ISO code to Google Translate code.""" | |
if not iso_code: | |
return None | |
# Remove any script or dialect specifiers | |
base_code = iso_code.split('-')[0].lower() | |
# Check special cases first | |
if base_code in SPECIAL_CODES: | |
return SPECIAL_CODES[base_code] | |
try: | |
# Try to get the ISO 639-1 (2-letter) code | |
lang = Lang(base_code) | |
return lang.pt1 | |
except: | |
# If that fails, try to find a matching language in pycountry | |
try: | |
lang = pycountry.languages.get(alpha_3=base_code) | |
if lang and hasattr(lang, 'alpha_2'): | |
return lang.alpha_2 | |
except: | |
pass | |
# If all else fails, try to use the original code | |
if len(base_code) == 2: | |
return base_code | |
return None | |
def translate_text(input_text, source_lang="en", target_lang="en"): | |
"""Translate text using Google Translator.""" | |
try: | |
# If source and target are the same, or if target is English, return as is | |
if source_lang == target_lang or target_lang == "en": | |
return input_text | |
first_error = None | |
# Try with original language code | |
try: | |
translated = GoogleTranslator(source=source_lang, target=target_lang).translate(input_text) | |
return translated | |
except Exception as e: | |
first_error = e | |
print(f"First translation attempt failed: {str(e)}") | |
# Try with 'auto' as source language | |
try: | |
translated = GoogleTranslator(source='auto', target=target_lang).translate(input_text) | |
return translated | |
except Exception as e: | |
print(f"Second translation attempt failed: {str(e)}") | |
if first_error: | |
raise first_error | |
raise e | |
except Exception as e: | |
print(f"Translation error: {str(e)} for target language: {target_lang}") | |
print(f"Attempted to use language code: {target_lang}") | |
return input_text | |
def detect_language(text): | |
"""Detect the language of the input text using Google Translator.""" | |
try: | |
return GoogleTranslator().detect(text).lower() | |
except: | |
return "en" # Default to English if detection fails | |
def download_and_extract_model(url, destination): | |
"""Download and extract the model files.""" | |
print(f"Downloading from URL: {url}") | |
print(f"Destination: {destination}") | |
# Convert Hugging Face URL format if needed | |
if "huggingface.co" in url: | |
# Replace /tree/main/ with /resolve/main/ for direct file download | |
base_url = url.replace("/tree/main/", "/resolve/main/") | |
model_id = base_url.split("/")[-1] | |
# Check if this is an MMS model | |
is_mms_model = "mms-tts-multilingual-models-onnx" in url | |
if is_mms_model: | |
# MMS models have both model.onnx and tokens.txt | |
model_url = f"{base_url}/model.onnx" | |
tokens_url = f"{base_url}/tokens.txt" | |
# Download model.onnx | |
print("Downloading model.onnx...") | |
model_path = os.path.join(destination, "model.onnx") | |
response = requests.get(model_url, stream=True) | |
if response.status_code != 200: | |
raise Exception(f"Failed to download model from {model_url}. Status code: {response.status_code}") | |
total_size = int(response.headers.get('content-length', 0)) | |
block_size = 8192 | |
downloaded = 0 | |
print(f"Total size: {total_size / (1024*1024):.1f} MB") | |
with open(model_path, "wb") as f: | |
for chunk in response.iter_content(chunk_size=block_size): | |
if chunk: | |
f.write(chunk) | |
downloaded += len(chunk) | |
if total_size > 0: | |
percent = int((downloaded / total_size) * 100) | |
if percent % 10 == 0: | |
print(f" {percent}%", end="", flush=True) | |
print("\nModel download complete") | |
# Download tokens.txt | |
print("Downloading tokens.txt...") | |
tokens_path = os.path.join(destination, "tokens.txt") | |
response = requests.get(tokens_url, stream=True) | |
if response.status_code != 200: | |
raise Exception(f"Failed to download tokens from {tokens_url}. Status code: {response.status_code}") | |
with open(tokens_path, "wb") as f: | |
f.write(response.content) | |
print("Tokens download complete") | |
return | |
else: | |
# Other models are stored as tar.bz2 files | |
url = f"{base_url}.tar.bz2" | |
# Try the URL | |
response = requests.get(url, stream=True) | |
if response.status_code != 200: | |
raise Exception(f"Failed to download model from {url}. Status code: {response.status_code}") | |
# Check if this is a Git LFS file pointer | |
content_start = response.content[:100].decode('utf-8', errors='ignore') | |
if content_start.startswith('version https://git-lfs.github.com/spec/v1'): | |
raise Exception(f"Received Git LFS pointer instead of file content from {url}") | |
# Create model directory if it doesn't exist | |
os.makedirs(destination, exist_ok=True) | |
# For non-MMS models, handle tar.bz2 files | |
tar_path = os.path.join(destination, "model.tar.bz2") | |
# Download the file | |
print("Downloading model archive...") | |
response = requests.get(url, stream=True) | |
total_size = int(response.headers.get('content-length', 0)) | |
block_size = 8192 | |
downloaded = 0 | |
print(f"Total size: {total_size / (1024*1024):.1f} MB") | |
with open(tar_path, "wb") as f: | |
for chunk in response.iter_content(chunk_size=block_size): | |
if chunk: | |
f.write(chunk) | |
downloaded += len(chunk) | |
if total_size > 0: | |
percent = int((downloaded / total_size) * 100) | |
if percent % 10 == 0: | |
print(f" {percent}%", end="", flush=True) | |
print("\nDownload complete") | |
# Extract the tar.bz2 file | |
print(f"Extracting {tar_path} to {destination}") | |
try: | |
with tarfile.open(tar_path, "r:bz2") as tar: | |
tar.extractall(path=destination) | |
os.remove(tar_path) | |
print("Extraction complete") | |
except Exception as e: | |
print(f"Error during extraction: {str(e)}") | |
raise | |
print("Contents of destination directory:") | |
for root, dirs, files in os.walk(destination): | |
print(f"\nDirectory: {root}") | |
if dirs: | |
print(" Subdirectories:", dirs) | |
if files: | |
print(" Files:", files) | |
def find_model_files(model_dir): | |
"""Find model files in the given directory and its subdirectories.""" | |
model_files = {} | |
# Check if this is an MMS model | |
is_mms = 'mms' in os.path.basename(model_dir).lower() | |
for root, _, files in os.walk(model_dir): | |
for file in files: | |
file_path = os.path.join(root, file) | |
# Model file | |
if file.endswith('.onnx'): | |
model_files['model'] = file_path | |
# Tokens file | |
elif file == 'tokens.txt': | |
model_files['tokens'] = file_path | |
# Lexicon file (only for non-MMS models) | |
elif file == 'lexicon.txt' and not is_mms: | |
model_files['lexicon'] = file_path | |
# Create empty lexicon file if needed (only for non-MMS models) | |
if not is_mms and 'model' in model_files and 'lexicon' not in model_files: | |
model_dir = os.path.dirname(model_files['model']) | |
lexicon_path = os.path.join(model_dir, 'lexicon.txt') | |
with open(lexicon_path, 'w', encoding='utf-8') as f: | |
pass # Create empty file | |
model_files['lexicon'] = lexicon_path | |
return model_files if 'model' in model_files else {} | |
def generate_audio(text, model_info): | |
"""Generate audio from text using the specified model.""" | |
try: | |
model_dir = os.path.join("./models", model_info['id']) | |
print(f"\nLooking for model in: {model_dir}") | |
# Download model if it doesn't exist | |
if not os.path.exists(model_dir): | |
print(f"Model directory doesn't exist, downloading {model_info['id']}...") | |
os.makedirs(model_dir, exist_ok=True) | |
download_and_extract_model(model_info['url'], model_dir) | |
print(f"Contents of {model_dir}:") | |
for item in os.listdir(model_dir): | |
item_path = os.path.join(model_dir, item) | |
if os.path.isdir(item_path): | |
print(f" Directory: {item}") | |
print(f" Contents: {os.listdir(item_path)}") | |
else: | |
print(f" File: {item}") | |
# Find and validate model files | |
model_files = find_model_files(model_dir) | |
if not model_files or 'model' not in model_files: | |
raise ValueError(f"Could not find required model files in {model_dir}") | |
print("\nFound model files:") | |
print(f"Model: {model_files['model']}") | |
print(f"Tokens: {model_files.get('tokens', 'Not found')}") | |
print(f"Lexicon: {model_files.get('lexicon', 'Not required for MMS')}\n") | |
# Check if this is an MMS model | |
is_mms = 'mms' in os.path.basename(model_dir).lower() | |
# Create configuration based on model type | |
if is_mms: | |
if 'tokens' not in model_files or not os.path.exists(model_files['tokens']): | |
raise ValueError("tokens.txt is required for MMS models") | |
# MMS models use tokens.txt and no lexicon | |
vits_config = sherpa_onnx.OfflineTtsVitsModelConfig( | |
model_files['model'], # model | |
'', # lexicon | |
model_files['tokens'], # tokens | |
'', # data_dir | |
'', # dict_dir | |
0.667, # noise_scale | |
0.8, # noise_scale_w | |
1.0 # length_scale | |
) | |
else: | |
# Non-MMS models use lexicon.txt | |
if 'tokens' not in model_files or not os.path.exists(model_files['tokens']): | |
raise ValueError("tokens.txt is required for VITS models") | |
# Set data dir if it exists | |
espeak_data = os.path.join(os.path.dirname(model_files['model']), 'espeak-ng-data') | |
data_dir = espeak_data if os.path.exists(espeak_data) else '' | |
# Get lexicon path if it exists | |
lexicon = model_files.get('lexicon', '') if os.path.exists(model_files.get('lexicon', '')) else '' | |
# Create VITS model config | |
vits_config = sherpa_onnx.OfflineTtsVitsModelConfig( | |
model_files['model'], # model | |
lexicon, # lexicon | |
model_files['tokens'], # tokens | |
data_dir, # data_dir | |
'', # dict_dir | |
0.667, # noise_scale | |
0.8, # noise_scale_w | |
1.0 # length_scale | |
) | |
# Create the model config with VITS | |
model_config = sherpa_onnx.OfflineTtsModelConfig() | |
model_config.vits = vits_config | |
# Create TTS configuration | |
config = sherpa_onnx.OfflineTtsConfig( | |
model=model_config, | |
max_num_sentences=2 | |
) | |
# Initialize TTS engine | |
tts = sherpa_onnx.OfflineTts(config) | |
# Generate audio | |
audio_data = tts.generate(text) | |
# Ensure we have valid audio data | |
if audio_data is None or len(audio_data.samples) == 0: | |
raise ValueError("Failed to generate audio - no data generated") | |
# Convert samples list to numpy array and normalize | |
audio_array = np.array(audio_data.samples, dtype=np.float32) | |
if np.any(audio_array): # Check if array is not all zeros | |
audio_array = audio_array / np.abs(audio_array).max() | |
else: | |
raise ValueError("Generated audio is empty") | |
# Return in Gradio's expected format (numpy array, sample rate) | |
return (audio_array, audio_data.sample_rate) | |
except Exception as e: | |
error_msg = str(e) | |
# Check for OOV or token conversion errors | |
if "out of vocabulary" in error_msg.lower() or "token" in error_msg.lower(): | |
error_msg = f"Text contains unsupported characters: {error_msg}" | |
print(f"Error generating audio: {error_msg}") | |
print(f"Error in TTS generation: {error_msg}") | |
raise | |
def tts_interface(selected_model, text, translate_enabled, status_output): | |
try: | |
if not text.strip(): | |
return None, "Please enter some text" | |
# Get model ID from the display name mapping | |
model_id = models_by_display.get(selected_model) | |
if not model_id or model_id not in models: | |
return None, "Please select a model" | |
model_info = models[model_id] | |
# Get the language code and check if translation is needed | |
lang_code = get_language_code(model_info) | |
translate_code = get_translate_code(lang_code) | |
# Store original text for status message | |
original_text = text | |
translated_text = None | |
was_translated = False | |
# Only translate if translation is enabled and needed | |
if translate_enabled and translate_code and translate_code != "en": | |
if not translate_code: | |
return None, f"Cannot determine translation target language from code: {lang_code}" | |
print(f"Translating to {translate_code}") | |
translated_text = translate_text(text, "en", translate_code) | |
if translated_text != text: # Only mark as translated if text actually changed | |
was_translated = True | |
text = translated_text | |
try: | |
# Update status with language info | |
lang_info = model_info.get('language', [{}])[0] | |
lang_name = lang_info.get('language_name', 'Unknown') | |
voice_name = model_info.get('name', model_id) | |
status = f"Generating speech using {voice_name} ({lang_name})..." | |
# Generate audio | |
audio_data, sample_rate = generate_audio(text, model_info) | |
# Include translation info in final status if text was actually translated | |
final_status = f"Generated speech using {voice_name} ({lang_name})" | |
if was_translated: | |
final_status += f"\nTranslated: '{original_text}' → '{translated_text}'" | |
else: | |
final_status += f"\nText: '{text}'" | |
return (sample_rate, audio_data), final_status | |
except ValueError as e: | |
# Handle known errors with user-friendly messages | |
error_msg = str(e) | |
if "cannot process some words" in error_msg.lower(): | |
return None, error_msg | |
return None, f"Error: {error_msg}" | |
except Exception as e: | |
print(f"Error in TTS generation: {str(e)}") | |
error_msg = str(e) | |
return None, f"Error: {error_msg}" | |
# Gradio Interface | |
with gr.Blocks() as app: | |
gr.Markdown("# Sherpa-ONNX TTS Demo") | |
with gr.Row(): | |
with gr.Column(): | |
model_dropdown = gr.Dropdown( | |
choices=dropdown_choices, | |
label="Select Model", | |
value=dropdown_choices[0] if dropdown_choices else None | |
) | |
text_input = gr.Textbox( | |
label="Text to speak", | |
placeholder="Enter text here...", | |
lines=3 | |
) | |
translate_checkbox = gr.Checkbox( | |
label="Translate to model language", | |
value=False | |
) | |
with gr.Row(): | |
generate_btn = gr.Button("Generate Audio") | |
stop_btn = gr.Button("Stop") | |
with gr.Column(): | |
audio_output = gr.Audio( | |
label="Generated Audio", | |
type="numpy" | |
) | |
status_text = gr.Textbox( | |
label="Status", | |
interactive=False | |
) | |
def on_model_change(model_name): | |
# Get model info | |
model_id = models_by_display.get(model_name) | |
if not model_id or model_id not in models: | |
return False | |
model_info = models[model_id] | |
lang_code = get_language_code(model_info) | |
# Auto-check translation for non-English models | |
should_translate = lang_code and lang_code.lower() != "en" | |
return should_translate | |
# Update translation checkbox when model changes | |
model_dropdown.change( | |
fn=on_model_change, | |
inputs=[model_dropdown], | |
outputs=[translate_checkbox] | |
) | |
# Set up event handlers | |
gen_event = generate_btn.click( | |
fn=tts_interface, | |
inputs=[model_dropdown, text_input, translate_checkbox, status_text], | |
outputs=[audio_output, status_text] | |
) | |
stop_btn.click( | |
fn=None, | |
cancels=gen_event, | |
queue=False | |
) | |
app.launch() | |