public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}
之后serverChannel.register(selector, SelectionKey.OP_ACCEPT);
为服务端注册事件:
public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (blocking)
throw new IllegalBlockingModeException();
SelectionKey k = findKey(sel); // 根据Selector找到SelectionKey
if (k != null) {
k.interestOps(ops); // 更新注册事件
k.attach(att);
}
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att); // 关键, 注册事件
addKey(k); // 保存该SelectionKey
}
}
return k;
}
}
关键代码在于, 该通道会调用Selector来注册自己:
// SelectorImpl.java
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this); // 创建SelectionKey
k.attach(attachment);
synchronized (publicKeys) {
implRegister(k); // Selector注册该SelectionKey
}
k.interestOps(ops); // 向该SelectionKey注册事件
return k;
}
之后看原博客对SelectorImpl::implRegister和SelectorImpl::interestOps的解析即可. Channel注册Selector的操作, 其实转交给了Selector完成.
SelectorImpl监听套接字和注册事件的关键在于EPollArrayWrapper: EPollSelectorImpl内部维护了一个名为pollWrapper的EPollArrayWrapper, implRegister会调用pollWrapper.add(fd)
, interestOps会调用pollWrapper.setInterest(ch.getFDVal(), ops);
.
// EPollArrayWrapper.java
void add(int fd) {
// force the initial update events to 0 as it may be KILLED by a
// previous registration.
synchronized (updateLock) {
assert !registered.get(fd);
setUpdateEvents(fd, (byte)0, true);
}
}
private void setUpdateEvents(int fd, byte events, boolean force) {
if (fd < MAX_UPDATE_ARRAY_SIZE) {
if ((eventsLow[fd] != KILLED) || force) {
eventsLow[fd] = events;
}
} else {
Integer key = Integer.valueOf(fd);
if (!isEventsHighKilled(key) || force) {
eventsHigh.put(key, Byte.valueOf(events));
}
}
}
private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
private Map<Integer,Byte> eventsHigh;
我们可以知道, eventsLow和eventsHigh共同存储"fd->events"的映射. 如果fd<MAX_UPDATE_ARRAY_SIZE
, 则存在eventsLow里, 否则存在eventsHigh的map里. 而且在刚添加时, 只注册数字0事件(应该是没有事件的意思吧).
上文k.interestOps(ops);
最终还是调用了EPollArrayWrapper::setInterest来为fd监听事件
void setInterest(int fd, int mask) {
synchronized (updateLock) {
// 如果updateDescriptors满了就扩容
int oldCapacity = updateDescriptors.length;
if (updateCount == oldCapacity) {
int newCapacity = oldCapacity + INITIAL_PENDING_UPDATE_SIZE;
int[] newDescriptors = new int[newCapacity];
System.arraycopy(updateDescriptors, 0, newDescriptors, 0, oldCapacity);
updateDescriptors = newDescriptors;
}
updateDescriptors[updateCount++] = fd;
// 将要监听的fd上的事件存储起来
byte b = (byte)mask;
assert (b == mask) && (b != KILLED);
setUpdateEvents(fd, b, false);
}
}
setUpdateEvents(fd, b, false);
上文说过, 会把事件存储在eventsLow
或eventsHigh
下.
此处需要留意updateCount
和`updateDescriptors的作用:
setInterest
时自增1, 可见该值指代发生监听事件设置的次数updateDescriptors[updateCount++] = fd;
可见, updateDescriptors存储每一个调用了setInterest
记录的fd.根据以上两步, 准备工作实际已经做好了. 接下来看轮询的代码
this.selector.select();
忽略中间代码, 会一路调用到EPollSelectImpl
:
// SelectorImpl.java
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
processDeregisterQueue();
try {
begin();
pollWrapper.poll(timeout); // 关键
} finally {
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys(); // 关键
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
int poll(long timeout) throws IOException {
updateRegistrations(); // 调用epollCtl更新事件监听
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd); // native函数
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
updateRegistrations的源码不贴了, 直接说分析:
short events = getUpdateEvents(fd);
一句会从eventsLow或eventsHigh根据fd取出对应要的事件然后EPollSelectImpl::poll又调用native方法epollWait进行等待.
/**
* Update the keys whose fd's have been selected by the epoll.
* Add the ready keys to the ready queue.
* 更新selectedKeys以便之后this.selector.selectedKeys();使用
*/
private int updateSelectedKeys() {
int entries = pollWrapper.updated; // 之前native方法epollWait的返回值
int numKeysUpdated = 0;
for (int i=0; i<entries; i++) {
int nextFD = pollWrapper.getDescriptor(i); // pollWrapper是native数组, 也在epollWait中得到了更新
SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
// ski is null in the case of an interrupt
if (ski != null) {
int rOps = pollWrapper.getEventOps(i);
if (selectedKeys.contains(ski)) {
// 已经有该SelectionKey就更新事件
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
numKeysUpdated++;
}
} else {
// 没有该SelectionKey则将其添加到selectedKeys
ski.channel.translateAndSetReadyOps(rOps, ski);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski);
numKeysUpdated++;
}
}
}
}
return numKeysUpdated;
}
看注释就一目了然了, 该函数就是处理刚才epollWait调用中触发了注册事件的fd.
EPollArrayWrapper
的native函数epollCreate, epollCtl和epollWait. 虽然还有一些别的变量, 比如:
总结来看, EPollArrayWrapper
是epoll关键函数的封装.
EPollArrayWrapper
调用epollCtl完成事件注册与取消. 但并不是马上执行:
EPollSelectorImpl::processDeregisterQueue
中EPollArrayWrapper::poll
中的EPollArrayWrapper::updateRegistrations
EPollSelectorImpl::doSelect
调用, 所以事件的注册和取消是一种lazy机制 最后放一张uml图