A look at rocketmq's NettyClientConfig

This article takes a look at rocketmq's NettyClientConfig.

NettyClientConfig

org/apache/rocketmq/remoting/netty/NettyClientConfig.java

public class NettyClientConfig {
    /**
     * Worker thread number
     */
    private int clientWorkerThreads = 4;
    private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
    private int clientOnewaySemaphoreValue = NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE;
    private int clientAsyncSemaphoreValue = NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE;
    private int connectTimeoutMillis = 3000;
    private long channelNotActiveInterval = 1000 * 60;

    /**
     * IdleStateEvent will be triggered when neither read nor write was performed for
     * the specified period of this time. Specify {@code 0} to disable
     */
    private int clientChannelMaxIdleTimeSeconds = 120;

    private int clientSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    private int clientSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    private boolean clientPooledByteBufAllocatorEnable = false;
    private boolean clientCloseSocketIfTimeout = false;

    private boolean useTLS;

    //......
}

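The main parameters are:

  • clientWorkerThreads: defaults to 4
  • clientCallbackExecutorThreads: defaults to the number of CPU cores (Runtime.getRuntime().availableProcessors())
  • clientOnewaySemaphoreValue: takes NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE, which is 65535 unless overridden
  • clientAsyncSemaphoreValue: takes NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE, which is 65535 unless overridden
  • connectTimeoutMillis: defaults to 3000 milliseconds (3 seconds)
  • channelNotActiveInterval: defaults to 1000 * 60 milliseconds (60 seconds)
  • clientChannelMaxIdleTimeSeconds: defaults to 120 seconds
  • clientSocketSndBufSize: takes NettySystemConfig.socketSndbufSize, which is 65535 unless overridden
  • clientSocketRcvBufSize: takes NettySystemConfig.socketRcvbufSize, which is 65535 unless overridden
  • clientPooledByteBufAllocatorEnable: defaults to false
  • clientCloseSocketIfTimeout: defaults to false
  • useTLS: defaults to false (the field has no initializer)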

NettySystemConfig

org/apache/rocketmq/remoting/netty/NettySystemConfig.java

public class NettySystemConfig {
    public static final String COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE =
        "com.rocketmq.remoting.nettyPooledByteBufAllocatorEnable";
    public static final String COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE =
        "com.rocketmq.remoting.socket.sndbuf.size";
    public static final String COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE =
        "com.rocketmq.remoting.socket.rcvbuf.size";
    public static final String COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE =
        "com.rocketmq.remoting.clientAsyncSemaphoreValue";
    public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE =
        "com.rocketmq.remoting.clientOnewaySemaphoreValue";

    public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = //
        Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false"));
    public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = //
        Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535"));
    public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE =
        Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535"));
    public static int socketSndbufSize =
        Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535"));
    public static int socketRcvbufSize =
        Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535"));
}
  • Each value is read from a JVM system property first; if the property is not set, the hard-coded default is used
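
The read-with-default pattern above can be tried in isolation. The following is a minimal sketch, not RocketMQ code: the class SocketBufferDefaults and the helper readInt are hypothetical names introduced here for illustration, and only the property key is taken from the listing above.

```java
// Hypothetical stand-alone sketch of the pattern used in NettySystemConfig.
final class SocketBufferDefaults {
    // Same key string as COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE in the listing above.
    static final String SNDBUF_KEY = "com.rocketmq.remoting.socket.sndbuf.size";

    /** Reads an int from a JVM system property, using defaultValue when the property is unset. */
    static int readInt(String key, String defaultValue) {
        return Integer.parseInt(System.getProperty(key, defaultValue));
    }

    public static void main(String[] args) {
        // The result depends on whether -Dcom.rocketmq.remoting.socket.sndbuf.size=... was passed.
        System.out.println(readInt(SNDBUF_KEY, "65535"));
    }
}
```

Because the fields in NettySystemConfig are static and initialized when the class is loaded, an override has to be in place before that point, for example as a -D option on the JVM command line. Calling System.setProperty after the class has been loaded does not change the values already read.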

NettyRemotingClient

org/apache/rocketmq/remoting/netty/NettyRemotingClient.java

public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    private static final long LOCK_TIMEOUT_MILLIS = 3000;

    private final NettyClientConfig nettyClientConfig;
    private final Bootstrap bootstrap = new Bootstrap();
    private final EventLoopGroup eventLoopGroupWorker;
    private final Lock lockChannelTables = new ReentrantLock();
    private final ConcurrentMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();

    private final Timer timer = new Timer("ClientHouseKeepingService", true);

    private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
    private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
    private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
    private final Lock lockNamesrvChannel = new ReentrantLock();

    private final ExecutorService publicExecutor;

    /**
     * Invoke the callback methods in this executor when process response.
     */
    private ExecutorService callbackExecutor;
    private final ChannelEventListener channelEventListener;
    private DefaultEventExecutorGroup defaultEventExecutorGroup;
    private RPCHook rpcHook;

    public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
        this(nettyClientConfig, null);
    }

    public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
        final ChannelEventListener channelEventListener) {
        super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
        this.nettyClientConfig = nettyClientConfig;
        this.channelEventListener = channelEventListener;

        int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
        if (publicThreadNums <= 0) {
            publicThreadNums = 4;
        }

        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
            }
        });

        this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
            }
        });

        if (nettyClientConfig.isUseTLS()) {
            try {
                sslContext = TlsHelper.buildSslContext(true);
                log.info("SSL enabled for client");
            } catch (IOException e) {
                log.error("Failed to create SSLContext", e);
            } catch (CertificateException e) {
                log.error("Failed to create SSLContext", e);
                throw new RuntimeException("Failed to create SSLContext", e);
            }
        }
    }

    private static int initValueIndex() {
        Random r = new Random();

        return Math.abs(r.nextInt() % 999) % 999;
    }

    @Override
    public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyClientConfig.getClientWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
                }
            });

        Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (nettyClientConfig.isUseTLS()) {
                        if (null != sslContext) {
                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            log.info("Prepend SSL handler");
                        } else {
                            log.warn("Connections are insecure as SSLContext is null!");
                        }
                    }
                    pipeline.addLast(
                        defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                        new NettyConnectManageHandler(),
                        new NettyClientHandler());
                }
            });

        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingClient.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
    }

    //......
}
  • NettyRemotingClient uses the settings in NettyClientConfig to initialize the Netty client
  • ChannelOption.CONNECT_TIMEOUT_MILLIS is set from nettyClientConfig.getConnectTimeoutMillis()
  • ChannelOption.SO_SNDBUF is set from nettyClientConfig.getClientSocketSndBufSize()
  • ChannelOption.SO_RCVBUF is set from nettyClientConfig.getClientSocketRcvBufSize()
  • The allIdleTimeSeconds argument of IdleStateHandler is set from nettyClientConfig.getClientChannelMaxIdleTimeSeconds()
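
The thread-related settings are wired in the same way: the constructor sizes publicExecutor from getClientCallbackExecutorThreads(), falling back to 4 when the value is not positive, and names each thread with a counter. A minimal stand-alone sketch of that wiring follows. NamedPoolSketch, effectiveThreads and namedFactory are hypothetical names used only for illustration; they are not RocketMQ APIs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch mirroring how the constructor builds publicExecutor.
final class NamedPoolSketch {
    /** Mirrors the constructor's guard: a non-positive setting falls back to 4. */
    static int effectiveThreads(int configured) {
        return configured <= 0 ? 4 : configured;
    }

    /** Creates threads named prefix + 1, prefix + 2, ... like "NettyClientPublicExecutor_1". */
    static ThreadFactory namedFactory(final String prefix) {
        final AtomicInteger index = new AtomicInteger(0);
        return r -> new Thread(r, prefix + index.incrementAndGet());
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(
            effectiveThreads(0), namedFactory("NettyClientPublicExecutor_"));
        try {
            // The first submitted task runs on the first thread the factory creates.
            String name = pool.submit(() -> Thread.currentThread().getName()).get();
            System.out.println(name); // prints NettyClientPublicExecutor_1
        } finally {
            pool.shutdown();
        }
    }
}
```

Naming threads this way makes it easy to tell the callback pool, the selector thread and the worker threads apart in a thread dump.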

Summary

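rocketmq's NettyClientConfig holds the parameters for NettyRemotingClient. Besides the Netty-related settings, it also specifies two semaphore sizes: clientOnewaySemaphoreValue and clientAsyncSemaphoreValue. Both are passed to the NettyRemotingAbstract constructor. clientOnewaySemaphoreValue limits how many invokeOnewayImpl calls can be in flight at the same time, and clientAsyncSemaphoreValue does the same for invokeAsyncImpl. They cap concurrency rather than call frequency.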
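
To illustrate how a semaphore bounds the number of in-flight calls, here is a minimal sketch built only on java.util.concurrent.Semaphore. It is a simplification and not the NettyRemotingAbstract implementation: InFlightLimiter, tryBegin and end are hypothetical names, and the permit count of 2 is chosen only to keep the example short.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: caps the number of requests that may be in flight at once.
final class InFlightLimiter {
    private final Semaphore permits;

    InFlightLimiter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight, true);
    }

    /**
     * Tries to start a request. Returns false if no permit becomes available
     * within timeoutMillis, which means too many requests are already in flight.
     */
    boolean tryBegin(long timeoutMillis) {
        try {
            return permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** Must be called exactly once for every successful tryBegin, when the request finishes. */
    void end() {
        permits.release();
    }

    public static void main(String[] args) {
        InFlightLimiter limiter = new InFlightLimiter(2);
        System.out.println(limiter.tryBegin(10)); // true
        System.out.println(limiter.tryBegin(10)); // true
        System.out.println(limiter.tryBegin(10)); // false: both permits are held
        limiter.end();
        System.out.println(limiter.tryBegin(10)); // true: a permit was released
    }
}
```

With the default of 65535 permits the limit is rarely reached in practice; a smaller value makes the client fail fast instead of piling up pending requests when the remote side is slow.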

doc

  • NettyClientConfig

Originally published on the WeChat official account 码匠的流水账 (geek_luandun)

Original publication date: 2018-08-04
