首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往
29 篇文章
1
netty案例,netty4.1基础入门篇一《嗨!NettyServer》
2
netty案例,netty4.1基础入门篇二《NettyServer接收数据》
3
netty案例,netty4.1基础入门篇零《初入JavaIO之门BIO、NIO、AIO实战练习》
4
netty案例,netty4.1基础入门篇三《NettyServer字符串解码器》
5
netty案例,netty4.1基础入门篇四《NettyServer收发数据》
6
netty案例,netty4.1基础入门篇五《NettyServer字符串编码器》
7
netty案例,netty4.1基础入门篇六《NettyServer群发消息》
8
netty案例,netty4.1基础入门篇九《自定义编码解码器,处理半包、粘包数据》
9
netty案例,netty4.1基础入门篇七《嗨!NettyClient》
10
netty案例,netty4.1基础入门篇八《NettyClient半包粘包处理》发数据方式》
11
netty案例,netty4.1基础入门篇十《关于ChannelOutboundHandlerAdapter简单使用》
12
netty案例,netty4.1基础入门篇十一《netty udp通信方式案例Demo》
13
netty案例,netty4.1基础入门篇十二《简单实现一个基于Netty搭建的Http服务》
14
netty案例,netty4.1中级拓展篇二《Netty使用Protobuf传输数据》
15
netty案例,netty4.1中级拓展篇一《Netty与SpringBoot整合》
16
netty案例,netty4.1中级拓展篇三《Netty传输Java对象》
17
netty案例,netty4.1中级拓展篇五《基于Netty搭建WebSocket,模仿微信聊天页面》
18
netty案例,netty4.1中级拓展篇六《SpringBoot+Netty+Elasticsearch收集日志存储》
19
netty案例,netty4.1中级拓展篇七《Netty请求响应同步通信》
20
netty案例,netty4.1中级拓展篇八《Netty心跳服务与断线重连》
21
《Netty + JavaFx 实战:仿桌面版微信聊天》
22
手写类似dubbo的rpc框架第三章《rpc框架》
23
手写类似dubbo的rpc框架第二章《netty通信》
24
netty案例,netty4.1中级拓展篇六《SpringBoot+Netty+Elasticsearch收集日志信息数据存储》
25
netty案例,netty4.1中级拓展篇七《Netty请求响应同步通信》
26
netty案例,netty4.1中级拓展篇八《Netty心跳服务与断线重连》
27
netty案例,netty4.1基础入门篇九《自定义编码解码器》
28
netty案例,netty4.1基础入门篇十《关于ChannelOutboundHandlerAdapter简单使用》
29
netty案例,netty4.1基础入门篇九《自定义编码解码器,处理半包、粘包数据》

netty案例,netty4.1中级拓展篇二《Netty使用Protobuf传输数据》

小傅哥 | https://bugstack.cn 沉淀、分享、成长,让自己和他人都能有所收获。专注于原创专题案例编写,目前已完成的专题有;Netty4.x实战专题案例、用Java实现JVM、基于JavaAgent的全链路监控、手写RPC框架、架构设计专题案例、源码分析等。你用剑🗡、我用刀🔪,好的代码都很烧,望你不吝出招!

一、前言介绍

在netty数据传输过程中可以有很多选择,比如;字符串、json、xml、java对象,但为了保证传输的数据具备;良好的通用性、方便的操作性和传输的高性能,我们可以选择protobuf作为我们的数据传输格式。目前protobuf可以支持;C++、C#、Dart、Go、Java、Python等,也可以在JS里使用。知识点;ProtobufDecoder、ProtobufEncoder、ProtobufVarint32FrameDecoder、ProtobufVarint32LengthFieldPrepender。

What are protocol buffers?

Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data – think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use special generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages. https://developers.google.cn/protocol-buffers

二、开发环境

  1. jdk1.8【jdk1.7以下只能部分支持netty】
  2. Netty4.1.36.Final【netty3.x 4.x 5每次的变化较大,接口类名也随着变化】
  3. protoc-3.5.0-win32 【用于编译proto文件(protoc -I=源地址 --java_out=目标地址 源地址/xxx.proto),源码中已经提供,如果是其他开发环境可以自行下载】

三、代码示例

