import concurrent.futures as cf
import glob
import io
import os
import time
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import List, Literal

import gradio as gr
from loguru import logger
from openai import OpenAI
from promptic import llm
from pydantic import BaseModel, ValidationError
from pypdf import PdfReader
from tenacity import retry, retry_if_exception_type

import locale
import re

import requests
from dotenv import load_dotenv
from gradio.routes import mount_gradio_app
from fastapi import FastAPI
import gettext
from gradio.themes.utils.theme_dropdown import create_theme_dropdown
import litellm

litellm.set_verbose = True


def set_locale(locale_name):
    """Set the process-wide locale, falling back to the system default.

    Args:
        locale_name: Locale identifier handed to ``locale.setlocale``
            (for example ``"en_US.UTF-8"``).
    """
    try:
        # Try to set the desired locale
        locale.setlocale(locale.LC_ALL, locale_name)
        print(f"Locale set to: {locale_name}")
    except locale.Error:
        # Fallback to the default system locale. That call can fail as well
        # (e.g. a broken LANG/LC_* environment); keep whatever locale is
        # currently active rather than crashing at import time.
        try:
            locale.setlocale(locale.LC_ALL, '')
        except locale.Error:
            pass
        print("Unsupported locale, falling back to default locale")


# Setup gettext
def setup_translation(lang_code):
    """Load the ``messages`` gettext catalog for *lang_code*.

    The catalog is looked up in the ``locales`` directory next to this file
    and installed via ``translation.install()``.

    Args:
        lang_code: Language code of the catalog to load (e.g. ``"en"``).

    Returns:
        The catalog's ``gettext`` function, or an identity function when the
        catalog is missing or cannot be decoded.
    """
    set_locale('en_US.UTF-8')
    locale_path = os.path.join(os.path.dirname(__file__), 'locales')
    try:
        translation = gettext.translation('messages', localedir=locale_path, languages=[lang_code])
        translation.install()
        return translation.gettext  # Return the translation function '_'
    except FileNotFoundError:
        logger.error(f"Translation file for language '{lang_code}' not found in {locale_path}")
        return lambda s: s  # Fallback to no translation
    except UnicodeDecodeError as e:
        logger.error(f"UnicodeDecodeError: {e}")
        return lambda s: s  # Fallback to no translation


def read_readme():
    """Return the text of ``README.md`` without its leading metadata block.

    The path is relative to the current working directory. Only a front
    matter block delimited by ``---`` lines at the very start of the file is
    removed; double hyphens elsewhere in the document are left alone.

    Returns:
        The README text, or a fixed notice when the file does not exist.
    """
    readme_path = Path("README.md")
    if not readme_path.exists():
        return "README.md not found. Please check the repository for more information."

    with open(readme_path, "r", encoding="utf-8") as file:
        content = file.read()

    # Anchor the match to the start of the file: the previous lazy
    # `--.*?--` pattern left a stray "-" behind and also deleted any later
    # text that happened to sit between two double hyphens.
    return re.sub(
        r'\A\s*---[ \t]*\r?\n.*?\r?\n---[ \t]*(?:\r?\n|\Z)',
        '',
        content,
        count=1,
        flags=re.DOTALL,
    )
# Initialize _ with a default function
_ = setup_translation('en')

# Language codes offered in the UI and accepted by update_language().
SUPPORTED_LANGUAGES = ("en", "fr")


def update_language(lang):
    """Rebind the module-level translation function ``_`` to *lang*.

    Args:
        lang: Language code. Codes outside ``SUPPORTED_LANGUAGES`` leave the
            current translation untouched.
    """
    # Without `global`, the assignment only created a local name and the
    # language never actually changed.
    global _
    if lang in SUPPORTED_LANGUAGES:
        _ = setup_translation(lang)


update_language(lang='en')


# Define a data structure for instruction templates
class InstructionTemplate(BaseModel):
    """The five prompt sections that make up one instruction template."""

    intro: str
    text_instructions: str
    scratch_pad: str
    prelude: str
    dialog: str


