# Page-header residue from the Hugging Face Spaces listing ("Spaces: Sleeping"); not part of the program.
import concurrent.futures as cf | |
import glob | |
import io | |
import os | |
import time | |
from pathlib import Path | |
from tempfile import NamedTemporaryFile | |
from typing import List, Literal | |
import gradio as gr | |
from loguru import logger | |
from openai import OpenAI | |
from promptic import llm | |
from pydantic import BaseModel, ValidationError | |
from pypdf import PdfReader | |
from tenacity import retry, retry_if_exception_type | |
import locale | |
import re | |
import requests | |
from dotenv import load_dotenv | |
from gradio.routes import mount_gradio_app | |
from fastapi import FastAPI | |
import gettext | |
from gradio.themes.utils.theme_dropdown import create_theme_dropdown | |
import litellm | |
# Enable litellm's module-level verbose flag.
litellm.set_verbose = True
def set_locale(locale_name):
    """Set the process locale (``LC_ALL``), degrading gracefully on failure.

    Args:
        locale_name: Locale identifier passed to ``locale.setlocale``,
            e.g. ``'en_US.UTF-8'``.

    If ``locale_name`` is not supported, the environment's default locale
    (``''``) is tried instead. If that is not usable either -- which happens
    when the environment variables name a locale that is not installed --
    the current locale is left untouched instead of raising.
    """
    try:
        locale.setlocale(locale.LC_ALL, locale_name)
    except locale.Error:
        try:
            # Fallback to the default locale taken from the environment.
            locale.setlocale(locale.LC_ALL, '')
        except locale.Error:
            # The environment itself points at an unavailable locale; keep
            # whatever locale is currently active rather than crashing.
            print("Unsupported locale and default locale unavailable, keeping current locale")
        else:
            print("Unsupported locale, falling back to default locale")
    else:
        print(f"Locale set to: {locale_name}")
# Setup gettext
def setup_translation(lang_code):
    """Load the ``messages`` gettext catalog for ``lang_code``.

    The catalog is searched in the ``locales`` directory next to this file.
    The process locale is always set to ``en_US.UTF-8`` first, whatever
    ``lang_code`` is.

    Returns:
        The catalog's ``gettext`` function (after ``install()`` has been
        called on the translation). When the catalog file is missing or
        cannot be decoded, the problem is logged and an identity function is
        returned, so strings pass through untranslated.
    """
    set_locale('en_US.UTF-8')
    locale_path = os.path.join(os.path.dirname(__file__), 'locales')

    def passthrough(s):
        # Fallback to no translation
        return s

    try:
        catalog = gettext.translation('messages', localedir=locale_path, languages=[lang_code])
        catalog.install()
        translate = catalog.gettext  # the translation function '_'
    except FileNotFoundError:
        logger.error(f"Translation file for language '{lang_code}' not found in {locale_path}")
        translate = passthrough
    except UnicodeDecodeError as e:
        logger.error(f"UnicodeDecodeError: {e}")
        translate = passthrough
    return translate
def read_readme():
    """Return the text of ``README.md`` without its leading metadata block.

    The file is read from the current working directory as UTF-8 (a leading
    byte-order mark is tolerated). Only a front-matter block at the very top
    of the file -- a line of ``---``, the metadata, and a closing line of
    ``---`` -- is removed. The rest of the document, including any later
    ``--`` sequences or horizontal rules, is returned unchanged.

    Returns:
        The README text, or a fixed "not found" message when the file does
        not exist.
    """
    readme_path = Path("README.md")
    if not readme_path.exists():
        return "README.md not found. Please check the repository for more information."

    content = readme_path.read_text(encoding="utf-8-sig")
    # Anchor at the start of the file so that only the metadata header is
    # stripped; the body may legitimately contain "--" (flags, table rules).
    front_matter = r'\A---[ \t]*\n(?:.*?\n)?---[ \t]*(?:\n|\Z)'
    return re.sub(front_matter, '', content, count=1, flags=re.DOTALL)
# Initialize _ with a default function
_ = setup_translation('en')


