"""Streamlit feedback survey: collect per-config ratings and store them via `db.crud`."""

import json
import logging
import os
from datetime import datetime
from typing import Dict

import pandas as pd
import streamlit as st
from dotenv import load_dotenv

from db.crud import ingest, read
from db.schema import Feedback, Response

load_dotenv()

logger = logging.getLogger(__name__)

# Access code participants must type on the first screen. `os.getenv` returns
# None when the variable is unset, in which case `validate_code` rejects
# every input.
VALIDATION_CODE = os.getenv("VALIDATION_CODE")


class SurveyState:
    """Save and restore a participant's survey progress through `db.crud`."""

    def save_state(self, username: str, current_state: Dict) -> None:
        """Store the participant's current responses with `ingest`.

        Args:
            username: Identifier stored as the feedback's ``user_id``.
            current_state: Mapping that must contain a ``"responses"`` entry.

        Any failure (including a missing ``"responses"`` key) is shown to the
        user with `st.error` instead of being raised.
        """
        try:
            feedback = Feedback(
                user_id=username,
                time_stamp=datetime.now().isoformat(),
                responses=current_state["responses"],
            )
            ingest(feedback)
        except Exception as e:
            st.error(f"Error saving state to Firebase: {e}")
        else:
            st.success("Your progress has been saved! You can continue later.")

    def load_state(self, username: str) -> Dict:
        """Return the saved state for `username`, or ``{}`` if there is none.

        Errors raised while fetching are shown with `st.error` and also
        result in ``{}``.
        """
        try:
            state = self.get_state_from_firebase(username)
        except Exception as e:
            st.error(f"Error loading state from Firebase: {e}")
            return {}
        return state or {}

    def get_state_from_firebase(self, username: str) -> Dict:
        """Fetch the stored record for `username`.

        NOTE(review): this is still the original placeholder. The record
        returned by `read` is discarded and an empty dict is returned, so
        `load_state` never restores progress. Mapping the record onto
        ``current_index`` / ``responses`` needs the `read` return schema,
        which is not visible in this file.
        """
        read(username)
        return {}


