Spaces:
Sleeping
Sleeping
import pathlib | |
import re | |
from typing import Union | |
import os | |
import ast | |
import streamlit as st | |
import sqlalchemy | |
from sqlalchemy.orm import Session | |
from sqlalchemy import select | |
import openai | |
from models import User, Job, JobsApplied, ENGINE | |
from llm import gpt | |
# Configure the OpenAI client from the environment. A missing OPEN_API_KEY
# variable raises KeyError as soon as this module is loaded.
openai.api_key = os.environ['OPEN_API_KEY']
def clear_cache():
    """Remove every entry from ``st.session_state``.

    Used as the "Log Out" button callback and by the sign-up form to reset
    the page state.
    """
    # Snapshot the keys first: deleting from a mapping while iterating over
    # it directly is unsafe (a plain dict raises RuntimeError for it).
    for key in list(st.session_state.keys()):
        del st.session_state[key]
def add_user_to_db():
    """Render the sign-up form and create a ``User`` row when it is submitted.

    Validation failures (password mismatch, blank field, malformed e-mail)
    and an already-registered e-mail are reported with ``st.error`` and leave
    the database untouched. On success the new user is committed and the
    session state is cleared with ``clear_cache``. The "Back" button clears
    the session state as well.
    """
    col1, col2 = st.columns(2)
    with col1:
        first_name = st.text_input(label="First Name", placeholder="First Name").title().strip()
    with col2:
        last_name = st.text_input(label="Last Name", placeholder="last Name").title().strip()

    # login() lower-cases the address before looking it up, so it has to be
    # stored lower-cased too; otherwise a mixed-case sign-up can never sign in.
    email_id = st.text_input(label="Email ID", placeholder="Enter Email ID").strip().lower()
    new_password = st.text_input(label="New Password", placeholder="New Password", type="password").strip()
    confirm_password = st.text_input(label="Password", placeholder="New Password", type="password").strip()

    col1, col2, col3 = st.columns(3)
    with col3:
        if st.button("Back", key="Mistake"):
            clear_cache()

    with col1:
        if not st.button("Create Profile"):
            return

        if new_password != confirm_password:
            st.error("Passwords do not match")
            return
        if not (first_name and last_name and email_id and new_password):
            st.error("No text fields must be blank!")
            return
        # Raw string: "\." in a normal string literal is an invalid escape.
        if re.fullmatch(r'[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}', email_id) is None:
            st.error("Invalid Email ID")
            return

        with Session(ENGINE) as session:
            stmt = select(User).where(User.email_id == email_id)
            if session.scalars(stmt).one_or_none() is not None:
                st.error("User already exists!")
                return

            # NOTE(review): the password is stored exactly as entered (plain
            # text) and login() compares it verbatim. Hashing it requires a
            # coordinated change in both places.
            user_object = User(email_id=email_id, password=new_password,
                               first_name=first_name, last_name=last_name)
            session.add(user_object)
            session.commit()
            st.success("Successfully Created!")
            clear_cache()
def login() -> None:
    """Render the sign-in form and record a successful login in session state.

    On "Sign In" the ``User`` table is queried for the entered e-mail and
    password. A match sets ``st.session_state['user_logged']`` to ``True``
    and stores the row under ``st.session_state['user']``; otherwise an error
    is shown. "Sign Up" sets ``st.session_state['sign_up_clicked']``, and
    while that flag is set the sign-in form is replaced by
    ``add_user_to_db()``.
    """
    login_container = st.empty()
    with login_container.container():
        email_id = st.text_input(label=":email: Email ID", placeholder="Email ID").lower().strip()
        # add_user_to_db() strips passwords before storing them, so strip here
        # too; otherwise stray surrounding whitespace fails the comparison.
        password = st.text_input(label=":lock: Password", placeholder="Password", type="password").strip()

        col1, _, _, col4 = st.columns(4)
        with col4:
            if st.button("Sign Up"):
                st.session_state['sign_up_clicked'] = True
        with col1:
            sign_in = st.button("Sign In")

        if sign_in:
            with Session(ENGINE) as session:
                # NOTE(review): passwords are compared as plain text because
                # that is how add_user_to_db() stores them.
                stmt = select(User).where(User.email_id == email_id).where(User.password == password)
                user = session.scalars(stmt).one_or_none()
            if user is None:
                st.error("Invalid email ID/password")
            else:
                st.session_state['user_logged'] = True
                st.session_state['user'] = user

    if st.session_state['sign_up_clicked']:
        # Swap the sign-in form for the sign-up form.
        login_container.empty()
        add_user_to_db()
        return

    if st.session_state['user'] is not None:
        # Signed in: remove the form so only the application pages remain.
        login_container.empty()
