""" Learn to classify the manually annotated CDA attributes (frames, 'riferimento', orientation) """ import sys import torch from allennlp.data.vocabulary import Vocabulary from allennlp.data import DatasetReader, TokenIndexer, Instance, Token from allennlp.data.fields import TextField, LabelField from allennlp.data.token_indexers.pretrained_transformer_indexer import ( PretrainedTransformerIndexer, ) from allennlp.data.tokenizers.pretrained_transformer_tokenizer import ( PretrainedTransformerTokenizer, ) from allennlp.models import BasicClassifier from allennlp.modules.text_field_embedders.basic_text_field_embedder import ( BasicTextFieldEmbedder, ) from allennlp.modules.token_embedders.pretrained_transformer_embedder import ( PretrainedTransformerEmbedder, ) from allennlp.modules.seq2vec_encoders.bert_pooler import BertPooler from allennlp.training.checkpointer import Checkpointer from allennlp.training.gradient_descent_trainer import GradientDescentTrainer from allennlp.data.data_loaders.simple_data_loader import SimpleDataLoader from allennlp.training.optimizers import AdamOptimizer from allennlp.predictors.text_classifier import TextClassifierPredictor from sklearn.svm import SVC from sklearn.feature_extraction.text import CountVectorizer from sklearn.metrics import precision_recall_fscore_support from sklearn.tree import DecisionTreeClassifier from sklearn.dummy import DummyClassifier import pandas as pd import numpy as np import spacy import json import os from typing import Dict, Iterable class MigrationReader(DatasetReader): def __init__(self, token_indexers, tokenizer): self.token_indexers = token_indexers self.tokenizer = tokenizer def text_to_instance(self, sentence, label=None) -> Instance: text_field = TextField(self.tokenizer.tokenize(sentence), self.token_indexers) fields = {"tokens": text_field} if label is not None: label_field = LabelField(label) fields["label"] = label_field return Instance(fields) def read_instances( self, text: pd.Series, labels: pd.Series ) -> Iterable[Instance]: for sentence, label in zip(text, labels): instance = self.text_to_instance(sentence, label) yield instance def train(attrib, use_gpu=False): assert attrib in ["cda_frame", "riferimento", "orientation", "fake"] # load data print("Loading data...") x_train, y_train, x_dev, y_dev = load_data(attrib) print(f"\t\ttrain size: {len(x_train)}") print(f"\t\tdev size: {len(x_dev)}") # try different setups print("Running training setups...") scores = [] setups = [ # defaults: remove_punct=True, lowercase=True, lemmatize=False, remove_stop=False # ({}, {}, {"type": "svm", "options": {"kernel": "linear", "C": 1.0}}), ( {}, {}, { "type": "bert", "options": {"transformer": "Musixmatch/umberto-commoncrawl-cased-v1"}, }, ), # ({"lemmatize": True, "remove_stop": True}, {}, {"type": "svm", "options": {"kernel": "linear", "C": 0.8}}), # ({"lemmatize": True, "remove_stop": True}, {"embed": False}, {"type": "svm", "options": {"kernel": "linear", "C": 0.8}}), # ({"lemmatize": True, "remove_stop": True}, {"embed": False}, {"type": "dummy", "options": {}}), # ({"lemmatize": True, "remove_stop": True}, {"embed": False}, {"type": "tree", "options": {}}), # ({"lemmatize": True, "remove_stop": True}, {}, SVC(kernel='linear')), # ({"lemmatize": True, "remove_stop": True}, {"min_freq": 5}, SVC(kernel='linear')), # ({"lemmatize": True, "remove_stop": True}, {"min_freq": 5, "max_freq": .70}, SVC(kernel='linear')), # ({"lemmatize": True, "remove_stop": True}, {}, SVC(kernel='linear', C=0.6)), # ({"lemmatize": True, 
"remove_stop": True}, {}, SVC(kernel='linear', C=0.7)), # ({"lemmatize": True, "remove_stop": True}, {}, SVC(kernel='linear', C=0.8)), # ({"lemmatize": True, "remove_stop": True}, {"ngram_range": (1,2)}, SVC(kernel='linear', C=0.8)), # ({"lemmatize": True, "remove_stop": True}, {}, SVC(kernel="rbf")), ] nlp = spacy.load("it_core_news_md") for s_idx, (text_options, vect_options, model_info) in enumerate(setups): if model_info["type"] == "bert": print("\t\tPreparing BERT model...") # cuda_device = 0 if torch.cuda.is_available() else -1 cuda_device = None if use_gpu and torch.cuda.is_available() else -1 transformer = model_info["options"]["transformer"] token_indexers = {"tokens": PretrainedTransformerIndexer(transformer)} tokenizer = PretrainedTransformerTokenizer(transformer) reader = MigrationReader(token_indexers, tokenizer) train_instances = list( reader.read_instances(x_train, y_train) ) dev_instances = list( reader.read_instances(x_dev, y_dev) ) vocab = Vocabulary.from_instances(train_instances + dev_instances) print(vocab.get_vocab_size("tags")) embedder = BasicTextFieldEmbedder( {"tokens": PretrainedTransformerEmbedder(transformer)} ) seq2vec = BertPooler(transformer) model = BasicClassifier(vocab, embedder, seq2vec, namespace="tags") if use_gpu: model = model.cuda(cuda_device) checkpoint_dir = f"/scratch/p289731/cda_classify/model_{attrib}/checkpoints/" serialization_dir = f"/scratch/p289731/cda_classify/model_{attrib}/serialize/" os.makedirs(checkpoint_dir) os.makedirs(serialization_dir) checkpointer = Checkpointer(checkpoint_dir) optimizer = AdamOptimizer( [(n, p) for n, p in model.named_parameters() if p.requires_grad], lr=1e-6 ) train_loader = SimpleDataLoader(train_instances, batch_size=8, shuffle=True) dev_loader = SimpleDataLoader(dev_instances, batch_size=8, shuffle=False) train_loader.index_with(vocab) dev_loader.index_with(vocab) print("\t\tTraining BERT model") trainer = GradientDescentTrainer( model, optimizer, train_loader, validation_data_loader=dev_loader, patience=32, checkpointer=checkpointer, cuda_device=cuda_device, serialization_dir=serialization_dir ) trainer.train() print("\t\tProducing predictions...") predictor = TextClassifierPredictor(model, reader) predictions = [predictor.predict(sentence) for sentence in x_dev] y_dev_pred = [p["label"] for p in predictions] class_labels = list(vocab.get_token_to_index_vocabulary("labels").keys()) elif model_info["type"] in ["svm", "tree", "dummy"]: # extract features print("\t\tExtracting features...") x_train_fts, vectorizer = extract_features( x_train, nlp, text_options, **vect_options ) x_dev_fts, _ = extract_features( x_dev, nlp, text_options, **vect_options, vectorizer=vectorizer ) if not vect_options["embed"]: print(f"\t\t\tnum features: {len(vectorizer.vocabulary_)}") else: assert model_info["type"] != "tree", "Decision tree does not support embedding input" print("\t\tTraining the model...") if model_info["type"] == "svm": model = SVC(**model_info["options"]) elif model_info["type"] == "tree": model = DecisionTreeClassifier() else: model = DummyClassifier() model.fit(x_train_fts, y_train) # evaluate on dev print("\t\tValidating the model...") y_dev_pred = model.predict(x_dev_fts) class_labels = model.classes_ p_micro, r_micro, f_micro, _ = precision_recall_fscore_support( y_dev, y_dev_pred, average="micro" ) p_classes, r_classes, f_classes, _ = precision_recall_fscore_support( y_dev, y_dev_pred, average=None, labels=class_labels, zero_division=0 ) print( f"\t\t\tOverall scores 
        scores.append(
            {
                "micro": {"p": p_micro, "r": r_micro, "f": f_micro},
                "classes": {
                    "p": list(zip(class_labels, p_classes)),
                    "r": list(zip(class_labels, r_classes)),
                    "f": list(zip(class_labels, f_classes)),
                },
            }
        )
        prediction_df = pd.DataFrame(
            zip(x_dev, y_dev, y_dev_pred), columns=["headline", "gold", "prediction"]
        )
        prediction_df.to_csv(
            f"output/migration/cda_classify/predictions_{attrib}_{s_idx:02}.csv"
        )

    with open(
        f"output/migration/cda_classify/scores_{attrib}.json", "w", encoding="utf-8"
    ) as f_scores:
        json.dump(scores, f_scores, indent=4)


