在查看源码的时候有个抽象的概念,抽象理解源码这块就类似鸡生蛋,蛋生鸡的问题,不看源码永远不知道这种写法,不知道这个原理也好像看不懂这个写法,就形成一个死循环,不懂reactor就很难看得懂netty,不懂netty就不知道reactor这种模式。需要在知识这块了解原始的积累。今天一起说说在实际开发过程中需要考虑的部分,那些影响性能,进而影响功能。这次说说腾讯邮箱的技术分享。邮箱不像咱们的http请求,属于自己的协议,内部使用netty的功能比较多,一起说说内部如何实践的。
其中业务服务,推送服务等均是由netty实现,承担着各种长连接,高并发的业务服务。
handler对象的复用。 initChannel 每次链接建立后,new XDecoder() 和 new XHandller()多个对象,每个连接都对应一个pipeline,也就是pipeline 里面保留多个handler,复用的含义就是我不创建这个么对象行不行,其实不行的,因为handler不能做成功的handler,只能被一个连接所使用,
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
public class XNettyServer {
public static void main(String[] args) throws Exception {
// 1、 线程定义
// accept 处理连接的线程池
EventLoopGroup acceptGroup = new NioEventLoopGroup();
// read io 处理数据的线程池
EventLoopGroup readGroup = new NioEventLoopGroup();
XHandller xHandller = new XHandller();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(acceptGroup, readGroup);
// 2、 选择TCP协议,NIO的实现方式
b.channel(NioServerSocketChannel.class);
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 3、 职责链定义(请求收到后怎么处理)
ChannelPipeline pipeline = ch.pipeline();
// TODO 3.1 增加解码器
pipeline.addLast(new XDecoder());
// TODO 3.2 打印出内容 handdler
// pipeline.addLast(xHandller ); 复用的时候注释下面的代码
pipeline.addLast(new XHandller());
}
});
// 4、 绑定端口
System.out.println("启动成功,端口 9999");
b.bind(9999).sync().channel().closeFuture().sync();
} finally {
acceptGroup.shutdownGracefully();
readGroup.shutdownGracefully();
}
}
}
如果想共享的话,需要在handler上边增加一个标记 @ChannelHandler.Sharable
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 后续处理handdler
*/
@ChannelHandler.Sharable
public class XHandller extends ChannelInboundHandlerAdapter {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 输出 bytebuf
ByteBuf buf = (ByteBuf) msg;
byte[] content = new byte[buf.readableBytes()];
buf.readBytes(content);
System.out.println(Thread.currentThread()+ ": 最终打印"+new String(content));
((ByteBuf) msg).release(); // 引用计数减一
// ctx.fireChannelRead();
}
// 异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
一个netty EventLoop 每次都创建一个Handler,共享后连接1 和 连接2都指向同一个handler,就会产生一个问题,就是多线程的问题,多个请求多个线程,同时调用handler的时候,共享变量的情况就会导致线程安全的问题,handler要共享数据,如果优化了一定要注意共享变量不要产生线程安全问题。目前的XHandller 满足共享,不存在线程安全。
XDecoder 里面就存在线程不安全的因素, ByteBuf tempMsg = Unpooled.buffer(); 共享变量的存在。一般来说共享是没有问题的,但是编解码器一般不会共享,在编解码器里面ByteToMessageDecoder 父类里面有个ensureNotSharable() 告诉就不能进行共享。
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
// 编解码一定是根据协议~
public class XDecoder extends ByteToMessageDecoder {
static final int PACKET_SIZE = 220;
// 用来临时保留没有处理过的请求报文
ByteBuf tempMsg = Unpooled.buffer();
// in输入 --- 处理 --- out 输出
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println(Thread.currentThread()+"收到了一次数据包,长度是:" + in.readableBytes());
// in 请求的数据
// out 将粘在一起的报文拆分后的结果保留起来
// 1、 合并报文
ByteBuf message = null;
int tmpMsgSize = tempMsg.readableBytes();
// 如果暂存有上一次余下的请求报文,则合并
if (tmpMsgSize > 0) {
message = Unpooled.buffer();
message.writeBytes(tempMsg);
message.writeBytes(in);
System.out.println("合并:上一数据包余下的长度为:" + tmpMsgSize + ",合并后长度为:" + message.readableBytes());
} else {
message = in;
}
// 2、 拆分报文
// 这个场景下,一个请求固定长度为3,可以根据长度来拆分
// i+1 i+1 i+1 i+1 i+1
// 不固定长度,需要应用层协议来约定 如何计算长度
// 在应用层中,根据单个报文的长度及特殊标记,来将报文进行拆分或合并
// dubbo rpc协议 = header(16) + body(不固定)
// header最后四个字节来标识body
// 长度 = 16 + body长度
// 0xda, 0xbb 魔数
int size = message.readableBytes();
int counter = size / PACKET_SIZE;
for (int i = 0; i < counter; i++) {
byte[] request = new byte[PACKET_SIZE];
// 每次从总的消息中读取220个字节的数据
message.readBytes(request);
// 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理
out.add(Unpooled.copiedBuffer(request));
}
// 3、多余的报文存起来
// 第一个报文: i+ 暂存
// 第二个报文: 1 与第一次
size = message.readableBytes();
if (size != 0) {
System.out.println("多余的数据长度:" + size);
// 剩下来的数据放到tempMsg暂存
tempMsg.clear();
tempMsg.writeBytes(message.readBytes(size));
}
}
}
增加了线程池,addLast增加了线程池
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
public class XNettyServer {
XDecoder xDecoder = new XDecoder();
public static void main(String[] args) throws Exception {
// 1、 线程定义
// accept 处理连接的线程池
EventLoopGroup acceptGroup = new NioEventLoopGroup();
// read io 处理数据的线程池
EventLoopGroup readGroup = new NioEventLoopGroup();
EventLoopGroup bizGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(acceptGroup, readGroup);
// 2、 选择TCP协议,NIO的实现方式
b.channel(NioServerSocketChannel.class);
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 3、 职责链定义(请求收到后怎么处理)
ChannelPipeline pipeline = ch.pipeline();
// TODO 3.1 增加解码器
// pipeline.addLast(new XDecoder());
// TODO 3.2 打印出内容 handdler
pipline.addLast(bizGroup,xDecoder);
//pipeline.addLast(new XHandller());
}
});
// 4、 绑定端口
System.out.println("启动成功,端口 9999");
b.bind(9999).sync().channel().closeFuture().sync();
} finally {
acceptGroup.shutdownGracefully();
readGroup.shutdownGracefully();
}
}
}
客户端拉取邮箱服务器,邮箱服务器通过线程异步进行调用,线程异步处理一下拉取1000条邮件,达到了更大的数据量了,转交给客户端的时候,IO线程(CPU的两倍),最好的状态是少量的线程处理大量的请求,但是会写给客户端的时候量比较大的时候,反馈响应也存在数据量太大的问题。响应如果通过netty 就会存在IO阻塞问题。
5000个分成每个500个,桥窄大车过去就堵塞了,把货都放在一辆辆小车上过去,就可以轻松从桥上通过了。
netty 源码中,会判断如果涉及到iO,自动切换成WriteTask,任务类型来进行处理。
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeWrite(msg, promise);
if (flush) {
next.invokeFlush();
}
} else {
int size = channel.estimatorHandle().size(msg);
if (size > 0) {
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.incrementPendingOutboundBytes(size);
}
}
Runnable task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, msg, size, promise);
} else {
task = WriteTask.newInstance(next, msg, size, promise);
}
safeExecute(executor, task, promise, msg);
}
}
如果注释了XHandller中,((ByteBuf) msg).release(); // 引用计数减一。 NIOEventLoop 收到 selector 通知,然后进入read环境,申请一个Bytebuf对象,作为数据的载体,最后转变handler进行业务处理。连接如果比较多,每次都申请一个Bytebuf对象,有一种情况1万个连接,就创建500个对象。500个对象可能到501的时候,突然变成1了,这就是JVM的GC机制,内存的损耗交给了GC,引发各种的GC,GC线程的压力,GC最大的问题STW(Java中Stop-The-World机制简称STW,是在执行垃圾收集算法时,Java应用程序的其他所有线程都被挂起(除了垃圾收集帮助器之外)),并不是优化没有效果,GC会导致程序的不稳定,并发量在大点,1万个对象都有可能,这样会频繁的GC。为什么每次都要重复生成一个对象呢。((ByteBuf) msg).release(); // 引用计数减一。 同一个地址多次调用使用同一个ByteBuf。
里面用到了计数器,直接这样理解吧,同一个地址的ByteBuf直接放入到一个堆里面,同一个地址直接取同一个ByteBuf,需要在((ByteBuf) msg).release();
####(三)总结
PS:底层原理的优化,体现在对源码的理解,系统参数决定了你执行的情况,操作系统是对外的平台,平台慢了话,netty程序,java程序在快是没用的。系统的参数调整后,netty也需要进行优化,下一步就是JAVA程序调优。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。