前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >netty在大厂内部的优化实践

netty在大厂内部的优化实践

原创
作者头像
IT架构圈
修改2021-02-01 11:22:55
4310
修改2021-02-01 11:22:55
举报
文章被收录于专栏:IT架构圈IT架构圈

在查看源码的时候有个抽象的概念,抽象理解源码这块就类似鸡生蛋,蛋生鸡的问题,不看源码永远不知道这种写法,不知道这个原理也好像看不懂这个写法,就形成一个死循环,不懂reactor就很难看得懂netty,不懂netty就不知道reactor这种模式。需要在知识这块了解原始的积累。今天一起说说在实际开发过程中需要考虑的部分,那些影响性能,进而影响功能。这次说说腾讯邮箱的技术分享。邮箱不像咱们的http请求,属于自己的协议,内部使用netty的功能比较多,一起说说内部如何实践的。

(一)内部实践
  • ① 整体服务架构

其中业务服务,推送服务等均是由netty实现,承担着各种长连接,高并发的业务服务。

  • ② 推送服务架构
  • ③ netty对象优化这个优化和实际的功能就是无关,做什么功能都不要紧。

handler对象的复用。 initChannel 每次链接建立后,new XDecoder() 和 new XHandller()多个对象,每个连接都对应一个pipeline,也就是pipeline 里面保留多个handler,复用的含义就是我不创建这个么对象行不行,其实不行的,因为handler不能做成功的handler,只能被一个连接所使用,

代码语言:txt
复制
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;

public class XNettyServer {
    public static void main(String[] args) throws Exception {
        // 1、 线程定义
        // accept 处理连接的线程池
        EventLoopGroup acceptGroup = new NioEventLoopGroup();
        // read io 处理数据的线程池
        EventLoopGroup readGroup = new NioEventLoopGroup();
      XHandller xHandller = new XHandller();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(acceptGroup, readGroup);
            // 2、 选择TCP协议,NIO的实现方式
            b.channel(NioServerSocketChannel.class);
            b.childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // 3、 职责链定义(请求收到后怎么处理)
                    ChannelPipeline pipeline = ch.pipeline();
                    // TODO 3.1 增加解码器
                    pipeline.addLast(new XDecoder());
                    // TODO 3.2 打印出内容 handdler
                    // pipeline.addLast(xHandller  ); 复用的时候注释下面的代码
                    pipeline.addLast(new XHandller());
                }
            });
            // 4、 绑定端口
            System.out.println("启动成功,端口 9999");
            b.bind(9999).sync().channel().closeFuture().sync();
        } finally {
            acceptGroup.shutdownGracefully();
            readGroup.shutdownGracefully();
        }
    }
}

如果想共享的话,需要在handler上边增加一个标记 @ChannelHandler.Sharable

代码语言:txt
复制
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 后续处理handdler
 */
@ChannelHandler.Sharable
public class XHandller extends ChannelInboundHandlerAdapter {

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 输出 bytebuf
        ByteBuf buf = (ByteBuf) msg;
        byte[] content = new byte[buf.readableBytes()];
        buf.readBytes(content);
        System.out.println(Thread.currentThread()+ ": 最终打印"+new String(content));
        ((ByteBuf) msg).release(); // 引用计数减一
        // ctx.fireChannelRead();
    }

    // 异常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

一个netty EventLoop 每次都创建一个Handler,共享后连接1 和 连接2都指向同一个handler,就会产生一个问题,就是多线程的问题,多个请求多个线程,同时调用handler的时候,共享变量的情况就会导致线程安全的问题,handler要共享数据,如果优化了一定要注意共享变量不要产生线程安全问题。目前的XHandller 满足共享,不存在线程安全。

XDecoder 里面就存在线程不安全的因素, ByteBuf tempMsg = Unpooled.buffer(); 共享变量的存在。一般来说共享是没有问题的,但是编解码器一般不会共享,在编解码器里面ByteToMessageDecoder 父类里面有个ensureNotSharable() 告诉就不能进行共享。

