分布式系统之间通信可以分为两种:
分布式子系统之间需要通信时,就发送消息。一般通信的两个要点是:消息处理和消息传输。
在Java中可基于Socket、ServerSocket来实现TCP/IP+BIO的系统通信。
为了满足服务端可以同时接受多个请求,最简单的方法是生成多个Socket。但这样会产生两个问题:
为了解决上面的问题,通常采用连接池的方式来维护Socket。一方面能限制Socket的个数;另一方面避免重复创建Socket带来的性能下降问题。这里有一个问题就是设置合适的相应超时时间。因为连接池中Socket个数是有限的,肯定会造成激烈的竞争和等待。
//创建连接
Socket socket = new Socket(目标IP或域名, 目标端口);
//BufferedReader用于读取服务端返回的数据
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
//PrintWriter向服务器写入流
PrintWriter out = new PrintWriter(socket.getOutputStream(),true);
//像服务端发送流
out.println("hello");
//阻塞读取服务端的返回信息
in.readLine();
//创建对本地端口的监听
PrintWriter out = new PrintWriter(socket.getOutputStream(),true);
//向服务器发送字符串信息
out.println("hello");
//阻塞读取服务端的返回信息
in.readLine();
Java可以基于Clannel和Selector的相关类来实现TCP/IP+NIO方式的系统间通信。Channel有SocketClannel和ServerSocketChannel两种。
SocketChannel channel = SocketChannel.open();
//设置为非阻塞模式
channel.configureBlocking(false);
//对于非阻塞模式,立即返回false,表示连接正在建立中
channel.connect(SocketAdress);
Selector selector = Selector.open();
//向channel注册selector以及感兴趣的连接事件
channel.regester(selector,SelectionKey.OP_CONNECT);
//阻塞至有感兴趣的IO事件发生,或到达超时时间
int nKeys = selector.select(超时时间【毫秒计】);
//如果希望一直等待知道有感兴趣的事件发生
//int nKeys = selector.select();
//如果希望不阻塞直接返回当前是否有感兴趣的事件发生
//int nKeys = selector.selectNow();
//如果有感兴趣的事件
SelectionKey sKey = null;
if(nKeys>0){
Set<SelectionKey> keys = selector.selectedKeys();
for(SelectionKey key:keys){
//对于发生连接的事件
if(key.isConnectable()){
SocketChannel sc = (SocketChannel)key.channel();
sc.configureBlocking(false);
//注册感兴趣的IO读事件
sKey = sc.register(selector,SelectionKey.OP_READ);
//完成连接的建立
sc.finishConnect();
}
//有流可读取
else if(key.isReadable()){
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel sc = (SocketChannel) key.channel();
int readBytes = 0;
try{
int ret = 0;
try{
//读取目前可读取的值,此步为阻塞操作
while((ret=sc.read(buffer))>0){
readBytes += ret;
}
}
fanally{
buffer.flip();
}
}
finally{
if(buffer!=null){
buffer.clear();
}
}
}
//可写入流
else if(key.isWritable()){
//取消对OP_WRITE事件的注册
key.interestOps(key.interestOps() & (!SelectionKey.OP_WRITE));
SocketChannel sc = (SocketChannel) key.channel();
//此步为阻塞操作
int writtenedSize = sc.write(ByteBuffer);
//如未写入,则继续注册感兴趣的OP_WRITE事件
if(writtenedSize==0){
key.interestOps(key.interestOps()|SelectionKey.OP_WRITE);
}
}
}
Selector.selectedKeys().clear();
}
//对于要写入的流,可直接调用channel.write来完成。只有在未写入成功时才要注册OP_WRITE事件
int wSize = channel.write(ByteBuffer);
if(wSize == 0){
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
}
ServerSocketChannel ssc = ServerSocketChannel.open();
ServerSocket serverSocket = ssc.socket();
//绑定要监听的接口
serverSocket.bind(new InetSocketAdress(port));
ssc.configureBlocking(false);
//注册感兴趣的连接建立事件
ssc.register(selector,SelectionKey.OP_ACCEPT);
Java对UDP/IP方式的网络数据传输同样采用Socket机制,只是UDP/IP下的Socket没有建立连接,因此无法双向通信。如果需要双向通信,必须两端都生成UDP Server。
Java中通过DatagramSocket和DatagramPacket来实现UDP/IP+BIO方式和系统间通信。
由于UDP双端不建立连接,所以也就不存在竞争问题,只是最终读写流的动作是同步的。
//如果希望双向通信,必须启动一个监听端口承担服务器的职责
//如果不能绑定到指定端口,则抛出SocketException
DatagramSocket serverSocket = new DatagramSocket(监听的端口);
byte[] buffer = new byte[65507];
DatagramPacket receivePacket = new DatagramPacket(buffer,buffer.length);
DatagramSocket socket = new DatagramSocket();
DatagramPacket packet = new DatagramPacket(datas,datas.length,server.length);
//阻塞方式发送packet到指定的服务器和端口
socket.send(packet);
//阻塞并同步读取流消息,如果读取的流消息比packet长,则删除更长的消息
//当连接不上目标地址和端口时,抛出PortUnreachableException
DatagramSocket.setSoTimeout(超时时间--毫秒级);
serverSocket.receive(receivePacket);
Java中可以通过DatagramClannel和ByteBuffer来实现UDP/IP方式的系统间通信。
//读取流信息
DatagramChannel receiveChannel = DatagramChannel.open();
receiveChannel.configureBlocking(false);
DatagramSocket socket = receiveChannel.socket();
socket.bind(new InetSocketAddress(rport));
Selector selector = Selector.open();
receiveChannel.register(selector, SelectionKey.OP_REEAD);
//之后即可像TCP/IP+NIO中对selector遍历一样的方式进行流信息的读取
//...
//写入流信息
DatagramChannel sendChannel = DatagramChannel.open();
sendChannel.configureBlocking(false);
SocketAdress target = new InetSocketAdress("127.0.0.1",sport);
sendChannel.connect(target);
//阻塞写入流
sendChannel.write(ByteBuffer);