import os
from threading import Thread
from typing import Iterator
import gradio as gr
import spaces
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer, TextStreamer
from llama_index.core.prompts.prompts import SimpleInputPrompt
from llama_index.llms.huggingface import HuggingFaceLLM
from llama_index.legacy.embeddings.langchain import LangchainEmbedding
#from langchain.embeddings.huggingface import HuggingFaceEmbeddings # This import should now work
from langchain_huggingface import HuggingFaceEmbeddings
from sentence_transformers import SentenceTransformer
from llama_index.core import set_global_service_context, ServiceContext
from llama_index.core import VectorStoreIndex, download_loader, Document # Import Document
from pathlib import Path
import fitz # PyMuPDF
# Generation limits. Within this file they are referenced only by the disabled
# chat UI code that is kept inside a string literal further down.
MAX_MAX_NEW_TOKENS = 2048
DEFAULT_MAX_NEW_TOKENS = 512
# Overridable through the MAX_INPUT_TOKEN_LENGTH environment variable.
MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096"))
# Evaluates to the empty string (the backslash swallows the only newline).
# Not referenced anywhere else in the visible source.
DEFAULT_SYS_PROMPT = """\
"""
# Markdown rendered at the top of the page; a warning is appended to it at
# import time when no CUDA device is available.
DESCRIPTION = """\
# Test Chat Information System for MEPO 2024 courtesy of Dr. Dancy & THiCC Lab
Duplicated, then modified from [llama-2 7B example](https://huggingface.co/spaces/huggingface-projects/llama-2-7b-chat)
"""
# Markdown rendered at the bottom of the page.
LICENSE = """
---
As a derivate work of [Llama-2-7b-chat](https://huggingface.co/meta-llama/Llama-2-7b-chat) by Meta,
this demo is governed by the original [license](https://huggingface.co/spaces/huggingface-projects/llama-2-7b-chat/blob/main/LICENSE.txt) and [acceptable use policy](https://huggingface.co/spaces/huggingface-projects/llama-2-7b-chat/blob/main/USE_POLICY.md).
"""
# System prompt handed to HuggingFaceLLM and shown as the initial value of the
# "System Prompt" textbox; update_system_prompt() replaces it at runtime.
# NOTE(review): the bare "<>" markers look like Llama-2 "<<SYS>>" / "<</SYS>>"
# tags whose contents were stripped somewhere along the way -- confirm against
# the intended prompt template before relying on this value.
SYSTEM_PROMPT = """[INST] <>
<>"""
def read_pdf_to_documents(file_path):
    """Load a PDF and return one ``Document`` per page.

    Args:
        file_path: Location of the PDF; passed unchanged to ``fitz.open``.

    Returns:
        A list of ``Document`` objects in page order, each wrapping the text
        that ``page.get_text()`` extracted from the corresponding page.
    """
    # Opening the PDF as a context manager guarantees the file handle is
    # released even when text extraction raises; previously it was never
    # closed.
    with fitz.open(file_path) as pdf:
        return [Document(text=page.get_text()) for page in pdf]
# Function to update the global system prompt
def update_system_prompt(new_prompt):
    """Replace the module-level ``SYSTEM_PROMPT`` with ``new_prompt``.

    Args:
        new_prompt: The text to store as the new system prompt.

    Returns:
        The fixed status message ``"System prompt updated."``.
    """
    global SYSTEM_PROMPT
    SYSTEM_PROMPT = new_prompt
    # The previous version also built a SimpleInputPrompt here, but bound it
    # to a local name that was never read, so it had no effect and is dropped.
    return "System prompt updated."
