Spaces:
Sleeping
Sleeping
""" | |
Credit to Derek Thomas, derek@huggingface.co | |
""" | |
import subprocess | |
# subprocess.run(["pip", "install", "--upgrade", "transformers[torch,sentencepiece]==4.34.1"]) | |
import logging | |
from pathlib import Path | |
from time import perf_counter | |
import gradio as gr | |
from jinja2 import Environment, FileSystemLoader | |
import numpy as np | |
from sentence_transformers import CrossEncoder | |
from backend.query_llm import generate_hf, generate_openai,generate_gemini | |
from backend.semantic_search import table, retriever | |
# Column names used when querying the vector table and reading hits back.
VECTOR_COLUMN_NAME = "vector"
TEXT_COLUMN_NAME = "text"

# Directory containing this file; the Jinja templates are loaded from beneath it.
proj_dir = Path(__file__).parent

# Logging: root logger at INFO, plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Jinja environment rooted at <proj_dir>/templates.
env = Environment(loader=FileSystemLoader(proj_dir / 'templates'))

# `template` renders the prompt passed to the generator function;
# `template_html` renders the variant shown in the UI.
template = env.get_template('template.j2')
template_html = env.get_template('template_html.j2')

# Cross-encoder used to score (query, document) pairs for re-ranking.
cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
# Alternative model: CrossEncoder('BAAI/bge-reranker-base')

# Example prompts offered below the input box.
examples = [
    'My transhipment cargo is missing',
    'What are benefits of the AEO Scheme and eligibility criteria?',
    'What are penalties for customs offences? ',
    'what are penalties to customs officers misusing their powers under customs act?',
    'What are eligibility criteria for exemption from cost recovery charges',
    'list in detail what is procedure for obtaining new approval for openeing a CFS attached to an ICD',
]
def add_text(history, text):
    """Append the user's message to the chat history and lock the input box.

    Args:
        history: Chat history as a list of ``[user_message, bot_message]``
            pairs, or ``None`` when there is no conversation yet.
        text: The message the user just submitted.

    Returns:
        A 2-tuple ``(history, textbox)``: a new history list ending with
        ``[text, None]`` (the list passed in is not mutated), and a cleared,
        non-interactive ``gr.Textbox`` to replace the input box.
    """
    history = [] if history is None else history
    # The new pair is a list, not a tuple: `bot` fills in the reply with
    # `history[-1][1] = ...`, which needs a mutable pair.
    history = history + [[text, None]]
    print('add_text function done..returning history', history)
    return history, gr.Textbox(value="", interactive=False)
def _retrieve_documents(query, top_rerank, top_k_rank):
    """Return up to ``top_k_rank`` passages for ``query``, highest score first.

    Fetches ``top_rerank`` candidates from ``table`` using the encoded query,
    scores each (query, passage) pair with ``cross_encoder`` and keeps the
    ``top_k_rank`` best-scoring passages.
    """
    query_vec = retriever.encode(query)
    logger.info('Finished query vec')

    hits = (
        table.search(query_vec, vector_column_name=VECTOR_COLUMN_NAME)
        .limit(top_rerank)
        .to_list()
    )
    logger.info('Finished search')

    candidates = [hit[TEXT_COLUMN_NAME] for hit in hits]
    if not candidates:
        # Nothing to score, so skip the cross-encoder call entirely.
        return candidates

    logger.info('start cross encoder %d', len(candidates))
    cross_scores = cross_encoder.predict([[query, doc] for doc in candidates])
    # argsort is ascending; reverse it so the highest score comes first.
    best_first = np.argsort(cross_scores)[::-1][:top_k_rank]
    logger.info('Finished cross encoder %d', len(candidates))
    return [candidates[idx] for idx in best_first]


