File size: 11,298 Bytes
b9d0462
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
import json
import os
import random
from zipfile import ZipFile
import zipfile
import gradio as gr
from config import *
import shutil
from PIL import Image#上传保存遮罩和比较遮罩width height能否和章节图片匹配的时候用
from taskMap import *


def load_config():
    return get_variables()

class configData():
    def __init__(self):
        self.data = load_config()
    def update(self):
        self.data = load_config()
        print(self.data)

configData = configData()#复用实例


# 解压缩函数
def unzip_file(file_path, extract_path):
    with zipfile.ZipFile(file_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)

# 上传并解压缩函数
#upload_and_unzip(file:"文件",extract_path:"解压路径")->"void 成功提示":
def upload_and_unzip(file_obj, extract_path):
    print("执行上传解压函数")
    ##这样写如果新的多,那么替换为多的一方,如果新的少,也只会替换已经存在的部分--->符合预期想法
    with ZipFile(file_obj.name) as zfile:
        zfile.extractall(extract_path)
    return "File uploaded and extracted successfully to "+extract_path

#构造函数获取当前目录file list信息-不放config进行返回是为了避免引入旧值
def update_file_info(root_dir):
    info = {}
    for root, dirs, files in os.walk(root_dir):
        info[root] = {
            "directories": dirs,
            "files": files
        }
        for dir_name in dirs:
            update_file_info(os.path.join(root, dir_name))
    return info

def random_chapter():
    # for root, info in configData.data['file_info'].items():
    #手动更新避免引入旧值-成功
    file_info = update_file_info(configData.data['current_dir'])
    configData.data['file_info'] = file_info
    for root, info in configData.data['file_info'].items():
        if root.startswith(configData.data['manga_abs_dir']):
            for directory in info["directories"]:
                if any(file.endswith('.jpg') or file.endswith('.png')  for file in os.listdir(os.path.join(root, directory))):
                    chapter_path = os.path.join(root, directory)
                    # 删除章节目录
                    print("章节目录是:",chapter_path)

                    #获取标题
                    # 将路径转换为标题
                    folders = chapter_path.split(os.sep)
                    if len(folders) >= 2:
                        chapter_title = " ".join([folders[-2], folders[-1]]) # 获取倒数第二个和倒数第一个目录名作为标题
                    else:
                        chapter_title = chapter_path  # 如果目录层级不足2层,直接使用路径作为标题

                    # 获取标题
                    return  chapter_title,chapter_path#返回章节标题和章节目录给任务-1批量顺序上传原始图片到服务器manga.

    return None,None

def list_files():
    return configData.data['file_info']


def save_mask(mask_file, output_dir):
    #上传遮罩保存到mask下面,名称随意
    # 检查遮罩文件是否存在
    # 构造保存文件的路径-保证linux和windows都能转义
    save_path = os.path.join(output_dir, '0.jpg')
    # 创建目录(如果目录不存在)
    os.makedirs(output_dir, exist_ok=True)

    # 使用with open方式保存文件
    mask_file_img = mask_file #这个是Image.open后的对象
    mask_file_img.save(save_path)
    return "文件保存成功到:" + str(save_path)


def update_config_json(newConfig:str):
    newConfig = json.loads(newConfig) #转换str为dict
    with open(configData.data['current_config_json'], "w") as file:
        file.write(json.dumps(newConfig, indent=4, ensure_ascii=False))  # 指定indent参数来保持JSON格式
    #返回新的json文件内容
    with open(configData.data['current_config_json'], "r") as f:
        content =f.read()  # 读取文件内容
    return content


def is_asp_task_valid()->bool:
    # 启动状态检测 ->
    # 1config允许定时任务   2 mask目录下齐备 3 manga_all目录下有可用素材  4 mask的width height和素材匹配
    # 5应当将config写入文件而不是py随时可以修改,以防ck账号被封等情况下不发送->同时删除不可用账号 -全部space中账号反馈不可用的时候停止定时任务
    # 6应该避免任务扎堆进行,对ocr space造成负担
    if configData.data['allow_scheduler'] == False:
        print("当前配置不允许定时任务,停止定时任务")
        return False
    chapter_title ,cur_chapter= random_chapter()
    if cur_chapter == None:
        print("当前没有可用章节了,停止定时任务")
        return False
    mask_file_path = os.path.join(configData.data['mask_dir'], '0.jpg')
    if not os.path.exists(mask_file_path):
        print("遮罩文件不存在,停止定时任务")
        return False
    # 读取遮罩图片长宽比较当前章节下面图片的长宽
    mask_image = Image.open(mask_file_path)
    random_chapter_image_name = random.choice(os.listdir(cur_chapter))
    random_chapter_image_path = os.path.join(cur_chapter, random_chapter_image_name)
    random_chapter_image = Image.open(random_chapter_image_path)
    if mask_image.size != random_chapter_image.size:
        print("遮罩大小和章节随机一个图片的大小不匹配,停止定时任务")
        return False
    return True

#定时任务流程函数集合 ->返回None说明非正常执行情况
def run_asp_task():
    if not is_asp_task_valid():
        return None
    #遍历bili_spaces中的space url并执行日常任务
    for space_url in  configData.data["bili_spaces"]:
        process_task_list("0 1 2",space_url)







