|
"""
|
|
bilibili_api.video
|
|
|
|
视频相关操作
|
|
|
|
注意,同时存在 page_index 和 cid 的参数,两者至少提供一个。
|
|
"""
|
|
|
|
import re
|
|
import os
|
|
import json
|
|
import struct
|
|
import asyncio
|
|
import logging
|
|
import datetime
|
|
from enum import Enum
|
|
from inspect import isfunction
|
|
from functools import cmp_to_key
|
|
from dataclasses import dataclass
|
|
from typing import Any, List, Union, Optional
|
|
|
|
import httpx
|
|
import aiohttp
|
|
|
|
from . import settings
|
|
from .utils.aid_bvid_transformer import bvid2aid, aid2bvid
|
|
from .utils.utils import get_api
|
|
from .utils.AsyncEvent import AsyncEvent
|
|
from .utils.credential import Credential
|
|
from .utils.BytesReader import BytesReader
|
|
from .utils.danmaku import Danmaku, SpecialDanmaku
|
|
from .utils.network import get_aiohttp_session, Api, get_session
|
|
from .exceptions import (
|
|
ArgsException,
|
|
NetworkException,
|
|
ResponseException,
|
|
DanmakuClosedException,
|
|
)
|
|
|
|
API = get_api("video")
|
|
|
|
|
|
def get_cid_info_sync(cid: int):
|
|
"""
|
|
获取 cid 信息 (对应的视频,具体分 P 序号,up 等)
|
|
|
|
Returns:
|
|
dict: 调用 https://hd.biliplus.com 的 API 返回的结果
|
|
"""
|
|
api = API["info"]["cid_info"]
|
|
params = {"cid": cid}
|
|
return Api(**api).update_params(**params).result_sync
|
|
|
|
|
|
async def get_cid_info(cid: int):
|
|
"""
|
|
获取 cid 信息 (对应的视频,具体分 P 序号,up 等)
|
|
|
|
Returns:
|
|
dict: 调用 https://hd.biliplus.com 的 API 返回的结果
|
|
"""
|
|
api = API["info"]["cid_info"]
|
|
params = {"cid": cid}
|
|
return await Api(**api).update_params(**params).result
|
|
|
|
|
|
class DanmakuOperatorType(Enum):
|
|
"""
|
|
弹幕操作枚举
|
|
|
|
+ DELETE - 删除弹幕
|
|
+ PROTECT - 保护弹幕
|
|
+ UNPROTECT - 取消保护弹幕
|
|
"""
|
|
|
|
DELETE = 1
|
|
PROTECT = 2
|
|
UNPROTECT = 3
|
|
|
|
|
|
class VideoAppealReasonType:
|
|
"""
|
|
视频投诉原因枚举
|
|
|
|
注意: 每一项均为函数,部分项有参数,没有参数的函数无需调用函数,直接传入即可,有参数的函数请调用结果之后传入。
|
|
|
|
- ILLEGAL(): 违法违禁
|
|
- PRON(): 色情
|
|
- VULGAR(): 低俗
|
|
- GAMBLED_SCAMS(): 赌博诈骗
|
|
- VIOLENT(): 血腥暴力
|
|
- PERSONAL_ATTACK(): 人身攻击
|
|
- PLAGIARISM(bvid: str): 与站内其他视频撞车
|
|
- BAD_FOR_YOUNGS(): 青少年不良信息
|
|
- CLICKBAIT(): 不良封面/标题
|
|
- POLITICAL_RUMORS(): 涉政谣言
|
|
- SOCIAL_RUMORS(): 涉社会事件谣言
|
|
- COVID_RUMORS(): 疫情谣言
|
|
- UNREAL_EVENT(): 虚假不实消息
|
|
- OTHER(): 有其他问题
|
|
- LEAD_WAR(): 引战
|
|
- CANNOT_CHARGE(): 不能参加充电
|
|
- UNREAL_COPYRIGHT(source: str): 转载/自制类型错误
|
|
"""
|
|
|
|
ILLEGAL = lambda: 2
|
|
PRON = lambda: 3
|
|
VULGAR = lambda: 4
|
|
GAMBLED_SCAMS = lambda: 5
|
|
VIOLENT = lambda: 6
|
|
PERSONAL_ATTACK = lambda: 7
|
|
BAD_FOR_YOUNGS = lambda: 10000
|
|
CLICKBAIT = lambda: 10013
|
|
POLITICAL_RUMORS = lambda: 10014
|
|
SOCIAL_RUMORS = lambda: 10015
|
|
COVID_RUMORS = lambda: 10016
|
|
UNREAL_EVENT = lambda: 10017
|
|
OTHER = lambda: 1
|
|
LEAD_WAR = lambda: 9
|
|
CANNOT_CHARGE = lambda: 10
|
|
|
|
@staticmethod
|
|
def PLAGIARISM(bvid: str):
|
|
"""
|
|
与站内其他视频撞车
|
|
|
|
Args:
|
|
bvid (str): 撞车对象
|
|
"""
|
|
return {"tid": 8, "撞车对象": bvid}
|
|
|
|
@staticmethod
|
|
def UNREAL_COPYRIGHT(source: str):
|
|
"""
|
|
转载/自制类型错误
|
|
|
|
Args:
|
|
source (str): 原创视频出处
|
|
"""
|
|
return {"tid": 52, "出处": source}
|
|
|
|
|
|
class Video:
|
|
"""
|
|
视频类,各种对视频的操作均在里面。
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
bvid: Union[None, str] = None,
|
|
aid: Union[None, int] = None,
|
|
credential: Union[None, Credential] = None,
|
|
):
|
|
"""
|
|
Args:
|
|
bvid (str | None, optional) : BV 号. bvid 和 aid 必须提供其中之一。
|
|
|
|
aid (int | None, optional) : AV 号. bvid 和 aid 必须提供其中之一。
|
|
|
|
credential (Credential | None, optional): Credential 类. Defaults to None.
|
|
"""
|
|
|
|
if bvid is not None:
|
|
self.set_bvid(bvid)
|
|
elif aid is not None:
|
|
self.set_aid(aid)
|
|
else:
|
|
|
|
raise ArgsException("请至少提供 bvid 和 aid 中的其中一个参数。")
|
|
|
|
|
|
self.credential: Credential = Credential() if credential is None else credential
|
|
|
|
|
|
self.__info: Union[dict, None] = None
|
|
|
|
def set_bvid(self, bvid: str) -> None:
|
|
"""
|
|
设置 bvid。
|
|
|
|
Args:
|
|
bvid (str): 要设置的 bvid。
|
|
"""
|
|
|
|
if not re.search("^BV[a-zA-Z0-9]{10}$", bvid):
|
|
raise ArgsException("bvid 提供错误,必须是以 BV 开头的纯字母和数字组成的 12 位字符串(大小写敏感)。")
|
|
self.__bvid = bvid
|
|
self.__aid = bvid2aid(bvid)
|
|
|
|
def get_bvid(self) -> str:
|
|
"""
|
|
获取 BVID。
|
|
|
|
Returns:
|
|
str: BVID。
|
|
"""
|
|
return self.__bvid
|
|
|
|
def set_aid(self, aid: int) -> None:
|
|
"""
|
|
设置 aid。
|
|
|
|
Args:
|
|
aid (int): AV 号。
|
|
"""
|
|
if aid <= 0:
|
|
raise ArgsException("aid 不能小于或等于 0。")
|
|
|
|
self.__aid = aid
|
|
self.__bvid = aid2bvid(aid)
|
|
|
|
def get_aid(self) -> int:
|
|
"""
|
|
获取 AID。
|
|
|
|
Returns:
|
|
int: aid。
|
|
"""
|
|
return self.__aid
|
|
|
|
async def get_info(self) -> dict:
|
|
"""
|
|
获取视频信息。
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果。
|
|
"""
|
|
api = API["info"]["info"]
|
|
params = {"bvid": self.get_bvid(), "aid": self.get_aid()}
|
|
resp = (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
self.__info = resp
|
|
return resp
|
|
|
|
async def get_detail(self) -> dict:
|
|
"""
|
|
获取视频详细信息
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果。
|
|
"""
|
|
api = API["info"]["detail"]
|
|
params = {
|
|
"bvid": self.get_bvid(),
|
|
"aid": self.get_aid(),
|
|
"need_operation_card": 0,
|
|
"need_elec": 0,
|
|
}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def __get_info_cached(self) -> dict:
|
|
"""
|
|
获取视频信息,如果已获取过则使用之前获取的信息,没有则重新获取。
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果。
|
|
"""
|
|
if self.__info is None:
|
|
return await self.get_info()
|
|
return self.__info
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def get_up_mid(self) -> int:
|
|
"""
|
|
获取视频 up 主的 mid。
|
|
|
|
Returns:
|
|
int: up_mid
|
|
"""
|
|
info = await self.__get_info_cached()
|
|
return info["owner"]["mid"]
|
|
|
|
async def get_tags(
|
|
self, page_index: Union[int, None] = 0, cid: Union[int, None] = None
|
|
) -> List[dict]:
|
|
"""
|
|
获取视频标签。
|
|
|
|
Args:
|
|
page_index (int | None): 分 P 序号. Defaults to 0.
|
|
|
|
cid (int | None): 分 P 编码. Defaults to None.
|
|
|
|
Returns:
|
|
List[dict]: 调用 API 返回的结果。
|
|
"""
|
|
if cid == None:
|
|
if page_index == None:
|
|
raise ArgsException("page_index 和 cid 至少提供一个。")
|
|
|
|
cid = await self.get_cid(page_index=page_index)
|
|
api = API["info"]["tags"]
|
|
params = {"bvid": self.get_bvid(), "aid": self.get_aid(), "cid": cid}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def get_chargers(self) -> dict:
|
|
"""
|
|
获取视频充电用户。
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果。
|
|
"""
|
|
info = await self.__get_info_cached()
|
|
mid = info["owner"]["mid"]
|
|
api = API["info"]["chargers"]
|
|
params = {"aid": self.get_aid(), "bvid": self.get_bvid(), "mid": mid}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def get_pages(self) -> List[dict]:
|
|
"""
|
|
获取分 P 信息。
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果。
|
|
"""
|
|
api = API["info"]["pages"]
|
|
params = {"aid": self.get_aid(), "bvid": self.get_bvid()}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def __get_cid_by_index(self, page_index: int) -> int:
|
|
"""
|
|
根据分 p 号获取 cid。
|
|
|
|
Args:
|
|
page_index (int): 分 P 号,从 0 开始。
|
|
|
|
Returns:
|
|
int: cid 分 P 的唯一 ID。
|
|
"""
|
|
if page_index < 0:
|
|
raise ArgsException("分 p 号必须大于或等于 0。")
|
|
|
|
info = await self.__get_info_cached()
|
|
pages = info["pages"]
|
|
|
|
if len(pages) <= page_index:
|
|
raise ArgsException("不存在该分 p。")
|
|
|
|
page = pages[page_index]
|
|
cid = page["cid"]
|
|
return cid
|
|
|
|
async def get_video_snapshot(
|
|
self,
|
|
cid: Union[int, None] = None,
|
|
json_index: bool = False,
|
|
pvideo: bool = True,
|
|
) -> dict:
|
|
"""
|
|
获取视频快照(视频各个时间段的截图拼图)
|
|
|
|
Args:
|
|
cid(int): 分 P CID(可选)
|
|
|
|
json_index(bool): json 数组截取时间表 True 为需要,False 不需要
|
|
|
|
pvideo(bool): 是否只获取预览
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果,数据中 Url 没有 http 头
|
|
"""
|
|
params: dict[str, Any] = {"aid": self.get_aid()}
|
|
if pvideo:
|
|
api = API["info"]["video_snapshot_pvideo"]
|
|
else:
|
|
params["bvid"] = self.get_bvid()
|
|
if json_index:
|
|
params["index"] = 1
|
|
if cid:
|
|
params["cid"] = cid
|
|
api = API["info"]["video_snapshot"]
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def get_cid(self, page_index: int) -> int:
|
|
"""
|
|
获取稿件 cid
|
|
|
|
Args:
|
|
page_index(int): 分 P
|
|
|
|
Returns:
|
|
int: cid
|
|
"""
|
|
return await self.__get_cid_by_index(page_index)
|
|
|
|
async def get_download_url(
|
|
self,
|
|
page_index: Union[int, None] = None,
|
|
cid: Union[int, None] = None,
|
|
html5: bool = False,
|
|
) -> dict:
|
|
"""
|
|
获取视频下载信息。
|
|
|
|
返回结果可以传入 `VideoDownloadURLDataDetecter` 进行解析。
|
|
|
|
page_index 和 cid 至少提供其中一个,其中 cid 优先级最高
|
|
|
|
Args:
|
|
page_index (int | None, optional) : 分 P 号,从 0 开始。Defaults to None
|
|
|
|
cid (int | None, optional) : 分 P 的 ID。Defaults to None
|
|
|
|
html5 (bool, optional): 是否以 html5 平台访问,这样子能直接在网页中播放,但是链接少。
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果。
|
|
"""
|
|
if cid is None:
|
|
if page_index is None:
|
|
raise ArgsException("page_index 和 cid 至少提供一个。")
|
|
|
|
cid = await self.__get_cid_by_index(page_index)
|
|
|
|
api = API["info"]["playurl"]
|
|
if html5:
|
|
params = {
|
|
"avid": self.get_aid(),
|
|
"cid": cid,
|
|
"qn": "127",
|
|
"otype": "json",
|
|
"fnval": 4048,
|
|
"fourk": 1,
|
|
"platform": "html5",
|
|
"high_quality": "1",
|
|
}
|
|
else:
|
|
params = {
|
|
"avid": self.get_aid(),
|
|
"cid": cid,
|
|
"qn": "127",
|
|
"otype": "json",
|
|
"fnval": 4048,
|
|
"fourk": 1,
|
|
}
|
|
result = (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
result.update({"is_html5": True} if html5 else {})
|
|
return result
|
|
|
|
async def get_related(self) -> dict:
|
|
"""
|
|
获取相关视频信息。
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果。
|
|
"""
|
|
api = API["info"]["related"]
|
|
params = {"aid": self.get_aid(), "bvid": self.get_bvid()}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def get_relation(self) -> dict:
|
|
"""
|
|
获取用户与视频的关系
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果。
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
api = API["info"]["relation"]
|
|
params = {"aid": self.get_aid(), "bvid": self.get_bvid()}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def has_liked(self) -> bool:
|
|
"""
|
|
视频是否点赞过。
|
|
|
|
Returns:
|
|
bool: 视频是否点赞过。
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
|
|
api = API["info"]["has_liked"]
|
|
params = {"bvid": self.get_bvid(), "aid": self.get_aid()}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def get_pay_coins(self) -> int:
|
|
"""
|
|
获取视频已投币数量。
|
|
|
|
Returns:
|
|
int: 视频已投币数量。
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
|
|
api = API["info"]["get_pay_coins"]
|
|
params = {"bvid": self.get_bvid(), "aid": self.get_aid()}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)["multiply"]
|
|
|
|
async def has_favoured(self) -> bool:
|
|
"""
|
|
是否已收藏。
|
|
|
|
Returns:
|
|
bool: 视频是否已收藏。
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
|
|
api = API["info"]["has_favoured"]
|
|
params = {"bvid": self.get_bvid(), "aid": self.get_aid()}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)["favoured"]
|
|
|
|
async def is_forbid_note(self) -> bool:
|
|
"""
|
|
是否禁止笔记。
|
|
|
|
Returns:
|
|
bool: 是否禁止笔记。
|
|
"""
|
|
api = API["info"]["is_forbid"]
|
|
params = {"aid": self.get_aid()}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)["forbid_note_entrance"]
|
|
|
|
async def get_private_notes_list(self) -> list:
|
|
"""
|
|
获取稿件私有笔记列表。
|
|
|
|
Returns:
|
|
list: note_Ids。
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
|
|
api = API["info"]["private_notes"]
|
|
params = {"oid": self.get_aid(), "oid_type": 0}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)["noteIds"]
|
|
|
|
async def get_public_notes_list(self, pn: int, ps: int) -> dict:
|
|
"""
|
|
获取稿件公开笔记列表。
|
|
|
|
Args:
|
|
pn (int): 页码
|
|
|
|
ps (int): 每页项数
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果。
|
|
"""
|
|
|
|
api = API["info"]["public_notes"]
|
|
params = {"oid": self.get_aid(), "oid_type": 0, "pn": pn, "ps": ps}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def get_ai_conclusion(self, cid: Optional[int] = None, page_index: Optional[int] = None,
|
|
up_mid: Optional[int] = None) -> dict:
|
|
"""
|
|
获取稿件 AI 总结结果。
|
|
|
|
cid 和 page_index 至少提供其中一个,其中 cid 优先级最高
|
|
|
|
Args:
|
|
cid (Optional, int): 分 P 的 cid。
|
|
|
|
page_index (Optional, int): 分 P 号,从 0 开始。
|
|
|
|
up_mid (Optional, int): up 主的 mid。
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果。
|
|
"""
|
|
if cid is None:
|
|
if page_index is None:
|
|
raise ArgsException("page_index 和 cid 至少提供一个。")
|
|
|
|
cid = await self.__get_cid_by_index(page_index)
|
|
|
|
api = API["info"]["ai_conclusion"]
|
|
params = {"aid": self.get_aid(), "bvid": self.get_bvid(), "cid": cid,
|
|
"up_mid": await self.get_up_mid() if up_mid is None else up_mid}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def get_danmaku_view(
|
|
self, page_index: Union[int, None] = None, cid: Union[int, None] = None
|
|
) -> dict:
|
|
"""
|
|
获取弹幕设置、特殊弹幕、弹幕数量、弹幕分段等信息。
|
|
|
|
Args:
|
|
page_index (int, optional): 分 P 号,从 0 开始。Defaults to None
|
|
|
|
cid (int, optional): 分 P 的 ID。Defaults to None
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果。
|
|
"""
|
|
if cid is None:
|
|
if page_index is None:
|
|
raise ArgsException("page_index 和 cid 至少提供一个。")
|
|
|
|
cid = await self.__get_cid_by_index(page_index)
|
|
|
|
session = get_session()
|
|
api = API["danmaku"]["view"]
|
|
|
|
config = {}
|
|
config["url"] = api["url"]
|
|
config["params"] = {"type": 1, "oid": cid, "pid": self.get_aid()}
|
|
config["cookies"] = self.credential.get_cookies()
|
|
config["headers"] = {
|
|
"Referer": "https://www.bilibili.com",
|
|
"User-Agent": "Mozilla/5.0",
|
|
}
|
|
|
|
try:
|
|
resp = await session.get(**config)
|
|
except Exception as e:
|
|
raise NetworkException(-1, str(e))
|
|
|
|
resp_data = resp.read()
|
|
json_data = {}
|
|
reader = BytesReader(resp_data)
|
|
|
|
|
|
|
|
def read_dm_seg(stream: bytes):
|
|
reader_ = BytesReader(stream)
|
|
data = {}
|
|
while not reader_.has_end():
|
|
t = reader_.varint() >> 3
|
|
if t == 1:
|
|
data["page_size"] = reader_.varint()
|
|
elif t == 2:
|
|
data["total"] = reader_.varint()
|
|
else:
|
|
continue
|
|
return data
|
|
|
|
def read_flag(stream: bytes):
|
|
reader_ = BytesReader(stream)
|
|
data = {}
|
|
while not reader_.has_end():
|
|
t = reader_.varint() >> 3
|
|
if t == 1:
|
|
data["rec_flag"] = reader_.varint()
|
|
elif t == 2:
|
|
data["rec_text"] = reader_.string()
|
|
elif t == 3:
|
|
data["rec_switch"] = reader_.varint()
|
|
else:
|
|
continue
|
|
return data
|
|
|
|
def read_command_danmakus(stream: bytes):
|
|
reader_ = BytesReader(stream)
|
|
data = {}
|
|
while not reader_.has_end():
|
|
t = reader_.varint() >> 3
|
|
if t == 1:
|
|
data["id"] = reader_.varint()
|
|
elif t == 2:
|
|
data["oid"] = reader_.varint()
|
|
elif t == 3:
|
|
data["mid"] = reader_.varint()
|
|
elif t == 4:
|
|
data["commend"] = reader_.string()
|
|
elif t == 5:
|
|
data["content"] = reader_.string()
|
|
elif t == 6:
|
|
data["progress"] = reader_.varint()
|
|
elif t == 7:
|
|
data["ctime"] = reader_.string()
|
|
elif t == 8:
|
|
data["mtime"] = reader_.string()
|
|
elif t == 9:
|
|
data["extra"] = json.loads(reader_.string())
|
|
|
|
elif t == 10:
|
|
data["id_str"] = reader_.string()
|
|
else:
|
|
continue
|
|
return data
|
|
|
|
def read_settings(stream: bytes):
|
|
reader_ = BytesReader(stream)
|
|
data = {}
|
|
while not reader_.has_end():
|
|
t = reader_.varint() >> 3
|
|
|
|
if t == 1:
|
|
data["dm_switch"] = reader_.bool()
|
|
elif t == 2:
|
|
data["ai_switch"] = reader_.bool()
|
|
elif t == 3:
|
|
data["ai_level"] = reader_.varint()
|
|
elif t == 4:
|
|
data["enable_top"] = reader_.bool()
|
|
elif t == 5:
|
|
data["enable_scroll"] = reader_.bool()
|
|
elif t == 6:
|
|
data["enable_bottom"] = reader_.bool()
|
|
elif t == 7:
|
|
data["enable_color"] = reader_.bool()
|
|
elif t == 8:
|
|
data["enable_special"] = reader_.bool()
|
|
elif t == 9:
|
|
data["prevent_shade"] = reader_.bool()
|
|
elif t == 10:
|
|
data["dmask"] = reader_.bool()
|
|
elif t == 11:
|
|
data["opacity"] = reader_.float(True)
|
|
elif t == 12:
|
|
data["dm_area"] = reader_.varint()
|
|
elif t == 13:
|
|
data["speed_plus"] = reader_.float(True)
|
|
elif t == 14:
|
|
data["font_size"] = reader_.float(True)
|
|
elif t == 15:
|
|
data["screen_sync"] = reader_.bool()
|
|
elif t == 16:
|
|
data["speed_sync"] = reader_.bool()
|
|
elif t == 17:
|
|
data["font_family"] = reader_.string()
|
|
elif t == 18:
|
|
data["bold"] = reader_.bool()
|
|
elif t == 19:
|
|
data["font_border"] = reader_.varint()
|
|
elif t == 20:
|
|
data["draw_type"] = reader_.string()
|
|
else:
|
|
continue
|
|
return data
|
|
|
|
def read_image_danmakus(string: bytes):
|
|
image_list = []
|
|
reader_ = BytesReader(string)
|
|
while not reader_.has_end():
|
|
type_ = reader_.varint() >> 3
|
|
if type_ == 1:
|
|
details_dict: dict[Any, Any] = {"texts": []}
|
|
img_details = reader_.bytes_string()
|
|
reader_details = BytesReader(img_details)
|
|
while not reader_details.has_end():
|
|
type_details = reader_details.varint() >> 3
|
|
if type_details == 1:
|
|
details_dict["texts"].append(reader_details.string())
|
|
elif type_details == 2:
|
|
details_dict["image"] = reader_details.string()
|
|
elif type_details == 3:
|
|
id_string = reader_details.bytes_string()
|
|
id_reader = BytesReader(id_string)
|
|
while not id_reader.has_end():
|
|
type_id = id_reader.varint() >> 3
|
|
if type_id == 2:
|
|
details_dict["id"] = id_reader.varint()
|
|
else:
|
|
raise ResponseException("解析响应数据错误")
|
|
image_list.append(details_dict)
|
|
else:
|
|
raise ResponseException("解析响应数据错误")
|
|
return image_list
|
|
|
|
while not reader.has_end():
|
|
type_ = reader.varint() >> 3
|
|
|
|
if type_ == 1:
|
|
json_data["state"] = reader.varint()
|
|
elif type_ == 2:
|
|
json_data["text"] = reader.string()
|
|
elif type_ == 3:
|
|
json_data["text_side"] = reader.string()
|
|
elif type_ == 4:
|
|
json_data["dm_seg"] = read_dm_seg(reader.bytes_string())
|
|
elif type_ == 5:
|
|
json_data["flag"] = read_flag(reader.bytes_string())
|
|
elif type_ == 6:
|
|
if "special_dms" not in json_data:
|
|
json_data["special_dms"] = []
|
|
json_data["special_dms"].append(reader.string())
|
|
elif type_ == 7:
|
|
json_data["check_box"] = reader.bool()
|
|
elif type_ == 8:
|
|
json_data["count"] = reader.varint()
|
|
elif type_ == 9:
|
|
if "command_dms" not in json_data:
|
|
json_data["command_dms"] = []
|
|
json_data["command_dms"].append(
|
|
read_command_danmakus(reader.bytes_string())
|
|
)
|
|
elif type_ == 10:
|
|
json_data["dm_setting"] = read_settings(reader.bytes_string())
|
|
elif type_ == 12:
|
|
json_data["image_dms"] = read_image_danmakus(reader.bytes_string())
|
|
else:
|
|
continue
|
|
return json_data
|
|
|
|
async def get_danmakus(
|
|
self,
|
|
page_index: int = 0,
|
|
date: Union[datetime.date, None] = None,
|
|
cid: Union[int, None] = None,
|
|
from_seg: Union[int, None] = None,
|
|
to_seg: Union[int, None] = None,
|
|
) -> List[Danmaku]:
|
|
"""
|
|
获取弹幕。
|
|
|
|
Args:
|
|
page_index (int, optional): 分 P 号,从 0 开始。Defaults to None
|
|
|
|
date (datetime.Date | None, optional): 指定日期后为获取历史弹幕,精确到年月日。Defaults to None.
|
|
|
|
cid (int | None, optional): 分 P 的 ID。Defaults to None
|
|
|
|
from_seg (int, optional): 从第几段开始(0 开始编号,None 为从第一段开始,一段 6 分钟). Defaults to None.
|
|
|
|
to_seg (int, optional): 到第几段结束(0 开始编号,None 为到最后一段,包含编号的段,一段 6 分钟). Defaults to None.
|
|
|
|
注意:
|
|
- 1. 段数可以使用 `get_danmaku_view()["dm_seg"]["total"]` 查询。
|
|
- 2. `from_seg` 和 `to_seg` 仅对 `date == None` 的时候有效果。
|
|
- 3. 例:取前 `12` 分钟的弹幕:`from_seg=0, to_seg=1`
|
|
|
|
Returns:
|
|
List[Danmaku]: Danmaku 类的列表。
|
|
"""
|
|
if date is not None:
|
|
self.credential.raise_for_no_sessdata()
|
|
|
|
if cid is None:
|
|
if page_index is None:
|
|
raise ArgsException("page_index 和 cid 至少提供一个。")
|
|
|
|
cid = await self.__get_cid_by_index(page_index)
|
|
|
|
session = get_session()
|
|
aid = self.get_aid()
|
|
params: dict[str, Any] = {"oid": cid, "type": 1, "pid": aid}
|
|
if date is not None:
|
|
|
|
api = API["danmaku"]["get_history_danmaku"]
|
|
params["date"] = date.strftime("%Y-%m-%d")
|
|
params["type"] = 1
|
|
from_seg = to_seg = 0
|
|
else:
|
|
api = API["danmaku"]["get_danmaku"]
|
|
if from_seg == None:
|
|
from_seg = 0
|
|
if to_seg == None:
|
|
view = await self.get_danmaku_view(cid=cid)
|
|
to_seg = view["dm_seg"]["total"] - 1
|
|
|
|
danmakus = []
|
|
|
|
for seg in range(from_seg, to_seg + 1):
|
|
if date is None:
|
|
|
|
params["segment_index"] = seg + 1
|
|
|
|
config = {}
|
|
config["url"] = api["url"]
|
|
config["params"] = params
|
|
config["headers"] = {
|
|
"Referer": "https://www.bilibili.com",
|
|
"User-Agent": "Mozilla/5.0",
|
|
}
|
|
config["cookies"] = self.credential.get_cookies()
|
|
|
|
try:
|
|
req = await session.get(**config)
|
|
except Exception as e:
|
|
raise NetworkException(-1, str(e))
|
|
|
|
if "content-type" not in req.headers.keys():
|
|
break
|
|
else:
|
|
content_type = req.headers["content-type"]
|
|
if content_type != "application/octet-stream":
|
|
raise ResponseException("返回数据类型错误:")
|
|
|
|
|
|
data = req.read()
|
|
if data == b"\x10\x01":
|
|
|
|
raise DanmakuClosedException()
|
|
|
|
reader = BytesReader(data)
|
|
while not reader.has_end():
|
|
type_ = reader.varint() >> 3
|
|
if type_ != 1:
|
|
if type_ == 4:
|
|
reader.bytes_string()
|
|
|
|
elif type_ == 5:
|
|
|
|
reader.varint()
|
|
reader.varint()
|
|
reader.varint()
|
|
reader.bytes_string()
|
|
elif type_ == 13:
|
|
|
|
continue
|
|
else:
|
|
raise ResponseException("解析响应数据错误")
|
|
|
|
dm = Danmaku("")
|
|
dm_pack_data = reader.bytes_string()
|
|
dm_reader = BytesReader(dm_pack_data)
|
|
|
|
while not dm_reader.has_end():
|
|
data_type = dm_reader.varint() >> 3
|
|
|
|
if data_type == 1:
|
|
dm.id_ = dm_reader.varint()
|
|
elif data_type == 2:
|
|
dm.dm_time = dm_reader.varint() / 1000
|
|
elif data_type == 3:
|
|
dm.mode = dm_reader.varint()
|
|
elif data_type == 4:
|
|
dm.font_size = dm_reader.varint()
|
|
elif data_type == 5:
|
|
color = dm_reader.varint()
|
|
if color != 60001:
|
|
color = hex(color)[2:]
|
|
else:
|
|
color = "special"
|
|
dm.color = color
|
|
elif data_type == 6:
|
|
dm.crc32_id = dm_reader.string()
|
|
elif data_type == 7:
|
|
dm.text = dm_reader.string()
|
|
elif data_type == 8:
|
|
dm.send_time = dm_reader.varint()
|
|
elif data_type == 9:
|
|
dm.weight = dm_reader.varint()
|
|
elif data_type == 10:
|
|
dm.action = str(dm_reader.string())
|
|
elif data_type == 11:
|
|
dm.pool = dm_reader.varint()
|
|
elif data_type == 12:
|
|
dm.id_str = dm_reader.string()
|
|
elif data_type == 13:
|
|
dm.attr = dm_reader.varint()
|
|
elif data_type == 14:
|
|
dm.uid = dm_reader.varint()
|
|
elif data_type == 15:
|
|
dm_reader.varint()
|
|
elif data_type == 20:
|
|
dm_reader.bytes_string()
|
|
elif data_type == 21:
|
|
dm_reader.bytes_string()
|
|
elif data_type == 22:
|
|
dm_reader.bytes_string()
|
|
elif data_type == 25:
|
|
dm_reader.varint()
|
|
elif data_type == 26:
|
|
dm_reader.varint()
|
|
else:
|
|
break
|
|
danmakus.append(dm)
|
|
return danmakus
|
|
|
|
async def get_special_dms(
|
|
self, page_index: int = 0, cid: Union[int, None] = None
|
|
) -> List[SpecialDanmaku]:
|
|
"""
|
|
获取特殊弹幕
|
|
|
|
Args:
|
|
page_index (int, optional) : 分 P 号. Defaults to 0.
|
|
|
|
cid (int | None, optional): 分 P id. Defaults to None.
|
|
|
|
Returns:
|
|
List[SpecialDanmaku]: 调用接口解析后的结果
|
|
"""
|
|
if cid is None:
|
|
if page_index is None:
|
|
raise ArgsException("page_index 和 cid 至少提供一个。")
|
|
|
|
cid = await self.__get_cid_by_index(page_index)
|
|
|
|
view = await self.get_danmaku_view(cid=cid)
|
|
special_dms = view["special_dms"][0]
|
|
if settings.proxy != "":
|
|
sess = httpx.AsyncClient(proxies={"all://": settings.proxy})
|
|
else:
|
|
sess = httpx.AsyncClient()
|
|
dm_content = await sess.get(special_dms, cookies=self.credential.get_cookies())
|
|
dm_content.raise_for_status()
|
|
reader = BytesReader(dm_content.content)
|
|
dms: List[SpecialDanmaku] = []
|
|
while not reader.has_end():
|
|
spec_dm = SpecialDanmaku("")
|
|
type_ = reader.varint() >> 3
|
|
if type_ == 1:
|
|
reader_ = BytesReader(reader.bytes_string())
|
|
while not reader_.has_end():
|
|
type__ = reader_.varint() >> 3
|
|
if type__ == 1:
|
|
spec_dm.id_ = reader_.varint()
|
|
elif type__ == 3:
|
|
spec_dm.mode = reader_.varint()
|
|
elif type__ == 4:
|
|
reader_.varint()
|
|
elif type__ == 5:
|
|
reader_.varint()
|
|
elif type__ == 6:
|
|
reader_.string()
|
|
elif type__ == 7:
|
|
spec_dm.content = reader_.string()
|
|
elif type__ == 8:
|
|
reader_.varint()
|
|
elif type__ == 11:
|
|
spec_dm.pool = reader_.varint()
|
|
elif type__ == 12:
|
|
spec_dm.id_str = reader_.string()
|
|
else:
|
|
continue
|
|
else:
|
|
continue
|
|
dms.append(spec_dm)
|
|
return dms
|
|
|
|
async def get_history_danmaku_index(
|
|
self,
|
|
page_index: Union[int, None] = None,
|
|
date: Union[datetime.date, None] = None,
|
|
cid: Union[int, None] = None,
|
|
) -> Union[None, List[str]]:
|
|
"""
|
|
获取特定月份存在历史弹幕的日期。
|
|
|
|
Args:
|
|
page_index (int | None, optional): 分 P 号,从 0 开始。Defaults to None
|
|
|
|
date (datetime.date | None): 精确到年月. Defaults to None。
|
|
|
|
cid (int | None, optional): 分 P 的 ID。Defaults to None
|
|
|
|
Returns:
|
|
None | List[str]: 调用 API 返回的结果。不存在时为 None。
|
|
"""
|
|
if date is None:
|
|
raise ArgsException("请提供 date 参数")
|
|
|
|
self.credential.raise_for_no_sessdata()
|
|
|
|
if cid is None:
|
|
if page_index is None:
|
|
raise ArgsException("page_index 和 cid 至少提供一个。")
|
|
|
|
cid = await self.__get_cid_by_index(page_index)
|
|
|
|
api = API["danmaku"]["get_history_danmaku_index"]
|
|
params = {"oid": cid, "month": date.strftime("%Y-%m"), "type": 1}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def has_liked_danmakus(
|
|
self,
|
|
page_index: Union[int, None] = None,
|
|
ids: Union[List[int], None] = None,
|
|
cid: Union[int, None] = None,
|
|
) -> dict:
|
|
"""
|
|
是否已点赞弹幕。
|
|
|
|
Args:
|
|
page_index (int | None, optional): 分 P 号,从 0 开始。Defaults to None
|
|
|
|
ids (List[int] | None): 要查询的弹幕 ID 列表。
|
|
|
|
cid (int | None, optional): 分 P 的 ID。Defaults to None
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果。
|
|
"""
|
|
if ids is None or len(ids) == 0:
|
|
raise ArgsException("请提供 ids 参数并至少有一个元素")
|
|
|
|
self.credential.raise_for_no_sessdata()
|
|
|
|
if cid is None:
|
|
if page_index is None:
|
|
raise ArgsException("page_index 和 cid 至少提供一个。")
|
|
|
|
cid = await self.__get_cid_by_index(page_index)
|
|
|
|
api = API["danmaku"]["has_liked_danmaku"]
|
|
params = {"oid": cid, "ids": ",".join(ids)}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def send_danmaku(
|
|
self,
|
|
page_index: Union[int, None] = None,
|
|
danmaku: Union[Danmaku, None] = None,
|
|
cid: Union[int, None] = None,
|
|
) -> dict:
|
|
"""
|
|
发送弹幕。
|
|
|
|
Args:
|
|
page_index (int | None, optional): 分 P 号,从 0 开始。Defaults to None
|
|
|
|
danmaku (Danmaku | None) : Danmaku 类。
|
|
|
|
cid (int | None, optional): 分 P 的 ID。Defaults to None
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果。
|
|
"""
|
|
|
|
if danmaku is None:
|
|
raise ArgsException("请提供 danmaku 参数")
|
|
|
|
self.credential.raise_for_no_sessdata()
|
|
self.credential.raise_for_no_bili_jct()
|
|
|
|
if cid is None:
|
|
if page_index is None:
|
|
raise ArgsException("page_index 和 cid 至少提供一个。")
|
|
|
|
cid = await self.__get_cid_by_index(page_index)
|
|
|
|
api = API["danmaku"]["send_danmaku"]
|
|
|
|
if danmaku.is_sub:
|
|
pool = 1
|
|
else:
|
|
pool = 0
|
|
data = {
|
|
"type": 1,
|
|
"oid": cid,
|
|
"msg": danmaku.text,
|
|
"aid": self.get_aid(),
|
|
"bvid": self.get_bvid(),
|
|
"progress": int(danmaku.dm_time * 1000),
|
|
"color": int(danmaku.color, 16),
|
|
"fontsize": danmaku.font_size,
|
|
"pool": pool,
|
|
"mode": danmaku.mode,
|
|
"plat": 1,
|
|
}
|
|
return await Api(**api, credential=self.credential).update_data(**data).result
|
|
|
|
async def get_danmaku_xml(
|
|
self, page_index: Union[int, None] = None, cid: Union[int, None] = None
|
|
) -> str:
|
|
"""
|
|
获取所有弹幕的 xml 源文件(非装填)
|
|
|
|
Args:
|
|
page_index (int, optional) : 分 P 序号. Defaults to 0.
|
|
|
|
cid (int | None, optional): cid. Defaults to None.
|
|
|
|
Return:
|
|
xml 文件源
|
|
"""
|
|
if cid is None:
|
|
if page_index is None:
|
|
raise ArgsException("page_index 和 cid 至少提供一个。")
|
|
|
|
cid = await self.__get_cid_by_index(page_index)
|
|
url = f"https://comment.bilibili.com/{cid}.xml"
|
|
sess = get_session()
|
|
config: dict[Any, Any] = {"url": url}
|
|
|
|
if settings.proxy:
|
|
config["proxies"] = {"all://", settings.proxy}
|
|
resp = await sess.get(**config)
|
|
return resp.content.decode("utf-8")
|
|
|
|
async def like_danmaku(
|
|
self,
|
|
page_index: Union[int, None] = None,
|
|
dmid: Union[int, None] = None,
|
|
status: Union[bool, None] = True,
|
|
cid: Union[int, None] = None,
|
|
) -> dict:
|
|
"""
|
|
点赞弹幕。
|
|
|
|
Args:
|
|
page_index (int | None, optional) : 分 P 号,从 0 开始。Defaults to None
|
|
|
|
dmid (int | None) : 弹幕 ID。
|
|
|
|
status (bool | None, optional): 点赞状态。Defaults to True
|
|
|
|
cid (int | None, optional) : 分 P 的 ID。Defaults to None
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果。
|
|
"""
|
|
if dmid is None:
|
|
raise ArgsException("请提供 dmid 参数")
|
|
|
|
self.credential.raise_for_no_sessdata()
|
|
self.credential.raise_for_no_bili_jct()
|
|
|
|
if cid is None:
|
|
if page_index is None:
|
|
raise ArgsException("page_index 和 cid 至少提供一个。")
|
|
|
|
cid = await self.__get_cid_by_index(page_index)
|
|
|
|
api = API["danmaku"]["like_danmaku"]
|
|
|
|
data = {
|
|
"dmid": dmid,
|
|
"oid": cid,
|
|
"op": 1 if status else 2,
|
|
"platform": "web_player",
|
|
}
|
|
return await Api(**api, credential=self.credential).update_data(**data).result
|
|
|
|
async def get_online(self, cid: Optional[int] = None, page_index: Optional[int] = 0) -> dict:
|
|
"""
|
|
获取实时在线人数
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果。
|
|
"""
|
|
api = API["info"]["online"]
|
|
params = {"aid": self.get_aid(), "bvid": self.get_bvid(),
|
|
"cid": cid if cid is not None else await self.get_cid(page_index=page_index)}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def operate_danmaku(
|
|
self,
|
|
page_index: Union[int, None] = None,
|
|
dmids: Union[List[int], None] = None,
|
|
cid: Union[int, None] = None,
|
|
type_: Union[DanmakuOperatorType, None] = None,
|
|
) -> dict:
|
|
"""
|
|
操作弹幕
|
|
|
|
Args:
|
|
page_index (int | None, optional) : 分 P 号,从 0 开始。Defaults to None
|
|
|
|
dmids (List[int] | None) : 弹幕 ID 列表。
|
|
|
|
cid (int | None, optional) : 分 P 的 ID。Defaults to None
|
|
|
|
type_ (DanmakuOperatorType | None): 操作类型
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果。
|
|
"""
|
|
|
|
if dmids is None or len(dmids) == 0:
|
|
raise ArgsException("请提供 dmid 参数")
|
|
|
|
if type_ is None:
|
|
raise ArgsException("请提供 type_ 参数")
|
|
|
|
self.credential.raise_for_no_sessdata()
|
|
self.credential.raise_for_no_bili_jct()
|
|
|
|
if cid is None:
|
|
if page_index is None:
|
|
raise ArgsException("page_index 和 cid 至少提供一个。")
|
|
|
|
cid = await self.__get_cid_by_index(page_index)
|
|
|
|
api = API["danmaku"]["edit_danmaku"]
|
|
|
|
data = {
|
|
"type": 1,
|
|
"dmids": ",".join(map(lambda x: str(x), dmids)),
|
|
"oid": cid,
|
|
"state": type_.value,
|
|
}
|
|
|
|
return await Api(**api, credential=self.credential).update_data(**data).result
|
|
|
|
async def like(self, status: bool = True) -> dict:
|
|
"""
|
|
点赞视频。
|
|
|
|
Args:
|
|
status (bool, optional): 点赞状态。Defaults to True.
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果。
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
self.credential.raise_for_no_bili_jct()
|
|
|
|
api = API["operate"]["like"]
|
|
data = {"aid": self.get_aid(), "like": 1 if status else 2}
|
|
return await Api(**api, credential=self.credential).update_data(**data).result
|
|
|
|
async def pay_coin(self, num: int = 1, like: bool = False) -> dict:
|
|
"""
|
|
投币。
|
|
|
|
Args:
|
|
num (int, optional) : 硬币数量,为 1 ~ 2 个。Defaults to 1.
|
|
|
|
like (bool, optional): 是否同时点赞。Defaults to False.
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果。
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
self.credential.raise_for_no_bili_jct()
|
|
|
|
if num not in (1, 2):
|
|
raise ArgsException("投币数量只能是 1 ~ 2 个。")
|
|
|
|
api = API["operate"]["coin"]
|
|
data = {
|
|
"aid": self.get_aid(),
|
|
"bvid": self.get_bvid(),
|
|
"multiply": num,
|
|
"like": 1 if like else 0,
|
|
}
|
|
return await Api(**api, credential=self.credential).update_data(**data).result
|
|
|
|
async def share(self) -> int:
|
|
"""
|
|
分享视频
|
|
|
|
Returns:
|
|
int: 当前分享数
|
|
"""
|
|
api = API["operate"]["share"]
|
|
data = {
|
|
"bvid": self.get_bvid(),
|
|
"aid": self.get_aid(),
|
|
"csrf": self.credential.bili_jct,
|
|
}
|
|
return await Api(**api, credential=self.credential).update_data(**data).result
|
|
|
|
async def triple(self) -> dict:
|
|
"""
|
|
给阿婆主送上一键三连
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api = API["operate"]["yjsl"]
|
|
data = {"bvid": self.get_bvid(), "aid": self.get_aid()}
|
|
return await Api(**api, credential=self.credential).update_data(**data).result
|
|
|
|
async def add_tag(self, name: str) -> dict:
|
|
"""
|
|
添加标签。
|
|
|
|
Args:
|
|
name (str): 标签名字。
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果。会返回标签 ID。
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
self.credential.raise_for_no_bili_jct()
|
|
|
|
api = API["operate"]["add_tag"]
|
|
data = {"aid": self.get_aid(), "bvid": self.get_bvid(), "tag_name": name}
|
|
return await Api(**api, credential=self.credential).update_data(**data).result
|
|
|
|
async def delete_tag(self, tag_id: int) -> dict:
|
|
"""
|
|
删除标签。
|
|
|
|
Args:
|
|
tag_id (int): 标签 ID。
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果。
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
self.credential.raise_for_no_bili_jct()
|
|
|
|
api = API["operate"]["del_tag"]
|
|
|
|
data = {"tag_id": tag_id, "aid": self.get_aid(), "bvid": self.get_bvid()}
|
|
return await Api(**api, credential=self.credential).update_data(**data).result
|
|
|
|
async def appeal(self, reason: Any, detail: str):
|
|
"""
|
|
投诉稿件
|
|
|
|
Args:
|
|
reason (Any): 投诉类型。传入 VideoAppealReasonType 中的项目即可。
|
|
|
|
detail (str): 详情信息。
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api = API["operate"]["appeal"]
|
|
data = {"aid": self.get_aid(), "desc": detail}
|
|
if isfunction(reason):
|
|
reason = reason()
|
|
if isinstance(reason, int):
|
|
reason = {"tid": reason}
|
|
data.update(reason)
|
|
|
|
return await Api(**api, credential=self.credential).update_data(**data).result
|
|
|
|
async def set_favorite(
|
|
self, add_media_ids: List[int] = [], del_media_ids: List[int] = []
|
|
) -> dict:
|
|
"""
|
|
设置视频收藏状况。
|
|
|
|
Args:
|
|
add_media_ids (List[int], optional): 要添加到的收藏夹 ID. Defaults to [].
|
|
|
|
del_media_ids (List[int], optional): 要移出的收藏夹 ID. Defaults to [].
|
|
|
|
Returns:
|
|
dict: 调用 API 返回结果。
|
|
"""
|
|
if len(add_media_ids) + len(del_media_ids) == 0:
|
|
raise ArgsException("对收藏夹无修改。请至少提供 add_media_ids 和 del_media_ids 中的其中一个。")
|
|
|
|
self.credential.raise_for_no_sessdata()
|
|
self.credential.raise_for_no_bili_jct()
|
|
|
|
api = API["operate"]["favorite"]
|
|
data = {
|
|
"rid": self.get_aid(),
|
|
"type": 2,
|
|
"add_media_ids": ",".join(map(lambda x: str(x), add_media_ids)),
|
|
"del_media_ids": ",".join(map(lambda x: str(x), del_media_ids)),
|
|
}
|
|
return await Api(**api, credential=self.credential).update_data(**data).result
|
|
|
|
async def get_subtitle(
|
|
self,
|
|
cid: Union[int, None] = None,
|
|
) -> dict:
|
|
"""
|
|
获取视频上一次播放的记录,字幕和地区信息。需要分集的 cid, 返回数据中含有json字幕的链接
|
|
|
|
Args:
|
|
cid (int | None): 分 P ID,从视频信息中获取
|
|
|
|
Returns:
|
|
调用 API 返回的结果
|
|
"""
|
|
if cid is None:
|
|
raise ArgsException("需要 cid")
|
|
|
|
return (await self.get_player_info(cid=cid)).get("subtitle")
|
|
|
|
async def get_player_info(
|
|
self,
|
|
cid: Union[int, None] = None,
|
|
epid: Union[int, None] = None,
|
|
) -> dict:
|
|
"""
|
|
获取字幕信息
|
|
|
|
Args:
|
|
cid (int | None): 分 P ID,从视频信息中获取
|
|
|
|
epid (int | None): 番剧分集 ID,从番剧信息中获取
|
|
|
|
Returns:
|
|
调用 API 返回的结果
|
|
"""
|
|
if cid is None:
|
|
raise ArgsException("需要 cid")
|
|
api = API["info"]["get_player_info"]
|
|
|
|
params = {
|
|
"aid": self.get_aid(),
|
|
"cid": cid,
|
|
}
|
|
|
|
if epid:
|
|
params["epid"] = epid
|
|
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def submit_subtitle(
|
|
self,
|
|
lan: str,
|
|
data: dict,
|
|
submit: bool,
|
|
sign: bool,
|
|
page_index: Union[int, None] = None,
|
|
cid: Union[int, None] = None,
|
|
) -> dict:
|
|
"""
|
|
上传字幕
|
|
|
|
字幕数据 data 参考:
|
|
|
|
```json
|
|
{
|
|
"font_size": "float: 字体大小,默认 0.4",
|
|
"font_color": "str: 字体颜色,默认 \"#FFFFFF\"",
|
|
"background_alpha": "float: 背景不透明度,默认 0.5",
|
|
"background_color": "str: 背景颜色,默认 \"#9C27B0\"",
|
|
"Stroke": "str: 描边,目前作用未知,默认为 \"none\"",
|
|
"body": [
|
|
{
|
|
"from": "int: 字幕开始时间(秒)",
|
|
"to": "int: 字幕结束时间(秒)",
|
|
"location": "int: 字幕位置,默认为 2",
|
|
"content": "str: 字幕内容"
|
|
}
|
|
]
|
|
}
|
|
```
|
|
|
|
Args:
|
|
lan (str) : 字幕语言代码,参考 https://s1.hdslb.com/bfs/subtitle/subtitle_lan.json
|
|
|
|
data (dict) : 字幕数据
|
|
|
|
submit (bool) : 是否提交,不提交为草稿
|
|
|
|
sign (bool) : 是否署名
|
|
|
|
page_index (int | None, optional): 分 P 索引. Defaults to None.
|
|
|
|
cid (int | None, optional): 分 P id. Defaults to None.
|
|
|
|
Returns:
|
|
dict: API 调用返回结果
|
|
|
|
"""
|
|
if cid is None:
|
|
if page_index is None:
|
|
raise ArgsException("page_index 和 cid 至少提供一个。")
|
|
|
|
cid = await self.__get_cid_by_index(page_index)
|
|
|
|
self.credential.raise_for_no_sessdata()
|
|
self.credential.raise_for_no_bili_jct()
|
|
|
|
api = API["operate"]["submit_subtitle"]
|
|
|
|
|
|
with open(
|
|
os.path.join(os.path.dirname(__file__), "data/subtitle_lan.json"),
|
|
encoding="utf-8",
|
|
) as f:
|
|
subtitle_lans = json.load(f)
|
|
for lan in subtitle_lans:
|
|
if lan["lan"] == lan:
|
|
break
|
|
else:
|
|
raise ArgsException("lan 参数错误,请参见 https://s1.hdslb.com/bfs/subtitle/subtitle_lan.json")
|
|
|
|
payload = {
|
|
"type": 1,
|
|
"oid": cid,
|
|
"lan": lan,
|
|
"data": json.dumps(data),
|
|
"submit": submit,
|
|
"sign": sign,
|
|
"bvid": self.get_bvid(),
|
|
}
|
|
|
|
return await Api(**api, credential=self.credential).update_data(**payload).result
|
|
|
|
async def get_danmaku_snapshot(self) -> dict:
|
|
"""
|
|
获取弹幕快照
|
|
|
|
Returns:
|
|
调用 API 返回的结果
|
|
"""
|
|
api = API["danmaku"]["snapshot"]
|
|
|
|
params = {"aid": self.get_aid()}
|
|
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def recall_danmaku(
|
|
self,
|
|
page_index: Union[int, None] = None,
|
|
dmid: int = 0,
|
|
cid: Union[int, None] = None,
|
|
) -> dict:
|
|
"""
|
|
撤回弹幕
|
|
|
|
Args:
|
|
page_index(int | None, optional): 分 P 号
|
|
|
|
dmid(int) : 弹幕 id
|
|
|
|
cid(int | None, optional) : 分 P 编码
|
|
Returns:
|
|
调用 API 返回的结果
|
|
"""
|
|
if cid is None:
|
|
if page_index is None:
|
|
raise ArgsException("page_index 和 cid 至少提供一个。")
|
|
|
|
cid = await self.__get_cid_by_index(page_index)
|
|
|
|
self.credential.raise_for_no_sessdata()
|
|
self.credential.raise_for_no_bili_jct()
|
|
|
|
api = API["danmaku"]["recall"]
|
|
data = {"dmid": dmid, "cid": cid}
|
|
|
|
return await Api(**api, credential=self.credential).update_data(**data).result
|
|
|
|
async def get_pbp(
|
|
self, page_index: Union[int, None] = None, cid: Union[int, None] = None
|
|
) -> dict:
|
|
"""
|
|
获取高能进度条
|
|
|
|
Args:
|
|
page_index(int | None): 分 P 号
|
|
|
|
cid(int | None) : 分 P 编码
|
|
|
|
Returns:
|
|
调用 API 返回的结果
|
|
"""
|
|
if cid is None:
|
|
if page_index is None:
|
|
raise ArgsException("page_index 和 cid 至少提供一个。")
|
|
|
|
cid = await self.__get_cid_by_index(page_index)
|
|
|
|
api = API["info"]["pbp"]
|
|
|
|
params = {"cid": cid}
|
|
|
|
session = get_session()
|
|
|
|
return json.loads(
|
|
(
|
|
await session.get(
|
|
api["url"], params=params, cookies=self.credential.get_cookies()
|
|
)
|
|
).text
|
|
)
|
|
|
|
async def add_to_toview(self) -> dict:
|
|
"""
|
|
添加视频至稍后再看列表
|
|
|
|
Returns:
|
|
调用 API 返回的结果
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
self.credential.raise_for_no_bili_jct()
|
|
api = get_api("toview")["operate"]["add"]
|
|
datas = {
|
|
"aid": self.get_aid(),
|
|
}
|
|
return await Api(**api, credential=self.credential).update_data(**datas).result
|
|
|
|
async def delete_from_toview(self) -> dict:
|
|
"""
|
|
从稍后再看列表删除视频
|
|
|
|
Returns:
|
|
调用 API 返回的结果
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
self.credential.raise_for_no_bili_jct()
|
|
api = get_api("toview")["operate"]["del"]
|
|
datas = {"viewed": "false", "aid": self.get_aid()}
|
|
return await Api(**api, credential=self.credential).update_data(**datas).result
|
|
|
|
|
|
class VideoOnlineMonitor(AsyncEvent):
|
|
"""
|
|
视频在线人数实时监测。
|
|
|
|
示例代码:
|
|
|
|
```python
|
|
import asyncio
|
|
from bilibili_api import video
|
|
|
|
# 实例化
|
|
r = video.VideoOnlineMonitor("BV1Bf4y1Q7QP")
|
|
|
|
# 装饰器方法注册事件监听器
|
|
@r.on("ONLINE")
|
|
async def handler(data):
|
|
print(data)
|
|
|
|
# 函数方法注册事件监听器
|
|
async def handler2(data):
|
|
print(data)
|
|
|
|
r.add_event_listener("ONLINE", handler2)
|
|
|
|
asyncio.get_event_loop().run_until_complete(r.connect())
|
|
|
|
```
|
|
|
|
Extends: AsyncEvent
|
|
|
|
Events:
|
|
ONLINE: 在线人数更新。 CallbackData: dict。
|
|
DANMAKU: 收到实时弹幕。 CallbackData: Danmaku。
|
|
DISCONNECTED: 正常断开连接。 CallbackData: None。
|
|
ERROR: 发生错误。 CallbackData: aiohttp.ClientWebSocketResponse。
|
|
CONNECTED: 成功连接。 CallbackData: None。
|
|
"""
|
|
|
|
class Datapack(Enum):
|
|
"""
|
|
数据包类型枚举。
|
|
|
|
+ CLIENT_VERIFY : 客户端发送验证信息。
|
|
+ SERVER_VERIFY : 服务端响应验证信息。
|
|
+ CLIENT_HEARTBEAT: 客户端发送心跳包。
|
|
+ SERVER_HEARTBEAT: 服务端响应心跳包。
|
|
+ DANMAKU : 实时弹幕更新。
|
|
"""
|
|
|
|
CLIENT_VERIFY = 0x7
|
|
SERVER_VERIFY = 0x8
|
|
CLIENT_HEARTBEAT = 0x2
|
|
SERVER_HEARTBEAT = 0x3
|
|
DANMAKU = 0x3E8
|
|
|
|
def __init__(
|
|
self,
|
|
bvid: Union[str, None] = None,
|
|
aid: Union[int, None] = None,
|
|
page_index: int = 0,
|
|
credential: Union[Credential, None] = None,
|
|
debug: bool = False,
|
|
):
|
|
"""
|
|
Args:
|
|
bvid (str | None, optional) : BVID. Defaults to None.
|
|
|
|
aid (int | None, optional) : AID. Defaults to None.
|
|
|
|
page_index (int, optional) : 分 P 序号. Defaults to 0.
|
|
|
|
credential (Credential | None, optional): Credential 类. Defaults to None.
|
|
|
|
debug (bool, optional) : 调试模式,将输出更详细信息. Defaults to False.
|
|
"""
|
|
super().__init__()
|
|
self.credential = credential
|
|
self.__video = Video(bvid, aid, credential=credential)
|
|
|
|
|
|
id_showed = None
|
|
if bvid is not None:
|
|
id_showed = bvid
|
|
else:
|
|
id_showed = aid
|
|
|
|
|
|
self.logger = logging.getLogger(f"VideoOnlineMonitor-{id_showed}")
|
|
if not self.logger.handlers:
|
|
handler = logging.StreamHandler()
|
|
handler.setFormatter(
|
|
logging.Formatter(
|
|
"[" + str(id_showed) + "][%(asctime)s][%(levelname)s] %(message)s"
|
|
)
|
|
)
|
|
self.logger.addHandler(handler)
|
|
self.logger.setLevel(logging.INFO if not debug else logging.DEBUG)
|
|
|
|
self.__page_index = page_index
|
|
self.__tasks = []
|
|
|
|
async def connect(self):
|
|
"""
|
|
连接服务器
|
|
"""
|
|
await self.__main()
|
|
|
|
async def disconnect(self):
|
|
"""
|
|
断开服务器
|
|
"""
|
|
self.logger.info("主动断开连接。")
|
|
self.dispatch("DISCONNECTED")
|
|
await self.__cancel_all_tasks()
|
|
await self.__ws.close()
|
|
|
|
async def __main(self):
|
|
"""
|
|
入口。
|
|
"""
|
|
|
|
pages = await self.__video.get_pages()
|
|
if self.__page_index >= len(pages):
|
|
raise ArgsException("不存在该分 P。")
|
|
cid = pages[self.__page_index]["cid"]
|
|
|
|
|
|
self.logger.debug(f"准备连接:{self.__video.get_bvid()}")
|
|
self.logger.debug(f"获取服务器信息中...")
|
|
|
|
api = API["info"]["video_online_broadcast_servers"]
|
|
resp = await Api(**api, credential=self.credential).result
|
|
|
|
uri = f"wss://{resp['domain']}:{resp['wss_port']}/sub"
|
|
self.__heartbeat_interval = resp["heartbeat"]
|
|
self.logger.debug(f"服务器信息获取成功,URI:{uri}")
|
|
|
|
|
|
self.logger.debug("准备连接服务器...")
|
|
session = get_aiohttp_session()
|
|
async with session.ws_connect(uri) as ws:
|
|
self.__ws = ws
|
|
|
|
|
|
self.logger.debug("服务器连接成功,准备发送认证信息...")
|
|
verify_info = {
|
|
"room_id": f"video://{self.__video.get_aid()}/{cid}",
|
|
"platform": "web",
|
|
"accepts": [1000, 1015],
|
|
}
|
|
verify_info = json.dumps(verify_info, separators=(",", ":"))
|
|
await ws.send_bytes(
|
|
self.__pack(
|
|
VideoOnlineMonitor.Datapack.CLIENT_VERIFY, 1, verify_info.encode()
|
|
)
|
|
)
|
|
|
|
|
|
async for msg in ws:
|
|
if msg.type == aiohttp.WSMsgType.BINARY:
|
|
data = self.__unpack(msg.data)
|
|
self.logger.debug(f"收到消息:{data}")
|
|
await self.__handle_data(data)
|
|
|
|
elif msg.type == aiohttp.WSMsgType.ERROR:
|
|
self.logger.warning("连接被异常断开")
|
|
await self.__cancel_all_tasks()
|
|
self.dispatch("ERROR", msg)
|
|
break
|
|
|
|
async def __handle_data(self, data: List[dict]):
|
|
"""
|
|
处理数据。
|
|
|
|
Args:
|
|
data (List[dict]): 收到的数据(已解析好)。
|
|
"""
|
|
for d in data:
|
|
if d["type"] == VideoOnlineMonitor.Datapack.SERVER_VERIFY.value:
|
|
|
|
if d["data"]["code"] == 0:
|
|
|
|
heartbeat = asyncio.create_task(self.__heartbeat_task())
|
|
self.__tasks.append(heartbeat)
|
|
|
|
self.logger.info("连接服务器并验证成功")
|
|
|
|
elif d["type"] == VideoOnlineMonitor.Datapack.SERVER_HEARTBEAT.value:
|
|
|
|
self.logger.debug(f'收到服务器心跳包反馈,编号:{d["number"]}')
|
|
self.logger.info(f'实时观看人数:{d["data"]["data"]["room"]["online"]}')
|
|
self.dispatch("ONLINE", d["data"])
|
|
|
|
elif d["type"] == VideoOnlineMonitor.Datapack.DANMAKU.value:
|
|
|
|
info = d["data"][0].split(",")
|
|
text = d["data"][1]
|
|
if info[5] == "0":
|
|
is_sub = False
|
|
else:
|
|
is_sub = True
|
|
dm = Danmaku(
|
|
dm_time=float(info[0]),
|
|
send_time=int(info[4]),
|
|
crc32_id=info[6],
|
|
color=info[3],
|
|
mode=info[1],
|
|
font_size=info[2],
|
|
is_sub=is_sub,
|
|
text=text,
|
|
)
|
|
self.logger.info(f"收到实时弹幕:{dm.text}")
|
|
self.dispatch("DANMAKU", dm)
|
|
|
|
else:
|
|
|
|
self.logger.warning("收到未知的数据包类型,无法解析:" + json.dumps(d))
|
|
|
|
async def __heartbeat_task(self):
|
|
"""
|
|
心跳 Task。
|
|
"""
|
|
index = 2
|
|
while True:
|
|
self.logger.debug(f"发送心跳包,编号:{index}")
|
|
await self.__ws.send_bytes(
|
|
self.__pack(
|
|
VideoOnlineMonitor.Datapack.CLIENT_HEARTBEAT,
|
|
index,
|
|
b"[object Object]",
|
|
)
|
|
)
|
|
index += 1
|
|
await asyncio.sleep(self.__heartbeat_interval)
|
|
|
|
async def __cancel_all_tasks(self):
|
|
"""
|
|
取消所有 Task。
|
|
"""
|
|
for task in self.__tasks:
|
|
task.cancel()
|
|
|
|
@staticmethod
|
|
def __pack(data_type: Datapack, number: int, data: bytes):
|
|
"""
|
|
打包数据。
|
|
|
|
# 数据包格式:
|
|
|
|
16B 头部:
|
|
|
|
| offset(bytes) | length(bytes) | type | description |
|
|
| ------------- | ------------- | ---- | ------------------- |
|
|
| 0 | 4 | I | 数据包长度 |
|
|
| 4 | 4 | I | 固定 0x00120001 |
|
|
| 8 | 4 | I | 数据包类型 |
|
|
| 12 | 4 | I | 递增数据包编号 |
|
|
| 16 | 2 | H | 固定 0x0000 |
|
|
|
|
之后是有效载荷。
|
|
|
|
# 数据包类型表:
|
|
|
|
+ 0x7 客户端发送认证信息
|
|
+ 0x8 服务端回应认证结果
|
|
+ 0x2 客户端发送心跳包,有效载荷:'[object Object]'
|
|
+ 0x3 服务端回应心跳包,会带上在线人数等信息,返回 JSON
|
|
+ 0x3e8 实时弹幕更新,返回列表,[0]弹幕信息,[1]弹幕文本
|
|
|
|
Args:
|
|
data_type (VideoOnlineMonitor.DataType): 数据包类型枚举。
|
|
|
|
Returns:
|
|
bytes: 打包好的数据。
|
|
"""
|
|
packed_data = bytearray()
|
|
packed_data += struct.pack(">I", 0x00120001)
|
|
packed_data += struct.pack(">I", data_type.value)
|
|
packed_data += struct.pack(">I", number)
|
|
packed_data += struct.pack(">H", 0)
|
|
packed_data += data
|
|
packed_data = struct.pack(">I", len(packed_data) + 4) + packed_data
|
|
return bytes(packed_data)
|
|
|
|
@staticmethod
|
|
def __unpack(data: bytes):
|
|
"""
|
|
解包数据。
|
|
|
|
Args:
|
|
data (bytes): 原始数据。
|
|
|
|
Returns:
|
|
tuple(dict): 解包后的数据。
|
|
"""
|
|
offset = 0
|
|
real_data = []
|
|
while offset < len(data):
|
|
region_header = struct.unpack(">IIII", data[:16])
|
|
region_data = data[offset: offset + region_header[0]]
|
|
real_data.append(
|
|
{
|
|
"type": region_header[2],
|
|
"number": region_header[3],
|
|
"data": json.loads(
|
|
region_data[offset + 18: offset + 18 + (region_header[0] - 16)]
|
|
),
|
|
}
|
|
)
|
|
offset += region_header[0]
|
|
return tuple(real_data)
|
|
|
|
|
|
class VideoQuality(Enum):
|
|
"""
|
|
视频的视频流分辨率枚举
|
|
|
|
- _360P: 流畅 360P
|
|
- _480P: 清晰 480P
|
|
- _720P: 高清 720P60
|
|
- _1080P: 高清 1080P
|
|
- _1080P_PLUS: 高清 1080P 高码率
|
|
- _1080P_60: 高清 1080P 60 帧码率
|
|
- _4K: 超清 4K
|
|
- HDR: 真彩 HDR
|
|
- DOLBY: 杜比视界
|
|
- _8K: 超高清 8K
|
|
"""
|
|
|
|
_360P = 16
|
|
_480P = 32
|
|
_720P = 64
|
|
_1080P = 80
|
|
_1080P_PLUS = 112
|
|
_1080P_60 = 116
|
|
_4K = 120
|
|
HDR = 125
|
|
DOLBY = 126
|
|
_8K = 127
|
|
|
|
|
|
class VideoCodecs(Enum):
|
|
"""
|
|
视频的视频流编码枚举
|
|
|
|
- HEV: HEVC(H.265)
|
|
- AVC: AVC(H.264)
|
|
- AV1: AV1
|
|
"""
|
|
|
|
HEV = "hev"
|
|
AVC = "avc"
|
|
AV1 = "av01"
|
|
|
|
|
|
class AudioQuality(Enum):
|
|
"""
|
|
视频的音频流清晰度枚举
|
|
|
|
- _64K: 64K
|
|
- _132K: 132K
|
|
- _192K: 192K
|
|
- HI_RES: Hi-Res 无损
|
|
- DOLBY: 杜比全景声
|
|
"""
|
|
|
|
_64K = 30216
|
|
_132K = 30232
|
|
DOLBY = 30250
|
|
HI_RES = 30251
|
|
_192K = 30280
|
|
|
|
|
|
@dataclass
|
|
class VideoStreamDownloadURL:
|
|
"""
|
|
(@dataclass)
|
|
|
|
视频流 URL 类
|
|
|
|
Attributes:
|
|
url (str) : 视频流 url
|
|
video_quality (VideoQuality): 视频流清晰度
|
|
video_codecs (VideoCodecs) : 视频流编码
|
|
"""
|
|
|
|
url: str
|
|
video_quality: VideoQuality
|
|
video_codecs: VideoCodecs
|
|
|
|
|
|
@dataclass
|
|
class AudioStreamDownloadURL:
|
|
"""
|
|
(@dataclass)
|
|
|
|
音频流 URL 类
|
|
|
|
Attributes:
|
|
url (str) : 音频流 url
|
|
audio_quality (AudioQuality): 音频流清晰度
|
|
"""
|
|
|
|
url: str
|
|
audio_quality: AudioQuality
|
|
|
|
|
|
@dataclass
|
|
class FLVStreamDownloadURL:
|
|
"""
|
|
(@dataclass)
|
|
|
|
FLV 视频流
|
|
|
|
Attributes:
|
|
url (str): FLV 流 url
|
|
"""
|
|
|
|
url: str
|
|
|
|
|
|
@dataclass
|
|
class HTML5MP4DownloadURL:
|
|
"""
|
|
(@dataclass)
|
|
|
|
可供 HTML5 播放的 mp4 视频流
|
|
|
|
Attributes:
|
|
url (str): HTML5 mp4 视频流
|
|
"""
|
|
|
|
url: str
|
|
|
|
|
|
@dataclass
|
|
class EpisodeTryMP4DownloadURL:
|
|
"""
|
|
(@dataclass)
|
|
|
|
番剧/课程试看的 mp4 播放流
|
|
|
|
Attributes:
|
|
url (str): 番剧试看的 mp4 播放流
|
|
"""
|
|
|
|
url: str
|
|
|
|
|
|
class VideoDownloadURLDataDetecter:
|
|
"""
|
|
`Video.get_download_url` 返回结果解析类。
|
|
|
|
在调用 `Video.get_download_url` 之后可以将代入 `VideoDownloadURLDataDetecter`,此类将一键解析。
|
|
|
|
目前支持:
|
|
- 视频清晰度: 360P, 480P, 720P, 1080P, 1080P 高码率, 1080P 60 帧, 4K, HDR, 杜比视界, 8K
|
|
- 视频编码: HEVC(H.265), AVC(H.264), AV1
|
|
- 音频清晰度: 64K, 132K, Hi-Res 无损音效, 杜比全景声, 192K
|
|
- FLV 视频流
|
|
- 番剧/课程试看视频流
|
|
"""
|
|
|
|
def __init__(self, data: dict):
|
|
"""
|
|
Args:
|
|
data (dict): `Video.get_download_url` 返回的结果
|
|
"""
|
|
self.__data = data
|
|
|
|
def check_video_and_audio_stream(self) -> bool:
|
|
"""
|
|
判断是否为音视频分离流
|
|
|
|
Returns:
|
|
bool: 是否为音视频分离流
|
|
"""
|
|
if "dash" in self.__data.keys():
|
|
return True
|
|
return False
|
|
|
|
def check_flv_stream(self) -> bool:
|
|
"""
|
|
判断是否为 FLV 视频流
|
|
|
|
Returns:
|
|
bool: 是否为 FLV 视频流
|
|
"""
|
|
if "durl" in self.__data.keys():
|
|
if self.__data["format"].startswith("flv"):
|
|
return True
|
|
return False
|
|
|
|
def check_html5_mp4_stream(self) -> bool:
|
|
"""
|
|
判断是否为 HTML5 可播放的 mp4 视频流
|
|
|
|
Returns:
|
|
bool: 是否为 HTML5 可播放的 mp4 视频流
|
|
"""
|
|
if "durl" in self.__data.keys():
|
|
if self.__data["format"].startswith("mp4"):
|
|
if self.__data.get("is_html5") == True:
|
|
return True
|
|
return False
|
|
|
|
def check_episode_try_mp4_stream(self):
|
|
"""
|
|
判断是否为番剧/课程试看的 mp4 视频流
|
|
|
|
Returns:
|
|
bool: 是否为番剧试看的 mp4 视频流
|
|
"""
|
|
if "durl" in self.__data.keys():
|
|
if self.__data["format"].startswith("mp4"):
|
|
if self.__data.get("is_html5") != True:
|
|
return True
|
|
return False
|
|
|
|
def detect_all(self):
|
|
"""
|
|
解析并返回所有数据
|
|
|
|
Returns:
|
|
List[VideoStreamDownloadURL | AudioStreamDownloadURL | FLVStreamDownloadURL | HTML5MP4DownloadURL | EpisodeTryMP4DownloadURL]: 所有的视频/音频流
|
|
"""
|
|
return self.detect()
|
|
|
|
def detect(
|
|
self,
|
|
video_max_quality: VideoQuality = VideoQuality._8K,
|
|
audio_max_quality: AudioQuality = AudioQuality._192K,
|
|
video_min_quality: VideoQuality = VideoQuality._360P,
|
|
audio_min_quality: AudioQuality = AudioQuality._64K,
|
|
video_accepted_qualities: List[VideoQuality] = [
|
|
item
|
|
for _, item in VideoQuality.__dict__.items()
|
|
if isinstance(item, VideoQuality)
|
|
],
|
|
audio_accepted_qualities: List[AudioQuality] = [
|
|
item
|
|
for _, item in AudioQuality.__dict__.items()
|
|
if isinstance(item, AudioQuality)
|
|
],
|
|
codecs: List[VideoCodecs] = [VideoCodecs.AV1, VideoCodecs.AVC, VideoCodecs.HEV],
|
|
no_dolby_video: bool = False,
|
|
no_dolby_audio: bool = False,
|
|
no_hdr: bool = False,
|
|
no_hires: bool = False,
|
|
) -> List[
|
|
Union[
|
|
VideoStreamDownloadURL,
|
|
AudioStreamDownloadURL,
|
|
FLVStreamDownloadURL,
|
|
HTML5MP4DownloadURL,
|
|
EpisodeTryMP4DownloadURL,
|
|
]
|
|
]:
|
|
"""
|
|
解析数据
|
|
|
|
Args:
|
|
**以下参数仅能在音视频流分离的情况下产生作用,flv / mp4 试看流 / html5 mp4 流下以下参数均没有作用**
|
|
|
|
video_max_quality (VideoQuality, optional) : 设置提取的视频流清晰度最大值,设置此参数绝对不会禁止 HDR/杜比. Defaults to VideoQuality._8K.
|
|
|
|
audio_max_quality (AudioQuality, optional) : 设置提取的音频流清晰度最大值. 设置此参数绝对不会禁止 Hi-Res/杜比. Defaults to AudioQuality._192K.
|
|
|
|
video_min_quality (VideoQuality, optional) : 设置提取的视频流清晰度最小值,设置此参数绝对不会禁止 HDR/杜比. Defaults to VideoQuality._360P.
|
|
|
|
audio_min_quality (AudioQuality, optional) : 设置提取的音频流清晰度最小值. 设置此参数绝对不会禁止 Hi-Res/杜比. Defaults to AudioQuality._64K.
|
|
|
|
video_accepted_qualities(List[VideoQuality], optional): 设置允许的所有视频流清晰度. Defaults to ALL.
|
|
|
|
audio_accepted_qualities(List[AudioQuality], optional): 设置允许的所有音频清晰度. Defaults to ALL.
|
|
|
|
codecs (List[VideoCodecs], optional) : 设置所有允许提取出来的视频编码. 此项不会忽略 HDR/杜比. Defaults to ALL codecs.
|
|
|
|
no_dolby_video (bool, optional) : 是否禁止提取杜比视界视频流. Defaults to False.
|
|
|
|
no_dolby_audio (bool, optional) : 是否禁止提取杜比全景声音频流. Defaults to False.
|
|
|
|
no_hdr (bool, optional) : 是否禁止提取 HDR 视频流. Defaults to False.
|
|
|
|
no_hires (bool, optional) : 是否禁止提取 Hi-Res 音频流. Defaults to False.
|
|
|
|
Returns:
|
|
List[VideoStreamDownloadURL | AudioStreamDownloadURL | FLVStreamDownloadURL | HTML5MP4DownloadURL | EpisodeTryMP4DownloadURL]: 提取出来的视频/音频流
|
|
"""
|
|
if "durl" in self.__data.keys():
|
|
if self.__data["format"].startswith("flv"):
|
|
|
|
return [FLVStreamDownloadURL(url=self.__data["durl"][0]["url"])]
|
|
else:
|
|
if self.check_html5_mp4_stream():
|
|
|
|
return [HTML5MP4DownloadURL(url=self.__data["durl"][0]["url"])]
|
|
else:
|
|
|
|
return [EpisodeTryMP4DownloadURL(url=self.__data["durl"][0]["url"])]
|
|
else:
|
|
|
|
streams = []
|
|
videos_data = self.__data["dash"]["video"]
|
|
audios_data = self.__data["dash"]["audio"]
|
|
flac_data = self.__data["dash"]["flac"]
|
|
dolby_data = self.__data["dash"]["dolby"]
|
|
for video_data in videos_data:
|
|
video_stream_url = video_data["baseUrl"]
|
|
video_stream_quality = VideoQuality(video_data["id"])
|
|
if video_stream_quality == VideoQuality.HDR and no_hdr:
|
|
continue
|
|
if video_stream_quality == VideoQuality.DOLBY and no_dolby_video:
|
|
continue
|
|
if (
|
|
video_stream_quality != VideoQuality.DOLBY
|
|
and video_stream_quality != VideoQuality.HDR
|
|
and video_stream_quality.value > video_max_quality.value
|
|
):
|
|
continue
|
|
if (
|
|
video_stream_quality != VideoQuality.DOLBY
|
|
and video_stream_quality != VideoQuality.HDR
|
|
and video_stream_quality.value < video_min_quality.value
|
|
):
|
|
continue
|
|
if (
|
|
video_stream_quality != VideoQuality.DOLBY
|
|
and video_stream_quality != VideoQuality.HDR
|
|
and (not video_stream_quality in video_accepted_qualities)
|
|
):
|
|
continue
|
|
video_stream_codecs = None
|
|
for val in VideoCodecs:
|
|
if val.value in video_data["codecs"]:
|
|
video_stream_codecs = val
|
|
if (not video_stream_codecs in codecs) and (
|
|
video_stream_codecs != None
|
|
):
|
|
continue
|
|
video_stream = VideoStreamDownloadURL(
|
|
url=video_stream_url,
|
|
video_quality=video_stream_quality,
|
|
video_codecs=video_stream_codecs,
|
|
)
|
|
streams.append(video_stream)
|
|
if audios_data:
|
|
for audio_data in audios_data:
|
|
audio_stream_url = audio_data["baseUrl"]
|
|
audio_stream_quality = AudioQuality(audio_data["id"])
|
|
if audio_stream_quality.value > audio_max_quality.value:
|
|
continue
|
|
if audio_stream_quality.value < audio_min_quality.value:
|
|
continue
|
|
if not audio_stream_quality in audio_accepted_qualities:
|
|
continue
|
|
audio_stream = AudioStreamDownloadURL(
|
|
url=audio_stream_url, audio_quality=audio_stream_quality
|
|
)
|
|
streams.append(audio_stream)
|
|
if flac_data and (not no_hires):
|
|
if flac_data["audio"]:
|
|
flac_stream_url = flac_data["audio"]["baseUrl"]
|
|
flac_stream_quality = AudioQuality(flac_data["audio"]["id"])
|
|
flac_stream = AudioStreamDownloadURL(
|
|
url=flac_stream_url, audio_quality=flac_stream_quality
|
|
)
|
|
streams.append(flac_stream)
|
|
if dolby_data and (not no_dolby_audio):
|
|
if dolby_data["audio"]:
|
|
dolby_stream_data = dolby_data["audio"][0]
|
|
dolby_stream_url = dolby_stream_data["baseUrl"]
|
|
dolby_stream_quality = AudioQuality(dolby_stream_data["id"])
|
|
dolby_stream = AudioStreamDownloadURL(
|
|
url=dolby_stream_url, audio_quality=dolby_stream_quality
|
|
)
|
|
streams.append(dolby_stream)
|
|
return streams
|
|
|
|
def detect_best_streams(
|
|
self,
|
|
video_max_quality: VideoQuality = VideoQuality._8K,
|
|
audio_max_quality: AudioQuality = AudioQuality._192K,
|
|
video_min_quality: VideoQuality = VideoQuality._360P,
|
|
audio_min_quality: AudioQuality = AudioQuality._64K,
|
|
video_accepted_qualities: List[VideoQuality] = [
|
|
item
|
|
for _, item in VideoQuality.__dict__.items()
|
|
if isinstance(item, VideoQuality)
|
|
],
|
|
audio_accepted_qualities: List[AudioQuality] = [
|
|
item
|
|
for _, item in AudioQuality.__dict__.items()
|
|
if isinstance(item, AudioQuality)
|
|
],
|
|
codecs: List[VideoCodecs] = [VideoCodecs.AV1, VideoCodecs.AVC, VideoCodecs.HEV],
|
|
no_dolby_video: bool = False,
|
|
no_dolby_audio: bool = False,
|
|
no_hdr: bool = False,
|
|
no_hires: bool = False,
|
|
) -> Union[
|
|
List[FLVStreamDownloadURL],
|
|
List[HTML5MP4DownloadURL],
|
|
List[EpisodeTryMP4DownloadURL],
|
|
List[Union[VideoStreamDownloadURL, AudioStreamDownloadURL, None]],
|
|
]:
|
|
"""
|
|
提取出分辨率、音质等信息最好的音视频流。
|
|
|
|
Args:
|
|
**以下参数仅能在音视频流分离的情况下产生作用,flv / mp4 试看流 / html5 mp4 流下以下参数均没有作用**
|
|
|
|
video_max_quality (VideoQuality) : 设置提取的视频流清晰度最大值,设置此参数绝对不会禁止 HDR/杜比. Defaults to VideoQuality._8K.
|
|
|
|
audio_max_quality (AudioQuality) : 设置提取的音频流清晰度最大值. 设置此参数绝对不会禁止 Hi-Res/杜比. Defaults to AudioQuality._192K.
|
|
|
|
video_min_quality (VideoQuality, optional) : 设置提取的视频流清晰度最小值,设置此参数绝对不会禁止 HDR/杜比. Defaults to VideoQuality._360P.
|
|
|
|
audio_min_quality (AudioQuality, optional) : 设置提取的音频流清晰度最小值. 设置此参数绝对不会禁止 Hi-Res/杜比. Defaults to AudioQuality._64K.
|
|
|
|
video_accepted_qualities(List[VideoQuality], optional): 设置允许的所有视频流清晰度. Defaults to ALL.
|
|
|
|
audio_accepted_qualities(List[AudioQuality], optional): 设置允许的所有音频清晰度. Defaults to ALL.
|
|
|
|
codecs (List[VideoCodecs]) : 设置所有允许提取出来的视频编码. 在数组中越靠前的编码选择优先级越高. 此项不会忽略 HDR/杜比. Defaults to [VideoCodecs.AV1, VideoCodecs.AVC, VideoCodecs.HEV].
|
|
|
|
no_dolby_video (bool) : 是否禁止提取杜比视界视频流. Defaults to False.
|
|
|
|
no_dolby_audio (bool) : 是否禁止提取杜比全景声音频流. Defaults to False.
|
|
|
|
no_hdr (bool) : 是否禁止提取 HDR 视频流. Defaults to False.
|
|
|
|
no_hires (bool) : 是否禁止提取 Hi-Res 音频流. Defaults to False.
|
|
|
|
Returns:
|
|
List[VideoStreamDownloadURL | AudioStreamDownloadURL | FLVStreamDownloadURL | HTML5MP4DownloadURL | None]: FLV 视频流 / HTML5 MP4 视频流 / 番剧或课程试看 MP4 视频流返回 `[FLVStreamDownloadURL | HTML5MP4StreamDownloadURL | EpisodeTryMP4DownloadURL]`, 否则为 `[VideoStreamDownloadURL, AudioStreamDownloadURL]`, 如果未匹配上任何合适的流则对应的位置位 `None`
|
|
"""
|
|
if self.check_flv_stream():
|
|
return self.detect_all()
|
|
elif self.check_html5_mp4_stream():
|
|
return self.detect_all()
|
|
elif self.check_episode_try_mp4_stream():
|
|
return self.detect_all()
|
|
else:
|
|
data = self.detect(
|
|
video_max_quality=video_max_quality,
|
|
audio_max_quality=audio_max_quality,
|
|
video_min_quality=video_min_quality,
|
|
audio_min_quality=audio_min_quality,
|
|
video_accepted_qualities=video_accepted_qualities,
|
|
audio_accepted_qualities=audio_accepted_qualities,
|
|
codecs=codecs,
|
|
)
|
|
video_streams = []
|
|
audio_streams = []
|
|
for stream in data:
|
|
if isinstance(stream, VideoStreamDownloadURL):
|
|
video_streams.append(stream)
|
|
if isinstance(stream, AudioStreamDownloadURL):
|
|
audio_streams.append(stream)
|
|
|
|
def video_stream_cmp(
|
|
s1: VideoStreamDownloadURL, s2: VideoStreamDownloadURL
|
|
):
|
|
|
|
if s1.video_quality == VideoQuality.DOLBY and (not no_dolby_video):
|
|
return 1
|
|
elif s2.video_quality == VideoQuality.DOLBY and (not no_dolby_video):
|
|
return -1
|
|
elif s1.video_quality == VideoQuality.HDR and (not no_hdr):
|
|
return 1
|
|
elif s2.video_quality == VideoQuality.HDR and (not no_hdr):
|
|
return -1
|
|
if s1.video_quality.value != s2.video_quality.value:
|
|
return s1.video_quality.value - s2.video_quality.value
|
|
|
|
elif s1.video_codecs.value != s2.video_codecs.value:
|
|
return codecs.index(s2.video_codecs) - codecs.index(s1.video_codecs)
|
|
return -1
|
|
|
|
def audio_stream_cmp(
|
|
s1: AudioStreamDownloadURL, s2: AudioStreamDownloadURL
|
|
):
|
|
|
|
if s1.audio_quality == AudioQuality.DOLBY and (not no_dolby_audio):
|
|
return 1
|
|
if s2.audio_quality == AudioQuality.DOLBY and (not no_dolby_audio):
|
|
return -1
|
|
if s1.audio_quality == AudioQuality.HI_RES and (not no_hires):
|
|
return 1
|
|
if s2.audio_quality == AudioQuality.HI_RES and (not no_hires):
|
|
return -1
|
|
return s1.audio_quality.value - s2.audio_quality.value
|
|
|
|
video_streams.sort(key=cmp_to_key(video_stream_cmp), reverse=True)
|
|
audio_streams.sort(key=cmp_to_key(audio_stream_cmp), reverse=True)
|
|
if len(video_streams) == 0:
|
|
video_streams = [None]
|
|
if len(audio_streams) == 0:
|
|
audio_streams = [None]
|
|
return [video_streams[0], audio_streams[0]]
|
|
|