import streamlit as st
import PyPDF2
import re
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import wordnet
import requests
from typing import Optional
import pandas as pd
from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.orm import sessionmaker, declarative_base
import json
import openai  # pre-1.0 OpenAI SDK; get_openai_data below uses its ChatCompletion interface
# Initialize NLTK resources
def download_nltk_resources():
    resources = {
        'punkt': 'tokenizers/punkt',
        'averaged_perceptron_tagger': 'taggers/averaged_perceptron_tagger',
        'wordnet': 'corpora/wordnet',
        'stopwords': 'corpora/stopwords'
    }
    for package, resource in resources.items():
        try:
            nltk.data.find(resource)
        except LookupError:
            nltk.download(package)

download_nltk_resources()
# Ensure the spaCy model is available, downloading it on first run
import spacy
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    spacy.cli.download("en_core_web_sm")
    nlp = spacy.load("en_core_web_sm")
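# Note: nlp is loaded but not used by any function below. A minimal sketch of
# how it could feed skill extraction (an assumption, not wired into scoring):
#   doc = nlp("Built ETL pipelines in Python and Airflow")
#   candidate_skills = [chunk.text for chunk in doc.noun_chunks]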
# Database setup
Base = declarative_base()

class ResumeScore(Base):
    __tablename__ = 'resume_scores'
    id = Column(Integer, primary_key=True)
    resume_name = Column(String)
    score = Column(Float)
    skills = Column(String)
    certifications = Column(String)
    experience_years = Column(Float)
    education_level = Column(String)
    summary = Column(String)

# Create engine and session
engine = create_engine('sqlite:///resumes.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
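# Development aside (not called anywhere in the app): to wipe the local store,
# drop and recreate the tables before the next run:
#   Base.metadata.drop_all(engine)
#   Base.metadata.create_all(engine)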
# Custom CSS to enhance the UI
def set_custom_css():
    st.markdown("""
        <style>
        .stProgress .st-bo {
            background-color: #f0f2f6;
        }
        .stProgress .st-bp {
            background: linear-gradient(to right, #4CAF50, #8BC34A);
        }
        .skill-tag {
            display: inline-block;
            padding: 5px 10px;
        }
        </style>
    """, unsafe_allow_html=True)
def get_docparser_data(file, api_key, parser_id) -> Optional[dict]:
    try:
        # First, upload the document
        upload_url = f"https://api.docparser.com/v1/document/upload/{parser_id}"

        # Docparser uses HTTP Basic auth with the API key as the username
        import base64
        auth_string = base64.b64encode(f"{api_key}:".encode()).decode()
        headers = {
            'Authorization': f'Basic {auth_string}'
        }

        # Prepare the file for upload
        files = {
            'file': (file.name, file, 'application/pdf')
        }

        # Upload the document
        upload_response = requests.post(
            upload_url,
            headers=headers,
            files=files
        )
        upload_response.raise_for_status()

        # Get the document ID from the upload response
        upload_data = upload_response.json()
        document_id = upload_data.get('id')
        if not document_id:
            st.error("Failed to get document ID from upload response")
            return None

        # Give Docparser a moment to process the document before fetching results
        import time
        time.sleep(3)

        # Fetch the parsed results
        results_url = f"https://api.docparser.com/v1/results/{parser_id}/{document_id}"
        results_response = requests.get(
            results_url,
            headers=headers
        )
        results_response.raise_for_status()

        # Handle results
        results_data = results_response.json()
        if isinstance(results_data, list) and len(results_data) > 0:
            # Field names below depend on your Docparser parser configuration
            result = results_data[0]  # Use the first result
            parsed_data = {
                'name': result.get('name', result.get('full_name', 'Unknown')),
                'email': result.get('email', 'Unknown'),
                'phone': result.get('phone', result.get('phone_number', 'Unknown')),
                'skills': result.get('skills', []),
                'certifications': result.get('certifications', []),
                'experience_years': float(result.get('experience_years', 0) or 0),
                'degree': result.get('degree', result.get('education_degree', 'Not specified')),
                'institution': result.get('institution', result.get('university', 'Not specified')),
                'year': result.get('year', result.get('graduation_year', 'Not specified')),
                'summary': result.get('summary', result.get('profile_summary', 'No summary available')),
                'projects': result.get('projects', [])
            }

            # Convert skills from a comma-separated string to a list if needed
            if isinstance(parsed_data['skills'], str):
                parsed_data['skills'] = [skill.strip() for skill in parsed_data['skills'].split(',')]

            # Convert certifications from a comma-separated string to a list if needed
            if isinstance(parsed_data['certifications'], str):
                parsed_data['certifications'] = [cert.strip() for cert in parsed_data['certifications'].split(',')]

            return parsed_data
        else:
            st.error(f"No parsed data received from Docparser: {results_data}")
            return None
    except requests.exceptions.HTTPError as http_err:
        st.error(f"HTTP error occurred: {http_err}")
        if hasattr(http_err, 'response') and http_err.response is not None:
            st.error(f"Response content: {http_err.response.content}")
    except json.JSONDecodeError as json_err:
        st.error(f"JSON decode error: {json_err}")
        st.error("Raw response content: " + str(upload_response.content if 'upload_response' in locals() else 'No response'))
    except Exception as e:
        st.error(f"Error fetching data from Docparser: {e}")
        st.error(f"Upload data: {upload_data if 'upload_data' in locals() else 'No upload data'}")
        st.error(f"Results data: {results_data if 'results_data' in locals() else 'No results data'}")
    return None
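# Minimal standalone usage sketch (hypothetical credentials; the extracted
# field names depend on how your Docparser parser is configured):
#   with open("resume.pdf", "rb") as f:
#       data = get_docparser_data(f, "YOUR_DOCPARSER_API_KEY", "your_parser_id")
#   if data:
#       print(data["name"], data["skills"])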
def get_openai_data(file, openai_key: str) -> Optional[dict]:
    # process_resume expects a dict, so we ask the model for JSON and parse it.
    # Uses the pre-1.0 openai SDK's ChatCompletion interface.
    openai.api_key = openai_key
    try:
        # Extract plain text first; raw PDF bytes are not useful in a prompt
        reader = PyPDF2.PdfReader(file)
        file_content = "\n".join(page.extract_text() or "" for page in reader.pages)
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[{
                "role": "user",
                "content": (
                    "Extract the following fields from this resume and reply with a "
                    "JSON object only, using the keys name, email, phone, skills, "
                    "certifications, experience_years, degree, institution, year, "
                    "summary, projects:\n\n" + file_content
                )
            }],
            max_tokens=1500
        )
        return json.loads(response.choices[0].message.content)
    except Exception as e:
        st.error(f"Error fetching data from OpenAI: {e}")
        return None
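# Expected JSON shape from the prompt above (an assumption; the model can
# deviate, in which case json.loads raises and the function returns None):
#   {"name": "...", "email": "...", "phone": "...",
#    "skills": ["..."], "certifications": ["..."], "experience_years": 2.0,
#    "degree": "...", "institution": "...", "year": "...",
#    "summary": "...", "projects": ["..."]}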
def calculate_weighted_score(skills, certifications, experience_years, education_level, projects, skill_weight, certification_weight, experience_weight, education_weight, project_weight):
    skill_score = min(len(skills) * 15, 100)
    certification_score = min(len(certifications) * 20, 100)
    experience_score = min(experience_years * 15, 100)
    # Treat the 'Not specified' placeholder as no education on record
    education_score = 100 if education_level and education_level != 'Not specified' else 0
    project_score = min(len(projects) * 10, 100)  # each project contributes 10 points, capped at 100
    total_score = (
        skill_score * skill_weight +
        certification_score * certification_weight +
        experience_score * experience_weight +
        education_score * education_weight +
        project_score * project_weight
    )
    return round(min(total_score, 100), 2)
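# Worked example (hypothetical inputs, using the default weights from
# process_resume: 0.9, 0.05, 0.03, 0.02, 0.1):
#   4 skills           -> skill_score         = min(4 * 15, 100) = 60
#   1 certification    -> certification_score = min(1 * 20, 100) = 20
#   2 years experience -> experience_score    = min(2 * 15, 100) = 30
#   degree "B.Tech"    -> education_score     = 100
#   3 projects         -> project_score       = min(3 * 10, 100) = 30
#   total = 60*0.9 + 20*0.05 + 30*0.03 + 100*0.02 + 30*0.1 = 60.9
# Note the default weights sum to 1.1, so total_score is capped at 100.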
def process_resume(file, job_description, filename, parser_choice, openai_key=None, api_key=None, parser_id=None, skill_weight=0.9, certification_weight=0.05, experience_weight=0.03, education_weight=0.02, project_weight=0.1):
    # Note: job_description is accepted for future matching logic but is not
    # used by the current scoring, which only looks at the parsed fields.
    try:
        if parser_choice == "Docparser":
            data = get_docparser_data(file, api_key, parser_id)
        elif parser_choice == "OpenAI":
            data = get_openai_data(file, openai_key)
        else:
            st.error("Invalid parser choice")
            return None

        if not data:
            st.warning(f"Failed to extract data from the resume {filename}")
            return None

        # Extract fields from the response
        personal_details = {
            'name': data.get('name', 'Unknown'),
            'email': data.get('email', 'Unknown'),
            'phone': data.get('phone', 'Unknown')
        }
        education = {
            'degree': data.get('degree', 'Not specified'),
            'institution': data.get('institution', 'Not specified'),
            'year': data.get('year', 'Not specified')
        }

        # Coerce experience to a number; parsers may return it as a string
        try:
            experience_years = float(data.get('experience_years', 0) or 0)
        except (TypeError, ValueError):
            experience_years = 0.0

        # Ensure certifications, skills, and projects are lists of strings
        certifications = [cert if isinstance(cert, str) else str(cert) for cert in data.get('certifications', [])]
        skills = [skill if isinstance(skill, str) else str(skill) for skill in data.get('skills', [])]
        projects = [project if isinstance(project, str) else str(project) for project in data.get('projects', [])]
        summary = data.get('summary', 'No summary available')

        # Calculate the weighted score
        weighted_score = calculate_weighted_score(
            skills, certifications, experience_years, education.get('degree', 'Not specified'), projects,
            skill_weight, certification_weight, experience_weight, education_weight, project_weight
        )

        resume_name = filename or personal_details.get('name', 'Unknown')
        skills_str = ', '.join(skills)
        certifications_str = ', '.join(certifications)

        resume_score = ResumeScore(
            resume_name=resume_name,
            score=weighted_score,
            skills=skills_str,
            certifications=certifications_str,
            experience_years=experience_years,
            education_level=education.get('degree', 'Not specified'),
            summary=summary
        )
        session.add(resume_score)
        session.commit()

        result = {
            'name': resume_name,
            'score': weighted_score,
            'personal_details': personal_details,
            'education': education,
            'experience': {'total_years': experience_years},
            'certifications': certifications,
            'skills': skills,
            'projects': projects,
            'summary': summary
        }
        return result
    except Exception as e:
        st.error(f"Error processing the resume {filename}: {e}")
        session.rollback()
        return None
def process_resumes(files, job_description, parser_choice, openai_key=None, api_key=None, parser_id=None, skill_weight=0.9, certification_weight=0.05, experience_weight=0.03, education_weight=0.02, project_weight=0.1):
    scores = []
    processed_count = 0
    try:
        if not files:
            st.warning("No PDF files uploaded")
            return []

        total_files = len(files)
        progress_bar = st.progress(0)
        for index, file in enumerate(files):
            result = process_resume(file, job_description, file.name, parser_choice, openai_key, api_key, parser_id, skill_weight, certification_weight, experience_weight, education_weight, project_weight)
            if result:
                scores.append(result)
                processed_count += 1
            progress = (index + 1) / total_files
            progress_bar.progress(progress)

        st.success(f"Successfully processed {processed_count} resumes")
        return scores
    except Exception as e:
        st.error(f"Error processing resumes: {e}")
        session.rollback()
        return []
def display_results(result):
    with st.expander(f"📄 {result.get('name', 'Unknown')} - Match: {result['score']}%"):
        st.write(f"### Overall Match Score: {result['score']}%")

        st.write("### Skills Found:")
        if result['skills']:
            for skill in result['skills']:
                st.markdown(f"- {skill}")
        else:
            st.markdown("No skills found.")

        st.write("### Certifications:")
        if result['certifications']:
            for cert in result['certifications']:
                st.markdown(f"- {cert}")
        else:
            st.markdown("No certifications found.")

        st.write(f"### Total Years of Experience: {result['experience'].get('total_years', 0)}")

        st.write("### Education:")
        degree = result['education'].get('degree', 'Not specified')
        st.markdown(f"- Degree: {degree}")

        if st.button(f"View Detailed Analysis ({result.get('name', 'Unknown')})", key=f"view_{result.get('name', 'default')}"):
            st.write("#### Resume Summary:")
            st.text(result['summary'])
def view_scores():
    st.header("Stored Resume Scores")
    resumes = session.query(ResumeScore).order_by(ResumeScore.score.desc()).all()
    if resumes:
        data = []
        for idx, resume in enumerate(resumes, start=1):
            try:
                # Attempt to parse skills and certifications as JSON
                skills = json.loads(resume.skills)
                certifications = json.loads(resume.certifications)
                # Extract values if they arrived in Docparser's key/value format
                skills_str = ', '.join([skill['key_0'] for skill in skills]) if isinstance(skills, list) else resume.skills
                certifications_str = ', '.join([cert['key_0'] for cert in certifications]) if isinstance(certifications, list) else resume.certifications
            except json.JSONDecodeError:
                # If parsing fails, treat them as plain comma-separated strings
                skills_str = resume.skills
                certifications_str = resume.certifications
            data.append({
                'S.No': idx,
                'Name': resume.resume_name,
                'Score': resume.score,
                'Skills': skills_str,
                'Certifications': certifications_str,
                'Experience (Years)': resume.experience_years,
                'Education': resume.education_level,
                'Summary': resume.summary
            })
        df = pd.DataFrame(data)
        df_display = df[['S.No', 'Name', 'Score', 'Skills', 'Certifications', 'Experience (Years)', 'Education', 'Summary']]

        # Define a threshold for best-fit resumes
        threshold = 50
        best_fits = df[df['Score'] >= threshold]

        # Display all resumes
        st.subheader("All Resumes")
        for index, row in df_display.iterrows():
            with st.container():
                col1, col2, col3 = st.columns([3, 1, 1])
                with col1:
                    st.write(f"**{row['Name']}** (Score: {row['Score']}%)")
                    st.write(f"Skills: {row['Skills']}")
                    st.write(f"Experience: {row['Experience (Years)']} years")
                with col2:
                    if st.button("View Details", key=f"view_{index}"):
                        st.write("### Analysis Report")
                        st.write(f"Skills: {row['Skills']}")
                        st.write(f"Certifications: {row['Certifications']}")
                        st.write(f"Experience: {row['Experience (Years)']} years")
                        st.write(f"Education: {row['Education']}")
                        st.write(f"Summary: {row['Summary']}")
                with col3:
                    if st.button("Delete", key=f"delete_{index}"):
                        resume_to_delete = session.query(ResumeScore).filter_by(resume_name=row['Name']).first()
                        if resume_to_delete:
                            session.delete(resume_to_delete)
                            session.commit()
                            st.success(f"Deleted {row['Name']}")
                            st.rerun()  # refresh the page so the deleted row disappears

        # Display best-fit resumes
        if not best_fits.empty:
            st.subheader("Best Fit Resumes")
            for index, row in best_fits.iterrows():
                with st.container():
                    col1, col2, col3 = st.columns([3, 1, 1])
                    with col1:
                        st.write(f"**{row['Name']}** (Score: {row['Score']}%)")
                        st.write(f"Skills: {row['Skills']}")
                        st.write(f"Experience: {row['Experience (Years)']} years")
                    with col2:
                        if st.button("View Details", key=f"view_best_{index}"):
                            st.write("### Analysis Report")
                            st.write(f"Skills: {row['Skills']}")
                            st.write(f"Certifications: {row['Certifications']}")
                            st.write(f"Experience: {row['Experience (Years)']} years")
                            st.write(f"Education: {row['Education']}")
                            st.write(f"Summary: {row['Summary']}")
                    with col3:
                        if st.button("Delete", key=f"delete_best_{index}"):
                            resume_to_delete = session.query(ResumeScore).filter_by(resume_name=row['Name']).first()
                            if resume_to_delete:
                                session.delete(resume_to_delete)
                                session.commit()
                                st.success(f"Deleted {row['Name']}")
                                st.rerun()  # refresh the page so the deleted row disappears
    else:
        st.write("No resume scores available.")
def main():
    st.title("Resume Analyzer")
    set_custom_css()

    menu = ["Home", "View Scores"]
    choice = st.sidebar.selectbox("Menu", menu)

    if choice == "Home":
        analysis_type = st.selectbox("Select Analysis Type:", ["Single Resume", "Folder Upload"])
        method_choice = st.selectbox("Select Method:", ["Use LLM", "Use Field Extraction"])

        # Initialize all credentials so the unused ones are defined regardless of method
        openai_key = None
        api_key = None
        parser_id = None
        if method_choice == "Use LLM":
            openai_key = st.text_input("Enter OpenAI API Key:", type="password")
            parser_choice = "OpenAI"
        else:
            parser_choice = "Docparser"  # Only Docparser is available for field extraction
            api_key = st.text_input("Enter Docparser API Key:", type="password")
            parser_id = st.text_input("Enter Docparser Parser ID:")

        job_description = st.text_area("Enter job description:", height=150, placeholder="Paste job description here...", key="job_desc")

        # Configure weights; they are not normalized, and the final score is capped at 100
        st.sidebar.header("Configure Weights")
        skill_weight = st.sidebar.slider("Skill Weight", 0.0, 1.0, 0.9)
        certification_weight = st.sidebar.slider("Certification Weight", 0.0, 1.0, 0.05)
        experience_weight = st.sidebar.slider("Experience Weight", 0.0, 1.0, 0.03)
        education_weight = st.sidebar.slider("Education Weight", 0.0, 1.0, 0.02)
        project_weight = st.sidebar.slider("Project Weight", 0.0, 1.0, 0.1)

        if analysis_type == "Single Resume":
            uploaded_file = st.file_uploader("Upload a resume PDF file", type="pdf")
            if st.button("Analyze Resume"):
                if not uploaded_file:
                    st.error("Please upload a resume PDF file")
                    return
                if not job_description:
                    st.error("Please enter a job description")
                    return
                if method_choice == "Use LLM" and not openai_key:
                    st.error("Please enter the OpenAI API key")
                    return
                if method_choice == "Use Field Extraction" and (not api_key or not parser_id):
                    st.error("Please enter the Docparser API key and Parser ID")
                    return
                with st.spinner("Processing resume..."):
                    result = process_resume(uploaded_file, job_description, uploaded_file.name, parser_choice, openai_key, api_key, parser_id, skill_weight, certification_weight, experience_weight, education_weight, project_weight)
                    if result:
                        st.success("Analysis complete!")
                        display_results(result)
                    else:
                        st.warning("Failed to process the resume.")
        elif analysis_type == "Folder Upload":
            uploaded_files = st.file_uploader("Upload multiple resume PDF files", type="pdf", accept_multiple_files=True)
            if st.button("Analyze Resumes"):
                if not uploaded_files:
                    st.error("Please upload resume PDF files")
                    return
                if not job_description:
                    st.error("Please enter a job description")
                    return
                if method_choice == "Use LLM" and not openai_key:
                    st.error("Please enter the OpenAI API key")
                    return
                if method_choice == "Use Field Extraction" and (not api_key or not parser_id):
                    st.error("Please enter the Docparser API key and Parser ID")
                    return
                with st.spinner("Processing resumes..."):
                    scores = process_resumes(uploaded_files, job_description, parser_choice, openai_key, api_key, parser_id, skill_weight, certification_weight, experience_weight, education_weight, project_weight)
                    if scores:
                        st.success("Analysis complete!")
                        for result in scores:
                            display_results(result)
                    else:
                        st.warning("No valid resumes found to process")

        with st.expander("ℹ️ How to use"):
            st.markdown("""
            1. Select the analysis type: Single Resume or Folder Upload.
            2. Choose the method: Use LLM or Use Field Extraction.
            3. If using LLM, enter the OpenAI API key.
            4. If using Field Extraction, enter the Docparser API key and Parser ID.
            5. Upload a resume PDF file or multiple files.
            6. Paste the job description.
            7. Configure the weights for skills, certifications, experience, education, and projects.
            8. Click 'Analyze' to start processing.
            9. View the match score and extracted information.
            10. Click 'View Detailed Analysis' to see the summary and more details.
            """)
    elif choice == "View Scores":
        view_scores()

if __name__ == "__main__":
    main()