前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >keras_bert文本多分类

keras_bert文本多分类

原创
作者头像
用户1750490
修改2019-12-17 18:34:15
1.3K0
修改2019-12-17 18:34:15
举报
文章被收录于专栏:钛问题钛问题
代码语言:javascript
复制
#! -*- coding:utf-8 -*-

import codecs
import os

import keras
import pandas as pd
from keras.callbacks import EarlyStopping
from keras.layers import *
from keras.models import Model
from keras.optimizers import Adam
from keras_bert import load_trained_model_from_checkpoint, Tokenizer
import numpy as np

max_len = 128
config_path = r'wwm/bert_config.json'
checkpoint_path = 'wwm/bert_model.ckpt'
dict_path = 'wwm/vocab.txt'
label_id = {}
token_dict = {}

with codecs.open(dict_path, 'r', 'utf8') as reader:
    for line in reader:
        token = line.strip()
        token_dict[token] = len(token_dict)


class OurTokenizer(Tokenizer):
    def _tokenize(self, text):
        R = []
        for c in text:
            if c in self._token_dict:
                R.append(c)
            elif self._is_space(c):
                R.append('[unused1]')  # space类用未经训练的[unused1]表示
            else:
                R.append('[UNK]')  # 剩余的字符是[UNK]
        return R


tokenizer = OurTokenizer(token_dict)
train_data = pd.read_csv("train_file.csv").values.tolist()
data = []
a = 0
label_id = {}
label = 0
for data_message in train_data:
    if data_message[1] not in label_id.keys():
        label_id[data_message[1]] = label
        one_label = label
        label = label + 1
    else:
        one_label = label_id[data_message[1]]
    if isinstance(data_message[0], str):
        data.append([data_message[0], one_label])
print("label", label)
# 按照9:1的比例划分训练集和验证集
random_order = list(range(len(data)))
np.random.shuffle(random_order)
train_data = [data[j] for i, j in enumerate(random_order) if i % 10 != 0 and i % 10 != 1]
valid_data = [data[j] for i, j in enumerate(random_order) if i % 10 == 0]
test_data = [data[j] for i, j in enumerate(random_order) if i % 10 == 1]


def seq_padding(X, padding=0):
    L = [len(x) for x in X]
    ML = max(L)
    return np.array([
        np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x for x in X
    ])


class data_generator:
    def __init__(self, data, batch_size=64):
        self.data = data
        self.batch_size = batch_size
        self.steps = len(self.data) // self.batch_size
        if len(self.data) % self.batch_size != 0:
            self.steps += 1

    def __len__(self):
        return self.steps

    def __iter__(self):
        while True:
            idxs = list(range(len(self.data)))
            np.random.shuffle(idxs)
            X1, X2, Y = [], [], []
            for i in idxs:
                d = self.data[i]
                text = d[0][:max_len]
                x1, x2 = tokenizer.encode(first=text)
                y = d[1]
                X1.append(x1)
                X2.append(x2)
                Y.append([y])
                if len(X1) == self.batch_size or i == idxs[-1]:
                    X1 = seq_padding(X1)
                    X2 = seq_padding(X2)
                    Y = seq_padding(Y)
                    yield [X1, X2], Y
                    [X1, X2, Y] = [], [], []


bert_model = load_trained_model_from_checkpoint(config_path,
                                                checkpoint_path,
#                                                 training=True,
                                                seq_len=64)
for l in bert_model.layers:
    l.trainable = True

x1_in = Input(shape=(None,))
x2_in = Input(shape=(None,))

x = bert_model([x1_in, x2_in])
x = Lambda(lambda x: x[:, 0])(x)
p = Dense(label+1, activation='softmax')(x)

model = Model([x1_in, x2_in], p)

model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=Adam(1e-5),  # 用足够小的学习率
    metrics=['accuracy']
)
model.summary()

train_D = data_generator(train_data)
valid_D = data_generator(valid_data)
early_stopping = EarlyStopping(
    monitor='val_loss',
    min_delta=0,
    patience=8,
    verbose=1,
    mode='auto'
)
tb_cb = keras.callbacks.TensorBoard(log_dir="log_dir", write_images=1, histogram_freq=1)
model.fit_generator(
    train_D.__iter__(),
    steps_per_epoch=len(train_D),
    epochs=5000,
    validation_data=valid_D.__iter__(),
    validation_steps=1000,
    callbacks=[early_stopping, tb_cb]
)

model.save("bert_base_model")


def predict(model, test_data):
    """
    预测
    :param test_data:
    :return:
    """
    X1, X2, Y = [], [], []
    for s in test_data:
        x1, x2 = tokenizer.encode(first=s[0][:max_len])
        X1.append(x1)
        X2.append(x2)
        Y.append(s[1])
    X1 = seq_padding(X1)
    X2 = seq_padding(X2)
    Y = seq_padding(Y)
    model.evaluate([X1, X2], Y)


test_data = pd.read_csv(os.path.join('data/valid.data.v3'), sep='\t', encoding='utf-8')
predict_test = []
for i, j in zip(test_data['question1'], test_data['question2']):
    if i is not None and j is not None:
        predict_test.append([str(i), str(j)])
evaluate(model, predict_test)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档