# ghana-streamlit / app.py
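"""HR Document Assistant.

A Streamlit app that ingests a PDF (the Ghana Labor Act), chunks and embeds
its text with OpenAI's text-embedding-3-large model, stores the vectors in a
Pinecone index, and answers questions over the retrieved context with
gpt-4o-mini. A fourth tab generates free-form HR documents.
"""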
import io
import os
import re
import time
import uuid

import PyPDF2
import streamlit as st
from dotenv import load_dotenv
from openai import OpenAI
from pinecone import Pinecone, ServerlessSpec
# Load environment variables from .env file
load_dotenv()

# Initialize OpenAI client
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Pinecone configuration
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_ENVIRONMENT = os.getenv("PINECONE_ENVIRONMENT")
INDEX_NAME = "ghana"
EMBEDDING_MODEL = "text-embedding-3-large"
EMBEDDING_DIMENSION = 3072  # dimensionality of text-embedding-3-large vectors

# Initialize Pinecone
pc = Pinecone(api_key=PINECONE_API_KEY)
# Create the index if it does not already exist
if INDEX_NAME not in pc.list_indexes().names():
    # PINECONE_ENVIRONMENT is assumed to be in '<cloud>-<region>' form,
    # e.g. 'aws-us-east-1'; split on the first hyphen only so that
    # hyphenated region names stay intact.
    cloud, region = PINECONE_ENVIRONMENT.split('-', 1)
    pc.create_index(
        name=INDEX_NAME,
        dimension=EMBEDDING_DIMENSION,
        metric="cosine",
        spec=ServerlessSpec(cloud=cloud, region=region)
    )
else:
    # Verify that the existing index's dimension matches the embedding model
    existing_index = pc.describe_index(INDEX_NAME)
    if existing_index.dimension != EMBEDDING_DIMENSION:
        raise ValueError(
            f"Existing index '{INDEX_NAME}' has dimension {existing_index.dimension}, "
            f"expected {EMBEDDING_DIMENSION}. Please choose a different index name or adjust accordingly."
        )

# Connect to the Pinecone index
index = pc.Index(INDEX_NAME)
def transcribe_pdf(pdf_file):
    """Extract text from a PDF, chunk it, embed each chunk, and upsert to Pinecone."""
    print("Starting PDF transcription...")
    # Read the PDF and extract text page by page
    pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_file))
    text = ""
    for page in pdf_reader.pages:
        page_text = page.extract_text()
        if page_text:
            text += page_text + "\n"
    print(f"Extracted {len(text)} characters from PDF.")

    # Split the text into overlapping chunks
    chunks = dynamic_chunking(text, max_tokens=500, overlap=50)
    print(f"Created {len(chunks)} chunks from the extracted text.")

    # Embed and upsert the chunks one by one
    progress_bar = st.progress(0)
    for i, chunk in enumerate(chunks):
        print(f"Processing chunk {i+1}/{len(chunks)}...")
        # Generate an embedding for the chunk
        embedding = get_embedding(chunk)
        # Prepare the (id, vector, metadata) tuple and upsert it
        upsert_data = [(str(uuid.uuid4()), embedding, {"text": chunk})]
        print(f"Upserting vector to Pinecone index '{INDEX_NAME}'...")
        index.upsert(vectors=upsert_data)
        # Update the progress bar
        progress_bar.progress((i + 1) / len(chunks))
        # Small delay to avoid potential rate limits
        time.sleep(0.5)
    progress_bar.empty()
    return f"Successfully processed and upserted {len(chunks)} chunks to Pinecone index '{INDEX_NAME}'."
def dynamic_chunking(text, max_tokens=200, overlap=100):
    """Split text into overlapping chunks of whitespace-delimited tokens."""
    print(f"Starting dynamic chunking with max_tokens={max_tokens} and overlap={overlap}...")
    if overlap >= max_tokens:
        raise ValueError("overlap must be smaller than max_tokens, otherwise chunking cannot advance.")
    tokens = re.findall(r'\S+', text)
    chunks = []
    start = 0
    while start < len(tokens):
        end = start + max_tokens
        chunks.append(' '.join(tokens[start:end]))
        # Advance the window, keeping `overlap` tokens of context between chunks
        start += max_tokens - overlap
    print(f"Dynamic chunking complete. Created {len(chunks)} chunks.")
    return chunks
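# Worked example: transcribe_pdf calls dynamic_chunking with max_tokens=500
# and overlap=50, so the window advances 450 tokens per step and a
# 9,000-token document yields 9000 / 450 = 20 overlapping chunks.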
def get_embedding(chunk):
    """Generate an embedding vector for a text chunk with the OpenAI embeddings API."""
    print("Generating embedding for chunk...")
    try:
        response = client.embeddings.create(
            input=chunk,
            model=EMBEDDING_MODEL
        )
        embedding = response.data[0].embedding
        print("Successfully generated embedding.")
        return embedding
    except Exception as e:
        print(f"Error during embedding generation: {str(e)}")
        raise
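# Note: the OpenAI embeddings endpoint also accepts a list of strings, so
# the per-chunk calls in transcribe_pdf could be batched to reduce request
# count; the one-at-a-time loop is kept for per-chunk progress reporting.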
def clear_database():
    """Delete all vectors from the Pinecone index."""
    print("Clearing the Pinecone index...")
    try:
        index.delete(delete_all=True)
        return "Successfully cleared all vectors from the Pinecone index."
    except Exception as e:
        print(f"Error clearing the Pinecone index: {str(e)}")
        return f"Error clearing the Pinecone index: {str(e)}"
def query_database(query_text):
    """Embed the query, retrieve the top matching chunks, and generate an answer."""
    print(f"Querying database with: {query_text}")
    try:
        query_embedding = get_embedding(query_text)
        results = index.query(vector=query_embedding, top_k=5, include_metadata=True)
        # Concatenate the retrieved chunk texts into a single context string
        context = ""
        for match in results['matches']:
            metadata = match.get('metadata', {})
            text = metadata.get('text', '')
            context += f"{text}\n\n"
        if not context:
            return "No relevant information found in the database."
        return generate_answer(query_text, context)
    except Exception as e:
        print(f"Error querying the database: {str(e)}")
        return f"Error querying the database: {str(e)}"
def generate_answer(query, context):
    """Answer the query with gpt-4o-mini, grounded in the retrieved context."""
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are an assistant for the Ghana Labor Act. Use the provided context to answer the user's question accurately and concisely."},
                {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"}
            ]
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f"Error generating answer: {str(e)}")
        return f"Error generating answer: {str(e)}"
def generate_hr_document(prompt):
    """Generate a professional HR document from a free-form prompt."""
    print(f"Generating HR document with prompt: {prompt}")
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are an HR assistant. Generate a professional HR document based on the given prompt."},
                {"role": "user", "content": prompt}
            ]
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f"Error generating HR document: {str(e)}")
        return f"Error generating HR document: {str(e)}"
def main():
    st.set_page_config(page_title="HR Document Assistant", layout="wide")
    st.title("HR Document Assistant")

    tab1, tab2, tab3, tab4 = st.tabs(["📤 Upload PDF", "🔍 Query Database", "📝 Generate HR Document", "🗑️ Clear Database"])

    with tab1:
        st.header("Upload PDF")
        st.write("Upload a PDF file to extract its text content, chunk it dynamically, and upsert the chunks to the Pinecone index.")
        pdf_file = st.file_uploader("Upload PDF", type="pdf")
        if st.button("📥 Transcribe and Upsert"):
            if pdf_file is not None:
                with st.spinner("Processing PDF..."):
                    result = transcribe_pdf(pdf_file.read())
                st.success(result)
            else:
                st.error("Please upload a PDF file first.")

    with tab2:
        st.header("Query Database")
        st.write("Enter a query about the Ghana Labor Act.")
        query = st.text_input("Enter your query", placeholder="What does the Act say about...?")
        if st.button("🔎 Get Answer"):
            if query:
                answer = query_database(query)
                st.markdown("### Answer:")
                st.write(answer)
            else:
                st.error("Please enter a query first.")

    with tab3:
        st.header("Generate HR Document")
        st.write("Enter a prompt to generate an HR document using GPT-4o mini.")
        prompt = st.text_area("Enter your prompt", placeholder="Describe the HR document you need...")
        if st.button("✍️ Generate Document"):
            if prompt:
                document = generate_hr_document(prompt)
                st.text_area("Generated Document", value=document, height=400)
            else:
                st.error("Please enter a prompt first.")

    with tab4:
        st.header("Clear Database")
        st.write("Use this option carefully. It will remove all data from the Pinecone index.")
        if st.button("🗑️ Clear Database"):
            result = clear_database()
            st.success(result)

    st.markdown("""
### 📌 Note
- Ensure you have the necessary API keys set up for OpenAI and Pinecone.
- The PDF upload process may take some time depending on the file size.
- Generated HR documents are AI-generated and may require human review.
""")


if __name__ == "__main__":
    main()