Java反射使我们能在程序运行时动态调用某个对象的方法/构造函数、获取某个对象的属性,经常用于实现动态代理、工厂模式、Java JDBC加载连接驱动类等,近期阅读开源项目源码发现,它还有一种重要的用途——状态同步。
开源项目NNAnalytics是一个用Java实现的HDFS文件分析项目,能为我们实时分析、展示HDFS中各类文件(小文件、中文件、大文件等)的数据。之所以能做到实时,与使用反射同步HDFS元数据对象变化脱不开关系,接下我们来剖析一下该项目源码并使用一个测试用例来理解如何使用反射实现状态同步。
首先,在NNAnalytics服务启动的时候,会加载fsimage元数据镜像文件在JVM内存中构建org.apache.hadoop.hdfs.server.namenode.INodeMap对象,该对象保存了HDFS中所有的INode信息,INodeMap在JVM内存中保存了所有的HDFS元数据信息。接着,在启动逻辑中,NNAnalytics中的org.apache.hadoop.hdfs.server.namenode.AbstractQueryEngine类会使用反射的getDeclaredField方法获取INodeMap类中的map属性,该属性实际保存了INode信息:
INodeMap inodeMap = fsDirectory.getINodeMap();
Field mapField = inodeMap.getClass().getDeclaredField("map");
mapField.setAccessible(true);
GSet<INode, INodeWithAdditionalFields> gset =
(GSet<INode, INodeWithAdditionalFields>) mapField.get(inodeMap);在反射获取了保存INode信息的map对象之后,AbstractQueryEngine类使用这些信息得到所有文件(files)和目录(dirs)信息:
files =
StreamSupport.stream(gset.spliterator(), true)
.filter(INode::isFile)
.collect(Collectors.toConcurrentMap(node -> node, node -> node));
dirs =
StreamSupport.stream(gset.spliterator(), true)
.filter(INode::isDirectory)
.collect(Collectors.toConcurrentMap(node -> node, node -> node));
all = CollectionsView.combine(files.keySet(), dirs.keySet());以上代码中三个变量在AbstractQueryEngine类中的声明如下:
protected Collection<INode> all;
protected Map<INode, INodeWithAdditionalFields> files;
protected Map<INode, INodeWithAdditionalFields> dirs;由于反射操作的是JVM内存中的对象信息,反射获取的对象和被反射的对象在JVM中指向同一内存地址,因此,当INodeMap中的map属性更新时,all、files、dirs变量也会同时更新,我们就能获取到HDFS文件元数据信息对象的变化信息。
为了证实这一点,我们写段代码测试一下。
接下来我们使用以下代码复现一下使用反射实现状态同步的场景,以下代码由Test1和Test2两个类组成:
Test1类的定义如下:
public class Test1 {
// 保存随机数字
private final List<Integer> list;
static Test1 newInstance(List<Integer> list) {
return new Test1(list);
}
// 类初始化时调用启动add和remove方法
private Test1(List<Integer> list) {
add();
remove();
this.list = list;
}
public List<Integer> getList() {
return list;
}
/**
* 定时添加随机数字
*/
public final void add() {
ScheduledThreadPoolExecutor executors = new ScheduledThreadPoolExecutor(1);
executors.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
list.add(new Random().nextInt(100));
}
}, 1, 2, TimeUnit.SECONDS);
}
/**
* 定时移除随机数字
*/
public final void remove() {
ScheduledThreadPoolExecutor executors = new ScheduledThreadPoolExecutor(21);
executors.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
list.remove(0);
}
}, 1, 2, TimeUnit.SECONDS);
}
}Test1类很简单,主要有一个属性和两个方法:
list属性,保存随机数字add和remove方法:每隔两秒添加或者移除list中的元素Test2类的定义如下,主要逻辑见注释:
public class Test2 {
// 保存反射获取的Test1中list字段的值
protected static List<Integer> list2;
public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
List<Integer> list1 = new ArrayList<>();
list1.add(1);
list1.add(2);
Test1 test1 = Test1.newInstance(list1);
// 1、反射获取list对象
Field field = test1.getClass().getDeclaredField("list");
// 2、设置访问字段时跳过安全检查,否则会抛出java.lang.IllegalAccessException异常
field.setAccessible(true);
// 3、反射获取Test1中list字段的值
list2 = (List) field.get(test1);
// 4、比较list1和list2是否指向同一地址,输出结果true
System.out.println(list2 == list1);
// 5、打印输出list的值,观察list2是否与list1的数据同步
while (true) {
System.out.println(list2);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}输出结果:
[40, 59]
[94, 43]
[43, 93]
[30, 90]
[90, 2]
[53, 56]
[56, 87]
[70, 56]
[56, 42]
[40, 8]
[8, 53]
[58, 91]
[91, 85]
[99, 73]
[73, 70]
[9, 1]
......以上输出证明list2能感知到list1的变化,与list1实现了状态同步。
反射不仅可以帮我们实现动态代理、工厂模式、Java JDBC加载数据库驱动类等操作,还可以帮我们实现状态同步,其根本原因在于反射获取的对象与被反射对象指向同一内存地址。在使用反射操作私有(private)字段时,特别要注意设置字段跳过安全检查,也就是setAccessible(true),否则会抛出java.lang.IllegalAccessException异常。