上文已对LoadBalancerCommand
的基础类进行了打点,给本文内容做了一定支撑。本文就进入到负载均衡命令的主菜,LoadBalancerCommand
用于提交任务,执行目标方法。
因为Ribbon对目标请求的执行采用的也是命令模式,因此本文的重要性也不可忽视,特别是理解它的重试机制,处理得非常巧妙,值的学习和考究。
Ribbon对请求的执行依旧采用的命令模式,一个LoadBalancerCommand
实例代表着一个请求,它管控着所有的执行流程:包括Server的选择、重试处理、结果处理等。
因为LoadBalancerCommand
提交任务时多次用到了concatMap()
这个操作符的特性,因此此处做个使用示例:
@Test
public void fun1() throws InterruptedException {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.flatMap(i -> Observable.just(i).delay(10, TimeUnit.MILLISECONDS))
.subscribe(d -> System.out.println(d));
System.out.println("----------end-----------");
TimeUnit.SECONDS.sleep(2);
}
运行程序,控制台输出:
----------end-----------
1
5
9
3
7
2
6
4
8
可以看到它是完全无序的。为了测试无需本例延迟10ms再发射下个数据,否则可能效果不那么明显~
那么如果你想要保证数据发射的顺序,那就请使用concatMap()
方法吧:
@Test
public void fun2() throws InterruptedException {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.concatMap(i -> Observable.just(i).delay(10, TimeUnit.MILLISECONDS))
.subscribe(d -> System.out.println(d));
System.out.println("----------end-----------");
TimeUnit.SECONDS.sleep(2);
}
运行程序,控制台输出:
----------end-----------
1
2
3
4
5
6
7
8
9
数据发射完全有序。说明:end先打印了是合理的,毕竟人家是异步执行的~
在介绍完了前置知识后,下面来到本文主菜:LoadBalancerCommand
负载均衡命令。熟悉的Command
命令模式有木有,它用于用于从负载均衡器执行中生成可观察对象Observable<T>
。主要负责完成如下事情:
ServerOperation#call(server)
方法得到一个Observable<T>
结果ExecutionListener
,会执行监听器们RetryHandler
对发生异常是会进行重试LoadBalancerStats
负载结果对象提供指标反馈// 泛型T表示`Observable<T>`这个类型 也就是output类型
public class LoadBalancerCommand<T> {
private final URI loadBalancerURI;
private final Object loadBalancerKey;
private final LoadBalancerContext loadBalancerContext;
private final RetryHandler retryHandler;
private volatile ExecutionInfo executionInfo;
private final Server server;
private final ExecutionContextListenerInvoker<?, T> listenerInvoker;
}
loadBalancerURI
:请求的URI。作为original
原始uri去负载均衡器里获取一个ServerloadBalancerKey
:用于去负载均衡器获取一个ServerloadBalancerContext
:负载均衡器上下文。提供执行过程中各种组件的访问和获取,如: loadBalancerContext.getServerFromLoadBalancer()
获取一台ServerloadBalancerContext.getServerStats(server)
:得到Server的状态信息loadBalancerContext.noteOpenConnection(stats) / noteRequestCompletion()
:收集stats信息retryHandler
:重试处理器。若构建时没有指定,就会选用loadBalancerContext
里的。它负载完成IClient执行时的重试操作executionInfo
:略server
:若构建时传入了server就使用这台Server执行。否则交给负载均衡器自己去选择ExecutionContextListenerInvoker listenerInvoker
:负责各个执行阶段中监听器的执行,比较简单以上大都为执行时的必须参数,由IClient
在执行时构建进来。而给这些属性赋值有且仅能采用Builder模式:
LoadBalancerCommand:
public static <T> Builder<T> builder() {
return new Builder<T>();
}
public static class Builder<T> {
private RetryHandler retryHandler;
private ILoadBalancer loadBalancer;
private IClientConfig config;
private LoadBalancerContext loadBalancerContext;
private List<? extends ExecutionListener<?, T>> listeners;
private Object loadBalancerKey;
private ExecutionContext<?> executionContext;
private ExecutionContextListenerInvoker invoker;
private URI loadBalancerURI;
private Server server;
...
public LoadBalancerCommand<T> build() {
...
return new LoadBalancerCommand<T>(this);
}
}
返回一个仅发出单个请求服务器的观察对象,仅仅发射一个数据,该数据通过负载均衡器、负载均衡算法选出来。
选择Server:
LoadBalancerCommand:
private Observable<Server> selectServer() {
return Observable.create((Subscriber<? super Server> next) -> {
try {
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
next.onNext(server);
next.onCompleted();
} catch (Exception e) {
next.onError(e);
}
});
}
根据负载均衡算法选择一台合适的Server,依赖于LoadBalancerContext#getServerFromLoadBalancer()
实现。
重试策略:
LoadBalancerCommand:
// same:是否是同一台Server
// true:在当前这台Server上还能否重试(受MaxAutoRetries控制)
// false:换一台Server还能否重试(受MaxAutoRetriesNextServer控制)
private Func2<Integer, Throwable, Boolean> retryPolicy(int maxRetrys, boolean same) {
return (Integer tryCount, Throwable e) -> {
if (e instanceof AbortExecutionException)
return false;
if (tryCount > maxRetrys)
return false;
if (e.getCause() != null && e instanceof RuntimeException)
e = e.getCause();
return retryHandler.isRetriableException(e, same);
};
}
该策略描述得非常抽象,通过配置的最大重试次数和当前异常类型对每次请求进行判断:
AbortExecutionException
类型,那啥都不说了,不要再重试了 AbortExecutionException
异常类型 是ribbon自定义的类型,在ExecutionListener
监听器执行时可能会抛出retryHandler
去判断,让它来决定你的这个异常类型是否应该重试吧 RetryHandler
的详细判断逻辑请参见:四十、Ribbon核心API源码解析:ribbon-core(三)RetryHandler重试处理器LoadBalancerCommand
有且仅提供一个public方法可供外部调用(builder方法除外):submit(ServerOperation<T> operation)
提交方法。作用是:执行目标action(也就是Client发送请求喽)。
说明:本处源码多次用到了汶上示例的RxJava的中的concatMap操作符,请予以理解
外层逻辑通过负载均衡算法选出一台Server,并且若开启重试参数的话,在不同Server之间进行重试。
LoadBalancerCommand:
public Observable<T> submit(ServerOperation<T> operation) {
// 每次执行开始,就创建一个执行info的上下文,用于记录有用信息
ExecutionInfoContext context = new ExecutionInfoContext();
... // 执行监听器
// 这两个参数对重试策略非常重要,默认
// MaxAutoRetries:0 在单台机器上不重试
// MaxAutoRetriesNextServer:1 最大向下试一一台机器
int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
// Use the load balancer 使用负载均衡执行
// 若你指定了server就用指定的,否则通过lb去根据负载均衡策略选择一台Server出来
Observable<T> o = (server == null ? selectServer() : Observable.just(server));
... // 这部分代码请参见步骤2
// 内部决定每台Server去重试多少次 所以这里控制的去重试多少台Server
// 说明:第一台Server也不计入在内
if (maxRetrysNext > 0 && server == null)
o = o.retry(retryPolicy(maxRetrysNext, false));
// 当最终重试都还不行时,仍旧还抛错,就会触发此函数
return o.onErrorResumeNext((Throwable e) -> {
// 执行过(并不能说重试过)
// 只要执行过,就得看看是啥异常呢,到底是重试不够还是咋滴
if (context.getAttemptCount() > 0) {
// 重试的机器数超过了maxRetrysNext的值时,抛出此异常
if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
"Number of retries on next server exceeded max " + maxRetrysNext + " retries, while making a call for: " + context.getServer(), e);
}
// 可能maxRetrysNext=0,由单台机器重试次数的异常
else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
"Number of retries exceeded max " + maxRetrysSame + " retries, while making a call for: " + context.getServer(), e);
}
}
... // 执行监听器,final。onExecutionFailed
if (listenerInvoker != null) {
listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
}
return Observable.error(e);
});
}
maxRetrysNext
它控制的是请求级别的(切换不同Server)的重试次数。本步骤属于最外层逻辑:
maxRetrysNext
请求级别的重试(默认值是1),就继续重复再找一台Server(至于是哪台就看LB策略喽),再试一次Server执行请求本步骤属于最外层控制,但其实它还有针对同一Server更精细化的重试策略,这就是下面这个步骤所完成的内容。
本步骤讲述的是在同一Server下,加上重试策略来执行目标请求。
LoadBalancerCommand:
public Observable<T> submit(ServerOperation<T> operation) {
...
Observable<T> o = 选出来的Server实例;
// 针对选出来的实例(同一台),执行concatMap里面的操作(Server级别重试)
o.concatMap(server -> {
context.setServer(server); // 记录下当前Server到上下文
ServerStats stats = loadBalancerContext.getServerStats(server); // 拿到此Server所属的状态stats
//这一步的目的是:为同一台Server绑定上重试机制
// 此处用oo表示,代表内层逻辑
Observable<T> oo = Observable.just(server);
oo.concatMap(server -> {
context.incAttemptCount(); // 尝试总数+1
loadBalancerContext.noteOpenConnection(stats); // 开启连接
... // 触发监听器onStartWithServer回调。上面是调用,这里是马上要执行了
// call执行目标方法:也就是发送execute发送请求
// call执行目标方法:也就是发送execute发送请求
// call执行目标方法:也就是发送execute发送请求
// doOnEach:Observable每发射一个数据的时候就会触发这个回调,不仅包括onNext还包括onError和onCompleted
// 这里的doOnEach主要是为了触发监听器行为
return operation.call(server).doOnEach(new Observer<T>() {
private T entity;
@Override
public void onNext(T entity) {
this.entity = entity;
... // 触发监听器的onExecutionSuccess方法。entity就是response
}
... // 省略onCompleted/onError等方法
};
});
// 绑定针对同一Server实例的重试策略,所以第二参数传true表示在同一实例上
// 注意:这里使用的是oo,是内层重试逻辑
if (maxRetrysSame > 0) {
oo = oo.retry(retryPolicy(maxRetrysSame, true));
}
return o;
});
...
}
maxRetrysSame >0
就绑定上针对同一个Server实例的重试策略@Test
public void fun1() {
List<Server> serverList = new ArrayList<>();
serverList.add(createServer("华南", 1));
serverList.add(createServer("华东", 1));
serverList.add(createServer("华东", 2));
serverList.add(createServer("华北", 1));
serverList.add(createServer("华北", 2));
serverList.add(createServer("华北", 3));
serverList.add(createServer("华北", 4));
BaseLoadBalancer lb = new BaseLoadBalancer();
lb.addServers(serverList);
IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues("YourBatman");
// 通过API方式设置重试次数
config.set(CommonClientConfigKey.MaxAutoRetries, 2);
config.set(CommonClientConfigKey.MaxAutoRetriesNextServer, 5);
LoadBalancerContext loadBalancerContext = new LoadBalancerContext(lb, config);
// 构建一个执行命令command
// 说明:案例中请使用不含host的URI,因为有host的情况下将不会再使用ILoadBalancer去选择Server
// (当然你若要它使用也行,就请配置vipAddress吧)
URI original = URI.create("");
// URI original = URI.create("http://account:3333");
LoadBalancerCommand<String> command = LoadBalancerCommand.<String>builder()
.withClientConfig(config)
.withLoadBalancerContext(loadBalancerContext)
.withLoadBalancerURI(original)
// 自定义一个重拾器,让NPE也能触发异常 配置使用config的
.withRetryHandler(new DefaultLoadBalancerRetryHandler(config) {
@Override
public boolean isRetriableException(Throwable e, boolean sameServer) {
boolean result = super.isRetriableException(e, sameServer);
return result || e instanceof NullPointerException;
}
})
// 注册一个监听器,监听执行的过程
// .withListeners(Collections.singletonList(...))
.build();
// 执行目标方法/操作
// 记录总重试次数
AtomicInteger totalRetry = new AtomicInteger();
Observable<String> submit = command.submit(server -> {
System.out.println("第[" + totalRetry.incrementAndGet() + "]次发送请求,使用的Server是:" + server);
// 模拟执行时出现异常(请注意:NPE等业务异常并不会触发重试~~~~~)
// System.out.println(1 / 0);
Integer i = null;
System.out.println(i.toString());
return Observable.just("hello success!!!");
});
// 监听且打印结果
submit.doOnError(throwable -> System.out.println("执行失败,异常:" + throwable.getClass()))
.subscribe(d -> System.out.println("执行成功,结果:" + d));
}
private Server createServer(String zone, int index) {
Server server = new Server("www.baidu" + zone + ".com", index);
server.setZone(zone);
return server;
}
再次运行,控制台打印:
第[1]次发送请求,使用的Server是:www.baidu华东.com:1
第[2]次发送请求,使用的Server是:www.baidu华东.com:1
第[3]次发送请求,使用的Server是:www.baidu华东.com:1
16:17:13.041 [main] DEBUG com.netflix.loadbalancer.LoadBalancerContext - YourBatman using LB returned Server: www.baidu华东.com:2 for request
第[4]次发送请求,使用的Server是:www.baidu华东.com:2
第[5]次发送请求,使用的Server是:www.baidu华东.com:2
第[6]次发送请求,使用的Server是:www.baidu华东.com:2
16:17:13.043 [main] DEBUG com.netflix.loadbalancer.LoadBalancerContext - YourBatman using LB returned Server: www.baidu华北.com:1 for request
第[7]次发送请求,使用的Server是:www.baidu华北.com:1
第[8]次发送请求,使用的Server是:www.baidu华北.com:1
第[9]次发送请求,使用的Server是:www.baidu华北.com:1
16:17:13.044 [main] DEBUG com.netflix.loadbalancer.LoadBalancerContext - YourBatman using LB returned Server: www.baidu华北.com:2 for request
第[10]次发送请求,使用的Server是:www.baidu华北.com:2
第[11]次发送请求,使用的Server是:www.baidu华北.com:2
第[12]次发送请求,使用的Server是:www.baidu华北.com:2
16:17:13.044 [main] DEBUG com.netflix.loadbalancer.LoadBalancerContext - YourBatman using LB returned Server: www.baidu华北.com:3 for request
第[13]次发送请求,使用的Server是:www.baidu华北.com:3
第[14]次发送请求,使用的Server是:www.baidu华北.com:3
第[15]次发送请求,使用的Server是:www.baidu华北.com:3
16:17:13.045 [main] DEBUG com.netflix.loadbalancer.LoadBalancerContext - YourBatman using LB returned Server: www.baidu华北.com:4 for request
第[16]次发送请求,使用的Server是:www.baidu华北.com:4
第[17]次发送请求,使用的Server是:www.baidu华北.com:4
第[18]次发送请求,使用的Server是:www.baidu华北.com:4
执行失败,异常:class com.netflix.client.ClientException
手动跑该示例时需要注意:
URI
请构造一个木有host的,因为你自己有host的话,LB默认就不会管你了(毕竟你都自己有了嘛),除非你配置vipAddresses(原理参考:五十五、Ribbon负载均衡器执行上下文:LoadBalancerContext)ConnectException/SocketTimeoutException
等异常才会重试的,本文扩展了一下:让其NPE异常也触发重试(1 + MaxAutoRetries) * (1 + MaxAutoRetriesNextServer)
,完全符合预期 关于Ribbon负载均衡命令:LoadBalancerCommand(二)执行目标方法内容就先介绍到着,本篇内容丰富,特别是详细讲解了Ribbon对重试机制的实现,解释了很多小伙伴解释不清的:MaxAutoRetries
和MaxAutoRetriesNextServer
的区别和联系。
LoadBalancerCommand
巧妙的利用了RxJava实现了优雅的重试机制,这种编码风格确实有很多值的学习之处,但是最为重要的依旧是对Ribbon整个执行流程的把控,以及如何通过钩子监听到执行全过程,这在生产上讲非常具有实战意义。