@spaces.GPU(duration=240)
def query_model(question):
    """Answer ``question`` from the indexed PDF using the current system prompt.

    Reads the module-level ``SYSTEM_PROMPT``, ``query_wrapper_prompt``,
    ``model``, ``tokenizer``, ``embeddings`` and ``index``.

    Args:
        question: The user's question text.

    Returns:
        The ``response`` attribute of the query engine's result.
    """
    # Wrap the already-loaded model again so the wrapper picks up whatever
    # SYSTEM_PROMPT currently holds (it may have been changed through
    # update_system_prompt since import time).
    llm = HuggingFaceLLM(
        context_window=4096,
        max_new_tokens=256,
        system_prompt=SYSTEM_PROMPT,
        query_wrapper_prompt=query_wrapper_prompt,
        model=model,
        tokenizer=tokenizer,
    )
    service_context = ServiceContext.from_defaults(chunk_size=1024, llm=llm, embed_model=embeddings)
    set_global_service_context(service_context)
    # The module-level query_engine was created at import time around the LLM
    # wrapper that existed then, so querying it would ignore the wrapper built
    # above. Derive a fresh engine from the existing index so the current
    # system prompt is actually used; the index itself is not rebuilt.
    engine = index.as_query_engine(llm=llm)
    response = engine.query(question)
    return response.response
def format_paragraph(text: str, line_length: int = 80) -> str:
    """Greedily re-wrap ``text`` into newline-separated lines.

    The text is split on any whitespace, so existing line breaks and repeated
    spaces are discarded. Every word is charged its length plus one (for the
    separating space), and a word joins the current line only while the
    running total stays within ``line_length``. Words are never split, so a
    single word longer than the limit occupies a line of its own.

    Args:
        text: The text to wrap.
        line_length: Budget per line, counted as described above.

    Returns:
        The wrapped text; an empty string when ``text`` has no words.
    """
    lines = []
    current_line = []
    current_length = 0
    for word in text.split():
        word_cost = len(word) + 1
        # Only flush a line that actually holds words. Without this guard an
        # over-long first word produced a spurious empty leading line.
        if current_line and current_length + word_cost > line_length:
            lines.append(" ".join(current_line))
            current_line = []
            current_length = 0
        current_line.append(word)
        current_length += word_cost
    if current_line:
        lines.append(" ".join(current_line))
    return "\n".join(lines)
# NOTE(review): the indentation of this section was lost in the source this
# was reconstructed from. Every statement after the model load depends on
# `model`, so all of them are placed in the GPU branch -- confirm.
if torch.cuda.is_available():
    model_id = "meta-llama/Llama-2-7b-chat-hf"
    model = AutoModelForCausalLM.from_pretrained(
        model_id,
        torch_dtype=torch.float16,
        device_map="auto",
    )
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    tokenizer.use_default_system_prompt = False

    # Template that wraps each user query before it is sent to the model.
    query_wrapper_prompt = SimpleInputPrompt("{query_str} [/INST]")
    llm = HuggingFaceLLM(
        context_window=4096,
        max_new_tokens=256,
        system_prompt=SYSTEM_PROMPT,
        query_wrapper_prompt=query_wrapper_prompt,
        model=model,
        tokenizer=tokenizer,
    )
    embeddings = LangchainEmbedding(
        HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
    )

    # Register the LLM and embedding model globally before building the index.
    service_context = ServiceContext.from_defaults(
        chunk_size=1024,
        llm=llm,
        embed_model=embeddings,
    )
    set_global_service_context(service_context)

    # Index the PDF page by page and expose a query engine over it.
    file_path = Path("files/Full Pamplet.pdf")
    documents = read_pdf_to_documents(file_path)
    index = VectorStoreIndex.from_documents(documents)
    query_engine = index.as_query_engine()
else:
    # No CUDA device: nothing is loaded, the page description just says so.
    DESCRIPTION += "We won't be able to run this space! We need GPU processing"
# Tab 1: lets the user replace the system prompt used for later queries.
update_prompt_interface = gr.Interface(
    fn=update_system_prompt,
    inputs=gr.Textbox(
        lines=5,
        placeholder="Enter the system prompt here...",
        label="System Prompt",
        value=SYSTEM_PROMPT,
    ),
    outputs=gr.Textbox(label="Status"),
    title="System Prompt Updater",
    description="Update the system prompt used for context.",
)

