# Spaces: Running  (hosting-page status text captured with the source; not part of the program)
import datetime
import json
import os
import random
import secrets
import string
import time

import gspread
import pandas as pd
import streamlit as st
from oauth2client.service_account import ServiceAccountCredentials
# Build an authorized Google Sheets client from the service-account credentials
def get_google_creds():
    """Return an authorized gspread client built from ``SERVICE_ACCOUNT_JSON``.

    The environment variable is parsed as the JSON text of a service-account
    key.  When it is unset or empty, an error is shown in the app and
    ``None`` is returned instead of a client.
    """
    raw_key = os.getenv("SERVICE_ACCOUNT_JSON")
    if not raw_key:
        st.error("Google service account credentials not found.")
        return None

    key_info = json.loads(raw_key)
    scopes = [
        "https://spreadsheets.google.com/feeds",
        "https://www.googleapis.com/auth/drive",
    ]
    credentials = ServiceAccountCredentials.from_json_keyfile_dict(key_info, scopes)
    return gspread.authorize(credentials)
def get_next_worker_id(max_workers=20):
    """Return the lowest worker ID in ``1..max_workers`` not yet in the results sheet.

    IDs already in use are read from the first column (header row skipped) of
    the first worksheet of the "Odd-one-out results v2.0" spreadsheet.

    Args:
        max_workers: Highest worker ID that may be handed out (default 20).

    Returns:
        The first free worker ID, or ``None`` when no client is available,
        every slot is taken, or reading the sheet failed (the failure is
        reported in the app).
    """
    client = get_google_creds()
    if client is None:
        return None
    try:
        sheet_name = "Odd-one-out results v2.0"
        sheet = client.open(sheet_name).sheet1
        existing_worker_ids = sheet.col_values(1)[1:]  # skip the header row
        # Skip blank cells (expected to come back as '' from the sheet) so a
        # single gap in the column does not abort the whole assignment.  Any
        # other non-numeric cell still raises and is reported below, because
        # it means the column does not hold worker IDs at all.
        assigned_workers = {
            int(cell) for cell in existing_worker_ids if str(cell).strip()
        }
        for candidate in range(1, max_workers + 1):
            if candidate not in assigned_workers:
                return candidate
        return None  # No available worker slots
    except Exception as e:
        st.error(f"Error retrieving worker ID: {str(e)}")
        return None
# Load worker-specific stimuli | |
def load_worker_data(worker_id):
    """Read the stimulus table assigned to one worker.

    The file looked up is ``worker_stimuli/worker_<NN>_stimuli.csv`` with the
    ID zero-padded to two digits.  Rows containing missing values are dropped
    and the index is renumbered from 0.  Returns ``None`` when the file does
    not exist.
    """
    csv_name = f"worker_{worker_id:02d}_stimuli.csv"
    csv_path = os.path.join("worker_stimuli", csv_name)
    if not os.path.exists(csv_path):
        return None

    stimuli = pd.read_csv(csv_path)
    complete_rows = stimuli.dropna()
    return complete_rows.reset_index(drop=True)
# Assign a worker ID once per session, then restart the script so the rest of
# the page renders with the freshly stored state.
if "worker_id" not in st.session_state:
    # NOTE(review): the sheet-based lookup is disabled and every session is
    # pinned to worker 4, so the None check below can never fire — confirm
    # this override is intentional.
    # assigned_worker_id = get_next_worker_id()
    assigned_worker_id = 4
    if assigned_worker_id is None:
        st.error("No available worker slots. Please try again later.")
        st.stop()
    st.session_state["worker_id"] = assigned_worker_id
    st.session_state["df"] = load_worker_data(st.session_state["worker_id"])
    st.session_state["step"] = "instructions"
    st.rerun()

# Main experiment stimuli for this worker; halt the script when none were loaded.
df = st.session_state["df"]
if df is None:
    st.error("No stimuli available for this worker.")
    st.stop()
# Function to generate a unique passcode | |
def generate_passcode(length=8):
    """Return a random completion passcode of uppercase letters and digits.

    The characters are drawn with ``secrets`` rather than ``random`` so the
    code, which is shown to the participant as proof of completion, cannot be
    predicted from the PRNG state.

    Args:
        length: Number of characters in the passcode (default 8).

    Returns:
        A string of ``length`` characters from ``A-Z`` and ``0-9``.
    """
    alphabet = string.ascii_uppercase + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))
