假设客户端分别发送了两个数据包D1和D2给服务器,由于服务器端一次读取到的字节数是不确定的,所以可能发生四种情况:
1、服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包。
2、服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包。
3、服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包。
4、服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余内容D1_2和D2包的整包。
如果此时服务端TCP接收滑窗非常小,而数据包D1和D2比较大,很有可能会发生第五种可能,即服务端分多次才能将D1和D2包接收完全,期间发生多次拆包。
那么在Netty中可使用LineBasedFrameDecoder和StringDecoder
LineBasedFrameDecoder的工作原理是一次遍历ByteBuf中的可读字节,判断看是否有"\n"或者"\r\n",如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。
StringDecoder将接收到的对象转换成字符串,然后继续调用后面的handler。
利用LineBasedFrameDecoder解决TCP粘包问题:
1 package netty;
2
3 import io.netty.bootstrap.ServerBootstrap;
4 import io.netty.channel.ChannelFuture;
5 import io.netty.channel.ChannelInitializer;
6 import io.netty.channel.ChannelOption;
7 import io.netty.channel.EventLoopGroup;
8 import io.netty.channel.nio.NioEventLoopGroup;
9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioServerSocketChannel;
11 import io.netty.handler.codec.LineBasedFrameDecoder;
12 import io.netty.handler.codec.string.StringDecoder;
13
14
15 public class TimeServer {
16
17 public void bind(int port) throws Exception{
18 //配置服务端的NIO线程组
19 EventLoopGroup bossGroup = new NioEventLoopGroup();
20 EventLoopGroup workerGroup = new NioEventLoopGroup();
21 try{
22 ServerBootstrap b = new ServerBootstrap();
23 b.group(bossGroup,workerGroup)
24 .channel(NioServerSocketChannel.class)
25 .option(ChannelOption.SO_BACKLOG, 1024)
26 .childHandler(new ChildChannelHandler());
27 //绑定端口,同步等待成功
28 ChannelFuture f = b.bind(port).sync();
29 //等待服务器监听端口关闭
30 f.channel().closeFuture().sync();
31 }finally{
32 //优雅退出,释放线程池资源
33 bossGroup.shutdownGracefully();
34 workerGroup.shutdownGracefully();
35 }
36 }
37
38 private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
39 @Override
40 protected void initChannel(SocketChannel arg0) throws Exception{
41 arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
42 arg0.pipeline().addLast(new StringDecoder());
43 arg0.pipeline().addLast(new TimeServerHandler());
44 }
45 }
46
47 public static void main(String args[]) throws Exception{
48 int port = 10001;
49 if(args != null && args.length > 0){
50 try{
51 port = Integer.valueOf(args[0]);
52 }catch(NumberFormatException e){
53 //采用默认值
54 }
55 }
56 new TimeServer().bind(port);
57 }
58 }
TimeServerHandler, msg是删除回车换行符后的请求消息,不需要额外考虑处理半包问题,也不需要对请求消息进行编码:
1 import java.io.IOException;
2 import io.netty.buffer.ByteBuf;
3 import io.netty.buffer.Unpooled;
4 import io.netty.channel.ChannelHandlerAdapter;
5 import io.netty.channel.ChannelHandlerContext;
6
7 public class TimeServerHandler extends ChannelHandlerAdapter{
8
9 private int counter;
10
11 public void channelRead(ChannelHandlerContext ctx,Object msg) throws IOException{
12
13 String body = (String)msg;
14 System.out.println("The time server receive order:" + body + "; the counter is :" + ++counter);
15 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)? new java.util.Date(
16 System.currentTimeMillis()).toString() : "BAD ORDER";
17 currentTime = currentTime + System.getProperty("line.separator");
18 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
19 ctx.writeAndFlush(resp);
20 }
21
22 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{
23 ctx.flush();
24 }
25
26 @Override
27 public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
28 ctx.close();
29 }
30 }
TimeClient,在TimeClientHandler之前新增lineBasedFrameDecoder和StringDecoder解码器:
1 import io.netty.bootstrap.Bootstrap;
2 import io.netty.channel.ChannelFuture;
3 import io.netty.channel.ChannelInitializer;
4 import io.netty.channel.ChannelOption;
5 import io.netty.channel.EventLoopGroup;
6 import io.netty.channel.nio.NioEventLoopGroup;
7 import io.netty.channel.socket.SocketChannel;
8 import io.netty.channel.socket.nio.NioSocketChannel;
9 import io.netty.handler.codec.LineBasedFrameDecoder;
10 import io.netty.handler.codec.string.StringDecoder;
11
12 public class TimeClient {
13
14 public void connect(int port,String host) throws Exception{
15 //创建客户端处理I/O读写的NioEventLoopGroup Group线程组
16 EventLoopGroup group = new NioEventLoopGroup();
17 try{
18 //创建客户端辅助启动类Bootstrap
19 Bootstrap b = new Bootstrap();
20 b.group(group).channel(NioSocketChannel.class)
21 .option(ChannelOption.TCP_NODELAY, true)
22 .handler(new ChannelInitializer<SocketChannel>(){
23 //将ChannelHandler设置到ChannelPipleline中,用于处理网络I/O事件
24 @Override
25 protected void initChannel(SocketChannel ch) throws Exception {
26 ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
27 ch.pipeline().addLast(new StringDecoder());
28 ch.pipeline().addLast(new TimeClientHandler());
29 }
30 });
31 //发起异步连接操作,然后调用同步方法等待连接成功。
32 ChannelFuture f = b.connect(host,port).sync();
33
34 //等待客户端链路关闭
35 f.channel().closeFuture().sync();
36 }finally{
37 //优雅退出,释放NIO线程组
38 group.shutdownGracefully();
39 }
40 }
41
42 public static void main(String[] args) throws Exception{
43 int port = 10001;
44 if(args != null && args.length > 0){
45 try{
46 port = Integer.valueOf(args[0]);
47 }catch(NumberFormatException e){
48 //采用默认值
49 }
50 }
51 new TimeClient().connect(port, "0.0.0.0");
52 }
53
54 }
TimeClientHandler,拿到的msg已经是解码成字符串之后的应答消息:
1 import io.netty.buffer.ByteBuf;
2 import io.netty.buffer.Unpooled;
3 import io.netty.channel.ChannelHandlerAdapter;
4 import io.netty.channel.ChannelHandlerContext;
5
6 import java.util.logging.Logger;
7
8 public class TimeClientHandler extends ChannelHandlerAdapter{
9
10 private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());
11
12 private int counter;
13
14 private byte[]req;
15
16 public TimeClientHandler(){
17 req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
18 }
19
20 //当客户端与服务端TCP链路简历成功后,Netty的NIO线程会调用该方法,发送查询时间的指令给服务器
21 public void channelActive(ChannelHandlerContext ctx){
22 //将请求消息发送给服务端
23 ByteBuf message = null;
24 for(int i = 0;i<100;i++){
25 message = Unpooled.buffer(req.length);
26 message.writeBytes(req);
27 ctx.writeAndFlush(message);
28 }
29 }
30
31 //当服务器返回应答消息时,该方法被调用
32 public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
33 String body = (String)msg;
34 System.out.println("Now is:" + body + "; the counter is :" + ++counter);
35 }
36
37 public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
38
39 //释放资源
40 logger.warning("Unexpected exception from downstream :" + cause.getMessage());
41 ctx.close();
42 }
43 }
运行结果:
发现。。就木有粘包或拆包的问题啦~~~~
机缘巧合,同事也一起实现了Scala版~
clientHandler:
1 package main.nettyscala
2
3 import io.netty.buffer.{ByteBuf, Unpooled}
4 import io.netty.channel.{ChannelInboundHandlerAdapter, ChannelHandlerContext, ChannelHandlerAdapter}
5
6 /**
7 * Created by root on 2016/11/18.
8 */
9 class ClientHandler extends ChannelInboundHandlerAdapter {
10 override def channelActive(ctx: ChannelHandlerContext): Unit = {
11 println("channelActive")
12 //val content = "hello server"
13 val content = Console.readLine()
14 ctx.writeAndFlush(Unpooled.copiedBuffer(content.getBytes("UTF-8")))
15 //发送case class 不在发送字符串了,封装一个字符串
16 // ctx.writeAndFlush(RegisterMsg("hello server"))
17 }
18
19 override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
20 println("channelRead")
21 val byteBuf = msg.asInstanceOf[ByteBuf]
22 val bytes = new Array[Byte](byteBuf.readableBytes())
23 byteBuf.readBytes(bytes)
24 val message = new String(bytes, "UTF-8")
25 println(message)
26 }
27
28 override def channelReadComplete(ctx: ChannelHandlerContext): Unit = {
29 println("channeReadComplete")
30 ctx.flush()
31 }
32 //发送异常时关闭
33 override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
34 println("exceptionCaught")
35 ctx.close()
36 }
37
38 }
NettyClient:
1 package main.nettyscala
2
3 import io.netty.bootstrap.Bootstrap
4 import io.netty.channel.ChannelInitializer
5 import io.netty.channel.nio.NioEventLoopGroup
6 import io.netty.channel.socket.SocketChannel
7 import io.netty.channel.socket.nio.{NioSocketChannel, NioServerSocketChannel}
8 import io.netty.handler.codec.serialization.{ClassResolvers, ObjectDecoder, ObjectEncoder}
9
10
11 object NettyClient {
12 def main(args: Array[String]) {
13 val host = args(0)
14 val port = args(1).toInt
15 val client = new NettyClient
16 client.connect(host, port)
17 }
18 }
19
20 class NettyClient {
21 def connect(host: String, port: Int): Unit = {
22 //创建客户端NIO线程组
23 val eventGroup = new NioEventLoopGroup
24 //创建客户端辅助启动类
25 val bootstrap = new Bootstrap
26 try {
27 //将NIO线程组传入到Bootstrap
28 bootstrap.group(eventGroup)
29 //创建NioSocketChannel
30 .channel(classOf[NioSocketChannel])
31 //绑定I/O事件处理类
32 .handler(new ChannelInitializer[SocketChannel] {
33 override def initChannel(ch: SocketChannel): Unit = {
34 ch.pipeline().addLast(
35 // new ObjectEncoder,
36 // new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader)),
37 new ClientHandler
38 )
39 }
40 })
41 //发起异步连接操作
42 val channelFuture = bootstrap.connect(host, port).sync()
43 //等待服务关闭
44 channelFuture.channel().closeFuture().sync()
45 } finally {
46 //优雅的退出,释放线程池资源
47 eventGroup.shutdownGracefully()
48 }
49 }
50 }
NettyServer:
1 package main.nettyscala
2
3 /**
4 * Created by root on 12/8/16.
5 */
6 import io.netty.bootstrap.ServerBootstrap
7 import io.netty.channel.ChannelInitializer
8 import io.netty.channel.nio.NioEventLoopGroup
9 import io.netty.channel.socket.SocketChannel
10 import io.netty.channel.socket.nio.NioServerSocketChannel
11 import io.netty.handler.codec.serialization.{ClassResolvers, ClassResolver, ObjectDecoder, ObjectEncoder}
12
13
14 object NettyServer {
15 def main(args: Array[String]) {
16 val host = args(0)
17 val port = args(1).toInt
18 val server = new NettyServer
19 server.bind(host, port)
20 }
21 }
22 class NettyServer {
23 def bind(host: String, port: Int): Unit = {
24 //配置服务端线程池组
25 //用于服务器接收客户端连接
26 val bossGroup = new NioEventLoopGroup()
27 //用户进行SocketChannel的网络读写
28 val workerGroup = new NioEventLoopGroup()
29
30 try {
31 //是Netty用户启动NIO服务端的辅助启动类,降低服务端的开发复杂度
32 val bootstrap = new ServerBootstrap()
33 //将两个NIO线程组作为参数传入到ServerBootstrap
34 bootstrap.group(bossGroup, workerGroup)
35 //创建NioServerSocketChannel
36 .channel(classOf[NioServerSocketChannel])
37 //绑定I/O事件处理类
38 .childHandler(new ChannelInitializer[SocketChannel] {
39 override def initChannel(ch: SocketChannel): Unit = {
40 ch.pipeline().addLast(
41 // new ObjectEncoder,
42 // new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader)),
43 new ServerHandler
44 )
45 }
46 })
47 //绑定端口,调用sync方法等待绑定操作完成
48 val channelFuture = bootstrap.bind(host, port).sync()
49 //等待服务关闭
50 channelFuture.channel().closeFuture().sync()
51 } finally {
52 //优雅的退出,释放线程池资源
53 bossGroup.shutdownGracefully()
54 workerGroup.shutdownGracefully()
55 }
56 }
57 }
ServerHandler:
1 package main.nettyscala
2
3 /**
4 * Created by root on 12/8/16.
5 */
6 import io.netty.buffer.{Unpooled, ByteBuf}
7 import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}
8
9 /**
10 * Created by root on 2016/11/18.
11 */
12 class ServerHandler extends ChannelInboundHandlerAdapter {
13 /**
14 * 有客户端建立连接后调用
15 */
16 override def channelActive(ctx: ChannelHandlerContext): Unit = {
17 println("channelActive invoked")
18 }
19
20 /**
21 * 接受客户端发送来的消息
22 */
23 override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
24 println("channelRead invoked")
25 val byteBuf = msg.asInstanceOf[ByteBuf]
26 val bytes = new Array[Byte](byteBuf.readableBytes())
27 byteBuf.readBytes(bytes)
28 val message = new String(bytes, "UTF-8")
29 println(message)
30 val back = "received message: " + message
31 val resp = Unpooled.copiedBuffer(back.getBytes("UTF-8"))
32 println(msg)
33 ctx.write(resp)
34 }
35
36 /**
37 * 将消息对列中的数据写入到SocketChanne并发送给对方
38 */
39 override def channelReadComplete(ctx: ChannelHandlerContext): Unit = {
40 println("channekReadComplete invoked")
41 ctx.flush()
42 }
43
44
45 }
RegisterMsg:
1 package main.nettyscala
2
3 /**
4 * Created by root on 12/8/16.
5 */
6 case class RegisterMsg(content: String) extends Serializable
运行结果: