import concurrent.futures as cf
import glob
import io
import os
import time
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import List, Literal
import gradio as gr
from loguru import logger
from openai import OpenAI
from promptic import llm
from pydantic import BaseModel, ValidationError
from pypdf import PdfReader
from tenacity import retry, retry_if_exception_type
import locale
import re
import requests
from dotenv import load_dotenv
from gradio.routes import mount_gradio_app
from fastapi import FastAPI
import gettext
from gradio.themes.utils.theme_dropdown import create_theme_dropdown
import litellm
litellm.set_verbose=True
def set_locale(locale_name):
try:
# Try to set the desired locale
locale.setlocale(locale.LC_ALL, locale_name)
print(f"Locale set to: {locale_name}")
except locale.Error:
# Fallback to the default system locale
locale.setlocale(locale.LC_ALL, '')
print("Unsupported locale, falling back to default locale")
# Setup gettext
def setup_translation(lang_code):
set_locale('en_US.UTF-8')
locale_path = os.path.join(os.path.dirname(__file__), 'locales')
try:
translation = gettext.translation('messages', localedir=locale_path, languages=[lang_code])
translation.install()
return translation.gettext # Return the translation function '_'
except FileNotFoundError:
logger.error(f"Translation file for language '{lang_code}' not found in {locale_path}")
return lambda s: s # Fallback to no translation
except UnicodeDecodeError as e:
logger.error(f"UnicodeDecodeError: {e}")
return lambda s: s # Fallback to no translation
def read_readme():
readme_path = Path("README.md")
if readme_path.exists():
with open(readme_path, "r") as file:
content = file.read()
# Use regex to remove metadata enclosed in -- ... --
content = re.sub(r'--.*?--', '', content, flags=re.DOTALL)
return content
else:
return "README.md not found. Please check the repository for more information."
# Initialize _ with a default function
_ = setup_translation('en')
def update_language(lang):
# get user language
if lang == 'fr':
_ = setup_translation('fr')
elif lang == 'en':
_ = setup_translation('en')
# else:
# _ = lambda s: s # Default fallback, no translation
update_language(lang='en')
# Define a data structure for instruction templates
class InstructionTemplate(BaseModel):
intro: str
text_instructions: str
scratch_pad: str
prelude: str
dialog: str
# Define the instruction templates
INSTRUCTION_TEMPLATES = {
"podcast": InstructionTemplate(
intro= _("podcast.intro"),
text_instructions=_("podcast.text_instructions"),
scratch_pad=_("podcast.scratch_pad"),
prelude=_("podcast.prelude"),
dialog=_("podcast.dialog"),
),
"SciAgents material discovery summary": InstructionTemplate(
intro=_("discovery.intro"),
text_instructions=_("discovery.text_instructions"),
scratch_pad=_("discovery.scratch_pad"),
prelude=_("discovery.prelude"),
dialog=_("discovery.dialog"),
),
"lecture": InstructionTemplate(
intro=_("lecture.intro"),
text_instructions=_("lecture.text_instructions"),
scratch_pad=_("lecture.scratch_pad"),
prelude=_("lecture.prelude"),
dialog=_("lecture.dialog"),
),
"summary": InstructionTemplate(
intro=_("summary.intro"),
text_instructions=_("summary.text_instructions"),
scratch_pad=_("summary.scratch_pad"),
prelude=_("summary.prelude"),
dialog=_("summary.dialog"),
),
"short summary": InstructionTemplate(
intro=_("shortsummary.intro"),
text_instructions=_("shortsummary.text_instructions"),
scratch_pad=_("shortsummary.scratch_pad"),
prelude=_("shortsummary.prelude"),
dialog=_("shortsummary.dialog"),
),
}
def update_instructions_language(lang):
update_language(lang)
INSTRUCTION_TEMPLATES["podcast"] = InstructionTemplate(
intro=_("podcast.intro"),
text_instructions=_("podcast.text_instructions"),
scratch_pad=_("podcast.scratch_pad"),
prelude=_("podcast.prelude"),
dialog=_("podcast.dialog"),
)
return update_instructions("podcast")
def update_instructions(template):
selected_template = INSTRUCTION_TEMPLATES[template]
return (
selected_template.intro,
selected_template.text_instructions,
selected_template.scratch_pad,
selected_template.prelude,
selected_template.dialog
)
# Define standard values
STANDARD_TEXT_MODELS = [
"o1-preview-2024-09-12",
"o1-preview",
"gpt-4o-2024-08-06",
"gpt-4o",
"gpt-4o-mini-2024-07-18",
"gpt-4o-mini",
"o1-mini-2024-09-12",
"o1-mini",
"chatgpt-4o-latest",
"gpt-4-turbo",
"openai/custom_model",
]
STANDARD_AUDIO_MODELS = [
"tts-1",
"tts-1-hd",
]
STANDARD_VOICES = [
"alloy",
"echo",
"fable",
"onyx",
"nova",
"shimmer",
]
class DialogueItem(BaseModel):
text: str
speaker: Literal["speaker-1", "speaker-2"]
class Dialogue(BaseModel):
scratchpad: str
dialogue: List[DialogueItem]
def get_mp3(text: str, voice: str, audio_model: str, api_key: str = None) -> bytes:
client = OpenAI(
api_key=api_key or os.getenv("OPENAI_API_KEY"),
)
with client.audio.speech.with_streaming_response.create(
model=audio_model,
voice=voice,
input=text,
) as response:
with io.BytesIO() as file:
for chunk in response.iter_bytes():
file.write(chunk)
return file.getvalue()
from functools import wraps
def conditional_llm(model, api_base=None, api_key=None):
"""
Conditionally apply the @llm decorator based on the api_base parameter.
If api_base is provided, it applies the @llm decorator with api_base.
Otherwise, it applies the @llm decorator without api_base.
"""
def decorator(func):
if api_base:
return llm(model=model, api_base=api_base)(func)
else:
return llm(model=model, api_key=api_key)(func)
return decorator
def get_text_from_url(url: str) -> str:
"""Fetch text content from a given URL."""
try:
response = requests.get('https://r.jina.ai/' + url)
response.raise_for_status()
return response.text
except requests.RequestException as e:
raise gr.Error(f"Error fetching content from URL: {str(e)}")
def generate_audio(
url: str,
openai_api_key: str = None,
text_model: str = "gpt-4o-mini-2024-07-18",
audio_model: str = "tts-1",
speaker_1_voice: str = "alloy",
speaker_2_voice: str = "echo",
api_base: str = None,
intro_instructions: str = None,
text_instructions: str = None ,
scratch_pad_instructions: str = None ,
prelude_dialog: str = None,
podcast_dialog_instructions: str = None,
edited_transcript: str = None,
user_feedback: str = None,
original_text: str = None,
debug = False,
# template_dropdown : str = "", = original text ?
use_default_template : bool = False,
) -> tuple:
if not url:
return None, None, None, "Please provide a valid URL before generating audio."
if use_default_template:
intro_instructions = INSTRUCTION_TEMPLATES[original_text]["intro"]
text_instructions = INSTRUCTION_TEMPLATES[original_text]["text_instructions"]
scratch_pad_instructions = INSTRUCTION_TEMPLATES[original_text]["scratch_pad"]
prelude_dialog = INSTRUCTION_TEMPLATES[original_text]["prelude"]
podcast_dialog_instructions = INSTRUCTION_TEMPLATES[original_text]["dialog"]
try:
# Validate API Key
if not os.getenv("OPENAI_API_KEY") and not openai_api_key:
raise gr.Error("OpenAI API key is required")
# combined_text = original_text or ""
# # If there's no original text, fetch it from the provided URL
# if not combined_text:
combined_text = get_text_from_url(url)
# Configure the LLM based on selected model and api_base
@retry(retry=retry_if_exception_type(ValidationError))
@conditional_llm(model=text_model, api_base=api_base, api_key=openai_api_key)
def generate_dialogue(text: str, intro_instructions: str, text_instructions: str, scratch_pad_instructions: str,
prelude_dialog: str, podcast_dialog_instructions: str,
edited_transcript: str = None, user_feedback: str = None, ) -> Dialogue:
"""
{intro_instructions}
Here is the original input text:
{text}
{text_instructions}
{scratch_pad_instructions}
{prelude_dialog}
{podcast_dialog_instructions}
{edited_transcript}{user_feedback}
"""
instruction_improve='Based on the original text, please generate an improved version of the dialogue by incorporating the edits, comments and feedback.'
edited_transcript_processed="\nPreviously generated edited transcript, with specific edits and comments that I want you to carefully address:\n"+"\n"+edited_transcript+"" if edited_transcript !="" else ""
user_feedback_processed="\nOverall user feedback:\n\n"+user_feedback if user_feedback !="" else ""
if edited_transcript_processed.strip()!='' or user_feedback_processed.strip()!='':
user_feedback_processed=""+user_feedback_processed+"\n\n"+instruction_improve+""
if debug:
logger.info (edited_transcript_processed)
logger.info (user_feedback_processed)
logger.info (combined_text)
# Generate the dialogue using the LLM
llm_output = generate_dialogue(
combined_text,
intro_instructions=intro_instructions,
text_instructions=text_instructions,
scratch_pad_instructions=scratch_pad_instructions,
prelude_dialog=prelude_dialog,
podcast_dialog_instructions=podcast_dialog_instructions,
edited_transcript=edited_transcript_processed,
user_feedback=user_feedback_processed
)
# Generate audio from the transcript
audio = b""
transcript = ""
characters = 0
with cf.ThreadPoolExecutor() as executor:
futures = []
for line in llm_output.dialogue:
transcript_line = f"{line.speaker}: {line.text}"
voice = speaker_1_voice if line.speaker == "speaker-1" else speaker_2_voice
future = executor.submit(get_mp3, line.text, voice, audio_model, openai_api_key)
futures.append((future, transcript_line))
characters += len(line.text)
for future, transcript_line in futures:
audio_chunk = future.result()
audio += audio_chunk
transcript += transcript_line + "\n\n"
logger.info(f"Generated {characters} characters of audio")
temporary_directory = "./gradio_cached_examples/tmp/"
os.makedirs(temporary_directory, exist_ok=True)
# Use a temporary file -- Gradio's audio component doesn't work with raw bytes in Safari
temporary_file = NamedTemporaryFile(
dir=temporary_directory,
delete=False,
suffix=".mp3",
)
temporary_file.write(audio)
temporary_file.close()
# Delete any files in the temp directory that end with .mp3 and are over a day old
for file in glob.glob(f"{temporary_directory}*.mp3"):
if os.path.isfile(file) and time.time() - os.path.getmtime(file) > 24 * 60 * 60:
os.remove(file)
# audio_file, transcript, original_text = generate_audio(*args)
# return audio_file, transcript, original_text, None # Return None as the error when successful
return temporary_file.name, transcript, combined_text, None
except Exception as e:
# If an error occurs during generation, return None for the outputs and the error message
return None, None, None, str(e)
# def validate_and_generate_audio(*args):
# url = args[0]
# if not url:
# return None, None, None, "Please provide a valid URL before generating audio."
# try:
# audio_file, transcript, original_text = generate_audio(*args)
# return audio_file, transcript, original_text, None # Return None as the error when successful
# except Exception as e:
# # If an error occurs during generation, return None for the outputs and the error message
# return None, None, None, str(e)
# def edit_and_regenerate(edited_transcript, user_feedback, *args):
# # Replace the original transcript and feedback in the args with the new ones
# #new_args = list(args)
# #new_args[-2] = edited_transcript # Update edited transcript
# #new_args[-1] = user_feedback # Update user feedback
# return validate_and_generate_audio(*new_args)
# New function to handle user feedback and regeneration
def process_feedback_and_regenerate(feedback, *args):
# Add user feedback to the args
new_args = list(args)
new_args.append(feedback) # Add user feedback as a new argument
return generate_audio(*new_args)
with gr.Blocks(theme='lone17/kotaemon', title="Text to Audio") as demo:
with gr.Row(equal_height=True):
with gr.Column(scale=10):
gr.Markdown(
"""
# Convert Text into an audio podcast, lecture, summary and others
First, provide a URL with the text content, select options, then push Generate Audio.
You can also select a variety of custom options and direct the way the result is generated.
"""
)
with gr.Column(scale=3):
with gr.Group():
#dropdown.render()
toggle_dark = gr.Button(value="Toggle Dark")
#dropdown.change(None, dropdown, None, js=js)
toggle_dark.click(
None,
js="""
() => {
document.body.classList.toggle('dark');
}
""",
)
#add language selection, trigger a reload
lang = gr.Dropdown(
label="Language",
choices=["en", "fr"],
value="en",
info="Select the language for the interface.",
)
submit_btn = gr.Button("Generate Audio")
with gr.Row():
with gr.Column(scale=2):
url_input = gr.Textbox(
label="URL",
placeholder="Enter the URL of the text content",
info="Provide the URL of the webpage containing the text you want to convert to audio.",
)
openai_api_key = gr.Textbox(
label="OpenAI API Key",
visible=True, # Always show the API key field
placeholder="Enter your OpenAI API Key here...",
type="password" # Hide the API key input
)
text_model = gr.Dropdown(
label="Text Generation Model",
choices=STANDARD_TEXT_MODELS,
value="gpt-4o-mini", #"gpt-4o-mini",
info="Select the model to generate the dialogue text.",
)
audio_model = gr.Dropdown(
label="Audio Generation Model",
choices=STANDARD_AUDIO_MODELS,
value="tts-1",
info="Select the model to generate the audio.",
)
speaker_1_voice = gr.Dropdown(
label="Speaker 1 Voice",
choices=STANDARD_VOICES,
value="alloy",
info="Select the voice for Speaker 1.",
)
speaker_2_voice = gr.Dropdown(
label="Speaker 2 Voice",
choices=STANDARD_VOICES,
value="echo",
info="Select the voice for Speaker 2.",
)
api_base = gr.Textbox(
label="Custom API Base",
placeholder="Enter custom API base URL if using a custom/local model...",
info="If you are using a custom or local model, provide the API base URL here, e.g.: http://localhost:8080/v1 for llama.cpp REST server.",
)
with gr.Column(scale=3):
template_dropdown = gr.Dropdown(
label="Instruction Template",
choices=list(INSTRUCTION_TEMPLATES.keys()),
value="podcast",
info="Select the instruction template to use. You can also edit any of the fields for more tailored results.",
)
default_template_checkbox = gr.Checkbox(label="skip all template customization")
selected_template = INSTRUCTION_TEMPLATES["podcast"]
intro_instructions = gr.Textbox(
label="Intro Instructions",
lines=10,
value=selected_template.intro,
info="Provide the introductory instructions for generating the dialogue.",
)
text_instructions = gr.Textbox(
label="Standard Text Analysis Instructions",
lines=10,
placeholder="Enter text analysis instructions...",
value=selected_template.text_instructions,
info="Provide the instructions for analyzing the raw data and text.",
)
scratch_pad_instructions = gr.Textbox(
label="Scratch Pad Instructions",
lines=15,
value=selected_template.scratch_pad,
info="Provide the scratch pad instructions for brainstorming presentation/dialogue content.",
)
prelude_dialog = gr.Textbox(
label="Prelude Dialog",
lines=5,
value=selected_template.prelude,
info="Provide the prelude instructions before the presentation/dialogue is developed.",
)
podcast_dialog_instructions = gr.Textbox(
label="Podcast Dialog Instructions",
lines=20,
value=selected_template.dialog,
info="Provide the instructions for generating the presentation or podcast dialogue.",
)
# @gr.render(inputs=default_template_checkbox)
# def show_customization(checkbox ):
# if not checkbox:
# gr.Markdown("## No customization")
# else:
# gr.Markdown("## customization")
# intro_instructions.visible = checkbox.value
# text_instructions.visible = checkbox.value
# scratch_pad_instructions.visible = checkbox.value
# prelude_dialog.visible = checkbox.value
# podcast_dialog_instructions.visible = checkbox.value
# # set those dialog to not visible
audio_output = gr.Audio(label="Audio", format="mp3", interactive=False, autoplay=False)
transcript_output = gr.Textbox(label="Transcript", lines=20, show_copy_button=True)
original_text_output = gr.Textbox(label="Original Text", lines=10, visible=False)
error_output = gr.Textbox(visible=False) # Hidden textbox to store error message
use_edited_transcript = gr.Checkbox(label="Use Edited Transcript (check if you want to make edits to the initially generated transcript)", value=False)
edited_transcript = gr.Textbox(label="Edit Transcript Here. E.g., mark edits in the text with clear instructions. E.g., '[ADD DEFINITION OF MATERIOMICS]'.", lines=20, visible=False,
show_copy_button=True, interactive=False)
user_feedback = gr.Textbox(label="Provide Feedback or Notes", lines=10, #placeholder="Enter your feedback or notes here..."
)
regenerate_btn = gr.Button("Regenerate Audio with Edits and Feedback")
# Function to update the interactive state of edited_transcript
def update_edit_box(checkbox_value):
return gr.update(interactive=checkbox_value, lines=20 if checkbox_value else 20, visible=True if checkbox_value else False)
# Update the interactive state of edited_transcript when the checkbox is toggled
use_edited_transcript.change(
fn=update_edit_box,
inputs=[use_edited_transcript],
outputs=[edited_transcript]
)
# Update instruction fields when template is changed
template_dropdown.change(
fn=update_instructions,
inputs=[template_dropdown],
outputs=[intro_instructions, text_instructions, scratch_pad_instructions, prelude_dialog, podcast_dialog_instructions]
)
lang.change(fn=update_instructions_language,
inputs=[lang],
outputs=[intro_instructions, text_instructions, scratch_pad_instructions, prelude_dialog, podcast_dialog_instructions]
)
submit_btn.click(
fn=generate_audio,
inputs=[
# url_input, openai_api_key, text_model, audio_model,
# speaker_1_voice, speaker_2_voice, api_base,
# None,None,None,None,None,
# edited_transcript,
# user_feedback,template_dropdown,default_template_checkbox
# if default_template_checkbox else
url_input, openai_api_key, text_model, audio_model,
speaker_1_voice, speaker_2_voice, api_base,
intro_instructions, text_instructions, scratch_pad_instructions,
prelude_dialog, podcast_dialog_instructions,
edited_transcript,
user_feedback,template_dropdown,default_template_checkbox
],
outputs=[audio_output,
transcript_output,
original_text_output,
error_output]
).then(
fn=lambda audio, transcript, original_text, error: (
transcript if transcript else "",
error if error else None
),
inputs=[audio_output, transcript_output, original_text_output, error_output],
outputs=[edited_transcript, error_output]
).then(
fn=lambda error: gr.Warning(error) if error else None,
inputs=[error_output],
outputs=[]
)
regenerate_btn.click(
fn=lambda use_edit, edit, *args: generate_audio(
*args[:12], # All inputs up to podcast_dialog_instructions
edit if use_edit else "", # Use edited transcript if checkbox is checked, otherwise empty string
*args[12:] # user_feedback and original_text_output
),
inputs=[
use_edited_transcript, edited_transcript,
url_input, openai_api_key, text_model, audio_model,
speaker_1_voice, speaker_2_voice, api_base,
intro_instructions, text_instructions, scratch_pad_instructions,
prelude_dialog, podcast_dialog_instructions,
user_feedback, original_text_output
],
outputs=[audio_output, transcript_output, original_text_output, error_output]
).then(
fn=lambda audio, transcript, original_text, error: (
transcript if transcript else "",
error if error else None
),
inputs=[audio_output, transcript_output, original_text_output, error_output],
outputs=[edited_transcript, error_output]
).then(
fn=lambda error: gr.Warning(error) if error else None,
inputs=[error_output],
outputs=[]
)
# Add README content at the bottom
gr.Markdown("---") # Horizontal line to separate the interface from README
gr.Markdown(read_readme())
# Enable queueing for better performance
demo.queue(max_size=20, default_concurrency_limit=32)
import subprocess
def execute_command(cmd):
try:
# Use subprocess to run the command
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
# Return the stdout and stderr from the command execution
return result.stdout + result.stderr
except Exception as e:
return str(e)
# Launch the Gradio app
if __name__ == "__main__":
logger.info(execute_command('msgfmt locales/fr/LC_MESSAGES/messages.po -o locales/fr/LC_MESSAGES/messages.mo'))
logger.info(execute_command('msgfmt locales/en/LC_MESSAGES/messages.po -o locales/en/LC_MESSAGES/messages.mo'))
load_dotenv() # This line brings all environment variables from .env into os.environ
app, local_url, share_url = demo.launch(share=False)