史诗级最强教科书式“NIO与Netty编程”

史诗级最强教科书式“NIO与Netty编程”

1.1 概述

java.nio全称java non-blocking IO,是指JDK1.4开始提供的新API。从JDK1.4开始,Java提供了一系列改进的输入/输出的新特性,也被称为NIO(既New IO),新增了许多用于处理输入输出的类,这些类都被放在java.nio包及子包下,并且对原java.io包中的很多类进行改写,新增类满足NIO的功能。 NIO和BIO有着相同的目的和作用,但是它们的实现方式完全不同,BIO以流的方式处理数据,而NIO以块的方式处理数据,块I/O的效率比流I/O高很多。另外,NIO是非阻塞式的,这一点跟BIO也很不相同,使用它可以提供非阻塞式的高伸缩性网络。 NIO主要有三大核心部分 :Channel(通道),Buffer(缓冲区),Selector(选择器)。传统的BIO基于字节流和字符流进行操作,而NIO基于Channel和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择区)用于监听多个通道的事件(比如 :连接打开,数据到达)。因此使用单个线程就可以监听多个数据管道。

1.2 文件IO

1.2.1 概述和核心API

缓冲区(Buffer):实际上是一个容器,是一个特殊的数组,缓冲区对象内置流一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由Buffer,如下图所示 :

在NIO中,Buffer是一个顶层父类,它是一个抽象类,常用的Buffer子类有: ByteBuffer,存储字节数据到缓冲区 ShortBuffer,存储字符串数据到缓冲区 CharBuffer,存储字符数据到缓冲区 IntBuffer,存储整数数据到缓冲区 LongBuffer,存储长整型数据到缓冲区 DoubleBuffer,存储小数到缓冲区 FloatBuffer,存储小数到缓冲区 对于Java中的基本数据类型,都有一个具体Buffer类型与之相对应,最常用的自然是ByteBuffer类(二进制数据),该类的主要方法如下所示 : ByteBuffer类(二进制数据),该类的主要方法如下所示 : public abstract ByteBuffer put(byte[] b);存储字节数据到缓冲区 public abstract byte[] get();从缓冲区获得字节数据 public final byte[] array();把缓冲区数据转换成字节数组 public static ByteBuffer allocate(int capacity);设置缓冲区的初始容量 public static ByteBuffer wrap(byte[] array);把一个现成的数组放到缓冲区中使用 public final Buffer flip();翻转缓冲区,重置位置到初始位置 管道(Channel) :类似于BIO中的stream,例如FileInputStream对象,用来建立到目标(文件,网络套接字,硬件设备等)的一个连接,但是需要注意 :BIO中的stream是单向的,例如FileInputStream对象只能进行读取数据的操作,而NIO中的通道(Channel)是双向的,既可以用来进行读操作,也可以用来进行写操作。常用的Channel类有:FileChannel、DatagramChannel、ServerSocketChannel和SocketChannel。FileChannel用于文件的数据读写,DatagramChannel、ServerSocketChannel和SocketChannel。FileChannel用于文件的数据读写,DatagramChannel用于UDP的数据读写,ServerSocketChannel和SocketChannel用于TCP的数据读写。

FileChannel类,该类主要用来对本地文件进行IO操作,主要方法如下所示 : public int read(ByteBuffer dst),读取数据并放到缓冲区中 public int write(ByteBuffer src) , 把缓冲区的数据写到通道中 public long transferFrom(ReadableByteChannel src,long position,long count),从目标通道中复制数据 public long transferTo(long position,long count,WritableByteChannel target),把数据从当前通道复制给目标通道

1.2.2 案例

1.往本地文件中写数据

@Test
	public void contextLoads() throws Exception {
		String str = "hello,nio,我是谷";
		// 创建输出流
		FileOutputStream fileOutputStream = new FileOutputStream("basic.txt");
		// 从流中得到一个通道
		FileChannel fileChannel = fileOutputStream.getChannel();
		// 提供一个缓冲区
		ByteBuffer allocate = ByteBuffer.allocate(1024);
		// 往缓冲区中存入数据
		allocate.put(str.getBytes());
		// 当数据写入到缓冲区中时,指针指向数据最后一行,那么缓冲区写入通道中输出时,是从最后一行数据开始写入,
		// 这样就会导致写入1024的剩余没有数据的空缓冲区。所以需要翻转缓冲区,重置位置到初始位置
		allocate.flip();
		// 把缓冲区写到通道中,通道负责把数据写入到文件中
		fileChannel.write(allocate);
		// 关闭输出流,因为通道是输出流创建的,所以会一起关闭
		fileOutputStream.close();
	}

