Spaces:
Sleeping
Sleeping
File size: 4,085 Bytes
48548de 36e4ada 48548de 36e4ada 48548de 36e4ada 48548de 36e4ada 48548de c931653 48548de c931653 48548de 36e4ada c931653 48548de 36e4ada 48548de 36e4ada 48548de cc9bba0 48548de c931653 48548de 36e4ada 48548de c931653 48548de c931653 48548de c931653 36e4ada 48548de 36e4ada 48548de 36e4ada 48548de 36e4ada |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
import functools
import json
import traceback
from queue import Queue
from threading import Thread
from typing import List

import argilla as rg
import gradio as gr
from gradio_client import Client
# Hand-off queue between the webhook listener (producer) and the background
# worker thread (consumer). Created without a maxsize, so it is unbounded.
completed_record_events = Queue()

# Argilla client shared by the whole script. It is constructed with no
# arguments, so connection details presumably come from the environment
# (e.g. ARGILLA_API_URL / ARGILLA_API_KEY) — TODO confirm.
client = rg.Argilla()
def build_dataset(client: rg.Argilla) -> rg.Dataset:
    """Return the ``stanfordnlp_imdb`` Argilla dataset, creating it on first use.

    If a dataset with that name already exists on the server it is returned
    as-is. Otherwise the ``train[:1000]`` split of the ``stanfordnlp/imdb``
    Hub dataset is imported with the Hub settings plus an extra ``sentiment``
    label question (``negative`` / ``positive``).

    Args:
        client: Argilla client used to look up and create the dataset.

    Returns:
        The existing or newly created dataset.
    """
    repo_id = "stanfordnlp/imdb"
    dataset_name = "stanfordnlp_imdb"

    # Explicit None check: the lookup returns None when the dataset is missing,
    # and an existing dataset must never be mistaken for "missing" because of
    # how the Dataset object happens to evaluate in a boolean context.
    dataset = client.datasets(dataset_name)
    if dataset is not None:
        return dataset

    # Only hit the Hub for settings when we actually need to create the dataset.
    settings = rg.Settings.from_hub(repo_id)
    settings.questions.add(
        rg.LabelQuestion(name="sentiment", labels=["negative", "positive"])
    )

    return rg.Dataset.from_hub(
        repo_id,
        name=dataset_name,
        settings=settings,
        client=client,
        split="train[:1000]",
    )
with gr.Blocks() as demo:
    argilla_server = client.http_client.base_url
    # Static page content, rendered top to bottom in this order.
    for markdown_text in (
        "## Argilla Events",
        f"This demo shows the incoming events from the [Argilla Server]({argilla_server}).",
        "### Record Events",
        "#### Records are processed in background and suggestions are added.",
    ):
        gr.Markdown(markdown_text)

# Start the Gradio server without locking the main thread so the rest of the
# script can run. The first element of the returned tuple is handed to Argilla
# as the webhook server; the other two are discarded.
server, _, _ = demo.launch(prevent_thread_lock=True, app_kwargs={"docs_url": "/docs"})

# Set up the webhook listeners
rg.set_webhook_server(server)

# Disable and persist every webhook this client can see before new listeners
# are registered.
# NOTE(review): this affects all webhooks returned by client.webhooks, not only
# ones created by earlier runs of this script — confirm that is intended.
for webhook in client.webhooks:
    webhook.enabled = False
    webhook.update()
# Create a webhook for record events
@rg.webhook_listener(events="record.completed")
async def record_events(record: rg.Record, type: str, **kwargs):
    """Forward each completed record to the background suggestion worker.

    Args:
        record: The record carried by the ``record.completed`` event.
        type: The event type name. It shadows the builtin, but the name is
            presumably supplied by keyword by the Argilla listener, so keep it.
        **kwargs: Any other event fields; ignored here.
    """
    print("Received event", type)
    # completed_record_events has no maxsize, so put_nowait can never raise
    # Full and never blocks the event loop — the same effect as put().
    completed_record_events.put_nowait(record)
# Dataset the worker thread labels: fetched if present, otherwise created.
dataset = build_dataset(client=client)
def _fetch_records(record_filter: rg.Filter, limit: int) -> List[rg.Record]:
    """Return up to ``limit`` records of the global ``dataset`` matching ``record_filter``."""
    return list(
        dataset.records(
            query=rg.Query(filter=record_filter),
            limit=limit,
        )
    )


