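"""HR Document Assistant.

A Streamlit app that (1) extracts text from uploaded PDFs, chunks it, and
upserts embeddings to a Pinecone index; (2) answers questions about the
Ghana Labor Act from the stored chunks; (3) generates HR documents with
gpt-4o-mini; and (4) can clear the index.

Run with (assuming this file is saved as app.py):
    streamlit run app.py
"""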
import streamlit as st
import PyPDF2
import io
import os
from dotenv import load_dotenv
from pinecone import Pinecone, ServerlessSpec
from openai import OpenAI
import uuid
import re
import time

# Load environment variables from .env file
load_dotenv()
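# Expected .env keys (names taken from the os.getenv calls below):
#   OPENAI_API_KEY=...
#   PINECONE_API_KEY=...
#   PINECONE_ENVIRONMENT=...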
# Initialize OpenAI client
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Pinecone configuration
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_ENVIRONMENT = os.getenv("PINECONE_ENVIRONMENT")
INDEX_NAME = "ghana"
EMBEDDING_MODEL = "text-embedding-3-large"
EMBEDDING_DIMENSION = 3072  # native output size of text-embedding-3-large

# Initialize Pinecone
pc = Pinecone(api_key=PINECONE_API_KEY)
# Create the index if it does not already exist
if INDEX_NAME not in pc.list_indexes().names():
    # Assumes PINECONE_ENVIRONMENT is in '<cloud>-<region>' format (e.g. 'gcp-starter');
    # split on the first hyphen only so regions that contain hyphens stay intact.
    cloud, region = PINECONE_ENVIRONMENT.split('-', 1)
    pc.create_index(
        name=INDEX_NAME,
        dimension=EMBEDDING_DIMENSION,
        metric="cosine",
        spec=ServerlessSpec(cloud=cloud, region=region)
    )
else:
    # Verify that the existing index's dimension matches the embedding model
    existing_index = pc.describe_index(INDEX_NAME)
    if existing_index.dimension != EMBEDDING_DIMENSION:
        raise ValueError(
            f"Existing index '{INDEX_NAME}' has dimension {existing_index.dimension}, "
            f"expected {EMBEDDING_DIMENSION}. Please choose a different index name "
            f"or adjust accordingly."
        )

# Connect to the Pinecone index
index = pc.Index(INDEX_NAME)

def transcribe_pdf(pdf_file):
    """Extract text from a PDF (as bytes), chunk it, embed each chunk, and upsert to Pinecone."""
    print("Starting PDF transcription...")
    # Read PDF and extract text
    pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_file))
    text = ""
    for page in pdf_reader.pages:
        page_text = page.extract_text()
        if page_text:
            text += page_text + "\n"
    print(f"Extracted {len(text)} characters from PDF.")

    # Dynamic chunking
    chunks = dynamic_chunking(text, max_tokens=500, overlap=50)
    print(f"Created {len(chunks)} chunks from the extracted text.")

    # Process chunks one by one
    progress_bar = st.progress(0)
    for i, chunk in enumerate(chunks):
        print(f"Processing chunk {i+1}/{len(chunks)}...")

        # Generate embedding for the chunk
        embedding = get_embedding(chunk)

        # Prepare upsert data as (id, vector, metadata) tuples
        upsert_data = [(str(uuid.uuid4()), embedding, {"text": chunk})]

        # Upsert to Pinecone
        print(f"Upserting vector to Pinecone index '{INDEX_NAME}'...")
        index.upsert(vectors=upsert_data)

        # Update progress bar
        progress = (i + 1) / len(chunks)
        progress_bar.progress(progress)

        # Small delay to avoid potential rate limits
        time.sleep(0.5)

    progress_bar.empty()
    return f"Successfully processed and upserted {len(chunks)} chunks to Pinecone index '{INDEX_NAME}'."

def dynamic_chunking(text, max_tokens=200, overlap=100):
    """Split text into overlapping chunks. 'Tokens' here are whitespace-delimited
    words, not model tokens, so chunk sizes are approximate."""
    print(f"Starting dynamic chunking with max_tokens={max_tokens} and overlap={overlap}...")
    tokens = re.findall(r'\S+', text)
    chunks = []
    start = 0
    while start < len(tokens):
        end = start + max_tokens
        chunk = ' '.join(tokens[start:end])
        chunks.append(chunk)
        start += max_tokens - overlap
    print(f"Dynamic chunking complete. Created {len(chunks)} chunks.")
    return chunks

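# Worked example of the stride above, using the values transcribe_pdf passes
# (max_tokens=500, overlap=50): the step is 500 - 50 = 450, so chunks cover
# words [0, 500), [450, 950), [900, 1400), and so on. Each chunk repeats the
# last 50 words of the previous one, preserving context across boundaries.
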
def get_embedding(chunk):
    print("Generating embedding for chunk...")
    try:
        response = client.embeddings.create(
            input=chunk,
            model=EMBEDDING_MODEL
        )
        embedding = response.data[0].embedding
        print("Successfully generated embedding.")
        return embedding
    except Exception as e:
        print(f"Error during embedding generation: {str(e)}")
        raise  # re-raise without losing the original traceback

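# Note: client.embeddings.create also accepts a list of strings, so the loop
# in transcribe_pdf could batch several chunks per request to reduce round
# trips; the single-chunk call above keeps the progress bar simple.
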
def clear_database():
    print("Clearing the Pinecone index...")
    try:
        index.delete(delete_all=True)
        return "Successfully cleared all vectors from the Pinecone index."
    except Exception as e:
        print(f"Error clearing the Pinecone index: {str(e)}")
        return f"Error clearing the Pinecone index: {str(e)}"

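# query_database implements a simple retrieval-augmented generation flow:
# embed the query, fetch the 5 most similar chunks from Pinecone, then have
# the chat model answer using only that retrieved context.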
def query_database(query_text):
    print(f"Querying database with: {query_text}")
    try:
        query_embedding = get_embedding(query_text)
        results = index.query(vector=query_embedding, top_k=5, include_metadata=True)

        context = ""
        for match in results['matches']:
            metadata = match.get('metadata', {})
            text = metadata.get('text', '')
            context += f"{text}\n\n"

        if not context:
            return "No relevant information found in the database."

        return generate_answer(query_text, context)
    except Exception as e:
        print(f"Error querying the database: {str(e)}")
        return f"Error querying the database: {str(e)}"

def generate_answer(query, context):
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are an assistant for the Ghana Labor Act. Use the provided context to answer the user's question accurately and concisely."},
                {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"}
            ]
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f"Error generating answer: {str(e)}")
        return f"Error generating answer: {str(e)}"

def generate_hr_document(prompt):
    print(f"Generating HR document with prompt: {prompt}")
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are an HR assistant. Generate a professional HR document based on the given prompt."},
                {"role": "user", "content": prompt}
            ]
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f"Error generating HR document: {str(e)}")
        return f"Error generating HR document: {str(e)}"

def main():
    st.set_page_config(page_title="HR Document Assistant", layout="wide")
    st.title("HR Document Assistant")

    tab1, tab2, tab3, tab4 = st.tabs(["📤 Upload PDF", "🔍 Query Database", "📝 Generate HR Document", "🗑️ Clear Database"])

    with tab1:
        st.header("Upload PDF")
        st.write("Upload a PDF file to extract its text content, chunk it dynamically, and upsert the chunks to the Pinecone index.")
        pdf_file = st.file_uploader("Upload PDF", type="pdf")
        if st.button("📥 Transcribe and Upsert"):
            if pdf_file is not None:
                with st.spinner("Processing PDF..."):
                    result = transcribe_pdf(pdf_file.read())
                st.success(result)
            else:
                st.error("Please upload a PDF file first.")

    with tab2:
        st.header("Query Database")
        st.write("Enter a query about the Ghana Labor Act.")
        query = st.text_input("Enter your query", placeholder="What does the Act say about...?")
        if st.button("🔍 Get Answer"):
            answer = query_database(query)
            st.markdown("### Answer:")
            st.write(answer)

    with tab3:
        st.header("Generate HR Document")
        st.write("Enter a prompt to generate an HR document with gpt-4o-mini.")
        prompt = st.text_area("Enter your prompt", placeholder="Describe the HR document you need...")
        if st.button("✍️ Generate Document"):
            document = generate_hr_document(prompt)
            st.text_area("Generated Document", value=document, height=400)

    with tab4:
        st.header("Clear Database")
        st.write("Use this option carefully. It will remove all data from the Pinecone index.")
        if st.button("🗑️ Clear Database"):
            result = clear_database()
            st.success(result)

    st.markdown("""
    ### 📝 Note
    - Ensure you have the necessary API keys set up for OpenAI and Pinecone.
    - The PDF upload process may take some time depending on the file size.
    - Generated HR documents are based on AI and may require human review.
    """)

if __name__ == "__main__":
    main()