# Hugging Face Spaces page header (status: Running) - not part of the program.
import os | |
import json | |
import gradio as gr | |
import shutil | |
import subprocess | |
import requests | |
import tarfile | |
from pathlib import Path | |
import soundfile as sf | |
import sherpa_onnx | |
import numpy as np | |
# Voice catalogue: every entry is [model key, download URL].  Hugging Face
# entries point at a repository folder, GitHub entries at a .tar.bz2 release
# archive.
models = [
    ['mms fa', 'https://huggingface.co/willwade/mms-tts-multilingual-models-onnx/resolve/main/fas'],
    ['coqui-vits-female1-karim23657', 'https://huggingface.co/karim23657/persian-tts-vits/tree/main/persian-tts-female1-vits-coqui'],
    ['coqui-vits-male1-karim23657', 'https://huggingface.co/karim23657/persian-tts-vits/tree/main/persian-tts-male1-vits-coqui'],
    ['coqui-vits-male-karim23657', 'https://huggingface.co/karim23657/persian-tts-vits/tree/main/male-male-coqui-vits'],
    ['coqui-vits-female-karim23657', 'https://huggingface.co/karim23657/persian-tts-vits/tree/main/female-female-coqui-vits'],
    ['coqui-vits-female-GPTInformal-karim23657', 'https://huggingface.co/karim23657/persian-tts-vits/tree/main/female-GPTInformal-coqui-vits'],
    ['coqui-vits-male-SmartGitiCorp', 'https://huggingface.co/karim23657/persian-tts-vits/tree/main/male-SmartGitiCorp-coqui-vits'],
    ['vits-piper-fa-gyro-medium', 'https://github.com/k2-fsa/sherpa-onnx/releases/download/tts-models/vits-piper-fa_IR-gyro-medium.tar.bz2'],
    ['piper-fa-amir-medium', 'https://github.com/k2-fsa/sherpa-onnx/releases/download/tts-models/vits-piper-fa_IR-amir-medium.tar.bz2'],
    ['vits-mimic3-fa-haaniye_low', 'https://github.com/k2-fsa/sherpa-onnx/releases/download/tts-models/vits-mimic3-fa-haaniye_low.tar.bz2'],
    ['vits-piper-fa_en-rezahedayatfar-ibrahimwalk-medium', 'https://github.com/k2-fsa/sherpa-onnx/releases/download/tts-models/vits-piper-fa_en-rezahedayatfar-ibrahimwalk-medium.tar.bz2'],
]

# UI labels: every entry is written as [model key, display label].  The key is
# swapped for the matching download URL right below, so once this module has
# loaded each entry reads [download URL, display label].
models_n = [
    ["mms fa", "🌠 راد"],
    ["coqui-vits-female1-karim23657", "🌺 نگار"],
    ["coqui-vits-male1-karim23657", "🌟 آرش"],
    ["coqui-vits-male-karim23657", "🦁 کیان"],
    ["coqui-vits-female-karim23657", "🌷 مهتاب"],
    ["coqui-vits-female-GPTInformal-karim23657", "🌼 شیوا"],
    ["coqui-vits-male-SmartGitiCorp", "🚀 بهمن"],
    ["vits-piper-fa-gyro-medium", "💧 نیما"],
    ["piper-fa-amir-medium", "⚡️ آریا"],
    ["vits-mimic3-fa-haaniye_low", "🌹 ریما"],
    ["vits-piper-fa_en-rezahedayatfar-ibrahimwalk-medium", "🌠 پیام"],
]

# Join the two tables on the model key instead of on list position: a
# reordered or missing row now fails loudly with KeyError at start-up rather
# than silently attaching the wrong URL to a voice.
_url_by_key = {key: url for key, url in models}
for _entry in models_n:
    _entry[0] = _url_by_key[_entry[0]]

# Labels offered in the UI, in catalogue order.
dropdown_choices = [label for _, label in models_n]
# Seconds requests may wait for a connection or for the next bytes of a
# response before raising; without it a stalled server blocks forever.
_REQUEST_TIMEOUT = 60

# First bytes of a Git LFS pointer file, served when the real payload is
# missing.
_LFS_POINTER_PREFIX = b"version https://git-lfs.github.com/spec/v1"


