rogerxavier's picture
Upload 258 files
0aee47a verified
"""
bilibili_api.dynamic
动态相关
"""
import os
import re
import sys
import json
import asyncio
from enum import Enum
from datetime import datetime
from typing import Any, List, Tuple, Union, Optional
import httpx
from .user import name2uid_sync
from .utils import utils
from .utils.sync import sync
from .utils.picture import Picture
from . import user, vote, exceptions
from .utils.credential import Credential
from .utils.network import Api
from .utils import cache_pool
from .exceptions.DynamicExceedImagesException import DynamicExceedImagesException
from . import opus
API = utils.get_api("dynamic")
raise_for_statement = utils.raise_for_statement
class DynamicType(Enum):
    """
    Dynamic type.

    + ALL: all dynamics
    + ANIME: followed anime / series
    + ARTICLE: articles
    + VIDEO: video uploads
    """

    ALL = "all"
    ANIME = "pgc"
    ARTICLE = "article"
    VIDEO = "video"
class SendDynamicType(Enum):
    """
    Type of a dynamic being sent (the `scene` parameter).

    + TEXT: plain text
    + IMAGE: images
    """

    TEXT = 1
    IMAGE = 2
class DynamicContentType(Enum):
    """
    Type of a single content node inside a dynamic.

    + TEXT: text
    + EMOJI: emoji
    + AT: @User
    + VOTE: vote
    """

    TEXT = 1
    EMOJI = 9
    AT = 2
    VOTE = 4
async def _parse_at(text: str) -> Tuple[str, str, str]:
    """
    Resolve the mentions contained in a text.

    A mention has the form "@username " (note the trailing whitespace).
    Names that cannot be resolved to a UID are left in the text as-is and
    produce no control entry.

    Args:
        text (str): Raw text.

    Returns:
        tuple(str, str, str): The text with one space appended, the
        comma-separated UIDs of the resolved mentions, and the JSON-encoded
        list of AT control entries.
    """
    # The appended space guarantees that a mention at the very end of the
    # text is still followed by whitespace and therefore matched.
    text += " "
    pattern = re.compile(r"(?<=@).*?(?=\s)")
    uid_list = []
    ctrl = []
    for match in pattern.finditer(text):
        uname = match.group()
        try:
            uid = (await user.name2uid(uname))["uid_list"][0]["uid"]
        except (KeyError, IndexError):
            # No such user (missing or empty "uid_list").
            continue
        uid_list.append(str(uid))
        name = uname + " "
        ctrl.append(
            {
                # The lookbehind puts the match right after "@", so the "@"
                # itself sits one character earlier. Using the match offset
                # (instead of searching for "@name " again) also works when
                # the mention is followed by a newline or tab and never
                # lands on an earlier look-alike substring.
                "location": match.start() - 1,
                "type": 1,
                # NOTE(review): `name` already includes the trailing space,
                # so this is one more than len("@" + name); kept as in the
                # original - confirm against what the API expects.
                "length": 2 + len(name),
                "data": int(uid),
            }
        )
    at_uids = ",".join(uid_list)
    return text, at_uids, json.dumps(ctrl, ensure_ascii=False)
async def _get_text_data(text: str) -> dict:
    """
    Build the request parameters of a text dynamic.

    Args:
        text (str): Text content.

    Returns:
        dict: Request data of the text dynamic.
    """
    content, at_uids, ctrl = await _parse_at(text)
    return {
        "dynamic_id": 0,
        "type": 4,
        "rid": 0,
        "content": content,
        "extension": '{"emoji_type":1}',
        "at_uids": at_uids,
        "ctrl": ctrl,
    }
