# HR Document Assistant — Streamlit app backed by Pinecone (vector search) and OpenAI.
# (Removed hosting-page residue "Spaces: / Sleeping / Sleeping" left over from extraction.)
import streamlit as st
import PyPDF2
import io
import os
from dotenv import load_dotenv
from pinecone import Pinecone, ServerlessSpec
from openai import OpenAI
import uuid
import re
import time
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Load environment variables from .env file
load_dotenv()

# OpenAI client used for both embeddings and chat completions.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Pinecone configuration.
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_ENVIRONMENT = os.getenv("PINECONE_ENVIRONMENT")
INDEX_NAME = "ghana"
EMBEDDING_MODEL = "text-embedding-3-large"
EMBEDDING_DIMENSION = 3072  # vector size produced by text-embedding-3-large

# Fail fast with a clear message instead of an opaque AttributeError on
# None.split() further down when the environment is not configured.
if not PINECONE_API_KEY or not PINECONE_ENVIRONMENT:
    raise EnvironmentError(
        "PINECONE_API_KEY and PINECONE_ENVIRONMENT must be set (e.g. in a .env file)."
    )

# Initialize Pinecone
pc = Pinecone(api_key=PINECONE_API_KEY)

# Check if the index exists; create it with the expected dimension if not.
if INDEX_NAME not in pc.list_indexes().names():
    # PINECONE_ENVIRONMENT is expected to look like '<cloud>-<region>',
    # e.g. 'aws-us-east-1'. Split only on the FIRST hyphen so multi-part
    # regions such as 'us-east-1' survive intact (the previous
    # split('-')[1] would have truncated the region to 'us').
    cloud, region = PINECONE_ENVIRONMENT.split('-', 1)
    pc.create_index(
        name=INDEX_NAME,
        dimension=EMBEDDING_DIMENSION,
        metric="cosine",
        spec=ServerlessSpec(cloud=cloud, region=region),
    )
else:
    # Verify the existing index's dimension matches the embedding model's.
    existing_index = pc.describe_index(INDEX_NAME)
    if existing_index.dimension != EMBEDDING_DIMENSION:
        raise ValueError(f"Existing index '{INDEX_NAME}' has dimension {existing_index.dimension}, expected {EMBEDDING_DIMENSION}. Please choose a different index name or adjust accordingly.")

# Connect to the Pinecone index
index = pc.Index(INDEX_NAME)
def transcribe_pdf(pdf_file):
    """Extract text from a PDF, chunk it, embed each chunk, and upsert to Pinecone.

    Args:
        pdf_file: Raw PDF file contents as bytes (e.g. from ``UploadedFile.read()``).

    Returns:
        A human-readable status message describing how many chunks were upserted,
        or an explanation when the PDF yielded no extractable text.
    """
    print("Starting PDF transcription...")
    # Read PDF and extract text page by page; extract_text() may return None
    # for pages without a text layer, so skip those.
    pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_file))
    text = ""
    for page in pdf_reader.pages:
        page_text = page.extract_text()
        if page_text:
            text += page_text + "\n"
    print(f"Extracted {len(text)} characters from PDF.")

    # Dynamic chunking into overlapping word windows.
    chunks = dynamic_chunking(text, max_tokens=500, overlap=50)
    print(f"Created {len(chunks)} chunks from the extracted text.")
    if not chunks:
        # Scanned/image-only PDFs produce no text; previously this fell
        # through and misleadingly reported "Successfully processed 0 chunks".
        return "No extractable text found in the PDF; nothing was upserted."

    # Embed and upsert one chunk at a time, showing progress in the UI.
    progress_bar = st.progress(0)
    for i, chunk in enumerate(chunks):
        print(f"Processing chunk {i+1}/{len(chunks)}...")
        embedding = get_embedding(chunk)
        # Each vector gets a fresh UUID; the chunk text rides along as metadata.
        upsert_data = [(str(uuid.uuid4()), embedding, {"text": chunk})]
        print(f"Upserting vector to Pinecone index '{INDEX_NAME}'...")
        index.upsert(vectors=upsert_data)
        progress_bar.progress((i + 1) / len(chunks))
        # Small delay to avoid potential embedding/upsert rate limits.
        time.sleep(0.5)
    progress_bar.empty()
    return f"Successfully processed and upserted {len(chunks)} chunks to Pinecone index '{INDEX_NAME}'."
