
Understanding the BP Algorithm via Matrix Chain-Rule Differentiation (Theory + Code)

Author: Cyril-KI
Published: 2022-09-19 14:40:03
From the column: KI的算法杂记

I. Preface

An earlier post, BP神经网络模型及其Gradient Descent, derived the BP algorithm, but it only expressed the derivative relationships symbolically and did not walk through the concrete differentiation steps.

Because federated learning requires it, this post focuses on deriving the chain-rule differentiation inside the backpropagation process.

II. Network Structure

The network built here follows another post, 手写神经网络识别MNIST数据集. It consists of 1 input layer, 3 hidden layers, and 1 output layer, with the Sigmoid function used as the activation throughout.
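As a quick sanity check on this architecture (my own addition; the shapes are taken from the code below: a 100-sample batch, 32 inputs, three hidden layers of 20 units, and a scalar output):

```python
import numpy as np

sizes = [32, 20, 20, 20, 1]   # input -> three hidden layers -> output
batch = 100

x = np.zeros((batch, sizes[0]))
for n_in, n_out in zip(sizes[:-1], sizes[1:]):
    w = 2 * np.random.random((n_in, n_out)) - 1  # init in (-1, 1), as in the post
    x = 1 / (1 + np.exp(-np.dot(x, w)))          # Sigmoid activation
print(x.shape)  # (100, 1)
```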

III. Forward Propagation

The computation between layers, i.e., the forward pass, is:

z1 = X·w1,  h1 = σ(z1)
z2 = h1·w2, h2 = σ(z2)
z3 = h2·w3, h3 = σ(z3)
z4 = h3·w4, ŷ = σ(z4)

where σ(x) = 1 / (1 + e⁻ˣ) is the Sigmoid function, X is the input batch, and the loss against a label y is L = ½(y − ŷ)².

Code implementation

  • Initialization
class BP:
    def __init__(self):
        self.input = np.zeros((100, 32))  # 100 samples per round
        self.w1 = 2 * np.random.random((32, 20)) - 1  # limit to (-1, 1)
        self.z1 = 2 * np.random.random((100, 20)) - 1
        self.hidden_layer_1 = np.zeros((100, 20))
        self.w2 = 2 * np.random.random((20, 20)) - 1
        self.z2 = 2 * np.random.random((100, 20)) - 1
        self.hidden_layer_2 = np.zeros((100, 20))
        self.w3 = 2 * np.random.random((20, 20)) - 1
        self.z3 = 2 * np.random.random((100, 20)) - 1
        self.hidden_layer_3 = np.zeros((100, 20))
        self.w4 = 2 * np.random.random((20, 1)) - 1
        self.z4 = 2 * np.random.random((100, 1)) - 1
        self.output_layer = np.zeros((100, 1))
        self.loss = np.zeros((100, 1))
        self.learning_rate = 0.08
  • Forward propagation
def forward_prop(self, data, label):  # data: 100 x 32, label: 100 x 1
    self.input = data
    self.z1 = np.dot(self.input, self.w1)
    self.hidden_layer_1 = self.sigmoid(self.z1)
    self.z2 = np.dot(self.hidden_layer_1, self.w2)
    self.hidden_layer_2 = self.sigmoid(self.z2)
    self.z3 = np.dot(self.hidden_layer_2, self.w3)
    self.hidden_layer_3 = self.sigmoid(self.z3)
    self.z4 = np.dot(self.hidden_layer_3, self.w4)
    self.output_layer = self.sigmoid(self.z4)
    # loss
    self.loss = 1 / 2 * (label - self.output_layer) ** 2
    
    return self.output_layer
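One practical caveat not handled by the code above: np.exp(-x) can overflow for large-magnitude negative inputs and trigger NumPy warnings. A numerically stable Sigmoid (a sketch of a common workaround, not part of the post's implementation) splits the computation by sign:

```python
import numpy as np

def stable_sigmoid(x):
    # for x >= 0: 1 / (1 + e^-x); for x < 0: e^x / (1 + e^x) -- avoids overflow
    out = np.empty_like(x, dtype=float)
    pos = x >= 0
    out[pos] = 1.0 / (1.0 + np.exp(-x[pos]))
    ex = np.exp(x[~pos])
    out[~pos] = ex / (1.0 + ex)
    return out

print(stable_sigmoid(np.array([-1000.0, 0.0, 1000.0])))  # elements: 0.0, 0.5, 1.0
```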

IV. Backpropagation

Starting from the loss L = ½(y − ŷ)², the chain rule for w4 gives:

∂L/∂ŷ = ŷ − y
∂L/∂z4 = ∂L/∂ŷ ⊙ σ'(z4)
∂L/∂w4 = h3ᵀ · ∂L/∂z4

where ⊙ is the element-wise product and σ'(z) = σ(z)(1 − σ(z)). The corresponding code:
l_deri_out = self.output_layer - label
l_deri_z4 = l_deri_out * self.sigmoid_deri(self.output_layer)
l_deri_w4 = np.dot(self.hidden_layer_3.T, l_deri_z4)

Similarly, for w3 we have:

∂L/∂h3 = ∂L/∂z4 · w4ᵀ
∂L/∂z3 = ∂L/∂h3 ⊙ σ'(z3)
∂L/∂w3 = h2ᵀ · ∂L/∂z3

The corresponding code:
l_deri_h3 = np.dot(l_deri_z4, self.w4.T)
l_deri_z3 = l_deri_h3 * self.sigmoid_deri(self.hidden_layer_3)
l_deri_w3 = np.dot(self.hidden_layer_2.T, l_deri_z3)

For w2:

∂L/∂h2 = ∂L/∂z3 · w3ᵀ
∂L/∂z2 = ∂L/∂h2 ⊙ σ'(z2)
∂L/∂w2 = h1ᵀ · ∂L/∂z2

The corresponding code:
l_deri_h2 = np.dot(l_deri_z3, self.w3.T)
l_deri_z2 = l_deri_h2 * self.sigmoid_deri(self.hidden_layer_2)
l_deri_w2 = np.dot(self.hidden_layer_1.T, l_deri_z2)

For w1:

∂L/∂h1 = ∂L/∂z2 · w2ᵀ
∂L/∂z1 = ∂L/∂h1 ⊙ σ'(z1)
∂L/∂w1 = Xᵀ · ∂L/∂z1

The corresponding code:
l_deri_h1 = np.dot(l_deri_z2, self.w2.T)
l_deri_z1 = l_deri_h1 * self.sigmoid_deri(self.hidden_layer_1)
l_deri_w1 = np.dot(self.input.T, l_deri_z1)

The complete backpropagation code is therefore:

def backward_prop(self, label):
    # w4
    l_deri_out = self.output_layer - label
    l_deri_z4 = l_deri_out * self.sigmoid_deri(self.output_layer)
    l_deri_w4 = np.dot(self.hidden_layer_3.T, l_deri_z4)
    # w3
    l_deri_h3 = np.dot(l_deri_z4, self.w4.T)
    l_deri_z3 = l_deri_h3 * self.sigmoid_deri(self.hidden_layer_3)
    l_deri_w3 = np.dot(self.hidden_layer_2.T, l_deri_z3)
    # w2
    l_deri_h2 = np.dot(l_deri_z3, self.w3.T)
    l_deri_z2 = l_deri_h2 * self.sigmoid_deri(self.hidden_layer_2)
    l_deri_w2 = np.dot(self.hidden_layer_1.T, l_deri_z2)
    # w1
    l_deri_h1 = np.dot(l_deri_z2, self.w2.T)
    l_deri_z1 = l_deri_h1 * self.sigmoid_deri(self.hidden_layer_1)
    l_deri_w1 = np.dot(self.input.T, l_deri_z1)
    # update
    self.w4 -= self.learning_rate * l_deri_w4
    self.w3 -= self.learning_rate * l_deri_w3
    self.w2 -= self.learning_rate * l_deri_w2
    self.w1 -= self.learning_rate * l_deri_w1
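A standard way to verify derivations like the ones above is a finite-difference gradient check. The sketch below (my own addition) compares the analytic gradient of L = ½Σ(y − σ(Xw))² with central differences on a tiny single-layer example:

```python
import numpy as np

def sigmoid(z):
    return 1 / (1 + np.exp(-z))

rng = np.random.default_rng(0)
X = rng.normal(size=(5, 3))
y = rng.normal(size=(5, 1))
w = rng.normal(size=(3, 1))

def loss(w):
    return 0.5 * np.sum((y - sigmoid(X @ w)) ** 2)

# analytic gradient, same chain rule as in backward_prop
out = sigmoid(X @ w)
grad = X.T @ ((out - y) * out * (1 - out))

# central-difference check, one weight at a time
eps = 1e-6
num = np.zeros_like(w)
for i in range(w.size):
    e = np.zeros_like(w)
    e.flat[i] = eps
    num.flat[i] = (loss(w + e) - loss(w - e)) / (2 * eps)

print(np.max(np.abs(grad - num)))  # should be tiny if the derivation is right
```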

V. Experiment

1 Data Processing

This experiment predicts electricity load: the load of the previous 24 hours, plus the environmental factors at the next time step, is used to predict the load at the next time step.
def nn_seq():
    print('processing data:')
    data = load_data()
    columns = data.columns
    load = data[columns[1]]
    load = load.tolist()
    data = data.values.tolist()
    X, Y = [], []
    for i in range(len(data) - 30):
        train_seq = []
        train_label = []
        for j in range(i, i + 24):
            train_seq.append(load[j])
        # append environmental features (temperature, humidity, pressure, etc.)
        for c in range(2, 10):
            train_seq.append(data[i + 24][c])
        train_label.append(load[i + 24])
        X.append(train_seq)
        Y.append(train_label)

    X, Y = np.array(X), np.array(Y)
    train_x, train_y = X[0:int(len(X) * 0.7)], Y[0:int(len(Y) * 0.7)]
    test_x, test_y = X[int(len(X) * 0.7):len(X)], Y[int(len(Y) * 0.7):len(Y)]
    
    return train_x, train_y, test_x, test_y
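The windowing above can be sanity-checked on synthetic data: each input is 24 past load values plus the 8 environmental features of the next step (32 features total), and the label is the next load value. A minimal sketch with made-up numbers:

```python
import numpy as np

load = list(range(100))                     # fake load series
env = [[j * 0.1] * 8 for j in range(100)]   # fake 8 environmental features per step

X, Y = [], []
for i in range(len(load) - 30):             # same bound as nn_seq
    seq = load[i:i + 24] + env[i + 24]      # 24 past loads + next step's environment
    X.append(seq)
    Y.append([load[i + 24]])

X, Y = np.array(X), np.array(Y)
print(X.shape, Y.shape)  # (70, 32) (70, 1)
```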

2 Training
def train():
    nn = BP()
    print('training...')
    train_x, train_y, test_x, test_y = nn_seq()
    batch_size = 100
    epochs = 1000
    batch = int(len(train_x) / batch_size)
    for epoch in range(epochs):
        for i in range(batch):
            start = i * batch_size
            end = start + batch_size
            nn.forward_prop(train_x[start:end], train_y[start:end])
            nn.backward_prop(train_y[start:end])
        print('epoch:', epoch, ' loss:', np.mean(nn.loss))
    return nn

3 Testing
def test():
    global MAX, MIN
    nn = train()
    train_x, train_y, test_x, test_y = nn_seq()
    pred = []
    batch = int(len(test_y) / 100)
    for i in range(batch):
        start = i * 100
        end = start + 100
        res = nn.forward_prop(test_x[start:end], test_y[start:end])
        res = res.tolist()
        res = list(chain.from_iterable(res))
        pred.extend(res)

    # de-normalize back to the original scale
    test_y = (MAX - MIN) * test_y + MIN
    pred = np.array(pred)
    pred = (MAX - MIN) * pred + MIN
    print('MAPE:', get_mape(test_y.flatten()[0:4900], pred))

    # plot
    x = [i for i in range(1, 151)]
    x_smooth = np.linspace(1, 150, 600)
    y_smooth = make_interp_spline(x, test_y[0:150])(x_smooth)
    plt.plot(x_smooth, y_smooth, c='green', marker='*', ms=1, alpha=0.75, label='true')

    y_smooth = make_interp_spline(x, pred[0:150])(x_smooth)
    plt.plot(x_smooth, y_smooth, c='red', marker='o', ms=1, alpha=0.75, label='pred')
    plt.grid(axis='y')
    plt.legend()
    plt.show()

4 Results

After 1000 training epochs, the network reaches a MAPE of 5.08%:
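MAPE, the metric reported here, is the mean of |true − pred| / |true|. A quick worked example with made-up values:

```python
import numpy as np

def get_mape(x, y):
    # x: ground truth, y: prediction -- same definition as in the full code below
    return np.mean(np.abs((x - y) / x))

true = np.array([100.0, 200.0, 400.0])
pred = np.array([105.0, 190.0, 420.0])
print(get_mape(true, pred))  # 0.05 -> 5% MAPE
```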

VI. Complete Code
# -*- coding: utf-8 -*-
"""
@Time : 2022/1/8 11:40
@Author :KI 
@File :federal_learning.py
@Motto:Hungry And Humble

"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.interpolate import make_interp_spline
from itertools import chain

MAX, MIN = 0, 0


def load_data():
    global MAX, MIN
    df = pd.read_csv('data/anqiudata.csv', encoding='gbk')
    columns = df.columns
    df.fillna(df.mean(), inplace=True)
    for i in range(1, len(columns)):  # normalization
        column = columns[i]
        df[column] = df[column].astype('float64')
        # min-max scaling: map each column to the [0, 1] interval
        Max = np.max(df[column])
        Min = np.min(df[column])
        if i == 1:
            MAX = Max
            MIN = Min
        df[column] = (df[column] - Min) / (Max - Min)

    return df


class BP:
    def __init__(self):
        self.input = np.zeros((100, 32))  # 100 samples per round
        self.w1 = 2 * np.random.random((32, 20)) - 1  # limit to (-1, 1)
        self.z1 = 2 * np.random.random((100, 20)) - 1
        self.hidden_layer_1 = np.zeros((100, 20))
        self.w2 = 2 * np.random.random((20, 20)) - 1
        self.z2 = 2 * np.random.random((100, 20)) - 1
        self.hidden_layer_2 = np.zeros((100, 20))
        self.w3 = 2 * np.random.random((20, 20)) - 1
        self.z3 = 2 * np.random.random((100, 20)) - 1
        self.hidden_layer_3 = np.zeros((100, 20))
        self.w4 = 2 * np.random.random((20, 1)) - 1
        self.z4 = 2 * np.random.random((100, 1)) - 1
        self.output_layer = np.zeros((100, 1))
        self.loss = np.zeros((100, 1))
        self.learning_rate = 0.08

    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))

    def sigmoid_deri(self, x):
        # note: x here is already the activation σ(z), so σ'(z) = x * (1 - x)
        return x * (1 - x)

    def forward_prop(self, data, label):  # data: 100 x 32, label: 100 x 1
        self.input = data
        self.z1 = np.dot(self.input, self.w1)
        self.hidden_layer_1 = self.sigmoid(self.z1)
        self.z2 = np.dot(self.hidden_layer_1, self.w2)
        self.hidden_layer_2 = self.sigmoid(self.z2)
        self.z3 = np.dot(self.hidden_layer_2, self.w3)
        self.hidden_layer_3 = self.sigmoid(self.z3)
        self.z4 = np.dot(self.hidden_layer_3, self.w4)
        self.output_layer = self.sigmoid(self.z4)
        # loss
        self.loss = 1/2 * (label - self.output_layer) ** 2

        return self.output_layer

    def backward_prop(self, label):
        # w4
        l_deri_out = self.output_layer - label
        l_deri_z4 = l_deri_out * self.sigmoid_deri(self.output_layer)
        l_deri_w4 = np.dot(self.hidden_layer_3.T, l_deri_z4)
        # w3
        l_deri_h3 = np.dot(l_deri_z4, self.w4.T)
        l_deri_z3 = l_deri_h3 * self.sigmoid_deri(self.hidden_layer_3)
        l_deri_w3 = np.dot(self.hidden_layer_2.T, l_deri_z3)
        # w2
        l_deri_h2 = np.dot(l_deri_z3, self.w3.T)
        l_deri_z2 = l_deri_h2 * self.sigmoid_deri(self.hidden_layer_2)
        l_deri_w2 = np.dot(self.hidden_layer_1.T, l_deri_z2)
        # w1
        l_deri_h1 = np.dot(l_deri_z2, self.w2.T)
        l_deri_z1 = l_deri_h1 * self.sigmoid_deri(self.hidden_layer_1)
        l_deri_w1 = np.dot(self.input.T, l_deri_z1)
        # update
        self.w4 -= self.learning_rate * l_deri_w4
        self.w3 -= self.learning_rate * l_deri_w3
        self.w2 -= self.learning_rate * l_deri_w2
        self.w1 -= self.learning_rate * l_deri_w1


def nn_seq():
    print('processing data:')
    data = load_data()
    columns = data.columns
    load = data[columns[1]]
    load = load.tolist()
    data = data.values.tolist()
    X, Y = [], []
    for i in range(len(data) - 30):
        train_seq = []
        train_label = []
        for j in range(i, i + 24):
            train_seq.append(load[j])
        # append environmental features (temperature, humidity, pressure, etc.)
        for c in range(2, 10):
            train_seq.append(data[i + 24][c])
        train_label.append(load[i + 24])
        X.append(train_seq)
        Y.append(train_label)

    X, Y = np.array(X), np.array(Y)
    train_x, train_y = X[0:int(len(X) * 0.7)], Y[0:int(len(Y) * 0.7)]
    test_x, test_y = X[int(len(X) * 0.7):len(X)], Y[int(len(Y) * 0.7):len(Y)]

    return train_x, train_y, test_x, test_y


def train():
    nn = BP()
    print('training...')
    train_x, train_y, test_x, test_y = nn_seq()
    batch_size = 100
    epochs = 1000
    batch = int(len(train_x) / batch_size)
    for epoch in range(epochs):
        for i in range(batch):
            start = i * batch_size
            end = start + batch_size
            nn.forward_prop(train_x[start:end], train_y[start:end])
            nn.backward_prop(train_y[start:end])
        print('epoch:', epoch, ' loss:', np.mean(nn.loss))
    return nn


def get_mape(x, y):
    """
    :param x: ground truth
    :param y: prediction
    :return: MAPE
    """
    return np.mean(np.abs((x - y) / x))


def test():
    global MAX, MIN
    nn = train()
    train_x, train_y, test_x, test_y = nn_seq()
    pred = []
    batch = int(len(test_y) / 100)
    for i in range(batch):
        start = i * 100
        end = start + 100
        res = nn.forward_prop(test_x[start:end], test_y[start:end])
        res = res.tolist()
        res = list(chain.from_iterable(res))
        pred.extend(res)

    # de-normalize back to the original scale
    test_y = (MAX - MIN) * test_y + MIN
    pred = np.array(pred)
    pred = (MAX - MIN) * pred + MIN
    print('MAPE:', get_mape(test_y.flatten()[0:4900], pred))

    # plot
    x = [i for i in range(1, 151)]
    x_smooth = np.linspace(1, 150, 600)
    y_smooth = make_interp_spline(x, test_y[0:150])(x_smooth)
    plt.plot(x_smooth, y_smooth, c='green', marker='*', ms=1, alpha=0.75, label='true')

    y_smooth = make_interp_spline(x, pred[0:150])(x_smooth)
    plt.plot(x_smooth, y_smooth, c='red', marker='o', ms=1, alpha=0.75, label='pred')
    plt.grid(axis='y')
    plt.legend()
    plt.show()


if __name__ == '__main__':
    test()
Originally published 2022-01-12 on the WeChat public account KI的算法杂记.
