|
r"""
|
|
bilibili_api.live
|
|
|
|
直播相关
|
|
"""
|
|
import json
|
|
import time
|
|
import base64
|
|
import struct
|
|
import asyncio
|
|
import logging
|
|
from enum import Enum
|
|
from typing import Any, List, Union
|
|
|
|
import brotli
|
|
import aiohttp
|
|
from aiohttp.client_ws import ClientWebSocketResponse
|
|
|
|
from .utils.utils import get_api, raise_for_statement
|
|
from .utils.danmaku import Danmaku
|
|
from .utils.network import get_aiohttp_session, Api, HEADERS
|
|
from .utils.AsyncEvent import AsyncEvent
|
|
from .utils.credential import Credential
|
|
from .exceptions.LiveException import LiveException
|
|
|
|
API = get_api("live")
|
|
|
|
|
|
class ScreenResolution(Enum):
|
|
"""
|
|
直播源清晰度。
|
|
|
|
清晰度编号,4K 20000,原画 10000,蓝光(杜比)401,蓝光 400,超清 250,高清 150,流畅 80
|
|
+ FOUR_K : 4K。
|
|
+ ORIGINAL : 原画。
|
|
+ BLU_RAY_DOLBY : 蓝光(杜比)。
|
|
+ BLU_RAY : 蓝光。
|
|
+ ULTRA_HD : 超清。
|
|
+ HD : 高清。
|
|
+ FLUENCY : 流畅。
|
|
"""
|
|
|
|
FOUR_K = 20000
|
|
ORIGINAL = 10000
|
|
BLU_RAY_DOLBY = 401
|
|
BLU_RAY = 400
|
|
ULTRA_HD = 250
|
|
HD = 150
|
|
FLUENCY = 80
|
|
|
|
|
|
class LiveProtocol(Enum):
|
|
"""
|
|
直播源流协议。
|
|
|
|
流协议,0 为 FLV 流,1 为 HLS 流。默认:0,1
|
|
+ FLV : 0。
|
|
+ HLS : 1。
|
|
+ DEFAULT : 0,1
|
|
"""
|
|
|
|
FLV = 0
|
|
HLS = 1
|
|
DEFAULT = "0,1"
|
|
|
|
|
|
class LiveFormat(Enum):
|
|
"""
|
|
直播源容器格式
|
|
|
|
容器格式,0 为 flv 格式;1 为 ts 格式(仅限 hls 流);2 为 fmp4 格式(仅限 hls 流)。默认:0,2
|
|
+ FLV : 0。
|
|
+ TS : 1。
|
|
+ FMP4 : 2。
|
|
+ DEFAULT : 2。
|
|
"""
|
|
|
|
FLV = 0
|
|
TS = 1
|
|
FMP4 = 2
|
|
DEFAULT = "0,1,2"
|
|
|
|
|
|
class LiveCodec(Enum):
|
|
"""
|
|
直播源视频编码
|
|
|
|
视频编码,0 为 avc 编码,1 为 hevc 编码。默认:0,1
|
|
+ AVC : 0。
|
|
+ HEVC : 1。
|
|
+ DEFAULT : 0,1。
|
|
"""
|
|
|
|
AVC = 0
|
|
HEVC = 1
|
|
DEFAULT = "0,1"
|
|
|
|
|
|
class LiveRoom:
|
|
"""
|
|
直播类,获取各种直播间的操作均在里边。
|
|
|
|
AttributesL
|
|
credential (Credential): 凭据类
|
|
|
|
room_display_id (int) : 房间展示 id
|
|
"""
|
|
|
|
def __init__(
|
|
self, room_display_id: int, credential: Union[Credential, None] = None
|
|
):
|
|
"""
|
|
Args:
|
|
room_display_id (int) : 房间展示 ID(即 URL 中的 ID)
|
|
|
|
credential (Credential, optional): 凭据. Defaults to None.
|
|
"""
|
|
self.room_display_id = room_display_id
|
|
|
|
if credential is None:
|
|
self.credential = Credential()
|
|
else:
|
|
self.credential = credential
|
|
|
|
self.__ruid = None
|
|
|
|
async def start(self, area_id: int) -> dict:
|
|
"""
|
|
开始直播
|
|
|
|
Args:
|
|
area_id (int): 直播分区id(子分区id)。可使用 live_area 模块查询。
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api = API["info"]["start"]
|
|
params = {
|
|
"area_v2": area_id,
|
|
"room_id": self.room_display_id,
|
|
"platform": "pc",
|
|
}
|
|
resp = (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
return resp
|
|
|
|
async def stop(self) -> dict:
|
|
"""
|
|
关闭直播
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api = API["info"]["stop"]
|
|
params = {
|
|
"room_id": self.room_display_id,
|
|
}
|
|
resp = (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
return resp
|
|
|
|
async def get_room_play_info(self) -> dict:
|
|
"""
|
|
获取房间信息(真实房间号,封禁情况等)
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api = API["info"]["room_play_info"]
|
|
params = {
|
|
"room_id": self.room_display_id,
|
|
}
|
|
resp = (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
|
|
self.__ruid = resp["uid"]
|
|
return resp
|
|
|
|
async def get_room_id(self) -> int:
|
|
return (await self.get_room_play_info())["room_id"]
|
|
|
|
async def __get_ruid(self) -> int:
|
|
"""
|
|
获取真实房间 ID,若有缓存则使用缓存
|
|
"""
|
|
if self.__ruid is None:
|
|
await self.get_room_play_info()
|
|
|
|
return self.__ruid
|
|
|
|
async def get_ruid(self) -> int:
|
|
return await self.__get_ruid()
|
|
|
|
async def get_danmu_info(self) -> dict:
|
|
"""
|
|
获取聊天弹幕服务器配置信息(websocket)
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api = API["info"]["danmu_info"]
|
|
params = {"id": self.room_display_id}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def get_room_info(self) -> dict:
|
|
"""
|
|
获取直播间信息(标题,简介等)
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api = API["info"]["room_info"]
|
|
params = {"room_id": self.room_display_id}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def get_fan_model(
|
|
self,
|
|
page_num: int = 1,
|
|
target_id: Union[int, None] = None,
|
|
roomId: Union[int, None] = None,
|
|
) -> dict:
|
|
"""
|
|
获取自己的粉丝勋章信息
|
|
|
|
如果带有房间号就返回是否具有的判断 has_medal
|
|
|
|
如果带有主播 id ,就返回主播的粉丝牌,没有就返回 null
|
|
|
|
Args:
|
|
roomId (int, optional) : 指定房间,查询是否拥有此房间的粉丝牌
|
|
|
|
target_id (int | None, optional): 指定返回一个主播的粉丝牌,留空就不返回
|
|
|
|
page_num (int | None, optional): 粉丝牌列表,默认 1
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
|
|
api = API["info"]["live_info"]
|
|
params = {
|
|
"pageSize": 10,
|
|
"page": page_num,
|
|
}
|
|
if roomId:
|
|
params["roomId"] = roomId
|
|
if target_id:
|
|
params["target_id"] = target_id
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def get_user_info_in_room(self) -> dict:
|
|
"""
|
|
获取自己在直播间的信息(粉丝勋章等级,直播用户等级等)
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
|
|
api = API["info"]["user_info_in_room"]
|
|
params = {"room_id": self.room_display_id}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def get_popular_ticket_num(self) -> dict:
|
|
"""
|
|
获取自己在直播间的人气票数量(付费人气票已赠送的量,免费人气票的持有量)
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
|
|
api = API["info"]["popular_ticket"]
|
|
params = {
|
|
"ruid": await self.__get_ruid(),
|
|
"surce": 0,
|
|
}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def send_popular_ticket(self) -> dict:
|
|
"""
|
|
赠送自己在直播间的所有免费人气票
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
self.credential.raise_for_no_bili_jct()
|
|
|
|
api = API["operate"]["send_popular_ticket"]
|
|
params = {
|
|
"ruid": await self.__get_ruid(),
|
|
"visit_id": "",
|
|
}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def get_dahanghai(self, page: int = 1) -> dict:
|
|
"""
|
|
获取大航海列表
|
|
|
|
Args:
|
|
page (int, optional): 页码. Defaults to 1.
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api = API["info"]["dahanghai"]
|
|
params = {
|
|
"roomid": self.room_display_id,
|
|
"ruid": await self.__get_ruid(),
|
|
"page_size": 30,
|
|
"page": page,
|
|
}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def get_gaonengbang(self, page: int = 1) -> dict:
|
|
"""
|
|
获取高能榜列表
|
|
|
|
Args:
|
|
page (int, optional): 页码. Defaults to 1
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api = API["info"]["gaonengbang"]
|
|
params = {
|
|
"roomId": self.room_display_id,
|
|
"ruid": await self.__get_ruid(),
|
|
"pageSize": 50,
|
|
"page": page,
|
|
}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def get_seven_rank(self) -> dict:
|
|
"""
|
|
获取七日榜
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api = API["info"]["seven_rank"]
|
|
params = {
|
|
"roomid": self.room_display_id,
|
|
"ruid": await self.__get_ruid(),
|
|
}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def get_fans_medal_rank(self) -> dict:
|
|
"""
|
|
获取粉丝勋章排行
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api = API["info"]["fans_medal_rank"]
|
|
params = {"roomid": self.room_display_id, "ruid": await self.__get_ruid()}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def get_black_list(self, page: int = 1) -> dict:
|
|
"""
|
|
获取黑名单列表
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api = API["info"]["black_list"]
|
|
params = {"room_id": self.room_display_id, "ps": page}
|
|
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def get_room_play_url(
|
|
self, screen_resolution: ScreenResolution = ScreenResolution.ORIGINAL
|
|
) -> dict:
|
|
"""
|
|
获取房间直播流列表
|
|
|
|
Args:
|
|
screen_resolution (ScreenResolution, optional): 清晰度. Defaults to ScreenResolution.ORIGINAL
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api = API["info"]["room_play_url"]
|
|
params = {
|
|
"cid": self.room_display_id,
|
|
"platform": "web",
|
|
"qn": screen_resolution.value,
|
|
"https_url_req": "1",
|
|
"ptype": "16",
|
|
}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def get_room_play_info_v2(
|
|
self,
|
|
live_protocol: LiveProtocol = LiveProtocol.DEFAULT,
|
|
live_format: LiveFormat = LiveFormat.DEFAULT,
|
|
live_codec: LiveCodec = LiveCodec.DEFAULT,
|
|
live_qn: ScreenResolution = ScreenResolution.ORIGINAL,
|
|
) -> dict:
|
|
"""
|
|
获取房间信息及可用清晰度列表
|
|
|
|
Args:
|
|
live_protocol (LiveProtocol, optional) : 直播源流协议. Defaults to LiveProtocol.DEFAULT.
|
|
|
|
live_format (LiveFormat, optional) : 直播源容器格式. Defaults to LiveFormat.DEFAULT.
|
|
|
|
live_codec (LiveCodec, optional) : 直播源视频编码. Defaults to LiveCodec.DEFAULT.
|
|
|
|
live_qn (ScreenResolution, optional): 直播源清晰度. Defaults to ScreenResolution.ORIGINAL.
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api = API["info"]["room_play_info_v2"]
|
|
params = {
|
|
"room_id": self.room_display_id,
|
|
"platform": "web",
|
|
"ptype": "16",
|
|
"protocol": live_protocol.value,
|
|
"format": live_format.value,
|
|
"codec": live_codec.value,
|
|
"qn": live_qn.value,
|
|
}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def ban_user(self, uid: int) -> dict:
|
|
"""
|
|
封禁用户
|
|
|
|
Args:
|
|
uid (int): 用户 UID
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
|
|
api = API["operate"]["add_block"]
|
|
data = {
|
|
"room_id": self.room_display_id,
|
|
"tuid": uid,
|
|
"mobile_app": "web",
|
|
"visit_id": "",
|
|
}
|
|
return await Api(**api, credential=self.credential).update_data(**data).result
|
|
|
|
async def unban_user(self, uid: int) -> dict:
|
|
"""
|
|
解封用户
|
|
|
|
Args:
|
|
uid (int): 用户 UID
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
api = API["operate"]["del_block"]
|
|
data = {
|
|
"room_id": self.room_display_id,
|
|
"tuid": uid,
|
|
}
|
|
return await Api(**api, credential=self.credential).update_data(**data).result
|
|
|
|
async def send_danmaku(self, danmaku: Danmaku, reply_mid: int = None) -> dict:
|
|
"""
|
|
直播间发送弹幕
|
|
|
|
Args:
|
|
danmaku (Danmaku): 弹幕类
|
|
|
|
reply_mid (int, optional): @的 UID. Defaults to None.
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
|
|
api = API["operate"]["send_danmaku"]
|
|
room_id = (await self.get_room_play_info())["room_id"]
|
|
|
|
data = {
|
|
"mode": danmaku.mode,
|
|
"msg": danmaku.text,
|
|
"roomid": room_id,
|
|
"bubble": 0,
|
|
"rnd": int(time.time()),
|
|
"color": int(danmaku.color, 16),
|
|
"fontsize": danmaku.font_size,
|
|
}
|
|
if reply_mid: data["reply_mid"] = reply_mid
|
|
return await Api(**api, credential=self.credential).update_data(**data).result
|
|
|
|
async def sign_up_dahanghai(self, task_id: int = 1447) -> dict:
|
|
"""
|
|
大航海签到
|
|
|
|
Args:
|
|
task_id (int, optional): 签到任务 ID. Defaults to 1447
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
self.credential.raise_for_no_bili_jct()
|
|
|
|
api = API["operate"]["sign_up_dahanghai"]
|
|
data = {
|
|
"task_id": task_id,
|
|
"uid": await self.__get_ruid(),
|
|
}
|
|
return await Api(**api, credential=self.credential).update_data(**data).result
|
|
|
|
async def send_gift_from_bag(
|
|
self,
|
|
uid: int,
|
|
bag_id: int,
|
|
gift_id: int,
|
|
gift_num: int,
|
|
storm_beat_id: int = 0,
|
|
price: int = 0,
|
|
) -> dict:
|
|
"""
|
|
赠送包裹中的礼物,获取包裹信息可以使用 get_self_bag 方法
|
|
|
|
Args:
|
|
uid (int) : 赠送用户的 UID
|
|
|
|
bag_id (int) : 礼物背包 ID
|
|
|
|
gift_id (int) : 礼物 ID
|
|
|
|
gift_num (int) : 礼物数量
|
|
|
|
storm_beat_id (int, optional) : 未知, Defaults to 0
|
|
|
|
price (int, optional) : 礼物单价,Defaults to 0
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
self.credential.raise_for_no_bili_jct()
|
|
|
|
api = API["operate"]["send_gift_from_bag"]
|
|
data = {
|
|
"uid": uid,
|
|
"bag_id": bag_id,
|
|
"gift_id": gift_id,
|
|
"gift_num": gift_num,
|
|
"platform": "pc",
|
|
"send_ruid": 0,
|
|
"storm_beat_id": storm_beat_id,
|
|
"price": price,
|
|
"biz_code": "live",
|
|
"biz_id": self.room_display_id,
|
|
"ruid": await self.__get_ruid(),
|
|
}
|
|
return await Api(**api, credential=self.credential).update_data(**data).result
|
|
|
|
async def receive_reward(self, receive_type: int = 2) -> dict:
|
|
"""
|
|
领取自己在某个直播间的航海日志奖励
|
|
|
|
Args:
|
|
receive_type (int) : 领取类型,Defaults to 2.
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
|
|
api = API["operate"]["receive_reward"]
|
|
data = {
|
|
"ruid": await self.__get_ruid(),
|
|
"receive_type": receive_type,
|
|
}
|
|
return await Api(**api, credential=self.credential).update_data(**data).result
|
|
|
|
async def get_general_info(self, act_id: int = 100061) -> dict:
|
|
"""
|
|
获取自己在该房间的大航海信息, 比如是否开通, 等级等
|
|
|
|
Args:
|
|
act_id (int, optional) : 未知,Defaults to 100061
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
|
|
api = API["info"]["general_info"]
|
|
params = {
|
|
"actId": act_id,
|
|
"roomId": self.room_display_id,
|
|
"uid": await self.__get_ruid(),
|
|
}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def update_news(self, content: str) -> dict:
|
|
"""
|
|
更新公告
|
|
|
|
Args:
|
|
content: 最多60字符
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
|
|
api = API["info"]["update_news"]
|
|
params = {
|
|
"content": content,
|
|
"roomId": self.room_display_id,
|
|
"uid": await self.__get_ruid(),
|
|
}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def get_gift_common(self) -> dict:
|
|
"""
|
|
获取当前直播间内的普通礼物列表
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api_room_info = API["info"]["room_info"]
|
|
params_room_info = {
|
|
"room_id": self.room_display_id,
|
|
}
|
|
res_room_info = (
|
|
await Api(**api_room_info, credential=self.credential)
|
|
.update_params(**params_room_info)
|
|
.result
|
|
)
|
|
area_id, area_parent_id = (
|
|
res_room_info["room_info"]["area_id"],
|
|
res_room_info["room_info"]["parent_area_id"],
|
|
)
|
|
|
|
api = API["info"]["gift_common"]
|
|
params = {
|
|
"room_id": self.room_display_id,
|
|
"area_id": area_id,
|
|
"area_parent_id": area_parent_id,
|
|
"platform": "pc",
|
|
"source": "live",
|
|
}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def get_gift_special(self, tab_id: int) -> dict:
|
|
"""
|
|
注:此 API 已失效,请使用 live.get_gift_config
|
|
|
|
获取当前直播间内的特殊礼物列表
|
|
|
|
Args:
|
|
tab_id (int) : 2:特权礼物,3:定制礼物
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api_room_info = API["info"]["room_info"]
|
|
params_room_info = {
|
|
"room_id": self.room_display_id,
|
|
}
|
|
res_room_info = (
|
|
await Api(**api_room_info, credential=self.credential)
|
|
.update_params(**params_room_info)
|
|
.result
|
|
)
|
|
area_id, area_parent_id = (
|
|
res_room_info["room_info"]["area_id"],
|
|
res_room_info["room_info"]["parent_area_id"],
|
|
)
|
|
|
|
api = API["info"]["gift_special"]
|
|
|
|
params = {
|
|
"tab_id": tab_id,
|
|
"area_id": area_id,
|
|
"area_parent_id": area_parent_id,
|
|
"room_id": await self.__get_ruid(),
|
|
"source": "live",
|
|
"platform": "pc",
|
|
"build": 1,
|
|
}
|
|
return (
|
|
await Api(**api, credential=self.credential).update_params(**params).result
|
|
)
|
|
|
|
async def send_gift_gold(
|
|
self, uid: int, gift_id: int, gift_num: int, price: int, storm_beat_id: int = 0
|
|
) -> dict:
|
|
"""
|
|
赠送金瓜子礼物
|
|
|
|
Args:
|
|
uid (int) : 赠送用户的 UID
|
|
|
|
gift_id (int) : 礼物 ID (可以通过 get_gift_common 或 get_gift_special 或 get_gift_config 获取)
|
|
|
|
gift_num (int) : 赠送礼物数量
|
|
|
|
price (int) : 礼物单价
|
|
|
|
storm_beat_id (int, Optional): 未知,Defaults to 0
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
self.credential.raise_for_no_bili_jct()
|
|
|
|
api = API["operate"]["send_gift_gold"]
|
|
data = {
|
|
"uid": uid,
|
|
"gift_id": gift_id,
|
|
"gift_num": gift_num,
|
|
"price": price,
|
|
"ruid": await self.__get_ruid(),
|
|
"biz_code": "live",
|
|
"biz_id": self.room_display_id,
|
|
"platform": "pc",
|
|
"storm_beat_id": storm_beat_id,
|
|
"send_ruid": 0,
|
|
"coin_type": "gold",
|
|
"bag_id": "0",
|
|
"rnd": int(time.time()),
|
|
"visit_id": "",
|
|
}
|
|
return await Api(**api, credential=self.credential).update_data(**data).result
|
|
|
|
async def send_gift_silver(
|
|
self,
|
|
uid: int,
|
|
gift_id: int,
|
|
gift_num: int,
|
|
price: int,
|
|
storm_beat_id: int = 0,
|
|
) -> dict:
|
|
"""
|
|
赠送银瓜子礼物
|
|
|
|
Args:
|
|
uid (int) : 赠送用户的 UID
|
|
|
|
gift_id (int) : 礼物 ID (可以通过 get_gift_common 或 get_gift_special 或 get_gift_config 获取)
|
|
|
|
gift_num (int) : 赠送礼物数量
|
|
|
|
price (int) : 礼物单价
|
|
|
|
storm_beat_id (int, Optional): 未知, Defaults to 0
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
self.credential.raise_for_no_sessdata()
|
|
self.credential.raise_for_no_bili_jct()
|
|
|
|
api = API["operate"]["send_gift_silver"]
|
|
data = {
|
|
"uid": uid,
|
|
"gift_id": gift_id,
|
|
"gift_num": gift_num,
|
|
"price": price,
|
|
"ruid": await self.__get_ruid(),
|
|
"biz_code": "live",
|
|
"biz_id": self.room_display_id,
|
|
"platform": "pc",
|
|
"storm_beat_id": storm_beat_id,
|
|
"send_ruid": 0,
|
|
"coin_type": "silver",
|
|
"bag_id": 0,
|
|
"rnd": int(time.time()),
|
|
"visit_id": "",
|
|
}
|
|
return await Api(**api, credential=self.credential).update_data(**data).result
|
|
|
|
|
|
class LiveDanmaku(AsyncEvent):
|
|
"""
|
|
Websocket 实时获取直播弹幕
|
|
|
|
Events:
|
|
+ DANMU_MSG: 用户发送弹幕
|
|
+ SEND_GIFT: 礼物
|
|
+ COMBO_SEND:礼物连击
|
|
+ GUARD_BUY:续费大航海
|
|
+ SUPER_CHAT_MESSAGE:醒目留言(SC)
|
|
+ SUPER_CHAT_MESSAGE_JPN:醒目留言(带日语翻译?)
|
|
+ WELCOME: 老爷进入房间
|
|
+ WELCOME_GUARD: 房管进入房间
|
|
+ NOTICE_MSG: 系统通知(全频道广播之类的)
|
|
+ PREPARING: 直播准备中
|
|
+ LIVE: 直播开始
|
|
+ ROOM_REAL_TIME_MESSAGE_UPDATE: 粉丝数等更新
|
|
+ ENTRY_EFFECT: 进场特效
|
|
+ ROOM_RANK: 房间排名更新
|
|
+ INTERACT_WORD: 用户进入直播间
|
|
+ ACTIVITY_BANNER_UPDATE_V2: 好像是房间名旁边那个 xx 小时榜
|
|
+ ===========================
|
|
+ 本模块自定义事件:
|
|
+ ==========================
|
|
+ VIEW: 直播间人气更新
|
|
+ ALL: 所有事件
|
|
+ DISCONNECT: 断开连接(传入连接状态码参数)
|
|
+ TIMEOUT: 心跳响应超时
|
|
+ VERIFICATION_SUCCESSFUL: 认证成功
|
|
"""
|
|
|
|
PROTOCOL_VERSION_RAW_JSON = 0
|
|
PROTOCOL_VERSION_HEARTBEAT = 1
|
|
PROTOCOL_VERSION_BROTLI_JSON = 3
|
|
|
|
DATAPACK_TYPE_HEARTBEAT = 2
|
|
DATAPACK_TYPE_HEARTBEAT_RESPONSE = 3
|
|
DATAPACK_TYPE_NOTICE = 5
|
|
DATAPACK_TYPE_VERIFY = 7
|
|
DATAPACK_TYPE_VERIFY_SUCCESS_RESPONSE = 8
|
|
|
|
STATUS_INIT = 0
|
|
STATUS_CONNECTING = 1
|
|
STATUS_ESTABLISHED = 2
|
|
STATUS_CLOSING = 3
|
|
STATUS_CLOSED = 4
|
|
STATUS_ERROR = 5
|
|
|
|
def __init__(
|
|
self,
|
|
room_display_id: int,
|
|
debug: bool = False,
|
|
credential: Union[Credential, None] = None,
|
|
max_retry: int = 5,
|
|
retry_after: float = 1,
|
|
):
|
|
"""
|
|
Args:
|
|
room_display_id (int) : 房间展示 ID
|
|
debug (bool, optional) : 调试模式,将输出更多信息。. Defaults to False.
|
|
credential (Credential | None, optional): 凭据. Defaults to None.
|
|
max_retry (int, optional) : 连接出错后最大重试次数. Defaults to 5
|
|
retry_after (int, optional) : 连接出错后重试间隔时间(秒). Defaults to 1
|
|
"""
|
|
super().__init__()
|
|
|
|
self.credential = credential if credential is not None else Credential()
|
|
self.room_display_id = room_display_id
|
|
self.max_retry = max_retry
|
|
self.retry_after = retry_after
|
|
self.__room_real_id = None
|
|
self.__status = 0
|
|
self.__ws = None
|
|
self.__tasks = []
|
|
self.__debug = debug
|
|
self.__heartbeat_timer = 60.0
|
|
self.err_reason = ""
|
|
|
|
|
|
self.logger = logging.getLogger(f"LiveDanmaku_{self.room_display_id}")
|
|
self.logger.setLevel(logging.DEBUG if debug else logging.INFO)
|
|
if not self.logger.handlers:
|
|
handler = logging.StreamHandler()
|
|
handler.setFormatter(
|
|
logging.Formatter(
|
|
"["
|
|
+ str(room_display_id)
|
|
+ "][%(asctime)s][%(levelname)s] %(message)s"
|
|
)
|
|
)
|
|
self.logger.addHandler(handler)
|
|
|
|
def get_status(self) -> int:
|
|
"""
|
|
获取连接状态
|
|
|
|
Returns:
|
|
int: 0 初始化,1 连接建立中,2 已连接,3 断开连接中,4 已断开,5 错误
|
|
"""
|
|
return self.__status
|
|
|
|
async def connect(self) -> None:
|
|
"""
|
|
连接直播间
|
|
"""
|
|
if self.get_status() == self.STATUS_CONNECTING:
|
|
raise LiveException("正在建立连接中")
|
|
|
|
if self.get_status() == self.STATUS_ESTABLISHED:
|
|
raise LiveException("连接已建立,不可重复调用")
|
|
|
|
if self.get_status() == self.STATUS_CLOSING:
|
|
raise LiveException("正在关闭连接,不可调用")
|
|
|
|
await self.__main()
|
|
|
|
async def disconnect(self) -> None:
|
|
"""
|
|
断开连接
|
|
"""
|
|
if self.get_status() != self.STATUS_ESTABLISHED:
|
|
raise LiveException("尚未连接服务器")
|
|
|
|
self.__status = self.STATUS_CLOSING
|
|
self.logger.info("连接正在关闭")
|
|
|
|
|
|
while len(self.__tasks) > 0:
|
|
self.__tasks.pop().cancel()
|
|
|
|
self.__status = self.STATUS_CLOSED
|
|
await self.__ws.close()
|
|
|
|
self.logger.info("连接已关闭")
|
|
|
|
async def __main(self) -> None:
|
|
"""
|
|
入口
|
|
"""
|
|
self.__status = self.STATUS_CONNECTING
|
|
|
|
room = LiveRoom(self.room_display_id, self.credential)
|
|
self.logger.info(f"准备连接直播间 {self.room_display_id}")
|
|
|
|
self.logger.debug("正在获取真实房间号")
|
|
info = await room.get_room_play_info()
|
|
self.__room_real_id = info["room_id"]
|
|
self.logger.debug(f"获取成功,真实房间号:{self.__room_real_id}")
|
|
|
|
|
|
self.logger.debug("正在获取聊天服务器配置")
|
|
conf = await room.get_danmu_info()
|
|
self.logger.debug("聊天服务器配置获取成功")
|
|
|
|
|
|
self.logger.debug("准备连接直播间")
|
|
session = get_aiohttp_session()
|
|
available_hosts: List[dict] = conf["host_list"]
|
|
retry = self.max_retry
|
|
host = None
|
|
|
|
@self.on("TIMEOUT")
|
|
async def on_timeout(ev):
|
|
|
|
self.err_reason = "心跳响应超时"
|
|
await self.__ws.close()
|
|
|
|
while True:
|
|
self.err_reason = ""
|
|
|
|
self.__heartbeat_timer = 0
|
|
if not available_hosts:
|
|
self.err_reason = "已尝试所有主机但仍无法连接"
|
|
break
|
|
|
|
if host is None or retry <= 0:
|
|
host = available_hosts.pop()
|
|
retry = self.max_retry
|
|
|
|
port = host["wss_port"]
|
|
protocol = "wss"
|
|
uri = f"{protocol}://{host['host']}:{port}/sub"
|
|
self.__status = self.STATUS_CONNECTING
|
|
self.logger.info(f"正在尝试连接主机: {uri}")
|
|
|
|
try:
|
|
async with session.ws_connect(uri, headers=HEADERS.copy()) as ws:
|
|
|
|
@self.on("VERIFICATION_SUCCESSFUL")
|
|
async def on_verification_successful(data):
|
|
|
|
while len(self.__tasks) > 0:
|
|
self.__tasks.pop().cancel()
|
|
self.__tasks.append(asyncio.create_task(self.__heartbeat(ws)))
|
|
|
|
self.__ws = ws
|
|
self.logger.debug("连接主机成功, 准备发送认证信息")
|
|
await self.__send_verify_data(ws, conf["token"])
|
|
|
|
async for msg in ws:
|
|
if msg.type == aiohttp.WSMsgType.BINARY:
|
|
self.logger.debug(f"收到原始数据:{msg.data}")
|
|
await self.__handle_data(msg.data)
|
|
|
|
elif msg.type == aiohttp.WSMsgType.ERROR:
|
|
self.__status = self.STATUS_ERROR
|
|
self.logger.error("出现错误")
|
|
|
|
elif msg.type == aiohttp.WSMsgType.CLOSING:
|
|
self.logger.debug("连接正在关闭")
|
|
self.__status = self.STATUS_CLOSING
|
|
|
|
elif msg.type == aiohttp.WSMsgType.CLOSED:
|
|
self.logger.info("连接已关闭")
|
|
self.__status = self.STATUS_CLOSED
|
|
|
|
|
|
if self.__status != self.STATUS_CLOSED or self.err_reason:
|
|
|
|
self.logger.warning(
|
|
"非正常关闭连接" if not self.err_reason else self.err_reason
|
|
)
|
|
else:
|
|
break
|
|
|
|
except Exception as e:
|
|
await ws.close()
|
|
self.logger.warning(e)
|
|
if retry <= 0 or len(available_hosts) == 0:
|
|
self.logger.error("无法连接服务器")
|
|
self.err_reason = "无法连接服务器"
|
|
break
|
|
|
|
self.logger.warning(f"将在 {self.retry_after} 秒后重新连接...")
|
|
self.__status = self.STATUS_ERROR
|
|
retry -= 1
|
|
await asyncio.sleep(self.retry_after)
|
|
|
|
async def __handle_data(self, data) -> None:
|
|
"""
|
|
处理数据
|
|
"""
|
|
data = self.__unpack(data)
|
|
self.logger.debug(f"收到信息:{data}")
|
|
|
|
for info in data:
|
|
callback_info = {
|
|
"room_display_id": self.room_display_id,
|
|
"room_real_id": self.__room_real_id,
|
|
}
|
|
|
|
if (
|
|
info["datapack_type"]
|
|
== LiveDanmaku.DATAPACK_TYPE_VERIFY_SUCCESS_RESPONSE
|
|
):
|
|
|
|
if info["data"]["code"] == 0:
|
|
|
|
self.logger.info("连接服务器并认证成功")
|
|
self.__status = self.STATUS_ESTABLISHED
|
|
callback_info["type"] = "VERIFICATION_SUCCESSFUL"
|
|
callback_info["data"] = None
|
|
self.dispatch("VERIFICATION_SUCCESSFUL", callback_info)
|
|
self.dispatch("ALL", callback_info)
|
|
|
|
elif info["datapack_type"] == LiveDanmaku.DATAPACK_TYPE_HEARTBEAT_RESPONSE:
|
|
|
|
self.logger.debug("收到心跳包反馈")
|
|
|
|
self.__heartbeat_timer = 30.0
|
|
callback_info["type"] = "VIEW"
|
|
callback_info["data"] = info["data"]["view"]
|
|
self.dispatch("VIEW", callback_info)
|
|
self.dispatch("ALL", callback_info)
|
|
|
|
elif info["datapack_type"] == LiveDanmaku.DATAPACK_TYPE_NOTICE:
|
|
|
|
callback_info["type"] = info["data"]["cmd"]
|
|
|
|
|
|
if callback_info["type"].find("DANMU_MSG") > -1:
|
|
callback_info["type"] = "DANMU_MSG"
|
|
info["data"]["cmd"] = "DANMU_MSG"
|
|
|
|
callback_info["data"] = info["data"]
|
|
self.dispatch(callback_info["type"], callback_info)
|
|
self.dispatch("ALL", callback_info)
|
|
|
|
else:
|
|
self.logger.warning("检测到未知的数据包类型,无法处理")
|
|
|
|
async def __send_verify_data(self, ws: ClientWebSocketResponse, token: str) -> None:
|
|
self.credential.raise_for_no_buvid3()
|
|
|
|
if not self.credential.has_dedeuserid():
|
|
try:
|
|
info = await get_self_info(self.credential)
|
|
self.credential.dedeuserid = str(info["uid"])
|
|
except:
|
|
pass
|
|
self.credential.raise_for_no_dedeuserid()
|
|
verifyData = {
|
|
"uid": int(self.credential.dedeuserid),
|
|
"roomid": self.__room_real_id,
|
|
"protover": 3,
|
|
"buvid": self.credential.buvid3,
|
|
"platform": "web",
|
|
"type": 2,
|
|
"key": token,
|
|
}
|
|
data = json.dumps(verifyData).encode()
|
|
await self.__send(
|
|
data, self.PROTOCOL_VERSION_HEARTBEAT, self.DATAPACK_TYPE_VERIFY, ws
|
|
)
|
|
|
|
async def __heartbeat(self, ws: ClientWebSocketResponse) -> None:
|
|
"""
|
|
定时发送心跳包
|
|
"""
|
|
HEARTBEAT = self.__pack(
|
|
b"[object Object]",
|
|
self.PROTOCOL_VERSION_HEARTBEAT,
|
|
self.DATAPACK_TYPE_HEARTBEAT,
|
|
)
|
|
while True:
|
|
if self.__heartbeat_timer == 0:
|
|
self.logger.debug("发送心跳包")
|
|
await ws.send_bytes(HEARTBEAT)
|
|
heartbeat_url = "https://live-trace.bilibili.com/xlive/rdata-interface/v1/heartbeat/webHeartBeat?pf=web&hb="
|
|
hb = str(
|
|
base64.b64encode(f"60|{self.room_display_id}|1|0".encode("utf-8")),
|
|
"utf-8",
|
|
)
|
|
await Api(
|
|
method="GET", url=heartbeat_url, json_body=True
|
|
).update_params(**{"hb": hb, "pf": "web"}).result
|
|
elif self.__heartbeat_timer <= -30:
|
|
|
|
self.dispatch("TIMEOUT")
|
|
break
|
|
|
|
await asyncio.sleep(1.0)
|
|
self.__heartbeat_timer -= 1
|
|
|
|
async def __send(
|
|
self,
|
|
data: bytes,
|
|
protocol_version: int,
|
|
datapack_type: int,
|
|
ws: ClientWebSocketResponse,
|
|
) -> None:
|
|
"""
|
|
自动打包并发送数据
|
|
"""
|
|
data = self.__pack(data, protocol_version, datapack_type)
|
|
self.logger.debug(f"发送原始数据:{data}")
|
|
await ws.send_bytes(data)
|
|
|
|
@staticmethod
|
|
def __pack(data: bytes, protocol_version: int, datapack_type: int) -> bytes:
|
|
"""
|
|
打包数据
|
|
"""
|
|
sendData = bytearray()
|
|
sendData += struct.pack(">H", 16)
|
|
raise_for_statement(0 <= protocol_version <= 2, LiveException("数据包协议版本错误,范围 0~2"))
|
|
sendData += struct.pack(">H", protocol_version)
|
|
raise_for_statement(datapack_type in [2, 7], LiveException("数据包类型错误,可用类型:2, 7"))
|
|
sendData += struct.pack(">I", datapack_type)
|
|
sendData += struct.pack(">I", 1)
|
|
sendData += data
|
|
sendData = struct.pack(">I", len(sendData) + 4) + sendData
|
|
return bytes(sendData)
|
|
|
|
@staticmethod
|
|
def __unpack(data: bytes) -> List[Any]:
|
|
"""
|
|
解包数据
|
|
"""
|
|
ret = []
|
|
offset = 0
|
|
header = struct.unpack(">IHHII", data[:16])
|
|
if header[2] == LiveDanmaku.PROTOCOL_VERSION_BROTLI_JSON:
|
|
realData = brotli.decompress(data[16:])
|
|
else:
|
|
realData = data
|
|
|
|
if (
|
|
header[2] == LiveDanmaku.PROTOCOL_VERSION_HEARTBEAT
|
|
and header[3] == LiveDanmaku.DATAPACK_TYPE_HEARTBEAT_RESPONSE
|
|
):
|
|
realData = realData[16:]
|
|
|
|
recvData = {
|
|
"protocol_version": header[2],
|
|
"datapack_type": header[3],
|
|
"data": {"view": struct.unpack(">I", realData[0:4])[0]},
|
|
}
|
|
ret.append(recvData)
|
|
return ret
|
|
|
|
while offset < len(realData):
|
|
header = struct.unpack(">IHHII", realData[offset: offset + 16])
|
|
length = header[0]
|
|
recvData = {
|
|
"protocol_version": header[2],
|
|
"datapack_type": header[3],
|
|
"data": None,
|
|
}
|
|
chunkData = realData[(offset + 16): (offset + length)]
|
|
if header[2] == 0:
|
|
recvData["data"] = json.loads(chunkData.decode())
|
|
elif header[2] == 2:
|
|
recvData["data"] = json.loads(chunkData.decode())
|
|
elif header[2] == 1:
|
|
if header[3] == LiveDanmaku.DATAPACK_TYPE_HEARTBEAT_RESPONSE:
|
|
recvData["data"] = {"view": struct.unpack(">I", chunkData)[0]}
|
|
elif header[3] == LiveDanmaku.DATAPACK_TYPE_VERIFY_SUCCESS_RESPONSE:
|
|
recvData["data"] = json.loads(chunkData.decode())
|
|
ret.append(recvData)
|
|
offset += length
|
|
return ret
|
|
|
|
|
|
async def get_self_info(credential: Credential) -> dict:
|
|
"""
|
|
获取自己直播等级、排行等信息
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
credential.raise_for_no_sessdata()
|
|
|
|
api = API["info"]["user_info"]
|
|
return await Api(**api, credential=credential).result
|
|
|
|
|
|
async def get_self_live_info(credential: Credential) -> dict:
|
|
"""
|
|
获取自己的粉丝牌、大航海等信息
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
|
|
credential.raise_for_no_sessdata()
|
|
|
|
api = API["info"]["live_info"]
|
|
return await Api(**api, credential=credential).result
|
|
|
|
|
|
async def get_self_dahanghai_info(
|
|
page: int = 1, page_size: int = 10, credential: Union[Credential, None] = None
|
|
) -> dict:
|
|
"""
|
|
获取自己开通的大航海信息
|
|
|
|
Args:
|
|
page (int, optional): 页数. Defaults to 1.
|
|
|
|
page_size (int, optional): 每页数量. Defaults to 10.
|
|
|
|
总页数取得方法:
|
|
|
|
```python
|
|
import math
|
|
|
|
info = live.get_self_live_info(credential)
|
|
pages = math.ceil(info['data']['guards'] / 10)
|
|
```
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
if credential is None:
|
|
credential = Credential()
|
|
|
|
credential.raise_for_no_sessdata()
|
|
|
|
api = API["info"]["user_guards"]
|
|
params = {"page": page, "page_size": page_size}
|
|
return await Api(**api, credential=credential).update_params(**params).result
|
|
|
|
|
|
async def get_self_bag(credential: Credential) -> dict:
|
|
"""
|
|
获取自己的直播礼物包裹信息
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
|
|
credential.raise_for_no_sessdata()
|
|
|
|
api = API["info"]["bag_list"]
|
|
return await Api(**api, credential=credential).result
|
|
|
|
|
|
async def get_gift_config(
|
|
room_id: Union[int, None] = None,
|
|
area_id: Union[int, None] = None,
|
|
area_parent_id: Union[int, None] = None,
|
|
):
|
|
"""
|
|
获取所有礼物的信息,包括礼物 id、名称、价格、等级等。
|
|
|
|
同时填了 room_id、area_id、area_parent_id,则返回一个较小的 json,只包含该房间、该子区域、父区域的礼物。
|
|
|
|
但即使限定了三个条件,仍然会返回约 1.5w 行的 json。不加限定则是 2.8w 行。
|
|
|
|
Args:
|
|
room_id (int, optional) : 房间显示 ID. Defaults to None.
|
|
area_id (int, optional) : 子分区 ID. Defaults to None.
|
|
area_parent_id (int, optional) : 父分区 ID. Defaults to None.
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api = API["info"]["gift_config"]
|
|
params = {
|
|
"platform": "pc",
|
|
"source": "live",
|
|
"room_id": room_id if room_id is not None else "",
|
|
"area_id": area_id if area_id is not None else "",
|
|
"area_parent_id": area_parent_id if area_parent_id is not None else "",
|
|
}
|
|
return await Api(**api).update_params(**params).result
|
|
|
|
|
|
async def get_area_info() -> dict:
|
|
"""
|
|
获取所有分区信息
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api = API["info"]["area_info"]
|
|
return await Api(**api).result
|
|
|
|
|
|
async def get_live_followers_info(
|
|
need_recommend: bool = True, credential: Union[Credential, None] = None
|
|
) -> dict:
|
|
"""
|
|
获取关注列表中正在直播的直播间信息,包括房间直播热度,房间名称及标题,清晰度,是否官方认证等信息。
|
|
|
|
Args:
|
|
need_recommend (bool, optional): 是否接受推荐直播间,Defaults to True
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
if credential is None:
|
|
credential = Credential()
|
|
|
|
credential.raise_for_no_sessdata()
|
|
|
|
api = API["info"]["followers_live_info"]
|
|
params = {"need_recommend": int(need_recommend), "filterRule": 0}
|
|
return await Api(**api, credential=credential).update_params(**params).result
|
|
|
|
|
|
async def get_unlive_followers_info(
|
|
page: int = 1, page_size: int = 30, credential: Union[Credential, None] = None
|
|
) -> dict:
|
|
"""
|
|
获取关注列表中未在直播的直播间信息,包括上次开播时间,上次开播的类别,直播间公告,是否有录播等。
|
|
|
|
Args:
|
|
page (int, optional): 页码, Defaults to 1.
|
|
|
|
page_size (int, optional): 每页数量 Defaults to 30.
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
if credential is None:
|
|
credential = Credential()
|
|
|
|
credential.raise_for_no_sessdata()
|
|
|
|
api = API["info"]["followers_unlive_info"]
|
|
params = {
|
|
"page": page,
|
|
"pagesize": page_size,
|
|
}
|
|
return await Api(**api, credential=credential).update_params(**params).result
|
|
|
|
|
|
async def create_live_reserve(
|
|
title: str, start_time: int, credential: Credential
|
|
) -> dict:
|
|
"""
|
|
创建直播预约
|
|
|
|
Args:
|
|
title (str) : 直播间标题
|
|
|
|
start_time (int) : 开播时间戳
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
credential.raise_for_no_sessdata()
|
|
|
|
api = API["operate"]["create_reserve"]
|
|
data = {
|
|
"title": title,
|
|
"type": 2,
|
|
"live_plan_start_time": start_time,
|
|
"stime": None,
|
|
"from": 1,
|
|
}
|
|
return await Api(**api, credential=credential).update_data(**data).result
|
|
|