def dynamic_chunking(text, max_tokens=200, overlap=100):
    """Split *text* into overlapping chunks of whitespace-delimited words.

    "Tokens" here are whitespace-separated words (``\\S+`` runs), not model
    tokens — an approximation that is good enough for retrieval chunking.

    Args:
        text: The text to split.
        max_tokens: Maximum words per chunk (window size).
        overlap: Number of words shared between consecutive chunks.

    Returns:
        A list of chunk strings (empty list for empty/whitespace-only text).

    Raises:
        ValueError: If ``max_tokens <= 0`` or ``overlap >= max_tokens``; the
            latter made the window step non-positive and the original loop
            never terminated.
    """
    if max_tokens <= 0:
        raise ValueError(f"max_tokens must be positive, got {max_tokens}")
    if overlap >= max_tokens:
        raise ValueError(
            f"overlap ({overlap}) must be smaller than max_tokens ({max_tokens})"
        )
    print(f"Starting dynamic chunking with max_tokens={max_tokens} and overlap={overlap}...")
    tokens = re.findall(r'\S+', text)
    step = max_tokens - overlap  # guaranteed positive by the checks above
    chunks = [
        ' '.join(tokens[start:start + max_tokens])
        for start in range(0, len(tokens), step)
    ]
    print(f"Dynamic chunking complete. Created {len(chunks)} chunks.")
    return chunks
def get_embedding(chunk):
    """Return the embedding vector for *chunk* using the configured OpenAI model.

    Args:
        chunk: Text to embed.

    Returns:
        The embedding as a list of floats (EMBEDDING_DIMENSION entries).

    Raises:
        Whatever the OpenAI client raises on failure, after logging it.
    """
    print("Generating embedding for chunk...")
    try:
        response = client.embeddings.create(
            input=chunk,  # the API accepts a single string directly
            model=EMBEDDING_MODEL
        )
        embedding = response.data[0].embedding
        print("Successfully generated embedding.")
        return embedding
    except Exception as e:
        print(f"Error during embedding generation: {str(e)}")
        # Bare raise preserves the original traceback (unlike `raise e`,
        # which restarts the traceback at this frame).
        raise
def clear_database():
    """Delete every vector from the Pinecone index.

    Returns:
        A status string: success confirmation, or the error message if the
        delete call failed (errors are reported, not raised, so the UI can
        display them).
    """
    print("Clearing the Pinecone index...")
    try:
        index.delete(delete_all=True)
    except Exception as e:
        message = f"Error clearing the Pinecone index: {str(e)}"
        print(message)
        return message
    return "Successfully cleared all vectors from the Pinecone index."
def query_database(query_text):
    """Answer *query_text* using the most similar chunks stored in Pinecone.

    Embeds the query, retrieves the top-5 matches with their stored text
    metadata, concatenates them into a context block, and delegates the
    final answer to generate_answer(). Errors are returned as strings so
    the UI can display them.
    """
    print(f"Querying database with: {query_text}")
    try:
        query_embedding = get_embedding(query_text)
        results = index.query(vector=query_embedding, top_k=5, include_metadata=True)
        # Collect the stored chunk texts; missing metadata degrades to "".
        snippets = [
            match.get('metadata', {}).get('text', '')
            for match in results['matches']
        ]
        context = ''.join(f"{snippet}\n\n" for snippet in snippets)
        if not context:
            return "No relevant information found in the database."
        return generate_answer(query_text, context)
    except Exception as e:
        print(f"Error querying the database: {str(e)}")
        return f"Error querying the database: {str(e)}"
def generate_answer(query, context):
    """Ask the chat model to answer *query* grounded in the retrieved *context*.

    Returns the model's answer text, or an error string if the API call fails.
    """
    system_message = "You are an assistant for the Ghana Labor Act. Use the provided context to answer the user's question accurately and concisely."
    user_message = f"Context:\n{context}\n\nQuestion: {query}"
    try:
        completion = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": system_message},
                {"role": "user", "content": user_message},
            ],
        )
    except Exception as e:
        print(f"Error generating answer: {str(e)}")
        return f"Error generating answer: {str(e)}"
    return completion.choices[0].message.content
def generate_hr_document(document_type, additional_info):
    """Generate a plain-text HR document of the requested type via the chat model.

    Args:
        document_type: Human-readable document type, e.g. "Offer Letter".
        additional_info: Free-form details to incorporate into the document.

    Returns:
        The generated document text, or an error string if the call fails.
    """
    print(f"Generating HR document: {document_type}")
    prompt = f"""Generate a professional {document_type} for an HR department.
Additional information: {additional_info}
Important: Format the response as plain text, not markdown. Use appropriate line breaks and spacing for readability."""
    try:
        completion = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are an expert HR assistant. Generate a professional HR document based on the given type and additional information. Format the response as plain text, not markdown."},
                {"role": "user", "content": prompt},
            ],
        )
    except Exception as e:
        print(f"Error generating HR document: {str(e)}")
        return f"Error generating HR document: {str(e)}"
    return completion.choices[0].message.content
