前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >nio与netty编程(二)

nio与netty编程(二)

作者头像
周杰伦本人
发布2022-10-25 16:52:18
3750
发布2022-10-25 16:52:18
举报
文章被收录于专栏:同步文章同步文章

文章目录

四 netty

Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序。 Netty 是一个基于 NIO 的网络编程框架,使用 Netty 可以帮助你快速、简单的开发出一个网络应用,相当于简化和流程化了 NIO 的开发过程。 作为当前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。

image-20200808112921876
image-20200808112921876

Netty 整体设计

线程模型
单线程模型
image-20200808113429095
image-20200808113429095

服务器端用一个线程通过多路复用搞定所有的 IO 操作(包括连接,读、写等),编码简单,清晰明了,但是如果客户端连接数量较多,将无法支撑,咱们前面的 NIO 案例就属于这种模型。

线程池模型
image-20200808113511069
image-20200808113511069

服务器端采用一个线程专门处理客户端连接请求,采用一个线程池负责 IO 操作。在绝大多数场景下,该模型都能满足使用。

netty模型
image-20200808113824627
image-20200808113824627

比较类似于上面的线程池模型,Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作。NioEventLoop 表示一个不断循环执行处理任务的线程,每个 NioEventLoop 都有一个 selector,用于监听绑定在其上的 socket 网络通道。 NioEventLoop 内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由 IO 线程 NioEventLoop 负责。

  • 一个 NioEventLoopGroup 下包含多个 NioEventLoop
  • 每个 NioEventLoop 中包含有一个 Selector,一个 taskQueue
  • 每个 NioEventLoop 的 Selector 上可以注册监听多个 NioChannel
  • 每个 NioChannel 只会绑定在唯一的 NioEventLoop 上
  • 每个 NioChannel 都绑定有一个自己的 ChannelPipeline
异步模型

FUTURE, CALLBACK 和 HANDLER Netty 的异步模型是建立在 future 和 callback 的之上的。callback 大家都比较熟悉了,这里重点说说 Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待 fun返回显然不合适。那么可以在调用 fun 的时候,立马返回一个 Future,后续可以通过 Future去监控方法 fun 的处理过程。 在使用 Netty 进行编程时,拦截操作和转换出入站数据只需要您提供 callback 或利用future 即可。这使得链式操作简单、高效, 并有利于编写可重用的、通用的代码。Netty 框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来、解脱出来。

image-20200808113947065
image-20200808113947065

核心API

ChannelHandler 及其实现类

ChannelHandler 接口定义了许多事件处理的方法,我们可以通过重写这些方法去实现具 体的业务逻辑。API 关系如下图所示:

image-20200808114529898
image-20200808114529898

我们经常需要自定义一个 Handler 类去继承 ChannelInboundHandlerAdapter,然后通过 重写相应方法实现业务逻辑,我们接下来看看一般都需要重写哪些方法:

  • public void channelActive(ChannelHandlerContext ctx),通道就绪事件
  • public void channelRead(ChannelHandlerContext ctx, Object msg),通道读取数据事件
  • public void channelReadComplete(ChannelHandlerContext ctx) ,数据读取完毕事件
  • public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause),通道发生异常事件
Pipeline 和 和 ChannelPipeline

ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和操作,相当于一个贯穿 Netty 的链。

image-20200808114654238
image-20200808114654238
  • ChannelPipeline addFirst(ChannelHandler… handlers),把一个业务处理类(handler)添加到链中的第一个位置
  • ChannelPipeline addLast(ChannelHandler… handlers),把一个业务处理类(handler)添加到链中的最后一个位置
ChannelHandlerContext

这是事件处理器上下文对象,Pipeline 链中的实际处理节点。每个处理节点ChannelHandlerContext 中 包 含 一 个 具 体 的 事 件 处 理 器 ChannelHandler , 同 时ChannelHandlerContext 中也绑定了对应的 pipeline 和 Channel 的信息,方便对 ChannelHandler 进行调用。常用方法如下所示:

  • ChannelFuture close(),关闭通道
  • ChannelOutboundInvoker flush(),刷新
  • ChannelFuture writeAndFlush(Object msg) , 将 数 据 写 到 ChannelPipeline 中 当 前ChannelHandler 的下一个ChannelHandler 开始处理(出站)
ChannelOption

Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数。ChannelOption 是Socket 的标准参数,而非 Netty 独创的。常用的参数配置有:

  1. ChannelOption.SO_BACKLOG 对应 TCP/IP 协议 listen 函数中的 backlog 参数,用来初始化服务器可连接队列大小。服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog 参数指定了队列的大小。
  2. ChannelOption.SO_KEEPALIVE ,一直保持连接活动状态。
ChannelFuture

表示 Channel 中异步 I/O 操作的结果,在 Netty 中所有的 I/O 操作都是异步的,I/O 的调用会直接返回,调用者并不能立刻获得结果,但是可以通过 ChannelFuture 来获取 I/O 操作的处理状态。 常用方法如下所示:

  • Channel channel(),返回当前正在进行 IO 操作的通道
  • ChannelFuture sync(),等待异步操作执行完毕
EventLoopGroup 和其实现类 NioEventLoopGroup

EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般会有多个 EventLoop 同时工作,每个EventLoop 维护着一个 Selector 实例。 EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop来处理任务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。 通常一个服务端口即一个ServerSocketChannel对应一个Selector和一个EventLoop线程。 BossEventLoop 负责接收客户端的连接并将 SocketChannel 交给 WorkerEventLoopGroup 来进行 IO 处理,如下图所示:

image-20200808114827573
image-20200808114827573

BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop 维护着一个注册了ServerSocketChannel 的 Selector 实例,BossEventLoop 不断轮询 Selector 将连接事件分离出来,通常是 OP_ACCEPT 事件,然后将接收到的 SocketChannel 交给WorkerEventLoopGroup,WorkerEventLoopGroup 会由 next 选择其中一个 EventLoopGroup 来将这个 SocketChannel 注册到其维护的 Selector 并对其后续的 IO 事件进行处理。 常用方法如下所示:

  • public NioEventLoopGroup(),构造方法
  • public Future<?> shutdownGracefully(),断开连接,关闭线程
