Spaces:
Build error
Build error
import os | |
from pathlib import Path | |
from typing import Literal | |
from fastapi import BackgroundTasks, HTTPException | |
from huggingface_hub import ( | |
CommitOperationAdd, | |
CommitOperationDelete, | |
comment_discussion, | |
create_commit, | |
create_repo, | |
delete_repo, | |
get_repo_discussions, | |
snapshot_download, | |
space_info, | |
) | |
from huggingface_hub.repocard import RepoCard | |
from requests import HTTPError | |
from gradio_webhooks import GradioWebhookApp, WebhookPayload | |
HF_TOKEN = os.getenv("HF_TOKEN") | |
app = GradioWebhookApp() | |
async def post_webhook(payload: WebhookPayload, task_queue: BackgroundTasks): | |
if payload.repo.type != "space": | |
print("HTTP 400: not a space") | |
raise HTTPException(400, f"Must be a Space, not {payload.repo.type}") | |
space_id = payload.repo.name | |
if ( | |
payload.event.scope.startswith("discussion") | |
and payload.event.action == "create" | |
and payload.discussion is not None | |
and payload.discussion.isPullRequest | |
and payload.discussion.status == "open" | |
): | |
# New PR! | |
if not is_pr_synced(space_id=space_id, pr_num=payload.discussion.num): | |
task_queue.add_task( | |
sync_ci_space, | |
space_id=space_id, | |
pr_num=payload.discussion.num, | |
private=payload.repo.private, | |
) | |
print("New PR! Sync task scheduled") | |
else: | |
print("New comment on PR but CI space already synced") | |
elif ( | |
payload.event.scope.startswith("discussion") | |
and payload.event.action == "update" | |
and payload.discussion is not None | |
and payload.discussion.isPullRequest | |
and ( | |
payload.discussion.status == "merged" | |
or payload.discussion.status == "closed" | |
) | |
): | |
# PR merged or closed! | |
task_queue.add_task( | |
delete_ci_space, | |
space_id=space_id, | |
pr_num=payload.discussion.num, | |
) | |
print("PR is merged (or closed)! Delete task scheduled") | |
elif ( | |
payload.event.scope.startswith("repo.content") | |
and payload.event.action == "update" | |
): | |
# New repo change. Is it a commit on a PR? | |
# => loop through all PRs and check if new changes happened | |
print("New repo content update. Checking PRs state.") | |
for discussion in get_repo_discussions( | |
repo_id=space_id, repo_type="space", token=HF_TOKEN | |
): | |
if discussion.is_pull_request and discussion.status == "open": | |
if not is_pr_synced(space_id=space_id, pr_num=discussion.num): | |
task_queue.add_task( | |
sync_ci_space, | |
space_id=space_id, | |
pr_num=discussion.num, | |
private=payload.repo.private, | |
) | |
print(f"Scheduled update for PR {discussion.num}.") | |
print(f"Done looping over PRs.") | |
else: | |
print(f"Webhook ignored.") | |
print(f"Done.") | |
return {"processed": True} | |
def is_pr_synced(space_id: str, pr_num: int) -> bool: | |
# What is the last synced commit for this PR? | |
ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num) | |
try: | |
card = RepoCard.load( | |
repo_id_or_path=ci_space_id, repo_type="space", token=HF_TOKEN | |
) | |
last_synced_sha = getattr(card.data, "synced_sha", None) | |
except HTTPError: | |
last_synced_sha = None | |
# What is the last commit id for this PR? | |
info = space_info(repo_id=space_id, revision=f"refs/pr/{pr_num}") | |
last_pr_sha = info.sha | |
# Is it up to date ? | |
return last_synced_sha == last_pr_sha | |
def sync_ci_space(space_id: str, pr_num: int, private: bool) -> None: | |
# Create a temporary space for CI if didn't exist | |
ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num) | |
try: | |
create_repo( | |
ci_space_id, | |
repo_type="space", | |
space_sdk="docker", | |
private=private, | |
token=HF_TOKEN, | |
) | |
is_new = True | |
except HTTPError as err: | |
if err.response.status_code == 409: # already exists | |
is_new = False | |
else: | |
raise | |
# Download space codebase from PR revision | |
snapshot_path = Path( | |
snapshot_download( | |
repo_id=space_id, | |
revision=f"refs/pr/{pr_num}", | |
repo_type="space", | |
token=HF_TOKEN, | |
) | |
) | |
# Sync space codebase with PR revision | |
operations = [ # little aggressive but works | |
CommitOperationDelete(".", is_folder=True) | |
] | |
for filepath in snapshot_path.glob("**/*"): | |
if filepath.is_file(): | |
path_in_repo = str(filepath.relative_to(snapshot_path)) | |
# Upload all files without changes except for the README file | |
if path_in_repo == "README.md": | |
card = RepoCard.load(filepath) | |
setattr(card.data, "synced_sha", snapshot_path.name) # latest sha | |
path_or_fileobj = str(card).encode() | |
else: | |
path_or_fileobj = filepath | |
operations.append( | |
CommitOperationAdd( | |
path_in_repo=path_in_repo, path_or_fileobj=path_or_fileobj | |
) | |
) | |
create_commit( | |
repo_id=ci_space_id, | |
repo_type="space", | |
operations=operations, | |
commit_message=f"Sync CI Space with PR {pr_num}.", | |
token=HF_TOKEN, | |
) | |
# Post a comment on the PR | |
notify_pr(space_id=space_id, pr_num=pr_num, action="create" if is_new else "update") | |
def delete_ci_space(space_id: str, pr_num: int) -> None: | |
# Delete | |
ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num) | |
delete_repo(repo_id=ci_space_id, repo_type="space", token=HF_TOKEN) | |
# Notify about deletion | |
notify_pr(space_id=space_id, pr_num=pr_num, action="delete") | |
def notify_pr( | |
space_id: str, pr_num: int, action: Literal["create", "update", "delete"] | |
) -> None: | |
ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num) | |
if action == "create": | |
comment = NOTIFICATION_TEMPLATE_CREATE.format(ci_space_id=ci_space_id) | |
elif action == "update": | |
comment = NOTIFICATION_TEMPLATE_UPDATE.format(ci_space_id=ci_space_id) | |
elif action == "delete": | |
comment = NOTIFICATION_TEMPLATE_DELETE | |
else: | |
raise ValueError(f"Status {action} not handled.") | |
comment_discussion( | |
repo_id=space_id, | |
repo_type="space", | |
discussion_num=pr_num, | |
comment=comment, | |
token=HF_TOKEN, | |
) | |
def _get_ci_space_id(space_id: str, pr_num: int) -> str: | |
return f"{space_id}-ci-pr-{pr_num}" | |
NOTIFICATION_TEMPLATE_CREATE = """\ | |
Hey there! | |
Following the creation of this PR, a temporary test Space [{ci_space_id}](https://huggingface.co/spaces/{ci_space_id}) has been launched. | |
Any changes pushed to this PR will be synced with the test Space. | |
(This is an automated message) | |
""" | |
NOTIFICATION_TEMPLATE_UPDATE = """\ | |
Hey there! | |
Following new commits that happened in this PR, the temporary test Space [{ci_space_id}](https://huggingface.co/spaces/{ci_space_id}) has been updated. | |
(This is an automated message) | |
""" | |
NOTIFICATION_TEMPLATE_DELETE = """\ | |
Hey there! | |
PR is now merged/closed. The temporary test Space has been deleted. | |
(This is an automated message) | |
""" | |
app.ready() | |