quick-spin / test.py
DrishtiSharma's picture
Create test.py
0c77c36 verified
# to-do: Enable downloading multiple patent PDFs via corresponding links
import sys
import os
import re
import shutil
import time
import fitz
import streamlit as st
import nltk
import tempfile
import subprocess
# Pin NLTK to version 3.9.1
REQUIRED_NLTK_VERSION = "3.9.1"
subprocess.run([sys.executable, "-m", "pip", "install", f"nltk=={REQUIRED_NLTK_VERSION}"])
# Set up temporary directory for NLTK resources
nltk_data_path = os.path.join(tempfile.gettempdir(), "nltk_data")
os.makedirs(nltk_data_path, exist_ok=True)
nltk.data.path.append(nltk_data_path)
# Download 'punkt_tab' for compatibility
try:
print("Ensuring NLTK 'punkt_tab' resource is downloaded...")
nltk.download("punkt_tab", download_dir=nltk_data_path)
except Exception as e:
print(f"Error downloading NLTK 'punkt_tab': {e}")
raise e
sys.path.append(os.path.abspath("."))
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain.llms import OpenAI
from langchain.document_loaders import UnstructuredPDFLoader
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import NLTKTextSplitter
from patent_downloader import PatentDownloader
from langchain.document_loaders import PyMuPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
PERSISTED_DIRECTORY = tempfile.mkdtemp()
# Fetch API key securely from the environment
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
st.error("Critical Error: OpenAI API key not found in the environment variables. Please configure it.")
st.stop()
def check_poppler_installed():
if not shutil.which("pdfinfo"):
raise EnvironmentError(
"Poppler is not installed or not in PATH. Install 'poppler-utils' for PDF processing."
)
check_poppler_installed()
def load_docs(document_path):
"""
Load and clean the PDF content, then split into chunks.
"""
try:
import fitz # PyMuPDF for text extraction
# Step 1: Extract plain text from PDF
doc = fitz.open(document_path)
extracted_text = []
for page_num, page in enumerate(doc):
page_text = page.get_text("text") # Extract text
clean_page_text = clean_extracted_text(page_text)
if clean_page_text: # Keep only non-empty cleaned text
extracted_text.append(clean_page_text)
doc.close()
# Combine all pages into one text
full_text = "\n".join(extracted_text)
st.write(f"📄 Total Cleaned Text Length: {len(full_text)} characters")
# Step 2: Chunk the cleaned text
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=100,
separators=["\n\n", "\n", " ", ""]
)
split_docs = text_splitter.create_documents([full_text])
# Debug: Show total chunks count and first 3 chunks for verification
st.write(f"🔍 Total Chunks After Splitting: {len(split_docs)}")
for i, doc in enumerate(split_docs[:3]): # Show first 3 chunks only
st.write(f"Chunk {i + 1}: {doc.page_content[:300]}...")
return split_docs
except Exception as e:
st.error(f"Failed to load and process PDF: {e}")
st.stop()
def clean_extracted_text(text):
"""
Cleans extracted text to remove metadata, headers, and irrelevant content.
"""
lines = text.split("\n")
cleaned_lines = []
for line in lines:
line = line.strip()
# Filter out lines with metadata patterns
if (
re.match(r"^(U\.S\.|United States|Sheet|Figure|References|Patent No|Date of Patent)", line)
or re.match(r"^\(?\d+\)?$", line) # Matches single numbers (page numbers)
or "Examiner" in line
or "Attorney" in line
or len(line) < 30 # Skip very short lines
):
continue
cleaned_lines.append(line)
return "\n".join(cleaned_lines)
def already_indexed(vectordb, file_name):
indexed_sources = set(
x["source"] for x in vectordb.get(include=["metadatas"])["metadatas"]
)
return file_name in indexed_sources
def load_chain(file_name=None):
"""
Load cleaned PDF text, split into chunks, and update the vectorstore.
"""
loaded_patent = st.session_state.get("LOADED_PATENT")
# Debug: Show persist directory
st.write(f"🗂 Using Persisted Directory: {PERSISTED_DIRECTORY}")
vectordb = Chroma(
persist_directory=PERSISTED_DIRECTORY,
embedding_function=HuggingFaceEmbeddings(),
)
if loaded_patent == file_name or already_indexed(vectordb, file_name):
st.write("✅ Already indexed.")
else:
st.write("🔄 Starting document processing and vectorstore update...")
# Remove existing collection and load new docs
vectordb.delete_collection()
docs = load_docs(file_name)
# Update vectorstore
vectordb = Chroma.from_documents(
docs, HuggingFaceEmbeddings(), persist_directory=PERSISTED_DIRECTORY
)
vectordb.persist()
st.write("✅ Vectorstore successfully updated and persisted.")
# Save loaded patent in session state
st.session_state["LOADED_PATENT"] = file_name
# Debug: Check vectorstore indexing summary
indexed_docs = vectordb.get(include=["documents"])
st.write(f"✅ Total Indexed Documents: {len(indexed_docs['documents'])}")
# Test retrieval with a simple query
retriever = vectordb.as_retriever(search_kwargs={"k": 3})
test_query = "What is this document about?"
results = retriever.get_relevant_documents(test_query)
st.write("🔍 Test Retrieval Results for Query:")
if results:
for i, res in enumerate(results):
st.write(f"Retrieved Doc {i + 1}: {res.page_content[:200]}...")
else:
st.warning("No documents retrieved for test query.")
# Configure memory for conversation
memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)
return ConversationalRetrievalChain.from_llm(
OpenAI(temperature=0, openai_api_key=OPENAI_API_KEY),
retriever,
memory=memory
)
def extract_patent_number(url):
pattern = r"/patent/([A-Z]{2}\d+)"
match = re.search(pattern, url)
return match.group(1) if match else None
def download_pdf(patent_number):
try:
patent_downloader = PatentDownloader(verbose=True)
output_path = patent_downloader.download(patents=patent_number, output_path=tempfile.gettempdir())
return output_path[0]
except Exception as e:
st.error(f"Failed to download patent PDF: {e}")
st.stop()
def preview_pdf(pdf_path, scale_factor=0.5):
"""
Generate and display a resized preview of the first page of the PDF.
Args:
pdf_path (str): Path to the PDF file.
scale_factor (float): Factor to reduce the image size (default is 0.5).
Returns:
str: Path to the resized image preview.
"""
try:
# Open the PDF and extract the first page
doc = fitz.open(pdf_path)
first_page = doc[0]
# Apply scaling using a transformation matrix
matrix = fitz.Matrix(scale_factor, scale_factor) # Scale down the image
pix = first_page.get_pixmap(matrix=matrix) # Generate scaled image
# Save the preview image
temp_image_path = os.path.join(tempfile.gettempdir(), "pdf_preview.png")
pix.save(temp_image_path)
doc.close()
return temp_image_path
except Exception as e:
st.error(f"Error generating PDF preview: {e}")
return None
if __name__ == "__main__":
st.set_page_config(
page_title="Patent Chat: Google Patents Chat Demo",
page_icon="📖",
layout="wide",
initial_sidebar_state="expanded",
)
st.header("📖 Patent Chat: Google Patents Chat Demo")
# Input for Google Patent Link
patent_link = st.text_area(
"Enter Google Patent Link:",
value="https://patents.google.com/patent/US8676427B1/en",
height=90
)
# Initialize session state
for key in ["LOADED_PATENT", "pdf_preview", "loaded_pdf_path", "chain", "messages"]:
if key not in st.session_state:
st.session_state[key] = None
# Button to load and process patent
if st.button("Load and Process Patent"):
if not patent_link:
st.warning("Please enter a valid Google patent link.")
st.stop()
# Extract patent number
patent_number = extract_patent_number(patent_link)
if not patent_number:
st.error("Invalid patent link format.")
st.stop()
st.write(f"Patent number: **{patent_number}**")
# File handling
pdf_path = os.path.join(tempfile.gettempdir(), f"{patent_number}.pdf")
if not os.path.isfile(pdf_path):
with st.spinner("📥 Downloading patent file..."):
try:
pdf_path = download_pdf(patent_number)
st.write(f"✅ File downloaded: {pdf_path}")
except Exception as e:
st.error(f"Failed to download patent: {e}")
st.stop()
else:
st.write("✅ File already downloaded.")
# Generate PDF preview only if not already displayed
if not st.session_state.get("pdf_preview_displayed", False):
with st.spinner("🖼️ Generating PDF preview..."):
preview_image_path = preview_pdf(pdf_path, scale_factor=0.5)
if preview_image_path:
st.session_state.pdf_preview = preview_image_path
st.image(preview_image_path, caption="First Page Preview", use_container_width=False)
st.session_state["pdf_preview_displayed"] = True
else:
st.warning("Failed to generate PDF preview.")
st.session_state.pdf_preview = None
# Load the document into the system
with st.spinner("🔄 Loading document into the system..."):
try:
st.session_state.chain = load_chain(pdf_path)
st.session_state.LOADED_PATENT = patent_number
st.session_state.loaded_pdf_path = pdf_path
st.session_state.messages = [{"role": "assistant", "content": "Hello! How can I assist you with this patent?"}]
st.success("🚀 Document successfully loaded! You can now start asking questions.")
except Exception as e:
st.error(f"Failed to load the document: {e}")
st.stop()
# Display previous chat messages
if st.session_state.messages:
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
# User input for questions
if st.session_state.chain:
if user_input := st.chat_input("What is your question?"):
# User message
st.session_state.messages.append({"role": "user", "content": user_input})
with st.chat_message("user"):
st.markdown(user_input)
# Assistant response
with st.chat_message("assistant"):
message_placeholder = st.empty()
full_response = ""
with st.spinner("Generating response..."):
try:
# Generate response using the chain
assistant_response = st.session_state.chain({"question": user_input})
full_response = assistant_response.get("answer", "I'm sorry, I couldn't process that question.")
except Exception as e:
full_response = f"An error occurred: {e}"
message_placeholder.markdown(full_response)
st.session_state.messages.append({"role": "assistant", "content": full_response})
else:
st.info("Press the 'Load and Process Patent' button to start processing.")