|
"""
|
|
bilibili_api.session
|
|
|
|
消息相关
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
import asyncio
|
|
import logging
|
|
import datetime
|
|
from enum import Enum
|
|
from typing import Union, Optional
|
|
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
|
|
from bilibili_api.exceptions import ApiException
|
|
|
|
from .video import Video
|
|
from .user import get_self_info
|
|
from .utils.utils import get_api, raise_for_statement
|
|
from .utils.picture import Picture
|
|
from .utils.AsyncEvent import AsyncEvent
|
|
from .utils.credential import Credential
|
|
from .utils.network import Api
|
|
|
|
API = get_api("session")
|
|
|
|
|
|
async def fetch_session_msgs(
|
|
talker_id: int, credential: Credential, session_type: int = 1, begin_seqno: int = 0
|
|
) -> dict:
|
|
"""
|
|
获取指定用户的近三十条消息
|
|
|
|
Args:
|
|
talker_id (int) : 用户 UID
|
|
|
|
credential (Credential): Credential
|
|
|
|
session_type (int) : 会话类型 1 私聊 2 应援团
|
|
|
|
begin_seqno (int) : 起始 Seqno
|
|
|
|
Returns:
|
|
dict: 调用 API 返回结果
|
|
"""
|
|
|
|
credential.raise_for_no_sessdata()
|
|
params = {
|
|
"talker_id": talker_id,
|
|
"session_type": session_type,
|
|
"begin_seqno": begin_seqno,
|
|
}
|
|
api = API["session"]["fetch"]
|
|
|
|
return await Api(**api, credential=credential).update_params(**params).result
|
|
|
|
|
|
async def new_sessions(
|
|
credential: Credential, begin_ts: int = int(time.time() * 1000000)
|
|
) -> dict:
|
|
"""
|
|
获取新消息
|
|
|
|
Args:
|
|
credential (Credential): Credential
|
|
|
|
begin_ts (int) : 起始时间戳
|
|
|
|
Returns:
|
|
dict: 调用 API 返回结果
|
|
"""
|
|
|
|
credential.raise_for_no_sessdata()
|
|
params = {"begin_ts": begin_ts, "build": 0, "mobi_app": "web"}
|
|
api = API["session"]["new"]
|
|
|
|
return await Api(**api, credential=credential).update_params(**params).result
|
|
|
|
|
|
async def get_sessions(credential: Credential, session_type: int = 4) -> dict:
|
|
"""
|
|
获取已有消息
|
|
|
|
Args:
|
|
credential (Credential): Credential
|
|
|
|
session_type (int) : 会话类型 1: 私聊, 2: 通知, 3: 应援团, 4: 全部
|
|
|
|
Returns:
|
|
dict: 调用 API 返回结果
|
|
"""
|
|
|
|
credential.raise_for_no_sessdata()
|
|
params = {
|
|
"session_type": session_type,
|
|
"group_fold": 1,
|
|
"unfollow_fold": 0,
|
|
"sort_rule": 2,
|
|
"build": 0,
|
|
"mobi_app": "web",
|
|
}
|
|
api = API["session"]["get"]
|
|
|
|
return await Api(**api, credential=credential).update_params(**params).result
|
|
|
|
|
|
async def get_session_detail(
|
|
credential: Credential, talker_id: int, session_type: int = 1
|
|
) -> dict:
|
|
"""
|
|
获取会话详情
|
|
|
|
Args:
|
|
credential (Credential): Credential
|
|
|
|
session_type (int) : 会话类型
|
|
|
|
talker_id (int) : 会话对象的UID
|
|
|
|
Returns:
|
|
dict: 调用 API 返回结果
|
|
"""
|
|
|
|
credential.raise_for_no_sessdata()
|
|
params = {"talker_id": talker_id, "session_type": session_type}
|
|
api = API["session"]["get_session_detail"]
|
|
|
|
return await Api(**api, credential=credential).update_params(**params).result
|
|
|
|
|
|
async def get_replies(
|
|
credential: Credential,
|
|
last_reply_id: Optional[int] = None,
|
|
reply_time: Optional[int] = None,
|
|
) -> dict:
|
|
"""
|
|
获取收到的回复
|
|
|
|
Args:
|
|
credential (Credential): 凭据类.
|
|
|
|
last_reply_id (Optional, int) 最后一个评论的 ID
|
|
|
|
reply_time (Optional, int) 最后一个评论发送时间
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api = API["session"]["replies"]
|
|
params = {"id": last_reply_id, "reply_time": reply_time}
|
|
return await Api(**api, credential=credential).update_params(**params).result
|
|
|
|
|
|
async def get_likes(
|
|
credential: Credential, last_id: int = None, like_time: int = None
|
|
) -> dict:
|
|
"""
|
|
获取收到的赞
|
|
|
|
Args:
|
|
credential (Credential): 凭据类.
|
|
|
|
last_id (Optional, int) 最后一个 ID
|
|
|
|
like_time (Optional, int) 最后一个点赞发送时间
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api = API["session"]["likes"]
|
|
params = {"id": last_id, "like_time": like_time}
|
|
return await Api(**api, credential=credential).update_params(**params).result
|
|
|
|
|
|
async def get_at(
|
|
credential: Credential, last_uid: int = None, at_time: int = None
|
|
) -> dict:
|
|
"""
|
|
获取收到的 AT
|
|
|
|
Args:
|
|
credential (Credential): 凭据类.
|
|
|
|
last_id (Optional, int) 最后一个 ID
|
|
|
|
at_time (Optional, int) 最后一个点赞发送时间
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api = API["session"]["at"]
|
|
params = {"id": last_uid, "at_time": at_time}
|
|
return await Api(**api, credential=credential).update_params(**params).result
|
|
|
|
|
|
async def get_unread_messages(credential: Credential) -> dict:
|
|
"""
|
|
获取未读的信息
|
|
|
|
Args:
|
|
credential (Credential): 凭据类.
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api = API["session"]["unread"]
|
|
return await Api(**api, credential=credential).result
|
|
|
|
|
|
async def get_system_messages(credential: Credential) -> dict:
|
|
"""
|
|
获取系统信息
|
|
|
|
Args:
|
|
credential (Credential): 凭据类.
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api = API["session"]["system_msg"]
|
|
return await Api(**api, credential=credential).result
|
|
|
|
|
|
async def get_session_settings(credential: Credential) -> dict:
|
|
"""
|
|
获取消息设置
|
|
|
|
Args:
|
|
credential (Credential): 凭据类.
|
|
|
|
Returns:
|
|
dict: 调用 API 返回的结果
|
|
"""
|
|
api = API["session"]["session_settings"]
|
|
return await Api(**api, credential=credential).result
|
|
|
|
|
|
class EventType(Enum):
|
|
"""
|
|
事件类型
|
|
|
|
- TEXT: 纯文字消息
|
|
- PICTURE: 图片消息
|
|
- WITHDRAW: 撤回消息
|
|
- GROUPS_PICTURE: 应援团图片,但似乎不常触发,一般使用 PICTURE 即可
|
|
- SHARE_VIDEO: 分享视频
|
|
- NOTICE: 系统通知
|
|
- PUSHED_VIDEO: UP主推送的视频
|
|
- WELCOME: 新成员加入应援团欢迎
|
|
"""
|
|
|
|
TEXT = 1
|
|
PICTURE = 2
|
|
WITHDRAW = 5
|
|
GROUPS_PICTURE = 6
|
|
SHARE_VIDEO = 7
|
|
NOTICE = 10
|
|
PUSHED_VIDEO = 11
|
|
WELCOME = 306
|
|
|
|
|
|
class Event:
|
|
"""
|
|
事件参数:
|
|
+ receiver_id: 收信人 UID
|
|
+ receiver_type: 收信人类型,1: 私聊, 2: 应援团通知, 3: 应援团
|
|
+ sender_uid: 发送人 UID
|
|
+ talker_id: 对话人 UID
|
|
+ msg_seqno: 事件 Seqno
|
|
+ msg_type: 事件类型
|
|
+ msg_key: 事件唯一编号
|
|
+ timestamp: 事件时间戳
|
|
+ content: 事件内容
|
|
"""
|
|
|
|
receiver_id: int
|
|
receiver_type: int
|
|
sender_uid: int
|
|
talker_id: int
|
|
msg_seqno: int
|
|
msg_type: int
|
|
msg_key: int
|
|
timestamp: int
|
|
content: Union[str, int, Picture, Video]
|
|
|
|
def __init__(self, data: dict, self_uid: int):
|
|
"""
|
|
信息事件类型
|
|
|
|
Args:
|
|
data: 接收到的事件详细信息
|
|
|
|
self_uid: 用户自身 UID
|
|
"""
|
|
self.__dict__.update(data)
|
|
self.uid = self_uid
|
|
|
|
try:
|
|
self.__content()
|
|
except AttributeError:
|
|
logging.error(f"解析消息错误:{data}")
|
|
|
|
def __str__(self):
|
|
if self.receiver_type == 1:
|
|
if self.receiver_id == self.uid:
|
|
msg_type = "收到"
|
|
user_id = self.sender_uid
|
|
elif self.sender_uid == self.uid:
|
|
msg_type = "发送"
|
|
user_id = self.receiver_id
|
|
|
|
elif self.receiver_type == 2:
|
|
user_id = self.receiver_id
|
|
if self.sender_uid == self.uid:
|
|
msg_type = "发送应援团"
|
|
elif self.sender_uid == 0:
|
|
msg_type = "系统提示"
|
|
else:
|
|
msg_type = "收到应援团"
|
|
|
|
return f"{msg_type} {user_id} 信息 {self.content}({self.timestamp})"
|
|
|
|
def __content(self) -> None:
|
|
"""
|
|
更新消息内容
|
|
"""
|
|
content: dict = json.loads(self.content)
|
|
mt = self.msg_type
|
|
|
|
if mt == EventType.TEXT.value:
|
|
self.content = content.get("content")
|
|
|
|
elif mt == EventType.WELCOME.value:
|
|
self.content = content.get("content") + str(content.get("group_id"))
|
|
|
|
elif mt == EventType.WITHDRAW.value:
|
|
self.content = str(content)
|
|
|
|
elif mt == EventType.PICTURE.value or mt == EventType.GROUPS_PICTURE.value:
|
|
content.pop("original")
|
|
self.content = Picture(**content)
|
|
|
|
elif mt == EventType.SHARE_VIDEO.value or mt == EventType.PUSHED_VIDEO.value:
|
|
self.content = Video(bvid=content.get("bvid"), aid=content.get("id"))
|
|
|
|
elif mt == EventType.NOTICE.value:
|
|
self.content = content["title"] + " " + content["text"]
|
|
|
|
else:
|
|
logging.error(f"未知消息类型:{mt},消息内容:{content}")
|
|
|
|
|
|
async def send_msg(
|
|
credential: Credential,
|
|
receiver_id: int,
|
|
msg_type: EventType,
|
|
content: Union[str, Picture],
|
|
) -> dict:
|
|
"""
|
|
给用户发送私聊信息。目前仅支持纯文本。
|
|
|
|
Args:
|
|
credential (Credential) : 凭证
|
|
|
|
receiver_id (int) : 接收者 UID
|
|
|
|
msg_type (EventType) : 信息类型,参考 Event 类的事件类型。
|
|
|
|
content (str | Picture): 信息内容。支持文字和图片。
|
|
|
|
Returns:
|
|
dict: 调用 API 返回结果
|
|
"""
|
|
credential.raise_for_no_sessdata()
|
|
credential.raise_for_no_bili_jct()
|
|
|
|
api = API["operate"]["send_msg"]
|
|
self_info = await get_self_info(credential)
|
|
sender_uid = self_info["mid"]
|
|
|
|
if msg_type == EventType.TEXT:
|
|
real_content = json.dumps({"content": content})
|
|
elif msg_type == EventType.WITHDRAW:
|
|
real_content = str(content)
|
|
elif msg_type == EventType.PICTURE or msg_type == EventType.GROUPS_PICTURE:
|
|
raise_for_statement(isinstance(content, Picture), "TypeError")
|
|
await content.upload_file(credential=credential, data={"biz": "im"})
|
|
real_content = json.dumps(
|
|
{
|
|
"url": content.url,
|
|
"height": content.height,
|
|
"width": content.width,
|
|
"imageType": content.imageType,
|
|
"original": 1,
|
|
"size": content.size,
|
|
}
|
|
)
|
|
else:
|
|
raise ApiException("信息类型不支持。")
|
|
|
|
data = {
|
|
"msg[sender_uid]": sender_uid,
|
|
"msg[receiver_id]": receiver_id,
|
|
"msg[receiver_type]": 1,
|
|
"msg[msg_type]": msg_type.value,
|
|
"msg[msg_status]": 0,
|
|
"msg[content]": real_content,
|
|
"msg[dev_id]": "A6716E9A-7CE3-47AF-994B-F0B34178D28D",
|
|
"msg[new_face_version]": 0,
|
|
"msg[timestamp]": int(time.time()),
|
|
"from_filework": 0,
|
|
"build": 0,
|
|
"mobi_app": "web",
|
|
}
|
|
return await Api(**api, credential=credential).update_data(**data).result
|
|
|
|
|
|
class Session(AsyncEvent):
|
|
"""
|
|
会话类,用来开启消息监听。
|
|
"""
|
|
def __init__(self, credential: Credential, debug=False):
|
|
super().__init__()
|
|
|
|
self.__status = 0
|
|
|
|
|
|
self.maxTs = int(time.time() * 1000000)
|
|
|
|
|
|
self.maxSeqno = dict()
|
|
|
|
|
|
self.credential = credential
|
|
|
|
|
|
self.sched = AsyncIOScheduler(timezone="Asia/Shanghai")
|
|
|
|
|
|
self.events = dict()
|
|
|
|
|
|
self.logger = logging.getLogger("Session")
|
|
self.logger.setLevel(logging.DEBUG if debug else logging.INFO)
|
|
if not self.logger.handlers:
|
|
handler = logging.StreamHandler()
|
|
handler.setFormatter(
|
|
logging.Formatter(
|
|
"[%(asctime)s][%(levelname)s]: %(message)s", "%Y-%m-%d %H:%M:%S"
|
|
)
|
|
)
|
|
self.logger.addHandler(handler)
|
|
|
|
def on(self, event_type: EventType):
|
|
"""
|
|
重载装饰器注册事件监听器
|
|
|
|
Args:
|
|
event_type (EventType): 事件类型
|
|
"""
|
|
return super().on(event_name=str(event_type.value))
|
|
|
|
def get_status(self) -> int:
|
|
"""
|
|
获取连接状态
|
|
|
|
Returns:
|
|
int: 0 初始化,1 已连接,2 断开连接中,3 已断开,4 错误
|
|
"""
|
|
return self.__status
|
|
|
|
async def run(self, exclude_self: bool = True) -> None:
|
|
"""
|
|
非阻塞异步爬虫 定时发送请求获取消息
|
|
|
|
Args:
|
|
exclude_self: bool 是否排除自己发出的消息,默认排除
|
|
"""
|
|
|
|
|
|
self_info = await get_self_info(self.credential)
|
|
self.uid = self_info["mid"]
|
|
|
|
|
|
js = await get_sessions(self.credential)
|
|
self.maxSeqno = {
|
|
_session["talker_id"]: _session["max_seqno"]
|
|
for _session in js.get("session_list", [])
|
|
}
|
|
|
|
|
|
@self.sched.scheduled_job(
|
|
"interval",
|
|
id="query",
|
|
seconds=6,
|
|
max_instances=3,
|
|
next_run_time=datetime.datetime.now(),
|
|
)
|
|
async def qurey():
|
|
js: dict = await new_sessions(self.credential, self.maxTs)
|
|
if js.get("session_list") is None:
|
|
return
|
|
|
|
pending = set()
|
|
for session in js["session_list"]:
|
|
self.maxTs = max(self.maxTs, session["session_ts"])
|
|
pending.add(
|
|
asyncio.create_task(
|
|
fetch_session_msgs(
|
|
session["talker_id"],
|
|
self.credential,
|
|
session["session_type"],
|
|
self.maxSeqno.get(session["talker_id"]),
|
|
)
|
|
)
|
|
)
|
|
self.maxSeqno[session["talker_id"]] = session["max_seqno"]
|
|
|
|
while pending:
|
|
done, pending = await asyncio.wait(pending)
|
|
for done_task in done:
|
|
result: dict = await done_task
|
|
if result is None or result.get("messages") is None:
|
|
continue
|
|
for message in result.get("messages", [])[::-1]:
|
|
event = Event(message, self.uid)
|
|
if event.msg_type == EventType.WITHDRAW.value:
|
|
self.logger.info(
|
|
str(
|
|
self.events.get(
|
|
event.content, f"key={event.content}"
|
|
)
|
|
)
|
|
+ f" 被撤回({event.timestamp})"
|
|
)
|
|
else:
|
|
self.logger.info(event)
|
|
|
|
|
|
if event.sender_uid != self.uid or not exclude_self:
|
|
self.dispatch(str(event.msg_type), event)
|
|
|
|
self.events[str(event.msg_key)] = event
|
|
|
|
self.logger.debug(f"maxTs = {self.maxTs}")
|
|
|
|
self.__status = 1
|
|
self.sched.start()
|
|
self.logger.info("开始轮询")
|
|
|
|
async def start(self, exclude_self: bool = True) -> None:
|
|
"""
|
|
阻塞异步启动 通过调用 self.close() 后可断开连接
|
|
|
|
Args:
|
|
exclude_self: bool 是否排除自己发出的消息,默认排除
|
|
"""
|
|
|
|
await self.run(exclude_self)
|
|
while self.get_status() < 2:
|
|
await asyncio.sleep(1)
|
|
|
|
if self.get_status() == 2:
|
|
self.__status = 3
|
|
|
|
async def reply(self, event: Event, content: Union[str, Picture]) -> dict:
|
|
"""
|
|
快速回复消息
|
|
|
|
Args:
|
|
event : Event 要回复的消息
|
|
|
|
content: str | Picture 要回复的文字内容
|
|
|
|
Returns:
|
|
dict: 调用接口返回的内容。
|
|
"""
|
|
|
|
if self.uid == event.sender_uid:
|
|
self.logger.error("不能给自己发送消息哦~")
|
|
else:
|
|
msg_type = (
|
|
EventType.PICTURE if isinstance(content, Picture) else EventType.TEXT
|
|
)
|
|
return await send_msg(self.credential, event.sender_uid, msg_type, content)
|
|
|
|
def close(self) -> None:
|
|
"""结束轮询"""
|
|
|
|
self.sched.remove_job("query")
|
|
self.__status = 2
|
|
self.logger.info("结束轮询")
|
|
|