ServerBootstrap 和 和 Bootstrap

ServerBootstrap 是 Netty 中的服务器端启动助手,通过它可以完成服务器端的各种配置; Bootstrap 是 Netty 中的客户端启动助手,通过它可以完成客户端的各种配置。常用方法如下所示:

  • public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup), 该方法用于服务器端,用来设置两个 EventLoop
  • public B group(EventLoopGroup group) ,该方法用于客户端,用来设置一个 EventLoop
  • public B channel(Class<? extends C> channelClass),该方法用来设置一个服务器端的通道实现
  • public <T> B option(ChannelOption<T> option, T value),用来给 ServerChannel 添加配置 public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value),用来给接收到的通道添加配置
  • public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务 处理类(自定义的 handler)
  • public ChannelFuture bind(int inetPort) ,该方法用于服务器端,用来设置占用的端口号
  • public ChannelFuture connect(String inetHost, int inetPort) ,该方法用于客户端,用来连接服务器端
Unpooled 类

这是 Netty 提供的一个专门用来操作缓冲区的工具类,常用方法如下所示: public static ByteBuf copiedBuffer(CharSequence string, Charset charset),通过给定的数据和字符编码返回一个 ByteBuf 对象(类似于 NIO 中的 ByteBuffer 对象)

netty入门demo

pom文件

代码语言:javascript
复制
<dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.25.Final</version>
        </dependency>

        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.6.1</version>
        </dependency>
        <dependency>
            <groupId>com.github.markusmo3.urm</groupId>
            <artifactId>urm-core</artifactId>
            <version>1.4.4</version>
        </dependency>
    </dependencies>

定义了一个服务器端业务处理类,继承 ChannelInboundHandlerAdapter,并分别重写了三个方法。

代码语言:javascript
复制
package com.xiepanpan.netty.basic;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author: xiepanpan
 * @Date: 2020/8/5
 * @Description: 服务端的业务处理类
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 读取数据事件
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server:"+ctx);
        ByteBuf byteBuf = (ByteBuf) msg;

        System.out.println("客户端发来的消息:"+byteBuf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 数据读取完毕事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("好的  我现在在忙 一会儿和您联系",CharsetUtil.UTF_8));
    }

    /**
     * 异常发生事件
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

编写了一个服务器端程序,配置了线程组,配置了自定义业务处理类,并绑定端口号进行了启动

代码语言:javascript
复制
package com.xiepanpan.netty.basic;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @author: xiepanpan
 * @Date: 2020/8/5
 * @Description:  netty网络通信 服务端
 */
public class NettyServer {

    public static void main(String[] args) throws InterruptedException {
        //1. 创建一个线程组:接收客户端连接
        EventLoopGroup bossGroup = new NioEventLoopGroup();

        //2. 创建一个线程组:处理网络操作
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        //3. 创建服务器端启动助手来配置参数
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //4. 设置两个线程组
        serverBootstrap.group(bossGroup,workerGroup)
                //5. 使用NioServerSocketChannel作为服务器端通道的实现
                .channel(NioServerSocketChannel.class)
                //6. 设置线程队列中等待连接的个数
                .option(ChannelOption.SO_BACKLOG,128)
                //7. 保持活动连接状态
                .childOption(ChannelOption.SO_KEEPALIVE,true)
                //8. 创建一个通道初始化对象
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new NettyServerHandler());
                    }
                });
        System.out.println("server is ready...");
        //9. 绑定端口 bind方法是异步的 sync方法是同步阻塞的
        ChannelFuture channelFuture = serverBootstrap.bind(9999).sync();
        System.out.println("server is starting");
        //10. 关闭通道 关闭线程组
        channelFuture.channel().closeFuture().sync();
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();

    }
}

自定义了一个客户端业务处理类,继承 ChannelInboundHandlerAdapter ,并分别重写方法。

代码语言:javascript
复制
package com.xiepanpan.netty.basic;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author: xiepanpan
 * @Date: 2020/8/5
 * @Description: 自定义客户单业务处理类
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     *  通道就绪事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client: "+ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("你好 熟人介绍的", CharsetUtil.UTF_8));
    }

    /**
     * 读取数据事件
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("服务器端发来的消息:"+byteBuf.toString(CharsetUtil.UTF_8));
    }
}

上述代码编写了一个客户端程序,配置了线程组,配置了自定义的业务处理类,并启动连接了服务器端。

代码语言:javascript
复制
package com.xiepanpan.netty.basic;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @author: xiepanpan
 * @Date: 2020/8/5
 * @Description:  网络客户端
 */
public class NettyClient {

    public static void main(String[] args) throws InterruptedException {
        //1.创建一个线程组
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        //2. 创建客户端的启动助手 完成相关配置
        Bootstrap bootstrap = new Bootstrap();
        //3. 设置线程组
        bootstrap.group(eventLoopGroup)
        //4. 设置客户端通道的实现类
        .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    //5. 创建一个通道初始化对象
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        //6、往pipeline链中添加自定义的handler
                        socketChannel.pipeline().addLast(new NettyClientHandler());
                    }
                });
        //7、 启动客户端去连接服务器端 connect方法是异步的  syn方法是同步阻塞的
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
        //8. 关闭连接
        channelFuture.channel().closeFuture().sync();
    }
}
image-20200808120939451
image-20200808120939451
image-20200808120954970
image-20200808120954970

netty实现网络多人聊天

通过继承SimpleChannelInboundHandler类自定义了一个服务器端业务处理类,并在该类中重写了四个方法,当通道就绪时,输出在线;当通道未就绪时,输出下线;当通道发来数据时,读取数据;当通道出现异常时,关闭通道。

代码语言:javascript
复制
package com.xiepanpan.netty.chat;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.ArrayList;
import java.util.List;

/**
 * @author: xiepanpan
 * @Date: 2020/8/5
 * @Description:  自定义聊天服务端处理类
 */