def _download_file(url, path, what="model"):
    """Stream ``url`` into the file at ``path`` and print coarse progress.

    Args:
        url: Address to fetch.
        path: File to (over)write with the response body.
        what: Short noun used in the error message ("model", "tokens", ...).

    Raises:
        Exception: If the server answers with a status other than 200.
    """
    with requests.get(url, stream=True, timeout=_REQUEST_TIMEOUT) as response:
        if response.status_code != 200:
            raise Exception(
                f"Failed to download {what} from {url}. "
                f"Status code: {response.status_code}"
            )
        total_size = int(response.headers.get('content-length', 0))
        print(f"Total size: {total_size / (1024*1024):.1f} MB")

        downloaded = 0
        last_step = -1  # last 10% step already printed
        with open(path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                if not chunk:
                    continue
                f.write(chunk)
                downloaded += len(chunk)
                if total_size > 0:
                    # Report each 10% step once, not once per chunk.
                    step = downloaded * 10 // total_size
                    if step > last_step:
                        print(f" {step * 10}%", end="", flush=True)
                        last_step = step


def _print_tree(destination):
    """Print every directory below ``destination`` with its entries."""
    print("Contents of destination directory:")
    for root, dirs, files in os.walk(destination):
        print(f"\nDirectory: {root}")
        if dirs:
            print(" Subdirectories:", dirs)
        if files:
            print(" Files:", files)


def _extract_tar_bz2(tar_path, destination):
    """Unpack ``tar_path`` into ``destination``, delete it, list the result.

    Extraction errors are printed and re-raised; the archive is only removed
    after a successful extraction.
    """
    print(f"Extracting {tar_path} to {destination}")
    try:
        with tarfile.open(tar_path, "r:bz2") as tar:
            if hasattr(tarfile, "data_filter"):
                # The archive comes from the network: refuse members that
                # would escape ``destination`` or create special files.
                tar.extractall(path=destination, filter="data")
            else:
                # Older Python without extraction filters.
                tar.extractall(path=destination)
        os.remove(tar_path)
        print("Extraction complete")
    except Exception as e:
        print(f"Error during extraction: {str(e)}")
        raise
    _print_tree(destination)


def download_and_extract_model(url, destination):
    """Download the model behind ``url`` into the directory ``destination``.

    Hugging Face URLs are treated as a repository folder that holds
    ``model.onnx`` and ``tokens.txt``; a ``/tree/main/`` browser URL is
    rewritten to its ``/resolve/main/`` download form and both files are
    fetched.  Any other URL is treated as a ``.tar.bz2`` archive that is
    downloaded once, checked for being a Git LFS pointer, and extracted.

    Args:
        url: Model location as listed in the voice catalogue.
        destination: Target directory; created if missing.

    Raises:
        Exception: On a non-200 response or when a Git LFS pointer is
            received instead of the archive.  Extraction errors propagate.
    """
    print(f"Downloading from URL: {url}")
    print(f"Destination: {destination}")
    os.makedirs(destination, exist_ok=True)

    if "huggingface.co" in url:
        # Replace /tree/main/ with /resolve/main/ for direct file download.
        base_url = url.replace("/tree/main/", "/resolve/main/").rstrip("/")

        print("Downloading model.onnx...")
        _download_file(
            f"{base_url}/model.onnx",
            os.path.join(destination, "model.onnx"),
            what="model",
        )
        print("\nModel download complete")

        print("Downloading tokens.txt...")
        _download_file(
            f"{base_url}/tokens.txt",
            os.path.join(destination, "tokens.txt"),
            what="tokens",
        )
        print("\nTokens download complete")
        return

    tar_path = os.path.join(destination, "model.tar.bz2")
    print("Downloading model archive...")
    _download_file(url, tar_path, what="model")
    print("\nDownload complete")

    # Inspect the saved file instead of issuing a second request for it.
    with open(tar_path, "rb") as f:
        head = f.read(len(_LFS_POINTER_PREFIX))
    if head == _LFS_POINTER_PREFIX:
        os.remove(tar_path)
        raise Exception(f"Received Git LFS pointer instead of file content from {url}")

    _extract_tar_bz2(tar_path, destination)


def dl_espeak_data():
    """Download the espeak-ng data archive and unpack it next to this file.

    The archive is saved as ``espeak-ng-data.tar.bz2`` in the current working
    directory and extracted into the directory that contains this script.

    Raises:
        Exception: If the download does not return status 200.  Extraction
            errors propagate.
    """
    tar_path = 'espeak-ng-data.tar.bz2'
    print("Downloading model archive...")
    _download_file(
        'https://github.com/k2-fsa/sherpa-onnx/releases/download/tts-models/espeak-ng-data.tar.bz2',
        tar_path,
        what="espeak-ng data",
    )
    print("\nDownload complete")

    destination = os.path.dirname(os.path.abspath(__file__))
    _extract_tar_bz2(tar_path, destination)


# Fetched on every start-up, before the UI is built.
dl_espeak_data()
def find_model_files(model_dir):
    """Locate model artefacts anywhere below ``model_dir``.

    Returns a dict with ``'model'`` (path of an ``.onnx`` file; if several
    exist, whichever ``os.walk`` yields last) and, when present, ``'tokens'``
    (path of ``tokens.txt``).  Returns ``{}`` when no ``.onnx`` file exists.
    """
    # Every model is currently treated as MMS, which switches off all of the
    # lexicon handling below.
    treat_as_mms = True
    found = {}

    for current_dir, _subdirs, names in os.walk(model_dir):
        for name in names:
            full_path = os.path.join(current_dir, name)
            if name.endswith('.onnx'):
                found['model'] = full_path
            elif name == 'tokens.txt':
                found['tokens'] = full_path
            elif not treat_as_mms and name == 'lexicon.txt':
                # Lexicon is only collected for non-MMS models.
                found['lexicon'] = full_path

    if 'model' not in found:
        return {}

    if not treat_as_mms and 'lexicon' not in found:
        # Non-MMS models get an empty lexicon file next to the model.
        lexicon_path = os.path.join(os.path.dirname(found['model']), 'lexicon.txt')
        with open(lexicon_path, 'w', encoding='utf-8'):
            pass
        found['lexicon'] = lexicon_path

    return found
def generate_audio(text, model_info):
    """Synthesise ``text`` with the voice whose UI label is ``model_info``.

    The model is stored under ``./models/<model_info>`` and downloaded on
    first use, using the URL paired with that label in ``models_n``.

    Args:
        text: Text to speak.
        model_info: Display label of the voice, as listed in ``models_n``.

    Returns:
        ``(samples, sample_rate)`` where ``samples`` is a float32 numpy array
        scaled so that its largest absolute value is 1.

    Raises:
        ValueError: Unknown label, missing model files, or no/silent audio.
        Exception: Anything raised by the download or by sherpa-onnx; every
            error is printed once and re-raised unchanged.
    """
    try:
        model_dir = os.path.join("./models", model_info)
        print(f"\nLooking for model in: {model_dir}")

        # Download model if it doesn't exist
        if not os.path.exists(model_dir):
            print(f"Model directory doesn't exist, downloading {model_info}...")
            model_url = next(
                (url for url, label in models_n if label == model_info),
                None,
            )
            if model_url is None:
                # Fail before creating a directory for a voice we cannot fetch.
                raise ValueError(f"Unknown model: {model_info}")
            os.makedirs(model_dir, exist_ok=True)
            try:
                download_and_extract_model(model_url, model_dir)
            except Exception:
                # A half-filled directory would make the existence check
                # above skip the download on every later call.
                shutil.rmtree(model_dir, ignore_errors=True)
                raise

        print(f"Contents of {model_dir}:")
        for item in os.listdir(model_dir):
            item_path = os.path.join(model_dir, item)
            if os.path.isdir(item_path):
                print(f" Directory: {item}")
                print(f" Contents: {os.listdir(item_path)}")
            else:
                print(f" File: {item}")

        # Find and validate model files
        model_files = find_model_files(model_dir)
        if not model_files or 'model' not in model_files:
            raise ValueError(f"Could not find required model files in {model_dir}")

        print("\nFound model files:")
        print(f"Model: {model_files['model']}")
        print(f"Tokens: {model_files.get('tokens', 'Not found')}")
        print(f"Lexicon: {model_files.get('lexicon', 'Not required for MMS')}\n")

        # NOTE(review): the last component of model_dir is the UI label, so
        # this looks for "mms" in the label itself; none of the labels in
        # models_n contains it, which sends every voice down the non-MMS
        # branch.  Confirm that this is intended.
        is_mms = 'mms' in os.path.basename(model_dir).lower()

        has_tokens = 'tokens' in model_files and os.path.exists(model_files['tokens'])

        # Create configuration based on model type
        if is_mms:
            if not has_tokens:
                raise ValueError("tokens.txt is required for MMS models")
            # MMS models use tokens.txt and no lexicon
            vits_config = sherpa_onnx.OfflineTtsVitsModelConfig(
                model_files['model'],   # model
                '',                     # lexicon
                model_files['tokens'],  # tokens
                '',                     # data_dir
                '',                     # dict_dir
                0.667,                  # noise_scale
                0.8,                    # noise_scale_w
                1.0                     # length_scale
            )
        else:
            if not has_tokens:
                raise ValueError("tokens.txt is required for VITS models")
            # Prefer espeak-ng data shipped next to the model, otherwise fall
            # back to the relative path 'espeak-ng-data'.
            espeak_data = os.path.join(os.path.dirname(model_files['model']), 'espeak-ng-data')
            data_dir = espeak_data if os.path.exists(espeak_data) else 'espeak-ng-data'
            # Pass a lexicon only when one was found and still exists.
            lexicon = model_files.get('lexicon', '')
            if not os.path.exists(lexicon):
                lexicon = ''
            vits_config = sherpa_onnx.OfflineTtsVitsModelConfig(
                model_files['model'],   # model
                lexicon,                # lexicon
                model_files['tokens'],  # tokens
                data_dir,               # data_dir
                '',                     # dict_dir
                0.667,                  # noise_scale
                0.8,                    # noise_scale_w
                1.0                     # length_scale
            )

        # Create the model config with VITS
        model_config = sherpa_onnx.OfflineTtsModelConfig()
        model_config.vits = vits_config

        # Create TTS configuration
        config = sherpa_onnx.OfflineTtsConfig(
            model=model_config,
            max_num_sentences=2
        )

        # Initialize TTS engine and generate audio
        tts = sherpa_onnx.OfflineTts(config)
        audio_data = tts.generate(text)

        # Ensure we have valid audio data
        if audio_data is None or len(audio_data.samples) == 0:
            raise ValueError("Failed to generate audio - no data generated")

        # Convert samples to a numpy array and scale the peak to 1; an
        # all-zero signal cannot be scaled and is reported as empty.
        audio_array = np.array(audio_data.samples, dtype=np.float32)
        if not np.any(audio_array):
            raise ValueError("Generated audio is empty")
        audio_array = audio_array / np.abs(audio_array).max()

        return (audio_array, audio_data.sample_rate)
    except Exception as e:
        error_msg = str(e)
        # Check for OOV or token conversion errors (affects the log line only;
        # the original exception is what gets re-raised).
        if "out of vocabulary" in error_msg.lower() or "token" in error_msg.lower():
            error_msg = f"Text contains unsupported characters: {error_msg}"
        print(f"Error generating audio: {error_msg}")
        raise
def tts_interface(selected_model, text, status_output):
    """Gradio click handler: speak ``text`` with the selected voice.

    Args:
        selected_model: Voice label chosen in the UI; passed to
            ``generate_audio`` unchanged.
        text: Text to speak.  ``None``, empty and whitespace-only input are
            rejected with a prompt instead of being synthesised.
        status_output: Current content of the status box.  Unused; it is
            accepted because the click handler wires it in as an input.

    Returns:
        ``((sample_rate, samples), status)`` on success, otherwise
        ``(None, message)``.  Errors are reported in the message rather than
        raised.
    """
    try:
        if not text or not text.strip():
            return None, "Please enter some text"
        # Generate audio
        audio_data, sample_rate = generate_audio(text, selected_model)
    except ValueError as e:
        # Handle known errors with user-friendly messages
        error_msg = str(e)
        if "cannot process some words" in error_msg.lower():
            return None, error_msg
        return None, f"Error: {error_msg}"
    except Exception as e:
        print(f"Error in TTS generation: {str(e)}")
        return None, f"Error: {str(e)}"

    final_status = f"Generated speech using {selected_model}"
    final_status += f"\nText: '{text}'"
    return (sample_rate, audio_data), final_status
# Gradio Interface: voice picker, text box and buttons in one column, audio
# player and status box in the other.
with gr.Blocks() as app:
    gr.Markdown("# Sherpa-ONNX متن به گفتار")

    with gr.Row():
        with gr.Column():
            # Voice selection; starts on the first catalogue entry, if any.
            model_dropdown = gr.Radio(
                label="مدل",
                choices=dropdown_choices,
                value=dropdown_choices[0] if dropdown_choices else None,
            )
            # Text to be spoken.
            text_input = gr.Textbox(
                label="متن",
                lines=3,
                placeholder="متن را وارد کنید ...",
            )
            with gr.Row():
                generate_btn = gr.Button("بگو")   # "say"
                stop_btn = gr.Button("توقف")      # "stop"

        with gr.Column():
            # Player for the generated speech.
            audio_output = gr.Audio(type="numpy", label="گفتار")
            # Read-only status / error line.
            status_text = gr.Textbox(interactive=False, label="وضعیت")

    # Set up event handlers
    gen_event = generate_btn.click(
        tts_interface,
        inputs=[model_dropdown, text_input, status_text],
        outputs=[audio_output, status_text],
    )
    # The stop button runs no function; it only cancels a running generation.
    stop_btn.click(fn=None, cancels=gen_event, queue=False)

app.launch()