Spaces:
Runtime error
Runtime error
from collections import Counter
import datetime
import html
import os
from typing import Dict, Tuple
from uuid import UUID

import altair as alt
from apscheduler.schedulers.background import BackgroundScheduler
import argilla as rg
from argilla.client.feedback.dataset.remote.dataset import RemoteFeedbackDataset
from argilla.feedback import FeedbackDataset
import gradio as gr
import pandas as pd
def obtain_source_target_datasets() -> Tuple[
    FeedbackDataset | RemoteFeedbackDataset,
    FeedbackDataset | RemoteFeedbackDataset,
]:
    """
    Load the Argilla dataset and split it into a 'pending' and a 'submitted' view.

    The dataset name and workspace are read from the SOURCE_DATASET and
    SOURCE_WORKSPACE environment variables.

    Returns:
        A tuple ``(source, target)`` where ``source`` is the dataset filtered by
        response status 'pending' and ``target`` is the same dataset filtered by
        response status 'submitted'.
    """
    dataset_name = os.getenv("SOURCE_DATASET")
    workspace_name = os.getenv("SOURCE_WORKSPACE")
    full_dataset = rg.FeedbackDataset.from_argilla(
        dataset_name, workspace=workspace_name
    )

    # Both views are derived from the same remote dataset.
    pending_view = full_dataset.filter_by(response_status=["pending"])
    submitted_view = full_dataset.filter_by(response_status=["submitted"])
    return pending_view, submitted_view
def get_user_annotations_dictionary(
    dataset: FeedbackDataset | RemoteFeedbackDataset,
) -> Dict[str, int]:
    """
    Count how many responses each user has on the records of ``dataset``.

    Every response attached to every record is counted; no filtering by
    response status is applied here.

    Args:
        dataset: The dataset to be analyzed. Each record must expose a
            ``responses`` iterable whose items carry a ``user_id``.

    Returns:
        A dictionary with the username as the key and the number of responses
        as the value, in order of first appearance in the dataset.
    """
    # Tally by user id first so each user is resolved to a username only once.
    responses_per_user_id = Counter(
        str(response.user_id) for record in dataset for response in record.responses
    )

    # Build a fresh mapping instead of renaming keys in place: if two ids ever
    # resolve to the same username their counts are added, not overwritten.
    output: Dict[str, int] = {}
    for user_id, count in responses_per_user_id.items():
        username = rg.User.from_id(UUID(user_id)).username
        output[username] = output.get(username, 0) + count
    return output
def _progress_donut(target_env_var: str) -> alt.Chart:
    """
    Build a Completed/Remaining donut chart against a target read from the environment.

    Args:
        target_env_var: Name of the environment variable holding the target
            number of records (parsed with ``int``).

    Returns:
        The layered altair chart (arc layer plus text labels).
    """
    # `target_dataset` is the module-level global refreshed by `fetch_data`.
    annotated_records = len(target_dataset)
    # Clamp at zero: once the target is exceeded the difference is negative,
    # which is meaningless as "remaining" and invalid for the sqrt radius scale.
    pending_records = max(0, int(os.getenv(target_env_var)) - annotated_records)

    # NOTE(review): the "colors" column is not referenced by any encoding below,
    # so the chart uses the default colour scheme for "category".
    source = pd.DataFrame(
        {
            "values": [annotated_records, pending_records],
            "category": ["Completed", "Remaining"],
            "colors": ["#4CAF50", "#757575"],  # Green for Completed, Grey for Remaining
        }
    )

    base = alt.Chart(source).encode(
        theta=alt.Theta("values:Q", stack=True),
        radius=alt.Radius(
            "values", scale=alt.Scale(type="sqrt", zero=True, rangeMin=20)
        ),
        color=alt.Color("category:N", legend=alt.Legend(title="Category")),
    )
    arcs = base.mark_arc(innerRadius=20, stroke="#fff")
    labels = base.mark_text(radiusOffset=20).encode(text="values:Q")
    return arcs + labels


def donut_chart_total() -> alt.Chart:
    """
    Return a donut chart with the progress of the total annotations.

    "Completed" is ``len(target_dataset)``; "Remaining" is the TARGET_RECORDS
    environment variable minus that count, never below zero.

    Returns:
        An altair chart with the donut chart.
    """
    return _progress_donut("TARGET_RECORDS")


def donut_chart_target() -> alt.Chart:
    """
    Return a donut chart with the progress towards the TARGET_ANNOTATIONS_V1 objective.

    "Completed" is ``len(target_dataset)``; "Remaining" is the
    TARGET_ANNOTATIONS_V1 environment variable minus that count, never below zero.

    Returns:
        An altair chart with the donut chart.
    """
    return _progress_donut("TARGET_ANNOTATIONS_V1")