async def _get_draw_data(
    text: str, images: List[Picture], credential: Credential
) -> dict:
    """
    Build the request parameters of an image dynamic.

    The images are uploaded as part of this call.

    Args:
        text (str): Text content.
        images (List[Picture]): Images to attach.
        credential (Credential): Credential used for the uploads.

    Returns:
        dict: Request data of the image dynamic.
    """
    content, at_uids, ctrl = await _parse_at(text)
    # All uploads run concurrently; results keep the order of `images`.
    uploads = await asyncio.gather(
        *(upload_image(picture, credential) for picture in images)
    )
    pictures = [
        {
            "img_src": uploaded["image_url"],
            "img_width": uploaded["image_width"],
            "img_height": uploaded["image_height"],
        }
        for uploaded in uploads
    ]
    extension = {"emoji_type": 1, "from": {"emoji_type": 1}, "flag_cfg": {}}
    setting = {"copy_forbidden": 0, "cachedTime": 0}
    return {
        "biz": 3,
        "category": 3,
        "type": 0,
        "pictures": json.dumps(pictures),
        "title": "",
        "tags": "",
        "description": content,
        "content": content,
        "from": "create.dynamic.web",
        "up_choose_comment": 0,
        "extension": json.dumps(extension),
        "at_uids": at_uids,
        "at_control": ctrl,
        "setting": json.dumps(setting),
    }
async def upload_image(
    image: Picture, credential: Credential, data: Optional[dict] = None
) -> dict:
    """
    Upload an image for use in a dynamic.

    Args:
        image (Picture): Image to upload. The format is restricted by the API.
        credential (Credential): Credential.
        data (dict, optional): Custom request body. Defaults to
            {"biz": "new_dyn", "category": "daily"}.

    Returns:
        dict: Result returned by the API.
    """
    credential.raise_for_no_sessdata()
    credential.raise_for_no_bili_jct()
    api = API["send"]["upload_img"]
    if data is None:
        data = {"biz": "new_dyn", "category": "daily"}
    # Keep the temp file open only for the duration of the request so the
    # handle is always released, even when the request fails.
    with open(image._write_to_temp_file(), "rb") as file_up:
        return await (
            Api(**api, credential=credential)
            .update_data(**data)
            .request(files={"file_up": file_up})
        )
def upload_image_sync(
    image: Picture, credential: Credential, data: Optional[dict] = None
) -> dict:
    """
    Upload an image for use in a dynamic (synchronous version).

    Args:
        image (Picture): Image to upload. The format is restricted by the API.
        credential (Credential): Credential.
        data (dict, optional): Custom request body. Defaults to
            {"biz": "new_dyn", "category": "daily"}.

    Returns:
        dict: Result returned by the API.
    """
    credential.raise_for_no_sessdata()
    credential.raise_for_no_bili_jct()
    api = API["send"]["upload_img"]
    if data is None:
        data = {"biz": "new_dyn", "category": "daily"}
    # Keep the temp file open only for the duration of the request so the
    # handle is always released, even when the request fails.
    with open(image._write_to_temp_file(), "rb") as file_up:
        return (
            Api(**api, credential=credential)
            .update_data(**data)
            .request_sync(files={"file_up": file_up})
        )
