# Page-header residue from the hosting site ("Spaces: Sleeping"); not part of the program.
import html
import os
import pathlib
import random
import smtplib
from datetime import datetime, timedelta
from email.message import EmailMessage
from typing import Union

import streamlit as st
from sqlalchemy import select
from sqlalchemy.orm import Session

from llm import parse, rerank
from models import Employee, Job, JobsApplied, User, ENGINE
# Inject page-wide CSS (raw HTML is enabled via unsafe_allow_html) that gives
# every Streamlit button a layered box shadow.
# NOTE(review): the "-" right after the opening quotes is kept exactly as found;
# it may be an extraction artifact -- confirm against the deployed app.
st.markdown("""-
<style>
.stButton > button{
border: black;
box-shadow: 0 12px 16px 0 rgba(0,0,0,0.24), 0 17px 50px 0 rgba(0,0,0,0.19);
}
</style>""",unsafe_allow_html=True)
def clear_cache():
    """Remove every entry from ``st.session_state``.

    Registered as the ``on_click`` callback of the "Log Out" button, so the
    next script run starts from an empty session.
    """
    # Take a snapshot of the keys first: deleting from a mapping while it is
    # being iterated is only safe if the mapping happens to iterate a copy.
    for key in list(st.session_state.keys()):
        del st.session_state[key]
def login() -> Union[Employee, None]:
    """Render the login form and authenticate the employee.

    Returns:
        The matching ``Employee`` when "Login" was pressed with credentials
        that match exactly one row; ``None`` otherwise (button not pressed,
        a field left empty, or no matching row).

    Side effects:
        On success sets ``st.session_state['employee_logged']`` to ``True``,
        stores the employee under ``st.session_state['employee']`` and clears
        the login form. On failure an error message is shown.
    """
    login_container = st.empty()
    with login_container.container():
        employee_id = st.text_input(label=":email: User ID", placeholder="User ID").upper().strip()
        password = st.text_input(label=":lock: Password", placeholder="Password", type="password")
        button = st.button('Login', type="secondary")

        if not button:
            return None
        if not (employee_id and password):
            st.error('Empty UserID/Password')
            return None

        with Session(ENGINE) as session:
            # NOTE(review): the typed password is compared directly with the
            # stored column, which implies passwords are stored unhashed --
            # confirm and migrate to a salted hash.
            stmt = (
                select(Employee)
                .where(Employee.employee_id == employee_id)
                .where(Employee.password == password)
            )
            employee = session.scalars(stmt).one_or_none()
            if employee is None:
                st.error('Invalid UserID/password')
                return None
            st.session_state['employee_logged'] = True
            st.session_state['employee'] = employee
            login_container.empty()
            return employee
def add_to_db(**job) -> None:
    """Store a new job posting and show a confirmation message.

    The keyword arguments are the ``Job`` column values collected by the
    form. This function adds the generated ``job_id`` (five random capital
    letters followed by a random number from 100 to 1000), the logged-in
    employee's id and the creation timestamp, and flattens both skill lists
    into comma-separated strings before inserting the row.
    """
    letters = ''.join(chr(random.randint(65, 90)) for _ in range(5))
    number = random.randint(100, 1000)

    record = dict(job)
    record.update(
        job_id=f'{letters}{number}',
        employee_id=st.session_state["employee"].employee_id,
        created_at=datetime.now().strftime("%d-%m-%Y %H:%M:%S"),
        primary_skills=', '.join(job['primary_skills']),
        secondary_skills=', '.join(job['secondary_skills']),
    )

    with Session(ENGINE) as session:
        session.add(Job(**record))
        session.commit()

    st.success(f"Successfully posted job {job['post_name']}!")