NIO中通道是从输出流对象里通过getChannel方法获取到的,该通道是双向的,既可以读,又可以写。在往通道里写数据之前,必须通过put方法把数据存到ByteBuffer中,然后通过通道write可以写数据。在write之前,需要调用flip方法翻转缓冲区,把内部定位到初始位置,这样在接下来写数据时才能把所有数据写到通道里。运行效果如下图 :

@Test  // 从本地文件中读取数据
public void test2() throws Exception {
	File file = new File("basic.text");
	// 1. 创建输入流
	FileInputStream fis = new FileInputStream(file);
	// 2. 得到一个通道
	FileChannel fc = fis.getChannel();
	// 3. 准备一个缓冲区
	ByteBuffer buffer = ByteBuffer.allocate((int)file.length());
	// 4. 从通道里读取数据并存到缓冲区中
	fc.read(buffer);
	System.out.println(new String(buffer.array));
	// 5.关闭
	fis.close();
}
@Test // 使用NIO实现文件复制
public void test3() throws Exception {
	//1. 创建两个流
	FileInputStream fis = new FileInputStream("basic.text");
	FileOutputStream fos = new FileOutputStream("c:\\test\\basic.text");
	// 2. 得到两个通道
	FileChannel sourceFc = fis.getChannel();
	FileChannel destFc = fos.getChannel();
	//3. 复制
	destFc.transferFrom(sourceFc,0,sourceFc.size());
	//4.关闭
	fis.close();
	fos.close();
}

1.3 网络IO

1.3.1 概述和核心API

前面在进行文件IO时用到的FileChannel并不支持非阻塞操作,学习NIO主要就是进行网络IO,Java NIO中的网络通道是非阻塞IO的实现,基于事件驱动,非常适用于服务器需要维持大量连接,但是数据交换量不大的情况,例如一些即时通讯的服务等待。 在Java中编写Socket服务器,通常有以下几种模式 : 一个客户端连接用一个线程,优点 :程序编写简单;缺点 :如果连接非常多,分配的线程也会非常多,服务器可能会因为资源耗尽而崩溃。 把每一个客户端连接交给一个拥有固定数量线程的连接池,优点 : 程序编写相对简单,可以处理大量的连接。缺点 :线程的开销非常大,连接如果非常多,排到现象会比较严重。 使用Java的NIO,用非阻塞的IO方式处理。这种模式可以用一个线程,处理大量的客户端连接。 1。Selector,选择器,能够检测多个注册的通道上是否有事件发生,便获取事件然后针对每个事件进行相应的响应处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接。这样使得只用在连接真正有读写事件发生时,才会调用函数来进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程,并且避免了多线程之间的上下文切换导致的开销。 该类的常用方法如下所示 : public static Selector open(),得到一个选择器对象 public int select(long timeout),监控所有注册的channel,当其中有注册的IO操作可以进行时,将对应的SelectionKey加入到内部集团中并返回,参数用来设置超时时间 public Set selectedKeys(),从内部集合中得到所有的SelectionKey 2. SelectionKey,代表了Selector和serverSocketChannel的注册关系,一共四种 : int OP_ACCEPT :有新的网络连接可以accept,值为16 int OP_CONNECT : 代表连接已经建立,值为8 int OP_READ和int OP_WRITE : 代表了读、写操作,值为1和4 该类的常用方法如下所示 : public abstract Selector selector(),得到与之关联的Selector对象 public abstract SelectorChannel channel(),得到与之关联的通道 public final Object attachment(),得到与之关联的共享数据 public abstract SelectionKey interestOps(int ops),设置或改变监听事件 public final boolean isAcceptable(),是否可以accept public final boolean isReadable(),是否可以读 public final boolean isWritable(),是否可以写 3. ServerSocketChannel,用来在服务器端监听新的客户端Socket连接,常用方法如下所示 : public static ServerSocketChannel open(),得到一个ServerSocketChannel通道 public final ServerSocketChannel bind(SocketAddress local),设置服务器端端口号 public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞模式,取值false表示采用非阻塞模式 public SocketChannel accept(),接受一个连接,返回代表这个连接的通道对象 public final SelectionKey register(Selector sel,int ops),注册一个选择器并设置监听事件 4. SocketChannel,网络IO通道,具体负责进行读写操作。NIO总是把缓冲区的数据写入通道,或者把通道里的数据读出到缓冲区(buffer)。常用方法如下所示 : public static SocketChannel open(),得到一个SocketChannel通道 public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞模式,取值false表示采用非阻塞模式 public boolean connect(SocketAddress remote),连接服务器 public boolean finishConnect(),如果上面的方法连接失败,接下来就要通过该方法完成连接操作 public int write(ByteBuffer src),往通道里写数据 public int read(ByteBuffer dst),从通道里读数据 public final SelectionKey register(Selector sel,int ops,Object att),注册一个选择器并设置监听事件,最后一个参数可以设置共享数据 public final void close(),关闭通道