class BuildDynamic:
    """
    Builder for the content of a dynamic. Two APIs are provided.

    - 1. Chained construction

    ``` python
    BuildDynamic.empty().add_plain_text("114514").add_image(Picture.from_url("https://www.bilibili.com/favicon.ico"))
    ```

    - 2. Construction from arguments

    ``` python
    BuildDynamic.create_by_args(text="114514", topic_id=114514)
    ```
    """

    def __init__(self) -> None:
        """
        Create an empty dynamic.
        """
        self.contents: list = []  # ordered content nodes (text / at / emoji / vote)
        self.pics: List[Picture] = []
        self.attach_card: Optional[dict] = None
        self.topic: Optional[dict] = None
        self.options: dict = {}
        self.time: Optional[datetime] = None  # send time set via set_send_time()

    @staticmethod
    def empty():
        """
        Create an empty dynamic to be built step by step through chaining.
        """
        return BuildDynamic()

    @staticmethod
    def create_by_args(
        text: str = "",
        pics: Optional[List[Picture]] = None,
        topic_id: int = -1,
        vote_id: int = -1,
        live_reserve_id: int = -1,
        send_time: Union[datetime, None] = None,
    ):
        """
        Build a dynamic from arguments.

        Args:
            text (str, optional): Text of the dynamic. Defaults to "".
            pics (List[Picture], optional): Images of the dynamic. Defaults to no images.
            topic_id (int, optional): Topic id. Defaults to -1 (no topic).
            vote_id (int, optional): Id of a vote; it is placed at the very end of the dynamic. Defaults to -1 (no vote).
            live_reserve_id (int, optional): Live reservation oid, obtained from `live.create_live_reserve`. Defaults to -1 (none).
            send_time (datetime | None, optional): Send time. Defaults to None.
        """
        dyn = BuildDynamic()
        dyn.add_text(text)
        dyn.add_image(pics if pics is not None else [])
        if topic_id != -1:
            dyn.set_topic(topic_id)
        if vote_id != -1:
            dyn.add_vote(vote.Vote(vote_id=vote_id))
        if live_reserve_id != -1:
            dyn.set_attach_card(live_reserve_id)
        if send_time is not None:
            dyn.set_send_time(send_time)
        return dyn

    def add_plain_text(self, text: str) -> "BuildDynamic":
        """
        Append plain text.

        Args:
            text (str): Text content.
        """
        self.contents.append(
            {"biz_id": "", "type": DynamicContentType.TEXT.value, "raw_text": text}
        )
        return self

    def add_at(self, uid: Union[int, user.User]) -> "BuildDynamic":
        """
        Append a mention of a user; accepts a User object or a UID.

        Args:
            uid (Union[int, user.User]): User or user id.
        """
        if isinstance(uid, user.User):
            # `uid.__uid` written inside this class is name-mangled to
            # `_BuildDynamic__uid` and can never resolve, so use the public
            # accessor instead.
            # NOTE(review): assumes user.User exposes get_uid() - confirm.
            uid = uid.get_uid()
        name = user.User(uid).get_user_info_sync().get("name")
        self.contents.append(
            {"biz_id": uid, "type": DynamicContentType.AT.value, "raw_text": f"@{name}"}
        )
        return self

    @staticmethod
    def _load_emotes() -> dict:
        """Load the bundled emote table (emote id -> emote text) from data/emote.json."""
        path = os.path.join(os.path.dirname(__file__), "data/emote.json")
        with open(path, encoding="UTF-8") as f:
            return json.load(f)

    def add_emoji(self, emoji_id: int) -> "BuildDynamic":
        """
        Append an emoji.

        Args:
            emoji_id (int): Emoji id.

        Raises:
            ValueError: If the id is not present in the emote table.
        """
        emote_info = self._load_emotes()
        key = str(emoji_id)
        if key not in emote_info:
            raise ValueError("不存在此表情")
        self.contents.append(
            {
                "biz_id": "",
                "type": DynamicContentType.EMOJI.value,
                "raw_text": emote_info[key],
            }
        )
        return self

    def add_vote(self, vote: vote.Vote) -> "BuildDynamic":
        """
        Append a vote.

        The vote info is fetched synchronously before its title is read.

        Args:
            vote (vote.Vote): Vote to attach.
        """
        vote.get_info_sync()
        self.contents.append(
            {
                "biz_id": str(vote.get_vote_id()),
                "type": DynamicContentType.VOTE.value,
                "raw_text": vote.title,
            }
        )
        return self

    def add_image(self, image: Union[List[Picture], Picture]) -> "BuildDynamic":
        """
        Append one or several images.

        Args:
            image (Picture | List[Picture]): Image(s) to add.
        """
        if isinstance(image, Picture):
            image = [image]
        self.pics += image
        return self

    @staticmethod
    def _find_ats(text: str) -> List[dict]:
        """Locate every "@name" mention in `text` whose name resolves to a UID."""
        # The appended space lets a mention at the very end of the text match;
        # it does not shift any offset.
        padded = text + " "
        found = []
        for match in re.finditer(r"(?<=@).*?(?=\s)", padded):
            uname = match.group()
            try:
                uid = name2uid_sync(uname)["uid_list"][0]["uid"]
            except (KeyError, IndexError):
                # No such user (missing or empty "uid_list").
                continue
            found.append(
                {
                    # The lookbehind puts the match right after "@".
                    "location": match.start() - 1,
                    # "@" + name + the whitespace that follows it
                    "length": 2 + len(uname),
                    "text": f"@{uname} ",
                    "type": "at",
                    "uid": str(uid),
                }
            )
        return found

    @staticmethod
    def _find_emojis(text: str) -> List[dict]:
        """Locate every "[name]" token in `text` that is a known emote."""
        candidates = list(re.finditer(r"(?<=\[).*?(?=\])", text))
        if not candidates:
            # Nothing bracketed: no need to read the emote table at all.
            return []
        known = set(BuildDynamic._load_emotes().values())
        found = []
        for match in candidates:
            emoji = f"[{match.group(0)}]"
            if emoji not in known:
                continue
            found.append(
                {
                    # The lookbehind puts the match right after "[".
                    "location": match.start() - 1,
                    "length": len(emoji),
                    "text": emoji,
                    "type": "emoji",
                }
            )
        return found

    @staticmethod
    def _split_pieces(text: str, items: List[dict]) -> list:
        """Cut `text` around `items`, returning plain strings interleaved with the items."""
        pieces: list = []
        cursor = 0
        # Mentions and emojis are collected separately, so they must be put
        # into text order before slicing.
        for item in sorted(items, key=lambda entry: entry["location"]):
            start = item["location"]
            if start < cursor:
                # Overlaps a token already consumed (e.g. an emote inside a
                # mention name); keep the earlier token only.
                continue
            pieces.append(text[cursor:start])
            pieces.append(item)
            cursor = start + item["length"]
        pieces.append(text[cursor:])
        return pieces

    def add_text(self, text: str) -> "BuildDynamic":
        """
        Append text that may contain mentions ("@name ") and emotes ("[name]").

        The text is split into plain-text, mention and emoji content nodes,
        which are appended in text order.

        Args:
            text (str): Text content.
        """
        tokens = self._find_ats(text) + self._find_emojis(text)
        for piece in self._split_pieces(text, tokens):
            if isinstance(piece, str):
                self.add_plain_text(piece)
            elif piece["type"] == "at":
                self.contents.append(
                    {
                        "biz_id": piece["uid"],
                        "type": DynamicContentType.AT.value,
                        "raw_text": piece["text"],
                    }
                )
            else:
                self.contents.append(
                    {
                        "biz_id": "",
                        "type": DynamicContentType.EMOJI.value,
                        "raw_text": piece["text"],
                    }
                )
        return self

    def set_attach_card(self, oid: int) -> "BuildDynamic":
        """
        Attach a live reservation card.

        The oid is obtained from live.create_live_reserve.

        Args:
            oid (int): Card oid.
        """
        self.attach_card = {
            "type": 14,
            "biz_id": oid,
            "reserve_source": 1,  # 0 is possibly a video preview, but this could not be verified
            "reserve_lottery": 0,
        }
        return self

    def set_topic(self, topic_id: int) -> "BuildDynamic":
        """
        Set the topic.

        Args:
            topic_id (int): Topic id.
        """
        self.topic = {"id": topic_id}
        return self

    def set_options(
        self, up_choose_comment: bool = False, close_comment: bool = False
    ) -> "BuildDynamic":
        """
        Set options. A flag is only ever switched on; passing False leaves
        a previously set flag untouched.

        Args:
            up_choose_comment (bool): Curated-comments flag.
            close_comment (bool): Close-comments flag.
        """
        if up_choose_comment:
            self.options["up_choose_comment"] = 1
        if close_comment:
            self.options["close_comment"] = 1
        return self

    def set_send_time(self, time: datetime) -> "BuildDynamic":
        """
        Set the send time.

        Args:
            time (datetime): Send time.
        """
        self.time = time
        return self

    def get_dynamic_type(self) -> SendDynamicType:
        """
        Return IMAGE when at least one picture was added, TEXT otherwise.
        """
        if len(self.pics) != 0:
            return SendDynamicType.IMAGE
        return SendDynamicType.TEXT

    def get_contents(self) -> list:
        """Return the content nodes added so far."""
        return self.contents

    def get_pics(self) -> list:
        """Return the pictures added so far."""
        return self.pics

    def get_attach_card(self) -> Optional[dict]:
        """Return the attached card, or None when none was set."""
        return self.attach_card

    def get_topic(self) -> Optional[dict]:
        """Return the topic, or None when none was set."""
        return self.topic

    def get_options(self) -> dict:
        """Return the options set so far."""
        return self.options
