taskManager / task /_0上传遮罩.py
rogerxavier's picture
Update task/_0上传遮罩.py
6b71423 verified
raw
history blame
1.83 kB
# Batch upload
import os
from contextlib import ExitStack

import requests
#->void
def upload_mask_to_space(baseUrl: str, mask_path: str, fake_headers: dict = None) -> None:
    """Request deletion of the remote working directories, then upload mask images.

    First sends one DELETE request per entry of a fixed directory list to
    ``baseUrl + "/deleteFiles"``. Then collects every ``.jpg`` / ``.png`` file
    found under ``mask_path`` (resolved against the current working directory,
    searched recursively) and POSTs them as multipart ``images`` fields to
    ``baseUrl + "/getOriginalMangaList"`` with the query parameter
    ``save_path=/mask``. Server responses are printed, not returned.

    Args:
        baseUrl: Base URL of the remote service, without a trailing slash.
        mask_path: Directory containing the images, relative to the current
            working directory.
        fake_headers: Optional headers. When given, they replace the session's
            default headers entirely (they are not merged into them).
    """
    # The context manager releases the session's pooled connections on exit.
    with requests.Session() as session:
        if fake_headers is not None:
            session.headers = fake_headers

        delete_url = baseUrl + "/deleteFiles"
        directory_clear_list = ["manga", "manga1", "manga12", "output", "mp3_out", "mp4_out", "cover", "mask"]
        for directory in directory_clear_list:
            response = session.delete(delete_url, params={"directory": directory})
            if response.status_code == 200:
                print(response.text)
            else:
                # A failed delete is reported but does not abort the run.
                print("请求失败,状态码:", response.status_code)
                print("请求失败,状态码:", response.text)

        upload_url = baseUrl + '/getOriginalMangaList'

        # Collect every image under the mask directory; paths are stored
        # relative to the current working directory.
        subdir_path = os.path.join(os.getcwd(), mask_path)
        image_files = [
            os.path.relpath(os.path.join(root, file))
            for root, _dirs, files in os.walk(subdir_path)
            for file in files
            if file.endswith((".jpg", ".png"))
        ]

        # Tell the server to store the uploaded mask images under /mask.
        data = {
            'save_path': "/mask"
        }

        # ExitStack guarantees every opened image is closed once the upload
        # finishes, and also if opening a later file or the request itself fails.
        with ExitStack() as stack:
            # NOTE(review): the content type is "image/jpeg" for .png files as
            # well; kept as-is because the server's expectations are not
            # visible here.
            upload_files = [
                ("images", (image_path, stack.enter_context(open(image_path, "rb")), "image/jpeg"))
                for image_path in image_files
            ]
            response = session.post(upload_url, files=upload_files, params=data)
        print(response.text)
if __name__ == '__main__':
    # Author's note: the base URL must use https.
    upload_mask_to_space(
        baseUrl='https://rogerxavier-moviepy-with-manga-test.hf.space',
        mask_path='mask',
    )