3.4 AIO编程

JDK1.7引入了Asynchronous I/O,既AIO。再进行I/O编程中,常用到两种模式 :Reactor和Proactor。Java的NIO就是Reactor,当有事件触发时,服务器端得到通知,进行相应的处理。 AIO即NIO2.0,叫做异步不阻塞的IO。AIO引入异步通道的概念,采用 了Proactor模式,简化了程序编写,一个有效的请求才启动一个线程,它的特点是先有操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间长的应用。

3.5 IO对比总结

IO的方式通常分为几种,同步阻塞的BIO、同步非阻塞的NIO、异步非阻塞的AIO。 BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序直观简单易理解。 NIO方式适用于连接数据多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。 AIO方式适用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。

4.1 概述

Netty是由JBOSS提供的一个Java开源框架。Netty提供异步的、基于事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络IO程序。 Netty是一个基于NIO的网络编程框架,使用Netty可以帮助你快速、简单的开发出一个网络应用,相当于简化和流程化了NIO的开发过程。作为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,知名的Elasticsearch、Dubbo框架内部都采用了Netty。

4.2 Netty整体设计

4.2.1 线程模型

1.单线程模型

服务器端用一个线程通过多路复用搞定所有的IO操作(包括连接,读、写等),编码简单,清洗明了,但是如果客户端连接数量较多,将无法支撑,咋们前面的NIO案例就属于这种模型。 2. 线程池模型 服务器端采用一个线程专门处理客户端连接请求,采用一个线程组负责IO操作。在绝大数场景下,该模型都能满足使用。

3.Netty模型

Netty抽象出两组线程池,BossGroup专门负责接收客户端连接,WorkderGroup专门负责网络读写操作。NioEventLoop表示一个不断循环执行处理任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的socket网络通道。NioEventLoop内部采用串行化设计,从消息的读取-》解码-》处理-》编码-》发送,始终由IO线程NioEventLoop负责。 一个NioEventLoopGroup下包含多个NioEventLoop 每个NioEventLoop中包含有一个Selector,一个taskQueue 每个NioChannel只会绑定在唯一的NioEventLoop上 每个NioChannel都绑定有一个自己的ChannelPipeline

4.2.2 异步模型

FUTURE、CALLBACK和HANDLER Netty的异步模型是建立在future和callback的之上的。Future的核心思想是 :假设一个方法fun,计算过程可能非常耗时,等待fun返回显然不适合。那么可以在调用fun的时候,立马返回一个Future,后续可以通过Future去监控方法fun的处理过程。 在使用Netty进行编程时,拦截操作和转换出入站数据只需要提供callback或利用future即可。这使得链式操作简单、高效,并有利于编写可重用的、通用的代码。Netty框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来、解脱出来。

4.3 核心API

  • ChannelHandler及其实现类 ChannelHandler接口定义了许多事件处理的方法,我们可以通过重写这些方法区实现具体的业务逻辑。API关系如下图所示:

自定义一个Handler类去继承ChannelInboundHandlerAdapter,然后通过重写相应方法实现业务逻辑,一般都需要重写以下方法: public void channelActive(ChannelHandlerContext ctx),通道就绪事件 public void channelRead(ChannelHandlerContext ctx,Object msg),通道读取数据事件 public void channelReadComplete(ChannelHandlerContext ctx),数据读取完毕事件 public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause),通道发生异常事件

  • Pipeline和ChannelPipline ChannelPipeline是一个Handler的集合,它负责处理和拦截inbound或者outbound的事件和操作,相当于一个贯穿Netty的链。

