前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >NIO之Channel通道(二)-SelectableChannel、SocketChannel、ServerSocketChannel

NIO之Channel通道(二)-SelectableChannel、SocketChannel、ServerSocketChannel

作者头像
云飞扬
发布2022-04-25 14:20:43
5210
发布2022-04-25 14:20:43
举报
文章被收录于专栏:星汉技术星汉技术

NIO之Channel通道(二)-SelectableChannel、SocketChannel、ServerSocketChannel

1、SelectableChannel

SelectableChannel是一个抽象类,它实现了Channel接口,这个类比较特殊。

SelectableChannel可以被Selector用来多路复用,不过首先需要调用selectableChannel.configureBlocking(false)调整为非阻塞模式。

可选通道的使用大致过程如下:

  • 1.新建通道。
  • 2.将通道的事件注册到选择器Selector上。
  • 3.通过SelectKey获得需要处理的通道,然后对通道进行处理。

SelectableChannel的实现类,可以看出主要分为几大块:

  • 有关UDP协议的:DatagramChannel。
  • 有关SCTP协议的:SctpChannel、SctpMultiChannel、SctpServerChannel。
  • 有关TCP协议的:ServerSocketChannel、SocketChannel。
  • 有关管道的:SinkChannel、SourceChannel这两个抽象类定义在java.nio.channels.Pipe类中。

1.1重要方法

1.1.1register()

注册通道到选择器。在ServerSocketChannle和SocketChannel上提供了register方法来实现注册,通过SelectionKey来实现选择。

此方法有两个重载:

  • SelectionKey register(Selector sel, int ops)
  • SelectionKey register(Selector sel, int ops, Object att)

参数释义:

  • 第一个参数代表要注册的Selector实例。
  • 第二个参数代表本通道感兴趣的操作,这些都定义在SelectionKey类中。
  • 第三个参数Object att是注册时的附件,也就是可以在注册的时候带点什么东西过去。
1.1.2isRegistered()

获取通道是否注册到选择器,新创建的通道没有注册,返回true表示已经注册,false表示没有注册。

1.1.3configureBlocking(boolean)

这个是设置channel的阻塞模式的,true代表block;false为non-block。一般用non-block配合Selector多路复用

1.1.4isBlocking()

检测当前通道的阻塞状态。默认直接返回Channel.blocking属性的值。

2、SocketCannel

用于Socket的TCP连接的数据读写,既可以从Channel读数据,也可以向Channle中写入数据。

2.1重要方法

2.1.1open()

获取SocketCannel通道。

有两个重载:

  • open()
  • open(SocketAddress)
2.1.2validOps()

返回一个本通道支持的操作方式,支持三种,读、写连接。

2.1.3bind(SocketAddress)

绑定一个本地的套接字地址。

2.1.4setOption(SocketOption<T>, T)

设置套接字的操作方式。

2.1.5shutdownInput()

停止当前连接的读操作,并且关闭通道。

2.1.6shutdownOutput()

停止当前连接的写操作,并且关闭通道。

2.1.7socket()

获取一个和当前通道有关的套接字。

2.1.8isConnected()

判断是否建立了连接。

2.1.9isConnectionPending()

当前通道上是否存在正在连接的操作。

2.1.10connect(SocketAddress)

建立连接。

2.1.11finishConnect()

完成连接。

2.1.12getRemoteAddress()

返回一个此通道的套接字已经连接的远程地址。

2.1.13read()

读数据。

此方法有三个重载。

  • read(ByteBuffer)
  • read(ByteBuffer[], int, int)
  • read(ByteBuffer[])
2.1.14write()

写数据。

此方法有三个重载。

  • write(ByteBuffer)
  • write(ByteBuffer[], int, int)
  • write(ByteBuffer[])
2.1.15getLocalAddress()

获取此通道的套接字绑定的地址。

2.2案例

Socket客户端代码:

