在写代码之前,我们先看下netty的线程模型,这比那固定格式的代码将会更有趣,看完线程模型,你就知道netty写的那几段固定代码的意义了。
在这里插入图片描述
这个线程模型图里面大概包含了这几个组件:bossGroup,workGroup,selectot(accept),selector(读写),pipline,NioSocketChannel,NioServerSocketChannel;
介绍完这些基本组件之后,我们对netty的线程模型应该有了初步的认识,现在我们大概梳理下netty的整个处理过程:
前面已经将netty的基本组成和其线程模型大概说了下,现在我们演示下如何使用netty进行开发:代码已经放到码云:穿云箭
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha1</version>
</dependency>
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
public static void main(String[] args) {
// 创建 处理连接请求的线程组 1个
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 创建工作组线程 默认为 cpu核数*2 个
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//在pipline中添加自定义的handle处理器
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("netty server start");
// 绑定9000 端口号 sync指的是 创建完端口监听后,才执行后续操作
ChannelFuture cf = serverBootstrap.bind(9000).sync();
// 添加监听器
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("服务启动完成");
}
});
// 注册chnnel的关闭事件,sync是只有当关闭事件发生后才结束该线程,否则一直阻塞
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf msg1 = (ByteBuf) msg;
System.out.println(String.format("收到客户端(%s)消息:%s", ctx.channel().remoteAddress().toString(), msg1.toString(CharsetUtil.UTF_8)));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("HelloClient", CharsetUtil.UTF_8);
ctx.writeAndFlush(buf);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(String.format("有新的客户端连接:%s", ctx.channel().remoteAddress().toString()));
}
}
# 这里的ChannelInboundHandlerAdapter已经被废弃了,大家后续可以继承SimpleChannelInboundHandler,支持传入泛型,然后配合解码器使用,这里只是做个简单的演示。
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
public class NettyClient {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(bossGroup).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(String.format("有新的客户端连接:%s", ctx.channel().remoteAddress().toString()));
}
});
System.out.println("netty client start");
ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();
cf.addListener((ChannelFutureListener) channelFuture -> System.out.println("客户端启动完成"));
String msg = "";
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
do {
try {
msg = br.readLine();
} catch (IOException e) {
e.printStackTrace();
}
ByteBuf buf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8);
cf.channel().writeAndFlush(buf);
} while (!msg.equals("end"));
System.out.println("您已退出");
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
}
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("HelloServer", CharsetUtil.UTF_8);
ctx.writeAndFlush(buf);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf msg1 = (ByteBuf) msg;
System.out.println(String.format("收到服户端(%s)消息:%s", ctx.channel().remoteAddress().toString(), msg1.toString(CharsetUtil.UTF_8)));
}
}
客户端与服务端建立了TCP/UDP连接,如果连接中限制了发送数据的报文大小,此时 将要发送的数据大于这个限制,就会产生拆包现象; 截取后的数据包会等待下次发送数据的时候一起发送,如果这个时候这部分数据和其他数据包一起发到服务端,又会产生粘包的现象;
解决方案
说零拷贝之前,我们需要引入一个名词“直接内存”,我们知道java代码都运行在jvm虚拟机中,分配的内存数据都是在jvm中分配的,如果想直接访问jvm之外的内存数据,那就叫直接内存访问; 在netty中,直接使用直接内存进行socket进行读写。不需要将数据拷贝到jvm中的缓冲区中,而是将数据直接发送到socket中,不需要再执行中间的拷贝操作;
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。