I. 前言
在BP神经网络模型及其Gradient Descent的推导过程中有推导过BP算法,但只是简单用符号来表示求导关系,并没有详细介绍求导的具体步骤。
由于联邦学习的需要,所以在这里着重推导一下反向传播过程中的链式求导过程。
II. 网络结构
本次搭建的神经网络参考了另一篇博文:手写神经网络识别MNIST数据集,该神经网络由1个输入层、3个隐藏层以及1个输出层组成,激活函数全部采用Sigmoid函数。
III. 前向传播
神经网络各层间的运算关系,也就是前向传播过程如下所示:
其中:
代码实现
class BP:
    """A fully-connected network: 32 -> 20 -> 20 -> 20 -> 1, all sigmoid.

    Works on fixed mini-batches of 100 samples; `z*` hold pre-activations,
    `hidden_layer_*` / `output_layer` hold the sigmoid outputs.
    """
    def __init__(self):
        self.input = np.zeros((100, 32))  # 100 samples per round
        self.w1 = 2 * np.random.random((32, 20)) - 1  # limit to (-1, 1)
        self.z1 = 2 * np.random.random((100, 20)) - 1
        self.hidden_layer_1 = np.zeros((100, 20))
        self.w2 = 2 * np.random.random((20, 20)) - 1
        self.z2 = 2 * np.random.random((100, 20)) - 1
        self.hidden_layer_2 = np.zeros((100, 20))
        self.w3 = 2 * np.random.random((20, 20)) - 1
        self.z3 = 2 * np.random.random((100, 20)) - 1
        self.hidden_layer_3 = np.zeros((100, 20))
        self.w4 = 2 * np.random.random((20, 1)) - 1
        self.z4 = 2 * np.random.random((100, 1)) - 1
        self.output_layer = np.zeros((100, 1))
        self.loss = np.zeros((100, 1))
        self.learning_rate = 0.08

    def forward_prop(self, data, label):  # label: 100 x 1, data: 100 x 32
        """Run the forward pass, store per-sample MSE loss, return outputs."""
        self.input = data
        self.z1 = np.dot(self.input, self.w1)
        self.hidden_layer_1 = self.sigmoid(self.z1)
        self.z2 = np.dot(self.hidden_layer_1, self.w2)
        self.hidden_layer_2 = self.sigmoid(self.z2)
        self.z3 = np.dot(self.hidden_layer_2, self.w3)
        self.hidden_layer_3 = self.sigmoid(self.z3)
        self.z4 = np.dot(self.hidden_layer_3, self.w4)
        self.output_layer = self.sigmoid(self.z4)
        # per-sample loss: 1/2 * (y - y_hat)^2
        self.loss = 1 / 2 * (label - self.output_layer) ** 2
        return self.output_layer
IV. 反向传播
相应代码为:
# dL/d(out): derivative of the MSE loss 1/2*(label - out)^2.
l_deri_out = self.output_layer - label
# Chain through sigmoid'; sigmoid_deri takes the already-activated value.
l_deri_z4 = l_deri_out * self.sigmoid_deri(self.output_layer)
# dL/dw4, summed over the batch via the matrix product.
l_deri_w4 = np.dot(self.hidden_layer_3.T, l_deri_z4)
那么同理,对 w3 我们有:
相应代码为:
# Propagate the error one layer back: dL/dh3 = dL/dz4 * w4^T.
l_deri_h3 = np.dot(l_deri_z4, self.w4.T)
l_deri_z3 = l_deri_h3 * self.sigmoid_deri(self.hidden_layer_3)
l_deri_w3 = np.dot(self.hidden_layer_2.T, l_deri_z3)
对 w2有:
相应代码为:
# Same pattern for w2: back through w3, sigmoid', then batch matmul.
l_deri_h2 = np.dot(l_deri_z3, self.w3.T)
l_deri_z2 = l_deri_h2 * self.sigmoid_deri(self.hidden_layer_2)
l_deri_w2 = np.dot(self.hidden_layer_1.T, l_deri_z2)
对w1有:
相应代码为:
# Final step for w1: error reaches the input layer.
l_deri_h1 = np.dot(l_deri_z2, self.w2.T)
l_deri_z1 = l_deri_h1 * self.sigmoid_deri(self.hidden_layer_1)
l_deri_w1 = np.dot(self.input.T, l_deri_z1)
因此,反向传播的完整代码为:
def backward_prop(self, label):
    """Backpropagate the MSE loss and apply one gradient-descent step.

    Must be called right after forward_prop, since it reads the cached
    activations (hidden_layer_*, output_layer, input).
    """
    # w4
    l_deri_out = self.output_layer - label
    l_deri_z4 = l_deri_out * self.sigmoid_deri(self.output_layer)
    l_deri_w4 = np.dot(self.hidden_layer_3.T, l_deri_z4)
    # w3
    l_deri_h3 = np.dot(l_deri_z4, self.w4.T)
    l_deri_z3 = l_deri_h3 * self.sigmoid_deri(self.hidden_layer_3)
    l_deri_w3 = np.dot(self.hidden_layer_2.T, l_deri_z3)
    # w2
    l_deri_h2 = np.dot(l_deri_z3, self.w3.T)
    l_deri_z2 = l_deri_h2 * self.sigmoid_deri(self.hidden_layer_2)
    l_deri_w2 = np.dot(self.hidden_layer_1.T, l_deri_z2)
    # w1
    l_deri_h1 = np.dot(l_deri_z2, self.w2.T)
    l_deri_z1 = l_deri_h1 * self.sigmoid_deri(self.hidden_layer_1)
    l_deri_w1 = np.dot(self.input.T, l_deri_z1)
    # update: all gradients are computed before any weight changes,
    # so the update order does not affect the result
    self.w4 -= self.learning_rate * l_deri_w4
    self.w3 -= self.learning_rate * l_deri_w3
    self.w2 -= self.learning_rate * l_deri_w2
    self.w1 -= self.learning_rate * l_deri_w1
