import os

import numpy as np
import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow.keras import layers
import transformers

MAX_LENGTH = 512  # maximum number of tokens in a tokenized message
BATCH_SIZE = 8  # number of messages processed at a time


class MeanPool(tf.keras.layers.Layer):
    """Mean-pools token embeddings, ignoring padded positions via the attention mask."""

    def call(self, inputs, mask=None):
        broadcast_mask = tf.expand_dims(tf.cast(mask, "float32"), -1)
        embedding_sum = tf.reduce_sum(inputs * broadcast_mask, axis=1)
        mask_sum = tf.reduce_sum(broadcast_mask, axis=1)
        # Guard against division by zero for fully masked rows
        mask_sum = tf.math.maximum(mask_sum, tf.constant([1e-9]))
        return embedding_sum / mask_sum


class WeightsSumOne(tf.keras.constraints.Constraint):
    """Constrains a weight vector to be non-negative and sum to one."""

    def __call__(self, w):
        return tf.nn.softmax(w, axis=0)


def deberta_init(
    pretrained_model_name: str = "microsoft/deberta-v3-large",
    tokenizer_dir: str = ".",
):
    """Helper function to quickly initialize the config and tokenizer for a model.

    Args:
        pretrained_model_name (str, optional): The model name.
            Defaults to "microsoft/deberta-v3-large".
        tokenizer_dir (str, optional): Directory where the tokenizer is saved.
            Defaults to ".".

    Returns:
        The configuration and tokenizer of the model.
    """
    tokenizer = transformers.AutoTokenizer.from_pretrained(pretrained_model_name)
    tokenizer_path = os.path.join(tokenizer_dir, "tokenizer")
    tokenizer.save_pretrained(tokenizer_path)
    cfg = transformers.AutoConfig.from_pretrained(
        pretrained_model_name, output_hidden_states=True
    )
    # Disable dropout so the encoder behaves deterministically
    cfg.hidden_dropout_prob = 0
    cfg.attention_probs_dropout_prob = 0
    cfg.save_pretrained(tokenizer_path)
    return cfg, tokenizer


def get_model(cfg):
    """Build a DeBERTa model using the specified configuration.

    Args:
        cfg: the configuration of the model (can be generated using deberta_init).

    Returns:
        A compiled model with respect to the given configuration.
    """
    input_ids = tf.keras.layers.Input(
        shape=(MAX_LENGTH,), dtype=tf.int32, name="input_ids"
    )
    attention_masks = tf.keras.layers.Input(
        shape=(MAX_LENGTH,), dtype=tf.int32, name="attention_masks"
    )

    deberta_model = transformers.TFAutoModel.from_pretrained(
        "microsoft/deberta-v3-large", config=cfg
    )

    # Re-initialize the last REINIT_LAYERS encoder blocks so their weights are
    # learned from scratch during fine-tuning
    REINIT_LAYERS = 1
    normal_initializer = tf.keras.initializers.GlorotUniform()
    zeros_initializer = tf.keras.initializers.Zeros()
    ones_initializer = tf.keras.initializers.Ones()

    for encoder_block in deberta_model.deberta.encoder.layer[-REINIT_LAYERS:]:
        for layer in encoder_block.submodules:
            if isinstance(layer, tf.keras.layers.Dense):
                layer.kernel.assign(
                    normal_initializer(
                        shape=layer.kernel.shape, dtype=layer.kernel.dtype
                    )
                )
                if layer.bias is not None:
                    layer.bias.assign(
                        zeros_initializer(
                            shape=layer.bias.shape, dtype=layer.bias.dtype
                        )
                    )
            elif isinstance(layer, tf.keras.layers.LayerNormalization):
                layer.beta.assign(
                    zeros_initializer(shape=layer.beta.shape, dtype=layer.beta.dtype)
                )
                layer.gamma.assign(
                    ones_initializer(shape=layer.gamma.shape, dtype=layer.gamma.dtype)
                )

    deberta_output = deberta_model.deberta(input_ids, attention_mask=attention_masks)
    hidden_states = deberta_output.hidden_states

    # WeightedLayerPool + MeanPool of the last 4 hidden states
    stack_meanpool = tf.stack(
        [
            MeanPool()(hidden_s, mask=attention_masks)
            for hidden_s in hidden_states[-4:]
        ],
        axis=2,
    )
    weighted_layer_pool = layers.Dense(
        1, use_bias=False, kernel_constraint=WeightsSumOne()
    )(stack_meanpool)
    weighted_layer_pool = tf.squeeze(weighted_layer_pool, axis=-1)
    output = layers.Dense(15, activation="linear")(weighted_layer_pool)
    model = tf.keras.Model(inputs=[input_ids, attention_masks], outputs=output)

    # Compile the model with layer-wise learning rate decay (LLRD): layers
    # closer to the output train with larger learning rates than earlier ones
    layer_list = [deberta_model.deberta.embeddings] + list(
        deberta_model.deberta.encoder.layer
    )
    layer_list.reverse()

    INIT_LR = 1e-5
    LLRDR = 0.9  # layer-wise learning rate decay ratio
    LR_SCH_DECAY_STEPS = 1600

    lr_schedules = [
        tf.keras.optimizers.schedules.ExponentialDecay(
            initial_learning_rate=INIT_LR * LLRDR**i,
            decay_steps=LR_SCH_DECAY_STEPS,
            decay_rate=0.3,
        )
        for i in range(len(layer_list))
    ]
    lr_schedule_head = tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=1e-4, decay_steps=LR_SCH_DECAY_STEPS, decay_rate=0.3
    )

    optimizers = [
        tf.keras.optimizers.Adam(learning_rate=lr_sch) for lr_sch in lr_schedules
    ]
    optimizers_and_layers = [
        (tf.keras.optimizers.Adam(learning_rate=lr_schedule_head), model.layers[-4:])
    ] + list(zip(optimizers, layer_list))

    optimizer = tfa.optimizers.MultiOptimizer(optimizers_and_layers)

    model.compile(
        optimizer=optimizer,
        loss="mse",
        metrics=[tf.keras.metrics.RootMeanSquaredError()],
    )
    return model
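
# A minimal training sketch, not part of the original pipeline: it assumes
# `train_texts` (an iterable of strings) and `train_labels` (integer classes
# in [0, 15)) are provided elsewhere, and that the 15-way linear head is fit
# with MSE against one-hot targets, consistent with the "mse" loss above and
# the argmax decoding in predict(). The name `train_example` is hypothetical.
def train_example(train_texts, train_labels, epochs: int = 3):
    cfg, tokenizer = deberta_init()
    model = get_model(cfg)
    input_ids, attention_mask = deberta_encode(train_texts, tokenizer)
    targets = tf.one_hot(train_labels, depth=15)  # assumed target encoding
    model.fit(
        [input_ids, attention_mask],
        targets,
        batch_size=BATCH_SIZE,
        epochs=epochs,
    )
    return model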
def deberta_encode(texts, tokenizer):
    """Tokenize each text in texts using the specified tokenizer.

    Args:
        texts: an iterable of strings to tokenize.
        tokenizer: a tokenizer (can be generated by deberta_init).

    Returns:
        A tuple (input_ids, attention_mask) of int32 numpy arrays, each of
        shape (len(texts), MAX_LENGTH).
    """
    input_ids = []
    attention_mask = []
    for text in texts:
        token = tokenizer(
            text,
            add_special_tokens=True,
            max_length=MAX_LENGTH,
            return_attention_mask=True,
            return_tensors="np",
            truncation=True,
            padding="max_length",
        )
        input_ids.append(token["input_ids"][0])
        attention_mask.append(token["attention_mask"][0])
    return np.array(input_ids, dtype="int32"), np.array(attention_mask, dtype="int32")


def predict(model, tokenizer, texts):
    """Predict the label for each message in texts.

    Args:
        model: your DeBERTa model
        tokenizer: a tokenizer (can be generated by deberta_init)
        texts: an iterable of messages to classify

    Returns:
        A numpy array of integer labels, one per message (see
        decode_deberta_label for the string names).
    """
    prediction = model.predict(deberta_encode(texts, tokenizer))
    labels = np.argmax(prediction, axis=1)
    return labels


def load_model(cfg, model_dir: str = "."):
    """Helper function to load a DeBERTa model with pretrained weights.

    Args:
        cfg: configuration for the model (can be generated with deberta_init)
        model_dir (str, optional): the directory of the pretrained weights.
            Defaults to ".".

    Returns:
        A DeBERTa model with pretrained weights.
    """
    tf.keras.backend.clear_session()
    model = get_model(cfg)
    model_path = os.path.join(model_dir, "best_model_fold2.h5")
    model.load_weights(model_path)
    return model


# Map the integer labels to their original string representation
DEBERTA_LABEL_MAP = {
    0: "Greeting",
    1: "Curiosity",
    2: "Interest",
    3: "Obscene",
    4: "Annoyed",
    5: "Openness",
    6: "Anxious",
    7: "Acceptance",
    8: "Uninterested",
    9: "Informative",
    10: "Accusatory",
    11: "Denial",
    12: "Confused",
    13: "Disapproval",
    14: "Remorse",
}


def decode_deberta_label(numeric_label):
    """Convert an integer label into its human-readable string name."""
    return DEBERTA_LABEL_MAP.get(numeric_label, "Unknown Label")
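
# A minimal inference sketch using the helpers above, assuming the weights
# file "best_model_fold2.h5" is available in the current directory (the
# sample messages are illustrative).
if __name__ == "__main__":
    cfg, tokenizer = deberta_init()
    model = load_model(cfg)
    messages = ["Hi there!", "I already told you, that never happened."]
    numeric_labels = predict(model, tokenizer, messages)
    print([decode_deberta_label(int(label)) for label in numeric_labels])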