from flask import Flask, render_template, request, jsonify, send_from_directory , send_file
from flask_cors import CORS
import time
import uuid
import threading
from collections import deque
import subprocess
import os
# Flask application object; flask-cors is applied to it with its default configuration.
app = Flask(__name__)
CORS(app)
# Paths used throughout the module: the directory containing this script, the
# espeak-ng executable, and the folder generated audio/data files are written to.
script_path = os.path.dirname(os.path.abspath(__file__))
espeak_path = "espeak-ng" # Change this to the full path if needed
static_folder = os.path.join(script_path, "static")
# Create the static folder at import time so later writes into it cannot fail
# because the directory is missing.
os.makedirs(static_folder, exist_ok=True)
# A single UI widget description (type, label, optional choices and value) that
# renders itself to a string via render().
# NOTE(review): the string literals in render() look damaged in this copy of the
# file -- several are empty and others span lines in a way a single-quoted literal
# cannot. Presumably they originally contained HTML markup; restore them from the
# original source before relying on this class.
class Component:
# Store the widget description; `options` is only iterated by the "radio" type and
# `value` is only interpolated by the "better_textbox" type.
def __init__(self, component_type, label, options=None, value=None):
self.component_type = component_type
self.label = label
self.options = options
self.value = value
# Return the rendered string for this widget, chosen by `component_type`.
# Unknown component types render as an empty string.
def render(self):
if self.component_type == "textbox":
return f''
elif self.component_type == "radio":
# One fragment per option, concatenated in order.
options_html = ''.join([f'''
''' for index,option in enumerate(self.options)])
return options_html
elif self.component_type == "audio":
return f''
elif self.component_type == "output":
return f'
Placeholder for {self.label}
'
elif self.component_type == "better_textbox":
return f'
{self.value}
'
else:
return ""
# Container that renders its child components one after another and wraps the
# concatenated result in a single string.
# NOTE(review): the wrapper literal in render() looks damaged in this copy (the
# surrounding markup appears to be missing) -- confirm against the original.
class Row:
# `components` is any iterable of objects exposing render().
def __init__(self, components):
self.components = components
# Render every child in order and embed the joined output in the wrapper string.
def render(self):
components_html = ''.join([component.render() for component in self.components])
return f'
{components_html}
'
# Container that renders its child components one after another and wraps the
# concatenated result in a single string. As written here it is identical to Row.
# NOTE(review): the wrapper literal in render() looks damaged in this copy (the
# surrounding markup appears to be missing) -- confirm against the original.
class Column:
# `components` is any iterable of objects exposing render().
def __init__(self, components):
self.components = components
# Render every child in order and embed the joined output in the wrapper string.
def render(self):
components_html = ''.join([component.render() for component in self.components])
return f'
{components_html}
'
class GradioInterface:
    """Run ``fn`` on submitted inputs in background threads and track the outcome.

    Each accepted submission gets a UUID task id and its own thread. At most
    ``max_parallel_tasks`` tasks may be in flight at once; further submissions
    are rejected until a slot frees up.

    Attributes:
        fn: Callable invoked as ``fn(input_data)`` for every task.
        input_components: Components rendered by :meth:`render_components`.
        output_components: Stored for callers; not rendered by this class.
        max_parallel_tasks: Upper bound on concurrently running tasks.
        task_queue: Ids of tasks that are currently in flight, oldest first.
        results: Return value of ``fn`` per finished task id (kept indefinitely).
        errors: Error message per task id whose ``fn`` call raised.
        current_tasks: Input data per in-flight task id.
        task_threads: Worker thread per in-flight task id.
        stop_flags: ``threading.Event`` per in-flight task id.
    """

    def __init__(self, fn, input_components, output_components, max_parallel_tasks=5):
        self.fn = fn
        self.input_components = input_components
        self.output_components = output_components
        self.max_parallel_tasks = max_parallel_tasks
        self.task_queue = deque()
        self.results = {}
        self.errors = {}
        self.current_tasks = {}
        self.task_threads = {}
        self.stop_flags = {}
        # The bookkeeping containers are touched by the submitting thread and by
        # every worker thread, so all compound updates go through one lock.
        self._lock = threading.Lock()

    def submit(self, input_data):
        """Start a background task for ``input_data``.

        Returns:
            The new task id, or ``None`` when ``max_parallel_tasks`` tasks are
            already in flight.
        """
        task_id = str(uuid.uuid4())
        stop_flag = threading.Event()
        worker = threading.Thread(
            target=self.process_task, args=(task_id, input_data, stop_flag)
        )
        with self._lock:
            # Capacity check and registration happen atomically so concurrent
            # submissions cannot both take the last free slot.
            if len(self.current_tasks) >= self.max_parallel_tasks:
                return None
            self.task_queue.append(task_id)
            self.current_tasks[task_id] = input_data
            self.stop_flags[task_id] = stop_flag
            self.task_threads[task_id] = worker
        worker.start()
        return task_id

    def process_task(self, task_id, input_data, stop_flag):
        """Thread body: run ``fn`` and record its result or its failure.

        The task's bookkeeping entries are always removed afterwards, even when
        ``fn`` raises; otherwise a failed task would occupy a slot forever.
        ``stop_flag`` is accepted for interface compatibility but is not passed
        to ``fn``.
        """
        outcome = None
        failure = None
        try:
            outcome = self.fn(input_data)
        except Exception as exc:  # thread boundary: record instead of dying silently
            failure = f"{type(exc).__name__}: {exc}"
            print(f"Error: task {task_id} failed: {failure}")

        with self._lock:
            if failure is None:
                self.results[task_id] = outcome
            else:
                self.errors[task_id] = failure
            try:
                self.task_queue.remove(task_id)
            except ValueError:
                pass
            self.current_tasks.pop(task_id, None)
            self.task_threads.pop(task_id, None)
            self.stop_flags.pop(task_id, None)

    def get_result(self, task_id):
        """Report the state of ``task_id``.

        Returns:
            ``{"status": "completed", "result": ...}`` once ``fn`` returned,
            ``{"status": "failed", "error": ...}`` when ``fn`` raised, and
            ``{"status": "pending"}`` otherwise (including unknown ids).
        """
        with self._lock:
            if task_id in self.results:
                return {"status": "completed", "result": self.results[task_id]}
            if task_id in self.errors:
                return {"status": "failed", "error": self.errors[task_id]}
        return {"status": "pending"}

    def render_components(self):
        """Return the concatenated rendering of the input components only."""
        return ''.join(component.render() for component in self.input_components)

    def get_queue_position(self):
        """Return the number of tasks currently in flight."""
        return len(self.task_queue)

    def stop_task(self, task_id):
        """Set the stop flag of an in-flight task.

        The flag is only set; ``fn`` does not receive it, so a running call is
        not interrupted by this class.

        Returns:
            ``True`` if the task was in flight, ``False`` otherwise.
        """
        with self._lock:
            stop_flag = self.stop_flags.get(task_id)
        if stop_flag is None:
            return False
        stop_flag.set()
        return True
