paper-central / paper_chat_tab.py
IAMJB's picture
revert
64632c4
raw
history blame
14.1 kB
import gradio as gr
from PyPDF2 import PdfReader
from bs4 import BeautifulSoup
import openai
import traceback
import requests
from io import BytesIO
from transformers import AutoTokenizer
import json
import os
from openai import OpenAI
# Tokenizers that have already been loaded, so each one is built only once
# per process. Keys are "<provider>_<model>" strings.
tokenizer_cache = {}

# Registry of the inference providers offered in the UI. Insertion order
# matters: the first entry is the provider selected by default.
#   name              display name of the provider
#   logo              URL of the image shown next to the model selector
#   endpoint          base URL handed to the OpenAI-compatible client
#   api_key_env_var   environment variable holding the fallback API key
#   models            model identifiers selectable for this provider
#   type              chatbot message format stored in the UI state
#   max_total_tokens  token budget, kept as a string and converted with
#                     int() where it is consumed
PROVIDERS = {
    "SambaNova": dict(
        name="SambaNova",
        logo="https://venturebeat.com/wp-content/uploads/2020/02/SambaNovaLogo_H_F.jpg",
        endpoint="https://api.sambanova.ai/v1/",
        api_key_env_var="SAMBANOVA_API_KEY",
        models=[
            "Meta-Llama-3.1-70B-Instruct",
            # Add more models if needed
        ],
        type="tuples",
        max_total_tokens="50000",
    ),
    "Hyperbolic": dict(
        name="hyperbolic",
        logo="https://www.nftgators.com/wp-content/uploads/2024/07/Hyperbolic.jpg",
        endpoint="https://api.hyperbolic.xyz/v1",
        api_key_env_var="HYPERBOLIC_API_KEY",
        models=[
            "meta-llama/Meta-Llama-3.1-405B-Instruct",
        ],
        type="tuples",
        max_total_tokens="50000",
    ),
}
def fetch_paper_info_neurips(paper_id, timeout=30):
    """Fetch a paper's title and authors from its OpenReview forum page.

    Args:
        paper_id: OpenReview forum identifier of the paper.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        A Markdown preamble (linked title followed by the author list), or
        ``None`` when the page could not be retrieved (network failure,
        timeout, or a non-200 status code).
    """
    try:
        # `params` lets requests URL-encode the identifier; the timeout keeps
        # a stalled connection from blocking the UI callback forever.
        response = requests.get(
            "https://openreview.net/forum",
            params={"id": paper_id},
            timeout=timeout,
        )
    except requests.RequestException as e:
        # Callers treat None as "paper not found", so report and degrade
        # instead of letting the network error escape.
        print(f"An error occurred: {e}")
        return None
    if response.status_code != 200:
        return None

    soup = BeautifulSoup(response.content, 'html.parser')

    # Extract title
    title_tag = soup.find('h2', class_='citation_title')
    title = title_tag.get_text(strip=True) if title_tag else 'Title not found'

    # Extract authors
    author_div = soup.find('div', class_='forum-authors')
    authors = (
        [tag.get_text(strip=True) for tag in author_div.find_all('a')]
        if author_div
        else []
    )
    author_list = ', '.join(authors) if authors else 'Authors not found'

    # Construct preamble in Markdown
    return f"**[{title}](https://openreview.net/forum?id={paper_id})**\n\n{author_list}\n\n"
def fetch_paper_content(paper_id, timeout=60):
    """Download a paper's PDF from OpenReview and return its extracted text.

    Args:
        paper_id: OpenReview identifier of the paper.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        The concatenated text of all pages (untruncated; any truncation is
        left to the caller), or ``None`` if the download or the PDF parsing
        failed.
    """
    try:
        # `params` URL-encodes the identifier; the timeout prevents a stalled
        # download from hanging the caller indefinitely.
        response = requests.get(
            "https://openreview.net/pdf",
            params={"id": paper_id},
            timeout=timeout,
        )
        response.raise_for_status()  # Raise an exception for HTTP errors

        reader = PdfReader(BytesIO(response.content))

        # A page may yield no text; `or ""` keeps one such page from
        # discarding the whole paper. join() also avoids repeated string
        # concatenation.
        return "".join(page.extract_text() or "" for page in reader.pages)
    except Exception as e:
        # Deliberately broad: this is a best-effort boundary covering both
        # network and PDF-parsing failures; callers only check for None.
        print(f"An error occurred: {e}")
        return None
def _get_tokenizer(provider_name, model_name):
    """Return the tokenizer used for token counting, loading it on first use."""
    tokenizer_key = f"{provider_name}_{model_name}"
    if tokenizer_key not in tokenizer_cache:
        # Placeholder: the same Llama tokenizer is loaded whatever the
        # provider/model, so the resulting counts are an approximation.
        tokenizer_cache[tokenizer_key] = AutoTokenizer.from_pretrained(
            "meta-llama/Llama-3.2-1B-Instruct",
            token=os.environ.get("HF_TOKEN"),
        )
    return tokenizer_cache[tokenizer_key]


