こしあん
2021-01-23

TFRecordを自作して最低限のCIFAR-10を訓練するまで


4.7k{icon} {views}


TFRecordを自作して、とりあえずCIFAR-10を訓練するための最低限の処理を書きました。なんでもBytesListに格納する方法です。

TFRecordについて

TFRecord、たびたび出てくるのですが、なくてもなんとかなっちゃう割に入門が面倒なので放置しがちなんですよね。今回はCIFAR-10を、自作のTFRecordを経由して訓練してみます。NumPyベースでできる最低限の処理をTFRecordで書きます。

そもそもTFRecordとはなにかというと、TensorFlowが推奨している機械学習のためのデータフォーマット。フォーマットはProtocol Bufferがベースとなっています。

TFRecords と tf.Example の使用法
TFRecordデータの消費

NumPy配列を記録するには

TFRecordはいろいろ作り方があり、最初は「どの方法で書けばよいか」「どのフォーマットでシリアライズすればいいか」がかなり悩みます。とりあえずNumPy配列のデータをRecord化して、最低限訓練できればいいかなということを探ってみました。

TFRecordには3つの型があります。

  • tf.train.BytesList(string、byte)
  • tf.train.FloatList(float, double)
  • tf.train.Int64List(bool, enum, int32, uint32, int64, uint64)

最初「画像はInt64ListかFloatListだろうな」と思っていたのですが、わざわざ64ビットを使うまでもない変数に64ビット割り当てるのは無駄です。ストレージの容量的には圧縮かかるため問題なくても、メモリを64ビット分どかどか食っていくのは後々まずそうです。

この他に、試していると、BytesList、FloatList、Int64Listは1次元配列なら記録できるが、多次元配列は記録できないという問題がありました(TF2.3.1時点)。次のようなエラーになります。

Tensorflow error “has type list, but expected one of: int, long, float”

「shapeの構造を保存できないんだったら、NumPy配列をByte列にしてTFRecordではBytesListで保持していたほうが明らかに便利でしょ?」と考えました。NumPy配列でなくても例えばJPEGのByte列を持っておくなどにも応用できます。とりあえずByte列なら発想としてはわかりやすい。

NumPy配列のTFRecord化

次のコードになります。

import tensorflow as tf
import numpy as np

def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def serialize_sample(image, label):
    image_binary = (image.astype(np.float32) / 255.0).tobytes()
    label_binary = np.eye(10).astype(np.float32)[label].tobytes()
    image_list = _bytes_feature(image_binary)
    label_list = _bytes_feature(label_binary)
    proto = tf.train.Example(features=tf.train.Features(feature={
        "image": image_list, # float32, (32, 32, 3)
        "label": label_list # float32, (10, )
    }))
    return proto.SerializeToString()

def write_record():
    (X_train, y_train), (X_test, y_test) = tf.keras.datasets.cifar10.load_data()

    with tf.io.TFRecordWriter("train.tfrecord") as writer:
        for i in range(X_train.shape[0]):
            example = serialize_sample(X_train[i], y_train[i])
            writer.write(example)
    with tf.io.TFRecordWriter("test.tfrecord") as writer:
        for i in range(X_test.shape[0]):
            example = serialize_sample(X_test[i], y_test[i])
            writer.write(example)

if __name__ == "__main__":
    write_record()

tf.io.TFRecordWriterを使って書く方法がやりやすい。サンプル単位でシリアライズして書き込みます。

サンプル単位のシリアライズ処理は「serialize_sample」の関数にあります。NumPy配列をByte列を経由してTensorFlowのテンソルにわたす方法はこちらに書きました。

とりあえず.tobytes()でByte列に変換して、tf.train.Featureでラップするというものです。この処理は公式のコードのコピペです。

あとはProtocol Bufferのシリアライズですが、これも公式コードのコピペで作りました。dict方式で必要に応じて項目を追加していけばいいと思います。CIFAR-10は固定解像度なので特に不要ですが、解像度がいろいろあるデータのときは、画像データのshapeも入れるといいでしょう。

