Mixtral_ether / periodical_update_and_scheduling_test.py
jeduardogruiz's picture
Upload 22 files
516a027 verified
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for when the training and inference graphs are the same."""
import os
import tempfile
import tensorflow as tf
from tensorflow_model_optimization.python.core.common.keras.compression.algorithms import periodical_update_and_scheduling as svd
from tensorflow_model_optimization.python.core.keras.compat import keras
from tensorflow_model_optimization.python.core.keras.testing import test_utils_mnist
def _build_model():
i = keras.layers.Input(shape=(28, 28), name='input')
x = keras.layers.Reshape((28, 28, 1))(i)
x = keras.layers.Conv2D(
20, 5, activation='relu', padding='valid', name='conv1'
)(x)
x = keras.layers.MaxPool2D(2, 2)(x)
x = keras.layers.Conv2D(
50, 5, activation='relu', padding='valid', name='conv2'
)(x)
x = keras.layers.MaxPool2D(2, 2)(x)
x = keras.layers.Flatten()(x)
x = keras.layers.Dense(500, activation='relu', name='fc1')(x)
output = keras.layers.Dense(10, name='fc2')(x)
model = keras.Model(inputs=[i], outputs=[output])
return model
def _get_dataset():
mnist = keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0
# Use subset of 60000 examples to keep unit test speed fast.
x_train = x_train[0:1000]
y_train = y_train[0:1000]
return (x_train, y_train), (x_test, y_test)
def _train_model(model):
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer='adam', loss=loss_fn, metrics=['accuracy'])
(x_train, y_train), _ = _get_dataset()
model.fit(x_train, y_train, epochs=1)
def _save_as_saved_model(model):
saved_model_dir = tempfile.mkdtemp()
model.save(saved_model_dir)
return saved_model_dir
# TODO(tfmot): reuse existing test utilities.
def _convert_to_tflite(saved_model_dir):
_, tflite_file = tempfile.mkstemp()
converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_dir)
tflite_model = converter.convert()
with open(tflite_file, 'wb') as f:
f.write(tflite_model)
return tflite_file
def _get_directory_size_in_bytes(directory):
total = 0
try:
for entry in os.scandir(directory):
if entry.is_file():
# if it's a file, use stat() function
total += entry.stat().st_size
elif entry.is_dir():
# if it's a directory, recursively call this function
total += _get_directory_size_in_bytes(entry.path)
except NotADirectoryError:
# if `directory` isn't a directory, get the file size then
return os.path.getsize(directory)
except PermissionError:
# if for whatever reason we can't open the folder, return 0
return 0
return total
class FunctionalTest(tf.test.TestCase):
# TODO(tfmot): can simplify to single layer test that checks exact
# dimensions of weights.
def testSVD_ReducesSavedModelSize(self):
model = _build_model()
original_saved_model_dir = _save_as_saved_model(model)
algorithm = svd.SVD(rank=16, update_freq=1, warmup_step=10)
training_model = algorithm.optimize_model(model)
compressed_model = algorithm.compress_model(training_model)
saved_model_dir = _save_as_saved_model(compressed_model)
original_size = _get_directory_size_in_bytes(original_saved_model_dir)
compressed_size = _get_directory_size_in_bytes(saved_model_dir)
self.assertLess(compressed_size, original_size / 3)
def testSVD_HasReasonableAccuracy_TF(self):
model = _build_model()
algorithm = svd.SVD(rank=16, update_freq=1, warmup_step=10)
training_model = algorithm.optimize_model(model)
_train_model(training_model)
compressed_model = algorithm.compress_model(training_model)
_, (x_test, y_test) = _get_dataset()
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
compressed_model.compile(
optimizer='adam', loss=loss_fn, metrics=['accuracy'])
results = compressed_model.evaluate(x_test, y_test)
self.assertGreater(results[1], 0.60)
def testSVD_ReducesTFLiteModelSize(self):
model = _build_model()
original_saved_model_dir = _save_as_saved_model(model)
original_tflite_file = _convert_to_tflite(original_saved_model_dir)
algorithm = svd.SVD(rank=16, update_freq=1, warmup_step=10)
training_model = algorithm.optimize_model(model)
compressed_model = algorithm.compress_model(training_model)
saved_model_dir = _save_as_saved_model(compressed_model)
compressed_tflite_file = _convert_to_tflite(saved_model_dir)
original_size = os.path.getsize(original_tflite_file)
compressed_size = os.path.getsize(compressed_tflite_file)
self.assertLess(compressed_size, original_size / 6)
def testSVD_HasReasonableAccuracy_TFLite(self):
model = _build_model()
algorithm = svd.SVD(rank=16, update_freq=1, warmup_step=10)
training_model = algorithm.optimize_model(model)
_train_model(training_model)
compressed_model = algorithm.compress_model(training_model)
saved_model_dir = _save_as_saved_model(compressed_model)
compressed_tflite_file = _convert_to_tflite(saved_model_dir)
accuracy = test_utils_mnist.eval_tflite(compressed_tflite_file)
self.assertGreater(accuracy, 0.60)
# TODO(tfmot): can simplify to single layer test.
def testSVD_BreaksDownLayerWeights(self):
model = _build_model()
first_conv_layer = model.layers[2]
self.assertLen(first_conv_layer.weights, 2)
algorithm = svd.SVD(rank=16, update_freq=1, warmup_step=10)
training_model = algorithm.optimize_model(model)
compressed_model = algorithm.compress_model(training_model)
first_conv_layer = compressed_model.layers[2]
self.assertLen(first_conv_layer.weights, 3)
# TODO(tfmot): can simplify to single layer test.
def testSVD_PreservesPretrainedWeights(self):
i = keras.layers.Input(shape=(2), name='input')
output = keras.layers.Dense(3, name='fc1')(i)
model = keras.Model(inputs=[i], outputs=[output])
dense_layer_weights = model.layers[1].get_weights()
algorithm = svd.SVD(rank=1, update_freq=1, warmup_step=10)
training_model = algorithm.optimize_model(model)
dense_layer_training_weights = training_model.layers[1].get_weights()
# kernel
algorithm.weight_reprs = []
algorithm.init_training_weights(dense_layer_weights[0])
w1_repr, w2_repr = algorithm.weight_reprs
assert (w1_repr.kwargs['initializer'](None) == \
dense_layer_training_weights[0]).numpy().all()
assert (w2_repr.kwargs['initializer'](None) == \
dense_layer_training_weights[1]).numpy().all()
# bias
assert (dense_layer_weights[1] == dense_layer_training_weights[2]).all()
if __name__ == '__main__':
tf.test.main()