"""Gradio backend helpers for CTH code prediction and LLM explanations.

Importing this module has side effects: it loads a saved Keras model from
``./Checkpoint``, reads three CSV lookup tables, downloads/loads a BERT layer
from TF-Hub to obtain its vocabulary, and creates a Hugging Face
``InferenceClient``.
"""
import heapq
import math
import random
import sys
from datetime import datetime

import pandas as pd
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub

# Must run before the ``official`` imports below (presumably the TF Model
# Garden checkout lives in ./models -- confirm against the deployment layout).
sys.path.append('models')
from official.nlp.data import classifier_data_lib
from official.nlp.bert import tokenization
from official.nlp import optimization

tf.get_logger().setLevel('ERROR')

from huggingface_hub import InferenceClient
import gradio as gr

# The optimizer is only built so the checkpoint's 'AdamWeightDecay' custom
# object can be resolved by load_model below.
num_warmup_steps = 1
num_train_steps = 1
init_lr = 3e-5
optimizer = optimization.create_optimizer(
    init_lr=init_lr,
    num_train_steps=num_train_steps,
    num_warmup_steps=num_warmup_steps,
    optimizer_type='adamw',
)

### Load Model
checkpoint_filepath = r'./Checkpoint'
model = tf.keras.models.load_model(
    checkpoint_filepath,
    custom_objects={'KerasLayer': hub.KerasLayer, 'AdamWeightDecay': optimizer},
)

# Lookup tables; CTH codes are normalised to zero-padded 8-character strings.
df_report = pd.read_csv('./CTH_Description.csv')
df_report['CTH Code'] = df_report['CTH Code'].astype(str).str.zfill(8)

df_report_DUTY = pd.read_csv('./CTH_WISE_DUTY_RATE.csv')
df_report_DUTY['CTH'] = df_report_DUTY['CTH'].astype(str).str.zfill(8)

df = pd.read_csv("./CTH_CODE_MAP.csv")
df['CTH'] = df['CTH'].astype(str).str.zfill(8)
df = df[['CTH', 'code']]

client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1")

class_names = (
    df[['CTH', 'code']]
    .drop_duplicates(subset='CTH')
    .sort_values(by='code', ignore_index=True)['CTH']
    .values.tolist()
)
label_list = list(range(0, len(class_names)))

max_seq_length = 200  # maximum length of (token) input sequences; it can be any number
train_batch_size = 32  # batch size (not used by the inference code in this file)

# Get BERT layer and tokenizer:
# More details here: https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/4
bert_layer = hub.KerasLayer(
    "https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/4",
    trainable=True,
)
vocab_file = bert_layer.resolved_object.vocab_file.asset_path.numpy()
do_lower_case = bert_layer.resolved_object.do_lower_case.numpy()
tokenizer = tokenization.FullTokenizer(vocab_file, do_lower_case)

# A prediction is rejected when even the best class probability is at or
# below this value.
_MIN_TOP_PROBABILITY = 0.000131
# Number of candidate classes reported to the user / sent to the LLM.
_TOP_K = 3


# This provides a function to convert each row to input features and label
# (as required by BERT).
def to_feature(text, label, label_list=label_list,
               max_seq_length=max_seq_length, tokenizer=tokenizer):
    """Convert one (text, label) pair of eager tensors into BERT features.

    Returns:
        Tuple ``(input_ids, input_mask, segment_ids, label_id)`` taken from
        ``classifier_data_lib.convert_single_example``.
    """
    example = classifier_data_lib.InputExample(
        guid=None,
        text_a=text.numpy(),
        text_b=None,
        label=label.numpy(),
    )
    feature = classifier_data_lib.convert_single_example(
        0, example, label_list, max_seq_length, tokenizer
    )
    return (feature.input_ids, feature.input_mask,
            feature.segment_ids, feature.label_id)


def to_feature_map(text, label):
    """``tf.data`` map function wrapping :func:`to_feature`.

    Returns:
        ``(x, label_id)`` where ``x`` is a dict with the keys
        ``input_word_ids``, ``input_mask`` and ``input_type_ids``.
    """
    input_ids, input_mask, segment_ids, label_id = tf.py_function(
        to_feature,
        inp=[text, label],
        Tout=[tf.int32, tf.int32, tf.int32, tf.int32],
    )
    # tf.py_function loses static shapes, so restore them explicitly.
    input_ids.set_shape([max_seq_length])
    input_mask.set_shape([max_seq_length])
    segment_ids.set_shape([max_seq_length])
    label_id.set_shape([])

    x = {
        "input_word_ids": input_ids,
        "input_mask": input_mask,
        "input_type_ids": segment_ids,
    }
    return (x, label_id)


