Java NIO 实现网络通信

Java NIO 的相关资料很多,对 channel,buffer,selector 如何相关概念也有详细的阐述。但是,不亲自写代码调试一遍,对这些概念的理解仍然是一知半解。

即使代码跑起来,也不见得有多懂这些概念,因为只是肤浅的尝试,但肤浅的尝试胜过于纸上谈兵,至少迈出了第一步,后续深入,可能要等到真的有实际应用时,才会深入研究。先贴示例代码。

一个典型的服务端:

Server端代码

public class MyServer {
 
	private Selector selector;
	private ServerSocketChannel serverChannel;
 
	public void start() throws Exception {
		int port = 9527;
		// 创建选择器
		selector = Selector.open();
		// 打开监听通道
		serverChannel = ServerSocketChannel.open();
		// 如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
		serverChannel.configureBlocking(false);// 开启非阻塞模式
		// 绑定端口 backlog设为1024
		serverChannel.socket().bind(new InetSocketAddress(port), 1024);
		// 监听客户端连接请求
		serverChannel.register(selector, SelectionKey.OP_ACCEPT);
		System.out.println("服务器已启动,端口号:" + port);
		while (true) {
			// 无论是否有读写事件发生,selector每隔1s被唤醒一次
			selector.select(1000);
			// 阻塞,只有当至少一个注册的事件发生的时候才会继续.
			// selector.select();
			Set<SelectionKey> keys = selector.selectedKeys();
			Iterator<SelectionKey> it = keys.iterator();
			while (it.hasNext()) {
				SelectionKey key = it.next();
				it.remove();
				handleInput(key);
			}
		}
	}
 
	private void handleInput(SelectionKey key) throws Exception {
		if (key.isValid()) {
			// 处理新接入的请求消息
			if (key.isAcceptable()) {
				ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
				// 通过ServerSocketChannel的accept创建SocketChannel实例
				// 完成该操作意味着完成TCP三次握手,TCP物理链路正式建立
				SocketChannel sc = ssc.accept();
				// 设置为非阻塞的
				sc.configureBlocking(false);
				// 注册为读
				sc.register(selector, SelectionKey.OP_READ);
			}
			// 读消息
			if (key.isReadable()) {
				SocketChannel sc = (SocketChannel) key.channel();
				// 创建ByteBuffer,并开辟一个1M的缓冲区
				ByteBuffer buffer = ByteBuffer.allocate(1024);
				// 读取请求码流,返回读取到的字节数
				int readBytes = sc.read(buffer);
				// 读取到字节,对字节进行编解码
				if (readBytes > 0) {
					// 将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
					buffer.flip();
					// 根据缓冲区可读字节数创建字节数组
					byte[] bytes = new byte[buffer.remaining()];
					// 将缓冲区可读字节数组复制到新建的数组中
					buffer.get(bytes);
					String input = new String(bytes, "UTF-8");
					System.out.println("服务器收到消息:" + input);
					// 发送应答消息
					doWrite(sc, LocalTime.now().toString());
				}
 
			} else if (key.isWritable()) {
				ByteBuffer sendbuffer = ByteBuffer.allocate(1024);
				sendbuffer.clear();
				SocketChannel sc = (SocketChannel) key.channel();
				sc.write(sendbuffer);
			}
		}
	}
 
	// 异步发送应答消息
	private void doWrite(SocketChannel channel, String response) throws IOException {
		// 将消息编码为字节数组
		byte[] bytes = response.getBytes();
		// 根据数组容量创建ByteBuffer
		ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
		// 将字节数组复制到缓冲区
		writeBuffer.put(bytes);
		// flip操作
		writeBuffer.flip();
		// 发送缓冲区的字节数组
		channel.write(writeBuffer);
	}
}

启动Server端

public class Main {
 	public static void main(String[] args) throws Exception {
		MyServer myserver = new MyServer();
		myserver.start();
	}
}

关键代码:

serverChannel.register(selector, SelectionKey.OP_ACCEPT);

把 channel 注册到 selector 上,如此,一个 selector 可以管理多个 channel。

selector.select(1000);

该方法功能就是阻塞直到该选择器中的通道所关注的事件就绪,最多阻塞 1000 毫秒,使得程序可以继续往下运行。

while(true){

while (it.hasNext()) {}

}

这样的循环结构,使得服务器不断的轮询是否有请求事件发生,如果有发生,则会获得这个请求中的 channel,并且往这个 channel 中读写数据。

handleInput 方法中,就是处理对应的请求。

客户端:

Client端代码

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
 
public class MyClient {
	private Selector selector;
	private SocketChannel socketChannel;
 
