# HuggingFace Space: voice chat assistant (ASR -> domain routing -> TTS).
import gradio as gr
import numpy as np
from huggingface_hub import InferenceClient
import os
import requests
import scipy.io.wavfile
import io
import time
from gradio_client import Client, file
# Shared LLM client: used for domain routing and the general-chat fallback.
client = InferenceClient(
    "meta-llama/Meta-Llama-3-8B-Instruct",
    token=os.getenv("hf_token"),
)
def process_audio(audio_data):
    """Transcribe recorded audio with the HF Whisper inference API.

    Parameters
    ----------
    audio_data : tuple | None
        Gradio ``type="numpy"`` payload: ``(sample_rate, samples)``.

    Returns
    -------
    tuple[str, str]
        ``(display_text, recognized_text)`` — ``recognized_text`` is ""
        when transcription was not possible.
    """
    if audio_data is None:
        return "No audio provided.", ""
    # Gradio's numpy audio component yields a (sample_rate, data) tuple.
    if not isinstance(audio_data, tuple):
        return "Invalid audio data format.", ""
    sample_rate, data = audio_data

    # Serialize the raw samples to an in-memory WAV container.
    buf = io.BytesIO()
    scipy.io.wavfile.write(buf, sample_rate, data)
    wav_bytes = buf.getvalue()
    buf.close()

    api_url = "https://api-inference.huggingface.co/models/openai/whisper-large-v2"
    headers = {"Authorization": f"Bearer {os.getenv('hf_token')}"}

    try:
        # Timeout prevents the UI callback from hanging indefinitely.
        response = requests.post(api_url, headers=headers, data=wav_bytes, timeout=60)
        output = response.json()
    except (requests.RequestException, ValueError):
        # Network failure or a non-JSON body (e.g. model still warming up).
        return "The ASR module is still loading, please press the button again!", ""

    print(output)  # Check output in console (logs in HF space)

    # The inference API returns an error payload (no "text") while loading.
    if isinstance(output, dict) and "text" in output:
        recognized_text = output["text"]
        return recognized_text, recognized_text
    return "The ASR module is still loading, please press the button again!", ""
def master_decision(message):
    """Classify *message* into movies/music/others and fetch a domain answer.

    Parameters
    ----------
    message : str
        The user's query text.

    Returns
    -------
    tuple[str, str]
        ``(decision_text, response)`` — the judge model's streamed domain
        label and the answer produced by the matching backend.
    """
    judge_system_message = """You are helpful assistant. You will be given queries from the user and you decide on which domain the query belongs to. You have three domains : ["movies","music","others"]. If you don't know about the domain of a query, it is to be classified as "others". Please give a one word answer in smaller caps."""
    judge_messages = [
        {"role": "system", "content": judge_system_message},
        {"role": "user", "content": message},
    ]
    decision_response = ""
    for chunk in judge_messages and client.chat_completion(
        judge_messages,
        stream=True,
    ):
        # delta.content is None on the final stream chunk — guard before +=.
        token = chunk.choices[0].delta.content
        if token:
            decision_response += token
    print(decision_response)

    if "movies" in decision_response:
        movie_client = Client("ironserengety/movies-recommender")
        result = movie_client.predict(
            message=message,
            system_message="You are a movie recommender named 'Exodia'. You are extremely reliable. You always mention your name in the beginning of conversation. You will provide me with answers from the given info. Give not more than 3 choices and make sure that answers are complete sentences. Give short one-line descriptions of each sentence.",
            max_tokens=512,
            temperature=0.7,
            top_p=0.95,
            api_name="/chat",
        )
        print(result)
        return decision_response, result

    if "music" in decision_response:
        music_client = Client("ironserengety/MusicRetriever")
        result = music_client.predict(message=message, api_name="/respond")
        return decision_response, result

    # Fallback ("others"): answer directly with the general-purpose LLM.
    system_message = "You are a helpful chatbot that answers questions. Give any answer within 50 words."
    messages = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": message},
    ]
    print(messages)
    response = ""
    # NOTE: loop variable renamed from `message` — the original shadowed the
    # function parameter.
    for chunk in client.chat_completion(
        messages,
        stream=True,
    ):
        token = chunk.choices[0].delta.content
        if token:
            response += token
    return decision_response, response
def tts_part_new(response):
    """Synthesize *response* as speech via the 'tonyassi/voice-clone' Space.

    Uses "siri.wav" as the reference voice. Returns whatever the Space's
    ``/predict`` endpoint yields (an audio file path).
    """
    # Distinct name: `client` at module level is the LLM inference client,
    # which the original shadowed here.
    tts_client = Client("tonyassi/voice-clone")
    return tts_client.predict(response, audio=file("siri.wav"), api_name="/predict")
def get_chatbot_response(audio_data):
    """End-to-end pipeline: speech -> text -> domain routing -> (text, audio).

    For the music domain the backend already returns audio, so the same value
    feeds both outputs; otherwise the text reply is synthesized with TTS.
    """
    recognized_text, _ = process_audio(audio_data)
    domain, response = master_decision(recognized_text)
    # master_decision streams the judge output, which may carry extra tokens
    # or whitespace — use membership, matching how master_decision itself
    # tests the label. (The original `domain == "music"` could never match
    # unless the stream was exactly the bare word.)
    if "music" in domain:
        return response, response
    return response, tts_part_new(response)
def chat_interface():
    """Build the Gradio Blocks UI: mic input -> Send button -> text + audio."""
    with gr.Blocks() as demo:
        mic_input = gr.Audio(
            sources="microphone",
            type="numpy",  # deliver (sample_rate, data) to the callback
            label="Say Something...",
        )
        send_button = gr.Button(value="Send")
        reply_text = gr.Textbox(label="Response Text")
        reply_audio = gr.Audio(label="Response Audio")
        # Wire the button to the full speech -> response pipeline.
        send_button.click(
            get_chatbot_response,
            inputs=[mic_input],
            outputs=[reply_text, reply_audio],
        )
    return demo
if __name__ == "__main__":
    # Build and launch the app; show_error surfaces callback exceptions
    # in the browser, which helps debugging on a hosted Space.
    app = chat_interface()
    app.launch(show_error=True)