import os
import shutil
import subprocess
def find_espeak_ng_data_folder():
    """Return the espeak-ng data directory reported by ``espeak-ng --version``.

    The path is taken from the text following ``'Data at: '`` in the command
    output. Any failure (command missing, unexpected output, ...) is printed and
    reported as ``None``.
    """
    try:
        # Output format reference:
        # https://github.com/espeak-ng/espeak-ng/blob/b006f6d4f997fbfe4016cf29767743e6b397d0fb/src/espeak-ng.c#L309
        version_output = subprocess.check_output([espeak_path, '--version'])
        version_text = version_output.decode()
        after_marker = version_text.split('Data at: ')[1]
        return after_marker.strip()
    except Exception as err:
        print(f"Error: {err}")
        return None
def copy_lang_dict(data_folder, destination, lang):
    """Copy the compiled ``<lang>_dict`` file from ``data_folder`` to ``destination``.

    Success and failure are both reported with ``print``; errors raised while
    copying are not propagated.
    """
    dict_file = lang + '_dict'
    source_path = os.path.join(data_folder, dict_file)
    try:
        target_path = os.path.join(destination, dict_file)
        shutil.copyfile(source_path, target_path)
        print(f"Successfully copied {dict_file} to {destination}")
    except Exception as err:
        print(f"Error copying {dict_file}: {err}")
