An RNN maintains a hidden state across time steps, so information from earlier inputs propagates forward through the sequence.
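To make the state idea concrete, here is a minimal NumPy sketch of the simple-RNN recurrence h_t = tanh(x_t·W + h_{t-1}·U + b) (an added illustration, not part of the original code; the toy dimensions are arbitrary):
import numpy as np
input_dim, hidden_dim, steps = 4, 8, 5        # hypothetical toy sizes
W = np.random.randn(input_dim, hidden_dim)    # input-to-hidden weights
U = np.random.randn(hidden_dim, hidden_dim)   # hidden-to-hidden weights
b = np.zeros(hidden_dim)
h = np.zeros(hidden_dim)                      # initial state
for t in range(steps):
    x_t = np.random.randn(input_dim)          # stand-in for the t-th input
    h = np.tanh(x_t @ W + h @ U + b)          # the state carries information forward
print(h.shape)                                # (8,)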
# Load the IMDB dataset
import tensorflow as tf
from tensorflow import keras

imdb = keras.datasets.imdb
# Define hyperparameters
vocab_size = 10000
index_from = 3
max_length = 500
embedding_dim = 16
batch_size = 128
(train_data, train_labels), (test_data, test_labels) = imdb.load_data(
    num_words=vocab_size, index_from=index_from)
# word_index maps word -> id; shift every id by 3 to make room for the special tokens
word_index = imdb.get_word_index()
word_index = {k: (v + 3) for k, v in word_index.items()}
word_index['<PAD>'] = 0
word_index['<START>'] = 1
word_index['<UNK>'] = 2
word_index['<END>'] = 3
# Mapping from id back to word
reverse_word_index = dict(
    [(value, key) for key, value in word_index.items()])

def decode_review(text_ids):
    return ' '.join(
        [reverse_word_index.get(word_id, "<UNK>") for word_id in text_ids])
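As a quick check (an added example, assuming the data loaded above), the first training review can be decoded back to words:
print(decode_review(train_data[0]))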
# Pad every review to the same length
train_data = keras.preprocessing.sequence.pad_sequences(
    train_data,                   # list of lists
    value=word_index['<PAD>'],
    padding='post',               # 'post' or 'pre'
    maxlen=max_length)
test_data = keras.preprocessing.sequence.pad_sequences(
    test_data,                    # list of lists
    value=word_index['<PAD>'],
    padding='post',               # 'post' or 'pre'
    maxlen=max_length)
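A quick sanity check (not in the original notes): after padding, every review has length max_length.
print(train_data.shape, test_data.shape)   # expected: (25000, 500) (25000, 500)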
model = keras.models.Sequential([
    # 1. define matrix: [vocab_size, embedding_dim]
    # 2. [1,2,3,4..] -> max_length * embedding_dim
    # 3. batch_size * max_length * embedding_dim
    keras.layers.Embedding(vocab_size, embedding_dim,
                           input_length=max_length),
    # batch_size * max_length * embedding_dim
    #   -> batch_size * embedding_dim
    keras.layers.GlobalAveragePooling1D(),
    keras.layers.Dense(64, activation='relu'),
    keras.layers.Dense(1, activation='sigmoid'),
])
model.summary()
model.compile(optimizer='adam', loss='binary_crossentropy',
              metrics=['accuracy'])
# There is no separate validation set, so validation_split carves one out of the
# full training data; batch_size can also be set here in fit()
history = model.fit(train_data, train_labels,
                    epochs=5,
                    batch_size=batch_size,
                    validation_split=0.2)
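Once training finishes, the held-out test set measures generalization (a small added step, using the model and test_data defined above):
test_loss, test_acc = model.evaluate(test_data, test_labels, batch_size=batch_size)
print("test accuracy:", test_acc)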
RNN version of the same classifier:
model = keras.models.Sequential([
    # 1. define matrix: [vocab_size, embedding_dim]
    # 2. [1,2,3,4..] -> max_length * embedding_dim
    # 3. batch_size * max_length * embedding_dim
    keras.layers.Embedding(vocab_size, embedding_dim,
                           input_length=max_length),
    # return_sequences=False: only the last hidden state is returned,
    # so the output shape is batch_size * 64
    keras.layers.SimpleRNN(units=64, return_sequences=False),
    keras.layers.Dense(64, activation='relu'),
    keras.layers.Dense(1, activation='sigmoid'),
])
Bidirectional RNN version:
bi_rnn_model = keras.models.Sequential([
    # 1. define matrix: [vocab_size, embedding_dim]
    # 2. [1,2,3,4..] -> max_length * embedding_dim
    # 3. batch_size * max_length * embedding_dim
    keras.layers.Embedding(vocab_size, embedding_dim,
                           input_length=max_length),
    # Bidirectional concatenates the forward and backward final states,
    # so the output dimension is 2 * 32 = 64
    keras.layers.Bidirectional(
        keras.layers.SimpleRNN(
            units=32, return_sequences=False)),
    keras.layers.Dense(32, activation='relu'),
    keras.layers.Dense(1, activation='sigmoid'),
])
# coding=utf-8
'''
Use an RNN for text generation. Training pairs are built by splitting the text:
abcde -> input abcd, target bcde.
LSTM tuning notes: stateful=True keeps the final state of the current batch as
the initial state of the next batch; recurrent_initializer='glorot_uniform' is
the Glorot (a.k.a. Xavier) uniform initializer, which draws parameters from a
uniform distribution on [-limit, limit] with limit = sqrt(6 / (fan_in + fan_out)).
'''
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
import tensorflow as tf
from tensorflow import keras
print(tf.__version__)
print(sys.version_info)
for module in mpl, np, pd, sklearn, tf, keras:
    print(module.__name__, module.__version__)
# Step 1: load the dataset
input_filepath = "./data/shakespeare.txt"
text = open(input_filepath,"r").read()
# Step 2: build the character vocabulary and map the text to ids
vocab = sorted(set(text))
char2id = {char: idx for idx, char in enumerate(vocab)}
id2char = np.array(vocab)
text2int = [char2id[char] for char in text]
# Step 3: build the dataset
def split_input_target(id_text):
    '''
    :param id_text: abcde -> abcd, bcde
    :return: (input sequence, target sequence shifted by one)
    '''
    return id_text[:-1], id_text[1:]
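# e.g. on a plain list: split_input_target([0, 1, 2, 3, 4]) -> ([0, 1, 2, 3], [1, 2, 3, 4])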
seq_length = 100
char_dataset = tf.data.Dataset.from_tensor_slices(text2int).batch(
    seq_length + 1, drop_remainder=True)
seq_dataset = char_dataset.map(split_input_target)
batch_size = 64
buffer_size = 10000
# shuffle fills a buffer with the first buffer_size elements and samples uniformly from it
seq_dataset = seq_dataset.shuffle(buffer_size).batch(batch_size, drop_remainder=True)
# Build the model
vocab_size = len(vocab)
embedding_dim = 256
rnn_units = 512
def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
    model = keras.Sequential([
        keras.layers.Embedding(vocab_size, embedding_dim,
                               batch_input_shape=[batch_size, None]),
        keras.layers.SimpleRNN(units=rnn_units, return_sequences=True),
        keras.layers.Dense(vocab_size),
    ])
    return model

model = build_model(vocab_size, embedding_dim, rnn_units, batch_size)
# Sanity-check the model on one batch
for input_ex_batch, target_ex_batch in seq_dataset.take(1):
    example_batch_predict = model(input_ex_batch)
    print(example_batch_predict.shape)   # (batch_size, seq_length, vocab_size)

# Randomly sample one character id per time step from the logits
sample_indices = tf.random.categorical(
    logits=example_batch_predict[0], num_samples=1)
# print(sample_indices)
# (100, 65) -> (100, 1)
sample_indices = tf.squeeze(sample_indices, axis=-1)   # drop the last dimension
# print(sample_indices)
print("Predictions: ", repr("".join(id2char[sample_indices])))
def loss(labels, logits):
    return keras.losses.sparse_categorical_crossentropy(
        labels, logits, from_logits=True)
model.compile(optimizer = 'adam', loss = loss)
example_loss = loss(target_ex_batch, example_batch_predict)
print(example_loss.shape)
print(example_loss.numpy().mean())
# Train the model
output_dir = "./text_generation_checkpoints"
if not os.path.exists(output_dir):
    os.mkdir(output_dir)
checkpoint_prefix = os.path.join(output_dir, 'ckpt_{epoch}')
checkpoint_callback = keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_weights_only=True)
epochs = 100
history = model.fit(seq_dataset, epochs=epochs,
                    callbacks=[checkpoint_callback])
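The notes end at training; to actually generate text, a common pattern is to rebuild the model with batch_size=1 and load the latest checkpoint. A minimal sketch follows (an added illustration: generate_text and the start string "First: " are assumptions, not from the original). Because the SimpleRNN here is not stateful, the whole sequence so far is re-fed at every step.
gen_model = build_model(vocab_size, embedding_dim, rnn_units, batch_size=1)
gen_model.load_weights(tf.train.latest_checkpoint(output_dir))

def generate_text(model, start_string, num_generate=100):
    input_ids = [char2id[c] for c in start_string]
    for _ in range(num_generate):
        # feed the full sequence so far; the model is not stateful
        predictions = model(tf.expand_dims(input_ids, 0))   # (1, len, vocab_size)
        logits = predictions[0, -1]                          # logits of the last step
        predicted_id = int(tf.random.categorical(
            tf.expand_dims(logits, 0), num_samples=1)[0, 0])
        input_ids.append(predicted_id)
    return ''.join(id2char[input_ids])

print(generate_text(gen_model, "First: "))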