public class ChatServerHandler  extends SimpleChannelInboundHandler<String> {

    public static List<Channel> channelList = new ArrayList<>();

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel inChannel = ctx.channel();
        channelList.add(inChannel);
        System.out.println("[server]:"+inChannel.remoteAddress().toString().substring(1)+"上线");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelList.remove(channel);
        System.out.println("[server]"+channel.remoteAddress().toString().substring(1)+"下线");
    }

    /**
     * 读取数据
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {

        Channel inChannel = ctx.channel();
        for (Channel channel: channelList) {
            if (channel!=inChannel) {
                channel.writeAndFlush("["+inChannel.remoteAddress().toString().substring(1)+"]"+"说:"+msg+"\n");
            }
        }
    }
}

通过 Netty 编写了一个服务器端程序,里面要特别注意的是:我们往 Pipeline链中添加了处理字符串的编码器和解码器,它们加入到 Pipeline 链中后会自动工作,使得我们在服务器端读写字符串数据时更加方便(不用人工处理 ByteBuf)。

代码语言:javascript
复制
package com.xiepanpan.netty.chat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * @author: xiepanpan
 * @Date: 2020/8/5
 * @Description: 聊天程序服务端
 */
public class ChatServer {

    //服务器端口号
    private int port;

    public ChatServer(int port) {
        this.port = port;
    }

    public static void main(String[] args) {
        new ChatServer(9999).run();
    }

    private void run() {

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,128)
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder",new StringDecoder());
                            pipeline.addLast("encoder",new StringEncoder());
                            pipeline.addLast(new ChatServerHandler());
                        }
                    });
            System.out.println("netty chat server启动");
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

通过继承 SimpleChannelInboundHandler 自定义了一个客户端业务处理类,重写了一个方法用来读取服务器端发过来的数据。

代码语言:javascript
复制
package com.xiepanpan.netty.chat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @author: xiepanpan
 * @Date: 2020/8/5
 * @Description:  自定义一个客户端业务处理类
 */
public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(s.trim());
    }
}

通过 Netty 编写了一个客户端程序,里面要特别注意的是:我们往 Pipeline 链中添加了处理字符串的编码器和解码器,他们加入到 Pipeline 链中后会自动工作,使得我们在客户端读写字符串数据时更加方便(不用人工处理 ByteBuf)。

代码语言:javascript
复制
package com.xiepanpan.netty.chat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.sctp.nio.NioSctpChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;

/**
 * @author: xiepanpan
 * @Date: 2020/8/5
 * @Description: 聊天程序客户端
 */
public class ChatClient {

    private final String host;
    private final  int port;

    public ChatClient(String host,int port) {
        this.host= host;
        this.port = port;
    }

    public static void main(String[] args) {
        new ChatClient("127.0.0.1",9999).run();
    }

    private void run() {

        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap().group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast("decoder",new StringDecoder());
                            pipeline.addLast("encoder",new StringEncoder());
                            pipeline.addLast(new ChatClientHandler());
                        }
                    });

            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            Channel channel = channelFuture.channel();
            System.out.println(channel.localAddress().toString().substring(1));

            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                channel.writeAndFlush(msg+"\r\n");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }

}
image-20200808123120159
image-20200808123120159
image-20200808123140905
image-20200808123140905
image-20200808123158541
image-20200808123158541
image-20200808123221684
image-20200808123221684

编码与解码

我们在编写网络应用程序的时候需要注意 codec (编解码器),因为数据在网络中传输的都是二进制字节码数据,而我们拿到的目标数据往往不是字节码数据。因此在发送数据时就需要编码,接收数据时就需要解码。 codec 的组成部分有两个:decoder(解码器)和 encoder(编码器)。encoder 负责把业务数据转换成字节码数据,decoder 负责把字节码数据转换成业务数据。 其实 Java 的序列化技术就可以作为 codec 去使用,但是它的硬伤太多:

  1. 无法跨语言,这应该是 Java 序列化最致命的问题了。
  2. 序列化后的体积太大,是二进制编码的 5 倍多。
  3. 序列化性能太低。 由于 Java 序列化技术硬伤太多,因此 Netty 自身提供了一些 codec,如下所示:

Netty 提供的解码器:

  1. StringDecoder, 对字符串数据进行解码
  2. ObjectDecoder,对 Java 对象进行解码
  3. … … …

Netty 提供的编码器:

  1. StringEncoder,对字符串数据进行编码
  2. ObjectEncoder,对 Java 对象进行编码
  3. … … …

Netty 本身自带的 ObjectDecoder 和 ObjectEncoder 可以用来实现 POJO 对象或各种业务对象的编码和解码,但其内部使用的仍是 Java 序列化技术,所以我们不建议使用。因此对于 POJO 对象或各种业务对象要实现编码和解码,我们需要更高效更强的技术。

