Spaces:
Runtime error
Runtime error
import os | |
from datetime import datetime | |
from queue import Queue | |
import argilla as rg | |
import gradio as gr | |
# Hand-off buffer: the event handlers below put items here and
# read_next_event() takes them out for display.
incoming_events = Queue()

client = rg.Argilla()
server = rg.get_webhook_server()

# Set up the webhook listeners

# Start clean: remove every webhook currently listed for this client,
# logging each URL as it goes.
for webhook in client.webhooks:
    print(f"Deleting webhook: {webhook.url}")
    webhook.delete()
# Create a webhook for record events | |
async def record_events(record: rg.Record, type: str, timestamp: datetime):
    """Log an incoming record event and queue it for the UI.

    Args:
        record: The record the event refers to.
        type: Event type string; printed and queued together with the record.
            The name shadows the builtin but is kept unchanged because the
            caller may pass it by keyword (TODO confirm).
        timestamp: Time attached to the event; only used in the log line.
    """
    # NOTE(review): no listener registration (e.g. a decorator) is visible for
    # this coroutine - confirm it is actually attached to the webhook server.
    print(f"Received event type {type} at {timestamp}: ", record)
    # Queue a (type, payload) pair, the same shape dataset_events uses, so the
    # consumer sees one payload layout and the event type is not dropped.
    incoming_events.put((type, record))
# Create a webhook for dataset events | |
async def dataset_events(dataset: rg.Dataset, type: str, timestamp: datetime):
    """Log an incoming dataset event and queue it as a ``(type, dataset)`` pair.

    Args:
        dataset: The dataset the event refers to.
        type: Event type string; printed and stored as the first tuple item.
        timestamp: Time attached to the event; only used in the log line.
    """
    # NOTE(review): no listener registration (e.g. a decorator) is visible for
    # this coroutine - confirm it is actually attached to the webhook server.
    print(f"Received event type {type} at {timestamp}: ", dataset)
    queued_item = (type, dataset)
    incoming_events.put(queued_item)
# Create a webhook for response events | |
async def response_events(response: rg.UserResponse, type: str, timestamp: datetime):
    """Log an incoming response event and queue it for the UI.

    Args:
        response: The user response the event refers to.
        type: Event type string; printed and queued together with the response.
            The name shadows the builtin but is kept unchanged because the
            caller may pass it by keyword (TODO confirm).
        timestamp: Time attached to the event; only used in the log line.
    """
    # NOTE(review): no listener registration (e.g. a decorator) is visible for
    # this coroutine - confirm it is actually attached to the webhook server.
    print(f"Received event type {type} at {timestamp}: ", response)
    # Queue a (type, payload) pair, the same shape dataset_events uses, so the
    # consumer sees one payload layout and the event type is not dropped.
    incoming_events.put((type, response))
def read_next_event():
    """Return the oldest queued event without blocking.

    This function is the callback of a repeating UI timer. A blocking
    ``Queue.get()`` would stall every tick that fires while the queue is
    empty, so the queue is polled instead.

    Returns:
        The next item taken from ``incoming_events``, or a no-op
        ``gr.update()`` when nothing is queued, which leaves the bound output
        component showing its current value.
    """
    # Function-scope import: the file-level import only brings in ``Queue``.
    from queue import Empty

    try:
        return incoming_events.get_nowait()
    except Empty:
        return gr.update()
with gr.Blocks() as demo:
    # Base URL of the Argilla server this client talks to; linked in the page text.
    argilla_server = client.http_client.base_url

    gr.Markdown("## Argilla Events")
    gr.Markdown(f"This demo shows the incoming events from the [Argilla Server]({argilla_server}).")
    json_component = gr.JSON(label="Incoming argilla events:", value={})

    # On every tick of the timer, write read_next_event()'s result into the
    # JSON panel.
    event_timer = gr.Timer(1, active=True)
    event_timer.tick(read_next_event, outputs=json_component)

# Attach the UI to the webhook server at the root path, then launch the demo.
# NOTE(review): the demo is mounted onto `server` and then launched on its
# own - confirm that `server` itself ends up being served by this entry point.
gr.mount_gradio_app(server, demo, path="/")
demo.launch()