argilla-webhooks / main.py
Francisco Aranda
mount gradio app on /
6020a54
raw
history blame
1.86 kB
from queue import Queue
import argilla as rg
import gradio as gr
# Argilla client, created with no explicit arguments.
# NOTE(review): presumably the API URL/key come from the environment — confirm.
client = rg.Argilla()
# Webhook server object; the Gradio app is mounted onto it further down.
server = rg.get_webhook_server()
# Hand-off buffer between the webhook listeners (producers, via .put()) and
# check_incoming_events() (consumer), which drains it for the UI.
incoming_events = Queue()
# Set up the webhook listeners

# Start from a clean slate: remove every webhook the client currently lists
# before the listeners below are declared.
for stale_webhook in client.webhooks:
    stale_webhook.delete()
# Listener for record lifecycle events (created / updated / completed).
@rg.webhook_listener(events=["record.created", "record.updated", "record.completed"])
async def record_events(record: rg.Record, **kwargs):
    """Log a record event and queue the record for the UI.

    Args:
        record: The record the event refers to.
        **kwargs: Remaining event fields; ``kwargs["type"]`` (the event
            name) is required and is used in the log line.
    """
    log_line = f"Received event {kwargs['type']} for record {record.id}"
    print(log_line)

    # Only the record itself is queued (no event type alongside it).
    incoming_events.put(record)
# Create a webhook for dataset events
# NOTE(review): "dataset.deleted" is not in the subscribed events, so the
# deletion handling below is currently never triggered — confirm whether the
# subscription should include it.
@rg.webhook_listener(events=["dataset.created", "dataset.updated", "dataset.published"])
async def dataset_events(type: str, dataset: rg.Dataset | None = None, **kwargs):
    """Log a dataset event and queue ``(type, payload)`` for the UI.

    Args:
        type: Event name (e.g. ``"dataset.created"``). The parameter shadows
            the ``type`` builtin, but it is received by keyword so the name
            is part of the interface and is kept.
        dataset: The dataset the event refers to, when one is supplied.
        **kwargs: Remaining event fields. ``kwargs["data"]`` is used as the
            payload when no dataset object is available.

    The payload is ``dataset`` for ordinary events. For deletion events, or
    whenever ``dataset`` is ``None``, it falls back to ``kwargs.get("data")``
    instead of dereferencing a missing dataset or a missing key.
    """
    if "deleted" in type or dataset is None:
        # No usable dataset object: fall back to the raw event data (may be None).
        payload = kwargs.get("data")
        print(f"Received event {type} for dataset {payload}")
    else:
        payload = dataset
        print(f"Received event {type} for dataset {dataset.id}")

    incoming_events.put((type, payload))
# Listener for response lifecycle events (created / updated / deleted).
# NOTE(review): this also subscribes to "response.deleted"; confirm that a
# `response` argument is still supplied for deletions.
@rg.webhook_listener(events=["response.created", "response.updated", "response.deleted"])
async def response_events(response: rg.UserResponse, **kwargs):
    """Log a response event and queue the response for the UI.

    Args:
        response: The user response the event refers to.
        **kwargs: Remaining event fields; ``kwargs["type"]`` (the event
            name) is required and is used in the log line.
    """
    log_line = f"Received event {kwargs['type']} for response {response.id}"
    print(log_line)

    # Only the response itself is queued (no event type alongside it).
    incoming_events.put(response)
def check_incoming_events():
    """
    Drain every event currently waiting in ``incoming_events``.

    This function is called every 5 seconds to check if there are any incoming
    events and send data to update the JSON component.

    Returns:
        dict: ``{"events": [...]}`` holding the drained items in FIFO order;
        the list is empty when nothing was pending.
    """
    # Local import: the file only imports ``Queue`` from this module.
    from queue import Empty

    events = []
    while True:
        # Non-blocking get instead of ``empty()`` followed by ``get()``: if
        # another caller drains the queue between those two calls, the
        # blocking ``get()`` would hang forever.
        try:
            item = incoming_events.get_nowait()
        except Empty:
            break
        events.append(item)
    return {"events": events}
# Minimal UI: a JSON viewer refreshed by a timer that polls the event queue.
with gr.Blocks() as demo:
    json_component = gr.JSON(label="Incoming argilla events:")

    # Tick every 5 seconds; each tick replaces the JSON component's content
    # with whatever check_incoming_events() returns.
    poll_timer = gr.Timer(5, active=True)
    poll_timer.tick(check_incoming_events, outputs=json_component)

# Serve the Gradio app from the root path of the webhook server.
gr.mount_gradio_app(server, demo, path="/")