import requests
def download_files(urls, folder, timeout=30):
    """Download every URL in ``urls`` into ``folder``.

    Each file is saved under the last path segment of its URL. A failed download
    (non-200 response, timeout, connection error) is reported with ``print`` and
    skipped so the remaining URLs are still attempted.

    Args:
        urls: Iterable of URL strings.
        folder: Target directory; created if it does not exist.
        timeout: Seconds to wait for each request before giving up. Without a
            timeout a stalled server would block the caller indefinitely.
    """
    os.makedirs(folder, exist_ok=True)  # Create folder if it doesn't exist
    for url in urls:
        filename = os.path.join(folder, url.split('/')[-1])  # Get file name from URL
        try:
            response = requests.get(url, timeout=timeout)
        except requests.RequestException as exc:
            print(f'Failed to download: {url} ❌ ({exc})')
            continue
        if response.status_code == 200:
            with open(filename, 'wb') as f:
                f.write(response.content)
            print(f'Downloaded: {filename} ✅')
        else:
            print(f'Failed to download: {url} ❌')
# Persian (fa) dictionary source files from the espeak-ng repository.
urls = [
'https://github.com/espeak-ng/espeak-ng/raw/refs/heads/master/dictsource/fa_rules',
'https://github.com/espeak-ng/espeak-ng/raw/refs/heads/master/dictsource/fa_list',
'https://github.com/espeak-ng/espeak-ng/raw/refs/heads/master/dictsource/fa_extra'
]
# Runs at import time: fetches the files above into <static>/dictsource, the same
# directory compile_espeak() uses as its working directory.
download_files(urls, os.path.join(static_folder,"dictsource"))
def compile_espeak():
    """Run ``espeak-ng --compile=fa`` inside ``<static>/dictsource``.

    The working directory is created if needed. The outcome is reported with
    ``print`` based on the process exit code, so a failed compilation is no
    longer announced as done.

    Returns:
        ``True`` when espeak-ng exited with code 0, ``False`` otherwise.
    """
    compile_command = [espeak_path, "--compile=fa"]
    cwd_path = os.path.join(static_folder, "dictsource")
    os.makedirs(cwd_path, exist_ok=True)
    completed = subprocess.run(compile_command, cwd=cwd_path)
    succeeded = completed.returncode == 0
    if succeeded:
        print("Compilation done!")
    else:
        print(f"Compilation failed with exit code {completed.returncode}")
    return succeeded
import gzip
# Gzip text file of submitted "word<TAB>phonetic" lines; lines starting with "//"
# are markers that listWords() skips.
phonetc_data_file_path = os.path.join(static_folder, 'fa_extra.txt.gz')
# Gzip text file that tts() appends one "voice<TAB>text" line to per request.
generations_file_path = os.path.join(static_folder, 'generations.txt.gz')
import time
import datetime
def get_iran_time_no_packages():
    """Return the current time at UTC+03:30 as ``'YYYY-MM-DD HH:MM:SS'``.

    The offset is applied to an aware UTC clock reading rather than by feeding a
    UTC struct through ``time.mktime``/``time.localtime``, which interprets the
    value in the server's local zone and is an hour off whenever that zone is
    observing daylight saving time.
    """
    iran_offset = datetime.timedelta(hours=3, minutes=30)
    iran_now = datetime.datetime.now(datetime.timezone(iran_offset))
    return iran_now.strftime("%Y-%m-%d %H:%M:%S")
