前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty Review - 探索Pipeline的Inbound和Outbound

Netty Review - 探索Pipeline的Inbound和Outbound

作者头像
小小工匠
发布2023-12-01 10:01:12
3000
发布2023-12-01 10:01:12
举报
文章被收录于专栏:小工匠聊架构

概念

我们知道boss线程监控到绑定端口上有accept事件,此时会为该socket连接实例化Pipeline,并将InboundHandlerOutboundHandler按序加载到Pipeline中,然后将该socket连接(也就是Channel对象)挂载到selector

一个selector对应一个线程,该线程会轮询所有挂载在他身上的socket连接有没有readwrite事件,然后通过线程池去执行Pipeline的业务流

selector如何查询哪些socket连接有readwrite事件,主要取决于调用操作系统的哪种IO多路复用内核

  • 如果是select注意,此处的select是指操作系统内核的select IO多路复用,不是nettyseletor对象),那么将会遍历所有socket连接,依次询问是否有readwrite事件,最终操作系统内核将所有IO事件的socket连接返回给netty进程,当有很多socket连接时,这种方式将会大大降低性能,因为存在大量socket连接的遍历和内核内存的拷贝
  • 如果是epoll,性能将会大幅提升,因为它基于完成端口事件,已经维护好有IO事件的socket连接列表,selector直接取走,无需遍历,也少掉内核内存拷贝带来的性能损耗

Netty中,InboundOutbound是两个重要的概念,用于描述数据在ChannelPipeline中的流动方向。

Inbound(入站)指的是数据从网络传输到应用程序,即数据从远程主机进入本地主机。在ChannelPipeline中,Inbound数据会依次经过Pipeline中的每个ChannelHandler进行处理,直到到达Pipeline的末尾。

Outbound(出站)指的是数据从应用程序传输到网络,即数据从本地主机发送到远程主机。在ChannelPipeline中,Outbound数据会从Pipeline的末尾开始,逆序经过Pipeline中的每个ChannelHandler进行处理,直到到达Pipeline的起始位置。

InboundOutbound的区别在于数据的流动方向。Inbound数据是从网络进入应用程序,而Outbound数据是从应用程序发送到网络。这意味着Inbound数据是应用程序接收和处理外部数据的入口,而Outbound数据是应用程序发送数据到外部的出口。

虽然InboundOutbound描述了数据的不同流动方向,但它们之间也存在联系。在ChannelPipeline中,InboundOutbound数据可以相互影响和交互。例如,一个ChannelHandler可以在处理Inbound数据时生成Outbound数据作为响应,或者在处理Outbound数据时修改Inbound数据的内容。

总结起来,InboundOutbound是描述数据在ChannelPipeline中流动方向的概念。Inbound数据是从网络进入应用程序,Outbound数据是从应用程序发送到网络。它们在ChannelPipeline中相互影响和交互,共同实现网络数据的处理和传输。


Pipeline的责任链是通过ChannelHandlerContext对象串联的,ChannelHandlerContext对象里封装了ChannelHandler对象,通过prev和next节点实现双向链表。Pipeline的首尾节点分别是headtail,当selector轮询到socketread事件时,将会触发Pipeline责任链,从head开始调起第一个InboundHandlerChannelRead事件,接着通过fire方法依次触发Pipeline上的下一个ChannelHandler .

ChannelHandler分为InbounHandlerOutboundHandler

  • InboundHandler用来处理接收消息
  • OutboundHandler用来处理发送消息。

headChannelHandler既是InboundHandler又是OutboundHandler,无论是read还是write都会经过head,所以head封装了unsafe方法,用来操作socketreadwritetailChannelHandler只是InboundHandlerreadPipleline处理将会最终到达tail


演示之前,我们先附一下代码

Server Code

代码语言:javascript
复制
package com.artisan.pipeline.inout;