def _build_instruction_templates():
    """Return every instruction template translated with the current ``_``.

    The message ids are kept as literal ``_("...")`` calls so that string
    extraction tools can still discover them.
    """
    return {
        "podcast": InstructionTemplate(
            intro=_("podcast.intro"),
            text_instructions=_("podcast.text_instructions"),
            scratch_pad=_("podcast.scratch_pad"),
            prelude=_("podcast.prelude"),
            dialog=_("podcast.dialog"),
        ),
        "SciAgents material discovery summary": InstructionTemplate(
            intro=_("discovery.intro"),
            text_instructions=_("discovery.text_instructions"),
            scratch_pad=_("discovery.scratch_pad"),
            prelude=_("discovery.prelude"),
            dialog=_("discovery.dialog"),
        ),
        "lecture": InstructionTemplate(
            intro=_("lecture.intro"),
            text_instructions=_("lecture.text_instructions"),
            scratch_pad=_("lecture.scratch_pad"),
            prelude=_("lecture.prelude"),
            dialog=_("lecture.dialog"),
        ),
        "summary": InstructionTemplate(
            intro=_("summary.intro"),
            text_instructions=_("summary.text_instructions"),
            scratch_pad=_("summary.scratch_pad"),
            prelude=_("summary.prelude"),
            dialog=_("summary.dialog"),
        ),
        "short summary": InstructionTemplate(
            intro=_("shortsummary.intro"),
            text_instructions=_("shortsummary.text_instructions"),
            scratch_pad=_("shortsummary.scratch_pad"),
            prelude=_("shortsummary.prelude"),
            dialog=_("shortsummary.dialog"),
        ),
    }


# Define the instruction templates
INSTRUCTION_TEMPLATES = _build_instruction_templates()


def update_instructions_language(lang, template="podcast"):
    """Switch the language and return the re-translated template fields.

    All templates are rebuilt (previously only "podcast" was), so a later
    template selection also shows the new language.

    Args:
        lang: Language code passed to ``update_language``.
        template: Name of the template whose fields are returned. Unknown
            names fall back to ``"podcast"``.

    Returns:
        The 5-tuple produced by ``update_instructions``.
    """
    update_language(lang)
    # Mutate in place so every existing reference to the dict sees the update.
    INSTRUCTION_TEMPLATES.update(_build_instruction_templates())
    if template not in INSTRUCTION_TEMPLATES:
        template = "podcast"
    return update_instructions(template)


def update_instructions(template):
    """Return the fields of the named template in UI order.

    Args:
        template: Key into ``INSTRUCTION_TEMPLATES``.

    Returns:
        ``(intro, text_instructions, scratch_pad, prelude, dialog)``.

    Raises:
        KeyError: If *template* is not a known template name.
    """
    selected_template = INSTRUCTION_TEMPLATES[template]
    return (
        selected_template.intro,
        selected_template.text_instructions,
        selected_template.scratch_pad,
        selected_template.prelude,
        selected_template.dialog
    )


# Define standard values
STANDARD_TEXT_MODELS = [
    "o1-preview-2024-09-12",
    "o1-preview",
    "gpt-4o-2024-08-06",
    "gpt-4o",
    "gpt-4o-mini-2024-07-18",
    "gpt-4o-mini",
    "o1-mini-2024-09-12",
    "o1-mini",
    "chatgpt-4o-latest",
    "gpt-4-turbo",
    "openai/custom_model",
]

STANDARD_AUDIO_MODELS = [
    "tts-1",
    "tts-1-hd",
]

STANDARD_VOICES = [
    "alloy",
    "echo",
    "fable",
    "onyx",
    "nova",
    "shimmer",
]

# Upper bound, in seconds, for fetching the source page.
URL_FETCH_TIMEOUT_SECONDS = 60

