"""RabbitMQ consumer that runs topic clustering on queued document batches.

Each message carries documents plus clustering parameters.  The consumer
either re-posts a previously cached result for the same ``hash_str`` or runs
the clustering and posts the fresh result to the configured
``api_save_clustering`` endpoint, recording it in an on-disk cache index.
"""
import json
import os
import time

import pika
import requests

from config import get_config
from function import topic_clustering_not_summary as tc
from function import topic_clustering_social

config_params = get_config()
ConfigManager = config_params['ConfigManager']

# Wall-clock time of the last cache-index pruning pass (updated by callback_func).
last_time_check = time.time()

# On-disk cache: an index mapping hash_str -> {"time", "response_path"} and
# one JSON result file per hash_str.
CACHE_INDEX_PATH = "log_run/log.txt"
RESULT_PATH_TEMPLATE = "log/result_{0}.txt"
# Prune the cache index at most once per this many seconds ...
PRUNE_INTERVAL_SECONDS = 3600
# ... dropping entries whose recorded time is older than this.
CACHE_TTL_SECONDS = 30 * 24 * 3600

# Messages with these country codes are clustered with delete_message=False.
_KEEP_MESSAGE_COUNTRY_CODES = ("ICOMM-RND", "SOCIAL")

# Seconds to wait before reconnecting after a broker/consumer failure, so an
# unreachable broker does not cause a tight retry loop.
RECONNECT_DELAY_SECONDS = 5

_BENCHMARK_CHILD_PREFIX = "benchmark_child_"
_BENCHMARK_PREFIX = "benchmark_"


def _id_from_key(key, prefix):
    """Return the integer that follows *prefix* in *key*, or None if it does not apply.

    ``str.lstrip(prefix)`` must not be used here: it strips any leading
    characters contained in *prefix* (a character set, not a literal prefix),
    so keys like ``"benchmark_id"`` would become ``"id"`` and crash ``int()``.
    """
    if not isinstance(key, str) or not key.startswith(prefix):
        return None
    try:
        return int(key[len(prefix):])
    except ValueError:
        return None


def update_result(result, type='daily', meta=None):
    """POST a clustering result to the configured save-clustering API.

    Args:
        result: mapping of cluster id -> list of document dicts (only the
            first document of each cluster is inspected for benchmark ids
            and ``source_tagids``; a later cluster overrides earlier ones).
        type: value sent as the payload's ``"type"`` field (name kept for
            caller compatibility even though it shadows the builtin).
        meta: optional message metadata; only ``country_code`` is read.

    The response body is printed; its status is not checked.
    """
    if meta is None:
        meta = {}
    benchmark_children_id = -1
    benchmark_id = -1
    source_tagids = []
    for id_cluster in result:
        for doc in result[id_cluster][:1]:
            source_tagids = doc.get('source_tagids', [])
            for key in doc:
                child_id = _id_from_key(key, _BENCHMARK_CHILD_PREFIX)
                if child_id is not None:
                    benchmark_children_id = child_id
                if 'child' not in key:
                    parent_id = _id_from_key(key, _BENCHMARK_PREFIX)
                    if parent_id is not None:
                        benchmark_id = parent_id
                        # NOTE(review): stops scanning this doc's keys, so a
                        # child key appearing after the parent key is ignored.
                        break
    # A null/empty source_tagids is normalised to an empty list.
    source_tagids = source_tagids or []
    if source_tagids:
        # When source tag ids are present, both benchmark ids are sent as 0.
        benchmark_id = 0
        benchmark_children_id = 0

    output = {
        "benchmark_id": benchmark_id,
        "benchmark_children_id": benchmark_children_id,
        "source_tagids": source_tagids,
        "country_code": meta.get('country_code', ''),
        "type": type,
        "data": json.dumps(result),
    }
    url = ConfigManager['ApiConnects']['api_save_clustering']['BaseUrl']
    # NOTE(review): no timeout is set, so a hung API blocks the consumer.
    res = requests.post(url, json=output)
    print(res.text)
    print('Update result !!!!!!!!!')


def _write_json(path, obj):
    """Serialise *obj* as JSON into *path*, creating parent directories as needed."""
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    with open(path, "w", encoding="utf-8") as fh:
        fh.write(json.dumps(obj))


def _load_cache_index():
    """Return the cache index dict, or {} if it is missing, unreadable, or not a dict."""
    try:
        with open(CACHE_INDEX_PATH, encoding="utf-8") as fh:
            data = json.load(fh)
    except (OSError, ValueError) as exc:
        print(exc)
        return {}
    return data if isinstance(data, dict) else {}


