前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >netty了解与应用

netty了解与应用

作者头像
sucl
发布2019-08-07 14:49:47
3710
发布2019-08-07 14:49:47
举报
文章被收录于专栏:企业平台构建企业平台构建

学习netty之前,必须先了解下什么是nio,关于nio的教程网上非常多,和传统io的比较,优势,如何使用,各个方面都有比较细致的分析。

而netty可以说是nio实现的最佳方式,相比你来说

1、封装了使用nio的复杂性,提供了较简的api供开发者使用

2、解决了nio编程中的bug与性能问题

3、提供了一些默认的网络通信模型与协议实现

4、活跃的社区与经过考验的用户

首先还是有必要对nio做个简单的了解,关于nio主要涉及到三个元素channel、buffer、selector,可以说对nio的使用也就是对这三个对象的方法使用:

  • Channel 通道,相当于网络通信的请求响应,channel是双向的,由selector协调完成通信
  • Buffer 缓存,数据的读写都是buffer与channel的交互,将数据读出或写入到buffer进行缓存,
  • Selector 选择器,轮训注册的channel,通知关注的事件

通过一个例子简单理解如何使用:

代码语言:javascript
复制
//服务端
public class Server implements Runnable{
    private boolean running = false;
    private static int port = 9100;
    private ServerSocketChannel channel;
    private Selector selector;
    private ByteBuffer buffer = ByteBuffer.allocate(128);

    public Server(){
        listener();
    }