Google 的 的 Protobuf Protobuf 是 Google 发布的开源项目,全称 Google Protocol Buffers,特点如下:

  • 支持跨平台、多语言(支持目前绝大多数语言,例如 C++、C#、Java、python 等)
  • 高性能,高可靠性
  • 使用 protobuf 编译器能自动生成代码,Protobuf 是将类的定义使用.proto 文件进行描述, 然后通过 protoc.exe 编译器根据.proto 自动生成.java 文件

目前在使用 Netty 开发时,经常会结合 Protobuf 作为 codec (编解码器)去使用,具体用法如下所示。

Book.proto

代码语言:javascript
复制
syntax = "proto3";
    option java_outer_classname="BookMessage";
    message Book{
        int32 id=1;
        string name =2;
    }

proto3表示版本号 BookMessage是生成的java类名 Book是内部类的类名 真正的POJO int32 id=1;是设置类的属性 等号后是序号 不是属性值

通过 protoc.exe 根据描述文件生成 Java 类,具体操作如下所示:

image-20200808124700153
image-20200808124700153

把生成的 BookMessage.java 拷贝到自己的项目中打开

代码语言:javascript
复制
package com.xiepanpan.netty.codec;// Generated by the protocol buffer compiler.  DO NOT EDIT!
// source: Book.proto

public final class BookMessage {
  private BookMessage() {}
  public static void registerAllExtensions(
      com.google.protobuf.ExtensionRegistryLite registry) {
  }

  public static void registerAllExtensions(
      com.google.protobuf.ExtensionRegistry registry) {
    registerAllExtensions(
        (com.google.protobuf.ExtensionRegistryLite) registry);
  }
  public interface BookOrBuilder extends
      // @@protoc_insertion_point(interface_extends:Book)
      com.google.protobuf.MessageOrBuilder {

    /**
     * <code>int32 id = 1;</code>
     */
    int getId();

    /**
     * <code>string name = 2;</code>
     */
    String getName();
    /**
     * <code>string name = 2;</code>
     */
    com.google.protobuf.ByteString
        getNameBytes();
  }
  /**
   * Protobuf type {@code Book}
   */
  public  static final class Book extends
      com.google.protobuf.GeneratedMessageV3 implements
      // @@protoc_insertion_point(message_implements:Book)
      BookOrBuilder {
  private static final long serialVersionUID = 0L;
    // Use Book.newBuilder() to construct.
    private Book(com.google.protobuf.GeneratedMessageV3.Builder<?> builder) {
      super(builder);
    }
    private Book() {
      id_ = 0;
      name_ = "";
    }

    @Override
    public final com.google.protobuf.UnknownFieldSet
    getUnknownFields() {
      return this.unknownFields;
    }
    private Book(
        com.google.protobuf.CodedInputStream input,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws com.google.protobuf.InvalidProtocolBufferException {
      this();
      if (extensionRegistry == null) {
        throw new NullPointerException();
      }
      int mutable_bitField0_ = 0;
      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
          com.google.protobuf.UnknownFieldSet.newBuilder();
      try {
        boolean done = false;
        while (!done) {
          int tag = input.readTag();
          switch (tag) {
            case 0:
              done = true;
              break;
            case 8: {

              id_ = input.readInt32();
              break;
            }
            case 18: {
              String s = input.readStringRequireUtf8();

              name_ = s;
              break;
            }
            default: {
              if (!parseUnknownFieldProto3(
                  input, unknownFields, extensionRegistry, tag)) {
                done = true;
              }
              break;
            }
          }
        }
      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
        throw e.setUnfinishedMessage(this);
      } catch (java.io.IOException e) {
        throw new com.google.protobuf.InvalidProtocolBufferException(
            e).setUnfinishedMessage(this);
      } finally {
        this.unknownFields = unknownFields.build();
        makeExtensionsImmutable();
      }
    }
    public static final com.google.protobuf.Descriptors.Descriptor
        getDescriptor() {
      return BookMessage.internal_static_Book_descriptor;
    }

    @Override
    protected FieldAccessorTable
        internalGetFieldAccessorTable() {
      return BookMessage.internal_static_Book_fieldAccessorTable
          .ensureFieldAccessorsInitialized(
              Book.class, Builder.class);
    }

    public static final int ID_FIELD_NUMBER = 1;
    private int id_;
    /**
     * <code>int32 id = 1;</code>
     */
    public int getId() {
      return id_;
    }

    public static final int NAME_FIELD_NUMBER = 2;
    private volatile Object name_;
    /**
     * <code>string name = 2;</code>
     */
    public String getName() {
      Object ref = name_;
      if (ref instanceof String) {
        return (String) ref;
      } else {
        com.google.protobuf.ByteString bs = 
            (com.google.protobuf.ByteString) ref;
        String s = bs.toStringUtf8();
        name_ = s;
        return s;
      }
    }
    /**
     * <code>string name = 2;</code>
     */
    public com.google.protobuf.ByteString
        getNameBytes() {
      Object ref = name_;
      if (ref instanceof String) {
        com.google.protobuf.ByteString b = 
            com.google.protobuf.ByteString.copyFromUtf8(
                (String) ref);
        name_ = b;
        return b;
      } else {
        return (com.google.protobuf.ByteString) ref;
      }
    }

    private byte memoizedIsInitialized = -1;
    @Override
    public final boolean isInitialized() {
      byte isInitialized = memoizedIsInitialized;
      if (isInitialized == 1) return true;
      if (isInitialized == 0) return false;

      memoizedIsInitialized = 1;
      return true;
    }

    @Override
    public void writeTo(com.google.protobuf.CodedOutputStream output)
                        throws java.io.IOException {
      if (id_ != 0) {
        output.writeInt32(1, id_);
      }
      if (!getNameBytes().isEmpty()) {
        com.google.protobuf.GeneratedMessageV3.writeString(output, 2, name_);
      }
      unknownFields.writeTo(output);
    }

    @Override
    public int getSerializedSize() {
      int size = memoizedSize;
      if (size != -1) return size;

      size = 0;
      if (id_ != 0) {
        size += com.google.protobuf.CodedOutputStream
          .computeInt32Size(1, id_);
      }
      if (!getNameBytes().isEmpty()) {
        size += com.google.protobuf.GeneratedMessageV3.computeStringSize(2, name_);
      }
      size += unknownFields.getSerializedSize();
      memoizedSize = size;
      return size;
    }

    @Override
    public boolean equals(final Object obj) {
      if (obj == this) {
       return true;
      }
      if (!(obj instanceof Book)) {
        return super.equals(obj);
      }
      Book other = (Book) obj;

      boolean result = true;
      result = result && (getId()
          == other.getId());
      result = result && getName()
          .equals(other.getName());
      result = result && unknownFields.equals(other.unknownFields);
      return result;
    }

    @Override
    public int hashCode() {
      if (memoizedHashCode != 0) {
        return memoizedHashCode;
      }
      int hash = 41;
      hash = (19 * hash) + getDescriptor().hashCode();
      hash = (37 * hash) + ID_FIELD_NUMBER;
      hash = (53 * hash) + getId();
      hash = (37 * hash) + NAME_FIELD_NUMBER;
      hash = (53 * hash) + getName().hashCode();
      hash = (29 * hash) + unknownFields.hashCode();
      memoizedHashCode = hash;
      return hash;
    }

    public static Book parseFrom(
        java.nio.ByteBuffer data)
        throws com.google.protobuf.InvalidProtocolBufferException {
      return PARSER.parseFrom(data);
    }
    public static Book parseFrom(
        java.nio.ByteBuffer data,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws com.google.protobuf.InvalidProtocolBufferException {
      return PARSER.parseFrom(data, extensionRegistry);
    }
    public static Book parseFrom(
        com.google.protobuf.ByteString data)
        throws com.google.protobuf.InvalidProtocolBufferException {
      return PARSER.parseFrom(data);
    }
    public static Book parseFrom(
        com.google.protobuf.ByteString data,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws com.google.protobuf.InvalidProtocolBufferException {
      return PARSER.parseFrom(data, extensionRegistry);
    }
    public static Book parseFrom(byte[] data)
        throws com.google.protobuf.InvalidProtocolBufferException {
      return PARSER.parseFrom(data);
    }
    public static Book parseFrom(
        byte[] data,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws com.google.protobuf.InvalidProtocolBufferException {
      return PARSER.parseFrom(data, extensionRegistry);
    }
    public static Book parseFrom(java.io.InputStream input)
        throws java.io.IOException {
      return com.google.protobuf.GeneratedMessageV3
          .parseWithIOException(PARSER, input);
    }
    public static Book parseFrom(
        java.io.InputStream input,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws java.io.IOException {
      return com.google.protobuf.GeneratedMessageV3
          .parseWithIOException(PARSER, input, extensionRegistry);
    }
    public static Book parseDelimitedFrom(java.io.InputStream input)
        throws java.io.IOException {
      return com.google.protobuf.GeneratedMessageV3
          .parseDelimitedWithIOException(PARSER, input);
    }
    public static Book parseDelimitedFrom(
        java.io.InputStream input,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws java.io.IOException {
      return com.google.protobuf.GeneratedMessageV3
          .parseDelimitedWithIOException(PARSER, input, extensionRegistry);
    }
    public static Book parseFrom(
        com.google.protobuf.CodedInputStream input)
        throws java.io.IOException {
      return com.google.protobuf.GeneratedMessageV3
          .parseWithIOException(PARSER, input);
    }
    public static Book parseFrom(
        com.google.protobuf.CodedInputStream input,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws java.io.IOException {
      return com.google.protobuf.GeneratedMessageV3
          .parseWithIOException(PARSER, input, extensionRegistry);
    }

    @Override
    public Builder newBuilderForType() { return newBuilder(); }
    public static Builder newBuilder() {
      return DEFAULT_INSTANCE.toBuilder();
    }
    public static Builder newBuilder(Book prototype) {
      return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
    }
    @Override
    public Builder toBuilder() {
      return this == DEFAULT_INSTANCE
          ? new Builder() : new Builder().mergeFrom(this);
    }

    @Override
    protected Builder newBuilderForType(
        BuilderParent parent) {
      Builder builder = new Builder(parent);
      return builder;
    }
    /**
     * Protobuf type {@code Book}
     */
    public static final class Builder extends
        com.google.protobuf.GeneratedMessageV3.Builder<Builder> implements
        // @@protoc_insertion_point(builder_implements:Book)
        BookOrBuilder {
      public static final com.google.protobuf.Descriptors.Descriptor
          getDescriptor() {
        return BookMessage.internal_static_Book_descriptor;
      }

      @Override
      protected FieldAccessorTable
          internalGetFieldAccessorTable() {
        return BookMessage.internal_static_Book_fieldAccessorTable
            .ensureFieldAccessorsInitialized(
                Book.class, Builder.class);
      }

      // Construct using BookMessage.Book.newBuilder()
      private Builder() {
        maybeForceBuilderInitialization();
      }

      private Builder(
          BuilderParent parent) {
        super(parent);
        maybeForceBuilderInitialization();
      }
      private void maybeForceBuilderInitialization() {
        if (com.google.protobuf.GeneratedMessageV3
                .alwaysUseFieldBuilders) {
        }
      }
      @Override
      public Builder clear() {
        super.clear();
        id_ = 0;

        name_ = "";

        return this;
      }

      @Override
      public com.google.protobuf.Descriptors.Descriptor
          getDescriptorForType() {
        return BookMessage.internal_static_Book_descriptor;
      }

      @Override
      public Book getDefaultInstanceForType() {
        return Book.getDefaultInstance();
      }

      @Override
      public Book build() {
        Book result = buildPartial();
        if (!result.isInitialized()) {
          throw newUninitializedMessageException(result);
        }
        return result;
      }

      @Override
      public Book buildPartial() {
        Book result = new Book(this);
        result.id_ = id_;
        result.name_ = name_;
        onBuilt();
        return result;
      }

      @Override
      public Builder clone() {
        return (Builder) super.clone();
      }
      @Override
      public Builder setField(
          com.google.protobuf.Descriptors.FieldDescriptor field,
          Object value) {
        return (Builder) super.setField(field, value);
      }
      @Override
      public Builder clearField(
          com.google.protobuf.Descriptors.FieldDescriptor field) {
        return (Builder) super.clearField(field);
      }
      @Override
      public Builder clearOneof(
          com.google.protobuf.Descriptors.OneofDescriptor oneof) {
        return (Builder) super.clearOneof(oneof);
      }
      @Override
      public Builder setRepeatedField(
          com.google.protobuf.Descriptors.FieldDescriptor field,
          int index, Object value) {
        return (Builder) super.setRepeatedField(field, index, value);
      }
      @Override
      public Builder addRepeatedField(
          com.google.protobuf.Descriptors.FieldDescriptor field,
          Object value) {
        return (Builder) super.addRepeatedField(field, value);
      }
      @Override
      public Builder mergeFrom(com.google.protobuf.Message other) {
        if (other instanceof Book) {
          return mergeFrom((Book)other);
        } else {
          super.mergeFrom(other);
          return this;
        }
      }

      public Builder mergeFrom(Book other) {
        if (other == Book.getDefaultInstance()) return this;
        if (other.getId() != 0) {
          setId(other.getId());
        }
        if (!other.getName().isEmpty()) {
          name_ = other.name_;
          onChanged();
        }
        this.mergeUnknownFields(other.unknownFields);
        onChanged();
        return this;
      }

      @Override
      public final boolean isInitialized() {
        return true;
      }

      @Override
      public Builder mergeFrom(
          com.google.protobuf.CodedInputStream input,
          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
          throws java.io.IOException {
        Book parsedMessage = null;
        try {
          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
          parsedMessage = (Book) e.getUnfinishedMessage();
          throw e.unwrapIOException();
        } finally {
          if (parsedMessage != null) {
            mergeFrom(parsedMessage);
          }
        }
        return this;
      }

      private int id_ ;
      /**
       * <code>int32 id = 1;</code>
       */
      public int getId() {
        return id_;
      }
      /**
       * <code>int32 id = 1;</code>
       */
      public Builder setId(int value) {
        
        id_ = value;
        onChanged();
        return this;
      }
      /**
       * <code>int32 id = 1;</code>
       */
      public Builder clearId() {
        
        id_ = 0;
        onChanged();
        return this;
      }

      private Object name_ = "";
      /**
       * <code>string name = 2;</code>
       */
      public String getName() {
        Object ref = name_;
        if (!(ref instanceof String)) {
          com.google.protobuf.ByteString bs =
              (com.google.protobuf.ByteString) ref;
          String s = bs.toStringUtf8();
          name_ = s;
          return s;
        } else {
          return (String) ref;
        }
      }
      /**
       * <code>string name = 2;</code>
       */
      public com.google.protobuf.ByteString
          getNameBytes() {
        Object ref = name_;
        if (ref instanceof String) {
          com.google.protobuf.ByteString b = 
              com.google.protobuf.ByteString.copyFromUtf8(
                  (String) ref);
          name_ = b;
          return b;
        } else {
          return (com.google.protobuf.ByteString) ref;
        }
      }
      /**
       * <code>string name = 2;</code>
       */
      public Builder setName(
          String value) {
        if (value == null) {
    throw new NullPointerException();
  }
  
        name_ = value;
        onChanged();
        return this;
      }
      /**
       * <code>string name = 2;</code>
       */
      public Builder clearName() {
        
        name_ = getDefaultInstance().getName();
        onChanged();
        return this;
      }
      /**
       * <code>string name = 2;</code>
       */
      public Builder setNameBytes(
          com.google.protobuf.ByteString value) {
        if (value == null) {
    throw new NullPointerException();
  }
  checkByteStringIsUtf8(value);
        
        name_ = value;
        onChanged();
        return this;
      }
      @Override
      public final Builder setUnknownFields(
          final com.google.protobuf.UnknownFieldSet unknownFields) {
        return super.setUnknownFieldsProto3(unknownFields);
      }

      @Override
      public final Builder mergeUnknownFields(
          final com.google.protobuf.UnknownFieldSet unknownFields) {
        return super.mergeUnknownFields(unknownFields);
      }


      // @@protoc_insertion_point(builder_scope:Book)
    }

    // @@protoc_insertion_point(class_scope:Book)
    private static final Book DEFAULT_INSTANCE;
    static {
      DEFAULT_INSTANCE = new Book();
    }

    public static Book getDefaultInstance() {
      return DEFAULT_INSTANCE;
    }

    private static final com.google.protobuf.Parser<Book>
        PARSER = new com.google.protobuf.AbstractParser<Book>() {
      @Override
      public Book parsePartialFrom(
          com.google.protobuf.CodedInputStream input,
          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
          throws com.google.protobuf.InvalidProtocolBufferException {
        return new Book(input, extensionRegistry);
      }
    };

    public static com.google.protobuf.Parser<Book> parser() {
      return PARSER;
    }

    @Override
    public com.google.protobuf.Parser<Book> getParserForType() {
      return PARSER;
    }

    @Override
    public Book getDefaultInstanceForType() {
      return DEFAULT_INSTANCE;
    }

  }

  private static final com.google.protobuf.Descriptors.Descriptor
    internal_static_Book_descriptor;
  private static final 
    com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
      internal_static_Book_fieldAccessorTable;

  public static com.google.protobuf.Descriptors.FileDescriptor
      getDescriptor() {
    return descriptor;
  }
  private static  com.google.protobuf.Descriptors.FileDescriptor
      descriptor;
  static {
    String[] descriptorData = {
      "\n\nBook.proto\" \n\004Book\022\n\n\002id\030\001 \001(\005\022\014\n\004name" +
      "\030\002 \001(\tB\rB\013BookMessageb\006proto3"
    };
    com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
        new com.google.protobuf.Descriptors.FileDescriptor.    InternalDescriptorAssigner() {
          public com.google.protobuf.ExtensionRegistry assignDescriptors(
              com.google.protobuf.Descriptors.FileDescriptor root) {
            descriptor = root;
            return null;
          }
        };
    com.google.protobuf.Descriptors.FileDescriptor
      .internalBuildGeneratedFileFrom(descriptorData,
        new com.google.protobuf.Descriptors.FileDescriptor[] {
        }, assigner);
    internal_static_Book_descriptor =
      getDescriptor().getMessageTypes().get(0);
    internal_static_Book_fieldAccessorTable = new
      com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
        internal_static_Book_descriptor,
        new String[] { "Id", "Name", });
  }

  // @@protoc_insertion_point(outer_class_scope)
}

这个类我们不要编辑它,直接拿着用即可,该类内部有一个内部类,这个内部类才是真正的 POJO,一定要注意。

向 Pipeline 链中添加 ProtobufEncoder 编码器对象

代码语言:javascript
复制
package com.xiepanpan.netty.codec;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufEncoder;

/**
 * @author: xiepanpan
 * @Date: 2020/8/7 0007
 * @Description:
 */
public class NettyClient {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast("encoder",new ProtobufEncoder());
                        socketChannel.pipeline().addLast(new NettyClientHandler());
                    }
                });
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
        channelFuture.channel().closeFuture().sync();
    }
}

往服务器端发送图书(POJO)时就可以使用生成的 BookMessage 类搞定,非常方便。

代码语言:javascript
复制
package com.xiepanpan.netty.codec;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author: xiepanpan
 * @Date: 2020/8/7 0007
 * @Description:
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 通道就绪事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        BookMessage.Book book = BookMessage.Book.newBuilder().setId(1).setName("恋爱必修课").build();
        ctx.writeAndFlush(book);
    }
}

在编写服务器端程序时,要向 Pipeline 链中添加 ProtobufDecoder 解码器对象。

代码语言:javascript
复制
package com.xiepanpan.netty.codec;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;

import java.util.concurrent.ExecutorService;

/**
 * @author: xiepanpan
 * @Date: 2020/8/7 0007
 * @Description:
 */
public class NettyServer {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(workerGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG,128)
                .childOption(ChannelOption.SO_KEEPALIVE,true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast("decoder",new ProtobufDecoder(BookMessage.Book.getDefaultInstance()));
                        socketChannel.pipeline().addLast(new NettyServerHandler());
                    }
                });
        ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
        System.out.println("server is starting...");
        channelFuture.channel().closeFuture().sync();
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

在服务器端接收数据时,直接就可以把数据转换成 POJO 使用,非常方便

代码语言:javascript
复制
package com.xiepanpan.netty.codec;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author: xiepanpan
 * @Date: 2020/8/7 0007
 * @Description:
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        BookMessage.Book book = (BookMessage.Book) msg;

        System.out.println("客户端发来数据:"+book.getName());
    }
}
image-20200808222743997
image-20200808222743997