# Runs at import time: append a "// started at" marker line to the phonetic data
# file (creating the file if it does not exist yet).
with gzip.open(phonetc_data_file_path, 'at',encoding='utf-8') as f: # 'at' = append, text mode
f.write(f'// started at : {get_iran_time_no_packages()}\n')
import soundfile as sf
from app_utils import tts_interface , models
def tts(input_data):
    """Synthesize ``input_data['text']`` with the voice ``input_data['voice']``.

    Steps: log the request to the generations file, phonemize the text with
    espeak-ng (Persian voice), synthesize audio through ``tts_interface`` and
    write it as a 16-bit PCM WAV file into the static folder.

    Args:
        input_data: Mapping with the keys ``'text'`` and ``'voice'``.

    Returns:
        Dict with ``'audio'`` (URL path of the WAV file under ``/static/``) and
        ``'status'`` (model URL plus the espeak-ng phoneme string).

    Raises:
        ValueError: If ``'text'`` is missing.
    """
    text = input_data.get('text')
    voice = input_data.get('voice')
    if text is None:
        raise ValueError("'text' is required")

    # Tasks run in parallel threads, so each call gets its own scratch file
    # instead of sharing one fixed input_text.txt.
    input_file_path = os.path.join(script_path, f"input_text_{uuid.uuid4()}.txt")
    audio_file = os.path.join(static_folder, f'output_{uuid.uuid4()}.wav')  # Path for the output audio file

    with gzip.open(generations_file_path, 'at', encoding='utf-8') as f:
        f.write(f'{voice}\t{text.strip()}\n')

    try:
        # Write the input text to a file
        with open(input_file_path, 'w', encoding='utf-8') as file:
            file.write(text)
        # Phonemize the text
        phonemes_command = [espeak_path, "-v", 'fa', "-x", "-q", "-f", input_file_path]
        phonemes_result = subprocess.run(phonemes_command, capture_output=True, text=True)
    finally:
        try:
            os.remove(input_file_path)
        except OSError:
            pass
    phonemized_text = phonemes_result.stdout.strip()

    # Generate audio file
    (sample_rate, audio_data), _final_status = tts_interface(voice, text, '')
    sf.write(
        audio_file,
        audio_data,
        samplerate=sample_rate,
        subtype="PCM_16",
    )

    # Look up the model URL for the chosen voice; stays empty when the voice is
    # not listed (previously this left the name unbound). Last match wins.
    model_url = ''
    for model in models:
        if voice == model[2]:
            model_url = model[3]

    status = f'مدل : {model_url}\nآوانگاشت: {phonemized_text}'
    return {"audio": f"/static/{os.path.basename(audio_file)}",  # Return the static path
            "status": status
            }
