Francisco Aranda
chore: Configure Gradio env
16eaf19
raw
history blame
1.81 kB
import os
from datetime import datetime
from queue import Queue
import argilla as rg
import gradio as gr
client = rg.Argilla()
server = rg.get_webhook_server()
incoming_events = Queue()
# Set up the webhook listeners
# Delete all existing webhooks
for webhook in client.webhooks:
print(f"Deleting webhook: {webhook.url}")
webhook.delete()
# Create a webhook for record events
@rg.webhook_listener(events=["record.created", "record.updated", "record.deleted", "record.completed"])
async def record_events(record: rg.Record, type: str, timestamp: datetime):
print(f"Received event type {type} at {timestamp}: ", record)
incoming_events.put(record)
# Create a webhook for dataset events
@rg.webhook_listener(events=["dataset.created", "dataset.updated", "dataset.deleted", "dataset.published"])
async def dataset_events(dataset: rg.Dataset, type: str, timestamp: datetime):
print(f"Received event type {type} at {timestamp}: ", dataset)
incoming_events.put((type, dataset))
# Create a webhook for response events
@rg.webhook_listener(events=["response.created", "response.updated"])
async def response_events(response: rg.UserResponse, type: str, timestamp: datetime):
print(f"Received event type {type} at {timestamp}: ", response)
incoming_events.put(response)
def read_next_event():
event = incoming_events.get()
return event
with gr.Blocks() as demo:
argilla_server = client.http_client.base_url
gr.Markdown("## Argilla Events")
gr.Markdown(f"This demo shows the incoming events from the [Argilla Server]({argilla_server}).")
json_component = gr.JSON(label="Incoming argilla events:", value={})
gr.Timer(1, active=True).tick(read_next_event, outputs=json_component)
gr.mount_gradio_app(server, demo, path="/")
demo.launch()