ChannelPipeline addFirst(ChannelHandler…handlers),把一个业务处理类(handler)添加到链中的第一个位置 ChannelPipeline addLast(ChannelHandler…handlers),把一个业务处理类(handler)添加到链中的最后一个位置

  • ChannelHandlerContext 这是事件处理器上下文对象,Pipeline链中的实际处理节点。每个处理节点ChannelHandlerContext中包含一个具体的事件处理器ChannelHandler,同时ChannelHandlerContext中也绑定了对应的pipeline和Channel的信息,方便对ChannelHandler进行调用。 常用方法如下所示 : ChannelFuture close(),关闭通道 ChannelOutboundInvoker flush(),刷新 ChannelFuture writeAndFlush(Object msg),将数据写到ChannelPipeline中当前ChannelHandler的下一个ChanelHandler开始处理(出站)
  • ChannelOption Netty在创建Channel实例后,一般都需要设置ChannelOption参数。ChannelOption是Socket的标准参数,而非Netty独创的。常用的参数配置有 :
  1. ChannelOption.SO_BACKLOG 对应TCP/IP协议listen函数中的backlog参数,用来初始化服务器可连接队列大小。服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列d大小。
  2. ChannelOption.SO_KEEPALIVE 一直保持连接活动状态。
  • ChannelFuture 表示Channel中异步I/O操作的结果,在Netty中所有的I/O操作都是异步的,I/O的调用会直接返回,调用者并不能立刻获得结果,但是可以通过ChannelFuture来获取I/O操作的处理状态。常用方法如下所示 : Channel channel(),返回当前正在进行IO操作的通道 ChannelFuture sync(),等待异步操作执行完毕
  • EventLoopGroup和其实现类NioEventLoopGroup EventLoopGroup是一组EventLoop的抽象,Netty为了更好的利用多核CPU资源,一般会有多个EventLoop同时工作,每个EventLoop维护着一个Selector实例。 EventLoopGroup提供next接口,可以从组里面按照一定规则获取其中一个EventLoop来处理任务。在Netty服务器端编程中,我们一般都需要提供两个EventLoopGroup,例如:BossEventLoopGroup和WorkderEventLoopGroup。 通常一个服务端口即一个ServerSocketChannel对应一个Selector和一个EventLoop线程。BossEventLoop负责接收客户端的连接并将SocketChannel交给WorkerEventLoopGroup来进行IO处理,如下图所示 :

BossEventLoopGroup通常是一个单线程的EventLoop,EventLoop维护着一个注册了ServerSocketChannel的Selector实例,BossEventLoop不断轮询Selector将连接事件分离出来,通常是OP_ACCEPT事件,然后将接收到的SocketChannel交给WorkderEventLoopGroup,WorkderEventLoopGroup会由next选择其中一个EventLoopGroup来将这个SocketChannel注册到其维护的Selector并对其后续的IO事件进行处理。 常用方法如下所示 : public NioEventLoopGroup(),构造方法 public Future<?> shutdownGracefully(),断开连接,关闭线程

  • ServerBootstrap和Bootstrap ServerBootStrap是Netty中的服务端端启动助手,通过它可以完成服务器端的各种配置;Bootstrap是Netty中的客户端启动助手,通过它可以完成客户端的各种配置。常用方法如下所示 : public ServerBootstrap group(EventLoopGroup parentGroup,EventLoopGroup childGroup),该方法用于服务器端,用来设置两个EventLoop public B group(EventLoopGroup group),该方法用于客户端,用来设置一个EventLoop public B channel(Class<? extends C> channelClass),该方法用来设置一个服务器端的通道实现 public B option(ChannelOption option,T value),用来给ServerChannel添加配置 public ServerBootstrap childOption(ChannelOption childOption,T value),用来给接收到的通道添加配置 public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务处理类(自定义的handler) public ChannelFuture bind(int inetPort),该方法用于服务器端,用来设置占用的端口号 public ChannelFuture connect(String inetHost,int inetPort),该方法用于客户端,用来连接服务器端
  • Unpooled类 这是Netty提供的一个专门用来操作缓冲区的工具类,常用方法如下所示 : public static ByteBuf copiedBuffer(CharSequence string, Charset charset),通过给定的数据和字符编码返回一个ByteBuf对象(类似于Nio中的ByteBuffer对象)

入门案例 :

		<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.25.Final</version>
        </dependency>