# Generated MP3 files older than this many seconds are deleted.
MAX_AUDIO_FILE_AGE_SECONDS = 24 * 60 * 60


class DialogueItem(BaseModel):
    """One spoken line of the generated dialogue."""

    text: str
    speaker: Literal["speaker-1", "speaker-2"]


class Dialogue(BaseModel):
    """Structured LLM output: a scratchpad plus the ordered dialogue lines."""

    scratchpad: str
    dialogue: List[DialogueItem]


def get_mp3(text: str, voice: str, audio_model: str, api_key: str = None) -> bytes:
    """Synthesize *text* with the OpenAI speech API and return the audio bytes.

    Args:
        text: Text to speak.
        voice: Voice name passed to the API.
        audio_model: Speech model name passed to the API.
        api_key: API key; when falsy, ``OPENAI_API_KEY`` from the environment
            is used.

    Returns:
        The complete streamed response body.
    """
    client = OpenAI(
        api_key=api_key or os.getenv("OPENAI_API_KEY"),
    )

    with client.audio.speech.with_streaming_response.create(
        model=audio_model,
        voice=voice,
        input=text,
    ) as response:
        with io.BytesIO() as file:
            for chunk in response.iter_bytes():
                file.write(chunk)
            return file.getvalue()


from functools import wraps


def conditional_llm(model, api_base=None, api_key=None):
    """
    Conditionally apply the @llm decorator based on the api_base parameter.
    If api_base is provided, it applies the @llm decorator with api_base.
    Otherwise, it applies the @llm decorator with api_key.
    """
    def decorator(func):
        if api_base:
            return llm(model=model, api_base=api_base)(func)
        else:
            return llm(model=model, api_key=api_key)(func)
    return decorator


def get_text_from_url(url: str) -> str:
    """Fetch text content from a given URL.

    The request goes through the ``https://r.jina.ai/`` prefix.

    Raises:
        gr.Error: If the request fails, times out, or returns an error status.
    """
    try:
        # A timeout keeps a stalled server from blocking a worker forever.
        response = requests.get('https://r.jina.ai/' + url, timeout=URL_FETCH_TIMEOUT_SECONDS)
        response.raise_for_status()
        return response.text
    except requests.RequestException as e:
        raise gr.Error(f"Error fetching content from URL: {str(e)}") from e


def _compose_feedback_prompt(edited_transcript, user_feedback):
    """Build the two optional prompt sections for a regeneration request.

    ``None`` and ``""`` both mean "not provided".

    Returns:
        ``(edited_transcript_section, user_feedback_section)``. When either
        section is non-blank, the improvement instruction is appended to the
        feedback section.
    """
    instruction_improve = 'Based on the original text, please generate an improved version of the dialogue by incorporating the edits, comments and feedback.'

    edited_transcript_processed = ""
    if edited_transcript:
        edited_transcript_processed = (
            "\nPreviously generated edited transcript, with specific edits and comments that I want you to carefully address:\n"
            + "\n" + edited_transcript
        )

    user_feedback_processed = ""
    if user_feedback:
        user_feedback_processed = "\nOverall user feedback:\n\n" + user_feedback

    if edited_transcript_processed.strip() or user_feedback_processed.strip():
        user_feedback_processed = user_feedback_processed + "\n\n" + instruction_improve

    return edited_transcript_processed, user_feedback_processed


def _purge_stale_mp3s(directory, max_age_seconds=MAX_AUDIO_FILE_AGE_SECONDS):
    """Delete ``.mp3`` files in *directory* older than *max_age_seconds*."""
    now = time.time()
    for file in glob.glob(os.path.join(directory, "*.mp3")):
        try:
            if os.path.isfile(file) and now - os.path.getmtime(file) > max_age_seconds:
                os.remove(file)
        except OSError:
            # Best-effort cleanup: another request may have removed the file
            # already, and that must not fail a successful generation.
            continue


