
End-to-End CAPTCHA Recognition with PaddlePaddle 2.0

夜雨飘零 · Published 2021-12-07

This is an upgrade of my earlier article 《我的PaddlePaddle学习之路》笔记六——验证码端到端的识别, which I wrote in early 2018 against the V2 API of the time. That version is now rather dated, so this post brings end-to-end CAPTCHA recognition up to PaddlePaddle 2.0.

Run Online

Run it online: https://aistudio.baidu.com/aistudio/projectdetail/1679868

Creating the Data Lists and the Vocabulary

The data lists make it convenient to read data during training.

```python
import os

def createDataList(data_path, list_path):
    # List all image files under the data directory
    imgs = os.listdir(data_path)
    with open(list_path, 'w', encoding='utf-8') as f:
        for img in imgs:
            # The file name (without extension) is the label
            name = img.split('.')[0]
            image_path = os.path.join(data_path, img)
            # Write the image path and the label, separated by a tab
            f.write(image_path + '\t' + name + '\n')

createDataList('dataset/train_data/', 'dataset/train.txt')
createDataList('dataset/test_data/', 'dataset/test.txt')
```
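
Each line of the generated list pairs an image path with its label (the file name), separated by a tab. Assuming a hypothetical image named 3n5x.png, a line of train.txt would look like:

```
dataset/train_data/3n5x.png	3n5x
```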

We still need the vocabulary; run the code below to build it from the training labels.

```python
# Collect the set of characters that appear in the training labels
with open('dataset/train.txt', 'r', encoding='utf-8') as f:
    lines = f.readlines()
v = set()
for line in lines:
    _, label = line.replace('\n', '').split('\t')
    for c in label:
        v.add(c)

# Write one character per line; sort the set for a reproducible order
vocabulary_path = 'dataset/vocabulary.txt'
with open(vocabulary_path, 'w', encoding='utf-8') as f:
    for c in sorted(v):
        f.write(c + '\n')
```
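
The resulting vocabulary.txt holds one character per line; for digit-and-letter CAPTCHAs it would start something like this (hypothetical contents):

```
0
1
2
a
b
```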

Decoder

This is a greedy decoding method used to decode the model's predicted output, converting PaddlePaddle's output into a string. The file also provides a helper that converts integer labels back into strings, and one that computes the character error rate.

```python
%%writefile decoder.py

import Levenshtein as Lev
from itertools import groupby
import paddle


def ctc_greedy_decoder(probs_seq, vocabulary):
    """CTC greedy (best-path) decoder.

    The path made of the most probable tokens is post-processed by
    removing consecutive duplicates and all blanks.
    :param probs_seq: 2-D list of character probabilities; each element
                      is a list of float probabilities over the
                      vocabulary for one time step.
    :type probs_seq: list
    :param vocabulary: vocabulary
    :type vocabulary: list
    :return: decoded string
    :rtype: str
    """
    # Dimension check
    for probs in probs_seq:
        if not len(probs) == len(vocabulary) + 1:
            raise ValueError("probs_seq dimensions do not match the vocabulary")
    # argmax to get the best index at each time step
    max_index_list = paddle.argmax(probs_seq, -1).numpy()
    # Remove consecutive duplicate indices
    index_list = [index_group[0] for index_group in groupby(max_index_list)]
    # Remove blank indices
    blank_index = len(vocabulary)
    index_list = [index for index in index_list if index != blank_index]
    # Convert the index list to a string (the CAPTCHA here has 4 characters)
    return ''.join([vocabulary[index] for index in index_list])[:4]


def label_to_string(label, vocabulary):
    """Convert an integer label sequence to text.

    :param label: predicted label, or a label from the dataset
    :type label: list
    :param vocabulary: vocabulary
    :type vocabulary: list
    :return: decoded string
    :rtype: str
    """
    return ''.join([vocabulary[index] for index in label])


def cer(out_string, target_string):
    """Compute the edit distance between two strings; dividing it by the
    label length gives the character error rate.

    Arguments:
        out_string (string): predicted string
        target_string (string): target string
    """
    s1, s2 = out_string.replace(" ", ""), target_string.replace(" ", "")
    return Lev.distance(s1, s2)
```
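
As a quick sanity check, here is a minimal sketch (with a made-up two-character vocabulary) of how the greedy decoder collapses repeats and strips blanks:

```python
import paddle
from decoder import ctc_greedy_decoder

# Toy vocabulary of two characters; index 2 is the CTC blank
vocab = ['a', 'b']
# 5 time steps, each a distribution over len(vocab) + 1 classes.
# Best path: a, a, blank, b, b  ->  collapses to "ab"
probs = paddle.to_tensor([[0.8, 0.1, 0.1],
                          [0.7, 0.2, 0.1],
                          [0.1, 0.1, 0.8],
                          [0.1, 0.8, 0.1],
                          [0.2, 0.7, 0.1]])
print(ctc_greedy_decoder(probs, vocab))  # prints: ab
```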

Data Reader

This is used to read data during training: it loads images and labels from the data list, preprocesses the images, and converts the string labels into the integer labels that are fed into the network.

```python
%%writefile data.py

import cv2
import numpy as np
from paddle.io import Dataset


# Image preprocessing
def process(path):
    image = cv2.imread(path)
    # Convert to grayscale
    image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    # Resize to a fixed size
    image = cv2.resize(image, (72, 27))
    # Convert to CHW
    image = image[np.newaxis, :]
    # Normalize
    image = (image - 128) / 128
    return image


# Data loader
class CustomDataset(Dataset):
    def __init__(self, data_list_path, voc_path):
        super(CustomDataset, self).__init__()
        with open(data_list_path, 'r', encoding='utf-8') as f:
            self.lines = f.readlines()
        with open(voc_path, 'r', encoding='utf-8') as f:
            labels = f.readlines()
        self.vocabulary = [labels[i].replace('\n', '') for i in range(len(labels))]
        self.vocabulary_dict = dict([(labels[i].replace('\n', ''), i) for i in range(len(labels))])

    def __getitem__(self, idx):
        path, label = self.lines[idx].replace('\n', '').split('\t')
        img = process(path)
        # Convert the character label to integer indices
        transcript = [self.vocabulary_dict.get(x) for x in label]
        img = np.array(img, dtype='float32')
        transcript = np.array(transcript, dtype='int32')
        return img, transcript

    def __len__(self):
        return len(self.lines)
```
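
A minimal sketch of how this dataset is consumed, assuming the list and vocabulary files created above exist:

```python
from paddle.io import DataLoader
from data import CustomDataset

dataset = CustomDataset('dataset/train.txt', 'dataset/vocabulary.txt')
img, transcript = dataset[0]
print(img.shape)         # (1, 27, 72): CHW grayscale image
print(transcript.shape)  # (4,): one integer index per CAPTCHA character

# Batched loading, as used during training
loader = DataLoader(dataset=dataset, batch_size=32, shuffle=True)
inputs, labels = next(iter(loader()))
print(inputs.shape)      # [32, 1, 27, 72]
```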

Model Architecture

The model is CRNN-like: convolutional layers at the front extract image features, followed by a GRU, which is a variant of the LSTM. The final fully connected layer has an output size of the vocabulary size plus one, because CTC requires an extra blank character.

```python
%%writefile model.py

import paddle
import paddle.nn as nn


class Model(nn.Layer):
    def __init__(self, vocabulary):
        super(Model, self).__init__()
        self.conv1 = nn.Conv2D(in_channels=1, out_channels=32, kernel_size=3)
        self.relu1 = nn.ReLU()
        self.bn1 = nn.BatchNorm2D(32)
        self.pool1 = nn.MaxPool2D(kernel_size=2, stride=1)

        self.conv2 = nn.Conv2D(in_channels=32, out_channels=64, kernel_size=3)
        self.relu2 = nn.ReLU()
        self.bn2 = nn.BatchNorm2D(64)
        self.pool2 = nn.MaxPool2D(kernel_size=2, stride=1)

        self.conv3 = nn.Conv2D(in_channels=64, out_channels=128, kernel_size=3)
        self.relu3 = nn.ReLU()
        self.bn3 = nn.BatchNorm2D(128)
        self.pool3 = nn.MaxPool2D(kernel_size=2, stride=1)

        self.conv4 = nn.Conv2D(in_channels=128, out_channels=256, kernel_size=3)
        self.relu4 = nn.ReLU()
        self.bn4 = nn.BatchNorm2D(256)
        self.pool4 = nn.MaxPool2D(kernel_size=2, stride=1)

        self.conv5 = nn.Conv2D(in_channels=256, out_channels=256, kernel_size=3)
        self.relu5 = nn.ReLU()
        self.bn5 = nn.BatchNorm2D(256)
        self.pool5 = nn.MaxPool2D(kernel_size=2, stride=1)

        self.conv6 = nn.Conv2D(in_channels=256, out_channels=256, kernel_size=3)
        self.relu6 = nn.ReLU()
        self.bn6 = nn.BatchNorm2D(256)
        self.pool6 = nn.MaxPool2D(kernel_size=2, stride=1)

        self.conv7 = nn.Conv2D(in_channels=256, out_channels=256, kernel_size=3)
        self.relu7 = nn.ReLU()
        self.bn7 = nn.BatchNorm2D(256)
        self.pool7 = nn.MaxPool2D(kernel_size=2, stride=1)

        self.fc = nn.Linear(in_features=306, out_features=128)

        self.gru = nn.GRU(input_size=256, hidden_size=128)

        self.output = nn.Linear(in_features=128, out_features=len(vocabulary) + 1)

    def forward(self, x):
        x = self.relu1(self.bn1(self.conv1(x)))
        x = self.pool1(x)
        x = self.relu2(self.bn2(self.conv2(x)))
        x = self.pool2(x)
        x = self.relu3(self.bn3(self.conv3(x)))
        x = self.pool3(x)
        x = self.relu4(self.bn4(self.conv4(x)))
        x = self.pool4(x)
        x = self.relu5(self.bn5(self.conv5(x)))
        x = self.pool5(x)
        x = self.relu6(self.bn6(self.conv6(x)))
        x = self.pool6(x)
        x = self.relu7(self.bn7(self.conv7(x)))
        x = self.pool7(x)
        # Flatten the feature map: (N, 256, 6, 51) -> (N, 256, 306)
        x = paddle.reshape(x, shape=(x.shape[0], x.shape[1], -1))
        x = self.fc(x)
        # Treat the channel dimension as the time axis for the GRU: (N, 128, 256)
        x = paddle.transpose(x, perm=[0, 2, 1])
        y, h = self.gru(x)
        x = self.output(y)
        return x
```
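
A quick shape check, sketched here with a dummy 10-character vocabulary, confirms the output layout that CTC expects:

```python
import paddle
from model import Model

# Dummy vocabulary of 10 characters, just to size the output layer
model = Model(list('0123456789'))
x = paddle.randn([4, 1, 27, 72], dtype='float32')
out = model(x)
print(out.shape)  # [4, 128, 11]: (batch, time steps, vocabulary + blank)
```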

Training

Now we can start training. It finishes quickly, mainly because the dataset is small. Every ten epochs we run an evaluation and report the character error rate. The model is saved as a static-graph model, which makes inference convenient.

```python
import paddle
import numpy as np
import os
from datetime import datetime
from model import Model
from decoder import ctc_greedy_decoder, label_to_string, cer
from paddle.io import DataLoader
from data import CustomDataset
from visualdl import LogWriter
from paddle.static import InputSpec

train_data_list_path = 'dataset/train.txt'
test_data_list_path = 'dataset/test.txt'
voc_path = 'dataset/vocabulary.txt'
save_model = 'models/model'
batch_size = 32
pretrained_model = None
num_epoch = 100
learning_rate = 1e-3
writer = LogWriter(logdir='log')


def train():
    # Training data
    train_dataset = CustomDataset(train_data_list_path, voc_path)
    train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
    # Test data
    test_dataset = CustomDataset(test_data_list_path, voc_path)
    test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size)
    # Build the model
    model = Model(train_dataset.vocabulary)
    paddle.summary(model, input_size=(batch_size, 1, 27, 72))
    # Optimizer with a piecewise learning-rate decay
    boundaries = [10, 20, 50]
    lr = [0.1 ** l * learning_rate for l in range(len(boundaries) + 1)]
    scheduler = paddle.optimizer.lr.PiecewiseDecay(boundaries=boundaries, values=lr, verbose=False)
    optimizer = paddle.optimizer.Adam(parameters=model.parameters(), learning_rate=scheduler)
    # Loss function; the blank index is the last class
    ctc_loss = paddle.nn.CTCLoss(blank=len(train_dataset.vocabulary))
    # Load a pretrained model if one is given
    if pretrained_model is not None:
        model.set_state_dict(paddle.load(os.path.join(pretrained_model, 'model.pdparams')))
        optimizer.set_state_dict(paddle.load(os.path.join(pretrained_model, 'optimizer.pdopt')))
    train_step = 0
    test_step = 0
    # Start training
    for epoch in range(num_epoch):
        for batch_id, (inputs, labels) in enumerate(train_loader()):
            out = model(inputs)
            # CTCLoss expects (time steps, batch, classes)
            out = paddle.transpose(out, perm=[1, 0, 2])
            input_lengths = paddle.full(shape=[out.shape[1]], fill_value=out.shape[0], dtype="int64")
            # Every CAPTCHA label has 4 characters
            label_lengths = paddle.full(shape=[out.shape[1]], fill_value=4, dtype="int64")
            # Compute the loss
            loss = ctc_loss(out, labels, input_lengths, label_lengths)
            loss.backward()
            optimizer.step()
            optimizer.clear_grad()
            # Print training progress periodically
            if batch_id % 100 == 0:
                print('[%s] Train epoch %d, batch %d, loss: %f' % (datetime.now(), epoch, batch_id, loss))
                writer.add_scalar('Train loss', loss, train_step)
                train_step += 1
        if (epoch % 10 == 0 and epoch != 0) or epoch == num_epoch - 1:
            # Run evaluation
            model.eval()
            test_cer = evaluate(model, test_loader, train_dataset.vocabulary)
            print('[%s] Test epoch %d, cer: %f' % (datetime.now(), epoch, test_cer))
            writer.add_scalar('Test cer', test_cer, test_step)
            test_step += 1
            model.train()
        # Log the learning rate
        writer.add_scalar('Learning rate', scheduler.last_lr, epoch)
        scheduler.step()
        # Save the model as a static graph for inference
        paddle.jit.save(layer=model, path=save_model, input_spec=[InputSpec(shape=[None, 1, 27, 72], dtype='float32')])


# Evaluate the model
def evaluate(model, test_loader, vocabulary):
    cer_result = []
    for batch_id, (inputs, labels) in enumerate(test_loader()):
        # Run recognition
        outs = model(inputs)
        outs = paddle.nn.functional.softmax(outs)
        # Decode the outputs
        label_strings = []
        out_strings = []
        for out in outs:
            out_string = ctc_greedy_decoder(out, vocabulary)
            out_strings.append(out_string)
        for label in labels:
            label_strings.append(label_to_string(label, vocabulary))
        for out_string, label_str in zip(out_strings, label_strings):
            print(label_str, out_string)
            # Character error rate: edit distance divided by label length
            c = cer(out_string, label_str) / float(len(label_str))
            cer_result.append(c)
    cer_result = float(np.mean(cer_result))
    return cer_result


train()
```
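
The LogWriter above writes scalars under log/; if VisualDL is installed you can inspect the loss and CER curves by running `visualdl --logdir log` and opening the printed address in a browser.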

Prediction

Use the trained model to recognize a CAPTCHA image.

```python
import numpy as np
import paddle

from data import process
from decoder import ctc_greedy_decoder


with open('dataset/vocabulary.txt', 'r', encoding='utf-8') as f:
    vocabulary = f.readlines()

vocabulary = [v.replace('\n', '') for v in vocabulary]

save_model = 'models/model'
model = paddle.jit.load(save_model)
model.eval()


def infer(path):
    data = process(path)
    # Add the batch dimension: (1, 1, 27, 72)
    data = data[np.newaxis, :]
    data = paddle.to_tensor(data, dtype='float32')
    # Run recognition
    out = model(data)
    out = paddle.nn.functional.softmax(out)[0]
    # Decode the output
    out_string = ctc_greedy_decoder(out, vocabulary)

    print('Prediction: %s' % out_string)


if __name__ == '__main__':
    image_path = 'dataset/test.png'
    infer(image_path)
```