# Tab 2: sends a question to query_model and shows the returned text.
query_interface = gr.Interface(
    fn=query_model,
    inputs=gr.Textbox(
        lines=2,
        placeholder="Enter your question here...",
        label="User Question",
    ),
    outputs=gr.Textbox(label="Response"),
    title="Document Query Assistant",
    description="Ask questions based on the content of the loaded pamphlet.",
)

# Both tabs bundled into a single tabbed component.
combined_interface = gr.TabbedInterface(
    interface_list=[update_prompt_interface, query_interface],
    tab_names=["Update System Prompt", "Query Assistant"],
)
# The combined interface is not launched on its own; it is rendered inside the
# gr.Blocks layout at the bottom of this file.
#
# The triple-quoted string below is a bare expression statement, so it has no
# effect at runtime. It holds a disabled streaming chat UI (a `generate`
# function plus a gr.ChatInterface), apparently kept for reference. Nothing in
# it is defined as real code: `generate` and `chat_interface` do not exist
# unless the string is turned back into code.
"""
@spaces.GPU(duration=240)
def generate(
message: str,
chat_history: list[tuple[str, str]],
system_prompt: str,
max_new_tokens: int = MAX_MAX_NEW_TOKENS,
temperature: float = 0.6,
top_p: float = 0.9,
top_k: int = 50,
repetition_penalty: float = 1.2,
) -> Iterator[str]:
conversation = []
if system_prompt:
conversation.append({"role": "system", "content": system_prompt})
for user, assistant in chat_history:
conversation.extend([{"role": "user", "content": user}, {"role": "assistant", "content": assistant}])
conversation.append({"role": "user", "content": message})
input_ids = tokenizer.apply_chat_template(conversation, return_tensors="pt")
if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH:
input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:]
gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.")
input_ids = input_ids.to(model.device)
streamer = TextIteratorStreamer(tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True)
generate_kwargs = dict(
{"input_ids": input_ids},
streamer=streamer,
max_new_tokens=max_new_tokens,
do_sample=True,
top_p=top_p,
top_k=top_k,
temperature=temperature,
num_beams=1,
repetition_penalty=repetition_penalty,
)
t = Thread(target=model.generate, kwargs=generate_kwargs)
t.start()
outputs = []
for text in streamer:
outputs.append(text)
yield "".join(outputs)
chat_interface = gr.ChatInterface(
fn=generate,
additional_inputs=[
gr.Textbox(label="System prompt", lines=6),
gr.Slider(
label="Max new tokens",
minimum=1,
maximum=MAX_MAX_NEW_TOKENS,
step=1,
value=DEFAULT_MAX_NEW_TOKENS,
),
gr.Slider(
label="Temperature",
minimum=0.1,
maximum=4.0,
step=0.1,
value=0.6,
),
gr.Slider(
label="Top-p (nucleus sampling)",
minimum=0.05,
maximum=1.0,
step=0.05,
value=0.9,
),
gr.Slider(
label="Top-k",
minimum=1,
maximum=1000,
step=1,
value=50,
),
gr.Slider(
label="Repetition penalty",
minimum=1.0,
maximum=2.0,
step=0.05,
value=1.2,
),
],
stop_btn=None,
examples=[
["Hello there! How are you doing?"],
["Can you explain briefly to me what is the Python programming language?"],
["Explain the plot of Cinderella in a sentence."],
["How many hours does it take a man to eat a Helicopter?"],
["Write a 100-word article on 'Benefits of Open-Source in AI research'"],
],
)
"""
# Page layout: description on top, the tabbed prompt/query UI in the middle,
# and the license notice at the bottom.
with gr.Blocks(css="style.css") as demo:
    gr.Markdown(DESCRIPTION)
    # Only the tabbed interface is rendered; the streaming chat UI is disabled.
    combined_interface.render()
    gr.Markdown(LICENSE)

if __name__ == "__main__":
    # Enable the request queue (max_size=20) and then start the app.
    queued_demo = demo.queue(max_size=20)
    queued_demo.launch()