def _build_messages_within_budget(tokenizer, paper_text, history, message, max_total_tokens):
    """Assemble the chat messages and trim them to the token budget.

    The paper context is shortened (or dropped) first; if the conversation
    alone still exceeds the budget, the oldest messages are removed while
    always keeping at least the newest one.

    Args:
        tokenizer: Object exposing ``encode(text)`` and ``decode(ids, ...)``.
        paper_text: Full paper text, or a falsy value when none is loaded.
        history: Iterable of ``(user_msg, assistant_msg)`` pairs.
        message: The new user message.
        max_total_tokens: Maximum number of tokens allowed in the request.

    Returns:
        The list of ``{"role", "content"}`` dicts to send, starting with a
        system message when some paper context survived the trimming.
    """
    if paper_text:
        context = f"The discussion is about the following paper:\n{paper_text}\n\n"
    else:
        context = ""
    context_tokens = tokenizer.encode(context)

    messages = []
    token_counts = []

    def add(role, content):
        messages.append({"role": role, "content": content})
        token_counts.append(len(tokenizer.encode(content)))

    for user_msg, assistant_msg in history:
        # A history pair may lack a user part; encoding None would raise.
        if user_msg is not None:
            add("user", user_msg)
        if assistant_msg:
            add("assistant", assistant_msg)
    add("user", message)

    conversation_tokens = sum(token_counts)
    total_tokens = len(context_tokens) + conversation_tokens

    if total_tokens > max_total_tokens:
        # Give the context whatever room the conversation leaves over.
        available_tokens = max_total_tokens - conversation_tokens
        if available_tokens > 0:
            # encode() may have added special tokens (e.g. a BOS marker);
            # skip them so they do not appear as literal text in the prompt.
            context = tokenizer.decode(
                context_tokens[:available_tokens],
                skip_special_tokens=True,
            )
            total_tokens = conversation_tokens + available_tokens
        else:
            # Not enough space for context; remove it
            context = ""
            total_tokens = conversation_tokens

    # If the conversation alone is still too long, drop the oldest messages
    # but always keep the newest one.
    dropped = 0
    while total_tokens > max_total_tokens and len(messages) - dropped > 1:
        total_tokens -= token_counts[dropped]
        dropped += 1

    final_messages = []
    if context:
        final_messages.append({"role": "system", "content": context})
    final_messages.extend(messages[dropped:])
    return final_messages


def create_chat_interface(provider_dropdown, model_dropdown, paper_content, hf_token_input, default_type,
                          provider_max_total_tokens):
    """Create the Gradio chat interface that discusses the loaded paper.

    Args:
        provider_dropdown: Component supplying the selected provider name.
        model_dropdown: Component supplying the selected model name.
        paper_content: State holding the paper text (or None).
        hf_token_input: Textbox holding an optional user-supplied API token.
        default_type: Accepted for interface compatibility; not read here.
            The interface always uses the "tuples" format because the
            handler iterates the history as (user, assistant) pairs.
        provider_max_total_tokens: State holding the provider token budget.

    Returns:
        The constructed ``gr.ChatInterface``.
    """

    def get_fn(message, history, paper_content_value, hf_token_value, provider_name_value, model_name_value,
               max_total_tokens):
        """Stream the model's answer, yielding the accumulated text so far.

        Raises:
            ValueError: If neither a user token nor the provider's
                environment variable supplies an API key.
        """
        provider_info = PROVIDERS[provider_name_value]

        # Use the user's token if given, else the provider's key from the
        # environment; fail before doing any tokenization work.
        api_key = hf_token_value or os.environ.get(provider_info['api_key_env_var'])
        if not api_key:
            raise ValueError("API token is not provided.")

        tokenizer = _get_tokenizer(provider_name_value, model_name_value)
        final_messages = _build_messages_within_budget(
            tokenizer,
            paper_content_value,
            history,
            message,
            int(max_total_tokens),
        )

        # Initialize the OpenAI client with the provider's endpoint
        client = OpenAI(
            base_url=provider_info['endpoint'],
            api_key=api_key,
        )
        try:
            completion = client.chat.completions.create(
                model=model_name_value,
                messages=final_messages,
                stream=True,
            )
            response_text = ""
            for chunk in completion:
                # Some stream chunks carry no choices; indexing them would
                # raise and replace the partial answer with an error string.
                if not chunk.choices:
                    continue
                response_text += chunk.choices[0].delta.content or ""
                yield response_text
        except json.JSONDecodeError as e:
            print("Failed to decode JSON during the completion creation process.")
            print(f"Error Message: {e.msg}")
            print(f"Error Position: Line {e.lineno}, Column {e.colno} (Character {e.pos})")
            print(f"Problematic JSON Data: {e.doc}")
            yield f"{e.doc}"
        except openai.OpenAIError as openai_err:
            # Handle other OpenAI-related errors
            print(f"An OpenAI error occurred: {openai_err}")
            yield f"{openai_err}"
        except Exception as ex:
            # Last-resort boundary: show the error in the chat instead of
            # crashing the UI callback.
            print(f"An unexpected error occurred: {ex}")
            yield f"{ex}"

    # Create the ChatInterface
    chat_interface = gr.ChatInterface(
        fn=get_fn,
        chatbot=gr.Chatbot(
            label="Chatbot",
            scale=1,
            height=400,
            autoscroll=True,
        ),
        additional_inputs=[paper_content, hf_token_input, provider_dropdown, model_dropdown, provider_max_total_tokens],
        type="tuples",
    )
    return chat_interface
