"""Small HTTP service that loads a URL in headless Chrome and reports the result.

The ``/chrome`` endpoint opens the target page with selenium-wire, optionally
injects cookies and request-header overrides, waits for the page to settle and
returns the final URL, page source, cookies and a summary of the captured
network traffic.
"""

import base64
import json
import time
from urllib.parse import unquote, urlparse, quote_plus

import uvicorn
from fastapi import FastAPI, Request
from selenium.webdriver.chrome.options import Options
from seleniumwire import webdriver

app = FastAPI()


def convert_cookies_to_dict(cookies):
    """Parse a ``Cookie``-header style string into a ``{name: value}`` dict.

    Pairs are separated by ``;`` (surrounding whitespace is ignored), and each
    pair is split on its first ``=``.  A pair without ``=`` maps to an empty
    value; segments with an empty name (e.g. from an empty input string or a
    trailing ``;``) are skipped.
    """
    parsed_cookies = {}
    for item in cookies.split(";"):
        name, _, value = item.partition("=")
        name = name.strip()
        if not name:
            continue
        parsed_cookies[name] = value.strip()
    return parsed_cookies


def get_root_domain(url):
    """Return the last two dot-separated labels of the host in *url*.

    The host is taken from ``urlparse(url).hostname`` so that a port or
    ``user:password@`` prefix never leaks into the result.  A host with a
    single label (or a URL without a host) is returned unchanged / as ``""``.

    NOTE: this is a plain "last two labels" heuristic; it does not know about
    multi-label public suffixes such as ``co.uk`` or about IP addresses.
    """
    host = urlparse(url).hostname or ""
    parts = host.split(".")
    if len(parts) > 1:
        return ".".join(parts[-2:])
    return host


def try_json_decode(headers):
    """Return *headers* parsed as JSON, or *headers* itself if parsing fails."""
    try:
        return json.loads(str(headers))
    except (ValueError, RecursionError):
        # Not JSON (or absurdly nested): hand the original value back untouched.
        return headers


@app.get("/")
def main():
    """Health-check endpoint."""
    return {"code": 200, "msg": "Success"}