V. 实验
本次实验拟对电力负荷进行预测:利用前24个小时的负荷和下一时刻的环境因素来预测下一时刻的负荷值。
def nn_seq():
    """Build sliding-window samples and split 70/30 into train/test.

    Each input is the previous 24 hourly loads plus 8 environment
    features (columns 2..9) of the target hour; the label is the
    target hour's load.
    """
    print('处理数据:')
    data = load_data()
    columns = data.columns
    load = data[columns[1]]
    load = load.tolist()
    data = data.values.tolist()
    X, Y = [], []
    for i in range(len(data) - 30):
        train_seq = []
        train_label = []
        for j in range(i, i + 24):
            train_seq.append(load[j])
        # append temperature, humidity, pressure, etc. of the target hour
        for c in range(2, 10):
            train_seq.append(data[i + 24][c])
        train_label.append(load[i + 24])
        X.append(train_seq)
        Y.append(train_label)
    X, Y = np.array(X), np.array(Y)
    train_x, train_y = X[0:int(len(X) * 0.7)], Y[0:int(len(Y) * 0.7)]
    test_x, test_y = X[int(len(X) * 0.7):len(X)], Y[int(len(Y) * 0.7):len(Y)]
    return train_x, train_y, test_x, test_y
def train():
    """Train a BP network with mini-batch gradient descent.

    :return: the trained BP instance
    """
    nn = BP()
    print('training...')
    train_x, train_y, test_x, test_y = nn_seq()
    batch_size = 100
    epochs = 1000
    batch = int(len(train_x) / batch_size)
    for epoch in range(epochs):
        for i in range(batch):
            start = i * batch_size
            end = start + batch_size
            nn.forward_prop(train_x[start:end], train_y[start:end])
            nn.backward_prop(train_y[start:end])
        # BUG FIX: BP stores the per-sample loss in `self.loss`;
        # `nn.error` does not exist and would raise AttributeError.
        print('当前epoch:', epoch, ' error:', np.mean(nn.loss))
    return nn
def test():
    """Evaluate the trained network on the test set, print MAPE, plot.

    Predicts batch-by-batch (100 samples each), then de-normalizes both
    predictions and targets with the MAX/MIN bounds saved by load_data().
    """
    global MAX, MIN
    nn = train()
    train_x, train_y, test_x, test_y = nn_seq()
    pred = []
    batch = int(len(test_y) / 100)
    for i in range(batch):
        start = i * 100
        end = start + 100
        res = nn.forward_prop(test_x[start:end], test_y[start:end])
        res = res.tolist()
        res = list(chain.from_iterable(res))
        pred.extend(res)
    # de-normalize back to the original load scale
    test_y = (MAX - MIN) * test_y + MIN
    pred = np.array(pred)
    pred = (MAX - MIN) * pred + MIN
    # BUG FIX: compare exactly len(pred) targets instead of a hard-coded
    # 4900, which breaks whenever the test-set size differs.
    print('accuracy:', get_mape(test_y.flatten()[:len(pred)], pred))
    # plot the first 150 points, smoothed with a spline
    x = [i for i in range(1, 151)]
    x_smooth = np.linspace(1, 150, 600)
    y_smooth = make_interp_spline(x, test_y[0:150])(x_smooth)
    plt.plot(x_smooth, y_smooth, c='green', marker='*', ms=1, alpha=0.75, label='true')
    y_smooth = make_interp_spline(x, pred[0:150])(x_smooth)
    plt.plot(x_smooth, y_smooth, c='red', marker='o', ms=1, alpha=0.75, label='pred')
    plt.grid(axis='y')
    plt.legend()
    plt.show()
