|
from seleniumwire import webdriver |
|
from selenium.webdriver.chrome.options import Options |
|
from fastapi import FastAPI, Request |
|
import uvicorn |
|
import time |
|
import json |
|
from urllib.parse import unquote, urlparse, quote_plus |
|
import base64 |
|
|
|
# Application object that the route decorators in this module attach to.
app: FastAPI = FastAPI()
|
|
|
|
|
def convert_cookies_to_dict(cookies):
    """Parse a ``Cookie``-header style string into a ``{name: value}`` dict.

    Pairs are separated by ``;`` and whitespace around names and values is
    ignored, so both ``"a=1; b=2"`` and ``"a=1;b=2"`` are accepted. Only the
    first ``=`` in a pair splits name from value, so values may themselves
    contain ``=``. A segment without ``=`` maps its text to an empty value.

    Args:
        cookies: The raw cookie string; ``None`` or ``""`` yields ``{}``.

    Returns:
        A dict mapping cookie names to values. If a name repeats, the last
        occurrence wins.
    """
    parsed_cookies = {}
    for item in (cookies or "").split(";"):
        name, _, value = item.partition("=")
        name = name.strip()
        # Skip empty segments (empty input, trailing ";") and nameless pairs:
        # they would otherwise produce a cookie with an empty name.
        if not name:
            continue
        parsed_cookies[name] = value.strip()
    return parsed_cookies
|
|
|
|
|
def get_root_domain(url):
    """Return the last two labels of the URL's host (e.g. ``example.com``).

    Only the host name is considered: any port, user name or password in the
    URL is ignored, and the host is returned lower-cased. IP-address hosts are
    returned whole, since "the last two labels" of an IP address is
    meaningless. Single-label hosts such as ``localhost`` are returned as-is.

    Note: this is a simple two-label heuristic; multi-part public suffixes
    (``example.co.uk``) are not recognised and yield ``co.uk``.

    Args:
        url: An absolute URL. A string without a ``scheme://`` prefix has no
            recognisable host and yields ``""``.

    Returns:
        The derived domain string, or ``""`` when the URL has no host.
    """
    # Local import keeps this block self-contained (stdlib only).
    import ipaddress

    # ``hostname`` (unlike ``netloc``) excludes "user:pass@" and ":port".
    host = (urlparse(url).hostname or "").rstrip(".")

    try:
        ipaddress.ip_address(host)
    except ValueError:
        pass  # Not an IP literal: fall through to label handling.
    else:
        return host

    labels = host.split(".")
    if len(labels) > 1:
        return ".".join(labels[-2:])
    return host
|
|
|
|
|
def try_json_decode(headers):
    """Return ``headers`` decoded as JSON when possible, else unchanged.

    The value is converted with ``str()`` and parsed as JSON. The decoded
    value is returned only if it can be re-encoded as strict JSON; inputs
    such as ``NaN``, ``Infinity`` or ``1e999`` decode to non-finite floats
    that strict JSON encoders reject, so for those the original value is
    returned instead.

    Args:
        headers: A single header value (any object; it is stringified first).

    Returns:
        The decoded JSON value, or ``headers`` itself when it is not valid,
        strictly re-encodable JSON.
    """
    try:
        decoded = json.loads(str(headers))
        # Round-trip check: raises ValueError for NaN/Infinity anywhere in
        # the decoded structure.
        json.dumps(decoded, allow_nan=False)
    except (ValueError, TypeError, RecursionError):
        return headers
    return decoded
|
|
|
@app.get("/")
def main():
    """Root endpoint: respond with a fixed success payload."""
    status = {"code": 200, "msg": "Success"}
    return status
|
|
|
def _add_cookie_string(driver, cookie_string, domain):
    """Add every cookie from a ``Cookie``-header style string to ``driver``."""
    for key, value in convert_cookies_to_dict(cookie_string).items():
        cookie = {"name": key, "value": quote_plus(value), "domain": domain, "path": "/"}
        try:
            driver.add_cookie(cookie)
        except Exception as exc:  # best effort: one bad cookie must not abort the request
            print("Error Cookie String:")
            print(cookie)
            print(repr(exc))


def _add_cookie_json(driver, cookie_items):
    """Add cookies given as dicts with ``name``/``value``/``domain``/``path`` keys."""
    for item in cookie_items:
        try:
            driver.add_cookie({
                "name": item["name"],
                "value": item["value"],
                "domain": item["domain"],
                "path": item["path"],
            })
        except Exception as exc:  # best effort: also covers malformed items (missing keys)
            print("Error Cookie Json:")
            print(item)
            print(repr(exc))


def _collect_network(driver):
    """Summarise every captured request that received a response."""
    return [
        {
            "method": request.method,
            "status": request.response.status_code,
            "url": request.url,
            "responseheaders": {k: try_json_decode(v) for k, v in request.response.headers.items()},
            "requestheaders": {k: try_json_decode(v) for k, v in request.headers.items()},
        }
        for request in driver.requests
        if request.response
    ]


