"""Flask front end for an espeak-ng (`fa` voice) phonemizer and TTS demo.

The module exposes a small task-queue abstraction (``GradioInterface``) that
runs a callable in a background thread per submitted form, plus HTTP routes to
submit work, poll results, and download the collected dictionary data.

Import-time side effects: the static folder is created, the espeak-ng
``fa_*`` dictionary sources are downloaded into ``static/dictsource``, and a
"started at" marker line is appended to ``static/fa_extra.txt.gz``.
"""

import contextlib
import datetime
import gzip
import os
import shutil
import subprocess
import tempfile
import threading
import time
import uuid
from collections import deque

import requests
import soundfile as sf
from flask import Flask, render_template, request, jsonify, send_from_directory, send_file
from flask_cors import CORS

app = Flask(__name__)
CORS(app)

# Set the paths for espeak-ng and the script directory
script_path = os.path.dirname(os.path.abspath(__file__))
espeak_path = "espeak-ng"  # Change this to the full path if needed
static_folder = os.path.join(script_path, "static")

# Ensure the static folder exists
os.makedirs(static_folder, exist_ok=True)


class Component:
    """A single UI element that can render itself to an HTML fragment.

    NOTE(review): the HTML markup inside the templates below appears to be
    missing from this copy of the file (only the text content is left).
    The visible text is reproduced as-is; restore the tags from the original
    source before relying on the rendered output.
    """

    def __init__(self, component_type, label, options=None, value=None):
        self.component_type = component_type
        self.label = label
        self.options = options
        self.value = value

    def render(self):
        """Return the fragment for this component, or "" for unknown types."""
        kind = self.component_type
        if kind == "textbox":
            return ''
        if kind == "radio":
            # One fragment per option; a missing option list renders nothing
            # instead of raising on enumerate(None).
            return ''.join('\n' for _index, _option in enumerate(self.options or []))
        if kind == "audio":
            return ''
        if kind == "output":
            return f'\nPlaceholder for {self.label}\n'
        if kind == "better_textbox":
            return f'\n{self.value}\n'
        return ""


class Row:
    """Container that renders its child components one after another."""

    def __init__(self, components):
        self.components = components

    def render(self):
        """Return the concatenated child fragments wrapped in the row template."""
        components_html = ''.join(component.render() for component in self.components)
        return f'\n{components_html}\n'


class Column:
    """Container that renders its child components one after another."""

    def __init__(self, components):
        self.components = components

    def render(self):
        """Return the concatenated child fragments wrapped in the column template."""
        components_html = ''.join(component.render() for component in self.components)
        return f'\n{components_html}\n'


class GradioInterface:
    """Run ``fn`` in one background thread per submitted task.

    At most ``max_parallel_tasks`` tasks may be in flight; further submissions
    are rejected. Results are kept in ``results`` keyed by task id.
    """

    def __init__(self, fn, input_components, output_components, max_parallel_tasks=5):
        self.fn = fn
        self.input_components = input_components
        self.output_components = output_components
        self.max_parallel_tasks = max_parallel_tasks
        self.task_queue = deque()
        self.results = {}
        self.errors = {}  # task_id -> error message for tasks whose fn raised
        self.current_tasks = {}
        self.task_threads = {}
        self.stop_flags = {}
        # Worker threads and request handlers both touch the bookkeeping above.
        self._lock = threading.Lock()

    def submit(self, input_data):
        """Start a task for ``input_data``.

        Returns the new task id, or ``None`` when ``max_parallel_tasks`` tasks
        are already running.
        """
        task_id = str(uuid.uuid4())
        stop_flag = threading.Event()
        thread = threading.Thread(
            target=self.process_task, args=(task_id, input_data, stop_flag)
        )
        with self._lock:
            if len(self.current_tasks) >= self.max_parallel_tasks:
                return None
            self.task_queue.append(task_id)
            self.current_tasks[task_id] = input_data
            self.stop_flags[task_id] = stop_flag
            self.task_threads[task_id] = thread
        thread.start()
        return task_id

    def process_task(self, task_id, input_data, stop_flag):
        """Thread body: run ``fn`` and record its result or its failure.

        The bookkeeping entries are always released, even when ``fn`` raises;
        otherwise a failed task would occupy a slot forever and the interface
        would eventually reject every submission. ``stop_flag`` is accepted
        for the thread signature but ``fn`` is not given a way to observe it.
        """
        try:
            outcome = self.fn(input_data)
        except Exception as exc:  # thread boundary: report instead of dying silently
            print(f"Task {task_id} failed: {exc}")
            with self._lock:
                self.errors[task_id] = str(exc)
        else:
            with self._lock:
                self.results[task_id] = outcome
        finally:
            with self._lock:
                with contextlib.suppress(ValueError):
                    self.task_queue.remove(task_id)
                self.current_tasks.pop(task_id, None)
                self.task_threads.pop(task_id, None)
                self.stop_flags.pop(task_id, None)

    def get_result(self, task_id):
        """Return the status dict for ``task_id``.

        ``completed`` carries ``result``; ``failed`` carries ``error``; any
        other id (still running or unknown) is reported as ``pending``.
        """
        with self._lock:
            if task_id in self.results:
                return {"status": "completed", "result": self.results[task_id]}
            if task_id in self.errors:
                return {"status": "failed", "error": self.errors[task_id]}
        return {"status": "pending"}

    def render_components(self):
        """Return the HTML for the input components only.

        Output components are not part of the returned markup.
        """
        return ''.join(component.render() for component in self.input_components)

    def get_queue_position(self):
        """Return the number of tasks currently tracked in the queue."""
        return len(self.task_queue)

    def queued_task_ids(self):
        """Return a snapshot list of the task ids currently in the queue."""
        with self._lock:
            return list(self.task_queue)

    def stop_task(self, task_id):
        """Set the stop flag of ``task_id``; return whether the task was known."""
        with self._lock:
            flag = self.stop_flags.get(task_id)
        if flag is None:
            return False
        flag.set()
        return True


