前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[享学Netflix] 五十四、Ribbon启动连接操作:IPrimeConnection检测Server是否能够提供服务

[享学Netflix] 五十四、Ribbon启动连接操作:IPrimeConnection检测Server是否能够提供服务

作者头像
YourBatman
发布2020-03-24 10:04:12
1.1K0
发布2020-03-24 10:04:12
举报
文章被收录于专栏:BAT的乌托邦BAT的乌托邦

代码下载地址:https://github.com/f641385712/netflix-learning

IPrimeConnectionILoadBalancer提供支持,但是非必须开启的一个组件。


IPrimeConnection:启动连接操作

prime:首要的,主要的,上乘的,优异的;priming a connection:启动连接;

该接口定义了启动连接的操作

代码语言:javascript
复制
public interface IPrimeConnection extends IClientConfigAware {
	// 子类应该实现协议特定的连接操作到服务器。
	// server:待连接的服务器
	// uriPath:进行连接时使用的uri。如/api/v1/ping
    public boolean connect(Server server, String uriPath) throws Exception;
}

该接口在ribbon-loadbalancer包内并无任何实现类,在ribbon-httpclient包里有一个实现类HttpPrimeConnection:基于http协议实现链接到Server。因为ribbon-httpclient并不是本系列所要讲述的内容,但是呢它却作为默认的IPrimeConnection实现,并且Spring Cloud里也使用它来启动连接,所以唠一唠。


HttpPrimeConnection

内部依赖于使用Apache HttpClient发送一个Http请求,能够连接成功便代表ok。

代码语言:javascript
复制
public class HttpPrimeConnection implements IPrimeConnection {

	// 它是个HC,继承自Apache的DefaultHttpClient
	private NFHttpClient client;

	// client名称是:clientName-PrimeConnsClient
	// 重点是:链接超时时间是2s。若2s还没连接上就代表链接失败
    @Override
    public void initWithNiwsConfig(IClientConfig niwsClientConfig) {
        client = NFHttpClientFactory.getNamedNFHttpClient(niwsClientConfig.getClientName() + "-PrimeConnsClient", false); 
        HttpConnectionParams.setConnectionTimeout(client.getParams(), 2000);        
    }
    
	//发送一个get请求,只要connect连接上了,就表示成功了(状态码无所谓,因为只是测试connect连接性)
    @Override
    public boolean connect(Server server, String primeConnectionsURIPath) throws Exception {
        String url = "http://" + server.getHostPort() + primeConnectionsURIPath;
        logger.debug("Trying URL: {}", url);
        HttpUriRequest get = new HttpGet(url);
        HttpResponse response = null;
        try {
            response = client.execute(get);
            if (logger.isDebugEnabled() && response.getStatusLine() != null) {
                logger.debug("Response code:" + response.getStatusLine().getStatusCode());
            }
        } finally {
           get.abort();
        }
        return true;
    }

}

它使用HttpClient去connect服务器,只要能够connect上(2s内)就表示成功


方法调用处

IPrimeConnection#connect方法的唯一调用处子在PrimeConnections#connectToServer里,这将在下面继续聊聊它是如何工作的(重点)。


PrimeConnections:连接预热

根据“行业经验”,末尾加s的类一般是批量操作或者是工具类,此处表示批量操作,因为一个Client后面一般对应着一批Server嘛。


成员属性
代码语言:javascript
复制
public class PrimeConnections {

	String primeConnectionsURIPath = "/";
    private ExecutorService executorService;
    private int maxExecutorThreads = 5;
    private long executorThreadTimeout = 30000;
    private String name = "default";
    private float primeRatio = 1.0f;
    int maxRetries = 9;
    long maxTotalTimeToPrimeConnections = 30 * 1000;
    long totalTimeTaken = 0;
    private boolean aSync = true;
        
    // servo里用于监控的类,忽略
    Counter totalCounter;
    Counter successCounter;
    Timer initialPrimeTimer;

