前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty中Channel与Unsafe源码解读

Netty中Channel与Unsafe源码解读

作者头像
良辰美景TT
发布2018-09-11 14:28:38
7310
发布2018-09-11 14:28:38
举报

  Channel是netty网络操作抽象类,包括网络的读,写,链路关闭,发起连接等。我们拿出NioServerSocketChannel来进行分析,NioServerSocketChannel的类图如下所示:

image.png

Netty使用了聚合的方使来实现channel的功能,先看看看AbstractChannel 都聚合了那些功能,源码如下,关键代码会加上相应的注释:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
//这是channel的父channel:所谓的父channel是指处理读写的channel有个连接的父channel
    private final Channel parent;
// id表示channel的唯一标识
    private final ChannelId id;
//unsafe类里实现了具体的连接与写数据,之所以命名为unsafe是不希望外部使用,并非是不安全的
    private final Unsafe unsafe;
//管理channelHandler的pipeline。
    private final DefaultChannelPipeline pipeline;

    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
    private final CloseFuture closeFuture = new CloseFuture(this);
//地址信息
    private volatile SocketAddress localAddress;
    private volatile SocketAddress remoteAddress;
//eventLoop就是react线程对象啦
    private volatile EventLoop eventLoop;
    private volatile boolean registered;
    private boolean closeInitiated;

    /** Cache for the string representation of this channel */
    private boolean strValActive;
    private String strVal;

//可以看出下面的方法都会触发pipeline的链路调用
    @Override
    public ChannelFuture bind(SocketAddress localAddress) {
        return pipeline.bind(localAddress);
    }

    @Override
    public ChannelFuture connect(SocketAddress remoteAddress) {
        return pipeline.connect(remoteAddress);
    }

    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
        return pipeline.connect(remoteAddress, localAddress);
    }

    @Override
    public ChannelFuture disconnect() {
        return pipeline.disconnect();
    }

    @Override
    public ChannelFuture close() {
        return pipeline.close();
    }

   @Override
    public Channel read() {
        pipeline.read();
        return this;
    }

    @Override
    public ChannelFuture write(Object msg) {
        return pipeline.write(msg);
    }

    @Override
    public ChannelFuture write(Object msg, ChannelPromise promise) {
        return pipeline.write(msg, promise);
    }

}

AbstractNioChannel 集成了 JDK内部的channel, SelectionKey。 这个类里实现了将Channel注册到对应的Selector上。源码如下:

public abstract class AbstractNioChannel extends AbstractChannel {
//jdk内部的 channel对象
    private final SelectableChannel ch;
//设置读操作位,
    protected final int readInterestOp;
// 这里定义了selectionKey,里面包含了发生了的事件
    volatile SelectionKey selectionKey;
// promise相关
    private ChannelPromise connectPromise;
    private ScheduledFuture<?> connectTimeoutFuture;
//请求的地址信息
    private SocketAddress requestedRemoteAddress;

//构造方法,需要传入下面几个参数
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }

//通过SelectionKey 设置注册读事件
    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

//将channel注册到 Selector上 
    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

}

NioServerSocketChannel 提供了主要功能是绑定某个端口与接收client端的连接,关键源码如下:

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

//这个方法属属于NioServerSocketChannel 类方法 调用provider生成jdk底层的ServerSocketChannel 对象 
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            /**
             *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
             *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
             *
             *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
             */
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }

//执行bind操作
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
//javaChannel()方法返回的实际上就是上一个方法生成的ServerSocketChannel 对象
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

//接收客户端的请求会调用到的方法
    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
//调用java底层的accept方法
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
//可以看出这里新建了个NioSocketChannel对象,并把当前对象 当成新创建对象的父对象
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

//将channel注册到Selector上
    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }


}

从上面源码上分析Netty在设计Channel这个对象的时候主要做了如下几个功能:

  • 对JDK自带Channel进行了包装, 内部持有JDK Channel的引用
  • Channel对象作为中间层,读写操作会触发pipeline调用链。内部通过unsafe对象实现读写的调用。