def update_language(lang):
    """Point the module-level translation function ``_`` at ``lang``.

    Args:
        lang: Language code. ``'fr'`` and ``'en'`` are handled; any other
            value leaves the current translation function in place.

    ``_`` is rebound at module scope (hence the ``global`` statement) so that
    code calling ``_(...)`` after this function returns sees the new
    language. Without it the assignment would only create a local variable.
    """
    global _
    if lang in ('fr', 'en'):
        _ = setup_translation(lang)


update_language(lang='en')
# Define a data structure for instruction templates
class InstructionTemplate(BaseModel):
    """The five instruction texts that make up one prompt template.

    Each field pre-fills the matching instruction textbox in the UI and is
    passed to the dialogue generation prompt.
    """

    # Opening instructions placed at the top of the prompt.
    intro: str
    # Instructions on how to analyse the input text.
    text_instructions: str
    # Instructions placed inside the <scratchpad> section.
    scratch_pad: str
    # Text placed just before the <podcast_dialogue> section.
    prelude: str
    # Instructions placed inside the <podcast_dialogue> section.
    dialog: str
# Define the instruction templates
# Template name shown in the UI -> prefix of its message ids in the catalog.
_TEMPLATE_MSGID_PREFIXES = {
    "podcast": "podcast",
    "SciAgents material discovery summary": "discovery",
    "lecture": "lecture",
    "summary": "summary",
    "short summary": "shortsummary",
}

# InstructionTemplate field names; each doubles as the msgid suffix, so the
# message id for a field is "<prefix>.<field>".
_TEMPLATE_FIELDS = ("intro", "text_instructions", "scratch_pad", "prelude", "dialog")

# NOTE: the msgids are composed at runtime, so the catalogs have to list
# them explicitly; a source scan for literal _("...") calls will not see them.
INSTRUCTION_TEMPLATES = {
    name: InstructionTemplate(
        **{field: _(f"{prefix}.{field}") for field in _TEMPLATE_FIELDS}
    )
    for name, prefix in _TEMPLATE_MSGID_PREFIXES.items()
}
def update_instructions_language(lang):
    """Switch the UI language and re-translate every instruction template.

    Args:
        lang: Language code forwarded to ``update_language``.

    Returns:
        The five instruction texts of the "podcast" template (same tuple as
        ``update_instructions("podcast")``), used to refresh the instruction
        textboxes.

    All entries of ``INSTRUCTION_TEMPLATES`` are rebuilt, not just
    "podcast", so that a template selected later is shown in the new
    language as well.
    """
    update_language(lang)

    # Template name -> prefix of its message ids ("<prefix>.<field>").
    msgid_prefixes = {
        "podcast": "podcast",
        "SciAgents material discovery summary": "discovery",
        "lecture": "lecture",
        "summary": "summary",
        "short summary": "shortsummary",
    }
    fields = ("intro", "text_instructions", "scratch_pad", "prelude", "dialog")

    for name, prefix in msgid_prefixes.items():
        INSTRUCTION_TEMPLATES[name] = InstructionTemplate(
            **{field: _(f"{prefix}.{field}") for field in fields}
        )
    return update_instructions("podcast")
def update_instructions(template):
    """Return the instruction texts of the template named ``template``.

    Args:
        template: Key into ``INSTRUCTION_TEMPLATES``; an unknown key raises
            ``KeyError``.

    Returns:
        A 5-tuple ``(intro, text_instructions, scratch_pad, prelude,
        dialog)``, in the order of the instruction textboxes it updates.
    """
    chosen = INSTRUCTION_TEMPLATES[template]
    field_order = ("intro", "text_instructions", "scratch_pad", "prelude", "dialog")
    return tuple(getattr(chosen, field) for field in field_order)
# Define standard values

# Choices offered in the "Text Generation Model" dropdown.
STANDARD_TEXT_MODELS = [
    "o1-preview-2024-09-12", "o1-preview",
    "gpt-4o-2024-08-06", "gpt-4o",
    "gpt-4o-mini-2024-07-18", "gpt-4o-mini",
    "o1-mini-2024-09-12", "o1-mini",
    "chatgpt-4o-latest",
    "gpt-4-turbo",
    "openai/custom_model",
]

# Choices offered in the "Audio Generation Model" dropdown.
STANDARD_AUDIO_MODELS = ["tts-1", "tts-1-hd"]