# Graduated annual PAYE schedule as (band_width_GHS, marginal_rate) pairs,
# applied in order; the final band covers all remaining income. Kept as a
# tuple so it is safe to use as a default argument.
_DEFAULT_TAX_BANDS = (
    (5880, 0),
    (1320, 0.05),
    (1560, 0.10),
    (38000, 0.175),
    (192000, 0.25),
    (366240, 0.30),
    (float('inf'), 0.35),
)


def calculate_paye(annual_income, tax_bands=_DEFAULT_TAX_BANDS):
    """Compute annual PAYE income tax using a graduated band schedule.

    Args:
        annual_income: Annual taxable income in GHS.
        tax_bands: Ordered (band_width, marginal_rate) pairs. Defaults to the
            schedule previously hard-coded in the function body, so existing
            callers are unaffected; pass a different schedule to reuse the
            calculation when rates change.

    Returns:
        Total tax owed (0 for non-positive income).
    """
    remaining_income = annual_income
    total_tax = 0
    for band, rate in tax_bands:
        if remaining_income <= 0:
            break
        # Tax the portion of income that falls inside this band.
        taxable_amount = min(band, remaining_income)
        total_tax += taxable_amount * rate
        remaining_income -= taxable_amount
    return total_tax
def calculate_ssnit(basic_salary, rate=0.055):
    """Compute the employee SSNIT pension contribution.

    Args:
        basic_salary: Basic salary in GHS.
        rate: Contribution rate as a fraction; defaults to 5.5% (the value
            previously hard-coded), parameterized so callers can adjust if
            the statutory rate changes.

    Returns:
        The contribution amount (basic_salary * rate).
    """
    return basic_salary * rate