def bot(history, api_kind):
    """Stream an answer to the most recent user message in ``history``.

    Retrieves and re-ranks supporting passages, renders the prompt templates,
    then streams the selected backend's output into the last history entry.

    Args:
        history: Chat history as a list of ``[user_message, bot_message]``
            pairs; the last pair holds the message to answer. Its last entry
            is replaced and then updated in place while streaming.
        api_kind: Backend name, ``"HuggingFace"`` or ``"Gemini"``.

    Yields:
        ``(history, prompt_html)`` pairs, one per item produced by the
        backend generator. If generation fails, one final pair is yielded
        whose reply slot carries the error message.

    Raises:
        ValueError: If there is no non-empty user message to answer, or if
            ``api_kind`` is ``None`` or not a supported backend.
    """
    top_rerank = 15  # candidates fetched from the vector table
    top_k_rank = 10  # passages kept after cross-encoder re-ranking

    query = history[-1][0] if history else None
    logger.info('query: %s', query)
    logger.info('api kind: %s', api_kind)
    if not query:
        gr.Warning("Please submit a non-empty string as a prompt")
        raise ValueError("Empty string was submitted")

    # Validate the backend before doing any retrieval work, so a bad
    # selection fails immediately instead of after encoding and re-ranking.
    if api_kind is None:
        gr.Warning("API name was not provided")
        raise ValueError("API name was not provided")
    generators = {"HuggingFace": generate_hf, "Gemini": generate_gemini}
    generate_fn = generators.get(api_kind)
    if generate_fn is None:
        gr.Warning(f"API {api_kind} is not supported")
        raise ValueError(f"API {api_kind} is not supported")

    logger.info('Retrieving documents...')
    document_start = perf_counter()
    documents = _retrieve_documents(query, top_rerank, top_k_rank)
    document_time = perf_counter() - document_start
    logger.info('num documents %d', len(documents))
    logger.info('Finished Retrieving documents in %s seconds...', round(document_time, 2))

    # Create the prompt (for the backend) and its HTML rendering (for the UI).
    prompt = template.render(documents=documents, query=query)
    prompt_html = template_html.render(documents=documents, query=query)
    logger.debug('%s', prompt_html)
    logger.info('token count is %d', len(prompt_html.split()))

    # Make sure the last pair is a mutable list before writing the reply.
    history[-1] = [query, ""]
    try:
        # Each item replaces the reply so far rather than being appended --
        # presumably the backend yields the accumulated text; verify against
        # backend.query_llm.
        for partial_answer in generate_fn(prompt, history[:-1]):
            history[-1][1] = partial_answer
            yield history, prompt_html
    except Exception as e:  # UI boundary: report the failure in the chat
        logger.exception('An unexpected error occurred during generation')
        # Yield the same (history, prompt_html) shape as the success path so
        # both outputs wired to this generator receive a value.
        history[-1][1] = f"An unexpected error occurred during generation: {str(e)}"
        yield history, prompt_html
    else:
        logger.info('final history is %s', history)
# Build the Gradio UI: heading, description, chat window, input row, backend
# selector and a box showing the rendered prompt; then wire the events.
with gr.Blocks(theme='WeixuanYuan/Soft_dark') as demo:
    # Heading with logo
    gr.HTML(value="""
<div style="display: flex; align-items: center; justify-content: space-between;">
<h1 style="color: #008000">ADWITIYA - <span style="color: #008000">Customs Manual Chatbot(UNDER MAINTENANCE..WILL BE BACK SOON)</span></h1>
<img src='logo.png' alt="Chatbot" width="50" height="50" />
</div>
""", elem_id="heading")

    # Formatted description
    gr.HTML(value="""<p style="font-family: sans-serif; font-size: 16px;">A free chat bot developed by National Customs Targeting Center using Open source LLMs.(Dedicated to 75th Batch IRS Probationers)</p>""", elem_id="description")

    chatbot = gr.Chatbot(
        [],
        elem_id="chatbot",
        avatar_images=('https://aui.atlassian.com/aui/8.8/docs/images/avatar-person.svg',
                       'https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.svg'),
        bubble_full_width=False,
        show_copy_button=True,
        show_share_button=True,
    )

    with gr.Row():
        txt = gr.Textbox(
            scale=3,
            show_label=False,
            placeholder="Enter text and press enter",
            container=False,
        )
        txt_btn = gr.Button(value="Submit text", scale=1)

    api_kind = gr.Radio(choices=["HuggingFace", "Gemini"], value="HuggingFace")

    # Shows the rendered prompt; a plain Textbox is used here (gr.HTML() is
    # the alternative).
    prompt_html = gr.Textbox()

    # Clicking the button and pressing enter trigger the same chain:
    #   1. add_text: record the message and disable the input box,
    #   2. bot: stream the answer into the chat and show the prompt,
    #   3. re-enable the input box.
    # Registration is deliberately not wrapped in try/except: a failure here
    # is a programming error that should surface at startup, and swallowing
    # it would leave `txt_msg` unbound (or stale) for the follow-up `.then`.
    for trigger in (txt_btn.click, txt.submit):
        txt_msg = trigger(add_text, [chatbot, txt], [chatbot, txt], queue=False).then(
            bot, [chatbot, api_kind], [chatbot, prompt_html])
        txt_msg.then(lambda: gr.Textbox(interactive=True), None, [txt], queue=False)

    # Examples
    gr.Examples(examples, txt)

demo.queue()
demo.launch(debug=True)