在HDFS集群中,DataNode掉线判断流程为:
超时时长
。timeout
,则超时时长的计算公式为:
timeout = 2 * heartbeat.recheck.interval + 10 * dfs.heartbeat.interval
heartbeat.recheck.interval
是datanode与namenode心跳的时间间隔,heartbeat.recheck.interval
是namenode尝试连接datanode的时间间隔
当datanode没有与namenote发生心跳10次后,namenode尝试主动连接datanode,如果两次都未能连接到datanode,才认为datanode进程已经销毁。
默认的heartbeat.recheck.interval
大小为5分钟,dfs.heartbeat.interval
默认为3秒hdfs-site.xml
配置文件中的 heartbeat.recheck.interval
的单位为毫秒,所以该值在默认配置中为300000,dfs.heartbeat.interval
的单位为秒。
所以,举个例子,如果heartbeat.recheck.interval
设置为 5000(毫秒),dfs.heartbeat.interval
设置为 3(秒),则总的超时时间为40秒NameNode判断DataNode是否下线的时间太长了,利用zookeeper实现服务器上下线动态感知
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
*
* @Description:
* 实现两个功能:
* 1.创建一个/namenode节点下的子节点来模拟实现一个datanode的下线
* 2.删除一个/namenode节点下的子节点来模拟实现一个datanode的下线
* 在/namenode下创建的子节点为EPHEMERAL类型的子节点(临时节点)
*
* 假如datanode02这个服务器节点在zookeeper中创建的临时znode节点为datanode02
* 如果服务器datanode02一直运行,那么zookeeper会一直维护这个会话连接,datanode02这个znode节点会一直存在
* 如果服务器datanode02宕机之后, 那么zookeeper会知道这个临时节点的创建会话已经断开,所以zookeeper会自动删除该临时节点
* 删除了该临时节点,那么监听/namenode节点下的子节点变化的程序(NameNode.java)就能感知到有一个datanode下线了
*
* @author Jed
* @date 2017年12月20日
*/
public class DataNode {
private static final String CONNECT_STRING = "hadoop01:2181,hadoop02:2181,hadoop03:2181,hadoop04:2181";
private static final int SESSION_TIMEOUT = 5000;
private static final String NAMENODE = "/namenode";
private static final String DATANODE = "datanode01";
public static void main(String[] args) throws Exception {
// 1.获取zookeeper连接
ZooKeeper zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, null);
// 2.判断/namenode节点是否存在,如果不存在,先创建,再添加监听
Stat exists = zk.exists(NAMENODE, null);
if(exists == null) {
zk.create(NAMENODE, "namenode".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 3.利用创建/namenode节点下的子节点来模拟实现datanode的上线
zk.create(NAMENODE + "/" + DATANODE, DATANODE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("DataNode " + DATANODE + " 已上线...");
// 4.模拟这个datanode一直运行着,只要手动中断程序,就代表着这个datanode下线
Thread.sleep(Long.MAX_VALUE);
}
}
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
/**
* @Description:
* 模拟HDFS中的namenode角色
* 监听/namenode节点下的子节点列表
* 这个程序必须是一直运行着,而且要做到循环监听
* @author Jed
* @date 2017年12月20日
*/
public class NameNode {
private static ZooKeeper zk;
private static final String CONNECT_STRING = "hadoop01:2181,hadoop02:2181,hadoop03:2181,hadoop04:2181";
private static final int SESSION_TIMEOUT = 5000;
private static final String NAMENODE = "/namenode";
private static List<String> oldChildNodeList = null;
public static void main(String[] args) throws Exception {
// 1.获取zookeeper连接
zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
String path = event.getPath();
EventType type = event.getType();
KeeperState state = event.getState();
// 过滤掉第一次创建连接的事件
if(state.getIntValue() == 3 && path != null) {
/*
* 5.实现具体的业务逻辑代码
* 业务逻辑代码执行与否是应该根据 event 中的属性来决定
* 在这里,如果是/namenode下(判断path)的子节点发生了改变(事件为NodeChildrenChanged)
* 就调用业务逻辑代码
*/
if(path.equals(NAMENODE) && type == EventType.NodeChildrenChanged) {
List<String> newChildNodeList = null;
/* 6.添加循环监听
* 以下代码有两个作用:
* 1)添加循环监听
* 2)获取/namenode下所有子节点的列表
*/
try {
newChildNodeList = zk.getChildren(path, true);
} catch (Exception e) {
e.printStackTrace();
}
String flag = "上线";
if(newChildNodeList.size() < oldChildNodeList.size()) {
flag = "下线";
}
// 得到是哪个节点发生了改变(上线或者下线了)
String changeNode = getChangeNode(oldChildNodeList, newChildNodeList);
System.out.println("DataNode " + changeNode + " 已" + flag + "...");
// 每次 /name节点下的子节点的列表发生了改变之后 ,都需要去更新
oldChildNodeList = newChildNodeList;
}
}
}
});
// 2.判断/namenode节点是否存在,如果不存在,先创建,再添加监听
Stat exists = zk.exists(NAMENODE, null);
if(exists == null) {
zk.create(NAMENODE, "namenode".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 3.给 /namenode 节点添加 NodeChildrenChanged 事件的监听
oldChildNodeList = zk.getChildren(NAMENODE, true);
// 4.模拟当前namenode一直运行着
Thread.sleep(Long.MAX_VALUE);
}
/**
* 是用来获取两个集合中不同的节点
* 方法:遍历大的集合,然后去小集合中去匹配
* 返回大集合中存在而小集合中不存在的元素
*/
public static String getChangeNode(List<String> oldChildNodeList, List<String> newChildNodeList){
// 默认:增加了一个节点
List<String> big = newChildNodeList;
List<String> small = oldChildNodeList;
if(newChildNodeList.size() < oldChildNodeList.size()){
big = oldChildNodeList;
small = newChildNodeList;
}
for(String val : big){
if(!small.contains(val)){
return val;
}
}
return null;
}
}
(1) 运行NameNode.java
(2) 运行DataNode.java
DataNode控制台输出:
DataNode datanode01 已上线...
NameNode控制台输出:
DataNode datanode01 已上线...
(3) 把DataNode代码中的
private static final String DATANODE = "datanode01";
改为:
private static final String DATANODE = "datanode02";
然后再运行一次DataNode.java
新的DataNode的控制台输出:
DataNode datanode02 已上线...
NameNode的控制台更新为:
DataNode datanode01 已上线...
DataNode datanode02 已上线...
(4) 把DataNode代码中的
private static final String DATANODE = "datanode02";
改为:
private static final String DATANODE = "datanode03";
然后再运行一次DataNode.java
新的DataNode的控制台输出:
DataNode datanode03 已上线...
NameNode的控制台更新为:
DataNode datanode01 已上线...
DataNode datanode02 已上线...
DataNode datanode03 已上线...
(5) 手动关闭一个DataNode程序(比如datanode03) NameNode的控制台更新为:
DataNode datanode01 已上线...
DataNode datanode02 已上线...
DataNode datanode03 已上线...
DataNode datanode03 已下线...
(6) 再手动关闭一个DataNode程序(比如datanode02) NameNode的控制台更新为:
DataNode datanode01 已上线...
DataNode datanode02 已上线...
DataNode datanode03 已上线...
DataNode datanode03 已下线...
DataNode datanode02 已下线...
至此,我们已经模拟实现了服务器上下线的动态感知!