import os import json import gradio as gr import shutil import subprocess import requests import tarfile from pathlib import Path import soundfile as sf import sherpa_onnx from deep_translator import GoogleTranslator import numpy as np from iso639 import Lang import pycountry # Load model JSON MODEL_JSON_URL = "https://github.com/willwade/tts-wrapper/blob/main/tts_wrapper/engines/sherpaonnx/merged_models.json" MODEL_JSON_PATH = "./models.json" # Load models if not os.path.exists(MODEL_JSON_PATH): response = requests.get(MODEL_JSON_URL.replace("/blob/", "/raw/")) with open(MODEL_JSON_PATH, "w") as f: f.write(response.text) with open(MODEL_JSON_PATH, "r") as f: models = json.load(f) def get_model_display_info(model_info): """Create a display string for a model.""" # Get language info lang_info = model_info.get('language', [{}])[0] lang_name = lang_info.get('language_name', lang_info.get('Language Name', 'Unknown')) lang_code = lang_info.get('lang_code', lang_info.get('Iso Code', 'Unknown')) # Get model info voice_name = model_info.get('name', model_info.get('id', 'Unknown')) developer = model_info.get('developer', '') quality = model_info.get('quality', 'MMS' if 'mms' in voice_name.lower() else '') # Create display name model_display = f"{voice_name} ({developer}" if quality: model_display += f" - {quality}" model_display += ")" # Combine language and model info return f"{lang_name} ({lang_code}) | {model_display}" # Group models by language models_by_lang = {} for model_id, model_info in models.items(): # Get language info from the first language in the list lang_info = model_info.get('language', [{}])[0] lang_name = lang_info.get('language_name', lang_info.get('Language Name', 'Unknown')) lang_code = lang_info.get('lang_code', lang_info.get('Iso Code', 'Unknown')) group_key = f"{lang_name} ({lang_code})" if group_key not in models_by_lang: models_by_lang[group_key] = [] # Add model to language group models_by_lang[group_key].append((get_model_display_info(model_info), model_id)) # Create dropdown choices with model IDs as values dropdown_choices = [] models_by_display = {} # Map display names to model IDs for lang, model_list in sorted(models_by_lang.items()): # Add all models in this language group for display_name, model_id in sorted(model_list): dropdown_choices.append(display_name) models_by_display[display_name] = model_id def get_language_code(model_info): """Get the language code.""" if not model_info.get("language"): return None lang_info = model_info["language"][0] # Try both key formats for language code lang_code = lang_info.get("lang_code", lang_info.get("Iso Code", "")).lower() return lang_code # Special cases for codes not in ISO standard SPECIAL_CODES = { "cmn": "zh", # Mandarin Chinese "yue": "zh", # Cantonese "pi": "el", # Pali (using Greek for this model) "guj": "gu", # Gujarati } def get_translate_code(iso_code): """Convert ISO code to Google Translate code.""" if not iso_code: return None # Remove any script or dialect specifiers base_code = iso_code.split('-')[0].lower() # Check special cases first if base_code in SPECIAL_CODES: return SPECIAL_CODES[base_code] try: # Try to get the ISO 639-1 (2-letter) code lang = Lang(base_code) return lang.pt1 except: # If that fails, try to find a matching language in pycountry try: lang = pycountry.languages.get(alpha_3=base_code) if lang and hasattr(lang, 'alpha_2'): return lang.alpha_2 except: pass # If all else fails, try to use the original code if len(base_code) == 2: return base_code return None def translate_text(input_text, source_lang="en", target_lang="en"): """Translate text using Google Translator.""" if source_lang == target_lang: return input_text try: # Convert ISO code to Google Translate code target_lang = get_translate_code(target_lang) try: translated = GoogleTranslator(source=source_lang, target=target_lang).translate(input_text) return f"{translated} (translated from: {input_text})" except Exception as first_error: # If the first attempt fails with the mapped code, try with the original try: translated = GoogleTranslator(source=source_lang, target=target_lang).translate(input_text) return f"{translated} (translated from: {input_text})" except: raise first_error except Exception as e: print(f"Translation error: {str(e)} for target language: {target_lang}") print(f"Attempted to use language code: {target_lang}") return f"Translation Error: Could not translate to {target_lang}. Original text: {input_text}" def download_and_extract_model(url, destination): """Download and extract the model files.""" print(f"Downloading from URL: {url}") print(f"Destination: {destination}") # Convert Hugging Face URL format if needed if "huggingface.co" in url: # Replace /tree/main/ with /resolve/main/ for direct file download base_url = url.replace("/tree/main/", "/resolve/main/") model_id = base_url.split("/")[-1] # Check if this is an MMS model is_mms_model = "mms-tts-multilingual-models-onnx" in url if is_mms_model: # MMS models have both model.onnx and tokens.txt model_url = f"{base_url}/model.onnx" tokens_url = f"{base_url}/tokens.txt" # Download model.onnx print("Downloading model.onnx...") model_path = os.path.join(destination, "model.onnx") response = requests.get(model_url, stream=True) if response.status_code != 200: raise Exception(f"Failed to download model from {model_url}. Status code: {response.status_code}") total_size = int(response.headers.get('content-length', 0)) block_size = 8192 downloaded = 0 print(f"Total size: {total_size / (1024*1024):.1f} MB") with open(model_path, "wb") as f: for chunk in response.iter_content(chunk_size=block_size): if chunk: f.write(chunk) downloaded += len(chunk) if total_size > 0: percent = int((downloaded / total_size) * 100) if percent % 10 == 0: print(f" {percent}%", end="", flush=True) print("\nModel download complete") # Download tokens.txt print("Downloading tokens.txt...") tokens_path = os.path.join(destination, "tokens.txt") response = requests.get(tokens_url, stream=True) if response.status_code != 200: raise Exception(f"Failed to download tokens from {tokens_url}. Status code: {response.status_code}") with open(tokens_path, "wb") as f: f.write(response.content) print("Tokens download complete") return else: # Other models are stored as tar.bz2 files url = f"{base_url}.tar.bz2" # Try the URL response = requests.get(url, stream=True) if response.status_code != 200: raise Exception(f"Failed to download model from {url}. Status code: {response.status_code}") # Check if this is a Git LFS file pointer content_start = response.content[:100].decode('utf-8', errors='ignore') if content_start.startswith('version https://git-lfs.github.com/spec/v1'): raise Exception(f"Received Git LFS pointer instead of file content from {url}") # Create model directory if it doesn't exist os.makedirs(destination, exist_ok=True) # For non-MMS models, handle tar.bz2 files tar_path = os.path.join(destination, "model.tar.bz2") # Download the file print("Downloading model archive...") response = requests.get(url, stream=True) total_size = int(response.headers.get('content-length', 0)) block_size = 8192 downloaded = 0 print(f"Total size: {total_size / (1024*1024):.1f} MB") with open(tar_path, "wb") as f: for chunk in response.iter_content(chunk_size=block_size): if chunk: f.write(chunk) downloaded += len(chunk) if total_size > 0: percent = int((downloaded / total_size) * 100) if percent % 10 == 0: print(f" {percent}%", end="", flush=True) print("\nDownload complete") # Extract the tar.bz2 file print(f"Extracting {tar_path} to {destination}") try: with tarfile.open(tar_path, "r:bz2") as tar: tar.extractall(path=destination) os.remove(tar_path) print("Extraction complete") except Exception as e: print(f"Error during extraction: {str(e)}") raise print("Contents of destination directory:") for root, dirs, files in os.walk(destination): print(f"\nDirectory: {root}") if dirs: print(" Subdirectories:", dirs) if files: print(" Files:", files) def find_model_files(model_dir): """Find model files in the given directory and its subdirectories.""" model_files = {} # Check if this is an MMS model is_mms = 'mms' in os.path.basename(model_dir).lower() for root, _, files in os.walk(model_dir): for file in files: file_path = os.path.join(root, file) # Model file if file.endswith('.onnx'): model_files['model'] = file_path # Tokens file elif file == 'tokens.txt': model_files['tokens'] = file_path # Lexicon file (only for non-MMS models) elif file == 'lexicon.txt' and not is_mms: model_files['lexicon'] = file_path # Create empty lexicon file if needed (only for non-MMS models) if not is_mms and 'model' in model_files and 'lexicon' not in model_files: model_dir = os.path.dirname(model_files['model']) lexicon_path = os.path.join(model_dir, 'lexicon.txt') with open(lexicon_path, 'w', encoding='utf-8') as f: pass # Create empty file model_files['lexicon'] = lexicon_path return model_files if 'model' in model_files else {} def generate_audio(text, model_info): """Generate audio from text using the specified model.""" try: model_dir = os.path.join("./models", model_info['id']) print(f"\nLooking for model in: {model_dir}") # Download model if it doesn't exist if not os.path.exists(model_dir): print(f"Model directory doesn't exist, downloading {model_info['id']}...") os.makedirs(model_dir, exist_ok=True) download_and_extract_model(model_info['url'], model_dir) print(f"Contents of {model_dir}:") for item in os.listdir(model_dir): item_path = os.path.join(model_dir, item) if os.path.isdir(item_path): print(f" Directory: {item}") print(f" Contents: {os.listdir(item_path)}") else: print(f" File: {item}") # Find and validate model files model_files = find_model_files(model_dir) if not model_files or 'model' not in model_files: raise ValueError(f"Could not find required model files in {model_dir}") print("\nFound model files:") print(f"Model: {model_files['model']}") print(f"Tokens: {model_files.get('tokens', 'Not found')}") print(f"Lexicon: {model_files.get('lexicon', 'Not required for MMS')}\n") # Check if this is an MMS model is_mms = 'mms' in os.path.basename(model_dir).lower() # Create configuration based on model type if is_mms: if 'tokens' not in model_files or not os.path.exists(model_files['tokens']): raise ValueError("tokens.txt is required for MMS models") # MMS models use tokens.txt and no lexicon vits_config = sherpa_onnx.OfflineTtsVitsModelConfig( model_files['model'], # model '', # lexicon model_files['tokens'], # tokens '', # data_dir '', # dict_dir 0.667, # noise_scale 0.8, # noise_scale_w 1.0 # length_scale ) else: # Non-MMS models use lexicon.txt if 'tokens' not in model_files or not os.path.exists(model_files['tokens']): raise ValueError("tokens.txt is required for VITS models") # Set data dir if it exists espeak_data = os.path.join(os.path.dirname(model_files['model']), 'espeak-ng-data') data_dir = espeak_data if os.path.exists(espeak_data) else '' # Get lexicon path if it exists lexicon = model_files.get('lexicon', '') if os.path.exists(model_files.get('lexicon', '')) else '' # Create VITS model config vits_config = sherpa_onnx.OfflineTtsVitsModelConfig( model_files['model'], # model lexicon, # lexicon model_files['tokens'], # tokens data_dir, # data_dir '', # dict_dir 0.667, # noise_scale 0.8, # noise_scale_w 1.0 # length_scale ) # Create the model config with VITS model_config = sherpa_onnx.OfflineTtsModelConfig() model_config.vits = vits_config # Create TTS configuration config = sherpa_onnx.OfflineTtsConfig( model=model_config, max_num_sentences=2 ) # Initialize TTS engine tts = sherpa_onnx.OfflineTts(config) # Generate audio audio_data = tts.generate(text) # Ensure we have valid audio data if audio_data is None or len(audio_data.samples) == 0: raise ValueError("Failed to generate audio - no data generated") # Convert samples list to numpy array and normalize audio_array = np.array(audio_data.samples, dtype=np.float32) if np.any(audio_array): # Check if array is not all zeros audio_array = audio_array / np.abs(audio_array).max() else: raise ValueError("Generated audio is empty") # Return in Gradio's expected format (numpy array, sample rate) return (audio_array, audio_data.sample_rate) except Exception as e: error_msg = str(e) # Check for OOV or token conversion errors if "out of vocabulary" in error_msg.lower() or "token" in error_msg.lower(): error_msg = f"Text contains unsupported characters: {error_msg}" print(f"Error generating audio: {error_msg}") print(f"Error in TTS generation: {error_msg}") raise def tts_interface(selected_model, text, translate_enabled, status_output): try: if not text.strip(): return None, "Please enter some text" # Get model ID from the display name mapping model_id = models_by_display.get(selected_model) if not model_id or model_id not in models: return None, "Please select a model" model_info = models[model_id] # Check if this is an MMS model is_mms = 'mms' in model_id.lower() # Get the language code and check if translation is needed lang_code = get_language_code(model_info) translate_code = get_translate_code(lang_code) # For MMS models, we always need to translate if is_mms: if not translate_code: return None, f"Cannot determine translation target language from code: {lang_code}" print(f"MMS model detected, translating to {translate_code}") text = translate_text(text, "en", translate_code) # For other models, check if translation is enabled and needed elif translate_enabled and translate_code and translate_code != "en": if not translate_code: return None, f"Cannot determine translation target language from code: {lang_code}" print(f"Will translate to {translate_code} (from ISO code {lang_code})") text = translate_text(text, "en", translate_code) try: # Update status with language info lang_info = model_info.get('language', [{}])[0] lang_name = lang_info.get('language_name', 'Unknown') voice_name = model_info.get('name', model_id) status = f"Generating speech using {voice_name} ({lang_name})..." # Generate audio audio_data, sample_rate = generate_audio(text, model_info) return (sample_rate, audio_data), f"Generated speech using {voice_name} ({lang_name})" except ValueError as e: # Handle known errors with user-friendly messages error_msg = str(e) if "cannot process some words" in error_msg.lower(): return None, error_msg return None, f"Error: {error_msg}" except Exception as e: print(f"Error in TTS generation: {str(e)}") error_msg = str(e) return None, f"Error: {error_msg}" # Gradio Interface with gr.Blocks() as app: gr.Markdown("# Sherpa-ONNX TTS Demo") with gr.Row(): with gr.Column(): model_dropdown = gr.Dropdown( choices=dropdown_choices, label="Select Model", value=dropdown_choices[0] if dropdown_choices else None ) text_input = gr.Textbox( label="Text to speak", placeholder="Enter text here...", lines=3 ) translate_checkbox = gr.Checkbox( label="Translate to model language", value=False ) with gr.Row(): generate_btn = gr.Button("Generate Audio") stop_btn = gr.Button("Stop") with gr.Column(): audio_output = gr.Audio( label="Generated Audio", type="numpy" ) status_text = gr.Textbox( label="Status", interactive=False ) # Handle model selection to update translate checkbox def update_translate_checkbox(selected_model): """Update visibility of translate checkbox based on selected model's language.""" try: # Find the model info for the selected model for lang_group in models_by_lang.values(): for display_name, model_id in lang_group: if display_name == selected_model: model_info = models[model_id] lang_info = model_info.get('language', [{}])[0] lang_code = lang_info.get('lang_code', '') return {"visible": lang_code != 'en'} return {"visible": False} except Exception as e: print(f"Error updating translate checkbox: {str(e)}") return {"visible": False} model_dropdown.change( fn=update_translate_checkbox, inputs=[model_dropdown], outputs=[translate_checkbox] ) # Set up event handlers gen_event = generate_btn.click( fn=tts_interface, inputs=[model_dropdown, text_input, translate_checkbox, status_text], outputs=[audio_output, status_text] ) stop_btn.click( fn=None, cancels=gen_event, queue=False ) app.launch()