TheEeeeLin's picture
update files
d5d20be verified
raw
history blame
16.2 kB
"""
焕影小程序功能服务端的基本工具函数,以类的形式封装
"""
try: # 加上这个try的原因在于本地环境和云函数端的import形式有所不同
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
except ImportError:
try:
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client
except ImportError:
raise ImportError("请下载腾讯云COS相关代码包:pip install cos-python-sdk-v5")
import requests
import datetime
import json
from .error import ProcessError
import os
local_path_ = os.path.dirname(__file__)
class GetConfig(object):
@staticmethod
def hy_sdk_client(Id:str, Key:str):
# 从cos中寻找文件
REGION: str = 'ap-beijing'
TOKEN = None
SCHEME: str = 'https'
BUCKET: str = 'hy-sdk-config-1305323352'
client_config = CosConfig(Region=REGION,
SecretId=Id,
SecretKey=Key,
Token=TOKEN,
Scheme=SCHEME)
return CosS3Client(client_config), BUCKET
def load_json(self, path:str, default_download=False):
try:
if os.path.isdir(path):
raise ProcessError("请输入具体的配置文件路径,而非文件夹!")
if default_download is True:
print(f"\033[34m 默认强制重新下载配置文件...\033[0m")
raise FileNotFoundError
with open(path) as f:
config = json.load(f)
return config
except FileNotFoundError:
dir_name = os.path.dirname(path)
try:
os.makedirs(dir_name)
except FileExistsError:
pass
base_name = os.path.basename(path)
print(f"\033[34m 正在从COS中下载配置文件...\033[0m")
print(f"\033[31m 请注意,接下来会在{dir_name}路径下生成文件{base_name}...\033[0m")
Id = input("请输入SecretId:")
Key = input("请输入SecretKey:")
client, bucket = self.hy_sdk_client(Id, Key)
data_bytes = client.get_object(Bucket=bucket,Key=base_name)["Body"].get_raw_stream().read()
data = json.loads(data_bytes.decode("utf-8"))
# data["SecretId"] = Id # 未来可以把这个加上
# data["SecretKey"] = Key
with open(path, "w") as f:
data_str = json.dumps(data, ensure_ascii=False)
# 如果 ensure_ascii 是 true (即默认值),输出保证将所有输入的非 ASCII 字符转义。
# 如果 ensure_ascii 是 false,这些字符会原样输出。
f.write(data_str)
f.close()
print(f"\033[32m 配置文件保存成功\033[0m")
return data
except json.decoder.JSONDecodeError:
print(f"\033[31m WARNING: 配置文件为空!\033[0m")
return {}
def load_file(self, cloud_path:str, local_path:str):
"""
从COS中下载文件到本地,本函数将会被默认执行的,在使用的时候建议加一些限制.
:param cloud_path: 云端的文件路径
:param local_path: 将云端文件保存在本地的路径
"""
if os.path.isdir(cloud_path):
raise ProcessError("请输入具体的云端文件路径,而非文件夹!")
if os.path.isdir(local_path):
raise ProcessError("请输入具体的本地文件路径,而非文件夹!")
dir_name = os.path.dirname(local_path)
base_name = os.path.basename(local_path)
try:
os.makedirs(dir_name)
except FileExistsError:
pass
cloud_name = os.path.basename(cloud_path)
print(f"\033[31m 请注意,接下来会在{dir_name}路径下生成文件{base_name}\033[0m")
Id = input("请输入SecretId:")
Key = input("请输入SecretKey:")
client, bucket = self.hy_sdk_client(Id, Key)
print(f"\033[34m 正在从COS中下载文件: {cloud_name}, 此过程可能耗费一些时间...\033[0m")
data_bytes = client.get_object(Bucket=bucket,Key=cloud_path)["Body"].get_raw_stream().read()
# data["SecretId"] = Id # 未来可以把这个加上
# data["SecretKey"] = Key
with open(local_path, "wb") as f:
# 如果 ensure_ascii 是 true (即默认值),输出保证将所有输入的非 ASCII 字符转义。
# 如果 ensure_ascii 是 false,这些字符会原样输出。
f.write(data_bytes)
f.close()
print(f"\033[32m 文件保存成功\033[0m")
class CosConf(GetConfig):
"""
从安全的角度出发,将一些默认配置文件上传至COS中,接下来使用COS和它的子类的时候,在第一次使用时需要输入Cuny给的id和key
用于连接cos存储桶,下载配置文件.
当然,在service_default_download = False的时候,如果在运行路径下已经有conf/service_config.json文件了,
那么就不用再次下载了,也不用输入id和key
事实上这只需要运行一次,因为配置文件将会被下载至源码文件夹中
如果要自定义路径,请在继承的子类中编写__init__函数,将service_path定向到指定路径
"""
def __init__(self) -> None:
# 下面这些参数是类的共享参数
self.__SECRET_ID: str = None # 服务的id
self.__SECRET_KEY: str = None # 服务的key
self.__REGION: str = None # 服务的存储桶地区
self.__TOKEN: str = None # 服务的token,目前一直是None
self.__SCHEME: str = None # 服务的访问协议,默认实际上是https
self.__BUCKET: str = None # 服务的存储桶
self.__SERVICE_CONFIG: dict = None # 服务的配置文件
self.service_path: str = f"{local_path_}/conf/service_config.json"
# 配置文件路径,默认是函数运行的路径下的conf文件夹
self.service_default_download = False # 是否在每次访问配置的时候都重新下载文件
@property
def service_config(self):
if self.__SERVICE_CONFIG is None or self.service_default_download is True:
self.__SERVICE_CONFIG = self.load_json(self.service_path, self.service_default_download)
return self.__SERVICE_CONFIG
@property
def client(self):
client_config = CosConfig(Region=self.region,
SecretId=self.secret_id,
SecretKey=self.secret_key,
Token=self.token,
Scheme=self.scheme)
return CosS3Client(client_config)
def get_key(self, key:str):
try:
data = self.service_config[key]
if data == "None":
return None
else:
return data
except KeyError:
print(f"\033[31m没有对应键值{key},默认返回None\033[0m")
return None
@property
def secret_id(self):
if self.__SECRET_ID is None:
self.__SECRET_ID = self.get_key("SECRET_ID")
return self.__SECRET_ID
@secret_id.setter
def secret_id(self, value:str):
self.__SECRET_ID = value
@property
def secret_key(self):
if self.__SECRET_KEY is None:
self.__SECRET_KEY = self.get_key("SECRET_KEY")
return self.__SECRET_KEY
@secret_key.setter
def secret_key(self, value:str):
self.__SECRET_KEY = value
@property
def region(self):
if self.__REGION is None:
self.__REGION = self.get_key("REGION")
return self.__REGION
@region.setter
def region(self, value:str):
self.__REGION = value
@property
def token(self):
# if self.__TOKEN is None:
# self.__TOKEN = self.get_key("TOKEN")
# 这里可以注释掉
return self.__TOKEN
@token.setter
def token(self, value:str):
self.__TOKEN= value
@property
def scheme(self):
if self.__SCHEME is None:
self.__SCHEME = self.get_key("SCHEME")
return self.__SCHEME
@scheme.setter
def scheme(self, value:str):
self.__SCHEME = value
@property
def bucket(self):
if self.__BUCKET is None:
self.__BUCKET = self.get_key("BUCKET")
return self.__BUCKET
@bucket.setter
def bucket(self, value):
self.__BUCKET = value
def downloadFile_COS(self, key, bucket:str=None, if_read:bool=False):
"""
从COS下载对象(二进制数据), 如果下载失败就返回None
"""
CosBucket = self.bucket if bucket is None else bucket
try:
# 将本类的Debug继承给抛弃了
# self.debug_print(f"Download from {CosBucket}", font_color="blue")
obj = self.client.get_object(
Bucket=CosBucket,
Key=key
)
if if_read is True:
data = obj["Body"].get_raw_stream().read() # byte
return data
else:
return obj
except Exception as e:
print(f"\033[31m下载失败! 错误描述:{e}\033[0m")
return None
def showFileList_COS_base(self, key, bucket, marker:str=""):
"""
返回cos存储桶内部的某个文件夹的内部名称
:param key: cos云端的存储路径
:param bucket: cos存储桶名称,如果没指定名称(None)就会寻找默认的存储桶
:param marker: 标记,用于记录上次查询到哪里了
ps:如果需要修改默认的存储桶配置,请在代码运行的时候加入代码 s.bucket = 存储桶名称 (s是对象实例)
返回的内容存储在response["Content"],不过返回的数据大小是有限制的,具体内容还是请看官方文档。
"""
response = self.client.list_objects(
Bucket=bucket,
Prefix=key,
Marker=marker
)
return response
def showFileList_COS(self, key, bucket:str=None)->list:
"""
实现查询存储桶中所有对象的操作,因为cos的sdk有返回数据包大小的限制,所以我们需要进行一定的改动
"""
marker = ""
file_list = []
CosBucket = self.bucket if bucket is None else bucket
while True: # 轮询
response = self.showFileList_COS_base(key, CosBucket, marker)
try:
file_list.extend(response["Contents"])
except KeyError as e:
print(e)
raise
if response['IsTruncated'] == 'false': # 接下来没有数据了,就退出
break
marker = response['NextMarker']
return file_list
def uploadFile_COS(self, buffer, key, bucket:str=None):
"""
从COS上传数据,需要注意的是必须得是二进制文件
"""
CosBucket = self.bucket if bucket is None else bucket
try:
self.client.put_object(
Bucket=CosBucket,
Body=buffer,
Key=key
)
return True
except Exception as e:
print(e)
return False
class FuncDiary(CosConf):
filter_dict = {"60a5e13da00e6e0001fd53c8": "Cuny",
"612c290f3a9af4000170faad": "守望平凡",
"614de96e1259260001506d6c": "林泽毅-焕影一新"}
def __init__(self, func_name: str, uid: str, error_conf_path: str = f"{local_path_}/conf/func_error_conf.json"):
"""
日志类的实例化
Args:
func_name: 功能名称,影响了日志投递的路径
"""
super().__init__()
# 配置文件路径,默认是函数运行的路径下的conf文件夹
self.service_path: str = os.path.join(os.path.dirname(error_conf_path), "service_config.json")
self.error_dict = self.load_json(path=error_conf_path)
self.__up: str = f"wx/invokeFunction_c/{datetime.datetime.now().strftime('%Y/%m/%d/%H')}/{func_name}/"
self.func_name: str = func_name
# 下面这个属性是的日志名称的前缀
self.__start_time = datetime.datetime.now().timestamp()
h_point = datetime.datetime.strptime(datetime.datetime.now().strftime('%Y/%m/%d/%H'), '%Y/%m/%d/%H')
h_point_timestamp = h_point.timestamp()
self.__prefix = int(self.__start_time - h_point_timestamp).__str__() + "_"
self.__uid = uid
self.__diary = None
def __str__(self):
return f"<{self.func_name}> DIARY for {self.__uid}"
@property
def content(self):
return self.__diary
@content.setter
def content(self, value: str):
if not isinstance(value, dict):
raise TypeError("content 只能是字典!")
if "status" in value:
raise KeyError("status字段已被默认占用,请在日志信息中更换字段名称!")
if self.__diary is None:
self.__diary = value
else:
raise PermissionError("为了减小日志对整体代码的影响,<content>只能被覆写一次!")
def uploadDiary_COS(self, status_id: str, suffix: str = "", bucket: str = "hy-hcy-data-logs-1306602019"):
if self.__diary is None:
self.__diary = {"status": self.error_dict[status_id]}
if status_id == "0000":
self.__up += f"True/{self.__uid}/"
else:
self.__up += f"False/{self.__uid}/"
interval = int(10 * (datetime.datetime.now().timestamp() - self.__start_time))
prefix = self.__prefix + status_id + "_" + interval.__str__()
self.__diary["status"] = self.error_dict[status_id]
name = prefix + "_" + suffix if len(suffix) != 0 else prefix
self.uploadFile_COS(buffer=json.dumps(self.__diary), key=self.__up + name, bucket=bucket)
print(f"{self}上传成功.")
class ResponseWebSocket(CosConf):
# 网关推送地址
__HOST:str = None
@property
def sendBackHost(self):
if self.__HOST is None:
self.__HOST = self.get_key("HOST")
return self.__HOST
@sendBackHost.setter
def sendBackHost(self, value):
self.__HOST = value
def sendMsg_toWebSocket(self, message,connectionID:str = None):
if connectionID is not None:
retmsg = {'websocket': {}}
retmsg['websocket']['action'] = "data send"
retmsg['websocket']['secConnectionID'] = connectionID
retmsg['websocket']['dataType'] = 'text'
retmsg['websocket']['data'] = json.dumps(message)
requests.post(self.sendBackHost, json=retmsg)
print("send success!")
else:
pass
@staticmethod
def create_Msg(status, msg):
"""
本方法用于创建一个用于发送到WebSocket客户端的数据
输入的信息部分,需要有如下几个参数:
1. id,固定为"return-result"
2. status,如果输入为1则status=true, 如果输入为-1则status=false
3. obj_key, 图片的云端路径, 这是输入的msg本身自带的
"""
msg['status'] = "false" if status == -1 else 'true' # 其实最好还是用bool
msg['id'] = "async-back-msg"
msg['type'] = "funcType"
msg["format"] = "imageType"
return msg
# 功能服务类
class Service(ResponseWebSocket):
"""
服务的主函数,封装了cos上传/下载功能以及与api网关的一键通讯
将类的实例变成一个可被调用的对象,在服务运行的时候,只需要运行该对象即可
当然,因为是类,所以支持继承和修改
"""
@classmethod
def process(cls, *args, **kwargs):
"""
处理函数,在使用的时候请将之重构
"""
pass
@classmethod
def __call__(cls, *args, **kwargs):
pass