“装袋”(bagging)和“提升”(boost)是构建组合模型的两种最主要的方法,所谓的组合模型是由多个基本模型构成的模型,组合模型的预测效果往往比任意一个基本模型的效果都要好。
1)给每个训练样本 x_{1},x_{2},….,x_{N} 分配权重,初始权重w_{1} 均为1/N。 2)针对带有权值的样本进行训练,得到模型G_m (初始模型为G1)。 3)计算模型 G_m 的误分率 e_m=\sum_{i=1}^Nw_iI(y_i\not= G_m(x_i)) 4)计算模型 \alpha_m=0.5\log[(1-e_m)/e_m] 5)根据误分率e和当前权重向量 w_m 更新权重向量 w_{m+1} D_{m+1}=(w_{m+1,1},...,w_{m+1,i},...,w_{m+1,N}) w_{m+1,i}=\frac{w_{m,1}}{Z_{m}}exp(-a_{m}y_{i}G_{m}(x_{i})) Z_{m}=\sum_{i=1}^Nw_{mi}exp(-a_{m}y_{i}G_{m}(x_{i}))
6)计算组合模型 f(x)=\sum_{m=1}^M\alpha_mG_m(x_i) 的误分率。 得到最终分类器: G(x)=sign(f(x))=sign(\sum_{m=1}^M\alpha_mG_m(x_i)) 7)当组合模型的误分率或迭代次数低于一定阈值,停止迭代;否则,回到步骤2)
# -*- coding:utf-8 -*-
# /usr/bin/python
import numpy as np
class AdaBoost():
def __init__(self,nEstimators=50,learningRate=0.1):
'''
对象属性
:param nEstimators: 迭代代数
:param learningRate: 学习率
'''
self.nEstimators = nEstimators
self.learningRate = learningRate
def initArgs(self,datasets,labels):
'''
初始化参数
:param datasets: 数据集
:param labels: 标签
:return:
'''
self.X = datasets
self.Y = labels
self.M,self.N = self.X.shape
# 弱分类器
self.clfSets=[]
# 初始化权值
self.weights = [1.0 / self.M]*self.M
# 不同分类器的系数
self.alpha = []
def _G(self,features,labels,weights):
m = len(features)
print("m",m)
error = 10000
bestV = 0
features_min = min(features)
features_max = max(features)
n_step = (features_max - features_min + self.learningRate) // self.learningRate
print('n_step:{}'.format(n_step))
direct, compare_array = None, None
for i in range(1, int(n_step)):
v = features_min + self.learningRate * i
if v not in features:
# 误分类计算
compare_array_positive = np.array([1 if features[k] > v else -1 for k in range(m)])
print(compare_array_positive)
weight_error_positive = sum([weights[k] for k in range(m) if compare_array_positive[k] != labels[k]])
print(weight_error_positive)
compare_array_nagetive = np.array([-1 if features[k] > v else 1 for k in range(m)])
weight_error_nagetive = sum([weights[k] for k in range(m) if compare_array_nagetive[k] != labels[k]])
if weight_error_positive < weight_error_nagetive:
weight_error = weight_error_positive
_compare_array = compare_array_positive
direct = 'positive'
else:
weight_error = weight_error_nagetive
_compare_array = compare_array_nagetive
direct = 'nagetive'
print('v:{} error:{}'.format(v, weight_error))
if weight_error < error:
error = weight_error
compare_array = _compare_array
bestV = v
return bestV,direct,error,compare_array
# 计算alpha
def _alpha(self,error):
'''
更新alpha
:param error: 误差
:return: 新的alpha
'''
return 0.5 * np.log((1 - error) / error)
def _Z(self,weights,a,clf):
'''
规范化因子
:param weights: 权值
:param a: 系数
:param clf: 类别
:return:
'''
return sum([weights[i] * np.exp(-1 * a * self.Y[i] * clf[i]) for i in range(self.M)])
def _w(self,a,clf,Z):
'''
更新权值
:param a: 系数
:param clf: 类别
:param Z:
:return: 规范因子
'''
self.weights = self.weights * np.exp(-1 * a * self.Y * clf) / Z
# G(x)的线性组合
def _f(self, alpha, clf_sets):
pass
def G(self, x, v, direct):
'''
分类器
:param x: 输入特征
:param v: 阈值
:param direct: 正负倾向
:return:
'''
if direct == 'positive':
return 1 if x > v else -1
else:
return -1 if x > v else 1
def fit(self,X,y):
'''
训练
:param X: x特征
:param y: y标签
:return:
'''
self.initArgs(X,y)
for epoch in range(self.nEstimators):
best_clf_error, bestV, clf_result = 100000, None, None
for j in range(self.N):
features = self.X[:, j]
# 分类阈值,分类误差,分类结果
v, direct, error, compare_array = self._G(features, self.Y, self.weights)
if error < best_clf_error:
best_clf_error = error
bestV = v
final_direct = direct
clf_result = compare_array
axis = j
if best_clf_error == 0:
break
# 计算G(x)系数a
a = self._alpha(best_clf_error)
self.alpha.append(a)
# 记录分类器
self.clfSets.append((axis, bestV, final_direct))
# 规范化因子
Z = self._Z(self.weights, a, clf_result)
# 权值更新
self._w(a, clf_result, Z)
def predict(self, feature):
'''
预测
:param feature: x特征
:return:
'''
result = 0.0
for i in range(len(self.clfSets)):
axis, clf_v, direct = self.clfSets[i]
f_input = feature[axis]
result += self.alpha[i] * self.G(f_input, clf_v, direct)
# sign
return 1 if result > 0 else -1
def score(self, X_test, y_test):
'''
分数计算
:param X_test: 测试数据集的x特征
:param y_test: 测试数据集的y特征
:return:
'''
right_count = 0
for i in range(len(X_test)):
feature = X_test[i]
if self.predict(feature) == y_test[i]:
right_count += 1
return right_count / len(X_test)
X = np.arange(10).reshape(10,1)
y = np.array([1, 1, 1, -1, -1, -1, 1, 1, 1, -1])
clf = AdaBoost(nEstimators=10,learningRate=0.9)
clf.fit(X,y)
testX = np.array([[0],[9],[2]])
testY = np.array([1,-1,1])
print(clf.score(testX,testY))