# -*- coding: utf-8 -*-
import os
import re
import numpy as np
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt
import warnings
import nltk
import random, time
import datetime
# nltk.download("stopwords")
from nltk.corpus import stopwords
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler, random_split
from sklearn.metrics import classification_report
import transformers
from transformers import (
    AdamW,
    AutoModelForSeq2SeqLM,
    AutoTokenizer,
    BartForSequenceClassification,
    BartTokenizer,
    Trainer,
    TrainingArguments,
    get_linear_schedule_with_warmup,
    pipeline,
    set_seed,
)
from datasets import Dataset, load_dataset, load_metric
import datasets
from dotenv import load_dotenv
from nltk.tokenize import sent_tokenize
import gradio as gr
import pyperclip
import openai
# from vicuna_generate import *
# from convert_article import *


# Data preprocessing
def text_preprocessing(s):
    """
    Heavier cleaner (kept for reference; shadowed by the lighter version below):
    - Lowercase the sentence
    - Change "'t" to "not"
    - Remove "@name"
    - Isolate and remove punctuations except "?"
    - Remove other special characters
    - Remove stop words except "not" and "can"
    - Remove trailing whitespace
    """
    s = s.lower()
    # Change 't to 'not'
    s = re.sub(r"\'t", " not", s)
    # Remove @name
    s = re.sub(r'(@.*?)[\s]', ' ', s)
    # Isolate and remove punctuations except '?'
    s = re.sub(r'([\'\"\.\(\)\!\?\\\/\,])', r' \1 ', s)
    s = re.sub(r'[^\w\s\?]', ' ', s)
    # Remove some special characters
    s = re.sub(r'([\;\:\|•«\n])', ' ', s)
    # Remove stopwords except 'not' and 'can'
    s = " ".join([word for word in s.split()
                  if word not in stopwords.words('english') or word in ['not', 'can']])
    # Remove trailing whitespace
    s = re.sub(r'\s+', ' ', s).strip()
    return s


def text_preprocessing(text):
    """
    NOTE: this lighter cleaner shadows the definition above, so it is the
    version actually applied at inference time.
    - Remove entity mentions (e.g. '@united')
    - Correct HTML-escaped ampersands (e.g. '&amp;' to '&')

    @param    text (str): a string to be processed.
    @return   text (str): the processed string.
    """
    # Remove '@name'
    text = re.sub(r'(@.*?)[\s]', ' ', text)
    # Replace '&amp;' with '&'
    text = re.sub(r'&amp;', '&', text)
    # Remove trailing whitespace
    text = re.sub(r'\s+', ' ', text).strip()
    return text


# Total number of training steps is [number of batches] x [number of epochs].
# (Note that this is not the same as the number of training samples.)
# Create the learning rate scheduler.

# Function to calculate the accuracy of our predictions vs labels
def flat_accuracy(preds, labels):
    pred_flat = np.argmax(preds, axis=1).flatten()
    labels_flat = labels.flatten()
    return np.sum(pred_flat == labels_flat) / len(labels_flat)


def format_time(elapsed):
    '''
    Takes a time in seconds and returns a string hh:mm:ss
    '''
    # Round to the nearest second.
    elapsed_rounded = int(round(elapsed))
    # Format as hh:mm:ss
    return str(datetime.timedelta(seconds=elapsed_rounded))
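
# Illustrative usage of the helpers above (a minimal sketch; output values
# assume the NLTK stopwords corpus has been fetched via nltk.download("stopwords")
# and that the second, lighter text_preprocessing is the active definition):
#
#   cleaned = text_preprocessing("Thanks @united for the update &amp; the link!")
#   # mention removed and "&amp;" unescaped -> "Thanks for the update & the link!"
#
#   acc = flat_accuracy(np.array([[0.1, 0.9], [0.8, 0.2]]), np.array([1, 0]))
#   # -> 1.0, since both argmax predictions match the labels
#
#   print(format_time(3672.4))   # -> "1:01:12"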
def decode(paragraphs_needed):
    # model_ckpt = "facebook/bart-large-cnn"
    tokenizer = AutoTokenizer.from_pretrained("theQuert/NetKUp-tokenzier")
    # pipe = pipeline("summarization", model="bart-decoder", tokenizer=tokenizer)
    pipe = pipeline("summarization", model="hyesunyun/update-summarization-bart-large-longformer", tokenizer=tokenizer)
    contexts = [str(pipe(paragraph)) for paragraph in paragraphs_needed]
    return contexts


def split_article(article, trigger):
    # Replace newlines with a sentinel, split on it, then tag each paragraph
    # with the trigger so the classifier sees both.
    if "\n" in article:
        article = article.replace("\n", "\\\\c\\\\c")
    paragraphs = article.split("\\\\c\\\\c")
    pars = [str(par) + " -- " + str(trigger) for par in paragraphs]
    # pd.DataFrame({"paragraph": pars}).to_csv("./util/experiments/input_paragraphs.csv")
    return pars


def config():
    # Load environment variables (e.g. the OpenAI key) from a local .env file.
    load_dotenv()


def call_gpt(paragraph, trigger):
    openai.api_key = os.environ.get("GPT_API")
    tokenizer = BartTokenizer.from_pretrained("theQuert/NetKUp-tokenzier")
    inputs_for_gpt = f"""
As an article writer, your task is to provide an updated paragraph of the same length as the non-updated paragraph, based on the given non-updated paragraph and a triggered news event.
Non-updated paragraph:
{paragraph}

Triggered News:
{trigger}
"""
    # merged_with_prompts.append(merged.strip())
    # pd.DataFrame({"paragraph": merged_with_prompts}).to_csv("./experiments/paragraphs_with_prompts.csv")
    completion = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "user", "content": inputs_for_gpt}
        ]
    )
    response = completion.choices[0].message.content
    return str(response)
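
# Configuration note (assumption: the key is supplied through a local ".env"
# file; the variable name GPT_API matches what call_gpt() reads from the
# environment after config()/load_dotenv() has run):
#
#   # .env
#   GPT_API=<your OpenAI API key>
#
# Illustrative call once the key is in place:
#   updated = call_gpt("The old paragraph text.", "The triggering news event.")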
def call_vicuna(paragraphs, trigger):
    # Requires the commented-out `from vicuna_generate import *` above,
    # which provides vicuna_output().
    tokenizer = BartTokenizer.from_pretrained("theQuert/NetKUp-tokenzier")
    merged_with_prompts = []
    for paragraph in paragraphs:
        merged = f"""
As an article writer, your task is to provide an updated paragraph of the same length as the non-updated paragraph, based on the given non-updated paragraph and a triggered news event.
Non-updated paragraph:
{paragraph}

Triggered News:
{trigger}
"""
        merged_with_prompts.append(merged.strip())
    pd.DataFrame({"paragraph": merged_with_prompts}).to_csv("./util/experiments/paragraphs_with_prompts.csv")
    responses = vicuna_output()
    return responses


def main(input_article, input_trigger):
    # csv_path = "./util/experiments/input_paragraphs.csv"
    # if os.path.isfile(csv_path):
    #     os.remove(csv_path)
    modified = "TRUE"
    # device = "cuda" if torch.cuda.is_available() else "cpu"
    device = "cpu"
    # tokenizer = BartTokenizer.from_pretrained('facebook/bart-large-cnn', do_lower_case=True)
    tokenizer = AutoTokenizer.from_pretrained('theQuert/NetKUp-tokenzier')
    batch_size = 8
    model = torch.load("./util/bart_model", map_location=torch.device("cpu"))
    # Optimizer is kept from the training script but is unused at inference time.
    optimizer = AdamW(model.parameters(),
                      lr=2e-5,
                      eps=1e-8)

    # Split the input article into trigger-tagged paragraphs.
    data_test = split_article(input_article, input_trigger)

    seed_val = 42
    random.seed(seed_val)
    np.random.seed(seed_val)
    torch.manual_seed(seed_val)
    # torch.cuda.manual_seed_all(seed_val)

    input_ids = []
    attention_masks = []
    for sent in data_test:
        encoded_dict = tokenizer.encode_plus(
            text_preprocessing(sent),
            add_special_tokens=True,
            max_length=600,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
            truncation=True
        )
        input_ids.append(encoded_dict['input_ids'])
        attention_masks.append(encoded_dict['attention_mask'])
    input_ids = torch.cat(input_ids, dim=0)
    attention_masks = torch.cat(attention_masks, dim=0)
    test_dataset = TensorDataset(input_ids, attention_masks)
    test_dataloader = DataLoader(
        test_dataset,
        sampler=SequentialSampler(test_dataset),
        batch_size=batch_size
    )

    # Predictions
    predictions = []
    for batch in test_dataloader:
        b_input_ids = batch[0].to(device)
        b_input_mask = batch[1].to(device)
        with torch.no_grad():
            output = model(b_input_ids, attention_mask=b_input_mask)
            logits = output.logits
        logits = logits.detach().cpu().numpy()
        pred_flat = np.argmax(logits, axis=1).flatten()
        predictions.extend(list(pred_flat))

    # Write predictions for each paragraph
    df_output = pd.DataFrame({"target": predictions}).to_csv('./util/experiments/classification.csv', index=False)
    if len(data_test) == 1:
        predictions[0] = 1

    # Extract ids for update-needed paragraphs (idx with predicted target == 1)
    pos_ids = [idx for idx in range(len(predictions)) if predictions[idx] == 1]
    neg_ids = [idx for idx in range(len(predictions)) if predictions[idx] == 0]

    # Feed the positive paragraphs to the decoder
    paragraphs_needed = [data_test[idx] for idx in pos_ids]
    pd.DataFrame({"paragraph": paragraphs_needed}).to_csv("./util/experiments/paragraphs_needed.csv", index=False)
    # updated_paragraphs = decode(paragraphs_needed)
    config()
    updated_paragraphs = [call_gpt(paragraph.split(" -- ")[0], input_trigger) for paragraph in paragraphs_needed]
    # updated_paragraphs = call_vicuna(paragraphs_needed, input_trigger)

    # Merge updated paragraphs with non-updated paragraphs
    paragraphs_merged = data_test.copy()
    paragraphs_merged = [str(par).split(" -- ")[0] for par in paragraphs_merged]
    for idx in range(len(pos_ids)):
        paragraphs_merged[pos_ids[idx]] = updated_paragraphs[idx]
    sep = "\n"
    # paragraphs_merged = ["".join(par.split(" -- ")[:-1]) for par in paragraphs_merged]
    updated_article = str(sep.join(paragraphs_merged))
    updated_article = updated_article.replace("[{'summary_text': '", "").replace("'}]", "").strip()

    class_res = pd.read_csv("./util/experiments/classification.csv")
    # If no paragraph was classified as update-needed, mark the article as unmodified.
    if (class_res.target.values == 0).all():
        modified = "False"
    if len(data_test) == 1:
        modified = "TRUE"
        updated_article = call_gpt(input_article, input_trigger)
    with open("./util/experiments/updated_article.txt", "w") as f:
        f.write(updated_article)

    # Combine the predictions and paragraphs into a csv file
    merged_par_pred_df = pd.DataFrame({"paragraphs": data_test, "predictions": predictions}).to_csv("./util/experiments/par_with_class.csv")
    # return updated_article, modified, merged_par_pred_df
    modified_in_all = str(len(paragraphs_needed)) + " / " + str(len(data_test))
    return updated_article, modified_in_all
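
# Minimal sketch of running the pipeline without the Gradio UI (kept as
# comments so it does not run alongside demo.launch() below; it assumes the
# fine-tuned classifier checkpoint exists at ./util/bart_model and that
# GPT_API is available in the environment):
#
#   if __name__ == "__main__":
#       with open("article.txt") as f:
#           article = f.read()
#       trigger = "Example triggered news event."
#       updated_article, modified_in_all = main(article, trigger)
#       print("Modified paragraphs:", modified_in_all)
#       print(updated_article)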
def copy_to_clipboard(t):
    with open("./util/experiments/updated_article.txt", "r") as f:
        t = f.read()
        pyperclip.copy(t)


demo = gr.Interface(
    main,
    [
        gr.Textbox(
            lines=2,
            label="Non-updated Article",
            placeholder="Input the article..."
        ),
        gr.Textbox(
            lines=2,
            label="Triggered News Event",
            placeholder="Input the triggered news event..."
        )
    ],
    [
        gr.Textbox(
            lines=25,
            label="Output",
        ),
        gr.Textbox(
            lines=1,
            label="#MODIFIED/ALL"
        ),
        # btn = gr.Button(value="Copy Updated Article to Clipboard")
        # btn.click(copy_to_clipboard)
        # gr.components.Button(value="Copy Updated Article to Clipboard", fn=copy_to_clipboard),
    ],
    title="Event Triggered Article Updating System",
    description="Powered by YTLee",
)

demo.launch()
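
# The commented-out copy button above cannot live inside gr.Interface's outputs
# list. One possible way to expose copy_to_clipboard is a gr.Blocks layout; this
# is an untested sketch kept as comments so only the Interface above launches:
#
#   with gr.Blocks() as demo_blocks:
#       article_in = gr.Textbox(lines=2, label="Non-updated Article")
#       trigger_in = gr.Textbox(lines=2, label="Triggered News Event")
#       output_box = gr.Textbox(lines=25, label="Output")
#       ratio_box = gr.Textbox(lines=1, label="#MODIFIED/ALL")
#       run_btn = gr.Button("Update Article")
#       copy_btn = gr.Button("Copy Updated Article to Clipboard")
#       run_btn.click(main, inputs=[article_in, trigger_in], outputs=[output_box, ratio_box])
#       copy_btn.click(copy_to_clipboard, inputs=[output_box], outputs=None)
#   demo_blocks.launch()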