Netty流程学习

Netty中首先会经过OP_ACCEPT操作,再经过OP_READ事件,此时的操作是在processSelectionKeys中进行处理的,此时首先select出事件,然后执行处理操作。此时的read方法会会执行先后会执行两个事件,一个是连接事件16和一个读事件1。而在OP_WRITE则是在缓冲区写满的时候,才会去注册,等待通知去写。

一个普通的BIO的操作:

 public class BIOServer {
        public static void main(String[] args) throws Exception {
            ServerSocket serverSocket = new ServerSocket(6666);
            ExecutorService executorService = Executors.newCachedThreadPool();
            while (true) {
                System.out.println("等待客户端连接。。。。");
                Socket socket = serverSocket.accept(); //阻塞
                executorService.execute(() -> {
                    try {
                        InputStream inputStream = socket.getInputStream(); //阻塞
                        byte[] bytes = new byte[1024];
                        while (true){
                            int length = inputStream.read(bytes);
                            if(length == -1){
                                break;
                            }
                            System.out.println(new String(bytes, 0, length, "UTF-
                                    8"));
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    }

从上面我们可以看到首先绑定端口,然后进行连接,然后执行read操作。

NIO操作:首先需要打开selector,同时打开serverSocketChannel,设置成非阻塞方式,此时进行通道获取,然后绑定bind端口,注册感兴趣的事件。接下来是在死循环中,进行轮询获取selectionKeys,然后对selectionKeys中的事件进行处理,处理完,进行移除。可以看到在处理的过程中,会涉及到连接事件和读事件的操作。

public class SelectorDemo {
        /**
         * 注册事件
         *
         * @return
         */
        private Selector getSelector() throws Exception {
            //获取selector对象
            Selector selector = Selector.open();
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false); //非阻塞
            //获取通道并且绑定端⼝
            ServerSocket socket = serverSocketChannel.socket();
            socket.bind(new InetSocketAddress(6677));
            //注册感兴趣的事件
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            return selector;
        }

        public void listen() throws Exception {
            Selector selector = this.getSelector();
            while (true) {
                selector.select(); //该方法会阻塞,直到至少有一个事件的发生
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    process(selectionKey, selector);
                    iterator.remove();
                }
            }
        }

        private void process(SelectionKey key, Selector selector) throws Exception {
            if(key.isAcceptable()){ //新连接请求
                ServerSocketChannel server = (ServerSocketChannel)key.channel();
                SocketChannel channel = server.accept();
                channel.configureBlocking(false); //阻塞
                channel.register(selector, SelectionKey.OP_READ);
            }else if(key.isReadable()){ //读数据
                SocketChannel channel = (SocketChannel)key.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                channel.read(byteBuffer);
                System.out.println("form 客户端 " + new String(byteBuffer.array(),
                        0, byteBuffer.position()));
            }
        }

        public static void main(String[] args) throws Exception {
            new SelectorDemo().listen();
        }
    }

而Netty,正是围绕NIO进行优化封装的。

也即Netty中,我们首先会启动服务,此时会将连接事件注册到NioEventLoop中,而这个过程首先是注册0,然后注册16这个事件,也即连接事件,接着注册读事件,如果不能写的时候,注册写事件,等待写通知。

在Dubbo中进行的封装:

dubbo暴露服务的流程如下

在dubbo中,经过invoker操作后,会调用协议进行dubbo协议适配:

Exporter<?> exporter = PROTOCOL.export(wrapperInvoker)
 @Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
//进行网络通信操作,启动netty服务器
 @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }

            }
        }

        openServer(url);
        optimizeSerialization(url);

        return exporter;
    }

openServer(url):

private void openServer(URL url) {
    // find server.
    String key = url.getAddress();
    //client can export a service which's only for server to invoke
    boolean isServer = url.getParameter(IS_SERVER_KEY, true);
    if (isServer) {
        ProtocolServer server = serverMap.get(key);
        if (server == null) {
            synchronized (this) {
                server = serverMap.get(key);
                if (server == null) {
                    serverMap.put(key, createServer(url));
                }
            }
        } else {
            // server supports reset, use together with override
            server.reset(url);
        }
    }
}

可以看到这里使用了单例模式。

同时执行bind操作:

private ProtocolServer createServer(URL url) {
    url = URLBuilder.from(url)
            // send readonly event when server closes, it's enabled by default
            .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
            // enable heartbeat by default
            .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
            .addParameter(CODEC_KEY, DubboCodec.NAME)
            .build();
    String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    }

    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }

    str = url.getParameter(CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }

    return new DubboProtocolServer(server);
}

可以看到绑定操作的实质:初始化和启动Netty服务器

@Override
public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
    return new NettyServer(url, handler);
}


 /**
     * Init and start netty server
     *
     * @throws Throwable
     */
    @Override
    protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();

        //boss线程组这里设置为1个
        bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
        //worker线程组
        workerGroup = NettyEventLoopFactory.eventLoopGroup(
                getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                "NettyServerWorker");

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NettyEventLoopFactory.serverSocketChannelClass())
                .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                            ch.pipeline().addLast("negotiation",
                                    SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
                        }
                        ch.pipeline()
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

同时可以看到dubbo中Netty的设置:将boss线程组设置为1个,避免过多的线程浪费。

禁用Nagle算法=> .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE),来减少网络卡顿。

设置编解码器,采用适配器的方式,适配协议对应的编解码,方便协议适配。

设置空闲处理handler参数:IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime,TimeUnit unit)

中的读闲置时间为0,同时写闲置时间为0,充分保证性能。

 ch.pipeline()
            .addLast("decoder", adapter.getDecoder())
            .addLast("encoder", adapter.getEncoder())
            .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
            .addLast("handler", nettyServerHandler);
}

