from queue import Queue import argilla as rg import gradio as gr client = rg.Argilla() server = rg.get_webhook_server() incoming_events = Queue() # Set up the webhook listeners # Delete all existing webhooks for webhook in client.webhooks: webhook.delete() # Create a webhook for record events @rg.webhook_listener( events=["record.created", "record.updated", "record.completed"], raw_event=True # Using raw events until PR https://github.com/argilla-io/argilla/pull/5500 is merged ) async def record_events(event:dict): print("Received event", event) incoming_events.put(event) # Create a webhook for dataset events @rg.webhook_listener(events=["dataset.created", "dataset.updated", "dataset.published"]) async def dataset_events(type: str, dataset: rg.Dataset | None = None, **kwargs): print(f"Received event {type} for dataset {dataset.id}") incoming_events.put((type, dataset)) # Create a webhook for response events @rg.webhook_listener( events=["response.created", "response.updated"], raw_event=True # Using raw events until PR https://github.com/argilla-io/argilla/pull/5500 is merged ) async def response_events(event: dict): print("Received event", event) incoming_events.put(event) @rg.webhook_listener(events=["record.deleted", "dataset.deleted", "response.deleted"]) async def deleted_events(type: str, data: dict, **kwargs): print(f"Received event {type} for resource {data}") incoming_events.put((type, data)) def check_incoming_events(): """ This function is called every 5 seconds to check if there are any incoming events and send data to update the JSON component. """ events = [] while not incoming_events.empty(): events.append(incoming_events.get()) return {"events": events} with gr.Blocks() as demo: json_component = gr.JSON(label="Incoming argilla events:") gr.Timer(5, active=True).tick(check_incoming_events, outputs=json_component) gr.mount_gradio_app(server, demo, path="/")