# Spaces status banner captured with this listing: Runtime error
import os
from datetime import datetime
from queue import Empty, Queue

import argilla as rg
import gradio as gr
# Argilla client, built with no explicit arguments.
# NOTE(review): presumably the connection settings come from the environment — confirm.
client = rg.Argilla()
# Webhook server object; the Gradio UI is mounted onto it at the end of the file.
server = rg.get_webhook_server()
# Hand-off buffer: the webhook handlers put events in, the UI poller takes them out.
incoming_events: Queue = Queue()
# The webhook server URL is used to build the webhook URLs, so it has to be right:
#   * by default it is http://127.0.0.1:8000;
#   * when the SPACE_HOST environment variable is set, the default becomes
#     https://<SPACE_HOST value>;
#   * if the FastAPI app runs on a different URL, the WEBHOOK_SERVER_URL
#     environment variable should be set explicitly.
default_webhook_server_url = os.environ.get("WEBHOOK_SERVER_URL")
if "SPACE_HOST" in os.environ and default_webhook_server_url is None:
    # No explicit override was given: use the default Gradio server URL.
    os.environ["WEBHOOK_SERVER_URL"] = "http://127.0.0.1:7860"
# Set up the webhook listeners
# Create a webhook for record events
# NOTE(review): no listener decorator is visible on this handler; confirm how it
# is registered with the webhook server.
async def record_events(record: rg.Record, type: str, timestamp: datetime):
    """Log an incoming record event and queue the record for the UI.

    Args:
        record: The record the event refers to.
        type: Event type label. The name shadows the builtin but is part of
            the handler's signature, so it is kept.
        timestamp: Time reported for the event.
    """
    prefix = f"Received event type {type} at {timestamp}: "
    print(prefix, record)
    # Only the record itself is queued; the event type is not forwarded.
    incoming_events.put(record)
# Create a webhook for dataset events
# NOTE(review): no listener decorator is visible on this handler; confirm how it
# is registered with the webhook server.
async def dataset_events(dataset: rg.Dataset, type: str, timestamp: datetime):
    """Log an incoming dataset event and queue it, with its type, for the UI.

    Args:
        dataset: The dataset the event refers to.
        type: Event type label. The name shadows the builtin but is part of
            the handler's signature, so it is kept.
        timestamp: Time reported for the event.
    """
    print(f"Received event type {type} at {timestamp}: ", dataset)
    # NOTE(review): this handler queues a (type, dataset) pair, whereas the record
    # and response handlers queue the bare object — confirm which shape is intended.
    payload = (type, dataset)
    incoming_events.put(payload)
# Create a webhook for response events
# NOTE(review): no listener decorator is visible on this handler; confirm how it
# is registered with the webhook server.
async def response_events(response: rg.UserResponse, type: str, timestamp: datetime):
    """Log an incoming response event and queue the response for the UI.

    Args:
        response: The user response the event refers to.
        type: Event type label. The name shadows the builtin but is part of
            the handler's signature, so it is kept.
        timestamp: Time reported for the event.
    """
    header = f"Received event type {type} at {timestamp}: "
    print(header, response)
    # Only the response itself is queued; the event type is not forwarded.
    incoming_events.put(response)
def read_next_event():
    """Return the next queued webhook event without blocking.

    This is polled by the UI timer. A blocking ``Queue.get()`` would stall
    each tick until a webhook fires, so the queue is read non-blockingly.

    Returns:
        The oldest pending event from ``incoming_events``, or a no-op
        ``gr.update()`` when the queue is empty so the bound output keeps
        whatever it is currently showing.
    """
    try:
        return incoming_events.get_nowait()
    except Empty:
        # Nothing new since the last tick: leave the component untouched.
        return gr.update()
# Build the page: a heading, a link to the Argilla server, and a JSON panel that
# is refreshed from the event queue on every tick of a 1-second timer.
with gr.Blocks() as demo:
    argilla_server = client.http_client.base_url

    gr.Markdown("## Argilla Events")
    gr.Markdown(f"This demo shows the incoming events from the [Argilla Server]({argilla_server}).")

    json_component = gr.JSON(label="Incoming argilla events:", value={})

    # Each tick asks read_next_event for the value to show in the JSON panel.
    poll_timer = gr.Timer(1, active=True)
    poll_timer.tick(read_next_event, outputs=json_component)

# Serve the Gradio UI from the root path of the webhook server.
gr.mount_gradio_app(server, demo, path="/")