一个简单的服务端代码:
public class SimpleServer { public static void main(String[] args) { NioEventLoopGroup boss = new NioEventLoopGroup(1); NioEventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(boss,worker) .channel(NioServerSocketChannel.class) .handler(new ServerHandler()) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { } }); ChannelFuture f = b.bind(8000).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } private static class ServerHandler extends ChannelInboundHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive"); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channelRegistered"); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerAdded"); } } }
我们从bind(...)方法开始
AbstractBootstrap.java /** * Create a new {@link Channel} and bind it. */ public ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort)); } /** * Create a new {@link Channel} and bind it. */ public ChannelFuture bind(SocketAddress localAddress) { validate(); if (localAddress == null) { throw new NullPointerException("localAddress"); } return doBind(localAddress); }
然后我们调了doBind()方法:
AbstractBootstrap.java private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister();//初始化及注册 final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise);//绑定端口 return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
我们继续断点进initAndRegister();这个方法内部
AbstractBootstrap.java final ChannelFuture initAndRegister() { Channel channel = null; try { //这里的channelFactory是ReflectiveChannelFactory channel = channelFactory.newChannel(); //初始化参数 init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } // If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. return regFuture; }
initAndRegister()
做了几件事情
newChannel();
我们先来分析一个
ReflectiveChannelFactory.java @Override public T newChannel() { try { return clazz.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } }
可以看到里面是通过反射创建了一个NioServerSocketChannel 这个对象
我们再来看看NioServerSocketChannel
NioServerSocketChannel.java public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } /** * Create a new instance using the given {@link ServerSocketChannel}. */ public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }
这里调用了父类的构造器
我们接下来跟踪一下父类
AbstractNioMessageChannel.java /** * @see {@link AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)} */ protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent, ch, readInterestOp); } AbstractNioChannel.java /** * Create a new instance * * @param parent the parent {@link Channel} by which this instance was created. May be {@code null} * @param ch the underlying {@link SelectableChannel} on which it operates * @param readInterestOp the ops to set to receive data from the {@link SelectableChannel} */ protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } } AbstractChannel.java /** * Creates a new instance. * * @param parent * the parent of this channel. {@code null} if there's no parent. */ protected AbstractChannel(Channel parent) { this.parent = parent; // 创建 ChannelId 对象 id = newId(); // 创建 Unsafe 对象 unsafe = newUnsafe(); // 创建 DefaultChannelPipeline 对象 pipeline = newChannelPipeline(); }
通过不断跟踪我们把Channel对象创建完毕 , 它总共包含一下几个核心组件:
接下来我们回到initAndRegister()
这个方法,继续往下分析int(channel)
这个方法
ServerBootstrap.java @Override void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { channel.config().setOptions(options); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler. // In this case the initChannel(...) method will only be called after this method returns. Because // of this we need to ensure we add our handler in a delayed fashion so all the users handler are // placed in front of the ServerBootstrapAcceptor. ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
看到这个方法, 我们大概知道以下几件事:
接下来我们再分析
ChannelFuture regFuture = config().group().register(channel);
通过断点可以看到调用到:
SingleThreadEventLoop.java @Override public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); } @Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }
在调用到:
AbstractChannel.java @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
这段这么长的代码,其实重点就两行代码:
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { //... AbstractChannel.this.eventLoop = eventLoop; //... register0(promise); //... }
我们断点进入到register0
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) {// 确保 Channel 是打开的 return; } // 记录是否为首次注册 boolean firstRegistration = neverRegistered; // 执行注册逻辑 doRegister(); // 标记首次注册为 false neverRegistered = false; // 标记 Channel 为已注册 registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded(); // 回调通知 `promise` 执行成功 safeSetSuccess(promise); // 触发通知已注册事件 pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
最终我们在doRegister()
里面看到了java NIO的注册方法
AbstractNioChannel.java @Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
至此我们分析完了doBind()
里面的initAndRegister()
,
接下来我们分析doBind0(regFuture, channel, localAddress, promise);
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { //注册成功,绑定端口 if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); //注册失败,回调通知promise异常 } else { promise.setFailure(regFuture.cause()); } } }); }
我们继续断点跟踪bind()
方法:
最后我们调用到AbstractUnsafe 的bind()
方法
@Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { // 判断是否在 EventLoop 的线程中。 assertEventLoop(); if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } // See: https://github.com/netty/netty/issues/576 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.isRoot()) { // Warn a user about the fact that a non-root user can't receive a // broadcast packet on *nix if the socket is bound on non-wildcard address. logger.warn( "A non-root user can't receive a broadcast packet if the socket " + "is not bound to a wildcard address; binding to a non-wildcard " + "address (" + localAddress + ") anyway as requested."); } // 记录 Channel 是否激活 boolean wasActive = isActive(); try { // 绑定 Channel 的端口 doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } // 若 Channel 是新激活的,触发通知 Channel 已激活的事件。 if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } // 回调通知 promise 执行成功 safeSetSuccess(promise); }
在dobind()
方法里面终于用java NIO绑定了端口
NioServerSocketChannel.java @Override protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
在invokeLater
里面加入了异步执行过程,通过提交一个新任务到EventLoop的线程中
private void invokeLater(Runnable task) { try { // This method is used by outbound operation implementations to trigger an inbound event later. // They do not trigger an inbound event immediately because an outbound operation might have been // triggered by another inbound event handler method. If fired immediately, the call stack // will look like this for example: // // handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection. // -> handlerA.ctx.close() // -> channel.unsafe.close() // -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet // // which means the execution of two inbound handler methods of the same handler overlap undesirably. eventLoop().execute(task); } catch (RejectedExecutionException e) { logger.warn("Can't invoke task later as EventLoop rejected it", e); } }
通过断点跟踪,可以到AbstractUnsafe#beginRead()
方法
@Override public final void beginRead() { // 判断是否在 EventLoop 的线程中。 assertEventLoop(); // Channel 必须激活 if (!isActive()) { return; } // 执行开始读取 try { doBeginRead(); } catch (final Exception e) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireExceptionCaught(e); } }); close(voidPromise()); } }
doBeginRead()
方法,执行开始读取,对于NioServerSocketChannel 来说,实现如下:
AbstractNioMessageChannel.java @Override protected void doBeginRead() throws Exception { if (inputShutdown) { return; } super.doBeginRead(); } AbstractNioChannel.java @Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } }
selectionKey.interestOps()
方法调用后,服务端可以开始处理客户端的连接事件.
本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。
我来说两句