import streamlit as st
import PyPDF2
import io
import os
from dotenv import load_dotenv
from pinecone import Pinecone, ServerlessSpec
from openai import OpenAI
import uuid
import re
import time

# Load environment variables from .env file
load_dotenv()

# Initialize OpenAI client
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Pinecone configuration
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_ENVIRONMENT = os.getenv("PINECONE_ENVIRONMENT")
INDEX_NAME = "ghana"
EMBEDDING_MODEL = "text-embedding-3-large"
EMBEDDING_DIMENSION = 3072

# Initialize Pinecone
pc = Pinecone(api_key=PINECONE_API_KEY)

# Create the index if it does not exist yet
if INDEX_NAME not in pc.list_indexes().names():
    # PINECONE_ENVIRONMENT is assumed to be in the form '<cloud>-<region>',
    # e.g. 'aws-us-east-1'. Split on the first hyphen only, so multi-part
    # region names stay intact.
    cloud, region = PINECONE_ENVIRONMENT.split('-', 1)
    pc.create_index(
        name=INDEX_NAME,
        dimension=EMBEDDING_DIMENSION,
        metric="cosine",
        spec=ServerlessSpec(cloud=cloud, region=region)
    )
else:
    # Verify the existing index's dimension matches the embedding model
    existing_index = pc.describe_index(INDEX_NAME)
    if existing_index.dimension != EMBEDDING_DIMENSION:
        raise ValueError(
            f"Existing index '{INDEX_NAME}' has dimension {existing_index.dimension}, "
            f"expected {EMBEDDING_DIMENSION}. Please choose a different index name "
            f"or adjust accordingly."
        )

# Connect to the Pinecone index
index = pc.Index(INDEX_NAME)


def transcribe_pdf(pdf_file):
    print("Starting PDF transcription...")
    # Read the PDF and extract its text
    pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_file))
    text = ""
    for page in pdf_reader.pages:
        page_text = page.extract_text()
        if page_text:
            text += page_text + "\n"
    print(f"Extracted {len(text)} characters from PDF.")

    # Dynamic chunking
    chunks = dynamic_chunking(text, max_tokens=500, overlap=50)
    print(f"Created {len(chunks)} chunks from the extracted text.")

    # Process chunks one by one
    progress_bar = st.progress(0)
    for i, chunk in enumerate(chunks):
        print(f"Processing chunk {i+1}/{len(chunks)}...")

        # Generate an embedding for the chunk
        embedding = get_embedding(chunk)

        # Prepare upsert data
        upsert_data = [(str(uuid.uuid4()), embedding, {"text": chunk})]

        # Upsert to Pinecone
        print(f"Upserting vector to Pinecone index '{INDEX_NAME}'...")
        index.upsert(vectors=upsert_data)

        # Update the progress bar
        progress = (i + 1) / len(chunks)
        progress_bar.progress(progress)

        # Optional: small delay to avoid potential rate limits
        time.sleep(0.5)

    progress_bar.empty()
    return f"Successfully processed and upserted {len(chunks)} chunks to Pinecone index '{INDEX_NAME}'."


def dynamic_chunking(text, max_tokens=200, overlap=100):
    print(f"Starting dynamic chunking with max_tokens={max_tokens} and overlap={overlap}...")
    # Note: "tokens" here are whitespace-separated words, an approximation of
    # model tokens rather than an exact count (see the sketch below).
    tokens = re.findall(r'\S+', text)
    chunks = []
    start = 0
    while start < len(tokens):
        end = start + max_tokens
        chunk = ' '.join(tokens[start:end])
        chunks.append(chunk)
        # Step forward by max_tokens - overlap so consecutive chunks share context
        start += max_tokens - overlap
    print(f"Dynamic chunking complete. Created {len(chunks)} chunks.")
    return chunks
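
# --- Optional sketch: token-accurate chunking --------------------------------
# dynamic_chunking above splits on whitespace, so "max_tokens" is really a word
# count. This is a minimal alternative sketch, assuming the optional tiktoken
# package is installed (cl100k_base is the encoding used by the
# text-embedding-3 models); the import is local so the app still runs without
# it. The function name is illustrative and is not wired into the app above.
def token_chunking(text, max_tokens=500, overlap=50):
    import tiktoken  # optional dependency, assumed available

    encoding = tiktoken.get_encoding("cl100k_base")
    token_ids = encoding.encode(text)
    chunks = []
    start = 0
    while start < len(token_ids):
        # Decode each fixed-size window of token ids back into text
        chunks.append(encoding.decode(token_ids[start:start + max_tokens]))
        start += max_tokens - overlap
    return chunks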
def get_embedding(chunk):
    print("Generating embedding for chunk...")
    try:
        response = client.embeddings.create(
            input=chunk,  # A single string can be passed directly
            model=EMBEDDING_MODEL
        )
        embedding = response.data[0].embedding
        print("Successfully generated embedding.")
        return embedding
    except Exception as e:
        print(f"Error during embedding generation: {str(e)}")
        raise


def clear_database():
    print("Clearing the Pinecone index...")
    try:
        index.delete(delete_all=True)
        return "Successfully cleared all vectors from the Pinecone index."
    except Exception as e:
        print(f"Error clearing the Pinecone index: {str(e)}")
        return f"Error clearing the Pinecone index: {str(e)}"


def query_database(query_text):
    print(f"Querying database with: {query_text}")
    try:
        query_embedding = get_embedding(query_text)
        results = index.query(vector=query_embedding, top_k=5, include_metadata=True)

        # Concatenate the text of the top matches into a single context string
        context = ""
        for match in results['matches']:
            metadata = match.get('metadata', {})
            text = metadata.get('text', '')
            context += f"{text}\n\n"

        if not context:
            return "No relevant information found in the database."

        return generate_answer(query_text, context)
    except Exception as e:
        print(f"Error querying the database: {str(e)}")
        return f"Error querying the database: {str(e)}"


def generate_answer(query, context):
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are an assistant for the Ghana Labor Act. Use the provided context to answer the user's question accurately and concisely."},
                {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"}
            ]
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f"Error generating answer: {str(e)}")
        return f"Error generating answer: {str(e)}"


def generate_hr_document(prompt):
    print(f"Generating HR document with prompt: {prompt}")
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are an HR assistant. Generate a professional HR document based on the given prompt."},
                {"role": "user", "content": prompt}
            ]
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f"Error generating HR document: {str(e)}")
        return f"Error generating HR document: {str(e)}"


def main():
    st.set_page_config(page_title="HR Document Assistant", layout="wide")
    st.title("HR Document Assistant")

    tab1, tab2, tab3, tab4 = st.tabs([
        "📤 Upload PDF", "🔍 Query Database", "📝 Generate HR Document", "🗑️ Clear Database"
    ])

    with tab1:
        st.header("Upload PDF")
        st.write("Upload a PDF file to extract its text content, chunk it dynamically, and upsert the chunks to the Pinecone index.")
        pdf_file = st.file_uploader("Upload PDF", type="pdf")
        if st.button("📥 Transcribe and Upsert"):
            if pdf_file is not None:
                with st.spinner("Processing PDF..."):
                    result = transcribe_pdf(pdf_file.read())
                st.success(result)
            else:
                st.error("Please upload a PDF file first.")

    with tab2:
        st.header("Query Database")
        st.write("Enter a query about the Ghana Labor Act.")
        query = st.text_input("Enter your query", placeholder="What does the Act say about...?")
        if st.button("🔎 Get Answer"):
            if query:
                answer = query_database(query)
                st.markdown("### Answer:")
                st.write(answer)
            else:
                st.error("Please enter a query first.")

    with tab3:
        st.header("Generate HR Document")
        st.write("Enter a prompt to generate an HR document using OpenAI's gpt-4o-mini model.")
        prompt = st.text_area("Enter your prompt", placeholder="Describe the HR document you need...")
        if st.button("✍️ Generate Document"):
            if prompt:
                document = generate_hr_document(prompt)
                st.text_area("Generated Document", value=document, height=400)
            else:
                st.error("Please enter a prompt first.")

    with tab4:
        st.header("Clear Database")
        st.write("Use this option carefully. It will remove all data from the Pinecone index.")
        if st.button("🗑️ Clear Database"):
            result = clear_database()
            st.success(result)

    st.markdown("""
    ### 📌 Note
    - Ensure you have the necessary API keys set up for OpenAI and Pinecone.
    - The PDF upload process may take some time depending on the file size.
    - Generated HR documents are AI-generated and may require human review.
    """)


if __name__ == "__main__":
    main()
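
# --- Optional sketch: batched embedding and upsert ----------------------------
# transcribe_pdf above embeds and upserts one chunk per request. Both APIs
# accept batches: the OpenAI embeddings endpoint takes a list of inputs and
# returns embeddings in the same order, and Pinecone upsert accepts a list of
# vectors. A minimal sketch, assuming the `client`, `index`, and
# EMBEDDING_MODEL defined above (the function name and batch size are
# illustrative choices, not requirements of either API):
def upsert_chunks_batched(chunks, batch_size=100):
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i + batch_size]
        # One embeddings request for the whole batch; results keep input order
        response = client.embeddings.create(input=batch, model=EMBEDDING_MODEL)
        vectors = [
            (str(uuid.uuid4()), item.embedding, {"text": text})
            for item, text in zip(response.data, batch)
        ]
        # One upsert call per batch instead of one per chunk
        index.upsert(vectors=vectors)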