File size: 9,028 Bytes
8811068
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
87e00b2
8811068
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a64d7a5
8811068
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
import datetime
import os
from string import Template
import openai
import re
import glob
import pickle
import time
import json5
from retrying import retry
from code_generator import check_json_script, collect_and_check_audio_data
import random
import string

import utils
import voice_presets
from code_generator import AudioCodeGenerator
# Point the OpenAI client at the endpoint given by the WAVJOURNEY_OPENAI_URL
# environment variable. The lookup uses os.environ[...], so importing this
# module raises KeyError when the variable is not set.
openai.api_base = os.environ['WAVJOURNEY_OPENAI_URL']
# Enable this for debugging
USE_OPENAI_CACHE = False
# In-memory list of cached exchanges; chat_with_gpt() stores each entry as a
# dict with 'prompt' and 'response' keys.
openai_cache = []
if USE_OPENAI_CACHE:
    os.makedirs('cache', exist_ok=True)
    # Preload every pickled entry found under ./cache (relative to the
    # current working directory).
    # NOTE(review): pickle.load executes arbitrary code from the file, so the
    # cache directory must only ever contain files this program wrote itself.
    for cache_file in glob.glob('cache/*.pkl'):
        with open(cache_file, 'rb') as file:
            openai_cache.append(pickle.load(file))

def chat_with_gpt(prompt, api_key):
    """Send ``prompt`` to the chat model and return the text of its reply.

    When ``USE_OPENAI_CACHE`` is enabled, a previously stored reply for the
    exact same prompt is returned without contacting the API, and every fresh
    reply is pickled to ``cache/<timestamp>.pkl`` and added to
    ``openai_cache``.

    Args:
        prompt: Text sent as the user message.
        api_key: Key assigned to ``openai.api_key`` for the duration of the
            request only.

    Returns:
        The ``content`` of the first choice in the completion.
    """
    if USE_OPENAI_CACHE:
        cached_hits = [entry for entry in openai_cache if entry['prompt'] == prompt]
        if cached_hits:
            return cached_hits[0]['response']

    conversation = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": prompt},
    ]
    try:
        openai.api_key = api_key
        chat = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=conversation,
        )
    finally:
        # Always clear the key again, whether or not the request succeeded,
        # so it does not stay on the shared client module.
        openai.api_key = ''

    reply_text = chat['choices'][0]['message']['content']

    if USE_OPENAI_CACHE:
        cache_entry = {'prompt': prompt, 'response': reply_text}
        with open(f'cache/{time.time()}.pkl', 'wb') as cache_fh:
            pickle.dump(cache_entry, cache_fh)
            openai_cache.append(cache_entry)

    return reply_text


def get_file_content(filename):
    """Return the text of ``filename`` with surrounding whitespace removed."""
    with open(filename, 'r') as handle:
        raw_text = handle.read()
    return raw_text.strip()


def write_to_file(filename, content):
    """Write ``content`` to ``filename``, replacing whatever the file held."""
    with open(filename, 'w') as handle:
        handle.write(content)


def extract_substring_with_quotes(input_string, quotes="'''"):
    """Return every substring of ``input_string`` enclosed in ``quotes``.

    Args:
        input_string: Text to search.
        quotes: Literal delimiter that opens and closes each substring.

    Returns:
        A list of the enclosed substrings in order of appearance (possibly
        empty). Matching is non-greedy and may span multiple lines.
    """
    # Escape the delimiter so it is always matched literally; an unescaped
    # delimiter such as "***" or "$$" would be read as regex syntax.
    delimiter = re.escape(quotes)
    pattern = f"{delimiter}(.*?){delimiter}"
    return re.findall(pattern, input_string, re.DOTALL)