Unsafe对象

  Unsafe对象作为channel的内部类,承担着channel网络相关的功能,比如具体的读写操作。其中AbstractUnsafe是AbstractChannel的内部类,部分源码如下:

 protected abstract class AbstractUnsafe implements Unsafe {

//下面是绑定方法的逻辑 传入的是SocketAddress  
        @Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
// 确认当前channel已经注册
            assertEventLoop();

            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
 //验证传入的参数
            // See: https://github.com/netty/netty/issues/576
            if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                localAddress instanceof InetSocketAddress &&
                !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
                !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
                // Warn a user about the fact that a non-root user can't receive a
                // broadcast packet on *nix if the socket is bound on non-wildcard address.
                logger.warn(
                        "A non-root user can't receive a broadcast packet if the socket " +
                        "is not bound to a wildcard address; binding to a non-wildcard " +
                        "address (" + localAddress + ") anyway as requested.");
            }

            boolean wasActive = isActive();
            try {
//具体的绑定操作在doBind方法里执行 这个方法是channel的方法,也就是我们在上面NioServerSocketChannel 里分析的逻辑
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }
//触发 active事件,在pipline链里传播
            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelActive();
                    }
                });
            }

            safeSetSuccess(promise);
        }

//往外写数据的操作(这里只往缓存里写数据)
        @Override
        public final void write(Object msg, ChannelPromise promise) {
//验证是否已经注册并且react线程是否已经准备好
            assertEventLoop();
//ChannelOutboundBuffer  表示要往外写数据的缓存
            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                // If the outboundBuffer is null we know the channel was closed and so
                // need to fail the future right away. If it is not null the handling of the rest
                // will be done in flush0()
                // See https://github.com/netty/netty/issues/2362
                safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
                // release message now to prevent resource-leak
                ReferenceCountUtil.release(msg);
                return;
            }

            int size;
            try {
//对需要写的数据进行过滤
                msg = filterOutboundMessage(msg);
//对需要写的数据进行大小的预估
                size = pipeline.estimatorHandle().size(msg);
                if (size < 0) {
                    size = 0;
                }
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                ReferenceCountUtil.release(msg);
                return;
            }
//将数据增加到缓存中
            outboundBuffer.addMessage(msg, size, promise);
        }

// flush方法用于将数据写入到网络中
        @Override
        public final void flush() {
            assertEventLoop();
//往外发送的缓存对象不能为空
            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                return;
            }

            outboundBuffer.addFlush();
            flush0();
        }

//最终会调用到这个方法
        @SuppressWarnings("deprecation")
        protected void flush0() {
            if (inFlush0) {
                // Avoid re-entrance
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                return;
            }

            inFlush0 = true;
//对channel的状态进行验证
            // Mark all pending write requests as failure if the channel is inactive.
            if (!isActive()) {
                try {
                    if (isOpen()) {
                        outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
                    } else {
                        // Do not trigger channelWritabilityChanged because the channel is closed already.
                        outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                    }
                } finally {
                    inFlush0 = false;
                }
                return;
            }

            try {
//会调用到Channel的doWrite方法,具体实现的源码可以看NioSocketChannel
                doWrite(outboundBuffer);
            } catch (Throwable t) {
                if (t instanceof IOException && config().isAutoClose()) {
                    /**
                     * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
                     * failing all flushed messages and also ensure the actual close of the underlying transport
                     * will happen before the promises are notified.
                     *
                     * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
                     * may still return {@code true} even if the channel should be closed as result of the exception.
                     */
                    close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                } else {
                    try {
                        shutdownOutput(voidPromise(), t);
                    } catch (Throwable t2) {
                        close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                    }
                }
            } finally {
                inFlush0 = false;
            }
        }

//开始读方法的逻辑
        @Override
        public final void beginRead() {
            assertEventLoop();

            if (!isActive()) {
                return;
            }

            try {
//会调用channel的doBeginRead方法 可以看上面AbstractNioChannel方法里的注释 
                doBeginRead();
            } catch (final Exception e) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireExceptionCaught(e);
                    }
                });
                close(voidPromise());
            }
        }

//注册方法,channel会往selector里注册关注的事件
        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//验证数据
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
//调用下面的方法
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

        private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
//会调用到channel里的方法 可以参考上面的AbstractNioChannel类
                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

}

Netty服务端接收客户端数据的调用流程

读数据的调用流程

  NioEventLoop的run方法会监控SelectionKey对象,当有读事件时,会调用unsafe对象的read()方法,在read方法的逻辑里会触发pipeline对象链的调用,最终调用到设置的各种ChannelHandler

写数据的调用流程

  通过Channel的writeAndFlush会调用到pipeline的writeAndFlush方法里,在pipeline的调用链里会调用链中的各种ChannelHandler(各以对需要写入的数据进行格式转换)最终通过HeadContext的write方法调用到unsafe里的write逻辑。这里只是把数据写入到ByteBuffer里。通过调用unsafe的flash方法才能最终将数据写入到网络中,也就是上面的分析过程。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.07.20 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Unsafe对象
  • Netty服务端接收客户端数据的调用流程
    • 读数据的调用流程
      • 写数据的调用流程
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档