def add_to_db(**job_applied):
    """Insert one ``JobsApplied`` row built from the given keyword arguments.

    The keyword arguments are passed straight to the ``JobsApplied``
    constructor and the new row is committed in its own session.
    """
    application = JobsApplied(**job_applied)
    with Session(ENGINE) as db:
        db.add(application)
        db.commit()
def apply_for_jobs():
    """Render the job-application form and store a submitted application.

    Clicking "Apply" sets ``st.session_state['applied_for_job']``; while that
    flag is set the application is validated, the uploaded resume is written
    to ``resumes/<job_id>/`` and a ``JobsApplied`` row is added.

    Returns:
        ``True`` when an application was saved in this run; ``False`` when
        nothing could be saved (no jobs exist, no resume uploaded, the user
        already has an application, or the experience is outside the job's
        range); ``None`` when "Apply" has not been clicked.
    """
    with Session(ENGINE) as session:
        jobs = session.scalars(select(Job)).all()

    if not jobs:
        # Without this guard the post-name lookup below indexes an empty list.
        st.info("No jobs are available right now")
        return False

    job_container = st.empty()
    with job_container.container():
        col1, col2 = st.columns(2)
        with col1:
            job_id = st.selectbox(label="Job ID", options=[job.job_id for job in jobs])
        with col2:
            # Read-only field that mirrors the post of the selected job.
            matching_posts = [job.post_name for job in jobs if job.job_id == job_id]
            st.text_input(label="Post", value=matching_posts[-1], disabled=True)

        experience = st.number_input(label='Expereince', min_value=0)

        col1, col2 = st.columns(2)
        with col1:
            primary_skills = st.text_input(label="Primary Skills", placeholder="Primary Skills", help="Input your skills delimited by a comma").lower().strip()
        with col2:
            secondary_skills = st.text_input(label="Secondary Skills", placeholder="Secondary Skills", help="Input your skills delimited by a comma").lower().strip()

        uploded_file = st.file_uploader(label="Resume Upload", type=["pdf", "docx"])

        col1, col2 = st.columns(2)
        with col2:
            if st.button("Apply"):
                st.session_state['applied_for_job'] = True

    if not st.session_state['applied_for_job']:
        return None

    if uploded_file is None:
        st.error("No Resume Uploaded")
        return False

    user = st.session_state['user']
    with Session(ENGINE) as session:
        # .first() instead of .one_or_none(): the latter raises when the user
        # already has more than one application row.
        stmt = select(JobsApplied).where(JobsApplied.email_id == user.email_id)
        if session.scalars(stmt).first() is not None:
            st.info("You have already applied for job")
            return False

        stmt = select(Job).where(Job.job_id == job_id)
        selected_job = session.scalars(stmt).one()
        min_experience = selected_job.min_experience
        max_experience = selected_job.max_experience

    if not (min_experience <= experience <= max_experience):
        # Tell the applicant why nothing was saved instead of failing silently.
        st.error(f"Experience must be between {min_experience} and {max_experience} years for this job")
        return False

    resumes_dir = pathlib.Path(f'resumes/{job_id}')
    # parents=True: the top-level "resumes" directory may not exist yet.
    resumes_dir.mkdir(parents=True, exist_ok=True)
    resume = resumes_dir / f"{user.email_id} - {user.full_name}"
    # getvalue() returns the whole upload regardless of the stream position.
    resume.write_bytes(uploded_file.getvalue())

    add_to_db(
        email_id=user.email_id,
        job_id=job_id,
        experience=experience,
        primary_skills=primary_skills,
        secondary_skills=secondary_skills,
        round_number=0,
        rank=-1,
    )
    return True
