"""Gradio Space that answers questions about a PDF pamphlet.

A Llama-2 chat model (via ``transformers``) is wrapped in a LlamaIndex
``HuggingFaceLLM``; the pamphlet is read page by page with PyMuPDF, indexed
in a ``VectorStoreIndex``, and exposed through two Gradio tabs: one to edit
the system prompt and one to ask questions.
"""

# NOTE: the import order below is kept exactly as in the original file.
import os
from threading import Thread
from typing import Iterator

import gradio as gr
import spaces
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer, TextStreamer
from llama_index.core.prompts.prompts import SimpleInputPrompt
from llama_index.llms.huggingface import HuggingFaceLLM
from llama_index.legacy.embeddings.langchain import LangchainEmbedding
#from langchain.embeddings.huggingface import HuggingFaceEmbeddings
# This import should now work
from langchain_huggingface import HuggingFaceEmbeddings
from sentence_transformers import SentenceTransformer
from llama_index.core import set_global_service_context, ServiceContext
from llama_index.core import VectorStoreIndex, download_loader, Document  # Import Document
from pathlib import Path
import fitz  # PyMuPDF

MAX_MAX_NEW_TOKENS = 2048
DEFAULT_MAX_NEW_TOKENS = 512
MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096"))

DEFAULT_SYS_PROMPT = """\
"""

DESCRIPTION = """\
# Test Chat Information System for MEPO 2024 courtesy of Dr. Dancy & THiCC Lab
Duplicated, then modified from [llama-2 7B example](https://huggingface.co/spaces/huggingface-projects/llama-2-7b-chat)
"""

LICENSE = """

---
As a derivate work of [Llama-2-7b-chat](https://huggingface.co/meta-llama/Llama-2-7b-chat) by Meta,
this demo is governed by the original [license](https://huggingface.co/spaces/huggingface-projects/llama-2-7b-chat/blob/main/LICENSE.txt) and [acceptable use policy](https://huggingface.co/spaces/huggingface-projects/llama-2-7b-chat/blob/main/USE_POLICY.md).
"""

# NOTE(review): the "<>" pairs look like Llama-2 "<<SYS>>" / "<</SYS>>" markers
# whose contents were stripped somewhere upstream -- confirm against the
# deployed file before relying on this default.
SYSTEM_PROMPT = """[INST] <> <>"""

# Wrapper applied around every user query before it is sent to the model.
query_wrapper_prompt = SimpleInputPrompt("{query_str} [/INST]")

# These are filled in by the GPU start-up branch further down.  They default
# to None so that query_model can report a clear error instead of a NameError
# when the Space is started without CUDA.
model = None
tokenizer = None
embeddings = None
index = None
query_engine = None


def read_pdf_to_documents(file_path):
    """Read a PDF and return one LlamaIndex ``Document`` per page.

    Args:
        file_path: Path of the PDF file to open with PyMuPDF.

    Returns:
        A list of ``Document`` objects, in page order, each holding the text
        extracted from one page.
    """
    # The context manager closes the PDF handle once the text is extracted.
    with fitz.open(file_path) as pdf:
        return [Document(text=page.get_text()) for page in pdf]


# Function to update the global system prompt
def update_system_prompt(new_prompt):
    """Replace the module-level ``SYSTEM_PROMPT`` used for later queries.

    Args:
        new_prompt: The new system prompt text.

    Returns:
        A fixed status string shown in the Gradio "Status" box.
    """
    global SYSTEM_PROMPT
    SYSTEM_PROMPT = new_prompt
    return "System prompt updated."


@spaces.GPU(duration=240)
def query_model(question):
    """Answer *question* from the indexed pamphlet using the current prompt.

    A fresh ``HuggingFaceLLM`` is built on every call so that the latest
    ``SYSTEM_PROMPT`` is used, and the query engine is rebuilt from the
    existing index afterwards: an engine created at start-up keeps the LLM it
    was built with and would otherwise ignore prompt updates.

    Args:
        question: The user's question text.

    Returns:
        The response text produced by the query engine.

    Raises:
        gr.Error: If the model/index were never loaded (no GPU at start-up).
    """
    if index is None or model is None:
        raise gr.Error("The model and document index are not loaded; this Space needs a GPU.")

    llm = HuggingFaceLLM(
        context_window=4096,
        max_new_tokens=256,
        system_prompt=SYSTEM_PROMPT,
        query_wrapper_prompt=query_wrapper_prompt,
        model=model,
        tokenizer=tokenizer,
    )
    service_context = ServiceContext.from_defaults(chunk_size=1024, llm=llm, embed_model=embeddings)
    set_global_service_context(service_context)

    # Build the engine only after the global context has been refreshed, so
    # it is created against the LLM carrying the current system prompt.
    engine = index.as_query_engine()
    response = engine.query(question)
    # formatted_response = format_paragraph(response.response)
    return response.response


def format_paragraph(text, line_length=80):
    """Greedily re-wrap *text* into lines joined by newlines.

    Words are split on whitespace and packed onto a line while the running
    cost (each word's length plus one for its separator) stays within
    ``line_length``.  Words longer than the limit are placed on a line of
    their own and never broken.

    Args:
        text: The text to wrap.
        line_length: Budget for a line's running cost.

    Returns:
        The wrapped text; an empty string when *text* has no words.
    """
    lines = []
    current_line = []
    current_length = 0
    for word in text.split():
        cost = len(word) + 1  # the word plus one separating space
        # Flush only a non-empty line; otherwise an over-long first word
        # would produce a spurious empty line at the top of the output.
        if current_line and current_length + cost > line_length:
            lines.append(" ".join(current_line))
            current_line = []
            current_length = 0
        current_line.append(word)
        current_length += cost
    if current_line:
        lines.append(" ".join(current_line))
    return "\n".join(lines)


