专栏首页javathingsJava NIO 实现网络通信

Java NIO 实现网络通信

Java NIO 的相关资料很多,对 channel,buffer,selector 如何相关概念也有详细的阐述。但是,不亲自写代码调试一遍,对这些概念的理解仍然是一知半解。

即使代码跑起来,也不见得有多懂这些概念,因为只是肤浅的尝试,但肤浅的尝试胜过于纸上谈兵,至少迈出了第一步,后续深入,可能要等到真的有实际应用时,才会深入研究。先贴示例代码。

一个典型的服务端:

Server端代码

public class MyServer {
 
	private Selector selector;
	private ServerSocketChannel serverChannel;
 
	public void start() throws Exception {
		int port = 9527;
		// 创建选择器
		selector = Selector.open();
		// 打开监听通道
		serverChannel = ServerSocketChannel.open();
		// 如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
		serverChannel.configureBlocking(false);// 开启非阻塞模式
		// 绑定端口 backlog设为1024
		serverChannel.socket().bind(new InetSocketAddress(port), 1024);
		// 监听客户端连接请求
		serverChannel.register(selector, SelectionKey.OP_ACCEPT);
		System.out.println("服务器已启动,端口号:" + port);
		while (true) {
			// 无论是否有读写事件发生,selector每隔1s被唤醒一次
			selector.select(1000);
			// 阻塞,只有当至少一个注册的事件发生的时候才会继续.
			// selector.select();
			Set<SelectionKey> keys = selector.selectedKeys();
			Iterator<SelectionKey> it = keys.iterator();
			while (it.hasNext()) {
				SelectionKey key = it.next();
				it.remove();
				handleInput(key);
			}
		}
	}
 
	private void handleInput(SelectionKey key) throws Exception {
		if (key.isValid()) {
			// 处理新接入的请求消息
			if (key.isAcceptable()) {
				ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
				// 通过ServerSocketChannel的accept创建SocketChannel实例
				// 完成该操作意味着完成TCP三次握手,TCP物理链路正式建立
				SocketChannel sc = ssc.accept();
				// 设置为非阻塞的
				sc.configureBlocking(false);
				// 注册为读
				sc.register(selector, SelectionKey.OP_READ);
			}
			// 读消息
			if (key.isReadable()) {
				SocketChannel sc = (SocketChannel) key.channel();
				// 创建ByteBuffer,并开辟一个1M的缓冲区
				ByteBuffer buffer = ByteBuffer.allocate(1024);
				// 读取请求码流,返回读取到的字节数
				int readBytes = sc.read(buffer);
				// 读取到字节,对字节进行编解码
				if (readBytes > 0) {
					// 将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
					buffer.flip();
					// 根据缓冲区可读字节数创建字节数组
					byte[] bytes = new byte[buffer.remaining()];
					// 将缓冲区可读字节数组复制到新建的数组中
					buffer.get(bytes);
					String input = new String(bytes, "UTF-8");
					System.out.println("服务器收到消息:" + input);
					// 发送应答消息
					doWrite(sc, LocalTime.now().toString());
				}
 
			} else if (key.isWritable()) {
				ByteBuffer sendbuffer = ByteBuffer.allocate(1024);
				sendbuffer.clear();
				SocketChannel sc = (SocketChannel) key.channel();
				sc.write(sendbuffer);
			}
		}
	}
 
	// 异步发送应答消息
	private void doWrite(SocketChannel channel, String response) throws IOException {
		// 将消息编码为字节数组
		byte[] bytes = response.getBytes();
		// 根据数组容量创建ByteBuffer
		ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
		// 将字节数组复制到缓冲区
		writeBuffer.put(bytes);
		// flip操作
		writeBuffer.flip();
		// 发送缓冲区的字节数组
		channel.write(writeBuffer);
	}
}

启动Server端

public class Main {
 	public static void main(String[] args) throws Exception {
		MyServer myserver = new MyServer();
		myserver.start();
	}
}

关键代码:

serverChannel.register(selector, SelectionKey.OP_ACCEPT);

把 channel 注册到 selector 上,如此,一个 selector 可以管理多个 channel。

selector.select(1000);

该方法功能就是阻塞直到该选择器中的通道所关注的事件就绪,最多阻塞 1000 毫秒,使得程序可以继续往下运行。

while(true){

while (it.hasNext()) {}

}

这样的循环结构,使得服务器不断的轮询是否有请求事件发生,如果有发生,则会获得这个请求中的 channel,并且往这个 channel 中读写数据。

handleInput 方法中,就是处理对应的请求。

客户端:

Client端代码

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
 
public class MyClient {
	private Selector selector;
	private SocketChannel socketChannel;
 