def add_job() -> bool:
    """Render the "new job" form and store the posting when it is submitted.

    Returns:
        ``True`` when "Post" was pressed, validation passed and the job was
        handed to ``add_to_db``; ``False`` otherwise (button not pressed or
        validation failed, in which case an error message is shown).
    """
    post_name = st.text_input(label='Post Name').strip().title()
    description = st.text_area(label='Job Description').strip()
    responsibilities = st.text_area(label="Responsibility").strip()
    vacancies = st.number_input(label="Number Vacancies", min_value=1)

    col1, col2 = st.columns(2)
    with col1:
        min_experience = st.number_input(label="Minimum Experience", min_value=0)
    with col2:
        max_experience = st.number_input(label="Maximum Experience", min_value=1)

    number_of_primary_skills = st.number_input(label="How many primary skills?", min_value=1)
    primary_inputs = [
        st.text_input(label=f'Primary Skill {i}', key=f'Primary Skill {i}').strip()
        for i in range(1, number_of_primary_skills + 1)
    ]
    number_of_secondary_skills = st.number_input(label="How many secondary skills?", min_value=0)
    secondary_inputs = [
        st.text_input(label=f'Secondary Skill {i}', key=f'Secondary Skill {i}').strip()
        for i in range(1, number_of_secondary_skills + 1)
    ]
    # Drop inputs left blank: a list of empty strings is still truthy, so the
    # "required" check below would otherwise never fail, and blank entries
    # would end up as stray ", " separators in the stored skill strings.
    primary_skills = [skill for skill in primary_inputs if skill]
    secondary_skills = [skill for skill in secondary_inputs if skill]

    st.markdown("Expires at")
    tomorrow = datetime.now() + timedelta(days=1)
    col1, col2 = st.columns(2)
    with col1:
        expires_date = st.date_input(
            label="expries date",
            value=tomorrow,
            min_value=tomorrow,
            max_value=None,
            label_visibility="collapsed",
        )
    with col2:
        expires_time = st.time_input(label="Time Input", label_visibility="collapsed")
    expires_at = f'{expires_date.strftime("%d-%m-%Y")} {expires_time.strftime("%H:%M:%S")}'

    st.markdown(
        "<p style='color: red'>⚠️ Check before you submit once submitted you cannot make changes</p>",
        unsafe_allow_html=True,
    )

    if not st.button("Post"):
        return False
    if not (post_name and description and responsibilities and primary_skills):
        st.error("Post Name/Description/Responsibility/Primary SKills are required")
        return False
    if min_experience > max_experience:
        st.error("Minimum Experience must be less than maximum experience")
        return False

    add_to_db(
        post_name=post_name,
        description=description,
        min_experience=min_experience,
        max_experience=max_experience,
        primary_skills=primary_skills,
        secondary_skills=secondary_skills,
        responsibilities=responsibilities,
        expires_at=expires_at,
        vacancies=vacancies,
    )
    return True
def post_job():
    """Render the "Post Jobs" tab for the logged-in employee.

    Shows a prompt with a "+" button; once pressed,
    ``st.session_state['add_job']`` is set and the job form from ``add_job``
    is rendered inside a placeholder that is cleared after a successful post.
    Does nothing when no employee is logged in.
    """
    # The bare ``st.session_state[...]`` expression statements that used to
    # open this function were removed: Streamlit "magic" writes standalone
    # expressions to the page, so they dumped the raw employee object and the
    # login flag into the UI.
    if not st.session_state['employee_logged']:
        return

    col1, col2 = st.columns([0.001, 0.01])
    with col2:
        st.markdown(f'Would you like to include a job vacancy listing. {st.session_state["employee"].full_name}?')
    with col1:
        if st.button(':heavy_plus_sign:'):
            st.session_state['add_job'] = True

    if st.session_state['add_job']:
        add_job_container = st.empty()
        with add_job_container.container():
            is_job_posted = add_job()
        if is_job_posted:
            add_job_container.empty()
def get_jobs_posted() -> None:
    """List the jobs posted by the logged-in employee.

    Each job is shown as an expander whose title carries the job id, the
    post name and the creation timestamp; the expander body is left empty.
    """
    with Session(ENGINE) as session:
        owner_id = st.session_state["employee"].employee_id
        query = select(Job).where(Job.employee_id == owner_id)
        posted = session.scalars(query).all()
        print(posted)
        for posting in posted:
            title = f"{posting.job_id}: {posting.post_name} posted at: {posting.created_at}"
            with st.expander(title):
                pass
def _application_card(email_id, rank=None):
    """Return the HTML card for one applicant, escaping interpolated values."""
    rank_html = ''
    if rank is not None:
        rank_html = (
            '<br><span style="font-family:Garamond">Current Rank: '
            f'<span style="font-family:monospace">{html.escape(str(rank))}</span></span>'
        )
    return (
        '<p style="border: 2px solid black;padding: 10px 20px; box-shadow: 10px 10px 5px #aaaaaa">'
        '<span style="font-family:Garamond;">User Email ID</span>: '
        f'<span style="font-family:monospace">{html.escape(str(email_id))}</span>'
        f'{rank_html}</p>'
    )


