import base64
import io
import time

import gradio as gr
import numpy as np
import openai
from gradio_webrtc import AdditionalOutputs, ReplyOnPause, WebRTC
from pydub import AudioSegment


def create_client(api_key):
    """Return an OpenAI-compatible client bound to the Lepton Llama 3.1 8B endpoint."""
    return openai.OpenAI(
        base_url="https://llama3-1-8b.lepton.run/api/v1/",
        api_key=api_key,
    )


def update_or_append_conversation(conversation, id, role, content):
    """Set the content of the message matching ``id`` and ``role``, or append one.

    ``conversation`` is mutated in place. Only the first matching message is
    updated; when none matches, a new ``{"id", "role", "content"}`` dict is
    appended to the end of the list.
    """
    for message in conversation:
        if message.get("id") == id and message.get("role") == role:
            message["content"] = content
            return
    conversation.append({"id": id, "role": role, "content": content})


def _delta_content(delta):
    """Return the text carried by a streamed delta, or ``""`` when it has none.

    Accepts both mapping-style deltas and attribute-style objects so the
    caller does not depend on which of the two the client library returns.
    """
    if isinstance(delta, dict):
        return delta.get("content") or ""
    return getattr(delta, "content", None) or ""


def generate_response_and_audio(
    audio_bytes: bytes,
    lepton_conversation: list[dict],
    client: openai.OpenAI,
    output_format: str,
):
    """Stream the model's reply to a spoken user message.

    The audio is base64-encoded and appended to ``lepton_conversation`` as a
    user message, then sent to the ``llama3.1-8b`` model with audio output
    requested.

    Args:
        audio_bytes: Encoded audio of the user's utterance.
        lepton_conversation: Prior messages, passed through as the request
            history. It is not modified here.
        client: Client used to issue the request; ``None`` is rejected.
        output_format: Audio format requested from the service. ``"mp3"``
            selects a bitrate of 128, anything else 32.

    Yields:
        ``(message_id, text, asr, audio)`` tuples in which at most one of the
        last three fields is set per intermediate update:
        the accumulated ASR transcript, the accumulated response text, or a
        ``(frame_rate, samples)`` pair where ``samples`` is an int16 array of
        shape ``(1, n)``. A final tuple carries the full text and transcript.

    Raises:
        gr.Error: If ``client`` is ``None`` or anything fails while streaming.
    """
    if client is None:
        raise gr.Error("Please enter a valid API key first.")

    bitrate = 128 if output_format == "mp3" else 32  # Higher bitrate for MP3, lower for OPUS
    audio_data = base64.b64encode(audio_bytes).decode()

    try:
        stream = client.chat.completions.create(
            extra_body={
                "require_audio": True,
                "tts_preset_id": "jessica",
                "tts_audio_format": output_format,
                "tts_audio_bitrate": bitrate,
            },
            model="llama3.1-8b",
            messages=lepton_conversation
            + [{"role": "user", "content": [{"type": "audio", "data": audio_data}]}],
            temperature=0.7,
            max_tokens=256,
            stream=True,
        )

        # One id is shared by the user transcript and the assistant reply of
        # this turn; the two are told apart by their role.
        message_id = str(time.time())
        full_response = ""
        asr_result = ""

        for chunk in stream:
            if not chunk.choices:
                continue
            choice = chunk.choices[0]
            content = _delta_content(choice.delta)
            audio_chunks = getattr(choice, "audio", [])
            asr_results = getattr(choice, "asr_results", [])

            if asr_results:
                asr_result += "".join(asr_results)
                yield message_id, None, asr_result, None

            if content:
                full_response += content
                yield message_id, full_response, None, None

            if audio_chunks:
                # Join the base64 fragments of this chunk and decode them
                # into raw samples.
                encoded_audio = b"".join(base64.b64decode(a) for a in audio_chunks)
                segment = AudioSegment.from_file(io.BytesIO(encoded_audio))
                audio_array = np.array(
                    segment.get_array_of_samples(), dtype=np.int16
                ).reshape(1, -1)
                print("audio.shape", audio_array.shape)
                print("sampling_rate", segment.frame_rate)
                yield message_id, None, None, (segment.frame_rate, audio_array)

        yield message_id, full_response, asr_result, None
    except Exception as e:
        raise gr.Error(f"Error during audio streaming: {e}") from e