async def send_dynamic(info: BuildDynamic, credential: Credential):
    """
    Send a dynamic.

    When `info.time` is set the dynamic is submitted as a scheduled
    dynamic, otherwise it is published immediately.

    Args:
        info (BuildDynamic): Content of the dynamic.
        credential (Credential): Credential.

    Returns:
        dict: Result returned by the API.
    """
    credential.raise_for_no_sessdata()
    credential.raise_for_no_bili_jct()

    if info.time is not None:
        # Scheduled dynamic: votes are left out of the flattened text.
        text = "".join(
            part["raw_text"]
            for part in info.contents
            if part["type"] != DynamicContentType.VOTE.value
        )
        if len(info.pics) > 0:
            # Image dynamic. _get_draw_data uploads the pictures itself, so
            # they are not uploaded a second time here.
            request_data = await _get_draw_data(text, info.pics, credential)  # type: ignore
            request_data.pop("setting")
            type_ = 4
        else:
            # Text dynamic.
            request_data = await _get_text_data(text)
            type_ = 2
        data = {
            "type": type_,
            "publish_time": int(info.time.timestamp()),
            "request": json.dumps(request_data, ensure_ascii=False),
        }
        api = API["send"]["schedule"]
        return await Api(**api, credential=credential).update_data(**data).result

    # Immediate dynamic: upload the pictures one after another first.
    pic_data = []
    for image in info.pics:
        await image.upload_file(credential)
        pic_data.append(
            {"img_src": image.url, "img_width": image.width, "img_height": image.height}
        )

    api = API["send"]["instant"]
    data = {
        "dyn_req": {
            "content": {"contents": info.get_contents()},  # required
            "scene": info.get_dynamic_type().value,  # required
            "meta": {
                "app_meta": {"from": "create.dynamic.web", "mobi_app": "web"},
            },
        }
    }
    if len(info.get_pics()) != 0:
        data["dyn_req"]["pics"] = pic_data
    if info.get_topic() is not None:
        data["dyn_req"]["topic"] = info.get_topic()
    if len(info.get_options()) > 0:
        data["dyn_req"]["option"] = info.get_options()
    if info.get_attach_card() is not None:
        data["dyn_req"]["attach_card"] = {"common_card": info.get_attach_card()}
    else:
        data["dyn_req"]["attach_card"] = None
    params = {"csrf": credential.bili_jct}
    return await (
        Api(**api, credential=credential, json_body=True)
        .update_data(**data)
        .update_params(**params)
        .result
    )