# Function to append responses to the results spreadsheet in Google Sheets
def upload_to_google_drive(response_df):
    """Append every row of ``response_df`` to the results spreadsheet.

    The first worksheet of "Odd-one-out results v2.0" is used; when the
    spreadsheet does not exist it is created and given a header row.  Success
    and failure are both reported in the app; nothing is returned and no
    exception escapes.

    Args:
        response_df: DataFrame with one row per answered question.
    """
    try:
        client = get_google_creds()
        if client is None:
            # get_google_creds() has already shown the credentials error;
            # stop here instead of failing on ``None.open`` below.
            return
        sheet_name = "Odd-one-out results v2.0"
        try:
            sheet = client.open(sheet_name).sheet1
        except gspread.exceptions.SpreadsheetNotFound:
            sheet = client.create(sheet_name).sheet1
            # Take the header from the DataFrame itself so it always lines up
            # with the rows appended below (a fixed list drifts as soon as a
            # column such as "timestamp" is added to the responses).
            # NOTE(review): get_next_worker_id() reads worker IDs from
            # column 1 — confirm the column order of response_df matches.
            sheet.append_row(response_df.columns.tolist())
        data_list = response_df.values.tolist()
        if data_list:
            # One batched request instead of one API call per response row.
            sheet.append_rows(data_list)
        st.success("โ Your responses have been recorded successfully.")
    except Exception as e:
        st.error(f"Error uploading to Google Drive: {str(e)}")
# Load training samples | |
def load_training_samples():
    """Read the practice items from ``training_samples.csv``.

    Rows containing missing values are dropped and the index is renumbered
    from 0.  When the file does not exist, an error is shown in the app and an
    empty DataFrame is returned.
    """
    csv_path = "training_samples.csv"
    if not os.path.exists(csv_path):
        st.error("Training samples file not found.")
        return pd.DataFrame()

    samples = pd.read_csv(csv_path)
    return samples.dropna().reset_index(drop=True)
# Load training data
training_df = load_training_samples()

# Initialize session state variables: each key receives its default only when
# it is not already present, so values survive script reruns.
_session_defaults = {
    "step": "instructions",
    "training_index": 0,
    "training_complete": False,
    "experiment_index": 0,
    "experiment_complete": False,
    "responses": [],
    "start_time": None,
    "passcode": None,
    "show_answer": False,
}
for _state_key, _state_default in _session_defaults.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
# Stylesheet injected into the page: 17px text for the app, radio labels and
# markdown blocks, plus the two helper classes used for coloured text.
_PAGE_CSS = """
<style>
.stApp {
font-size: 17px !important;
}
.correct-answer {
color: green;
}
.highlight-red {
color: #D9534F;
font-weight: bold;
}
/* Increase font size for radio button labels */
div[data-testid="stRadio"] label {
font-size: 17px !important;
}
/* Increase font size for general text */
div[data-testid="stMarkdownContainer"] {
font-size: 17px !important;
}
div[data-testid="stVerticalBlock"] p {
font-size: 17px !important;
}
</style>
"""
st.markdown(_PAGE_CSS, unsafe_allow_html=True)

st.title("Scene Identification Experiment")
# Pages shown before the main experiment.  Exactly one branch renders per
# script run, selected by st.session_state.step.
# NOTE(review): several user-facing strings below contain characters that look
# mis-encoded (probably emoji / a typographic apostrophe); they are kept
# exactly as found — confirm against the deployed file.

# **STEP 1: Instructions Page**
if st.session_state.step == "instructions":
    st.header("๐ Instructions")
    st.write("""
Welcome to the experiment! Hereโs how it works:
- You will be presented with five short text passages excerpted from **fictional stories**.
- The passages describe scenes related to a **specific keyword**.
- Four of these passages describe **a similar type of scene**, while **one is different**.
- Your task is to identify the passage that describes <span style='color:#D9534F; font-weight:bold;'>a scene distinct from the rest</span>.
- Please try to answer as quickly and accurately as possible.
""", unsafe_allow_html=True)
    if st.button("Start Practicing"):
        st.session_state.step = "training"
        st.rerun()