def phonemize(input_data):
    """Phonemize and speak a word with espeak-ng, optionally recording a correction.

    When ``'phonetic'`` is empty or ``'task'`` is ``'phonemize'`` the word itself
    is fed to espeak-ng; otherwise the supplied phonetic string is passed wrapped
    in ``[[...]]``. With ``task == 'send'`` and a non-empty word and phonetic
    string, the pair is appended to the phonetic data file.

    Args:
        input_data: Mapping with ``'word'`` and optionally ``'task'`` and
            ``'phonetic'``.

    Returns:
        Dict with ``'audio'`` (URL path under ``/static/``), ``'text'``,
        ``'phonemes'`` and ``'status'``.
    """
    text = input_data.get('word') or ''
    voice = 'fa'
    task = input_data.get('task')
    raw_phonetic = (input_data.get('phonetic') or '').strip()

    # Tasks run in parallel threads, so each call gets its own scratch file
    # instead of sharing one fixed input_text.txt.
    input_file_path = os.path.join(script_path, f"input_text_{uuid.uuid4()}.txt")
    audio_file = os.path.join(static_folder, f'output_{uuid.uuid4()}.wav')  # Path for the output audio file

    if raw_phonetic == '' or task == 'phonemize':
        espeak_input = text
    else:
        espeak_input = f'[[{raw_phonetic}]]'

    try:
        # Write the input text to a file
        with open(input_file_path, 'w', encoding='utf-8') as file:
            file.write(espeak_input)
        # Phonemize the text
        phonemes_command = [espeak_path, "-v", voice, "-x", "-q", "-f", input_file_path]
        phonemes_result = subprocess.run(phonemes_command, capture_output=True, text=True)
        # Generate audio file; argument list instead of a shell string so paths
        # need no quoting and nothing is interpreted by a shell.
        audio_command = [espeak_path, "-v", voice, "-w", audio_file, "-f", input_file_path]
        subprocess.run(audio_command)
    finally:
        try:
            os.remove(input_file_path)
        except OSError:
            pass
    phonemized_text = phonemes_result.stdout.strip()

    status = f'متن : {text}\nآوانگاشت: {phonemized_text}'

    if task == 'send' and raw_phonetic and text.strip():
        with gzip.open(phonetc_data_file_path, 'at', encoding='utf-8') as f:  # 'at' = append, text mode
            f.write(f'{text.strip()}\t{raw_phonetic}\n')

    return {"audio": f"/static/{os.path.basename(audio_file)}",
            "text": text,
            "phonemes": phonemized_text,
            "status": status
            }
def listWords(input_data):
    """Return every recorded word/phonetic pair from the phonetic data file.

    Lines starting with ``//`` and lines that do not split into exactly two
    tab-separated fields are ignored. ``input_data`` is unused; it exists so the
    function matches the single-argument task signature.
    """
    entries = []
    with gzip.open(phonetc_data_file_path, 'rt', encoding='utf-8') as handle:  # 'rt' for text mode
        for raw_line in handle:
            if raw_line.startswith('//'):  # Skip commented lines
                continue
            fields = raw_line.strip().split('\t')
            if len(fields) != 2:
                continue
            word, phonetic = fields
            entries.append({'word': word.strip(), 'phonetic': phonetic.strip()})
    return entries
def example_function(input_data):
    """Identity task function: hand back ``input_data`` untouched."""
    echoed = input_data
    return echoed
# Voice names offered in the UI: the third field of every entry in `models`.
input_options = [model[2] for model in models]

# Position of the voice that is preselected in the radio group. Falls back to the
# first voice (or None when there are no voices) instead of raising IndexError
# at import time when `models` has fewer entries.
DEFAULT_VOICE_INDEX = 7
if len(input_options) > DEFAULT_VOICE_INDEX:
    default_voice = input_options[DEFAULT_VOICE_INDEX]
else:
    default_voice = input_options[0] if input_options else None

input_components = [
    Component("radio", "voice", options=input_options, value=default_voice)
]
output_components = [
    Component("output", "phonemized_output"),
    Component("audio", "audio_output")
]

# One task runner per UI tab; all share the same components and concurrency limit.
ifaces = {
    tab_name: GradioInterface(
        fn=handler,
        input_components=input_components,
        output_components=output_components,
        max_parallel_tasks=5
    )
    for tab_name, handler in (
        ('tts', tts),
        ('phonemize', phonemize),
        ('words', listWords),
    )
}
@app.route("/", methods=["GET"])
def index():
    """Serve index.html with the TTS tab's input components pre-rendered."""
    tts_inputs = ifaces['tts'].render_components()
    return render_template("index.html", components=tts_inputs)
