本文是 NIO 相关的 demo,终于到了动手写 demo 这一步。
Server类
ServerHandler类
packagecom.coder.netty.nio;
importjava.io.IOException;
importjava.net.InetSocketAddress;
importjava.nio.ByteBuffer;
importjava.nio.channels.SelectionKey;
importjava.nio.channels.Selector;
importjava.nio.channels.ServerSocketChannel;
importjava.nio.channels.SocketChannel;
importjava.util.Iterator;
importjava.util.Set;
/**
 * Single-threaded NIO echo server.
 *
 * <p>The constructor opens a {@link Selector} and a non-blocking {@link ServerSocketChannel}
 * bound to the requested port. {@link #run()} then multiplexes accept and read events on that
 * selector until {@link #stop()} is called. Every message read from a client is printed and
 * echoed back prefixed with {@code "+++++++++"}.
 *
 * <p>Types outside the file's existing imports are written fully qualified so that this class
 * needs no additional import lines.
 */
public class ServerHandler implements Runnable {

    /** Charset used for both decoding requests and encoding replies. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /** Size in bytes of the buffer allocated for each read. */
    private static final int READ_BUFFER_SIZE = 1024;

    /** Pending-connection backlog passed to {@code bind}. */
    private static final int BACKLOG = 1024;

    /** Upper bound, in milliseconds, on how long one {@code select} call may block. */
    private static final long SELECT_TIMEOUT_MS = 1000L;

    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private volatile boolean started;

