"""
Gradio RAG chatbot: retrieve passages, re-rank them with a cross-encoder,
render a prompt from a Jinja2 template and stream the LLM answer.

Credit to Derek Thomas, derek@huggingface.co
"""

import subprocess
# subprocess.run(["pip", "install", "--upgrade", "transformers[torch,sentencepiece]==4.34.1"])

import logging
from pathlib import Path
from time import perf_counter

import gradio as gr
from jinja2 import Environment, FileSystemLoader
import numpy as np
from sentence_transformers import CrossEncoder

from backend.query_llm import generate_hf, generate_openai,generate_gemini
from backend.semantic_search import table, retriever

VECTOR_COLUMN_NAME = "vector"
TEXT_COLUMN_NAME = "text"

proj_dir = Path(__file__).parent

# Setting up the logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Set up the template environment with the templates directory.
# NOTE(review): autoescape is off, and the user query is rendered into
# template_html.j2 — confirm that template escapes its inputs.
env = Environment(loader=FileSystemLoader(proj_dir / 'templates'))

# Load the templates directly from the environment
template = env.get_template('template.j2')
template_html = env.get_template('template_html.j2')

# Cross-encoder used to re-rank the vector-search candidates
cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
#cross_encoder = CrossEncoder('BAAI/bge-reranker-base')

# Example prompts offered in the UI
examples = [
    'My transhipment cargo is missing',
    'can u explain and tabulate difference between b 17 bond and a warehousing bond',
    'What are benefits of the AEO Scheme and eligibility criteria?',
    'What are penalties for customs offences? ',
    'what are penalties to customs officers misusing their powers under customs act?',
    'What are eligibility criteria for exemption from cost recovery charges',
    'list in detail what is procedure for obtaining new approval for openeing a CFS attached to an ICD',
]

# Maps the value of the `api_kind` radio button to the generator function
# that streams the answer for that backend.
_GENERATORS = {
    "HuggingFace": generate_hf,
    "Gemini": generate_gemini,
}


def add_text(history, text):
    """Append the user's message to the chat history and lock the textbox.

    Args:
        history: Current chatbot history (list of ``[user, bot]`` turns), or
            ``None`` when the chat is empty.
        text: The message the user just submitted.

    Returns:
        A ``(history, textbox)`` pair: the history with a new ``[text, None]``
        turn appended, and a cleared, non-interactive ``gr.Textbox`` update.
    """
    history = [] if history is None else history
    # A list (not a tuple) so that `bot` can fill in the reply in place.
    history = history + [[text, None]]
    logger.info('add_text done, history has %d turn(s)', len(history))
    return history, gr.Textbox(value="", interactive=False)


def _count_tokens(text):
    """Return the number of whitespace-separated tokens in *text*."""
    return len(text.split())


def _retrieve_documents(query, top_rerank, top_k_rank):
    """Fetch `top_rerank` candidates for *query* and keep the best `top_k_rank`.

    Candidates come from the vector table; they are then scored against the
    query with the cross-encoder and returned in descending score order.
    """
    query_vec = retriever.encode(query)
    logger.warning('Finished query vec')

    hits = table.search(query_vec, vector_column_name=VECTOR_COLUMN_NAME).limit(top_rerank).to_list()
    logger.warning('Finished search')
    documents = [hit[TEXT_COLUMN_NAME] for hit in hits]
    if not documents:
        # Nothing to re-rank; avoid calling the cross-encoder on an empty batch.
        return documents

    logger.warning('start cross encoder %d', len(documents))
    cross_scores = cross_encoder.predict([[query, doc] for doc in documents])
    # argsort is ascending, so reverse it to get the highest scores first.
    best_first = np.argsort(cross_scores)[::-1][:top_k_rank]
    logger.warning('Finished cross encoder %d', len(documents))
    return [documents[idx] for idx in best_first]