五 自定义rpc

RPC(Remote Procedure Call),即远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络实现的技术。常见的 RPC 框架有: 源自阿里的 Dubbo,Spring 旗下的 Spring Cloud,Google 出品的 grpc 等等。

image-20200808221136271
image-20200808221136271
  1. 服务消费方(client)以本地调用方式调用服务
  2. client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  3. client stub 将消息进行编码并发送到服务端
  4. server stub 收到消息后进行解码
  5. server stub 根据解码结果调用本地的服务
  6. 本地服务执行并将结果返回给 server stub
  7. server stub 将返回导入结果进行编码并发送至消费方
  8. client stub 接收到消息并进行解码
  9. 服务消费方(client)得到结果

RPC 的目标就是将 2-8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用。接下来我们基于 Netty 自己动手搞定一个 RPC。

结构设计
image-20200808221335207
image-20200808221335207
  • Client(服务的调用方): 两个接口 + 一个包含 main 方法的测试类
  • Client Stub: 一个客户端代理类 + 一个客户端业务处理类
  • Server(服务的提供方): 两个接口 + 两个实现类
  • Server Stub: 一个网络处理服务器 + 一个服务器业务处理类

注意:服务调用方的接口必须跟服务提供方的接口保持一致(包路径可以不一致) 最终要实现的目标是:在 TestNettyRPC 中远程调用 HelloRPCImpl 或 HelloNettyImpl 中的方法

