taskManager / utils.py
rogerxavier's picture
Update utils.py
9d4219f verified
raw
history blame
15.4 kB
import json
import os
import random
from zipfile import ZipFile
import zipfile
import gradio as gr
from config import *
import shutil
from PIL import Image#上传保存遮罩和比较遮罩width height能否和章节图片匹配的时候用
from taskMap import *
from collections import Counter#计数删除少部分size不同图片
def load_config():
return get_variables()
class configData():
def __init__(self):
self.data = load_config()
def update(self):
self.data = load_config()
#print(self.data)
configData = configData()#复用实例
# 解压缩函数
def unzip_file(file_path, extract_path):
with zipfile.ZipFile(file_path, 'r') as zip_ref:
zip_ref.extractall(extract_path)
# 上传并解压缩函数
#upload_and_unzip(file:"文件",extract_path:"解压路径")->"void 成功提示":
def upload_and_unzip(file_obj, extract_path):
print("执行上传解压函数")
##这样写如果新的多,那么替换为多的一方,如果新的少,也只会替换已经存在的部分--->符合预期想法
with ZipFile(file_obj.name) as zfile:
zfile.extractall(extract_path)
return "File uploaded and extracted successfully to "+extract_path
#构造函数获取当前目录file list信息-不放config进行返回是为了避免引入旧值
def update_file_info(root_dir):
info = {}
for root, dirs, files in os.walk(root_dir):
info[root] = {
"directories": dirs,
"files": files
}
for dir_name in dirs:
update_file_info(os.path.join(root, dir_name))
return info
#删除章节或者mask
def delete_file(delete_dir:str)->str:
for root, dirs, files in os.walk(delete_dir, topdown=False):
for name in files:
file_path = os.path.join(root, name)
print("删除指定目录函数检查到文件,删除文件:", file_path)
os.remove(file_path)
for name in dirs:
dir_path = os.path.join(root, name)
print("删除指定目录函数检查到目录,删除目录:", dir_path)
os.rmdir(dir_path)
return "message:"+ "成功删除{delete_dir}目录下的所有文件".format(delete_dir=delete_dir)
def random_chapter():
# for root, info in configData.data['file_info'].items():
#手动更新避免引入旧值-成功
file_info = update_file_info(configData.data['current_dir'])
configData.data['file_info'] = file_info
for root, info in configData.data['file_info'].items():
if root.startswith(configData.data['manga_abs_dir']):
for directory in info["directories"]:
if any(file.endswith('.jpg') or file.endswith('.png') for file in os.listdir(os.path.join(root, directory))):
chapter_path = os.path.join(root, directory)
# 删除章节目录
print("章节目录是:",chapter_path)
#获取标题
# 将路径转换为标题
folders = chapter_path.split(os.sep)
if len(folders) >= 2:
chapter_title = " ".join([folders[-2], folders[-1]]) # 获取倒数第二个和倒数第一个目录名作为标题
else:
chapter_title = chapter_path # 如果目录层级不足2层,直接使用路径作为标题
# 获取标题
return chapter_title,chapter_path#返回章节标题和章节目录给任务-1批量顺序上传原始图片到服务器manga.
return None,None
def list_files():
#查看files信息的时候需要更新configData
configData.update()
return configData.data['file_info']
def save_mask(mask_file, output_dir):
#上传遮罩保存到mask下面,名称随意
# 检查遮罩文件是否存在
# 构造保存文件的路径-保证linux和windows都能转义
save_path = os.path.join(output_dir, '0.jpg')
# 创建目录(如果目录不存在)
os.makedirs(output_dir, exist_ok=True)
# 使用with open方式保存文件
mask_file_img = mask_file #这个是Image.open后的对象
mask_file_img.save(save_path)
return "文件保存成功到:" + str(save_path)
def update_config_json(newConfig:str):
newConfig = json.loads(newConfig) #转换str为dict
with open(configData.data['current_config_json'], "w") as file:
file.write(json.dumps(newConfig, indent=4, ensure_ascii=False)) # 指定indent参数来保持JSON格式
#返回新的json文件内容
with open(configData.data['current_config_json'], "r") as f:
content =f.read() # 读取文件内容
return content
def zero_pad(s, length):
return s.zfill(length)
# 按照排序删除广告文件和片头
def delete_first_and_last_image_in_subdirectories(root_dir:str,deleted_list:list):
for root, dirs, files in os.walk(root_dir):
for dir_name in dirs:
dir_path = os.path.join(root, dir_name)
images = [f for f in os.listdir(dir_path) if os.path.isfile(os.path.join(dir_path, f))]
images.sort(
key=lambda x: zero_pad(''.join(filter(str.isdigit, os.path.splitext(os.path.basename(x))[0])), 3))
for index in deleted_list:
if index < len(images) and index >= -len(images):
image_path = os.path.join(dir_path, images[index])
print("依据默认config,删除章节片头和结尾广告,可修改默认配置[0,1,-1]",image_path)
os.remove(image_path)
#void 删除当前chapter中部分size不同的少类图片,还有就是删除片头和广告,默认配置[0,1,-1],这个只对cur_chapter执行就不会多删了
def remove_different_size_images(chapter_path:str):
image_dir = chapter_path
# 获取目录下所有图片文件名
image_files = [f for f in os.listdir(image_dir) if os.path.isfile(os.path.join(image_dir, f))]
# 获取图片尺寸
image_sizes = []
for image_file in image_files:
with Image.open(os.path.join(image_dir, image_file)) as img:
image_sizes.append(img.size)
# 统计图片尺寸出现次数
size_counter = Counter(image_sizes)
# 找到占多数的章节图片尺寸
most_common_size = size_counter.most_common(1)[0][0]
# 删除其他尺寸的图片
for image_file, image_size in zip(image_files, image_sizes):
if image_size != most_common_size:
print("发现图片:",image_file,"大小不同于章节其他图片,认为是杂类图片,进行删除")
os.remove(os.path.join(image_dir, image_file))
# 按照默认配置删除片头和广告
delete_first_and_last_image_in_subdirectories(root_dir=chapter_path,deleted_list=configData.data['default_chapter_delete'])
def is_asp_task_valid()->bool:
# 启动状态检测 ->
# 1config允许定时任务 2 mask目录下齐备 3 manga_all目录下有可用素材 4 mask的width height和素材匹配
# 5应当将config写入文件而不是py随时可以修改,以防ck账号被封等情况下不发送->同时删除不可用账号 -全部space中账号反馈不可用的时候停止定时任务
# 6应该避免任务扎堆进行,对ocr space造成负担
print("判断任务条件时对config进行更新")
configData.update()
if configData.data['allow_scheduler'] == False:
print("当前配置不允许定时任务,停止定时任务")
return False
chapter_title ,cur_chapter= random_chapter()
if cur_chapter == None:
print("当前没有可用章节了,停止定时任务")
return False
print("开始删除当前chapter中部分size不同的少类图片")
remove_different_size_images(chapter_path=cur_chapter)
mask_file_path = os.path.join(configData.data['mask_dir'], '0.jpg')
if not os.path.exists(mask_file_path):
print("遮罩文件不存在,停止定时任务")
return False
# 读取遮罩图片长宽比较当前章节下面图片的长宽
mask_image = Image.open(mask_file_path)
# 获取指定目录下所有文件
all_files = os.listdir(cur_chapter)
# 过滤出图片文件
image_files = [file for file in all_files if file.endswith(('.jpg', '.jpeg', '.png', '.gif'))]
if image_files:
# 随机选择一个图片文件
random_chapter_image_name = random.choice(image_files)
random_chapter_image_path = os.path.join(cur_chapter, random_chapter_image_name)
# 打开选定的图片文件
random_chapter_image = Image.open(random_chapter_image_path)
# 现在random_chapter_image中包含了选中的随机图片文件
if mask_image.size != random_chapter_image.size:
print("遮罩大小和章节随机图片的大小不匹配,继续定时任务,此处只做记录,因为前面已经删除了少类图片,且space也可以微调mask")
print("遮罩大小是:",mask_image.size,"随机章节图片大小是:",random_chapter_image.size)
return True
else:
print("No image files found in the directory.")
return False
return True
#定时任务流程函数集合 ->返回None说明非正常执行情况
def run_asp_task():
if not is_asp_task_valid():
return None
#遍历bili_spaces中的space url并执行日常任务
for space_url in configData.data["bili_spaces"]:
process_task_list("0 1 2",space_url)
#实现gradio接受上传压缩文件,解压到manga_all目录下面
with gr.Blocks() as mangaManager:
#上传文件选择列表
file_selected = gr.File(label="待上传压缩章节",file_types=['.zip', '.tar', '.gz'])
#上传zip的解压保存路径或mask的地址(必选) 根据需要选择
output_dir_gr = gr.Dropdown(label="zip的解压保存路径或mask的地址/删除地址(必选)", choices=configData.data['default_values'])
#压缩包上传按钮
file_upload_btn = gr.Button("开始上传")
#获取到返回的结果-可以是上传的,可以是查看files信息,也可以是别的
someResult = gr.Textbox(label="获取按钮返回信息", type="text")
# 设置按钮点击事件(调用上传解压函数,将压缩包内容解压到指定目录 默认是manga_all下面)
file_upload_btn.click(fn=upload_and_unzip, inputs=[file_selected, output_dir_gr],outputs=someResult)
#设置按钮查看json类型的listFiles信息
filesInfoBtn = gr.Button("查看files信息")
filesInfoBtn.click(fn=list_files,outputs=someResult)
#设置mask上传遮罩按钮保存遮罩到mask目录-因为直接重启space利用docker上传会触发定时任务,也许造成不好结果
# mask_selected = gr.inputs.Image(label="待上传遮罩", type='pil')
mask_selected = gr.components.Image(label="待上传遮罩", type='pil')#代替inputs的新写法
markUploadBtn = gr.Button("上传遮罩mask")
markUploadBtn.click(fn=save_mask,inputs=[mask_selected, output_dir_gr],outputs=someResult)
#设置按钮关联新config输入进行更新操作
config_update_text = gr.Textbox(placeholder="输入新的JSON数据")#str类型
config_update_btn = gr.Button("更新config.json数据")
config_update_btn.click(fn=update_config_json, inputs=[config_update_text], outputs=someResult)
#删除指定目录文件 -对上传保存的文件进行删除 ->比如章节和mask不匹配,但是一个个删太麻烦,干脆一起
file_delete_btn = gr.Button("开始删除")
file_delete_btn.click(fn=delete_file, inputs=[output_dir_gr], outputs=someResult)
# 定义一个函数来处理输入的步骤转list然后执行,返回执行结果->taskResult/None
# gradio输入0 1 2 3就行,会自动转"0 1 2 3"然后给process_task_list处理
def process_task_list(input_task_str:str = None,baseUrl:str =None):
if not is_asp_task_valid():
print("任务基本条件不符合")
return None
if baseUrl is None:
print("baseUrl不存在")
return None
if input_task_str is None:
print("input_task_str不存在")
return None
chapter_title,cur_chapter_path = random_chapter()
task_results = {}#保存任务执行结果
#临时变量记录是否上传了章节-上传了需要删除,而且只能在for循环任务结束后删除
manga_has_uploaded = False
print(input_task_str)# "a b c" - >['a', 'b', 'c']
task_list = input_task_str.split(' ')
for task in task_list:
if task in task_functions:
task_func = task_functions[task]#func
task_param = tasks_params[task]#dict
if "baseUrl" in task_param:
task_param["baseUrl"] = baseUrl
if "mask_path" in task_param:
#使用后遮罩不删,否则维护成本太高
task_param["mask_path"] = configData.data['mask_abs_dir']
if "manga_path" in task_param:
task_param["manga_path"] = cur_chapter_path#上传一个章节作为本次素材,保存到space的manga下面
print("上传了章节:",cur_chapter_path,"下面提交后删除")
manga_has_uploaded = True
#执行了这个任务就要删除该章节
if "allow_submit" in task_param:
#采用config.json中的是否允许上传
task_param["allow_submit"] = configData.data['allow_submit']
print("当前是否允许上传bili:",configData.data['allow_submit'])
if "bili_meta" in task_param:
bili_meta_data["title"] = chapter_title
task_param["bili_meta"] = bili_meta_data
result = task_func(**task_param)#执行对应函数获取结果
task_results[task] = "success" if result is None else result#保存对应函数执行结果
else:
print("该执行流程函数不存在")
return None#立即推出循环结束函数
if manga_has_uploaded is True:
#如果上传章节任务得到了执行,那么删除
shutil.rmtree(cur_chapter_path) # 递归地删除指定路径的目录及其所有内容,包括文件和子目录 #删除应该在返回的时候删
return task_results
# 创建一个输入块,接受str+空格类型的输入,比如设置默认值3只查看结果output.mp4
# 将按钮添加到任务管理器中
with gr.Blocks() as taskManager:
# 任务需要 1:查看指定space的output video状态 2: 手动修改执行流程->比如上传后是发送还是查看output效果
# 3:可以添加按钮跳转路由
task_list_text = gr.components.Textbox(type="text", label="输入任务列表元素,用空格分隔(一般用来执首尾比如查看output状态)", lines=5)
baseSpaceUrl = gr.components.Textbox(type="text", label="指定手动任务的执行容器地址", lines=3)
# 获取到返回的结果-可以是上传的,可以是查看files信息,也可以是别的
someResult = gr.components.Textbox(label="获取按钮返回信息", type="text")
# 创建一个按钮块来触发处理函数
taskBtn = gr.Button("处理列表")
taskBtn.click(fn=process_task_list,inputs=[task_list_text, baseSpaceUrl],outputs=someResult)
new_text = gr.components.Textbox(placeholder="敬请期待") # str类型