代码语言:txt
复制
import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

// 编解码一定是根据协议~
public class XDecoder extends ByteToMessageDecoder {
    static final int PACKET_SIZE = 220;

    // 用来临时保留没有处理过的请求报文
    ByteBuf tempMsg = Unpooled.buffer();

    // in输入   --- 处理  --- out 输出
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println(Thread.currentThread()+"收到了一次数据包,长度是:" + in.readableBytes());
        // in 请求的数据
        // out 将粘在一起的报文拆分后的结果保留起来

        // 1、 合并报文
        ByteBuf message = null;
        int tmpMsgSize = tempMsg.readableBytes();
        // 如果暂存有上一次余下的请求报文,则合并
        if (tmpMsgSize > 0) {
            message = Unpooled.buffer();
            message.writeBytes(tempMsg);
            message.writeBytes(in);
            System.out.println("合并:上一数据包余下的长度为:" + tmpMsgSize + ",合并后长度为:" + message.readableBytes());
        } else {
            message = in;
        }

        // 2、 拆分报文
        // 这个场景下,一个请求固定长度为3,可以根据长度来拆分
        // i+1 i+1 i+1 i+1 i+1
        // 不固定长度,需要应用层协议来约定 如何计算长度
        // 在应用层中,根据单个报文的长度及特殊标记,来将报文进行拆分或合并
        // dubbo rpc协议 = header(16) + body(不固定)
        // header最后四个字节来标识body
        // 长度 = 16 + body长度
        // 0xda, 0xbb 魔数


        int size = message.readableBytes();
        int counter = size / PACKET_SIZE;
        for (int i = 0; i < counter; i++) {
            byte[] request = new byte[PACKET_SIZE];
            // 每次从总的消息中读取220个字节的数据
            message.readBytes(request);

            // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理
            out.add(Unpooled.copiedBuffer(request));
        }

        // 3、多余的报文存起来
        // 第一个报文: i+  暂存
        // 第二个报文: 1 与第一次
        size = message.readableBytes();
        if (size != 0) {
            System.out.println("多余的数据长度:" + size);
            // 剩下来的数据放到tempMsg暂存
            tempMsg.clear();
            tempMsg.writeBytes(message.readBytes(size));
        }
    }

}
  • ④ 处理过程中,线程的调度问题在整个程序运行过程中,存在好几个块。
  1. Accept 接收, reactor进行读取work线程。
  2. handler耗时操作是在数据库,导致IO线程堵塞所有都被积压。、
  3. 为耗时操作交给指定的线程池。

增加了线程池,addLast增加了线程池

代码语言:txt
复制
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;

public class XNettyServer {
	
	  XDecoder xDecoder = new XDecoder();
    public static void main(String[] args) throws Exception {
        // 1、 线程定义
        // accept 处理连接的线程池
        EventLoopGroup acceptGroup = new NioEventLoopGroup();
        // read io 处理数据的线程池
        EventLoopGroup readGroup = new NioEventLoopGroup();
        
        EventLoopGroup bizGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(acceptGroup, readGroup);
            // 2、 选择TCP协议,NIO的实现方式
            b.channel(NioServerSocketChannel.class);
            b.childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // 3、 职责链定义(请求收到后怎么处理)
                    ChannelPipeline pipeline = ch.pipeline();
                    // TODO 3.1 增加解码器
                    // pipeline.addLast(new XDecoder());
                    // TODO 3.2 打印出内容 handdler
                    
                     pipline.addLast(bizGroup,xDecoder);
                    //pipeline.addLast(new XHandller());
                }
            });
            // 4、 绑定端口
            System.out.println("启动成功,端口 9999");
            b.bind(9999).sync().channel().closeFuture().sync();
        } finally {
            acceptGroup.shutdownGracefully();
            readGroup.shutdownGracefully();
        }
    }
}
  • ⑤ 响应内容,必须经过netty的IO线程