@app.route("/submit", methods=["POST"])
def submit():
    """Queue the posted form as a task on the interface named by the ``tab`` field.

    Responses:
        200 with ``task_id`` and ``position`` when the task was accepted,
        400 when ``tab`` is missing or does not name a known interface,
        429 when that interface is already running its maximum number of tasks.
    """
    tab = request.form.get("tab")
    iface = ifaces.get(tab)
    if iface is None:
        # Previously an unknown tab raised KeyError and surfaced as a 500.
        return jsonify({"error": f"Unknown tab: {tab}"}), 400

    task_id = iface.submit(request.form)
    if not task_id:
        return jsonify({"error": "Task queue is full"}), 429

    position = iface.get_queue_position()
    return jsonify({"task_id": task_id, "position": position})
# NOTE(review): the rule previously contained no URL variables although the view
# requires `task_id` and `tab`; the segment order below follows the parameter
# order of the function -- confirm it matches the URLs the frontend requests.
@app.route("/result/<task_id>/<tab>", methods=["GET"])
def result(task_id, tab):
    """Report the state of ``task_id`` on the interface named ``tab``.

    Returns the full result payload once the task completed, otherwise only its
    status. An unknown ``tab`` yields a 404 instead of an unhandled KeyError.
    """
    iface = ifaces.get(tab)
    if iface is None:
        return jsonify({"status": "not found"}), 404

    result_data = iface.get_result(task_id)
    if result_data["status"] == "completed":
        return jsonify(result_data)
    return jsonify({"status": result_data["status"]})
@app.route("/queue", methods=["GET"])
def queue():
    """Return the ids of all in-flight tasks across every interface.

    The view used to read a module-level ``iface`` that is never defined (only
    the ``ifaces`` mapping exists), so every request failed with NameError.
    """
    pending = []
    for iface in ifaces.values():
        # Snapshot each queue before extending; worker threads mutate it.
        pending.extend(list(iface.task_queue))
    return jsonify({"queue": pending})
@app.route("/stop/<task_id>", methods=["POST"])
def stop(task_id):
    """Set the stop flag of ``task_id`` on whichever interface is running it.

    The view used to read a module-level ``iface`` that is never defined, and its
    rule carried no ``task_id`` variable; task ids are UUIDs, so searching every
    interface in ``ifaces`` is unambiguous.

    Responses:
        200 ``{"status": "stopped"}`` when the task was in flight,
        404 ``{"status": "not found"}`` otherwise.
    """
    stopped = any(iface.stop_task(task_id) for iface in ifaces.values())
    if stopped:
        return jsonify({"status": "stopped"})
    return jsonify({"status": "not found"}), 404
# The rule needs a URL variable to supply `filename`; the `path` converter also
# accepts names containing slashes (e.g. files in sub-folders of the static dir).
# NOTE(review): confirm the converter against the original rule.
@app.route('/static/<path:filename>')
def send_static(filename):
    """Serve ``filename`` from the static folder."""
    return send_from_directory(static_folder, filename)
@app.route('/download/fa_extra')
def download():
    """Send the gzip phonetic data file to the client as an attachment."""
    attachment_path = phonetc_data_file_path
    return send_file(attachment_path, as_attachment=True)
@app.route('/download/fa_dict')
def download1():
    """Recompile the Persian dictionary and send the resulting ``fa_dict`` file.

    Responses:
        200 with the file as an attachment,
        500 when the espeak-ng data folder cannot be determined,
        404 when no ``fa_dict`` file exists in that folder.
    """
    dict_file = 'fa_dict'
    compile_espeak()

    data_folder = find_espeak_ng_data_folder()
    if data_folder is None:
        # os.path.join(None, ...) would otherwise raise TypeError.
        return jsonify({"error": "espeak-ng data folder could not be located"}), 500

    dict_path = os.path.join(data_folder, dict_file)
    if not os.path.isfile(dict_path):
        return jsonify({"error": f"{dict_file} not found"}), 404
    return send_file(dict_path, as_attachment=True)
# When executed as a script, start Flask's built-in server listening on all
# network interfaces (0.0.0.0) on port 7860.
if __name__ == "__main__":
app.run(host='0.0.0.0', port= 7860)