前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[享学Netflix] 五十、Ribbon的LoadBalancer五大组件之:ServerListUpdater

[享学Netflix] 五十、Ribbon的LoadBalancer五大组件之:ServerListUpdater

作者头像
YourBatman
发布2020-03-19 17:11:43
1.9K0
发布2020-03-19 17:11:43
举报

如果你是房间里最聪明的人,那么你走错房间了。 代码下载地址:https://github.com/f641385712/netflix-learning

目录
  • 前言
  • 正文
    • ServerListUpdater
      • PollingServerListUpdater
        • 成员属性
        • 初始化任务调度器
        • 成员方法
        • 哪里使用?
    • 代码示例
  • 总结
    • 声明

前言

我们已经知道ServerList它用于提供Server列表,而ServerListFilter组件它用于对列表进行过滤,本文将介绍一个Action组件:ServerListUpdater服务列表更新器。它像是一个任务调度器,来定时触发相应的动作,它强调的是动作的开始/触发,具体实现它并不关心,所以在实现里你完全可以结合ServerListServerListFilter一起来完成服务列表的维护,实际上也确实是这么做的。


正文

ServerListUpdater组件只关心动作的开始,并不关心具体实现,所以它是比较简单理解的一个组件。


ServerListUpdater

列表更新器,被DynamicServerListLoadBalancer用于动态的更新服务列表。

// Server列表更新器。
public interface ServerListUpdater {
	// 内部接口:函数式接口 实际上执行服务器列表更新的接口 也就是实际的动作
	// 一般使用匿名内部类的形式实现
	public interface UpdateAction {
        void doUpdate();
    }

	// 使用给定的更新操作启动serverList更新程序这个调用应该是幂等的
	void start(UpdateAction updateAction);
	// 停止服务器列表更新程序。这个调用应该是幂等的
	void stop();

	// ============下面是一些获取执行过程中的信息方法==============
	// 最后更新的时间Date的String表示形式
	String getLastUpdate();
	// 自上次更新以来已经过的ms数
	long getDurationSinceLastUpdateMs();
	//错过更新周期的数量(如果有的话)
	int getNumberMissedCycles();
	// 使用的线程数
	int getCoreThreads();
}

从接口方法中能看出,它会通过任务调度去定时实现更新操作。所以它有个唯一实现子类:PollingServerListUpdater


PollingServerListUpdater

动态服务器列表更新器要更新的默认实现,使用一个任务调度器ScheduledThreadPoolExecutor完成定时更新。

它本作为DynamicServerListLoadBalancer的一个内部类,现单独拿出来成为一个public的类了


成员属性
public class PollingServerListUpdater implements ServerListUpdater {

    private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000;
    private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000;

	private final AtomicBoolean isActive = new AtomicBoolean(false);
	private volatile long lastUpdated = System.currentTimeMillis();
    private final long initialDelayMs;
    private final long refreshIntervalMs;
    
	// 继承自Futrue,在Futrue的基础上增加getDelay(TimeUnit unit)方法:还有多久执行任务
	// ScheduledExecutorService提交任务时返回它
	// ScheduledThreadPoolExecutor是带有线程池功能的执行器,实现了接口ScheduledExecutorService
	private volatile ScheduledFuture<?> scheduledFuture;
}
  • isActive:标记当前Scheduled任务是否是活跃状态中(已经开启就活跃状态)
  • lastUpdated:任务调用执行一次updateAction.doUpdate()后记录该时刻,表示最新的一次update的时间戳
  • initialDelayMs:线程池的initialDelay参数。默认值是LISTOFSERVERS_CACHE_UPDATE_DELAY也就是延迟1000ms开始执行
  • refreshIntervalMs:默认值是LISTOFSERVERS_CACHE_REPEAT_INTERVAL也就是30s执行一次
    • 因为该参数相对重要,所以不仅构造时可以指定其值,还可以通过外部化配置其值,对应的key是ServerListRefreshInterval,也就是说你可以这么配置它:
      • <clientName>.ribbon.ServerListRefreshInterval = 60000(当然一般配置一个全局的即可)
  • scheduledFuture:任务调度结果,方便做cancel()动作

初始化任务调度器

该类的任务调度器是通过自己的一个私有静态内部类LazyHolder来内聚实现的:

PollingServerListUpdater.LazyHolder:

	// 重点:core核心数是可**动态**可变的
	private final static String CORE_THREAD = "DynamicServerListLoadBalancer.ThreadPoolSize";
	private final static DynamicIntProperty poolSizeProp = new DynamicIntProperty(CORE_THREAD, 2);
	static ScheduledThreadPoolExecutor _serverListRefreshExecutor = null;

	// 使用静态代码块完成初始化
	static {
		// 注册一个callback以便能动态更新core核心数
		int coreSize = poolSizeProp.get();
        poolSizeProp.addCallback(new Runnable() {
            @Override
            public void run() {
                _serverListRefreshExecutor.setCorePoolSize(poolSizeProp.get());
            }
        });
        _serverListRefreshExecutor = new ScheduledThreadPoolExecutor(coreSize, factory);
        ...
	}