import com.artisan.pipeline.inout.handler.*;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class ArtisanEchoServer {

    private int port;

    public ArtisanEchoServer(int port) {
        this.port = port;
    }

    private void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            ch.pipeline().addLast(new EchoOutboundHandler3());
                            ch.pipeline().addLast(new EchoOutboundHandler2());
                            ch.pipeline().addLast(new EchoOutboundHandler1());


                            ch.pipeline().addLast(new EchoInboundHandler1());
                            ch.pipeline().addLast(new EchoInboundHandler2());
                            ch.pipeline().addLast(new EchoInboundHandler3());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 10000)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            System.out.println("EchoServer正在启动...");

            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            System.out.println("EchoServer绑定端口:" + port);

            channelFuture.channel().closeFuture().sync();
            System.out.println("EchoServer已关闭.");


        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) {
        int port = 1234;
        if (args != null && args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        ArtisanEchoServer server = new ArtisanEchoServer(port);
        server.run();
    }
}

6个handler演示如下

代码语言:javascript
复制
package com.artisan.pipeline.inout.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class EchoInboundHandler1 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println();

        System.out.println("进入 EchoInboundHandler1.channelRead");

        String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
        System.out.println("EchoInboundHandler1.channelRead 收到数据:" + data);
        ctx.fireChannelRead(Unpooled.copiedBuffer("[EchoInboundHandler1] " + data, CharsetUtil.UTF_8));

        System.out.println("退出 EchoInboundHandler1 channelRead");

        System.out.println();

    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("[EchoInboundHandler1.channelReadComplete]");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("[EchoInboundHandler1.exceptionCaught]" + cause.toString());
    }
}
代码语言:javascript
复制
package com.artisan.pipeline.inout.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class EchoInboundHandler2 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println();

        System.out.println("进入 EchoInboundHandler2.channelRead");

        String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
        System.out.println("EchoInboundHandler2.channelRead 接收到数据:" + data);
        //ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write] [EchoInboundHandler2] " + data, CharsetUtil.UTF_8));
        ctx.channel().writeAndFlush(Unpooled.copiedBuffer("测试一下channel().writeAndFlush", CharsetUtil.UTF_8));
        ctx.fireChannelRead(Unpooled.copiedBuffer("[ArtisanEchoOutboundHandler2] " + data, CharsetUtil.UTF_8));

        System.out.println("退出 EchoInboundHandler2 channelRead");

        System.out.println();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("[EchoInboundHandler2.channelReadComplete]读取数据完成");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("[EchoInboundHandler2.exceptionCaught]");
    }
}
代码语言:javascript
复制
package com.artisan.pipeline.inout.handler;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class EchoInboundHandler3 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println();

        System.out.println("进入 EchoInboundHandler3.channelRead");

        String data = ((ByteBuf)msg).toString(CharsetUtil.UTF_8);
        System.out.println("EchoInboundHandler3.channelRead 接收到数据:" + data);
        //ctx.writeAndFlush(Unpooled.copiedBuffer("[第二次write] [EchoInboundHandler3] " + data, CharsetUtil.UTF_8));
        ctx.fireChannelRead(msg);

        System.out.println("退出 EchoInboundHandler3 channelRead");

        System.out.println();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("[EchoInboundHandler3.channelReadComplete]读取数据完成");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("[EchoInboundHandler3.exceptionCaught]");
    }

}
代码语言:javascript
复制
package com.artisan.pipeline.inout.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class EchoOutboundHandler1 extends ChannelOutboundHandlerAdapter {


    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("进入 EchoOutboundHandler1.write");

        //ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write中的write]", CharsetUtil.UTF_8));
//        ctx.channel().writeAndFlush(Unpooled.copiedBuffer("在OutboundHandler里测试一下channel().writeAndFlush", CharsetUtil.UTF_8));
        ctx.write(msg);

        System.out.println("退出 EchoOutboundHandler1.write");
    }

}
代码语言:javascript
复制
package com.artisan.pipeline.inout.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class EchoOutboundHandler2 extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("进入 EchoOutboundHandler2.write");

        //ctx.writeAndFlush(Unpooled.copiedBuffer("[第二次write中的write]", CharsetUtil.UTF_8));
        ctx.write(msg);

        System.out.println("退出 EchoOutboundHandler2.write");
    }
}
代码语言:javascript
复制
package com.artisan.pipeline.inout.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class EchoOutboundHandler3 extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("进入 EchoOutboundHandler3.write");

        ctx.write(msg);

        System.out.println("退出 EchoOutboundHandler3.write");
    }

}