def load_data(attrib):
    train_data = pd.read_csv("output/migration/preprocess/annotations_train.csv")
    dev_data = pd.read_csv("output/migration/preprocess/annotations_dev.csv")

    x_train = train_data["Titolo"]
    x_dev = dev_data["Titolo"]

    if attrib == "cda_frame":
        y_train = train_data["frame"]
        y_dev = dev_data["frame"]
    elif attrib == "riferimento":
        y_train = train_data["riferimento"]
        y_dev = dev_data["riferimento"]
    elif attrib == "orientation":
        y_train = train_data["orientation"]
        y_dev = dev_data["orientation"]
    else:
        # fake task to test the setup
        y_train = pd.Series(
            ["true" if "rifugiato" in exa else "false" for exa in x_train]
        )
        y_dev = pd.Series(
            ["true" if "rifugiato" in exa else "false" for exa in x_dev]
        )

    return x_train, y_train, x_dev, y_dev


def extract_features(
    headlines,
    nlp,
    text_options,
    embed=False,
    min_freq=1,
    max_freq=1.0,
    ngram_range=(1, 1),
    vectorizer=None,
):
    if embed:
        # mean word-embedding features
        vectorized = np.array(
            [vec for vec in process_text(headlines, nlp, embed=True, **text_options)]
        )
    else:
        # bag-of-words features; reuse the fitted vectorizer for the dev set
        tokenized = [
            " ".join(sent) for sent in process_text(headlines, nlp, **text_options)
        ]
        if vectorizer is None:
            vectorizer = CountVectorizer(
                lowercase=False,
                analyzer="word",
                min_df=min_freq,
                max_df=max_freq,
                ngram_range=ngram_range,
            )
            vectorized = vectorizer.fit_transform(tokenized)
        else:
            vectorized = vectorizer.transform(tokenized)
    return vectorized, vectorizer