代码语言:txt
复制
itstack-demo-netty-2-02
└── src
    ├── main
    │   └── java
    │       └── org.itstack.demo.netty
    │           ├── client
    │           │	├── MyChannelInitializer.java
    │           │	├── MyClientHandler.java
    │           │	└── NettyClient.java
    │           ├── domain
    │           │	├── MsgBody.java
    │           │	├── MsgBodyOrBuilder.java
    │           │	└── MsgInfo.java
    │           ├── proto
    │           │	└── MsgInfo.proto
    │           ├── server
    │           │	├── MyChannelInitializer.java
    │           │	├── MyServerHandler.java
    │           │	└── NettyServer.java
    │           └── util
    │           	└── MsgUtil.java
    │
    └── test
         └── java
             └── org.itstack.demo.test
                 └── ApiTest.java

client/MyChannelInitializer.java

代码语言:txt
复制
**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {获取学习源码}
 * Create by fuzhengwei on 2019
 */
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        //protobuf 处理
        channel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
        channel.pipeline().addLast(new ProtobufDecoder(MsgBody.getDefaultInstance()));
        channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
        channel.pipeline().addLast(new ProtobufEncoder());
        // 在管道中添加我们自己的接收数据实现方法
        channel.pipeline().addLast(new MyClientHandler());
    }

}

client/MyClientHandler.java

代码语言:txt
复制
/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {获取学习源码}
 * Create by fuzhengwei on 2019
 */
public class MyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        SocketChannel channel = (SocketChannel) ctx.channel();
        System.out.println("链接报告开始");
        System.out.println("链接报告信息:本客户端链接到服务端。channelId:" + channel.id());
        System.out.println("链接报告IP:" + channel.localAddress().getHostString());
        System.out.println("链接报告Port:" + channel.localAddress().getPort());
        System.out.println("链接报告完毕");
        //通知客户端链接建立成功
        String str = "通知服务端链接建立成功" + " " + new Date() + " " + channel.localAddress().getHostString();
        ctx.writeAndFlush(MsgUtil.buildMsg(channel.id().toString(), str));
    }

    /**
     * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("断开链接" + ctx.channel().localAddress().toString());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收msg消息{与上一章节相比,此处已经不需要自己进行解码}
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息类型:" + msg.getClass());
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息内容:" + JsonFormat.printToString((MsgBody) msg));
    }

    /**
     * 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        System.out.println("异常信息:\r\n" + cause.getMessage());
    }

}

client/NettyClient.java

代码语言:txt
复制
/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {获取学习源码}
 * Create by fuzhengwei on 2019
 */
public class NettyClient {

    public static void main(String[] args) {
        new NettyClient().connect("127.0.0.1", 7397);
    }

    private void connect(String inetHost, int inetPort) {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.AUTO_READ, true);
            b.handler(new MyChannelInitializer());
            ChannelFuture f = b.connect(inetHost, inetPort).sync();
            System.out.println("itstack-demo-netty client start done. {关注公众号:bugstack虫洞栈,获取源码}");

            f.channel().writeAndFlush(MsgUtil.buildMsg(f.channel().id().toString(),"你好,使用protobuf通信格式的服务端,我是https://bugstack.cn博主,付政委。这是我的公众号<bugstack虫洞栈>,关注我获取案例源码。"));
            f.channel().writeAndFlush(MsgUtil.buildMsg(f.channel().id().toString(),"你好,使用protobuf通信格式的服务端,我是https://bugstack.cn博主,付政委。这是我的公众号<bugstack虫洞栈>,关注我获取案例源码。"));
            f.channel().writeAndFlush(MsgUtil.buildMsg(f.channel().id().toString(),"你好,使用protobuf通信格式的服务端,我是https://bugstack.cn博主,付政委。这是我的公众号<bugstack虫洞栈>,关注我获取案例源码。"));
            f.channel().writeAndFlush(MsgUtil.buildMsg(f.channel().id().toString(),"你好,使用protobuf通信格式的服务端,我是https://bugstack.cn博主,付政委。这是我的公众号<bugstack虫洞栈>,关注我获取案例源码。"));
            f.channel().writeAndFlush(MsgUtil.buildMsg(f.channel().id().toString(),"你好,使用protobuf通信格式的服务端,我是https://bugstack.cn博主,付政委。这是我的公众号<bugstack虫洞栈>,关注我获取案例源码。"));

            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

}

