""" from bilibili_api import Credential 凭据操作类 """ import re import time import uuid import binascii from typing import Union from Cryptodome.Hash import SHA256 from Cryptodome.PublicKey import RSA from Cryptodome.Cipher import PKCS1_OAEP from .credential import Credential as _Credential from .network import Api, get_api, get_session, HEADERS key = RSA.importKey( """\ -----BEGIN PUBLIC KEY----- MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDLgd2OAkcGVtoE3ThUREbio0Eg Uc/prcajMKXvkCKFCWhJYJcLkcM2DKKcSeFpD/j6Boy538YXnR6VhcuUJOhH2x71 nzPjfdTcqMz7djHum0qSZA0AyCBDABUqCrfNgCiJ00Ra7GmRj+YCK1NJEuewlb40 JNrRuoEUXpabUzGB8QIDAQAB -----END PUBLIC KEY-----""" ) API = get_api("credential") class Credential(_Credential): """ 凭据操作类,用于各种请求操作。 """ async def check_refresh(self) -> bool: """ 检查是否需要刷新 cookies Returns: bool: cookies 是否需要刷新 """ return await check_cookies(self) async def refresh(self) -> None: """ 刷新 cookies """ new_cred: Credential = await refresh_cookies(self) self.sessdata = new_cred.sessdata self.bili_jct = new_cred.bili_jct self.dedeuserid = new_cred.dedeuserid self.ac_time_value = new_cred.ac_time_value async def check_valid(self) -> bool: """ 检查 cookies 是否有效 Returns: bool: cookies 是否有效 """ data = await Api( credential=self, **get_api("credential")["info"]["valid"] ).result return data["isLogin"] @staticmethod def from_cookies(cookies: dict={}) -> "Credential": """ 从 cookies 新建 Credential Args: cookies (dict, optional): Cookies. Defaults to {}. Returns: Credential: 凭据类 """ c = Credential() c.sessdata = cookies.get("SESSDATA") c.bili_jct = cookies.get("bili_jct") c.buvid3 = cookies.get("buvid3") c.dedeuserid = cookies.get("DedeUserID") c.ac_time_value = cookies.get("ac_time_value") return c """ Cookies 刷新相关 感谢 bilibili-API-collect 提供的刷新 Cookies 的思路 https://socialsisteryi.github.io/bilibili-API-collect/docs/login/cookie_refresh.html """ async def check_cookies(credential: Credential) -> bool: """ 检查是否需要刷新 Cookies Args: credential (Credential): 用户凭证 Return: bool: 是否需要刷新 Cookies """ api = API["info"]["check_cookies"] return (await Api(**api, credential=credential).result)["refresh"] def getCorrespondPath() -> str: """ 根据时间生成 CorrespondPath Return: str: CorrespondPath """ ts = round(time.time() * 1000) cipher = PKCS1_OAEP.new(key, SHA256) encrypted = cipher.encrypt(f"refresh_{ts}".encode()) return binascii.b2a_hex(encrypted).decode() async def get_refresh_csrf(credential: Credential) -> str: """ 获取刷新 Cookies 的 csrf Return: str: csrf """ correspond_path = getCorrespondPath() api = API["operate"]["get_refresh_csrf"] cookies = credential.get_cookies() cookies["buvid3"] = str(uuid.uuid1()) cookies["Domain"] = ".bilibili.com" resp = await get_session().request( "GET", api["url"].replace("{correspondPath}", correspond_path), cookies=cookies, headers=HEADERS.copy(), ) if resp.status_code == 404: raise Exception("correspondPath 过期或错误。") elif resp.status_code == 200: text = resp.text refresh_csrf = re.findall('