对于线程调度线程池的初始化,此处的核心要点是:默认的coreSize数是2,但是你可以通过配置动态实现更改,所以当你的ClientName实例较多时,可适当的调高此数值。它的实现原理同ScheduledThreadPoolExectuorWithDynamicSize


成员方法
PollingServerListUpdater:

	// 构造器,为initialDelayMs/refreshIntervalMs两个参数赋值
    public PollingServerListUpdater() {
        this(LISTOFSERVERS_CACHE_UPDATE_DELAY, LISTOFSERVERS_CACHE_REPEAT_INTERVAL);
    }
    // 从config里面拿值。对应的key是ServerListRefreshInterval
    public PollingServerListUpdater(IClientConfig clientConfig) {
        this(LISTOFSERVERS_CACHE_UPDATE_DELAY, getRefreshIntervalMs(clientConfig));
    }
    public PollingServerListUpdater(final long initialDelayMs, final long refreshIntervalMs) {
        this.initialDelayMs = initialDelayMs;
        this.refreshIntervalMs = refreshIntervalMs;
    }

	// 启动
    @Override
    public synchronized void start(final UpdateAction updateAction) {
    	// 保证原子性。如果已经启动了就啥都不做
    	if (isActive.compareAndSet(false, true)) {
			//定时任务每次执行的Task
    		Runnable wrapperRunnable = () -> {
				if (!isActive.get()) {
					if (scheduledFuture != null) {
						scheduledFuture.cancel(true);
						return;
					}
					// 每次执行更新操作时,记录下时间戳
					updateAction.doUpdate();
					lastUpdated = System.currentTimeMillis();
				}

				// 启动任务 默认30s执行一次
	            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
	                    wrapperRunnable, initialDelayMs,
	                    refreshIntervalMs, TimeUnit.MILLISECONDS
    		};
    	} 
    }

	// 停止任务
    @Override
    public synchronized void stop() {
    	scheduledFuture.cancel(true);
    }

	// 为何返回String?返回Date不香吗?
    @Override
    public String getLastUpdate() {
        return new Date(lastUpdated).toString();
    }
    // 距离上一次update更新已过去这么长时间了
    @Override
    public long getDurationSinceLastUpdateMs() {
        return System.currentTimeMillis() - lastUpdated;
    }

	// 因为coreSize是动态可以配的,所以提供方法供以访问
    @Override
    public int getCoreThreads() {
        if (isActive.get()) {
            if (getRefreshExecutor() != null) {
                return getRefreshExecutor().getCorePoolSize();
            }
        }
        return 0;
    }
	...

得到一个PollingServerListUpdater实例后,调用其start方法,便可实现定时的更新服务列表了,非常的方便。所以若你想要有一个定时的去更新服务列表的能力,可以使用此组件方便的实现。


哪里使用?

它的唯一使用处在DynamicServerListLoadBalancer里,它的实现动作是:ServerListUpdater.UpdateAction

ServerListUpdater.UpdateAction:

    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
            }
        }
        updateAllServerList(servers);
    }

一句话解释:定时的更新服务器列表,这个更新动作是之前学过的两步:

  • 先用ServerList拿到所有的Server列表
  • 再用ServerListFilter完成过滤

代码示例

@Test
public void fun10() throws InterruptedException {
    ServerListUpdater serverListUpdater = new PollingServerListUpdater();

    serverListUpdater.start(() -> {
        int coreThreads = serverListUpdater.getCoreThreads();
        String lastUpdate = serverListUpdater.getLastUpdate();
        int numberMissedCycles = serverListUpdater.getNumberMissedCycles();
        long durationSinceLastUpdateMs = serverListUpdater.getDurationSinceLastUpdateMs();
        System.out.println("===========上次的执行时间是:" + lastUpdate);
        System.out.println("自上次更新以来已经过的ms数:" + durationSinceLastUpdateMs);
        System.out.println("线程核心数:" + coreThreads);
        System.out.println("错过更新周期的数量:" + numberMissedCycles);

        // .... 执行你对Server列表的更新动作,本处略
    });

    TimeUnit.SECONDS.sleep(500);
}

运行程序,控制台打印:

===========上次的执行时间是:Thu Mar 19 10:28:14 CST 2020
自上次更新以来已经过的ms数:30003
线程核心数:2
错过更新周期的数量:1
===========上次的执行时间是:Thu Mar 19 10:28:44 CST 2020
自上次更新以来已经过的ms数:30002
线程核心数:2
错过更新周期的数量:1
===========上次的执行时间是:Thu Mar 19 10:29:14 CST 2020
自上次更新以来已经过的ms数:30002
线程核心数:2
错过更新周期的数量:1

总结

关于Ribbon的LoadBalancer五大组件之:ServerListUpdater服务列表更新器就先介绍到这,实际上最终实现的都是使用的PollingServerListUpdater来实现定时更新

至此,LoadBalancer的五大组件已完成其四,剩下最为重要,也是相对较难的IRule了,下文继续予以详解。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 目录
  • 前言
  • 正文
    • ServerListUpdater
      • PollingServerListUpdater
    • 代码示例
    • 总结
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档