webhook / app.py
Wauplin's picture
Wauplin HF staff
refacto + add example
d9c353a
raw
history blame
7.5 kB
import os
from pathlib import Path
from typing import Literal
from fastapi import BackgroundTasks, HTTPException
from huggingface_hub import (
CommitOperationAdd,
CommitOperationDelete,
comment_discussion,
create_commit,
create_repo,
delete_repo,
get_repo_discussions,
snapshot_download,
space_info,
)
from huggingface_hub.repocard import RepoCard
from requests import HTTPError
from gradio_webhooks import GradioWebhookApp, WebhookPayload
# Hub access token read from the environment; None when HF_TOKEN is unset.
HF_TOKEN = os.environ.get("HF_TOKEN")

# Application object on which the webhook endpoint below is registered.
app = GradioWebhookApp()
@app.add_webhook("/webhook")
async def post_webhook(payload: WebhookPayload, task_queue: BackgroundTasks):
    """Route an incoming webhook to the matching CI-Space background task.

    Handled events:
      * a pull request is opened -> schedule `sync_ci_space` unless the CI
        Space is already in sync;
      * a pull request is updated and is now merged or closed -> schedule
        `delete_ci_space`;
      * the repo content is updated -> walk every open pull request and
        schedule `sync_ci_space` for each one that is out of sync.
    Any other event is ignored.

    Raises:
        HTTPException: status 400 when the payload's repo is not a Space.

    Returns:
        `{"processed": True}` for every payload that targets a Space.
    """
    repo = payload.repo
    if repo.type != "space":
        print("HTTP 400: not a space")
        raise HTTPException(400, f"Must be a Space, not {repo.type}")

    space_id = repo.name
    event = payload.event
    discussion = payload.discussion

    # Shared precondition of the two discussion branches: the event is about
    # a discussion that exists and is a pull request.
    on_pull_request = (
        event.scope.startswith("discussion")
        and discussion is not None
        and discussion.isPullRequest
    )

    if on_pull_request and event.action == "create" and discussion.status == "open":
        # New PR!
        pr_num = discussion.num
        if is_pr_synced(space_id=space_id, pr_num=pr_num):
            print("New comment on PR but CI space already synced")
        else:
            task_queue.add_task(
                sync_ci_space,
                space_id=space_id,
                pr_num=pr_num,
                private=repo.private,
            )
            print("New PR! Sync task scheduled")
    elif (
        on_pull_request
        and event.action == "update"
        and discussion.status in ("merged", "closed")
    ):
        # PR merged or closed!
        task_queue.add_task(
            delete_ci_space,
            space_id=space_id,
            pr_num=discussion.num,
        )
        print("PR is merged (or closed)! Delete task scheduled")
    elif event.scope.startswith("repo.content") and event.action == "update":
        # The payload does not say which revision changed, so every open PR
        # is checked for new commits.
        print("New repo content update. Checking PRs state.")
        open_prs = (
            candidate
            for candidate in get_repo_discussions(
                repo_id=space_id, repo_type="space", token=HF_TOKEN
            )
            if candidate.is_pull_request and candidate.status == "open"
        )
        for pull_request in open_prs:
            if is_pr_synced(space_id=space_id, pr_num=pull_request.num):
                continue
            task_queue.add_task(
                sync_ci_space,
                space_id=space_id,
                pr_num=pull_request.num,
                private=repo.private,
            )
            print(f"Scheduled update for PR {pull_request.num}.")
        print("Done looping over PRs.")
    else:
        print("Webhook ignored.")

    print("Done.")
    return {"processed": True}
def is_pr_synced(space_id: str, pr_num: int) -> bool:
    """Tell whether the CI Space of a PR already reflects the PR's last commit.

    The sha recorded as `synced_sha` in the CI Space's repo card is compared
    with the current sha of the `refs/pr/{pr_num}` revision of `space_id`.

    Args:
        space_id: Repo id of the Space the pull request belongs to.
        pr_num: Number of the pull request.

    Returns:
        True when both shas are equal, False otherwise (including when the
        CI Space card could not be loaded or carries no `synced_sha`).
    """
    # What is the last synced commit for this PR?
    ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num)
    try:
        card = RepoCard.load(
            repo_id_or_path=ci_space_id, repo_type="space", token=HF_TOKEN
        )
        last_synced_sha = getattr(card.data, "synced_sha", None)
    except HTTPError:
        # The card could not be fetched (e.g. the CI Space does not exist
        # yet): treat the PR as never synced.
        last_synced_sha = None

    # What is the last commit id for this PR?
    # The token is passed like in every other Hub call of this module, so the
    # lookup is authenticated the same way (needed for private Spaces).
    info = space_info(
        repo_id=space_id, revision=f"refs/pr/{pr_num}", token=HF_TOKEN
    )

    # Is it up to date?
    return last_synced_sha == info.sha
