import gradio as gr
import PyPDF2
import io
import os
from dotenv import load_dotenv
from pinecone import Pinecone, ServerlessSpec
from openai import OpenAI
import uuid
import re
import time

# Load environment variables from .env file
load_dotenv()

# Initialize OpenAI client
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Pinecone configuration
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_ENVIRONMENT = os.getenv("PINECONE_ENVIRONMENT")
INDEX_NAME = "ghana"
EMBEDDING_MODEL = "text-embedding-3-large"
EMBEDDING_DIMENSION = 3072

# Initialize Pinecone
pc = Pinecone(api_key=PINECONE_API_KEY)

# Check if the index exists
if INDEX_NAME not in pc.list_indexes().names():
    # Create the index with the embedding model's dimensions.
    # Assumes PINECONE_ENVIRONMENT is in the format '<cloud>-<region>', e.g. 'aws-us-east-1'.
    cloud, region = PINECONE_ENVIRONMENT.split('-', 1)
    pc.create_index(
        name=INDEX_NAME,
        dimension=EMBEDDING_DIMENSION,
        metric="cosine",
        spec=ServerlessSpec(cloud=cloud, region=region)
    )
else:
    # Verify the existing index's dimension matches the embedding model
    existing_index = pc.describe_index(INDEX_NAME)
    if existing_index.dimension != EMBEDDING_DIMENSION:
        raise ValueError(
            f"Existing index '{INDEX_NAME}' has dimension {existing_index.dimension}, "
            f"expected {EMBEDDING_DIMENSION}. Please choose a different index name or adjust accordingly."
        )

# Connect to the Pinecone index
index = pc.Index(INDEX_NAME)


def transcribe_pdf(pdf_file):
    print("Starting PDF transcription...")

    # Read PDF and extract text
    pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_file))
    text = ""
    for page in pdf_reader.pages:
        page_text = page.extract_text()
        if page_text:
            text += page_text + "\n"
    print(f"Extracted {len(text)} characters from PDF.")

    # Dynamic chunking
    chunks = dynamic_chunking(text, max_tokens=500, overlap=50)
    print(f"Created {len(chunks)} chunks from the extracted text.")

    # Process chunks one by one
    for i, chunk in enumerate(chunks):
        print(f"Processing chunk {i+1}/{len(chunks)}...")

        # Generate embedding for the chunk
        embedding = get_embedding(chunk)

        # Prepare upsert data
        upsert_data = [(str(uuid.uuid4()), embedding, {"text": chunk})]

        # Upsert to Pinecone
        print(f"Upserting vector to Pinecone index '{INDEX_NAME}'...")
        index.upsert(vectors=upsert_data)

        # Optional: add a small delay to avoid potential rate limits
        time.sleep(0.5)

    return f"Successfully processed and upserted {len(chunks)} chunks to Pinecone index '{INDEX_NAME}'."


def dynamic_chunking(text, max_tokens=500, overlap=50):
    print(f"Starting dynamic chunking with max_tokens={max_tokens} and overlap={overlap}...")
    # Split on whitespace; each whitespace-delimited word counts as one "token"
    tokens = re.findall(r'\S+', text)
    chunks = []
    start = 0
    while start < len(tokens):
        end = start + max_tokens
        chunk = ' '.join(tokens[start:end])
        chunks.append(chunk)
        start += max_tokens - overlap
    print(f"Dynamic chunking complete. Created {len(chunks)} chunks.")
    return chunks


def get_embedding(chunk):
    print("Generating embedding for chunk...")
    print(chunk)
    try:
        response = client.embeddings.create(
            input=chunk,  # A single string can be passed directly
            model=EMBEDDING_MODEL
        )
        embedding = response.data[0].embedding
        print("Successfully generated embedding.")
        return embedding
    except Exception as e:
        print(f"Error during embedding generation: {str(e)}")
        raise


def clear_database():
    print("Clearing the Pinecone index...")
    try:
        index.delete(delete_all=True)
        return "Successfully cleared all vectors from the Pinecone index."
    except Exception as e:
        print(f"Error clearing the Pinecone index: {str(e)}")
        return f"Error clearing the Pinecone index: {str(e)}"


# Create the Gradio app using Blocks
with gr.Blocks() as app:
    gr.Markdown("# PDF Transcription and Pinecone Database Management")

    with gr.Tab("Transcribe PDF"):
        gr.Markdown(
            "Upload a PDF file to extract its text content, chunk it dynamically, "
            "and upsert the chunks to a Pinecone index named 'ghana'."
        )
        pdf_input = gr.File(label="Upload PDF", type="binary")
        transcribe_button = gr.Button("Transcribe and Upsert")
        transcription_output = gr.Textbox(label="Transcription Result")
        transcribe_button.click(fn=transcribe_pdf, inputs=pdf_input, outputs=transcription_output)

    with gr.Tab("Clear Database"):
        gr.Markdown("Click the button to clear all vectors from the Pinecone index.")
        clear_button = gr.Button("Clear Database")
        clear_output = gr.Textbox(label="Clear Database Result")
        clear_button.click(fn=clear_database, outputs=clear_output)

if __name__ == "__main__":
    app.launch()
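
# For reference, a minimal .env file for this script could look like the lines below.
# This is a sketch, not part of the application: the variable names come from the
# os.getenv() calls above, the values are placeholders, and the
# PINECONE_ENVIRONMENT format ('<cloud>-<region>') is an assumption based on how
# the string is split when creating the serverless index.
#
#   OPENAI_API_KEY=sk-...
#   PINECONE_API_KEY=...
#   PINECONE_ENVIRONMENT=aws-us-east-1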