import pika
import os
# os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
import json
import time
# from get_config import config_params
from config import get_config
from function import topic_clustering_not_summary as tc
from function import topic_clustering_social
import requests
config_params = get_config()
ConfigManager = config_params['ConfigManager']
last_time_check = time.time()
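# Expected ConfigManager layout (illustrative sketch inferred from the lookups in
# this file; the actual keys and values come from the deployment config):
#   ConfigManager['ApiConnects']['api_save_clustering']['BaseUrl']  -> URL used by update_result()
#   ConfigManager['QueueConfigs']['queue_topic_clustering']         -> {"UserName", "Password", "HostName", "VirtualHost", "Queue"}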
def update_result(result, type='daily', meta=None):
    # Push the clustering result to the save-clustering API. The benchmark ids are
    # read from the first document of the first cluster only.
    meta = meta or {}
    benchmark_children_id = -1
    benchmark_id = -1
    source_tagids = []
    for id_cluster in result:
        for doc in result[id_cluster][:1]:
            source_tagids = doc.get('source_tagids', [])
            for key in doc:
                if "benchmark_child" in key:
                    # take the numeric suffix, e.g. "benchmark_child_12" -> 12
                    benchmark_children_id = int(key.replace('benchmark_child_', ''))
                if "benchmark" in key and 'child' not in key:
                    benchmark_id = int(key.replace('benchmark_', ''))
        break
    if not source_tagids:
        source_tagids = []
    if len(source_tagids) > 0:
        benchmark_id = 0
        benchmark_children_id = 0
    output = {
        "benchmark_id": benchmark_id,
        "benchmark_children_id": benchmark_children_id,
        "source_tagids": source_tagids,
        "country_code": meta.get('country_code', ''),
        "type": type,
        "data": json.dumps(result)
    }
    # with open('test_result.json','w') as f:
    #     json.dump(output, f, ensure_ascii=False)
    # url = config_params['api_save_clustering']
    url = ConfigManager['ApiConnects']['api_save_clustering']['BaseUrl']
    # with open("/home/vietle/topic-clustering/config/save.json", 'w') as f:
    #     json.dump(output, f, ensure_ascii=False)
    res = requests.post(url, json=output)
    print(res.text)
    print('Update result done.')
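# Message shape expected by callback_func (sketch inferred from the fields read
# below; the producer side defines the authoritative schema):
#   {"docs": [...], "top_cluster": ..., "top_sentence": ..., "topn_summary": ...,
#    "hash_str": "...", "st_time": <unix ts>, "meta": {"country_code": "..."}}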
def callback_func(ch, method, properties, body):
    print("receive done: ")
    starttime = time.time()
    body = json.loads(body.decode("utf-8"))
    # with open('input_daily.json','w') as f:
    #     json.dump(body, f, ensure_ascii=False)
    docs = body['docs']
    # threshold = body['threshold']
    threshold = 0.25
    top_cluster = body['top_cluster']
    top_sentence = body['top_sentence']
    topn_summary = body['topn_summary']
    hash_str = body['hash_str']
    st_time = body['st_time']
    meta = body.get('meta', {})
    country_code = meta.get("country_code", "")
    delete_message = False if country_code in ["ICOMM-RND", "SOCIAL"] else True
    print("country_code: ", country_code, "meta: ", meta)
    is_cache = False
    try:
        with open("log_run/log.txt") as f:
            data_dict = json.load(f)
    except Exception as ve:
        print(ve)
        data_dict = {}

    # Reuse a previously computed result when the same request hash was seen before.
    try:
        if hash_str in data_dict:
            path_res = data_dict[hash_str]["response_path"]
            with open(path_res) as ff:
                results = json.load(ff)
            print("time analysis (cache): ", time.time() - st_time)
            update_result(results, meta=meta)
            is_cache = True
    except Exception as vee:
        print(vee)

    if not is_cache:
        if country_code in ["SOCIAL"]:
            results = topic_clustering_social.topic_clustering(docs, threshold, top_cluster=top_cluster,
                                                               top_sentence=top_sentence, topn_summary=topn_summary,
                                                               delete_message=delete_message)
        else:
            results = tc.topic_clustering(docs, threshold, top_cluster=top_cluster,
                                          top_sentence=top_sentence, topn_summary=topn_summary,
                                          delete_message=delete_message)
        update_result(results, meta=meta)
        path_res = "log/result_{0}.txt".format(hash_str)
        with open(path_res, "w+") as ff:
            ff.write(json.dumps(results))
        data_dict[hash_str] = {"time": st_time, "response_path": path_res}

    # At most once per hour, drop cache entries older than 30 days.
    lst_rm = []
    global last_time_check
    if time.time() - last_time_check > 3600:
        print("check log to del .....")
        last_time_check = time.time()
        for dt in data_dict:
            if time.time() - data_dict[dt]["time"] > 30 * 24 * 3600:
                lst_rm.append(dt)
    for dt in lst_rm:
        del data_dict[dt]
    with open("log_run/log.txt", "w+") as ff:
        ff.write(json.dumps(data_dict))

    print("time analysis: ", time.time() - starttime)
    ch.basic_ack(delivery_tag=method.delivery_tag)
def test():
    with open('req_daily/aus.json') as f:
        body = json.load(f)
    docs = body['response']['docs']
    # threshold = body['threshold']
    threshold = 0.25
    top_cluster = body['top_cluster']
    top_sentence = body['top_sentence']
    topn_summary = body['topn_summary']
    # hash_str = body['hash_str']
    # st_time = body['st_time']
    meta = body['response'].get('meta', {})
    results = tc.topic_clustering(docs, threshold, top_cluster=top_cluster, top_sentence=top_sentence,
                                  topn_summary=topn_summary, delete_message=True)
    print(results)
    # update_result(results, meta=meta)
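# Minimal, commented-out publishing sketch for local testing (assumptions: a local
# broker, guest credentials, and the queue name below; adjust to the real
# QueueConfigs values before use):
#
#   credentials = pika.PlainCredentials('guest', 'guest')
#   connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
#   channel = connection.channel()
#   channel.queue_declare(queue='queue_topic_clustering', durable=True, arguments={"x-max-priority": 10})
#   payload = {"docs": [], "top_cluster": 5, "top_sentence": 5, "topn_summary": 3,
#              "hash_str": "demo", "st_time": time.time(), "meta": {"country_code": ""}}
#   channel.basic_publish(exchange='', routing_key='queue_topic_clustering', body=json.dumps(payload),
#                         properties=pika.BasicProperties(priority=1, delivery_mode=2))
#   connection.close()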
if __name__ == '__main__':
    # test()
    params = ConfigManager['QueueConfigs']['queue_topic_clustering']
    usr_name = params["UserName"]
    password = str(params["Password"])
    host = params["HostName"]
    virtual_host = params["VirtualHost"]
    queue_name = params["Queue"]
    # params = config_params['queue_topic_clustering']
    # usr_name = params["usr_name"]
    # password = str(params["password"])
    # host = params["host"]
    # virtual_host = params["virtual_host"]
    # queue_name = params["queue_name"]

    # Keep consuming; rebuild the connection whenever the broker drops it.
    while True:
        try:
            credentials = pika.PlainCredentials(usr_name, password)
            connection = pika.BlockingConnection(
                pika.ConnectionParameters(host=host, virtual_host=virtual_host, credentials=credentials,
                                          heartbeat=3600, blocked_connection_timeout=3600))
            channel = connection.channel()
            channel.queue_declare(queue=queue_name, durable=True, arguments={"x-max-priority": 10})
            print(" * wait message")
            channel.basic_qos(prefetch_count=1)
            channel.basic_consume(queue=queue_name, on_message_callback=callback_func)
            channel.start_consuming()
        except Exception as ex:
            print('[ERROR]', ex)
            # raise ex