在局域网内,组播通讯还是很有用处的。以下代码基于 MulticastSocket 类做了进一步封装,可以更方便地实现组播数据的发送和接收功能。
package net.gdface.utils;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.SocketTimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.*;
import com.google.common.base.Predicates;
import com.google.common.base.Strings;
import com.google.common.base.MoreObjects;
import com.google.common.base.Predicate;
import com.google.common.net.HostAndPort;
/**
 * Network helper utilities: thin wrappers around {@link MulticastSocket} for
 * sending a multicast datagram and for running a multicast receive loop.
 *
 * @author guyadong
 */
public class NetworkUtil{
    /**
     * Receive timeout (milliseconds) used by the receive loop so that a blocked
     * {@code receive} call wakes up periodically and re-checks the stop flag.
     */
    private static final int STOP_POLL_INTERVAL_MILLIS = 1000;

    /**
     * Sends one datagram to the given multicast group and port.
     *
     * @param group multicast group address; must be non-null and a multicast address
     * @param port destination port
     * @param message payload; must be non-null and non-empty
     * @param ttl time-to-live applied to the socket before sending; {@code null} leaves the socket default
     * @throws IOException if creating the socket or sending fails
     * @throws IllegalArgumentException if {@code group} or {@code message} fails the checks above
     */
    public static void sendMultiCast(InetAddress group,int port,byte[] message, Integer ttl) throws IOException{
        checkArgument(null != group,"group is null");
        checkArgument(group.isMulticastAddress(),"group %s is not a multicast address",group);
        checkArgument(message != null && message.length > 0,"message is null or empty");
        MulticastSocket ms = new MulticastSocket();
        try {
            if(ttl != null){
                ms.setTimeToLive(ttl);
            }
            ms.send(new DatagramPacket(message, message.length,group,port));
        } finally {
            // always release the socket, even when send() fails
            ms.close();
        }
    }

    /**
     * Sends one datagram to the given multicast address and port using the socket's default TTL.
     *
     * @param bindaddr multicast IP address (resolved with {@link InetAddress#getByName(String)})
     * @param port destination port
     * @param message payload; must be non-null and non-empty
     * @throws IOException if address resolution or sending fails
     * @throws IllegalArgumentException if {@code bindaddr} is null or empty
     */
    public static void sendMultiCast(String bindaddr,int port,byte[] message) throws IOException{
        checkArgument(!Strings.isNullOrEmpty(bindaddr),"bindaddr is null or empty");
        sendMultiCast(InetAddress.getByName(bindaddr),port,message, null);
    }

    /**
     * Sends one datagram to a multicast address given as a single "host:port" string.
     *
     * @param hostPort multicast address and port separated by ':' and parsed with
     *        {@link HostAndPort#fromString(String)}, e.g. {@code 224.12.12.12:4331};
     *        an IPv6 literal has to be bracketed, e.g. {@code [ff02::1]:4331}
     * @param message payload; must be non-null and non-empty
     * @throws IOException if address resolution or sending fails
     * @throws IllegalArgumentException if {@code hostPort} is null or empty
     */
    public static void sendMultiCast(String hostPort,byte[] message) throws IOException{
        // same up-front validation as the bindaddr overload
        checkArgument(!Strings.isNullOrEmpty(hostPort),"hostPort is null or empty");
        HostAndPort hostAndPort = HostAndPort.fromString(hostPort);
        sendMultiCast(InetAddress.getByName(hostAndPort.getHost()),hostAndPort.getPort(),message, null);
    }

    /**
     * Joins the multicast group and repeatedly receives datagrams, handing a copy of
     * each payload to {@code processor}, until the loop is told to end.
     * <p>
     * The loop ends when {@code processor} returns {@code false}, when {@code onerr}
     * returns {@code false} for a caught exception, or when {@code stop} becomes
     * {@code true}. The socket uses a receive timeout so that {@code stop} is
     * re-checked about once per second even when no datagram arrives; such timeouts
     * are not reported to {@code onerr}.
     *
     * @param group multicast group address; must be non-null and a multicast address
     * @param port port to bind to
     * @param bufferSize maximum datagram length; the receive buffer is allocated with this size, must be positive
     * @param processor payload handler; return {@code false} to end the loop, must not be null
     * @param onerr handler for exceptions raised while receiving or processing; return {@code false}
     *        to end the loop. {@code null} means {@link Predicates#alwaysTrue()} (keep looping)
     * @param stop flag the caller can set from another thread to end the loop; may be {@code null}
     * @throws IOException if the socket cannot be created or configured, or joining/leaving the group fails
     * @throws IllegalArgumentException if an argument fails the checks above
     */
    public static void recevieMultiCastLoop(InetAddress group,int port,int bufferSize,
            Predicate<byte[]>processor,
            Predicate<Throwable> onerr,
            AtomicBoolean stop) throws IOException{
        checkArgument(null != group,"group is null");
        checkArgument(group.isMulticastAddress(),"group %s is not a multicast address",group);
        checkArgument(bufferSize > 0,"bufferSize must be positive, got %s",bufferSize);
        // a null processor would otherwise raise an NPE on every packet inside the loop
        checkArgument(null != processor,"processor is null");
        onerr = MoreObjects.firstNonNull(onerr, Predicates.<Throwable>alwaysTrue());
        if(stop == null){
            stop = new AtomicBoolean(false);
        }
        byte[] message = new byte[bufferSize];
        DatagramPacket packet = new DatagramPacket(message, message.length);
        MulticastSocket ms = new MulticastSocket(port);
        boolean joined = false;
        try {
            // bounded receive so the stop flag is observed without waiting for traffic
            ms.setSoTimeout(STOP_POLL_INTERVAL_MILLIS);
            ms.joinGroup(group);
            joined = true;
            while(!stop.get()){
                try {
                    ms.receive(packet);
                    // hand the processor its own copy; the buffer is reused by the next receive
                    byte[] recevied = new byte[packet.getLength()];
                    System.arraycopy(message, 0, recevied, 0, packet.getLength());
                    if(!processor.apply(recevied)){
                        break;
                    }
                } catch (SocketTimeoutException e) {
                    // nothing arrived within the poll interval: loop to re-check the stop flag
                    continue;
                } catch (Exception e) {
                    if(!onerr.apply(e)){
                        break;
                    }
                }
            }
        } finally {
            try {
                // only leave a group that was actually joined, so a join failure is not masked
                if(joined){
                    ms.leaveGroup(group);
                }
            } finally {
                // close even if leaveGroup throws
                ms.close();
            }
        }
    }

