import threading  # to allow streaming response
import time  # to pace the delivery of the message

import gradio  # for the interface
import spaces  # for GPU
import transformers  # to load an LLM
import langchain_community.vectorstores  # to load the publication vectorstore
import langchain_huggingface  # for embeddings

# The greeting message
GREETING = (
    "Howdy! "
    "I'm an AI agent that uses [retrieval-augmented generation](https://en.wikipedia.org/wiki/Retrieval-augmented_generation) pipeline to answer questions about additive manufacturing research. "
    "I still make some mistakes though. "
    "What can I tell you about today?"
)

# Example queries
EXAMPLE_QUERIES = [
    "Tell me about new research at the intersection of additive manufacturing and machine learning.",
]

# The embedding model name
EMBEDDING_MODEL_NAME = "all-MiniLM-L12-v2"

# The LLM model name
LLM_MODEL_NAME = "Qwen/Qwen2.5-1.5B-Instruct"

# The number of publications to retrieve
PUBLICATIONS_TO_RETRIEVE = 5


def embedding(
    model_name: str = EMBEDDING_MODEL_NAME,
    device: str = "mps",
    normalize_embeddings: bool = False,
) -> langchain_huggingface.HuggingFaceEmbeddings:
    """
    Get the embedding function

    The default model is taken from ``EMBEDDING_MODEL_NAME`` so that the
    constant and this function cannot drift apart.

    :param model_name: The model name
    :type model_name: str
    :param device: The device to use
    :type device: str
    :param normalize_embeddings: Whether to normalize embeddings
    :type normalize_embeddings: bool
    :return: The embedding function
    :rtype: langchain_huggingface.HuggingFaceEmbeddings
    """
    # NOTE(review): the default device is "mps" while the rest of this file
    # targets CUDA ("cuda:0", @spaces.GPU) -- confirm that this default is
    # valid on the deployment host.
    return langchain_huggingface.HuggingFaceEmbeddings(
        model_name=model_name,
        model_kwargs={"device": device},
        encode_kwargs={"normalize_embeddings": normalize_embeddings},
    )


def load_publication_vectorstore() -> langchain_community.vectorstores.FAISS:
    """
    Load the publication vectorstore

    The index is read from the local ``publication_vectorstore`` folder and
    uses the default :func:`embedding` function for queries.

    :return: The publication vectorstore
    :rtype: langchain_community.vectorstores.FAISS
    """
    return langchain_community.vectorstores.FAISS.load_local(
        folder_path="publication_vectorstore",
        embeddings=embedding(),
        # SECURITY: this opts in to pickle-based deserialization of the saved
        # index. Only safe while the "publication_vectorstore" folder is
        # produced and shipped by a trusted source.
        allow_dangerous_deserialization=True,
    )
import collections.abc  # for the generator return annotation of reply()

publication_vectorstore = load_publication_vectorstore()

# Create an LLM pipeline that we can send queries to
tokenizer = transformers.AutoTokenizer.from_pretrained(
    LLM_MODEL_NAME, trust_remote_code=True
)
streamer = transformers.TextIteratorStreamer(
    tokenizer, skip_prompt=True, skip_special_tokens=True
)
chatmodel = transformers.AutoModelForCausalLM.from_pretrained(
    LLM_MODEL_NAME, device_map="auto", torch_dtype="auto", trust_remote_code=True
)