    /**
     * Opens the selector and the listening channel and registers for accept events.
     *
     * <p>If anything fails, the error is printed, whatever was opened is closed again, and the
     * handler is left in the "not started" state, in which {@link #run()} returns immediately.
     *
     * @param port TCP port to listen on
     */
    public ServerHandler(int port) {
        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(port), BACKLOG);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            started = true;
            System.out.println("服务器已启动,端口为:" + port);
        } catch (IOException e) {
            e.printStackTrace();
            // Do not leave a half-initialised selector or a bound socket behind.
            close(serverSocketChannel);
            close(selector);
        }
    }

    /**
     * Asks the event loop to terminate. The selector is woken up so that the loop notices the
     * request right away instead of after the current select timeout.
     */
    public void stop() {
        started = false;
        Selector current = selector;
        if (current != null) {
            current.wakeup();
        }
    }

    /**
     * Event loop: waits for ready keys and dispatches each one until {@link #stop()} is called,
     * then closes every registered channel, the listening channel and the selector.
     */
    @Override
    public void run() {
        try {
            while (started) {
                try {
                    selector.select(SELECT_TIMEOUT_MS);
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        // Selected keys are not removed automatically; drop it before handling.
                        iterator.remove();
                        dispatch(key);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            releaseResources();
        }
    }

    /**
     * Handles one key and confines an I/O failure to the connection it belongs to, so that a
     * single broken client neither aborts the current pass nor stays registered.
     */
    private void dispatch(SelectionKey key) {
        try {
            handle(key);
        } catch (IOException e) {
            e.printStackTrace();
            // A failed accept must not take the listening socket down with it.
            if (key.channel() != serverSocketChannel) {
                key.cancel();
                close(key.channel());
            }
        }
    }

    /**
     * Routes a ready key to the accept or read path.
     *
     * @param key the selected key
     * @throws IOException if accepting, reading or writing fails
     */
    private void handle(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            accept(key);
        } else if (key.isReadable()) {
            read(key);
        }
    }

    /** Accepts a pending connection and registers it for read events. */
    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel listener = (ServerSocketChannel) key.channel();
        SocketChannel client = listener.accept();
        if (client == null) {
            // Non-blocking accept returns null when no connection is actually pending.
            return;
        }
        try {
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            close(client);
            throw e;
        }
    }

    /**
     * Reads available bytes from a client, prints them and echoes them back with the
     * {@code "+++++++++"} prefix; closes the connection at end-of-stream.
     *
     * <p>Each read is decoded on its own, so a multi-byte character split across two reads
     * would be decoded incorrectly.
     */
    private void read(SelectionKey key) throws IOException {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
        int readBytes = client.read(buffer);
        if (readBytes > 0) {
            buffer.flip();
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            String clientMsg = new String(bytes, UTF_8);
            System.out.println("服务器收到的消息为:" + clientMsg);
            String responseMsg = "+++++++++" + clientMsg;
            write(client, responseMsg);
        } else if (readBytes < 0) {
            // -1 means the peer closed its side of the connection.
            key.cancel();
            client.close();
        }
    }

    /**
     * Sends a message to a client, encoded as UTF-8.
     *
     * @param channel     destination channel
     * @param responseMsg text to send
     * @throws IOException if the write fails
     */
    private void write(SocketChannel channel, String responseMsg) throws IOException {
        ByteBuffer writeBuffer = ByteBuffer.wrap(responseMsg.getBytes(UTF_8));
        // A non-blocking write may accept only part of the buffer; keep going until it is
        // drained. This spins while the socket send buffer is full, which is acceptable for
        // the short messages of this demo.
        while (writeBuffer.hasRemaining()) {
            channel.write(writeBuffer);
        }
    }

    /** Closes every channel still registered with the selector, then the selector itself. */
    private void releaseResources() {
        if (selector != null && selector.isOpen()) {
            for (SelectionKey key : selector.keys()) {
                close(key.channel());
            }
        }
        close(serverSocketChannel);
        close(selector);
    }

    /** Closes a resource if it is non-null, printing rather than propagating any failure. */
    private static void close(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Client类
packagecom.coder.netty.nio;
importjava.util.Scanner;
public classClient {
private staticStringHOST="127.0.0.1";
private static intPORT=6667;
private staticClientHandlerclientHandler;
public static synchronized voidstart() {
if(clientHandler!=null) {
clientHandler.stop();
}
clientHandler=newClientHandler(HOST,PORT);
newThread(clientHandler,"Server").start();
}
public static voidmain(String[] args) {
start();
Scanner scanner =newScanner(System.in);
String msg =null;
while(true) {
msg = scanner.nextLine();
if("q".equals(msg)) {
break;
}
try{
clientHandler.sendMsg(msg);
}catch(Exception e) {
e.printStackTrace();
}
}
}
}
ClientHandler类
packagecom.coder.netty.nio;
importjava.io.IOException;
importjava.net.InetSocketAddress;
importjava.nio.ByteBuffer;
importjava.nio.channels.SelectionKey;
importjava.nio.channels.Selector;
importjava.nio.channels.SocketChannel;
importjava.util.Iterator;
importjava.util.Set;
/**
 * Single-connection NIO client.
 *
 * <p>The constructor opens a {@link Selector} and a non-blocking {@link SocketChannel}.
 * {@link #run()} initiates the connection and then processes connect and read events until
 * {@link #stop()} is called or the connection ends. {@link #sendMsg(String)} may be called from
 * another thread to send text to the server; received text is printed to standard output.
 *
 * <p>Types outside the file's existing imports are written fully qualified so that this class
 * needs no additional import lines.
 */
public class ClientHandler implements Runnable {

    /** Charset used for both encoding requests and decoding replies. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /** Size in bytes of the buffer allocated for each read. */
    private static final int READ_BUFFER_SIZE = 1024;

    /** Upper bound, in milliseconds, on how long one {@code select} call may block. */
    private static final long SELECT_TIMEOUT_MS = 1000L;

    /** How long {@link #sendMsg(String)} waits for the connection attempt to finish. */
    private static final long CONNECT_WAIT_MS = 3000L;

    private final String host;
    private final int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean started;

    /** Released once the connection attempt has either succeeded or failed. */
    private final java.util.concurrent.CountDownLatch connectOutcome =
            new java.util.concurrent.CountDownLatch(1);

    /**
     * Opens the selector and the socket channel. The connection itself is initiated by
     * {@link #run()}.
     *
     * <p>If opening fails, the error is printed, whatever was opened is closed again, and the
     * handler is left in the "not started" state.
     *
     * @param ip   server host name or address
     * @param port server port
     */
    public ClientHandler(String ip, int port) {
        this.host = ip;
        this.port = port;
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            started = true;
        } catch (IOException e) {
            e.printStackTrace();
            close(socketChannel);
            close(selector);
        }
    }

    /**
     * Asks the event loop to terminate. The selector is woken up so that the loop notices the
     * request right away instead of after the current select timeout.
     */
    public void stop() {
        started = false;
        Selector current = selector;
        if (current != null) {
            current.wakeup();
        }
    }

    /**
     * Connects to the server and processes selector events until stopped, then closes the
     * channel and the selector. Any I/O failure ends the loop, because this handler owns a
     * single connection and has nothing left to service once it is broken.
     */
    @Override
    public void run() {
        try {
            if (started) {
                try {
                    connect();
                } catch (IOException e) {
                    e.printStackTrace();
                    started = false;
                }
            }
            while (started) {
                try {
                    selector.select(SELECT_TIMEOUT_MS);
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        // Selected keys are not removed automatically; drop it before handling.
                        iterator.remove();
                        handle(key);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    started = false;
                }
            }
        } finally {
            close(socketChannel);
            close(selector);
            // Release any sender still waiting for the connection outcome.
            connectOutcome.countDown();
        }
    }

    /**
     * Processes one ready key: completes a pending connection or reads incoming data.
     *
     * @param key the selected key
     * @throws IOException if completing the connection or reading fails
     */
    private void handle(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }
        SocketChannel channel = (SocketChannel) key.channel();
        if (key.isConnectable()) {
            if (!channel.finishConnect()) {
                // Still in progress; wait for the next connect event rather than killing
                // the whole JVM from inside a handler.
                return;
            }
            // Connected: from now on only read events matter for this key.
            key.interestOps(SelectionKey.OP_READ);
            connectOutcome.countDown();
        }
        if (key.isReadable()) {
            ByteBuffer buffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
            int readBytes = channel.read(buffer);
            if (readBytes > 0) {
                buffer.flip();
                byte[] bytes = new byte[buffer.remaining()];
                buffer.get(bytes);
                String result = new String(bytes, UTF_8);
                System.out.println("客户端接受到的消息:" + result);
            } else if (readBytes < 0) {
                // -1 means the server closed the connection; nothing is left to do.
                key.cancel();
                channel.close();
                started = false;
            }
        }
    }

    /**
     * Writes a message to the channel, encoded as UTF-8.
     *
     * @param channel destination channel
     * @param request text to send
     * @throws IOException if the write fails
     */
    private void write(SocketChannel channel, String request) throws IOException {
        ByteBuffer writeBuffer = ByteBuffer.wrap(request.getBytes(UTF_8));
        // A non-blocking write may accept only part of the buffer; keep going until it is
        // drained. This spins while the socket send buffer is full, which is acceptable for
        // the short messages of this demo.
        while (writeBuffer.hasRemaining()) {
            channel.write(writeBuffer);
        }
    }

    /**
     * Starts the non-blocking connect. If it completes immediately the channel is registered
     * for reads straight away; otherwise it is registered for the connect-completion event.
     *
     * @throws IOException if the connection attempt fails immediately
     */
    private void connect() throws IOException {
        if (socketChannel.connect(new InetSocketAddress(host, port))) {
            socketChannel.register(selector, SelectionKey.OP_READ);
            connectOutcome.countDown();
        } else {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }

    /**
     * Sends a message to the server. If the connection attempt is still in progress, waits up
     * to {@link #CONNECT_WAIT_MS} milliseconds for it to finish.
     *
     * <p>Concurrent calls from several threads are not coordinated; their bytes may interleave.
     *
     * @param msg text to send
     * @throws IOException          if the channel is not connected or the write fails
     * @throws InterruptedException if interrupted while waiting for the connection
     * @throws Exception            declared for compatibility with existing callers
     */
    public void sendMsg(String msg) throws Exception {
        boolean outcomeKnown =
                connectOutcome.await(CONNECT_WAIT_MS, java.util.concurrent.TimeUnit.MILLISECONDS);
        if (!outcomeKnown || socketChannel == null || !socketChannel.isConnected()) {
            throw new IOException("not connected to " + host + ":" + port);
        }
        write(socketChannel, msg);
    }

    /** Closes a resource if it is non-null, printing rather than propagating any failure. */
    private static void close(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
领取专属 10元无门槛券
私享最新 技术干货