def generate_audio(
    url: str,
    openai_api_key: str = None,
    text_model: str = "gpt-4o-mini-2024-07-18",
    audio_model: str = "tts-1",
    speaker_1_voice: str = "alloy",
    speaker_2_voice: str = "echo",
    api_base: str = None,
    intro_instructions: str = None,
    text_instructions: str = None,
    scratch_pad_instructions: str = None,
    prelude_dialog: str = None,
    podcast_dialog_instructions: str = None,
    edited_transcript: str = None,
    user_feedback: str = None,
    original_text: str = None,
    debug = False,
    use_default_template : bool = False,
) -> tuple:
    """Fetch the text behind *url*, turn it into a dialogue, and voice it.

    Args:
        url: Page to convert.
        openai_api_key: API key; ``OPENAI_API_KEY`` is used when omitted.
        text_model: Model used to write the dialogue.
        audio_model: Model used for speech synthesis.
        speaker_1_voice: Voice for lines spoken by ``speaker-1``.
        speaker_2_voice: Voice for all other lines.
        api_base: Optional custom API base for the text model.
        intro_instructions, text_instructions, scratch_pad_instructions,
        prelude_dialog, podcast_dialog_instructions: Prompt sections.
        edited_transcript: Optional user-edited transcript to address.
        user_feedback: Optional free-form feedback to address.
        original_text: When ``use_default_template`` is true, the name of the
            instruction template to use; otherwise unused by this function
            (the text is always fetched again from *url*).
        debug: Log the feedback sections and the fetched text.
        use_default_template: Replace the five prompt sections with those of
            the template named by ``original_text``.

    Returns:
        ``(audio_file_path, transcript, fetched_text, None)`` on success, or
        ``(None, None, None, error_message)`` on failure.
    """
    if not url:
        return None, None, None, "Please provide a valid URL before generating audio."

    if use_default_template:
        # InstructionTemplate is a pydantic model, not a mapping: the fields
        # must be read as attributes (subscripting raised TypeError).
        template = INSTRUCTION_TEMPLATES.get(original_text)
        if template is None:
            return None, None, None, f"Unknown instruction template: {original_text!r}"
        intro_instructions = template.intro
        text_instructions = template.text_instructions
        scratch_pad_instructions = template.scratch_pad
        prelude_dialog = template.prelude
        podcast_dialog_instructions = template.dialog

    try:
        # Validate API Key
        if not os.getenv("OPENAI_API_KEY") and not openai_api_key:
            raise gr.Error("OpenAI API key is required")

        combined_text = get_text_from_url(url)

        # Configure the LLM based on selected model and api_base.
        # The docstring below is the prompt template sent to the model, so
        # its text is program behavior, not documentation.
        # NOTE(review): the retry has no stop condition, so repeated
        # ValidationErrors retry indefinitely - consider bounding it.
        @retry(retry=retry_if_exception_type(ValidationError))
        @conditional_llm(model=text_model, api_base=api_base, api_key=openai_api_key)
        def generate_dialogue(text: str, intro_instructions: str, text_instructions: str, scratch_pad_instructions: str,
                              prelude_dialog: str, podcast_dialog_instructions: str,
                              edited_transcript: str = None, user_feedback: str = None, ) -> Dialogue:
            """
            {intro_instructions}

            Here is the original input text:

            {text}

            {text_instructions}

            {scratch_pad_instructions}

            {prelude_dialog}

            {podcast_dialog_instructions}
            {edited_transcript}{user_feedback}
            """

        edited_transcript_processed, user_feedback_processed = _compose_feedback_prompt(
            edited_transcript, user_feedback
        )

        if debug:
            logger.info (edited_transcript_processed)
            logger.info (user_feedback_processed)
            logger.info (combined_text)

        # Generate the dialogue using the LLM
        llm_output = generate_dialogue(
            combined_text,
            intro_instructions=intro_instructions,
            text_instructions=text_instructions,
            scratch_pad_instructions=scratch_pad_instructions,
            prelude_dialog=prelude_dialog,
            podcast_dialog_instructions=podcast_dialog_instructions,
            edited_transcript=edited_transcript_processed,
            user_feedback=user_feedback_processed
        )

        # Generate audio from the transcript: synthesize all lines
        # concurrently, then collect the results in dialogue order.
        characters = 0
        pending = []
        with cf.ThreadPoolExecutor() as executor:
            for line in llm_output.dialogue:
                transcript_line = f"{line.speaker}: {line.text}"
                voice = speaker_1_voice if line.speaker == "speaker-1" else speaker_2_voice
                future = executor.submit(get_mp3, line.text, voice, audio_model, openai_api_key)
                pending.append((future, transcript_line))
                characters += len(line.text)

            audio = b"".join(future.result() for future, transcript_line in pending)
            transcript = "".join(transcript_line + "\n\n" for future, transcript_line in pending)

        logger.info(f"Generated {characters} characters of audio")

        temporary_directory = "./gradio_cached_examples/tmp/"
        os.makedirs(temporary_directory, exist_ok=True)

        # Use a temporary file -- Gradio's audio component doesn't work with raw bytes in Safari
        with NamedTemporaryFile(
            dir=temporary_directory,
            delete=False,
            suffix=".mp3",
        ) as temporary_file:
            temporary_file.write(audio)

        # Delete any files in the temp directory that end with .mp3 and are over a day old
        _purge_stale_mp3s(temporary_directory)

        return temporary_file.name, transcript, combined_text, None

    except Exception as e:
        # If an error occurs during generation, return None for the outputs and the error message
        logger.exception("Audio generation failed")
        return None, None, None, str(e)


