import time import json import hashlib from pydantic import BaseModel class InputHotTopic(BaseModel): start_time: str = "" end_time: str = "" query: str = "" keywords: list = [] top_cluster: int = 5 prompt: str = "" check_relevent: str = "" class SessionProcess(object): def __init__(self): self.session = dict() def hash_session(self, query: InputHotTopic): hash_dict = query.dict() hash_dict['time'] = int(time.time()) return hashlib.sha224(json.dumps(hash_dict).encode("utf-8")).hexdigest() def insert_session(self, data_input): print('data_input: ', data_input) # if self.mode == "command_center": # hash_id = hash_session(data_input) # else: hash_id = self.hash_session(data_input) if hash_id not in self.session: self.session[hash_id] = {"status": 0, "created_time": time.time(), "update_time": time.time(), "result": {}, "data": data_input} return hash_id def get_info_session(self, hash_id: str): if hash_id in self.session: return self.session[hash_id] return {"status": -2, "result": {}, "meta": {}} def update_session(self, hash_id: str, result: dict, status: int): if hash_id in self.session: self.session[hash_id]["status"] = status self.session[hash_id]["result"] = result self.session[hash_id]["update_time"] = time.time() return True return False def delete_session(self, hash_id: str): if hash_id in self.session: del self.session[hash_id] return True return False