proto/MsgInfo.proto

代码语言:txt
复制
syntax = "proto3";

package org.itstack.demo.netty.domain;

option java_package = "org.itstack.demo.netty.domain";
option java_multiple_files = true;
option java_outer_classname = "MsgInfo";

message MsgBody {

    string channelId = 1;
    string msgInfo = 2;

}

server/MyChannelInitializer.java

代码语言:txt
复制
/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {获取学习源码}
 * Create by fuzhengwei on 2019
 */
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) {
        //protobuf 处理
        channel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
        channel.pipeline().addLast(new ProtobufDecoder(MsgBody.getDefaultInstance()));
        channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
        channel.pipeline().addLast(new ProtobufEncoder());
        // 在管道中添加我们自己的接收数据实现方法
        channel.pipeline().addLast(new MyServerHandler());
    }

}

server/MyServerHandler.java

代码语言:txt
复制
/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {获取学习源码}
 * Create by fuzhengwei on 2019
 */
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        SocketChannel channel = (SocketChannel) ctx.channel();
        System.out.println("链接报告开始");
        System.out.println("链接报告信息:有一客户端链接到本服务端。channelId:" + channel.id());
        System.out.println("链接报告IP:" + channel.localAddress().getHostString());
        System.out.println("链接报告Port:" + channel.localAddress().getPort());
        System.out.println("链接报告完毕");
        //通知客户端链接建立成功
        String str = "通知客户端链接建立成功" + " " + new Date() + " " + channel.localAddress().getHostString() + "\r\n";
        ctx.writeAndFlush(MsgUtil.buildMsg(channel.id().toString(), str));
    }

    /**
     * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端断开链接" + ctx.channel().localAddress().toString());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收msg消息{与上一章节相比,此处已经不需要自己进行解码}
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息类型:" + msg.getClass());
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息内容:" + JsonFormat.printToString((MsgBody) msg));
    }

    /**
     * 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        System.out.println("异常信息:\r\n" + cause.getMessage());
    }

}

server/NettyServer.java

代码语言:txt
复制
/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {获取学习源码}
 * Create by fuzhengwei on 2019
 */
public class NettyServer {

    public static void main(String[] args) {
        new NettyServer().bing(7397);
    }

    private void bing(int port) {
        //配置服务端NIO线程组
        EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class)    //非阻塞模式
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(new MyChannelInitializer());
            ChannelFuture f = b.bind(port).sync();
            System.out.println("itstack-demo-netty server start done. {关注公众号:bugstack虫洞栈,获取源码}");
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            childGroup.shutdownGracefully();
            parentGroup.shutdownGracefully();
        }

    }

}

util/MsgUtil.java

代码语言:txt
复制
/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {获取学习源码}
 * Create by fuzhengwei on 2019
 */
public class MsgUtil {

    /**
     * 构建protobuf消息体
     */
    public static MsgBody buildMsg(String channelId, String msgInfo) {
        MsgBody.Builder msg = MsgBody.newBuilder();
        msg.setChannelId(channelId);
        msg.setMsgInfo(msgInfo);
        return msg.build();
    }

}

ApiTest.java

代码语言:txt
复制
/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {获取学习源码}
 * Create by fuzhengwei on 2019
 */
public class ApiTest {

    public static void main(String[] args) throws JsonFormat.ParseException {
        MsgBody.Builder msg = MsgBody.newBuilder();
        msg.setChannelId("abD01223");
        msg.setMsgInfo("hi helloworld");
        MsgBody msgBody = msg.build();

        //protobuf转Json 需要引入protobuf-java-format
        String msgBodyStr = JsonFormat.printToString(msgBody);
        System.out.println(msgBodyStr);

        //json转protobuf 需要引入protobuf-java-format
        JsonFormat.merge("{\"channelId\": \"HBdhi993\",\"msgInfo\": \"hi bugstack虫洞栈\"}", msg);
        msgBody = msg.build();
        System.out.println(msgBody.getChannelId());
        System.out.println(msgBody.getMsgInfo());

    }

}

四、测试结果