def response(
    audio: tuple[int, np.ndarray],
    lepton_conversation: list[dict],
    gradio_conversation: list[dict],
    client: openai.OpenAI,
    output_format: str,
):
    """Turn one recorded utterance into streamed reply audio and chat updates.

    The incoming samples are wrapped as single-channel WAV and handed to
    ``generate_response_and_audio``. Both conversation lists are updated in
    place as the transcript and the reply text arrive.

    Args:
        audio: ``(sample_rate, samples)``; the samples are written out as one
            channel using the array's own item size as the sample width.
        lepton_conversation: History sent to the model.
        gradio_conversation: History shown in the chat UI.
        client: Client forwarded to ``generate_response_and_audio``.
        output_format: Audio format forwarded to ``generate_response_and_audio``.

    Yields:
        ``(reply_audio, AdditionalOutputs(...))`` when an update carries
        audio, otherwise just ``AdditionalOutputs(...)``. The additional
        outputs always hold both conversation lists.
    """
    sample_rate, samples = audio
    audio_buffer = io.BytesIO()
    segment = AudioSegment(
        samples.tobytes(),
        frame_rate=sample_rate,
        sample_width=samples.dtype.itemsize,
        channels=1,
    )
    segment.export(audio_buffer, format="wav")

    generator = generate_response_and_audio(
        audio_buffer.getvalue(), lepton_conversation, client, output_format
    )

    for message_id, text, asr, reply_audio in generator:
        if asr:
            update_or_append_conversation(lepton_conversation, message_id, "user", asr)
            update_or_append_conversation(gradio_conversation, message_id, "user", asr)
        if text:
            update_or_append_conversation(lepton_conversation, message_id, "assistant", text)
            update_or_append_conversation(gradio_conversation, message_id, "assistant", text)
        if reply_audio:
            yield reply_audio, AdditionalOutputs(lepton_conversation, gradio_conversation)
        else:
            yield AdditionalOutputs(lepton_conversation, gradio_conversation)


# NOTE(review): the wiring below references AppState, set_api_key,
# update_format, process_audio, maybe_call_response and start_recording_user,
# none of which is defined or imported in this file as seen, so building the
# UI would fail with NameError unless they are supplied elsewhere. `response`
# (which yields AdditionalOutputs) is also not connected to any event here;
# presumably it is meant to be driven through the imported WebRTC /
# ReplyOnPause components -- confirm the intended design before changing this.
# NOTE(review): the nesting of the layout containers was reconstructed from a
# source without indentation -- verify against the intended layout.
with gr.Blocks() as demo:
    with gr.Row():
        api_key_input = gr.Textbox(type="password", label="Enter your Lepton API Key")
        set_key_button = gr.Button("Set API Key")
        api_key_status = gr.Textbox(label="API Key Status", interactive=False)

    with gr.Row():
        format_dropdown = gr.Dropdown(
            choices=["mp3", "opus"], value="mp3", label="Output Audio Format"
        )

    with gr.Row():
        with gr.Column():
            input_audio = gr.Audio(label="Input Audio", sources="microphone", type="numpy")
        with gr.Column():
            chatbot = gr.Chatbot(label="Conversation", type="messages")
            output_audio = gr.Audio(label="Output Audio", autoplay=True)

    state = gr.State(AppState())

    set_key_button.click(
        set_api_key,
        inputs=[api_key_input, state],
        outputs=[api_key_status, state],
    )
    format_dropdown.change(update_format, inputs=[format_dropdown, state], outputs=[state])

    stream = input_audio.stream(
        process_audio,
        [input_audio, state],
        [input_audio, state],
        stream_every=0.25,  # Reduced to make it more responsive
        time_limit=60,  # Increased to allow for longer messages
    )

    stream.then(
        maybe_call_response,
        inputs=[state],
        outputs=[chatbot, output_audio, state],
    )

    # Automatically restart recording after the assistant's response
    restart = output_audio.change(
        start_recording_user,
        [state],
        [input_audio],
    )

    # Add a "Stop Conversation" button
    cancel = gr.Button("Stop Conversation", variant="stop")
    cancel.click(
        lambda: (AppState(stopped=True), gr.update(recording=False)),
        None,
        [state, input_audio],
        cancels=[stream, restart],
    )

demo.launch()