前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[享学Netflix] 五十九、Ribbon负载均衡命令:LoadBalancerCommand(二)执行目标请求

[享学Netflix] 五十九、Ribbon负载均衡命令:LoadBalancerCommand(二)执行目标请求

作者头像
YourBatman
发布2020-03-24 10:43:19
1.9K0
发布2020-03-24 10:43:19
举报

代码下载地址:https://github.com/f641385712/netflix-learning

前言

上文已对LoadBalancerCommand的基础类进行了打点,给本文内容做了一定支撑。本文就进入到负载均衡命令的主菜,LoadBalancerCommand用于提交任务,执行目标方法。

因为Ribbon对目标请求的执行采用的也是命令模式,因此本文的重要性也不可忽视,特别是理解它的重试机制,处理得非常巧妙,值的学习和考究。


正文

Ribbon对请求的执行依旧采用的命令模式,一个LoadBalancerCommand实例代表着一个请求,它管控着所有的执行流程:包括Server的选择、重试处理、结果处理等。


RxJava的concatMap()方法使用示例

因为LoadBalancerCommand提交任务时多次用到了concatMap()这个操作符的特性,因此此处做个使用示例:

@Test
public void fun1() throws InterruptedException {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
            .flatMap(i -> Observable.just(i).delay(10, TimeUnit.MILLISECONDS))
            .subscribe(d -> System.out.println(d));

    System.out.println("----------end-----------");
    TimeUnit.SECONDS.sleep(2);
}

运行程序,控制台输出:

----------end-----------
1
5
9
3
7
2
6
4
8

可以看到它是完全无序的。为了测试无需本例延迟10ms再发射下个数据,否则可能效果不那么明显~

那么如果你想要保证数据发射的顺序,那就请使用concatMap()方法吧:

@Test
public void fun2() throws InterruptedException {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
            .concatMap(i -> Observable.just(i).delay(10, TimeUnit.MILLISECONDS))
            .subscribe(d -> System.out.println(d));

    System.out.println("----------end-----------");
    TimeUnit.SECONDS.sleep(2);
}

运行程序,控制台输出:

----------end-----------
1
2
3
4
5
6
7
8
9

数据发射完全有序。说明:end先打印了是合理的,毕竟人家是异步执行的~


LoadBalancerCommand 负载均衡命令

在介绍完了前置知识后,下面来到本文主菜:LoadBalancerCommand负载均衡命令。熟悉的Command命令模式有木有,它用于用于从负载均衡器执行中生成可观察对象Observable<T>。主要负责完成如下事情:

  1. 选中一个Server(最核心的逻辑,通过负载均衡器完成选择)
  2. 执行ServerOperation#call(server)方法得到一个Observable<T>结果
  3. 若有ExecutionListener,会执行监听器们
  4. 借助RetryHandler对发生异常是会进行重试
  5. LoadBalancerStats负载结果对象提供指标反馈

成员属性
// 泛型T表示`Observable<T>`这个类型  也就是output类型
public class LoadBalancerCommand<T> {

    private final URI    loadBalancerURI;
    private final Object loadBalancerKey;
    private final LoadBalancerContext loadBalancerContext;
    private final RetryHandler retryHandler;
    private volatile ExecutionInfo executionInfo;
    private final Server server;
    private final ExecutionContextListenerInvoker<?, T> listenerInvoker;
}
  • loadBalancerURI:请求的URI。作为original原始uri去负载均衡器里获取一个Server
  • loadBalancerKey:用于去负载均衡器获取一个Server
  • loadBalancerContext:负载均衡器上下文。提供执行过程中各种组件的访问和获取,如:
    • loadBalancerContext.getServerFromLoadBalancer()获取一台Server
    • loadBalancerContext.getServerStats(server):得到Server的状态信息
    • loadBalancerContext.noteOpenConnection(stats) / noteRequestCompletion():收集stats信息
  • retryHandler:重试处理器。若构建时没有指定,就会选用loadBalancerContext里的。它负载完成IClient执行时的重试操作
  • executionInfo:略
  • server:若构建时传入了server就使用这台Server执行。否则交给负载均衡器自己去选择
  • ExecutionContextListenerInvoker listenerInvoker:负责各个执行阶段中监听器的执行,比较简单

以上大都为执行时的必须参数,由IClient在执行时构建进来。而给这些属性赋值有且仅能采用Builder模式:

LoadBalancerCommand:
	
    public static <T> Builder<T> builder() {
        return new Builder<T>();
    }
	public static class Builder<T> {
        private RetryHandler        retryHandler;
        private ILoadBalancer       loadBalancer;
        private IClientConfig       config;
        private LoadBalancerContext loadBalancerContext;
        private List<? extends ExecutionListener<?, T>> listeners;
        private Object              loadBalancerKey;
        private ExecutionContext<?> executionContext;
        private ExecutionContextListenerInvoker invoker;
        private URI                 loadBalancerURI;
        private Server              server;
        ... 
		public LoadBalancerCommand<T> build() {
			...
			return new LoadBalancerCommand<T>(this);
		}
	}

