前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >java nio 源码分析1 事件注册

java nio 源码分析1 事件注册

作者头像
平凡的学生族
发布2019-12-19 19:14:35
8670
发布2019-12-19 19:14:35
举报
文章被收录于专栏:后端技术后端技术

参考

代码语言:javascript
复制
public static Selector open() throws IOException {
  return SelectorProvider.provider().openSelector();
}
  • linux中是EPollSelectorProvider, 该openSelector方法为:
代码语言:javascript
复制
public AbstractSelector openSelector() throws IOException {
    return new EPollSelectorImpl(this);
}

之后serverChannel.register(selector, SelectionKey.OP_ACCEPT);为服务端注册事件:

代码语言:javascript
复制
public final SelectionKey register(Selector sel, int ops,
                                   Object att)
    throws ClosedChannelException
{
    synchronized (regLock) {
        if (!isOpen())
            throw new ClosedChannelException();
        if ((ops & ~validOps()) != 0)
            throw new IllegalArgumentException();
        if (blocking)
            throw new IllegalBlockingModeException();
        SelectionKey k = findKey(sel);  // 根据Selector找到SelectionKey
        if (k != null) {
            k.interestOps(ops);  // 更新注册事件
            k.attach(att);
        }
        if (k == null) {
            // New registration
            synchronized (keyLock) {
                if (!isOpen())
                    throw new ClosedChannelException();
                k = ((AbstractSelector)sel).register(this, ops, att);  // 关键, 注册事件
                addKey(k);  // 保存该SelectionKey
            }
        }
        return k;
    }
}

关键代码在于, 该通道会调用Selector来注册自己:

代码语言:javascript
复制
// SelectorImpl.java
protected final SelectionKey register(AbstractSelectableChannel ch,
                                      int ops,
                                      Object attachment)
{
    if (!(ch instanceof SelChImpl))
        throw new IllegalSelectorException();
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);  // 创建SelectionKey
    k.attach(attachment);
    synchronized (publicKeys) {
        implRegister(k);  // Selector注册该SelectionKey
    }
    k.interestOps(ops);  // 向该SelectionKey注册事件
    return k;
}

之后看原博客对SelectorImpl::implRegister和SelectorImpl::interestOps的解析即可. Channel注册Selector的操作, 其实转交给了Selector完成.

SelectorImpl监听套接字和注册事件的关键在于EPollArrayWrapper: EPollSelectorImpl内部维护了一个名为pollWrapper的EPollArrayWrapper, implRegister会调用pollWrapper.add(fd), interestOps会调用pollWrapper.setInterest(ch.getFDVal(), ops);.

EPollArrayWrapper

添加fd

代码语言:javascript
复制
//  EPollArrayWrapper.java
void add(int fd) {
    // force the initial update events to 0 as it may be KILLED by a
    // previous registration.
    synchronized (updateLock) {
        assert !registered.get(fd);
        setUpdateEvents(fd, (byte)0, true);
    }
}
代码语言:javascript
复制
private void setUpdateEvents(int fd, byte events, boolean force) {
    if (fd < MAX_UPDATE_ARRAY_SIZE) {
        if ((eventsLow[fd] != KILLED) || force) {
            eventsLow[fd] = events;
        }
    } else {
        Integer key = Integer.valueOf(fd);
        if (!isEventsHighKilled(key) || force) {
            eventsHigh.put(key, Byte.valueOf(events));
        }
    }
}
代码语言:javascript
复制
private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
private Map<Integer,Byte> eventsHigh;

我们可以知道, eventsLow和eventsHigh共同存储"fd->events"的映射. 如果fd<MAX_UPDATE_ARRAY_SIZE, 则存在eventsLow里, 否则存在eventsHigh的map里. 而且在刚添加时, 只注册数字0事件(应该是没有事件的意思吧).

监听fd

上文k.interestOps(ops);最终还是调用了EPollArrayWrapper::setInterest来为fd监听事件

代码语言:javascript
复制
void setInterest(int fd, int mask) {
    synchronized (updateLock) {
        // 如果updateDescriptors满了就扩容
        int oldCapacity = updateDescriptors.length;
        if (updateCount == oldCapacity) {
            int newCapacity = oldCapacity + INITIAL_PENDING_UPDATE_SIZE;
            int[] newDescriptors = new int[newCapacity];
            System.arraycopy(updateDescriptors, 0, newDescriptors, 0, oldCapacity);
            updateDescriptors = newDescriptors;
        }
        updateDescriptors[updateCount++] = fd;

        // 将要监听的fd上的事件存储起来
        byte b = (byte)mask;
        assert (b == mask) && (b != KILLED);
        setUpdateEvents(fd, b, false);
    }
}

