こしあん
2019-09-16

画像分類で比較するBatch Norm, Instance Norm, Spectral Normの勾配の大きさ

Pocket
LINEで送る


GANの安定化のために、Batch Normalizationを置き換えるということがしばしば行われます。その置き換え先として、Spectral Norm、Instance Normなどが挙げられます。今回はGANではなく普通の画像分類の問題としてBatch Normを置き換えし、勾配のノルムどのように変わるかを比較します。

はじめに

GANの安定化のためにBatch Normalizationをそのまま使わない、ここを工夫するというのはほとんど定石になりつつあります。最たるものが「Spectral Normalization」で、特異値分解によるリプシッツ定数のコントロールにより、Dのリプシッツ連続性を保証するような制約をおくということを行っています。具体的には、GANのDのBatch NormをSpectral Normで置き換えするという単純かつ明快なものです。このSpectral Normは、GANにおいてImage Netに対する現在のSoTAであるBigGANにおいても広く使われています。

Spectral Normを使わなくてもBatch Normの置き換えをするというケースはあります。例えば、pix2pix HDではNormalizationにInstance Normalizationを使っています。またProgressive GANではLayer Normzalitionを使ったりPixelwise Normalizationを使ったりといろいろ工夫をしています。

今回は、一旦GANから離れて通常の画像分類の問題に戻り、「Normalizationの違いによってレイヤー単位での勾配がどの程度変わるのか」を調べます。具体的には、画像分類のモデルを訓練していって、レイヤー単位の勾配ノルムがどのように変化するのかを調べます。

やること

CIFAR-10の分類モデルを訓練します。以下の2つのモデルを用意します(モデルの詳細はコード参照)。

  • 10層モデル
  • ResNetライクなモデル(ResNetの論文からCIFARの解像度にあうように適当に変更したものです)

そして各モデルに対し、Normalizationを3種類変更します。すべてPyTorchに組み込まれているものです。

  • Batch Normalization
  • Instance Normalization
  • Spectral Normalization

すべてで6ケースになります。各ケースに対してエポックのはじめにConvレイヤーの勾配ノルムを計算します。具体的には、

def calc_gradients(model, input_x, target_y):
    model.zero_grad()
    loss_func = torch.nn.CrossEntropyLoss()
    loss = loss_func(model(input_x), target_y)
    loss.backward()
    grads = [p.grad.norm().item() for p in model.parameters() if len(p.size()) == 4]
    return grads

このようなコードで計算します。ここでinput_xは入力画像、target_yは本物のラベルです。テストデータすべてに対する勾配ノルムを計算し、ミニバッチ単位で平均します。この平均をエポック単位、ケース単位で比較します。

バッチサイズは128とし、学習率は0.1でモメンタム係数は0.9としました。100エポック訓練し、50エポック目と80エポック目で学習率を1/10にします。また、ResNetライクなモデルに限り、Weight Decay=0.0001を追加しました。

注意

Instance NormのResNetライクなモデルだけ、なぜかうまく行かなかった(ロスがずっと下がらないし初期の勾配がすべて0になってしまう)ので結果を省略します。Batch Normはすべてで訓練が成功しましたが、Spectral NormやInstance Normを画像分類で使う場合は「初期値ガチャ」みたいなものがあるようで、そのガチャに失敗するとずっとロスが下がらないということが観測されました。ここに掲載しているのはうまくいったケースです。

テスト精度推移

テストデータをValidationとしたときの精度推移はこちらです。横軸がエポック、縦軸が精度を表します。

精度を単純に上げたいのなら、従来どおりBatch Normzalitionがやはり有効なようです。Spectral Normは振れ幅が低すぎてロスが下がり切る前に学習率が減衰してしまったのかもしれません。

Normalization別の勾配ノルム

次はモデル-エポック単位で切り出したときの勾配推移です。横軸が層のインデックス(大きい値ほど深い)、縦軸がノルムの値です。

Batch Normは訓練開始時の勾配がひたすら大きいという傾向があります。これはResNet、10層問わず同じです。Batch Normの勾配は訓練が進むほど他のNormalizationよりも低くなりがちになります。

Instance Normは10層モデルだけですが、Batch Normほど訓練開始時の勾配は大きくありません。最終的にはBatch Normと同じように訓練が進んでいきます。

