from typing import Any, List, Mapping, Optional from langchain_core.callbacks.manager import CallbackManagerForLLMRun from langchain_core.language_models.llms import LLM from typing import Literal import requests from langchain.prompts import PromptTemplate, ChatPromptTemplate from operator import itemgetter from langchain.memory import ChatMessageHistory, ConversationBufferMemory from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_community.chat_models import ChatOpenAI from langchain_core.runnables import RunnableLambda, RunnablePassthrough from langchain_core.messages import AIMessage, HumanMessage from langchain_community.document_loaders import DirectoryLoader from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.document_loaders import PyMuPDFLoader import os, requests from langchain.embeddings import HuggingFaceEmbeddings from langchain_community.embeddings import HuggingFaceInferenceAPIEmbeddings from langchain.vectorstores import FAISS from langchain_core.runnables import RunnableBranch import pickle, asyncio, traceback # os.environ['FAISS_NO_AVX2'] = '1' import pandas as pd async def create_vectorstore(): API_TOKEN = os.getenv('HF_INFER_API') loader = os.getenv('knowledge_base') # web_loader = load_web("https://lintasmediadanawa.com") splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=20) # docs = splitter.create_documents([loader]+web_loader) docs = splitter.create_documents([loader]) print(len(docs)) emb_model = HuggingFaceEmbeddings(model_name='sentence-transformers/paraphrase-multilingual-mpnet-base-v2', encode_kwargs={'normalize_embeddings': True}) # emb_model = HuggingFaceInferenceAPIEmbeddings( # api_key=API_TOKEN, model_name="sentence-transformers/paraphrase-multilingual-mpnet-base-v2", encode_kwargs={'normalize_embeddings': True} # ) async def add_docs(d): db.aadd_documents(await splitter.atransform_documents([d])) db = await FAISS.afrom_documents(docs, emb_model) f = pickle.load(open("wi_knowledge.dat", "rb")) print("Docs len :", len(f)) tasks = [] for d in f: tasks.append(db.aadd_documents(await splitter.atransform_documents([d]))) await asyncio.gather(*tasks) # asyncio.run(db.aadd_documents(asyncio.run(splitter.atransform_documents(f)))) # emb_model = HuggingFaceInferenceAPIEmbeddings( # api_key=API_TOKEN, model_name="sentence-transformers/paraphrase-multilingual-mpnet-base-v2", encode_kwargs={'normalize_embeddings': True} # ) # x = open("wi_knowledge.pkl", 'rb') # db = FAISS.deserialize_from_bytes( # embeddings=emb_model, serialized=x # ) # db = pickle.load(x) # print(db) # db.add_documents( splitter.transform_documents(docs) ) return db def custom_chain_with_history(llm, memory): # prompt = PromptTemplate.from_template("""<|system|> # You are a helpful and informative AI customer service assistant. Always remember to thank the customer when they say thank you and greet them when they greet you. # You have access to the following context of knowledge base and internal resources to find the most relevant information for the customer's needs: # {context} # Respond to the user with the following chat history between you and the user: # {chat_history} # <|user|> # {question} # <|assistant|> # """) prompt = PromptTemplate.from_template("""<|system|> Anda adalah asisten AI Chatbot customer service. Anda memiliki akses table dibawah ini untuk menemukan informasi yang paling relevan dengan kebutuhan user: {context} Berikan respon kepada user berdasarkan riwayat chat berikut dengan bahasa yang digunakan terakhir kali oleh user, jika tidak ada informasi yang relevan maka itu adalah informasi yang rahasia dan Anda tidak diizinkan untuk menyebarkan informasi tersebut kepada user: {chat_history} <|user|> {question} <|assistant|> """) def prompt_memory(memory): t = "" for x in memory.chat_memory.messages: t += f"<|assistant|>\n{x.content}\n" if type(x) is AIMessage else f"<|user|>\n{x.content}\n" return "" if len(t) == 0 else t def format_docs(docs): # print(len(docs)) return "\n".join([f"{i+1}. {d.page_content}" for i,d in enumerate(docs)]) # prompt = ChatPromptTemplate.from_messages( # [ # ("system", "You are a helpful chatbot"), # MessagesPlaceholder(variable_name="history"), # ("human", "{input}"), # ] # ) # return {"chat_history":prompt_memory, "context":asyncio.run(create_vectorstore()).as_retriever(search_type="similarity", search_kwargs={"k": 12}) | format_docs, "question": RunnablePassthrough()} | prompt | llm return {"chat_history":lambda x:prompt_memory(x['memory']), "context":itemgetter("question") | asyncio.run(create_vectorstore()).as_retriever(search_type="similarity", search_kwargs={"k": 12}) | format_docs, "question": lambda x:x['question']} | prompt | llm def format_df(df): out = "" for x in df.columns: out+= x + "|" out = out[:-1] + "\n\n" for _,row in df.iterrows(): for x in row.values: out += str(x) + "|" out = out[:-1] out += "\n" return out def out_format(text, llm, df): prompt = PromptTemplate.from_template("""Fix the following code. Do not give explanation, just create the python code: {code} Error Message : {err} Always change the corresponding columns into datetime format with parameter day_first=True, example: df['column_name'] = pd.to_datetime(df['column_name'], day_first=True) Always use idxmin or idxmax instead of array indicies whenever it is possible Always use .iloc to query a dataframe instead of using array indicies directly The output must follow the following example format: ```python # Generated Code ``` """) err_chain = prompt | llm e_ = None for i in range(6): try : print(text) text_split = text.split("`python")[-1].split("```")[0].replace('\_', "_") # text_split = text.split("# Generated Code")[-1].split("```")[0].replace("\_", "_") if "response" not in text_split: text = text.split("```")[0].replace('\_', "_") else : text = text_split print(text) try : exec(text) except : text_split = text.split("# Generated Code")[-1].split("```")[0].replace("\_", "_") if "response" not in text_split: text = text.split("```")[0].replace('\_', "_") else : text = "# Generated Code" + text_split print(text) exec(text) return text except Exception as e: print(f"ERORRR! ATTEMPT : {i}\n",str(traceback.format_exc(limit=2))) text = err_chain.invoke({"code":text, "err":str(traceback.format_exc(limit=2))}) e_ = traceback.format_exc(limit=2) # exec(text) return "Bad Python Code, Error Message : " + str(e_) def unique_value_str_func(unique_val): return "\n".join([str(i+1) + "." + k + ": " + str(v) for i,(k,v) in enumerate(unique_val.items())]) def custom_dataframe_chain(llm, df, unique_values): unique_str = unique_value_str_func(unique_values) print(unique_str) prompt = PromptTemplate.from_template("""You have access to a pandas dataframe variable named "df". Below are the examples of the dataframe: {df_example} Given the following user input, create relevant python code to get the relevant information in the dataframe and store the response string result in a variable named "response". Do not explain, just create the python code: {question} Always change the corresponding columns into datetime format with parameter day_first=True, example: df['column_name'] = pd.to_datetime(df['column_name'], day_first=True) Always use idxmin or idxmax instead of array indicies whenever it is possible Do not import pandas and Do not create or re-assign "df" variable Below is the unique value of the important categorical columns: {unique_val} The output must follow the following example format: ```python # Generated Code ``` """).partial(unique_val=unique_str) return prompt | llm | RunnableLambda(lambda x:out_format(x, llm, df)) def custom_unique_df_chain(llm, df): prompt = PromptTemplate(template="""You have access to a pandas dataframe variable named "df". Below are the examples of the dataframe: {df_example} Create unique values for the important non-datetime categorical columns with maximum 20 unique values for each columns. Store the unique values in a variable named "response" with the following format of python dictionary: {{ column_name1 : [list_of_unique_column1], column_name2 : [list_of_unique_values_column2], column_name3 : [list_of_unique_values_column3] }} The output must follow the following example format: ```python # Generated Code ``` """, input_variables=["df_example"]) return prompt | llm | RunnableLambda(lambda x:out_format(x, llm, df)) def custom_combined_chain(llm, df_chain, memory_chain): # prompt = PromptTemplate.from_template(""" Given the following question, classify it as either being more relevant with a dataframe object of ticket submissions' history or several documents of user guide and general knowledge: # # {question} # # Respond with ONLY one word either "ticket" or "knowledge" # """) prompt = PromptTemplate.from_template(""" You have access to the following data sources: 1. Dataframe : use this data source to retrieve anything about ticket submission history 2. Documents : use this data source to retrieve anything related to user guide and work instruction or any other question not related to ticket submission history {question} Respond with ONLY one word either "dataframe" or "documents" """) # def route(info): # if 'ticket' in info['topic']: # return df_chain # else: # return memory_chain # full_chain = RunnablePassthrough.assign(topic= (prompt | llm)) | RunnableLambda(route) # combined_chain = prompt | llm return RunnablePassthrough.assign(topic=prompt | llm) | RunnableBranch( (lambda x: "dataframe" in x['topic'].lower(), df_chain), memory_chain ) class CustomLLM(LLM): repo_id : str api_token : str model_type: Literal["text2text-generation", "text-generation"] max_new_tokens: int = None temperature: float = 0.001 timeout: float = None top_p: float = None top_k : int = None repetition_penalty : float = None stop : List[str] = [] @property def _llm_type(self) -> str: return "custom" def _call( self, prompt: str, stop: Optional[List[str]] = None, run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs: Any, ) -> str: headers = {"Authorization": f"Bearer {self.api_token}"} API_URL = f"https://api-inference.huggingface.co/models/{self.repo_id}" parameters_dict = { 'max_new_tokens': self.max_new_tokens, 'temperature': self.temperature, 'timeout': self.timeout, 'top_p': self.top_p, 'top_k': self.top_k, 'repetition_penalty': self.repetition_penalty, 'stop':self.stop } if self.model_type == 'text-generation': parameters_dict["return_full_text"]=False data = {"inputs": prompt, "parameters":parameters_dict, "options":{"wait_for_model":True}} data = requests.post(API_URL, headers=headers, json=data).json() return data[0]['generated_text'] @property def _identifying_params(self) -> Mapping[str, Any]: """Get the identifying parameters.""" return { 'repo_id': self.repo_id, 'model_type':self.model_type, 'stop_sequences':self.stop, 'max_new_tokens': self.max_new_tokens, 'temperature': self.temperature, 'timeout': self.timeout, 'top_p': self.top_p, 'top_k': self.top_k, 'repetition_penalty': self.repetition_penalty }