微信公众号:[中间件兴趣圈] 作者简介:《RocketMQ技术内幕》作者
在源码分析Dubbo通讯篇之网络核心类一文中已给出Dubbo netty client的启动流程,如下图:
以Dubbo协议为例,DubboProtocol#refer中,在创建Invoker时,通过getClient方法,开始Client(连接的)创建过程,先重点看一下:
1private ExchangeClient[] getClients(URL url) { // @1
2 // whether to share connection
3 boolean service_share_connect = false;
4 int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); // @2
5 // if not configured, connection is shared, otherwise, one connection for one service
6 if (connections == 0) {
7 service_share_connect = true;
8 connections = 1;
9 }
10
11 ExchangeClient[] clients = new ExchangeClient[connections]; // @3
12 for (int i = 0; i < clients.length; i++) {
13 if (service_share_connect) {
14 clients[i] = getSharedClient(url); // @4
15 } else {
16 clients[i] = initClient(url); // @5
17 }
18 }
19 return clients;
20 }
代码@1:参数URL,服务提供者URL。
代码@2:获取<dubbo:reference connections = "" />,默认0表示客户端对同一个服务提供者的所有服务,使用共享一个连接,如果该值有设置,则使用非共享的客户端,所谓的共享客户端,以Netty为例,也即客户端对同一服务提供者发起的不同服务,使用同一个客户端(NettyClient)进行请求的发送与接收。
代码@3:根据connections,创建ExchangeClients数组。
代码@4:如果使用共享连接,则调用getSharedClient获取共享连接,如果客户端未建立,则创建客户端。
代码@5:如果不使用共享连接,调用initClient创建客户端,其创建时序图如上图所示。
接下来,还是以Netty4为例,探究一下Dubbo NettyClient的创建细节。
首先从全貌上大概看一下NettyClient对象所持有的属性:
1public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException { // @1
2 super(url, wrapChannelHandler(url, handler)); // @2
3}
代码@1:url:服务提供者URL;ChannelHandler handler:事件处理器。
代码@2:wrapChannelHandler在讲解NettyServer时已重点分析,构造其事件转发模型(Dispatch)。
接下来重点分析其父类的构造方法:
1public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
2 super(url, handler); // @1
3
4 send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
5
6 shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
7
8 // The default reconnection interval is 2s, 1800 means warning interval is 1 hour.
9 reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800); // @2
10
11 try {
12 doOpen(); // @3
13 } catch (Throwable t) {
14 close();
15 throw new RemotingException(url.toInetSocketAddress(), null,
16 "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
17 + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
18 }
19 try {
20 // connect.
21 connect(); // @4
22 if (logger.isInfoEnabled()) {
23 logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
24 }
25 } catch (RemotingException t) {
26 if (url.getParameter(Constants.CHECK_KEY, true)) {
27 close();
28 throw t;
29 } else {
30 logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
31 + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
32 }
33 } catch (Throwable t) {
34 close();
35 throw new RemotingException(url.toInetSocketAddress(), null,
36 "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
37 + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
38 }
39
40 executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class) // @5
41 .getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
42 ExtensionLoader.getExtensionLoader(DataStore.class)
43 .getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
44 }
代码@1:调用父类的构造其,初始化url、ChannelHandler。
代码@2:初始化send_reconnect、shutdown_timeout、reconnect_warning_period(默认1小时打印一次日志)
代码@3:调用doOpen初始化客户端调用模型,后续重点分析。
代码@4:调用connect方法,向服务端发起TCP连接。
代码@5:获取线程池,并从缓存中移除。
1protected void doOpen() throws Throwable {
2 NettyHelper.setNettyLoggerFactory();
3 final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this); // @1
4 bootstrap = new Bootstrap(); // @2
5 bootstrap.group(nioEventLoopGroup) // @3
6 .option(ChannelOption.SO_KEEPALIVE, true)
7 .option(ChannelOption.TCP_NODELAY, true)
8 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
9 //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
10 .channel(NioSocketChannel.class);
11
12 if (getTimeout() < 3000) { // @4
13 bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
14 } else {
15 bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout());
16 }
17
18 bootstrap.handler(new ChannelInitializer() {
19
20 @Override
21 protected void initChannel(Channel ch) throws Exception {
22 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
23 ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
24 .addLast("decoder", adapter.getDecoder())
25 .addLast("encoder", adapter.getEncoder())
26 .addLast("handler", nettyClientHandler); // @5
27 }
28 });
29 }
代码@1:创建NettyClientHandler。
代码@2:创建Netty客户端启动实例bootstrap.
代码@3:客户端绑定IO线程组(池),注意,一个JVM中所有的NettyClient共享其IO线程。
代码@4:设置连接超时时间,最小连接超时时间为3s。
代码@5:设置编码器、事件连接器,当触发事件后,将调用nettyClientHandler中相关的方法。
1protected void doConnect() throws Throwable {
2 long start = System.currentTimeMillis();
3 ChannelFuture future = bootstrap.connect(getConnectAddress()); // @1
4 try {
5 boolean ret = future.awaitUninterruptibly(3000, TimeUnit.MILLISECONDS); // @2
6
7 if (ret && future.isSuccess()) {
8 Channel newChannel = future.channel();
9 try {
10 // Close old channel
11 Channel oldChannel = NettyClient.this.channel; // copy reference
12 if (oldChannel != null) {
13 try {
14 if (logger.isInfoEnabled()) {
15 logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
16 }
17 oldChannel.close();
18 } finally {
19 NettyChannel.removeChannelIfDisconnected(oldChannel);
20 }
21 }
22 } finally {
23 if (NettyClient.this.isClosed()) {
24 try {
25 if (logger.isInfoEnabled()) {
26 logger.info("Close new netty channel " + newChannel + ", because the client closed.");
27 }
28 newChannel.close();
29 } finally {
30 NettyClient.this.channel = null;
31 NettyChannel.removeChannelIfDisconnected(newChannel);
32 }
33 } else {
34 NettyClient.this.channel = newChannel;
35 }
36 }
37 } else if (future.cause() != null) {
38 throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
39 + getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
40 } else {
41 throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
42 + getRemoteAddress() + " client-side timeout "
43 + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
44 + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
45 }
46 } finally {
47 if (!isConnected()) {
48 //future.cancel(true);
49 }
50 }
51 }
代码@1:调用bootstrap.connect方法发起TCP连接。
代码@2:future.awaitUninterruptibly,连接事件只等待3S,这里写成固定了,显然没有与doOpen方法中ChannelOption.CONNECT_TIMEOUT_MILLIS保持一致。
关于NettyClient的介绍就将到这里了,下一篇将会分析编码解码。