cuongnguyen910's picture
Upload folder using huggingface_hub
5120311 verified
raw
history blame
1.78 kB
import time
import json
import hashlib
from pydantic import BaseModel
class InputHotTopic(BaseModel):
start_time: str = ""
end_time: str = ""
query: str = ""
keywords: list = []
top_cluster: int = 5
prompt: str = ""
check_relevent: str = ""
class SessionProcess(object):
def __init__(self):
self.session = dict()
def hash_session(self, query: InputHotTopic):
hash_dict = query.dict()
hash_dict['time'] = int(time.time())
return hashlib.sha224(json.dumps(hash_dict).encode("utf-8")).hexdigest()
def insert_session(self, data_input):
print('data_input: ', data_input)
# if self.mode == "command_center":
# hash_id = hash_session(data_input)
# else:
hash_id = self.hash_session(data_input)
if hash_id not in self.session:
self.session[hash_id] = {"status": 0, "created_time": time.time(), "update_time": time.time(),
"result": {}, "data": data_input}
return hash_id
def get_info_session(self, hash_id: str):
if hash_id in self.session:
return self.session[hash_id]
return {"status": -2, "result": {}, "meta": {}}
def update_session(self, hash_id: str, result: dict, status: int):
if hash_id in self.session:
self.session[hash_id]["status"] = status
self.session[hash_id]["result"] = result
self.session[hash_id]["update_time"] = time.time()
return True
return False
def delete_session(self, hash_id: str):
if hash_id in self.session:
del self.session[hash_id]
return True
return False