def preprocess(query: str, k: int) -> tuple[str, str]:
    """
    Searches the dataset for the top k most relevant papers to the query and returns a prompt and references

    Args:
        query (str): The user's query
        k (int): The number of results to return

    Returns:
        tuple[str, str]: A tuple containing the prompt and references.
            The references element is currently always an empty string.
    """
    # Honor the caller-supplied k (previously the global constant was always
    # used and the parameter was silently ignored).
    documents = publication_vectorstore.search(
        query, k=k, search_type="similarity"
    )

    # NOTE(review): the instructions below talk about RESEARCH_ABSTRACTS while
    # the section header says RESEARCH_EXCERPTS; the text is left untouched
    # because it is part of the runtime prompt.
    prompt = (
        "You are an AI assistant who delights in helping people learn about research from the Design Research Collective, which is a research lab at Carnegie Mellon University led by Professor Chris McComb. "
        "Your main task is to provide a concise ANSWER to the USER_QUERY that includes as many of the RESEARCH_ABSTRACTS as possible. "
        "The RESEARCH_ABSTRACTS are provided in the `.bibtex` format. Your ANSWER should contain citations to the RESEARCH_ABSTRACTS using (AUTHOR, YEAR) format. "
        "DO NOT list references at the end of the answer.\n\n"
        "===== RESEARCH_EXCERPTS =====:\n{{EXCERPTS_GO_HERE}}\n\n"
        "===== USER_QUERY =====:\n{{QUERY_GOES_HERE}}\n\n"
        "===== ANSWER =====:\n"
    )

    # Wrap every retrieved document in quotes and ellipses
    research_excerpts = [
        '"... ' + document.page_content + '..."' for document in documents
    ]

    # Fill in the excerpts first, then the query
    prompt = prompt.replace("{{EXCERPTS_GO_HERE}}", "\n\n".join(research_excerpts))
    prompt = prompt.replace("{{QUERY_GOES_HERE}}", query)

    print(prompt)

    return prompt, ""


@spaces.GPU
def reply(
    message: str, history: list[list[str]]
) -> collections.abc.Iterator[str]:
    """
    This function is responsible for crafting a response

    It is a generator: it yields the response accumulated so far each time a
    new piece of text is accepted from the streamer, so the interface can
    render the answer while it is being generated.

    Args:
        message (str): The user's message
        history (list[list[str]]): The conversation history as
            [user, assistant] pairs; either entry of a pair may be None and
            is then skipped

    Yields:
        str: The AI's response so far; the last value is the complete
            response followed by a blank line and the references
    """

    # Apply preprocessing: turn the raw message into the full RAG prompt
    prompt, bypass = preprocess(message, PUBLICATIONS_TO_RETRIEVE)

    # This is some handling that is applied to the history variable to put it in a good format
    history_transformer_format = [
        {"role": role, "content": content}
        for message_pair in history
        for role, content in zip(("user", "assistant"), message_pair)
        if content is not None
    ] + [{"role": "user", "content": prompt}]

    # Stream a response from pipe
    text = tokenizer.apply_chat_template(
        history_transformer_format, tokenize=False, add_generation_prompt=True
    )
    # NOTE(review): the device is hard-coded even though the model is loaded
    # with device_map="auto" -- confirm they always agree.
    model_inputs = tokenizer([text], return_tensors="pt").to("cuda:0")
    generate_kwargs = dict(model_inputs, streamer=streamer, max_new_tokens=512)

    # Generation runs in a background thread so that this generator can
    # consume the streamer concurrently
    generation_thread = threading.Thread(
        target=chatmodel.generate, kwargs=generate_kwargs
    )
    generation_thread.start()

    partial_message = ""
    for new_token in streamer:
        # A lone "<" chunk is dropped from the output
        if new_token != "<":
            partial_message += new_token
            time.sleep(0.01)  # pace the delivery of the message
            yield partial_message

    # The streamer is exhausted, so generation should be complete; join the
    # worker so that it does not linger after the response is delivered
    generation_thread.join()

    yield partial_message + "\n\n" + bypass


# Create and run the gradio interface
gradio.ChatInterface(
    reply,
    examples=EXAMPLE_QUERIES,
    chatbot=gradio.Chatbot(
        show_label=False,
        show_share_button=False,
        show_copy_button=False,
        value=[[None, GREETING]],
        avatar_images=(
            "https://cdn.dribbble.com/users/316121/screenshots/2333676/11-04_scotty-plaid_dribbble.png",
            "https://media.thetab.com/blogs.dir/90/files/2021/06/screenshot-2021-06-10-at-110730-1024x537.png",
        ),
        height="60vh",
        bubble_full_width=False,
    ),
    retry_btn=None,
    undo_btn=None,
    clear_btn=None,
    theme=gradio.themes.Default(font=[gradio.themes.GoogleFont("Zilla Slab")]),
).launch(debug=True)