package com.example.testdemo.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {

    public static void main(String[] args) throws InterruptedException {
        // 1. 创建一个线程组,接收客户端连接
        EventLoopGroup bossGroup = new NioEventLoopGroup();

        // 2. 创建一个线程组,处理网络操作
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        // 3. 创建服务器端启动助手来配置参数
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // 4. 设置两个线程组
        serverBootstrap.group(bossGroup, workerGroup)
                // 5。 使用NioServerSocketChannel作为服务器端通道的实现
                .channel(NioServerSocketChannel.class)
                // 6. 设置线程队列中等待连接的个数
                .option(ChannelOption.SO_BACKLOG, 128)
                // 7. 保持活跃连接状态
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // 8。 创建一个通道初始化对象
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    // 9。往pipline链中添加自定义的handler类
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        channel.pipeline().addLast(new NettyServerHandler());
                    }
                });
        System.out.println("...Server ready....");
        // 10. 绑定端口 ,bind方法是异步的,sync同步阻塞
        ChannelFuture sync = serverBootstrap.bind(9999).sync();
        System.out.println("...server start");
        // 11。 关闭通道,关闭线程组
        sync.channel().closeFuture().sync();
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
package com.example.testdemo.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 读取数据事件
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Server : " + ctx);
        ByteBuf byteBuf = (ByteBuf)msg;
        System.out.println("客户端发来的消息 :" + byteBuf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 数据读取完毕事件
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("就是没钱", CharsetUtil.UTF_8));
    }
}
package com.example.testdemo.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {

    public static void main(String[] args) throws InterruptedException {
        // 1. 创建一个线程组
        EventLoopGroup group = new NioEventLoopGroup();
        // 2。创建客户端的启动助手,完成香港配置
        Bootstrap bootstrap = new Bootstrap();
        // 3.设置线程组
        bootstrap.group(group)
                // 4. 设置客户端通道的实现类
                .channel(NioSocketChannel.class)
                // 5. 创建一个通道初始化对象
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 6. 往pipline链中添加自定义handler
                        socketChannel.pipeline().addLast(new NettyClientHandler());
                    }
                });
        // 7. 启动客户端去连接服务器 异步非阻塞,connect是异步的,它会立马返回一个future对象,sync是同步阻塞的用于等待主线程
        System.out.println("...Client is ready ...");
        ChannelFuture sync = bootstrap.connect("127.0.0.1", 9999).sync();
        // 8. 关闭连接 异步非阻塞
        sync.channel().closeFuture().sync();
    }
}
package com.example.testdemo.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * 客户端业务处理类
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf)msg;
        System.out.println("服务器端发来的消息 : " + byteBuf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 通道就绪事件
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client : " + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("老板,还钱吧", CharsetUtil.UTF_8));
    }
}

聊天案例

package com.example.testdemo.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * 聊天程序服务器端
 */
public class ChatServer {

    /**
     * 服务器端端口号
     */
    private int port;

    public ChatServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    // 往pipeline链中添加一个解码器
                                    .addLast("decoder", new StringDecoder())
                                    // 往pipeline链中添加一个编码器
                                    .addLast("encoder", new StringEncoder())
                                    // 往pipline链中添加自定义的handler(业务处理类)
                                    .addLast(new ChatServerHandler());
                        }
                    });
            System.out.println("Netty chat Server启动。。。");
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
            System.out.println("Netty chat Server关闭。。。");
        }
    }

    public static void main(String[] args) throws Exception {
        new ChatServer(9999).run();
    }

}
package com.example.testdemo.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.ArrayList;
import java.util.List;

/**
 * 自定义一个服务器端业务处理类
 */
public class ChatServerHandler extends SimpleChannelInboundHandler<String> {

    private static List<Channel> channels = new ArrayList<>();

    /**
     * 读取数据
     *
     * @param channelHandlerContext
     * @param s
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        Channel inChannel = channelHandlerContext.channel();
        channels.forEach(channel -> {
            if (channel != inChannel) {
                channel.writeAndFlush("[" + inChannel.remoteAddress().toString().substring(1) + "]" + "说 :" + s + "\n");
            }
        });
    }

    /**
     * 通道就绪
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channels.add(channel);
        System.out.println("[Server] : " + channel.remoteAddress().toString().substring(1) + "上线");
    }

    /**
     * 通道未就绪
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channels.remove(channel);
        System.out.println("[Server] : " + channel.remoteAddress().toString().substring(1) + "离线");
    }
}
package com.example.testdemo.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;

/**
 * 聊天程序客户端
 */
public class ChatClient {
    /**
     * 服务器端IP地址
     */
    private final String host;

