|
|
|
""" |
|
Credit to Derek Thomas, derek@huggingface.co |
|
""" |
|
|
|
import subprocess |
|
|
|
|
|
|
|
import logging |
|
from pathlib import Path |
|
from time import perf_counter |
|
|
|
import gradio as gr |
|
from jinja2 import Environment, FileSystemLoader |
|
import numpy as np |
|
from sentence_transformers import CrossEncoder |
|
|
|
from backend.query_llm import generate_hf, generate_openai,generate_gemini |
|
from backend.semantic_search import table, retriever |
|
|
|
# Column names in the vector table: `bot` searches on the first and reads the
# passage text from the second.
VECTOR_COLUMN_NAME = "vector"
TEXT_COLUMN_NAME = "text"

# Directory that holds this file; the Jinja templates live beneath it.
proj_dir = Path(__file__).parent

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Jinja2 environment that loads templates from <proj_dir>/templates.
env = Environment(loader=FileSystemLoader(proj_dir / 'templates'))

# `template` renders the prompt handed to the LLM backend; `template_html`
# renders the same documents/query for display in the UI.
template = env.get_template('template.j2')
template_html = env.get_template('template_html.j2')

# Cross-encoder used by `bot` to rescore (query, passage) pairs.
cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

# Sample questions passed to gr.Examples for the input textbox.
examples = [
    'My transhipment cargo is missing',
    'What are benefits of the AEO Scheme and eligibility criteria?',
    'What are penalties for customs offences? ',
    'what are penalties to customs officers misusing their powers under customs act?',
    'What are eligibility criteria for exemption from cost recovery charges',
    'list in detail what is procedure for obtaining new approval for openeing a CFS attached to an ICD',
]
|
|
|
|
|
def add_text(history, text):
    """Record the submitted text as a new, still-unanswered chat turn.

    Args:
        history: Current chat history (a list of turns), or None when the
            chat has no turns yet.
        text: The text the user submitted.

    Returns:
        A 2-tuple of the new history list, whose last entry is
        ``(text, None)``, and a ``gr.Textbox`` with an empty value and
        ``interactive=False``.
    """
    previous_turns = history if history is not None else []
    # Concatenation builds a new list, so the list passed in is not mutated.
    updated_history = previous_turns + [(text, None)]
    print('add_text function done..returning history', updated_history)
    locked_textbox = gr.Textbox(value="", interactive=False)
    return updated_history, locked_textbox
|
|
|
|
|
def _select_generator(api_kind):
    """Return the generation function for ``api_kind`` or raise ValueError."""
    if api_kind is None:
        gr.Warning("API name was not provided")
        raise ValueError("API name was not provided")

    backends = {"HuggingFace": generate_hf, "Gemini": generate_gemini}
    generate_fn = backends.get(api_kind)
    if generate_fn is None:
        gr.Warning(f"API {api_kind} is not supported")
        raise ValueError(f"API {api_kind} is not supported")
    return generate_fn


def _rerank_documents(query, documents, top_k_rank):
    """Keep the ``top_k_rank`` documents the cross-encoder scores highest."""
    if not documents:
        # Nothing was retrieved: skip the model call on an empty batch.
        return []

    query_doc_pairs = [[query, doc] for doc in documents]
    cross_scores = cross_encoder.predict(query_doc_pairs)
    # argsort is ascending, so reverse it to get the best score first.
    best_first = np.argsort(cross_scores)[::-1]
    return [documents[idx] for idx in best_first[:top_k_rank]]


def _retrieve_documents(query, top_rerank, top_k_rank):
    """Vector-search the table for ``query`` and rerank the hits."""
    logger.warning('Retrieving documents...')
    document_start = perf_counter()

    query_vec = retriever.encode(query)
    logger.warning('Finished query vec')

    hits = (
        table.search(query_vec, vector_column_name=VECTOR_COLUMN_NAME)
        .limit(top_rerank)
        .to_list()
    )
    logger.warning('Finished search')
    documents = [hit[TEXT_COLUMN_NAME] for hit in hits]

    logger.warning('start cross encoder %d', len(documents))
    documents = _rerank_documents(query, documents, top_k_rank)
    logger.warning('num documents %d', len(documents))

    document_time = perf_counter() - document_start
    logger.warning(
        'Finished Retrieving documents in %s seconds...', round(document_time, 2)
    )
    return documents