Server(服务的提供方)

作为服务的提供方,我们分别编写了两个接口和两个实现类,供消费方远程调用。

代码语言:javascript
复制
package com.xiepanpan.netty.rpc.server;

public interface HelloNetty {
    String hello();
}
代码语言:javascript
复制
package com.xiepanpan.netty.rpc.server;

/**
 * @author: xiepanpan
 * @Date: 2020/8/7 0007
 * @Description:
 */
public class HelloNettyImpl implements HelloNetty {
    @Override
    public String hello() {
        return "hello netty";
    }
}
代码语言:javascript
复制
package com.xiepanpan.netty.rpc.server;

/**
 * @author: xiepanpan
 * @Date: 2020/8/7 0007
 * @Description:
 */
public interface HelloRpc {

    String hello(String name);
}
代码语言:javascript
复制
package com.xiepanpan.netty.rpc.server;

/**
 * @author: xiepanpan
 * @Date: 2020/8/7 0007
 * @Description:
 */
public class HelloRpcImpl implements HelloRpc {
    @Override
    public String hello(String name) {
        return "hello,"+name;
    }
}
Server Stub 部分

作为实体类用来封装消费方发起远程调用时传给服务方的数据。

代码语言:javascript
复制
package com.xiepanpan.netty.rpc.serverStub;

