如果你是房间里最聪明的人,那么你走错房间了。 代码下载地址:https://github.com/f641385712/netflix-learning
我们已经知道ServerList
它用于提供Server列表,而ServerListFilter
组件它用于对列表进行过滤,本文将介绍一个Action组件:ServerListUpdater
服务列表更新器。它像是一个任务调度器,来定时触发相应的动作,它强调的是动作的开始/触发,具体实现它并不关心,所以在实现里你完全可以结合ServerList
和ServerListFilter
一起来完成服务列表的维护,实际上也确实是这么做的。
ServerListUpdater
组件只关心动作的开始,并不关心具体实现,所以它是比较简单理解的一个组件。
列表更新器,被DynamicServerListLoadBalancer
用于动态的更新服务列表。
// Server列表更新器。
public interface ServerListUpdater {
// 内部接口:函数式接口 实际上执行服务器列表更新的接口 也就是实际的动作
// 一般使用匿名内部类的形式实现
public interface UpdateAction {
void doUpdate();
}
// 使用给定的更新操作启动serverList更新程序这个调用应该是幂等的
void start(UpdateAction updateAction);
// 停止服务器列表更新程序。这个调用应该是幂等的
void stop();
// ============下面是一些获取执行过程中的信息方法==============
// 最后更新的时间Date的String表示形式
String getLastUpdate();
// 自上次更新以来已经过的ms数
long getDurationSinceLastUpdateMs();
//错过更新周期的数量(如果有的话)
int getNumberMissedCycles();
// 使用的线程数
int getCoreThreads();
}
从接口方法中能看出,它会通过任务调度去定时实现更新操作。所以它有个唯一实现子类:PollingServerListUpdater
。
动态服务器列表更新器要更新的默认实现,使用一个任务调度器ScheduledThreadPoolExecutor
完成定时更新。
它本作为
DynamicServerListLoadBalancer
的一个内部类,现单独拿出来成为一个public的类了
public class PollingServerListUpdater implements ServerListUpdater {
private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000;
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000;
private final AtomicBoolean isActive = new AtomicBoolean(false);
private volatile long lastUpdated = System.currentTimeMillis();
private final long initialDelayMs;
private final long refreshIntervalMs;
// 继承自Futrue,在Futrue的基础上增加getDelay(TimeUnit unit)方法:还有多久执行任务
// ScheduledExecutorService提交任务时返回它
// ScheduledThreadPoolExecutor是带有线程池功能的执行器,实现了接口ScheduledExecutorService
private volatile ScheduledFuture<?> scheduledFuture;
}
isActive
:标记当前Scheduled任务是否是活跃状态中(已经开启就活跃状态)lastUpdated
:任务调用执行一次updateAction.doUpdate()
后记录该时刻,表示最新的一次update的时间戳initialDelayMs
:线程池的initialDelay
参数。默认值是LISTOFSERVERS_CACHE_UPDATE_DELAY
也就是延迟1000ms开始执行refreshIntervalMs
:默认值是LISTOFSERVERS_CACHE_REPEAT_INTERVAL
也就是30s执行一次 ServerListRefreshInterval
,也就是说你可以这么配置它: <clientName>.ribbon.ServerListRefreshInterval = 60000
(当然一般配置一个全局的即可)scheduledFuture
:任务调度结果,方便做cancel()动作该类的任务调度器是通过自己的一个私有静态内部类LazyHolder
来内聚实现的:
PollingServerListUpdater.LazyHolder:
// 重点:core核心数是可**动态**可变的
private final static String CORE_THREAD = "DynamicServerListLoadBalancer.ThreadPoolSize";
private final static DynamicIntProperty poolSizeProp = new DynamicIntProperty(CORE_THREAD, 2);
static ScheduledThreadPoolExecutor _serverListRefreshExecutor = null;
// 使用静态代码块完成初始化
static {
// 注册一个callback以便能动态更新core核心数
int coreSize = poolSizeProp.get();
poolSizeProp.addCallback(new Runnable() {
@Override
public void run() {
_serverListRefreshExecutor.setCorePoolSize(poolSizeProp.get());
}
});
_serverListRefreshExecutor = new ScheduledThreadPoolExecutor(coreSize, factory);
...
}
对于线程调度线程池的初始化,此处的核心要点是:默认的coreSize数是2,但是你可以通过配置动态实现更改,所以当你的ClientName实例较多时,可适当的调高此数值。它的实现原理同ScheduledThreadPoolExectuorWithDynamicSize
PollingServerListUpdater:
// 构造器,为initialDelayMs/refreshIntervalMs两个参数赋值
public PollingServerListUpdater() {
this(LISTOFSERVERS_CACHE_UPDATE_DELAY, LISTOFSERVERS_CACHE_REPEAT_INTERVAL);
}
// 从config里面拿值。对应的key是ServerListRefreshInterval
public PollingServerListUpdater(IClientConfig clientConfig) {
this(LISTOFSERVERS_CACHE_UPDATE_DELAY, getRefreshIntervalMs(clientConfig));
}
public PollingServerListUpdater(final long initialDelayMs, final long refreshIntervalMs) {
this.initialDelayMs = initialDelayMs;
this.refreshIntervalMs = refreshIntervalMs;
}
// 启动
@Override
public synchronized void start(final UpdateAction updateAction) {
// 保证原子性。如果已经启动了就啥都不做
if (isActive.compareAndSet(false, true)) {
//定时任务每次执行的Task
Runnable wrapperRunnable = () -> {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
return;
}
// 每次执行更新操作时,记录下时间戳
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
}
// 启动任务 默认30s执行一次
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable, initialDelayMs,
refreshIntervalMs, TimeUnit.MILLISECONDS
};
}
}
// 停止任务
@Override
public synchronized void stop() {
scheduledFuture.cancel(true);
}
// 为何返回String?返回Date不香吗?
@Override
public String getLastUpdate() {
return new Date(lastUpdated).toString();
}
// 距离上一次update更新已过去这么长时间了
@Override
public long getDurationSinceLastUpdateMs() {
return System.currentTimeMillis() - lastUpdated;
}
// 因为coreSize是动态可以配的,所以提供方法供以访问
@Override
public int getCoreThreads() {
if (isActive.get()) {
if (getRefreshExecutor() != null) {
return getRefreshExecutor().getCorePoolSize();
}
}
return 0;
}
...
得到一个PollingServerListUpdater
实例后,调用其start方法,便可实现定时的更新服务列表了,非常的方便。所以若你想要有一个定时的去更新服务列表的能力,可以使用此组件方便的实现。
它的唯一使用处在DynamicServerListLoadBalancer
里,它的实现动作是:ServerListUpdater.UpdateAction
ServerListUpdater.UpdateAction:
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
}
}
updateAllServerList(servers);
}
一句话解释:定时的更新服务器列表,这个更新动作是之前学过的两步:
ServerList
拿到所有的Server列表ServerListFilter
完成过滤@Test
public void fun10() throws InterruptedException {
ServerListUpdater serverListUpdater = new PollingServerListUpdater();
serverListUpdater.start(() -> {
int coreThreads = serverListUpdater.getCoreThreads();
String lastUpdate = serverListUpdater.getLastUpdate();
int numberMissedCycles = serverListUpdater.getNumberMissedCycles();
long durationSinceLastUpdateMs = serverListUpdater.getDurationSinceLastUpdateMs();
System.out.println("===========上次的执行时间是:" + lastUpdate);
System.out.println("自上次更新以来已经过的ms数:" + durationSinceLastUpdateMs);
System.out.println("线程核心数:" + coreThreads);
System.out.println("错过更新周期的数量:" + numberMissedCycles);
// .... 执行你对Server列表的更新动作,本处略
});
TimeUnit.SECONDS.sleep(500);
}
运行程序,控制台打印:
===========上次的执行时间是:Thu Mar 19 10:28:14 CST 2020
自上次更新以来已经过的ms数:30003
线程核心数:2
错过更新周期的数量:1
===========上次的执行时间是:Thu Mar 19 10:28:44 CST 2020
自上次更新以来已经过的ms数:30002
线程核心数:2
错过更新周期的数量:1
===========上次的执行时间是:Thu Mar 19 10:29:14 CST 2020
自上次更新以来已经过的ms数:30002
线程核心数:2
错过更新周期的数量:1
关于Ribbon的LoadBalancer五大组件之:ServerListUpdater服务列表更新器就先介绍到这,实际上最终实现的都是使用的PollingServerListUpdater
来实现定时更新。
至此,LoadBalancer
的五大组件已完成其四,剩下最为重要,也是相对较难的IRule了,下文继续予以详解。