Netty是在NIO的基础上封装和扩展而来的,想要了解Netty,就先要了解它与NIO之间的联系,而NioServerSocketChannel正是首先要了解的类。下面先看一段原生NIO服务端的写法:
/* Raw NIO server setup: one selector plus one non-blocking server channel listening on `port`. */
int port = 9999;
ServerSocketChannel serverSocketChannel = null;
Selector selector = null;
InetSocketAddress localAddress = new InetSocketAddress(port);
Random rnd = new Random();
try {
/* Create the selector. */
selector = Selector.open();
/* Create the server channel and switch it to non-blocking mode. */
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
/* Bind to the listening port with a backlog (pending-connection queue size) of 100. */
serverSocketChannel.bind(localAddress, 100);
/* A server channel is only registered for accept (incoming TCP connection) events. */
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e1) {
/* Setup failed: return without entering the select loop.
NOTE(review): the exception is dropped and nothing opened above is closed here. */
return;
}
/* The server loop exits once the server thread is interrupted
(the loop condition also ends it whenever select() returns 0). */
try{
while(selector.select() > 0){
Set<SelectionKey> keySet = selector.selectedKeys();
Iterator<SelectionKey> it = keySet.iterator();
SelectionKey key = null;
while(it.hasNext()){
key = it.next();
/* Remove the key so the next select() does not hand back a channel that was already handled. */
it.remove();
/* An exception here means a client connection went wrong; the server itself must keep running. */
try{
/* The server channel is only interested in connection (accept) events. */
if(key.isAcceptable()){
/* accept() returns an ordinary (client) channel;
each channel is backed by a socket buffer in the kernel. */
SocketChannel sc = serverSocketChannel.accept();
sc.configureBlocking(false);
/* Register the new channel with the selector for its events of interest, attaching the buffers that belong to this channel. */
int interestSet = SelectionKey.OP_READ;
sc.register(selector, interestSet, new Buffers(256, 256));
}
/* An (ordinary) channel is interested in read events and has data available. */
if(key.isReadable()){
/* Fetch the channel's buffers from the SelectionKey attachment. */
Buffers buffers = (Buffers)key.attachment();
ByteBuffer readBuffer = buffers.getReadBuffer();
ByteBuffer writeBuffer = buffers.gerWriteBuffer();
/* Fetch the channel itself from the SelectionKey. */
SocketChannel sc = (SocketChannel) key.channel();
/* Read data from the underlying socket receive buffer. */
sc.read(readBuffer);
readBuffer.flip();
/* Add write interest to the ops this key is already registered for. */
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
...........
我们来逐一看一下上面的几个步骤:
代码是:
selector = Selector.open();
Selector.open对应的代码是:
/**
 * Opens a selector through the provider returned by
 * {@link SelectorProvider#provider()}.
 *
 * @return the newly opened selector
 * @throws IOException if the provider fails to open a selector
 */
public static Selector open() throws IOException {
    final SelectorProvider provider = SelectorProvider.provider();
    return provider.openSelector();
}
是通过SelectorProvider来打开selector的
代码是:
serverSocketChannel = ServerSocketChannel.open();
ServerSocketChannel.open()对应的代码是:
/**
 * Opens a server socket channel through the provider returned by
 * {@link SelectorProvider#provider()}.
 *
 * @return the newly opened server socket channel
 * @throws IOException if the provider fails to open the channel
 */
public static ServerSocketChannel open() throws IOException {
    final SelectorProvider provider = SelectorProvider.provider();
    return provider.openServerSocketChannel();
}
这里调用的也是SelectorProvider.provider()的openServerSocketChannel方法。
代码是:
serverSocketChannel.bind(localAddress, 100)
代码是:
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
把通道注册到选择器上,并声明该通道感兴趣的事件(这里是OP_ACCEPT)。最终调用的是java.nio.channels.SelectableChannel.register(java.nio.channels.Selector, int)。
/* Select loop: keeps running for as long as select() reports at least one ready key. */
while(selector.select() > 0){
Set<SelectionKey> keySet = selector.selectedKeys();
Iterator<SelectionKey> it = keySet.iterator();
SelectionKey key = null;
while(it.hasNext()){
key = it.next();
/* Remove the key so the next select() does not hand back a channel that was already handled. */
it.remove();
/* An exception here means a client connection went wrong; the server itself must keep running. */
try{
/* The server channel is only interested in connection (accept) events. */
if(key.isAcceptable()){
/* accept() returns an ordinary (client) channel;
each channel is backed by a socket buffer in the kernel. */
SocketChannel sc = serverSocketChannel.accept();
sc.configureBlocking(false);
/* Register the new channel with the selector for its events of interest, attaching the buffers that belong to this channel. */
int interestSet = SelectionKey.OP_READ;
sc.register(selector, interestSet, new Buffers(256, 256));
.........
/* An (ordinary) channel is interested in read events and has data available. */
if(key.isReadable()){
/* Fetch the channel's buffers from the SelectionKey attachment. */
Buffers buffers = (Buffers)key.attachment();
ByteBuffer readBuffer = buffers.getReadBuffer();
ByteBuffer writeBuffer = buffers.gerWriteBuffer();
/* Fetch the channel itself from the SelectionKey. */
SocketChannel sc = (SocketChannel) key.channel();
/* Read data from the underlying socket receive buffer. */
sc.read(readBuffer);
readBuffer.flip();
/* Add write interest to the ops this key is already registered for. */
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
netty server的一般写法:
// Two event-loop groups: the first is created with an explicit thread count of 1,
// the second with the no-arg constructor (library default sizing).
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
// Configure the bootstrap: both groups, NioServerSocketChannel as the channel class,
// and the initializer applied to child channels.
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WebSocketServerInitializer(sslCtx));
// Bind to PORT, wait on the returned future via sync(), and keep the resulting channel.
Channel ch = b.bind(PORT).sync().channel();
System.out.println("Open your web browser and navigate to " +
(SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');
// Wait on the channel's close future, i.e. stay here until the server channel is closed.
ch.closeFuture().sync();
} finally {
// Always shut down both groups, whether the try block completed or threw.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress):
/**
 * Creates a new {@link Channel} and binds it to the given address.
 *
 * <p>The bootstrap is validated first; only then is the address checked.
 *
 * @param localAddress the address to bind to; must not be {@code null}
 * @return the future returned by {@code doBind(localAddress)}
 * @throws NullPointerException if {@code localAddress} is {@code null}
 */
public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    if (localAddress != null) {
        return doBind(localAddress);
    }
    throw new NullPointerException("localAddress");
}
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
..........
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
System.out.println("channel:" + channel);
init(channel);
其中channelFactory的创建位置在bootstrap.channel方法即io.netty.bootstrap.AbstractBootstrap#channel方法里,具体代码如下:
/**
 * Selects the channel class to instantiate by wrapping it in a
 * {@code ReflectiveChannelFactory} and handing that to {@code channelFactory(...)}.
 *
 * @param channelClass the channel class; must not be {@code null}
 * @return whatever {@code channelFactory(...)} returns
 * @throws NullPointerException if {@code channelClass} is {@code null}
 */
public B channel(Class<? extends C> channelClass) {
    if (channelClass != null) {
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }
    throw new NullPointerException("channelClass");
}
具体调用的是:io.netty.channel.ReflectiveChannelFactory#newChannel
最终构造的是:
/**
 * Create a new instance.
 *
 * <p>Delegates to the constructor taking a channel, passing the result of
 * {@code newSocket(DEFAULT_SELECTOR_PROVIDER)}.
 */
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
这里的newSocket方法是:
/**
 * Opens a {@link ServerSocketChannel} directly on the supplied provider.
 *
 * <p>Going through the provider instance avoids the call to
 * {@link SelectorProvider#provider()} that each {@code ServerSocketChannel.open()}
 * would otherwise make.
 * See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
 *
 * @param provider the provider used to open the channel
 * @return the newly opened server socket channel
 * @throws ChannelException wrapping the {@link IOException} if opening fails
 */
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    final ServerSocketChannel serverChannel;
    try {
        serverChannel = provider.openServerSocketChannel();
    } catch (IOException openFailure) {
        throw new ChannelException("Failed to open a server socket.", openFailure);
    }
    return serverChannel;
}
返回的是一个ServerSocketChannel对象。
调用重载的构造方法是:
/**
 * Create a new instance using the given {@link ServerSocketChannel}.
 *
 * @param channel the server socket channel handed to the superclass constructor
 */
public NioServerSocketChannel(ServerSocketChannel channel) {
// No parent channel (null); OP_ACCEPT is passed as the read-interest op.
super(null, channel, SelectionKey.OP_ACCEPT);
// Build the config around this channel and the socket of the underlying Java channel.
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
NioServerSocketChannelConfig类是netty使用过程中比较重要的配置类。继续看super方法:
/**
 * Passes all three arguments unchanged to the superclass constructor.
 *
 * @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)
 */
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
这里传入的parent为null,ch为ServerSocketChannel,readInterestOp为SelectionKey.OP_ACCEPT。继续往上看:
/**
 * Create a new instance
 *
 * @param parent the parent {@link Channel} by which this instance was created. May be {@code null}
 * @param ch the underlying {@link SelectableChannel} on which it operates
 * @param readInterestOp the ops to set to receive data from the {@link SelectableChannel}
 * @throws ChannelException if the underlying channel cannot be switched to non-blocking mode
 */
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
// Put the underlying channel into non-blocking mode.
ch.configureBlocking(false);
} catch (IOException e) {
// Configuration failed: try to close the channel before reporting the failure.
try {
ch.close();
} catch (IOException e2) {
// A failed close is only logged, so the configureBlocking error below is the one thrown.
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
继续往上看父类AbstractChannel的构造方法:
/**
 * Creates a new instance.
 *
 * <p>Besides storing the parent, this assigns the channel's id, unsafe and pipeline
 * from {@code newId()}, {@code newUnsafe()} and {@code newChannelPipeline()}, in that order.
 *
 * @param parent
 *        the parent of this channel. {@code null} if there's no parent.
 */
protected AbstractChannel(Channel parent) {
this.parent = parent;
// These factory methods are called from the constructor, in this fixed order.
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
/**
 * Produces the identifier for this channel.
 *
 * @return the id obtained from {@code DefaultChannelId.newInstance()}
 */
protected ChannelId newId() {
    final ChannelId freshId = DefaultChannelId.newInstance();
    return freshId;
}
/**
 * Fills {@code data} with, in order: machine id, process id, sequence number,
 * a timestamp-derived value and a random int, then stores the hash code of the array.
 */
private DefaultChannelId() {
// One array sized to hold all five components back to back.
data = new byte[MACHINE_ID.length + PROCESS_ID_LEN + SEQUENCE_LEN + TIMESTAMP_LEN + RANDOM_LEN];
// i is the running write offset into data.
// NOTE(review): writeInt/writeLong presumably return the offset just past what they wrote - confirm.
int i = 0;
// machineId
System.arraycopy(MACHINE_ID, 0, data, i, MACHINE_ID.length);
i += MACHINE_ID.length;
// processId
i = writeInt(i, PROCESS_ID);
// sequence
i = writeInt(i, nextSequence.getAndIncrement());
// timestamp (kind of): bit-reversed nanoTime XORed with the current wall-clock millis
i = writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis());
// random
int random = PlatformDependent.threadLocalRandom().nextInt();
i = writeInt(i, random);
// Sanity check (only when assertions are enabled): the final offset equals the array length.
assert i == data.length;
hashCode = Arrays.hashCode(data);
}
/**
 * Supplies the unsafe implementation for this channel.
 *
 * @return a new {@code NioMessageUnsafe}
 */
@Override
protected AbstractNioUnsafe newUnsafe() {
    final AbstractNioUnsafe messageUnsafe = new NioMessageUnsafe();
    return messageUnsafe;
}
对应的方法为:
它的父类中有相当多的方法,我们来看一下:
这些方法在后面源码分析中会来一一讲解它们的用途。
一些方法:
/**
 * Returns the superclass's Java channel, narrowed to {@link ServerSocketChannel}.
 *
 * @return the underlying channel cast to {@link ServerSocketChannel}
 */
@Override
protected ServerSocketChannel javaChannel() {
    final ServerSocketChannel serverChannel = (ServerSocketChannel) super.javaChannel();
    return serverChannel;
}
/**
 * Looks up the local address of the underlying server socket.
 *
 * @return the result of {@code SocketUtils.localSocketAddress} for the Java channel's socket
 */
@Override
protected SocketAddress localAddress0() {
    final ServerSocketChannel serverChannel = javaChannel();
    return SocketUtils.localSocketAddress(serverChannel.socket());
}
/**
 * Binds the underlying server channel to {@code localAddress}, using the backlog
 * from {@code config}.
 *
 * <p>On Java versions below 7 the bind goes through the channel's socket;
 * otherwise it is made on the channel itself.
 *
 * @param localAddress the address to bind to
 * @throws Exception if the bind fails
 */
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() < 7) {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().bind(localAddress, config.getBacklog());
    }
}
/**
 * Closes the underlying Java server channel.
 *
 * @throws Exception if closing the channel fails
 */
@Override
protected void doClose() throws Exception {
    final ServerSocketChannel serverChannel = javaChannel();
    serverChannel.close();
}
/**
 * Accepts at most one pending connection and, if there is one, appends it to
 * {@code buf} wrapped in a {@code NioSocketChannel}.
 *
 * @param buf the list that receives the newly created child channel
 * @return {@code 1} if a channel was added to {@code buf}; {@code 0} if nothing was
 *         accepted or wrapping the accepted socket failed
 * @throws Exception if accepting on the underlying channel fails
 */
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    final SocketChannel accepted = SocketUtils.accept(javaChannel());
    if (accepted == null) {
        // Nothing was accepted, so there is no message to report.
        return 0;
    }

    try {
        buf.add(new NioSocketChannel(this, accepted));
        return 1;
    } catch (Throwable wrapFailure) {
        logger.warn("Failed to create a new channel from an accepted socket.", wrapFailure);

        // The accepted socket could not be wrapped: close it, logging any close failure.
        try {
            accepted.close();
        } catch (Throwable closeFailure) {
            logger.warn("Failed to close a socket.", closeFailure);
        }
    }

    return 0;
}
/**
 * Calls {@code serverSocketChannel.accept()} inside
 * {@link AccessController#doPrivileged(PrivilegedExceptionAction)}.
 *
 * @param serverSocketChannel the channel to accept on
 * @return whatever {@code serverSocketChannel.accept()} returns
 * @throws IOException the cause of the {@link PrivilegedActionException}, cast to
 *         {@link IOException}
 */
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
    final PrivilegedExceptionAction<SocketChannel> acceptAction =
            new PrivilegedExceptionAction<SocketChannel>() {
                @Override
                public SocketChannel run() throws IOException {
                    return serverSocketChannel.accept();
                }
            };

    try {
        return AccessController.doPrivileged(acceptAction);
    } catch (PrivilegedActionException wrapped) {
        // Unwrap so callers see the original I/O failure.
        throw (IOException) wrapped.getCause();
    }
}
SocketUtils.accept实际上就是调用serverSocketChannel.accept()返回一个SocketChannel,可以与前面原生NIO示例中的accept调用进行类比。
NioServerSocketChannel实际上是对nio的ServerSocketChannel的一些方法和使用进行了封装。