import pika
import os
# os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
import json
import time
# from get_config import config_params
from config import get_config
from function import topic_clustering_not_summary as tc
from function import topic_clustering_social
import requests
config_params = get_config()
ConfigManager = config_params['ConfigManager']
last_time_check = time.time()
def update_result(result, type='daily', meta = {}):
benchmark_children_id = -1
benchmark_id = -1
source_tagids = []
for id_cluster in result:
for doc in result[id_cluster][:1]:
source_tagids = doc.get('source_tagids',[])
for key in doc:
if "benchmark_child" in key:
benchmark_children_id = int(key.lstrip('benchmark_child_'))
if "benchmark" in key and 'child' not in key:
benchmark_id = int(key.lstrip('benchmark_'))
if not source_tagids:
source_tagids = []
if len(source_tagids) > 0:
benchmark_id = 0
benchmark_children_id = 0
output = {
"benchmark_id": benchmark_id,
"benchmark_children_id": benchmark_children_id,
"source_tagids": source_tagids,
"country_code": meta.get('country_code',''),
"type": type,
"data": json.dumps(result)
# with open('test_result.json','w') as f:
# json.dump(output, f, ensure_ascii=False)
# url = config_params['api_save_clustering']
url = ConfigManager['ApiConnects']['api_save_clustering']['BaseUrl']
# with open("/home/vietle/topic-clustering/config/save.json", 'w') as f:
# json.dump(output, f,ensure_ascii=False)
res =, json = output)
print('Update result !!!!!!!!!')
def callback_func(ch, method, properties, body):
print("receive done: ")
starttime = time.time()
body = json.loads(body.decode("utf-8"))
# with open('input_daily.json','w') as f:
# json.dump(body, f, ensure_ascii=False)
docs = body['docs']
# threshold = body['threshold']
threshold = 0.25
top_cluster = body['top_cluster']
top_sentence = body['top_sentence']
topn_summary = body['topn_summary']
hash_str = body['hash_str']
st_time = body['st_time']
meta = body.get('meta',{})
country_code = meta.get("country_code", "")
delete_message = False if country_code in ["ICOMM-RND","SOCIAL"] else True
print("country_code: ", country_code, "meta: ", meta)
is_cache = False
with open("log_run/log.txt") as f:
data_dict = json.load(f)
except Exception as ve:
data_dict = {}
if hash_str in data_dict:
path_res = data_dict[hash_str]["response_path"]
with open(path_res) as ff:
results = json.load(ff)
print("time analysis (cache): ", time.time() - st_time)
is_cache = True
except Exception as vee:
if not is_cache:
if country_code in ["SOCIAL"]:
results = topic_clustering_social.topic_clustering(docs, threshold, top_cluster=top_cluster, top_sentence=top_sentence,
topn_summary=topn_summary, delete_message=delete_message)
results = tc.topic_clustering(docs, threshold, top_cluster=top_cluster, top_sentence=top_sentence,
topn_summary=topn_summary, delete_message=delete_message)
update_result(results, meta=meta)
path_res = "log/result_{0}.txt".format(hash_str)
with open(path_res, "w+") as ff:
data_dict[hash_str] = {"time": st_time, "response_path": path_res}
lst_rm = []
global last_time_check
if time.time() - last_time_check > 3600:
print("check log to del .....")
last_time_check = time.time()
for dt in data_dict:
if time.time() - data_dict[dt]["time"] > 30 * 24 * 3600:
for dt in lst_rm:
del data_dict[dt]
with open("log_run/log.txt", "w+") as ff:
print("time analysis: ", time.time() - starttime)
def test():
with open('req_daily/aus.json') as f:
body = json.load(f)
docs = body['response']['docs']
# threshold = body['threshold']
threshold = 0.25
top_cluster = body['top_cluster']
top_sentence = body['top_sentence']
topn_summary = body['topn_summary']
# hash_str = body['hash_str']
# st_time = body['st_time']
meta = body['response'].get('meta',{})
results = tc.topic_clustering(docs, threshold, top_cluster=top_cluster, top_sentence=top_sentence,
topn_summary=topn_summary, delete_message=True)
# update_result(results, meta=meta)
# print(123)
if __name__ == '__main__':
# test()
params = ConfigManager['QueueConfigs']['queue_topic_clustering']
usr_name = params["UserName"]
password = str(params["Password"])
host = params["HostName"]
virtual_host = params["VirtualHost"]
queue_name = params["Queue"]
# params = config_params['queue_topic_clustering']
# usr_name = params["usr_name"]
# password = str(params["password"])
# host = params["host"]
# virtual_host = params["virtual_host"]
# queue_name = params["queue_name"]
while True:
credentials = pika.PlainCredentials(usr_name, password)
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=host, virtual_host=virtual_host, credentials=credentials, heartbeat=3600, blocked_connection_timeout=3600))
channel =
channel.queue_declare(queue=queue_name, durable=True, arguments={"x-max-priority": 10})
print(" * wait message")
channel.basic_consume(queue=queue_name, on_message_callback=callback_func)
except Exception as ex:
print(f'[ERROR] ', ex)
# raise ex