def process_text(
    headlines,
    nlp,
    embed=False,
    remove_punct=True,
    lowercase=True,
    lemmatize=False,
    remove_stop=False,
):
    for sent in headlines:
        doc = nlp(sent)
        tokens = (
            t
            for t in doc
            if (not remove_stop or not t.is_stop)
            and (not remove_punct or t.pos_ not in ["PUNCT", "SYM", "X"])
        )
        if embed:
            if lemmatize:
                tokens = (t.vocab[t.lemma].vector for t in tokens)
            else:
                tokens = (t.vector for t in tokens if t.has_vector)
        else:
            if lemmatize:
                tokens = (t.lemma_ for t in tokens)
            else:
                tokens = (t.text for t in tokens)
            if lowercase:
                tokens = (t.lower() for t in tokens)

        if embed:
            token_arr = np.array([t for t in tokens])
            if len(token_arr) == 0:
                # no token carried a vector: back off to a random 300-d embedding
                yield np.random.rand(300)
            else:
                yield np.mean(token_arr, axis=0)
        else:
            yield list(tokens)


if __name__ == "__main__":
    # pass "gpu" as the first command-line argument to train on CUDA
    use_gpu = len(sys.argv) > 1 and sys.argv[1] == "gpu"
    # train(attrib="fake", use_gpu=use_gpu)
    train(attrib="cda_frame", use_gpu=use_gpu)
    # train(attrib="riferimento")
    # train(attrib="orientation")