argilla-webhooks / main.py
Francisco Aranda
using raw events for records and responses
48225dd
raw
history blame
2.02 kB
from queue import Queue
import argilla as rg
import gradio as gr
client = rg.Argilla()
server = rg.get_webhook_server()
incoming_events = Queue()
# Set up the webhook listeners
# Delete all existing webhooks
for webhook in client.webhooks:
webhook.delete()
# Create a webhook for record events
@rg.webhook_listener(
events=["record.created", "record.updated", "record.completed"],
raw_event=True # Using raw events until PR https://github.com/argilla-io/argilla/pull/5500 is merged
)
async def record_events(event:dict):
print("Received event", event)
incoming_events.put(event)
# Create a webhook for dataset events
@rg.webhook_listener(events=["dataset.created", "dataset.updated", "dataset.published"])
async def dataset_events(type: str, dataset: rg.Dataset | None = None, **kwargs):
print(f"Received event {type} for dataset {dataset.id}")
incoming_events.put((type, dataset))
# Create a webhook for response events
@rg.webhook_listener(
events=["response.created", "response.updated"],
raw_event=True # Using raw events until PR https://github.com/argilla-io/argilla/pull/5500 is merged
)
async def response_events(event: dict):
print("Received event", event)
incoming_events.put(event)
@rg.webhook_listener(events=["record.deleted", "dataset.deleted", "response.deleted"])
async def deleted_events(type: str, data: dict, **kwargs):
print(f"Received event {type} for resource {data}")
incoming_events.put((type, data))
def check_incoming_events():
"""
This function is called every 5 seconds to check if there are any incoming
events and send data to update the JSON component.
"""
events = []
while not incoming_events.empty():
events.append(incoming_events.get())
return {"events": events}
with gr.Blocks() as demo:
json_component = gr.JSON(label="Incoming argilla events:")
gr.Timer(5, active=True).tick(check_incoming_events, outputs=json_component)
gr.mount_gradio_app(server, demo, path="/")