こしあん
2019-10-29

TensorFlow2.0のTPUでモデルを保存したり、CPUと相互運用する方法


3.2k{icon} {views}


TensorFlow2.0+Colab TPUでモデルを保存する方法、CPUとTPUで保存した係数を相互運用する方法、TPUを意識したモデルの保存方法を見ていきます。

環境

CPU:Windows 10
TPU: Google Colab TPU
どちらもTensorFlow 2.0.0

CPUでのモデルの保存

訓練ループを書く例を紹介します。

import tensorflow as tf
import tensorflow.keras.layers as layers
import numpy as np

def create_model():
    inputs = layers.Input((28, 28))
    x = layers.Flatten()(inputs)
    x = layers.Dense(128, activation="relu")(x)
    x = layers.Dense(10, activation="softmax")(x)
    return tf.keras.models.Model(inputs, x)

def load_dataset():
    (X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data()
    trainset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
    trainset = trainset.map(
        lambda x, y: (tf.cast(x, tf.float32) /255.0, tf.cast(y, tf.float32))
    ).shuffle(50000).batch(128)

    test_set = tf.data.Dataset.from_tensor_slices((X_test, y_test))
    test_set = test_set.map(
        lambda x, y: (tf.cast(x, tf.float32) / 255.0, tf.cast(y, tf.float32))        
    ).batch(128)
    return trainset, test_set

def train():
    trainset, testset = load_dataset()

    model = create_model()
    optim = tf.keras.optimizers.Adam()

    loss_func = tf.keras.losses.SparseCategoricalCrossentropy()
    acc = tf.keras.metrics.SparseCategoricalAccuracy()

    @tf.function
    def train_on_batch(X, y):
        with tf.GradientTape() as tape:
            y_pred = model(X, training=True)
            loss = loss_func(y, y_pred)
        grad = tape.gradient(loss, model.trainable_weights)
        optim.apply_gradients(zip(grad, model.trainable_weights))
        acc.update_state(y, y_pred)
        return loss

    @tf.function
    def validation_on_batch(X, y):
        y_pred = model(X, training=False)
        loss = loss_func(y, y_pred)
        acc.update_state(y, y_pred)
        return loss

    for epoch in range(10):
        acc.reset_states()

        trainloss = []
        for X, y in trainset:
            trainloss.append(train_on_batch(X, y).numpy())
        trainloss = np.mean(np.asarray(trainloss))
        trainacc = acc.result().numpy()

        testloss = []
        acc.reset_states()
        for X, y in testset:
            testloss.append(validation_on_batch(X, y).numpy())
        testloss = np.mean(np.asarray(testloss))

        print("Epoch : ", epoch + 1, "train_loss : ", trainloss, "train_acc : ", trainacc,
              "test_loss : ", testloss, "test_acc : ", acc.result().numpy())

    model.save("model.h5") # 後々のTPUとの互換性を考えてH5形式で保存する(チェックポイントだとうまくいかない)

def inference():
    _, testset = load_dataset()

    model = tf.keras.models.load_model("model.h5")

    acc = tf.keras.metrics.SparseCategoricalAccuracy()
    for X, y in testset:
        y_pred = model(X, training=False)
        acc.update_state(y, y_pred)

    print("Test accuracy : ", acc.result().numpy())

trainを実行すると訓練が行われ、

Epoch :  1 train_loss :  0.35660258 train_acc :  0.9036 test_loss :  0.19367947 test_acc :  0.9418
Epoch :  2 train_loss :  0.1614553 train_acc :  0.95391667 test_loss :  0.13227987 test_acc :  0.962
Epoch :  3 train_loss :  0.115202285 train_acc :  0.9672667 test_loss :  0.11312597 test_acc :  0.9658
Epoch :  4 train_loss :  0.09036127 train_acc :  0.9738333 test_loss :  0.09663772 test_acc :  0.9702
Epoch :  5 train_loss :  0.07244045 train_acc :  0.9795333 test_loss :  0.08491239 test_acc :  0.9734
Epoch :  6 train_loss :  0.060106944 train_acc :  0.9826667 test_loss :  0.080243975 test_acc :  0.9759
Epoch :  7 train_loss :  0.050129887 train_acc :  0.98565 test_loss :  0.08160928 test_acc :  0.9751
Epoch :  8 train_loss :  0.042404808 train_acc :  0.98793334 test_loss :  0.07150439 test_acc :  0.9775
Epoch :  9 train_loss :  0.03540358 train_acc :  0.99065 test_loss :  0.07100736 test_acc :  0.9775
Epoch :  10 train_loss :  0.02988126 train_acc :  0.9921 test_loss :  0.07224209 test_acc :  0.9783

TensorFlowのドキュメントでは、チェックポイントとして保存することが一番最初に出てきますがTensorFlow2.0.0時点ではTPUとの互換性を考えるとh5形式で保存したほうがよさそうです。なぜなら、TPU環境でチェックポイントとして保存でしようとすると、Google Cloud Storage以外保存できなくなってしまう(エラーが出てローカルファイルに保存できない:詳しくはこちら)という現象があるからです。このエラーはh5形式として保存すると発生しません。

したがって、TPUとの互換性を考えるなら現時点ではh5形式として保存するのが簡単です。

この係数を読みこんで推論する、inferenceを実行すると、

Test accuracy :  0.9783

このように、最終エポックと同じ結果が復元されました。

TPUでのモデルの保存

次はTPU環境でのモデルの保存です。Google ColabでTPUを使うための準備をします(以降のTPUでの計算でも同様に必要です)。

!pip install tensorflow==2.0.0
import tensorflow as tf
import os
tpu_grpc_url = "grpc://" + os.environ["COLAB_TPU_ADDR"]
tpu_cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu_grpc_url)
tf.config.experimental_connect_to_cluster(tpu_cluster_resolver) # TF2.0の場合、ここを追加
tf.tpu.experimental.initialize_tpu_system(tpu_cluster_resolver) # TF2.0の場合、今後experimentialが取れる可能性がある
strategy = tf.distribute.experimental.TPUStrategy(tpu_cluster_resolver)