    /**
     * Variant of the receive loop taking the multicast address as a string.
     *
     * @param bindaddr multicast IP address (resolved with {@link InetAddress#getByName(String)})
     * @param port port to bind to
     * @param bufferSize maximum datagram length
     * @param processor payload handler; return {@code false} to end the loop
     * @param onerr exception handler; return {@code false} to end the loop, may be {@code null}
     * @param stop asynchronous stop flag, may be {@code null}
     * @throws IOException if address resolution or socket setup fails
     * @throws IllegalArgumentException if {@code bindaddr} is null or empty
     * @see #recevieMultiCastLoop(InetAddress, int, int, Predicate, Predicate, AtomicBoolean)
     */
    public static void recevieMultiCastLoop(String bindaddr,int port,int bufferSize,
            Predicate<byte[]>processor,
            Predicate<Throwable> onerr,
            AtomicBoolean stop) throws IOException{
        checkArgument(!Strings.isNullOrEmpty(bindaddr),"bindaddr is null or empty");
        recevieMultiCastLoop(InetAddress.getByName(bindaddr),port,bufferSize,processor,onerr,stop);
    }

    /**
     * Variant of the receive loop taking the multicast address and port as one "host:port" string.
     *
     * @param hostPort multicast address and port separated by ':' and parsed with
     *        {@link HostAndPort#fromString(String)}, e.g. {@code 224.12.12.12:4331};
     *        an IPv6 literal has to be bracketed, e.g. {@code [ff02::1]:4331}
     * @param bufferSize maximum datagram length
     * @param processor payload handler; return {@code false} to end the loop
     * @param onerr exception handler; return {@code false} to end the loop, may be {@code null}
     * @param stop asynchronous stop flag, may be {@code null}
     * @throws IOException if address resolution or socket setup fails
     * @throws IllegalArgumentException if {@code hostPort} is null or empty
     * @see #recevieMultiCastLoop(InetAddress, int, int, Predicate, Predicate, AtomicBoolean)
     */
    public static void recevieMultiCastLoop(String hostPort,int bufferSize,
            Predicate<byte[]>processor,
            Predicate<Throwable> onerr,
            AtomicBoolean stop) throws IOException{
        // same up-front validation as the bindaddr overload
        checkArgument(!Strings.isNullOrEmpty(hostPort),"hostPort is null or empty");
        HostAndPort hostAndPort = HostAndPort.fromString(hostPort);
        recevieMultiCastLoop(InetAddress.getByName(hostAndPort.getHost()),hostAndPort.getPort(),bufferSize,processor,onerr,stop);
    }
}
完整代码参见码云(GITEE)仓库地址: https://gitee.com/l0km/common-java/blob/master/common-base2/src/main/java/net/gdface/utils/NetworkUtil.java
JUnit 调用示例:
package net.gdface.utils;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import static net.gdface.utils.NetworkUtil.*;
/**
 * Usage example for the multicast helpers in {@code NetworkUtil}: a background
 * thread runs the receive loop and prints every payload, while the test method
 * sends a series of datagrams to the same group and port.
 */
public class MultiCastTest {
    // Multicast group address and port shared by sender and receiver
    private static final String hostAndPort = "224.42.64.11:26411";
    // Stop flag handed to the receive loop
    private static final AtomicBoolean stop = new AtomicBoolean(false);

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        // Start the multicast receiver thread
        Thread receiver = new Thread(){
            @Override
            public void run() {
                try {
                    recevieMultiCastLoop(hostAndPort, 200,new Predicate<byte[]>() {
                        @Override
                        public boolean apply(byte[] input) {
                            // Handle a received datagram: simply echo it to the console
                            try {
                                System.out.write(input);
                                System.out.println();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                            return true;
                        }
                    },
                    // any receive/processing error ends the loop
                    Predicates.<Throwable>alwaysFalse(),
                    stop);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        };
        // daemon: a receiver still blocked waiting for data must not keep the JVM alive
        receiver.setDaemon(true);
        receiver.start();
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        // Ask the receiver loop to finish; runs whether or not the test succeeded
        stop.set(true);
    }

    /**
     * Sends 100 datagrams, one every 500 ms. Send failures and interruption
     * propagate so that the test fails instead of silently passing.
     */
    @Test
    public void testSend() throws Exception {
        for(int i=0;i<100;++i){
            sendMultiCast(hostAndPort, String.format("hello %s", i).getBytes());
            Thread.sleep(500);
        }
    }
}
完整代码参见码云(GITEE)仓库地址: https://gitee.com/l0km/common-java/blob/master/common-base2/src/test/java/net/gdface/utils/MultiCastTest.java