    private IPrimeConnection connector;
    private PrimeConnectionEndStats stats;
	
}
  • primeConnectionsURIPath:去请求的URI。默认“/”对大多数人来说都是好事(毕竟我们只需要能连上就行),但如果服务端对此访问是个重操作额话(比如有些应用返回主页),那请换个轻量级的URI(在Filter里返回个常量最好)
    • 可通过PrimeConnectionsURI这个key配置
  • executorService:用于执行异步请求的Executor服务(因为多台Server可以异步一起去完成连接,提高效率)
  • maxExecutorThreads:最大执行线程 ,线程池的maximumPoolSize(一般此值默认即可,并不需要设置太高)。默认值是5
  • executorThreadTimeout:线程池的keepAliveTime,默认值是是30s
  • nameILoadBalancer的名称,没有指定就是defualt,一般是clientName
  • primeRatio:放行的比例。比如你有N台Server,乘以这个比率就是最终多少台完成了(并不代表成功)就不要阻塞主线程了,默认是100%表示全部完成检测了才会放行(注意:它只影响阻塞or不阻塞的情况,并不影响每台Server自己的readyToServe属性值,因为此属性值只跟检测结果有关)
    • 该值默认是1,也就是100%。可通过key MinPrimeConnectionsRatio来配置
  • maxRetries:最大重试次数,默认是9。(如果connect失败了,就重试几次(默认每次connect超时是2s))
    • 可通过MaxRetriesPerServerPrimeConnection这个key配置
  • maxTotalTimeToPrimeConnections:最大耗时,默认是30s。也就是说无论你多少台机器,最大阻塞我30s,超过这个时间我就不等你了,当前成功了几个就是几个
    • 可通过MaxTotalTimeToPrimeConnections这个key来配置
  • totalTimeTaken:此字段并没有使用过
  • aSync:默认值true,也就是使用异步方式去connect每台Server,然后用闭锁统一控制
  • IPrimeConnection connector:实际执行连接每台server的逻辑
    • 实现类通过PrimeConnectionsClassName这个key来指定,默认值是com.netflix.niws.client.http.HttpPrimeConnection
  • PrimeConnectionEndStats stats:记录每批链接完成后的状态,并提供get方法给外部访问(包含total、success、failure、totalTime四个参数)

初始化方法

对各个成员属性赋值,来自于构造器、以及IClientConfig配置。如线程池的初始化:

代码语言:javascript
复制
executorService = new ThreadPoolExecutor(
	1, // 核心线程数只有1个
	maxExecutorThreads, // 默认是5
	executorThreadTimeout, // 默认30s
	TimeUnit.MILLISECONDS,
	new LinkedBlockingQueue<Runnable>(), // 因为Server不会超级多,所以用无界队列也没关系
	new ASyncPrimeConnectionsThreadFactory(name));

提供的public方法

该类public方法一共有四个:

代码语言:javascript
复制
public void primeConnections(List<Server> servers) { ... }
public List<Future<Boolean>> primeConnectionsAsync(final List<Server> servers, final PrimeConnectionListener listener) { ... }

// 这两个简单,不用做单独讲解和梳理
public PrimeConnectionEndStats getEndStats() { ... }
public void shutdown() { ... }

其中primeConnections/primeConnectionsAsync均被调用,所有下面详细絮叨絮叨。


primeConnectionsAsync()

它通过异步的方式去连接Server,成功就标记其readyToServe=true,否则标记其readyToServe=false

其中PrimeConnectionListener属于监听器,它仅有一个方法primeCompleted()在connect完成后调用(完成指的是:connect成功or重试N次后失败)

代码语言:javascript
复制
PrimeConnections:

	public List<Future<Boolean>> primeConnectionsAsync(final List<Server> servers, final PrimeConnectionListener listener) {
		...
		List<Future<Boolean>> ftList = new ArrayList<>();
		for (Server s : allServers) {
			s.setReadyToServe(false);
			
			// 默认是开启了异步方式的,任务交给线程池处理
			if (aSync) {
				ftList.add(executorService.submit(() -> connectToServer(s, listener)));
			} else {
				connectToServer(s, listener);
			}	
		}
	}

	// 使用IPrimeConnection#connect完成连接动作
	// 对于http的connect来说,超时是2s,所以此方法最长hold 18秒
	private Boolean connectToServer(final Server s, final PrimeConnectionListener listener) {
		boolean success = false;
		do {
			success = connector.connect(s, primeConnectionsURIPath);
			successCounter.increment();
			lastException = null;
			//连接成功后立马停止
			break;
		} while (!success && (tryNum <= maxRetries));
			
		if (listener != null) {
			listener.primeCompleted(s, lastException);
		}
		return success;
	}

	// 以递增的方式,每次增加重试时的睡眠时长,这是非常合理的。如:
	// 200ms, 400 ms, 800ms, 1600ms etc.
    private void sleepBeforeRetry(int tryNum) {
        try {
            int sleep = (tryNum + 1) * 100;
            logger.debug("Sleeping for " + sleep + "ms ...");
            Thread.sleep(sleep);
        } catch (InterruptedException ex) {
        }
    }

connectToServer()用于去实际connect服务器,以Http实现为例默认的超时时间是2s,但是并不代表最长hold住18s哦,请参考sleepBeforeRetry()方法的实现。

另外还能发现:connect成功与否它会影响到Server.readyToServe属性的值,该值默认是true,但如果你要connect的话,只有成功了它才会是true,否则会给你设置为fasle,这样在轮询负载策略中此Server将不会再提供服务了。

