# NOTE: hosting-page header captured with this file ("Spaces: Sleeping"); not part of the program.
import pandas as pd
import streamlit as st
from huggingface_hub import HfApi, hf_hub_download
# Constants
# Hub repository (accessed with repo_type="dataset") holding the annotations file.
HF_REPO = "giobin/MAIA_human_assessment_annotations"
# Name of the annotations CSV inside that repository.
CSV_FILENAME = "user_selections.csv"
# Function to assign samples to users
def assign_samples(csv_path, start=100, count=50):
    """Build the per-annotator sample assignment from the samples CSV.

    Rows whose ``question_category`` ends with ``"_B"`` are excluded. The
    remaining rows are split by ``pool_pos`` (1, 2 or 3) and, within each
    split, the positional slice ``[start:start + count]`` is kept.

    Args:
        csv_path: Path of a CSV file providing at least the ``pool_pos`` and
            ``question_category`` columns.
        start: Positional offset of the first row kept in every pool.
        count: Maximum number of rows kept per pool.

    Returns:
        A dict mapping each annotator name to the DataFrame of the pool
        assigned to them. Annotators of the same pool receive the same
        DataFrame object.
    """
    df = pd.read_csv(csv_path)

    # na=False: a missing category is treated as "does not end with _B".
    # Without it the mask contains NaN and the `~` negation raises.
    keep = ~df["question_category"].str.endswith("_B", na=False)

    pools = {
        pos: df[(df["pool_pos"] == pos) & keep].iloc[start:start + count]
        for pos in (1, 2, 3)
    }

    return {
        "Bernardo": pools[1],
        "Alessandro": pools[1],
        "Alessio": pools[1],
        "Lenci": pools[2],
        "Lucia": pools[2],
        "Davide": pools[2],
        "Giovanni": pools[3],
        "Raffaella": pools[3],
    }
# Function to load existing annotations from Hugging Face Hub
def load_existing_annotations():
    """Download the annotations CSV from the Hub and parse it.

    Returns:
        The parsed CSV as a DataFrame, or an empty DataFrame with the
        ``username`` and ``id`` columns when the download or the parsing
        fails for any reason.
    """
    try:
        file_path = hf_hub_download(
            HF_REPO,
            CSV_FILENAME,
            repo_type="dataset",
            token=st.secrets["HF_TOKEN"],
        )
        return pd.read_csv(file_path)
    except Exception as exc:
        # Best effort: keep the empty-frame fallback, but report the cause
        # instead of swallowing it silently.
        print(f"Could not load existing annotations: {exc}")
        return pd.DataFrame(columns=["username", "id"])
# Load datasets
csv_file = "static/mc.csv"
assignments = assign_samples(csv_file)
existing_annotations = load_existing_annotations()

# Valid users: the names that have an assigned pool
valid_users = list(assignments)

# Initialize session state variables (only when missing, so values set in a
# previous run of the script are not overwritten)
if "username" not in st.session_state:
    st.session_state.username = None
if "index" not in st.session_state:
    st.session_state.index = 0
if "results" not in st.session_state:
    st.session_state.results = []
# User selection
def update_name():
    """Store the name picked in the ``selected_user`` widget and restart at sample 0."""
    st.session_state.username = st.session_state.selected_user
    st.session_state.index = 0  # Reset progress
# Until a name has been chosen, show only the name-selection form.
if st.session_state.username is None:
    with st.form("user_form"):
        st.write("### Seleziona il tuo nome")
        selected_user = st.selectbox("Nome:", valid_users, key="selected_user")
        submit_button = st.form_submit_button("Inizia il task", on_click=update_name)
    # Nothing below can run without a username.
    st.stop()
# Retrieve assigned dataset and filter out already labeled samples
full_dataset = assignments[st.session_state.username].reset_index(drop=True)
is_current_user = existing_annotations["username"] == st.session_state.username
user_labeled_ids = existing_annotations[is_current_user]["id"].tolist()
already_labeled = full_dataset["id"].isin(user_labeled_ids)
dataset = full_dataset[~already_labeled].reset_index(drop=True)

# Check if all samples are labeled
if st.session_state.index >= len(dataset):
    st.write("### Ottimo. Hai completato il tuo task! 🎉")
    st.stop()
# Function to push updated annotations to Hugging Face Hub
def push_to_hf_hub(csv_path):
    """Upload the CSV at ``csv_path`` to the annotations dataset repository.

    The repository is created first when it does not exist yet. Any failure
    is printed rather than raised.

    Args:
        csv_path: Local path of the CSV file to upload; it is stored in the
            repository under ``CSV_FILENAME``.
    """
    api = HfApi()
    try:
        token = st.secrets["HF_TOKEN"]
        api.create_repo(HF_REPO, repo_type="dataset", exist_ok=True, token=token)
        api.upload_file(
            path_or_fileobj=csv_path,
            path_in_repo=CSV_FILENAME,
            repo_id=HF_REPO,
            repo_type="dataset",
            token=token,
        )
        print(f"Dataset updated: https://huggingface.co/datasets/{HF_REPO}")
    except Exception as e:
        print(f"Error pushing to HF: {e}")