少し変わっているのがSpectral Normです。Spectral Normの訓練開始時の勾配は驚くほど低いです。GANのように失敗が訓練開始時に起きやすいケースでは、Spectral Normは有効でしょう。しかし面白いのがその後で、訓練が進むほどBatch NormやInstance Normの勾配が下がっていくのに対して、Spectral Normはだんだん勾配が大きくなっていきます。特に10層モデルのようなResNetではない(Skip Connectionのない)ケースでは顕著で、層の深さに対して指数関数的に上がっているようにも見えます。Self attention GANやBig GANのように訓練が進むと崩壊するケースがあるのはもしかしたらこういう理由なのかもしれません。ResNetを使えば多少は勾配の発散を軽減できそうなので、GANでのSpectral NormがResNetとセットで使われるのもなんとなく理解できます。

Epoch別の勾配ノルム

次は、モデル-Normalization単位で切り出したときの勾配推移です。横軸が層のインデックス(大きいほど深い層)、縦軸がノルムの値です。

訓練が進むほど勾配が大きくなるのはどのNormalizationでも共通でした(ただしResNetのBatch Normだけ例外)。変わるのはNormalization別の相対的な順位ということでしょうか。

10層モデルの場合、Spectral Normは初期の勾配は低いですが、訓練が進むほど、層の深さに対して指数関数的な勾配構成になります。Batch NormやInstance Normの場合は真ん中の層の勾配が膨らむので、深い層の勾配が膨らむのはあんまりよくないような感じがします(GANでモデルを深くするというのはあんまりよくないという話は聞いた記憶がある)。Spectral Normでも、ResNetにすると深い層の勾配が膨らまなくなるので、ResNetは大事なのでしょう。

しかし、ResNetにするとなぜかBatch Normの初期の勾配が驚くほど高いという不思議なことがおこります。これは10層モデルでは起こりませんでした。確かにこれはこれでよくないのでしょう(Spectral Normが良いと言われる理由はおそらくこれ)。

まとめ

今回の実験をまとめます。

  • 訓練開始時の勾配ノルムは、「Batch Norm>Instance Norm>Spectral Norm」だった。特にResNetではBatch Normが初期になぜかとても高い勾配を出すので、これがGANの失敗に寄与しているのだと思われる。この点ではSpectral Normは良い。
  • しかし訓練が進んでいくと、Spectral Normの勾配ノルムがBatch Normよりも遥かに高くなるケースがある。ResNetを使わない場合、深い層ほど指数関数的に勾配が増加していくので、Spectral Normを使う場合はResNetを使ったほうがよさそう。GANで訓練がある程度進んだところから急に崩壊するというのは、このSpectral Normの勾配が大きくなりすぎていることによるものなのかもれない。
  • Spectral Normは現時点(2019年9月)では非常に有効なGANの安定化手法であるが、もしかするとまだまだ改良の余地があるのかもしれない。

コード

モデル(models.py)

import torch
from torch import nn
import torch.nn.functional as F
import torchvision
from torchvision import transforms
from torch.nn.utils.spectral_norm import spectral_norm

class TenLayersModel(nn.Module):
    def __init__(self, normalization):
        super().__init__()
        self.convs = self.create_model(normalization)
        self.linear = nn.Linear(256, 10)

    def conv_norm_relu(self, in_ch, out_ch, normalization):
        layers = []
        if normalization == "spectral":
            w = nn.Conv2d(in_ch, out_ch, 3, padding=1)
            layers.append(spectral_norm(w))
        else:
            layers.append(nn.Conv2d(in_ch, out_ch, 3, padding=1))
            if normalization == "batch":
                layers.append(nn.BatchNorm2d(out_ch))
            elif normalization == "instance":
                layers.append(nn.InstanceNorm2d(out_ch))
        layers.append(nn.ReLU(True))
        return layers

    def create_model(self, normalization):
        layers = []
        for in_ch in [3, 64, 64]:
            layers += self.conv_norm_relu(in_ch, 64, normalization)
        layers.append(nn.AvgPool2d(2))
        for in_ch in [64, 128, 128]:
            layers += self.conv_norm_relu(in_ch, 128, normalization)
        layers.append(nn.AvgPool2d(2))
        for in_ch in [128, 256, 256]:
            layers += self.conv_norm_relu(in_ch, 256, normalization)
        layers.append(nn.AvgPool2d(8))
        return nn.Sequential(*layers)

    def forward(self, inputs):
        x = self.convs(inputs).view(inputs.size(0), -1)
        x = self.linear(x)
        return x