import java.io.Serializable;

/**
 * @author: xiepanpan
 * @Date: 2020/8/7
 * @Description: 用来封装消费方发起远程调用时传给服务方的数据
 */
public class ClassInfo implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 类名
     */
    private String className;
    /**
     * 方法名
     */
    private String methodName;
    /**
     * 参数类型
     */
    private Class<?>[] types;
    /**
     * 参数列表
     */
    private Object[] objects;

    public static long getSerialVersionUID() {
        return serialVersionUID;
    }

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class<?>[] getTypes() {
        return types;
    }

    public void setTypes(Class<?>[] types) {
        this.types = types;
    }

    public Object[] getObjects() {
        return objects;
    }

    public void setObjects(Object[] objects) {
        this.objects = objects;
    }
}

业务处理类,读取消费方发来的数据,并根据得到的数据进行本地调用,然后把结果返回给消费方。

代码语言:javascript
复制
package com.xiepanpan.netty.rpc.serverStub;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.reflections.Reflections;

import java.lang.reflect.Method;
import java.util.Set;

/**
 * @author: xiepanpan
 * @Date: 2020/8/7 0007
 * @Description: 服务端业务处理类
 */
public class InvokeHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ClassInfo classInfo = (ClassInfo) msg;
        Object clazz = Class.forName(getImplClassName(classInfo)).newInstance();
        Method method = clazz.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());
        //通过反射调用实现类的方法
        Object result = method.invoke(clazz, classInfo.getObjects());
        ctx.writeAndFlush(result);
    }

    /**
     * 得到某接口下某个实现类的名字
     * @param classInfo
     * @return
     */
    private String getImplClassName(ClassInfo classInfo) throws ClassNotFoundException {
        String interfacePath ="com.xiepanpan.netty.rpc.server";
        int lastDot = classInfo.getClassName().lastIndexOf(".");
        String interfaceName = classInfo.getClassName().substring(lastDot);
        Class<?> superClass = Class.forName(interfacePath + interfaceName);
        Reflections reflections = new Reflections(interfacePath);
        //得到某接口下的所有实现类
        Set<Class<?>> implClassSet = (Set<Class<?>>) reflections.getSubTypesOf(superClass);
        if (implClassSet.size()==0) {
            System.out.println("未找到实现类");
            return null;
        }else if (implClassSet.size()>1) {
            System.out.println("找到多个实现类,未明确使用哪一个");
            return null;
        }else {
            Class[] classes = implClassSet.toArray(new Class[0]);
            return classes[0].getName();
        }
    }
}