# NOTE(review): the original indentation was lost; everything that needs the
# model is placed inside the GPU branch and the Gradio interfaces outside it.
if not torch.cuda.is_available():
    DESCRIPTION += "We won't be able to run this space! \n We need GPU processing"
else:
    llama_model_id = "meta-llama/Llama-2-7b-chat-hf"
    model = AutoModelForCausalLM.from_pretrained(llama_model_id, torch_dtype=torch.float16, device_map="auto")
    tokenizer = AutoTokenizer.from_pretrained(llama_model_id)
    tokenizer.use_default_system_prompt = False

    # Throw together the query wrapper
    query_wrapper_prompt = SimpleInputPrompt("{query_str} [/INST]")
    llm = HuggingFaceLLM(
        context_window=4096,
        max_new_tokens=256,
        system_prompt=SYSTEM_PROMPT,
        query_wrapper_prompt=query_wrapper_prompt,
        model=model,
        tokenizer=tokenizer,
    )
    embeddings = LangchainEmbedding(HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2"))
    service_context = ServiceContext.from_defaults(chunk_size=1024, llm=llm, embed_model=embeddings)
    set_global_service_context(service_context)

    print(os.listdir())
    file_path = Path("files/Full_Pamplet.pdf")
    documents = read_pdf_to_documents(file_path)
    index = VectorStoreIndex.from_documents(documents)
    # Kept as a module-level name for compatibility; query_model builds its
    # own engine per call.
    query_engine = index.as_query_engine()

update_prompt_interface = gr.Interface(
    fn=update_system_prompt,
    inputs=gr.Textbox(
        lines=5,
        placeholder="Enter the system prompt here...",
        label="System Prompt",
        value=SYSTEM_PROMPT,
    ),
    outputs=gr.Textbox(label="Status"),
    title="System Prompt Updater",
    description="Update the system prompt used for context.",
)

# Create Gradio interface for querying the model
query_interface = gr.Interface(
    fn=query_model,
    inputs=gr.Textbox(lines=2, placeholder="Enter your question here...", label="User Question"),
    outputs=gr.Textbox(label="Response"),
    title="Document Query Assistant",
    description="Ask questions based on the content of the loaded pamphlet.",
)

# Combine the interfaces
combined_interface = gr.TabbedInterface(
    [update_prompt_interface, query_interface],
    ["Update System Prompt", "Query Assistant"],
)

# Launch the combined interface
#combined_interface.launch()

# The streaming chat UI below is disabled by keeping it inside a bare string
# literal, exactly as in the original file.
"""
@spaces.GPU(duration=240)
def generate(
    message: str,
    chat_history: list[tuple[str, str]],
    system_prompt: str,
    max_new_tokens: int = MAX_MAX_NEW_TOKENS,
    temperature: float = 0.6,
    top_p: float = 0.9,
    top_k: int = 50,
    repetition_penalty: float = 1.2,
) -> Iterator[str]:
    conversation = []
    if system_prompt:
        conversation.append({"role": "system", "content": system_prompt})
    for user, assistant in chat_history:
        conversation.extend([{"role": "user", "content": user}, {"role": "assistant", "content": assistant}])
    conversation.append({"role": "user", "content": message})

    input_ids = tokenizer.apply_chat_template(conversation, return_tensors="pt")
    if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH:
        input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:]
        gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.")
    input_ids = input_ids.to(model.device)

    streamer = TextIteratorStreamer(tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True)
    generate_kwargs = dict(
        {"input_ids": input_ids},
        streamer=streamer,
        max_new_tokens=max_new_tokens,
        do_sample=True,
        top_p=top_p,
        top_k=top_k,
        temperature=temperature,
        num_beams=1,
        repetition_penalty=repetition_penalty,
    )
    t = Thread(target=model.generate, kwargs=generate_kwargs)
    t.start()

    outputs = []
    for text in streamer:
        outputs.append(text)
        yield "".join(outputs)


chat_interface = gr.ChatInterface(
    fn=generate,
    additional_inputs=[
        gr.Textbox(label="System prompt", lines=6),
        gr.Slider(
            label="Max new tokens",
            minimum=1,
            maximum=MAX_MAX_NEW_TOKENS,
            step=1,
            value=DEFAULT_MAX_NEW_TOKENS,
        ),
        gr.Slider(
            label="Temperature",
            minimum=0.1,
            maximum=4.0,
            step=0.1,
            value=0.6,
        ),
        gr.Slider(
            label="Top-p (nucleus sampling)",
            minimum=0.05,
            maximum=1.0,
            step=0.05,
            value=0.9,
        ),
        gr.Slider(
            label="Top-k",
            minimum=1,
            maximum=1000,
            step=1,
            value=50,
        ),
        gr.Slider(
            label="Repetition penalty",
            minimum=1.0,
            maximum=2.0,
            step=0.05,
            value=1.2,
        ),
    ],
    stop_btn=None,
    examples=[
        ["Hello there! How are you doing?"],
        ["Can you explain briefly to me what is the Python programming language?"],
        ["Explain the plot of Cinderella in a sentence."],
        ["How many hours does it take a man to eat a Helicopter?"],
        ["Write a 100-word article on 'Benefits of Open-Source in AI research'"],
    ],
)
"""

with gr.Blocks(css="style.css") as demo:
    gr.Markdown(DESCRIPTION)
    #chat_interface.render()
    combined_interface.render()
    gr.Markdown(LICENSE)

if __name__ == "__main__":
    demo.queue(max_size=20).launch(share=True)