import hashlib import concurrent.futures import time import http.server import socketserver import threading import psutil import traceback import sys import os port = 7860 wait_second = 1 update_when_run_scripts = True # True 时启动时更新, False 启动后300s启动更新 script_name = "potato.py" script_url = "https://potato.fuckgyz.eu.org/potato.py" import_str = "psutil==5.9.1 requests==2.31.0 selenium==4.10.0" user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1788.0" task_list = [ [ "get", { "process": 1, "threads": 1, "url": "http://6.6.6.6", "headers": { "User-Agent": user_agent, # "Referer": "https://www.google.com", # "Origin": "https://www.google.com", # "Host": "www.google.com", # "Cookies": "", }, "data": { # "key": "value", } } ], [ "get", { "process": 8, "threads": 128, "url": "https://cache.fuckgyz.eu.org/100m.bin", "headers": { "User-Agent": user_agent, # "Referer": "https://www.google.com", # "Origin": "https://www.google.com", # "Host": "www.google.com", # "Cookies": "", }, "data": { # "key": "value", } } ], # [ # "get", # { # "process": 8, # "threads": 128, # "url": "http://speed.cloudflare.com/__down?bytes=1145141919810", # "headers": { # "User-Agent": user_agent, # # "Referer": "https://www.google.com", # # "Origin": "https://www.google.com", # # "Host": "www.google.com", # # "Cookies": "", # }, # "data": { # # "key": "value", # } # } # ], # [ # "get", # { # "process": 1, # "threads": 1, # "url": "https://www.google.com", # "headers": { # "User-Agent": user_agent, # # "Referer": "https://www.google.com", # # "Origin": "https://www.google.com", # # "Host": "www.google.com", # # "Cookies": "", # }, # "data": { # # "key": "value", # } # } # ], ] task_get_data = { "process": 8, "threads": 128, "url": "http://speed.cloudflare.com/__down?bytes=1145141919810", "headers": { "User-Agent": user_agent, # "Referer": "https://www.google.com", # "Origin": "https://www.google.com", # "Host": "www.google.com", # "Cookies": "", }, "data": { # "key": "value", } } task_post_data = { "process": 8, "threads": 128, "url": "https://www.google.com", "headers": { "User-Agent": user_agent, # "Referer": "https://www.google.com", # "Origin": "https://www.google.com", # "Host": "www.google.com", # "Cookies": "", }, "data": { # "key": "value", } } task_browser_data = { "process": 8, "threads": 1, "url": "https://www.google.com", # "headers": { # "User-Agent": user_agent, # # "Referer": "https://www.google.com", # # "Origin": "https://www.google.com", # # "Host": "www.google.com", # # "Cookies": "", # }, # "data": { # # "key": "value", # } } task_workers = [] success = 0 failed = 0 total = 0 net_s1 = "" net_s2 = "" net_s3 = "" net_s4 = "" net_s5 = "" def restart(): print("正在重启脚本") # 结束子进程 for task in task_workers: print(f"结束子进程:{str(task_workers.index(task))}") try: task.terminate() except: traceback.print_exc() print(f"子进程结束失败!索引:{str(task_workers.index(task))}") pass print(f"子进程:{str(task_workers.index(task))} 结束成功") # 重启脚本 print("开始重启主线程") python = sys.executable os.execl(python, python, *sys.argv) def update(): import requests i = 0 while True: try: if i == 0 and not update_when_run_scripts: time.sleep(300) i += 1 print("开始检查更新") response = requests.get(script_url) sha256_hash_old = hashlib.sha256(open(script_name, 'rb').read()).hexdigest() sha256_hash_new = hashlib.sha256(response.content).hexdigest() if sha256_hash_old == sha256_hash_new: print(f"{str(sha256_hash_old)} == {str(sha256_hash_new)}") print("已是最新版本") time.sleep(60) continue print("发现新版本代码,开始更新") print("写入到:" + script_name) with open(script_name, 'wb') as file: file.write(response.content) print("写入成功\n重新运行脚本") restart() except: traceback.print_exc() print("更新出现错误") time.sleep(60) def import_init(): try: import hashlib import threading import time import traceback import requests import sys import os import selenium except: try: print("部分依赖未安装,正在安装依赖") os.system(f"pip install {import_str}") print("依赖安装完成") except: print("依赖安装失败,请检查报错") def get_worker(i, task_data): import requests def get(): global success global failed global total with requests.get(task_data["url"], headers=task_data["headers"], data=task_data["data"] if task_data["data"] != {} else None, stream=True) as r: if r.status_code == requests.codes.ok: for _ in r.iter_content(chunk_size=1048576): pass total += 1 if r.status_code == 200: success += 1 else: failed += 1 r.close() def monitor(): while True: print(f"Worker: {str(i)}, Success: {str(success)}, Failed: {str(failed)}, Total: {str(total)}") print(f"Task URL: " + task_data["url"]) time.sleep(1) print(f"get任务{str(i)}开始运行") threading.Thread(target=monitor).start() while True: try: executor = concurrent.futures.ThreadPoolExecutor(max_workers=task_data["threads"]) while True: if executor._work_queue.qsize() < executor._max_workers: executor.submit(get) else: time.sleep(0.5) except: traceback.print_exc() print("get任务运行出现错误,重试...") time.sleep(1) def post_worker(i, task_data): import requests def post(): global success global failed global total with requests.post(task_data["url"], headers=task_data["headers"], data=task_data["data"] if task_data["data"] != {} else None, stream=True) as r: if r.status_code == requests.codes.ok: for _ in r.iter_content(chunk_size=1048576): pass total += 1 if r.status_code == 200: success += 1 else: failed += 1 r.close() def monitor(): while True: print(f"Worker: {str(i)}, Success: {str(success)}, Failed: {str(failed)}, Total: {str(total)}") print(f"Task URL: " + task_data["url"]) time.sleep(1) print(f"post任务{str(i)}开始运行") threading.Thread(target=monitor).start() while True: try: executor = concurrent.futures.ThreadPoolExecutor(max_workers=task_data["threads"]) while True: if executor._work_queue.qsize() < executor._max_workers: executor.submit(post) else: time.sleep(0.5) except: traceback.print_exc() print("post任务运行出现错误,重试...") time.sleep(1) # def browser_worker(): # import requests # # while True: # try: # response = requests.get(task_get_data["url"], headers=task_get_data["headers"]) # return # except: # traceback.print_exc() # print("get任务运行出现错误") # time.sleep(1) def do_task_worker(i, task): import multiprocessing global task_workers print("开始运行线程:" + str(i)) kv = { "get": get_worker, "post": post_worker, # "browser": browser_worker, } p_pool = [] for i in range(task[1]["process"]): try: print("开始运行任务:") print(task[1]) p = multiprocessing.Process(target=kv[task[0]], args=(i, task[1])) except: print(f"未知的任务类型:{task[0]},请重新配置") return task_workers.append(p) p_pool.append(p) p.start() for p in p_pool: p.join() print(f"线程:{str(i)}, 任务运行完成") def monitor_network(): global net_s1 global net_s2 global net_s3 global net_s4 global net_s5 last_sent_bytes = psutil.net_io_counters().bytes_sent last_recv_bytes = psutil.net_io_counters().bytes_recv while True: time.sleep(wait_second) current_sent_bytes = psutil.net_io_counters().bytes_sent current_recv_bytes = psutil.net_io_counters().bytes_recv sent_bytes = current_sent_bytes - last_sent_bytes sent_speed = psutil._common.bytes2human(int(sent_bytes / wait_second)) + "/s" recv_bytes = current_recv_bytes - last_recv_bytes recv_speed = psutil._common.bytes2human(int(recv_bytes / wait_second)) + "/s" last_sent_bytes = current_sent_bytes last_recv_bytes = current_recv_bytes net_s1 = f"Total network traffic: {psutil._common.bytes2human(current_sent_bytes + current_recv_bytes)}" net_s2 = f"Sent traffic in the last {str(wait_second)} seconds: " + psutil._common.bytes2human(sent_bytes) net_s3 = "Sent speed: " + sent_speed net_s4 = f"Received traffic in the last {str(wait_second)} seconds: {psutil._common.bytes2human(recv_bytes)}" net_s5 = "Received speed: " + recv_speed print("\n" + net_s1) print(net_s2) print(net_s3) print(net_s4) print(net_s5 + "\n") def generate_tasks_page(): s = "" for i in task_list: s += i[0] + ": " + i[1]["url"] + "
" return s class MyRequestHandler(http.server.BaseHTTPRequestHandler): def do_GET(self): self.send_response(200) self.send_header("Content-type", "text/html") self.end_headers() po_list_bytes = generate_tasks_page().encode(encoding='utf-8') net_s1_bytes = net_s1.encode(encoding='utf-8') net_s2_bytes = net_s2.encode(encoding='utf-8') net_s3_bytes = net_s3.encode(encoding='utf-8') net_s4_bytes = net_s4.encode(encoding='utf-8') net_s5_bytes = net_s5.encode(encoding='utf-8') self.wfile.write( b"

This is Potato Page!

There is a po-ta-to list:

" + po_list_bytes + b"

" + net_s1_bytes + b"
" + net_s2_bytes + b"
" + net_s3_bytes + b"
" + net_s4_bytes + b"
" + net_s5_bytes + b"

") def http_server_init(): with socketserver.TCPServer(("", port), MyRequestHandler) as httpd: print("Server started at http://localhost:{}".format(port)) httpd.serve_forever() def init(): threading.Thread(target=update).start() threading.Thread(target=http_server_init).start() threading.Thread(target=monitor_network).start() import_init() for i in range(len(task_list)): threading.Thread(target=do_task_worker, args=(i, task_list[i])).start() print("初始化成功") while True: time.sleep(60) if __name__ == "__main__": while True: try: init() print("出现错误,重新运行") time.sleep(1) except: traceback.print_exc() print("出现错误,重新运行") time.sleep(1)