用 Netty 实现的网络服务器,采用 Netty 自带的 ObjectEncoder 和 ObjectDecoder作为编解码器(为了降低复杂度,这里并没有使用第三方的编解码器),当然实际开发时也可以采用 JSON 或 XML。

代码语言:javascript
复制
package com.xiepanpan.netty.rpc.serverStub;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * @author: xiepanpan
 * @Date: 2020/8/7 0007
 * @Description: 网络处理服务器
 */
public class NettyRpcServer {

    private int port;

    public NettyRpcServer(int port) {
        this.port = port;
    }

    public void start() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(workerGroup, bossGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,128)
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
                    .localAddress(port).childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    //编码器
                    pipeline.addLast("encoder",new ObjectEncoder());
                    pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                    pipeline.addLast(new InvokeHandler());
                }
            });
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            System.out.println("server is ready");
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new NettyRpcServer(9999).start();
    }
}
Client Stub 部分

客户端的业务处理类读取远程调用返回的数据

代码语言:javascript
复制
package com.xiepanpan.netty.rpc.clientStub;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author: xiepanpan
 * @Date: 2020/8/7 0007
 * @Description: 客户端业务处理类
 */
public class ResultHandler extends ChannelInboundHandlerAdapter {

    private Object response;
    public Object getResponse(){
        return response;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        response = msg;
        ctx.close();
    }
}

用 Netty 实现的客户端代理类,采用 Netty 自带的 ObjectEncoder 和 ObjectDecoder作为编解码器(为了降低复杂度,这里并没有使用第三方的编解码器),当然实际开发时也可以采用 JSON 或 XML。

代码语言:javascript
复制
package com.xiepanpan.netty.rpc.clientStub;

import com.xiepanpan.netty.rpc.server.HelloNetty;
import com.xiepanpan.netty.rpc.serverStub.ClassInfo;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
 * @author: xiepanpan
 * @Date: 2020/8/7 0007
 * @Description:
 */
public class NettyRpcProxy {
    public static Object create(final Class target) {
        return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                ClassInfo classInfo = new ClassInfo();
                classInfo.setClassName(target.getName());
                classInfo.setMethodName(method.getName());
                classInfo.setObjects(args);
                classInfo.setTypes(method.getParameterTypes());

                //开始用netty发送数据
                EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

                final ResultHandler resultHandler = new ResultHandler();
                try {
                    Bootstrap bootstrap = new Bootstrap();
                    bootstrap.group(eventLoopGroup)
                            .channel(NioSocketChannel.class)
                            .handler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                protected void initChannel(SocketChannel socketChannel) throws Exception {
                                    ChannelPipeline pipeline = socketChannel.pipeline();
                                    //编码器
                                    pipeline.addLast("encoder",new ObjectEncoder());
                                    //解码器 构造方法第一个参数设置二进制数据的最大字节数 第二个参数设置具体使用哪个类解析器
                                    pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                                    //客户端业务处理类
                                    pipeline.addLast("handler",resultHandler);

                                }
                            });
                    ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
                    channelFuture.channel().writeAndFlush(classInfo).sync();
                    channelFuture.channel().closeFuture().sync();
                } finally {
                    eventLoopGroup.shutdownGracefully();
                }
                return resultHandler.getResponse();
            }
        });
    }
}
Client( 服务的调用方- 消费方)

消费方不需要知道底层的网络实现细节,就像调用本地方法一样成功发起了两次远程调用。

代码语言:javascript
复制
package com.xiepanpan.netty.rpc.client;

import com.xiepanpan.netty.rpc.clientStub.NettyRpcProxy;
import com.xiepanpan.netty.rpc.server.HelloNetty;
import com.xiepanpan.netty.rpc.server.HelloRpc;

/**
 * @author: xiepanpan
 * @Date: 2020/8/7 0007
 * @Description: 服务调用方
 */
public class TestNettyRpc {

    public static void main(String[] args) {
        HelloNetty helloNetty = (HelloNetty) NettyRpcProxy.create(HelloNetty.class);
        System.out.println(helloNetty.hello());

        HelloRpc helloRpc = (HelloRpc) NettyRpcProxy.create(HelloRpc.class);
        System.out.println(helloRpc.hello("RPC"));
    }
}
image-20200808224757297
image-20200808224757297
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-08-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 四 netty
    • Netty 整体设计
      • 线程模型
      • 异步模型
    • 核心API
      • ChannelHandler 及其实现类
      • Pipeline 和 和 ChannelPipeline
      • ChannelHandlerContext
      • ChannelOption
      • ChannelFuture
      • EventLoopGroup 和其实现类 NioEventLoopGroup
      • ServerBootstrap 和 和 Bootstrap
      • Unpooled 类
    • netty入门demo
      • netty实现网络多人聊天
        • 编码与解码
          • 结构设计
      • 五 自定义rpc
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档