def try_extract_content_from_quotes(content):
    """Return the first block quoted with ``'''`` or triple backticks.

    ``'''`` takes precedence over triple backticks. When a delimiter occurs
    in ``content`` but does not enclose anything (e.g. it is never closed),
    the next delimiter is tried; if no quoted block is found at all,
    ``content`` is returned unchanged.
    """
    for quotes in ("'''", "```"):
        if quotes not in content:
            continue
        matches = extract_substring_with_quotes(content, quotes=quotes)
        # A lone, unclosed delimiter yields no match; fall through instead
        # of indexing into an empty list.
        if matches:
            return matches[0]
    return content

def maybe_get_content_from_file(content_or_filename):
    """Return file contents if the argument names a file, else the argument.

    Args:
        content_or_filename: Either literal content or the path of a file
            holding the content.

    Returns:
        The stripped text of the file when ``content_or_filename`` is the
        path of an existing regular file; otherwise ``content_or_filename``
        itself, unchanged.
    """
    # NOTE(review): any string that happens to be a readable path is replaced
    # by that file's contents. If this value can come from an untrusted user,
    # confirm that reading arbitrary local files this way is acceptable.
    #
    # isfile() rather than exists(): text that matches an existing directory
    # name (e.g. "cache") must be treated as content, not opened.
    if os.path.isfile(content_or_filename):
        with open(content_or_filename, 'r') as file:
            return file.read().strip()
    return content_or_filename



# Pipeline Interface Guidelines:
#
# Init calls:
# - Init calls must be called before running the actual steps
#   - init_session() is called every time a gradio webpage is loaded
#
# Single Step:
# - takes input (file or content) and output path as input
# - most of the time just returns the output content
# 
# Compositional Step:
# - takes session_id as input (you have session_id, you have all the paths)
# - run a series of steps

# This is called for every new gradio webpage

def init_session(session_id=''):
    """Ensure the directories of a session exist and return the session id.

    Args:
        session_id: Id of the session to initialise. When it is the empty
            string, a new id of the form ``<YYYYmmddHHMMSS>_<8 random
            lowercase letters/digits>`` is generated.

    Returns:
        The session id that was passed in or generated.
    """
    def uid8():
        return ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))

    if session_id == '':
        session_id = f'{datetime.datetime.now().strftime("%Y%m%d%H%M%S")}_{uid8()}'
    # create the paths
    # exist_ok=True lets a caller re-initialise a session id whose
    # directories already exist instead of failing with FileExistsError.
    os.makedirs(utils.get_session_voice_preset_path(session_id), exist_ok=True)
    os.makedirs(utils.get_session_audio_path(session_id), exist_ok=True)
    print(f'New session created, session_id={session_id}')
    return session_id

@retry(stop_max_attempt_number=3)
def input_text_to_json_script_with_retry(complete_prompt_path, api_key):
    """Ask the LLM for a JSON audio script and validate the answer.

    The prompt is read from ``complete_prompt_path`` on every attempt. When
    validation of the parsed script fails, the prompt file is rewritten to
    contain the old prompt, the rejected script and a request to fix it, and
    the error is re-raised so that the ``retry`` decorator (configured with
    ``stop_max_attempt_number=3``) can run the function again.

    Args:
        complete_prompt_path: Path of the file holding the full prompt.
        api_key: OpenAI API key forwarded to ``chat_with_gpt``.

    Returns:
        The script text extracted from the LLM reply (not the parsed object).
    """
    print("    trying ...")
    prompt_text = get_file_content(complete_prompt_path)
    llm_reply = chat_with_gpt(prompt_text, api_key)
    json_response = try_extract_content_from_quotes(llm_reply)
    # A parse failure here propagates directly; the prompt file is left as is.
    parsed_script = json5.loads(json_response)

    try:
        check_json_script(parsed_script)
        collect_and_check_audio_data(parsed_script)
    except Exception as err:
        print(f'JSON ERROR: {err}')
        repair_prompt = f'{prompt_text}\n```\n{json_response}```\nThe script above has format error(s). Return the fixed script.\n\nScript:\n'
        write_to_file(complete_prompt_path, repair_prompt)
        raise err
    else:
        return json_response

