File size: 6,470 Bytes
e0d9c8e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import os
import csv
from services import audio
import random
import pinecone
import gradio as gr
from openai import OpenAI


# Module-level clients, configured entirely from environment variables.
# OPENAI_MODEL may be None if the variable is unset -- TODO confirm deployment always exports it.
OPENAI_CLIENT = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
OPENAI_MODEL = os.getenv("OPENAI_MODEL")
# NOTE(review): pinecone.init()/pinecone.Index() is the pre-v3 pinecone client API;
# confirm the pinned pinecone package version still provides it.
pinecone.init(api_key=os.getenv("PINECONE_API_KEY"), environment=os.getenv("PINECONE_ENV"))
INDEX = pinecone.Index(os.getenv("PINECONE_INDEX"))


def start_chat(client_name: str) -> tuple[list[list[str | None]], gr.helpers, gr.helpers]:
    """
    Initialize the chat with a greeting, as text and audio, in Spanish.
    :param client_name: name of the client
    :return: (chat history seeded with the greeting, gradio update pointing the
        audio component at the greeting file, gradio update with visible=True)
    """
    # The first row of the client's Spanish greetings CSV holds the greeting text
    with open(f'assets/{client_name}/greetings/es.csv', mode='r', encoding='utf-8') as infile:
        greeting = next(csv.reader(infile))[0]

    # Pre-recorded audio clip matching the greeting text above
    audio_name = f'assets/{client_name}/media/audio/greeting_es_0.wav'

    # Seed the chat with an empty user turn and the bot greeting
    chat_history = [['', greeting]]

    # audio_name is already a str; the original wrapped it in a redundant f-string
    return chat_history, gr.update(value=audio_name), gr.update(visible=True)


def get_random_data(client_name: str) -> gr.helpers:
    """
    Return a gradio audio update with a random Spanish audio clip for the client.
    :param client_name: name of the client for this chatbot
    :return: gradio audio update pointing at a randomly chosen clip
    :raises IndexError: if the client has no matching audio files
    """
    path_audios = f'assets/{client_name}/media/audio'
    # Spanish "random" clips follow the naming pattern random*es* in this folder
    random_options = [
        os.path.join(path_audios, name)
        for name in os.listdir(path_audios)
        if name.startswith('random') and 'es' in name
    ]

    # random.choice replaces the manual randint(0, len-1) indexing
    return gr.update(value=random.choice(random_options))


def get_answer(
        chat_history: list[tuple[str, str]], user_input: str, client_name: str, general_prompt: str, context_prompt: str
) -> tuple[list[tuple[str, str]], str, gr.helpers]:
    """
    Produce the chatbot's reply to the user as text and audio.
    :param chat_history: previous chat history
    :param user_input: user question
    :param client_name: name of the client
    :param general_prompt: prompt used for answering the questions
    :param context_prompt: prompt used for finding the context in the vectorstore
    :return: (updated chat history, empty string to clear the input box,
        gradio update pointing the audio component at the generated file)
    """
    # Rebuild the conversation in OpenAI message format; the opening turn only
    # contributes the assistant greeting (its user side is an empty placeholder)
    msg_history = [{'role': 'system', 'content': general_prompt}]
    first_turn = True
    for user_msg, bot_msg in chat_history:
        if not first_turn:
            msg_history.append({'role': 'user', 'content': user_msg})
        msg_history.append({'role': 'assistant', 'content': bot_msg})
        first_turn = False

    # Condense the question plus history into a standalone retrieval query
    standalone_question = _get_standalone_question(user_input, msg_history, context_prompt)

    # Fetch supporting context from the vectorstore
    context = _get_context(standalone_question, client_name)

    # Ask the model, grounded on the retrieved context
    response = _get_response(context, msg_history, user_input, general_prompt)

    # Synthesize the spoken version of the answer (produces output.wav)
    audio.get_audio(response, 'es')

    # Record the new exchange
    chat_history.append((user_input, response))

    return chat_history, "", gr.update(value='output.wav')


def _get_response(context: str, message_history: list[dict], question: str, prompt: str) -> str:
    """
    Ask ChatGPT for an answer grounded on the retrieved context.
    :param context: text obtained from the vectorstore
    :param message_history: chat history in the format used by OpenAI; mutated
        in place (system prompt is filled in, the question is appended)
    :param question: user question
    :param prompt: prompt template whose CONTEXT placeholder receives the context
    :return: response from ChatGPT
    """
    # Inject the retrieved context into the leading system message
    system_message = message_history[0]
    system_message['content'] = prompt.replace('CONTEXT', context)

    # The user's question goes last in the conversation
    message_history.append({'role': 'user', 'content': question})

    return _call_api(message_history, 0.7)


def _get_embedding(text: str) -> list[float]:
    """
    Compute the OpenAI embedding for a piece of text.
    :param text: input text
    :return: embedding vector of the text
    """
    result = OPENAI_CLIENT.embeddings.create(
        model='text-embedding-ada-002',
        input=text
    )
    # The API returns a batch; a single input was sent, so take the first item
    return result.data[0].embedding


def _call_api(message_history: list[dict], temperature: float) -> str:
    """
    Send the conversation to the OpenAI chat-completions API.
    :param message_history: chat history in the format used by OpenAI
    :param temperature: randomness of the output
    :return: ChatGPT answer
    """
    completion = OPENAI_CLIENT.chat.completions.create(
        messages=message_history,
        model=OPENAI_MODEL,
        temperature=temperature
    )
    # Only one choice is requested, so the answer is the first one
    return completion.choices[0].message.content


def _get_standalone_question(question: str, message_history: list[dict], prompt_q: str) -> str:
    """
    Build a standalone question/phrase from the user's question plus the prior
    conversation. Needed because some follow-ups are too terse on their own,
    e.g. "yes, please".
    :param question: user question
    :param message_history: msg history in the format used by OpenAI
    :param prompt_q: prompt used to get a text that will be used in the vectorstore
    :return: string with the standalone phrase
    """
    # Render the history as "Human: ...\n" / "Assistant: ...\n" lines, skipping
    # the system prompt at index 0; even indices hold the human turns
    lines = []
    for idx, msg in enumerate(message_history[1:], start=1):
        speaker = 'Human' if idx % 2 == 0 else 'Assistant'
        lines.append(f'{speaker}: {msg["content"]}\n')
    history = ''.join(lines)

    # Fill the template and query the model with near-zero randomness
    content = prompt_q.replace('HISTORY', history).replace('QUESTION', question)
    return _call_api([{'role': 'system', 'content': content}], 0.01)


def _get_context(question: str, client_name: str) -> str:
    """
    Get the 10 nearest vectors to the given question from the vectorstore.
    :param question: standalone text
    :param client_name: name of the client, used as namespace in the vectorstore
    :return: newline-joined text of the nearest vectors (with trailing newline)
    """
    q_embedding = _get_embedding(question)

    # Get the most similar vectors from the client's namespace
    matches = INDEX.query(
        vector=q_embedding,
        top_k=10,
        include_metadata=True,
        namespace=f'{client_name}-context'
    )['matches']

    # Create a string from the text of each match; str.join replaces the
    # original quadratic += concatenation (and fixes the "Crete" comment typo)
    return ''.join(m['metadata']['Text'] + '\n' for m in matches)