前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【生成模型】简述概率密度函数可处理流模型

【生成模型】简述概率密度函数可处理流模型

作者头像
用户1508658
发布2020-11-19 16:08:37
1.2K1
发布2020-11-19 16:08:37
举报
文章被收录于专栏:有三AI有三AI

本期将介绍第二种非常优雅的生成模型—流模型,它也是一种概率密度函数可处理的生成模型。本文将对其原理进行介绍,并对nice模型的源码进行讲解。

作者&编辑 | 小米粥

1 流模型

这是一种想法比较直接但实际不容易构造的生成模型,它通过可逆的非线性变换等技巧使得似然函数可以被精确计算出来。对于分布比较简单(例如高斯分布)的隐变量z,其概率分布为 pz(z) ,这时若存在一个连续、可微、可逆的非线性变换g(z),将简单的潜变量z的分布转换成关于样本x的一个复杂分布,将非线性变换g(z)的逆变换记为f(x),则可得到样本x的准确的概率密度函数 px(x)

注意,非线性变换g(z)会引起空间的变形,即px(x)不等于 pz(f(x)),且有pz(z)dz=px(x)dx。若上述模型构建成功,则生成样本时只需从简单分布pz(z)中随机采样然后将其变换为 x=g(z) 即可。

为了训练非线性独立成分估计模型,我们必须计算样本的概率密度函数px(x)。分析上式,概率密度函数px(x)的计算需要计算pz(z)和雅可比矩阵的行列式绝对值。对于前者,要构建一个 g(z) 的逆变换f(x),这样当给定一个样本 x 后,就可以通过z=f(x)计算得到其对应的隐变量,pz(z)通常设计为简单的分布,容易计算;对于后者,要将f(x)设计为某种特殊的形式,使得雅可比矩阵的行列式易于计算。另外,变换的可逆性要求样本x和隐变量z具有相同的维度。综上,需要将生成模型精心设计成一种易于处理且灵活的双射模型,使其逆变换f(x)存在,且对应的雅可比矩阵的行列式可计算。

为了对这类模型有更深刻的理解,我们对NICE模型进行详细的介绍。NICE模型的逆变换f(x)由多个加性耦合层和一个尺度变换层构成。在每个加性耦合层中,首先将 n 维样本 x 分解为两部分x1和x2,例如将x的第1,3,5...个元素划入x1部分,将第2,4,6...个元素划入x2部分,每个部分的维度均为n/2;也可以将 x 的第2,4,6...个元素划入x1部分,将第1,3,5...个元素划入 x2 部分;或者使用其他划分方式。然后对两部分其进行变换:

其中m()为任意函数,注意这里要保证m()的输出结果维度与 x2 保持一致,NICE模型使用多层全连接网络和ReLU激活函数来构建 m() 。容易发现,使用加性耦合层作为逆变换f(x)的其中一部分,它是可逆的并且雅可比矩阵的行列式也是容易计算的。当已知h1和h2时,可得其逆变换:

其雅可比矩阵为:

根据三角矩阵的性质,其行列式为对角元素的乘积,故加性耦合层雅可比矩阵的行列式绝对值为1。当将多个加性耦合层串联时,如下图所示。由于每个层的逆变换是容易计算的,则串联后的逆变换仍然是容易计算的。此时的雅可比矩阵为:

根据矩阵行列式的性质,有:

需要说明的是,必须注意在不同的加性耦合层使用不同的划分策略,使得样本不同维度的信息充分混淆。在尺度变换层,定义了包含n个非负参数的向量s=[s1,s2,...,sn],将加性耦合层的输出结果h(l)与s逐元素相乘可得到对应的隐变量z。这里s用于控制每个维度的特征变换的尺度,可以表征维度的重要性,对应维度的数值较大表明这一维度的重要性低,因为生成样本时隐变量需要先经过尺度变换层,隐变量在尺度变换层需要逐元素乘1/s。显然,尺度变换层的逆变换只需逐元素乘1/s,为了计算雅可比矩阵,将尺度变换写为对角矩阵的形式:

则其雅可比矩阵的行列式为s1s2...sn。现在,我们构造了可逆的、雅可比矩阵的行列式绝对值易于计算的逆变换f(x),对于隐变量z,NICE模型假设其n个维度彼此独立,即

若选择z为高斯分布,则样本x的似然函数为:

若选择z为logistic分布,即

则样本x的似然函数为

