Spaces:
Sleeping
Sleeping
import streamlit as st | |
from dotenv import load_dotenv | |
from azure.cosmos import CosmosClient | |
from typing import List | |
import os | |
import requests | |
import json | |
import re | |
load_dotenv() | |
def get_db_items(offset: int, database_name: str, container_name: str, num_items: int = 10) -> List[dict]:
    """Fetch one page of items from a Cosmos DB container.

    The query is run through the ``CosmosClient`` stored in
    ``st.session_state["db_items"]`` and orders items by ``c._ts`` descending.

    Args:
        offset: Number of items to skip (``OFFSET`` of the query).
        database_name: Name of the Cosmos DB database.
        container_name: Name of the container inside that database.
        num_items: Maximum number of items to return (``LIMIT`` of the query).

    Returns:
        The queried items as a list of dicts. On any failure the error is
        printed, a generic message is shown via ``st.error`` and an empty
        list is returned.
    """
    try:
        client: CosmosClient = st.session_state["db_items"]
        container = client.get_database_client(database_name).get_container_client(container_name)
        # Coerce to int so that nothing but a number can ever end up in the
        # query text; a non-numeric value raises and is handled below.
        query = f"SELECT * FROM c ORDER BY c._ts DESC OFFSET {int(offset)} LIMIT {int(num_items)}"
        return list(container.query_items(query=query, enable_cross_partition_query=True))
    except Exception as e:
        print(f"Fehler beim Abrufen der Daten aus der Cosmos DB: {e}")
        # The icon must be a real emoji, otherwise st.error itself raises.
        st.error("Something went wrong. Please try again later.", icon="🚨")
        return []
def update_db_item(item: dict, database_name: str, container_name: str) -> None:
    """Upsert ``item`` into the given Cosmos DB container.

    Uses the client stored in ``st.session_state["db_items"]``.

    Args:
        item: The document to upsert.
        database_name: Name of the Cosmos DB database.
        container_name: Name of the container inside that database.

    Failures are not raised to the caller: the error is printed and a generic
    message is shown via ``st.error``.
    """
    try:
        database = st.session_state["db_items"].get_database_client(database_name)
        container = database.get_container_client(container_name)
        container.upsert_item(item)
    except Exception as e:
        print(f"Fehler beim Updaten des Items: {e}")
        # The icon must be a real emoji, otherwise st.error itself raises.
        st.error("Something went wrong. Please try again later.", icon="🚨")
def get_summary(input_params) -> "dict | None":
    """Request an interview summary from the backend API.

    POSTs ``input_params`` as the request body to
    ``{BASE_API}/api/create-interview-summary``, where ``BASE_API`` is read
    from the environment.

    Args:
        input_params: Request body passed to ``requests.post`` as ``data``.

    Returns:
        The decoded JSON response on success, otherwise ``None``. On failure
        the error is printed and the user is informed: an HTTP 500 response is
        reported as "no recording available yet" via ``st.info``, anything
        else via ``st.error``.
    """
    url = f"{os.environ.get('BASE_API')}/api/create-interview-summary"
    # Stays None when the request never produced a response (connection
    # error, invalid URL, ...), so the handler below can tell the cases apart.
    response = None
    try:
        response = requests.post(url=url, data=input_params)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        print(f"Fehler beim erstellen des Termins: {str(e)}")
        # Branch on the HTTP status code, but only if a response exists.
        if response is not None and response.status_code == 500:
            st.info("There is currently no recording available from Zoom. Please try again in a few minutes.", icon="ℹ️")
        else:
            st.error(f"Something went wrong: {str(e)}", icon="🚨")
        return None
def change_meeting_items():
    """Reset the detail view and switch the listed items to the selected meeting type.

    Reads ``meeting_type`` from the session state and points ``items_to_show``
    at the interview items for "Candidate interview", otherwise at the
    assessment items.
    """
    state = st.session_state
    state["detailed_view"] = False
    state["selected_item"] = None
    source_key = "interview_items" if state["meeting_type"] == "Candidate interview" else "assessment_items"
    state["items_to_show"] = state[source_key]
def back_to_overview():
    """Leave the detail view: clear the selection and the created question evaluations."""
    resets = {
        "detailed_view": False,
        "selected_item": None,
        "created_question_evaluations": [],
    }
    for state_key, value in resets.items():
        st.session_state[state_key] = value
def summary_button_clicked(interview_object: dict):
    """Store ``interview_object`` as the selected item and switch to the detail view."""
    state = st.session_state
    state["selected_item"] = interview_object
    state["detailed_view"] = True
def update_assessment_db_item():
    """Write the reviewer's scores and evaluations into the selected assessment and upsert it.

    Takes the item in ``st.session_state["selected_item"]``, sets its
    ``process_status`` to ``"evaluations_created"`` and, for every question
    and coding task, overwrites ``score`` and ``evaluation`` with the values
    of the matching session-state entries (``rating_question_{i}``,
    ``evaluation_question_{i}``, ``rating_coding_task_{i}``,
    ``evaluation_coding_task_{i}``). The item is then upserted into the
    ``assessments`` container of ``assessment-database``.

    Failures are not raised: the error is printed and a generic message is
    shown via ``st.error``.
    """
    try:
        client: CosmosClient = st.session_state["db_items"]
        database = client.get_database_client("assessment-database")
        container = database.get_container_client("assessments")

        # The item is modified in place, i.e. the object held in the session
        # state is the one being changed.
        assessment_item = st.session_state["selected_item"]
        assessment_item["process_status"] = "evaluations_created"

        for i, question in enumerate(assessment_item["questions"]):
            question["score"] = st.session_state[f"rating_question_{i}"]
            question["evaluation"] = st.session_state[f"evaluation_question_{i}"]

        for i, task in enumerate(assessment_item["coding_tasks"]):
            task["score"] = st.session_state[f"rating_coding_task_{i}"]
            task["evaluation"] = st.session_state[f"evaluation_coding_task_{i}"]

        container.upsert_item(assessment_item)
    except Exception as e:
        print(f"Fehler beim Updaten des Items: {e}")
        # The icon must be a real emoji, otherwise st.error itself raises.
        st.error("Something went wrong. Please try again later.", icon="🚨")