def print3largest(arr, arr_size):
    """Return the three largest values among ``arr[0:arr_size]``.

    Despite its name this function prints nothing. The result is ordered
    largest first; when fewer than three values are available the missing
    slots hold ``-sys.maxsize``.
    """
    values = [arr[i] for i in range(0, arr_size)]
    # Appending three sentinels reproduces the "initialise to -sys.maxsize"
    # behaviour of a manual scan: short inputs are padded, and values below
    # the sentinel are never reported.
    return heapq.nlargest(3, values + [-sys.maxsize] * 3)


def count_special_character(string):
    """Return ``True`` when ``string`` contains at least one alphabetic char.

    Despite its name this returns a bool, not a count: ``False`` means every
    character is non-alphabetic (or the string is empty).
    """
    return any(ch.isalpha() for ch in string)


def format_prompt(message, history):
    """Build an ``[INST] ... [/INST]`` prompt from chat history and message.

    Args:
        message: The new user message.
        history: Iterable of ``(user_prompt, bot_response)`` pairs; ``None``
            is treated as an empty history.
    """
    prompt = ""
    for user_prompt, bot_response in history or []:
        prompt += f"[INST] {user_prompt} [/INST]"
        prompt += f" {bot_response} "
    prompt += f"[INST] {message} [/INST]"
    return prompt


additional_inputs = [
    gr.Textbox(
        label="System Prompt",
        max_lines=1,
        interactive=True,
    ),
    gr.Slider(
        label="Temperature",
        value=0.9,
        minimum=0.0,
        maximum=1.0,
        step=0.05,
        interactive=True,
        info="Higher values produce more diverse outputs",
    ),
    gr.Slider(
        label="Max new tokens",
        value=1024,
        minimum=0,
        maximum=4096,
        step=64,
        interactive=True,
        info="The maximum numbers of new tokens",
    ),
    gr.Slider(
        label="Top-p (nucleus sampling)",
        value=0.50,
        minimum=0.0,
        maximum=1,
        step=0.05,
        interactive=True,
        info="Higher values sample more low-probability tokens",
    ),
    gr.Slider(
        label="Repetition penalty",
        value=1.2,
        minimum=1.0,
        maximum=2.0,
        step=0.05,
        interactive=True,
        info="Penalize repeated tokens",
    ),
]


def _is_valid_description(txt):
    """Return True for a string of length >= 3 with at least one letter."""
    return (
        isinstance(txt, str)
        and len(txt) >= 3
        and count_special_character(txt)
    )


def _rank_top_classes(txt, top_k=_TOP_K):
    """Classify ``txt`` and return ``[(class_index, probability), ...]``.

    The list is sorted by descending probability and has at most ``top_k``
    entries. Ranking is done on indices (stable sort), so classes with equal
    probabilities are reported as distinct classes, lowest index first.
    """
    # The label 1 is only a placeholder so each element matches
    # to_feature_map's (text, label) signature.
    dataset = tf.data.Dataset.from_tensor_slices(([txt], [1]))
    dataset = dataset.map(to_feature_map).batch(1)
    preds = model.predict(dataset)
    probabilities = tf.nn.softmax(preds).numpy()[0]
    order = np.argsort(-probabilities, kind='stable')[:top_k]
    return [(int(idx), float(probabilities[idx])) for idx in order]


def _first_match(frame, key_column, key, value_column):
    """Return ``value_column`` of the first row whose key matches, else ''."""
    try:
        return frame[frame[key_column] == key][value_column].iloc[0]
    except (IndexError, KeyError):
        # No matching row (IndexError) or missing column (KeyError):
        # the lookup is best-effort, so fall back to an empty value.
        return ''


def _describe_candidates(ranked):
    """Turn ranked classes into ``(cth_label, desc_label, share)`` tuples.

    ``share`` is the class probability divided by the sum of the
    probabilities in ``ranked``, so the shares add up to 1.
    """
    total = sum(prob for _, prob in ranked)
    rows = []
    for code, prob in ranked:
        cth = df[df['code'] == code]['CTH'].iloc[0]
        duty = _first_match(df_report_DUTY, 'CTH', str(cth), 'DUTY_RATE')
        desc = _first_match(
            df_report, 'CTH Code', str(cth), 'Concat Description'
        )
        cth_label = 'CTH: ' + str(cth) + ' Duty Rate(%): ' + str(duty)
        desc_label = 'Desc: ' + str(desc)
        rows.append((cth_label, desc_label, float(prob / total)))
    return rows


