"""Streamlit app for collecting MAIA human-assessment annotations.

An annotator picks their name, is shown their assigned samples one at a time,
chooses between two candidate answers (optionally flagging that the frame does
not carry enough information), and the collected choices are uploaded to a
Hugging Face dataset repository once every assigned sample has been labeled.
"""
import streamlit as st
import pandas as pd
from huggingface_hub import HfApi, hf_hub_download

# Constants
HF_REPO = "giobin/MAIA_human_assessment_annotations"
CSV_FILENAME = "user_selections.csv"


# Function to assign samples to users
def assign_samples(csv_path):
    """Read the sample CSV and return a mapping of annotator name -> samples.

    For each ``pool_pos`` value (1, 2, 3) the rows whose ``question_category``
    does not end with ``"_B"`` are kept, and rows 100..149 (positional) of that
    selection form the group shared by the annotators listed below.

    Args:
        csv_path: Path of the CSV file holding all candidate samples.

    Returns:
        dict mapping each annotator name to the DataFrame assigned to them.
    """
    df = pd.read_csv(csv_path)
    not_b_category = ~df["question_category"].str.endswith("_B")

    def select_pool(pool_pos):
        # Select 50 samples starting from the 100th of the filtered pool.
        return df[(df["pool_pos"] == pool_pos) & not_b_category].iloc[100:150]

    group_1 = select_pool(1)
    group_2 = select_pool(2)
    group_3 = select_pool(3)

    return {
        "Bernardo": group_1,
        "Alessandro": group_1,
        "Alessio": group_1,
        "Lenci": group_2,
        "Lucia": group_2,
        "Davide": group_2,
        "Giovanni": group_3,
        "Raffaella": group_3,
    }


# Function to load existing annotations from Hugging Face Hub
def load_existing_annotations():
    """Download the annotations CSV from the Hub and return it as a DataFrame.

    Best effort: if anything goes wrong (download or parsing), an empty
    DataFrame with the ``username`` and ``id`` columns is returned instead.
    """
    try:
        file_path = hf_hub_download(
            HF_REPO,
            CSV_FILENAME,
            repo_type="dataset",
            token=st.secrets["HF_TOKEN"],
        )
        return pd.read_csv(file_path)
    except Exception:
        # NOTE(review): a failed download is indistinguishable from "no
        # annotations yet"; the later upload then only contains this
        # session's results. Confirm this is acceptable.
        return pd.DataFrame(columns=["username", "id"])


# Load datasets
csv_file = "static/mc.csv"
assignments = assign_samples(csv_file)
existing_annotations = load_existing_annotations()

# Valid users
valid_users = list(assignments.keys())

# Initialize session state variables
if "username" not in st.session_state:
    st.session_state.username = None
if "index" not in st.session_state:
    st.session_state.index = 0
if "results" not in st.session_state:
    st.session_state.results = []


# User selection
def update_name():
    """Set username and reset index."""
    st.session_state.username = st.session_state.selected_user
    st.session_state.index = 0  # Reset progress


if st.session_state.username is None:
    with st.form("user_form"):
        st.write("### Seleziona il tuo nome")
        st.selectbox("Nome:", valid_users, key="selected_user")
        st.form_submit_button("Inizia il task", on_click=update_name)
    st.stop()

# Retrieve assigned dataset and filter out already labeled samples
full_dataset = assignments[st.session_state.username].reset_index(drop=True)
user_labeled_ids = existing_annotations[
    existing_annotations["username"] == st.session_state.username
]["id"].tolist()
dataset = full_dataset[~full_dataset["id"].isin(user_labeled_ids)].reset_index(drop=True)

# Check if all samples are labeled
if st.session_state.index >= len(dataset):
    st.write("### Ottimo. Hai completato il tuo task! 🎉")
    st.stop()