编译proto *idea的Terminal下执行编译命令

操作:idea选中E:\itstack\GIT\itstack.org\itstack-demo-netty\itstack-demo-netty-2-02\protoc-3.5.0-win32\bin

命令:protoc -I=源地址 --java_out=目标地址 源地址/xxx.proto

代码语言:txt
复制
E:\itstack\GIT\itstack.org\itstack-demo-netty\itstack-demo-netty-2-02\protoc-3.5.0-win32\bin>protoc.exe -I=E:\itstack\GIT\itstack.org\itstack-demo-netty\itstack-demo-netty-2-02\src\main\java\org\itstack\demo\netty\proto --java_out=E:\itstack\GIT\itstack.org\itstack-demo-netty\itstack-demo-netty-2-02\src\main
\java MsgInfo.proto

启动NettyServer

启动NettyClient

服务端执行结果

代码语言:txt
复制
itstack-demo-netty server start done. {关注公众号:bugstack虫洞栈,获取源码}
链接报告开始
链接报告信息:有一客户端链接到本服务端。channelId:807679da
链接报告IP:127.0.0.1
链接报告Port:7397
链接报告完毕
2019-08-04 14:06:01 接收到消息类型:class org.itstack.demo.netty.domain.MsgBody
2019-08-04 14:06:01 接收到消息内容:{"channelId": "abc14b89","msgInfo": "通知服务端链接建立成功 Sun Aug 04 14:06:01 CST 2019 127.0.0.1"}
2019-08-04 14:06:01 接收到消息类型:class org.itstack.demo.netty.domain.MsgBody
2019-08-04 14:06:01 接收到消息内容:{"channelId": "abc14b89","msgInfo": "你好,使用protobuf通信格式的服务端,我是https://bugstack.cn博主,付政委。这是我的公众号<bugstack虫洞栈>,关注我获取案例源码。"}
2019-08-04 14:06:01 接收到消息类型:class org.itstack.demo.netty.domain.MsgBody
2019-08-04 14:06:01 接收到消息内容:{"channelId": "abc14b89","msgInfo": "你好,使用protobuf通信格式的服务端,我是https://bugstack.cn博主,付政委。这是我的公众号<bugstack虫洞栈>,关注我获取案例源码。"}
2019-08-04 14:06:01 接收到消息类型:class org.itstack.demo.netty.domain.MsgBody
2019-08-04 14:06:01 接收到消息内容:{"channelId": "abc14b89","msgInfo": "你好,使用protobuf通信格式的服务端,我是https://bugstack.cn博主,付政委。这是我的公众号<bugstack虫洞栈>,关注我获取案例源码。"}
2019-08-04 14:06:01 接收到消息类型:class org.itstack.demo.netty.domain.MsgBody
2019-08-04 14:06:01 接收到消息内容:{"channelId": "abc14b89","msgInfo": "你好,使用protobuf通信格式的服务端,我是https://bugstack.cn博主,付政委。这是我的公众号<bugstack虫洞栈>,关注我获取案例源码。"}
2019-08-04 14:06:01 接收到消息类型:class org.itstack.demo.netty.domain.MsgBody
2019-08-04 14:06:01 接收到消息内容:{"channelId": "abc14b89","msgInfo": "你好,使用protobuf通信格式的服务端,我是https://bugstack.cn博主,付政委。这是我的公众号<bugstack虫洞栈>,关注我获取案例源码。"}
异常信息:
远程主机强迫关闭了一个现有的连接。
客户端断开链接/127.0.0.1:7397

Process finished with exit code -1

客户端执行结果

代码语言:txt
复制
itstack-demo-netty client start done. {关注公众号:bugstack虫洞栈,获取源码}
链接报告开始
链接报告信息:本客户端链接到服务端。channelId:abc14b89
链接报告IP:127.0.0.1
链接报告Port:51218
链接报告完毕
2019-08-04 14:06:01 接收到消息类型:class org.itstack.demo.netty.domain.MsgBody
2019-08-04 14:06:01 接收到消息内容:{"channelId": "807679da","msgInfo": "通知客户端链接建立成功 Sun Aug 04 14:06:01 CST 2019 127.0.0.1\r\n"}

Process finished with exit code -1

下一篇
举报
领券