def kpi_chart_remaining() -> alt.Chart:
    """
    Return a KPI chart with the remaining amount of records to be annotated.

    The value is the TARGET_RECORDS environment variable minus
    ``len(target_dataset)``, never below zero.

    Returns:
        An altair chart with the KPI chart.
    """
    # Clamp at zero so the dashboard never shows a negative "remaining" figure
    # once the target has been exceeded.
    pending_records = max(0, int(os.getenv("TARGET_RECORDS")) - len(target_dataset))

    # Single-row frame: the chart renders its one value as large text.
    data = pd.DataFrame({"Category": ["Total remaining"], "Value": [pending_records]})

    chart = (
        alt.Chart(data)
        .mark_text(fontSize=100, align="center", baseline="middle", color="steelblue")
        .encode(text="Value:N")
        .properties(title="Total remaining", width=250, height=200)
    )
    return chart
def kpi_chart_submitted() -> alt.Chart:
    """
    Build the "Total completed" KPI: the size of ``target_dataset`` as large text.

    Returns:
        An altair chart with the KPI chart.
    """
    completed_count = len(target_dataset)

    # One-row frame holding the single figure to display.
    frame = pd.DataFrame({"Category": ["Total completed"], "Value": [completed_count]})

    big_number = alt.Chart(frame).mark_text(
        fontSize=100, align="center", baseline="middle", color="steelblue"
    )
    with_value = big_number.encode(text="Value:N")
    return with_value.properties(title="Total completed", width=250, height=200)
def kpi_chart() -> alt.Chart:
    """
    Build the "Number of Contributors" KPI from the size of ``user_ids_annotations``.

    Returns:
        An altair chart with the KPI chart.
    """
    # One entry per contributor in the module-level mapping.
    contributor_count = len(user_ids_annotations)

    frame = pd.DataFrame(
        {"Category": ["Total Contributors"], "Value": [contributor_count]}
    )

    big_number = alt.Chart(frame).mark_text(
        fontSize=100, align="center", baseline="middle", color="steelblue"
    )
    with_value = big_number.encode(text="Value:N")
    return with_value.properties(
        title="Number of Contributors", width=250, height=200
    )
def render_hub_user_link(hub_id):
    """
    Render a Hugging Face Hub username as an HTML link to the user's profile.

    The id is HTML-escaped before being placed in the ``href`` attribute and the
    link text, so a name containing quotes or angle brackets cannot break out
    of the markup.

    Args:
        hub_id: The Hub username to link to.

    Returns:
        An ``<a>`` element (as a string) that opens the profile in a new tab.
    """
    safe_id = html.escape(str(hub_id), quote=True)
    link = f"https://huggingface.co/{safe_id}"
    return f'<a target="_blank" href="{link}" style="color: var(--link-text-color); text-decoration: underline;text-decoration-style: dotted;">{safe_id}</a>'
def obtain_top_5_users(
    user_ids_annotations: Dict[str, int], limit: int = 50
) -> pd.DataFrame:
    """
    Return the users with the most annotations, as a leaderboard table.

    Despite the function name, the table is not limited to five rows: it holds
    up to ``limit`` users (50 by default).

    Args:
        user_ids_annotations: A dictionary with the username as the key and the
            number of annotations as the value.
        limit: Maximum number of rows to return.

    Returns:
        A pandas dataframe with the columns "Name" (an HTML link to the user's
        Hub profile) and "Submitted Responses", sorted by the latter in
        descending order.
    """
    dataframe = pd.DataFrame(
        list(user_ids_annotations.items()), columns=["Name", "Submitted Responses"]
    )
    dataframe["Name"] = dataframe["Name"].apply(render_hub_user_link)
    dataframe = dataframe.sort_values(by="Submitted Responses", ascending=False)
    return dataframe.head(limit)
def fetch_data() -> None:
    """
    Fetch the datasets and refresh the module-level state read by the charts.

    Updates the globals ``source_dataset``, ``target_dataset``,
    ``user_ids_annotations``, ``annotated``, ``remaining`` and
    ``percentage_completed``. The target number of records is read from the
    TARGET_RECORDS environment variable.

    All new values are computed before any global is rebound, so if fetching or
    counting raises, the previously published values are left untouched.
    """
    global source_dataset, target_dataset, user_ids_annotations
    global annotated, remaining, percentage_completed

    print(f"Starting to fetch data: {datetime.datetime.now()}")

    # Compute everything into locals first.
    new_source, new_target = obtain_source_target_datasets()
    new_user_annotations = get_user_annotations_dictionary(new_target)
    new_annotated = len(new_target)
    target_records = int(os.getenv("TARGET_RECORDS"))
    new_remaining = target_records - new_annotated
    new_percentage = round((new_annotated / target_records) * 100, 1)

    # Publish the new snapshot only after every step above has succeeded.
    source_dataset = new_source
    target_dataset = new_target
    user_ids_annotations = new_user_annotations
    annotated = new_annotated
    remaining = new_remaining
    percentage_completed = new_percentage

    # Print the current date and time
    print(f"Data fetched: {datetime.datetime.now()}")
