Spaces:
Runtime error
Runtime error
# Import necessary libraries
import PyPDF2 | |
from transformers import RagTokenizer, RagRetriever, RagSequenceForGeneration | |
from sentence_transformers import SentenceTransformer | |
import torch | |
from tkinter import Tk, filedialog | |
# Step 1: Ask for the PDF file via file upload dialog
def upload_file():
    """Open a file dialog and return the path of the PDF the user picks.

    Returns:
        The path string returned by ``filedialog.askopenfilename``.

    Raises:
        ValueError: If the dialog is closed without a file being chosen.
    """
    root = Tk()
    root.withdraw()  # Hide the empty root window; only the dialog is shown.
    try:
        file_path = filedialog.askopenfilename(
            title="Select PDF File",
            filetypes=[("PDF Files", "*.pdf")],
        )
    finally:
        # Tear down the hidden root so it does not linger for the rest of
        # the run, even if the dialog itself raises.
        root.destroy()
    if not file_path:
        raise ValueError("No file was selected. Please upload a PDF.")
    return file_path
print("Please select the PDF file containing the chapter.")
try:
    file_path = upload_file()
except Exception as e:
    print(f"Error: {e}")
    # Stop with a non-zero status so callers can tell the run failed;
    # SystemExit is used directly instead of the interactive `exit()` helper.
    raise SystemExit(1)
else:
    print(f"File selected: {file_path}")
# Step 2: Extract text from the PDF
def extract_text_from_pdf(file_path):
    """Return the text of every page of the PDF at ``file_path``.

    Page texts are joined with a newline so that the last word of one page
    and the first word of the next are not fused into a single token when
    the result is later split on whitespace.

    Args:
        file_path: Path to the PDF file to read.

    Returns:
        A single string containing the extracted text of all pages.
    """
    with open(file_path, "rb") as file:
        pdf_reader = PyPDF2.PdfReader(file)
        # Extract while the file is still open. `or ""` keeps a page that
        # yields no text from breaking the join.
        page_texts = [page.extract_text() or "" for page in pdf_reader.pages]
    return "\n".join(page_texts)
# Pull the chapter text out of the PDF chosen above.
chapter_text = extract_text_from_pdf(file_path=file_path)
print("Text extracted from the PDF successfully!")
# Step 3: Split the text into smaller passages
def split_text_into_chunks(text, chunk_size=500):
    """Split ``text`` into passages of at most ``chunk_size`` words each.

    Words are the whitespace-separated tokens of ``text``; each passage is
    those words re-joined with single spaces.

    Args:
        text: The text to split.
        chunk_size: Maximum number of words per passage. Must be >= 1.

    Returns:
        A list of passage strings (empty when ``text`` has no words).

    Raises:
        ValueError: If ``chunk_size`` is less than 1.
    """
    if chunk_size < 1:
        # A negative step would silently produce no passages at all.
        raise ValueError(
            f"chunk_size must be a positive integer, got {chunk_size}"
        )
    words = text.split()
    return [
        " ".join(words[start:start + chunk_size])
        for start in range(0, len(words), chunk_size)
    ]
passages = split_text_into_chunks(chapter_text)
if not passages:
    # A PDF with no extractable text (for example an image-only scan) gives
    # nothing to index; stop here rather than fail later during indexing.
    print("No text could be extracted from the PDF; nothing to index.")
    raise SystemExit(1)
print(f"Chapter split into {len(passages)} passages for RAG processing.")
# Step 4: Initialize the RAG model and tokenizer
device = "cuda" if torch.cuda.is_available() else "cpu"

# Load the RAG tokenizer and retriever.
tokenizer = RagTokenizer.from_pretrained("facebook/rag-token-nq")
# NOTE(review): the documented way to use index_name="custom" is to pass
# `indexed_dataset=` or `passages_path=`/`index_path=`. `passages=` does not
# appear to be a documented argument, and `use_dummy_dataset` appears to
# apply to the bundled wiki_dpr index -- confirm against the RagRetriever
# docs before relying on this call.
retriever = RagRetriever.from_pretrained(
    "facebook/rag-token-nq",
    index_name="custom",
    passages=passages,
    use_dummy_dataset=True,
)
# Attach the retriever to the model so generate() can fetch passages for a
# question; without it the model has no source of context documents.
model = RagSequenceForGeneration.from_pretrained(
    "facebook/rag-token-nq", retriever=retriever
).to(device)

# Step 5: Encode passages into embeddings for retrieval
sentence_model = SentenceTransformer("sentence-transformers/all-mpnet-base-v2")
passage_embeddings = sentence_model.encode(passages, convert_to_tensor=True)
# NOTE(review): `set_passages` could not be located in the retriever index
# documentation, and these embeddings come from a different encoder than the
# RAG question encoder -- confirm both before relying on retrieval quality.
retriever.index.set_passages(passages, passage_embeddings.cpu().detach().numpy())
print("Passages indexed successfully!")
# Step 6: Define a function to generate answers
def generate_answer(question, passages):
    """Generate an answer to ``question`` with the module-level RAG model.

    Args:
        question: The question text to answer.
        passages: Not used by this function; kept so existing callers that
            pass it keep working.

    Returns:
        The first decoded generation, with special tokens removed.
    """
    # Call the tokenizer directly: `prepare_seq2seq_batch` takes the source
    # text as `src_texts`, so the former `questions=` keyword did not supply it.
    inputs = tokenizer([question], return_tensors="pt").to(device)
    # Pass only the tensors generate() names explicitly, so extra tokenizer
    # outputs are not forwarded as generation keyword arguments.
    generated_ids = model.generate(
        input_ids=inputs["input_ids"],
        attention_mask=inputs["attention_mask"],
    )
    return tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
# Step 7: Interactive Question-Answering
print("\nChapter is ready. You can now ask questions!")
while True:
    try:
        user_question = input(
            "\nEnter your question (or type 'exit' to quit): "
        ).strip()
    except (EOFError, KeyboardInterrupt):
        # Closed stdin or Ctrl-C: leave the loop cleanly, not with a traceback.
        print("\nExiting the application. Thank you!")
        break
    if not user_question:
        # Nothing was typed; prompt again instead of querying the model.
        continue
    if user_question.lower() == "exit":
        print("Exiting the application. Thank you!")
        break
    try:
        answer = generate_answer(user_question, passages)
    except Exception as e:
        # Interactive boundary: report the failure and keep the session alive.
        print(f"An error occurred: {e}")
    else:
        print(f"Answer: {answer}")