# Choices offered in the two speaker voice dropdowns.
STANDARD_VOICES = ["alloy", "echo", "fable", "onyx", "nova", "shimmer"]
class DialogueItem(BaseModel):
    """One spoken line of the generated dialogue."""

    # What is said; this is the text sent to speech synthesis.
    text: str
    # Who says it; selects between the two configured voices.
    speaker: Literal["speaker-1", "speaker-2"]
class Dialogue(BaseModel):
    """Structured result expected from the dialogue-generation LLM call."""

    # Free-form working notes produced alongside the dialogue.
    scratchpad: str
    # The dialogue lines, in speaking order.
    dialogue: List[DialogueItem]
def get_mp3(text: str, voice: str, audio_model: str, api_key: str = None) -> bytes:
    """Synthesize ``text`` with the OpenAI speech API and return the audio bytes.

    Args:
        text: The text to speak.
        voice: Voice name passed to the API.
        audio_model: Speech model name passed to the API.
        api_key: OpenAI API key; when empty or ``None`` the
            ``OPENAI_API_KEY`` environment variable is used instead.

    Returns:
        The complete streamed response body as one ``bytes`` object.
    """
    resolved_key = api_key or os.getenv("OPENAI_API_KEY")
    client = OpenAI(api_key=resolved_key)

    speech_request = client.audio.speech.with_streaming_response.create(
        model=audio_model,
        voice=voice,
        input=text,
    )
    with speech_request as response:
        # Collect every streamed chunk into a single payload.
        return b"".join(response.iter_bytes())
from functools import wraps | |
def conditional_llm(model, api_base=None, api_key=None):
    """Build an ``@llm`` decorator configured for a custom or default endpoint.

    Args:
        model: Model name passed to ``llm``.
        api_base: Custom API base URL. When truthy, ``llm`` is configured
            with ``model`` and ``api_base`` only (``api_key`` is not
            forwarded in that case).
        api_key: API key, forwarded to ``llm`` only when no ``api_base``
            is given.

    Returns:
        A decorator that wraps a function with the configured ``llm``.
    """
    if api_base:
        endpoint_options = {"api_base": api_base}
    else:
        endpoint_options = {"api_key": api_key}

    def decorator(func):
        return llm(model=model, **endpoint_options)(func)

    return decorator
def get_text_from_url(url: str, timeout: float = 60) -> str:
    """Fetch the text content of ``url`` through the r.jina.ai endpoint.

    Args:
        url: Address of the page to read; it is appended to
            ``https://r.jina.ai/``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on an unresponsive
            host.

    Returns:
        The response body as text.

    Raises:
        gr.Error: If the request fails, times out, or returns an HTTP error
            status. The underlying ``requests`` exception is chained as the
            cause.
    """
    try:
        response = requests.get('https://r.jina.ai/' + url, timeout=timeout)
        response.raise_for_status()
        return response.text
    except requests.RequestException as e:
        raise gr.Error(f"Error fetching content from URL: {str(e)}") from e
def _format_revision_request(edited_transcript, user_feedback):
    """Turn an optional edited transcript and feedback into prompt snippets.

    Returns a pair ``(edited_transcript_part, user_feedback_part)``. Both are
    empty strings when nothing was supplied (``None`` or ``""``).
    """
    instruction_improve = 'Based on the original text, please generate an improved version of the dialogue by incorporating the edits, comments and feedback.'
    edited_part = ""
    if edited_transcript:
        edited_part = (
            "\nPreviously generated edited transcript, with specific edits and comments that I want you to carefully address:\n"
            + "<edited_transcript>\n" + edited_transcript + "</edited_transcript>"
        )
    feedback_part = "\nOverall user feedback:\n\n" + user_feedback if user_feedback else ""
    if edited_part or feedback_part:
        # Any revision input adds the explicit "improve" request.
        feedback_part = "<requested_improvements>" + feedback_part + "\n\n" + instruction_improve + "</requested_improvements>"
    return edited_part, feedback_part