class ResNetPreactModule(nn.Module):
    def __init__(self, in_ch, out_ch, downsampling, normalization):
        assert normalization in ["batch", "spectral", "instance"]
        super().__init__()
        self.conv1 = nn.Conv2d(in_ch, out_ch, 3, padding=1)
        self.conv2 = nn.Conv2d(out_ch, out_ch, 3, padding=1)
        self.shortcut_conv = nn.Conv2d(in_ch, out_ch, 1) if in_ch != out_ch else None

        self.norm1 = self.get_normalization(in_ch, normalization)
        self.norm2 = self.get_normalization(out_ch, normalization)
        if normalization == "spectral":
            self.conv1 = spectral_norm(self.conv1)
            self.conv2 = spectral_norm(self.conv2)
            self.shortcut_conv = spectral_norm(self.shortcut_conv) if self.shortcut_conv is not None else None
        self.downsampling = nn.AvgPool2d(downsampling) if downsampling > 1 else None

    def get_normalization(self, ch, normalization):
        if normalization == "batch":
            return nn.BatchNorm2d(ch)
        elif normalization == "instance":
            return nn.InstanceNorm2d(ch)
        else:
            return None

    def forward(self, inputs):
        # main path
        x = self.norm1(inputs) if self.norm1 is not None else inputs
        x = F.relu(x)
        x = self.conv1(x)
        x = self.norm2(x) if self.norm2 is not None else x
        x = F.relu(x)
        x = self.conv2(x)
        # shortcut path
        shortcut = self.shortcut_conv(inputs) if self.shortcut_conv is not None else inputs
        # downsampling
        if self.downsampling is not None:
            x = self.downsampling(x)
            shortcut = self.downsampling(shortcut)
        return x + shortcut

class ResNetLikeModel(nn.Module):
    def __init__(self, normalization):
        super().__init__()
        self.conv = nn.Sequential(
            *self.resnet_block(3, 64, 3, normalization),
            *self.resnet_block(64, 128, 4, normalization),
            *self.resnet_block(128, 256, 6, normalization, enable_downsampling=False),
            nn.AvgPool2d(8)
        )
        if normalization == "batch":
            self.last_norm = nn.BatchNorm2d(256)
        elif normalization == "instance":
            self.last_norm = nn.InstanceNorm2d(256)
        else:
            self.last_norm = None
        self.linear = nn.Linear(256, 10)

    def resnet_block(self, in_ch, out_ch, reps, normalization, enable_downsampling=True):
        layers = []
        for i in range(reps):
            current_in = in_ch if i == 0 else out_ch
            down = 2 if i == reps-1 and enable_downsampling else 1
            layers.append(ResNetPreactModule(current_in, out_ch, down, normalization))
        return layers

    def forward(self, inputs):
        x = self.conv(inputs)
        x = self.last_norm(x) if self.last_norm is not None else x
        x = x.view(x.size(0), -1)
        x = self.linear(x)
        return x

勾配計算(compute_gradients.py)

import torch
import torchvision
from models import TenLayersModel, ResNetLikeModel
from torchvision import transforms
from tqdm import tqdm
import numpy as np
import os
import pickle

def load_cifar():
    trans = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))        
    ])
    trainset = torchvision.datasets.CIFAR10(root="./data", train=True, download=True, transform=trans)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=128, shuffle=True)
    testset = torchvision.datasets.CIFAR10(root="./data", train=False, download=True, transform=trans)
    testloader = torch.utils.data.DataLoader(testset, batch_size=128, shuffle=False)
    return trainloader, testloader

def calc_gradients(model, input_x, target_y):
    model.zero_grad()
    loss_func = torch.nn.CrossEntropyLoss()
    loss = loss_func(model(input_x), target_y)
    loss.backward()
    grads = [p.grad.norm().item() for p in model.parameters() if len(p.size()) == 4]
    return grads

