Spaces:
Runtime error
Runtime error
File size: 1,805 Bytes
a7c2cd2 30c85ad 32c2587 f039650 32c2587 f039650 32c2587 f039650 32c2587 ea4e048 32c2587 16eaf19 32c2587 30c85ad 48225dd ea4e048 f039650 bcf795e 32c2587 30c85ad f039650 b888aa1 f039650 32c2587 ea4e048 30c85ad ea4e048 48225dd f039650 30c85ad ea4e048 32c2587 ea4e048 32c2587 30c85ad 32c2587 a7c2cd2 30c85ad 32c2587 6020a54 16eaf19 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
import os
from datetime import datetime
from queue import Queue
import argilla as rg
import gradio as gr
client = rg.Argilla()
server = rg.get_webhook_server()
incoming_events = Queue()
# Set up the webhook listeners
# Delete all existing webhooks
for webhook in client.webhooks:
print(f"Deleting webhook: {webhook.url}")
webhook.delete()
# Create a webhook for record events
@rg.webhook_listener(events=["record.created", "record.updated", "record.deleted", "record.completed"])
async def record_events(record: rg.Record, type: str, timestamp: datetime):
print(f"Received event type {type} at {timestamp}: ", record)
incoming_events.put(record)
# Create a webhook for dataset events
@rg.webhook_listener(events=["dataset.created", "dataset.updated", "dataset.deleted", "dataset.published"])
async def dataset_events(dataset: rg.Dataset, type: str, timestamp: datetime):
print(f"Received event type {type} at {timestamp}: ", dataset)
incoming_events.put((type, dataset))
# Create a webhook for response events
@rg.webhook_listener(events=["response.created", "response.updated"])
async def response_events(response: rg.UserResponse, type: str, timestamp: datetime):
print(f"Received event type {type} at {timestamp}: ", response)
incoming_events.put(response)
def read_next_event():
event = incoming_events.get()
return event
with gr.Blocks() as demo:
argilla_server = client.http_client.base_url
gr.Markdown("## Argilla Events")
gr.Markdown(f"This demo shows the incoming events from the [Argilla Server]({argilla_server}).")
json_component = gr.JSON(label="Incoming argilla events:", value={})
gr.Timer(1, active=True).tick(read_next_event, outputs=json_component)
gr.mount_gradio_app(server, demo, path="/")
demo.launch()
|