前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >初识Netty

初识Netty

作者头像
Liusy
发布2020-09-01 14:51:14
3750
发布2020-09-01 14:51:14
举报
文章被收录于专栏:Liusy01Liusy01

【导读】我在两年前的时候就购买了《Netty权威指南》看了一下,不过没看懂哈哈哈哈,工作中也用不到,很快就忘了,直到前段时间在dy那边需要我重构一个TCP连接通信工具(半个月时间只给它搭了一个架子),所以最近也重新翻开书进行学习。Netty是最流行的NIO框架之一,其健壮性、功能、性能、可定制性和可扩展性都是很优秀的。Netty是Hadoop生态中RPC框架Avro以及Dubbo底层的通信框架。今儿就来聊一下其的简单使用。

一、Netty逻辑架构

采用三层网络架构进行设计和开发,架构如图:

1、Reactor通信调度层

监听网络的读写和连接请求,负责将网络层的数据读取到内存缓冲区中,然后触发各种网络事件,例如连接创建、连接激活、读写事件等,然后将这些事件触发到Pipeline中,由Pipeline管理的职责链进行后续处理。

什么是反应堆Reactor:

BIO模型在并发高的情况下,会创建大量线程,性能很低并可能导致服务宕机。

NIO工作原理:

(1)由一个专门的线程处理所有的IO事件,并负责分发,也就是Selector

(2)事件驱动机制:事件到的时候触发,而不是同步的去监视事件

(3)线程通信:线程之间通过wait/notify等方式进行通信,减少无谓的线程切换

2、职责链Pipeline

负责事件在职责链之间的有序传播,动态的编排职责链。可选择监听和处理特定的事件,拦截处理事件,比如对消息的编解码。使用的是设计模式中的责任链模式。

3、业务逻辑编排层Service ChannelHandler

分成两类:一类是纯粹的业务逻辑编排,另一类是应用层协议插件,用于特定协议的会话和链路管理。比如对消息进行编解码,协议不同,编解码步骤也不一样,例如Http和WebSocket

二、用Netty实现简易聊天窗口

在上一篇用JavaNIO写了一个例子,Java原生的NIO开发需要手动判断事件的类型才能做下一步处理,相比于原生Java NIO的开发,Netty的开发会简单很多,不需要开发者关注当前事件是什么类型的事件,只需在相应的方法里关注业务逻辑的处理等。

1、环境准备:引入Netty依赖

2、服务端开发