@app.get("/chrome")
def chrome(url:str=None,wait:int=5,header:str=None,cookie_string:str=None,cookie_json_base64:str=None):
    """Load ``url`` in headless Chrome and report the page, cookies and traffic.

    All request parameters are validated before the browser is started, and
    the browser is always shut down, including on errors.

    Args:
        url: Target URL (URL-unquoted once more before use).
        wait: Seconds to sleep after the page load; must be within 0..30.
        header: JSON object of extra request headers (URL-unquoted before
            parsing). Any ``Cookie`` entry is removed, whatever its case.
        cookie_string: Cookies in ``"a=1; b=2"`` form (URL-unquoted before
            parsing), set on ``.<root domain of url>`` with path ``/``.
            Takes precedence over ``cookie_json_base64``.
        cookie_json_base64: Base64-encoded JSON list of cookie objects with
            ``name``, ``value``, ``domain`` and ``path`` keys. Ignored when
            ``cookie_string`` is given.

    Returns:
        ``{"code": 200, "data": {...}}`` with the final URL, page source,
        cookies, a redirect flag and the captured network requests, or
        ``{"code": 500, "msg": ...}`` describing an invalid parameter.
    """
    # ---- validate input first so no browser is launched for bad requests ----
    if not isinstance(url, str) or not url:
        return {"code": 500,"msg":"No target URL"}
    target_url = unquote(url)
    target_domain = get_root_domain(target_url)

    if not 0 <= wait <= 30:
        return {"code": 500,"msg":"The waiting time must be between 0 and 30"}
    wait_time = wait

    header_array = {}
    if isinstance(header, str):
        try:
            header_array.update(json.loads(unquote(header)))
        except (ValueError, TypeError):
            return {"code": 500,"msg":"The header field is not JSON"}
        # Header names are case-insensitive, so drop "Cookie" in any casing.
        for name in [k for k in header_array if str(k).lower() == "cookie"]:
            del header_array[name]

    use_cookie_string = isinstance(cookie_string, str)
    json_cookies = None
    if not use_cookie_string and isinstance(cookie_json_base64, str):
        try:
            cookie_json = base64.b64decode(cookie_json_base64)
        except ValueError:  # binascii.Error is a ValueError subclass
            return {"code": 500,"msg":"The cookie_json_base64 field is not BASE64"}
        try:
            json_cookies = json.loads(cookie_json)
        except ValueError:  # JSONDecodeError / UnicodeDecodeError
            # Decode leniently so the error payload itself is always serialisable.
            return {
                "code": 500,
                "msg": "The cookie_json field is not JSON",
                "data": cookie_json.decode("utf-8", errors="replace"),
            }
        if isinstance(json_cookies, dict):
            # A single cookie object is treated as a one-element list.
            json_cookies = [json_cookies]
        elif not isinstance(json_cookies, list):
            print("Error Cookie Json:")
            print(json_cookies)
            json_cookies = []

    # ---- drive the browser ----
    options = Options()
    options.add_argument('--headless')
    seleniumwire_options = {'enable_har': True}
    driver = webdriver.Chrome(options=options,seleniumwire_options=seleniumwire_options)
    try:
        # First visit: presumably needed so the browser is on the target
        # origin before cookies are added (WebDriver only sets cookies for
        # the current document's domain). State from this visit is discarded.
        driver.get(target_url)
        driver.delete_all_cookies()
        driver.execute_script("window.sessionStorage.clear();")
        driver.execute_script("window.localStorage.clear();")
        del driver.requests

        print(f'初始的cookie:{len(driver.get_cookies())}')
        print(f'初始的sessionStorage:{driver.execute_script("return window.sessionStorage.length;")}')
        print(f'初始的localStorage:{driver.execute_script("return window.localStorage.length;")}')
        print(f'初始的network:{len(driver.requests)}')

        if use_cookie_string:
            # Read the cookies from the cookie_string parameter itself; the
            # "cookie" header entry has been removed above and must not be used.
            _add_cookie_string(driver, unquote(cookie_string), f'.{target_domain}')
        elif json_cookies is not None:
            _add_cookie_json(driver, json_cookies)

        # NOTE(review): header_overrides is the legacy selenium-wire API;
        # newer releases use request interceptors - confirm against the
        # installed version.
        driver.header_overrides = header_array

        # Second visit: the real load, with cookies and headers applied.
        driver.get(target_url)
        if wait_time > 0:
            time.sleep(wait_time)

        current_url = driver.current_url
        data = {
            "url": current_url,
            "page_source": driver.page_source,
            "end_cookies": driver.get_cookies(),
            "is_jump": target_url != current_url,
            "network": _collect_network(driver),
        }
        return {"code": 200,"data":data}
    finally:
        # Always release the Chrome process, even when a step above fails.
        driver.quit()
|
|
|
if __name__ == "__main__":
    # When executed as a script, serve the "app:app" import string on all
    # interfaces, port 7860.
    server_options = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run("app:app", **server_options)