Spaces:
Running
Running
import os | |
import re | |
import time | |
import requests | |
import schedule | |
import gradio as gr | |
import pandas as pd | |
from tqdm import tqdm | |
from functools import partial | |
from datetime import datetime, timedelta | |
# import json | |
# import random | |
# import string | |
TIMEOUT = 15 | |
DELAY = 1 | |
# def start_monitor(url: str): | |
# payload = { | |
# "data": ["", ""], | |
# "event_data": None, # 使用None来表示null | |
# "fn_index": 0, | |
# "trigger_id": 11, | |
# "session_hash": "".join( | |
# random.choice(string.ascii_lowercase) for _ in range(11) | |
# ), | |
# } | |
# response = requests.post(f"{url}/queue/join?", json=payload) | |
# # 检查请求是否成功 | |
# if response.status_code == 200: | |
# return "monitoring" | |
# return "running" | |
def add_six_hours(match): | |
datetime_str = match.group(0) | |
dt = datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S") | |
dt_plus_six = dt + timedelta(hours=6) | |
return dt_plus_six.strftime("%Y-%m-%d %H:%M:%S") | |
def fix_datetime(text: str): | |
datetime_pattern = r"\b\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\b" | |
return re.sub(datetime_pattern, add_six_hours, text) | |
# def get_studios(username: str): | |
# # 请求负载 | |
# payload = { | |
# "PageNumber": 1, | |
# "PageSize": 1000, | |
# "Name": "", | |
# "SortBy": "gmt_modified", | |
# "Order": "desc", | |
# } | |
# try: | |
# # 发送PUT请求 | |
# response = requests.put( | |
# f"https://www.modelscope.cn/api/v1/studios/{username}/list", | |
# data=json.dumps(payload), | |
# headers={ | |
# "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3" | |
# }, | |
# timeout=TIMEOUT, | |
# ) | |
# # 检查请求是否成功 | |
# response.raise_for_status() | |
# # 解析JSON响应 | |
# spaces: list = response.json()["Data"]["Studios"] | |
# if spaces: | |
# studios = [] | |
# for space in spaces: | |
# studios.append( | |
# f"https://www.modelscope.cn/api/v1/studio/{username}/{space['Name']}/start_expired" | |
# ) | |
# return studios | |
# except requests.exceptions.Timeout as errt: | |
# print(f"请求超时: {errt}, retrying...") | |
# time.sleep(DELAY) | |
# return get_studios(username) | |
# except Exception as err: | |
# print(f"请求发生错误: {err}") | |
# return [] | |
def get_spaces(username: str): | |
try: | |
# 发送GET请求 | |
response = requests.get( | |
"https://huggingface.co/spaces-json", | |
params={"sort": "trending", "search": username}, | |
headers={ | |
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537" | |
}, | |
timeout=TIMEOUT, | |
) | |
# 检查请求是否成功 | |
response.raise_for_status() | |
# 解析JSON响应 | |
spaces: list = response.json()["spaces"] | |
studios = [] | |
for space in spaces: | |
if space["author"] == username: | |
studios.append( | |
f"https://{space['id'].replace('/', '-').replace('_', '-')}.hf.space" | |
) | |
return studios | |
except requests.exceptions.Timeout as errt: | |
print(f"请求超时: {errt}, retrying...") | |
time.sleep(DELAY) | |
return get_spaces(username) | |
except Exception as err: | |
print(f"请求发生错误: {err}") | |
return [] | |
def activate_space(url: str): | |
status = "running" | |
try: | |
if ".hf.space" in url: | |
response = requests.get(url, timeout=TIMEOUT) | |
response.raise_for_status() | |
# if "-keep-spaces-active.hf.space" in url: | |
# status = start_monitor(url) | |
else: | |
response = requests.put(url, timeout=TIMEOUT) | |
response.raise_for_status() | |
print("Expired studio found, restarting...") | |
while ( | |
requests.get( | |
url.replace("/start_expired", "/status"), | |
timeout=TIMEOUT, | |
).json()["Data"]["Status"] | |
== "ExpiredCreating" | |
): | |
requests.get( | |
url.replace("/api/v1/studio/", "/studios/").replace( | |
"/start_expired", "" | |
), | |
timeout=TIMEOUT, | |
) | |
time.sleep(5) | |
except requests.exceptions.Timeout as e: | |
if ".hf.space" in url: | |
status = "restarting" | |
else: | |
print(f"Failed to activate {url} : {e}, retrying...") | |
return activate_space(url) | |
except requests.RequestException as e: | |
if ( | |
"500 Server Error:" in f"{e}" | |
and response.json()["Message"] == "studio is not expired" | |
): | |
status = "running" | |
else: | |
status = f"{e}" | |
except Exception as e: | |
status = f"{e}" | |
return status | |
def activate(hf_users: str, ms_users: str): | |
if not hf_users: | |
hf_users = os.getenv("hf_users") | |
if not ms_users: | |
ms_users = hf_users | |
# spaces, monitors, studios, output = [], [], [], [] | |
spaces, output = [], [] | |
hf_usernames = hf_users.split(";") | |
# ms_usernames = ms_users.split(";") | |
for user in tqdm(hf_usernames, desc="Collecting spaces..."): | |
username = user.strip() | |
if username: | |
spaces += get_spaces(username) | |
time.sleep(DELAY) | |
# for space in spaces: | |
# if "keep-spaces-active" in space: | |
# monitors.append(space) | |
# else: | |
# studios.append(space) | |
# spaces = monitors + studios | |
# for user in tqdm(ms_usernames, desc="Collecting studios..."): | |
# username = user.strip() | |
# if username: | |
# spaces += get_studios(username) | |
# time.sleep(DELAY) | |
for space in tqdm(spaces, desc="Activating spaces..."): | |
output.append( | |
{ | |
"space": space.split("//")[-1].replace( | |
"www.modelscope.cn/api/v1/studio/", "" | |
), | |
"status": activate_space(space), | |
} | |
) | |
time.sleep(DELAY) | |
print("Activation complete!") | |
return pd.DataFrame(output) | |
def monitor( | |
hf_users: str = os.getenv("hf_users"), | |
ms_users: str = os.getenv("hf_users"), | |
period=4, | |
): | |
if schedule.get_jobs(): | |
return | |
if not hf_users: | |
hf_users = os.getenv("hf_users") | |
if not ms_users: | |
ms_users = hf_users | |
print(f"监控开启中...每 {period} 小时触发") | |
fixed_activate = partial(activate, hf_users=hf_users, ms_users=ms_users) | |
schedule.every(period).hours.do(fixed_activate) | |
while True: | |
schedule.run_pending() | |
time.sleep(DELAY) | |
def listasks(): | |
jobs = schedule.get_jobs() | |
if jobs: | |
details = f"{jobs}".replace("[", "").replace("]", "") | |
return fix_datetime( | |
details.split("functools.")[0] + "(" + details.split(") (")[-1] | |
) | |
return "None" | |
if __name__ == "__main__": | |
# monitor(None, None) | |
with gr.Blocks() as demo: | |
gr.Interface( | |
title="Start keeping all spaces active periodically", | |
fn=monitor, | |
inputs=[ | |
# gr.Textbox( | |
# label="HuggingFace", | |
# placeholder="Usernames joint by ;", | |
# ), | |
# gr.Textbox( | |
# label="ModelScope", | |
# placeholder="Usernames joint by ;", | |
# ), | |
], | |
outputs="text", | |
flagging_mode="never", | |
live=True, | |
) | |
gr.Interface( | |
title="See current task status", | |
fn=listasks, | |
inputs=None, | |
outputs=gr.Textbox(label="Current task details"), | |
flagging_mode="never", | |
) | |
gr.Interface( | |
title="Test activation for all spaces once", | |
fn=activate, | |
inputs=[ | |
gr.Textbox( | |
label="HuggingFace", | |
placeholder="Usernames joint by ;", | |
), | |
gr.Textbox( | |
label="ModelScope", | |
placeholder="Usernames joint by ;", | |
), | |
], | |
outputs=gr.Dataframe(label="Activated spaces"), | |
flagging_mode="never", | |
) | |
demo.launch() | |