# **STEP 2: Training Phase**
elif st.session_state.step == "training":
    if st.session_state.training_index >= len(training_df):
        # All practice items done (or none were loaded): move on.
        st.session_state.step = "training_complete"
        st.rerun()
    else:
        row = training_df.iloc[st.session_state.training_index]
        st.write(f"### Practice Sample {st.session_state.training_index + 1}/{len(training_df)}")
        st.write(f"**Keyword: {row['keyword']}**")
        options = [row[f"text_{i}"] for i in range(1, 6)]
        # odd_index is 1-based.  pandas can hand it back as a float (e.g. when
        # the CSV column contained blanks before dropna), which is not a valid
        # list index, so coerce it to int first.
        correct_answer = options[int(row['odd_index']) - 1]
        user_choice = st.radio("Which scene is distinct from the others?", options,
                               key=f"training_{st.session_state.training_index}", index=None)
        if st.button("Submit Answer"):
            if user_choice is None:
                st.warning("โ ๏ธ Please select an option first!")
            else:
                st.session_state.show_answer = True
                st.rerun()
        # Rendered on the run after a successful submit, until the
        # participant moves to the next sample.
        if st.session_state.show_answer:
            st.write("### โ The correct answer was:")
            st.markdown(f"<p class='correct-answer'>{correct_answer}</p>", unsafe_allow_html=True)
            if st.button("Next Practice Sample"):
                st.session_state.show_answer = False
                st.session_state.training_index += 1
                st.rerun()

# **STEP 3: Training Complete Page**
elif st.session_state.step == "training_complete":
    st.header("๐ Practice Complete!")
    st.markdown("""
You have completed the practice phase! Now, you will proceed to the main experiment.
Please remember to select the passage that describes <span style='color:#D9534F; font-weight:bold;'>a distinct scene from the rest</span>.
Click **"Start Experiment"** when you are ready. Your response time will begin after you proceed.
""", unsafe_allow_html=True)
    if st.button("Start Experiment"):
        # The response timer for the first question starts here.
        st.session_state.start_time = time.time()
        st.session_state.step = "experiment"
        st.rerun()
# Ensure 'Submit Answer' button is hidden after experiment completion
if 'experiment_complete' not in st.session_state:
    st.session_state.experiment_complete = False

# **STEP 4: Main Experiment**
# Deliberately its own `if`: chained as an `elif` of the guard above, this page
# would be skipped on any run in which the flag had to be created.
# NOTE(review): some user-facing strings below contain characters that look
# mis-encoded (probably emoji); they are kept exactly as found.
if st.session_state.step == "experiment":
    if st.session_state.experiment_index >= len(df):
        # Every question answered: show the passcode and store the results.
        st.success("โ You've completed the experiment!")
        st.session_state.experiment_complete = True
        if st.session_state.get("passcode") is None:
            st.session_state.passcode = generate_passcode()
        st.write("### ๐ Your unique completion passcode:")
        st.code(st.session_state.passcode, language="text")
        # Upload once per session; without this flag every rerun of the
        # completion page would append the same rows to the sheet again.
        if not st.session_state.get("responses_uploaded", False):
            response_df = pd.DataFrame(st.session_state.responses)
            # Overwrites the placeholder passcode recorded with each answer.
            response_df["worker_id"] = st.session_state.worker_id
            response_df["passcode"] = st.session_state.passcode
            upload_to_google_drive(response_df)
            st.session_state.responses_uploaded = True
    else:
        row = df.iloc[st.session_state.experiment_index]
        st.write(f"### Question {st.session_state.experiment_index + 1}/{len(df)}")
        st.write(f"**Keyword: {row['keyword']}**")
        options = [row[f"text_{i}"] for i in range(1, 6)]
        user_choice = st.radio("Which scene is distinct from the others?", options,
                               key=st.session_state.experiment_index, index=None)
        # odd_index is 1-based.  pandas can hand it back as a float (e.g. when
        # the CSV column contained blanks before dropna), which is not a valid
        # list index, so coerce it to int first.
        correct_answer = options[int(row['odd_index']) - 1]
        if not st.session_state.experiment_complete:
            if st.button("Submit Answer"):
                if user_choice is None:
                    st.warning("โ ๏ธ Please select an option first!")
                else:
                    # Seconds since this question was first shown.
                    response_time = time.time() - st.session_state.start_time
                    st.session_state.responses.append({
                        'timestamp': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                        'worker_id': st.session_state.worker_id,
                        'passcode': st.session_state.passcode if st.session_state.passcode else "TEMP",
                        'question': st.session_state.experiment_index + 1,
                        'keyword': row['keyword'],
                        'selected': user_choice,
                        'correct_answer': correct_answer,
                        'is_correct': user_choice == correct_answer,
                        'response_time': round(response_time, 2)
                    })
                    st.session_state.experiment_index += 1
                    # Restart the timer for the next question.
                    st.session_state.start_time = time.time()
                    st.rerun()