#实现gradio接受上传压缩文件,解压到manga_all目录下面
with gr.Blocks() as mangaManager:
    #上传文件选择列表
    file_selected = gr.File(label="待上传压缩章节",file_types=['.zip', '.tar', '.gz'])
    #上传zip的解压保存路径或mask的地址(必选) 根据需要选择
    output_dir_gr = gr.Dropdown(label="上传zip的解压保存路径或mask的地址(必选)", choices=configData.data['default_values'])
    #压缩包上传按钮
    file_upload_btn = gr.Button("开始上传")
    #获取到返回的结果-可以是上传的,可以是查看files信息,也可以是别的
    someResult = gr.Textbox(label="获取按钮返回信息", type="text")
    # 设置按钮点击事件(调用上传解压函数,将压缩包内容解压到指定目录 默认是manga_all下面)
    file_upload_btn.click(fn=upload_and_unzip, inputs=[file_selected, output_dir_gr],outputs=someResult)

    #设置按钮查看json类型的listFiles信息

    filesInfoBtn = gr.Button("查看files信息")
    filesInfoBtn.click(fn=list_files,outputs=someResult)

    #设置mask上传遮罩按钮保存遮罩到mask目录-因为直接重启space利用docker上传会触发定时任务,也许造成不好结果
    # mask_selected = gr.inputs.Image(label="待上传遮罩", type='pil')
    mask_selected = gr.components.Image(label="待上传遮罩", type='pil')#代替inputs的新写法
    markUploadBtn = gr.Button("上传遮罩mask")
    markUploadBtn.click(fn=save_mask,inputs=[mask_selected, output_dir_gr],outputs=someResult)

    #设置按钮关联新config输入进行更新操作
    config_update_text = gr.Textbox(placeholder="输入新的JSON数据")#str类型
    config_update_btn = gr.Button("更新config.json数据")
    config_update_btn.click(fn=update_config_json, inputs=[config_update_text], outputs=someResult)


# 定义一个函数来处理输入的步骤转list然后执行,返回执行结果->taskResult/None
# gradio输入0 1 2 3就行,会自动转"0 1 2 3"然后给process_task_list处理
def process_task_list(input_task_str:str = None,baseUrl:str =None):
    if not is_asp_task_valid():
        print("任务基本条件不符合")
        return None
    if baseUrl is None:
        print("baseUrl不存在")
        return None
    if input_task_str is None:
        print("input_task_str不存在")
        return None
    chapter_title,cur_chapter_path  = random_chapter()
    task_results = {}#保存任务执行结果
    #临时变量记录是否上传了章节-上传了需要删除,而且只能在for循环任务结束后删除
    manga_has_uploaded = False


    print(input_task_str)# "a b c" - >['a', 'b', 'c']
    task_list = input_task_str.split(' ')
    for task in task_list:
        if task in task_functions:
            task_func = task_functions[task]#func
            task_param = tasks_params[task]#dict
            if "baseUrl" in task_param:
                task_param["baseUrl"] = baseUrl
            if "mask_path" in task_param:
                #使用后遮罩不删,否则维护成本太高
                task_param["mask_path"] = configData.data['mask_abs_dir']
            if "manga_path" in task_param:
                task_param["manga_path"] = cur_chapter_path#上传一个章节作为本次素材,保存到space的manga下面
                print("上传了章节:",cur_chapter_path,"下面提交后删除")
                manga_has_uploaded = True
                #执行了这个任务就要删除该章节

            if "bili_meta" in task_param:
                bili_meta_data["title"] = chapter_title
                task_param["bili_meta"] = bili_meta_data
            result = task_func(**task_param)#执行对应函数获取结果
            task_results[task] = "success" if result is None else result#保存对应函数执行结果
        else:
            print("该执行流程函数不存在")
            return None#立即推出循环结束函数
    if manga_has_uploaded is True:
        #如果上传章节任务得到了执行,那么删除
        shutil.rmtree(cur_chapter_path)  # 递归地删除指定路径的目录及其所有内容,包括文件和子目录 #删除应该在返回的时候删



    return task_results

# 创建一个输入块,接受str+空格类型的输入,比如设置默认值3只查看结果output.mp4

# 将按钮添加到任务管理器中
with gr.Blocks() as taskManager:
    # 任务需要 1:查看指定space的output video状态   2: 手动修改执行流程->比如上传后是发送还是查看output效果
    # 3:可以添加按钮跳转路由
    task_list_text = gr.components.Textbox(type="text", label="输入任务列表元素,用空格分隔(一般用来执首尾比如查看output状态)", lines=5)
    baseSpaceUrl = gr.components.Textbox(type="text", label="指定手动任务的执行容器地址", lines=3)
    # 获取到返回的结果-可以是上传的,可以是查看files信息,也可以是别的
    someResult = gr.components.Textbox(label="获取按钮返回信息", type="text")
    # 创建一个按钮块来触发处理函数
    taskBtn = gr.Button("处理列表")
    taskBtn.click(fn=process_task_list,inputs=[task_list_text, baseSpaceUrl],outputs=someResult)
    new_text = gr.components.Textbox(placeholder="敬请期待")  # str类型