Wauplin HF staff committed on
Commit
fd92af7
1 Parent(s): 84210ae

add recovery mode

Browse files
Files changed (1) hide show
  1. src/gradio_space_ci/webhook.py +34 -13
src/gradio_space_ci/webhook.py CHANGED
@@ -1,6 +1,7 @@
1
  import os
2
  import uuid
3
  import warnings
 
4
  from pathlib import Path
5
  from typing import Any, Dict, List, Literal, Optional, TypedDict, Union
6
 
@@ -20,6 +21,7 @@ from huggingface_hub import (
20
  get_repo_discussions,
21
  get_space_runtime,
22
  get_space_variables,
 
23
  request_space_hardware,
24
  request_space_storage,
25
  snapshot_download,
@@ -126,9 +128,38 @@ def configure_space_ci(
126
  server = WebhooksServer(ui=blocks, webhook_secret=WEBHOOK_SECRET)
127
  server.add_webhook()(trigger_ci_on_pr)
128
  configure_webhook_on_hub()
 
 
 
129
  return server
130
 
131
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
132
  ###
133
  # Define webhook on the Hub logic
134
  ###
@@ -173,12 +204,7 @@ async def trigger_ci_on_pr(payload: WebhookPayload, task_queue: BackgroundTasks)
173
  ):
174
  if not is_pr_synced(space_id=space_id, pr_num=payload.discussion.num):
175
  # New PR! Sync task scheduled
176
- task_queue.add_task(
177
- sync_ci_space,
178
- space_id=space_id,
179
- pr_num=payload.discussion.num,
180
- private=payload.repo.private,
181
- )
182
  has_task = True
183
  elif (
184
  # Means "a PR has been merged or closed"
@@ -204,12 +230,7 @@ async def trigger_ci_on_pr(payload: WebhookPayload, task_queue: BackgroundTasks)
204
  if discussion.is_pull_request and discussion.status == "open":
205
  if not is_pr_synced(space_id=space_id, pr_num=discussion.num):
206
  # Found a PR that is not yet synced
207
- task_queue.add_task(
208
- sync_ci_space,
209
- space_id=space_id,
210
- pr_num=discussion.num,
211
- private=payload.repo.private,
212
- )
213
  has_task = True
214
 
215
  if has_task:
@@ -240,7 +261,7 @@ def is_pr_synced(space_id: str, pr_num: int) -> bool:
240
  return last_synced_sha == last_pr_sha
241
 
242
 
243
- def sync_ci_space(space_id: str, pr_num: int, private: bool) -> None:
244
  print(f"New task: sync ephemeral env for {space_id} (PR {pr_num})")
245
  ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num)
246
 
 
1
  import os
2
  import uuid
3
  import warnings
4
+ from concurrent.futures import ThreadPoolExecutor
5
  from pathlib import Path
6
  from typing import Any, Dict, List, Literal, Optional, TypedDict, Union
7
 
 
21
  get_repo_discussions,
22
  get_space_runtime,
23
  get_space_variables,
24
+ repo_exists,
25
  request_space_hardware,
26
  request_space_storage,
27
  snapshot_download,
 
128
  server = WebhooksServer(ui=blocks, webhook_secret=WEBHOOK_SECRET)
129
  server.add_webhook()(trigger_ci_on_pr)
130
  configure_webhook_on_hub()
131
+
132
+ # Recover missed webhooks
133
+ recover_after_restart(space_id=SPACE_ID)
134
  return server
135
 
136
 
137
+ ###
138
+ # Recovery logic
139
+ ###
140
+ # Check if there are any PRs that need to be synced.
141
+ # We might have missed some events while the server was down.
142
+ # => called once at startup (see configure_space_ci)
143
+
144
+ background_pool = ThreadPoolExecutor(max_workers=1)
145
+
146
+
147
def recover_after_restart(space_id: str) -> None:
    """Reconcile CI spaces with the current state of the Space's pull requests.

    Intended to run once at startup, to catch up on webhook events that may
    have been missed while the server was down. Walks every discussion of
    ``space_id`` and, for pull requests only:

    * open PR that ``is_pr_synced`` reports as not synced -> schedule
      ``sync_ci_space``;
    * merged/closed PR whose CI space still exists on the Hub -> schedule
      ``delete_ci_space``.

    Work is handed to the module-level ``background_pool``; this function
    does not wait for the submitted tasks to finish.

    Args:
        space_id: Repo id of the Space whose pull requests are inspected.
    """
    for discussion in get_repo_discussions(repo_id=space_id, repo_type="space"):
        # Plain discussions (non-PR) never have a CI space attached.
        if not discussion.is_pull_request:
            continue

        if discussion.status == "open":
            if is_pr_synced(space_id=space_id, pr_num=discussion.num):
                continue
            # Found a PR that is not yet synced
            print(f"Recovery. Found an open PR that is not synced: {discussion.url}. Syncing it.")
            background_pool.submit(sync_ci_space, space_id=space_id, pr_num=discussion.num)
        elif discussion.status in ("merged", "closed"):
            ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=discussion.num)
            if not repo_exists(repo_id=ci_space_id, repo_type="space"):
                continue
            # Found a PR for which the CI space has not been deleted
            print(f"Recovery. Found a closed PR with an active CI space: {discussion.url}. Deleting it.")
            background_pool.submit(delete_ci_space, space_id=space_id, pr_num=discussion.num)
161
+
162
+
163
  ###
164
  # Define webhook on the Hub logic
165
  ###
 
204
  ):
205
  if not is_pr_synced(space_id=space_id, pr_num=payload.discussion.num):
206
  # New PR! Sync task scheduled
207
+ task_queue.add_task(sync_ci_space, space_id=space_id, pr_num=payload.discussion.num)
 
 
 
 
 
208
  has_task = True
209
  elif (
210
  # Means "a PR has been merged or closed"
 
230
  if discussion.is_pull_request and discussion.status == "open":
231
  if not is_pr_synced(space_id=space_id, pr_num=discussion.num):
232
  # Found a PR that is not yet synced
233
+ task_queue.add_task(sync_ci_space, space_id=space_id, pr_num=discussion.num)
 
 
 
 
 
234
  has_task = True
235
 
236
  if has_task:
 
261
  return last_synced_sha == last_pr_sha
262
 
263
 
264
+ def sync_ci_space(space_id: str, pr_num: int) -> None:
265
  print(f"New task: sync ephemeral env for {space_id} (PR {pr_num})")
266
  ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num)
267