经过1000轮训练后,神经网络的MAPE为5.08%:
VI. 完整代码
# -*- coding: utf-8 -*-
"""
@Time : 2022/1/8 11:40
@Author :KI
@File :federal_learning.py
@Motto:Hungry And Humble
"""
import pandas as pd
import numpy as np
import torch
import matplotlib.pyplot as plt
from scipy.interpolate import make_interp_spline
from itertools import chain
# Normalization bounds of the load column; set by load_data() and used
# by test() to de-normalize predictions back to the original scale.
MAX, MIN = 0, 0
def load_data():
    """Load the CSV, fill NaNs with column means, min-max normalize.

    Every column except the first is scaled via (x - min) / (max - min),
    i.e. to [0, 1] (the original comment said -1~1, which the formula
    does not do). The load column's original bounds are kept in the
    globals MAX/MIN so predictions can be de-normalized later.
    """
    global MAX, MIN
    df = pd.read_csv('data/anqiudata.csv', encoding='gbk')
    columns = df.columns
    df.fillna(df.mean(), inplace=True)
    for i in range(1, len(columns)):  # normalize each feature column
        column = columns[i]
        df[column] = df[column].astype('float64')
        # min-max scaling to [0, 1]
        Max = np.max(df[column])
        Min = np.min(df[column])
        if i == 1:
            # remember bounds of the load column (column index 1)
            MAX = Max
            MIN = Min
        df[column] = (df[column] - Min) / (Max - Min)
    return df
class BP:
    """A fully-connected feed-forward network: 32 -> 20 -> 20 -> 20 -> 1.

    Every layer uses a sigmoid activation. The network operates on fixed
    mini-batches of 100 samples and is trained with plain gradient descent
    on a per-sample MSE loss.
    """

    def __init__(self):
        # Mini-batch input buffer: 100 samples x 32 features.
        self.input = np.zeros((100, 32))
        # Weights drawn uniformly from (-1, 1); z* cache pre-activations.
        self.w1 = np.random.random((32, 20)) * 2 - 1
        self.z1 = np.random.random((100, 20)) * 2 - 1
        self.hidden_layer_1 = np.zeros((100, 20))
        self.w2 = np.random.random((20, 20)) * 2 - 1
        self.z2 = np.random.random((100, 20)) * 2 - 1
        self.hidden_layer_2 = np.zeros((100, 20))
        self.w3 = np.random.random((20, 20)) * 2 - 1
        self.z3 = np.random.random((100, 20)) * 2 - 1
        self.hidden_layer_3 = np.zeros((100, 20))
        self.w4 = np.random.random((20, 1)) * 2 - 1
        self.z4 = np.random.random((100, 1)) * 2 - 1
        self.output_layer = np.zeros((100, 1))
        self.loss = np.zeros((100, 1))
        self.learning_rate = 0.08

    def sigmoid(self, x):
        """Element-wise logistic function."""
        return 1 / (1 + np.exp(-x))

    def sigmoid_deri(self, x):
        """Sigmoid derivative expressed in the *activated* value x."""
        return x * (1 - x)

    def forward_prop(self, data, label):
        """Forward pass for one batch (data: 100 x 32, label: 100 x 1).

        Caches all intermediate activations for backward_prop and stores
        the per-sample MSE loss; returns the network output (100 x 1).
        """
        self.input = data
        self.z1 = self.input @ self.w1
        self.hidden_layer_1 = self.sigmoid(self.z1)
        self.z2 = self.hidden_layer_1 @ self.w2
        self.hidden_layer_2 = self.sigmoid(self.z2)
        self.z3 = self.hidden_layer_2 @ self.w3
        self.hidden_layer_3 = self.sigmoid(self.z3)
        self.z4 = self.hidden_layer_3 @ self.w4
        self.output_layer = self.sigmoid(self.z4)
        # per-sample loss: 1/2 * (y - y_hat)^2
        self.loss = 0.5 * (label - self.output_layer) ** 2
        return self.output_layer

    def backward_prop(self, label):
        """Backpropagate the loss and take one gradient-descent step.

        Relies on the activations cached by the preceding forward_prop.
        """
        # output layer
        grad_out = self.output_layer - label
        grad_z4 = grad_out * self.sigmoid_deri(self.output_layer)
        grad_w4 = self.hidden_layer_3.T @ grad_z4
        # third hidden layer
        grad_h3 = grad_z4 @ self.w4.T
        grad_z3 = grad_h3 * self.sigmoid_deri(self.hidden_layer_3)
        grad_w3 = self.hidden_layer_2.T @ grad_z3
        # second hidden layer
        grad_h2 = grad_z3 @ self.w3.T
        grad_z2 = grad_h2 * self.sigmoid_deri(self.hidden_layer_2)
        grad_w2 = self.hidden_layer_1.T @ grad_z2
        # first hidden layer
        grad_h1 = grad_z2 @ self.w2.T
        grad_z1 = grad_h1 * self.sigmoid_deri(self.hidden_layer_1)
        grad_w1 = self.input.T @ grad_z1
        # all gradients are computed above, so update order is irrelevant
        self.w4 -= self.learning_rate * grad_w4
        self.w3 -= self.learning_rate * grad_w3
        self.w2 -= self.learning_rate * grad_w2
        self.w1 -= self.learning_rate * grad_w1