# Scheduled dynamic operations
async def get_schedules_list(credential: Credential) -> dict:
    """
    Get the list of scheduled dynamics waiting to be sent.

    Args:
        credential (Credential): Credential.

    Returns:
        dict: Result returned by the API.
    """
    credential.raise_for_no_sessdata()
    request = Api(**API["schedule"]["list"], credential=credential)
    return await request.result
async def send_schedule_now(draft_id: int, credential: Credential) -> dict:
    """
    Publish a scheduled dynamic immediately.

    Args:
        draft_id (int): Id of the scheduled dynamic.
        credential (Credential): Credential.

    Returns:
        dict: Result returned by the API.
    """
    credential.raise_for_no_sessdata()
    request = Api(**API["schedule"]["publish_now"], credential=credential)
    return await request.update_data(draft_id=draft_id).result
async def delete_schedule(draft_id: int, credential: Credential) -> dict:
    """
    Delete a scheduled dynamic.

    Args:
        draft_id (int): Id of the scheduled dynamic.
        credential (Credential): Credential.

    Returns:
        dict: Result returned by the API.
    """
    credential.raise_for_no_sessdata()
    request = Api(**API["schedule"]["delete"], credential=credential)
    return await request.update_data(draft_id=draft_id).result
class Dynamic:
    """
    A dynamic.

    Attributes:
        credential (Credential): Credential.
    """

    def __init__(
        self, dynamic_id: int, credential: Union[Credential, None] = None
    ) -> None:
        """
        Whether the dynamic is an opus is looked up in the cache pool; on a
        cache miss it is fetched with a synchronous request and cached.

        Args:
            dynamic_id (int): Dynamic id.
            credential (Credential | None, optional): Credential. Defaults to None.
        """
        self.__dynamic_id = dynamic_id
        self.credential = credential if credential is not None else Credential()
        # Compare against None rather than testing truthiness: a cached
        # False (not an opus) is a valid hit and must not trigger a request.
        cached = cache_pool.dynamic_is_opus.get(self.__dynamic_id)
        if cached is not None:
            self.__opus = cached
        else:
            api = API["info"]["detail"]
            params = {
                "id": self.__dynamic_id,
                "timezone_offset": -480,
                "features": "itemOpusStyle",
            }
            data = (
                Api(**api, credential=self.credential)
                .update_params(**params)
                .result_sync
            )
            self.__opus = data["item"]["basic"]["comment_type"] != 11
            cache_pool.dynamic_is_opus[self.__dynamic_id] = self.__opus

    def get_dynamic_id(self) -> int:
        """
        Return the dynamic id.
        """
        return self.__dynamic_id

    def is_opus(self) -> bool:
        """
        Tell whether this dynamic is an opus dynamic.

        Returns:
            bool: Whether this is an opus dynamic.
        """
        return self.__opus

    def turn_to_opus(self) -> "opus.Opus":
        """
        Convert an opus dynamic into its Opus object.

        The precondition (the dynamic is an opus) is checked through
        `raise_for_statement`.
        """
        raise_for_statement(self.__opus, "仅支持图文动态")
        return opus.Opus(self.__dynamic_id, credential=self.credential)

    async def get_info(self, features: str = "itemOpusStyle") -> dict:
        """
        Get the dynamic's info.

        (For opus dynamics, Opus.get_detail() is the recommended way to get
        the content.)

        Args:
            features (str, optional): Defaults to itemOpusStyle.

        Returns:
            dict: Result returned by the API.
        """
        api = API["info"]["detail"]
        params = {
            "id": self.__dynamic_id,
            "timezone_offset": -480,
            "features": features,
        }
        return (
            await Api(**api, credential=self.credential).update_params(**params).result
        )

    async def get_reaction(self, offset: str = "") -> dict:
        """
        Get likes and reposts.

        Args:
            offset (str, optional): Offset (the value of the `offset` key of the previous result), similar to a singly linked list. Defaults to "".

        Returns:
            dict: Result returned by the API.
        """
        api = API["info"]["reaction"]
        # Forward the caller's offset; it used to be hard-coded to "", which
        # made every call return the first page.
        params = {
            "web_location": "333.1369",
            "offset": offset,
            "id": self.get_dynamic_id(),
        }
        return (
            await Api(**api, credential=self.credential).update_params(**params).result
        )

    async def get_reposts(self, offset: str = "0") -> dict:
        """
        Get the list of reposts of this dynamic.

        Args:
            offset (str, optional): Offset (the value of the `offset` key of the previous result), similar to a singly linked list. Defaults to "0".

        Returns:
            dict: Result returned by the API.
        """
        api = API["info"]["repost"]
        params: dict[str, Any] = {"dynamic_id": self.__dynamic_id}
        # "0" means the first page, for which no offset is sent.
        if offset != "0":
            params["offset"] = offset
        return (
            await Api(**api, credential=self.credential).update_params(**params).result
        )

    async def get_likes(self, pn: int = 1, ps: int = 30) -> dict:
        """
        Get the list of likes of this dynamic.

        Args:
            pn (int, optional): Page number. Defaults to 1.
            ps (int, optional): Page size. Defaults to 30.

        Returns:
            dict: Result returned by the API.
        """
        api = API["info"]["likes"]
        params = {"dynamic_id": self.__dynamic_id, "pn": pn, "ps": ps}
        return (
            await Api(**api, credential=self.credential).update_params(**params).result
        )

    async def set_like(self, status: bool = True) -> dict:
        """
        Set the like status of this dynamic.

        Args:
            status (bool, optional): Like status. Defaults to True.

        Returns:
            dict: Result returned by the API.
        """
        self.credential.raise_for_no_sessdata()
        self.credential.raise_for_no_bili_jct()
        api = API["operate"]["like"]
        # The request needs the uid of the account that owns the credential.
        user_info = await user.get_self_info(credential=self.credential)
        data = {
            "dynamic_id": self.__dynamic_id,
            "up": 1 if status else 2,
            "uid": user_info["mid"],
        }
        return await Api(**api, credential=self.credential).update_data(**data).result

    async def delete(self) -> dict:
        """
        Delete this dynamic.

        Returns:
            dict: Result returned by the API.
        """
        self.credential.raise_for_no_sessdata()
        api = API["operate"]["delete"]
        data = {"dynamic_id": self.__dynamic_id}
        return await Api(**api, credential=self.credential).update_data(**data).result

    async def repost(self, text: str = "转发动态") -> dict:
        """
        Repost this dynamic.

        Args:
            text (str, optional): Text attached to the repost. Defaults to "转发动态".

        Returns:
            dict: Result returned by the API.
        """
        self.credential.raise_for_no_sessdata()
        api = API["operate"]["repost"]
        data = await _get_text_data(text)
        data["dynamic_id"] = self.__dynamic_id
        return await Api(**api, credential=self.credential).update_data(**data).result