setUpdateEvents(fd, b, false);上文说过, 会把事件存储在eventsLoweventsHigh下. 此处需要留意updateCount和`updateDescriptors的作用:

  1. updateCount只在setInterest时自增1, 可见该值指代发生监听事件设置的次数
  2. updateDescriptors[updateCount++] = fd;可见, updateDescriptors存储每一个调用了setInterest记录的fd.

根据以上两步, 准备工作实际已经做好了. 接下来看轮询的代码

在selector.select轮询中的体现

this.selector.select(); 忽略中间代码, 会一路调用到EPollSelectImpl:

代码语言:javascript
复制
// SelectorImpl.java
protected int doSelect(long timeout) throws IOException {
    if (closed)
        throw new ClosedSelectorException();
    processDeregisterQueue();
    try {
        begin();
        pollWrapper.poll(timeout);  // 关键
    } finally {
        end();
    }
    processDeregisterQueue();
    int numKeysUpdated = updateSelectedKeys();  // 关键
    if (pollWrapper.interrupted()) {
        // Clear the wakeup pipe
        pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
        synchronized (interruptLock) {
            pollWrapper.clearInterrupted();
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
    return numKeysUpdated;
}

poll

代码语言:javascript
复制
int poll(long timeout) throws IOException {
        updateRegistrations();  // 调用epollCtl更新事件监听
        updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);  // native函数
        for (int i=0; i<updated; i++) {
            if (getDescriptor(i) == incomingInterruptFD) {
                interruptedIndex = i;
                interrupted = true;
                break;
            }
        }
        return updated;
    }

updateRegistrations的源码不贴了, 直接说分析:

  1. updateRegistrations会根据updateCount取出updateDescriptors中的每一项
  2. short events = getUpdateEvents(fd);一句会从eventsLow或eventsHigh根据fd取出对应要的事件
  3. registered会记录的fd是否被注册过
  4. 根据情况决定要注册的事件, 最终由native方法epollCtl完成

然后EPollSelectImpl::poll又调用native方法epollWait进行等待.

updateSelectedKeys

代码语言:javascript
复制
/**
 * Update the keys whose fd's have been selected by the epoll.
 * Add the ready keys to the ready queue.
 * 更新selectedKeys以便之后this.selector.selectedKeys();使用
 */
private int updateSelectedKeys() {
    int entries = pollWrapper.updated;  // 之前native方法epollWait的返回值
    int numKeysUpdated = 0;
    for (int i=0; i<entries; i++) {
        int nextFD = pollWrapper.getDescriptor(i);  // pollWrapper是native数组, 也在epollWait中得到了更新
        SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
        // ski is null in the case of an interrupt
        if (ski != null) {
            int rOps = pollWrapper.getEventOps(i);
            if (selectedKeys.contains(ski)) {
                // 已经有该SelectionKey就更新事件
                if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                    numKeysUpdated++;
                }
            } else {
                // 没有该SelectionKey则将其添加到selectedKeys
                ski.channel.translateAndSetReadyOps(rOps, ski);
                if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                    selectedKeys.add(ski);
                    numKeysUpdated++;
                }
            }
        }
    }
    return numKeysUpdated;
}

看注释就一目了然了, 该函数就是处理刚才epollWait调用中触发了注册事件的fd.

总结

  1. java nio的底层机制是epoll
  2. 其底层关键方法是来自EPollArrayWrapper的native函数epollCreate, epollCtl和epollWait. 虽然还有一些别的变量, 比如:
    • pollArray用来接收触发事件的fd
    • updateCount用来接收epollWait的返回值

    总结来看, EPollArrayWrapper是epoll关键函数的封装.

  3. 顶层Channel的register和cancel等调用都会转交给Selector执行. 而Selector最终都会借EPollArrayWrapper调用epollCtl完成事件注册与取消. 但并不是马上执行:
    • 取消事件的处理在EPollSelectorImpl::processDeregisterQueue
    • 更新注册事件的处理在EPollArrayWrapper::poll中的EPollArrayWrapper::updateRegistrations
    • 以上两者都在EPollSelectorImpl::doSelect调用, 所以事件的注册和取消是一种lazy机制

最后放一张uml图

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 参考
  • EPollArrayWrapper
    • 添加fd
    • 监听fd
      • 在selector.select轮询中的体现
        • poll
        • updateSelectedKeys
    • 总结
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档