设置闲置超时时间:idleTimeout = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartBeat * 3)

public static int getIdleTimeout(URL url) {
    int heartBeat = getHeartbeat(url);
    // idleTimeout should be at least more than twice heartBeat because possible retries of client.
    int idleTimeout = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartBeat * 3);
    if (idleTimeout < heartBeat * 2) {
        throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
    }
    return idleTimeout;
}

连接完成之后,不能无所事事,此时应该会执行业务处理。也即此时可以看到上面的NettyServerHandler。因此可以看到dubbo的线程模型:

配置 Dubbo 中的线程模型

如果事件处理的逻辑能迅速完成,并且不会发起新的 IO 请求,比如只是在内存中记个标识,则直接在 IO 线程上处理更快,因为减少了线程池调度。

但如果事件处理逻辑较慢,或者需要发起新的 IO 请求,比如需要查询数据库,则必须派发到线程池,否则 IO 线程阻塞,将导致不能接收其它请求。

如果用 IO 线程处理事件,又在事件处理过程中发起新的 IO 请求,比如在连接事件中发起登录请求,会报“可能引发死锁”异常,但不会真死锁。

因此,需要通过不同的派发策略和不同的线程池配置的组合来应对不同的场景:

<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />

Dispatcher

  • all 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。
  • direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行。
  • message 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
  • execution 只有请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
  • connection 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。

ThreadPool

  • fixed 固定大小线程池,启动时建立线程,不关闭,一直持有。(缺省)
  • cached 缓存线程池,空闲一分钟自动删除,需要时重建。
  • limited 可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。
  • eager 优先创建Worker线程池。在任务数量大于corePoolSize但是小于maximumPoolSize时,优先创建Worker来处理任务。当任务数量大于maximumPoolSize时,将任务放入阻塞队列中。阻塞队列充满时抛出RejectedExecutionException。(相比于cached:cached在任务数量超过maximumPoolSize时直接抛出异常而不是将任务放入阻塞队列)

下面来看NettyServerHandler中:执行的操作包括:连接操作connect、断开连接disconnect、received操作、写操作write和发送操作sent、关闭操作

/**
 * NettyServerHandler.
 */
@io.netty.channel.ChannelHandler.Sharable
public class NettyServerHandler extends ChannelDuplexHandler {
    //执行连接操作
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        if (channel != null) {
            channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
        }
        handler.connected(channel);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            channels.remove(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()));
            handler.disconnected(channel);
        } finally {
            NettyChannel.removeChannel(ctx.channel());
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        handler.received(channel, msg);
    }


    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        super.write(ctx, msg, promise);
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        handler.sent(channel, msg);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // server will close channel when server don't receive any heartbeat from client util timeout.
        if (evt instanceof IdleStateEvent) {
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
            try {
                logger.info("IdleStateEvent triggered, close channel " + channel);
                channel.close();
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.channel());
            }
        }
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            handler.caught(channel, cause);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }

    public void handshakeCompleted(HandshakeCompletionEvent evt) {
        // TODO
    }
}

同时NettyServerClient里面也有这几个事件。

本文分享自微信公众号 - 后端技术学习(gh_9f5627e6cc61),作者:路行的亚洲

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-12-20

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Dubbo源码学习五-服务发现

    从类图中,我们可以看到ReferenceBean继承了Referenceconfig,同时实现了FactoryBean、DisposableBean、Appli...

    路行的亚洲
  • Netty学习一

    前面我们已经学习了NIO的简单知识,三大组件:ByteBuffer、Channel、Selector。知道ByteBufffer是数据,而Channel是数据的...

    路行的亚洲
  • dubbo源码学习四-服务注册以及服务提供者总结

    文章参考官方文档以及丁威老师的文章。前面我们已经知道服务暴露分为本地服务暴露和远程服务暴露,同时远程服务暴露又分为:进行服务暴露、服务注册到注册中心、服务订阅。...

    路行的亚洲
  • 【安全通知】PyPI 官方仓库遭遇covd恶意包投毒

    近日,腾讯洋葱反入侵系统检测发现 PyPI官方仓库被恶意上传了covd 钓鱼包,并通知官方仓库下架处理。由于国内开源镜像站均同步于PyPI官方仓库,所以该问题不...

    腾讯安全应急响应中心
  • [CodeIgniter4]-错误处理

    CodeIgniter 通过 SPL collection 和一些框架内自定义异常来生成系统错误报告。错误处理的行为取决于你部署环境的设置,当一个错误或异常被抛...

    landv
  • SpringMVC之细说ModelAndView

    当控制器处理完请求时,通常会将包含视图名称或视图对象以及一些模型属性的ModelAndView对象返回到DispatcherServlet。

    一觉睡到小时候
  • 干货|揭秘2020年最新的24招小程序运营玩法

    小程序面世三年,在电商方面,为企业创造了超过100亿的gmv,而商家们该如何落地布局,抢占小程序电商红利呢?今天,新爷从我们服务过的众多商户中,总结出了24招高...

    云店加小程序分享
  • Vim第二讲 删除、重复、撤销

    其中: operator - 操作符,代表要做的事情,比如 d 代表删除 [number] - 可以附加的数字,代表动作重复的次数 motion - 动作,代表...

    宋天伦
  • ElasticSearch集群发现,解答使用Docker部署ES集群时留下的疑惑

    在介绍使用Docker部署ES集群的那篇文章中,docker-compose.yml配置文件中,只配置了第一个ES节点的端口,第二个节点并没有配置端口,而是使用...

    Java艺术
  • 使用SAP CRM AET工具创建类型为下拉列表的扩展字段

    When you create extension field via Application Extension Tool, it seems the cod...

    Jerry Wang

扫码关注云+社区

领取腾讯云代金券