现在,我们可以使用极大似然法对NICE模型进行训练,训练完成后也得到了生成模型g(z)。若 z 为高斯分布,则直接从高斯分布中采样可得到z;若选择 为logistic分布,可现在0-1之间的均匀分布中采样得到\epsilon ,然后使用变换z=t(ε)得到隐变量。根据两个随机变量的映射关系

则有

将隐变量z使用非线性变换g(z),即经过尺度变换层的逆变换、多个加性耦合层的逆变换可得到生成样本x。

2 NICE 代码

接下来我们将提供一份完整的NICE的代码讲解,其中训练集为mnist数据集。

首先读取相关python库:

import argparse 
import torch 
import torchvision 
import numpy as np 
import niceimport utils

定义主函数,设置相关参数,并进行模型的训练和验证

定义二维掩膜卷积核,其中有A与B两种类型,区别之处在于中心位置是否被卷积计算:

def main(args):
    device = torch.device("cuda:0") 
    # 训练相关参数设置
    dataset = args.dataset
    batch_size = args.batch_size
    latent = args.latent
    max_iter = args.max_iter
    sample_size = args.sample_size
    coupling = 4    mask_config = 1.
    # 优化器设置
    lr = args.lr
    momentum = args.momentum
    decay = args.decay
    zca = None
    mean = None
    # 选择相应的训练数据集
    if dataset == 'mnist':
        mean = torch.load('./statistics/mnist_mean.pt')        (full_dim, mid_dim, hidden) = (1 * 28 * 28, 1000, 5)        transform = torchvision.transforms.ToTensor()        trainset = torchvision.datasets.MNIST(root='~/torch/data/MNIST',            train=True, download=True, transform=transform)        trainloader = torch.utils.data.DataLoader(trainset,            batch_size=batch_size, shuffle=True, num_workers=2)         
  # 使用正态分布或逻辑回归分布
   if latent == 'normal':
        prior = torch.distributions.Normal(
            torch.tensor(0.).to(device), torch.tensor(1.).to(device))
    elif latent == 'logistic':
        prior = utils.StandardLogistic()
    filename = '%s_' % dataset \
             + 'bs%d_' % batch_size \
             + '%s_' % latent \
             + 'cp%d_' % coupling \
             + 'md%d_' % mid_dim \
             + 'hd%d_' % hidden
    # 实例化流模型
    flow = nice.NICE(prior=prior,
                coupling=coupling,
                in_out_dim=full_dim,
                mid_dim=mid_dim,
                hidden=hidden,
                mask_config=mask_config).to(device)
    optimizer = torch.optim.Adam(
        flow.parameters(), lr=lr, betas=(momentum, decay), eps=1e-4)
    total_iter = 0
    train = True
    running_loss = 0
    # 训练模型
    while train:
        for _, data in enumerate(trainloader, 1):
            flow.train()
    # set to training mode
            if total_iter == max_iter:
                train = False
                break
            total_iter += 1
            optimizer.zero_grad()
    # clear gradient tensors
            inputs, _ = data
            inputs = utils.prepare_data(
                inputs, dataset, zca=zca,
 mean=mean).to(device)
            # log-likelihood of input minibatch
            loss = -flow(inputs).mean()
            running_loss += float(loss)
            # backprop and update parameters
            loss.backward()
            optimizer.step()
            if total_iter % 10 == 0:
                mean_loss = running_loss / 2000
                bit_per_dim = (mean_loss + np.log(256.) * full_dim) \
                            / (full_dim * np.log(2.)) 
                print('iter %s:' % total_iter,
                    'loss = %.3f' % mean_loss,
                    'bits/dim = %.3f' % bit_per_dim)
                running_loss = 0.0
            if total_iter % 1000 == 0:
                flow.eval()
        # set to inference mode
                with torch.no_grad():
                    samples = flow.sample(sample_size).cpu()
                    samples = utils.prepare_data(                        samples, dataset, zca=zca, mean=mean, reverse=True)
                torchvision.utils.save_image(torchvision.utils.make_grid(samples),
'./samples/' + filename +'iter%d.png' % total_iter)
    # 保存模型
    torch.save({
        'total_iter': total_iter,
        'model_state_dict': flow.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'dataset': dataset,
        'batch_size': batch_size,
        'latent': latent,
        'coupling': coupling,
        'mid_dim': mid_dim,
        'hidden': hidden,
        'mask_config': mask_config},
        './models/mnist/' + filename +'iter%d.tar' % total_iter)
    print('Checkpoint Saved')
 if __name__ == '__main__':
    parser = argparse.ArgumentParser('MNIST NICE PyTorch implementation')
    parser.add_argument('--dataset',
                        help='dataset to be modeled.',
                        type=str,
                        default='mnist')    parser.add_argument('--batch_size', 
                        help='number of images in a mini-batch.',
                        type=int,
                        default=256)
    parser.add_argument('--latent',
                        help='latent distribution.',
                        type=str,
                        default='logistic')
    parser.add_argument('--max_iter',
                        help='maximum number of iterations.',
                        type=int,
                        default=20000)
    parser.add_argument('--sample_size',
                        help='number of images to generate.',
                        type=int,
                        default=64)
    parser.add_argument('--lr',
                        help='initial learning rate.',
                        type=float,
                        default=1e-3)
    parser.add_argument('--momentum', 
                        help='beta1 in Adam optimizer.',
                        type=float,
                        default=0.9)
    parser.add_argument('--decay',
                        help='beta2 in Adam optimizer.',
                        type=float,
                        default=0.999)
    args = parser.parse_args()
    main(args)