def paper_chat_tab(paper_id):
    """Build the "chat with a paper" tab inside the current Gradio context.

    Lays out the paper header, the provider/model selectors, the optional
    API-token box and the chat interface, and wires the events that keep
    them in sync.

    Args:
        paper_id: Gradio component whose value is the OpenReview paper ID;
            a change of its value reloads the paper.
    """

    def _logo_markup(logo_url):
        # HTML snippet displaying a provider logo.
        return f'<img src="{logo_url}" width="100px" />'

    def _provider_note(provider):
        # Markdown line naming the provider behind the selected model.
        return f"**Note:** This model is supported by {provider}."

    def _token_field_texts(provider):
        # Label and placeholder of the API-token textbox for a provider.
        return {
            "label": f"Enter your {provider} API token (optional)",
            "placeholder": f"Enter your {provider} API token to avoid rate limits",
        }

    with gr.Column():
        # Paper title and authors (filled in once a paper is loaded)
        content = gr.Markdown(value="")

        # Hint shown to the user
        gr.Markdown("**Note:** Providing your own API token can help you avoid rate limits.")

        # The first registered provider is the initial selection
        provider_names = list(PROVIDERS)
        default_provider = provider_names[0]
        initial = PROVIDERS[default_provider]

        default_type = gr.State(value=initial["type"])
        default_max_total_tokens = gr.State(value=initial["max_total_tokens"])

        provider_dropdown = gr.Dropdown(
            label="Select Provider",
            choices=provider_names,
            value=default_provider,
        )

        hf_token_input = gr.Textbox(
            type="password",
            **_token_field_texts(default_provider),
        )

        # Model selector
        model_dropdown = gr.Dropdown(
            label="Select Model",
            choices=initial['models'],
            value=initial['models'][0],
        )

        # Provider logo
        logo_html = gr.HTML(value=_logo_markup(initial["logo"]))

        # Provider note
        note_markdown = gr.Markdown(_provider_note(default_provider))

        # Holds the extracted paper text
        paper_content = gr.State()

        def update_provider(selected_provider):
            """Return the UI updates matching the newly selected provider."""
            info = PROVIDERS[selected_provider]
            available_models = info['models']
            return (
                gr.update(choices=available_models, value=available_models[0]),
                gr.update(value=_logo_markup(info['logo'])),
                gr.update(value=_provider_note(selected_provider)),
                gr.update(**_token_field_texts(selected_provider)),
                info['type'],
                info['max_total_tokens'],
            )

        provider_dropdown.change(
            fn=update_provider,
            inputs=provider_dropdown,
            outputs=[model_dropdown, logo_html, note_markdown, hf_token_input, default_type, default_max_total_tokens],
            queue=False,
        )

        def update_paper_info(paper_id_value, selected_model):
            """Fetch the paper header and full text for the given paper ID."""
            header = fetch_paper_info_neurips(paper_id_value)
            full_text = fetch_paper_content(paper_id_value)
            if header is None:
                header = "Paper not found or could not retrieve paper information."
            # full_text stays None when the PDF could not be read.
            return header, full_text

        # Reload the paper whenever the paper ID or the model changes
        paper_id.change(
            fn=update_paper_info,
            inputs=[paper_id, model_dropdown],
            outputs=[content, paper_content],
        )

        model_dropdown.change(
            fn=update_paper_info,
            inputs=[paper_id, model_dropdown],
            outputs=[content, paper_content],
            queue=False,
        )

        # Chat interface
        create_chat_interface(
            provider_dropdown,
            model_dropdown,
            paper_content,
            hf_token_input,
            default_type,
            default_max_total_tokens,
        )
def main():
    """Build a small demo app around the paper chat tab and launch it.

    A button stores a fixed sample paper ID in a state component; the chat
    tab listens to that state and loads the corresponding paper.
    """
    with gr.Blocks(css_paths="style.css") as demo:
        # Holds the current paper ID; empty until the button is pressed.
        paper_id_state = gr.State(value="")

        def update_state():
            """Return the sample paper ID to store in the state."""
            return "5G7ve8E1Lu"

        with gr.Row():
            trigger = gr.Button("Update State")

        # Pressing the button writes the sample ID into the state.
        trigger.click(update_state, inputs=[], outputs=[paper_id_state])

        paper_chat_tab(paper_id_state)

    demo.launch(ssr_mode=False)


# Entry point when executed as a script
if __name__ == "__main__":
    main()