通过聊天室项目的演化。介绍BIO的基本用法与优缺点。
image.png
echo聊天室
greet from socket.
echo from server: <greet from socket.>
群聊聊天室
/**
* 常量
*
* @author futao
* @date 2020/7/2
*/
public class Constants {
/**
* 服务器端口
*/
public static final int SERVER_PORT = 9507;
/**
* 关键字-客户端退出系统
*/
public static final String KEY_WORD_QUIT = "quit";
/**
* 使用的字符集编码
*/
public static final Charset CHARSET = StandardCharsets.UTF_8;
}
/**
* @author futao
* @date 2020/7/2
*/
public class IOUtils {
/**
* 从输入流中读取字符串
*
* @param is 输入流
* @return 读取到的字符串
* @throws IOException
*/
public static String readString(InputStream is) throws IOException {
//使用带有缓冲区的BufferInputStream以提高读取性能
BufferedInputStream bufferedInputStream = new BufferedInputStream(is);
//从缓冲区中一次读取的数据
byte[] buffer = new byte[1024 * 4];
//读取的字符串
StringBuilder fullMessage = new StringBuilder();
//本次读取到的字节个数
int curBufferSize;
//循环将数据写入缓冲区buffer,并返回读取到的字节个数。当前数据读取完毕会返回-1。
// curBufferSize这个参数的作用有两个
// 1. 判断是否读取到了流的末尾(==-1?)
// 2. 缓冲区字节数组buffer可能并没有写满,只写了curBufferSize,
// 那么我们只需要将字节数组中前面curBufferSize个字节转换成字符串就行。
while ((curBufferSize = bufferedInputStream.read(buffer)) != -1) {
//将buffer中的数据转换成字符串,从buffer的第0个字节开始,读取curBufferSize个字节
fullMessage.append(new String(buffer, 0, curBufferSize));
}
return fullMessage.toString();
}
}
greet from socket.
echo from server: <greet from socket.>
ServerSocket
并绑定所监听的端口serverSocket.accept()
阻塞监听客户端的接入Socket
,并将该Socket
上流的读写操作交给子线程去处理,主线程继续阻塞在accept()
监听客户端的接入,否则同一时刻只能有一个客户端接入。BioChatServer
/**
* @author futao
* @date 2020/7/2
*/
public class BioChatServer {
private static final Logger logger = LoggerFactory.getLogger(BioChatServer.class);
/**
* 启动服务器
*/
public void start() {
//创建服务端ServerSocket,并监听端口
try (ServerSocket serverSocket = new ServerSocket(Constants.SERVER_PORT)) {
logger.debug("========== 基于BIO的聊天室在[{}]端口启动成功 ==========", Constants.SERVER_PORT);
//循环accept()监听
while (true) {
//accept()将阻塞,直到有客户端Socket接入。并在服务端创建一个Socket与其对应
Socket socket = serverSocket.accept();
logger.debug("客户端[{}]成功接入", socket.getPort());
//将获取到的客户端连接交给子线程去处理,不影响主线程继续监听,等待下一个客户端连接
new Thread(() -> {
try (
//用于从客户端读取数据
InputStream inputStream = socket.getInputStream();
//用于将数据写给客户端
OutputStream outputStream = socket.getOutputStream()
) {
//从输入流中读取数据
String fullMessage = IOUtils.readString(inputStream);
logger.info("接收到客户端【{}】发来的消息[{}]", socket.getPort(), fullMessage);
//编码与响应
outputStream.write(String.format("echo from server: <%s>", fullMessage).getBytes(Constants.CHARSET));
} catch (IOException e) {
logger.error("客户端异常", e);
}
}).start();
}
} catch (IOException e) {
logger.error("服务器启动失败", e);
return;
}
}
public static void main(String[] args) {
new BioChatServer().start();
}
}
BioChatClient
/**
* @author futao
* @date 2020/7/2
*/
public class BioChatClient {
private static final Logger logger = LoggerFactory.getLogger(BioChatClient.class);
/**
* 启动客户端
*/
private void start() {
try (
//尝试连接到服务器
Socket socket = new Socket("localhost", Constants.SERVER_PORT);
//获取到输入流
InputStream inputStream = socket.getInputStream();
//输出流
OutputStream outputStream = socket.getOutputStream()
) {
logger.debug("========== 成功连接到聊天服务器 ==========");
//获取到用户输入的字符串
String userInputStr = new Scanner(System.in).nextLine();
//将字符串转换成字节,写入输出流
outputStream.write(userInputStr.getBytes(Constants.CHARSET));
//刷新缓冲区
outputStream.flush();
//【重要】关闭输出流,通知服务器客户端消息已经发送完毕
socket.shutdownOutput();
//读取服务端的响应
logger.info("接收到消息:[{}]", IOUtils.readString(inputStream));
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new BioChatClient().start();
}
}
BioChatClient.start()
的socket.shutdownOutput();
,如果不将客户端socket
对应的输出流关闭,服务器端将不知道客户端消息是否发送完毕,服务端会一直阻塞在inputstream.read()
。image.png
main
线程上发生的。image.png
image.png
image.png
Inputstream.read()
,因为服务端根本不知道客户端的数据已经发送完毕了。客户端打开一个输出流,如果不做约定,也不关闭它,那么服务端永远不知道客户端是否发送完消息,那么服务端会一直等待下去,直到读取超时。
socket.shutdownOutput();
而不是outputStream.close();
,这样还能继续监听从服务端响应的输入流。Reader.readLine()
将会在读取到回车\r
,换行\n
或者回车紧跟着换行\r\n
时返回读取到的数据。Writer.writeLine()/Reader.readLine()
\r\n
,标识消息发送完毕。
/**
* 使用标记符号的方式通知消息发送完毕
*
* @author futao
* @date 2020/7/2
*/
public class BioChatServer {
private static final Logger logger = LoggerFactory.getLogger(BioChatServer.class);
private static final Set<Socket> CLIENT_SOCKET_SET = new HashSet<Socket>() {
@Override
public synchronized boolean add(Socket o) {
return super.add(o);
}
@Override
public synchronized boolean remove(Object o) {
return super.remove(o);
}
};
public void start() {
//创建服务端ServerSocket,并监听端口
try (ServerSocket serverSocket = new ServerSocket(Constants.SERVER_PORT)) {
logger.debug("========== 基于BIO的聊天室在[{}]端口启动成功 ==========", Constants.SERVER_PORT);
//循环accept()监听
while (true) {
//accept()将阻塞,直到有客户端Socket接入。并在服务端创建一个Socket与其对应
Socket socket = serverSocket.accept();
logger.debug("客户端[{}]上线", socket.getPort());
CLIENT_SOCKET_SET.add(socket);
//将获取到的客户端连接交给子线程去处理,不影响主线程继续监听,等待下一个客户端连接
new Thread(() -> {
try {
//用于从客户端读取数据(将字节流转换成字符流)
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new BufferedInputStream(socket.getInputStream())));
while (true) {
//从输入流中读取数据
String message = bufferedReader.readLine();
if (StringUtils.isNotBlank(message)) {
logger.info("接收到客户端【{}】发来的消息[{}]", socket.getPort(), message);
} else {
isQuit(message, socket);
}
//判断是否为下线
boolean isQuit = isQuit(message, socket);
//转发消息
if (!isQuit) {
forwardMessage(socket.getPort(), String.format("<from %s>", socket.getPort()) + message);
} else {
break;
}
}
} catch (IOException e) {
logger.error("客户端异常", e);
}
}).start();
}
} catch (IOException e) {
logger.error("服务器启动失败", e);
return;
}
}
public boolean isQuit(String message, Socket socket) throws IOException {
boolean isQuit = Constants.KEY_WORD_QUIT.equals(message);
if (isQuit) {
CLIENT_SOCKET_SET.remove(socket);
int port = socket.getPort();
socket.close();
logger.debug("客户端[{}]下线", port);
}
return isQuit;
}
/**
* 转发消息
*
* @param curSocketPort 当前发送消息的客户端Socket的端口
* @param message 需要转发的消息
*/
public void forwardMessage(int curSocketPort, String message) {
message += "\r\n";
if (StringUtils.isBlank(message)) {
return;
}
for (Socket socket : CLIENT_SOCKET_SET) {
if (socket.isClosed() || socket.getPort() == curSocketPort) {
continue;
}
if (socket.getPort() != curSocketPort) {
try {
OutputStream outputStream = socket.getOutputStream();
//将字符串编码之后写入客户端
outputStream.write(message.getBytes(Constants.CHARSET));
//刷新缓冲区
outputStream.flush();
} catch (IOException e) {
logger.error("消息转发失败", e);
}
}
}
}
public static void main(String[] args) {
new BioChatServer().start();
}
}
/**
* @author futao
* @date 2020/7/2
*/
public class BioChatClient {
private static final Logger logger = LoggerFactory.getLogger(BioChatClient.class);
/**
* 开启这个线程的目的是,当用户输入了退出指令,需要通知监听响应的线程也结束,
* 否则如果监听响应的线程还处于阻塞状态的话,客户端应用是无法停止的
*/
private static final ExecutorService executorService = Executors.newSingleThreadExecutor();
private void start() {
try {
Socket socket = new Socket("localhost", Constants.SERVER_PORT);
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), Constants.CHARSET));
OutputStream outputStream = socket.getOutputStream();
logger.debug("========== 成功连接到聊天服务器 ==========");
new Thread(() -> {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in, Constants.CHARSET));
while (true) {
try {
String userInputStr = bufferedReader.readLine();
//需要加上换行符
outputStream.write((userInputStr + "\n").getBytes(Constants.CHARSET));
outputStream.flush();
if (Constants.KEY_WORD_QUIT.equals(userInputStr)) {
reader.close();
outputStream.close();
socket.close();
//通知监听响应的线程也结束
executorService.shutdownNow();
break;
}
} catch (IOException e) {
e.printStackTrace();
}
}
logger.debug("========== 退出聊天 ==========");
}).start();
executorService.execute(() -> {
//线程一直监听服务端发送的消息
String message;
try {
while (!socket.isInputShutdown() && (message = reader.readLine()) != null) {
logger.info("接收到消息:[{}]", message);
}
} catch (IOException e) {
e.printStackTrace();
}
});
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new BioChatClient().start();
}
}
image.png
image.png
启动50个客户端,观察服务器端线程的情况。
/**
* @author futao
* @date 2020/7/5
*/
public class ClientRunner {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(50);
for (int i = 0; i < 50; i++) {
executorService.execute(() -> {
new BioChatClient().start();
});
}
}
}
image.png
服务端有50个线程与客户端对应。
服务端在accept()与客户端建立Socket连接之后,将该任务交给线程池去处理,而不是每次都开启一个新的线程。
image.png
image.png
欢迎在评论区留下你看文章时的思考,及时说出,有助于加深记忆和理解,还能和像你一样也喜欢这个话题的读者相遇~