def get_top5() -> pd.DataFrame:
    """
    Build the leaderboard table from the current ``user_ids_annotations`` global.

    Returns:
        The dataframe produced by ``obtain_top_5_users``.
    """
    leaderboard = obtain_top_5_users(user_ids_annotations)
    return leaderboard
def main() -> None:
    """
    Connect to Argilla, start the periodic data refresh and launch the dashboard.

    Steps:
        1. Initialise the Argilla client from ARGILLA_API_URL, ARGILLA_API_KEY
           and HF_TOKEN.
        2. Fetch the data once, then schedule ``fetch_data`` to run every
           ``update_interval`` seconds in a background scheduler.
        3. Build the Gradio page; each plot is re-rendered every
           ``update_interval_charts`` seconds from the latest fetched data.
        4. Launch the Gradio app.
    """
    # Refresh cadence, in seconds
    update_interval = 300  # how often the data is re-fetched from Argilla
    update_interval_charts = 30  # how often the charts are re-rendered

    # Connect to the space with rg.init()
    rg.init(
        api_url=os.getenv("ARGILLA_API_URL"),
        api_key=os.getenv("ARGILLA_API_KEY"),
        extra_headers={"Authorization": f"Bearer {os.getenv('HF_TOKEN')}"},
    )

    # Populate the module-level state before any chart callback can run.
    fetch_data()

    scheduler = BackgroundScheduler()
    scheduler.add_job(
        func=fetch_data, trigger="interval", seconds=update_interval, max_instances=1
    )
    scheduler.start()

    # To avoid the orange border for the Gradio elements that are in constant loading
    css = """
    .generating {
        border: none;
    }
    """

    # NOTE(review): the glyphs that start the headings below look like
    # mis-decoded emoji; they are kept as found — confirm against the original file.
    with gr.Blocks(css=css) as demo:
        gr.Markdown(
            """
            # π£οΈ The Prompt Collective Dashboard
            This Gradio dashboard shows the progress of the first "Data is Better Together" initiative to understand and collect good quality and diverse prompt for the OSS AI community.
            If you want to contribute to OSS AI, join [the Prompt Collective HF Space](https://huggingface.co/spaces/DIBT/prompt-collective).
            """
        )

        gr.Markdown(
            """
            ## π Target for Releasing Dataset v2
            How close are we to the target for version 2.0?
            """
        )
        with gr.Row():
            donut_target_plot = gr.Plot(label="Plot")
            demo.load(
                donut_chart_target,
                inputs=[],
                outputs=[donut_target_plot],
                every=update_interval_charts,
            )

        gr.Markdown(
            """
            ## π Target for Releasing Dataset v1
            Done! Thanks to the awesome DIBT community we've surpassed 10K rated prompts. Open Dataset coming soon!
            """
        )

        gr.Markdown(
            """
            ## π Global Progress
            Here's what the community has achieved so far!
            """
        )
        with gr.Row():
            kpi_submitted_plot = gr.Plot(label="Plot")
            demo.load(
                kpi_chart_submitted,
                inputs=[],
                outputs=[kpi_submitted_plot],
                every=update_interval_charts,
            )

            kpi_remaining_plot = gr.Plot(label="Plot")
            demo.load(
                kpi_chart_remaining,
                inputs=[],
                outputs=[kpi_remaining_plot],
                every=update_interval_charts,
            )

            donut_total_plot = gr.Plot(label="Plot")
            demo.load(
                donut_chart_total,
                inputs=[],
                outputs=[donut_total_plot],
                every=update_interval_charts,
            )

        gr.Markdown(
            """
            ## πΎ Contributors Hall of Fame
            The number of all contributors and the top contributors:
            """
        )
        with gr.Row():
            kpi_hall_plot = gr.Plot(label="Plot")
            demo.load(
                kpi_chart, inputs=[], outputs=[kpi_hall_plot], every=update_interval_charts
            )

            # NOTE(review): `every` is passed here without a callable `value`;
            # the table is refreshed by the demo.load call below — confirm
            # whether this argument is still needed.
            top5_df_plot = gr.Dataframe(
                headers=["Name", "Submitted Responses"],
                datatype=[
                    "markdown",
                    "number",
                ],
                row_count=50,
                col_count=(2, "fixed"),
                interactive=False,
                every=update_interval,
            )
            demo.load(get_top5, None, [top5_df_plot], every=update_interval_charts)

    # Launch the Gradio interface
    demo.launch()


if __name__ == "__main__":
    main()