def jobs_applied():
    """List the signed-in user's applications and run the quiz for later rounds.

    Each application is shown in its own expander. For applications whose
    ``round_number`` is greater than 0 the quiz from
    ``get_interview_questions`` is rendered. Once a score has been stored in
    ``st.session_state['rank']`` the quiz is replaced by a notice, and an
    application that scored below 7 is deleted from the database.
    """
    pass_mark = 7

    with Session(ENGINE) as session:
        stmt = select(JobsApplied).where(JobsApplied.email_id == st.session_state['user'].email_id)
        applications = session.scalars(stmt).all()

    for application in applications:
        with st.expander(f'Job ID: {application.job_id} Round Number: {application.round_number}'):
            if application.round_number <= 0:
                continue

            quiz_slot = st.empty()
            with quiz_slot.container():
                get_interview_questions(application.job_id)

            # 'rank' starts out as False and only becomes an int when the
            # quiz's Submit button stores a score, so `is False` means "not
            # submitted yet". Keep the questions on screen in that case;
            # clearing the placeholder right away would hide them before they
            # could be answered.
            # NOTE(review): 'rank' is a single session-wide value, so one
            # submitted score is applied to every application listed here.
            score = st.session_state.get('rank', False)
            if score is False:
                continue

            quiz_slot.empty()
            st.markdown(f'<p style="font-family:Garamond">You will be notified about the result</p>', unsafe_allow_html=True)

            if score < pass_mark:
                with Session(ENGINE) as session:
                    session.delete(application)
                    # Without a commit the delete is rolled back when the
                    # session closes.
                    session.commit()
def get_interview_questions(job_id):
    """Render a multiple-choice quiz for the given job and score it on submit.

    The questions are requested from ``gpt`` once per job and kept in
    ``st.session_state`` so that Streamlit reruns show the same questions.

    Args:
        job_id: Identifier of the job; surrounding whitespace is stripped
            before the lookup.

    Returns:
        The number of correct answers when "Submit" is clicked in this run
        (also stored in ``st.session_state['rank']``); otherwise ``None``,
        including when the job does not exist or the generated questions
        cannot be parsed.
    """
    job_key = job_id.strip()
    with Session(ENGINE) as session:
        stmt = select(Job).where(Job.job_id == job_key)
        job = session.scalars(stmt).one_or_none()
        if job is None:
            return None
        # NOTE(review): the prompt text is kept verbatim; "optionaand" and
        # "kets" look like typos for "options and" / "keys".
        prompt = f"Generate a set of 10 multiple-choice questions (MCQs) based on the subject of {job.post_name} with experience between {job.min_experience} - {job.max_experience} years.Return it as a list of dictionaries with question as key and optionaand correct answer as kets"

    cache_key = f'mcqs_{job_key}'
    if cache_key not in st.session_state:
        raw = gpt(prompt)
        try:
            # literal_eval only accepts Python literals, so the model output
            # is parsed without executing anything.
            parsed = ast.literal_eval(raw.strip())
            # Normalise to (question, options, correct_answer); the model may
            # use either lower-case or title-case keys.
            questions = [
                (
                    mcq["question"] if "question" in mcq else mcq["Question"],
                    mcq["options"] if "options" in mcq else mcq["Options"],
                    mcq["correct_answer"] if "correct_answer" in mcq else mcq["Correct Answer"],
                )
                for mcq in parsed
            ]
        except (ValueError, TypeError, SyntaxError, KeyError):
            st.error("Could not load the interview questions. Please try again.")
            return None
        st.session_state[cache_key] = questions

    rank = 0
    for index, (question, options, correct_answer) in enumerate(st.session_state[cache_key], start=1):
        # A stable string key per question keeps the selection across reruns.
        answer = st.radio(f'Q{index} {question}', key=f'{cache_key}_q{index}', options=options)
        if str(answer).strip() == str(correct_answer).strip():
            rank += 1

    if st.button('Submit', key=f'submit_{job_key}'):
        st.session_state['rank'] = rank
        return st.session_state['rank']
    return None
# Seed the session-state flags the page reads. Values already present are
# kept, so they survive Streamlit reruns.
_SESSION_DEFAULTS = {
    'sign_up_clicked': False,
    'user_logged': False,
    'user': None,
    'applied_for_job': False,
    'rank': False,
}
for _key, _default in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default

login()

# login() can reach clear_cache() through the sign-up form, which removes
# every key; read the flag with a default instead of indexing so that path
# does not raise KeyError.
if st.session_state.get('user_logged', False):
    col1, col2 = st.columns(2)
    with col2:
        st.button('Log Out', on_click=clear_cache)
    tab1, tab2 = st.tabs(["Apply Jobs", "Place Holder"])
    with tab1:
        apply_for_jobs()
    with tab2:
        jobs_applied()