需要注意的是:Server.readyToServe这个属性唯一使用处是RoundRobinRule轮询规则里,而该算法既是默认规则,又是使用最广泛的规则,所以它的影响仅是RoundRobinRule轮询策略,但因为它是默认策略且使用很广,所以其实该接口的影响还是很大的。

要么你就不指定IPrimeConnection的实现,要么就指定了实现就就得谨慎操作,否则容易使得Server不可用。理论上,如果你木有导ribbon-httpclient包的话,是不会有IPrimeConnection实现的,但是,但是,但是Spring Cloud默认都导了此包,需要引起重视~


方法调用处

该方法属偏底层方法,所以它用于异步执行,也可以传入一个PrimeConnectionListener监听器来监听完成后的动作(比如默认实现是:若成功了就标记readyToServe=true等等)。

它作为一个public方法,除了被本类的primeConnections调用外,还被BaseLoadBalancer#setServersList这个负载均衡器调用了:

代码语言:javascript
复制
BaseLoadBalancer:

	public void setServersList(List lsrv) {
		...
		if (isEnablePrimingConnections()) {
			...
            if (primeConnections != null) {
                primeConnections.primeConnectionsAsync(newServers, this);
            }
		}
		...
	}

它传入的监听器是this,因为BaseLoadBalancer自己便是该监听器的实现:

代码语言:javascript
复制
BaseLoadBalancer:

    @Override
    public void primeCompleted(Server s, Throwable lastException) {
        s.setReadyToServe(true);
    }

这么做的目的很明显:对刚放进来的Server逐一做链接检查,然后赋值到Server.readyToServe属性身上。

注意:enablePrimingConnections通过key EnablePrimeConnections可配置,不过它的默认值是false。


primeConnections()

开启链接,阻塞直到目标服务器的配置百分比(默认为100%)启动或达到最大时间

代码语言:javascript
复制
PrimeConnections:

	public void primeConnections(List<Server> servers) {

		// 准备发送链接之前,先全部置为false(这种操作是很危险的,所以请保证连通性)
        for (Server server: servers) {
            server.setReadyToServe(false);
        }		
        // 默认100%执行完了(并不是通过)才会放行主线程
        // 此处使用CountDownLatch 闭锁来控制,异步 -> 同步阻塞
        int totalCount = (int) (servers.size() * primeRatio); 
        CountDownLatch latch = new CountDownLatch(totalCount);
        // 只要木有抛出异常(证明联通了),那就s.setReadyToServe(true);
        primeConnectionsAsync(servers, new PrimeConnectionListener()  {            
            @Override
            public void primeCompleted(Server s, Throwable lastException) {
                if (lastException == null) {
                    successCount.incrementAndGet();
                    s.setReadyToServe(true);
                } else {
                    failureCount.incrementAndGet();
                }
                latch.countDown();
            }
        }); 
		...
		// 如果机器太多,也不会一直等待你,最大等待时间,默认是30s
		// 当然可以通过`MaxTotalTimeToPrimeConnections`这个key来设定
		latch.await(maxTotalTimeToPrimeConnections, TimeUnit.MILLISECONDS);
		...
		// 记录链接的结果,后面可以通过get方法获取到
		stats = new PrimeConnectionEndStats(totalCount, successCount.get(), failureCount.get(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
		// 打印一下结果。结果为info级别:Priming Connections not fully successful
		// 详细日志是debug级别
		printStats(stats);
	}

可以理解它是对primeConnectionsAsync()方法的封装,在全异步的基础上阻塞、最大超时时间、监听器等功能,因此它是更具有实用特性的。


方法调用处

该方法的调用处均是外部调用,发生在初始化阶段

  • BaseLoadBalancer#init:primeConnections(getReachableServers())
  • DynamicServerListLoadBalancer#restOfInit:primeConnections(getReachableServers())

有何用?

你不免也会疑问,这个接口有何用呢?首先需要明确:该开关默认是关闭的(可以通过key EnablePrimeConnections=true来开启,默认值是false),因此可知它的作用并不是那么的必须。

它的作用可描述为:启动指定Client的链接(尝试去连一次),作用主要用于解决那些部署环境(如读EC2)在实际使用实时请求之前,从防火墙连接/路径进行预热(比如先加白名单、初始化等等动作比较耗时,可以用它先去打通,混个脸熟嘛)。


总结

Ribbon启动连接操作:IPrimeConnection就先介绍到这,它能够帮你检测到Server的可用性,让你的Server更加健康,所以若你可以很好的玩转它,为你所用,那么它对你的工程健康指数是有帮助的。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • IPrimeConnection:启动连接操作
    • HttpPrimeConnection
      • 方法调用处
      • PrimeConnections:连接预热
        • 成员属性
          • 初始化方法
            • 提供的public方法
            • 有何用?
            • 总结
            相关产品与服务
            负载均衡
            负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档