客户端拉取邮箱服务器,邮箱服务器通过线程异步进行调用,线程异步处理一下拉取1000条邮件,达到了更大的数据量了,转交给客户端的时候,IO线程(CPU的两倍),最好的状态是少量的线程处理大量的请求,但是会写给客户端的时候量比较大的时候,反馈响应也存在数据量太大的问题。响应如果通过netty 就会存在IO阻塞问题。

5000个分成每个500个,桥窄大车过去就堵塞了,把货都放在一辆辆小车上过去,就可以轻松从桥上通过了。

netty 源码中,会判断如果涉及到iO,自动切换成WriteTask,任务类型来进行处理。

代码语言:txt
复制
private void write(Object msg, boolean flush, ChannelPromise promise) {

        AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeWrite(msg, promise);
            if (flush) {
                next.invokeFlush();
            }
        } else {
            int size = channel.estimatorHandle().size(msg);
            if (size > 0) {
                ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
                // Check for null as it may be set to null if the channel is closed already
                if (buffer != null) {
                    buffer.incrementPendingOutboundBytes(size);
                }
            }
            Runnable task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, msg, size, promise);
            }  else {
                task = WriteTask.newInstance(next, msg, size, promise);
            }
            safeExecute(executor, task, promise, msg);
        }
    }
  1. 调大操作系统的tcp网络缓存
  2. 业务处理,将大数据的write,多次小数据write
  3. 一般为了稳定,基本会保留40~50的资源空闲。QPS3000-5000 机器适度就可以。
  • ⑥ ByteBuf 复用机制

如果注释了XHandller中,((ByteBuf) msg).release(); // 引用计数减一。 NIOEventLoop 收到 selector 通知,然后进入read环境,申请一个Bytebuf对象,作为数据的载体,最后转变handler进行业务处理。连接如果比较多,每次都申请一个Bytebuf对象,有一种情况1万个连接,就创建500个对象。500个对象可能到501的时候,突然变成1了,这就是JVM的GC机制,内存的损耗交给了GC,引发各种的GC,GC线程的压力,GC最大的问题STW(Java中Stop-The-World机制简称STW,是在执行垃圾收集算法时,Java应用程序的其他所有线程都被挂起(除了垃圾收集帮助器之外)),并不是优化没有效果,GC会导致程序的不稳定,并发量在大点,1万个对象都有可能,这样会频繁的GC。为什么每次都要重复生成一个对象呢。((ByteBuf) msg).release(); // 引用计数减一。 同一个地址多次调用使用同一个ByteBuf。

里面用到了计数器,直接这样理解吧,同一个地址的ByteBuf直接放入到一个堆里面,同一个地址直接取同一个ByteBuf,需要在((ByteBuf) msg).release();

(二)提高请求、推送的吞吐量
  1. 业务操作提交到单独的线程执行(防止IO线程的阻塞)。
  2. 调整TCP缓冲区大小,提高网络吞吐量。
  3. 基于Netty框架开发时,业务代码逻辑的调优。
  4. 结合Netty框架特点,复用对象,实现性能提升。

####(三)总结

  1. 并发连接主要靠操作系统参数调优。
  2. 吞吐量的提示,主要靠代码处理能力来提升。
  3. 有时候网络和磁盘会成为瓶颈(10M和100M差距很大)。
  4. 水平扩展,集群的方式是最终方案。
  5. Netty的运作机制很重要(多路复用至关重要reactor)。

PS:底层原理的优化,体现在对源码的理解,系统参数决定了你执行的情况,操作系统是对外的平台,平台慢了话,netty程序,java程序在快是没用的。系统的参数调整后,netty也需要进行优化,下一步就是JAVA程序调优。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • (一)内部实践
  • (二)提高请求、推送的吞吐量
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档