def find_espeak_ng_data_folder():
    """Return the data directory reported by ``espeak-ng --version``.

    Returns ``None`` (after printing the error) when espeak-ng cannot be run
    or its output has no ``Data at: `` marker.
    """
    try:
        # https://github.com/espeak-ng/espeak-ng/blob/b006f6d4f997fbfe4016cf29767743e6b397d0fb/src/espeak-ng.c#L309
        version_output = subprocess.check_output([espeak_path, '--version']).decode()
        return version_output.split('Data at: ')[1].strip()
    except (OSError, subprocess.CalledProcessError, IndexError, UnicodeDecodeError) as e:
        print(f"Error: {str(e)}")
        return None


def copy_lang_dict(data_folder, destination, lang):
    """Copy the compiled ``<lang>_dict`` file from ``data_folder`` to ``destination``.

    Failures are printed, not raised.
    """
    dict_file = lang + '_dict'
    source_path = os.path.join(data_folder, dict_file)
    try:
        shutil.copyfile(source_path, os.path.join(destination, dict_file))
        print(f"Successfully copied {dict_file} to {destination}")
    except Exception as e:
        print(f"Error copying {dict_file}: {str(e)}")


def download_files(urls, folder, timeout=30):
    """Download every URL in ``urls`` into ``folder`` (created if missing).

    Each file is named after the last path segment of its URL. A failed
    download (non-200 status, network error, or timeout after ``timeout``
    seconds) is reported and skipped so the remaining URLs are still tried.
    """
    os.makedirs(folder, exist_ok=True)  # Create folder if it doesn't exist
    for url in urls:
        filename = os.path.join(folder, url.split('/')[-1])  # Get file name from URL
        try:
            response = requests.get(url, timeout=timeout)
        except requests.RequestException as exc:
            print(f'Failed to download: {url} ❌ ({exc})')
            continue
        if response.status_code == 200:
            with open(filename, 'wb') as f:
                f.write(response.content)
            print(f'Downloaded: {filename} ✅')
        else:
            print(f'Failed to download: {url} ❌')


urls = [
    'https://github.com/espeak-ng/espeak-ng/raw/refs/heads/master/dictsource/fa_rules',
    'https://github.com/espeak-ng/espeak-ng/raw/refs/heads/master/dictsource/fa_list',
    'https://github.com/espeak-ng/espeak-ng/raw/refs/heads/master/dictsource/fa_extra',
]
download_files(urls, os.path.join(static_folder, "dictsource"))


def compile_espeak():
    """Run ``espeak-ng --compile=fa`` inside ``static/dictsource``."""
    compile_command = [espeak_path, "--compile=fa"]
    cwd_path = os.path.join(static_folder, "dictsource")
    os.makedirs(cwd_path, exist_ok=True)
    completed = subprocess.run(compile_command, cwd=cwd_path)
    if completed.returncode == 0:
        print("Compilation done!")
    else:
        print(f"Compilation failed with exit code {completed.returncode}")


phonetc_data_file_path = os.path.join(static_folder, 'fa_extra.txt.gz')
generations_file_path = os.path.join(static_folder, 'generations.txt.gz')

# Tasks run in parallel threads and all append to the same gzip logs.
_gz_append_lock = threading.Lock()


def _append_gz_line(path, line):
    """Append ``line`` to the gzip text file at ``path``, one writer at a time."""
    with _gz_append_lock:
        with gzip.open(path, 'at', encoding='utf-8') as f:  # 'at' for text mode
            f.write(line)


