前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty—基本组件

Netty—基本组件

作者头像
Java开发者之家
发布2021-06-17 16:56:51
3080
发布2021-06-17 16:56:51
举报

netty中组件主要包括Channel、EventLoop、ChannelFuture、ChannelHandler、ChannelPipeline等。

# Channel、EventLoop、ChannelFuture

Channel---Socket
EventLoop---控制流、多线程处理、并发
定义了netty的核心抽象,用于处理连接的生命周期中所发生的事件
ChannelFuture---异步通知
# Channel接口层次
  1. EmbeddedChannel
  2. LocalServerChannel
  3. NioDatagramChannel
  4. NioSctpChannel
  5. NioSocketChannel
# EventLoop
  1. 一个EventLoopGroup中包含了一个或者多个EventLoop;
  2. 一个EventLoop在它的生命周期内只和一个Thread绑定。
  3. 所有由EventLoop处理的I/O事件都将在专有的Thread上被处理。
  4. 一个Channel在它的生命周期内只注册一个或者多个Channel。
  5. 一个EventLoop可以会被分配给一个或者多个Channel。

# ChannelHandler、ChannelPipeline

# ChannelHandler

ChannelHandler是处理入站和出站的事件、数据的应用程序逻辑容器。其中ChannelInBoundHandler相当于接收入站事件和数据。当你要给连接的客户端响应时,也可以从ChannelInBoundHandler中冲刷数据。

# 自定义ChannelHandler时会用的适配器类
  • ChannelHandlerAdapter
  • ChannelInboundHandlerAdapter
  • ChannelOutboundHandlerAdapter
  • ChannelDuplexHandler
# ChannelPipeline

ChannelPipeline为ChannelHandler链提供容器,当Channel被穿件时,会自动的被分配到它的专属ChannelPipeline中。其中ChannelHandler的执行顺序是由它们被添加的顺序所决定的。

netty
netty

实现代码

  • Server
    public void start() throws InterruptedException {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        //创建EventLoopGroup,异步通知消息
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(group)
                    //指定nio-Channel(Socket)来传输小心
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(serverHandler);
                        }
                    });
            //异步绑定服务器,调用sync知道sync方法直到成功绑定
            ChannelFuture future = bootstrap.bind().sync();
            //获取Channel的CloseFuture,并且阻塞当前线程知道他完成
            future.channel().closeFuture().sync();

        } finally {
            group.shutdownGracefully().sync();
        }
    }
  • Client
public void start()
            throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host, port))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            //连接到远程节点,阻塞等待直到连接完成
            ChannelFuture f = b.connect().sync();
            //阻塞直到channel关闭
            f.channel().closeFuture().sync();
        } finally {
            //关闭连接并且释放所有资源
            group.shutdownGracefully().sync();
        }
    }

# Netty中发送消息的两种方法

  • ChannelHandler ChannelHandler会导致消息从消息从ChannelPipeline的尾端开始流动。
  • ChannelHandlerContext ChannelHandlerContext会导致消息从ChannelPipeline中的下一个ChannelHandler开始流动。

# 实现

# ClientHandler
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //通知channel是活跃状态时发送一条消息
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
        //记录已经接受消息的转发
        System.out.println("Client received: " + byteBuf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //发生异常记录并且关闭channel
        cause.printStackTrace();
        ctx.close();
    }
}
# ServerHandler
/**
 * @ChannelHandler.Sharable 一个Channel-Handler可以被多个Channel共享
 */
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 对于每个传入的消息都要处理调用,将受到的消息写给发送者,而不冲刷出站消息,从socket中读取消息
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("Server received" + byteBuf.toString(CharsetUtil.UTF_8));
        ctx.write(byteBuf);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-08-10 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • # Channel、EventLoop、ChannelFuture
    • # Channel接口层次
      • # EventLoop
      • # ChannelHandler、ChannelPipeline
        • # ChannelHandler
          • # 自定义ChannelHandler时会用的适配器类
            • # ChannelPipeline
            • # Netty中发送消息的两种方法
            • # 实现
              • # ClientHandler
                • # ServerHandler
                相关产品与服务
                容器服务
                腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档