import mxnet as mx
num_layers = 2
num_hidden = 256
stack = mx.rnn.SequentialRNNCell()
for i in range(num_layers):
stack.add(mx.rnn.LSTMCell(num_hidden=num_hidden, prefix='lstm_l%d_'%i))
mx.rnn.SequentialRNNCell()
:RNN容器,用于组合多个RNN层mx.rnn.LSTMCell(num_hidden=num_hidden, prefix='lstm_l%d_'%i)
:LSTM单元num_embed = 256
def sym_gen(seq_len):
data = mx.sym.Variable('data')
label = mx.sym.Variable('softmax_label')
embed = mx.sym.Embedding(data=data, input_dim=1000,output_dim=num_embed, name='embed')
# 数据生成,定义Variable并进行词向量化
stack.reset()
outputs, states = stack.unroll(seq_len, inputs=embed, merge_outputs=True)
# 按时间展开输出和状态
pred = mx.sym.Reshape(outputs, shape=(-1, num_hidden))
pred = mx.sym.FullyConnected(data=pred, num_hidden=1000, name='pred')
# 变换输出形式,将输出变为(-1,num_hidden)尺寸
label = mx.sym.Reshape(label, shape=(-1,))
pred = mx.sym.SoftmaxOutput(data=pred, label=label, name='softmax')
# 展平label,并计算代价函数
return pred, ('data',), ('softmax_label',)
sym_gen(1)
(<Symbol softmax>, ('data',), ('softmax_label',))
unroll()
函数按时间展开RNN单元,输出最终的运算结果step_input = mx.symbol.Variable('step_data')
# First we embed our raw input data to be used as LSTM's input.
embedded_step = mx.symbol.Embedding(data=step_input, \
input_dim=50, \
output_dim=50)
# print(embedded_step.shape)
mx.viz.plot_network(symbol=embedded_step)
# Then we create an LSTM cell.
output_7_0.png
Embedding
是一种词向量化技术,这种技术可以保持语义(例如相近语义的词的向量距离会较近),将尺寸为(d0,d1...dn)的输入向量进行词向量化技术后转换为尺寸为(d0,d1,...,dn,out_dim)的向量,多出的一维为词向量,即使用一个向量代替原来一个词的位置。
vocabulary_size = 26
embed_dim = 16
seq_len, batch_size = (10, 64)
input = mx.sym.Variable('letters')
op = mx.sym.Embedding(data=input, input_dim=vocabulary_size, output_dim=embed_dim,name='embed')
op.infer_shape(letters=(seq_len, batch_size))
([(10, 64), (26, 16)], [(10, 64, 16)], [])
上文的例子可以看出输入向量尺寸为(10,64),输出向量尺寸变为了(10,64,16)
使用了隐层为50的LSTM单元,并带入转换好的数据,该图绘制出的lstm图较经典LSTM有一些出入
lstm_cell = mx.rnn.LSTMCell(num_hidden=50)
begin_state = lstm_cell.begin_state()
output, states = lstm_cell(embedded_step, begin_state)
mx.viz.plot_network(symbol=output)
output_11_0.png
LSTM的源码的构造函数如下:
def __init__(self, num_hidden, prefix='lstm_', params=None, forget_bias=1.0):
super(LSTMCell, self).__init__(prefix=prefix, params=params)
self._num_hidden = num_hidden
self._iW = self.params.get('i2h_weight')
self._hW = self.params.get('h2h_weight')
# we add the forget_bias to i2h_bias, this adds the bias to the forget gate activation
self._iB = self.params.get('i2h_bias', init=init.LSTMBias(forget_bias=forget_bias))
self._hB = self.params.get('h2h_bias')
其中:self.params.get()
方法为尝试找到传入名称对应的Variable,若找不到则新建,因此该LSTM单元一共仅有两对参数:iW和iB,hW和hB
前向传播函数如下:
def __call__(self, inputs, states):
self._counter += 1
name = '%st%d_'%(self._prefix, self._counter)
i2h = symbol.FullyConnected(data=inputs, weight=self._iW, bias=self._iB,
num_hidden=self._num_hidden*4,
name='%si2h'%name)
h2h = symbol.FullyConnected(data=states[0], weight=self._hW, bias=self._hB,
num_hidden=self._num_hidden*4,
name='%sh2h'%name)
gates = i2h + h2h
slice_gates = symbol.SliceChannel(gates, num_outputs=4,name="%sslice"%name)
in_gate = symbol.Activation(slice_gates[0], act_type="sigmoid",name='%si'%name)
forget_gate = symbol.Activation(slice_gates[1], act_type="sigmoid",name='%sf'%name)
in_transform = symbol.Activation(slice_gates[2], act_type="tanh",name='%sc'%name)
out_gate = symbol.Activation(slice_gates[3], act_type="sigmoid",name='%so'%name)
next_c = symbol._internal._plus(forget_gate * states[1], in_gate * in_transform,name='%sstate'%name)
next_h = symbol._internal._mul(out_gate, symbol.Activation(next_c, act_type="tanh"),name='%sout'%name)
return next_h, [next_h, next_c]
可以看出,LSTM的实现过程如下所示
sequence_length = 10
input_dim = 10
seq_input = mx.symbol.Variable('seq_data')
embedded_seq = mx.symbol.Embedding(data=seq_input, \
input_dim=input_dim, \
output_dim=embed_dim)
outputs, states = lstm_cell.unroll(length=sequence_length, \
inputs=embedded_seq, \
layout='NTC', \
merge_outputs=True)
使用unroll
方法按时间展平运算,输入数据为(batch_size,lenght,...)(layout="NTC)或(lenght,batch,...)(layout="TNC)
该函数的源码为:
def unroll(self, length, inputs, begin_state=None, layout='NTC', merge_outputs=None):
self.reset()
inputs, _ = _normalize_sequence(length, inputs, layout, False)
if begin_state is None:
begin_state = self.begin_state()
states = begin_state
outputs = []
for i in range(length):
output, states = self(inputs[i], states)
outputs.append(output)
outputs, _ = _normalize_sequence(length, outputs, layout, merge_outputs)
return outputs, states
方法_normalize_sequence
是对输入做一些处理,由一个for循环可以看出该方法循环了网络运算