Spaces:
Runtime error
Runtime error
#from ctransformers import AutoModelForCausalLM | |
import re, requests, json | |
import gradio as gr | |
import random | |
import torch | |
from itertools import chain | |
import asyncio | |
from llama_cpp import Llama | |
import datetime | |
from transformers import ( | |
StoppingCriteriaList, | |
MaxLengthCriteria, | |
) | |
# Created by | |
# https://huggingface.co/gorkemgoknar | |
#Coqui V1 api render voice, you can also use XTTS | |
#COQUI_URL="https://app.coqui.ai/api/v2/samples" | |
COQUI_URL="https://app.coqui.ai/api/v2/samples/multilingual/render/" | |
### Warning each sample will consume your credits | |
COQUI_TOKEN=os.environ.get("COQUI_TOKEN") | |
PER_RUN_MAX_VOICE=int( os.environ.get("PER_RUN_MAX_VOICE") ) | |
PER_RUN_COUNTER=0 | |
RUN_START_HOUR=datetime.datetime.now().hour | |
MAX_NEW_TOKENS = 30 | |
GPU_LAYERS = 0 | |
STOP_LIST=["###","##"] | |
LLAMA_VERBOSE=False | |
#stopping_criteria = StoppingCriteriaList([MaxLengthCriteria(max_length=64)]) | |
from huggingface_hub import hf_hub_download | |
hf_hub_download(repo_id="gorkemgoknar/llama2-7f-moviechatbot-ggml-q4", local_dir=".", filename="llama2-7f-fp16-ggml-q4.bin") | |
model_path="./llama2-7f-fp16-ggml-q4.bin" | |
import langid | |
llm = Llama(model_path=model_path,n_gpu_layers=0, n_ctx=256,n_batch=256,verbose=LLAMA_VERBOSE) | |
# to use with ctransfomers | |
#llm = AutoModelForCausalLM.from_pretrained("gorkemgoknar/llama2-7f-moviechatbot-ggml-q4", | |
# model_type='llama', | |
# gpu_layers=GPU_LAYERS, | |
# max_new_tokens=MAX_NEW_TOKENS, | |
# stop=STOP_LIST) | |
########################################## | |
#You can use coqui.ai api to generate audio | |
#first you need to create clone voice for characters | |
voices = {} | |
voices["Gerald"]=os.environ.get("VOICE_ID_GERALD") | |
voices["Vader"]=os.environ.get("VOICE_ID_VADER") | |
voices["Batman"]=os.environ.get("VOICE_ID_BATMAN") | |
voices["Gandalf"]=os.environ.get("VOICE_ID_GANDALF") | |
voices["Morpheus"]=os.environ.get("VOICE_ID_MORPHEUS") | |
voices["Neo"]=os.environ.get("VOICE_ID_NEO") | |
voices["Ig-11"]=os.environ.get("VOICE_ID_IG11") | |
voices["Tony Stark"]=os.environ.get("VOICE_ID_TONY") | |
voices["Kirk"]=os.environ.get("VOICE_ID_KIRK") | |
voices["Spock"]=os.environ.get("VOICE_ID_SPOCK") | |
voices["Don"]=os.environ.get("VOICE_ID_DON") | |
voices["Morgan"]=os.environ.get("VOICE_ID_MORGAN") | |
voices["Yoda"]=os.environ.get("VOICE_ID_YODA") | |
voices["Ian"]=os.environ.get("VOICE_ID_IAN") | |
voices["Thanos"]=os.environ.get("VOICE_ID_THANOS") | |
def get_audio_url(text,character): | |
url = COQUI_URL | |
text_language=langid.classify(text)[0] | |
supported_languages=["en","de","fr","es","it","pt","pl"] | |
if text_language not in supported_languages: | |
text_language="en" | |
# voice id of "Baldur Sanjin" from buildin coqui.ai speakers | |
# more via https://docs.coqui.ai/reference/speakers_retrieve | |
payload = { | |
"voice_id": voices[character], ## Voice id in form of (this is dummy) "a399c204-7040-4f1d-bb92-5223fa1aeceb" | |
"text": f"{text}", | |
"emotion": "Neutral", ## You can set Angry, Surprise etc on V1 api.. XTTS auto understands it | |
"speed": 1, | |
"language": text_language | |
} | |
headers = { | |
"accept": "application/json", | |
"content-type": "application/json", | |
"authorization": f"Bearer {COQUI_TOKEN}" | |
} | |
response = requests.post(url, json=payload, headers=headers) | |
res = json.loads(response.text) | |
print("Character:",character, "text:",text,) | |
print("Audio response",res) | |
return res["audio_url"] | |
def get_response_cpp(prompt): | |
output = llm(prompt, max_tokens=32, stop=["#","sierpeda"], echo=True) | |
#print(output) | |
response_text= output["choices"][0]["text"] | |
return response_text | |
def build_question(character,question,context=None, answer=None,history=None , use_history=False, modify_history=True,human_character=None,add_answer_to_history=True): | |
# THIS MODEL (gorkemgoknar/llama2-7f-moviechatbot-ggml-q4) is specifically fined tuned by | |
# ### Context: {context}### History: {history}### {human_character}: {question}### {character}: {answer} | |
# Where History contains all previous lines talked by characters in order | |
# Context is actually arbitrary it shows something characters can start talking upon | |
if context is None: | |
context= "movie" | |
#if human_character is None: | |
# human_character="" | |
#else: | |
# human_character="#"+"I am " + human_character +"#" | |
if use_history: | |
if history is None: | |
if answer is None: | |
history="" | |
else: | |
history=answer | |
else: | |
if modify_history: | |
if answer is None: | |
history=history | |
else: | |
if add_answer_to_history: | |
history=history +"#" + answer | |
else: | |
history=history | |
else: | |
history=history | |
if human_character is None: | |
prompt = f"### Context: {context}### History: {history}### Human: {question}### {character}:" | |
else: | |
prompt = f"### Context: {context}### History: {history}### {human_character}: {question}### {character}:" | |
else: | |
if human_character is None: | |
prompt = f"### Context: {context}### Human: {question}### {character}:" | |
else: | |
prompt = f"### Context: {context}### {human_character}: {question}### {character}:" | |
return prompt,history | |
def get_answer_from_response(text,character): | |
# on HF it has same text plus additional | |
# on llama_cpp same full text | |
response= text.split(f"### {character}:")[1] | |
# on cpp it continues | |
# response= text | |
# get only first line of response | |
response= response.split("###")[0] | |
response= response.split("#")[0] | |
# Weirdly llama2 7f creates some German or Polski on the end... need to crop them | |
response= response.split("Unterscheidung")[0] # weird, german seperators on output | |
response= response.split("Hinweis")[0] # weird, german seperators on output | |
response= response.split("sierp ")[0] # weird, sierp | |
response= response.split("sierpni ")[0] # weird, sierp | |
response= response.split("sierpien")[0] # weird, sierp | |
response= response.split("\n")[0] # cut at end of line | |
response= re.split("sierp.+\d+", response)[0] # comes as sierpina 2018 something something | |
response= re.split("styczen.+\d+", response)[0] # comes as styczen 2018 something something | |
response= re.split("kwierk.+\d+", response)[0] # comes as kwierk 2018 something something | |
response= response.split(":")[0] | |
if response.startswith('"'): | |
response= response[1:] | |
return response | |
def run_chatter(num_repeat=2, character="kirk",human_character="Mr. Sulu",context="Captain Kirk from U.S.S. Enterprise", | |
initial_question="There is a ship approaching captain!", | |
withaudio=False, | |
history=None, | |
add_answer_to_history=True, | |
answer=None, | |
debug_print=False, | |
use_cpu=False): | |
question=initial_question | |
dialogue="" | |
if debug_print: | |
print("**** START Dialogue ****") | |
print("Input History:",history) | |
audio_urls=[] | |
for i in range(num_repeat): | |
if question is not None: | |
question=question.strip() | |
if answer is not None: | |
answer=answer.strip() | |
prompt,history= build_question(character,question,context=context,history=history,answer=answer,human_character=human_character,use_history=True,add_answer_to_history=add_answer_to_history) | |
print("PROMPT:",prompt) | |
response= get_response_cpp(prompt) | |
print("RESPONSE:",response) | |
answer = get_answer_from_response(response,character).strip() | |
if withaudio: | |
answer_audio_url = get_audio_url(answer) | |
audio_urls.append(answer_audio_url) | |
if debug_print: | |
print("\nAct:",i+1) | |
dialogue = dialogue + f"{human_character}: {question}" + "\n" | |
if debug_print: | |
print(f"{human_character}:",question) | |
print(f"{character}:",answer) | |
dialogue = dialogue + f"{character}: {answer}" + "\n" | |
if question is not None: | |
question=question.strip() | |
if answer is not None: | |
answer=answer.strip() | |
prompt,history= build_question(human_character,answer,context=context,history=history,answer=question,human_character=character,use_history=True,add_answer_to_history=add_answer_to_history) | |
print("PROMPT:",prompt) | |
response= get_response_cpp(prompt) | |
print("RESPONSE:",response) | |
resp_answer = get_answer_from_response(response,human_character) | |
if withaudio: | |
# No use.. running on main | |
response_audio_url = get_audio_url(resp_answer) | |
audio_urls.append(response_audio_url) | |
if debug_print: | |
print(f"{human_character}:",resp_answer) | |
question = resp_answer | |
if debug_print: | |
print("Final History:",history) | |
print("**** END Dialogue ****") | |
if withaudio: | |
return dialogue,question,answer,history,audio_urls | |
else: | |
return dialogue,question,answer,history | |
###################### | |
# GRADIO PART | |
###################### | |
# to close on Jupyter remote | |
#if("interface" in vars()): | |
# print("Closing existing interface") | |
# interface.close() | |
css=""" | |
.chatbox {display:flex;flex-direction:column} | |
.user_msg, .resp_msg {padding:4px;margin-bottom:4px;border-radius:4px;width:80%} | |
.user_msg {background-color:cornflowerblue;color:white;align-self:start} | |
.resp_msg {background-color:lightgray;align-self:self-end} | |
.audio {background-color:cornflowerblue;color:white;align-self:start;height:5em} | |
""" | |
def get_per_run_voice_counter(increase=False): | |
hour_now = datetime.datetime.now().hour | |
global PER_RUN_COUNTER | |
print("Per run check: Hour now:", hour_now, " RUN_START_HOUR:",RUN_START_HOUR," PER_RUN_COUNTER",PER_RUN_COUNTER) | |
if hour_now>RUN_START_HOUR: | |
#reset hourly voice calls | |
print("resetting per run voice calls") | |
PER_RUN_COUNTER = 0 | |
elif increase: | |
PER_RUN_COUNTER = PER_RUN_COUNTER + 1 | |
print("per run voice calls:", PER_RUN_COUNTER) | |
print("Per run check: Hour now:", hour_now, " RUN_START_HOUR:",RUN_START_HOUR," PER_RUN_COUNTER",PER_RUN_COUNTER) | |
return PER_RUN_COUNTER | |
async def add_text(WITH_AUDIO,char1,char2,runs,context,initial_question,history,VOICE_COUNTER): | |
print(f"{char1} talks to {char2}") | |
history = None | |
last_question=None | |
# todo build a context from dropdown | |
returned_history = "" | |
unnamed_question="This weird guy did not input anything.. so, tell me a joke!" | |
if initial_question is None: | |
initial_question = unnamed_question | |
if initial_question=="": | |
initial_question = unnamed_question | |
for i in range(int(runs)): | |
print("char1:",char1," :", initial_question) | |
returned_history += char1 + " : " + initial_question + "\n" | |
dialogue,last_question,last_answer,history = run_chatter(num_repeat=1, | |
character=char2, | |
human_character=char1, | |
context=context, | |
initial_question=initial_question, | |
withaudio=False, | |
history=history, | |
answer=last_question, | |
debug_print=False, | |
add_answer_to_history=False | |
) | |
print("char2:",char2," :", last_answer) | |
returned_history += char2 + " : " + last_answer + "\n" | |
# add last answer to history | |
history = history + "#" +initial_question + "#"+ last_answer | |
print("WITH_AUDIO",WITH_AUDIO) | |
if int(WITH_AUDIO): | |
use_voice=True | |
else: | |
use_voice=False | |
print("Voice Counter:",VOICE_COUNTER) | |
if initial_question=="..." and last_answer=="...": | |
use_voice=False | |
global PER_RUN_MAX_VOICE | |
if use_voice: | |
global PER_RUN_MAX_VOICE | |
can_use_voice=get_per_run_voice_counter()<PER_RUN_MAX_VOICE | |
if not can_use_voice: | |
print("Voice limit reached for this hour, try again in an hour") | |
gr.Warning("Hourly overal voice limit reached, try again in an hour... running without voice.") | |
use_voice=False | |
if use_voice and (VOICE_COUNTER>VOICE_LIMIT): | |
print("You have reached voiced limit, try with voice later.. running without voice") | |
gr.Warning("You have reached voiced limit.. running without voice") | |
use_voice=False | |
if use_voice: | |
char1_audio_url= get_audio_url(initial_question,char1) | |
VOICE_COUNTER+=1 | |
get_per_run_voice_counter(increase=True) | |
char2_audio_url= get_audio_url(last_answer,char2) | |
VOICE_COUNTER+=1 | |
get_per_run_voice_counter(increase=True) | |
print("Voice Counter:",VOICE_COUNTER) | |
if use_voice: | |
audios = ( | |
gr.Audio.update() , | |
gr.Audio.update() , | |
gr.Audio.update() , | |
gr.Audio.update() , | |
gr.Audio.update() , | |
gr.Audio.update() , | |
gr.Audio.update() , | |
gr.Audio.update() | |
) | |
else: | |
audios = ( | |
gr.Audio.update(visible=False) , | |
gr.Audio.update(visible=False) , | |
gr.Audio.update(visible=False) , | |
gr.Audio.update(visible=False) , | |
gr.Audio.update(visible=False) , | |
gr.Audio.update(visible=False) , | |
gr.Audio.update(visible=False) , | |
gr.Audio.update(visible=False) | |
) | |
audios = list(audios) | |
#should now do a loop | |
if use_voice: | |
audios[i*2] = gr.Audio.update(char1_audio_url, visible=True,label=str(i*2 )+"_"+char1) | |
audios[i*2 + 1] = gr.Audio.update(char2_audio_url, visible=True,label=str(i*2 + 1)+"_"+char2) | |
audios = tuple(audios) | |
#This needs to be last before yield | |
initial_question=last_question | |
yield gr.update(value=initial_question, interactive=True),returned_history, *audios, VOICE_COUNTER | |
history=None | |
#some selected ones are in for demo use (there are more, get a copy and try it , just do not expect much with this fast finetuned model) | |
CHARACTER_1_CHOICES = ["Gandalf","Gerald", "Morpheus", "Neo","Kirk","Spock","Vader","Yoda","Ig-11","Tony Stark","Batman","Thanos"] | |
CHARACTER_2_CHOICES = ["Gandalf","Gerald", "Morpheus", "Neo","Kirk","Spock","Vader","Yoda","Ig-11","Tony Stark","Batman","Thanos"] | |
CONTEXT_CHOICES = ["talks friendly", | |
"insults", | |
"diss in rap", | |
"on a cruise ship going to Mars from Earth", | |
"blames on something", | |
"tries to save the world", | |
"talks agressively", | |
"argues over if a movie is good"] | |
EXAMPLE_INITIALS=["I challenge you to battle of words!", | |
"how much would a woodchuck chuck if a woodchuck could chuck wood?", | |
"The world is changing.", | |
"What do you think about AI?", | |
"I went to the supermarket yesterday.", | |
"Who are you?", | |
"I am richer than you!", | |
"Wie geht es dir?", | |
"O que você fez ontem?", | |
"Il fait trop chaud aujourd'hui."] | |
VOICE_CHOICES=["With Coqui.ai Voice", | |
"No voice"] | |
RUN_COUNT = [2,3,4] | |
title = "Metayazar - Movie Chatbot Llama Finetuned Voice powered by Coqui.ai" | |
description = "Auto-chat your favorite movie characters. Voice via Coqui.ai" | |
article = "<p style='text-align: center'><a href='https://www.linkedin.com/pulse/ai-goes-job-interview-g%C3%B6rkem-g%C3%B6knar/' target='_blank'>AI Goes to Job Interview</a> | <a href='https://www.metayazar.com/' target='_blank'>Metayazar AI Writer</a> |<a href='https://www.linkedin.com/in/goknar/' target='_blank'>Görkem Göknar</a></p>" | |
def change_run_count(run_count): | |
print("update run count:",run_count) | |
visible_audios=[False,False,False,False,False,False,False,False] | |
run_count=int(run_count) | |
for i in range(run_count*2-1): | |
if i>=len(visible_audios): | |
break | |
visible_audios[i] = False # Set true to become visible upon change | |
return_list=[] | |
#Max audio 8 | |
for i in range(8): | |
return_list.append( gr.Audio.update( visible=visible_audios[i]) ) | |
return return_list | |
def switch_voice(with_voice, WITH_AUDIO,VOICE_COUNTER): | |
print("update use voice:",with_voice) | |
if (VOICE_COUNTER>VOICE_LIMIT) or (PER_RUN_COUNTER>PER_RUN_MAX_VOICE): | |
gr.Warning("Unfortunately voice limit is reached, try again after another time, or use without voice") | |
WITH_AUDIO=0 | |
else: | |
if with_voice==VOICE_CHOICES[0]: | |
WITH_AUDIO=1 | |
else: | |
WITH_AUDIO=0 | |
return with_voice, WITH_AUDIO | |
with gr.Blocks(css=css) as interface: | |
VOICE_COUNTER=gr.State(value=0) | |
WITH_AUDIO=gr.State(value=1) | |
VOICE_LIMIT=os.environ.get("VOICE_LIMIT") | |
with gr.Row(): | |
drop_char1 = gr.components.Dropdown(CHARACTER_1_CHOICES,label="Character 1",value=CHARACTER_1_CHOICES[0]) | |
drop_char2 = gr.components.Dropdown(CHARACTER_2_CHOICES,label="Character 2",value=CHARACTER_2_CHOICES[1]) | |
run_count = gr.components.Dropdown(RUN_COUNT,label="Line count per character",value="2") | |
context_choice = gr.components.Dropdown(CONTEXT_CHOICES, label="Context",value=CONTEXT_CHOICES[0]) | |
with gr.Row(): | |
with_voice = gr.components.Dropdown(VOICE_CHOICES,label="Voice via Coqui.ai (demo)",value=VOICE_CHOICES[0]) | |
with gr.Row(): | |
txt = gr.Textbox( | |
show_label=False, | |
placeholder="Enter text and press enter, or upload an image", | |
value=EXAMPLE_INITIALS[0],elem_classes="user_msg" | |
) | |
submit_btn = gr.Button(value="Submit") | |
examples = gr.Examples(examples=EXAMPLE_INITIALS, | |
inputs=[txt]) | |
with gr.Row(): | |
with gr.Column(): | |
history = gr.Textbox(lines=25, | |
show_label=True, | |
label="History", | |
placeholder="History", | |
).style(height=50) | |
with gr.Column(): | |
audio1 = gr.Audio(elem_id="audio1",elem_classes="audio",autoplay=False,visible=False) | |
audio2 = gr.Audio(elem_id="audio2",elem_classes="audio",autoplay=False,visible=False) | |
audio3 = gr.Audio(elem_id="audio3",elem_classes="audio",autoplay=False,visible=False) | |
audio4 = gr.Audio(elem_id="audio4",elem_classes="audio",autoplay=False,visible=False) | |
audio5 = gr.Audio(elem_id="audio5",elem_classes="audio",autoplay=False,visible=False) | |
audio6 = gr.Audio(elem_id="audio6",elem_classes="audio",autoplay=False,visible=False) | |
audio7 = gr.Audio(elem_id="audio7",elem_classes="audio",autoplay=False,visible=False) | |
audio8 = gr.Audio(elem_id="audio8",elem_classes="audio",autoplay=False,visible=False) | |
with_voice.change(switch_voice,[with_voice,WITH_AUDIO,VOICE_COUNTER],[with_voice,WITH_AUDIO]) | |
run_count.change(change_run_count,[run_count],[audio1,audio2,audio3,audio4,audio5,audio6,audio7,audio8]) | |
submit_btn.click(add_text, [WITH_AUDIO,drop_char1, drop_char2,run_count, context_choice, txt,history,VOICE_COUNTER], [txt,history,audio1,audio2,audio3,audio4,audio5,audio6,audio7,audio8,VOICE_COUNTER], api_name="chat") | |
interface.queue().launch() | |