def main():
    """Render the Streamlit UI.

    Five tabs: PDF upload/ingestion, database Q&A, HR document generation,
    a Ghana PAYE/SSNIT tax calculator, and a destructive clear-database action.
    """
    st.set_page_config(page_title="HR Document Assistant", layout="wide")
    # Header row: logo beside the app title.
    col1, col2 = st.columns([1, 4])
    with col1:
        st.image("logo.png", width=200)  # Adjust the width as needed
    with col2:
        st.title("HR Document Assistant")
    tab1, tab2, tab3, tab4, tab5 = st.tabs([
        "📤 Upload PDF",
        "🔍 Query Database",
        "📝 Generate HR Document",
        "🧮 Tax Calculator",
        "🗑️ Clear Database"
    ])
    # --- Tab 1: ingest a PDF into the Pinecone index ---
    with tab1:
        st.header("Upload PDF")
        st.write("Upload a PDF file to extract its text content, chunk it dynamically, and upsert the chunks to the Pinecone index.")
        pdf_file = st.file_uploader("Upload PDF", type="pdf")
        if st.button("📥 Transcribe and Upsert"):
            if pdf_file is not None:
                with st.spinner("Processing PDF..."):
                    # transcribe_pdf expects raw bytes, hence .read().
                    result = transcribe_pdf(pdf_file.read())
                st.success(result)
            else:
                st.error("Please upload a PDF file first.")
    # --- Tab 2: retrieval-augmented Q&A over the indexed documents ---
    with tab2:
        st.header("Query Database")
        st.write("Enter a query about the Ghana Labor Act.")
        query = st.text_input("Enter your query", placeholder="What does the Act say about...?")
        if st.button("🔎 Get Answer"):
            answer = query_database(query)
            st.markdown("### Answer:")
            st.write(answer)
    # --- Tab 3: LLM-generated HR documents with download ---
    with tab3:
        st.header("Generate HR Document")
        st.write("Select an HR document type and provide additional information to generate the document.")
        document_types = [
            "Employment Contract", "Offer Letter", "Job Description", "Employee Handbook",
            "Performance Review Form", "Disciplinary Action Form", "Leave Request Form",
            "Onboarding Checklist", "Termination Letter", "Non-Disclosure Agreement (NDA)",
            "Code of Conduct", "Workplace Policy", "Benefits Summary", "Compensation Plan",
            "Training and Development Plan", "Resignation Letter", "Exit Interview Form",
            "Employee Grievance Form", "Time-off Request Form", "Workplace Safety Guidelines"
        ]
        selected_document = st.selectbox("Select HR Document Type", document_types)
        additional_info = st.text_area(
            "Additional Information",
            placeholder="Enter any specific details or requirements for the document..."
        )
        if st.button("✍️ Generate Document"):
            with st.spinner("Generating document..."):
                document = generate_hr_document(selected_document, additional_info)
            st.subheader(f"Generated {selected_document}")
            st.text_area("Document Content", value=document, height=400)
            st.download_button(
                label="Download Document",
                data=document,
                file_name=f"{selected_document.lower().replace(' ', '_')}.txt",
                mime="text/plain"
            )
    # --- Tab 4: PAYE/SSNIT calculator with breakdown chart and rate table ---
    with tab4:
        st.header("Tax Calculator")
        st.write("Calculate PAYE and SSNIT contributions based on annual income and basic salary.")
        # Preset (annual_income, basic_salary) pairs, or free-form entry.
        salary_examples = {
            "Entry Level": (36000, 30000),
            "Mid Level": (72000, 60000),
            "Senior Level": (120000, 90000),
            "Executive": (240000, 180000)
        }
        selected_example = st.selectbox(
            "Select a salary example or enter custom values:",
            ["Custom"] + list(salary_examples.keys())
        )
        if selected_example == "Custom":
            annual_income = st.number_input("Annual Income (GH₵)", min_value=0.0, value=0.0, step=1000.0)
            basic_salary = st.number_input("Basic Salary (GH₵)", min_value=0.0, value=0.0, step=1000.0)
        else:
            annual_income, basic_salary = salary_examples[selected_example]
            st.write(f"Annual Income: GH₵ {annual_income:.2f}")
            st.write(f"Basic Salary: GH₵ {basic_salary:.2f}")
        if st.button("Calculate Taxes"):
            # NOTE(review): SSNIT is computed from basic_salary but deducted
            # from annual_income — assumes both figures cover the same
            # period (annual); confirm with the calculator's intended use.
            ssnit_contribution = calculate_ssnit(basic_salary)
            taxable_income = annual_income - ssnit_contribution
            paye = calculate_paye(taxable_income)
            net_income = annual_income - ssnit_contribution - paye
            col1, col2 = st.columns(2)
            with col1:
                st.subheader("Tax Breakdown")
                st.write(f"SSNIT Contribution: GH₵ {ssnit_contribution:.2f}")
                st.write(f"PAYE: GH₵ {paye:.2f}")
                st.write(f"Total Deductions: GH₵ {(ssnit_contribution + paye):.2f}")
                st.write(f"Net Income: GH₵ {net_income:.2f}")
            with col2:
                # Pie chart for income breakdown
                fig, ax = plt.subplots(figsize=(3, 2))
                sizes = [ssnit_contribution, paye, net_income]
                labels = ['SSNIT', 'PAYE', 'Net']
                colors = ['#ff9999', '#66b3ff', '#99ff99']
                ax.pie(
                    sizes, labels=labels, colors=colors,
                    autopct='%1.1f%%', startangle=90, textprops={'fontsize': 6}
                )
                ax.axis('equal')  # equal aspect keeps the pie circular
                plt.title("Income Breakdown", fontsize=8)
                st.pyplot(fig)
            # Static reference table of the tax brackets used above.
            st.subheader("Tax Rates by Income Bracket")
            tax_data = {
                "Income Range (GH₵)": [
                    "0 - 5,880", "5,881 - 7,200", "7,201 - 8,760",
                    "8,761 - 46,760", "46,761 - 238,760",
                    "238,761 - 605,000", "Above 605,000"
                ],
                "Rate (%)": [0, 5, 10, 17.5, 25, 30, 35]
            }
            df = pd.DataFrame(tax_data)
            st.table(df)
    # --- Tab 5: destructive clear-all action ---
    with tab5:
        st.header("Clear Database")
        st.write("Use this option carefully. It will remove all data from the Pinecone index.")
        if st.button("🗑️ Clear Database"):
            result = clear_database()
            st.success(result)
    # Footer note shown beneath the tab area.
    st.markdown("""
    ### 📌 Note
    - Ensure you have the necessary API keys set up for OpenAI and Pinecone.
    - The PDF upload process may take some time depending on the file size.
    - Generated HR documents are based on AI and may require human review.
    """)
# Script entry point: launch the Streamlit UI.
if __name__ == "__main__":
    main()