    /**
     * 服务器端端口和
     */
    private final int port;

    public ChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void run() {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // 往pipeline链中添加一个解码器
                            pipeline.addLast("decoder", new StringDecoder());
                            // 往pipeline链中添加一个编码器
                            pipeline.addLast("encoder", new StringEncoder());
                            // 往pipeline链中添加自定义的handler(业务处理类)
                            pipeline.addLast(new ChatClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            Channel channel = channelFuture.channel();
            System.out.println("----" + channel.localAddress().toString().substring(1) + "----");
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String nextLine = scanner.nextLine();
                channel.writeAndFlush(nextLine  + "\r\n");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new ChatClient("127.0.0.1", 9999).run();
    }
}
package com.example.testdemo.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * 自定义一个客户端业务处理类
 */
public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(s.trim());
    }
}

4.6 编码和解码

4.6.1 概述

在编写网络应用程序的时候需要注意codec(编解码器),因为数据在网络中传输的都是二进制字节吗数据,而我们拿到的目标数据往往不是字节吗数据。因此在发送数据时就需要编码,接收数据时需要解码。 codec的组成部分有两个 :decoder(解码器)和encoder(编码器)。encoder负责把业务数据转换成字节码数据,decoder负责把字节码数据转换成业务数据。

其实java的序列化技术就可以作为codec去使用,但是它的硬伤太多 :

  1. 无法跨语言,这应该是Java序列化最致命的问题了。
  2. 序列化后体积太大,是二进制编码的5倍多。
  3. 序列化性能太低。 由于Java序列化硬伤太多,因此Netty自身提供了一些codec,如下所示 : Netty提供的解码器 :
  4. StringDecoder,对字符串数据解码
  5. ObjectDecoder,对Java对象进行解码 Netty提供的解码器:
  6. StringEncoder,对字符串数据进行编码
  7. ObjectEncoder,对Java对象进行编码 Netty自带的ObjectDecoder和ObjectEncoder可以用来实现POJP对象或各种业务对象的编码和解码,但其内部使用的仍是Java序列化技术,所以不建议使用。

4.6.2 Google的Protobuf

Protobuf是Google发布的开源项目,全称Google Protocol Buffers,特定如下 :

  • 支持跨平台、多语言(支持目前绝大多数语言,例如C++、C#、Java、Python等)
  • 高性能,高可靠性
  • 使用protobuf编译器能自动生成代码,Protpbuf是将类的定义使用.protp文件进行描述,然后通过protoc.exe编译器根据.proto自动生成.java文件 目前在使用Netty开发时,经常会结合Protobuf作为codec(编解码器)去使用,具体用法如下所示 :
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.9.1</version>
</dependency>

通过protoc.exe根据该文件生成java类,如下操作所示 :

在protoc.exe根目录运行命令 :protoc --java_out=. Book.pro

5.1 自定义RPC

概述: RPC(Remote Procedure Call),即远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络实现的技术。常见的RPC框架有 :Dubbo,Grpc。

  1. 服务消费房(client)以本地调用方式调用服务
  2. client stub 接收到调用后负责将方法、参数等封装成能够进行传输的消息体
  3. client stub 将消息进行编码并发送到服务端
  4. server stub 收到消息后进行解码
  5. server stub 根据解码结果调用本地的服务
  6. 本地服务执行并将结果返回给server stub
  7. server stub将返回导入结果进行编码并发送至消费方
  8. client stub接收到消息并进行解码
  9. 服务消费方(client)得到结果 RPC的目标就是将2-8这些步骤都封装起来,用户无需关系这些细节,可以像调用本地方法一样即可完成远程服务调用。

5.2 设计与实现

5.2.1 结果设计

  • Client(服务的调用方) :两个接口 + 一个包含main方法的测试类
  • Client Stub :一个客户端代理类 + 一个客户端业务处理类
  • Server(服务的提供方) :两个接口 + 两个实现类
  • Server Stub :一个网络处理服务器 + 一个服务器业务处理类 注意 :服务调用方的接口必须跟服务提供方的接口保持一致(包路径可以不一致) 最终要实现的目标是 :在TestNettyRPC中远程调用HelloRPCImpl或HelloNettyImpl中的方法
package com.example.testdemo.rpc;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

public class NettyRPCServer {
    private int port;
    public NettyRPCServer(int port) {
        this.port = port;
    }

    public void start() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .localAddress(port)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 编码器
                            pipeline.addLast("encoder", new ObjectEncoder());
                            // 解码器
                            pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                            // 服务器端业务处理类
                            pipeline.addLast(new InvokerHandler());
                        }
                    });
            ChannelFuture future = serverBootstrap.bind(port).sync();
            System.out.println("...Server is ready...");
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new NettyRPCServer(9999).start();
    }
}
package com.example.testdemo.rpc;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class NettyRPCProxy {

    /**
     * 根据接口创建代理对象
     *
     * @param target
     * @return
     */
    public static Object create(Class target) {
        return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 封装classinfo
                ClassInfo classInfo = new ClassInfo();
                classInfo.setClassName(getClass().getName());
                classInfo.setMethodName(method.getName());
                classInfo.setObjects(args);
                classInfo.setTypes(method.getParameterTypes());

                // 开始用Netty发送数据
                EventLoopGroup group = new NioEventLoopGroup();
                ResultHandler resultHandler = new ResultHandler();
                try {
                    Bootstrap bootstrap = new Bootstrap();
                    bootstrap.group(group)
                            .channel(NioSocketChannel.class)
                            .handler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                protected void initChannel(SocketChannel ch) throws Exception {
                                    ChannelPipeline pipeline = ch.pipeline();
                                    // 编码器
                                    pipeline.addLast("encoder", new ObjectEncoder());
                                    // 解码器
                                    pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                                    // 客户端业务处理类
                                    pipeline.addLast("handler", resultHandler);
                                }
                            });
                    ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();
                    future.channel().writeAndFlush(classInfo).sync();
                    future.channel().closeFuture().sync();
                } finally {
                    group.shutdownGracefully();
                }

                return resultHandler.getResponse();
            }
        });
    }

}
package com.example.testdemo.rpc;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.reflections.Reflections;

