专栏首页编程坑太多『互联网架构』软件架构-netty粘包分包编码解码(57)

『互联网架构』软件架构-netty粘包分包编码解码(57)

(一)粘包分包概念

  • 粘包

TCP 由于TCP协议本身的机制(面向连接的可靠地协议-三次握手机制)客户端与服务器会维持一个连接(Channel),数据在连接不断开的情况下,可以持续不断地将多个数据包发往服务器,但是如果发送的网络数据包太小,那么他本身会启用Nagle算法(可配置是否启用)对较小的数据包进行合并(基于此,TCP的网络延迟要UDP的高些)然后再发送(超时或者包大小足够)。那么这样的话,服务器在接收到消息(数据流)的时候就无法区分哪些数据包是客户端自己分开发送的,这样产生了粘包;服务器在接收到数据库后,放到缓冲区中,如果消息没有被及时从缓存区取走,下次在取数据的时候可能就会出现一次取出多个数据包的情况,造成粘包现象(确切来讲,对于基于TCP协议的应用,不应用包来描述,而应 用 流来描述),个人认为服务器接收端产生的粘包应该与linux内核处理socket的方式 select轮询机制的线性扫描频度无关。 UDP 本身作为无连接的不可靠的传输协议(适合频繁发送较小的数据包),他不会对数据包进行合并发送(也就没有Nagle算法之说了),他直接是一端发送什么数据,直接就发出去了,既然他不会对数据合并,每一个数据包都是完整的(数据+UDP头+IP头等等发一次数据封装一次)也就没有粘包一说了。

  • 分包

可能是IP分片传输导致的,也可能是传输过程中丢失部分包导致出现的半包,还有可能就是一个包可能被分成了两次传输,在取数据的时候,先取到了一部分(还可能与接收的缓冲区大小有关系),总之就是一个数据包被分成了多次接收。

  • TCP当中,只有流的概念,没有包的概念(根本原因)

简单的概括

(1)粘包: 1.服务端 原因收到的数据放在系统接收缓冲区,用户进程从该缓冲区取数据 2.客户端 原因TCP为提高传输效率,要收集到足够多的数据后才发送一包数据

(2).分包: 1.应用程序写入的字节大小大于套接字发送缓冲区的大小 2.进行mss(最大报文长度)大小的TCP分段,当TCP报文长度-TCP头部长度>MSS 3.以太网帧的payload(净荷)大于MTU(1500字节)进行ip分片

(二)Netty粘包分包现象演示

源码:pack目录下的error Server.java

