
Implementing Bandit Algorithms by Hand

秋枫学习笔记 · 2022-09-19

Follow us and learn together!

Bandit methods are used in many areas, such as reinforcement learning and recommender systems. In recommender systems, they can handle cold start and balance exploration against exploitation. The methods themselves were introduced in earlier posts, so I won't repeat the theory here; this time I'll share concrete implementations.

Bandit Algorithms: Study and Summary (1)

Bandit Algorithms: Study and Summary (2): Disjoint LinUCB, Hybrid LinUCB

The complete code and tests are at: https://github.com/dqdallen/recommendation/tree/main/Bandit

The code is fairly simple, so it is only lightly commented.

Base class

This class implements the functionality shared by all the policies, which keeps the concrete subclasses short. Its main members are arms, counts, and values: the set of arms, the number of times each arm has been pulled, and the average reward (or win count) of each arm, respectively.

from abc import ABCMeta, abstractmethod


class BanditFather(metaclass=ABCMeta):
    def __init__(self, data):
        """
        Args:
            data: array, the collection of arms, e.g. items
        """
        self.arms = data
        # store the number of visits per arm
        self.counts = [0] * len(self.arms)
        # store the average reward or wins of each arm
        self.values = [0.] * len(self.arms)

    @abstractmethod
    def excute(self):
        pass

    @abstractmethod
    def update(self, reward, arm_idx):
        pass
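
To make the contract concrete, here is a toy subclass sketch; RandomBandit is an illustrative name of my own, not part of the repo. It pulls a uniformly random arm and keeps the same running-mean bookkeeping the real policies below use.

import numpy as np
from bandit_father import BanditFather


class RandomBandit(BanditFather):
    """Toy policy: ignores the statistics and pulls arms uniformly at random."""

    def excute(self):
        return np.random.randint(len(self.arms))

    def update(self, reward, arm_idx):
        # incremental update of the pulled arm's running mean reward
        self.counts[arm_idx] += 1
        self.values[arm_idx] += (reward - self.values[arm_idx]) / self.counts[arm_idx]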

EpsilonGreedy

import numpy as np
from bandit_father import BanditFather


class EpsilonGreedy(BanditFather):
    def __init__(self, data, eps):
        """The implementation of epsilon-greedy.

        Args:
            data: array, the collection of arms
            eps: float, the probability of exploring a random arm
        """
        super(EpsilonGreedy, self).__init__(data)
        self.eps = eps

    def get_best_armidx(self):
        # index of the arm with the highest average reward (first on ties)
        v = max(self.values)
        return self.values.index(v)

    def get_random_armidx(self):
        # uniformly random arm for exploration
        idx = np.random.randint(len(self.arms))
        return idx

    def update(self, reward, arm_idx):
        # incrementally update the running mean reward of the pulled arm
        self.counts[arm_idx] += 1
        new_value = self.values[arm_idx] * (self.counts[arm_idx] - 1.) + reward
        new_value = new_value / self.counts[arm_idx]
        self.values[arm_idx] = new_value

    def excute(self):
        # exploit the best arm with probability 1 - eps, otherwise explore
        if np.random.random() > self.eps:
            arm_idx = self.get_best_armidx()
        else:
            arm_idx = self.get_random_armidx()

        return arm_idx
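
As a quick sanity check, here is a minimal simulation sketch. The Bernoulli win rates and the module name epsilon_greedy are assumptions of mine for illustration; only the EpsilonGreedy class comes from the code above.

import numpy as np
from epsilon_greedy import EpsilonGreedy  # assumed module name

np.random.seed(0)
true_probs = [0.2, 0.5, 0.7]  # hypothetical per-arm Bernoulli win rates
bandit = EpsilonGreedy(data=list(range(len(true_probs))), eps=0.1)

for _ in range(10000):
    arm = bandit.excute()  # choose an arm
    reward = float(np.random.random() < true_probs[arm])  # simulated feedback
    bandit.update(reward, arm)

print(bandit.counts)  # the 0.7 arm should receive most of the pulls

Note that with eps = 0.1 about 10% of pulls stay random forever; annealing eps over time is a common refinement.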

Thompson Sampling

import numpy as np
from bandit_father import BanditFather


class ThompsonSampling(BanditFather):
    def __init__(self, data):
        super(ThompsonSampling, self).__init__(data)

    def update(self, reward, arm_idx):
        # for 0/1 rewards, values stores each arm's win count
        if reward == 1:
            self.values[arm_idx] += 1
        self.counts[arm_idx] += 1

    def excute(self):
        # sample each arm's success rate from its Beta(wins + 1, losses + 1)
        # posterior and play the arm with the largest sample
        pbeta = [np.random.beta(a + 1, b - a + 1)
                 for a, b in zip(self.values, self.counts)]
        arm_idx = np.argmax(pbeta)
        return arm_idx
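
Since values[i] holds arm i's win count a and counts[i] its total pulls b, np.random.beta(a + 1, b - a + 1) draws from Beta(wins + 1, losses + 1), the posterior of a Bernoulli success rate under a uniform Beta(1, 1) prior. For example, an arm with 7 wins in 10 pulls is sampled from Beta(8, 4), whose mean is 8 / 12 ≈ 0.67; the distribution tightens as pulls accumulate, so exploration fades naturally.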

UCB

import numpy as np
from bandit_father import BanditFather


class UCB(BanditFather):
    def __init__(self, data):
        super(UCB, self).__init__(data)

    def excute(self):
        # first make sure every arm has been pulled at least once
        for i in range(len(self.arms)):
            if self.counts[i] == 0:
                return i
        # then play the arm with the highest UCB1 score
        max_ind = 0
        max_ucb = -1
        for i in range(len(self.arms)):
            v = self.values[i] + np.sqrt(
                2 * np.log(sum(self.counts)) / self.counts[i])
            if v > max_ucb:
                max_ucb = v
                max_ind = i
        return max_ind

    def update(self, reward, arm_idx):
        # incrementally update the running mean reward of the pulled arm
        self.counts[arm_idx] += 1
        value = self.values[arm_idx] * (self.counts[arm_idx] - 1) + reward
        self.values[arm_idx] = value / self.counts[arm_idx]
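
The score in the second loop is the standard UCB1 criterion: with empirical mean values[i], per-arm pull count n_i = counts[i], and total pulls N = sum(counts), the policy plays argmax_i of values[i] + sqrt(2 * ln(N) / n_i). The first loop pulls every arm once, which both seeds the estimates and keeps the bonus term well defined.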

Disjoint LinUCB

You can read this side by side with the pseudocode in the LinUCB paper; the implementation basically follows it line by line.

import numpy as np


class DisjointLibUCB:
    def __init__(self, data, dim, alpha=0.25):
        self.arms = data
        self.dim = dim  # feature dimension
        self.alpha = alpha
        self.A = []     # the A matrix of each arm
        self.invA = []  # the inverse of each arm's A matrix
        self.b = []
        self.theta = []
        self.initialize()

    def initialize(self):
        # A starts as the identity (ridge regularization), b as a zero vector
        for i in range(len(self.arms)):
            self.A.append(np.eye(self.dim))
            self.invA.append(np.eye(self.dim))
            self.b.append(np.zeros((self.dim, 1)))

    def excute(self, features):
        # score each arm by its ridge-regression estimate theta plus an
        # exploration bound scaled by alpha, then play the best-scoring arm
        max_prob = -1
        max_ind = 0
        for i in range(len(self.invA)):
            theta = np.dot(self.invA[i], self.b[i])
            score = np.dot(theta.T, features[:, i])[0]
            bound = self.alpha * np.sqrt(np.dot(
                np.dot(features[:, i].T, self.invA[i]), features[:, i]))
            prob_tmp = score + bound
            if max_prob < prob_tmp:
                max_prob = prob_tmp
                max_ind = i

        return max_ind

    def update(self, features, reward, arm_idx):
        # reshape the chosen arm's feature to a column vector so that
        # fea_tmp @ fea_tmp.T is a (dim, dim) outer product, not a scalar
        fea_tmp = features[:, arm_idx].reshape(-1, 1)
        self.A[arm_idx] = self.A[arm_idx] + np.dot(fea_tmp, fea_tmp.T)
        self.b[arm_idx] = self.b[arm_idx] + reward * fea_tmp
        self.invA[arm_idx] = np.linalg.inv(self.A[arm_idx])
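
A minimal usage sketch, assuming features is a (dim, n_arms) matrix with one context column per arm, which is the layout excute and update index into. The synthetic contexts and the placeholder reward signal are my own illustration, not part of the repo:

import numpy as np

# assumes the DisjointLibUCB class above is in scope
np.random.seed(0)
n_arms, dim = 5, 4
model = DisjointLibUCB(data=list(range(n_arms)), dim=dim, alpha=0.25)

for _ in range(1000):
    features = np.random.randn(dim, n_arms)   # one context column per arm
    arm = model.excute(features)              # arm with the highest UCB score
    reward = float(np.random.random() < 0.5)  # placeholder Bernoulli reward
    model.update(features, reward, arm)

A larger alpha widens the confidence bound and makes the policy explore more aggressively; alpha = 0.25 here simply mirrors the constructor default.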