async def get_new_dynamic_users(credential: Union[Credential, None] = None) -> dict:
    """
    Get the followed users that have new dynamics.

    Args:
        credential (Credential | None): Credential. Defaults to None.

    Returns:
        dict: Result returned by the API.
    """
    if not credential:
        credential = Credential()
    credential.raise_for_no_sessdata()
    request = Api(**API["info"]["attention_new_dynamic"], credential=credential)
    return await request.result
async def get_live_users(
    size: int = 10, credential: Union[Credential, None] = None
) -> dict:
    """
    Get the followed users that are currently live.

    Args:
        size (int): Number of entries to fetch. Defaults to 10.
        credential (Credential | None): Credential. Defaults to None.

    Returns:
        dict: Result returned by the API.
    """
    if not credential:
        credential = Credential()
    credential.raise_for_no_sessdata()
    request = Api(**API["info"]["attention_live"], credential=credential)
    return await request.update_params(size=size).result
async def get_dynamic_page_UPs_info(credential: Credential) -> dict:
    """
    Get the list of uploaders shown on the dynamic page.

    Args:
        credential (Credential): Credential.

    Returns:
        dict: Result returned by the API.
    """
    request = Api(**API["info"]["dynamic_page_UPs_info"], credential=credential)
    return await request.result
async def get_dynamic_page_info(
    credential: Credential,
    _type: Optional[DynamicType] = None,
    host_mid: Optional[int] = None,
    features: str = "itemOpusStyle",
    pn: int = 1,
    offset: Optional[int] = None,
) -> List[Dynamic]:
    """
    Get the dynamics shown on the dynamic page.

    Pass `_type` to get all dynamics or those of one type; pass `host_mid`
    to get the dynamics of one uploader. When both are given, `_type` wins
    and `host_mid` is ignored.

    Args:
        credential (Credential): Credential.
        _type (DynamicType, optional): Dynamic type. Defaults to None.
        host_mid (int, optional): mid of the uploader whose dynamics are fetched. Defaults to None.
        features (str, optional): Defaults to itemOpusStyle.
        pn (int, optional): Page number. Defaults to 1.
        offset (int, optional): Offset (the value of the `offset` key of the previous result), similar to a singly linked list. Defaults to None.

    Returns:
        list[Dynamic]: List of Dynamic objects.
    """
    # Work on a copy: the API table is shared module state and must not be
    # modified by a single call.
    api = dict(API["info"]["dynamic_page_info"])
    params = {
        "timezone_offset": -480,
        "features": features,
        "page": pn,
    }
    if offset:
        params["offset"] = offset
    if _type:  # all dynamics / one type
        params["type"] = _type.value
    elif host_mid:  # dynamics of one uploader
        params["host_mid"] = host_mid
    elif "params" in api:
        # Neither filter given: drop "type" from this call's parameter
        # template. The original popped it from the shared table, which made
        # every later unfiltered call fail with KeyError.
        api["params"] = {
            key: value for key, value in api["params"].items() if key != "type"
        }
    dynamic_data = (
        await Api(**api, credential=credential).update_params(**params).result
    )
    return [
        Dynamic(dynamic_id=int(dynamic["id_str"]), credential=credential)
        for dynamic in dynamic_data["items"]
    ]