	public void start() throws Exception {
		int port = 9527;
		String host = "127.0.0.1";
		// 创建选择器
		selector = Selector.open();
		// 打开监听通道
		socketChannel = SocketChannel.open();
		// 如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
		socketChannel.configureBlocking(false);// 开启非阻塞模式
 
		socketChannel.connect(new InetSocketAddress(host, port));
		// 等待100毫秒直到连接上服务器
		while (!socketChannel.finishConnect()) {
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		socketChannel.register(selector, SelectionKey.OP_CONNECT);
 
		while (true) {
			try {
				// 无论是否有读写事件发生,selector每隔1s被唤醒一次
				selector.select(1000);
				// 阻塞,只有当至少一个注册的事件发生的时候才会继续.
				// selector.select();
				Set<SelectionKey> keys = selector.selectedKeys();
				Iterator<SelectionKey> it = keys.iterator();
				SelectionKey key = null;
				while (it.hasNext()) {
					key = it.next();
					it.remove();
					try {
						handleInput(key);
					} catch (Exception e) {
						if (key != null) {
							key.cancel();
							if (key.channel() != null) {
								key.channel().close();
							}
						}
					}
				}
			} catch (Exception e) {
				e.printStackTrace();
				System.exit(1);
			}
		}
 
	}
 
	private void handleInput(SelectionKey key) throws IOException {
		if (key.isValid()) {
			SocketChannel sc = (SocketChannel) key.channel();
			// 读消息
			if (key.isReadable()) {
				// 创建ByteBuffer,并开辟一个1M的缓冲区
				ByteBuffer buffer = ByteBuffer.allocate(1024);
				// 读取请求码流,返回读取到的字节数
				int readBytes = sc.read(buffer);
				// 读取到字节,对字节进行编解码
				if (readBytes > 0) {
					// 将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
					buffer.flip();
					// 根据缓冲区可读字节数创建字节数组
					byte[] bytes = new byte[buffer.remaining()];
					// 将缓冲区可读字节数组复制到新建的数组中
					buffer.get(bytes);
					String result = new String(bytes, "UTF-8");
					System.out.println("客户端收到消息:" + result);
				}
			}
		}
	}
 
	// 异步发送消息
	private void doWrite(SocketChannel channel, String request) throws IOException {
		// 将消息编码为字节数组
		byte[] bytes = request.getBytes();
		// 根据数组容量创建ByteBuffer
		ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
		// 将字节数组复制到缓冲区
		writeBuffer.put(bytes);
		// flip操作
		writeBuffer.flip();
		// 发送缓冲区的字节数组
		channel.write(writeBuffer);
	}
 
	public void sendMsg(String msg) throws Exception {
		socketChannel.register(selector, SelectionKey.OP_READ);
		doWrite(socketChannel, msg);
	}
} 

启动Server端Client

public class Main {
 
	public static void main(String[] args) throws Exception {
		final MyClient myclient = new MyClient();
		FutureTask<Integer> Task2 = new FutureTask<>(() -> {
			myclient.start();
			return 0;
		});// 用FutureTask包裹
		Thread Thread2 = new Thread(Task2);// 用Thread包裹
		Thread2.start();
		
		Thread.sleep(1000);
		myclient.sendMsg("time");
		
	}
 
}

客户端代码和服务器端代码,原理是类似的。

上述例子中,先启动服务器端代码,然后启动客户端代码,就能跑起来。例子中,客户端发送任意字符到服务器端,服务器返回当前时间给客户端。

原创文章,转载请注明出处!http://www.javathings.top/java-nio实现网络通信/

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Java 的 NIO 是如何工作的?

    在这个数据爆炸的时代,有大量的数据在系统中流动,一个应用系统的瓶颈往往都是 IO 瓶颈。传统的 javaIO 模型是 BIO,也就是同步阻塞 IO,数据在写入 ...

    水货程序员
  • Java 面试题背诵手册

    ApplicationListener 和@EventListener 注解实现事件监听

    水货程序员
  • Java 中的线程池是什么 (面试必背)?

    这个文章不会涉及太深的线程知识(太深我也不懂)。这里只是把线程池的一些概念整理一下,当被问到这个题目的时候,尽可能背给面试官听就行了。

    水货程序员
  • C++临时变量的常量性

    在Linux环境使用g++编译,会出现: invalid initialization of non-const reference of type ‘std:...

    Dabelv
  • 如果非要在这份爱加一个期限,我希望是一万年

    阅读 101 童琪琳 在我很小很小的时候,我就认识一个人。 ? ? ? 她教我走路、说话,教给我做人的道理。 ? 在我童年的印象中,她是一个快乐的人, ? ...

    用户1279178
  • 机器学习day7-逻辑回归,分类问题

    逻辑回归处理的是分类问题,线性回归处理回归问题。两者都是采用极大似然估计对训练样本建模,线性回归使用最小二乘法,逻辑回归则是似然函数。

    Rare0716
  • 构建基于ServiceMesh的中台架构

    微服务架构中,随着数据量不断增大,吞吐量不断增加,业务越来越复杂,服务的个数会越来越多,分层会越来越细,除了数据服务层,还会衍生出业务服务层,前后端分离等各种层...

    架构师之路
  • 浏览器中玩人脸识别

    其实浏览器中的人脸识别 API 已经发布有一段时间了,从Chrome 70 版本以上就有了。其中包括了人脸,文本或 QR 码的识别,基本上覆盖了当前互联网应用的...

    IMWeb前端团队
  • 测试框架TestNG使用介绍

    在本期中,给大家分享一下TestNG测试框架的基础知识,使用TestNG的优点,TestNG的基本注解如何使用,套件、忽略、异常、依赖、参数化、超时等测试该如何...

    软测小生
  • 全面屏+区块链” 联想Z5这个大招友商怕是接不住了!

    前几天,联想集团副总裁、联想手机中国区产品研发负责人常程就发布一条微博,直怼“收购”传闻,劝吃瓜群众都散了,较量才刚刚开始,6月将有猛兽出笼,等着友商接招。没想...

    区块链领域

扫码关注云+社区

领取腾讯云代金券