在介绍完了围绕负载均衡器ILoadBalancer
的五大核心组件,以及其相关支持组件后,本篇终于来到整个负载均衡器,甚至是整个Ribbon的主菜:ILoadBalancer
的学习。
作为一个客户端负载均衡器,它最核心的资源便是一堆Server们,也叫服务列表。对于这些服务列表的获取、更新、维护、探活、选择等等都有相应的组件能够完成,而如何把这些组件组合在一起有条不紊的工作便是本文的主要内容。
ILoadBalancer
是整个Ribbon体系的重中之重,因为关于它的介绍分为两篇文章进行,分别介绍其子类的作用,以及辅以代码示例来理解。
定义软件负载均衡器操作接口,动态更新一组服务列表及根据指定算法从现有服务器列表中选择一个服务。
public interface ILoadBalancer {
// 初始化Server列表。当然后期你可以可以再添加
// 在某些情况下,你可能想给出更多的“权重”时 该方法有用
public void addServers(List<Server> newServers);
// 根据key从load balancer里面找到一个Server
// 大多时候太是委托给`IRule`去做
public Server chooseServer(Object key);
// 由负载均衡器的客户端调用,以通知服务器停机否则
// LB会认为它还活着,直到下一个Ping周期
// 也就说该方法可以手动调用,让Server停机
public void markServerDown(Server server);
// 该方法已过期,被下面两个方法代替
@Deprecated
public List<Server> getServerList(boolean availableOnly);
// 只有服务器是可访问的就返回
public List<Server> getReachableServers();
// 所有已知的服务器,包括可访问的和不可访问的。
public List<Server> getAllServers();
}
该接口主要做了以下的一些事情:
说明:Spring Cloud
下并没有新增该接口的实现类~
public abstract class AbstractLoadBalancer implements ILoadBalancer {
public enum ServerGroup {
ALL,
STATUS_UP,
STATUS_NOT_UP
}
// 选择具体的服务实例,key为null,忽略key的条件判断
public Server chooseServer() {
return chooseServer(null);
}
// 定义了根据分组类型来获取不同的服务实例的列表。
public abstract List<Server> getServerList(ServerGroup serverGroup);
// 获得所属的LoadBalancerStats,它是LB的状态仓储,对负载均衡规则有很大作用
public abstract LoadBalancerStats getLoadBalancerStats();
}
抽象实现:对Server们使用ServerGroup进行了分组,并且新增了2个抽象方法,使得LB实现必须和LoadBalancerStats
绑定了。其它所有实现均为它的子类,也就是说所有的LB实现均是有LoadBalancerStats
的喽~
有了
LoadBalancerStats
就有知晓运行状态的能力,进而可以动态的感知到Server、Zone区域的负载等状况,最终做到更高效的负载均衡~
不做任何事的负载均衡实现,一般用于占位(然而貌似从没被用到过)。
public class NoOpLoadBalancer extends AbstractLoadBalancer {
@Override
public void addServers(List<Server> newServers) {
logger.info("addServers to NoOpLoadBalancer ignored");
}
@Override
public Server chooseServer(Object key) {
return null;
}
... // 省略其它方法,均为返回null
}
下面重点介绍BaseLoadBalancer
,非常重要,因此单独拉出来唠唠。
听这名字,就知道它是Ribbon负载均衡器的基础实现类(非抽象类),在该类中定义了很多关于负载均衡器相关的基础内容,对所有接口方法提供了实现。
实际使用中的所有负载均衡器均是BaseLoadBalancer
的子类,另外请注意:本类并非抽象类,也是可以直接使用的哦~
// 它也是个PrimeConnectionListener,在测试请求完成后会标记readyToServe=true
public class BaseLoadBalancer extends AbstractLoadBalancer implements PrimeConnections.PrimeConnectionListener, IClientConfigAware {
private final static IRule DEFAULT_RULE = new RoundRobinRule();
private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
private final static String DEFAULT_NAME = "default";
private final static String PREFIX = "LoadBalancer_";
protected IRule rule = DEFAULT_RULE;
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
protected IPing ping = null;
protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());
protected volatile List<Server> upServerList = Collections.synchronizedList(new ArrayList<Server>());
protected ReadWriteLock allServerLock = new ReentrantReadWriteLock();
protected ReadWriteLock upServerLock = new ReentrantReadWriteLock();
protected String name = DEFAULT_NAME;
protected Timer lbTimer = null;
protected int pingIntervalSeconds = 10;
protected int maxTotalPingTimeSeconds = 5;
protected Comparator<Server> serverComparator = new ServerComparator();
protected AtomicBoolean pingInProgress = new AtomicBoolean(false);
protected LoadBalancerStats lbStats;
private volatile Counter counter = Monitors.newCounter("LoadBalancer_ChooseServer");
private PrimeConnections primeConnections;
private volatile boolean enablePrimingConnections = false;
private IClientConfig config;
private List<ServerListChangeListener> changeListeners = new CopyOnWriteArrayList<ServerListChangeListener>();
private List<ServerStatusChangeListener> serverStatusListeners = new CopyOnWriteArrayList<ServerStatusChangeListener>();
}
rule
:默认是RoundRobinRule
,可以通过set方法指定pingStrategy
:ping的策略,默认是SerialPingStrategy
-> 串行全ping List<Server> allServerList
:所有实例upServerList
:up实例(STATUS_NOT_UP
实例列表通过两者差集取出)name
:负载均衡器的名称,一般同ClientName,若没指定为defaultlbTimer
:启动PingTask
,定时使用IPing去检查Server的isAlive
状态的定时器pingIntervalSeconds
:默认30ping一次。可通过key:NFLoadBalancerPingInterval
配置maxTotalPingTimeSeconds
:每次ping的最长时间 NFLoadBalancerMaxTotalPingTime
这个key配置serverComparator
:按照Server的id排序,方便轮询时候输出,并无实际作用pingInProgress
:标志是否正在ping中,如果是就避免重复进入去ping,做无用功LoadBalancerStats lbStats
:默认是new LoadBalancerStats(name)
,你也可以set进来一个现存的,当然你可以通过配置 NFLoadBalancerStatsClassName
来配置,只不过默认值是com.netflix.loadbalancer.LoadBalancerStats
。在实际应用中,你很有可能去继承然后复写它,然后配置进来就使用你自己的啦 LoadBalancerStats#createServerStats
它获取一个ServerStats
写死的默认参数是很不合理的,若你想精确控制Server的状态收集,建议你复写此类,然后配置好使用你自己优化后的实现吧~counter
:servo
的计数器。略primeConnections
:启动连接器。用于初始检测Server的readyToServe
是否能够提供服务(默认是关闭的,见下面的开关)enablePrimingConnections
:默认值是false,可通过EnablePrimeConnections
这个key来开启IClientConfig config
:略ServerListChangeListener changeListeners
:当allServerList
里面的内容(总数or每个Server的状态属性等)发生变化时,会触发此监听器ServerStatusChangeListener serverStatusListeners
:但凡只要Server的isAlive状态发生了变化,就会触发此监听器。有如下2种情况可能会触发: markServerDown(Server)/markServerDown(String id)
的时候(该方法暂无任何显示调用处)所有成员属性的初始化均是构造器+initWithNiwsConfig()
配置的方式完成。
BaseLoadBalancer:
// 开启对所有Server IPing的定时任务
// 注意:PingTask它是去ping allServers所有的服务器,毕竟有些Server它还会活过来的
void setupPingTask() {
if (canSkipPing()) {
return;
}
if (lbTimer != null) {
lbTimer.cancel();
}
lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name, true);
lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
forceQuickPing(); // 在任务还没启动前,先快速强制执行一把
}
public int getServerCount(boolean onlyAvailable) {
if (onlyAvailable) {
return upServerList.size();
} else {
return allServerList.size();
}
}
setupPingTask()
方法用于启动Ping任务,从而每30s会去ping一次Server,探活。
该方法是核心接口方法:LB所管理的服务列表均通过此方法添加进来。
BaseLoadBalancer:
// 将服务器列表添加到“allServer”列表;不验证唯一性
// 所以您可以通过添加更多的服务器来提供更大的共享一次
@Override
public void addServers(List<Server> newServers) {
if (newServers != null && newServers.size() > 0) {
try {
ArrayList<Server> newList = new ArrayList<Server>();
newList.addAll(allServerList);
newList.addAll(newServers);
setServersList(newList);
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Exception while adding Servers", name, e);
}
}
}
需要注意的是:因为使用List存储不会去重,并且该方法内部也不验证唯一性。所以你可以通过这种方式(比如一个Server实例多几次add操作)来增加其被选中的概率,但并不建议你这么干。
通过该方法添加进来的Server都会进入到allServerList
全部服务列表里面,对于这些“新”Server的初始化工作依赖于setServersList()
来完成:
针对此方法本身而言:它是set动作,所以具有覆盖性(完全替换掉原值)。
BaseLoadBalancer:
// 小细节:此处的List不带泛型,是因为它要接受List<Server>和List<String>这两种集合
public void setServersList(List lsrv) {
// 全部的
ArrayList<Server> allServers = new ArrayList<Server>();
for (Object server : lsrv) {
if (server instanceof String) {
server = new Server((String) server);
}
...
// 添加进所有
allServers.add((Server) server);
}
// 编辑列表的内容是否有变更 只要内容不一样(包括数量、属性等)就算变更了
boolean listChanged = false;
if (!allServerList.equals(allServers)) {
listChanged = true;
... // 若注册了监听器,就触发
l.serverListChanged(oldList, newList);
}
if (isEnablePrimingConnections()) {
// 它记录的是newServers,也就是“新添加进来的”
// 因为只有新添加进来的时候才需要执行首次链接测试:primeConnections.primeConnectionsAsync(newServers, this);
ArrayList<Server> newServers = new ArrayList<Server>();
... // 对比老的 筛选出哪些是新的后,进行连接检测
if (primeConnections != null) {
primeConnections.primeConnectionsAsync(newServers, this);
}
}
// 一切处理好后:全面覆盖旧值
allServerList = allServers;
if (canSkipPing()) { // 如果不需要ping,那么每台Server都是活的,永远死不了
for (Server s : allServerList) {
s.setAlive(true);
}
upServerList = allServerList;
} else if (listChanged) { // 若Server发生了变化,才需要立马触发ping呗,否则也没有必要
forceQuickPing(); // 它里面会对upServerList重新赋值(值存储活的)
}
}
该方法完成Server列表的“初始化”逻辑:
allServerList
DummyPing
这个ping,那么upServerList
和allServerList
永远是一样的因为所有的Server交给ILoadBalancer
来管理都是通过addServers()
添加进来的,所以必经setServersList()
该方法完成初始化~
针对此问题,此处小总结一下:
add/addAll/remove
等方式来改变其值,仅能通过全覆盖的方式Pinger#runPinger
:基于allServerList对没台Server完成ping操作,所以它只会改变upServerList的值(isAlive=true才属于up)setServersList()
:它会用新set进来的对allServerList
全覆盖,并且完成对没台Server的初始化,包括识别出upServerList
(这种识别其实依赖也是上面一样的ping操作)综上可知,upServerList
的值有且仅是把allServerList
经过IPing处理后,若isAlive=true
就属于这个行列了。因此若你没指定IPing策略或者是默认的DummyPing
,那么它它哥俩就永远相等(认为所有的Server均永远可用~)。
@Test
public void fun1() {
List<Server> serverList = new ArrayList<>();
serverList.add(createServer("华南", 1));
serverList.add(createServer("华东", 1));
serverList.add(createServer("华东", 2));
serverList.add(createServer("华北", 1));
serverList.add(createServer("华北", 2));
serverList.add(createServer("华北", 3));
serverList.add(createServer("华北", 4));
ILoadBalancer lb = new BaseLoadBalancer();
lb.addServers(serverList);
// 把华北的机器都标记为down掉
LoadBalancerStats loadBalancerStats = ((BaseLoadBalancer) lb).getLoadBalancerStats();
loadBalancerStats.updateServerList(serverList); // 这一步不能省哦~~~
loadBalancerStats.getServerStats().keySet().forEach(server -> {
if (server.getHost().contains("华北")) {
lb.markServerDown(server);
}
});
for (int i = 0; i < 5; i++) {
System.out.println(lb.chooseServer(null));
}
}
private Server createServer(String zone, int index) {
Server server = new Server("www.baidu" + zone + ".com", index);
server.setZone(zone);
return server;
}
运行程序,控制台打印:
www.baidu华东.com:1
www.baidu华东.com:2
www.baidu华南.com:1
www.baidu华东.com:1
www.baidu华东.com:2
可以看到木有华北的Server出现了,完美~
关于Ribbon负载均衡器ILoadBalancer(一):BaseLoadBalancer就先介绍到这。本文重点讲述了它的基础实现BaseLoadBalancer
,虽然它也并非抽象类,但是实际应用中并不会使用,毕竟它功能偏弱,比如它没解决如下重要问题:
allListServer/upListServer
服务列表无法做到动态化(如我突然注册了一个新的实例,希望能够动态被发现放进来)带着这些疑问,下篇文章介绍其更强子类,会逐个解决这些问题。