from queue import Queue import argilla as rg import gradio as gr client = rg.Argilla() server = rg.get_webhook_server() incoming_events = Queue() # Set up the webhook listeners # Delete all existing webhooks for webhook in client.webhooks: webhook.delete() # Create a webhook for record events @rg.webhook_listener(events=["record.created", "record.updated", "record.completed"]) async def record_events(record: rg.Record, **kwargs): print(f"Received event {kwargs['type']} for record {record.id}") incoming_events.put(record) # Create a webhook for dataset events @rg.webhook_listener(events=["dataset.created", "dataset.updated", "dataset.published"]) async def dataset_events(type: str, dataset: rg.Dataset | None = None, **kwargs): if "deleted" in type: print(f"Received event {type} for dataset {kwargs['data']}") incoming_events.put((type, kwargs["data"])) else: print(f"Received event {type} for dataset {dataset.id}") incoming_events.put((type, dataset)) # Create a webhook for response events @rg.webhook_listener(events=["response.created", "response.updated", "response.deleted"]) async def response_events(response: rg.UserResponse, **kwargs): print(f"Received event {kwargs['type']} for response {response.id}") incoming_events.put(response) def check_incoming_events(): """ This function is called every 5 seconds to check if there are any incoming events and send data to update the JSON component. """ events = [] while not incoming_events.empty(): events.append(incoming_events.get()) return {"events": events} with gr.Blocks() as demo: json_component = gr.JSON(label="Incoming argilla events:") gr.Timer(5, active=True).tick(check_incoming_events, outputs=json_component) gr.mount_gradio_app(server, demo, path="/gradio")