def nn_seq():
    """Build sliding-window samples and split them 70/30 into train/test.

    Each input vector holds the previous 24 hourly loads followed by the
    8 environment features (columns 2..9) of the target hour; each label
    is the target hour's load. Returns (train_x, train_y, test_x, test_y).
    """
    print('处理数据:')
    df = load_data()
    cols = df.columns
    series = df[cols[1]].tolist()
    rows = df.values.tolist()
    X, Y = [], []
    for i in range(len(rows) - 30):
        # 24 past load values ...
        seq = [series[j] for j in range(i, i + 24)]
        # ... plus temperature, humidity, pressure, etc. of the target hour
        seq.extend(rows[i + 24][c] for c in range(2, 10))
        X.append(seq)
        Y.append([series[i + 24]])
    X, Y = np.array(X), np.array(Y)
    split = int(len(X) * 0.7)
    return X[:split], Y[:split], X[split:], Y[split:]
def train():
    """Train a fresh BP network for 1000 epochs of mini-batch descent.

    :return: the trained BP instance
    """
    net = BP()
    print('training...')
    train_x, train_y, _, _ = nn_seq()
    batch_size = 100
    epochs = 1000
    n_batches = int(len(train_x) / batch_size)
    for epoch in range(epochs):
        for b in range(n_batches):
            lo = b * batch_size
            hi = lo + batch_size
            net.forward_prop(train_x[lo:hi], train_y[lo:hi])
            net.backward_prop(train_y[lo:hi])
        print('当前epoch:', epoch, ' error:', np.mean(net.loss))
    return net
def get_mape(x, y):
    """Mean absolute percentage error between truth and prediction.

    :param x: ground-truth values
    :param y: predicted values
    :return: MAPE as a fraction (e.g. 0.05 for 5%)
    """
    relative_errors = np.abs((x - y) / x)
    return np.mean(relative_errors)
def test():
    """Evaluate the trained network on the held-out set, print MAPE, plot.

    Predicts batch-by-batch (100 samples each), de-normalizes predictions
    and targets with the MAX/MIN bounds recorded by load_data(), prints
    the MAPE and plots the first 150 points with spline smoothing.
    """
    # NOTE: MAX/MIN are only read here, so no `global` declaration is needed.
    nn = train()
    train_x, train_y, test_x, test_y = nn_seq()
    pred = []
    batch = int(len(test_y) / 100)
    for i in range(batch):
        start = i * 100
        end = start + 100
        res = nn.forward_prop(test_x[start:end], test_y[start:end])
        pred.extend(chain.from_iterable(res.tolist()))
    # de-normalize back to the original load scale
    test_y = (MAX - MIN) * test_y + MIN
    pred = np.array(pred)
    pred = (MAX - MIN) * pred + MIN
    # BUG FIX: compare exactly len(pred) targets instead of a hard-coded
    # 4900, which crashes (shape mismatch) whenever the test set differs.
    print('accuracy:', get_mape(test_y.flatten()[:len(pred)], pred))
    # plot the first 150 points, smoothed with a spline
    x = [i for i in range(1, 151)]
    x_smooth = np.linspace(1, 150, 600)
    y_smooth = make_interp_spline(x, test_y[0:150])(x_smooth)
    plt.plot(x_smooth, y_smooth, c='green', marker='*', ms=1, alpha=0.75, label='true')
    y_smooth = make_interp_spline(x, pred[0:150])(x_smooth)
    plt.plot(x_smooth, y_smooth, c='red', marker='o', ms=1, alpha=0.75, label='pred')
    plt.grid(axis='y')
    plt.legend()
    plt.show()
if __name__ == '__main__':
    # Entry point: train the network, then evaluate and plot on the test set.
    test()