def get_users_who_applied_for_jobs():
    """Render the "Get Job Applied Details" tab.

    Flow:
      1. Let the employee pick one of their posted jobs.
      2. List the applications for that job with their current rank.
      3. After "Rank Resumes", re-rank the resumes with ``rerank`` and show
         the top ``num_docs`` candidates.
      4. After "Send Emails", advance those candidates via
         ``add_job_aplication_details`` (which also emails them).

    Returns:
        ``None`` normally and when the employee has no jobs; ``False`` when
        the selected job has no applications (kept from the original
        contract).
    """
    # Get Jobs Posted
    with Session(ENGINE) as session:
        stmt = select(Job).where(Job.employee_id == st.session_state["employee"].employee_id)
        job_ids = [job.job_id for job in session.scalars(stmt).all()]
    if not job_ids:
        st.info("No jobs posted!")
        return

    job_id = st.selectbox("Filter by job_id", options=job_ids)
    with Session(ENGINE) as session:
        stmt = select(JobsApplied).where(JobsApplied.job_id == job_id)
        users_applied_for_jobs = session.scalars(stmt).all()
    if not users_applied_for_jobs:
        st.info('No users have applied for job')
        return False

    re_rank_resumes_container = st.empty()
    with re_rank_resumes_container.container():
        st.markdown(
            """<p style="font-family:Garamond;text-indent: 45vh;font-size:25px">Jobs Application Details</p>""",
            unsafe_allow_html=True,
        )
        col1, col2 = st.columns([0.001, 0.01])
        with col1:
            round_number = users_applied_for_jobs[0].round_number
            st.markdown(
                f"""<p style="font-size:20px"><span style = "font-family:Garamond;">Round</span><span style="font-family:monospace;padding: 10px 20px">{html.escape(str(round_number))}</span></p>""",
                unsafe_allow_html=True,
            )
        with col2:
            num_docs = st.number_input(label="Top Resumes", min_value=1, max_value=len(users_applied_for_jobs))
        for jobs_application in users_applied_for_jobs:
            # Email addresses come from applicants, so they are escaped
            # before being placed into raw HTML.
            st.markdown(
                _application_card(jobs_application.email_id, jobs_application.rank),
                unsafe_allow_html=True,
            )
        col1, col2, col3 = st.columns(3)
        with col2:
            if st.button("Rank Resumes"):
                st.session_state['rank_resumes'] = True

    # Ranking Resumes
    if not st.session_state['rank_resumes']:
        return

    re_rank_resumes_container.empty()
    docs = get_docs(users_applied_for_jobs, job_id)
    re_ranked = rerank(job_id, docs, num_docs)
    # Each document is "<email><50 dashes><resume text>" (built by get_docs);
    # the part before the separator identifies the candidate.
    candidates = [r.document['text'].split('-' * 50)[0].strip() for r in re_ranked.results]
    st.markdown(
        "<p style='font-family:Garamond;text-indent: 45vh;font-size:25px'>Resumes re-ranked:</p>",
        unsafe_allow_html=True,
    )
    for candidate in candidates:
        st.markdown(_application_card(candidate), unsafe_allow_html=True)
    col1, col2, col3 = st.columns(3)
    with col2:
        if st.button("Send Emails"):
            st.session_state['send_resumes'] = True

    if st.session_state['send_resumes']:
        # One-shot action: clear both flags *before* acting so that a later
        # rerun (any widget interaction) does not delete/advance applications
        # and send the emails a second time.
        st.session_state['send_resumes'] = False
        st.session_state['rank_resumes'] = False
        add_job_aplication_details(job_id, round_number, candidates)