# Step 1: input_text to json
def input_text_to_json_script(input_text, output_path, api_key):
    """Turn input text into a JSON audio script with the LLM.

    Writes the full prompt to
    ``output_path / 'complete_input_text_to_audio_script.prompt'`` and the
    resulting script to ``output_path / 'audio_script.json'``.

    Args:
        input_text: The text itself, or the path of a file containing it.
        output_path: Directory for the prompt and script files; combined with
            file names via the ``/`` operator.
        api_key: OpenAI API key.

    Returns:
        The generated script as text.
    """
    source_text = maybe_get_content_from_file(input_text)
    instructions = get_file_content('prompts/text_to_json.prompt')
    full_prompt = f'{instructions}\n\nInput text: {source_text}\n\nScript:\n'
    prompt_file = output_path / 'complete_input_text_to_audio_script.prompt'
    write_to_file(prompt_file, full_prompt)

    script_text = input_text_to_json_script_with_retry(prompt_file, api_key)
    write_to_file(output_path / 'audio_script.json', script_text)
    return script_text

# Step 2: json to char-voice map
def json_script_to_char_voice_map(json_script, voices, output_path, api_key):
    """Let the LLM assign a voice preset to every character of the script.

    Writes the full prompt to
    ``output_path / 'complete_audio_script_to_char_voice_map.prompt'`` and
    the enriched mapping to ``output_path / 'character_voice_map.json'``.

    Args:
        json_script: The JSON script text, or the path of a file holding it.
        voices: Mapping whose values are preset dicts with at least ``'id'``
            and ``'desc'`` entries; it is also indexed by the voice
            identifiers the LLM returns.
        output_path: Directory for the prompt and mapping files.
        api_key: OpenAI API key.

    Returns:
        A dict mapping each character to its entry in ``voices``.
    """
    script_text = maybe_get_content_from_file(json_script)
    prompt_template = get_file_content('prompts/audio_script_to_character_voice_map.prompt')
    preset_lines = [f"{preset['id']}: {preset['desc']}" for preset in voices.values()]
    instructions = Template(prompt_template).substitute(voice_and_desc='\n'.join(preset_lines))
    prompt = f"{instructions}\n\nAudio script:\n'''\n{script_text}\n'''\n\noutput:\n"
    write_to_file(output_path / 'complete_audio_script_to_char_voice_map.prompt', prompt)

    llm_reply = chat_with_gpt(prompt, api_key)
    char_voice_map = json5.loads(try_extract_content_from_quotes(llm_reply))

    # enrich char_voice_map with voice preset metadata
    complete_char_voice_map = {}
    for character in char_voice_map:
        complete_char_voice_map[character] = voices[char_voice_map[character]]

    write_to_file(output_path / 'character_voice_map.json', json5.dumps(complete_char_voice_map))
    return complete_char_voice_map
    
# Step 3: json to py code
def json_script_and_char_voice_map_to_audio_gen_code(json_script_filename, char_voice_map_filename, output_path, result_filename):
    """Generate the audio program and save it as ``audio_generation.py``.

    All four arguments are handed, in order, to
    ``AudioCodeGenerator.parse_and_generate``; the code it returns is written
    to ``output_path / 'audio_generation.py'``. Nothing is returned.
    """
    generated_code = AudioCodeGenerator().parse_and_generate(
        json_script_filename,
        char_voice_map_filename,
        output_path,
        result_filename,
    )
    program_file = output_path / 'audio_generation.py'
    write_to_file(program_file, generated_code)

