IPrimeConnection
是ILoadBalancer
提供支持,但是非必须开启的一个组件。
prime
:首要的,主要的,上乘的,优异的;priming a connection
:启动连接;
该接口定义了启动连接的操作。
public interface IPrimeConnection extends IClientConfigAware {
// 子类应该实现协议特定的连接操作到服务器。
// server:待连接的服务器
// uriPath:进行连接时使用的uri。如/api/v1/ping
public boolean connect(Server server, String uriPath) throws Exception;
}
该接口在ribbon-loadbalancer
包内并无任何实现类,在ribbon-httpclient
包里有一个实现类HttpPrimeConnection
:基于http协议实现链接到Server。因为ribbon-httpclient
并不是本系列所要讲述的内容,但是呢它却作为默认的IPrimeConnection
实现,并且Spring Cloud
里也使用它来启动连接,所以唠一唠。
内部依赖于使用Apache HttpClient
发送一个Http请求,能够连接成功便代表ok。
public class HttpPrimeConnection implements IPrimeConnection {
// 它是个HC,继承自Apache的DefaultHttpClient
private NFHttpClient client;
// client名称是:clientName-PrimeConnsClient
// 重点是:链接超时时间是2s。若2s还没连接上就代表链接失败
@Override
public void initWithNiwsConfig(IClientConfig niwsClientConfig) {
client = NFHttpClientFactory.getNamedNFHttpClient(niwsClientConfig.getClientName() + "-PrimeConnsClient", false);
HttpConnectionParams.setConnectionTimeout(client.getParams(), 2000);
}
//发送一个get请求,只要connect连接上了,就表示成功了(状态码无所谓,因为只是测试connect连接性)
@Override
public boolean connect(Server server, String primeConnectionsURIPath) throws Exception {
String url = "http://" + server.getHostPort() + primeConnectionsURIPath;
logger.debug("Trying URL: {}", url);
HttpUriRequest get = new HttpGet(url);
HttpResponse response = null;
try {
response = client.execute(get);
if (logger.isDebugEnabled() && response.getStatusLine() != null) {
logger.debug("Response code:" + response.getStatusLine().getStatusCode());
}
} finally {
get.abort();
}
return true;
}
}
它使用HttpClient去connect服务器,只要能够connect上(2s内)就表示成功。
IPrimeConnection#connect
方法的唯一调用处子在PrimeConnections#connectToServer
里,这将在下面继续聊聊它是如何工作的(重点)。
根据“行业经验”,末尾加s的类一般是批量操作或者是工具类,此处表示批量操作,因为一个Client
后面一般对应着一批Server嘛。
public class PrimeConnections {
String primeConnectionsURIPath = "/";
private ExecutorService executorService;
private int maxExecutorThreads = 5;
private long executorThreadTimeout = 30000;
private String name = "default";
private float primeRatio = 1.0f;
int maxRetries = 9;
long maxTotalTimeToPrimeConnections = 30 * 1000;
long totalTimeTaken = 0;
private boolean aSync = true;
// servo里用于监控的类,忽略
Counter totalCounter;
Counter successCounter;
Timer initialPrimeTimer;
private IPrimeConnection connector;
private PrimeConnectionEndStats stats;
}
primeConnectionsURIPath
:去请求的URI。默认“/”对大多数人来说都是好事(毕竟我们只需要能连上就行),但如果服务端对此访问是个重操作额话(比如有些应用返回主页),那请换个轻量级的URI(在Filter里返回个常量最好) PrimeConnectionsURI
这个key配置executorService
:用于执行异步请求的Executor服务(因为多台Server可以异步一起去完成连接,提高效率)maxExecutorThreads
:最大执行线程 ,线程池的maximumPoolSize(一般此值默认即可,并不需要设置太高)。默认值是5executorThreadTimeout
:线程池的keepAliveTime,默认值是是30sname
:ILoadBalancer
的名称,没有指定就是defualt,一般是clientNameprimeRatio
:放行的比例。比如你有N台Server,乘以这个比率就是最终多少台完成了(并不代表成功)就不要阻塞主线程了,默认是100%表示全部完成检测了才会放行(注意:它只影响阻塞or不阻塞的情况,并不影响每台Server自己的readyToServe
属性值,因为此属性值只跟检测结果有关) MinPrimeConnectionsRatio
来配置maxRetries
:最大重试次数,默认是9。(如果connect失败了,就重试几次(默认每次connect超时是2s)) MaxRetriesPerServerPrimeConnection
这个key配置maxTotalTimeToPrimeConnections
:最大耗时,默认是30s。也就是说无论你多少台机器,最大阻塞我30s,超过这个时间我就不等你了,当前成功了几个就是几个。 MaxTotalTimeToPrimeConnections
这个key来配置totalTimeTaken
:此字段并没有使用过aSync
:默认值true,也就是使用异步方式去connect每台Server,然后用闭锁统一控制IPrimeConnection connector
:实际执行连接每台server的逻辑 PrimeConnectionsClassName
这个key来指定,默认值是com.netflix.niws.client.http.HttpPrimeConnection
PrimeConnectionEndStats stats
:记录每批链接完成后的状态,并提供get方法给外部访问(包含total、success、failure、totalTime
四个参数)对各个成员属性赋值,来自于构造器、以及IClientConfig
配置。如线程池的初始化:
executorService = new ThreadPoolExecutor(
1, // 核心线程数只有1个
maxExecutorThreads, // 默认是5
executorThreadTimeout, // 默认30s
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), // 因为Server不会超级多,所以用无界队列也没关系
new ASyncPrimeConnectionsThreadFactory(name));
该类public方法一共有四个:
public void primeConnections(List<Server> servers) { ... }
public List<Future<Boolean>> primeConnectionsAsync(final List<Server> servers, final PrimeConnectionListener listener) { ... }
// 这两个简单,不用做单独讲解和梳理
public PrimeConnectionEndStats getEndStats() { ... }
public void shutdown() { ... }
其中primeConnections/primeConnectionsAsync
均被调用,所有下面详细絮叨絮叨。
它通过异步的方式去连接Server,成功就标记其readyToServe=true
,否则标记其readyToServe=false
。
其中PrimeConnectionListener
属于监听器,它仅有一个方法primeCompleted()
在connect完成后调用(完成指的是:connect成功or重试N次后失败)
PrimeConnections:
public List<Future<Boolean>> primeConnectionsAsync(final List<Server> servers, final PrimeConnectionListener listener) {
...
List<Future<Boolean>> ftList = new ArrayList<>();
for (Server s : allServers) {
s.setReadyToServe(false);
// 默认是开启了异步方式的,任务交给线程池处理
if (aSync) {
ftList.add(executorService.submit(() -> connectToServer(s, listener)));
} else {
connectToServer(s, listener);
}
}
}
// 使用IPrimeConnection#connect完成连接动作
// 对于http的connect来说,超时是2s,所以此方法最长hold 18秒
private Boolean connectToServer(final Server s, final PrimeConnectionListener listener) {
boolean success = false;
do {
success = connector.connect(s, primeConnectionsURIPath);
successCounter.increment();
lastException = null;
//连接成功后立马停止
break;
} while (!success && (tryNum <= maxRetries));
if (listener != null) {
listener.primeCompleted(s, lastException);
}
return success;
}
// 以递增的方式,每次增加重试时的睡眠时长,这是非常合理的。如:
// 200ms, 400 ms, 800ms, 1600ms etc.
private void sleepBeforeRetry(int tryNum) {
try {
int sleep = (tryNum + 1) * 100;
logger.debug("Sleeping for " + sleep + "ms ...");
Thread.sleep(sleep);
} catch (InterruptedException ex) {
}
}
connectToServer()
用于去实际connect服务器,以Http实现为例默认的超时时间是2s,但是并不代表最长hold住18s哦,请参考sleepBeforeRetry()
方法的实现。
另外还能发现:connect成功与否它会影响到Server.readyToServe
属性的值,该值默认是true,但如果你要connect的话,只有成功了它才会是true,否则会给你设置为fasle,这样在轮询负载策略中此Server将不会再提供服务了。
需要注意的是:Server.readyToServe
这个属性唯一使用处是RoundRobinRule
轮询规则里,而该算法既是默认规则,又是使用最广泛的规则,所以它的影响仅是RoundRobinRule
轮询策略,但因为它是默认策略且使用很广,所以其实该接口的影响还是很大的。
要么你就不指定
IPrimeConnection
的实现,要么就指定了实现就就得谨慎操作,否则容易使得Server不可用。理论上,如果你木有导ribbon-httpclient
包的话,是不会有IPrimeConnection
实现的,但是,但是,但是Spring Cloud默认都导了此包,需要引起重视~
该方法属偏底层方法,所以它用于异步执行,也可以传入一个PrimeConnectionListener
监听器来监听完成后的动作(比如默认实现是:若成功了就标记readyToServe=true
等等)。
它作为一个public方法,除了被本类的primeConnections
调用外,还被BaseLoadBalancer#setServersList
这个负载均衡器调用了:
BaseLoadBalancer:
public void setServersList(List lsrv) {
...
if (isEnablePrimingConnections()) {
...
if (primeConnections != null) {
primeConnections.primeConnectionsAsync(newServers, this);
}
}
...
}
它传入的监听器是this,因为BaseLoadBalancer
自己便是该监听器的实现:
BaseLoadBalancer:
@Override
public void primeCompleted(Server s, Throwable lastException) {
s.setReadyToServe(true);
}
这么做的目的很明显:对刚放进来的Server逐一做链接检查,然后赋值到Server.readyToServe
属性身上。
注意:
enablePrimingConnections
通过keyEnablePrimeConnections
可配置,不过它的默认值是false。
开启链接,阻塞直到目标服务器的配置百分比(默认为100%)启动或达到最大时间。
PrimeConnections:
public void primeConnections(List<Server> servers) {
// 准备发送链接之前,先全部置为false(这种操作是很危险的,所以请保证连通性)
for (Server server: servers) {
server.setReadyToServe(false);
}
// 默认100%执行完了(并不是通过)才会放行主线程
// 此处使用CountDownLatch 闭锁来控制,异步 -> 同步阻塞
int totalCount = (int) (servers.size() * primeRatio);
CountDownLatch latch = new CountDownLatch(totalCount);
// 只要木有抛出异常(证明联通了),那就s.setReadyToServe(true);
primeConnectionsAsync(servers, new PrimeConnectionListener() {
@Override
public void primeCompleted(Server s, Throwable lastException) {
if (lastException == null) {
successCount.incrementAndGet();
s.setReadyToServe(true);
} else {
failureCount.incrementAndGet();
}
latch.countDown();
}
});
...
// 如果机器太多,也不会一直等待你,最大等待时间,默认是30s
// 当然可以通过`MaxTotalTimeToPrimeConnections`这个key来设定
latch.await(maxTotalTimeToPrimeConnections, TimeUnit.MILLISECONDS);
...
// 记录链接的结果,后面可以通过get方法获取到
stats = new PrimeConnectionEndStats(totalCount, successCount.get(), failureCount.get(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
// 打印一下结果。结果为info级别:Priming Connections not fully successful
// 详细日志是debug级别
printStats(stats);
}
可以理解它是对primeConnectionsAsync()
方法的封装,在全异步的基础上阻塞、最大超时时间、监听器等功能,因此它是更具有实用特性的。
该方法的调用处均是外部调用,发生在初始化阶段:
BaseLoadBalancer#init
:primeConnections(getReachableServers())DynamicServerListLoadBalancer#restOfInit
:primeConnections(getReachableServers())你不免也会疑问,这个接口有何用呢?首先需要明确:该开关默认是关闭的(可以通过key EnablePrimeConnections=true
来开启,默认值是false),因此可知它的作用并不是那么的必须。
它的作用可描述为:启动指定Client的链接(尝试去连一次),作用主要用于解决那些部署环境(如读EC2)在实际使用实时请求之前,从防火墙连接/路径进行预热(比如先加白名单、初始化等等动作比较耗时,可以用它先去打通,混个脸熟嘛)。
Ribbon启动连接操作:IPrimeConnection
就先介绍到这,它能够帮你检测到Server的可用性,让你的Server更加健康,所以若你可以很好的玩转它,为你所用,那么它对你的工程健康指数是有帮助的。