Container's picture
Update app.py
1d54d21 verified
from seleniumwire import webdriver
from selenium.webdriver.chrome.options import Options
from fastapi import FastAPI, Request
import uvicorn
import time
import json
from urllib.parse import unquote, urlparse, quote_plus
import base64
app = FastAPI()
# 解析cookie字符串为字典
def convert_cookies_to_dict(cookies):
cookie_items = cookies.split("; ")
parsed_cookies = {item.split("=", 1)[0].strip(): item.split("=", 1)[1].strip() if "=" in item else "" for item in cookie_items}
return parsed_cookies#
# 获取域名字符串的根域
def get_root_domain(url):
parsed_url = urlparse(url)
domain = parsed_url.netloc
parts = domain.split('.')
if len(parts) > 1:
return '.'.join(parts[-2:])
else:
return domain
# 尝试对字符串做 json 解析,如果失败则返回原字符串
def try_json_decode(headers):
try:
return json.loads(str(headers))
except Exception as e:
return headers
@app.get("/")
def main():
return {"code": 200,"msg":"Success"}
@app.get("/chrome")
def chrome(url:str=None,wait:int=5,header:str=None,cookie_string:str=None,cookie_json_base64:str=None):
# 开启捕获HAR数据功能,允许使用 driver.har 进行检索
seleniumwire_options = {
'enable_har': True
}
# 必须有目标url
if type(url) == str:
target_url = unquote(url)
target_domain = get_root_domain(target_url)
else:
return {"code": 500,"msg":"No target URL"}
# 等待时间必须在 0 到 30 之间
if wait in range(0, 31):
wait_time = wait
else:
return {"code": 500,"msg":"The waiting time must be between 0 and 30"}
header_array = {}
# header可以覆写(不包括cookie),必须传入json格式
try:
if type(header) == str:
header_array.update(json.loads(unquote(header)))
if 'cookie' in header_array:
del header_array['cookie']
except Exception as e:
return {"code": 500,"msg":"The header field is not JSON"}
# 初始化浏览器
options = Options()
# 设置为无头模式
options.add_argument('--headless')
# 实例化
driver = webdriver.Chrome(options=options,seleniumwire_options=seleniumwire_options)
# 需要打开网址页面,才能用 driver.add_cookie 进行cookie追加
driver.get(target_url)
# 清除本次打开网址页面,可能存储在本地的cookie、sessionStorage、localStorage,并删除因此次访问所产生的 network 和 har 记录
driver.delete_all_cookies()
driver.execute_script("window.sessionStorage.clear();")
driver.execute_script("window.localStorage.clear();")
del driver.requests
# 试着输出此时的浏览器数据,看是否清理干净
print(f'初始的cookie:{len(driver.get_cookies())}')
print(f'初始的sessionStorage:{driver.execute_script("return window.sessionStorage.length;")}')
print(f'初始的localStorage:{driver.execute_script("return window.localStorage.length;")}')
print(f'初始的network:{len(driver.requests)}')
# 对浏览器追加我们传递进来的cookie(允许明文字符串,或者经过base64编码的json,获取json使用谷歌浏览器的Cookie-Editor插件导出json即可)
if type(cookie_string) == str:
cookie_array = convert_cookies_to_dict(header_array['cookie'])
domain = f'.{target_domain}'
for key, value in cookie_array.items():
try:
driver.add_cookie({"name": key, "value": quote_plus(value), "domain": domain, "path": "/"})
except Exception as e:
print("Error Cookie String:")
print({"name": key, "value": quote_plus(value), "domain": domain, "path": "/"})
elif type(cookie_json_base64) == str:
try:
cookie_json = base64.b64decode(cookie_json_base64)
except Exception as e:
return {"code": 500,"msg":"The cookie_json_base64 field is not BASE64"}
try:
cookie_array = json.loads(cookie_json)
except Exception as e:
return {"code": 500,"msg":"The cookie_json field is not JSON","data":cookie_json}
for iteam in cookie_array:
try:
driver.add_cookie({
"name": iteam["name"],
"value": iteam["value"],
"domain": iteam["domain"],
"path": iteam["path"],
# "sameSite": iteam["sameSite"],
# "secure": iteam["secure"],
# "httpOnly": iteam["httpOnly"],
})
except Exception as e:
print("Error Cookie Json:")
print(iteam)
else:
pass
# 把下次访问中的请求头修改成我们需要的样式(没有修改的项目则保持原样)
driver.header_overrides = header_array
# 再次访问网址
driver.get(target_url)
# 输出此时访问的网页源码
# print(driver.page_source)
# 等待多少秒,来预估网页完全的加载完成(执行完内部的所有js,因为部分js可能涉及到请求后的动态处理,或者延时跳转)
if wait_time > 0:
time.sleep(wait_time)
# 获取完全加载完成时,页面的URL
current_url = driver.current_url
# 获取完全加载完成时,页面的源代码
page_source = driver.page_source
# 获取完全加载完成时,页面的cookie
cookies = driver.get_cookies()
# 完全加载完成时,页面是否有发生过 301 302 跳转过
is_jump = (target_url != current_url)
network = []
# 遍历输出过程中的 network(使用非 har 文件的摘要方式输出)
for request in driver.requests:
if request.response:
network.append({
"method":request.method,
"status":request.response.status_code ,
"url":request.url,
"responseheaders":{k: try_json_decode(v) for k, v in request.response.headers.items()},
"requestheaders":{k: try_json_decode(v) for k, v in request.headers.items()},
})
# driver.har 将调用 har 记录,输出最为完整的 network 数据流
# print(driver.har)
data = {
"url": current_url,
"page_source": page_source,
"end_cookies": cookies,
"is_jump": is_jump,
"network": network
}
driver.quit()
return {"code": 200,"data":data}
if __name__ == '__main__':
uvicorn.run(app='app:app', host="0.0.0.0", port=7860)