前処理部分として、

  • 画像はFloat32に変換して0-1スケール
  • ラベルはOnehotエンコーディングしてFloat32にする

という処理を挟んでいます。ファイルサイズはtrain.tfrecordが590MB、test.tfrecordが118MBでした。無圧縮の理論値でしょうか。オプションで圧縮もかけられます(後述)。

作ったTFRecordを読み出してみる

先程作ったTFRecordを読み出すには次のようにします。

import tensorflow as tf
import tensorflow.keras.layers as layers
import numpy as np

def deserialize_example(serialized_string):
    image_feature_description = {
        'image': tf.io.FixedLenFeature([], tf.string),
        'label': tf.io.FixedLenFeature([], tf.string),
    }
    example = tf.io.parse_single_example(serialized_string, image_feature_description)
    image = tf.reshape(tf.io.decode_raw(example["image"], tf.float32), (32, 32, 3))
    label = tf.io.decode_raw(example["label"], tf.float32)
    return image, label

def read_record():
    dataset = tf.data.TFRecordDataset("train.tfrecord").map(deserialize_example).batch(4)
    for x in dataset:
        print(x)
        break

if __name__ == "__main__":
    read_record()

image_feature_descriptionというスキーマーを作って、tf.io.parse_single_exampleとします。これ遅いらしいんですが、prefetchすればある程度速くなるんで、ここではサンプル単位で読み出すという設計思想を大事にします。

入力されるのはProtocol Bufferでシリアライズされた文字列。スキーマーの部分の「tf.io.FixedLenFeature([], tf.string)」はByte列の文字列を受け取りますよという意味します。

それを「tf.io.decode_raw」でテンソルに戻しています。shapeが不定になる場合は、元のshapeもTFRecordに入れておくと良さそうですね。これを実行すると次の通り。

(<tf.Tensor: shape=(4, 32, 32, 3), dtype=float32, numpy=
array([[[[0.23137255, 0.24313726, 0.24705882],
         [0.16862746, 0.18039216, 0.1764706 ],
         [0.19607843, 0.1882353 , 0.16862746],
         ...,
         (中略)
         [0.21176471, 0.18431373, 0.10980392],
         [0.24705882, 0.21960784, 0.14509805],
         [0.28235295, 0.25490198, 0.18039216]]]], dtype=float32)>, <tf.Tensor: shape=(4, 10), dtype=float32, numpy=
array([[0., 0., 0., 0., 0., 0., 1., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 1.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 1.],
       [0., 0., 0., 0., 1., 0., 0., 0., 0., 0.]], dtype=float32)>)

いい感じになりました。

TFRecordによる訓練

import tensorflow as tf
import tensorflow.keras.layers as layers
import numpy as np

gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Currently, memory growth needs to be the same across GPUs
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # Memory growth must be set before GPUs have been initialized
        print(e)

def deserialize_example(serialized_string):
    image_feature_description = {
        'image': tf.io.FixedLenFeature([], tf.string),
        'label': tf.io.FixedLenFeature([], tf.string),
    }
    example = tf.io.parse_single_example(serialized_string, image_feature_description)
    image = tf.reshape(tf.io.decode_raw(example["image"], tf.float32), (32, 32, 3))
    label = tf.io.decode_raw(example["label"], tf.float32)
    return image, label

def conv_bn_relu(inputs, chs):
    x = layers.Conv2D(chs, 3, padding="same")(inputs)
    x = layers.BatchNormalization()(x)
    return layers.ReLU()(x)

def create_model():
    inputs = layers.Input((32, 32, 3))
    x = inputs
    for chs in [64, 128, 256]:
        for i in range(3):
            x = conv_bn_relu(x, chs)
        x = layers.AveragePooling2D(2)(x)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(10, activation="softmax")(x)
    return tf.keras.models.Model(inputs, x)

