Spaces:
Runtime error
Runtime error
File size: 9,028 Bytes
8811068 87e00b2 8811068 a64d7a5 8811068 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
import datetime
import os
from string import Template
import openai
import re
import glob
import pickle
import time
import json5
from retrying import retry
from code_generator import check_json_script, collect_and_check_audio_data
import random
import string
import utils
import voice_presets
from code_generator import AudioCodeGenerator
openai.api_base = os.environ['WAVJOURNEY_OPENAI_URL']
# Enable this for debugging
USE_OPENAI_CACHE = False
openai_cache = []
if USE_OPENAI_CACHE:
os.makedirs('cache', exist_ok=True)
for cache_file in glob.glob('cache/*.pkl'):
with open(cache_file, 'rb') as file:
openai_cache.append(pickle.load(file))
def chat_with_gpt(prompt, api_key):
if USE_OPENAI_CACHE:
filtered_object = list(filter(lambda x: x['prompt'] == prompt, openai_cache))
if len(filtered_object) > 0:
response = filtered_object[0]['response']
return response
try:
openai.api_key = api_key
chat = openai.ChatCompletion.create(
# model="gpt-3.5-turbo",
model="gpt-3.5-turbo",
messages=[
{
"role": "system",
"content": "You are a helpful assistant."
},
{
"role": "user",
"content": prompt
}
]
)
finally:
openai.api_key = ''
if USE_OPENAI_CACHE:
cache_obj = {
'prompt': prompt,
'response': chat['choices'][0]['message']['content']
}
with open(f'cache/{time.time()}.pkl', 'wb') as _openai_cache:
pickle.dump(cache_obj, _openai_cache)
openai_cache.append(cache_obj)
return chat['choices'][0]['message']['content']
def get_file_content(filename):
with open(filename, 'r') as file:
return file.read().strip()
def write_to_file(filename, content):
with open(filename, 'w') as file:
file.write(content)
def extract_substring_with_quotes(input_string, quotes="'''"):
pattern = f"{quotes}(.*?){quotes}"
matches = re.findall(pattern, input_string, re.DOTALL)
return matches
def try_extract_content_from_quotes(content):
if "'''" in content:
return extract_substring_with_quotes(content)[0]
elif "```" in content:
return extract_substring_with_quotes(content, quotes="```")[0]
else:
return content
def maybe_get_content_from_file(content_or_filename):
if os.path.exists(content_or_filename):
with open(content_or_filename, 'r') as file:
return file.read().strip()
return content_or_filename
# Pipeline Interface Guidelines:
#
# Init calls:
# - Init calls must be called before running the actual steps
# - init_session() is called every time a gradio webpage is loaded
#
# Single Step:
# - takes input (file or content) and output path as input
# - most of time just returns output content
#
# Compositional Step:
# - takes session_id as input (you have session_id, you have all the paths)
# - run a series of steps
# This is called for every new gradio webpage
def init_session(session_id=''):
def uid8():
return ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))
if session_id == '':
session_id = f'{datetime.datetime.now().strftime("%Y%m%d%H%M%S")}_{uid8()}'
# create the paths
os.makedirs(utils.get_session_voice_preset_path(session_id))
os.makedirs(utils.get_session_audio_path(session_id))
print(f'New session created, session_id={session_id}')
return session_id
@retry(stop_max_attempt_number=3)
def input_text_to_json_script_with_retry(complete_prompt_path, api_key):
print(" trying ...")
complete_prompt = get_file_content(complete_prompt_path)
json_response = try_extract_content_from_quotes(chat_with_gpt(complete_prompt, api_key))
json_data = json5.loads(json_response)
try:
check_json_script(json_data)
collect_and_check_audio_data(json_data)
except Exception as err:
print(f'JSON ERROR: {err}')
retry_complete_prompt = f'{complete_prompt}\n```\n{json_response}```\nThe script above has format error(s). Return the fixed script.\n\nScript:\n'
write_to_file(complete_prompt_path, retry_complete_prompt)
raise err
return json_response
# Step 1: input_text to json
def input_text_to_json_script(input_text, output_path, api_key):
input_text = maybe_get_content_from_file(input_text)
text_to_audio_script_prompt = get_file_content('prompts/text_to_json.prompt')
prompt = f'{text_to_audio_script_prompt}\n\nInput text: {input_text}\n\nScript:\n'
complete_prompt_path = output_path / 'complete_input_text_to_audio_script.prompt'
write_to_file(complete_prompt_path, prompt)
audio_script_response = input_text_to_json_script_with_retry(complete_prompt_path, api_key)
generated_audio_script_filename = output_path / 'audio_script.json'
write_to_file(generated_audio_script_filename, audio_script_response)
return audio_script_response
# Step 2: json to char-voice map
def json_script_to_char_voice_map(json_script, voices, output_path, api_key):
json_script_content = maybe_get_content_from_file(json_script)
prompt = get_file_content('prompts/audio_script_to_character_voice_map.prompt')
presets_str = '\n'.join(f"{preset['id']}: {preset['desc']}" for preset in voices.values())
prompt = Template(prompt).substitute(voice_and_desc=presets_str)
prompt = f"{prompt}\n\nAudio script:\n'''\n{json_script_content}\n'''\n\noutput:\n"
write_to_file(output_path / 'complete_audio_script_to_char_voice_map.prompt', prompt)
char_voice_map_response = try_extract_content_from_quotes(chat_with_gpt(prompt, api_key))
char_voice_map = json5.loads(char_voice_map_response)
# enrich char_voice_map with voice preset metadata
complete_char_voice_map = {c: voices[char_voice_map[c]] for c in char_voice_map}
char_voice_map_filename = output_path / 'character_voice_map.json'
write_to_file(char_voice_map_filename, json5.dumps(complete_char_voice_map))
return complete_char_voice_map
# Step 3: json to py code
def json_script_and_char_voice_map_to_audio_gen_code(json_script_filename, char_voice_map_filename, output_path, result_filename):
audio_code_generator = AudioCodeGenerator()
code = audio_code_generator.parse_and_generate(
json_script_filename,
char_voice_map_filename,
output_path,
result_filename
)
write_to_file(output_path / 'audio_generation.py', code)
# Step 4: py code to final wav
def audio_code_gen_to_result(audio_gen_code_path):
audio_gen_code_filename = audio_gen_code_path / 'audio_generation.py'
os.system(f'PYTHONPATH=. python {audio_gen_code_filename}')
# Function call used by Gradio: input_text to json
def generate_json_file(session_id, input_text, api_key):
output_path = utils.get_session_path(session_id)
# Step 1
print(f'session_id={session_id}, Step 1: Writing audio script based on text: {input_text} ...')
return input_text_to_json_script(input_text, output_path, api_key)
# Function call used by Gradio: json to result wav
def generate_audio(session_id, json_script, api_key):
def count_lines(content):
# Split the string using the newline character and count the non-empty lines
return sum(1 for line in content.split('\n') if line.strip())
max_lines = utils.get_max_script_lines()
if count_lines(json_script) > max_lines:
raise ValueError(f'The number of lines of the JSON script has exceeded {max_lines}!')
output_path = utils.get_session_path(session_id)
output_audio_path = utils.get_session_audio_path(session_id)
voices = voice_presets.get_merged_voice_presets(session_id)
# Step 2
print(f'session_id={session_id}, Step 2: Parsing character voice with LLM...')
char_voice_map = json_script_to_char_voice_map(json_script, voices, output_path, api_key)
# Step 3
json_script_filename = output_path / 'audio_script.json'
char_voice_map_filename = output_path / 'character_voice_map.json'
result_wav_basename = f'res_{session_id}'
print(f'session_id={session_id}, Step 3: Compiling audio script to Python program ...')
json_script_and_char_voice_map_to_audio_gen_code(json_script_filename, char_voice_map_filename, output_path, result_wav_basename)
# Step 4
print(f'session_id={session_id}, Step 4: Start running Python program ...')
audio_code_gen_to_result(output_path)
result_wav_filename = output_audio_path / f'{result_wav_basename}.wav'
print(f'Done all processes, result: {result_wav_filename}')
return result_wav_filename, char_voice_map
# Convenient function call used by wavjourney_cli
def full_steps(session_id, input_text, api_key):
json_script = generate_json_file(session_id, input_text, api_key)
return generate_audio(session_id, json_script, api_key)
|