def _serve_from_cache(data_dict, hash_str, st_time, meta):
    """Re-post a cached result for *hash_str*; return True only if that fully succeeded.

    Any failure (missing/corrupt result file, malformed index entry, or an
    error while posting) is printed and reported as a cache miss, so the
    caller recomputes the result.
    """
    try:
        if hash_str in data_dict:
            path_res = data_dict[hash_str]["response_path"]
            with open(path_res, encoding="utf-8") as fh:
                results = json.load(fh)
            print("time analysis (cache): ", time.time() - st_time)
            update_result(results, meta=meta)
            return True
    except Exception as exc:  # deliberate best-effort: fall back to recompute
        print(exc)
    return False


def callback_func(ch, method, properties, body):
    """pika consumer callback: cluster one message's documents and post the result.

    The message body is UTF-8 JSON with at least ``docs``, ``top_cluster``,
    ``top_sentence``, ``topn_summary``, ``hash_str`` and ``st_time``; ``meta``
    is optional.  The message is acked only after processing completes; if an
    exception escapes, it is left unacked.
    """
    global last_time_check
    print("receive done: ")
    starttime = time.time()
    body = json.loads(body.decode("utf-8"))
    docs = body['docs']
    # The request's 'threshold' field is not used; the value is fixed here.
    threshold = 0.25
    top_cluster = body['top_cluster']
    top_sentence = body['top_sentence']
    topn_summary = body['topn_summary']
    hash_str = body['hash_str']
    st_time = body['st_time']
    meta = body.get('meta', {})
    country_code = meta.get("country_code", "")
    delete_message = country_code not in _KEEP_MESSAGE_COUNTRY_CODES
    print("country_code: ", country_code, "meta: ", meta)

    data_dict = _load_cache_index()
    if not _serve_from_cache(data_dict, hash_str, st_time, meta):
        clustering = topic_clustering_social if country_code == "SOCIAL" else tc
        results = clustering.topic_clustering(
            docs,
            threshold,
            top_cluster=top_cluster,
            top_sentence=top_sentence,
            topn_summary=topn_summary,
            delete_message=delete_message,
        )
        update_result(results, meta=meta)
        path_res = RESULT_PATH_TEMPLATE.format(hash_str)
        _write_json(path_res, results)
        data_dict[hash_str] = {"time": st_time, "response_path": path_res}

    if time.time() - last_time_check > PRUNE_INTERVAL_SECONDS:
        print("check log to del .....")
        last_time_check = time.time()
        # NOTE(review): pruned entries' result files are left on disk.
        expired = [
            key for key, entry in data_dict.items()
            if time.time() - entry["time"] > CACHE_TTL_SECONDS
        ]
        for key in expired:
            del data_dict[key]
    _write_json(CACHE_INDEX_PATH, data_dict)

    print("time analysis: ", time.time() - starttime)
    ch.basic_ack(delivery_tag=method.delivery_tag)


def test():
    """Developer helper: cluster the sample request in req_daily/aus.json and print it."""
    with open('req_daily/aus.json') as f:
        body = json.load(f)
    docs = body['response']['docs']
    threshold = 0.25
    top_cluster = body['top_cluster']
    top_sentence = body['top_sentence']
    topn_summary = body['topn_summary']
    results = tc.topic_clustering(
        docs,
        threshold,
        top_cluster=top_cluster,
        top_sentence=top_sentence,
        topn_summary=topn_summary,
        delete_message=True,
    )
    print(results)


def _close_quietly(connection):
    """Close a pika connection if it is open, printing (not raising) any error."""
    if connection is None:
        return
    try:
        if connection.is_open:
            connection.close()
    except Exception as exc:
        print('[ERROR] closing connection: ', exc)


def main():
    """Consume the topic-clustering queue forever, reconnecting after failures."""
    params = ConfigManager['QueueConfigs']['queue_topic_clustering']
    usr_name = params["UserName"]
    password = str(params["Password"])
    host = params["HostName"]
    virtual_host = params["VirtualHost"]
    queue_name = params["Queue"]
    credentials = pika.PlainCredentials(usr_name, password)

    while True:
        connection = None
        try:
            connection = pika.BlockingConnection(
                pika.ConnectionParameters(
                    host=host,
                    virtual_host=virtual_host,
                    credentials=credentials,
                    heartbeat=3600,
                    blocked_connection_timeout=3600,
                )
            )
            channel = connection.channel()
            channel.queue_declare(
                queue=queue_name, durable=True, arguments={"x-max-priority": 10}
            )
            print(" * wait message")
            # One unacked message at a time: callback_func acks when done.
            channel.basic_qos(prefetch_count=1)
            channel.basic_consume(queue=queue_name, on_message_callback=callback_func)
            channel.start_consuming()
        except Exception as ex:
            print('[ERROR] ', ex)
            # Back off so an unreachable broker does not cause a busy loop.
            time.sleep(RECONNECT_DELAY_SECONDS)
        finally:
            _close_quietly(connection)


if __name__ == '__main__':
    main()