def sync_ci_space(space_id: str, pr_num: int, private: bool) -> None:
    """Create or refresh the temporary CI Space mirroring a pull request.

    Steps:
      1. Create the CI Space (docker SDK); a 409 answer means it already
         exists and is reused.
      2. Download a snapshot of `space_id` at revision `refs/pr/{pr_num}`.
      3. Push a single commit to the CI Space that deletes everything and
         re-adds every file of the snapshot. `README.md` is uploaded with an
         extra `synced_sha` card field set to the snapshot folder name
         (expected to be the PR's latest commit sha), which is the value
         `is_pr_synced` reads back.
      4. Comment on the PR ("create" for a new CI Space, "update" otherwise).

    Args:
        space_id: Repo id of the Space the pull request belongs to.
        pr_num: Number of the pull request.
        private: Whether the CI Space must be created as private.

    Raises:
        HTTPError: if creating the CI Space fails for any reason other than
            "already exists" (409).
    """
    # Create a temporary space for CI if it didn't exist
    ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num)
    try:
        create_repo(
            ci_space_id,
            repo_type="space",
            space_sdk="docker",
            private=private,
            token=HF_TOKEN,
        )
    except HTTPError as err:
        # `err.response` can be None on an HTTPError; read the status code
        # defensively so the original error is re-raised instead of being
        # masked by an AttributeError.
        if getattr(err.response, "status_code", None) != 409:
            raise
        is_new = False  # 409: already exists
    else:
        is_new = True

    # Download space codebase from PR revision
    snapshot_path = Path(
        snapshot_download(
            repo_id=space_id,
            revision=f"refs/pr/{pr_num}",
            repo_type="space",
            token=HF_TOKEN,
        )
    )

    # Sync space codebase with PR revision
    operations = [  # little aggressive but works
        CommitOperationDelete(".", is_folder=True)
    ]
    for filepath in snapshot_path.glob("**/*"):
        if not filepath.is_file():
            continue

        # Repo paths are "/"-separated whatever the local OS separator is.
        path_in_repo = filepath.relative_to(snapshot_path).as_posix()

        # Upload all files without changes except for the README file
        if path_in_repo == "README.md":
            card = RepoCard.load(filepath)
            card.data.synced_sha = snapshot_path.name  # latest sha
            path_or_fileobj = str(card).encode()
        else:
            path_or_fileobj = filepath

        operations.append(
            CommitOperationAdd(
                path_in_repo=path_in_repo, path_or_fileobj=path_or_fileobj
            )
        )

    create_commit(
        repo_id=ci_space_id,
        repo_type="space",
        operations=operations,
        commit_message=f"Sync CI Space with PR {pr_num}.",
        token=HF_TOKEN,
    )

    # Post a comment on the PR
    notify_pr(
        space_id=space_id,
        pr_num=pr_num,
        action="create" if is_new else "update",
    )
def delete_ci_space(space_id: str, pr_num: int) -> None:
    """Delete the CI Space attached to a pull request and announce it.

    When the Hub answers 404 for the CI Space (it was never created, or a
    previous event already removed it) there is nothing to delete: the
    function logs it and returns without posting a second "deleted" comment.

    Args:
        space_id: Repo id of the Space the pull request belongs to.
        pr_num: Number of the pull request.

    Raises:
        HTTPError: if the deletion fails for any reason other than 404.
    """
    # Delete
    ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num)
    try:
        delete_repo(repo_id=ci_space_id, repo_type="space", token=HF_TOKEN)
    except HTTPError as err:
        # `err.response` can be None; read the status code defensively.
        if getattr(err.response, "status_code", None) != 404:
            raise
        print(f"CI Space {ci_space_id} not found. Nothing to delete.")
        return

    # Notify about deletion
    notify_pr(space_id=space_id, pr_num=pr_num, action="delete")
def notify_pr(
    space_id: str, pr_num: int, action: Literal["create", "update", "delete"]
) -> None:
    """Post an automated comment on a pull request about its CI Space.

    Args:
        space_id: Repo id of the Space the pull request belongs to.
        pr_num: Number of the pull request (used as discussion number).
        action: What happened to the CI Space; selects the message template.

    Raises:
        ValueError: if `action` is not "create", "update" or "delete".
    """
    ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num)

    if action == "delete":
        # The deletion message has no placeholder and is sent as-is.
        message = NOTIFICATION_TEMPLATE_DELETE
    elif action in ("create", "update"):
        template = (
            NOTIFICATION_TEMPLATE_CREATE
            if action == "create"
            else NOTIFICATION_TEMPLATE_UPDATE
        )
        message = template.format(ci_space_id=ci_space_id)
    else:
        raise ValueError(f"Status {action} not handled.")

    comment_discussion(
        repo_id=space_id,
        repo_type="space",
        discussion_num=pr_num,
        comment=message,
        token=HF_TOKEN,
    )
def _get_ci_space_id(space_id: str, pr_num: int) -> str:
return f"{space_id}-ci-pr-{pr_num}"
# Comment posted by `notify_pr` for action "create".
# `{ci_space_id}` is filled in with `str.format`.
NOTIFICATION_TEMPLATE_CREATE = """\
Hey there!
Following the creation of this PR, a temporary test Space [{ci_space_id}](https://huggingface.co/spaces/{ci_space_id}) has been launched.
Any changes pushed to this PR will be synced with the test Space.
(This is an automated message)
"""
# Comment posted by `notify_pr` for action "update".
# `{ci_space_id}` is filled in with `str.format`.
NOTIFICATION_TEMPLATE_UPDATE = """\
Hey there!
Following new commits that happened in this PR, the temporary test Space [{ci_space_id}](https://huggingface.co/spaces/{ci_space_id}) has been updated.
(This is an automated message)
"""
# Comment posted by `notify_pr` for action "delete".
# Sent as-is: it has no placeholder and is not formatted.
NOTIFICATION_TEMPLATE_DELETE = """\
Hey there!
PR is now merged/closed. The temporary test Space has been deleted.
(This is an automated message)
"""
# Called once everything above is defined.
# NOTE(review): the exact effect of `ready()` is defined by `gradio_webhooks`
# (presumably it finalizes/launches the app) - confirm against that module.
app.ready()