# New function to handle user feedback and regeneration
def process_feedback_and_regenerate(feedback, *args):
    """Call ``generate_audio`` with *feedback* appended as the last positional argument."""
    # Add user feedback to the args
    new_args = list(args)
    new_args.append(feedback)  # Add user feedback as a new argument
    return generate_audio(*new_args)


def _generate_audio_from_form(*form_values):
    """Adapter for the "Generate Audio" button.

    The form supplies the first 14 ``generate_audio`` arguments, then the
    selected template name, then the "skip all template customization"
    checkbox. Passed positionally, the checkbox landed on ``debug``; route it
    to ``use_default_template`` by keyword instead. The template name goes to
    ``original_text``, which is where ``generate_audio`` reads it from.
    """
    *positional, use_default_template = form_values
    return generate_audio(*positional, use_default_template=bool(use_default_template))


def _regenerate_audio(use_edit, edit, *args):
    """Adapter for the "Regenerate" button.

    ``args`` holds the 12 inputs up to ``podcast_dialog_instructions``,
    followed by ``user_feedback`` and the stored original text.
    """
    return generate_audio(
        *args[:12],  # All inputs up to podcast_dialog_instructions
        edit if use_edit else "",  # Use edited transcript if checkbox is checked, otherwise empty string
        *args[12:]  # user_feedback and original_text_output
    )