# Function to push updated annotations to Hugging Face Hub
def push_to_hf_hub(csv_path):
    """Upload ``csv_path`` to the dataset repo as ``CSV_FILENAME``.

    The repo is created first if it does not exist. Errors are reported on
    stdout and not re-raised.
    """
    api = HfApi()
    try:
        api.create_repo(
            HF_REPO,
            repo_type="dataset",
            exist_ok=True,
            token=st.secrets["HF_TOKEN"],
        )
        api.upload_file(
            path_or_fileobj=csv_path,
            path_in_repo=CSV_FILENAME,
            repo_id=HF_REPO,
            repo_type="dataset",
            token=st.secrets["HF_TOKEN"],
        )
        print(f"Dataset updated: https://huggingface.co/datasets/{HF_REPO}")
    except Exception as e:
        print(f"Error pushing to HF: {e}")


# Function to save user choice
def save_choice():
    """Form callback: record the current answer and advance to the next sample.

    Nothing is recorded (and the index does not advance) when no answer was
    selected. When the last sample has been answered, the session results are
    merged with the previously downloaded annotations, written to a local
    CSV, and pushed to the Hub.
    """
    sample = dataset.iloc[st.session_state.index]
    selected_answer = st.session_state.get("selected_answer", None)
    not_enough_info = st.session_state.get("not_enough_info", False)

    if selected_answer is not None:
        st.session_state.results.append({
            "username": st.session_state.username,
            "id": sample["id"],
            "video_id": sample["video_id"],
            "answer1": sample["answer1"],
            "answer2": sample["answer2"],
            "selected_answer": selected_answer,
            "target": sample["target"],
            "not_enough_info": not_enough_info,
        })
        st.session_state.index += 1
        # The form widgets use fixed keys, so their values may otherwise
        # carry over to the next sample; reset them to their defaults.
        st.session_state.selected_answer = None
        st.session_state.not_enough_info = False

    # Save results and push to Hugging Face Hub if all samples are labeled
    if st.session_state.index >= len(dataset):
        st.write("### Ottimo. Hai completato il tuo task! 🎉")
        result_df = pd.DataFrame(st.session_state.results)
        csv_path = CSV_FILENAME  # local file shares the name used in the repo
        if not existing_annotations.empty:
            # Keep the most recent row per (username, id) pair.
            result_df = pd.concat([existing_annotations, result_df]).drop_duplicates(
                subset=["username", "id"], keep="last"
            )
        result_df.to_csv(csv_path, index=False)
        push_to_hf_hub(csv_path)
        st.stop()


# Display current sample
sample = dataset.iloc[st.session_state.index]

# Page title and user information
# NOTE(review): these strings are rendered with unsafe_allow_html=True but
# contain no markup here; the HTML tags appear to have been lost. Confirm
# against the deployed version.
st.markdown("\n\nMAIA Sample\n\n", unsafe_allow_html=True)
st.markdown(f"\n\nUser: {st.session_state.username}\n\n", unsafe_allow_html=True)
st.write("\n\n")

# Instructions
st.markdown("""
### Istruzioni:
- Osserva attentamente il frame del video (senza premere play) e cerca di capire il contesto della scena
- Valuta le opzioni (A e B) e seleziona quella che ritieni più attinente al video. Per selezionare l'alternativa usa l'immagine, la tua conoscenza e/o qualsiasi ragionamento utile.
- Se il frame non contiene sufficienti informazioni per decidere l’alternativa appena selezionata, seleziona il checkbox sottostante.
- Clicca 'Continua' per procedere.
""")
st.write("---")


def convert_youtube_shorts_url(url):
    """Convert a YouTube Shorts URL to a standard YouTube video URL."""
    if "youtube.com/shorts/" in url:
        video_id = url.split("/")[-1].split("?")[0]  # Extract the video ID
        return f"https://www.youtube.com/watch?v={video_id}"
    return url


fixed_url = convert_youtube_shorts_url(sample["video_url"])
st.video(fixed_url)

# Form for user input
with st.form("annotation_form"):
    # Exclusive choice between A and B
    st.radio(
        "Seleziona la descrizione corretta:",
        options=[0, 1],
        index=None,
        format_func=lambda x: f"A: {sample['answer1']}" if x == 0 else f"B: {sample['answer2']}",
        key="selected_answer",
    )

    # Independent checkbox for insufficient information
    st.checkbox(
        "Il frame non contiene sufficienti informazioni per scegliere",
        key="not_enough_info",
    )

    # Submit button; save_choice reads the widget values from session state
    st.form_submit_button("Continua", on_click=save_choice)