0 <= mark <= position <= limit <= capacity
废话不说,直接上代码。下面使用NIO编写了一个简单的聊天程序,服务端将客户端发来的消息广播给所有客户端。
首先看服务端:
package william.netty.nio;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
* @Auther: ZhangShenao
* @Date: 2019/1/23 15:46
* @Description:聊天程序服务端
*/
public class ChatServer {
private static Map<String, SocketChannel> clients = new ConcurrentHashMap<>();
private static Selector selector;
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(8080));
System.err.println("Server 绑定端口 : 8080");
selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int select = selector.select();
if (select <= 0) {
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
if (selectionKey.isAcceptable()) {
acceptClient((ServerSocketChannel) selectionKey.channel());
} else if (selectionKey.isReadable()) {
broadcastMessage((SocketChannel) selectionKey.channel());
}
}
}
}
private static void acceptClient(ServerSocketChannel serverSocketChannel) throws Exception {
SocketChannel client = serverSocketChannel.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
clients.putIfAbsent(UUID.randomUUID().toString(), client);
System.err.println("客户端连接: " + client);
}
private static void broadcastMessage(SocketChannel channel) throws Exception {
ByteBuffer readBuf = ByteBuffer.allocate(1024);
int read = channel.read(readBuf);
if (read <= 0) {
return;
}
readBuf.flip();
Charset charset = Charset.forName("UTF-8");
String content = String.valueOf(charset.decode(readBuf).array());
System.err.println("Client: " + channel + ", content: " + content);
for (Map.Entry<String, SocketChannel> entry : clients.entrySet()) {
String key = entry.getKey();
SocketChannel client = entry.getValue();
String msg;
if (client == channel) {
msg = "[self]: " + content;
} else {
msg = "[" + key + "]" + content;
}
ByteBuffer writeBuf = ByteBuffer.wrap(msg.getBytes(charset));
// writeBuf.flip(); 使用wrap()方法创建的Buffer,不需要调用flip
client.write(writeBuf);
}
}
}
下面是客户端:
package william.netty.nio;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @Auther: ZhangShenao
* @Date: 2019/1/23 16:53
* @Description:聊天程序客户端
*/
public class ChatClient {
private static Selector selector;
private static ExecutorService executor = Executors.newFixedThreadPool(5);
public static void main(String[] args) throws Exception {
selector = Selector.open();
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_CONNECT);
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
while (true) {
int select = selector.select();
if (select <= 0) {
continue;
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isConnectable()) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
if (channel.isConnectionPending()) {
channel.finishConnect();
/*ByteBuffer writeBuf = ByteBuffer.allocate(1024);
writeBuf.put((LocalDateTime.now() + "连接成功").getBytes());
writeBuf.flip();
channel.write(writeBuf);*/
send2Server(channel);
}
channel.register(selector,SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
showMessage((SocketChannel) selectionKey.channel());
}
}
iterator.remove();
}
}
private static void send2Server(SocketChannel channel) {
executor.submit(() -> {
ByteBuffer buffer = ByteBuffer.allocate(1024);
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
while (true) {
buffer.clear();
String msg = reader.readLine();
buffer.put(msg.getBytes());
buffer.flip();
channel.write(buffer);
}
});
}
private static void showMessage(SocketChannel channel) throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
buffer.flip();
byte[] bytes = new byte[buffer.limit()];
buffer.get(bytes);
System.err.println(new String(bytes));
}
}