with gr.Blocks(theme='lone17/kotaemon', title="Text to Audio") as demo:

    with gr.Row(equal_height=True):
        with gr.Column(scale=10):
            gr.Markdown(
                """
                # Convert Text into an audio podcast, lecture, summary and others

                First, provide a URL with the text content, select options, then push Generate Audio.

                You can also select a variety of custom options and direct the way the result is generated.
                """
            )
        with gr.Column(scale=3):
            with gr.Group():
                toggle_dark = gr.Button(value="Toggle Dark")
                toggle_dark.click(
                    None,
                    js="""
                    () => {
                        document.body.classList.toggle('dark');
                    }
                    """,
                )
                # add language selection, trigger a reload
                lang = gr.Dropdown(
                    label="Language",
                    choices=list(SUPPORTED_LANGUAGES),
                    value="en",
                    info="Select the language for the interface.",
                )

    submit_btn = gr.Button("Generate Audio")

    with gr.Row():
        with gr.Column(scale=2):
            url_input = gr.Textbox(
                label="URL",
                placeholder="Enter the URL of the text content",
                info="Provide the URL of the webpage containing the text you want to convert to audio.",
            )

            openai_api_key = gr.Textbox(
                label="OpenAI API Key",
                visible=True,  # Always show the API key field
                placeholder="Enter your OpenAI API Key here...",
                type="password"  # Hide the API key input
            )
            text_model = gr.Dropdown(
                label="Text Generation Model",
                choices=STANDARD_TEXT_MODELS,
                value="gpt-4o-mini",
                info="Select the model to generate the dialogue text.",
            )
            audio_model = gr.Dropdown(
                label="Audio Generation Model",
                choices=STANDARD_AUDIO_MODELS,
                value="tts-1",
                info="Select the model to generate the audio.",
            )
            speaker_1_voice = gr.Dropdown(
                label="Speaker 1 Voice",
                choices=STANDARD_VOICES,
                value="alloy",
                info="Select the voice for Speaker 1.",
            )
            speaker_2_voice = gr.Dropdown(
                label="Speaker 2 Voice",
                choices=STANDARD_VOICES,
                value="echo",
                info="Select the voice for Speaker 2.",
            )
            api_base = gr.Textbox(
                label="Custom API Base",
                placeholder="Enter custom API base URL if using a custom/local model...",
                info="If you are using a custom or local model, provide the API base URL here, e.g.: http://localhost:8080/v1 for llama.cpp REST server.",
            )

        with gr.Column(scale=3):
            template_dropdown = gr.Dropdown(
                label="Instruction Template",
                choices=list(INSTRUCTION_TEMPLATES.keys()),
                value="podcast",
                info="Select the instruction template to use. You can also edit any of the fields for more tailored results.",
            )
            default_template_checkbox = gr.Checkbox(label="skip all template customization")

            selected_template = INSTRUCTION_TEMPLATES["podcast"]
            intro_instructions = gr.Textbox(
                label="Intro Instructions",
                lines=10,
                value=selected_template.intro,
                info="Provide the introductory instructions for generating the dialogue.",
            )
            text_instructions = gr.Textbox(
                label="Standard Text Analysis Instructions",
                lines=10,
                placeholder="Enter text analysis instructions...",
                value=selected_template.text_instructions,
                info="Provide the instructions for analyzing the raw data and text.",
            )
            scratch_pad_instructions = gr.Textbox(
                label="Scratch Pad Instructions",
                lines=15,
                value=selected_template.scratch_pad,
                info="Provide the scratch pad instructions for brainstorming presentation/dialogue content.",
            )
            prelude_dialog = gr.Textbox(
                label="Prelude Dialog",
                lines=5,
                value=selected_template.prelude,
                info="Provide the prelude instructions before the presentation/dialogue is developed.",
            )
            podcast_dialog_instructions = gr.Textbox(
                label="Podcast Dialog Instructions",
                lines=20,
                value=selected_template.dialog,
                info="Provide the instructions for generating the presentation or podcast dialogue.",
            )

    audio_output = gr.Audio(label="Audio", format="mp3", interactive=False, autoplay=False)
    transcript_output = gr.Textbox(label="Transcript", lines=20, show_copy_button=True)
    original_text_output = gr.Textbox(label="Original Text", lines=10, visible=False)
    error_output = gr.Textbox(visible=False)  # Hidden textbox to store error message

    use_edited_transcript = gr.Checkbox(label="Use Edited Transcript (check if you want to make edits to the initially generated transcript)", value=False)
    edited_transcript = gr.Textbox(label="Edit Transcript Here. E.g., mark edits in the text with clear instructions. E.g., '[ADD DEFINITION OF MATERIOMICS]'.", lines=20, visible=False,
                                   show_copy_button=True, interactive=False)

    user_feedback = gr.Textbox(label="Provide Feedback or Notes", lines=10)
    regenerate_btn = gr.Button("Regenerate Audio with Edits and Feedback")

    # Function to update the interactive state of edited_transcript
    def update_edit_box(checkbox_value):
        """Show and unlock the edit box only while the checkbox is ticked."""
        return gr.update(interactive=checkbox_value, lines=20, visible=bool(checkbox_value))

    def _sync_transcript_and_error(audio, transcript, original_text, error):
        """Copy the new transcript into the edit box and normalize the error value."""
        return (
            transcript if transcript else "",
            error if error else None
        )

    def _warn_on_error(error):
        """Surface a stored error message as a Gradio warning."""
        return gr.Warning(error) if error else None

    def _attach_post_generation_steps(event):
        """Chain the follow-up steps shared by both generation buttons."""
        return event.then(
            fn=_sync_transcript_and_error,
            inputs=[audio_output, transcript_output, original_text_output, error_output],
            outputs=[edited_transcript, error_output]
        ).then(
            fn=_warn_on_error,
            inputs=[error_output],
            outputs=[]
        )

    # The five editable prompt sections, in the order update_instructions returns them.
    instruction_fields = [
        intro_instructions, text_instructions, scratch_pad_instructions,
        prelude_dialog, podcast_dialog_instructions,
    ]

    # The first 12 positional arguments of generate_audio.
    generation_inputs = [
        url_input, openai_api_key, text_model, audio_model,
        speaker_1_voice, speaker_2_voice, api_base,
        *instruction_fields,
    ]

    generation_outputs = [audio_output, transcript_output, original_text_output, error_output]

    # Update the interactive state of edited_transcript when the checkbox is toggled
    use_edited_transcript.change(
        fn=update_edit_box,
        inputs=[use_edited_transcript],
        outputs=[edited_transcript]
    )

    # Update instruction fields when template is changed
    template_dropdown.change(
        fn=update_instructions,
        inputs=[template_dropdown],
        outputs=instruction_fields
    )

    # Re-translate the currently selected template when the language changes.
    lang.change(
        fn=update_instructions_language,
        inputs=[lang, template_dropdown],
        outputs=instruction_fields
    )

    _attach_post_generation_steps(
        submit_btn.click(
            fn=_generate_audio_from_form,
            inputs=[
                *generation_inputs,
                edited_transcript,
                user_feedback, template_dropdown, default_template_checkbox
            ],
            outputs=generation_outputs
        )
    )

    _attach_post_generation_steps(
        regenerate_btn.click(
            fn=_regenerate_audio,
            inputs=[
                use_edited_transcript, edited_transcript,
                *generation_inputs,
                user_feedback,
                original_text_output
            ],
            outputs=generation_outputs
        )
    )

    # Add README content at the bottom
    gr.Markdown("---")  # Horizontal line to separate the interface from README
    gr.Markdown(read_readme())

# Enable queueing for better performance
demo.queue(max_size=20, default_concurrency_limit=32)

import subprocess


def execute_command(cmd):
    """Run *cmd* through the shell and return its combined stdout and stderr.

    The command string is interpreted by the shell, so only pass trusted,
    constant commands. If running the command raises, the exception text is
    returned instead.
    """
    try:
        # Use subprocess to run the command
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
        # Return the stdout and stderr from the command execution
        return result.stdout + result.stderr
    except Exception as e:
        return str(e)


# Launch the Gradio app
if __name__ == "__main__":
    # NOTE(review): the catalogs are compiled here, after the templates and
    # UI defaults above were already built at import time; on a first run
    # without .mo files those will show untranslated message ids.
    logger.info(execute_command('msgfmt locales/fr/LC_MESSAGES/messages.po -o locales/fr/LC_MESSAGES/messages.mo'))
    logger.info(execute_command('msgfmt locales/en/LC_MESSAGES/messages.po -o locales/en/LC_MESSAGES/messages.mo'))
    load_dotenv()  # This line brings all environment variables from .env into os.environ
    app, local_url, share_url = demo.launch(share=False)