成员方法

返回一个仅发出单个请求服务器的观察对象,仅仅发射一个数据,该数据通过负载均衡器、负载均衡算法选出来。

选择Server:

LoadBalancerCommand:

	private Observable<Server> selectServer() {
		return Observable.create((Subscriber<? super Server> next) -> {
            try {
                Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                next.onNext(server);
                next.onCompleted();
            } catch (Exception e) {
                next.onError(e);
            }
		});
	}

根据负载均衡算法选择一台合适的Server,依赖于LoadBalancerContext#getServerFromLoadBalancer()实现。

重试策略:

LoadBalancerCommand:
	
	// same:是否是同一台Server
	// true:在当前这台Server上还能否重试(受MaxAutoRetries控制)
	// false:换一台Server还能否重试(受MaxAutoRetriesNextServer控制)
	private Func2<Integer, Throwable, Boolean> retryPolicy(int maxRetrys, boolean same) {
		return (Integer tryCount, Throwable e) -> {
            if (e instanceof AbortExecutionException)
                return false;
            if (tryCount > maxRetrys)
                return false;
            if (e.getCause() != null && e instanceof RuntimeException)
                e = e.getCause();
            
            return retryHandler.isRetriableException(e, same);
		};
	}

该策略描述得非常抽象,通过配置的最大重试次数当前异常类型对每次请求进行判断:

  1. 若异常类型是AbortExecutionException类型,那啥都不说了,不要再重试了
    1. AbortExecutionException异常类型 是ribbon自定义的类型,在ExecutionListener监听器执行时可能会抛出
    2. so,可以通过监听器的方式,认为的控制、干预目标方法的执行~
  2. 若当前重试总此处已经超过了最大次数,那还有什么好说的呢,拒绝再次重试呗
  3. 若1,2都不满足,那就交给retryHandler去判断,让它来决定你的这个异常类型是否应该重试吧
    1. 关于RetryHandler的详细判断逻辑请参见:四十、Ribbon核心API源码解析:ribbon-core(三)RetryHandler重试处理器

submit(ServerOperation operation) 提交方法

LoadBalancerCommand有且仅提供一个public方法可供外部调用(builder方法除外):submit(ServerOperation<T> operation)提交方法。作用是:执行目标action(也就是Client发送请求喽)。

说明:本处源码多次用到了汶上示例的RxJava的中的concatMap操作符,请予以理解


1、外层逻辑(不同Server间重试)

外层逻辑通过负载均衡算法选出一台Server,并且若开启重试参数的话,在不同Server之间进行重试。

LoadBalancerCommand:

	public Observable<T> submit(ServerOperation<T> operation) {
		// 每次执行开始,就创建一个执行info的上下文,用于记录有用信息
		ExecutionInfoContext context = new ExecutionInfoContext();
		... // 执行监听器
	
		// 这两个参数对重试策略非常重要,默认
		// MaxAutoRetries:0  在单台机器上不重试
		// MaxAutoRetriesNextServer:1 最大向下试一一台机器
        int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

		// Use the load balancer 使用负载均衡执行
		// 若你指定了server就用指定的,否则通过lb去根据负载均衡策略选择一台Server出来
		Observable<T> o = (server == null ? selectServer() : Observable.just(server));
		
		... // 这部分代码请参见步骤2
		
		// 内部决定每台Server去重试多少次  所以这里控制的去重试多少台Server
		// 说明:第一台Server也不计入在内
		if (maxRetrysNext > 0 && server == null)
			o = o.retry(retryPolicy(maxRetrysNext, false));
		
		// 当最终重试都还不行时,仍旧还抛错,就会触发此函数
		return o.onErrorResumeNext((Throwable e) -> {
			// 执行过(并不能说重试过)
			// 只要执行过,就得看看是啥异常呢,到底是重试不够还是咋滴
			if (context.getAttemptCount() > 0) { 

				// 重试的机器数超过了maxRetrysNext的值时,抛出此异常
				if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                            "Number of retries on next server exceeded max " + maxRetrysNext + " retries, while making a call for: " + context.getServer(), e);
				} 
				// 可能maxRetrysNext=0,由单台机器重试次数的异常
				else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                                "Number of retries exceeded max " + maxRetrysSame + " retries, while making a call for: " + context.getServer(), e);
				}

			}
			... // 执行监听器,final。onExecutionFailed
            if (listenerInvoker != null) {
                listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
            }
			return Observable.error(e);
		});
	}