    private void listener(){
        try {
            channel = ServerSocketChannel.open();

            channel.socket().bind(new InetSocketAddress(port));

            channel.configureBlocking(false);

            selector = Selector.open();

            channel.register(selector, SelectionKey.OP_ACCEPT);

            running = true;
            System.out.println(String.format("启动成功,监听端口:%s",port));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void run() {
        while (true){
            try {
                int count = selector.select();//接入的客户端数量
                if(count>0){
                    Iterator<SelectionKey> keyIt = selector.selectedKeys().iterator();
                    while (keyIt.hasNext()){
                        SelectionKey key = keyIt.next();
                        if(key.isValid()){//此键是否有效
    //                        System.out.println("valid");
                        }
                        if(key.isConnectable()){//此键的通道是否已完成其套接字连接操作
    //                        System.out.println("connectable");
                        }
                        if(key.isAcceptable()){//此键的通道是否已准备好接受新的套接字连接
                            System.out.println("acceptable");
                            ServerSocketChannel _channel = (ServerSocketChannel)key.channel();//和上面定义channel一样
                            _channel.accept()//这个就是acceptable,获取连接的客户端
                                    .configureBlocking(false)
                                    .register(selector, SelectionKey.OP_READ );
                        }
                        //读的时候需要注意limit的位置(remaining()),而并不是整个buffer的内容(array())
                        if(key.isReadable()){//此键的通道是否准备读取
    //                        System.out.println("readable");
                            buffer.flip();//写->读 l->p,p->0
                            SocketChannel _channel = (SocketChannel)key.channel();//客户端
                            //将buffer读到channel中
                            int len = _channel.read(buffer);//position、limit变化了 P->l
                            buffer.flip();//如果不调用remaining()
                            System.out.println(new String(buffer.array(),0,buffer.remaining()));//len
                            buffer.clear();
                            _channel.register(selector, SelectionKey.OP_WRITE);
                        }
                        //在写的时候,对于buffer经历了写(将数据写入buffer)、读(从buffer读取写入channel)
                        if(key.isWritable()){//此键的通道是否准备写入
    //                        System.out.println("writable");
                            SocketChannel _channel =(SocketChannel)key.channel();//服务端
                            buffer.clear();//读->写,准备往buffer写入  p->0,l->c
                            String msg = "message :"+new Date();
                            buffer.put(msg.getBytes());
                            buffer.flip();//写->读,channel从buffer中读出 l->p,p->0
                            _channel.write(buffer);
                            _channel.register(selector, SelectionKey.OP_READ);
                        }
                        keyIt.remove();//需要主动删除,不然会重复消费
                    }
                }
            } catch (IOException e) {
//                _channel.close();
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Thread app = new Thread(new Server());
        app.start();
    }
}

我们可以直接使用telnet localhost 9100,或者自己写个简单的客户端:

代码语言:javascript
复制
public class Client implements Runnable{
    private SocketChannel clientChannel;
    private Selector selector;
    private ByteBuffer buffer = ByteBuffer.allocate(128);

    public Client() throws IOException {
        clientChannel = SocketChannel.open();
        clientChannel.configureBlocking(false);
        clientChannel.connect(new InetSocketAddress("localhost",9100));

        selector = Selector.open();

        clientChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_CONNECT);
        while (!clientChannel.finishConnect()){

        }
        System.out.println("连接成功");
    }

    public void write(String msg){
        buffer.clear();
        buffer.put(msg.getBytes());
        buffer.flip();
        try {
            clientChannel.write(buffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void run() {
        while (true){
            try {
                selector.select();
                Iterator<SelectionKey> keyIt = selector.selectedKeys().iterator();
                while (keyIt.hasNext()){
                    SelectionKey key = keyIt.next();
                    if(key.isConnectable()){
                        System.out.println("connected");
                    }
                    if(key.isReadable()){
                        SocketChannel channel = (SocketChannel) key.channel();
                        buffer.clear();
                        int len = channel.read(buffer);
                        buffer.flip();
                        System.out.println( "接收信息:"+ new String(buffer.array(),0,len));
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Client client = new Client();
        new Thread(client).start();

        Scanner scanner = new Scanner(System.in);
        while (true){
            client.write(scanner.nextLine());
        }
    }
}

主要通过示例理解开发的过程,以及上门说到的三个对象以及其衍生对象的使用方法,特别要注意下buffer的读写切换,比如我们在往channel中写入数据时,首先需要调用clear()方法,然后channel从buffer中读取数据,需要调用flip()将其转换成读模式。其次,在从buffer读取数据时,比如多次对buffer写入过,新的写入前调用的clear()并不是将buffer清空,只不过将其position与limit的位置进行了重设。

比如

代码语言:javascript
复制
CharBuffer buffer = CharBuffer.allocate(10);
buffer.put(new char[]{'1','2','3','4','5'});
buffer.clear();
buffer.put(new char[]{'a'});
//buffer里面的值其实是 a 1 2 3 4 5 但是读模式时 limit=1;不要用buffer.array(),那样是所有的

关于netty,相比而言使用比较简单,可以看到我们如果单纯的使用nio进行开发,需要关注很多细节的处理,同时还有性能的保证都是需要自己去处理的,特别是在网络通信中,各种协议的实现、通信保持、心跳检查、数据编码、拆包粘包、数据缓存等实现都需要开发者去完成,因此开发一套可靠性比较高的通信框架是非常复杂的。

netty既然封装了这些复杂度,那么我们如何去使用,还是通过示例去属性,毕竟关于netty的实现原理,通信模型网上有太多的解释,作为了解已经足够,通过示例学习,能更快了解该框架:

代码语言:javascript
复制
//服务端
public class Server {
	
	private int port;

	public Server(int port) {
		this.port = port;
	}
	
	public void run() throws InterruptedException {
		//1
		EventLoopGroup bossLoopGroup = new NioEventLoopGroup();
		EventLoopGroup wookerLoopGroup = new NioEventLoopGroup();
		//2
		ServerBootstrap serverBootstrap = new ServerBootstrap();
		//3
		serverBootstrap.group(bossLoopGroup, wookerLoopGroup)
				.channel(NioServerSocketChannel.class)
				.option(ChannelOption.SO_BACKLOG, 128)	//设置TCP缓冲区
				.option(ChannelOption.SO_SNDBUF, 10*1024)	//发送缓冲
				.option(ChannelOption.SO_RCVBUF, 10*1024)	//接收缓冲
				.childOption(ChannelOption.SO_KEEPALIVE, true)
				.childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel channel) throws Exception {
						channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){

							@Override
							public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
								ByteBuf buf = (ByteBuf) msg;
								byte[] req = new byte[buf.readableBytes()];
								buf.readBytes(req);
								System.err.println("接收到的信息:" + new String(req));
								ByteBuf resp = Unpooled.copiedBuffer("服务端返回信息".getBytes());
								ctx.writeAndFlush(resp);
							}

							@Override
							public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
								ctx.close();
							}

							@Override
							public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
								cause.printStackTrace();
								ctx.close();
							}
						});
					}

				});
		//4
		ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
		channelFuture.channel().closeFuture().sync();
		//5
		wookerLoopGroup.shutdownGracefully();
		bossLoopGroup.shutdownGracefully();
			
	}
	
	public static void main(String[] args) throws InterruptedException {
		new Server(9093).run();
	}

}
代码语言:javascript
复制
//客户端
public class Client {

	private String url;
	private int port;

	public Client(String url,int port){
		this.url = url;
		this.port = port;
	}

	public void start() throws InterruptedException {
		//1
		EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
		//2
		Bootstrap bootstrap = new Bootstrap();
		//3
		bootstrap.group(eventLoopGroup)
				.channel(NioSocketChannel.class)
				.option(ChannelOption.SO_KEEPALIVE,true)
				.handler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel channel) throws Exception {
						channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buffer = (ByteBuf) msg;
                                byte[] bs = new byte[buffer.readableBytes()];
                                buffer.readBytes(bs);
                                System.err.println("客户端接收的消息:"+new String(bs));
                            }

                            @Override
                            public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                                ctx.close();
                            }

                            @Override
                            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                cause.printStackTrace();
                                ctx.close();
                            }
                        });
					}

				});
		//4
		ChannelFuture future = bootstrap.connect(url, port);
		//5
		future.channel().writeAndFlush(Unpooled.copiedBuffer("客户端发送的信息!".getBytes()));
		future.channel().closeFuture().sync();
		//6
		eventLoopGroup.shutdownGracefully();
	}

	public static void main(String[] args) throws InterruptedException {
		Client client = new Client("localhost",9093);
		client.start();
	}
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档