代码语言:javascript
复制
package xyz.xujd.testcase.nio;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class SocketChannelTest {

	public static void main(String[] args) {
		try {
			SocketChannel sc = SocketChannel.open();
			sc.configureBlocking(true);
			System.out.println(sc.connect(new InetSocketAddress("localhost", 8090)));
			ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
			while (true) {
				int count = 0;
				while ((count = sc.read(byteBuffer)) > 0) {
					byteBuffer.flip();
					while (byteBuffer.hasRemaining()) {
						System.out.println((char) byteBuffer.get());
					}
					byteBuffer.clear();
				}
				sc.write(byteBuffer.put("return".getBytes()));
			}

		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

3、ServerSocketChannel

针对面向流的侦听套接字的可选择通道。多个并发线程可安全地使用服务器套接字通道。

通过ServerSocketChannel可以监听TCP连接,服务端监听到连接之后,会为每个请求创建一个SocketChannel。

3.1重要方法

3.1.1accept()

接受连接。

3.1.2bind()

将通道的套接字与本地地址绑定,并且配置套接字监听连接。

此方法有两个重载。

  • bind(SocketAddress)
  • bind(SocketAddress, int)
3.1.3open()

获取ServerSocketChannel通道。

3.1.4socket()

获取服务器关联的套接字。

3.1.5validOps()

返回一个操作集,表示次通道支持的操作。

3.1.6setOption(SocketOption<T>, T)

设置服务器操作集。

3.1.7getLocalAddress()

获取服务器套接字关联的地址。

3.2案例

Socket服务端代码:

代码语言:javascript
复制
package xyz.xujd.testcase.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class ServerSocketChannelTest {

	private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

	public static void main(String[] args) {
		ServerSocketChannelTest tssc = new ServerSocketChannelTest();
		tssc.run();
	}

	public void run() {
		try {
			// 创建一个选择器。
			Selector selector = Selector.open();
			// 打开一个通道。
			ServerSocketChannel ssc = ServerSocketChannel.open();
			// 设置为非阻塞模式
			ssc.configureBlocking(false);
			// 在通道上绑定一个地址
			ssc.socket().bind(new InetSocketAddress(8090));
			// 将通道注册到选择器上,并设置选择器的时间是OP_ACCET
			// 一个通道可以注册到多个选择器上,但是一个通道在一个选择器上只能注册一次。
			// 不同的通道在选择器中注册的事件不一样,第二个参数是有限制的,由channel类型决定。
			ssc.register(selector, SelectionKey.OP_ACCEPT);
			// 服务器端准备就绪。
			System.out.println("------------------服务器开始等待客户端连接----------------------------------------");
			while (true) {
				// 选择器阻塞,等待事件到来,事件不来就阻塞,事件到来返回时间到来的通道个数。
				int i = selector.select();
				if (i == 0) {
					continue;
				}
				// 遍历事件。
				Iterator<SelectionKey> it = selector.selectedKeys().iterator();
				while (it.hasNext()) {
					SelectionKey selectionKey = it.next();
					// 如果是OP_ACCEPT,表示这个事件是ServerSocketChannel的事件,因为SocketChannel没有这个事件。
					if (selectionKey.isAcceptable()) {
						ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
						// 建立连接
						SocketChannel channel = server.accept();
						// 将建立的链接设置为准备读,并注册到选择器上。
						registerChannel(selector, channel, SelectionKey.OP_READ);
						// 向建立的通道写入内容。
						hello(channel);
					}
					// 如果可读时间,没有在ServerScoketChannel上注册可读,值在SokectChannel上注册了可读。
					if (selectionKey.isReadable()) {
						// 从通道中读取数据,只有通道中有数据,才会进入这个条件中。
						readFromSocket(selectionKey);
					}
					// 此时只是将当前出发的事件从list中移除,在下一次选择时,还会查看改通道是否有操作事件发生。
					it.remove();
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * 读取数据
	 * 
	 * @param selectionKey
	 * @throws IOException
	 */
	private void readFromSocket(SelectionKey selectionKey) throws IOException {
		SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
		buffer.clear();
		int count = 0;
		// 当有数据可读时,第一次读到的数据不应该是0,它会读取到0或-1
		while ((count = socketChannel.read(buffer)) > 0) {
			buffer.flip();
			// 将数据回写到通道中
			while (buffer.hasRemaining()) {
				socketChannel.write(buffer);
			}
			buffer.clear();
		}
		if (count < 0) {
			socketChannel.close();
		}
	}

	/**
	 * 向通道中写入数据。
	 *
	 * @param channel
	 * @throws IOException
	 */
	private void hello(SocketChannel channel) throws IOException {
		buffer.clear();
		buffer.put("Hello".getBytes());
		buffer.flip();
		channel.write(buffer);
	}

	/**
	 * 将channel注册到选择器上。
	 * 
	 * @param selector 选择器
	 * @param channel  通道
	 * @param opRead   操作
	 * @throws IOException
	 */
	private void registerChannel(Selector selector, SocketChannel channel, int opRead) throws IOException {
		if (channel == null) {
			return;
		}
		channel.configureBlocking(false);
		channel.register(selector, opRead);
	}

}

4、测试阻塞

利用ServerSocketChannel、SocketChannel实现NIO方式先的TCP通信的类,在非阻塞模式下ACCEPT CONNECT READ WRITE方法都不产生阻塞。

4.1ACCEPT的非阻塞验证

代码语言:javascript
复制
	public static void main(String[] args) throws IOException {
		ServerSocketChannel ssc = ServerSocketChannel.open();
		ssc.configureBlocking(false);
		ssc.bind(new InetSocketAddress(44444));
		ssc.accept();
	}

执行程序,执行过程一闪而过,没有出现阻塞。

4.2CONNECT的非阻塞验证

代码语言:javascript
复制
	public static void main(String[] args) throws IOException {
		SocketChannel sc = SocketChannel.open();
		sc.configureBlocking(false);
		sc.connect(new InetSocketAddress("127.0.0.1", 44444));
	}

执行程序,执行过程一闪而过,没有出现阻塞。

4.3READ的非阻塞验证

服务端:

代码语言:javascript
复制
	public static void main(String[] args) throws IOException {
		// 创建通道
		ServerSocketChannel ssc = ServerSocketChannel.open();
		// 开启非阻塞模式
		ssc.configureBlocking(false);
		// 绑定监听端口
		ssc.bind(new InetSocketAddress(44444));
		SocketChannel sc = null;
		// 确保已经建立连接
		while (sc == null) {
			sc = ssc.accept();
		}
		// 开启非阻塞模式
		sc.configureBlocking(false);
		// 创建缓存区
		ByteBuffer buf = ByteBuffer.allocate(5);
		// 读取数据
		sc.read(buf);
	}

客户端:

代码语言:javascript
复制
	public static void main(String[] args) throws IOException {
		// 创建通道
		SocketChannel sc = SocketChannel.open();
		// 开启非阻塞模式
		sc.configureBlocking(false);
		// 建立连接,确定连接目标,返回是否连接成功
		boolean isConn = sc.connect(new InetSocketAddress("127.0.0.1", 44444));
		// 如果没有连接成功,重复连接,直到连接建立
		while (!isConn) {
			isConn = sc.finishConnect();
		}
		// 模拟写阻塞
		while (true) {
		}
	}

首先执行服务端代码,然后执行客户端代码,当客户端代码执行过后,服务端代码就结束执行了,说明服务端代码读操作不阻塞。

4.4WRITE的非阻塞验证

客户端:

代码语言:javascript
复制
	public static void main(String[] args) throws IOException {
		// 创建通道
		SocketChannel sc = SocketChannel.open();
		// 开启非阻塞模式
		sc.configureBlocking(false);
		// 建立连接,确定连接目标,返回是否连接成功
		boolean isConn = sc.connect(new InetSocketAddress("127.0.0.1", 44444));
		// 如果没有连接成功,重复连接,直到连接建立
		while (!isConn) {
			isConn = sc.finishConnect();
		}
		// 写操作计数
		int conut=0;
		while (true) {
			//模拟写操作
			int i=sc.write(ByteBuffer.wrap("a".getBytes()));
			conut+=i;
			System.out.println(conut);
		}
	}

服务器端:

代码语言:javascript
复制
	public static void main(String[] args) throws IOException {
		// 创建通道
		ServerSocketChannel ssc = ServerSocketChannel.open();
		// 开启非阻塞模式
		ssc.configureBlocking(false);
		// 绑定监听端口
		ssc.bind(new InetSocketAddress(44444));
		SocketChannel sc = null;
		// 确保已经建立连接
		while (sc == null) {
			sc = ssc.accept();
		}
		// 开启非阻塞模式
		sc.configureBlocking(false);
		//模拟读操作阻塞
		while(true){}
	}

执行服务器端代码,执行客户端代码,可以看到客户端一直在输出数字,大概到两万三千多的时候一直不停的打印出这个数字,而服务器什么都没有打印,说明没有接收,但是客户端仍然可以一直写,证明写操作也没有阻塞。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022/04/16 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • NIO之Channel通道(二)-SelectableChannel、SocketChannel、ServerSocketChannel
    • 1、SelectableChannel
      • 1.1重要方法
    • 2、SocketCannel
      • 2.1重要方法
      • 2.2案例
    • 3、ServerSocketChannel
      • 3.1重要方法
      • 3.2案例
    • 4、测试阻塞
      • 4.1ACCEPT的非阻塞验证
      • 4.2CONNECT的非阻塞验证
      • 4.3READ的非阻塞验证
      • 4.4WRITE的非阻塞验证
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档