import java.lang.reflect.Method;
import java.util.Set;

/**
 * 服务器端业务处理类
 *
 */
public class InvokerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 得到某接口下某个实现类的名字
     *
     * @param classInfo
     * @return
     * @throws Exception
     */
    private String getImplClassName(ClassInfo classInfo) throws Exception {
        // 服务方接口和实现类所在的包路径
        String interfacePath = "com.example.testdemo.rpc";
        int lastDot = classInfo.getClassName().lastIndexOf(".");
        String interfaceName = classInfo.getClassName().substring(lastDot);
        Class supperClass = Class.forName(interfacePath + interfaceName);
        Reflections reflection = new Reflections(interfacePath);
        // 得到某接口下的所有实现类
        Set<Class> implClassSet = reflection.getSubTypesOf(supperClass);
        if (implClassSet.size() == 0) {
            System.out.println("未找到实现类");
            return null;
        } else if (implClassSet.size() > 1) {
            System.out.println("找个多个实现类,未明确使用哪一个");
            return null;
        } else {
            // 把集合转换为数组
            Class[] classes = implClassSet.toArray(new Class[0]);
            // 得到实现类的名字
            return classes[0].getName();
        }
    }

    /**
     * 读取客户端发来的数据并通过反射调用实现类的方法
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ClassInfo classInfo = (ClassInfo)msg;
        Object clazz = Class.forName(getImplClassName(classInfo)).newInstance();
        Method method = classInfo.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());
        // 通过反射调用实现类的方法
        Object result = method.invoke(clazz, classInfo.getObjects());
        ctx.writeAndFlush(result);
    }
}
package com.example.testdemo.rpc;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * 客户端业务处理类
 */
public class ResultHandler extends ChannelInboundHandlerAdapter {

    private Object response;

    public Object getResponse() {
        return response;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        response = msg;
        ctx.close();
    }
}
package com.example.testdemo.rpc;

import java.io.Serializable;

public class ClassInfo implements Serializable {

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class<?>[] getTypes() {
        return types;
    }

    public void setTypes(Class<?>[] types) {
        this.types = types;
    }

    public Object[] getObjects() {
        return objects;
    }

    public void setObjects(Object[] objects) {
        this.objects = objects;
    }

    private static final long serialVersionUID = 1L;

    /**
     * 类名
     */
    private String className;

    /**
     * 方法名
     */
    private String methodName;

    /**
     * 参数类型
     */
    private Class<?>[] types;

    /**
     * 参数列表
     */
    private Object[] objects;


}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券