NICE流模型的核心代码:
# 加性耦合层class Coupling(nn.Module):
    def __init__(self, in_out_dim, mid_dim, hidden, mask_config):
        super(Coupling, self).__init__()
        self.mask_config = mask_config
        self.in_block = nn.Sequential(
            nn.Linear(in_out_dim//2, mid_dim),
            nn.ReLU())
        self.mid_block = nn.ModuleList([
            nn.Sequential(
                nn.Linear(mid_dim, mid_dim),
                nn.ReLU()) for _ in range(hidden - 1)])
        self.out_block = nn.Linear(mid_dim, in_out_dim//2)
    def forward(self, x, reverse=False):
        [B, W] = list(x.size())
        x = x.reshape((B, W//2, 2))
        if self.mask_config:
            on, off = x[:, :, 0], x[:, :, 1]
        else:
            off, on = x[:, :, 0], x[:, :, 1]
        off_ = self.in_block(off)
        for i in range(len(self.mid_block)):
            off_ = self.mid_block[i](off_)
        shift = self.out_block(off_)
        if reverse:
            on = on - shift
        else:
            on = on + shift
        if self.mask_config:
            x = torch.stack((on, off), dim=2)
        else:
            x = torch.stack((off, on), dim=2)
        return x.reshape((B, W))
 # 尺度变换层
 class Scaling(nn.Module):
    def __init__(self, dim):
        super(Scaling, self).__init__()
        self.scale = nn.Parameter(
            torch.zeros((1, dim)), requires_grad=True)
    def forward(self, x, reverse=False):
        log_det_J = torch.sum(self.scale)
        if reverse:
            x = x * torch.exp(-self.scale)
        else:
            x = x * torch.exp(self.scale)
        return x, log_det_J
 # NICE模型
 class NICE(nn.Module):
    def __init__(self, prior, coupling,
        in_out_dim, mid_dim, hidden, mask_config):
        self.prior = prior
        self.in_out_dim = in_out_dim
        self.coupling = nn.ModuleList([
            Coupling(in_out_dim=in_out_dim,
                     mid_dim=mid_dim,
                     hidden=hidden,
                     mask_config=(mask_config+i)%2) \            for i in range(coupling)])
        self.scaling = Scaling(in_out_dim)
    def g(self, z):
        x, _ = self.scaling(z, reverse=True)
        for i in reversed(range(len(self.coupling))):
            x = self.coupling[i](x, reverse=True)
        return x
    def f(self, x):
        for i in range(len(self.coupling)):
            x = self.coupling[i](x)
        return self.scaling(x)
    def log_prob(self, x):
        z, log_det_J = self.f(x)
        log_ll = torch.sum(self.prior.log_prob(z), dim=1)
        return log_ll + log_det_J
    def sample(self, size):
        z = self.prior.sample((size, self.in_out_dim)).cuda()        return self.g(z)
    def forward(self, x):
        return self.log_prob(x)

[1] Dinh L , Krueger D , Bengio Y . NICE: Non-linear Independent Components Estimation[J]. Computer ence, 2014.

[2]Diederik P. Kingma*†, Prafulla Dhariwal⇤. Glow: Generative Flow with Invertible 1x1 Convolutions[J]. 2018.

总结

本期带大家学习了流模型,流模型的生成效果也非常卓越,但是其网络设计比较复杂,训练难度比较大。下一期将为大家继续讲解其他生成模型。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-11-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 有三AI 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档