# Step 4: py code to final wav
def audio_code_gen_to_result(audio_gen_code_path):
    """Run the generated ``audio_generation.py`` in a child Python process.

    The script is started without a shell, from an argument list, so a
    directory name containing spaces or shell metacharacters cannot break or
    alter the command. The child inherits the current environment with
    ``PYTHONPATH`` set to ``.`` and runs under the interpreter executing this
    process (falling back to ``python`` on the PATH if that is unknown).

    Args:
        audio_gen_code_path: Directory containing ``audio_generation.py``;
            combined with the file name via the ``/`` operator.

    Returns:
        None. A non-zero exit status of the script is reported on stdout but
        does not raise.
    """
    # Local imports keep this function self-contained with respect to the
    # module's import list.
    import subprocess
    import sys

    audio_gen_code_filename = audio_gen_code_path / 'audio_generation.py'
    # Equivalent of the shell prefix `PYTHONPATH=.` for the child only.
    child_env = dict(os.environ, PYTHONPATH='.')
    completed = subprocess.run(
        [sys.executable or 'python', str(audio_gen_code_filename)],
        env=child_env,
    )
    if completed.returncode != 0:
        # Surface the failure instead of silently discarding the exit status.
        print(f'Audio generation script {audio_gen_code_filename} exited with status {completed.returncode}')

# Function call used by Gradio: input_text to json
def generate_json_file(session_id, input_text, api_key):
    """Run step 1 for a session: write the JSON audio script for a text.

    Args:
        session_id: Session whose directory (``utils.get_session_path``)
            receives the output files.
        input_text: Text to convert, forwarded to ``input_text_to_json_script``.
        api_key: OpenAI API key.

    Returns:
        The generated script as returned by ``input_text_to_json_script``.
    """
    session_dir = utils.get_session_path(session_id)
    # Step 1
    print(f'session_id={session_id}, Step 1: Writing audio script based on text: {input_text} ...')
    json_script = input_text_to_json_script(input_text, session_dir, api_key)
    return json_script

# Function call used by Gradio: json to result wav
def generate_audio(session_id, json_script, api_key):
    """Run steps 2-4 for a session: voice mapping, code generation, rendering.

    Args:
        session_id: Session whose directories are used for all files.
        json_script: JSON audio script text.
        api_key: OpenAI API key used for the character/voice mapping.

    Returns:
        A tuple ``(result_wav_filename, char_voice_map)`` where the first item
        is ``<session audio path>/res_<session_id>.wav``.

    Raises:
        ValueError: If ``json_script`` has more non-blank lines than
            ``utils.get_max_script_lines()`` allows.
    """
    def count_lines(content):
        # Only lines holding something other than whitespace are counted
        non_blank = [line for line in content.split('\n') if line.strip()]
        return len(non_blank)

    line_limit = utils.get_max_script_lines()
    if count_lines(json_script) > line_limit:
        raise ValueError(f'The number of lines of the JSON script has exceeded {line_limit}!')

    session_dir = utils.get_session_path(session_id)
    audio_dir = utils.get_session_audio_path(session_id)
    voices = voice_presets.get_merged_voice_presets(session_id)

    # Step 2
    print(f'session_id={session_id}, Step 2: Parsing character voice with LLM...')
    char_voice_map = json_script_to_char_voice_map(json_script, voices, session_dir, api_key)

    # Step 3
    # NOTE(review): this step reads audio_script.json from the session
    # directory rather than the json_script argument - confirm the caller
    # keeps the two in sync.
    script_file = session_dir / 'audio_script.json'
    voice_map_file = session_dir / 'character_voice_map.json'
    result_wav_basename = f'res_{session_id}'
    print(f'session_id={session_id}, Step 3: Compiling audio script to Python program ...')
    json_script_and_char_voice_map_to_audio_gen_code(
        script_file,
        voice_map_file,
        session_dir,
        result_wav_basename,
    )

    # Step 4
    print(f'session_id={session_id}, Step 4: Start running Python program ...')
    audio_code_gen_to_result(session_dir)

    result_wav_filename = audio_dir / f'{result_wav_basename}.wav'
    print(f'Done all processes, result: {result_wav_filename}')
    return result_wav_filename, char_voice_map

# Convenient function call used by wavjourney_cli
def full_steps(session_id, input_text, api_key):
    """Run the whole pipeline: text to JSON script, then script to audio.

    Returns:
        Whatever ``generate_audio`` returns for the script produced by
        ``generate_json_file``.
    """
    script = generate_json_file(session_id, input_text, api_key)
    audio_result = generate_audio(session_id, script, api_key)
    return audio_result