コードはこちら

import tensorflow.keras.layers as layers
import numpy as np

def create_model():
    inputs = layers.Input((28, 28))
    x = layers.Flatten()(inputs)
    x = layers.Dense(128, activation="relu")(x)
    x = layers.Dense(10, activation="softmax")(x)
    return tf.keras.models.Model(inputs, x)

def load_dataset():
    (X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data()
    trainset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
    trainset = trainset.map(
        lambda x, y: (tf.cast(x, tf.float32) /255.0, tf.cast(y, tf.float32))
    ).shuffle(50000).batch(128)

    test_set = tf.data.Dataset.from_tensor_slices((X_test, y_test))
    test_set = test_set.map(
        lambda x, y: (tf.cast(x, tf.float32) / 255.0, tf.cast(y, tf.float32))        
    ).batch(128)
    return trainset, test_set

def train():
    trainset, testset = load_dataset()

    with strategy.scope():
        model = create_model()
        optim = tf.keras.optimizers.Adam()

        loss_func = tf.keras.losses.SparseCategoricalCrossentropy(reduction=tf.keras.losses.Reduction.NONE)
        acc = tf.keras.metrics.SparseCategoricalAccuracy()

        trainset = strategy.experimental_distribute_dataset(trainset)
        testset = strategy.experimental_distribute_dataset(testset)

        def train_on_batch(X, y):
            with tf.GradientTape() as tape:
                y_pred = model(X, training=True)
                loss = loss_func(y, y_pred)
                loss = tf.reduce_sum(loss, keepdims=True) / 128.0
            grad = tape.gradient(loss, model.trainable_weights)
            optim.apply_gradients(zip(grad, model.trainable_weights))
            acc.update_state(y, y_pred)
            return loss

        @tf.function
        def distributed_train_on_batch(X, y):
            loss = strategy.experimental_run_v2(train_on_batch, args=(X, y))
            return strategy.reduce(tf.distribute.ReduceOp.SUM, loss, axis=None)

        def validation_on_batch(X, y):
            y_pred = model(X, training=False)
            loss = loss_func(y, y_pred)
            loss = tf.reduce_sum(loss, keepdims=True) / 128.0
            acc.update_state(y, y_pred)
            return loss

        @tf.function
        def distributed_validation_on_batch(X, y):
            loss = strategy.experimental_run_v2(validation_on_batch, args=(X, y))
            return strategy.reduce(tf.distribute.ReduceOp.SUM, loss, axis=None)

        for epoch in range(10):
            acc.reset_states()

            trainloss = []
            for X, y in trainset:
                trainloss.append(distributed_train_on_batch(X, y).numpy())
            trainloss = np.mean(np.asarray(trainloss))
            trainacc = acc.result().numpy()

            testloss = []
            acc.reset_states()
            for X, y in testset:
                testloss.append(distributed_validation_on_batch(X, y).numpy())
            testloss = np.mean(np.asarray(testloss))

            print("Epoch : ", epoch + 1, "train_loss : ", trainloss, "train_acc : ", trainacc,
                "test_loss : ", testloss, "test_acc : ", acc.result().numpy())

        model.save("model_tpu.h5") # TF2.0ではh5形式でないと処理がえらい煩雑になる

def inference():
    _, testset = load_dataset()

    with strategy.scope():
        model = tf.keras.models.load_model("model_tpu.h5")
        testset = strategy.experimental_distribute_dataset(testset)
        acc = tf.keras.metrics.SparseCategoricalAccuracy()

        def validation_on_batch(X, y):
            y_pred = model(X, training=False)
            acc.update_state(y, y_pred)

        @tf.function
        def distributed_validation_on_batch(X, y):
            return strategy.experimental_run_v2(validation_on_batch, args=(X, y))

        for X, y in testset:
            distributed_validation_on_batch(X, y)

        print("Test accuracy : ", acc.result().numpy())

train()を実行すると、(TPUでのMLPはやたら遅いですけど、CNNはとても速いです)

Epoch :  1 train_loss :  0.353345 train_acc :  0.90295 test_loss :  0.18719012 test_acc :  0.9451
Epoch :  2 train_loss :  0.16474658 train_acc :  0.95318335 test_loss :  0.13890296 test_acc :  0.9567
Epoch :  3 train_loss :  0.119806044 train_acc :  0.9658333 test_loss :  0.11211825 test_acc :  0.9657
Epoch :  4 train_loss :  0.09255092 train_acc :  0.9739 test_loss :  0.09695558 test_acc :  0.9704
Epoch :  5 train_loss :  0.07436628 train_acc :  0.9784667 test_loss :  0.09559105 test_acc :  0.9716
Epoch :  6 train_loss :  0.06169544 train_acc :  0.98176664 test_loss :  0.08403554 test_acc :  0.9733
Epoch :  7 train_loss :  0.051985856 train_acc :  0.98515 test_loss :  0.07842376 test_acc :  0.9751
Epoch :  8 train_loss :  0.044806883 train_acc :  0.9866833 test_loss :  0.077447586 test_acc :  0.9765
Epoch :  9 train_loss :  0.03689432 train_acc :  0.98948336 test_loss :  0.07551964 test_acc :  0.9759
Epoch :  10 train_loss :  0.031945217 train_acc :  0.99118334 test_loss :  0.07882478 test_acc :  0.9774

モデルを読み込んで推論します。

WARNING:tensorflow:No training configuration found in save file: the model was *not* compiled. Compile it manually.
WARNING:tensorflow:No training configuration found in save file: the model was *not* compiled. Compile it manually.
Test accuracy :  0.9774

TPUでも同様に、精度を再現できました。

TPU→CPUの読み込み

ここからが本番で、TPUで訓練した重みをCPUで読み込んで推論します。実践的にはよくある形ですね。先程の「model_tpu.h5」をダウンロードしてきます。

import tensorflow as tf
import numpy as np

def load_dataset():
    (_, _), (X_test, y_test) = tf.keras.datasets.mnist.load_data()
    test_set = tf.data.Dataset.from_tensor_slices((X_test, y_test))
    test_set = test_set.map(
        lambda x, y: (tf.cast(x, tf.float32) / 255.0, tf.cast(y, tf.float32))        
    ).batch(128)
    return test_set

def inference():
    testset = load_dataset()
    model = tf.keras.models.load_model("model_tpu.h5")

    acc = tf.keras.metrics.SparseCategoricalAccuracy()
    for X, y in testset:
        y_pred = model(X, training=False)
        acc.update_state(y, y_pred)

    print("Test accuracy : ", acc.result().numpy())

if __name__ == "__main__":
    inference()

このようにTPUでの訓練結果が再現されました。

Test accuracy :  0.9774

Kerasでcompile→fitで訓練するケースで、h5形式で保存し、かつカスタム損失関数や評価関数がある場合は読み込みで少し引っかかるかもしれません。これはTF1.X系からあるので以下の方法で解決できます。

Kerasでカスタム損失関数を用いたモデルをロードするとValueError
https://blog.shikoan.com/keras-load-custom-loss-model/

CPU→TPUの読み込み

ケースとしては少ないかもしれませんが、さっきの逆をやってみます。ColabにCPUで訓練した「model.h5」をアップロードします。

import numpy as np

def load_dataset():
    (_, _), (X_test, y_test) = tf.keras.datasets.mnist.load_data()
    test_set = tf.data.Dataset.from_tensor_slices((X_test, y_test))
    test_set = test_set.map(
        lambda x, y: (tf.cast(x, tf.float32) / 255.0, tf.cast(y, tf.float32))        
    ).batch(128)
    return test_set

def inference():
    testset = load_dataset()

    with strategy.scope():
        model = tf.keras.models.load_model("model.h5")
        testset = strategy.experimental_distribute_dataset(testset)
        acc = tf.keras.metrics.SparseCategoricalAccuracy()

        def validation_on_batch(X, y):
            y_pred = model(X, training=False)
            acc.update_state(y, y_pred)

        @tf.function
        def distributed_validation_on_batch(X, y):
            return strategy.experimental_run_v2(validation_on_batch, args=(X, y))

        for X, y in testset:
            distributed_validation_on_batch(X, y)

        print("Test accuracy : ", acc.result().numpy())

if __name__ == "__main__":
    inference()
WARNING:tensorflow:No training configuration found in save file: the model was *not* compiled. Compile it manually.
WARNING:tensorflow:No training configuration found in save file: the model was *not* compiled. Compile it manually.
Test accuracy :  0.9782

小数第4位が違いますがほぼ誤差でしょう。

まとめ

h5形式でモデルを保存することで、TPU→CPU、CPU→TPUの相互のやり取りが無事できたということを確認できました。



Shikoan's ML Blogの中の人が運営しているサークル「じゅ~しぃ~すくりぷと」の本のご案内

技術書コーナー

北海道の駅巡りコーナー


Add a Comment

メールアドレスが公開されることはありません。 が付いている欄は必須項目です