maxRetrysNext它控制的是请求级别的(切换不同Server)的重试次数。本步骤属于最外层逻辑:

  1. 选择一台Server执行请求,若抛错了,进入重试策略
  2. 若配置了maxRetrysNext请求级别的重试(默认值是1),就继续重复再找一台Server(至于是哪台就看LB策略喽),再试一次Server执行请求
  3. 只要在重试范畴内,任何一次成功了,就把执行的结果返回。否则(重试范围内都没成功)就抛出对应的异常错误~

本步骤属于最外层控制,但其实它还有针对同一Server更精细化的重试策略,这就是下面这个步骤所完成的内容。


2、内层逻辑(同一Server内重试)

本步骤讲述的是在同一Server下,加上重试策略来执行目标请求。

LoadBalancerCommand:

	public Observable<T> submit(ServerOperation<T> operation) {
		...
		Observable<T> o = 选出来的Server实例;
		// 针对选出来的实例(同一台),执行concatMap里面的操作(Server级别重试)
		o.concatMap(server -> {
			context.setServer(server); // 记录下当前Server到上下文
			ServerStats stats = loadBalancerContext.getServerStats(server); // 拿到此Server所属的状态stats
			
			//这一步的目的是:为同一台Server绑定上重试机制
			// 此处用oo表示,代表内层逻辑
			Observable<T> oo = Observable.just(server);
			oo.concatMap(server -> {
				context.incAttemptCount(); // 尝试总数+1
				loadBalancerContext.noteOpenConnection(stats); // 开启连接
				... // 触发监听器onStartWithServer回调。上面是调用,这里是马上要执行了

				// call执行目标方法:也就是发送execute发送请求
				// call执行目标方法:也就是发送execute发送请求
				// call执行目标方法:也就是发送execute发送请求
				// doOnEach:Observable每发射一个数据的时候就会触发这个回调,不仅包括onNext还包括onError和onCompleted
				// 这里的doOnEach主要是为了触发监听器行为
				return operation.call(server).doOnEach(new Observer<T>() {
					private T entity;
                    @Override
                    public void onNext(T entity) {
                        this.entity = entity;
                        ... // 触发监听器的onExecutionSuccess方法。entity就是response
                    }
                    ... // 省略onCompleted/onError等方法
				};
			});
		
			// 绑定针对同一Server实例的重试策略,所以第二参数传true表示在同一实例上
			// 注意:这里使用的是oo,是内层重试逻辑
			if (maxRetrysSame > 0) {
				oo = oo.retry(retryPolicy(maxRetrysSame, true));
			}
			return o;
		});
		...
	}
  1. 针对选定的Server实例,准备执行目标请求。其中记录好执行上下文信息、和该Server绑定的ServerStats信息…
  2. maxRetrysSame >0就绑定上针对同一个Server实例的重试策略
  3. 执行目标请求,若失败了进行重试,知道重试此处到了 or 成功了为止

代码示例

@Test
public void fun1() {
    List<Server> serverList = new ArrayList<>();
    serverList.add(createServer("华南", 1));
    serverList.add(createServer("华东", 1));
    serverList.add(createServer("华东", 2));

    serverList.add(createServer("华北", 1));
    serverList.add(createServer("华北", 2));
    serverList.add(createServer("华北", 3));
    serverList.add(createServer("华北", 4));

    BaseLoadBalancer lb = new BaseLoadBalancer();
    lb.addServers(serverList);
    IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues("YourBatman");
    // 通过API方式设置重试次数
    config.set(CommonClientConfigKey.MaxAutoRetries, 2);
    config.set(CommonClientConfigKey.MaxAutoRetriesNextServer, 5);

    LoadBalancerContext loadBalancerContext = new LoadBalancerContext(lb, config);


    // 构建一个执行命令command
    // 说明:案例中请使用不含host的URI,因为有host的情况下将不会再使用ILoadBalancer去选择Server
    // (当然你若要它使用也行,就请配置vipAddress吧)
    URI original = URI.create("");
    // URI original = URI.create("http://account:3333");
    LoadBalancerCommand<String> command = LoadBalancerCommand.<String>builder()
            .withClientConfig(config)
            .withLoadBalancerContext(loadBalancerContext)
            .withLoadBalancerURI(original)
            // 自定义一个重拾器,让NPE也能触发异常  配置使用config的
            .withRetryHandler(new DefaultLoadBalancerRetryHandler(config) {
                @Override
                public boolean isRetriableException(Throwable e, boolean sameServer) {
                    boolean result = super.isRetriableException(e, sameServer);
                    return result || e instanceof NullPointerException;
                }
            })
            // 注册一个监听器,监听执行的过程
            // .withListeners(Collections.singletonList(...))
            .build();

    // 执行目标方法/操作
    // 记录总重试次数
    AtomicInteger totalRetry = new AtomicInteger();
    Observable<String> submit = command.submit(server -> {
        System.out.println("第[" + totalRetry.incrementAndGet() + "]次发送请求,使用的Server是:" + server);

        // 模拟执行时出现异常(请注意:NPE等业务异常并不会触发重试~~~~~)
        // System.out.println(1 / 0);
        Integer i = null;
        System.out.println(i.toString());

        return Observable.just("hello success!!!");
    });

    // 监听且打印结果
    submit.doOnError(throwable -> System.out.println("执行失败,异常:" + throwable.getClass()))
            .subscribe(d -> System.out.println("执行成功,结果:" + d));

}

private Server createServer(String zone, int index) {
    Server server = new Server("www.baidu" + zone + ".com", index);
    server.setZone(zone);
    return server;
}

再次运行,控制台打印:

第[1]次发送请求,使用的Server是:www.baidu华东.com:1
第[2]次发送请求,使用的Server是:www.baidu华东.com:1
第[3]次发送请求,使用的Server是:www.baidu华东.com:1
16:17:13.041 [main] DEBUG com.netflix.loadbalancer.LoadBalancerContext - YourBatman using LB returned Server: www.baidu华东.com:2 for request 
第[4]次发送请求,使用的Server是:www.baidu华东.com:2
第[5]次发送请求,使用的Server是:www.baidu华东.com:2
第[6]次发送请求,使用的Server是:www.baidu华东.com:2
16:17:13.043 [main] DEBUG com.netflix.loadbalancer.LoadBalancerContext - YourBatman using LB returned Server: www.baidu华北.com:1 for request 
第[7]次发送请求,使用的Server是:www.baidu华北.com:1
第[8]次发送请求,使用的Server是:www.baidu华北.com:1
第[9]次发送请求,使用的Server是:www.baidu华北.com:1
16:17:13.044 [main] DEBUG com.netflix.loadbalancer.LoadBalancerContext - YourBatman using LB returned Server: www.baidu华北.com:2 for request 
第[10]次发送请求,使用的Server是:www.baidu华北.com:2
第[11]次发送请求,使用的Server是:www.baidu华北.com:2
第[12]次发送请求,使用的Server是:www.baidu华北.com:2
16:17:13.044 [main] DEBUG com.netflix.loadbalancer.LoadBalancerContext - YourBatman using LB returned Server: www.baidu华北.com:3 for request 
第[13]次发送请求,使用的Server是:www.baidu华北.com:3
第[14]次发送请求,使用的Server是:www.baidu华北.com:3
第[15]次发送请求,使用的Server是:www.baidu华北.com:3
16:17:13.045 [main] DEBUG com.netflix.loadbalancer.LoadBalancerContext - YourBatman using LB returned Server: www.baidu华北.com:4 for request 
第[16]次发送请求,使用的Server是:www.baidu华北.com:4
第[17]次发送请求,使用的Server是:www.baidu华北.com:4
第[18]次发送请求,使用的Server是:www.baidu华北.com:4
执行失败,异常:class com.netflix.client.ClientException

手动跑该示例时需要注意:

  1. URI请构造一个木有host的,因为你自己有host的话,LB默认就不会管你了(毕竟你都自己有了嘛),除非你配置vipAddresses(原理参考:五十五、Ribbon负载均衡器执行上下文:LoadBalancerContext
  2. 程序出现异常时,默认情况下只有ConnectException/SocketTimeoutException等异常才会重试的,本文扩展了一下:让其NPE异常也触发重试
  3. 请求执行总数18次 = (1 + 2) * (1 + 6),即(1 + MaxAutoRetries) * (1 + MaxAutoRetriesNextServer),完全符合预期
    1. 至于为何前面都加上1,请读者自行思考~~~
    2. 同时重试规律掌握了,那么对于一个请求,总的等待时机你应该会计算了吧???这将影响到Hystrix的超时时长的配置哦~~~~

总结

关于Ribbon负载均衡命令:LoadBalancerCommand(二)执行目标方法内容就先介绍到着,本篇内容丰富,特别是详细讲解了Ribbon对重试机制的实现,解释了很多小伙伴解释不清的:MaxAutoRetriesMaxAutoRetriesNextServer的区别和联系。

LoadBalancerCommand巧妙的利用了RxJava实现了优雅的重试机制,这种编码风格确实有很多值的学习之处,但是最为重要的依旧是对Ribbon整个执行流程的把控,以及如何通过钩子监听到执行全过程,这在生产上讲非常具有实战意义。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 正文
    • RxJava的concatMap()方法使用示例
      • LoadBalancerCommand 负载均衡命令
        • 成员属性
        • 成员方法
        • submit(ServerOperation operation) 提交方法
      • 代码示例
      • 总结
      相关产品与服务
      负载均衡
      负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档