File size: 6,686 Bytes
76c921d 61e3d87 76c921d 61e3d87 76c921d 61e3d87 76c921d 61e3d87 76c921d cfb1afd 1d54d21 cfb1afd 61e3d87 76c921d 61e3d87 76c921d 61e3d87 76c921d 61e3d87 76c921d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
from seleniumwire import webdriver
from selenium.webdriver.chrome.options import Options
from fastapi import FastAPI, Request
import uvicorn
import time
import json
from urllib.parse import unquote, urlparse, quote_plus
import base64
app = FastAPI()
# 解析cookie字符串为字典
def convert_cookies_to_dict(cookies):
cookie_items = cookies.split("; ")
parsed_cookies = {item.split("=", 1)[0].strip(): item.split("=", 1)[1].strip() if "=" in item else "" for item in cookie_items}
return parsed_cookies#
# 获取域名字符串的根域
def get_root_domain(url):
parsed_url = urlparse(url)
domain = parsed_url.netloc
parts = domain.split('.')
if len(parts) > 1:
return '.'.join(parts[-2:])
else:
return domain
# 尝试对字符串做 json 解析,如果失败则返回原字符串
def try_json_decode(headers):
try:
return json.loads(str(headers))
except Exception as e:
return headers
@app.get("/")
def main():
return {"code": 200,"msg":"Success"}
@app.get("/chrome")
def chrome(url:str=None,wait:int=5,header:str=None,cookie_string:str=None,cookie_json_base64:str=None):
# 开启捕获HAR数据功能,允许使用 driver.har 进行检索
seleniumwire_options = {
'enable_har': True
}
# 必须有目标url
if type(url) == str:
target_url = unquote(url)
target_domain = get_root_domain(target_url)
else:
return {"code": 500,"msg":"No target URL"}
# 等待时间必须在 0 到 30 之间
if wait in range(0, 31):
wait_time = wait
else:
return {"code": 500,"msg":"The waiting time must be between 0 and 30"}
header_array = {}
# header可以覆写(不包括cookie),必须传入json格式
try:
if type(header) == str:
header_array.update(json.loads(unquote(header)))
if 'cookie' in header_array:
del header_array['cookie']
except Exception as e:
return {"code": 500,"msg":"The header field is not JSON"}
# 初始化浏览器
options = Options()
# 设置为无头模式
options.add_argument('--headless')
# 实例化
driver = webdriver.Chrome(options=options,seleniumwire_options=seleniumwire_options)
# 需要打开网址页面,才能用 driver.add_cookie 进行cookie追加
driver.get(target_url)
# 清除本次打开网址页面,可能存储在本地的cookie、sessionStorage、localStorage,并删除因此次访问所产生的 network 和 har 记录
driver.delete_all_cookies()
driver.execute_script("window.sessionStorage.clear();")
driver.execute_script("window.localStorage.clear();")
del driver.requests
# 试着输出此时的浏览器数据,看是否清理干净
print(f'初始的cookie:{len(driver.get_cookies())}')
print(f'初始的sessionStorage:{driver.execute_script("return window.sessionStorage.length;")}')
print(f'初始的localStorage:{driver.execute_script("return window.localStorage.length;")}')
print(f'初始的network:{len(driver.requests)}')
# 对浏览器追加我们传递进来的cookie(允许明文字符串,或者经过base64编码的json,获取json使用谷歌浏览器的Cookie-Editor插件导出json即可)
if type(cookie_string) == str:
cookie_array = convert_cookies_to_dict(header_array['cookie'])
domain = f'.{target_domain}'
for key, value in cookie_array.items():
try:
driver.add_cookie({"name": key, "value": quote_plus(value), "domain": domain, "path": "/"})
except Exception as e:
print("Error Cookie String:")
print({"name": key, "value": quote_plus(value), "domain": domain, "path": "/"})
elif type(cookie_json_base64) == str:
try:
cookie_json = base64.b64decode(cookie_json_base64)
except Exception as e:
return {"code": 500,"msg":"The cookie_json_base64 field is not BASE64"}
try:
cookie_array = json.loads(cookie_json)
except Exception as e:
return {"code": 500,"msg":"The cookie_json field is not JSON","data":cookie_json}
for iteam in cookie_array:
try:
driver.add_cookie({
"name": iteam["name"],
"value": iteam["value"],
"domain": iteam["domain"],
"path": iteam["path"],
# "sameSite": iteam["sameSite"],
# "secure": iteam["secure"],
# "httpOnly": iteam["httpOnly"],
})
except Exception as e:
print("Error Cookie Json:")
print(iteam)
else:
pass
# 把下次访问中的请求头修改成我们需要的样式(没有修改的项目则保持原样)
driver.header_overrides = header_array
# 再次访问网址
driver.get(target_url)
# 输出此时访问的网页源码
# print(driver.page_source)
# 等待多少秒,来预估网页完全的加载完成(执行完内部的所有js,因为部分js可能涉及到请求后的动态处理,或者延时跳转)
if wait_time > 0:
time.sleep(wait_time)
# 获取完全加载完成时,页面的URL
current_url = driver.current_url
# 获取完全加载完成时,页面的源代码
page_source = driver.page_source
# 获取完全加载完成时,页面的cookie
cookies = driver.get_cookies()
# 完全加载完成时,页面是否有发生过 301 302 跳转过
is_jump = (target_url != current_url)
network = []
# 遍历输出过程中的 network(使用非 har 文件的摘要方式输出)
for request in driver.requests:
if request.response:
network.append({
"method":request.method,
"status":request.response.status_code ,
"url":request.url,
"responseheaders":{k: try_json_decode(v) for k, v in request.response.headers.items()},
"requestheaders":{k: try_json_decode(v) for k, v in request.headers.items()},
})
# driver.har 将调用 har 记录,输出最为完整的 network 数据流
# print(driver.har)
data = {
"url": current_url,
"page_source": page_source,
"end_cookies": cookies,
"is_jump": is_jump,
"network": network
}
driver.quit()
return {"code": 200,"data":data}
if __name__ == '__main__':
uvicorn.run(app='app:app', host="0.0.0.0", port=7860) |