def bot(history, api_kind, top_rerank=15, top_k_rank=8):
    """Answer the latest chat turn using retrieved documents and an LLM backend.

    This is a generator: it retrieves passages for the last user message,
    renders the prompt templates, then streams the backend's output into the
    last history entry.

    Args:
        history: Chat history as a list of ``[user_text, bot_text]`` pairs;
            the last pair holds the query to answer.
        api_kind: Backend name, either ``"HuggingFace"`` or ``"Gemini"``.
        top_rerank: Number of vector-search hits passed to the cross-encoder.
        top_k_rank: Number of reranked documents rendered into the prompt.

    Yields:
        ``(history, prompt_html)`` tuples. ``history`` has the reply so far in
        its last entry; ``prompt_html`` is the rendered ``template_html``.
        If generation fails, one final tuple is yielded whose reply is the
        error message, so both outputs are always supplied.

    Raises:
        ValueError: If there is no query to answer, or ``api_kind`` is missing
            or unsupported. Raised on first iteration, before any retrieval.
    """
    if not history:
        gr.Warning("Please submit a non-empty string as a prompt")
        raise ValueError("Empty string was submitted")

    query = history[-1][0]
    logger.debug('query=%r api kind=%r', query, api_kind)

    if not query:
        gr.Warning("Please submit a non-empty string as a prompt")
        raise ValueError("Empty string was submitted")

    # Validate the backend before doing the (comparatively slow) retrieval.
    generate_fn = _select_generator(api_kind)

    documents = _retrieve_documents(query, top_rerank, top_k_rank)

    prompt = template.render(documents=documents, query=query)
    prompt_html = template_html.render(documents=documents, query=query)
    logger.debug('prompt_html token count is %d', len(prompt_html.split()))

    # Replace the last turn with a mutable pair so the reply can be written
    # even when the turn arrived as a tuple.
    history[-1] = [query, ""]
    try:
        # Each streamed item overwrites the reply rather than being appended
        # (assumes the backend yields the accumulated text so far - TODO confirm).
        for partial_reply in generate_fn(prompt, history[:-1]):
            history[-1][1] = partial_reply
            yield history, prompt_html
        logger.debug('final history is %r', history)
    except Exception as e:
        logger.exception('An unexpected error occurred during generation')
        # Surface the failure in the chat while still yielding both outputs.
        history[-1][1] = f"An unexpected error occurred during generation: {str(e)}"
        yield history, prompt_html
|
|
|
# Build the chat UI. NOTE(review): the source's indentation was lost, so the
# nesting below (Row containing only the textbox and button) is reconstructed
# from syntax - confirm against the intended layout.
with gr.Blocks(theme='WeixuanYuan/Soft_dark') as demo:

    # Page heading with title and logo.
    # NOTE(review): 'logo.png' is a relative src - confirm the image is
    # actually served by the app.
    gr.HTML(value="""
    <div style="display: flex; align-items: center; justify-content: space-between;">
        <h1 style="color: #008000">ADWITIYA - <span style="color: #008000">Customs Manual Chatbot</span></h1>
        <img src='logo.png' alt="Chatbot" width="50" height="50" />
    </div>
    """, elem_id="heading")

    gr.HTML(value="""<p style="font-family: sans-serif; font-size: 16px;">A free chat bot developed by National Customs Targeting Center using Open source LLMs.(Dedicated to 75th Batch IRS Probationers)</p>""", elem_id="description")

    chatbot = gr.Chatbot(
        [],
        elem_id="chatbot",
        avatar_images=('https://aui.atlassian.com/aui/8.8/docs/images/avatar-person.svg',
                       'https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.svg'),
        bubble_full_width=False,
        show_copy_button=True,
        show_share_button=True,
    )

    with gr.Row():
        txt = gr.Textbox(
            scale=3,
            show_label=False,
            placeholder="Enter text and press enter",
            container=False,
        )
        txt_btn = gr.Button(value="Submit text", scale=1)

    api_kind = gr.Radio(choices=["HuggingFace", "Gemini"], value="HuggingFace")

    prompt_html = gr.Textbox(label='Retrieved Documents')

    # Wire the button click and the textbox Enter key identically:
    # add_text (locks the textbox) -> bot (streams the reply) -> unlock.
    # Registration errors are allowed to propagate: swallowing them would
    # leave `txt_msg` unbound or stale for the follow-up `.then` call.
    for register_event in (txt_btn.click, txt.submit):
        txt_msg = register_event(add_text, [chatbot, txt], [chatbot, txt], queue=False).then(
            bot, [chatbot, api_kind], [chatbot, prompt_html])
        txt_msg.then(lambda: gr.Textbox(interactive=True), None, [txt], queue=False)

    # Clickable sample questions that fill the textbox.
    gr.Examples(examples, txt)

demo.queue()
demo.launch(debug=True)
|
|