def _write_audio_file(audio, directory="./gradio_cached_examples/tmp/"):
    """Store ``audio`` as a new ``.mp3`` file in ``directory`` and return its path.

    ``.mp3`` files in the directory that are more than a day old are removed.
    """
    os.makedirs(directory, exist_ok=True)
    # Use a temporary file -- Gradio's audio component doesn't work with raw bytes in Safari
    with NamedTemporaryFile(dir=directory, delete=False, suffix=".mp3") as temporary_file:
        temporary_file.write(audio)
    # Delete any files in the temp directory that end with .mp3 and are over a day old
    for file in glob.glob(f"{directory}*.mp3"):
        if os.path.isfile(file) and time.time() - os.path.getmtime(file) > 24 * 60 * 60:
            os.remove(file)
    return temporary_file.name


def generate_audio(
    url: str,
    openai_api_key: str = None,
    text_model: str = "gpt-4o-mini-2024-07-18",
    audio_model: str = "tts-1",
    speaker_1_voice: str = "alloy",
    speaker_2_voice: str = "echo",
    api_base: str = None,
    intro_instructions: str = None,
    text_instructions: str = None,
    scratch_pad_instructions: str = None,
    prelude_dialog: str = None,
    podcast_dialog_instructions: str = None,
    edited_transcript: str = None,
    user_feedback: str = None,
    original_text: str = None,
    debug=False,
    use_default_template: bool = False,
) -> tuple:
    """Fetch the text behind ``url``, turn it into a dialogue and synthesize it.

    Args:
        url: Page whose text is converted. Required.
        openai_api_key: API key; ``OPENAI_API_KEY`` from the environment is
            accepted as an alternative.
        text_model: Model used to write the dialogue.
        audio_model: Model used for speech synthesis.
        speaker_1_voice: Voice for lines spoken by ``speaker-1``.
        speaker_2_voice: Voice for lines spoken by ``speaker-2``.
        api_base: Optional custom API base URL for the text model.
        intro_instructions, text_instructions, scratch_pad_instructions,
        prelude_dialog, podcast_dialog_instructions: The prompt pieces.
        edited_transcript: Optional edited transcript to be addressed.
        user_feedback: Optional overall feedback to be addressed.
        original_text: Only read when ``use_default_template`` is true, in
            which case it must be a key of ``INSTRUCTION_TEMPLATES``.
        debug: When true, the revision snippets and the fetched text are
            logged.
        use_default_template: When true, the five prompt pieces are replaced
            by those of the template named by ``original_text``.

    Returns:
        ``(audio_path, transcript, source_text, error)``. On success
        ``error`` is ``None``; on failure the first three items are ``None``
        and ``error`` is the message. Exceptions raised during generation
        are logged and reported through this tuple instead of propagating.
    """
    if not url:
        return None, None, None, "Please provide a valid URL before generating audio."

    if use_default_template:
        # Templates are pydantic models: read attributes, do not subscript.
        template = INSTRUCTION_TEMPLATES.get(original_text)
        if template is None:
            return None, None, None, f"Unknown instruction template: {original_text!r}"
        intro_instructions = template.intro
        text_instructions = template.text_instructions
        scratch_pad_instructions = template.scratch_pad
        prelude_dialog = template.prelude
        podcast_dialog_instructions = template.dialog

    try:
        # Validate API Key
        if not os.getenv("OPENAI_API_KEY") and not openai_api_key:
            raise gr.Error("OpenAI API key is required")

        combined_text = get_text_from_url(url)

        # Configure the LLM based on selected model and api_base. The
        # function body is its docstring: the llm decorator uses it as the
        # prompt and returns the reply as a Dialogue. A reply that fails
        # validation is retried.
        @retry(retry=retry_if_exception_type(ValidationError))
        @conditional_llm(model=text_model, api_base=api_base, api_key=openai_api_key)
        def generate_dialogue(text: str, intro_instructions: str, text_instructions: str, scratch_pad_instructions: str,
                              prelude_dialog: str, podcast_dialog_instructions: str,
                              edited_transcript: str = None, user_feedback: str = None, ) -> Dialogue:
            """
            {intro_instructions}
            Here is the original input text:
            <input_text>
            {text}
            </input_text>
            {text_instructions}
            <scratchpad>
            {scratch_pad_instructions}
            </scratchpad>
            {prelude_dialog}
            <podcast_dialogue>
            {podcast_dialog_instructions}
            </podcast_dialogue>
            {edited_transcript}{user_feedback}
            """

        edited_transcript_processed, user_feedback_processed = _format_revision_request(
            edited_transcript, user_feedback
        )
        if debug:
            logger.info(edited_transcript_processed)
            logger.info(user_feedback_processed)
            logger.info(combined_text)

        # Generate the dialogue using the LLM
        llm_output = generate_dialogue(
            combined_text,
            intro_instructions=intro_instructions,
            text_instructions=text_instructions,
            scratch_pad_instructions=scratch_pad_instructions,
            prelude_dialog=prelude_dialog,
            podcast_dialog_instructions=podcast_dialog_instructions,
            edited_transcript=edited_transcript_processed,
            user_feedback=user_feedback_processed,
        )
        lines = llm_output.dialogue

        # Generate audio from the transcript: one synthesis request per
        # line, submitted together and joined back in dialogue order.
        with cf.ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(
                    get_mp3,
                    line.text,
                    speaker_1_voice if line.speaker == "speaker-1" else speaker_2_voice,
                    audio_model,
                    openai_api_key,
                )
                for line in lines
            ]
            audio = b"".join(future.result() for future in futures)

        transcript = "".join(f"{line.speaker}: {line.text}\n\n" for line in lines)
        characters = sum(len(line.text) for line in lines)
        logger.info(f"Generated {characters} characters of audio")

        return _write_audio_file(audio), transcript, combined_text, None
    except Exception as e:
        # Report the failure to the caller through the error slot, but keep
        # the traceback in the log so it can be diagnosed.
        logger.exception("Audio generation failed")
        return None, None, None, str(e)