代码语言:javascript
复制
public class Server {    private int port;    public Server(int port) {        this.port = port;    }    public void start(){        //配置服务端NIO线程        //接收客户端连接,事件分发线程池        NioEventLoopGroup boss = null;        //处理读写事件线程池        NioEventLoopGroup workder = null;        try {            boss = new NioEventLoopGroup();            workder = new NioEventLoopGroup();
            //创建服务端,相当于NIO的ServerSocketChannel            ServerBootstrap server = new ServerBootstrap();            server.group(boss,workder)                    .channel(NioServerSocketChannel.class)                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel ch) throws Exception {                            ChannelPipeline pipeline = ch.pipeline();                            //绑定IO事件处理类                            pipeline.addLast(new ServerHandler());                        }                    })                    //最大客户端连接数                    .option(ChannelOption.SO_BACKLOG, 128)                    //与客户端保持长链接                    .childOption(ChannelOption.SO_KEEPALIVE, true);            //绑定端口,同步等待            ChannelFuture future = server.bind(this.port).sync();            System.out.println("服务端已启动,端口为:"+port);
            //等待服务器监听端口关闭            future.channel().closeFuture().sync();
        } catch (Exception e) {
        } finally {            //优雅退出,释放线程池资源            boss.shutdownGracefully();            workder.shutdownGracefully();        }    }    public static void main(String[] args) {        new Server(8080).start();    }}

其中在childHandler中是绑定了IO事件的处理类,作用类似于Reactor模式中的Handler类,主要用于处理网络IO事件,例如消息编解码等,我这边主要是用于演示Netty使用,所以未做编解码的处理。

在这边主要用于对聊天室信息发送的处理,看一下其编码:

代码语言:javascript
复制
public class ServerHandler extends ChannelHandlerAdapter {
    //存放所有Channel    private static final ChannelGroup onLineUsers = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    //定义channel的私有名称属性,用于区分channel    AttributeKey<String> name = AttributeKey.valueOf("name");
    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        //将消息转换成ByteBuf,对应于JavaNIO的ByteBuffer        ByteBuf buf = (ByteBuf) msg;        byte[] bytes = new byte[buf.readableBytes()];        buf.readBytes(bytes);        String body = new String(bytes, "UTF-8");        Channel channel = ctx.channel();
        String message = null;        //如果是新建连接        if (!onLineUsers.contains(channel)) {            channel.attr(name).getAndSet(body);            onLineUsers.add(channel);            message = body+"在"+getCurrentTime()+"进入聊天室,当前聊天室人数为:"+onLineUsers.size();            System.out.println(message);            for (Channel user : onLineUsers) {                if (user!=channel) {                    //发送给聊天室的其他人                    ByteBuf resp = Unpooled.copiedBuffer(message.getBytes());                    user.writeAndFlush(resp);                } else {                    //发送给自己                    ByteBuf resp = Unpooled.copiedBuffer("您已进入聊天室".getBytes());                    user.writeAndFlush(resp);                }            }        } else {            //将发送的消息发给其他人            message = "在"+getCurrentTime()+"发送了一条消息,内容:" + body;            for (Channel user : onLineUsers) {                if (user!=channel) {                    ByteBuf resp = Unpooled.copiedBuffer((channel.attr(name).get()+message).getBytes());                    user.writeAndFlush(resp);                }            }        }    }
    public static String getCurrentTime(){        SimpleDateFormat format = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");        String time = format.format(new Date());        return time;    }
}

需要继承ChannelHandlerAdapter类重写ChannelRead方法,用于读取客户端消息并进行处理,利用ChannelGroup存放所有channel,也就是聊天室的人,利用AttributeKey定义channel的私有名称属性,用于区分channel。

3、客户端开发

代码语言:javascript
复制
public class Client {
    public static void main(String[] args) {        //监听控制台输入,将输入的字符用于在服务端定义当前连接的名字        Scanner scanner = new Scanner(System.in);        new Client().start(scanner.nextLine());    }
    private void start(final String name) {        //用于IO读写的线程池        NioEventLoopGroup group = new NioEventLoopGroup();        try {            //相当于NIO的SocketChannel            Bootstrap client = new Bootstrap();            client.group(group)                    .option(ChannelOption.TCP_NODELAY,true)                    .channel(NioSocketChannel.class)                    .handler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel ch) throws Exception {                            //绑定IO读写事件的处理类                            ch.pipeline().addLast(new ClientHandler(name));                        }                    });            //发起连接            ChannelFuture cf = client.connect(new InetSocketAddress(8080)).sync();            //等待关闭            cf.channel().closeFuture().sync();        } catch (Exception e) {        } finally {            group.shutdownGracefully();        }    }
}

I/O事件处理类:

代码语言:javascript
复制
public class ClientHandler extends ChannelHandlerAdapter{
    private String name;
    public ClientHandler(String name) {        this.name = name;    }

    /**     * 连接成功后触发此方法     * @param ctx     * @throws Exception     */    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        //将第一次输入的名称发送给客户端        String order = name;        ByteBuf buf = Unpooled.buffer(order.length());        buf.writeBytes(order.getBytes());        ctx.writeAndFlush(buf);        //另起一个线程监听控制台输入,如果不添加线程,此客户端会阻塞在此        new Thread(new Runnable() {            @Override            public void run() {                Scanner scanner = new Scanner(System.in);                while (scanner.hasNextLine()) {                    String s = scanner.nextLine();                    ByteBuf buffer = Unpooled.buffer(s.length());                    buffer.writeBytes(s.getBytes());                    ctx.writeAndFlush(buffer);                }            }        }).start();
    }


    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        //接收服务端返回的消息        ByteBuf buf=(ByteBuf) msg;        byte[] bytes=new byte[buf.readableBytes()];        buf.readBytes(bytes);        String txt=new String(bytes,"UTF-8");        System.out.println("【"+name+"】接收到信息:"+txt);    }
}

4、启动

(1)启动服务端

(2)启动两个客户端,名称分别是张三、李四

张三:

此时服务端会输出信息:

李四:

此时服务端会输出信息:

用张三的窗口发送一条消息给李四:

李四会受到消息:

当一个客户端发消息的时候,其他在群里面的人都会受到消息。

=============================

以上就是Netty的简略使用,更多敬请关注之后几篇,一起学习。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-06-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Liusy01 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档