不久之前,Google开源了TensorFlow,这是一个旨在简化图表计算的库。 主要的应用程序是针对深度学习,将神经网络以图形形式显示。 我花了几天的时间阅读他们的API和教程,我非常满意这些我所看到的内容。 尽管其他库提供了类似的功能,如GPU计算和符号差异化,但是它API的整洁性和对IPython栈的熟悉使其吸引我使用。
在这篇文章中,我尝试使用TensorFlow来实现经典的混合密度网络(Bishop '94)模型。在之前的博客文章中,我已经实现了MDN 。这是我学习使用TensorFlow的第一次尝试,可能有更多的方法来做一些事情,所以请在评论部分让我知道!
要开始,我们试着快速建立一个神经网络,以适应一些假的数据。因为甚至一个隐藏层都可以实现通用的函数逼近,所以我们可以看看是否可以训练一个简单的神经网络来拟合一个有噪声的正弦数据,就像这样(只是标准的高斯随机噪声):
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import math
导入库之后,我们生成了正弦数据,我们将训练一个神经网络,以适应新的情况:
NSAMPLE = 1000
x_data = np.float32(np.random.uniform(-10.5, 10.5, (1, NSAMPLE))).T
r_data = np.float32(np.random.normal(size=(NSAMPLE,1)))
y_data = np.float32(np.sin(0.75*x_data)*7.0+x_data*0.5+r_data*1.0)
plt.figure(figsize=(8, 8))
plot_out = plt.plot(x_data,y_data,'ro',alpha=0.3)
plt.show()
TensorFlow使用占位符作为最终保存数据的变量,以便稍后在图形上进行符号计算。
x = tf.placeholder(dtype=tf.float32, shape=[None,1])
y = tf.placeholder(dtype=tf.float32, shape=[None,1])
我们将定义这个简单神经网络的一个隐藏层和20个节点:
NHIDDEN = 20
W = tf.Variable(tf.random_normal([1,NHIDDEN], stddev=1.0, dtype=tf.float32))
b = tf.Variable(tf.random_normal([1,NHIDDEN], stddev=1.0, dtype=tf.float32))
W_out = tf.Variable(tf.random_normal([NHIDDEN,1], stddev=1.0, dtype=tf.float32))
b_out = tf.Variable(tf.random_normal([1,1], stddev=1.0, dtype=tf.float32))
hidden_layer = tf.nn.tanh(tf.matmul(x, W) + b)
y_out = tf.matmul(hidden_layer,W_out) + b_out
我们可以定义一个损失函数来计算输出的平方误差与数据的总和(如果需要,我们可以加上正则化)。
lossfunc = tf.nn.l2_loss(y_out-y);
我们还将定义一个训练操作,来告诉TensorFlow如何将损失函数最小化。只需一条线,我们就可以使用花式的RMSProp梯度下降优化方法。
train_op = tf.train.RMSPropOptimizer(learning_rate=0.1, decay=0.8).minimize(lossfunc)
为了能开始使用TensorFlow来计算事物,我们必须定义一个会话对象。在IPython shell中,我们使用InteractiveSession
。之后,我们需要运行一个命令来初始化所有的变量,其中计算图也将会在TensorFlow中生成。
sess = tf.InteractiveSession()
sess.run(tf.initialize_all_variables())
我们将运行梯度下降1000次,以将通过字典输入数据的损失函数最小化。由于数据集不大,我们不会使用小批量。在下面的代码运行之后,重量和偏差参数将自动存储在它们各自的tf.Variable
对象中。
NEPOCH = 1000
for i in range(NEPOCH):
sess.run(train_op,feed_dict={x: x_data, y: y_data})
我喜欢的API是我们可以再次简单地使用sess.run()
来从网络中的任何运营商或者节点处生成数据。所以训练结束后,我们可以使用训练好的模型,然后再调用sess.run()
来生成预测,并绘制预测的数据与训练数据集。
在我们完成了这个练习后,我们应该使用close()来释放资源。
x_test = np.float32(np.arange(-10.5,10.5,0.1))
x_test = x_test.reshape(x_test.size,1)
y_test = sess.run(y_out,feed_dict={x: x_test})
plt.figure(figsize=(8, 8))
plt.plot(x_data,y_data,'ro', x_test,y_test,'bo',alpha=0.3)
plt.show()
sess.close()
正如我们所料,我们看到神经网络能很好地拟合这个正弦数据。然而,这种拟合方法只有当我们想要模拟一对一或者多对一的神经网络函数时才适用。举例来说,如果我们反转训练数据:
temp_data = x_data
x_data = y_data
y_data = temp_data
plt.figure(figsize=(8, 8))
plot_out = plt.plot(x_data,y_data,'ro',alpha=0.3)
plt.show()
如果我们使用相同的方法来拟合这个倒置的数据,显然它不会很好,我们希望看到一个神经网络只适合于数据的平均值。
sess = tf.InteractiveSession()
sess.run(tf.initialize_all_variables())
for i in range(NEPOCH):
sess.run(train_op,feed_dict={x: x_data, y: y_data})
x_test = np.float32(np.arange(-10.5,10.5,0.1))
x_test = x_test.reshape(x_test.size,1)
y_test = sess.run(y_out,feed_dict={x: x_test})
plt.figure(figsize=(8, 8))
plt.plot(x_data,y_data,'ro', x_test,y_test,'bo',alpha=0.3)
plt.show()
sess.close()
我们目前的模型只能预测每个输入的一个输出值,所以这种方法将会失败。我们需要的是一个能够预测每个输入的不同输出值范围的模型。在下一节中,我们实现一个混合密度网络(MDN)来完成这个任务。
由Christopher Bishop在90年代开发的混合密度网络(MDNs)试图解决这个问题。该方法倾向于让网络预测一个单个的输出值,网络将预测出输出内容的整个概率分布。这个概念是相当强大的,可以用于当前机器学习研究的许多领域。它还使我们能够在网络正在进行的预测中计算某种信任因子。
我们选择的反正弦数据不仅仅是为了解决玩具问题,而且它在机器人领域有广泛地应用,例如,我们要确定哪个角度可以使移动机器人手臂移动到目标位置。MDN也可以用来模拟手写,其中的下个笔划是从多种可能性的概率分布里绘制的,而不是坚持一个预测。
Bishop的MDN实现将预测被称为混合高斯分布的一类概率分布,其中输出值被建模为许多高斯随机值的总和,每个高斯随机值都具有不同的均值和标准差。因此,对于每个输入,我们将预测其概率分布函数(pdf) 是较小高斯概率分布的概率加权和。
作为输入的函数,分布的每个参数 将由神经网络确定,这里有一个限制的总和加起来,以确保pdf整合到100%。在我们的实现中,我们将使用一个后来隐藏的24个节点的神经网络,并且还将产生24个混合,因此将有72个实际输出的单个输入的神经网络。我们的定义将分为两部分:
是72个值的向量,然后将其分成三个相等的部分:
pdf的参数将定义如下,以满足更早的条件:
基本上被放入softmax算子中以确保总和增加到1,并且每个混合概率是正的。
NHIDDEN = 24
STDEV = 0.5
KMIX = 24 # number of mixtures
NOUT = KMIX * 3 # pi, mu, stdev
x = tf.placeholder(dtype=tf.float32, shape=[None,1], name="x")
y = tf.placeholder(dtype=tf.float32, shape=[None,1], name="y")
Wh = tf.Variable(tf.random_normal([1,NHIDDEN], stddev=STDEV, dtype=tf.float32))
bh = tf.Variable(tf.random_normal([1,NHIDDEN], stddev=STDEV, dtype=tf.float32))
Wo = tf.Variable(tf.random_normal([NHIDDEN,NOUT], stddev=STDEV, dtype=tf.float32))
bo = tf.Variable(tf.random_normal([1,NOUT], stddev=STDEV, dtype=tf.float32))
hidden_layer = tf.nn.tanh(tf.matmul(x, Wh) + bh)
output = tf.matmul(hidden_layer,Wo) + bo
def get_mixture_coef(output):
out_pi = tf.placeholder(dtype=tf.float32, shape=[None,KMIX], name="mixparam")
out_sigma = tf.placeholder(dtype=tf.float32, shape=[None,KMIX], name="mixparam")
out_mu = tf.placeholder(dtype=tf.float32, shape=[None,KMIX], name="mixparam")
out_pi, out_sigma, out_mu = tf.split(1, 3, output)
max_pi = tf.reduce_max(out_pi, 1, keep_dims=True)
out_pi = tf.sub(out_pi, max_pi)
out_pi = tf.exp(out_pi)
normalize_pi = tf.inv(tf.reduce_sum(out_pi, 1, keep_dims=True))
out_pi = tf.mul(normalize_pi, out_pi)
out_sigma = tf.exp(out_sigma)
return out_pi, out_sigma, out_mu
out_pi, out_sigma, out_mu = get_mixture_coef(output)
我们来定义我们想要训练的用于以后预测MDN的反向数据。由于这是一个更为复杂的预测任务,与之前的简单数据拟合任务相比,我使用了更多的样本。
NSAMPLE = 2500
y_data = np.float32(np.random.uniform(-10.5, 10.5, (1, NSAMPLE))).T
r_data = np.float32(np.random.normal(size=(NSAMPLE,1))) # random noise
x_data = np.float32(np.sin(0.75*y_data)*7.0+y_data*0.5+r_data*1.0)
plt.figure(figsize=(8, 8))
plt.plot(x_data,y_data,'ro', alpha=0.3)
plt.show()
在这个任务中我们不能简单地使用最小平方误差L2丢失函数,因为输出是概率分布的完整描述。更合适的是能使分布与训练数据的可能性进行对数最小化的损失函数:
因此,对于
训练数据集中的每个点,我们都可以根据预测分布与实际点来计算成本函数,然后尝试最小化所有成本总和。对那些熟悉逻辑回归和softmax熵最小化的人来说,这是一个类似的方法,但是又没有离散的状态。
我们将在下面的TensorFlow操作中实现这个成本函数:
oneDivSqrtTwoPI = 1 / math.sqrt(2*math.pi) # normalisation factor for gaussian, not needed.
def tf_normal(y, mu, sigma):
result = tf.sub(y, mu)
result = tf.mul(result,tf.inv(sigma))
result = -tf.square(result)/2
return tf.mul(tf.exp(result),tf.inv(sigma))*oneDivSqrtTwoPI
def get_lossfunc(out_pi, out_sigma, out_mu, y):
result = tf_normal(y, out_mu, out_sigma)
result = tf.mul(result, out_pi)
result = tf.reduce_sum(result, 1, keep_dims=True)
result = -tf.log(result)
return tf.reduce_mean(result)
lossfunc = get_lossfunc(out_pi, out_sigma, out_mu, y)
train_op = tf.train.AdamOptimizer().minimize(lossfunc)
我们将在下面训练模型,同时收集损失函数在训练迭代中的进度。TensorFlow提供了一些可视化训练数据进度的有用的工具,但我们并没有在这里使用它们。
sess = tf.InteractiveSession()
sess.run(tf.initialize_all_variables())
NEPOCH = 10000
loss = np.zeros(NEPOCH) # store the training progress here.
for i in range(NEPOCH):
sess.run(train_op,feed_dict={x: x_data, y: y_data})
loss[i] = sess.run(lossfunc, feed_dict={x: x_data, y: y_data})
作为一个方面的说明,令人印象深刻的是,TensorFlow会自动计算对数似然成本函数的梯度,并在优化中应用这些梯度。对于这个问题,实际上有非常优化的梯度公式(参见Bishop的原始论文中的推导,方程33-39),我非常怀疑TensorFlow梯度公式自动计算的优化性和优雅性,所以通过在TensorFlow中建立一个自定义运算符,为这个损失函数预先优化梯度公式,就像他们为交叉熵损失函数所做的一样。我已经在优化闭式梯度公式之前实现了所有的数字梯度测试 - 如果你想实现它,请确保你做了梯度测试!第一次很难得到正确的答案。
让我们绘制训练的进度,看看迭代中成本是否下降:
plt.figure(figsize=(8, 8))
plt.plot(np.arange(100, NEPOCH,1), loss[100:], 'r-')
plt.show()
我们发现,经过6000多次迭代后,它或多或少地停止了改进。接下来我们要做的是让模型为我们产生分布,例如沿着x轴的一堆点,然后对于每个分布,从该分布中随机抽取10个点,将所生成的集合数据映射到y轴上。这让我们能感知生成的pdf是否与训练数据相匹配。
为了对混合高斯分布进行采样,我们随机选择基于
概率集合的分布,然后基于
高斯分布绘制点 。
x_test = np.float32(np.arange(-15,15,0.1))
NTEST = x_test.size
x_test = x_test.reshape(NTEST,1) # needs to be a matrix, not a vector
def get_pi_idx(x, pdf):
N = pdf.size
accumulate = 0
for i in range(0, N):
accumulate += pdf[i]
if (accumulate >= x):
return i
print 'error with sampling ensemble'
return -1
def generate_ensemble(out_pi, out_mu, out_sigma, M = 10):
NTEST = x_test.size
result = np.random.rand(NTEST, M) # initially random [0, 1]
rn = np.random.randn(NTEST, M) # normal random matrix (0.0, 1.0)
mu = 0
std = 0
idx = 0
# transforms result into random ensembles
for j in range(0, M):
for i in range(0, NTEST):
idx = get_pi_idx(result[i, j], out_pi[i])
mu = out_mu[i, idx]
std = out_sigma[i, idx]
result[i, j] = mu + rn[i, j]*std
return result
我们来看看生成的数据如何:
out_pi_test, out_sigma_test, out_mu_test = sess.run(get_mixture_coef(output), feed_dict={x: x_test})
y_test = generate_ensemble(out_pi_test, out_mu_test, out_sigma_test)
plt.figure(figsize=(8, 8))
plt.plot(x_data,y_data,'ro', x_test,y_test,'bo',alpha=0.3)
plt.show()
在上面的图中,我们绘制出我们从MDN分布中生成的数据,用蓝色标出,并将其绘制在红色原始的训练数据上。除了少数数据外,这些分布似乎与数据相匹配。我们也可以绘制一张图,我们可以看到神经网络在做什么。对于x轴上的每一个点,都可能有多个线或者状态,我们选择这些状态的概率定义为
plt.figure(figsize=(8, 8))
plt.plot(x_test,out_mu_test,'go', x_test,y_test,'bo',alpha=0.3)
plt.show()
x_heatmap_label = np.float32(np.arange(-15,15,0.1))
y_heatmap_label = np.float32(np.arange(-15,15,0.1))
def custom_gaussian(x, mu, std):
x_norm = (x-mu)/std
result = oneDivSqrtTwoPI*math.exp(-x_norm*x_norm/2)/std
return result
def generate_heatmap(out_pi, out_mu, out_sigma, x_heatmap_label, y_heatmap_label):
N = x_heatmap_label.size
M = y_heatmap_label.size
K = KMIX
z = np.zeros((N, M)) # initially random [0, 1]
mu = 0
std = 0
pi = 0
# transforms result into random ensembles
for k in range(0, K):
for i in range(0, M):
pi = out_pi[i, k]
mu = out_mu[i, k]
std = out_sigma[i, k]
for j in range(0, N):
z[N-j-1, i] += pi * custom_gaussian(y_heatmap_label[j], mu, std)
return z
def draw_heatmap(xedges, yedges, heatmap):
extent = [xedges[0], xedges[-1], yedges[0], yedges[-1]]
plt.figure(figsize=(8, 8))
plt.imshow(heatmap, extent=extent)
plt.show()
z = generate_heatmap(out_pi_test, out_mu_test, out_sigma_test, x_heatmap_label, y_heatmap_label)
draw_heatmap(x_heatmap_label, y_heatmap_label, z)
sess.close()
这篇文章的完整的IPython笔记可以在这里找到