package com.dig8.pack.error;import org.jboss.netty.bootstrap.ServerBootstrap;import org.jboss.netty.channel.ChannelPipeline;import org.jboss.netty.channel.ChannelPipelineFactory;import org.jboss.netty.channel.Channels;import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;import java.net.InetSocketAddress;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * netty服务端 *  * @author idig8.com */public class Server {    public static void main(String[] args) {        //  服务类        ServerBootstrap bootstrap = new ServerBootstrap();        //  boss线程,主要监听端口和获取worker线程及分配socketChannel给worker线程        ExecutorService boss = Executors.newCachedThreadPool();        //  worker线程负责数据读写        ExecutorService worker = Executors.newCachedThreadPool();        //  设置niosocket工厂        bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));        //  设置管道的工厂        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {            @Override            public ChannelPipeline getPipeline() throws Exception {                ChannelPipeline pipeline = Channels.pipeline();                // 管道过滤器                pipeline.addLast("myHandler", new ServerHandler());                return pipeline;            }        });        // 服务类绑定端口        bootstrap.bind(new InetSocketAddress(8888));    }}

ServerHandler.java

package com.dig8.pack.error;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.channel.MessageEvent;import org.jboss.netty.channel.SimpleChannelHandler;/** * @author idig8.com */public class ServerHandler extends SimpleChannelHandler{    @Override    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {        ChannelBuffer buffer = (ChannelBuffer) e.getMessage();        byte[] bs = buffer.array();        System.out.println("server receive data: " +new String(bs));    }}

Client.java

package com.dig8.pack.error;import org.jboss.netty.bootstrap.ClientBootstrap;import org.jboss.netty.channel.ChannelPipeline;import org.jboss.netty.channel.ChannelPipelineFactory;import org.jboss.netty.channel.Channels;import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;import org.jboss.netty.handler.codec.string.StringEncoder;import java.net.InetSocketAddress;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * 客户端 *  * @author idig8.com */public class Client {    public static void main(String[] args) throws Exception {        //服务类        ClientBootstrap bootstrap = new  ClientBootstrap();        //线程池        ExecutorService boss = Executors.newCachedThreadPool();        ExecutorService worker = Executors.newCachedThreadPool();        //socket工厂        bootstrap.setFactory(new NioClientSocketChannelFactory(boss, worker));        //管道工厂        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {            @Override            public ChannelPipeline getPipeline() throws Exception {                ChannelPipeline pipeline = Channels.pipeline();                pipeline.addLast("1", new StringEncoder());                pipeline.addLast("2", new ClientHandler());                return pipeline;            }        });        //连接服务端        bootstrap.connect(new InetSocketAddress("127.0.0.1", 8888)).sync();    }}

ClientHandler.java

package com.dig8.pack.error;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.buffer.ChannelBuffers;import org.jboss.netty.channel.*;/** * 客户端消息处理类 * @author idig8.com */public class ClientHandler extends SimpleChannelHandler {    // 包头    private static final int HEAD_FLAG = -32323231;    @Override    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {        Channel channel = ctx.getChannel();        String msg = "Hello,idig8.com";        for (int i = 0; i < 1000; i++) {            channel.write(msg);        }    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {    }}

运行后出现粘包和分包现象

(三)粘包分包问题解决思路

服务端和客户端约定好稳定的数据包结构

1.客户端根据约定的数据包结构发送数据 2.服务端根据约定的数据包结构来读取数据

通过MyDecoder集成FrameDecoder的方式来 源码:pack目录下的custom Server.java

package com.dig8.pack.custom;import java.net.InetSocketAddress;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import org.jboss.netty.bootstrap.ServerBootstrap;import org.jboss.netty.channel.ChannelPipeline;import org.jboss.netty.channel.ChannelPipelineFactory;import org.jboss.netty.channel.Channels;import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;/** * netty服务端 *  * @author idig8.com */public class Server {    public static void main(String[] args) {        //  服务类        ServerBootstrap bootstrap = new ServerBootstrap();        //  boss线程,主要监听端口和获取worker线程及分配socketChannel给worker线程        ExecutorService boss = Executors.newCachedThreadPool();        //  worker线程负责数据读写        ExecutorService worker = Executors.newCachedThreadPool();        //  设置niosocket工厂        bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));        //  设置管道的工厂        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {            @Override            public ChannelPipeline getPipeline() throws Exception {                ChannelPipeline pipeline = Channels.pipeline();                // 管道过滤器                pipeline.addLast("myDecoder", new MyDecoder());                pipeline.addLast("myHandler", new ServerHandler());                return pipeline;            }        });        // 服务类绑定端口        bootstrap.bind(new InetSocketAddress(7778));    }}

ServerHandler.java

package com.dig8.pack.custom;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.channel.MessageEvent;import org.jboss.netty.channel.SimpleChannelHandler;/** * @author idig8.com */public class ServerHandler extends SimpleChannelHandler{    @Override    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {        /*ChannelBuffer buffer = (ChannelBuffer) e.getMessage();        byte[] bs = buffer.array();*/        System.out.println("server receive data: " + e.getMessage());    }}

Client.java

package com.dig8.pack.custom;import java.net.InetSocketAddress;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import org.jboss.netty.bootstrap.ClientBootstrap;import org.jboss.netty.channel.ChannelPipeline;import org.jboss.netty.channel.ChannelPipelineFactory;import org.jboss.netty.channel.Channels;import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;import org.jboss.netty.handler.codec.string.StringEncoder;/** * 客户端 *  * @author idig8.com */public class Client {    public static void main(String[] args) throws Exception {        //服务类        ClientBootstrap bootstrap = new  ClientBootstrap();        //线程池        ExecutorService boss = Executors.newCachedThreadPool();        ExecutorService worker = Executors.newCachedThreadPool();        //socket工厂        bootstrap.setFactory(new NioClientSocketChannelFactory(boss, worker));        //管道工厂        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {            @Override            public ChannelPipeline getPipeline() throws Exception {                ChannelPipeline pipeline = Channels.pipeline();                pipeline.addLast("1", new StringEncoder());                pipeline.addLast("2", new ClientHandler());                return pipeline;            }        });        //连接服务端        bootstrap.connect(new InetSocketAddress("127.0.0.1", 7778)).sync();    }}

ClientHandler.java

package com.dig8.pack.custom;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.buffer.ChannelBuffers;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.channel.ChannelStateEvent;import org.jboss.netty.channel.ExceptionEvent;import org.jboss.netty.channel.SimpleChannelHandler;/** * 客户端消息处理类 * @author idig8.com */public class ClientHandler extends SimpleChannelHandler {    // 包头    private static final int HEAD_FLAG = -32323231;    @Override    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {        Channel channel = ctx.getChannel();        String msg = "Hello,idig8.com 通过定义包头+长度+数据 防止粘包和分包";        byte[] bytes = msg.getBytes();        // 定义数据包 ,结构为:包头 + 长度 + 数据        ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();        // 1.写包头        buffer.writeInt(HEAD_FLAG);// 4字节        // 2.写长度        buffer.writeInt(bytes.length);// 4字节        // 3.写数据本身        buffer.writeBytes(bytes);        for (int i = 0; i < 1000; i++) {            channel.write(buffer);        }    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {    }}

MyDecoder.java

/** *  */package com.dig8.pack.custom;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.handler.codec.frame.FrameDecoder;/** * @author idig8.com */public class MyDecoder extends FrameDecoder{    // 包头    private static final int HEAD_FLAG = -32323231;    // 数据包基本长度    private final static int BASE_LENGTH = 4 + 4;    @Override    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {        // 收到数据之后,先判断buffer中可读的数据长度是否大于数据包的基本长度        if(buffer.readableBytes() > BASE_LENGTH){            // 防止socket攻击:            if(buffer.readableBytes() > 4096 * 2){ // 4k                System.out.println("socket 攻击了");                buffer.skipBytes(buffer.readableBytes());            }            // 记录包头开始的位置            int headIndex;            while(true){                headIndex = buffer.readerIndex();                buffer.markReaderIndex();                // 代码很关键                if(buffer.readableBytes() < 4){// 包头的长度                    buffer.readerIndex(headIndex);                    return null;                }                // 此时说明包头的长度是足够的                // 正好读取的是包头                if(buffer.readInt() == HEAD_FLAG ){                    break;                }                // [1,2,3,4] 1 1 1 1 1                // 如果不是包头,需要略过一个字节,在略过之前,需要还原读指针位置                buffer.resetReaderIndex();                buffer.readByte();// 略过一个字节                if(buffer.readableBytes() < BASE_LENGTH){                    return null;                }            }            // 此时说明有数据包到来            // 做标记(记住当前读指针的位置)            // buffer.markReaderIndex();            // 1.读长度            int dataLength = buffer.readInt();            if(buffer.readableBytes() < dataLength){                // 说明数据本身的长度还不够, 肯定要继续等待后面的数据到来                // 还原读指针的位置                buffer.readerIndex(headIndex);                return null;            }            // 此时说明数据包已经位置            // 2.读数据本身            byte[] dst = new byte[dataLength];            buffer.readBytes(dst);            // 继续传递下去            // ?            // 如果此时buffer中的数据还没有读完,那么剩下的数据怎么办?            return new String(dst);        }        // return null 表示此时的数据包不完整,需要继续等待下一个数据包的到来 ?        return null;    }}

(三)Netty自带粘包分包解决方案

消息定长

1.FixedLengthFrameDecoder

行分隔符

2.LineBasedFrameDecoder

自定义特殊符号进行分割

3.DelimiterBasedFrameDecoder

源码:pack目录下的nettysolution Server.java

package com.dig8.pack.nettysolution;import java.net.InetSocketAddress;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import org.jboss.netty.bootstrap.ServerBootstrap;import org.jboss.netty.buffer.ChannelBuffers;import org.jboss.netty.channel.ChannelPipeline;import org.jboss.netty.channel.ChannelPipelineFactory;import org.jboss.netty.channel.Channels;import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;import org.jboss.netty.handler.codec.frame.FixedLengthFrameDecoder;import org.jboss.netty.handler.codec.frame.LineBasedFrameDecoder;import org.jboss.netty.handler.codec.string.StringDecoder;/** * 服务端 * @author idig8.com */public class Server {    public static void main(String[] args) throws Exception {        // 服务类        ServerBootstrap bootstrap = new ServerBootstrap();        // boss线程,主要监听端口和获取worker线程及分配socketChannel给worker线程        ExecutorService boss = Executors.newCachedThreadPool();        // worker线程负责数据读写        ExecutorService worker = Executors.newCachedThreadPool();        // 设置niosocket工厂        bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));        // 设置管道的工厂        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {            @Override            public ChannelPipeline getPipeline() throws Exception {                ChannelPipeline pipeline = Channels.pipeline();                // 管道过滤器                // 方案1:消息定长                //pipeline.addLast("fixedLength", new FixedLengthFrameDecoder(18));                // 方案2:行分隔符                //pipeline.addLast("fixedLength", new LineBasedFrameDecoder(1024));                // 方案3:自定义特殊符号进行分割                pipeline.addLast("delimiter", new DelimiterBasedFrameDecoder(1024,                         ChannelBuffers.copiedBuffer("#@#".getBytes())));                pipeline.addLast("1",new StringDecoder());                  pipeline.addLast("2",new ServerMessageHandler());                       return pipeline;            }        });        // 服务类绑定端口        bootstrap.bind(new InetSocketAddress(7777));        System.out.println("服务端启动...");    }}

ServerMessageHandler.java

package com.dig8.pack.nettysolution;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.channel.ExceptionEvent;import org.jboss.netty.channel.MessageEvent;import org.jboss.netty.channel.SimpleChannelHandler;/** * 服务端消息处理类 * @author idig8.com */public class ServerMessageHandler extends SimpleChannelHandler {    /**     * 接收消息     */    @Override    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {        System.out.println("receive request: " + e.getMessage());    }    /**     * 异常处理     */    @Override    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {    }}

Client.java

package com.dig8.pack.nettysolution;import java.net.InetSocketAddress;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import org.jboss.netty.bootstrap.ClientBootstrap;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.buffer.ChannelBuffers;import org.jboss.netty.channel.ChannelFuture;import org.jboss.netty.channel.ChannelPipeline;import org.jboss.netty.channel.ChannelPipelineFactory;import org.jboss.netty.channel.Channels;import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;import org.jboss.netty.handler.codec.frame.FixedLengthFrameDecoder;import org.jboss.netty.handler.codec.frame.LineBasedFrameDecoder;import org.jboss.netty.handler.codec.string.StringEncoder;/** * 客户端 *  * @author idig8.com */public class Client {    public static void main(String[] args) throws Exception {        //服务类        ClientBootstrap bootstrap = new  ClientBootstrap();        //线程池        ExecutorService boss = Executors.newCachedThreadPool();        ExecutorService worker = Executors.newCachedThreadPool();        //socket工厂        bootstrap.setFactory(new NioClientSocketChannelFactory(boss, worker));        //管道工厂        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {            @Override            public ChannelPipeline getPipeline() throws Exception {                ChannelPipeline pipeline = Channels.pipeline();                // 方案1:消息定长                //pipeline.addLast("fixedLength", new FixedLengthFrameDecoder(18));                // 方案2:行分隔符                //pipeline.addLast("fixedLength", new LineBasedFrameDecoder(1024));                // 方案3:自定义特殊符号进行分割                pipeline.addLast("delimiter", new DelimiterBasedFrameDecoder(1024,                         ChannelBuffers.copiedBuffer("#@#".getBytes())));                pipeline.addLast("1",new StringEncoder());                pipeline.addLast("2", new ClientMessageHandler());                return pipeline;            }        });        //连接服务端        @SuppressWarnings("unused")        ChannelFuture connect = bootstrap.connect(new InetSocketAddress("127.0.0.1", 7777)).sync();//        Channel channel = connect.getChannel();//        System.out.println("client start");//        Scanner scanner = new Scanner(System.in);//        while(true){//            System.out.println("请输入:");//            channel.write(scanner.next());//        }    }}

ClientMessageHandler.java

package com.dig8.pack.nettysolution;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.channel.ChannelStateEvent;import org.jboss.netty.channel.ExceptionEvent;import org.jboss.netty.channel.MessageEvent;import org.jboss.netty.channel.SimpleChannelHandler;/** * 客户端消息接受处理类 * @author  idig8.com */public class ClientMessageHandler extends SimpleChannelHandler {    /**     * 接收消息     */    @Override    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {        System.out.println("server response : " + e.getMessage());    }    /**     * 新连接     */    @Override    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {        Channel channel = ctx.getChannel();        String separator = "#@#";//System.getProperty("line.separator");// 系统换行符        String msg = "idig8.com send cmd";        for (int i = 0; i < 1000; i++) {            channel.write(msg + i + separator);        }    }    /**     * 异常处理     */    @Override    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {    }}

PS:基本上netty针对tcp 分包粘包已经说完了,确实有了netty真的很方便比传统的socket方便很多。下次说说http 协议实现。

本文分享自微信公众号 - 编程坑太多(idig88),作者:诸葛阿明

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-05-22

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 『互联网架构』软件架构-netty线程模型源码(55)

    1.bootstrap Netty服务端及客户端启动类 2.buffer 缓冲相关,对NIO Buffer做了一些优化、封装 3.channel 处理客户端与服...

    IT故事会
  • 「小程序JAVA实战」小程序的视频点赞功能开发(62)

    IT故事会
  • springboot (九) Swagger2实现Restful API

    IT故事会
  • 『互联网架构』软件架构-netty线程模型源码(55)

    1.bootstrap Netty服务端及客户端启动类 2.buffer 缓冲相关,对NIO Buffer做了一些优化、封装 3.channel 处理客户端与服...

    IT故事会
  • SSM Spring SpringMVC Mybatis框架整合Java配置完整版

      以前用着SSH都是老师给配好的,自己直接改就可以。但是公司主流还是SSM,就自己研究了一下Java版本的配置。网上大多是基于xnl的配置,但是越往后越新的项...

    用户2417870
  • drools规则引擎的动态数据库交接使用和压力测试

    技术从心
  • 配置Eclipse可以查看JDK源码

    Window->Preferences->Java->Installed JREs

    SmileNicky
  • 分布式事务系列--SpringCloud整合byteTCC框架0.5.x版本2

    在使用tcc框架处理分布式事务时,需要我们自己来编写tcc业务代码。这里演示一个简单的加钱的操作。

    IT云清
  • Appium系列|Appium测试框架完善(三)

    当测试脚本有一定数量的情况下,每次执行完所有的测试脚本会发现或多或少的测试脚本执行失败了,失败主要是两种原因一种是源程序bug,另外一种是测试脚本由于各方面的原...

    测试邦
  • 绕过验证码

    所以我开始寻找验证码最常见的地方,比如注册、登录和密码重置页面,我找到的那个是在登录页面。

    用户1631416

扫码关注云+社区

领取腾讯云代金券