""" bilibili_api.session 消息相关 """ import json import time import asyncio import logging import datetime from enum import Enum from typing import Union, Optional from apscheduler.schedulers.asyncio import AsyncIOScheduler from bilibili_api.exceptions import ApiException from .video import Video from .user import get_self_info from .utils.utils import get_api, raise_for_statement from .utils.picture import Picture from .utils.AsyncEvent import AsyncEvent from .utils.credential import Credential from .utils.network import Api API = get_api("session") async def fetch_session_msgs( talker_id: int, credential: Credential, session_type: int = 1, begin_seqno: int = 0 ) -> dict: """ 获取指定用户的近三十条消息 Args: talker_id (int) : 用户 UID credential (Credential): Credential session_type (int) : 会话类型 1 私聊 2 应援团 begin_seqno (int) : 起始 Seqno Returns: dict: 调用 API 返回结果 """ credential.raise_for_no_sessdata() params = { "talker_id": talker_id, "session_type": session_type, "begin_seqno": begin_seqno, } api = API["session"]["fetch"] return await Api(**api, credential=credential).update_params(**params).result async def new_sessions( credential: Credential, begin_ts: int = int(time.time() * 1000000) ) -> dict: """ 获取新消息 Args: credential (Credential): Credential begin_ts (int) : 起始时间戳 Returns: dict: 调用 API 返回结果 """ credential.raise_for_no_sessdata() params = {"begin_ts": begin_ts, "build": 0, "mobi_app": "web"} api = API["session"]["new"] return await Api(**api, credential=credential).update_params(**params).result async def get_sessions(credential: Credential, session_type: int = 4) -> dict: """ 获取已有消息 Args: credential (Credential): Credential session_type (int) : 会话类型 1: 私聊, 2: 通知, 3: 应援团, 4: 全部 Returns: dict: 调用 API 返回结果 """ credential.raise_for_no_sessdata() params = { "session_type": session_type, "group_fold": 1, "unfollow_fold": 0, "sort_rule": 2, "build": 0, "mobi_app": "web", } api = API["session"]["get"] return await Api(**api, credential=credential).update_params(**params).result async def get_session_detail( credential: Credential, talker_id: int, session_type: int = 1 ) -> dict: """ 获取会话详情 Args: credential (Credential): Credential session_type (int) : 会话类型 talker_id (int) : 会话对象的UID Returns: dict: 调用 API 返回结果 """ credential.raise_for_no_sessdata() params = {"talker_id": talker_id, "session_type": session_type} api = API["session"]["get_session_detail"] return await Api(**api, credential=credential).update_params(**params).result async def get_replies( credential: Credential, last_reply_id: Optional[int] = None, reply_time: Optional[int] = None, ) -> dict: """ 获取收到的回复 Args: credential (Credential): 凭据类. last_reply_id (Optional, int) 最后一个评论的 ID reply_time (Optional, int) 最后一个评论发送时间 Returns: dict: 调用 API 返回的结果 """ api = API["session"]["replies"] params = {"id": last_reply_id, "reply_time": reply_time} return await Api(**api, credential=credential).update_params(**params).result async def get_likes( credential: Credential, last_id: int = None, like_time: int = None ) -> dict: """ 获取收到的赞 Args: credential (Credential): 凭据类. last_id (Optional, int) 最后一个 ID like_time (Optional, int) 最后一个点赞发送时间 Returns: dict: 调用 API 返回的结果 """ api = API["session"]["likes"] params = {"id": last_id, "like_time": like_time} return await Api(**api, credential=credential).update_params(**params).result async def get_at( credential: Credential, last_uid: int = None, at_time: int = None ) -> dict: """ 获取收到的 AT Args: credential (Credential): 凭据类. last_id (Optional, int) 最后一个 ID at_time (Optional, int) 最后一个点赞发送时间 Returns: dict: 调用 API 返回的结果 """ api = API["session"]["at"] params = {"id": last_uid, "at_time": at_time} return await Api(**api, credential=credential).update_params(**params).result async def get_unread_messages(credential: Credential) -> dict: """ 获取未读的信息 Args: credential (Credential): 凭据类. Returns: dict: 调用 API 返回的结果 """ api = API["session"]["unread"] return await Api(**api, credential=credential).result async def get_system_messages(credential: Credential) -> dict: """ 获取系统信息 Args: credential (Credential): 凭据类. Returns: dict: 调用 API 返回的结果 """ api = API["session"]["system_msg"] return await Api(**api, credential=credential).result async def get_session_settings(credential: Credential) -> dict: """ 获取消息设置 Args: credential (Credential): 凭据类. Returns: dict: 调用 API 返回的结果 """ api = API["session"]["session_settings"] return await Api(**api, credential=credential).result class EventType(Enum): """ 事件类型 - TEXT: 纯文字消息 - PICTURE: 图片消息 - WITHDRAW: 撤回消息 - GROUPS_PICTURE: 应援团图片,但似乎不常触发,一般使用 PICTURE 即可 - SHARE_VIDEO: 分享视频 - NOTICE: 系统通知 - PUSHED_VIDEO: UP主推送的视频 - WELCOME: 新成员加入应援团欢迎 """ TEXT = 1 PICTURE = 2 WITHDRAW = 5 GROUPS_PICTURE = 6 SHARE_VIDEO = 7 NOTICE = 10 PUSHED_VIDEO = 11 WELCOME = 306 class Event: """ 事件参数: + receiver_id: 收信人 UID + receiver_type: 收信人类型,1: 私聊, 2: 应援团通知, 3: 应援团 + sender_uid: 发送人 UID + talker_id: 对话人 UID + msg_seqno: 事件 Seqno + msg_type: 事件类型 + msg_key: 事件唯一编号 + timestamp: 事件时间戳 + content: 事件内容 """ receiver_id: int receiver_type: int sender_uid: int talker_id: int msg_seqno: int msg_type: int msg_key: int timestamp: int content: Union[str, int, Picture, Video] def __init__(self, data: dict, self_uid: int): """ 信息事件类型 Args: data: 接收到的事件详细信息 self_uid: 用户自身 UID """ self.__dict__.update(data) self.uid = self_uid try: self.__content() except AttributeError: logging.error(f"解析消息错误:{data}") def __str__(self): if self.receiver_type == 1: if self.receiver_id == self.uid: msg_type = "收到" user_id = self.sender_uid elif self.sender_uid == self.uid: msg_type = "发送" user_id = self.receiver_id elif self.receiver_type == 2: user_id = self.receiver_id if self.sender_uid == self.uid: msg_type = "发送应援团" elif self.sender_uid == 0: msg_type = "系统提示" else: msg_type = "收到应援团" return f"{msg_type} {user_id} 信息 {self.content}({self.timestamp})" # type: ignore def __content(self) -> None: """ 更新消息内容 """ content: dict = json.loads(self.content) # type: ignore mt = self.msg_type if mt == EventType.TEXT.value: self.content = content.get("content") # type: ignore elif mt == EventType.WELCOME.value: self.content = content.get("content") + str(content.get("group_id")) # type: ignore elif mt == EventType.WITHDRAW.value: self.content = str(content) elif mt == EventType.PICTURE.value or mt == EventType.GROUPS_PICTURE.value: content.pop("original") self.content = Picture(**content) elif mt == EventType.SHARE_VIDEO.value or mt == EventType.PUSHED_VIDEO.value: self.content = Video(bvid=content.get("bvid"), aid=content.get("id")) elif mt == EventType.NOTICE.value: self.content = content["title"] + " " + content["text"] else: logging.error(f"未知消息类型:{mt},消息内容:{content}") async def send_msg( credential: Credential, receiver_id: int, msg_type: EventType, content: Union[str, Picture], ) -> dict: """ 给用户发送私聊信息。目前仅支持纯文本。 Args: credential (Credential) : 凭证 receiver_id (int) : 接收者 UID msg_type (EventType) : 信息类型,参考 Event 类的事件类型。 content (str | Picture): 信息内容。支持文字和图片。 Returns: dict: 调用 API 返回结果 """ credential.raise_for_no_sessdata() credential.raise_for_no_bili_jct() api = API["operate"]["send_msg"] self_info = await get_self_info(credential) sender_uid = self_info["mid"] if msg_type == EventType.TEXT: real_content = json.dumps({"content": content}) elif msg_type == EventType.WITHDRAW: real_content = str(content) elif msg_type == EventType.PICTURE or msg_type == EventType.GROUPS_PICTURE: raise_for_statement(isinstance(content, Picture), "TypeError") await content.upload_file(credential=credential, data={"biz": "im"}) real_content = json.dumps( { "url": content.url, "height": content.height, "width": content.width, "imageType": content.imageType, "original": 1, "size": content.size, } ) else: raise ApiException("信息类型不支持。") data = { "msg[sender_uid]": sender_uid, "msg[receiver_id]": receiver_id, "msg[receiver_type]": 1, "msg[msg_type]": msg_type.value, "msg[msg_status]": 0, "msg[content]": real_content, "msg[dev_id]": "A6716E9A-7CE3-47AF-994B-F0B34178D28D", "msg[new_face_version]": 0, "msg[timestamp]": int(time.time()), "from_filework": 0, "build": 0, "mobi_app": "web", } return await Api(**api, credential=credential).update_data(**data).result class Session(AsyncEvent): """ 会话类,用来开启消息监听。 """ def __init__(self, credential: Credential, debug=False): super().__init__() # 会话状态 self.__status = 0 # 已获取会话中最大的时间戳 默认当前时间 self.maxTs = int(time.time() * 1000000) # 会话UID为键 会话中最大Seqno为值 self.maxSeqno = dict() # 凭证 self.credential = credential # 异步定时任务框架 self.sched = AsyncIOScheduler(timezone="Asia/Shanghai") # 已接收的所有事件 用于撤回时找回 self.events = dict() # logging self.logger = logging.getLogger("Session") self.logger.setLevel(logging.DEBUG if debug else logging.INFO) if not self.logger.handlers: handler = logging.StreamHandler() handler.setFormatter( logging.Formatter( "[%(asctime)s][%(levelname)s]: %(message)s", "%Y-%m-%d %H:%M:%S" ) ) self.logger.addHandler(handler) def on(self, event_type: EventType): """ 重载装饰器注册事件监听器 Args: event_type (EventType): 事件类型 """ return super().on(event_name=str(event_type.value)) def get_status(self) -> int: """ 获取连接状态 Returns: int: 0 初始化,1 已连接,2 断开连接中,3 已断开,4 错误 """ return self.__status async def run(self, exclude_self: bool = True) -> None: """ 非阻塞异步爬虫 定时发送请求获取消息 Args: exclude_self: bool 是否排除自己发出的消息,默认排除 """ # 获取自身UID 用于后续判断消息是发送还是接收 self_info = await get_self_info(self.credential) self.uid = self_info["mid"] # 初始化 只接收开始运行后的新消息 js = await get_sessions(self.credential) self.maxSeqno = { _session["talker_id"]: _session["max_seqno"] for _session in js.get("session_list", []) } # 间隔 6 秒轮询消息列表 之前设置 3 秒询一次 跑了一小时给我账号冻结了 @self.sched.scheduled_job( "interval", id="query", seconds=6, max_instances=3, next_run_time=datetime.datetime.now(), ) async def qurey(): js: dict = await new_sessions(self.credential, self.maxTs) if js.get("session_list") is None: return pending = set() for session in js["session_list"]: self.maxTs = max(self.maxTs, session["session_ts"]) pending.add( asyncio.create_task( fetch_session_msgs( session["talker_id"], self.credential, session["session_type"], self.maxSeqno.get(session["talker_id"]), # type: ignore ) ) ) self.maxSeqno[session["talker_id"]] = session["max_seqno"] while pending: done, pending = await asyncio.wait(pending) for done_task in done: result: dict = await done_task if result is None or result.get("messages") is None: continue for message in result.get("messages", [])[::-1]: event = Event(message, self.uid) if event.msg_type == EventType.WITHDRAW.value: self.logger.info( str( self.events.get( event.content, f"key={event.content}" ) ) + f" 被撤回({event.timestamp})" ) else: self.logger.info(event) # 自己发出的消息不发布任务 if event.sender_uid != self.uid or not exclude_self: self.dispatch(str(event.msg_type), event) self.events[str(event.msg_key)] = event self.logger.debug(f"maxTs = {self.maxTs}") self.__status = 1 self.sched.start() self.logger.info("开始轮询") async def start(self, exclude_self: bool = True) -> None: """ 阻塞异步启动 通过调用 self.close() 后可断开连接 Args: exclude_self: bool 是否排除自己发出的消息,默认排除 """ await self.run(exclude_self) while self.get_status() < 2: await asyncio.sleep(1) if self.get_status() == 2: self.__status = 3 async def reply(self, event: Event, content: Union[str, Picture]) -> dict: # type: ignore """ 快速回复消息 Args: event : Event 要回复的消息 content: str | Picture 要回复的文字内容 Returns: dict: 调用接口返回的内容。 """ if self.uid == event.sender_uid: self.logger.error("不能给自己发送消息哦~") else: msg_type = ( EventType.PICTURE if isinstance(content, Picture) else EventType.TEXT ) return await send_msg(self.credential, event.sender_uid, msg_type, content) def close(self) -> None: """结束轮询""" self.sched.remove_job("query") self.__status = 2 self.logger.info("结束轮询")