def main():
    trainset = tf.data.TFRecordDataset("train.tfrecord").map(deserialize_example).shuffle(2048).repeat().batch(128)
    testset = tf.data.TFRecordDataset("test.tfrecord").map(deserialize_example).batch(128)

    model = create_model()
    model.compile("adam", "categorical_crossentropy", ["accuracy"])
    model.fit(trainset, steps_per_epoch=50000//128, validation_data=testset, epochs=3)

if __name__ == "__main__":
    main()

注意点としてはディスクのスループットが結構ボトルネックになるので、TFRecordの置き場はHDDよりもSSDのほうがいいでしょう。

390/390 [==============================] - 15s 39ms/step - loss: 1.2676 - accuracy: 0.5360 - val_loss: 2.7957 - val_accuracy: 0.2260
Epoch 2/3
390/390 [==============================] - 15s 39ms/step - loss: 0.7906 - accuracy: 0.7214 - val_loss: 1.9798 - val_accuracy: 0.5060
Epoch 3/3
390/390 [==============================] - 16s 40ms/step - loss: 0.6133 - accuracy: 0.7867 - val_loss: 1.1967 - val_accuracy: 0.6233

普通にNumPy配列で訓練すると

CIFAR-10程度でTFRecordは不要ですが、普通にNumPy配列から訓練すると次のような経過になります。オンメモリのデータなのでこちらのほうが基本的に速くなります。ただ、Data Augmentationより前の前処理が重いと、TFRecordのほうが速くなるかもしれません。

def main():
    #trainset = tf.data.TFRecordDataset("train.tfrecord").map(deserialize_example).shuffle(2048).repeat().batch(128)
    #testset = tf.data.TFRecordDataset("test.tfrecord").map(deserialize_example).batch(128)
    (X_train, y_train), (X_test, y_test) = tf.keras.datasets.cifar10.load_data()
    trainset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).map(
        lambda x, y: (tf.image.convert_image_dtype(x, tf.float32), tf.cast(y, tf.float32))
        ).shuffle(2048).repeat().batch(128)
    testset = tf.data.Dataset.from_tensor_slices((X_test, y_test)).map(
        lambda x, y: (tf.image.convert_image_dtype(x, tf.float32), tf.cast(y, tf.float32))
        ).batch(128)

    model = create_model()
    # one-hotではないので損失関数はsparse
    model.compile("adam", "sparse_categorical_crossentropy", ["accuracy"]) 
    model.fit(trainset, steps_per_epoch=50000//128, validation_data=testset, epochs=3)
390/390 [==============================] - 14s 36ms/step - loss: 1.2956 - accuracy: 0.5298 - val_loss: 2.1240 - val_accuracy: 0.3646
Epoch 2/3
390/390 [==============================] - 14s 35ms/step - loss: 0.8241 - accuracy: 0.7079 - val_loss: 1.4766 - val_accuracy: 0.5713
Epoch 3/3
390/390 [==============================] - 14s 36ms/step - loss: 0.6346 - accuracy: 0.7778 - val_loss: 1.2441 - val_accuracy: 0.6193

オンメモリのほうが若干速いですね。それはストレージとメモリだとスループットは段違いですから。

prefetchする

高速化したいのだったらTFRecordのケースをprefetchすると速くなります。これはバッチを前もって読んでおいてキャッシュしてくれる機能です。

def main():
    trainset = tf.data.TFRecordDataset("train.tfrecord").map(deserialize_example).shuffle(2048).repeat().batch(128).prefetch(50)
    testset = tf.data.TFRecordDataset("test.tfrecord").map(deserialize_example).batch(128).prefetch(50)

    model = create_model()
    model.compile("adam", "categorical_crossentropy", ["accuracy"])
    model.fit(trainset, steps_per_epoch=50000//128, validation_data=testset, epochs=3)
390/390 [==============================] - 11s 27ms/step - loss: 1.2628 - accuracy: 0.5406 - val_loss: 2.4775 - val_accuracy: 0.3088
Epoch 2/3
390/390 [==============================] - 10s 26ms/step - loss: 0.7998 - accuracy: 0.7173 - val_loss: 1.3415 - val_accuracy: 0.5913
Epoch 3/3
390/390 [==============================] - 10s 27ms/step - loss: 0.6118 - accuracy: 0.7883 - val_loss: 1.5369 - val_accuracy: 0.5890

ちょっと多かったかもしれませんが、50バッチ分prefetchしてみました。このぐらいキャッシュすると15s→10sとかなり高速化します。

圧縮をかける

TFRecordは圧縮をかけられます。ディスク容量をケチりたいときに使えます。ただし、展開時の解凍コストが増えるので「ディスク容量は減るが、解凍部分がボトルネックになって低速化する」ということがありえます。CPUに余裕があるときは使えますね。

圧縮したいときはTFRecordWriterにTFRecordOptionsのオプションを加えます。compression_typeは現在”GZIP”と”ZLIB”が選べます。もうちょっと高速な圧縮があってもいいと思うんですけどね

def write_record():
    (X_train, y_train), (X_test, y_test) = tf.keras.datasets.cifar10.load_data()

    with tf.io.TFRecordWriter("train.tfrecord", tf.io.TFRecordOptions(compression_type="GZIP")) as writer:
        for i in range(X_train.shape[0]):
            example = serialize_sample(X_train[i], y_train[i])
            writer.write(example)
    with tf.io.TFRecordWriter("test.tfrecord", tf.io.TFRecordOptions(compression_type="GZIP")) as writer:
        for i in range(X_test.shape[0]):
            example = serialize_sample(X_test[i], y_test[i])
            writer.write(example)

容量は次のようになります。容量をケチりたいとき用ですね。

  • 圧縮前:train.tfrecordが590MB、test.tfrecordが118MB
  • 圧縮後:train.tfrecordが190MB、test.tfrecordが38MB

読み込み時も圧縮フォーマットを指定します。自動的に判別はしてくれません。

def main():
    trainset = tf.data.TFRecordDataset("train.tfrecord", "GZIP").map(deserialize_example).shuffle(2048).repeat().batch(128).prefetch(50)
    testset = tf.data.TFRecordDataset("test.tfrecord", "GZIP").map(deserialize_example).batch(128).prefetch(50)

    model = create_model()
    model.compile("adam", "categorical_crossentropy", ["accuracy"])
    model.fit(trainset, steps_per_epoch=50000//128, validation_data=testset, epochs=3)

訓練ログは次のようになります。

390/390 [==============================] - 11s 28ms/step - loss: 1.2691 - accuracy: 0.5390 - val_loss: 2.2522 - val_accuracy: 0.3326
Epoch 2/3
390/390 [==============================] - 11s 27ms/step - loss: 0.8107 - accuracy: 0.7126 - val_loss: 1.7273 - val_accuracy: 0.5426
Epoch 3/3
390/390 [==============================] - 11s 27ms/step - loss: 0.6233 - accuracy: 0.7855 - val_loss: 0.9054 - val_accuracy: 0.7100

10秒から11秒と若干遅くなっています。これは解凍処理で余計な計算コストが発生しているためです。今前処理ほとんどありませんが、Data Augmentationが重くなってくると、どんどん計算が乗ってくるので、圧縮はそこまで積極的にやるほどでもないかなとは思います。

まとめ

「速度はさておいて、とりあえずByte列化してTFRecordに入れておくとわかりやすく扱えそう」ということでした。今回はNumPy配列でどうにかなるケースなのでTFRecordは不要ですが、大きいデータで必要になったときにこれを思い出せば、なんとかできるのではないでしょうか。



Shikoan's ML Blogの中の人が運営しているサークル「じゅ~しぃ~すくりぷと」の本のご案内

技術書コーナー

北海道の駅巡りコーナー


One Comment

Add a Comment

メールアドレスが公開されることはありません。 が付いている欄は必須項目です