@app.get("/chrome")
def chrome(url: str = None, wait: int = 5, header: str = None,
           cookie_string: str = None, cookie_json_base64: str = None):
    """Load *url* in headless Chrome and describe the resulting page.

    Parameters:
        url: target address (required); it is passed through ``unquote``.
        wait: seconds to sleep after the page load, 0..30 inclusive.
        header: JSON object of request headers to override.  Any ``cookie``
            entry (case-insensitive) is dropped; use the cookie parameters.
        cookie_string: cookies as a plain ``name=value; name2=value2`` string.
            Takes precedence over ``cookie_json_base64``.
        cookie_json_base64: base64-encoded JSON array of cookie objects with
            ``name``, ``value``, ``domain`` and ``path`` keys (the format
            exported by the Cookie-Editor browser extension).

    Returns:
        ``{"code": 200, "data": {...}}`` on success, where ``data`` holds the
        final ``url``, ``page_source``, ``end_cookies``, ``is_jump`` and a
        ``network`` summary; ``{"code": 500, "msg": ...}`` when an input is
        invalid.
    """
    # Enable HAR capture so that driver.har can be used for retrieval.
    seleniumwire_options = {
        'enable_har': True
    }

    # A target URL is mandatory.
    if not isinstance(url, str):
        return {"code": 500, "msg": "No target URL"}
    target_url = unquote(url)
    target_domain = get_root_domain(target_url)

    # The waiting time must be between 0 and 30.
    if not 0 <= wait <= 30:
        return {"code": 500, "msg": "The waiting time must be between 0 and 30"}
    wait_time = wait

    # Headers can be overridden (cookies excluded); the value must be JSON.
    header_array = {}
    if isinstance(header, str):
        try:
            header_array.update(json.loads(unquote(header)))
        except (ValueError, TypeError, RecursionError):
            return {"code": 500, "msg": "The header field is not JSON"}
        # Header names are case-insensitive, so drop "Cookie" as well as "cookie".
        for name in [k for k in header_array if str(k).lower() == "cookie"]:
            del header_array[name]

    # Validate the cookie input before starting a browser, so that a bad
    # request fails fast and never leaves a Chrome process behind.
    string_cookies = None
    json_cookies = None
    if isinstance(cookie_string, str):
        # Decode like the other parameters; values are re-encoded with
        # quote_plus when they are handed to the browser below.
        string_cookies = convert_cookies_to_dict(unquote(cookie_string))
    elif isinstance(cookie_json_base64, str):
        try:
            cookie_json = base64.b64decode(cookie_json_base64)
        except ValueError:
            return {"code": 500, "msg": "The cookie_json_base64 field is not BASE64"}
        try:
            json_cookies = json.loads(cookie_json)
        except (ValueError, RecursionError):
            return {"code": 500, "msg": "The cookie_json field is not JSON", "data": cookie_json}

    # Initialise the browser in headless mode.
    options = Options()
    options.add_argument('--headless')
    driver = webdriver.Chrome(options=options, seleniumwire_options=seleniumwire_options)

    try:
        # The page has to be opened once before driver.add_cookie can be used.
        driver.get(target_url)

        # Clear cookies, sessionStorage and localStorage that this first visit
        # may have stored, and drop the network records it produced.
        driver.delete_all_cookies()
        driver.execute_script("window.sessionStorage.clear();")
        driver.execute_script("window.localStorage.clear();")
        del driver.requests

        # Print the browser state to check that everything was cleaned up.
        print(f'初始的cookie:{len(driver.get_cookies())}')
        print(f'初始的sessionStorage:{driver.execute_script("return window.sessionStorage.length;")}')
        print(f'初始的localStorage:{driver.execute_script("return window.localStorage.length;")}')
        print(f'初始的network:{len(driver.requests)}')

        # Add the cookies supplied by the caller.
        if string_cookies is not None:
            domain = f'.{target_domain}'
            for key, value in string_cookies.items():
                cookie = {"name": key, "value": quote_plus(value), "domain": domain, "path": "/"}
                try:
                    driver.add_cookie(cookie)
                except Exception:
                    # Best effort: report the rejected cookie and carry on.
                    print("Error Cookie String:")
                    print(cookie)
        elif json_cookies is not None:
            for iteam in json_cookies:
                try:
                    driver.add_cookie({
                        "name": iteam["name"],
                        "value": iteam["value"],
                        "domain": iteam["domain"],
                        "path": iteam["path"],
                    })
                except Exception:
                    # Best effort: report the rejected cookie and carry on.
                    print("Error Cookie Json:")
                    print(iteam)

        # Rewrite the request headers of the next visit (unlisted headers keep
        # their original values).
        # NOTE(review): newer selenium-wire releases replace header_overrides
        # with request interceptors - confirm against the installed version.
        driver.header_overrides = header_array

        # Visit the page again, now with cookies and header overrides applied.
        driver.get(target_url)

        # Sleep to give the page time to finish loading (in-page JavaScript may
        # issue follow-up requests or perform delayed redirects).
        if wait_time > 0:
            time.sleep(wait_time)

        # Snapshot the page once it is considered fully loaded.
        current_url = driver.current_url
        page_source = driver.page_source
        cookies = driver.get_cookies()

        # Whether the browser ended up on a different URL (e.g. 301/302 redirect).
        is_jump = (target_url != current_url)

        # Summarise the captured traffic (a digest, not the full HAR;
        # driver.har would return the complete network data).
        network = []
        for request in driver.requests:
            if request.response:
                network.append({
                    "method": request.method,
                    "status": request.response.status_code,
                    "url": request.url,
                    "responseheaders": {k: try_json_decode(v) for k, v in request.response.headers.items()},
                    "requestheaders": {k: try_json_decode(v) for k, v in request.headers.items()},
                })

        data = {
            "url": current_url,
            "page_source": page_source,
            "end_cookies": cookies,
            "is_jump": is_jump,
            "network": network
        }
        return {"code": 200, "data": data}
    finally:
        # Always shut the browser down, even when a step above raised.
        driver.quit()


if __name__ == '__main__':
    uvicorn.run(app='app:app', host="0.0.0.0", port=7860)