|
|
|
""" |
|
Credit to Derek Thomas, derek@huggingface.co |
|
""" |
|
|
|
import subprocess |
|
|
|
|
|
import spaces |
|
import logging |
|
from pathlib import Path |
|
from time import perf_counter |
|
|
|
import gradio as gr |
|
from jinja2 import Environment, FileSystemLoader |
|
import numpy as np |
|
from sentence_transformers import CrossEncoder |
|
|
|
from backend.query_llm import generate_hf, generate_openai,generate_gemini |
|
from backend.semantic_search import table, retriever |
|
|
|
VECTOR_COLUMN_NAME = "vector" |
|
TEXT_COLUMN_NAME = "text" |
|
|
|
proj_dir = Path(__file__).parent |
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
env = Environment(loader=FileSystemLoader(proj_dir / 'templates')) |
|
|
|
|
|
template = env.get_template('template.j2') |
|
template_html = env.get_template('template_html.j2') |
|
|
|
|
|
|
|
cross_encoder = CrossEncoder('BAAI/bge-reranker-base') |
|
|
|
examples = ['My transhipment cargo is missing','can u explain and tabulate difference between b 17 bond and a warehousing bond', |
|
'What are benefits of the AEO Scheme and eligibility criteria?', |
|
'What are penalties for customs offences? ', 'what are penalties to customs officers misusing their powers under customs act?','What are eligibility criteria for exemption from cost recovery charges','list in detail what is procedure for obtaining new approval for openeing a CFS attached to an ICD'] |
|
|
|
|
|
def add_text(history, text): |
|
history = [] if history is None else history |
|
history = history + [(text, None)] |
|
print('add_text function done..returning history' ,history) |
|
return history, gr.Textbox(value="", interactive=False) |
|
|
|
|
|
def bot(history, api_kind): |
|
top_rerank = 15 |
|
top_k_rank = 10 |
|
query = history[-1][0] |
|
print('history[-1][0]',history[-1][0]) |
|
print('api kind ',api_kind) |
|
|
|
if not query: |
|
gr.Warning("Please submit a non-empty string as a prompt") |
|
raise ValueError("Empty string was submitted") |
|
|
|
logger.warning('Retrieving documents...') |
|
|
|
document_start = perf_counter() |
|
|
|
query_vec = retriever.encode(query) |
|
print(query) |
|
query_vec_flat = [arr.flatten() for arr in query_vec] |
|
logger.warning(f'Finished query vec') |
|
|
|
|
|
|
|
|
|
logger.warning(f'Finished search') |
|
documents = table.search(query_vec, vector_column_name=VECTOR_COLUMN_NAME).limit(top_rerank).to_list() |
|
documents = [doc[TEXT_COLUMN_NAME] for doc in documents] |
|
logger.warning(f'start cross encoder {len(documents)}') |
|
|
|
query_doc_pair = [[query, doc] for doc in documents] |
|
cross_scores = cross_encoder.predict(query_doc_pair) |
|
sim_scores_argsort = list(reversed(np.argsort(cross_scores))) |
|
logger.warning(f'Finished cross encoder {len(documents)}') |
|
|
|
documents = [documents[idx] for idx in sim_scores_argsort[:top_k_rank]] |
|
logger.warning(f'num documents {len(documents)}') |
|
|
|
document_time = perf_counter() - document_start |
|
logger.warning(f'Finished Retrieving documents in {round(document_time, 2)} seconds...') |
|
|
|
|
|
prompt = template.render(documents=documents, query=query) |
|
prompt_html = template_html.render(documents=documents, query=query) |
|
|
|
if api_kind == "HuggingFace": |
|
generate_fn = generate_hf |
|
elif api_kind == "Gemini": |
|
print("Gemini condition satisfied") |
|
generate_fn = generate_gemini |
|
elif api_kind is None: |
|
gr.Warning("API name was not provided") |
|
raise ValueError("API name was not provided") |
|
else: |
|
gr.Warning(f"API {api_kind} is not supported") |
|
raise ValueError(f"API {api_kind} is not supported") |
|
try: |
|
count_tokens = lambda text: len([token.strip() for token in text.split() if token.strip()]) |
|
print(prompt_html,'token count is',count_tokens(prompt_html)) |
|
history[-1][1] = "" |
|
for character in generate_fn(prompt, history[:-1]): |
|
history[-1][1] = character |
|
yield history, prompt_html |
|
print('final history is ',history) |
|
|
|
|
|
except Exception as e: |
|
print('An unexpected error occurred during generation:', str(e)) |
|
yield f"An unexpected error occurred during generation: {str(e)}" |
|
|
|
with gr.Blocks(theme='WeixuanYuan/Soft_dark') as CHATBOT: |
|
|
|
gr.HTML(value=""" |
|
<div style="display: flex; align-items: center; justify-content: space-between;"> |
|
<h1 style="color: #008000">ADWITIYA - <span style="color: #008000">Customs Manual Chatbot</span></h1> |
|
<img src='logo.png' alt="Chatbot" width="50" height="50" /> |
|
</div> |
|
""", elem_id="heading") |
|
|
|
|
|
gr.HTML(value="""<p style="font-family: sans-serif; font-size: 16px;">A free chat bot developed by National Customs Targeting Center using Open source LLMs.(Dedicated to 75th Batch IRS Probationers)</p>""", elem_id="description") |
|
|
|
chatbot = gr.Chatbot( |
|
[], |
|
elem_id="chatbot", |
|
avatar_images=('https://aui.atlassian.com/aui/8.8/docs/images/avatar-person.svg', |
|
'https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.svg'), |
|
bubble_full_width=False, |
|
show_copy_button=True, |
|
show_share_button=True, |
|
) |
|
|
|
with gr.Row(): |
|
txt = gr.Textbox( |
|
scale=3, |
|
show_label=False, |
|
placeholder="Enter text and press enter", |
|
container=False, |
|
) |
|
txt_btn = gr.Button(value="Submit text", scale=1) |
|
|
|
api_kind = gr.Radio(choices=["HuggingFace","Gemini"], value="HuggingFace") |
|
|
|
prompt_html = gr.HTML() |
|
|
|
try: |
|
|
|
txt_msg = txt_btn.click(add_text, [chatbot, txt], [chatbot, txt], queue=False).then( |
|
bot, [chatbot, api_kind], [chatbot, prompt_html]) |
|
except Exception as e: |
|
print ('Exception txt btn click ' ,str(e)) |
|
|
|
txt_msg.then(lambda: gr.Textbox(interactive=True), None, [txt], queue=False) |
|
try: |
|
|
|
txt_msg = txt.submit(add_text, [chatbot, txt], [chatbot, txt], queue=False).then( |
|
bot, [chatbot, api_kind], [chatbot, prompt_html]) |
|
except Exception as e: |
|
print ('Exception ' ,str(e)) |
|
|
|
|
|
txt_msg.then(lambda: gr.Textbox(interactive=True), None, [txt], queue=False) |
|
|
|
|
|
gr.Examples(examples, txt) |
|
|
|
|
|
RAG_db=gr.State() |
|
|
|
|
|
|
|
with gr.Blocks(title="Quiz Maker", theme=gr.themes.Default(primary_hue="green", secondary_hue="green"), css="style.css") as QUIZBOT: |
|
def system_instructions(question_difficulty, topic,documents_str): |
|
return f"""<s> [INST] Your are a great teacher and your task is to create 10 questions with 4 choices with a {question_difficulty} difficulty about topic request " {topic} " only from the below given documents, {documents_str} then create an answers. Index in JSON format, the questions as "Q#":"" to "Q#":"", the four choices as "Q#:C1":"" to "Q#:C4":"", and the answers as "A#":"Q#:C#" to "A#":"Q#:C#". [/INST]""" |
|
|
|
def load_model(): |
|
RAG= RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0") |
|
RAG_db.value=RAG.from_index('.ragatouille/colbert/indexes/cbseclass10index') |
|
return 'Ready to Go!!' |
|
with gr.Column(scale=4): |
|
gr.HTML(""" |
|
<center> |
|
<h1><span style="color: purple;">AI NANBAN</span> - CBSE Class Quiz Maker</h1> |
|
<h2>AI-powered Learning Game</h2> |
|
<i>β οΈ Students create quiz from any topic /CBSE Chapter ! β οΈ</i> |
|
</center> |
|
""") |
|
|
|
with gr.Column(scale=2): |
|
load_btn = gr.Button("Click to Load!π") |
|
load_text=gr.Textbox() |
|
load_btn.click(load_model,[],load_text) |
|
|
|
|
|
topic = gr.Textbox(label="Enter the Topic for Quiz", placeholder="Write any topic from CBSE notes") |
|
|
|
with gr.Row(): |
|
radio = gr.Radio( |
|
["easy", "average", "hard"], label="How difficult should the quiz be?" |
|
) |
|
|
|
|
|
generate_quiz_btn = gr.Button("Generate Quiz!π") |
|
quiz_msg=gr.Textbox() |
|
|
|
question_radios = [gr.Radio(visible=False), gr.Radio(visible=False), gr.Radio( |
|
visible=False), gr.Radio(visible=False), gr.Radio(visible=False), gr.Radio(visible=False), gr.Radio(visible=False), gr.Radio( |
|
visible=False), gr.Radio(visible=False), gr.Radio(visible=False)] |
|
|
|
print(question_radios) |
|
|
|
@spaces.GPU |
|
@generate_quiz_btn.click(inputs=[radio, topic], outputs=[quiz_msg]+question_radios, api_name="generate_quiz") |
|
def generate_quiz(question_difficulty, topic): |
|
top_k_rank=10 |
|
RAG_db_=RAG_db.value |
|
documents_full=RAG_db_.search(topic,k=top_k_rank) |
|
|
|
|
|
|
|
generate_kwargs = dict( |
|
temperature=0.2, |
|
max_new_tokens=4000, |
|
top_p=0.95, |
|
repetition_penalty=1.0, |
|
do_sample=True, |
|
seed=42, |
|
) |
|
question_radio_list = [] |
|
count=0 |
|
while count<=3: |
|
try: |
|
documents=[item['content'] for item in documents_full] |
|
document_summaries = [f"[DOCUMENT {i+1}]: {summary}{count}" for i, summary in enumerate(documents)] |
|
documents_str='\n'.join(document_summaries) |
|
formatted_prompt = system_instructions( |
|
question_difficulty, topic,documents_str) |
|
print(formatted_prompt) |
|
pre_prompt = [ |
|
{"role": "system", "content": formatted_prompt} |
|
] |
|
response = client.text_generation( |
|
formatted_prompt, **generate_kwargs, stream=False, details=False, return_full_text=False, |
|
) |
|
output_json = json.loads(f"{response}") |
|
|
|
|
|
print(response) |
|
print('output json', output_json) |
|
|
|
global quiz_data |
|
|
|
quiz_data = output_json |
|
|
|
|
|
|
|
for question_num in range(1, 11): |
|
question_key = f"Q{question_num}" |
|
answer_key = f"A{question_num}" |
|
|
|
question = quiz_data.get(question_key) |
|
answer = quiz_data.get(quiz_data.get(answer_key)) |
|
|
|
if not question or not answer: |
|
continue |
|
|
|
choice_keys = [f"{question_key}:C{i}" for i in range(1, 5)] |
|
choice_list = [] |
|
for choice_key in choice_keys: |
|
choice = quiz_data.get(choice_key, "Choice not found") |
|
choice_list.append(f"{choice}") |
|
|
|
radio = gr.Radio(choices=choice_list, label=question, |
|
visible=True, interactive=True) |
|
|
|
question_radio_list.append(radio) |
|
if len(question_radio_list)==10: |
|
break |
|
else: |
|
print('10 questions not generated . So trying again!') |
|
count+=1 |
|
continue |
|
except Exception as e: |
|
count+=1 |
|
print(f"Exception occurred: {e}") |
|
if count==3: |
|
print('Retry exhausted') |
|
gr.Warning('Sorry. Pls try with another topic !') |
|
else: |
|
print(f"Trying again..{count} time...please wait") |
|
continue |
|
|
|
print('Question radio list ' , question_radio_list) |
|
|
|
return ['Quiz Generated!']+ question_radio_list |
|
|
|
check_button = gr.Button("Check Score") |
|
|
|
score_textbox = gr.Markdown() |
|
|
|
@check_button.click(inputs=question_radios, outputs=score_textbox) |
|
def compare_answers(*user_answers): |
|
user_anwser_list = [] |
|
user_anwser_list = user_answers |
|
|
|
answers_list = [] |
|
|
|
for question_num in range(1, 20): |
|
answer_key = f"A{question_num}" |
|
answer = quiz_data.get(quiz_data.get(answer_key)) |
|
if not answer: |
|
break |
|
answers_list.append(answer) |
|
|
|
score = 0 |
|
|
|
for item in user_anwser_list: |
|
if item in answers_list: |
|
score += 1 |
|
if score>5: |
|
message = f"### Good ! You got {score} over 10!" |
|
elif score>7: |
|
message = f"### Excellent ! You got {score} over 10!" |
|
else: |
|
message = f"### You got {score} over 10! Dont worry . You can prepare well and try better next time !" |
|
|
|
return message |
|
|
|
|
|
|
|
demo = gr.TabbedInterface([CHATBOT,QUIZBOT], ["AI ChatBot", "AI Nanban-Quizbot"]) |
|
|
|
|
|
demo.queue() |
|
demo.launch(debug=True) |
|
|