# Function to save user choice
def save_choice():
    """Record the answer for the current sample and move to the next one.

    Used as the ``on_click`` callback of the annotation form's submit button.
    When no answer is selected, nothing is recorded and the index does not
    advance. Once the index has reached the end of ``dataset``, the results
    collected in this session are merged with the annotations stored on the
    Hub, written to ``user_selections.csv`` and uploaded.
    """
    sample = dataset.iloc[st.session_state.index]
    selected_answer = st.session_state.get("selected_answer", None)
    not_enough_info = st.session_state.get("not_enough_info", False)

    if selected_answer is not None:
        st.session_state.results.append({
            "username": st.session_state.username,
            "id": sample["id"],
            "video_id": sample["video_id"],
            "answer1": sample["answer1"],
            "answer2": sample["answer2"],
            "selected_answer": selected_answer,
            "target": sample["target"],
            "not_enough_info": not_enough_info,
        })
        st.session_state.index += 1

    # Save results and push to Hugging Face Hub if all samples are labeled
    if st.session_state.index >= len(dataset):
        st.write("### Ottimo. Hai completato il tuo task! 🎉")
        result_df = pd.DataFrame(st.session_state.results)
        csv_path = "user_selections.csv"

        # Re-read the Hub copy right before merging, so rows uploaded by other
        # annotators after this script run started are not overwritten. If the
        # reload yields nothing, fall back to the snapshot taken at startup.
        previous = load_existing_annotations()
        if previous.empty:
            previous = existing_annotations

        if not previous.empty:
            # keep="last": this session's rows win over older rows with the
            # same (username, id).
            result_df = pd.concat([previous, result_df]).drop_duplicates(
                subset=["username", "id"], keep="last"
            )

        result_df.to_csv(csv_path, index=False)
        push_to_hf_hub(csv_path)
        st.stop()
# Display current sample
sample = dataset.iloc[st.session_state.index]

# Page title and user information
st.markdown("<h1 style='text-align: center; font-size: 50px;'>MAIA Sample</h1>", unsafe_allow_html=True)
st.markdown(f"<h3 style='text-align: center;'>User: {st.session_state.username}</h3>", unsafe_allow_html=True)
st.write("\n\n")

# Instructions (user-facing text, rendered as markdown)
st.markdown("""
### Istruzioni:
- Osserva attentamente il frame del video (senza premere play) e cerca di capire il contesto della scena
- Valuta le opzioni (A e B) e seleziona quella che ritieni più attinente al video. Per selezionare l'alternativa usa l'immagine, la tua conoscenza e/o qualsiasi ragionamento utile.
- Se il frame non contiene sufficienti informazioni per decidere l’alternativa appena selezionata, seleziona il checkbox sottostante.
- Clicca 'Continua' per procedere.
""")
st.write("---")
def convert_youtube_shorts_url(url):
    """Convert a YouTube Shorts URL to a standard YouTube video URL.

    Args:
        url: The URL to convert.

    Returns:
        ``https://www.youtube.com/watch?v=<id>`` when ``url`` contains
        ``youtube.com/shorts/<id>``; otherwise ``url`` unchanged (also when
        no id follows the ``shorts/`` segment).
    """
    _, marker, tail = url.partition("youtube.com/shorts/")
    if not marker:
        return url

    # The id is the first path segment after "shorts/": cut at a trailing
    # slash, query string, fragment or extra parameter, whichever comes first.
    video_id = tail
    for separator in "/?#&":
        video_id = video_id.split(separator, 1)[0]

    if not video_id:
        return url
    return f"https://www.youtube.com/watch?v={video_id}"
# Display the sample's video, using the standard watch URL for Shorts links
fixed_url = convert_youtube_shorts_url(sample["video_url"])
st.video(fixed_url)
# Form for user input
with st.form("annotation_form"):
    # Exclusive choice between A and B; nothing is preselected (index=None).
    # The stored value is 0 for option A and 1 for option B.
    selected_answer = st.radio(
        "Seleziona la descrizione corretta:",
        options=[0, 1],
        index=None,
        format_func=lambda x: f"A: {sample['answer1']}" if x == 0 else f"B: {sample['answer2']}",
        key="selected_answer",
    )

    # Independent checkbox for insufficient information
    not_enough_info = st.checkbox(
        "Il frame non contiene sufficienti informazioni per scegliere",
        key="not_enough_info",
    )

    # Submit button: save_choice reads both widgets from session state by key
    submit_button = st.form_submit_button("Continua", on_click=save_choice)