Client Code

代码语言:javascript
复制
package com.artisan.netty4.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @author 小工匠
 * @version 1.0
 * @description: 客户端启动程序
 * @mark: show me the code , change the world
 */
public class ArtisanClient {

    public static void main(String[] args) throws Exception {
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            //创建bootstrap对象,配置参数
            Bootstrap bootstrap = new Bootstrap();
            //设置线程组
            bootstrap.group(eventExecutors)
                    //设置客户端的通道实现类型
                    .channel(NioSocketChannel.class)
                    //使用匿名内部类初始化通道
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //添加客户端通道的处理器
                            ch.pipeline().addLast(new ArtisanClientHandler());
                        }
                    });
            System.out.println("客户端准备就绪");
            //连接服务端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 1234).sync();
            //对通道关闭进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            //关闭线程组
            eventExecutors.shutdownGracefully();
        }
    }
}
代码语言:javascript
复制
package com.artisan.netty4.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author 小工匠
 * @version 1.0
 * @description: 通用handler,处理I/O事件
 * @mark: show me the code , change the world
 */
@ChannelHandler.Sharable
public class ArtisanClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //发送消息到服务端
        ctx.writeAndFlush(Unpooled.copiedBuffer("msg send from client 2 server ", CharsetUtil.UTF_8));
        System.out.println("客户端发消息给服务端结束");

        System.out.println();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收服务端发送过来的消息
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到服务端" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    }

}

InboundHandler和OutboundHandler的执行顺序

在InboundHandler中不触发fire方法

ArtisanEchoServer#run 中我们先进存在InboundHandler

先启动server, 在启动Client,我们测试一下

我们可以看到: InboundHandler2没有调用fire事件,InboundHandler3没有被执行

InboundHandler是通过fire事件决定是否要执行下一个InboundHandler,如果InboundHandler没有调用fire事件,那么后续的Pipeline中的Handler将不会执行。

我们来看下源码


InboundHandler和OutboundHandler的执行顺序

加入Pipeline的ChannelHandler的顺序如上。

别忘了放开EchoInboundHandler2

代码语言:javascript
复制
 ctx.fireChannelRead(Unpooled.copiedBuffer("[ArtisanEchoOutboundHandler2] " + data, CharsetUtil.UTF_8));

我们来验证下

执行顺序如上。

代码语言:javascript
复制
InboundHandler1 => InboundHandler2 => OutboundHandler1 => OutboundHander2 => OutboundHandler3 => InboundHandler3

1、InboundHandler是按照Pipleline的加载顺序,顺序执行。

2、OutboundHandler是按照Pipeline的加载顺序,逆序执行。


如果把OutboundHandler放在InboundHandler的后面,OutboundHandler会执行吗

其中EchoInboundHandler2 先不要给客户端发送数据,先屏蔽掉。

代码语言:javascript
复制
public class EchoInboundHandler2 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        System.out.println("进入 EchoInboundHandler2.channelRead");

        String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
        System.out.println("EchoInboundHandler2.channelRead 接收到数据:" + data);
//        ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write] [EchoInboundHandler2] " + data, CharsetUtil.UTF_8));
//        ctx.channel().writeAndFlush(Unpooled.copiedBuffer("测试一下channel().writeAndFlush", CharsetUtil.UTF_8));
        ctx.fireChannelRead(Unpooled.copiedBuffer("[ArtisanEchoOutboundHandler2] " + data, CharsetUtil.UTF_8));


        System.out.println("退出 EchoInboundHandler2 channelRead");

    }
.......
.......
.......
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-11-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概念
  • Server Code
  • Client Code
  • InboundHandler和OutboundHandler的执行顺序
    • 在InboundHandler中不触发fire方法
      • InboundHandler和OutboundHandler的执行顺序
        • 如果把OutboundHandler放在InboundHandler的后面,OutboundHandler会执行吗
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档