def get_iran_time_no_packages():
    """Return the current time at a fixed UTC+03:30 offset as ``YYYY-MM-DD HH:MM:SS``.

    The offset is applied to an aware UTC timestamp, so the result does not
    depend on the host's local timezone or DST rules.
    """
    iran_offset = datetime.timedelta(hours=3, minutes=30)
    iran_datetime = datetime.datetime.now(datetime.timezone.utc) + iran_offset
    return iran_datetime.strftime("%Y-%m-%d %H:%M:%S")


_append_gz_line(phonetc_data_file_path, f'// started at : {get_iran_time_no_packages()}\n')

# Kept at its original position so the start-up order (downloads and log
# marker first) is unchanged.
# NOTE(review): hoist to the top if app_utils has no import-time dependency on the steps above.
from app_utils import tts_interface, models  # noqa: E402


@contextlib.contextmanager
def _temp_text_file(text):
    """Yield the path of a private UTF-8 file containing ``text``; delete it afterwards.

    A unique file per call keeps concurrently running tasks from overwriting
    each other's espeak-ng input.
    """
    fd, path = tempfile.mkstemp(prefix="input_text_", suffix=".txt")
    try:
        with os.fdopen(fd, 'w', encoding='utf-8') as handle:
            handle.write(text)
        yield path
    finally:
        with contextlib.suppress(OSError):
            os.remove(path)


def _espeak_phonemes(voice, input_file_path):
    """Return espeak-ng's phoneme mnemonics (``-x -q``) for the text in the file."""
    phonemes_command = [espeak_path, "-v", voice, "-x", "-q", "-f", input_file_path]
    phonemes_result = subprocess.run(phonemes_command, capture_output=True, text=True)
    return phonemes_result.stdout.strip()


def tts(input_data):
    """Synthesize ``input_data['text']`` with the model selected by ``input_data['voice']``.

    The request is appended to the generations log, the text is phonemized
    with espeak-ng for display, and the audio returned by ``tts_interface`` is
    written as a 16-bit PCM WAV into the static folder.

    Returns a dict with ``audio`` (URL path of the WAV) and ``status``.
    """
    text = input_data.get('text') or ''
    voice = input_data.get('voice')
    audio_file = os.path.join(static_folder, f'output_{uuid.uuid4()}.wav')  # Path for the output audio file

    _append_gz_line(generations_file_path, f'{voice}\t{text.strip()}\n')

    # Phonemize the text
    with _temp_text_file(text) as input_file_path:
        phonemized_text = _espeak_phonemes('fa', input_file_path)

    # Generate audio file
    (sample_rate, audio_data), _final_status = tts_interface(voice, text, '')
    sf.write(
        audio_file,
        audio_data,
        samplerate=sample_rate,
        subtype="PCM_16",
    )

    # Fall back to the voice name so an unknown voice cannot leave model_url unbound.
    model_url = voice
    for model in models:
        if voice == model[2]:
            model_url = model[3]

    status = f'مدل : {model_url}\nآوانگاشت: {phonemized_text}'
    return {
        "audio": f"/static/{os.path.basename(audio_file)}",  # Return the static path
        "status": status,
    }


def phonemize(input_data):
    """Phonemize and speak a word (or an explicit phonetic spelling) with espeak-ng.

    Form fields read: ``word``, ``task`` and ``phonetic``. When ``phonetic`` is
    non-empty and ``task`` is not ``'phonemize'``, the phonetic spelling is
    passed to espeak-ng wrapped in ``[[...]]``; otherwise the word itself is
    used. With ``task == 'send'`` and a non-empty ``phonetic`` the pair is
    appended to the dictionary data file.

    Returns a dict with ``audio``, ``text``, ``phonemes`` and ``status``.
    """
    text = input_data.get('word') or ''
    voice = 'fa'
    task = input_data.get('task')
    phonetic = (input_data.get('phonetic') or '').strip()
    audio_file = os.path.join(static_folder, f'output_{uuid.uuid4()}.wav')  # Path for the output audio file

    if phonetic == '' or task == 'phonemize':
        spoken = text
    else:
        spoken = f'[[{phonetic}]]'

    with _temp_text_file(spoken) as input_file_path:
        # Phonemize the text
        phonemized_text = _espeak_phonemes(voice, input_file_path)
        # Generate audio file (argument list, no shell involved)
        subprocess.run([espeak_path, "-v", voice, "-w", audio_file, "-f", input_file_path])

    status = f'متن : {text}\nآوانگاشت: {phonemized_text}'

    if task == 'send' and phonetic:
        _append_gz_line(phonetc_data_file_path, f'{text.strip()}\t{phonetic}\n')

    return {
        "audio": f"/static/{os.path.basename(audio_file)}",
        "text": text,
        "phonemes": phonemized_text,
        "status": status,
    }