	public void start() throws Exception {
		int port = 9527;
		String host = "127.0.0.1";
		// 创建选择器
		selector = Selector.open();
		// 打开监听通道
		socketChannel = SocketChannel.open();
		// 如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
		socketChannel.configureBlocking(false);// 开启非阻塞模式
 
		socketChannel.connect(new InetSocketAddress(host, port));
		// 等待100毫秒直到连接上服务器
		while (!socketChannel.finishConnect()) {
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		socketChannel.register(selector, SelectionKey.OP_CONNECT);
 
		while (true) {
			try {
				// 无论是否有读写事件发生,selector每隔1s被唤醒一次
				selector.select(1000);
				// 阻塞,只有当至少一个注册的事件发生的时候才会继续.
				// selector.select();
				Set<SelectionKey> keys = selector.selectedKeys();
				Iterator<SelectionKey> it = keys.iterator();
				SelectionKey key = null;
				while (it.hasNext()) {
					key = it.next();
					it.remove();
					try {
						handleInput(key);
					} catch (Exception e) {
						if (key != null) {
							key.cancel();
							if (key.channel() != null) {
								key.channel().close();
							}
						}
					}
				}
			} catch (Exception e) {
				e.printStackTrace();
				System.exit(1);
			}
		}
 
	}
 
	private void handleInput(SelectionKey key) throws IOException {
		if (key.isValid()) {
			SocketChannel sc = (SocketChannel) key.channel();
			// 读消息
			if (key.isReadable()) {
				// 创建ByteBuffer,并开辟一个1M的缓冲区
				ByteBuffer buffer = ByteBuffer.allocate(1024);
				// 读取请求码流,返回读取到的字节数
				int readBytes = sc.read(buffer);
				// 读取到字节,对字节进行编解码
				if (readBytes > 0) {
					// 将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
					buffer.flip();
					// 根据缓冲区可读字节数创建字节数组
					byte[] bytes = new byte[buffer.remaining()];
					// 将缓冲区可读字节数组复制到新建的数组中
					buffer.get(bytes);
					String result = new String(bytes, "UTF-8");
					System.out.println("客户端收到消息:" + result);
				}
			}
		}
	}
 
	// 异步发送消息
	private void doWrite(SocketChannel channel, String request) throws IOException {
		// 将消息编码为字节数组
		byte[] bytes = request.getBytes();
		// 根据数组容量创建ByteBuffer
		ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
		// 将字节数组复制到缓冲区
		writeBuffer.put(bytes);
		// flip操作
		writeBuffer.flip();
		// 发送缓冲区的字节数组
		channel.write(writeBuffer);
	}
 
	public void sendMsg(String msg) throws Exception {
		socketChannel.register(selector, SelectionKey.OP_READ);
		doWrite(socketChannel, msg);
	}
} 

启动Server端Client

public class Main {
 
	public static void main(String[] args) throws Exception {
		final MyClient myclient = new MyClient();
		FutureTask<Integer> Task2 = new FutureTask<>(() -> {
			myclient.start();
			return 0;
		});// 用FutureTask包裹
		Thread Thread2 = new Thread(Task2);// 用Thread包裹
		Thread2.start();
		
		Thread.sleep(1000);
		myclient.sendMsg("time");
		
	}
 
}

客户端代码和服务器端代码,原理是类似的。

上述例子中,先启动服务器端代码,然后启动客户端代码,就能跑起来。例子中,客户端发送任意字符到服务器端,服务器返回当前时间给客户端。

原创文章,转载请注明出处!http://www.javathings.top/java-nio实现网络通信/

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏SAP最佳业务实践

SAP最佳业务实践:SD–含客户预付款的销售订单处理(201)-5发货

VL10C交货处理 在此活动中,创建交货。 后勤 ®后勤执行 ® 外向处理 ® 外向交货的发货 ® 外向交货 ® 创建 ® 交货凭证到期日的集中处理 ®销售订单...

4076
来自专栏架构师之旅

【Java SE】Java NIO系列教程(八) SocketChannel

英文:Jakob Jenkov 译文:ifeve - 郑玉婷 链接:http://ifeve.com/socket-channel/ Java NIO中的Soc...

2117
来自专栏JavaEdge

Tomcat架构解析之3 Connector NIOAcceptorPollerWorkerNioSelectorPool

2864
来自专栏清晨我上码

第六节 netty前传-NIO Selector

可以使用单个线程来处理多个channel来节省资源。对于操作系统而言,线程之间切换是昂贵的,并且每个线程也占用操作系统中的一些资源(存储器)。 因此,使用的线程...

1472
来自专栏女程序员的日常

STM8S——Analog/digital converter (ADC)

1、ADC1 and ADC2 are 10-bit successive approximation Anolog to Digital Converters...

3361
来自专栏日常分享

NIO 服务端TCP连接管理的方案

   因为服务端与客户端实现的是长连接,所以需要对客户端的连接情况进行监控,防止无效连接占用资源。

1185
来自专栏xdecode

Netty与传统Server对比

前言 本文旨在介绍传统Socket服务端与NIO服务端的差异. 以餐厅服务员简单举例,每个客人对应一个请求. 传统Socket / OIO 1 public ...

2557
来自专栏蓝天

基于zookeeper的主备切换方法

继承CZookeeperHelper即可快速实现主备切换: https://github.com/eyjian/mooon/blob/master/mooo...

1442
来自专栏大数据架构

Java进阶(五)Java I/O模型从BIO到NIO和Reactor模式

2005
来自专栏chenssy

【死磕Netty】-----NIO基础详解

原文出处http://cmsblogs.com/ 『chenssy』 转载请注明原创出处,谢谢! Netty 是基于Java NIO 封装的网络通讯框架,只有充...

4726

扫码关注云+社区

领取腾讯云代金券