from __future__ import annotations import base64 import json import logging import os import uuid from io import BytesIO import requests from PIL import Image from ..index_func import * from ..presets import * from ..utils import * from .base_model import BaseLLMModel class XMChat(BaseLLMModel): def __init__(self, api_key, user_name=""): super().__init__(model_name="xmchat", user=user_name) self.api_key = api_key self.session_id = None self.reset() self.image_bytes = None self.image_path = None self.xm_history = [] self.url = "https://xmbot.net/web" self.last_conv_id = None def reset(self, remain_system_prompt=False): self.session_id = str(uuid.uuid4()) self.last_conv_id = None return super().reset() def image_to_base64(self, image_path): # 打开并加载图片 img = Image.open(image_path) # 获取图片的宽度和高度 width, height = img.size # 计算压缩比例,以确保最长边小于4096像素 max_dimension = 2048 scale_ratio = min(max_dimension / width, max_dimension / height) if scale_ratio < 1: # 按压缩比例调整图片大小 new_width = int(width * scale_ratio) new_height = int(height * scale_ratio) img = img.resize((new_width, new_height), Image.LANCZOS) # 将图片转换为jpg格式的二进制数据 buffer = BytesIO() if img.mode == "RGBA": img = img.convert("RGB") img.save(buffer, format='JPEG') binary_image = buffer.getvalue() # 对二进制数据进行Base64编码 base64_image = base64.b64encode(binary_image).decode('utf-8') return base64_image def try_read_image(self, filepath): def is_image_file(filepath): # 判断文件是否为图片 valid_image_extensions = [ ".jpg", ".jpeg", ".png", ".bmp", ".gif", ".tiff"] file_extension = os.path.splitext(filepath)[1].lower() return file_extension in valid_image_extensions if is_image_file(filepath): logging.info(f"读取图片文件: {filepath}") self.image_bytes = self.image_to_base64(filepath) self.image_path = filepath else: self.image_bytes = None self.image_path = None def like(self): if self.last_conv_id is None: return "点赞失败,你还没发送过消息" data = { "uuid": self.last_conv_id, "appraise": "good" } requests.post(self.url, json=data) return "👍点赞成功,感谢反馈~" def dislike(self): if self.last_conv_id is None: return "点踩失败,你还没发送过消息" data = { "uuid": self.last_conv_id, "appraise": "bad" } requests.post(self.url, json=data) return "👎点踩成功,感谢反馈~" def prepare_inputs(self, real_inputs, use_websearch, files, reply_language, chatbot): fake_inputs = real_inputs display_append = "" limited_context = False return limited_context, fake_inputs, display_append, real_inputs, chatbot def handle_file_upload(self, files, chatbot, language): """if the model accepts multi modal input, implement this function""" if files: for file in files: if file.name: logging.info(f"尝试读取图像: {file.name}") self.try_read_image(file.name) if self.image_path is not None: chatbot = chatbot + [((self.image_path,), None)] if self.image_bytes is not None: logging.info("使用图片作为输入") # XMChat的一轮对话中实际上只能处理一张图片 self.reset() conv_id = str(uuid.uuid4()) data = { "user_id": self.api_key, "session_id": self.session_id, "uuid": conv_id, "data_type": "imgbase64", "data": self.image_bytes } response = requests.post(self.url, json=data) response = json.loads(response.text) logging.info(f"图片回复: {response['data']}") return None, chatbot, None def get_answer_at_once(self): question = self.history[-1]["content"] conv_id = str(uuid.uuid4()) self.last_conv_id = conv_id data = { "user_id": self.api_key, "session_id": self.session_id, "uuid": conv_id, "data_type": "text", "data": question } response = requests.post(self.url, json=data) try: response = json.loads(response.text) return response["data"], len(response["data"]) except Exception as e: return response.text, len(response.text)