# def validate_and_generate_audio(*args): | |
# url = args[0] | |
# if not url: | |
# return None, None, None, "Please provide a valid URL before generating audio." | |
# try: | |
# audio_file, transcript, original_text = generate_audio(*args) | |
# return audio_file, transcript, original_text, None # Return None as the error when successful | |
# except Exception as e: | |
# # If an error occurs during generation, return None for the outputs and the error message | |
# return None, None, None, str(e) | |
# def edit_and_regenerate(edited_transcript, user_feedback, *args): | |
# # Replace the original transcript and feedback in the args with the new ones | |
# #new_args = list(args) | |
# #new_args[-2] = edited_transcript # Update edited transcript | |
# #new_args[-1] = user_feedback # Update user feedback | |
# return validate_and_generate_audio(*new_args) | |
# New function to handle user feedback and regeneration
def process_feedback_and_regenerate(feedback, *args):
    """Call ``generate_audio`` with ``feedback`` added after ``args``.

    ``feedback`` is passed as the last positional argument, so the
    parameter it binds to depends on how many values ``args`` holds.
    """
    return generate_audio(*args, feedback)
# Gradio user interface.
# NOTE(review): the nesting of the layout containers below was reconstructed
# from the statement order; confirm it against the intended page layout.
with gr.Blocks(theme='lone17/kotaemon', title="Text to Audio") as demo:
    # --- Header: description on the left, display options on the right ---
    with gr.Row(equal_height=True):
        with gr.Column(scale=10):
            gr.Markdown(
                "\n"
                "# Convert Text into an audio podcast, lecture, summary and others\n"
                "First, provide a URL with the text content, select options, then push Generate Audio.\n"
                "You can also select a variety of custom options and direct the way the result is generated.\n"
            )
        with gr.Column(scale=3):
            with gr.Group():
                toggle_dark = gr.Button(value="Toggle Dark")
                # Runs in the browser only; no Python callback is attached.
                toggle_dark.click(
                    None,
                    js="""
                    () => {
                        document.body.classList.toggle('dark');
                    }
                    """,
                )
                # add language selection, trigger a reload
                lang = gr.Dropdown(
                    label="Language",
                    choices=["en", "fr"],
                    value="en",
                    info="Select the language for the interface.",
                )

    submit_btn = gr.Button("Generate Audio")

    # --- Main form: source/model settings and prompt instructions ---
    with gr.Row():
        with gr.Column(scale=2):
            url_input = gr.Textbox(
                label="URL",
                placeholder="Enter the URL of the text content",
                info="Provide the URL of the webpage containing the text you want to convert to audio.",
            )
            openai_api_key = gr.Textbox(
                label="OpenAI API Key",
                visible=True,  # Always show the API key field
                placeholder="Enter your OpenAI API Key here...",
                type="password",  # Hide the API key input
            )
            text_model = gr.Dropdown(
                label="Text Generation Model",
                choices=STANDARD_TEXT_MODELS,
                value="gpt-4o-mini",
                info="Select the model to generate the dialogue text.",
            )
            audio_model = gr.Dropdown(
                label="Audio Generation Model",
                choices=STANDARD_AUDIO_MODELS,
                value="tts-1",
                info="Select the model to generate the audio.",
            )
            speaker_1_voice = gr.Dropdown(
                label="Speaker 1 Voice",
                choices=STANDARD_VOICES,
                value="alloy",
                info="Select the voice for Speaker 1.",
            )
            speaker_2_voice = gr.Dropdown(
                label="Speaker 2 Voice",
                choices=STANDARD_VOICES,
                value="echo",
                info="Select the voice for Speaker 2.",
            )
            api_base = gr.Textbox(
                label="Custom API Base",
                placeholder="Enter custom API base URL if using a custom/local model...",
                info="If you are using a custom or local model, provide the API base URL here, e.g.: http://localhost:8080/v1 for llama.cpp REST server.",
            )
        with gr.Column(scale=3):
            template_dropdown = gr.Dropdown(
                label="Instruction Template",
                choices=list(INSTRUCTION_TEMPLATES.keys()),
                value="podcast",
                info="Select the instruction template to use. You can also edit any of the fields for more tailored results.",
            )
            default_template_checkbox = gr.Checkbox(label="skip all template customization")
            # The instruction boxes start out filled with the podcast template.
            selected_template = INSTRUCTION_TEMPLATES["podcast"]
            intro_instructions = gr.Textbox(
                label="Intro Instructions",
                lines=10,
                value=selected_template.intro,
                info="Provide the introductory instructions for generating the dialogue.",
            )
            text_instructions = gr.Textbox(
                label="Standard Text Analysis Instructions",
                lines=10,
                placeholder="Enter text analysis instructions...",
                value=selected_template.text_instructions,
                info="Provide the instructions for analyzing the raw data and text.",
            )
            scratch_pad_instructions = gr.Textbox(
                label="Scratch Pad Instructions",
                lines=15,
                value=selected_template.scratch_pad,
                info="Provide the scratch pad instructions for brainstorming presentation/dialogue content.",
            )
            prelude_dialog = gr.Textbox(
                label="Prelude Dialog",
                lines=5,
                value=selected_template.prelude,
                info="Provide the prelude instructions before the presentation/dialogue is developed.",
            )
            podcast_dialog_instructions = gr.Textbox(
                label="Podcast Dialog Instructions",
                lines=20,
                value=selected_template.dialog,
                info="Provide the instructions for generating the presentation or podcast dialogue.",
            )

    # --- Results and revision inputs ---
    audio_output = gr.Audio(label="Audio", format="mp3", interactive=False, autoplay=False)
    transcript_output = gr.Textbox(label="Transcript", lines=20, show_copy_button=True)
    original_text_output = gr.Textbox(label="Original Text", lines=10, visible=False)
    error_output = gr.Textbox(visible=False)  # Hidden textbox to store error message
    use_edited_transcript = gr.Checkbox(label="Use Edited Transcript (check if you want to make edits to the initially generated transcript)", value=False)
    edited_transcript = gr.Textbox(label="Edit Transcript Here. E.g., mark edits in the text with clear instructions. E.g., '[ADD DEFINITION OF MATERIOMICS]'.", lines=20, visible=False,
                                   show_copy_button=True, interactive=False)
    user_feedback = gr.Textbox(label="Provide Feedback or Notes", lines=10)
    regenerate_btn = gr.Button("Regenerate Audio with Edits and Feedback")

    # Component groups shared by the event wiring below. Their order mirrors
    # the positional parameters of generate_audio.
    connection_inputs = [
        url_input, openai_api_key, text_model, audio_model,
        speaker_1_voice, speaker_2_voice, api_base,
    ]
    instruction_fields = [
        intro_instructions, text_instructions, scratch_pad_instructions,
        prelude_dialog, podcast_dialog_instructions,
    ]
    generation_outputs = [audio_output, transcript_output, original_text_output, error_output]

    # Function to update the interactive state of edited_transcript
    def update_edit_box(checkbox_value):
        """Show and unlock the edit box only while the checkbox is ticked."""
        return gr.update(interactive=checkbox_value, lines=20, visible=bool(checkbox_value))

    def generate_from_form(*form_values):
        """Run generate_audio for the "Generate Audio" button.

        The last two values are the selected template name and the "skip
        all template customization" checkbox. When the box is ticked, the
        five instruction texts typed into the form are replaced by the
        selected template's own texts. Neither value is forwarded
        positionally, because the positions after user_feedback in
        generate_audio are original_text and debug.
        """
        *generation_args, template_name, skip_customization = form_values
        if skip_customization:
            first = len(connection_inputs)
            last = first + len(instruction_fields)
            generation_args[first:last] = update_instructions(template_name)
        return generate_audio(*generation_args)

    def regenerate_from_form(use_edit, edit, *args):
        """Run generate_audio for the regenerate button.

        The edited transcript is inserted right after the instruction
        texts, and only when the "Use Edited Transcript" box is ticked;
        otherwise an empty string is passed in its place.
        """
        split_at = len(connection_inputs) + len(instruction_fields)
        return generate_audio(
            *args[:split_at],  # All inputs up to podcast_dialog_instructions
            edit if use_edit else "",
            *args[split_at:],  # user_feedback and original_text_output
        )

    def attach_result_handlers(event):
        """After a generation run, copy the transcript into the edit box and
        raise a warning popup when an error message was returned."""
        event.then(
            fn=lambda audio, transcript, original_text, error: (
                transcript if transcript else "",
                error if error else None,
            ),
            inputs=generation_outputs,
            outputs=[edited_transcript, error_output],
        ).then(
            fn=lambda error: gr.Warning(error) if error else None,
            inputs=[error_output],
            outputs=[],
        )

    # Update the interactive state of edited_transcript when the checkbox is toggled
    use_edited_transcript.change(
        fn=update_edit_box,
        inputs=[use_edited_transcript],
        outputs=[edited_transcript],
    )

    # Update instruction fields when template is changed
    template_dropdown.change(
        fn=update_instructions,
        inputs=[template_dropdown],
        outputs=instruction_fields,
    )
    lang.change(
        fn=update_instructions_language,
        inputs=[lang],
        outputs=instruction_fields,
    )

    attach_result_handlers(
        submit_btn.click(
            fn=generate_from_form,
            inputs=(
                connection_inputs
                + instruction_fields
                + [edited_transcript, user_feedback, template_dropdown, default_template_checkbox]
            ),
            outputs=generation_outputs,
        )
    )

    attach_result_handlers(
        regenerate_btn.click(
            fn=regenerate_from_form,
            inputs=(
                [use_edited_transcript, edited_transcript]
                + connection_inputs
                + instruction_fields
                + [user_feedback, original_text_output]
            ),
            outputs=generation_outputs,
        )
    )

    # Add README content at the bottom
    gr.Markdown("---")  # Horizontal line to separate the interface from README
    gr.Markdown(read_readme())

# Enable queueing for better performance
demo.queue(max_size=20, default_concurrency_limit=32)
import subprocess | |
def execute_command(cmd):
    """Run ``cmd`` through the system shell and return its output as text.

    Args:
        cmd: Command line, interpreted by the shell (``shell=True``), so it
            must only ever be built from trusted input.

    Returns:
        Standard output followed by standard error. The exit status is not
        checked. If the command cannot be run at all, the exception message
        is returned instead.
    """
    try:
        completed = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    except Exception as exc:
        return str(exc)
    return completed.stdout + completed.stderr
# Launch the Gradio app
if __name__ == "__main__":
    # Compile the gettext catalogs (.po -> .mo) with msgfmt and log its output.
    # NOTE(review): this runs after the module-level setup_translation('en')
    # call, so catalogs compiled here are only picked up by later loads --
    # confirm that the .mo files already exist on a fresh start.
    for lang_code in ("fr", "en"):
        catalog = f"locales/{lang_code}/LC_MESSAGES/messages"
        logger.info(execute_command(f"msgfmt {catalog}.po -o {catalog}.mo"))

    load_dotenv()  # This line brings all environment variables from .env into os.environ
    app, local_url, share_url = demo.launch(share=False)