前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty 之 AbstractChannel 和 AbstractNioChannel 源码分析

Netty 之 AbstractChannel 和 AbstractNioChannel 源码分析

作者头像
java404
发布2019-02-26 09:56:48
5580
发布2019-02-26 09:56:48
举报
文章被收录于专栏:java 成神之路java 成神之路

概述

channel 是 netty 网络 IO 操作抽象出来的一个接口,主要功能有:网络IO的读写,客户端发起连接、主动关闭连接,链路关闭,获取通信双方的网络地址等。

这里我们只简单分析下 AbstractChannel 和 AbstractNioChannel 这两个Channel源码。

AbstractChannel 源码

代码语言:javascript
复制
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    // 父 Channel(NioServerSocketChannel 是没有父channel的)
    private final Channel parent;
    // Channel 唯一ID
    private final ChannelId id;
    // Unsafe 对象,封装 ByteBuf 的读写操作
    private final Unsafe unsafe;
    // 关联的 Pipeline 对象
    private final DefaultChannelPipeline pipeline;
    
    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
    private final CloseFuture closeFuture = new CloseFuture(this);
    // 本地地址和远端地址
    private volatile SocketAddress localAddress;
    private volatile SocketAddress remoteAddress;
    // EventLoop 封装的 Selector
    private volatile EventLoop eventLoop;
    // 是否注册
    private volatile boolean registered;
    private boolean closeInitiated;

    /** Cache for the string representation of this channel */
    private boolean strValActive;
    private String strVal;
AbstractChannel 构造
代码语言:javascript
复制
protected AbstractChannel(Channel parent, ChannelId id) {
    this.parent = parent;
    this.id = id;
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

// Unsafe 实现交给子类实现
protected abstract AbstractUnsafe newUnsafe();

// 创建 DefaultChannelPipeline 对象
protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

Unsafe类里实现了具体的连接与写数据。比如:网络的读,写,链路关闭,发起连接等。之所以命名为unsafe是不希望外部使用,并非是不安全的。

DefaultChannelPipeline 只是一个 Handler 的容器,也可以理解为一个Handler链,具体的逻辑由Handler处理,而每个Handler都会分配一个EventLoop,最终的请求还是要EventLoop来执行,而EventLoop中又调用Channel中的内部类Unsafe对应的方法。 新建一个channel会自动创建一个ChannelPipeline。

这里创建 DefaultChannelPipeline,构造中传入当前的 Channel,而读写数据都是在 ChannelPipeline 中进行的,ChannelPipeline 进行读写数据又委托给 Channel 中的 Unsafe 进行操作。

AbstractUnsafe 类结构

从类结构中可以看出 Unsafe 实现了 网络的读,写,连接关闭,发起连接等操作。

AbstractNioChannel 源码

代码语言:javascript
复制
public abstract class AbstractNioChannel extends AbstractChannel {

    // 抽象了 SocketChannel 和 ServerSocketChannel 的公共的父类
    private final SelectableChannel ch;
    // SelectionKey.OP_READ 读事件
    protected final int readInterestOp;
    // 注册到 selector 上返回的 selectorKey
    volatile SelectionKey selectionKey;
    // 是否还有未读的数据
    boolean readPending;
    private final Runnable clearReadPendingRunnable = new Runnable() {
        @Override
        public void run() {
            clearReadPending0();
        }
    };

    /**
     * The future of the current connection attempt.  If not null, subsequent
     * connection attempts will fail.
     */
    // 连接操作的结果
    private ChannelPromise connectPromise;
    // 连接超时定时任务
    private ScheduledFuture<?> connectTimeoutFuture;
    // 客户端地址
    private SocketAddress requestedRemoteAddress;
   
    ....

SelectableChannel 抽象了 java.nio.SocketChannel 和 java.nio.ServerSocketChannel 的公共方法。netty 封装了 NIO 的channel。

注册 Channel
代码语言:javascript
复制
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // 这里注册的事件为0
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

javaChannel() :是 Nio 中的 channel。 eventLoop().unwrappedSelector(): 是 Nio 中的 selector。

在java.nio.channels.SelectionKey 类中定义了 4种事件。

代码语言:javascript
复制
// 读操作事件
public static final int OP_READ = 1 << 0;
// 写操作事件
public static final int OP_WRITE = 1 << 2;
// 客户端链接服务的操作事件
public static final int OP_CONNECT = 1 << 3;
// 服务端接受客户端连接事件
public static final int OP_ACCEPT = 1 << 4;

这里注册的是0,说明对任何事件都不感兴趣,仅仅完成注册操作。把当前的 Channel当做附件进行注册。如果注册成功则返回 selectionKey,通过 selectionKey 可用从 Selector 中获取 当前注册的 Channel。

什么情况下才抛出 CancelledKeyException 异常呢?

由于尚未调用select.select(..)操作,因此可能仍在缓存而未删除“已取消”selectionkey,因此强制调用 selector.selectNow() 方法将已经取消的 selectionKey 从 selector 上删除。

只有第一次抛出此异常,才调用 selector.selectNow() 进行取消。 如果调用 selector.selectNow() 还有取消的缓存,可能是jdk的一个bug。

代码语言:javascript
复制
@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

该方法首先判断 Channel 是否可用,不可用直接返回。 获取该 selectionKey 注册到 selector 的事件。 如果注册的事件 位运算与 读事件 等于0,则说明该 Channel 没有在 selelctor 上注册读事件,在这里注册读事件。

什么情况下会调用 doBeginRead() 方法?

当 Channel 处于 channelActive 状态后,就会在DefaultChannelPipeline.channelActive 方法中调用 doBeginRead() 方法,在 selector 上注册读事件。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019.01.27 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
  • AbstractChannel 源码
    • AbstractChannel 构造
      • AbstractUnsafe 类结构
      • AbstractNioChannel 源码
        • 注册 Channel
          • 什么情况下才抛出 CancelledKeyException 异常呢?
            • 什么情况下会调用 doBeginRead() 方法?
            相关产品与服务
            容器服务
            腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档