NCTC / models /official /nlp /modeling /layers /transformer_scaffold_test.py
NCTCMumbai's picture
Upload 2571 files
0b8359d
raw
history blame
21.8 kB
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for Keras-based transformer block layer."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
import numpy as np
import tensorflow as tf
from tensorflow.python.keras import keras_parameterized # pylint: disable=g-direct-tensorflow-import
from official.nlp.modeling.layers import attention
from official.nlp.modeling.layers import transformer_scaffold
# Test class that wraps a standard attention layer. If this layer is called
# at any point, the list passed to the config object will be filled with a
# boolean 'True'. We register this class as a Keras serializable so we can
# test serialization below.
@tf.keras.utils.register_keras_serializable(package='TestOnlyAttention')
class ValidatedAttentionLayer(attention.MultiHeadAttention):
def __init__(self, call_list, **kwargs):
super(ValidatedAttentionLayer, self).__init__(**kwargs)
self.list = call_list
def call(self, inputs, attention_mask=None):
self.list.append(True)
return super(ValidatedAttentionLayer, self).call(
inputs, attention_mask=attention_mask)
def get_config(self):
config = super(ValidatedAttentionLayer, self).get_config()
config['call_list'] = []
return config
# Test class implements a simple feedforward layer. If this layer is called
# at any point, the list passed to the config object will be filled with a
# boolean 'True'. We register this class as a Keras serializable so we can
# test serialization below.
@tf.keras.utils.register_keras_serializable(package='TestOnlyFeedforward')
class ValidatedFeedforwardLayer(tf.keras.layers.Layer):
def __init__(self, call_list, activation, **kwargs):
super(ValidatedFeedforwardLayer, self).__init__(**kwargs)
self.list = call_list
self.activation = activation
def build(self, input_shape):
hidden_size = input_shape.as_list()[-1]
self._feedforward_dense = tf.keras.layers.experimental.EinsumDense(
'...x,xy->...y',
output_shape=hidden_size,
bias_axes='y',
activation=self.activation,
name='feedforward')
def call(self, inputs):
self.list.append(True)
return self._feedforward_dense(inputs)
def get_config(self):
config = super(ValidatedFeedforwardLayer, self).get_config()
config['call_list'] = []
config['activation'] = self.activation
return config
# This decorator runs the test in V1, V2-Eager, and V2-Functional mode. It
# guarantees forward compatibility of this code for the V2 switchover.
@keras_parameterized.run_all_keras_modes
class TransformerLayerTest(keras_parameterized.TestCase):
def tearDown(self):
super(TransformerLayerTest, self).tearDown()
tf.keras.mixed_precision.experimental.set_policy('float32')
def test_layer_creation(self):
sequence_length = 21
width = 80
call_list = []
attention_layer_cfg = {
'num_heads': 10,
'key_size': 8,
'call_list': call_list,
}
test_layer = transformer_scaffold.TransformerScaffold(
attention_cls=ValidatedAttentionLayer,
attention_cfg=attention_layer_cfg,
num_attention_heads=10,
intermediate_size=2048,
intermediate_activation='relu')
# Create a 3-dimensional input (the first dimension is implicit).
data_tensor = tf.keras.Input(shape=(sequence_length, width))
output_tensor = test_layer(data_tensor)
# The default output of a transformer layer should be the same as the input.
self.assertEqual(data_tensor.shape.as_list(), output_tensor.shape.as_list())
# If call_list[0] exists and is True, the passed layer class was
# instantiated from the given config properly.
self.assertNotEmpty(call_list)
self.assertTrue(call_list[0], "The passed layer class wasn't instantiated.")
def test_layer_creation_with_feedforward_cls(self):
sequence_length = 21
width = 80
call_list = []
attention_layer_cfg = {
'num_heads': 10,
'key_size': 8,
'call_list': call_list,
}
feedforward_call_list = []
feedforward_layer_cfg = {
'activation': 'relu',
'call_list': feedforward_call_list,
}
test_layer = transformer_scaffold.TransformerScaffold(
attention_cls=ValidatedAttentionLayer,
attention_cfg=attention_layer_cfg,
feedforward_cls=ValidatedFeedforwardLayer,
feedforward_cfg=feedforward_layer_cfg,
num_attention_heads=10,
intermediate_size=None,
intermediate_activation=None)
# Create a 3-dimensional input (the first dimension is implicit).
data_tensor = tf.keras.Input(shape=(sequence_length, width))
output_tensor = test_layer(data_tensor)
# The default output of a transformer layer should be the same as the input.
self.assertEqual(data_tensor.shape.as_list(), output_tensor.shape.as_list())
# If call_list[0] exists and is True, the passed layer class was
# instantiated from the given config properly.
self.assertNotEmpty(call_list)
self.assertTrue(call_list[0], "The passed layer class wasn't instantiated.")
self.assertNotEmpty(feedforward_call_list)
self.assertTrue(feedforward_call_list[0],
"The passed layer class wasn't instantiated.")
def test_layer_creation_with_mask(self):
sequence_length = 21
width = 80
call_list = []
attention_layer_cfg = {
'num_heads': 10,
'key_size': 8,
'call_list': call_list,
}
test_layer = transformer_scaffold.TransformerScaffold(
attention_cls=ValidatedAttentionLayer,
attention_cfg=attention_layer_cfg,
num_attention_heads=10,
intermediate_size=2048,
intermediate_activation='relu')
# Create a 3-dimensional input (the first dimension is implicit).
data_tensor = tf.keras.Input(shape=(sequence_length, width))
# Create a 2-dimensional input (the first dimension is implicit).
mask_tensor = tf.keras.Input(shape=(sequence_length, sequence_length))
output_tensor = test_layer([data_tensor, mask_tensor])
# The default output of a transformer layer should be the same as the input.
self.assertEqual(data_tensor.shape.as_list(), output_tensor.shape.as_list())
# If call_list[0] exists and is True, the passed layer class was
# instantiated from the given config properly.
self.assertNotEmpty(call_list)
self.assertTrue(call_list[0], "The passed layer class wasn't instantiated.")
def test_layer_creation_with_incorrect_mask_fails(self):
sequence_length = 21
width = 80
call_list = []
attention_layer_cfg = {
'num_heads': 10,
'key_size': 8,
'call_list': call_list,
}
test_layer = transformer_scaffold.TransformerScaffold(
attention_cls=ValidatedAttentionLayer,
attention_cfg=attention_layer_cfg,
num_attention_heads=10,
intermediate_size=2048,
intermediate_activation='relu')
# Create a 3-dimensional input (the first dimension is implicit).
data_tensor = tf.keras.Input(shape=(sequence_length, width))
# Create a 2-dimensional input (the first dimension is implicit).
mask_tensor = tf.keras.Input(shape=(sequence_length, sequence_length - 3))
with self.assertRaisesRegex(ValueError, 'When passing a mask tensor.*'):
_ = test_layer([data_tensor, mask_tensor])
def test_layer_invocation(self):
sequence_length = 21
width = 80
call_list = []
attention_layer_cfg = {
'num_heads': 10,
'key_size': 8,
'call_list': call_list,
}
test_layer = transformer_scaffold.TransformerScaffold(
attention_cls=ValidatedAttentionLayer,
attention_cfg=attention_layer_cfg,
num_attention_heads=10,
intermediate_size=2048,
intermediate_activation='relu')
# Create a 3-dimensional input (the first dimension is implicit).
data_tensor = tf.keras.Input(shape=(sequence_length, width))
output_tensor = test_layer(data_tensor)
# Create a model from the test layer.
model = tf.keras.Model(data_tensor, output_tensor)
# Invoke the model on test data. We can't validate the output data itself
# (the NN is too complex) but this will rule out structural runtime errors.
batch_size = 6
input_data = 10 * np.random.random_sample(
(batch_size, sequence_length, width))
_ = model.predict(input_data)
# If call_list[0] exists and is True, the passed layer class was
# instantiated from the given config properly.
self.assertNotEmpty(call_list)
self.assertTrue(call_list[0], "The passed layer class wasn't instantiated.")
def test_layer_invocation_with_feedforward_cls(self):
sequence_length = 21
width = 80
call_list = []
attention_layer_cfg = {
'num_heads': 10,
'key_size': 8,
'call_list': call_list,
}
feedforward_call_list = []
feedforward_layer_cfg = {
'activation': 'relu',
'call_list': feedforward_call_list,
}
feedforward_layer = ValidatedFeedforwardLayer(**feedforward_layer_cfg)
test_layer = transformer_scaffold.TransformerScaffold(
attention_cls=ValidatedAttentionLayer,
attention_cfg=attention_layer_cfg,
feedforward_cls=feedforward_layer,
num_attention_heads=10,
intermediate_size=None,
intermediate_activation=None)
# Create a 3-dimensional input (the first dimension is implicit).
data_tensor = tf.keras.Input(shape=(sequence_length, width))
# Create a 2-dimensional input (the first dimension is implicit).
mask_tensor = tf.keras.Input(shape=(sequence_length, sequence_length))
output_tensor = test_layer([data_tensor, mask_tensor])
# Create a model from the test layer.
model = tf.keras.Model([data_tensor, mask_tensor], output_tensor)
# Invoke the model on test data. We can't validate the output data itself
# (the NN is too complex) but this will rule out structural runtime errors.
batch_size = 6
input_data = 10 * np.random.random_sample(
(batch_size, sequence_length, width))
# The attention mask should be of shape (batch, from_seq_len, to_seq_len),
# which here is (batch, sequence_length, sequence_length)
mask_data = np.random.randint(
2, size=(batch_size, sequence_length, sequence_length))
_ = model.predict([input_data, mask_data])
# If call_list[0] exists and is True, the passed layer class was
# instantiated from the given config properly.
self.assertNotEmpty(call_list)
self.assertTrue(call_list[0], "The passed layer class wasn't instantiated.")
self.assertNotEmpty(feedforward_call_list)
self.assertTrue(feedforward_call_list[0],
"The passed layer class wasn't instantiated.")
def test_layer_invocation_with_mask(self):
sequence_length = 21
width = 80
call_list = []
attention_layer_cfg = {
'num_heads': 10,
'key_size': 8,
'call_list': call_list,
}
test_layer = transformer_scaffold.TransformerScaffold(
attention_cls=ValidatedAttentionLayer,
attention_cfg=attention_layer_cfg,
num_attention_heads=10,
intermediate_size=2048,
intermediate_activation='relu')
# Create a 3-dimensional input (the first dimension is implicit).
data_tensor = tf.keras.Input(shape=(sequence_length, width))
# Create a 2-dimensional input (the first dimension is implicit).
mask_tensor = tf.keras.Input(shape=(sequence_length, sequence_length))
output_tensor = test_layer([data_tensor, mask_tensor])
# Create a model from the test layer.
model = tf.keras.Model([data_tensor, mask_tensor], output_tensor)
# Invoke the model on test data. We can't validate the output data itself
# (the NN is too complex) but this will rule out structural runtime errors.
batch_size = 6
input_data = 10 * np.random.random_sample(
(batch_size, sequence_length, width))
# The attention mask should be of shape (batch, from_seq_len, to_seq_len),
# which here is (batch, sequence_length, sequence_length)
mask_data = np.random.randint(
2, size=(batch_size, sequence_length, sequence_length))
_ = model.predict([input_data, mask_data])
# If call_list[0] exists and is True, the passed layer class was
# instantiated from the given config properly.
self.assertNotEmpty(call_list)
self.assertTrue(call_list[0], "The passed layer class wasn't instantiated.")
def test_layer_invocation_with_float16_dtype(self):
tf.keras.mixed_precision.experimental.set_policy('mixed_float16')
sequence_length = 21
width = 80
call_list = []
attention_layer_cfg = {
'num_heads': 10,
'key_size': 8,
'call_list': call_list,
}
test_layer = transformer_scaffold.TransformerScaffold(
attention_cls=ValidatedAttentionLayer,
attention_cfg=attention_layer_cfg,
num_attention_heads=10,
intermediate_size=2048,
intermediate_activation='relu')
# Create a 3-dimensional input (the first dimension is implicit).
data_tensor = tf.keras.Input(shape=(sequence_length, width))
# Create a 2-dimensional input (the first dimension is implicit).
mask_tensor = tf.keras.Input(shape=(sequence_length, sequence_length))
output_tensor = test_layer([data_tensor, mask_tensor])
# Create a model from the test layer.
model = tf.keras.Model([data_tensor, mask_tensor], output_tensor)
# Invoke the model on test data. We can't validate the output data itself
# (the NN is too complex) but this will rule out structural runtime errors.
batch_size = 6
input_data = (10 * np.random.random_sample(
(batch_size, sequence_length, width)))
# The attention mask should be of shape (batch, from_seq_len, to_seq_len),
# which here is (batch, sequence_length, sequence_length)
mask_data = np.random.randint(
2, size=(batch_size, sequence_length, sequence_length))
_ = model.predict([input_data, mask_data])
# If call_list[0] exists and is True, the passed layer class was
# instantiated from the given config properly.
self.assertNotEmpty(call_list)
self.assertTrue(call_list[0], "The passed layer class wasn't instantiated.")
def test_transform_with_initializer(self):
sequence_length = 21
width = 80
call_list = []
attention_layer_cfg = {
'num_heads': 10,
'key_size': 8,
'call_list': call_list,
}
test_layer = transformer_scaffold.TransformerScaffold(
attention_cls=ValidatedAttentionLayer,
attention_cfg=attention_layer_cfg,
num_attention_heads=10,
intermediate_size=2048,
intermediate_activation='relu',
kernel_initializer=tf.keras.initializers.TruncatedNormal(stddev=0.02))
# Create a 3-dimensional input (the first dimension is implicit).
data_tensor = tf.keras.Input(shape=(sequence_length, width))
output = test_layer(data_tensor)
# The default output of a transformer layer should be the same as the input.
self.assertEqual(data_tensor.shape.as_list(), output.shape.as_list())
# If call_list[0] exists and is True, the passed layer class was
# instantiated from the given config properly.
self.assertNotEmpty(call_list)
self.assertTrue(call_list[0])
def test_layer_restoration_from_config(self):
sequence_length = 21
width = 80
call_list = []
attention_layer_cfg = {
'num_heads': 10,
'key_size': 8,
'call_list': call_list,
'name': 'test_layer',
}
test_layer = transformer_scaffold.TransformerScaffold(
attention_cls=ValidatedAttentionLayer,
attention_cfg=attention_layer_cfg,
num_attention_heads=10,
intermediate_size=2048,
intermediate_activation='relu')
# Create a 3-dimensional input (the first dimension is implicit).
data_tensor = tf.keras.Input(shape=(sequence_length, width))
# Create a 2-dimensional input (the first dimension is implicit).
mask_tensor = tf.keras.Input(shape=(sequence_length, sequence_length))
output_tensor = test_layer([data_tensor, mask_tensor])
# Create a model from the test layer.
model = tf.keras.Model([data_tensor, mask_tensor], output_tensor)
# Invoke the model on test data. We can't validate the output data itself
# (the NN is too complex) but this will rule out structural runtime errors.
batch_size = 6
input_data = 10 * np.random.random_sample(
(batch_size, sequence_length, width))
# The attention mask should be of shape (batch, from_seq_len, to_seq_len),
# which here is (batch, sequence_length, sequence_length)
mask_data = np.random.randint(
2, size=(batch_size, sequence_length, sequence_length))
pre_serialization_output = model.predict([input_data, mask_data])
# Serialize the model config. Pass the serialized data through json to
# ensure that we can serialize this layer to disk.
serialized_data = json.dumps(model.get_config())
post_string_serialized_data = json.loads(serialized_data)
# Create a new model from the old config, and copy the weights. These models
# should have identical outputs.
new_model = tf.keras.Model.from_config(post_string_serialized_data)
new_model.set_weights(model.get_weights())
output = new_model.predict([input_data, mask_data])
self.assertAllClose(pre_serialization_output, output)
# If the layer was configured correctly, it should have a list attribute
# (since it should have the custom class and config passed to it).
new_model.summary()
new_call_list = new_model.get_layer(
name='transformer_scaffold')._attention_layer.list
self.assertNotEmpty(new_call_list)
self.assertTrue(new_call_list[0],
"The passed layer class wasn't instantiated.")
def test_layer_with_feedforward_cls_restoration_from_config(self):
sequence_length = 21
width = 80
call_list = []
attention_layer_cfg = {
'num_heads': 10,
'key_size': 8,
'call_list': call_list,
'name': 'test_layer',
}
feedforward_call_list = []
feedforward_layer_cfg = {
'activation': 'relu',
'call_list': feedforward_call_list,
}
test_layer = transformer_scaffold.TransformerScaffold(
attention_cls=ValidatedAttentionLayer,
attention_cfg=attention_layer_cfg,
feedforward_cls=ValidatedFeedforwardLayer,
feedforward_cfg=feedforward_layer_cfg,
num_attention_heads=10,
intermediate_size=None,
intermediate_activation=None)
# Create a 3-dimensional input (the first dimension is implicit).
data_tensor = tf.keras.Input(shape=(sequence_length, width))
# Create a 2-dimensional input (the first dimension is implicit).
mask_tensor = tf.keras.Input(shape=(sequence_length, sequence_length))
output_tensor = test_layer([data_tensor, mask_tensor])
# Create a model from the test layer.
model = tf.keras.Model([data_tensor, mask_tensor], output_tensor)
# Invoke the model on test data. We can't validate the output data itself
# (the NN is too complex) but this will rule out structural runtime errors.
batch_size = 6
input_data = 10 * np.random.random_sample(
(batch_size, sequence_length, width))
# The attention mask should be of shape (batch, from_seq_len, to_seq_len),
# which here is (batch, sequence_length, sequence_length)
mask_data = np.random.randint(
2, size=(batch_size, sequence_length, sequence_length))
pre_serialization_output = model.predict([input_data, mask_data])
# Serialize the model config. Pass the serialized data through json to
# ensure that we can serialize this layer to disk.
serialized_data = json.dumps(model.get_config())
post_string_serialized_data = json.loads(serialized_data)
# Create a new model from the old config, and copy the weights. These models
# should have identical outputs.
new_model = tf.keras.Model.from_config(post_string_serialized_data)
new_model.set_weights(model.get_weights())
output = new_model.predict([input_data, mask_data])
self.assertAllClose(pre_serialization_output, output)
# If the layer was configured correctly, it should have a list attribute
# (since it should have the custom class and config passed to it).
new_model.summary()
new_call_list = new_model.get_layer(
name='transformer_scaffold')._attention_layer.list
self.assertNotEmpty(new_call_list)
self.assertTrue(new_call_list[0],
"The passed layer class wasn't instantiated.")
new_feedforward_call_list = new_model.get_layer(
name='transformer_scaffold')._feedforward_block.list
self.assertNotEmpty(new_feedforward_call_list)
self.assertTrue(new_feedforward_call_list[0],
"The passed layer class wasn't instantiated.")
if __name__ == '__main__':
tf.test.main()