def _generate_reply(prompt, history, temperature, max_new_tokens, top_p,
                    repetition_penalty):
    """Stream a completion for ``prompt`` from ``client`` and return it."""
    # Temperature is clamped to a minimum of 0.01.
    temperature = max(float(temperature), 1e-2)
    generate_kwargs = dict(
        temperature=temperature,
        max_new_tokens=max_new_tokens,
        top_p=float(top_p),
        repetition_penalty=repetition_penalty,
        do_sample=True,
        seed=42,
    )
    formatted_prompt = format_prompt(f", {prompt}", history)
    stream = client.text_generation(
        formatted_prompt,
        **generate_kwargs,
        stream=True,
        details=True,
        return_full_text=False,
    )
    return "".join(response.token.text for response in stream)


def predict_CTH(txt):
    """Predict the most likely CTH codes for a product description.

    Args:
        txt: Free-text product description.

    Returns:
        A dict mapping label strings to floats. For an accepted description
        it holds, per candidate class, a ``'CTH: ... Duty Rate(%): ...'`` key
        and a ``'Desc: ...'`` key, both mapped to that class's share of the
        reported candidates. Otherwise it holds a single message key mapped
        to ``1.0``.
    """
    print('Desc: ', txt)
    now = datetime.now()
    print("Time =", now)

    if not _is_valid_description(txt):
        return {'Enter Correct Description': float(1.0)}

    ranked = _rank_top_classes(txt)
    if not ranked or ranked[0][1] <= _MIN_TOP_PROBABILITY:
        return {'Not a adequate description': float(1.0)}

    scores = {}
    for cth_label, desc_label, share in _describe_candidates(ranked):
        scores[cth_label] = share
        scores[desc_label] = share
    return scores


def llm_model_function(txt, history, chatbot=None, temperature=0.9,
                       max_new_tokens=1024, top_p=0.95,
                       repetition_penalty=1.0):
    """Classify ``txt`` and ask the LLM to choose among the candidates.

    Args:
        txt: Free-text product description.
        history: ``(user, bot)`` pairs prepended to the LLM prompt.
        chatbot: Ignored; a fresh single-entry chat list is always returned.
        temperature, max_new_tokens, top_p, repetition_penalty: Sampling
            parameters forwarded to ``client.text_generation``.

    Returns:
        ``("", chatbot)`` where ``chatbot`` is a one-element list holding
        either ``(txt, llm_output)`` or a rejection message pair.
    """
    rejection = [('Not a adequate description', 'Not a adequate description')]

    if not _is_valid_description(txt):
        return "", rejection

    ranked = _rank_top_classes(txt)
    if not ranked or ranked[0][1] <= _MIN_TOP_PROBABILITY:
        return "", rejection

    candidates = _describe_candidates(ranked)
    output_str_msg = ' '.join(
        f'{rank}. {cth_label} {desc_label}'
        for rank, (cth_label, desc_label, _) in enumerate(candidates, start=1)
    )
    prompt = (
        f'First Explain What is the product- {txt}. '
        'Which is the most appropriate 8 Digit classification code out of '
        'the three given below classes. Explain the reason step by step. '
        'if none of the three classification is applicable more precisely '
        'due to lack of any additional information, tell you need additional '
        'information and what is the that additional information. '
        f'{output_str_msg} ?'
    )
    output = _generate_reply(
        prompt, history, temperature, max_new_tokens, top_p,
        repetition_penalty,
    )
    return "", [(txt, output)]


def product_explaination(txt, history, chatbot=None, temperature=0.9,
                         max_new_tokens=1024, top_p=0.95,
                         repetition_penalty=1.0):
    """Ask the LLM what the product described by ``txt`` is.

    Args:
        txt: Free-text product description.
        history: ``(user, bot)`` pairs prepended to the LLM prompt.
        chatbot: Ignored; a fresh single-entry chat list is always returned.
        temperature, max_new_tokens, top_p, repetition_penalty: Sampling
            parameters forwarded to ``client.text_generation``.

    Returns:
        ``("", [(txt, llm_output)])``.
    """
    print('Input Descrption is:', txt)
    prompt = f'What is the product- {txt}?'
    print('prompt', prompt)
    output = _generate_reply(
        prompt, history, temperature, max_new_tokens, top_p,
        repetition_penalty,
    )
    return "", [(txt, output)]