def bot(history, api_kind):
    """Answer the last user message in *history*, streaming the reply.

    Args:
        history: Chatbot history whose last turn holds the pending user query.
        api_kind: Name of the LLM backend, one of the keys of ``_GENERATORS``.

    Yields:
        ``(history, prompt_html)`` pairs, one per streamed chunk, where the
        last turn of ``history`` carries the reply generated so far and
        ``prompt_html`` is the rendered HTML view of the prompt.

    Raises:
        ValueError: If the query is empty, or `api_kind` is missing or
            unsupported.
    """
    top_rerank = 15
    top_k_rank = 8
    query = history[-1][0]
    logger.info('query: %s | api kind: %s', query, api_kind)

    if not query:
        gr.Warning("Please submit a non-empty string as a prompt")
        raise ValueError("Empty string was submitted")

    # Validate the backend before doing any expensive retrieval work.
    if api_kind is None:
        gr.Warning("API name was not provided")
        raise ValueError("API name was not provided")
    generate_fn = _GENERATORS.get(api_kind)
    if generate_fn is None:
        gr.Warning(f"API {api_kind} is not supported")
        raise ValueError(f"API {api_kind} is not supported")

    logger.warning('Retrieving documents...')
    document_start = perf_counter()
    documents = _retrieve_documents(query, top_rerank, top_k_rank)
    logger.warning('num documents %d', len(documents))
    document_time = perf_counter() - document_start
    logger.warning(f'Finished Retrieving documents in {round(document_time, 2)} seconds...')

    # Create Prompt
    prompt = template.render(documents=documents, query=query)
    prompt_html = template_html.render(documents=documents, query=query)
    logger.info('prompt_html token count is %d', _count_tokens(prompt_html))

    # Make sure the last turn is mutable before streaming into it.
    history[-1] = [query, ""]
    try:
        for partial_answer in generate_fn(prompt, history[:-1]):
            history[-1][1] = partial_answer
            yield history, prompt_html
    except Exception as e:  # UI boundary: report any backend failure in the chat
        logger.exception('An unexpected error occurred during generation')
        # Both outputs must be yielded; a bare string would not match the
        # [chatbot, prompt_html] outputs wired to this handler.
        history[-1][1] = f"An unexpected error occurred during generation: {e}"
        yield history, prompt_html


with gr.Blocks(theme='WeixuanYuan/Soft_dark') as demo:
    # Heading
    gr.HTML(value="""
A free chat bot developed by National Customs Targeting Center using Open source LLMs.(Dedicated to 75th Batch IRS Probationers)
""", elem_id="description")

    chatbot = gr.Chatbot(
        [],
        elem_id="chatbot",
        avatar_images=('https://aui.atlassian.com/aui/8.8/docs/images/avatar-person.svg',
                       'https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.svg'),
        bubble_full_width=False,
        show_copy_button=True,
        show_share_button=True,
    )

    with gr.Row():
        txt = gr.Textbox(
            scale=3,
            show_label=False,
            placeholder="Enter text and press enter",
            container=False,
        )
        txt_btn = gr.Button(value="Submit text", scale=1)

    api_kind = gr.Radio(choices=["HuggingFace","Gemini"], value="HuggingFace")

    prompt_html = gr.HTML()
    #prompt_html = gr.Textbox(label='Retrieved Documents')

    # Turn off interactivity while generating if you click
    txt_msg = txt_btn.click(add_text, [chatbot, txt], [chatbot, txt], queue=False).then(
        bot, [chatbot, api_kind], [chatbot, prompt_html])

    # Turn it back on
    txt_msg.then(lambda: gr.Textbox(interactive=True), None, [txt], queue=False)

    # Turn off interactivity while generating if you hit enter
    txt_msg = txt.submit(add_text, [chatbot, txt], [chatbot, txt], queue=False).then(
        bot, [chatbot, api_kind], [chatbot, prompt_html])

    # Turn it back on
    txt_msg.then(lambda: gr.Textbox(interactive=True), None, [txt], queue=False)

    # Examples
    gr.Examples(examples, txt)

demo.queue()
demo.launch(debug=True)