作者 | Rohan Jagtap
编译 | VK
来源 | Towards Data Science
TPU(张量处理单元)是针对处理矩阵而专门优化的专用集成电路(ASIC)。
❝云TPU资源加速了线性代数计算的性能 ❞
Google Colab免费为TPUs提供实验支持!在本文中,我们将讨论如何在Colab上使用TPU训练模型。具体来说,我们将通过在TPU上训练huggingface transformers库里的BERT来进行文本分类。
第一件事:由于TPU针对某些特定操作进行了优化,我们需要检查我们的模型是否真的使用了它们;也就是说,我们需要检查TPU是否真的帮助我们的模型更快地训练。
以下是我们根据云TPU文档中提到的TPU的一些用例:
❝如果你的模型使用自定义的TensorFlow操作,而云TPU支持的TensorFlow操作不存在,那么你应该要使用GPU进行加速。 ❞
tpu在云端工作,不像gpu或cpu在本地工作。因此,在开始之前,我们需要进行一些初始化:
import tensorflow as tf
import os
resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='grpc://' + os.environ['COLAB_TPU_ADDR'])
tf.config.experimental_connect_to_cluster(resolver)
tf.tpu.experimental.initialize_tpu_system(resolver)
print("All devices: ", tf.config.list_logical_devices('TPU'))
"""
Prints: All devices: [LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:5', device_type='TPU'),
LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:4', device_type='TPU'),
LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:0', device_type='TPU'),
LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:1', device_type='TPU'),
LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:6', device_type='TPU'),
LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:2', device_type='TPU'),
LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:7', device_type='TPU'),
LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:3', device_type='TPU')]
"""
如果你观察上面代码片段的输出,我们的TPU集群有8个逻辑TPU设备(0-7)能够并行处理。因此,我们为这8种设备上的分布式训练定义了一种分配策略:
strategy = tf.distribute.TPUStrategy(resolver)
有关分布式训练的更多信息,请参阅:https://www.tensorflow.org/guide/distributed
在本节中,我们将实际了解如何在TPU上训练BERT。我们将通过两种方式实现:
由于我们使用的是分布策略,因此必须在每个设备上创建模型以共享参数。因此,需要在策略作用域内创建和构建模型:
def create_model():
bert = TFBertForSequenceClassification.from_pretrained('bert-large-cased')
inputs = tf.keras.layers.Input((None,), dtype=tf.int32)
mask = tf.keras.layers.Input((None,), dtype=tf.int32)
preds = bert(
inputs=inputs,
attention_mask=mask,
training=True
)[0]
return tf.keras.Model([inputs, mask], preds)
with strategy.scope():
model = create_model()
optimizer = tf.keras.optimizers.Adam(learning_rate=3e-5, epsilon=1e-08)
loss_fun = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')
model.compile(optimizer=optimizer, loss=loss_fun)
然后,我们简单地根据数据拟合模型,就像在常规训练环境中一样:
model.fit(
[train_input_ids, train_attention_mask],
y_train,
validation_data = ([val_input_ids, val_attention_mask], y_val),
validation_steps=val_input_ids.shape[0] // 16,
batch_size=16,
epochs=2,
)
要保存模型权重,请执行以下操作:
model.save_weights("checkpoint/tpu-model.h5")
在下一小节中,我们将讨论如何使用自定义训练循环来执行相同的操作。
在这里,我们需要手动调整TensorFlow做一些事情。
首先,我们使用 「tf.data」 API:
def get_dataset(batch_size, training=True):
if training:
inputs = tf.data.Dataset.from_tensor_slices((train_input_ids, train_attention_mask))
outputs = tf.data.Dataset.from_tensor_slices(y_train)
return tf.data.Dataset.zip((inputs, outputs)).batch(batch_size).repeat()
else:
inputs = tf.data.Dataset.from_tensor_slices((val_input_ids, val_attention_mask))
outputs = tf.data.Dataset.from_tensor_slices(y_val)
return tf.data.Dataset.zip((inputs, outputs)).batch(batch_size).repeat()
在上一部分中,我们不必担心这个问题的原因是TensorFlow自己处理这些事情;也就是说,当我们调用model.fit()时会自动处理. 同样,这一次,我们需要在TPU设备之间手动分配数据集:
per_replica_batch_size = batch_size // strategy.num_replicas_in_sync
train_dataset = strategy.experimental_distribute_datasets_from_function(lambda _: get_dataset(per_replica_batch_size, training=True))
接下来,我们以与前面方法完全相同的方式创建和构建模型。或者,我们可以在策略范围中添加一些指标,用于损失和准确性的监控:
with strategy.scope():
train_loss = tf.keras.metrics.Mean('train_loss')
train_acc = tf.keras.metrics.SparseCategoricalAccuracy('train_acc')
现在是最重要的部分,即训练步骤功能。但首先,让我们为分布式数据集创建一个迭代器:
train_iterator = iter(train_dataset)
然后我们编写了train_step函数,并用@tf.function注解。我们使用strategy.run()执行训练步骤:
@tf.function
def train_step(iterator):
def step(inputs, label):
ids, mask = inputs
with tf.GradientTape() as tape:
preds = model([ids, mask])
loss = loss_fun(label, preds)
loss = tf.nn.compute_average_loss(loss, global_batch_size=16)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
training_loss.update_state(loss * strategy.num_replicas_in_sync)
training_accuracy.update_state(label, preds)
strategy.run(step, args=(next(iterator)))
最后,我们用几个epoch运行这个train_step函数:
steps_per_epoch = train.shape[0] // 16
for epoch in range(5):
print('Epoch: {}/5'.format(epoch))
for step in range(steps_per_epoch):
train_step(train_iterator)
print('Current step: {}, training loss: {}, accuracy: {}%'.format(
optimizer.iterations.numpy(),
round(float(training_loss.result()), 4),
round(float(training_accuracy.result()) * 100, 2))
)
training_loss.reset_states()
training_accuracy.reset_states()
这次,让我们尝试使用检查点保存模型。现在有个问题。好吧,我们不能这样保存模型。
错误很明显,它说你不能在eager执行时访问本地文件系统,因为执行是被带到云端让TPU执行操作的。
因此,为了克服这个问题,我们需要将检查点保存在GCS存储桶中。你可以在此处创建免费层GCP帐户(https://cloud.google.com/free)。
首先,我们需要创建一个云存储桶。以下是官方文档中关于创建GCS存储桶的教程:https://cloud.google.com/storage/docs/creating-buckets
接下来,我们需要使用GCP凭据登录,并将GCP项目设置为活动配置:
from google.colab import auth
auth.authenticate_user()
!gcloud config set project <project-id>
❝gcloud config set仅在活动配置中设置指定的属性。 ❞
完成后,我们只需使用以下命令即可访问存储桶:
gs://<bucket-name>/<file-path>
现在保存看起来像这样:
checkpoint_path = "gs://colab-tpu-bucket/checkpoint/base"
ckpt = tf.train.Checkpoint(model=model, optimizer=optimizer)
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)
这一次,它将成功地将模型检查点保存到存储桶中!
在本文中,我们了解了为什么以及如何调整一个模型的原始代码,使之与TPU兼容。我们还讨论了何时和何时不使用TPU进行训练。
https://www.tensorflow.org/guide/tpu
https://towardsdatascience.com/10-tensorflow-tricks-every-ml-practitioner-must-know-96b860e53c1
https://medium.com/swlh/bert-pre-training-of-transformers-for-language-understanding-5214fba4a9af
https://huggingface.co/transformers/model_doc/bert.html
原文链接:https://towardsdatascience.com/how-to-colab-with-tpu-98e0b4230d9c