def main(network, normalization):
    if network == "ten":
        model = TenLayersModel(normalization)
    elif network == "resnet":
        model = ResNetLikeModel(normalization)

    model_name = f"{network}_{normalization}"
    output_dir = "snapshot"
    if not os.path.exists(output_dir):
        os.mkdir(output_dir)

    device = "cuda"
    batch_size = 128
    model.to(device)
    model = torch.nn.DataParallel(model)

    trainloader, testloader = load_cifar()
    log_gradients = []
    log_loss = []
    log_val_acc = []

    weight_decay = 0.0001 if network == "resnet" else 0

    optimizer = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=weight_decay)
    scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[50, 80], gamma=0.1)
    criterion = torch.nn.CrossEntropyLoss()
    max_val_acc = 0.0

    for epoch in tqdm(range(100)):
        # gradient check
        gradients = []
        for X, y in testloader:
            if len(X) != 128: continue
            X, y = X.to(device), y.to(device)
            gradients.append(calc_gradients(model, X, y))
        model.zero_grad()
        layer_grads = np.mean(np.array(gradients), axis=0)  # batch-wise mean
        log_gradients.append(layer_grads)

        # train
        train_loss = 0.0
        for i, (X, y) in enumerate(trainloader):
            X, y = X.to(device), y.to(device)
            optimizer.zero_grad()
            y_pred = model(X)
            loss = criterion(y_pred, y)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        log_loss.append(train_loss / (i + 1))

        # validation
        with torch.no_grad():
            correct, total = 0, 0
            for X, y in testloader:
                X, y = X.to(device), y.to(device)
                outputs = model(X)
                _, pred = torch.max(outputs.data, 1)
                total += y.size(0)
                correct += (pred == y).sum().item()
            log_val_acc.append(correct / total)

        # save model
        if max_val_acc < log_val_acc[-1]:
            torch.save(model.state_dict(), f"{output_dir}/{model_name}.pytorch")
            max_val_acc = log_val_acc[-1]

        scheduler.step()

        print("Epoch =", epoch, "Loss =", log_loss[-1], "Val_acc =", log_val_acc[-1], "/ ", model_name)
        # print(log_gradients[-1])

    # save result
    with open(f"{output_dir}/log_{model_name}.pkl", "wb") as fp:
        result = {"gradient":log_gradients, "loss":log_loss, "val_acc":log_val_acc}
        pickle.dump(result, fp)

if __name__ == "__main__":
    for model in ["ten", "resnet"]:
        for norm in ["batch", "instance", "spectral"]:
            main(model, norm)

Related Posts

PyTorchでSliced Wasserstein Distance (SWD)を実装した... PyTorchでSliced Wasserstein Distance (SWD)を実装してみました。オリジナルの実装はNumpyですが、これはPyTorchで実装しているので、GPU上で計算することができます。本来はGANの生成画像を評価するためのものですが、画像の分布不一致を見るためにも使うこ...
KerasのLearningRateSchedulerとPyTorchのLambdaLRの微妙な違い... 学習率の調整は大事です。エポック後に学習率を減衰させる際、現在のエポックを引数として更新後の学習率を返す関数を与えると便利なことが多いです。この操作はKeras,PyTorchどちらでもできますが、扱い方が微妙に違うところがあります。ここを知らないでKerasの感覚のままPyTorchでやったらハ...
Google ColabのTPUでResNetのベンチマークを取ってみた... Google ColaboratoryでTPUが使えるようになりましたが、さっそくどのぐらい速いのかベンチマークを取ってみました。以前やったResNetのベンチマークを使います。 環境:Google Colab(TPU)、TensorFlow:1.11.0-rc2、Keras:2.1.6 コ...
Channelwise Variational AutoEncoder(失敗) Variational Auto Encoder(VAE)を試していて、カラー画像は上手く行かなくてもグレースケール画像ならそこそこうまく行ったので、「じゃあチャンネル単位にVAEかけて後で結合すればカラーでもきれいにいくんじゃね?」と安直な発想で試してみたら失敗しました。それの記録を書きました。...
PyTorchでのConvTranspose2dのパラメーター設定について... VAE(Variational Auto Encoder)やGAN(Generative Adversarial Network)などで用いられるデコーダーで畳み込みの逆処理(Convtranspose2d)を使うことがあります。このパラメーター設定についてハマったので解説します。 エンコーダ...
Pocket
Delicious にシェア

Add a Comment

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です