
Machine Learning Algorithms: Decision Trees

Author: BBuf
Published: 2019-12-04
Column: GiantPandaCV

Preface

This post uses the ID3 algorithm for binary classification as the running example to introduce how decision trees work. A decision tree is, as the name suggests, a tree built out of individual "decisions". Its nodes come in two kinds: non-leaf nodes hold the decision criteria, and leaf nodes hold the decision results. So what is a decision? Simply put, when a question has several possible answers, the act of choosing one of them is a decision; for example, judging whether a person's height exceeds 180 cm is a decision. A minimal sketch of this structure is given below.
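The following sketch is hypothetical (the second feature, the thresholds, and the class labels are invented for illustration); it writes such a two-level tree as nested conditionals in Python:

# A toy two-level decision tree: each "if" is a non-leaf node holding a
# decision criterion, each "return" is a leaf node holding a decision result.
def toy_tree(height_cm, weight_kg):
    if height_cm > 180:           # non-leaf node: first decision criterion
        if weight_kg > 80:        # non-leaf node: second decision criterion
            return "class A"      # leaf node: decision result
        return "class B"          # leaf node: decision result
    return "class C"              # leaf node: decision result

print(toy_tree(185, 90))  # -> "class A"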

ID3 Algorithm Principles
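In brief, and matching the quantities computed by the implementation below: ID3 grows the tree greedily, evaluating at each node every feature and every candidate split point, and splitting on the pair with the largest information gain. For binary classification, if a sample set $D$ has positive-class proportion $p$, its entropy is

$$H(D) = -p \log_2 p - (1-p) \log_2 (1-p).$$

Splitting $D$ on feature $a$ at some threshold yields subsets $D_1$ (below the threshold) and $D_2$ (at or above it), with conditional entropy

$$H(D \mid a) = \frac{|D_1|}{|D|} H(D_1) + \frac{|D_2|}{|D|} H(D_2),$$

and the information gain is

$$\mathrm{Gain}(D, a) = H(D) - H(D \mid a).$$

The procedure recurses on the two children until a node is pure, has too few samples, or the maximum depth is reached. These are exactly the quantities computed by get_entropy, get_cond_info, and choose_split in the code below.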

Limitations of ID3

(1) It does not support continuous features.
(2) It builds nodes by preferring the feature with the largest information gain, but under otherwise equal conditions, features with many distinct values score a larger information gain than features with few values (a worked extreme case follows this list).
(3) It does not handle missing values.
(4) It has no strategy against overfitting, such as pruning.
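To see limitation (2) in the extreme, consider a hypothetical feature $a_{\mathrm{ID}}$ that takes a distinct value for every sample, such as a row ID. Classic multiway-split ID3 would partition $D$ into $|D|$ pure singleton subsets, so

$$H(D \mid a_{\mathrm{ID}}) = \sum_{i=1}^{|D|} \frac{1}{|D|} \cdot 0 = 0 \quad\Rightarrow\quad \mathrm{Gain}(D, a_{\mathrm{ID}}) = H(D),$$

the largest gain any feature can achieve, even though a row ID says nothing about unseen samples. C4.5 counters this bias by normalizing the gain with the feature's intrinsic split information (the gain ratio).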

This post covers only ID3, the most basic entry point to decision trees. For more, such as C4.5 and CART, see the decision tree chapter of Zhou Zhihua's "Machine Learning".

Code Implementation

#coding=utf-8
from math import log2
from copy import copy
from time import time
from random import random, seed
import numpy as np

# Split the sample indices idxs into two lists according to whether
# X[i][feature] falls below the split point (consumes idxs)
def list_split(X, idxs, feature, split):
    ret = [[], []]
    while idxs:
        if X[idxs[0]][feature] < split:
            ret[0].append(idxs.pop(0))
        else:
            ret[1].append(idxs.pop(0))
    return ret

# Timing decorator: measures how long the wrapped function fn takes
# to run and prints the elapsed time in a human-readable unit
def run_time(fn):
    def fun():
        start = time()
        fn()
        ret = time() - start
        if ret < 1e-6:
            unit = "ns"
            ret *= 1e9
        elif ret < 1e-3:
            unit = "us"
            ret *= 1e6
        elif ret < 1:
            unit = "ms"
            ret *= 1e3
        else:
            unit = "s"
        print("Total run time is %.1f %s\n" % (ret, unit))
    return fun

# Load the breast cancer dataset
def load_data():
    f = open("boston/breast_cancer.csv")
    X = []
    y = []
    for line in f:
        line = line[:-1].split(',')
        xi = [float(s) for s in line[:-1]]
        yi = line[-1]
        if '.' in yi:
            yi = float(yi)
        else:
            yi = int(yi)
        X.append(xi)
        y.append(yi)
    f.close()
    return X, y

# Randomly split the data into a training set and a test set;
# prob is the probability that a sample goes to the training set
def train_test_split(X, y, prob=0.7, random_state=None):
    if random_state is not None:
        seed(random_state)
    X_train = []
    X_test = []
    y_train = []
    y_test = []
    for i in range(len(X)):
        if random() < prob:
            X_train.append(X[i])
            y_train.append(y[i])
        else:
            X_test.append(X[i])
            y_test.append(y[i])
    seed()
    return np.array(X_train), np.array(X_test), np.array(y_train), np.array(y_test)

# Accuracy
def get_acc(y, y_hat):
    return sum(yi == yi_hat for yi, yi_hat in zip(y, y_hat)) / len(y)

# Precision
def get_precision(y, y_hat):
    true_postive = sum(yi and yi_hat for yi, yi_hat in zip(y, y_hat))
    predicted_positive = sum(y_hat)
    return true_postive / predicted_positive

# Recall
def get_recall(y, y_hat):
    true_postive = sum(yi and yi_hat for yi, yi_hat in zip(y, y_hat))
    actual_positive = sum(y)
    return true_postive / actual_positive

# True positive rate (TPR)
def get_tpr(y, y_hat):
    true_positive = sum(yi and yi_hat for yi, yi_hat in zip(y, y_hat))
    actual_positive = sum(y)
    return true_positive / actual_positive

# True negative rate (TNR)
def get_tnr(y, y_hat):
    true_negative = sum(1 - (yi or yi_hat) for yi, yi_hat in zip(y, y_hat))
    actual_negative = len(y) - sum(y)
    return true_negative / actual_negative

# Compute the points on the ROC curve (TPR and FPR at each threshold)
def get_roc(y, y_hat_prob):
    thresholds = sorted(set(y_hat_prob), reverse=True)
    ret = [[0, 0]]
    for threshold in thresholds:
        y_hat = [int(yi_hat_prob >= threshold) for yi_hat_prob in y_hat_prob]
        ret.append([get_tpr(y, y_hat), 1 - get_tnr(y, y_hat)])
    return ret

# Compute AUC (the area under the ROC curve) with the trapezoid rule
def get_auc(y, y_hat_prob):
    roc = iter(get_roc(y, y_hat_prob))
    tpr_pre, fpr_pre = next(roc)
    auc = 0
    for tpr, fpr in roc:
        auc += (tpr + tpr_pre) * (fpr - fpr_pre) / 2
        tpr_pre = tpr
        fpr_pre = fpr
    return auc

# A node of the decision tree
class Node(object):
    # prob: predicted positive-class probability at this node;
    # left/right are child nodes, feature/split form the decision criterion
    def __init__(self, prob=None):
        self.prob = prob
        self.left = None
        self.right = None
        self.feature = None
        self.split = None


class DecisionTree(object):
    # This decision tree supports only binary classification with ID3.
    # root is the root node; depth is the depth of the tree.
    def __init__(self):
        self.root = Node()
        self.depth = 1
        self._rules = None

    # For a given feature and split point, compute the positive-class
    # probability and the sample proportion on each side of the split
    def get_split_effect(self, X, y, idx, feature, split):
        n = len(idx)
        pos_cnt = [0, 0]
        cnt = [0, 0]
        for i in idx:
            xi, yi = X[i][feature], y[i]
            if xi < split:
                cnt[0] += 1
                pos_cnt[0] += yi
            else:
                cnt[1] += 1
                pos_cnt[1] += y[i]
        # prob: positive-class probability on each side; rate: fraction of samples on each side
        prob = [pos_cnt[0] / cnt[0], pos_cnt[1] / cnt[1]]
        rate = [cnt[0] / n, cnt[1] / n]
        return prob, rate

    # Entropy of a binary distribution with positive-class probability p
    def get_entropy(self, p):
        if p == 1 or p == 0:
            return 0
        else:
            q = 1 - p
            return -(p * log2(p) + q * log2(q))

    # Entropy of the labels of the samples indexed by idx
    def get_info(self, y, idx):
        p = sum(y[i] for i in idx) / len(idx)
        return self.get_entropy(p)

    # Conditional entropy of a split
    def get_cond_info(self, prob, rate):
        info_left = self.get_entropy(prob[0])
        info_right = self.get_entropy(prob[1])
        return rate[0] * info_left + rate[1] * info_right

    # Find the best split point for the given feature
    def choose_split(self, X, y, idxs, feature):
        unique = set([X[i][feature] for i in idxs])
        if len(unique) == 1:
            return None
        unique.remove(min(unique))
        # Entropy before the split (the same for every candidate split point)
        info = self.get_info(y, idxs)
        def f(split):
            prob, rate = self.get_split_effect(X, y, idxs, feature, split)
            cond_info = self.get_cond_info(prob, rate)
            # Information gain of this split
            gain = info - cond_info
            return gain, split, prob
        # Choose the split point with the largest information gain
        gain, split, prob = max((f(split) for split in unique), key=lambda x: x[0])

        return gain, feature, split, prob

    # Find the feature whose best split gives the largest information gain
    def choose_feature(self, X, y, idxs):
        m = len(X[0])
        split_rets = map(lambda j: self.choose_split(X, y, idxs, j), range(m))
        split_rets = filter(lambda x: x is not None, split_rets)
        return max(split_rets, default=None, key=lambda x: x[0])

    def expr2literal(self, expr):
        feature, op, split = expr
        op = ">=" if op == 1 else "<"
        return "Feature%d %s %.4f" % (feature, op, split)

    # Collect the decision rules of all leaf nodes
    def get_rules(self):
        que = [[self.root, []]]
        self._rules = []
        while que:
            nd, exprs = que.pop(0)
            if not(nd.left or nd.right):
                literals = list(map(self.expr2literal, exprs))
                self._rules.append([literals, nd.prob])
            if nd.left:
                rule_left = copy(exprs)
                rule_left.append([nd.feature, -1, nd.split])
                que.append([nd.left, rule_left])
            if nd.right:
                rule_right = copy(exprs)
                rule_right.append([nd.feature, 1, nd.split])
                que.append([nd.right, rule_right])

    # Fit the tree to the training data, expanding nodes breadth-first
    def fit(self, X, y, max_depth=4, min_samples_split=2):
        idxs = list(range(len(y)))
        que = [(self.depth+1, self.root, idxs)]
        while que:
            depth, nd, idxs = que.pop(0)
            if depth > max_depth:
                depth -= 1
                break
            if len(idxs) < min_samples_split or nd.prob == 1 or nd.prob == 0:
                continue
            split_ret = self.choose_feature(X, y, idxs)
            if split_ret is None:
                continue
            _, feature, split, prob = split_ret
            nd.feature = feature
            nd.split = split
            nd.left = Node(prob[0])
            nd.right = Node(prob[1])
            idxs_split = list_split(X, idxs, feature, split)
            que.append((depth + 1, nd.left, idxs_split[0]))
            que.append((depth + 1, nd.right, idxs_split[1]))
        self.depth = depth
        self.get_rules()

    def print_rules(self):
        for i, rule in enumerate(self._rules):
            literals, prob = rule
            print("Rule %d: " % i, ' | '.join(literals) + ' => y_hat %.4f' % prob)
            print()

    # Walk from the root to a leaf and return the leaf's positive-class probability
    def _predict(self, Xi):
        nd = self.root
        while nd.left and nd.right:
            if Xi[nd.feature] < nd.split:
                nd = nd.left
            else:
                nd = nd.right
        return nd.prob

    # Classify samples by thresholding the predicted positive-class probability
    def predict(self, X, threshold=0.5):
        return [int(self._predict(Xi) >= threshold) for Xi in X]


# Evaluate the model on the test set
@run_time
def main():
    print("Testing the performance of DecisionTree...")
    data, label = load_data()
    data_train, data_test, label_train, label_test = train_test_split(data, label, random_state=100)
    clf = DecisionTree()
    clf.fit(data_train, label_train)
    clf.print_rules()
    y_hat = clf.predict(data_test)
    acc = get_acc(label_test, y_hat)
    print("Accuracy is %.3f" % acc)

if __name__ == "__main__":
    main()

Testing on the Breast Cancer Dataset

Running the script fits the tree on the training split, prints the learned leaf rules, and reports the accuracy on the test split together with the total run time.

Get the Code

https://github.com/BBuf/machine-learning