def initialization() -> None:
    """Create every session-state entry the app relies on, if missing."""
    defaults = {
        "current_index": 0,
        "username": None,
        "responses": [],
        "completed": False,
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value
    if "survey_state" not in st.session_state:
        st.session_state.survey_state = SurveyState()


def validate_username(username: str) -> bool:
    """Return True when `username` contains at least one non-blank character."""
    return bool(username.strip())


def validate_code(input_code: str) -> bool:
    """Return True when the stripped `input_code` equals `VALIDATION_CODE`."""
    return input_code.strip() == VALIDATION_CODE


def username_screen() -> None:
    """Show the entry screen asking for a name and the validation code.

    On valid input the name is stored in session state, any saved progress
    is loaded, and the script is rerun so the questions screen is shown.
    """
    st.title("Welcome to the Feedback Survey")
    username_input = st.text_input("Enter your First name and press TAB:")
    validation_code_input = st.text_input(
        "Enter the validation code to proceed and press ENTER:"
    )
    next_button = st.button("Next")

    # Validate once both fields are filled in, or when "Next" is clicked.
    if not ((username_input and validation_code_input) or next_button):
        return

    username_ok = validate_username(username_input)
    code_ok = validate_code(validation_code_input)
    if not (username_ok and code_ok):
        if not username_ok:
            st.warning("Invalid Prolific ID. Please check and try again.")
        if not code_ok:
            st.warning("Invalid validation code. Please check and try again.")
        return

    st.session_state.username = username_input
    saved_state = st.session_state.survey_state.load_state(username_input)
    if saved_state:
        st.session_state.current_index = saved_state.get('current_index', 0)
        st.session_state.responses = saved_state.get('responses', [])
        st.success("Previous progress loaded! Continuing from where you left off.")
    else:
        st.success("Prolific ID and code validated! Starting new survey.")
    st.rerun()


def questions_screen(data: pd.DataFrame) -> None:
    """Render the current question and record the participant's answer.

    Args:
        data: Table of configs; the row at ``st.session_state.current_index``
            is displayed. The columns read here are ``config_id``,
            ``persona``, ``filters``, ``city``, ``context``, ``query_v``,
            ``query_p0`` and ``query_p1``.
    """
    current_index = st.session_state.current_index
    total = len(data)
    config = data.iloc[current_index]

    # Progress bar
    st.progress((current_index + 1) / total)
    st.write(f"Question {current_index + 1} of {total}")
    st.subheader(f"Config ID: {config['config_id']}")

    context_tab, questions_tab = st.tabs(["Context", "Questions"])

    with context_tab:
        st.write(f"**Persona:** {config['persona']}")
        st.write(f"**Filters:** {config['filters']}")
        st.write(f"**Cities:** {config['city']}")
        with st.expander("Show Context", expanded=False):
            st.write(config['context'])

    # 0 is the default selection and is treated as "not rated yet" by
    # `navigation_buttons`.
    options = [0, 1, 2, 3, 4, 5]

    with questions_tab:
        st.write(f"**Query_v:** {config['query_v']}")
        rating_v = st.radio("Rate this config:", options, key=f"rating_v_{current_index}")
        st.write(f"**Query_p0:** {config['query_p0']}")
        rating_p0 = st.radio("Rate this config:", options, key=f"rating_p0_{current_index}")
        st.write(f"**Query_p1:** {config['query_p1']}")
        rating_p1 = st.radio("Rate this config:", options, key=f"rating_p1_{current_index}")
        # Per-question key, like the radios above; without it the same
        # widget is reused and the typed comment carries over to the next
        # question.
        comment = st.text_area("Comments (Optional):", key=f"comment_{current_index}")

        response = Response(
            config_id=config["config_id"],
            rating_v=rating_v,
            rating_p0=rating_p0,
            rating_p1=rating_p1,
            comment=comment,
            timestamp=datetime.now().isoformat(),
        )
        logger.debug("Recorded response: %s", response)

        # Overwrite the answer for a revisited question, otherwise append.
        if len(st.session_state.responses) > current_index:
            st.session_state.responses[current_index] = response
        else:
            st.session_state.responses.append(response)

    navigation_buttons(data, rating_v, rating_p0, rating_p1)


def _submit_feedback(feedback_id: int) -> None:
    """Build the final `Feedback`, ingest it and switch to the exit screen.

    Failures while building or ingesting are reported with `st.error`; the
    survey is only marked completed (and the script rerun) on success.
    """
    try:
        feedback = Feedback(
            id=feedback_id,
            user_id=st.session_state.username,
            time_stamp=datetime.now().isoformat(),
            responses=st.session_state.responses,
        )
        logger.debug("Submitting feedback: %s", feedback)
        ingest(feedback)
    except Exception as e:
        st.error(f"An error occurred while submitting feedback: {e}")
    else:
        # Kept outside the try body so only build/ingest failures are
        # reported as submission errors.
        st.session_state.completed = True
        st.rerun()


def navigation_buttons(data: pd.DataFrame, rating_v: int, rating_p0: int, rating_p1: int) -> None:
    """Render the Back/Next buttons and handle their clicks.

    "Next" refuses to advance while any rating is still 0. On the last
    question it submits the collected responses instead of advancing.
    """
    current_index = st.session_state.current_index
    col1, col2, _ = st.columns([1, 1, 2])

    with col1:
        if st.button("Back") and current_index > 0:
            st.session_state.current_index -= 1
            st.rerun()

    with col2:
        if st.button("Next"):
            if rating_v == 0 or rating_p0 == 0 or rating_p1 == 0:
                st.warning("Please provide a rating before proceeding.")
            elif current_index < len(data) - 1:
                st.session_state.current_index += 1
                st.rerun()
            else:
                _submit_feedback(current_index + 1)


def exit_screen() -> None:
    """Show the thank-you message and a button to start a new survey."""
    st.markdown(
        "\n\nThank you for participating!\n\n"
        "Your responses have been saved successfully.\n\n"
        "You can safely close this window or start a new survey.\n\n",
        unsafe_allow_html=True,
    )
    if st.button("Start New Survey"):
        reset_survey()
        st.rerun()


def submit_survey() -> None:
    """Submit the complete survey using the current index as the feedback id."""
    _submit_feedback(st.session_state.current_index)


def reset_survey() -> None:
    """Reset the survey state to start over."""
    st.session_state.username = None
    st.session_state.current_index = 0
    st.session_state.responses = []
    st.session_state.completed = False


def ui() -> None:
    """Main function to control the survey flow."""
    # Only the first three rows of the CSV are used as questions.
    data = pd.read_csv("data/gemini_results_subset.csv")[:3]
    initialization()

    if st.session_state.completed:
        exit_screen()
        return

    if st.session_state.username is None:
        username_screen()
    else:
        questions_screen(data)


if __name__ == "__main__":
    ui()