rogerxavier's picture
Upload 258 files
0aee47a verified
raw
history blame
26.3 kB
"""
bilibili_api.cheese
有关 bilibili 课程的 api。
注意,注意!课程中的视频和其他视频几乎没有任何相通的 API!
不能将 CheeseVideo 换成 Video 类。(CheeseVideo 类保留了所有的通用的 API)
获取下载链接需要使用 bilibili_api.cheese.get_download_url,video.get_download_url 不适用。
还有,课程的 season_id 和 ep_id 不与番剧相通,井水不犯河水,请不要错用!
"""
import json
import datetime
from typing import Any, List, Union
from . import settings
from .utils.utils import get_api
from .utils.danmaku import Danmaku
from .utils.credential import Credential
from .utils.BytesReader import BytesReader
from .exceptions.ArgsException import ArgsException
from .utils.network import Api, get_session
from .exceptions import NetworkException, ResponseException, DanmakuClosedException
API = get_api("cheese")
API_video = get_api("video")
cheese_video_meta_cache = {}
class CheeseList:
"""
课程类
Attributes:
credential (Credential): 凭据类
"""
def __init__(
self,
season_id: int = -1,
ep_id: int = -1,
credential: Union[Credential, None] = None,
):
"""
Args:
season_id (int) : ssid
ep_id (int) : 单集 ep_id
credential (Credential): 凭据类
注意:season_id 和 ep_id 任选一个即可,两个都选的话
以 season_id 为主
"""
if (season_id == -1) and (ep_id == -1):
raise ValueError("season id 和 ep id 必须选一个")
self.__season_id = season_id
self.__ep_id = ep_id
self.credential = credential if credential else Credential()
if self.__season_id == -1:
# self.season_id = str(sync(self.get_meta())["season_id"])
api = API["info"]["meta"]
params = {"season_id": self.__season_id, "ep_id": self.__ep_id}
meta = Api(**api, credential=self.credential).update_params(**params).result_sync
self.__season_id = int(meta["season_id"])
def set_season_id(self, season_id: int) -> None:
self.__init__(season_id=season_id)
def set_ep_id(self, ep_id: int) -> None:
self.__init__(ep_id=ep_id)
def get_season_id(self) -> int:
return self.__season_id
async def get_meta(self) -> dict:
"""
获取教程元数据
Returns:
调用 API 所得的结果。
"""
api = API["info"]["meta"]
params = {"season_id": self.__season_id, "ep_id": self.__ep_id}
return (
await Api(**api, credential=self.credential).update_params(**params).result
)
async def get_list_raw(self):
"""
获取教程所有视频 (返回原始数据)
Returns:
dict: 调用 API 返回的结果
"""
api = API["info"]["list"]
params = {"season_id": self.__season_id, "pn": 1, "ps": 1000}
return (
await Api(**api, credential=self.credential).update_params(**params).result
)
async def get_list(self) -> List["CheeseVideo"]:
"""
获取教程所有视频
Returns:
List[CheeseVideo]: 课程视频列表
"""
global cheese_video_meta_cache
api = API["info"]["list"]
params = {"season_id": self.__season_id, "pn": 1, "ps": 1000}
lists = (
await Api(**api, credential=self.credential).update_params(**params).result
)
cheese_videos = []
for c in lists["items"]:
c["ssid"] = self.get_season_id()
cheese_video_meta_cache[c["id"]] = c
cheese_videos.append(CheeseVideo(c["id"], self.credential))
return cheese_videos
class CheeseVideo:
"""
教程视频类
因为不和其他视频相通,所以这里是一个新的类,无继承
Attributes:
credential (Credential): 凭据类
cheese (CheeseList): 所属的课程
"""
def __init__(self, epid, credential: Union[Credential, None] = None):
"""
Args:
epid (int) : 单集 ep_id
credential (Credential): 凭据类
"""
global cheese_video_meta_cache
self.__epid = epid
meta = cheese_video_meta_cache.get(epid)
if meta == None:
self.cheese = CheeseList(ep_id=self.__epid)
else:
self.cheese = CheeseList(season_id=meta["ssid"])
self.credential = credential if credential else Credential()
if meta == None:
api = API["info"]["meta"]
params = {"season_id": self.cheese.get_season_id(), "ep_id": self.__epid}
metadata = Api(**api).update_params(**params).result_sync
for v in metadata["episodes"]:
if v["id"] == epid:
self.__aid = v["aid"]
self.__cid = v["cid"]
self.__meta = v
else:
self.__meta = meta
self.__aid = meta["aid"]
self.__cid = meta["cid"]
def get_aid(self) -> int:
return self.__aid
def get_cid(self) -> int:
return self.__cid
def get_meta(self) -> dict:
"""
获取课程元数据
Returns:
dict: 视频元数据
"""
return self.__meta
def get_cheese(self) -> "CheeseList":
"""
获取所属课程
Returns:
CheeseList: 所属课程
"""
return self.cheese
def set_epid(self, epid: int) -> None:
self.__init__(epid, self.credential)
def get_epid(self) -> int:
return self.__epid
async def get_download_url(self) -> dict:
"""
获取下载链接
Returns:
dict: 调用 API 返回的结果。
"""
api = API["info"]["playurl"]
params = {
"avid": self.get_aid(),
"ep_id": self.get_epid(),
"cid": self.get_cid(),
"qn": 127,
"fnval": 4048,
"fourk": 1,
}
return (
await Api(**api, credential=self.credential).update_params(**params).result
)
async def get_stat(self) -> dict:
"""
获取视频统计数据(播放量,点赞数等)。
Returns:
dict: 调用 API 返回的结果。
"""
api = API_video["info"]["stat"]
params = {"aid": self.get_aid()}
return (
await Api(**api, credential=self.credential).update_params(**params).result
)
async def get_pages(self) -> dict:
"""
获取分 P 信息。
Returns:
dict: 调用 API 返回的结果。
"""
api = API_video["info"]["pages"]
params = {"aid": self.get_aid()}
return (
await Api(**api, credential=self.credential).update_params(**params).result
)
async def get_danmaku_view(self) -> dict:
"""
获取弹幕设置、特殊弹幕、弹幕数量、弹幕分段等信息。
Returns:
dict: 调用 API 返回的结果。
"""
cid = self.get_cid()
session = get_session()
api = API_video["danmaku"]["view"]
config = {}
config["url"] = api["url"]
config["params"] = {"type": 1, "oid": cid, "pid": self.get_aid()}
config["cookies"] = self.credential.get_cookies()
config["headers"] = {
"Referer": "https://www.bilibili.com",
"User-Agent": "Mozilla/5.0",
}
try:
resp = await session.get(**config)
except Exception as e:
raise NetworkException(-1, str(e))
resp_data = resp.read()
json_data = {}
reader = BytesReader(resp_data)
# 解析二进制数据流
def read_dm_seg(stream: bytes):
reader_ = BytesReader(stream)
data = {}
while not reader_.has_end():
t = reader_.varint() >> 3
if t == 1:
data["page_size"] = reader_.varint()
elif t == 2:
data["total"] = reader_.varint()
else:
continue
return data
def read_flag(stream: bytes):
reader_ = BytesReader(stream)
data = {}
while not reader_.has_end():
t = reader_.varint() >> 3
if t == 1:
data["rec_flag"] = reader_.varint()
elif t == 2:
data["rec_text"] = reader_.string()
elif t == 3:
data["rec_switch"] = reader_.varint()
else:
continue
return data
def read_command_danmakus(stream: bytes):
reader_ = BytesReader(stream)
data = {}
while not reader_.has_end():
t = reader_.varint() >> 3
if t == 1:
data["id"] = reader_.varint()
elif t == 2:
data["oid"] = reader_.varint()
elif t == 3:
data["mid"] = reader_.varint()
elif t == 4:
data["commend"] = reader_.string()
elif t == 5:
data["content"] = reader_.string()
elif t == 6:
data["progress"] = reader_.varint()
elif t == 7:
data["ctime"] = reader_.string()
elif t == 8:
data["mtime"] = reader_.string()
elif t == 9:
data["extra"] = json.loads(reader_.string())
elif t == 10:
data["id_str"] = reader_.string()
else:
continue
return data
def read_settings(stream: bytes):
reader_ = BytesReader(stream)
data = {}
while not reader_.has_end():
t = reader_.varint() >> 3
if t == 1:
data["dm_switch"] = reader_.bool()
elif t == 2:
data["ai_switch"] = reader_.bool()
elif t == 3:
data["ai_level"] = reader_.varint()
elif t == 4:
data["enable_top"] = reader_.bool()
elif t == 5:
data["enable_scroll"] = reader_.bool()
elif t == 6:
data["enable_bottom"] = reader_.bool()
elif t == 7:
data["enable_color"] = reader_.bool()
elif t == 8:
data["enable_special"] = reader_.bool()
elif t == 9:
data["prevent_shade"] = reader_.bool()
elif t == 10:
data["dmask"] = reader_.bool()
elif t == 11:
data["opacity"] = reader_.float(True)
elif t == 12:
data["dm_area"] = reader_.varint()
elif t == 13:
data["speed_plus"] = reader_.float(True)
elif t == 14:
data["font_size"] = reader_.float(True)
elif t == 15:
data["screen_sync"] = reader_.bool()
elif t == 16:
data["speed_sync"] = reader_.bool()
elif t == 17:
data["font_family"] = reader_.string()
elif t == 18:
data["bold"] = reader_.bool()
elif t == 19:
data["font_border"] = reader_.varint()
elif t == 20:
data["draw_type"] = reader_.string()
else:
continue
return data
def read_image_danmakus(string: bytes):
image_list = []
reader_ = BytesReader(string)
while not reader_.has_end():
type_ = reader_.varint() >> 3
if type_ == 1:
details_dict = {}
details_dict["texts"] = []
img_details = reader_.bytes_string()
reader_details = BytesReader(img_details)
while not reader_details.has_end():
type_details = reader_details.varint() >> 3
if type_details == 1:
details_dict["texts"].append(reader_details.string())
elif type_details == 2:
details_dict["image"] = reader_details.string()
elif type_details == 3:
id_string = reader_details.bytes_string()
id_reader = BytesReader(id_string)
while not id_reader.has_end():
type_id = id_reader.varint() >> 3
if type_id == 2:
details_dict["id"] = id_reader.varint()
else:
raise ResponseException("解析响应数据错误")
image_list.append(details_dict)
else:
raise ResponseException("解析响应数据错误")
return image_list
while not reader.has_end():
type_ = reader.varint() >> 3
if type_ == 1:
json_data["state"] = reader.varint()
elif type_ == 2:
json_data["text"] = reader.string()
elif type_ == 3:
json_data["text_side"] = reader.string()
elif type_ == 4:
json_data["dm_seg"] = read_dm_seg(reader.bytes_string())
elif type_ == 5:
json_data["flag"] = read_flag(reader.bytes_string())
elif type_ == 6:
if "special_dms" not in json_data:
json_data["special_dms"] = []
json_data["special_dms"].append(reader.string())
elif type_ == 7:
json_data["check_box"] = reader.bool()
elif type_ == 8:
json_data["count"] = reader.varint()
elif type_ == 9:
if "command_dms" not in json_data:
json_data["command_dms"] = []
json_data["command_dms"].append(
read_command_danmakus(reader.bytes_string())
)
elif type_ == 10:
json_data["dm_setting"] = read_settings(reader.bytes_string())
elif type_ == 12:
json_data["image_dms"] = read_image_danmakus(reader.bytes_string())
else:
continue
return json_data
async def get_danmakus(self, date: Union[datetime.date, None] = None):
"""
获取弹幕。
Args:
date (datetime.Date | None, optional): 指定日期后为获取历史弹幕,精确到年月日。Defaults to None.
Returns:
List[Danmaku]: Danmaku 类的列表。
"""
if date is not None:
self.credential.raise_for_no_sessdata()
# self.credential.raise_for_no_sessdata()
session = get_session()
aid = self.get_aid()
params: dict[str, Any] = {"oid": self.get_cid(), "type": 1, "pid": aid}
if date is not None:
# 获取历史弹幕
api = API_video["danmaku"]["get_history_danmaku"]
params["date"] = date.strftime("%Y-%m-%d")
params["type"] = 1
all_seg = 1
else:
api = API_video["danmaku"]["get_danmaku"]
view = await self.get_danmaku_view()
all_seg = view["dm_seg"]["total"]
danmakus = []
for seg in range(all_seg):
if date is None:
# 仅当获取当前弹幕时需要该参数
params["segment_index"] = seg + 1
config = {}
config["url"] = api["url"]
config["params"] = params
config["headers"] = {
"Referer": "https://www.bilibili.com",
"User-Agent": "Mozilla/5.0",
}
config["cookies"] = self.credential.get_cookies()
try:
req = await session.get(**config)
except Exception as e:
raise NetworkException(-1, str(e))
if "content-type" not in req.headers.keys():
break
else:
content_type = req.headers["content-type"]
if content_type != "application/octet-stream":
raise ResponseException("返回数据类型错误:")
# 解析二进制流数据
data = req.read()
if data == b"\x10\x01":
# 视频弹幕被关闭
raise DanmakuClosedException()
reader = BytesReader(data)
while not reader.has_end():
type_ = reader.varint() >> 3
if type_ != 1:
if type_ == 4:
reader.bytes_string()
# 什么鬼?我用 protoc 解析出乱码!
elif type_ == 5:
# 大会员专属颜色
reader.varint()
reader.varint()
reader.varint()
reader.bytes_string()
elif type_ == 13:
# ???
continue
else:
raise ResponseException("解析响应数据错误")
dm = Danmaku("")
dm_pack_data = reader.bytes_string()
dm_reader = BytesReader(dm_pack_data)
while not dm_reader.has_end():
data_type = dm_reader.varint() >> 3
if data_type == 1:
dm.id_ = dm_reader.varint()
elif data_type == 2:
dm.dm_time = dm_reader.varint() / 1000
elif data_type == 3:
dm.mode = dm_reader.varint()
elif data_type == 4:
dm.font_size = dm_reader.varint()
elif data_type == 5:
dm.color = hex(dm_reader.varint())[2:]
elif data_type == 6:
dm.crc32_id = dm_reader.string()
elif data_type == 7:
dm.text = dm_reader.string()
elif data_type == 8:
dm.send_time = dm_reader.varint()
elif data_type == 9:
dm.weight = dm_reader.varint()
elif data_type == 10:
dm.action = str(dm_reader.varint())
elif data_type == 11:
dm.pool = dm_reader.varint()
elif data_type == 12:
dm.id_str = dm_reader.string()
elif data_type == 13:
dm.attr = dm_reader.varint()
elif data_type == 14:
dm.uid = dm_reader.varint()
elif data_type == 15:
dm_reader.varint()
elif data_type == 20:
dm_reader.bytes_string()
elif data_type == 21:
dm_reader.bytes_string()
else:
break
danmakus.append(dm)
return danmakus
async def get_pbp(self):
"""
获取高能进度条
Returns:
调用 API 返回的结果
"""
cid = self.get_cid()
api = API_video["info"]["pbp"]
params = {"cid": cid}
session = get_session()
return json.loads(
(
await session.get(
api["url"], params=params, cookies=self.credential.get_cookies()
)
).text
)
async def send_danmaku(self, danmaku: Union[Danmaku, None] = None):
"""
发送弹幕。
Args:
danmaku (Danmaku | None): Danmaku 类。Defaults to None.
Returns:
dict: 调用 API 返回的结果。
"""
danmaku = danmaku if danmaku else Danmaku("")
self.credential.raise_for_no_sessdata()
self.credential.raise_for_no_bili_jct()
api = API_video["danmaku"]["send_danmaku"]
if danmaku.is_sub:
pool = 1
else:
pool = 0
data = {
"type": 1,
"oid": self.get_cid(),
"msg": danmaku.text,
"aid": self.get_aid(),
"progress": int(danmaku.dm_time * 1000),
"color": int(danmaku.color, 16),
"fontsize": danmaku.font_size,
"pool": pool,
"mode": danmaku.mode,
"plat": 1,
}
return await Api(**api, credential=self.credential).update_data(**data).result
async def has_liked(self):
"""
视频是否点赞过。
Returns:
bool: 视频是否点赞过。
"""
self.credential.raise_for_no_sessdata()
api = API_video["info"]["has_liked"]
params = {"aid": self.get_aid()}
return (
await Api(**api, credential=self.credential).update_params(**params).result
== 1
)
async def get_pay_coins(self):
"""
获取视频已投币数量。
Returns:
int: 视频已投币数量。
"""
self.credential.raise_for_no_sessdata()
api = API_video["info"]["get_pay_coins"]
params = {"aid": self.get_aid()}
return (
await Api(**api, credential=self.credential).update_params(**params).result
)["multiply"]
async def has_favoured(self):
"""
是否已收藏。
Returns:
bool: 视频是否已收藏。
"""
self.credential.raise_for_no_sessdata()
api = API_video["info"]["has_favoured"]
params = {"aid": self.get_aid()}
return (
await Api(**api, credential=self.credential).update_params(**params).result
)["favoured"]
async def like(self, status: bool = True):
"""
点赞视频。
Args:
status (bool, optional): 点赞状态。Defaults to True.
Returns:
dict: 调用 API 返回的结果。
"""
self.credential.raise_for_no_sessdata()
self.credential.raise_for_no_bili_jct()
api = API_video["operate"]["like"]
data = {"aid": self.get_aid(), "like": 1 if status else 2}
return await Api(**api, credential=self.credential).update_data(**data).result
async def pay_coin(self, num: int = 1, like: bool = False):
"""
投币。
Args:
num (int, optional) : 硬币数量,为 1 ~ 2 个。Defaults to 1.
like (bool, optional): 是否同时点赞。Defaults to False.
Returns:
dict: 调用 API 返回的结果。
"""
self.credential.raise_for_no_sessdata()
self.credential.raise_for_no_bili_jct()
if num not in (1, 2):
raise ArgsException("投币数量只能是 1 ~ 2 个。")
api = API_video["operate"]["coin"]
data = {
"aid": self.get_aid(),
"multiply": num,
"like": 1 if like else 0,
}
return await Api(**api, credential=self.credential).update_data(**data).result
async def set_favorite(
self, add_media_ids: List[int] = [], del_media_ids: List[int] = []
):
"""
设置视频收藏状况。
Args:
add_media_ids (List[int], optional): 要添加到的收藏夹 ID. Defaults to [].
del_media_ids (List[int], optional): 要移出的收藏夹 ID. Defaults to [].
Returns:
dict: 调用 API 返回结果。
"""
if len(add_media_ids) + len(del_media_ids) == 0:
raise ArgsException("对收藏夹无修改。请至少提供 add_media_ids 和 del_media_ids 中的其中一个。")
self.credential.raise_for_no_sessdata()
self.credential.raise_for_no_bili_jct()
api = API_video["operate"]["favorite"]
data = {
"rid": self.get_aid(),
"type": 2,
"add_media_ids": ",".join(map(lambda x: str(x), add_media_ids)),
"del_media_ids": ",".join(map(lambda x: str(x), del_media_ids)),
}
return await Api(**api, credential=self.credential).update_data(**data).result
async def get_danmaku_xml(self):
"""
获取弹幕(xml 源)
Returns:
str: xml 文件源
"""
url = f"https://comment.bilibili.com/{self.get_cid()}.xml"
sess = get_session()
config: dict[str, Any] = {"url": url}
# 代理
if settings.proxy:
config["proxies"] = {"all://", settings.proxy}
resp = await sess.get(**config)
return resp.content.decode("utf-8")