def add_job_aplication_details(job_id, round_number, candidates):
    """Advance the selected candidates of a job to the next round.

    Applications for ``job_id`` whose ``email_id`` is not in ``candidates``
    are deleted; the remaining ones get ``round_number`` incremented and
    ``rank`` set to the new round number. Afterwards the candidates are
    notified through ``sending_emails``.

    Args:
        job_id: Id of the job whose applications are processed.
        round_number: Accepted for interface compatibility; not used here.
        candidates: Email ids of the applicants to keep.
    """
    if not candidates:
        # Without this guard an empty selection would delete every
        # application for the job.
        st.warning('No candidates were selected; nothing was updated.')
        return

    # Delete and update inside one session and commit once, so a failure
    # cannot leave the rejected rows removed but the kept rows not advanced.
    with Session(ENGINE) as session:
        rejected_stmt = (
            select(JobsApplied)
            .where(JobsApplied.job_id == job_id)
            .where(JobsApplied.email_id.not_in(candidates))
        )
        for application in session.scalars(rejected_stmt).all():
            session.delete(application)

        selected_stmt = (
            select(JobsApplied)
            .where(JobsApplied.job_id == job_id)
            .where(JobsApplied.email_id.in_(candidates))
        )
        for application in session.scalars(selected_stmt).all():
            application.round_number += 1
            application.rank = application.round_number

        session.commit()

    sending_emails(job_id, candidates)
def sending_emails(job_id, candidates):
    """Email the selected candidates that they were shortlisted for a job.

    The message is sent from the logged-in employee's address through
    ``smtp.gmail.com`` over SSL (port 465); the SMTP password is read from
    the ``EMAIL`` environment variable.

    Args:
        job_id: Job id mentioned in the subject line.
        candidates: Iterable of recipient email addresses.

    A success or error message is shown in the app; nothing is returned.
    """
    recipients = list(candidates)
    if not recipients:
        st.warning('No recipients to email.')
        return

    smtp_password = os.environ.get('EMAIL')
    if not smtp_password:
        st.error('Cannot send emails: the EMAIL environment variable is not set.')
        return

    sender = st.session_state['employee'].email_id
    email_message = EmailMessage()
    email_message['From'] = sender
    # Address headers are documented to take a string (or Address/Group
    # objects), so the recipient list is joined explicitly.
    # NOTE(review): all recipients share one "To" header and can therefore
    # see each other's addresses -- consider one message per candidate.
    email_message['To'] = ', '.join(recipients)
    email_message['Subject'] = f'You are selected for Job ID: {job_id}.'
    email_message.set_content('Hello,\nYou are selected to attend the interview. Please login to your profile and attend.')

    try:
        with smtplib.SMTP_SSL('smtp.gmail.com', 465) as smtp:
            smtp.login(sender, smtp_password)
            smtp.send_message(email_message)
    except (smtplib.SMTPException, OSError) as exc:
        st.error(f'Failed to send emails: {exc}')
        return

    st.success('⚠️Email Sent SUccessfully')
def get_docs(jobs_applications, job_id):
    """Build the documents to re-rank from the resumes stored for a job.

    Every file in ``resumes/<job_id>`` is parsed with ``parse`` and turned
    into one string of the form ``<email_id>`` + 50 dashes + parsed text,
    where ``email_id`` is the part of the file stem before the first ``-``.

    Args:
        jobs_applications: Accepted for interface compatibility; not used.
        job_id: Job whose resume directory is read.

    Returns:
        A list of document strings; empty when the directory does not exist.
    """
    resumes_dir = pathlib.Path(f'resumes/{job_id}')
    if not resumes_dir.is_dir():
        # No resumes were stored for this job.
        return []

    docs = []
    for resume in resumes_dir.iterdir():
        if not resume.is_file():
            continue
        parsed_text = parse(resume)
        # NOTE(review): an address that itself contains "-" is truncated by
        # this split -- confirm the resume file-naming scheme.
        email_id = resume.stem.split('-')[0]
        docs.append(f"{email_id}" + '-' * 50 + parsed_text)
    return docs
# Setting Session states
# Seed every flag the page reads so later lookups never raise KeyError; keys
# that already exist (from a previous rerun) keep their value.
_SESSION_DEFAULTS = {
    'employee_logged': False,
    'employee': None,
    'add_job': False,
    'add_new_skill': False,
    'rank_resumes': False,
    'send_resumes': False,
}
for _key, _default in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default

# Login
if not st.session_state['employee_logged']:
    employee = login()

# Checked separately (not ``else``) so the main UI renders in the same run in
# which login() succeeds.
if st.session_state['employee_logged']:
    col1, col2 = st.columns(2)
    with col2:
        st.button('Log Out', on_click=clear_cache)
    tab1, tab2, tab3 = st.tabs(["Post Jobs", "Jobs Posted", "Get Job Applied Details"])
    with tab1:
        post_job()
    with tab2:
        get_jobs_posted()
    with tab3:
        get_users_who_applied_for_jobs()