def add_record_suggestions_on_response_created(
    *, num_examples: int = 5, batch_size: int = 5
):
    """Worker loop: add model suggestions to pending records after each completion.

    Runs forever. It blocks on ``completed_record_events`` and, for every
    record that belongs to the global ``dataset``:

    1. fetches up to ``num_examples`` completed records to use as examples,
    2. fetches up to ``batch_size`` pending records (skipping the round if none),
    3. attaches suggestions via ``parse_pending_records`` and logs the records
       back to the dataset.

    Any exception raised during a round is printed and the loop moves on to
    the next event, so one bad event cannot kill the worker.

    NOTE(review): pending records are selected only by status, so the same
    records may be re-suggested on every event until they are annotated.

    Args:
        num_examples: Maximum number of completed records sent as examples.
        batch_size: Maximum number of pending records labelled per event.
    """
    print("Starting thread")

    completed_records_filter = rg.Filter(("status", "==", "completed"))
    pending_records_filter = rg.Filter(("status", "==", "pending"))

    while True:
        try:
            record: rg.Record = completed_record_events.get()
            # Events for other datasets on the same server are ignored.
            if dataset.id != record.dataset.id:
                continue

            # Prepare predict data
            field = dataset.settings.fields["text"]
            question = dataset.settings.questions["sentiment"]
            examples = _fetch_records(completed_records_filter, num_examples)
            some_pending_records = _fetch_records(pending_records_filter, batch_size)
            if not some_pending_records:
                continue

            some_pending_records = parse_pending_records(
                some_pending_records, field, question, examples
            )
            dataset.records.log(some_pending_records)
        except Exception:
            # Top-level boundary of a long-running worker: report and keep going.
            print(traceback.format_exc())
_LABELLER_SPACE = "davidberenstein1957/distilabel-argilla-labeller"


@functools.lru_cache(maxsize=1)
def _get_labeller_client() -> Client:
    """Return a gradio ``Client`` for the labeller Space, created once and reused."""
    return Client(_LABELLER_SPACE)


def parse_pending_records(
    records: List[rg.Record],
    field: rg.Field,
    question,
    example_records: List[rg.Record],
) -> List[rg.Record]:
    """Ask the labeller Space for suggestions and attach them to ``records``.

    The records, the field, the question and the example records are
    serialized and sent to the Space's ``/predict`` endpoint. Each entry of the
    response's ``results`` list becomes an ``rg.Suggestion`` for ``question``
    on the record at the same position.

    This is best-effort: any exception is printed and the records are returned
    anyway, possibly with only some suggestions attached.

    Args:
        records: Records to annotate with suggestions (mutated in place).
        field: Dataset field whose serialized form is sent to the labeller.
        question: Question to suggest values for (the worker passes the
            ``sentiment`` question); must provide ``serialize()`` and ``name``.
        example_records: Already-completed records sent as examples.

    Returns:
        The same ``records`` list.
    """
    try:
        # Reuse one client across events instead of reconnecting (and
        # re-downloading the Space config) every time.
        gradio_client = _get_labeller_client()

        payload = {
            "records": [record.to_dict() for record in records],
            "fields": [field.serialize()],
            "question": question.serialize(),
            "example_records": [record.to_dict() for record in example_records],
            "api_name": "/predict",
        }

        response = gradio_client.predict(**payload)
        # The endpoint may return either a JSON string or an already-decoded object.
        response = json.loads(response) if isinstance(response, str) else response

        results = response["results"]
        if len(results) != len(records):
            # zip() below would otherwise truncate silently.
            print(
                f"Warning: labeller returned {len(results)} results for "
                f"{len(records)} records; unmatched items are ignored."
            )

        for record, suggestion in zip(records, results):
            record.suggestions.add(
                rg.Suggestion(
                    question_name=question.name,
                    value=suggestion["value"],
                    score=suggestion["score"],
                    agent=suggestion["agent"],
                )
            )
    except Exception:
        # Drop the cached client so a stale or broken connection is rebuilt on
        # the next call; the failure itself stays best-effort.
        _get_labeller_client.cache_clear()
        print(traceback.format_exc())

    return records
# Consume completed-record events in the background. The worker loops forever
# (`while True`), so as a non-daemon thread it would keep the interpreter alive
# after demo.block_thread() returns (e.g. on Ctrl+C). daemon=True lets the
# process exit together with the main thread.
thread = Thread(
    target=add_record_suggestions_on_response_created,
    name="record-suggestions",
    daemon=True,
)
thread.start()

# The Gradio server was launched with prevent_thread_lock=True, so block here
# to keep the main thread (and the server) alive.
demo.block_thread()
|