import gradio as gr
from pydub import AudioSegment
import tempfile
import os
import base64
import openai
from dataclasses import dataclass, field
from threading import Lock

# Lepton API setup
client = openai.OpenAI(
    base_url="https://llama3-1-8b.lepton.run/api/v1/",
    api_key=os.environ.get('LEPTON_API_TOKEN')
)
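
# Optional connection check (a minimal sketch, not part of the app's flow;
# assumes LEPTON_API_TOKEN is set in the environment):
#   client.chat.completions.create(
#       model="llama3-1-8b",
#       messages=[{"role": "user", "content": "ping"}],
#       max_tokens=8,
#   )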

@dataclass
class AppState:
    # Per-session state: the chat history in OpenAI message format, plus a
    # lock so concurrent events cannot interleave updates to it.
    conversation: list = field(default_factory=list)
    lock: Lock = field(default_factory=Lock)

def transcribe_audio(audio_path):
    # Placeholder: a real app would call a speech-to-text service here and
    # return its transcript for the given audio file.
    return "This is a dummy transcript. Please implement actual speech-to-text functionality."

def conversation_to_pairs(conversation):
    # gr.Chatbot renders (user, assistant) pairs, not OpenAI-style messages,
    # so convert the history before yielding it to the UI.
    pairs = []
    for msg in conversation:
        if msg["role"] == "user":
            pairs.append([msg["content"], None])
        elif msg["role"] == "assistant" and pairs:
            pairs[-1][1] = msg["content"]
    return pairs

def generate_response_and_audio(message, state):
    with state.lock:
        state.conversation.append({"role": "user", "content": message})
        history = conversation_to_pairs(state.conversation)

        completion = client.chat.completions.create(
            model="llama3-1-8b",
            messages=state.conversation,
            max_tokens=128,
            stream=True,
            extra_body={
                "require_audio": "true",
                "tts_preset_id": "jessica",
            },
        )

        full_response = ""
        audio_chunks = []
        for chunk in completion:
            if not chunk.choices:
                continue
            content = chunk.choices[0].delta.content
            audio = getattr(chunk.choices[0], 'audio', [])
            if content:
                full_response += content
                history[-1][1] = full_response
                yield history, None, state
            if audio:
                # Audio arrives as base64-encoded chunks; decode and join
                # everything received so far, then hand gr.Audio a file path
                # (it does not accept raw bytes). The TTS stream is assumed
                # to be MP3.
                audio_chunks.extend(audio)
                audio_data = b''.join(base64.b64decode(a) for a in audio_chunks)
                with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
                    f.write(audio_data)
                yield history, f.name, state

        state.conversation.append({"role": "assistant", "content": full_response})

def chat(message, state):
    # Must itself be a generator (not merely return one) for Gradio to stream
    # the intermediate yields to the UI.
    if not message:
        yield conversation_to_pairs(state.conversation), None, state
        return
    yield from generate_response_and_audio(message, state)

def process_audio(audio, state):
    if audio is None:
        return "", state
    # gr.Audio(type="numpy") delivers a (sample_rate, samples) tuple; wrap the
    # raw samples in a pydub AudioSegment so they can be exported as WAV.
    audio_segment = AudioSegment(
        audio[1].tobytes(),
        frame_rate=audio[0],
        sample_width=audio[1].dtype.itemsize,
        channels=1 if len(audio[1].shape) == 1 else audio[1].shape[1]
    )
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio:
        audio_segment.export(temp_audio.name, format="wav")
        transcript = transcribe_audio(temp_audio.name)
    os.unlink(temp_audio.name)
    return transcript, state
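
# For reference (an illustrative call, not used by the app): the microphone
# input arrives as a (sample_rate, numpy_array) tuple, so a one-second burst
# of silence could be processed as:
#   import numpy as np
#   transcript, _ = process_audio((48000, np.zeros(48000, dtype=np.int16)), AppState())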

with gr.Blocks() as demo:
    state = gr.State(AppState())
    with gr.Row():
        with gr.Column(scale=1):
            # Gradio 3.x API; Gradio 4+ renamed this to sources=["microphone"].
            audio_input = gr.Audio(source="microphone", type="numpy")
        with gr.Column(scale=2):
            chatbot = gr.Chatbot()
            text_input = gr.Textbox(show_label=False, placeholder="Type your message here...")
        with gr.Column(scale=1):
            audio_output = gr.Audio(label="Generated Audio")

    # Recording a clip fills the textbox with its transcript; submitting the
    # textbox streams the reply text and TTS audio back to the UI.
    audio_input.change(process_audio, [audio_input, state], [text_input, state])
    text_input.submit(chat, [text_input, state], [chatbot, audio_output, state])

demo.launch()
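
# To run locally (assumes pydub/ffmpeg are installed and the token is valid):
#   LEPTON_API_TOKEN=<your token> python app.py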