def listWords(input_data):
    """Return the collected ``{'word', 'phonetic'}`` pairs from the data file.

    Lines starting with ``//`` and lines that do not split into exactly two
    tab-separated fields are skipped. ``input_data`` is not used.
    """
    data = []
    with gzip.open(phonetc_data_file_path, 'rt', encoding='utf-8') as f:  # 'rt' for text mode
        for line in f:
            if line.startswith('//'):  # Skip commented lines
                continue
            parts = line.strip().split('\t')
            if len(parts) == 2:
                data.append({'word': parts[0].strip(), 'phonetic': parts[1].strip()})
    return data


def example_function(input_data):
    """Return ``input_data`` unchanged."""
    return input_data


input_options = [model[2] for model in models]

# The voice at index 7 is the preferred default; fall back to the first
# voice (or None) when fewer models are configured.
if len(input_options) > 7:
    _default_voice = input_options[7]
else:
    _default_voice = input_options[0] if input_options else None

input_components = [
    Component("radio", "voice", options=input_options, value=_default_voice)
]

output_components = [
    Component("output", "phonemized_output"),
    Component("audio", "audio_output")
]

ifaces = {
    'tts': GradioInterface(
        fn=tts,
        input_components=input_components,
        output_components=output_components,
        max_parallel_tasks=5
    ),
    'phonemize': GradioInterface(
        fn=phonemize,
        input_components=input_components,
        output_components=output_components,
        max_parallel_tasks=5
    ),
    'words': GradioInterface(
        fn=listWords,
        input_components=input_components,
        output_components=output_components,
        max_parallel_tasks=5
    ),
}


@app.route("/", methods=["GET"])
def index():
    """Serve the main page with the TTS interface's input components."""
    return render_template("index.html", components=ifaces['tts'].render_components())


@app.route("/submit", methods=["POST"])
def submit():
    """Queue the posted form on the interface named by its ``tab`` field."""
    tab = request.form.get("tab")
    iface = ifaces.get(tab)
    if iface is None:
        return jsonify({"error": f"Unknown tab: {tab}"}), 400
    # Hand the worker thread a plain dict rather than the request's own form object.
    task_id = iface.submit(request.form.to_dict())
    if task_id:
        position = iface.get_queue_position()
        return jsonify({"task_id": task_id, "position": position})
    return jsonify({"error": "Task queue is full"}), 429


# NOTE(review): the URL variables were missing from this route in the reviewed
# copy; the segment order is assumed to follow the view's parameter order.
# The view accepts either order by checking which segment names a known tab.
@app.route("/result/<task_id>/<tab>", methods=["GET"])
def result(task_id, tab):
    """Return the status (and result, once available) of a task."""
    if tab not in ifaces and task_id in ifaces:
        task_id, tab = tab, task_id
    iface = ifaces.get(tab)
    if iface is None:
        return jsonify({"status": "not found"}), 404
    return jsonify(iface.get_result(task_id))


@app.route("/queue", methods=["GET"])
def queue():
    """List queued task ids, for one tab (``?tab=``) or for all interfaces."""
    tab = request.args.get("tab")
    if tab is not None:
        iface = ifaces.get(tab)
        if iface is None:
            return jsonify({"status": "not found"}), 404
        return jsonify({"queue": iface.queued_task_ids()})
    queued = [task_id for iface in ifaces.values() for task_id in iface.queued_task_ids()]
    return jsonify({"queue": queued})


@app.route("/stop/<task_id>", methods=["POST"])
def stop(task_id):
    """Set the stop flag of ``task_id`` on whichever interface owns it."""
    stopped = any(iface.stop_task(task_id) for iface in ifaces.values())
    if stopped:
        return jsonify({"status": "stopped"})
    return jsonify({"status": "not found"}), 404


@app.route('/static/<path:filename>')
def send_static(filename):
    """Serve a file from the static folder."""
    return send_from_directory(static_folder, filename)


@app.route('/download/fa_extra')
def download():
    """Download the collected word/phonetic data file."""
    return send_file(phonetc_data_file_path, as_attachment=True)


@app.route('/download/fa_dict')
def download1():
    """Recompile the ``fa`` dictionary and download the resulting ``fa_dict``."""
    dict_file = 'fa_dict'
    compile_espeak()
    data_folder = find_espeak_ng_data_folder()
    if data_folder is None:
        return jsonify({"error": "espeak-ng data folder not found"}), 500
    return send_file(os.path.join(data_folder, dict_file), as_attachment=True)


if __name__ == "__main__":
    app.run(host='0.0.0.0', port=7860)