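# NOTE: the six lines below are residue from the web page this file was copied
# from (uploader, commit and size metadata); they are not part of the module.
# rogerxavier's picture
# Upload 258 files
# 0aee47a verified
# raw
# history blame
# 46.3 kB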
r"""
bilibili_api.live
直播相关
"""
import json
import time
import base64
import struct
import asyncio
import logging
from enum import Enum
from typing import Any, List, Union
import brotli
import aiohttp
from aiohttp.client_ws import ClientWebSocketResponse
from .utils.utils import get_api, raise_for_statement
from .utils.danmaku import Danmaku
from .utils.network import get_aiohttp_session, Api, HEADERS
from .utils.AsyncEvent import AsyncEvent
from .utils.credential import Credential
from .exceptions.LiveException import LiveException
# Request definitions for the "live" API group, loaded once at import time.
# Indexed below as API[section][name]; each entry is expanded into Api(**entry).
API = get_api("live")
class ScreenResolution(Enum):
    """
    Live stream quality levels.

    Each member's value is the quality number sent to the API
    (the ``qn`` request parameter used by LiveRoom).
    """

    # 4K
    FOUR_K = 20000
    # Original quality
    ORIGINAL = 10000
    # Blu-ray (Dolby)
    BLU_RAY_DOLBY = 401
    # Blu-ray
    BLU_RAY = 400
    # Ultra HD
    ULTRA_HD = 250
    # HD
    HD = 150
    # Fluent (lowest quality)
    FLUENCY = 80
class LiveProtocol(Enum):
    """
    Live stream transport protocol.

    0 selects an FLV stream, 1 selects an HLS stream; DEFAULT requests both.
    """

    # FLV stream
    FLV = 0
    # HLS stream
    HLS = 1
    # Both protocols, sent as the comma-separated string "0,1"
    DEFAULT = "0,1"
class LiveFormat(Enum):
    """
    Live stream container format.

    0 is flv, 1 is ts (HLS streams only), 2 is fmp4 (HLS streams only).
    DEFAULT requests all three.
    """

    # flv container
    FLV = 0
    # ts container (HLS only)
    TS = 1
    # fmp4 container (HLS only)
    FMP4 = 2
    # All formats, sent as the comma-separated string "0,1,2"
    DEFAULT = "0,1,2"
class LiveCodec(Enum):
    """
    Live stream video codec.

    0 is AVC, 1 is HEVC; DEFAULT requests both.
    """

    # AVC encoding
    AVC = 0
    # HEVC encoding
    HEVC = 1
    # Both codecs, sent as the comma-separated string "0,1"
    DEFAULT = "0,1"
class LiveRoom:
    """
    Live room class; every per-room operation lives here.

    Attributes:
        credential      (Credential): Credential used for all requests.
        room_display_id (int)       : Room display ID.
    """

    def __init__(
        self, room_display_id: int, credential: Union[Credential, None] = None
    ):
        """
        Args:
            room_display_id (int)                 : Room display ID (the ID in the room URL).
            credential      (Credential, optional): Credential. Defaults to None,
                                                    in which case an empty Credential() is used.
        """
        self.room_display_id = room_display_id
        self.credential = credential if credential is not None else Credential()
        # Lazily filled from the "uid" field of the room_play_info response.
        self.__ruid = None

    async def __query(self, api: dict, params: dict) -> dict:
        """
        Run ``api`` with this room's credential, passing ``params`` through
        ``update_params``, and return the awaited result.
        """
        return (
            await Api(**api, credential=self.credential).update_params(**params).result
        )

    async def __submit(self, api: dict, data: dict) -> dict:
        """
        Run ``api`` with this room's credential, passing ``data`` through
        ``update_data``, and return the awaited result.
        """
        return await Api(**api, credential=self.credential).update_data(**data).result

    async def start(self, area_id: int) -> dict:
        """
        Start streaming.

        Args:
            area_id (int): Live area ID (sub-area ID). Can be looked up with the live_area module.

        Returns:
            dict: Result returned by the API.
        """
        return await self.__query(
            API["info"]["start"],
            {
                "area_v2": area_id,
                "room_id": self.room_display_id,
                "platform": "pc",
            },
        )

    async def stop(self) -> dict:
        """
        Stop streaming.

        Returns:
            dict: Result returned by the API.
        """
        return await self.__query(
            API["info"]["stop"], {"room_id": self.room_display_id}
        )

    async def get_room_play_info(self) -> dict:
        """
        Get room information (real room ID, ban status, etc.).

        Side effect: the ``uid`` field of the response is cached on the
        instance and later returned by ``get_ruid``.

        Returns:
            dict: Result returned by the API.
        """
        resp = await self.__query(
            API["info"]["room_play_info"], {"room_id": self.room_display_id}
        )
        # NOTE(review): the original comment said this caches the real room ID,
        # but the value stored is resp["uid"] — presumably the streamer's UID.
        self.__ruid = resp["uid"]
        return resp

    async def get_room_id(self) -> int:
        """
        Get the ``room_id`` field of ``get_room_play_info()``.

        Performs a request on every call; the value is not cached.

        Returns:
            int: The ``room_id`` value from the response.
        """
        return (await self.get_room_play_info())["room_id"]

    async def __get_ruid(self) -> int:
        """
        Return the cached ``uid`` from room_play_info, fetching it first
        if nothing has been cached yet.
        """
        if self.__ruid is None:
            await self.get_room_play_info()
        return self.__ruid  # type: ignore

    async def get_ruid(self) -> int:
        """
        Public accessor for the cached ``uid`` from room_play_info.

        Returns:
            int: The cached value, fetched on first use.
        """
        return await self.__get_ruid()

    async def get_danmu_info(self) -> dict:
        """
        Get the chat (danmaku) server configuration used for the websocket.

        Returns:
            dict: Result returned by the API.
        """
        return await self.__query(
            API["info"]["danmu_info"], {"id": self.room_display_id}
        )

    async def get_room_info(self) -> dict:
        """
        Get live room information (title, description, etc.).

        Returns:
            dict: Result returned by the API.
        """
        return await self.__query(
            API["info"]["room_info"], {"room_id": self.room_display_id}
        )

    async def get_fan_model(
        self,
        page_num: int = 1,
        target_id: Union[int, None] = None,
        roomId: Union[int, None] = None,
    ) -> dict:
        """
        Get your own fan medal information.

        With a room ID, the response includes a ``has_medal`` judgement.
        With a streamer ID, the response includes that streamer's medal, or null.

        Args:
            roomId    (int, optional)       : Room to check for an owned fan medal.
            target_id (int | None, optional): Streamer whose medal should be returned;
                                              omitted from the request when falsy.
            page_num  (int | None, optional): Page of the medal list. Defaults to 1.

        Returns:
            dict: Result returned by the API.
        """
        self.credential.raise_for_no_sessdata()
        api = API["info"]["live_info"]
        params = {
            "pageSize": 10,
            "page": page_num,
        }
        # Optional filters are only sent when a truthy value was supplied.
        if roomId:
            params["roomId"] = roomId
        if target_id:
            params["target_id"] = target_id
        return await self.__query(api, params)

    async def get_user_info_in_room(self) -> dict:
        """
        Get your own information in this room (fan medal level, live user level, etc.).

        Returns:
            dict: Result returned by the API.
        """
        self.credential.raise_for_no_sessdata()
        return await self.__query(
            API["info"]["user_info_in_room"], {"room_id": self.room_display_id}
        )

    async def get_popular_ticket_num(self) -> dict:
        """
        Get your popularity ticket counts in this room (paid tickets already
        given, free tickets currently held).

        Returns:
            dict: Result returned by the API.
        """
        self.credential.raise_for_no_sessdata()
        return await self.__query(
            API["info"]["popular_ticket"],
            {
                "ruid": await self.__get_ruid(),
                # NOTE(review): "surce" looks like a typo of "source" — kept
                # as-is; confirm against the remote API before changing.
                "surce": 0,
            },
        )

    async def send_popular_ticket(self) -> dict:
        """
        Give away all of your free popularity tickets in this room.

        Returns:
            dict: Result returned by the API.
        """
        self.credential.raise_for_no_sessdata()
        self.credential.raise_for_no_bili_jct()
        return await self.__query(
            API["operate"]["send_popular_ticket"],
            {
                "ruid": await self.__get_ruid(),
                "visit_id": "",
            },
        )

    async def get_dahanghai(self, page: int = 1) -> dict:
        """
        Get the "dahanghai" (guard) list.

        Args:
            page (int, optional): Page number. Defaults to 1.

        Returns:
            dict: Result returned by the API.
        """
        return await self.__query(
            API["info"]["dahanghai"],
            {
                "roomid": self.room_display_id,
                "ruid": await self.__get_ruid(),
                "page_size": 30,
                "page": page,
            },
        )

    async def get_gaonengbang(self, page: int = 1) -> dict:
        """
        Get the "gaonengbang" ranking list.

        Args:
            page (int, optional): Page number. Defaults to 1.

        Returns:
            dict: Result returned by the API.
        """
        return await self.__query(
            API["info"]["gaonengbang"],
            {
                "roomId": self.room_display_id,
                "ruid": await self.__get_ruid(),
                "pageSize": 50,
                "page": page,
            },
        )

    async def get_seven_rank(self) -> dict:
        """
        Get the seven-day ranking.

        Returns:
            dict: Result returned by the API.
        """
        return await self.__query(
            API["info"]["seven_rank"],
            {
                "roomid": self.room_display_id,
                "ruid": await self.__get_ruid(),
            },
        )

    async def get_fans_medal_rank(self) -> dict:
        """
        Get the fan medal ranking.

        Returns:
            dict: Result returned by the API.
        """
        return await self.__query(
            API["info"]["fans_medal_rank"],
            {"roomid": self.room_display_id, "ruid": await self.__get_ruid()},
        )

    async def get_black_list(self, page: int = 1) -> dict:
        """
        Get the room's blacklist.

        Args:
            page (int, optional): Value sent as the ``ps`` parameter. Defaults to 1.

        Returns:
            dict: Result returned by the API.
        """
        # NOTE(review): "ps" usually denotes page size rather than page number
        # in this API family — kept as-is; confirm against the remote API.
        return await self.__query(
            API["info"]["black_list"],
            {"room_id": self.room_display_id, "ps": page},
        )

    async def get_room_play_url(
        self, screen_resolution: ScreenResolution = ScreenResolution.ORIGINAL
    ) -> dict:
        """
        Get the list of live stream URLs for the room.

        Args:
            screen_resolution (ScreenResolution, optional): Quality. Defaults to ScreenResolution.ORIGINAL.

        Returns:
            dict: Result returned by the API.
        """
        return await self.__query(
            API["info"]["room_play_url"],
            {
                "cid": self.room_display_id,
                "platform": "web",
                "qn": screen_resolution.value,
                "https_url_req": "1",
                "ptype": "16",
            },
        )

    async def get_room_play_info_v2(
        self,
        live_protocol: LiveProtocol = LiveProtocol.DEFAULT,
        live_format: LiveFormat = LiveFormat.DEFAULT,
        live_codec: LiveCodec = LiveCodec.DEFAULT,
        live_qn: ScreenResolution = ScreenResolution.ORIGINAL,
    ) -> dict:
        """
        Get room information together with the available quality list.

        Args:
            live_protocol (LiveProtocol, optional)    : Stream protocol. Defaults to LiveProtocol.DEFAULT.
            live_format   (LiveFormat, optional)      : Container format. Defaults to LiveFormat.DEFAULT.
            live_codec    (LiveCodec, optional)       : Video codec. Defaults to LiveCodec.DEFAULT.
            live_qn       (ScreenResolution, optional): Quality. Defaults to ScreenResolution.ORIGINAL.

        Returns:
            dict: Result returned by the API.
        """
        return await self.__query(
            API["info"]["room_play_info_v2"],
            {
                "room_id": self.room_display_id,
                "platform": "web",
                "ptype": "16",
                "protocol": live_protocol.value,
                "format": live_format.value,
                "codec": live_codec.value,
                "qn": live_qn.value,
            },
        )

    async def ban_user(self, uid: int) -> dict:
        """
        Ban a user in this room.

        Args:
            uid (int): User UID.

        Returns:
            dict: Result returned by the API.
        """
        self.credential.raise_for_no_sessdata()
        return await self.__submit(
            API["operate"]["add_block"],
            {
                "room_id": self.room_display_id,
                "tuid": uid,
                "mobile_app": "web",
                "visit_id": "",
            },
        )

    async def unban_user(self, uid: int) -> dict:
        """
        Unban a user in this room.

        Args:
            uid (int): User UID.

        Returns:
            dict: Result returned by the API.
        """
        self.credential.raise_for_no_sessdata()
        return await self.__submit(
            API["operate"]["del_block"],
            {
                "room_id": self.room_display_id,
                "tuid": uid,
            },
        )

    async def send_danmaku(
        self, danmaku: Danmaku, reply_mid: Union[int, None] = None
    ) -> dict:
        """
        Send a danmaku (chat message) to the live room.

        Args:
            danmaku   (Danmaku)      : Danmaku object; its mode, text, color and font_size are sent.
            reply_mid (int, optional): UID of the user being @-mentioned. Defaults to None.

        Returns:
            dict: Result returned by the API.
        """
        self.credential.raise_for_no_sessdata()
        api = API["operate"]["send_danmaku"]
        # The request uses the room_id reported by room_play_info, not the display ID.
        room_id = (await self.get_room_play_info())["room_id"]
        data = {
            "mode": danmaku.mode,
            "msg": danmaku.text,
            "roomid": room_id,
            "bubble": 0,
            "rnd": int(time.time()),
            # danmaku.color is parsed as a hexadecimal string.
            "color": int(danmaku.color, 16),
            "fontsize": danmaku.font_size,
        }
        if reply_mid:
            data["reply_mid"] = reply_mid
        return await self.__submit(api, data)

    async def sign_up_dahanghai(self, task_id: int = 1447) -> dict:
        """
        "Dahanghai" (guard) sign-in.

        Args:
            task_id (int, optional): Sign-in task ID. Defaults to 1447.

        Returns:
            dict: Result returned by the API.
        """
        self.credential.raise_for_no_sessdata()
        self.credential.raise_for_no_bili_jct()
        return await self.__submit(
            API["operate"]["sign_up_dahanghai"],
            {
                "task_id": task_id,
                "uid": await self.__get_ruid(),
            },
        )

    async def send_gift_from_bag(
        self,
        uid: int,
        bag_id: int,
        gift_id: int,
        gift_num: int,
        storm_beat_id: int = 0,
        price: int = 0,
    ) -> dict:
        """
        Send a gift from your bag. Bag contents can be fetched with get_self_bag.

        Args:
            uid           (int)          : UID of the user the gift is given to.
            bag_id        (int)          : Gift bag ID.
            gift_id       (int)          : Gift ID.
            gift_num      (int)          : Number of gifts.
            storm_beat_id (int, optional): Unknown. Defaults to 0.
            price         (int, optional): Unit price of the gift. Defaults to 0.

        Returns:
            dict: Result returned by the API.
        """
        self.credential.raise_for_no_sessdata()
        self.credential.raise_for_no_bili_jct()
        return await self.__submit(
            API["operate"]["send_gift_from_bag"],
            {
                "uid": uid,
                "bag_id": bag_id,
                "gift_id": gift_id,
                "gift_num": gift_num,
                "platform": "pc",
                "send_ruid": 0,
                "storm_beat_id": storm_beat_id,
                "price": price,
                "biz_code": "live",
                "biz_id": self.room_display_id,
                "ruid": await self.__get_ruid(),
            },
        )

    async def receive_reward(self, receive_type: int = 2) -> dict:
        """
        Claim your voyage-log reward in this room.

        Args:
            receive_type (int): Claim type. Defaults to 2.

        Returns:
            dict: Result returned by the API.
        """
        self.credential.raise_for_no_sessdata()
        return await self.__submit(
            API["operate"]["receive_reward"],
            {
                "ruid": await self.__get_ruid(),
                "receive_type": receive_type,
            },
        )

    async def get_general_info(self, act_id: int = 100061) -> dict:
        """
        Get your "dahanghai" (guard) information in this room, e.g. whether
        it is active and its level.

        Args:
            act_id (int, optional): Unknown. Defaults to 100061.

        Returns:
            dict: Result returned by the API.
        """
        self.credential.raise_for_no_sessdata()
        return await self.__query(
            API["info"]["general_info"],
            {
                "actId": act_id,
                "roomId": self.room_display_id,
                "uid": await self.__get_ruid(),
            },
        )

    async def update_news(self, content: str) -> dict:
        """
        Update the room announcement.

        Args:
            content (str): Announcement text, at most 60 characters.

        Returns:
            dict: Result returned by the API.
        """
        self.credential.raise_for_no_sessdata()
        return await self.__query(
            API["info"]["update_news"],
            {
                "content": content,
                "roomId": self.room_display_id,
                "uid": await self.__get_ruid(),
            },
        )

    async def get_gift_common(self) -> dict:
        """
        Get the list of ordinary gifts available in this room.

        The room's area IDs are read from the room_info response first.

        Returns:
            dict: Result returned by the API.
        """
        room = (await self.get_room_info())["room_info"]
        area_id = room["area_id"]
        area_parent_id = room["parent_area_id"]
        return await self.__query(
            API["info"]["gift_common"],
            {
                "room_id": self.room_display_id,
                "area_id": area_id,
                "area_parent_id": area_parent_id,
                "platform": "pc",
                "source": "live",
            },
        )

    async def get_gift_special(self, tab_id: int) -> dict:
        """
        Note: this API no longer works; use live.get_gift_config instead.

        Get the list of special gifts available in this room.

        Args:
            tab_id (int): 2 for privilege gifts, 3 for custom gifts.

        Returns:
            dict: Result returned by the API.
        """
        room = (await self.get_room_info())["room_info"]
        area_id = room["area_id"]
        area_parent_id = room["parent_area_id"]
        return await self.__query(
            API["info"]["gift_special"],
            {
                "tab_id": tab_id,
                "area_id": area_id,
                "area_parent_id": area_parent_id,
                # NOTE(review): room_id is populated from the cached uid, not
                # from a room ID — kept as in the original; confirm intent.
                "room_id": await self.__get_ruid(),
                "source": "live",
                "platform": "pc",
                "build": 1,
            },
        )

    async def send_gift_gold(
        self, uid: int, gift_id: int, gift_num: int, price: int, storm_beat_id: int = 0
    ) -> dict:
        """
        Send a gold-seed gift.

        Args:
            uid           (int)          : UID of the user the gift is given to.
            gift_id       (int)          : Gift ID (available from get_gift_common,
                                           get_gift_special or get_gift_config).
            gift_num      (int)          : Number of gifts.
            price         (int)          : Unit price of the gift.
            storm_beat_id (int, optional): Unknown. Defaults to 0.

        Returns:
            dict: Result returned by the API.
        """
        self.credential.raise_for_no_sessdata()
        self.credential.raise_for_no_bili_jct()
        return await self.__submit(
            API["operate"]["send_gift_gold"],
            {
                "uid": uid,
                "gift_id": gift_id,
                "gift_num": gift_num,
                "price": price,
                "ruid": await self.__get_ruid(),
                "biz_code": "live",
                "biz_id": self.room_display_id,
                "platform": "pc",
                "storm_beat_id": storm_beat_id,
                "send_ruid": 0,
                "coin_type": "gold",
                "bag_id": "0",
                "rnd": int(time.time()),
                "visit_id": "",
            },
        )

    async def send_gift_silver(
        self,
        uid: int,
        gift_id: int,
        gift_num: int,
        price: int,
        storm_beat_id: int = 0,
    ) -> dict:
        """
        Send a silver-seed gift.

        Args:
            uid           (int)          : UID of the user the gift is given to.
            gift_id       (int)          : Gift ID (available from get_gift_common,
                                           get_gift_special or get_gift_config).
            gift_num      (int)          : Number of gifts.
            price         (int)          : Unit price of the gift.
            storm_beat_id (int, optional): Unknown. Defaults to 0.

        Returns:
            dict: Result returned by the API.
        """
        self.credential.raise_for_no_sessdata()
        self.credential.raise_for_no_bili_jct()
        return await self.__submit(
            API["operate"]["send_gift_silver"],
            {
                "uid": uid,
                "gift_id": gift_id,
                "gift_num": gift_num,
                "price": price,
                "ruid": await self.__get_ruid(),
                "biz_code": "live",
                "biz_id": self.room_display_id,
                "platform": "pc",
                "storm_beat_id": storm_beat_id,
                "send_ruid": 0,
                "coin_type": "silver",
                "bag_id": 0,
                "rnd": int(time.time()),
                "visit_id": "",
            },
        )
class LiveDanmaku(AsyncEvent):
    """
    Receive live room danmaku in real time over a websocket.

    Events (named after the ``cmd`` field of server notices):
        + DANMU_MSG: a user sent a danmaku
        + SEND_GIFT: gift
        + COMBO_SEND: gift combo
        + GUARD_BUY: guard ("dahanghai") renewal
        + SUPER_CHAT_MESSAGE: Super Chat (SC)
        + SUPER_CHAT_MESSAGE_JPN: Super Chat (with Japanese translation?)
        + WELCOME: a VIP entered the room
        + WELCOME_GUARD: a moderator entered the room
        + NOTICE_MSG: system notice (site-wide broadcasts and the like)
        + PREPARING: stream is preparing
        + LIVE: stream started
        + ROOM_REAL_TIME_MESSAGE_UPDATE: follower count etc. updated
        + ENTRY_EFFECT: entry effect
        + ROOM_RANK: room rank updated
        + INTERACT_WORD: a user entered the room
        + ACTIVITY_BANNER_UPDATE_V2: apparently the "xx hour ranking" next to the room name

    Events defined by this module:
        + VIEW: room popularity updated
        + ALL: every event
        + DISCONNECT: disconnected (receives the connection status code)
        + TIMEOUT: heartbeat response timed out
        + VERIFICATION_SUCCESSFUL: authentication succeeded
    """

    # Protocol versions found in the packet header.
    PROTOCOL_VERSION_RAW_JSON = 0
    PROTOCOL_VERSION_HEARTBEAT = 1
    PROTOCOL_VERSION_BROTLI_JSON = 3

    # Packet types found in the packet header.
    DATAPACK_TYPE_HEARTBEAT = 2
    DATAPACK_TYPE_HEARTBEAT_RESPONSE = 3
    DATAPACK_TYPE_NOTICE = 5
    DATAPACK_TYPE_VERIFY = 7
    DATAPACK_TYPE_VERIFY_SUCCESS_RESPONSE = 8

    # Connection states returned by get_status().
    STATUS_INIT = 0
    STATUS_CONNECTING = 1
    STATUS_ESTABLISHED = 2
    STATUS_CLOSING = 3
    STATUS_CLOSED = 4
    STATUS_ERROR = 5

    def __init__(
        self,
        room_display_id: int,
        debug: bool = False,
        credential: Union[Credential, None] = None,
        max_retry: int = 5,
        retry_after: float = 1,
    ):
        """
        Args:
            room_display_id (int)                        : Room display ID.
            debug           (bool, optional)             : Debug mode; logs more information. Defaults to False.
            credential      (Credential | None, optional): Credential. Defaults to None.
            max_retry       (int, optional)              : Maximum retries after a connection error. Defaults to 5.
            retry_after     (int, optional)              : Seconds to wait between retries. Defaults to 1.
        """
        super().__init__()
        self.credential = credential if credential is not None else Credential()
        self.room_display_id = room_display_id
        self.max_retry = max_retry
        self.retry_after = retry_after
        self.__room_real_id = None
        self.__status = 0
        self.__ws = None
        self.__tasks = []
        self.__debug = debug
        # Countdown in seconds; a heartbeat is sent when it reaches exactly 0.
        self.__heartbeat_timer = 60.0
        self.err_reason = ""

        # Logging: one logger per room; the handler is attached only once.
        self.logger = logging.getLogger(f"LiveDanmaku_{self.room_display_id}")
        self.logger.setLevel(logging.DEBUG if debug else logging.INFO)
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(
                logging.Formatter(
                    "["
                    + str(room_display_id)
                    + "][%(asctime)s][%(levelname)s] %(message)s"
                )
            )
            self.logger.addHandler(handler)

    def get_status(self) -> int:
        """
        Get the connection status.

        Returns:
            int: 0 initialised, 1 connecting, 2 established, 3 closing, 4 closed, 5 error.
        """
        return self.__status

    async def connect(self) -> None:
        """
        Connect to the live room.

        Raises:
            LiveException: If a connection is being established, is already
                established, or is currently closing.
        """
        if self.get_status() == self.STATUS_CONNECTING:
            raise LiveException("正在建立连接中")
        if self.get_status() == self.STATUS_ESTABLISHED:
            raise LiveException("连接已建立,不可重复调用")
        if self.get_status() == self.STATUS_CLOSING:
            raise LiveException("正在关闭连接,不可调用")
        await self.__main()

    async def disconnect(self) -> None:
        """
        Disconnect from the live room.

        Raises:
            LiveException: If the connection is not currently established.
        """
        if self.get_status() != self.STATUS_ESTABLISHED:
            raise LiveException("尚未连接服务器")
        self.__status = self.STATUS_CLOSING
        self.logger.info("连接正在关闭")
        # Cancel every background task (the heartbeat).
        while len(self.__tasks) > 0:
            self.__tasks.pop().cancel()
        # Status is set before closing so the receive loop in __main sees
        # STATUS_CLOSED and treats the close as user-requested.
        self.__status = self.STATUS_CLOSED
        await self.__ws.close()  # type: ignore
        self.logger.info("连接已关闭")

    async def __main(self) -> None:
        """
        Entry point: resolve the room, fetch the chat server configuration and
        run the connect / receive / reconnect loop.
        """
        self.__status = self.STATUS_CONNECTING
        room = LiveRoom(self.room_display_id, self.credential)
        self.logger.info(f"准备连接直播间 {self.room_display_id}")

        # Resolve the real room ID.
        self.logger.debug("正在获取真实房间号")
        info = await room.get_room_play_info()
        self.__room_real_id = info["room_id"]
        self.logger.debug(f"获取成功,真实房间号:{self.__room_real_id}")

        # Fetch the chat server configuration.
        self.logger.debug("正在获取聊天服务器配置")
        conf = await room.get_danmu_info()
        self.logger.debug("聊天服务器配置获取成功")

        # Connect to the live room.
        self.logger.debug("准备连接直播间")
        session = get_aiohttp_session()
        available_hosts: List[dict] = conf["host_list"]
        retry = self.max_retry
        host = None

        # NOTE(review): this handler (and the VERIFICATION_SUCCESSFUL one
        # below) is registered again on every call / loop iteration and never
        # removed — confirm whether AsyncEvent de-duplicates handlers.
        @self.on("TIMEOUT")
        async def on_timeout(ev):
            # Heartbeat timed out: record the reason and close the socket so
            # the receive loop ends and a reconnect is attempted.
            self.err_reason = "心跳响应超时"
            await self.__ws.close()  # type: ignore

        while True:
            self.err_reason = ""
            # Reset the heartbeat countdown so a heartbeat is sent immediately.
            self.__heartbeat_timer = 0
            if not available_hosts:
                self.err_reason = "已尝试所有主机但仍无法连接"
                break
            if host is None or retry <= 0:
                host = available_hosts.pop()
                retry = self.max_retry
            port = host["wss_port"]
            protocol = "wss"
            uri = f"{protocol}://{host['host']}:{port}/sub"
            self.__status = self.STATUS_CONNECTING
            self.logger.info(f"正在尝试连接主机: {uri}")

            # Bound before the try so the except handler can tell whether a
            # socket was ever opened (ws_connect itself may raise).
            ws = None
            try:
                async with session.ws_connect(uri, headers=HEADERS.copy()) as ws:

                    @self.on("VERIFICATION_SUCCESSFUL")
                    async def on_verification_successful(data):
                        # Replace any running heartbeat task with a new one.
                        while len(self.__tasks) > 0:
                            self.__tasks.pop().cancel()
                        self.__tasks.append(asyncio.create_task(self.__heartbeat(ws)))

                    self.__ws = ws
                    self.logger.debug("连接主机成功, 准备发送认证信息")
                    await self.__send_verify_data(ws, conf["token"])

                    async for msg in ws:
                        if msg.type == aiohttp.WSMsgType.BINARY:
                            self.logger.debug(f"收到原始数据:{msg.data}")
                            await self.__handle_data(msg.data)
                        elif msg.type == aiohttp.WSMsgType.ERROR:
                            self.__status = self.STATUS_ERROR
                            self.logger.error("出现错误")
                        elif msg.type == aiohttp.WSMsgType.CLOSING:
                            self.logger.debug("连接正在关闭")
                            self.__status = self.STATUS_CLOSING
                        elif msg.type == aiohttp.WSMsgType.CLOSED:
                            self.logger.info("连接已关闭")
                            self.__status = self.STATUS_CLOSED

                # Leave the loop only on a normal, user-requested close.
                if self.__status != self.STATUS_CLOSED or self.err_reason:
                    # Not closed by the user: fall through to reconnect.
                    self.logger.warning(
                        "非正常关闭连接" if not self.err_reason else self.err_reason
                    )
                else:
                    break
            except Exception as e:
                # ws is still None when ws_connect itself failed; closing it
                # unconditionally used to raise UnboundLocalError here and
                # abort the retry loop.
                if ws is not None:
                    await ws.close()
                self.logger.warning(e)
                if retry <= 0 or len(available_hosts) == 0:
                    self.logger.error("无法连接服务器")
                    self.err_reason = "无法连接服务器"
                    break
                self.logger.warning(f"将在 {self.retry_after} 秒后重新连接...")
                self.__status = self.STATUS_ERROR
                retry -= 1
                await asyncio.sleep(self.retry_after)

    async def __handle_data(self, data) -> None:
        """
        Unpack one websocket message and dispatch an event for every packet in it.
        """
        data = self.__unpack(data)
        self.logger.debug(f"收到信息:{data}")

        for info in data:
            callback_info = {
                "room_display_id": self.room_display_id,
                "room_real_id": self.__room_real_id,
            }
            # Handle each packet in turn and call the user's handlers.
            if (
                info["datapack_type"]
                == LiveDanmaku.DATAPACK_TYPE_VERIFY_SUCCESS_RESPONSE
            ):
                # Authentication response; only code 0 is treated as success.
                if info["data"]["code"] == 0:
                    self.logger.info("连接服务器并认证成功")
                    self.__status = self.STATUS_ESTABLISHED
                    callback_info["type"] = "VERIFICATION_SUCCESSFUL"
                    callback_info["data"] = None
                    self.dispatch("VERIFICATION_SUCCESSFUL", callback_info)
                    self.dispatch("ALL", callback_info)
            elif info["datapack_type"] == LiveDanmaku.DATAPACK_TYPE_HEARTBEAT_RESPONSE:
                # Heartbeat response; carries the room popularity value.
                self.logger.debug("收到心跳包反馈")
                # Restart the heartbeat countdown.
                self.__heartbeat_timer = 30.0
                callback_info["type"] = "VIEW"
                callback_info["data"] = info["data"]["view"]
                self.dispatch("VIEW", callback_info)
                self.dispatch("ALL", callback_info)
            elif info["datapack_type"] == LiveDanmaku.DATAPACK_TYPE_NOTICE:
                # Danmaku, gifts and other room notices.
                callback_info["type"] = info["data"]["cmd"]
                # DANMU_MSG may arrive with a suffix such as
                # "DANMU_MSG:4:0:2:2:2:0" (format unknown); normalise the name.
                if callback_info["type"].find("DANMU_MSG") > -1:
                    callback_info["type"] = "DANMU_MSG"
                    info["data"]["cmd"] = "DANMU_MSG"
                callback_info["data"] = info["data"]
                self.dispatch(callback_info["type"], callback_info)
                self.dispatch("ALL", callback_info)
            else:
                self.logger.warning("检测到未知的数据包类型,无法处理")

    async def __send_verify_data(self, ws: ClientWebSocketResponse, token: str) -> None:
        """
        Send the authentication packet for the current room over ``ws``.

        The credential must provide buvid3 and dedeuserid; when dedeuserid is
        missing it is looked up once through ``get_self_info``.
        """
        self.credential.raise_for_no_buvid3()
        # Without a dedeuserid, try to obtain it via live.get_self_info.
        if not self.credential.has_dedeuserid():
            try:
                info = await get_self_info(self.credential)
                self.credential.dedeuserid = str(info["uid"])
            except Exception:
                # Best effort only: the check below raises the real error.
                # Exception (not a bare except) so task cancellation and
                # KeyboardInterrupt still propagate.
                pass
        self.credential.raise_for_no_dedeuserid()
        verifyData = {
            "uid": int(self.credential.dedeuserid),
            "roomid": self.__room_real_id,
            "protover": 3,
            "buvid": self.credential.buvid3,
            "platform": "web",
            "type": 2,
            "key": token,
        }
        data = json.dumps(verifyData).encode()
        await self.__send(
            data, self.PROTOCOL_VERSION_HEARTBEAT, self.DATAPACK_TYPE_VERIFY, ws
        )

    async def __heartbeat(self, ws: ClientWebSocketResponse) -> None:
        """
        Send heartbeat packets on a timer and raise TIMEOUT when the server
        stops answering.
        """
        HEARTBEAT = self.__pack(
            b"[object Object]",
            self.PROTOCOL_VERSION_HEARTBEAT,
            self.DATAPACK_TYPE_HEARTBEAT,
        )
        heartbeat_url = "https://live-trace.bilibili.com/xlive/rdata-interface/v1/heartbeat/webHeartBeat?pf=web&hb="
        while True:
            if self.__heartbeat_timer == 0:
                self.logger.debug("发送心跳包")
                await ws.send_bytes(HEARTBEAT)
                hb = str(
                    base64.b64encode(f"60|{self.room_display_id}|1|0".encode("utf-8")),
                    "utf-8",
                )
                await Api(
                    method="GET", url=heartbeat_url, json_body=True
                ).update_params(**{"hb": hb, "pf": "web"}).result
            elif self.__heartbeat_timer <= -30:
                # No response for 30 s past the deadline: treat the connection
                # as lost and publish TIMEOUT.
                self.dispatch("TIMEOUT")
                break
            await asyncio.sleep(1.0)
            self.__heartbeat_timer -= 1

    async def __send(
        self,
        data: bytes,
        protocol_version: int,
        datapack_type: int,
        ws: ClientWebSocketResponse,
    ) -> None:
        """
        Pack ``data`` into a protocol packet and send it over ``ws``.
        """
        data = self.__pack(data, protocol_version, datapack_type)
        self.logger.debug(f"发送原始数据:{data}")
        await ws.send_bytes(data)

    @staticmethod
    def __pack(data: bytes, protocol_version: int, datapack_type: int) -> bytes:
        """
        Build a packet: a 16-byte big-endian header followed by ``data``.

        Header layout: total length (uint32), header length 16 (uint16),
        protocol version (uint16), packet type (uint32), constant 1 (uint32).

        Raises:
            LiveException: If ``protocol_version`` is outside 0..2 or
                ``datapack_type`` is not 2 or 7.
        """
        sendData = bytearray()
        sendData += struct.pack(">H", 16)
        raise_for_statement(0 <= protocol_version <= 2, LiveException("数据包协议版本错误,范围 0~2"))
        sendData += struct.pack(">H", protocol_version)
        raise_for_statement(datapack_type in [2, 7], LiveException("数据包类型错误,可用类型:2, 7"))
        sendData += struct.pack(">I", datapack_type)
        sendData += struct.pack(">I", 1)
        sendData += data
        # The length field counts itself, hence the extra 4 bytes.
        sendData = struct.pack(">I", len(sendData) + 4) + sendData
        return bytes(sendData)

    @staticmethod
    def __unpack(data: bytes) -> List[Any]:
        """
        Split one websocket message into its packets.

        Returns:
            List[Any]: One dict per packet with the keys ``protocol_version``,
            ``datapack_type`` and ``data``. ``data`` is the decoded JSON, a
            ``{"view": int}`` dict for heartbeat responses, or None when the
            packet kind is not recognised.
        """
        ret = []
        offset = 0
        header = struct.unpack(">IHHII", data[:16])
        if header[2] == LiveDanmaku.PROTOCOL_VERSION_BROTLI_JSON:
            # Brotli payload: the decompressed bytes hold the inner packets.
            realData = brotli.decompress(data[16:])
        else:
            realData = data

        if (
            header[2] == LiveDanmaku.PROTOCOL_VERSION_HEARTBEAT
            and header[3] == LiveDanmaku.DATAPACK_TYPE_HEARTBEAT_RESPONSE
        ):
            # Heartbeat responses are handled specially: a single uint32
            # follows the header.
            realData = realData[16:]
            recvData = {
                "protocol_version": header[2],
                "datapack_type": header[3],
                "data": {"view": struct.unpack(">I", realData[0:4])[0]},
            }
            ret.append(recvData)
            return ret

        while offset < len(realData):
            header = struct.unpack(">IHHII", realData[offset: offset + 16])
            length = header[0]
            if length < 16:
                # A declared length shorter than the header would never advance
                # offset (endless loop on length 0); stop parsing this message.
                break
            recvData = {
                "protocol_version": header[2],
                "datapack_type": header[3],
                "data": None,
            }
            chunkData = realData[(offset + 16): (offset + length)]
            if header[2] == 0:
                recvData["data"] = json.loads(chunkData.decode())
            elif header[2] == 2:
                recvData["data"] = json.loads(chunkData.decode())
            elif header[2] == 1:
                if header[3] == LiveDanmaku.DATAPACK_TYPE_HEARTBEAT_RESPONSE:
                    recvData["data"] = {"view": struct.unpack(">I", chunkData)[0]}
                elif header[3] == LiveDanmaku.DATAPACK_TYPE_VERIFY_SUCCESS_RESPONSE:
                    recvData["data"] = json.loads(chunkData.decode())
            ret.append(recvData)
            offset += length
        return ret
async def get_self_info(credential: Credential) -> dict:
    """
    Get your own live level, ranking and related information.

    Args:
        credential (Credential): Credential; must contain SESSDATA.

    Returns:
        dict: Result returned by the API.
    """
    credential.raise_for_no_sessdata()
    request = Api(**API["info"]["user_info"], credential=credential)
    return await request.result
async def get_self_live_info(credential: Credential) -> dict:
    """
    Get your own fan medal, guard ("dahanghai") and related information.

    Args:
        credential (Credential): Credential; must contain SESSDATA.

    Returns:
        dict: Result returned by the API.
    """
    credential.raise_for_no_sessdata()
    request = Api(**API["info"]["live_info"], credential=credential)
    return await request.result
async def get_self_dahanghai_info(
    page: int = 1, page_size: int = 10, credential: Union[Credential, None] = None
) -> dict:
    """
    Get information about the guards ("dahanghai") you have purchased.

    Args:
        page       (int, optional)       : Page number. Defaults to 1.
        page_size  (int, optional)       : Items per page. Defaults to 10.
        credential (Credential, optional): Credential; must contain SESSDATA.

    How to obtain the total number of pages:

    ```python
    import math

    info = await live.get_self_live_info(credential)
    pages = math.ceil(info['data']['guards'] / 10)
    ```

    Returns:
        dict: Result returned by the API.
    """
    # An empty Credential is created when none is given, so the SESSDATA
    # check below is what reports the missing login.
    credential = Credential() if credential is None else credential
    credential.raise_for_no_sessdata()
    request = Api(**API["info"]["user_guards"], credential=credential)
    return await request.update_params(page=page, page_size=page_size).result
async def get_self_bag(credential: Credential) -> dict:
    """
    Get the contents of your own live gift bag.

    Args:
        credential (Credential): Credential; must contain SESSDATA.

    Returns:
        dict: Result returned by the API.
    """
    credential.raise_for_no_sessdata()
    request = Api(**API["info"]["bag_list"], credential=credential)
    return await request.result
async def get_gift_config(
    room_id: Union[int, None] = None,
    area_id: Union[int, None] = None,
    area_parent_id: Union[int, None] = None,
) -> dict:
    """
    Get information about all gifts: gift ID, name, price, level, etc.

    When room_id, area_id and area_parent_id are all given, a smaller JSON is
    returned that only covers that room, sub-area and parent area. Even then
    the response is still roughly 15,000 lines of JSON; without filters it is
    about 28,000 lines.

    Args:
        room_id        (int, optional): Room display ID. Defaults to None.
        area_id        (int, optional): Sub-area ID. Defaults to None.
        area_parent_id (int, optional): Parent area ID. Defaults to None.

    Returns:
        dict: Result returned by the API.
    """
    api = API["info"]["gift_config"]
    params = {
        "platform": "pc",
        "source": "live",
        # Filters that were not supplied are sent as empty strings.
        "room_id": room_id if room_id is not None else "",
        "area_id": area_id if area_id is not None else "",
        "area_parent_id": area_parent_id if area_parent_id is not None else "",
    }
    return await Api(**api).update_params(**params).result
async def get_area_info() -> dict:
    """
    Get information about every live area.

    Returns:
        dict: Result returned by the API.
    """
    request = Api(**API["info"]["area_info"])
    return await request.result
async def get_live_followers_info(
    need_recommend: bool = True, credential: Union[Credential, None] = None
) -> dict:
    """
    Get the rooms in your following list that are currently live, including
    popularity, room name and title, quality, official verification, etc.

    Args:
        need_recommend (bool, optional)      : Whether to accept recommended rooms. Defaults to True.
        credential     (Credential, optional): Credential; must contain SESSDATA.

    Returns:
        dict: Result returned by the API.
    """
    credential = Credential() if credential is None else credential
    credential.raise_for_no_sessdata()
    request = Api(**API["info"]["followers_live_info"], credential=credential)
    # The flag is sent as 0/1 rather than as a boolean.
    return await request.update_params(
        need_recommend=int(need_recommend), filterRule=0
    ).result
async def get_unlive_followers_info(
    page: int = 1, page_size: int = 30, credential: Union[Credential, None] = None
) -> dict:
    """
    Get the rooms in your following list that are not live, including the last
    stream time, last stream category, room announcement, whether a recording
    exists, etc.

    Args:
        page       (int, optional)       : Page number. Defaults to 1.
        page_size  (int, optional)       : Items per page. Defaults to 30.
        credential (Credential, optional): Credential; must contain SESSDATA.

    Returns:
        dict: Result returned by the API.
    """
    credential = Credential() if credential is None else credential
    credential.raise_for_no_sessdata()
    request = Api(**API["info"]["followers_unlive_info"], credential=credential)
    return await request.update_params(page=page, pagesize=page_size).result
async def create_live_reserve(
    title: str, start_time: int, credential: Credential
) -> dict:
    """
    Create a live stream reservation.

    Args:
        title      (str)       : Live room title.
        start_time (int)       : Planned start time as a timestamp.
        credential (Credential): Credential; must contain SESSDATA.

    Returns:
        dict: Result returned by the API.
    """
    credential.raise_for_no_sessdata()
    request = Api(**API["operate"]["create_reserve"], credential=credential)
    # "from" is a Python keyword, so the payload is built as a dict and unpacked.
    payload = {
        "title": title,
        "type": 2,
        "live_plan_start_time": start_time,
        "stime": None,
        "from": 1,
    }
    return await request.update_data(**payload).result