def remove_html_tags(input_string: str):
    """Return ``input_string`` with every ``<...>`` span removed.

    The match is non-greedy, so each span is removed on its own and the text
    between two spans is kept. ``.`` does not match a newline here, so a span
    that contains a line break is left untouched.
    """
    tag_pattern = r"<.*?>"
    return re.sub(tag_pattern, "", input_string)
# Create the Cosmos DB client only if the session state does not hold one yet.
if "db_items" not in st.session_state:
    st.session_state["db_items"] = CosmosClient(
        os.getenv("COSMOS_DB_ENDPOINT"),
        os.getenv("COSMOS_DB_KEY"),
    )

# Load the first 50 items of each meeting type if they are not cached yet.
if "interview_items" not in st.session_state:
    st.session_state["interview_items"] = get_db_items(0, "appointment-database", "appointments", 50)
if "assessment_items" not in st.session_state:
    st.session_state["assessment_items"] = get_db_items(0, "assessment-database", "assessments", 50)

# The overview starts with the interview items.
if "items_to_show" not in st.session_state:
    st.session_state["items_to_show"] = st.session_state["interview_items"]

# Defaults for the view state: overview mode, nothing selected.
for state_key, initial_value in (("detailed_view", False), ("selected_item", None)):
    if state_key not in st.session_state:
        st.session_state[state_key] = initial_value
# Page header: title and logo side by side in columns weighted 2:1.
title_col, logo_col = st.columns([2, 1])
title_col.title("Meeting Manager")
logo_col.image("https://www.workgenius.com/wp-content/uploads/2023/03/WorkGenius_navy-1.svg")

# Meeting-type selector; its value lives in st.session_state["meeting_type"]
# and every change runs change_meeting_items.
st.radio(
    "Select a meeting type",
    ("Candidate interview", "Peer to peer assessment"),
    key="meeting_type",
    on_change=change_meeting_items,
)
st.divider()
# Main view: either the detail page of the selected meeting or the overview list.
if st.session_state["detailed_view"]:
    selected_item = st.session_state["selected_item"]

    if st.session_state["meeting_type"] == "Candidate interview":
        # Detail page of a candidate interview: show (and if needed create) the summary.
        st.subheader("Summary for meeting: " + selected_item["job_title"])

        if isinstance(selected_item["summary_recruiter"], str):
            # A string value is treated as "no summary created yet": request
            # one from the API and persist it on a copy of the item.
            with st.spinner("Creating summary..."):
                input_params = {
                    "meeting_id": selected_item["zoom_meeting_id"],
                }
                summary = get_summary(json.dumps(input_params))
                if summary:
                    copied_db_item = selected_item.copy()
                    copied_db_item["process_status"] = "summaries_created"
                    copied_db_item["summary_recruiter"] = summary
                    update_db_item(copied_db_item, "appointment-database", "appointments")
                    st.write(summary)
        else:
            st.write(selected_item["summary_recruiter"])

        st.button("Back to overview", on_click=back_to_overview)
    else:
        # Detail page of a peer to peer assessment: transcript plus one
        # expander per question / coding task with inputs for the reviewer.
        st.subheader("Evaluation for meeting: " + selected_item["assessment_title"])

        with st.expander("Interview transcript"):
            st.write(selected_item["interview_transcript"])

        for i, question in enumerate(selected_item["questions"]):
            with st.expander(f"Question {i+1}: {remove_html_tags(question['question'])}"):
                st.number_input("Score (1-10)", key=f"rating_question_{i}", min_value=1, max_value=10)
                st.text_area("Your evaluation:", key=f"evaluation_question_{i}")
                # f-strings instead of "+" so a non-string value cannot raise a TypeError.
                st.write(f"AI based score: {question['score']}")
                st.write(f"AI based evaluation: {question['evaluation']}")

        for i, task in enumerate(selected_item["coding_tasks"]):
            with st.expander(f"Coding task {i+1}: {task['coding_task']}"):
                st.number_input("Score (1-10)", key=f"rating_coding_task_{i}", min_value=1, max_value=10)
                st.text_area("Your evaluation:", key=f"evaluation_coding_task_{i}")
                st.write(f"AI based score: {task['score']}")
                st.write(f"AI based evaluation: {task['evaluation']}")

        col_evaluations = st.columns([1, 2, 1])
        with col_evaluations[0]:
            st.button("Back to overview", on_click=back_to_overview, use_container_width=True)
        with col_evaluations[2]:
            st.button("Save evaluation", on_click=update_assessment_db_item, use_container_width=True)
else:
    # Overview: one row per item with its title and a button that opens the detail page.
    is_interview = st.session_state["meeting_type"] == "Candidate interview"
    st.subheader("Candidate interview:" if is_interview else "Peer to peer assessments:")

    meeting_title = st.columns([6, 2])
    meeting_title[0].write("**Meeting title:**")

    for i, interview in enumerate(st.session_state["items_to_show"]):
        cols = st.columns([6, 2])
        if is_interview:
            cols[0].write(interview["job_title"])
            cols[1].button("Summary", key=f"button_{i}", on_click=summary_button_clicked, args=(interview,))
        else:
            cols[0].write(interview["assessment_title"])
            cols[1].button("Evaluation", key=f"button_{i}", on_click=summary_button_clicked, args=(interview,))