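"""Vietnamese voice/chat assistant (Gradio app).

Voice commands: "Chuyển sang chế độ vẽ" switches to drawing mode and "Chuyển
sang chế độ nói" back to chatting; "Tôi muốn dùng bàn phím" enables typed
input and "Tôi muốn nói chuyện" returns to voice input. Chat history is
cleared when the program is stopped.
"""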
import speech_recognition as sr
from gtts import gTTS
import gradio as gr
from io import BytesIO
import numpy as np
from dataclasses import dataclass, field
import time
from pydub import AudioSegment
import librosa
from utils.vad import get_speech_timestamps, collect_chunks, VadOptions
from PIL import Image
from ClassPrompt import PromptClass
import render

creator_prompt = PromptClass()
r = sr.Recognizer()

@dataclass
class AppState:
    stream: np.ndarray | None = None
    sampling_rate: int = 0
    pause_detected: bool = False
    started_talking: bool = False
    stopped: bool = False
    history: list = field(default_factory=list)
    typing: bool = False
    painting: bool = False
    image_out: Image.Image = None
    image_in: Image.Image = None
    conversation: list = field(default_factory=list)
    recording: bool = False  # whether the microphone is currently streaming
    pause_threshold: float = 1  # seconds of non-speech that count as a pause
    strength: float = 1.0
    ckpt: list = field(default_factory=list)
    guidance: float = 8
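
# One AppState lives per browser session via gr.State in the UI below; the
# cancel button replaces it with a fresh AppState(stopped=True, recording=False).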

def run_vad(ori_audio, orig_sr):
    """Run voice-activity detection and keep only the voiced chunks."""
    _st = time.time()
    try:
        # int16 PCM -> float32 in [-1, 1], resampled to the 16 kHz the VAD expects
        audio = ori_audio.astype(np.float32) / 32768.0
        sampling_rate = 16000
        audio = librosa.resample(audio, orig_sr=orig_sr, target_sr=sampling_rate)

        vad_parameters = VadOptions()
        speech_chunks = get_speech_timestamps(audio, vad_parameters)
        audio = collect_chunks(audio, speech_chunks)
        duration_after_vad = audio.shape[0] / sampling_rate  # voiced duration in seconds

        vad_audio = np.round(audio * 32768.0).astype(np.int16)
        vad_audio_bytes = vad_audio.tobytes()
        return duration_after_vad, vad_audio_bytes, round(time.time() - _st, 4)
    except Exception:
        # on any failure, signal -1 so callers can ignore this chunk
        return -1, ori_audio, round(time.time() - _st, 4)
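
# Usage sketch (hypothetical values): for one second of 48 kHz int16 samples,
#   dur, voiced_bytes, elapsed = run_vad(chunk, 48000)
# dur is the voiced duration in seconds, or -1 if VAD failed on the chunk.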

def determine_pause(audio: np.ndarray, sampling_rate: int, state: AppState) -> bool:
    """Detect a pause: report one when the unvoiced part of the buffer
    (total duration minus voiced duration) exceeds state.pause_threshold."""
    dur_vad, _, time_vad = run_vad(audio, sampling_rate)
    duration = len(audio) / sampling_rate

    if dur_vad > 0.5 and not state.started_talking:
        print("started talking")
        state.started_talking = True
        return False

    print(f"duration_after_vad: {dur_vad:.3f} s, time_vad: {time_vad:.3f} s")
    return (duration - dur_vad) > state.pause_threshold
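
# Example: a 3.0 s buffer with 1.6 s of detected speech has 1.4 s of silence,
# which exceeds the default pause_threshold of 1 s, so a pause is reported.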

def process_audio(audio: tuple, state: AppState, image: Image.Image,
                  strength: float = 1.0, ckpt=None, guidance: float = 8):
    """Accumulate streamed microphone chunks until a pause is detected.

    strength, ckpt and guidance get defaults because the stream event below
    only wires up (input_audio, state, input_image).
    """
    if state.recording:  # ignore chunks that arrive after recording has stopped
        time.sleep(0.1)
        if state.stream is None:
            state.stream = audio[1]
            state.sampling_rate = audio[0]
        else:
            state.stream = np.concatenate((state.stream, audio[1]))
        state.image_in = image
        state.strength = strength
        state.ckpt = ckpt
        state.guidance = guidance

        state.pause_detected = determine_pause(state.stream, state.sampling_rate, state)
        if state.pause_detected and state.started_talking:
            state.recording = False
            return state, gr.Audio(recording=False)  # stop the microphone
    return state, None

def transcribe_audio(audio_segment):
    """Transcribe a pydub AudioSegment to Vietnamese text."""
    audio_buffer = BytesIO()
    audio_segment.export(audio_buffer, format="wav")
    audio_buffer.seek(0)
    try:
        with sr.AudioFile(audio_buffer) as source:
            r.adjust_for_ambient_noise(source)
            text = r.recognize_google(r.record(source), language='vi')
            return text
    except sr.UnknownValueError:
        print("Could not understand audio.")
    except sr.RequestError as e:
        print(f"Could not request results from Google Speech Recognition service; {e}")
    return ""

def chat_with_onlinemodel(user_input, state: AppState):
    """Send the running history to the online LLM and record both sides of the turn."""
    state.history.append({"role": "user", "content": user_input})
    response = creator_prompt.chat(provider="SambaNova", model="Meta-Llama-3.1-405B-Instruct", input_text=state.history)
    bot_response = response.replace("*", "")  # strip markdown emphasis before TTS
    state.history.append({"role": "assistant", "content": bot_response})
    state.conversation.append({"role": "user", "content": "Bạn: " + user_input})
    state.conversation.append({"role": "assistant", "content": "Bot: " + bot_response})
    return bot_response, state

def synthesize_speech(text):
    """Convert text to Vietnamese speech with gTTS and return the MP3 bytes."""
    try:
        mp3 = gTTS(text, tld='com.vn', lang='vi', slow=False)
        mp3_fp = BytesIO()
        mp3.write_to_fp(mp3_fp)
        audio_bytes = mp3_fp.getvalue()
        mp3_fp.close()
        return audio_bytes
    except Exception as e:
        print(f"Speech synthesis error: {e}")
        return None
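
# Usage: synthesize_speech("Xin chào") yields MP3 bytes; a None return (on
# error) simply clears the output audio component instead of crashing the app.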

def response_audio(state: AppState):
    """Transcribe the recorded speech and answer with synthesized audio."""
    if not state.pause_detected and not state.started_talking:
        return state, None

    audio_segment = AudioSegment(
        state.stream.tobytes(),
        frame_rate=state.sampling_rate,
        sample_width=state.stream.dtype.itemsize,
        channels=1 if state.stream.ndim == 1 else state.stream.shape[1],
    )
    textin = transcribe_audio(audio_segment)
    state.stream = None

    if state.typing is False:
        txt, state = chuyen_trangthai(textin, state)
        if txt:
            return state, synthesize_speech("chuyển sang trạng thái dùng bàn phím")
    if textin != "":
        paint = state.painting
        state.painting = text_check(textin, state.painting)
        if paint != state.painting:
            return state, synthesize_speech("Đã chuyển sang chế độ " + ("vẽ" if state.painting else "nói chuyện"))
        if state.painting:
            # expand the request into a full image prompt, then render it
            promptx = prompt_hugingface(textin, "Hugging Face", "Qwen/Qwen2.5-72B-Instruct", "Medium")
            img = resize(state.image_in)
            state.image_out = render.generate_images(promptx, img)
            return state, synthesize_speech("Bạn thấy tôi vẽ " + textin + " có đẹp không")
        else:
            print("Thinking...")
            text_out, state = chat_with_onlinemodel(textin, state)
            return state, synthesize_speech(text_out)
    else:
        return state, synthesize_speech("Tôi nghe không rõ")  # nothing was understood

def response_text(state: AppState, textin, image: Image.Image, prompt, progress=gr.Progress(track_tqdm=True)):
    """Handle typed input: draw in painting mode, otherwise chat, and reply with audio."""
    if state.typing is True:
        txt, state = chuyen_trangthai(textin, state)
        if not txt:
            return state, synthesize_speech("chuyển sang trạng thái nói")
    if textin != "":
        paint = state.painting
        state.painting = text_check(textin, state.painting)
        if paint != state.painting:
            return state, synthesize_speech("Đã chuyển sang chế độ " + ("vẽ" if state.painting else "nói chuyện"))
        if state.painting:
            state.conversation.append({"role": "user", "content": "Bạn: " + textin})
            img = resize(image)
            state.image_out = render.generate_images(textin, img)
            return state, synthesize_speech("Bạn thấy tôi vẽ " + prompt + " có đẹp không")
        else:
            print("Thinking...")
            text_out, state = chat_with_onlinemodel(textin, state=state)
            return state, synthesize_speech(text_out)
    else:
        return state, synthesize_speech("Hãy gõ nội dung")  # ask the user to type something

def text_check(textin, painting):
    """Return the new painting flag implied by a mode-switch phrase in the text."""
    if not painting:
        return "sang chế độ vẽ" in textin
    return "sang chế độ nói" not in textin

def chuyen_trangthai(textin, state: AppState):
    """Switch between voice mode ("muốn nói chuyện") and keyboard mode
    ("dùng bàn phím"); returns the resulting typing flag and the state."""
    if "muốn nói chuyện" in textin:
        state.started_talking = False
        state.recording = True
        state.stopped = False
        state.typing = False
        return False, state
    elif "dùng bàn phím" in textin:
        state.started_talking = False
        state.recording = False
        state.stopped = True
        state.typing = True
        return True, state
    else:
        return state.typing, state

def start_recording_user(state: AppState, progress=gr.Progress(track_tqdm=True)):
    """Reset the per-utterance flags and start recording."""
    state.stopped = False
    state.started_talking = False
    state.recording = True
    return gr.Audio(recording=True), state


def restart_recording(state: AppState):
    """Resume recording after playback ends, unless the user stopped the app."""
    if not state.stopped:
        state.started_talking = False
        state.recording = True
        return gr.Audio(recording=True), state
    else:
        state.started_talking = False
        state.recording = False
        return gr.Audio(recording=False), state

def prompt_hugingface(prompt, llm_provider, model, prompt_type):
    """Expand a short idea into a full image prompt with the selected LLM."""
    result = creator_prompt.generate(
        input_text=prompt,
        long_talk=True,
        compress=True,
        compression_level="hard",
        poster=False,
        prompt_type=prompt_type,
        custom_base_prompt="",
        provider=llm_provider,
        model=model,
    )
    return result

def resize(img: Image.Image):
    """Round both dimensions down to multiples of 8 before rendering."""
    height = (img.height // 8) * 8
    width = (img.width // 8) * 8
    return img.resize((width, height))
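
# Example: a 1023x769 upload becomes 1016x768; diffusion pipelines generally
# need dimensions divisible by 8 because of the VAE downsampling factor.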

loaded = ""
steps = 50

def update_model_choices(provider):
    provider_models = {
        "Hugging Face": [
            "Qwen/Qwen2.5-72B-Instruct",
            "meta-llama/Meta-Llama-3.1-70B-Instruct",
            "mistralai/Mixtral-8x7B-Instruct-v0.1",
            "mistralai/Mistral-7B-Instruct-v0.3",
        ],
        "SambaNova": [
            "Meta-Llama-3.1-70B-Instruct",
            "Meta-Llama-3.1-405B-Instruct",
            "Meta-Llama-3.1-8B-Instruct",
        ],
    }
    models = provider_models.get(provider, [])
    return gr.Dropdown(choices=models, value=models[0] if models else "")

title = "Chat tiếng việt by tuphamkts"
description = "Muốn vẽ nói: Chuyển sang chế độ vẽ. Muốn chat nói: Chuyển sang chế độ nói. Chế độ gõ: Tôi muốn dùng bàn phím, chế độ nói: Tôi muốn nói chuyện. Ghi chú: Chỉ dừng chương trình khi tôi đang nói (lịch sử chat sẽ bị xóa khi dừng chương trình)."
examples = ["Chuyển sang chế độ vẽ", "Chuyển sang chế độ nói"]

with gr.Blocks(title=title) as demo:
    gr.HTML(f"<div style='text-align: center;'><h1>{title}</h1><p>{description}</p></div>")
    with gr.Row():
        with gr.Column():
            with gr.Column(visible=False) as prompt_visible:
                with gr.Row():
                    llm_provider = gr.Dropdown(choices=["Hugging Face", "SambaNova"], label="Nguồn model", value="Hugging Face")
                    model = gr.Dropdown(label="Chọn Model", choices=["Qwen/Qwen2.5-72B-Instruct", "meta-llama/Meta-Llama-3.1-70B-Instruct", "mistralai/Mixtral-8x7B-Instruct-v0.1", "mistralai/Mistral-7B-Instruct-v0.3"], value="Qwen/Qwen2.5-72B-Instruct")
                    prompt_types = ["Long", "Short", "Medium", "OnlyObjects", "NoFigure", "Landscape", "Fantasy"]
                    prompt_type = gr.Dropdown(choices=prompt_types, label="Phong cách", value="Medium", interactive=True)
                # no trailing commas here: they used to wrap these components in
                # one-element tuples and force input_prompt[0] / input_text[0] below
                input_prompt = gr.Textbox(label="Nhập nội dung muốn vẽ", value="Một cô gái", type="text")
                generate_prompt = gr.Button("Tạo Prompt", variant="stop")
            with gr.Column(visible=False) as typing_visible:
                input_text = gr.Textbox(label="Nhập nội dung trao đổi", type="text")
                submit = gr.Button("Áp dụng", variant="stop")
            input_audio = gr.Audio(label="Nói cho tôi nghe nào", sources="microphone", type="numpy")
            output_audio = gr.Audio(label="Trợ lý", autoplay=True, sources=None, type="numpy")
            input_image = gr.Image(label="Hình ảnh của bạn", sources=["upload", "clipboard", "webcam"], type="pil", visible=True)
        with gr.Column(visible=False) as image_visible:
            output_image = gr.Image(label="Hình ảnh sau xử lý", sources=None, type="pil", visible=True)
        with gr.Column(visible=True) as chatbot_visible:
            chatbot = gr.Chatbot(label="Nội dung trò chuyện", type="messages")

    state = gr.State(value=AppState())
    startrecord = input_audio.start_recording(
        start_recording_user,
        [state],
        [input_audio, state],
    )
    stream = input_audio.stream(
        process_audio,
        [input_audio, state, input_image],
        [state, input_audio],
        stream_every=1,
        time_limit=30,
    )
    respond = input_audio.stop_recording(
        fn=response_audio,
        inputs=[state],
        outputs=[state, output_audio],
    )
    respond.then(lambda s: s.conversation, [state], [chatbot])
    respond.then(lambda s: s.image_out, [state], [output_image])

    # When playback of the reply finishes, either resume recording or show the
    # widgets that match the current typing/painting mode.
    restart = output_audio.stop(
        restart_recording,
        [state],
        [input_audio, state],
    )
    restart.then(lambda s: gr.update(visible=not s.typing, recording=not s.typing), [state], [input_audio])
    restart.then(lambda s: gr.update(visible=s.typing), [state], [typing_visible])
    restart.then(lambda s: gr.update(visible=s.painting), [state], [image_visible])
    restart.then(lambda s: gr.update(visible=s.painting and s.typing), [state], [prompt_visible])
    restart.then(lambda s: gr.update(visible=not s.painting), [state], [chatbot_visible])

    cancel = gr.Button("Dừng chương trình", variant="stop", interactive=False)
    stream.then(lambda s: gr.update(interactive=not s.stopped), [state], [cancel])
    cancel.click(
        lambda: (AppState(stopped=True, recording=False, started_talking=False), gr.Audio(recording=False), gr.update(interactive=False)),
        None, [state, input_audio, cancel],
        cancels=[respond, stream, startrecord],  # also cancel the stream and start-recording events
    )
    sub = submit.click(
        fn=response_text,
        inputs=[state, input_text, input_image, input_prompt],
        outputs=[state, output_audio],
    )
    sub.then(lambda s: s.conversation, [state], [chatbot])
    sub.then(lambda s: s.image_out, [state], [output_image])
    generator = generate_prompt.click(
        fn=prompt_hugingface,
        inputs=[input_prompt, llm_provider, model, prompt_type],
        outputs=[input_text],
    )
    llm_provider.change(
        update_model_choices,
        inputs=[llm_provider],
        outputs=[model],
    )
    gr.Examples(
        examples=examples,
        inputs=input_text,
    )

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860, share=False)