专栏首页Jed的技术阶梯zookeeper编程02-服务器上下线动态感知

zookeeper编程02-服务器上下线动态感知

在HDFS集群中,DataNode掉线判断流程为:

  1. datanode进程死亡或者网络故障造成datanode无法与 namenode通信,namenode不会立即 把该节点判定为死亡,要经过一段时间,这段时间暂称作超时时长
  2. HDFS默认的超时时长为10分钟30秒。如果定义超时时间为timeout,则超时时长的计算公式为: timeout = 2 * heartbeat.recheck.interval + 10 * dfs.heartbeat.interval heartbeat.recheck.interval是datanode与namenode心跳的时间间隔,heartbeat.recheck.interval是namenode尝试连接datanode的时间间隔 当datanode没有与namenote发生心跳10次后,namenode尝试主动连接datanode,如果两次都未能连接到datanode,才认为datanode进程已经销毁。 默认的heartbeat.recheck.interval大小为5分钟,dfs.heartbeat.interval默认为3秒
  3. 需要注意的是hdfs-site.xml配置文件中的 heartbeat.recheck.interval的单位为毫秒,所以该值在默认配置中为300000,dfs.heartbeat.interval 的单位为秒。 所以,举个例子,如果heartbeat.recheck.interval 设置为 5000(毫秒),dfs.heartbeat.interval设置为 3(秒),则总的超时时间为40秒

1. 需求

NameNode判断DataNode是否下线的时间太长了,利用zookeeper实现服务器上下线动态感知

2. 思路

3. 代码实现

(1) DataNode.java

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
 * 
 * @Description: 
 * 实现两个功能:
 * 1.创建一个/namenode节点下的子节点来模拟实现一个datanode的下线
 * 2.删除一个/namenode节点下的子节点来模拟实现一个datanode的下线
 * 在/namenode下创建的子节点为EPHEMERAL类型的子节点(临时节点)
 * 
 * 假如datanode02这个服务器节点在zookeeper中创建的临时znode节点为datanode02
 * 如果服务器datanode02一直运行,那么zookeeper会一直维护这个会话连接,datanode02这个znode节点会一直存在
 * 如果服务器datanode02宕机之后, 那么zookeeper会知道这个临时节点的创建会话已经断开,所以zookeeper会自动删除该临时节点
 * 删除了该临时节点,那么监听/namenode节点下的子节点变化的程序(NameNode.java)就能感知到有一个datanode下线了
 * 
 * @author Jed
 * @date 2017年12月20日
 */
public class DataNode {

    private static final String CONNECT_STRING = "hadoop01:2181,hadoop02:2181,hadoop03:2181,hadoop04:2181";
    private static final int SESSION_TIMEOUT = 5000;
    private static final String NAMENODE = "/namenode";
    private static final String DATANODE = "datanode01";
    
    public static void main(String[] args) throws Exception {
        
        // 1.获取zookeeper连接
        ZooKeeper zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, null); 
        
        // 2.判断/namenode节点是否存在,如果不存在,先创建,再添加监听
        Stat exists = zk.exists(NAMENODE, null);
        if(exists == null) {
            zk.create(NAMENODE, "namenode".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        
        // 3.利用创建/namenode节点下的子节点来模拟实现datanode的上线
        zk.create(NAMENODE + "/" + DATANODE, DATANODE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("DataNode " +  DATANODE + " 已上线...");
        
        // 4.模拟这个datanode一直运行着,只要手动中断程序,就代表着这个datanode下线
        Thread.sleep(Long.MAX_VALUE);
    }
}

(2) NameNode.java

import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

/**
 * @Description: 
 * 模拟HDFS中的namenode角色
 * 监听/namenode节点下的子节点列表
 * 这个程序必须是一直运行着,而且要做到循环监听
 * @author Jed
 * @date 2017年12月20日
 */
public class NameNode {
    
    private static ZooKeeper zk;
    private static final String CONNECT_STRING = "hadoop01:2181,hadoop02:2181,hadoop03:2181,hadoop04:2181";
    private static final int SESSION_TIMEOUT = 5000;
    private static final String NAMENODE = "/namenode";
    private static List<String> oldChildNodeList = null;
    
    public static void main(String[] args) throws Exception {
        
        // 1.获取zookeeper连接
        zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {
            
            @Override
            public void process(WatchedEvent event) {
                String path = event.getPath();
                EventType type = event.getType();
                KeeperState state = event.getState();
                
                // 过滤掉第一次创建连接的事件
                if(state.getIntValue() == 3 && path != null) {
                    
                    /*
                     * 5.实现具体的业务逻辑代码
                     * 业务逻辑代码执行与否是应该根据 event 中的属性来决定
                     * 在这里,如果是/namenode下(判断path)的子节点发生了改变(事件为NodeChildrenChanged)
                     * 就调用业务逻辑代码 
                     */
                    if(path.equals(NAMENODE) && type == EventType.NodeChildrenChanged) {
                        List<String> newChildNodeList = null;
                        
                        /* 6.添加循环监听
                         * 以下代码有两个作用:
                         * 1)添加循环监听
                         * 2)获取/namenode下所有子节点的列表
                         */
                        try {
                            newChildNodeList = zk.getChildren(path, true);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        
                        String flag = "上线";
                        if(newChildNodeList.size() < oldChildNodeList.size()) {
                            flag = "下线";
                        }
                        
                        // 得到是哪个节点发生了改变(上线或者下线了)
                        String changeNode = getChangeNode(oldChildNodeList, newChildNodeList);
                        System.out.println("DataNode " + changeNode + " 已" + flag + "...");
                        
                        // 每次 /name节点下的子节点的列表发生了改变之后 ,都需要去更新
                        oldChildNodeList = newChildNodeList;
                    }
                }
            }
        }); 
        
        // 2.判断/namenode节点是否存在,如果不存在,先创建,再添加监听
        Stat exists = zk.exists(NAMENODE, null);
        if(exists == null) {
            zk.create(NAMENODE, "namenode".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        
        // 3.给 /namenode 节点添加 NodeChildrenChanged 事件的监听
        oldChildNodeList = zk.getChildren(NAMENODE, true);
        
        // 4.模拟当前namenode一直运行着
        Thread.sleep(Long.MAX_VALUE);
    }
    
    /**
     * 是用来获取两个集合中不同的节点
     * 方法:遍历大的集合,然后去小集合中去匹配
     * 返回大集合中存在而小集合中不存在的元素
     */
    public static String getChangeNode(List<String> oldChildNodeList, List<String> newChildNodeList){
        
        // 默认:增加了一个节点
        List<String> big = newChildNodeList;
        List<String> small = oldChildNodeList;
        
        if(newChildNodeList.size() < oldChildNodeList.size()){
            big = oldChildNodeList;
            small = newChildNodeList;
        }
        
        for(String val : big){
            if(!small.contains(val)){
                return val;
            }
        }
        
        return null;
    }
}

4. 测试

(1) 运行NameNode.java

(2) 运行DataNode.java

DataNode控制台输出:

DataNode datanode01 已上线...

NameNode控制台输出:

DataNode datanode01 已上线...

(3) 把DataNode代码中的

private static final String DATANODE = "datanode01";

改为:

private static final String DATANODE = "datanode02";

然后再运行一次DataNode.java

新的DataNode的控制台输出:

DataNode datanode02 已上线...

NameNode的控制台更新为:

DataNode datanode01 已上线...
DataNode datanode02 已上线...

(4) 把DataNode代码中的

private static final String DATANODE = "datanode02";

改为:

private static final String DATANODE = "datanode03";

然后再运行一次DataNode.java

新的DataNode的控制台输出:

DataNode datanode03 已上线...

NameNode的控制台更新为:

DataNode datanode01 已上线...
DataNode datanode02 已上线...
DataNode datanode03 已上线...

(5) 手动关闭一个DataNode程序(比如datanode03) NameNode的控制台更新为:

DataNode datanode01 已上线...
DataNode datanode02 已上线...
DataNode datanode03 已上线...
DataNode datanode03 已下线...

(6) 再手动关闭一个DataNode程序(比如datanode02) NameNode的控制台更新为:

DataNode datanode01 已上线...
DataNode datanode02 已上线...
DataNode datanode03 已上线...
DataNode datanode03 已下线...
DataNode datanode02 已下线...

至此,我们已经模拟实现了服务器上下线的动态感知!

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • [算法题] 使用数组实现栈和队列

    CoderJed
  • 016.Elasticsearch文档管理操作

    当向一个不存在的index中添加document时,可以自动创建索引,也可以根据传入的数据自动创建mapping,ES也会自动对这些文档进行倒排索引

    CoderJed
  • HBase Java API 03:HBase与MapReduce整合

    编写MapReduce程序,把"student"表中"info"列族下的"name"那一列抽取出来,存入新HBase表"student_extract"中,要求...

    CoderJed
  • 贪心算法(四)——最小代价生成树

    问题描述 n个村庄间架设通信线路,每个村庄间的距离不同,如何架设最节省开销? 这个问题中,村庄可以抽象成节点,村庄之间的距离抽象成带权值的边,要求最节约...

    大闲人柴毛毛
  • 基于node+socket.io+redis的多房间多进程聊天室

    消息实时推送,指的是将消息实时地推送到浏览器,用户不需要刷新浏览器就可以实时获取最新的消息,实时聊天室的技术原理也是如此。传统的Web站点为了实现推送技术,所用...

    IMWeb前端团队
  • 基于node+socket.io+redis的多房间多进程聊天室

    本文作者:IMWeb jaychen 原文出处:IMWeb社区 未经同意,禁止转载 ? 一、相关技术介绍: 消息实时推送,指的是将消息实时地推送到浏览...

    IMWeb前端团队
  • hadoop2.7.3源码解析之hdfs删除文件全流程分析h

    客户端通过ClientProtocol.delete(String, boolean)方法来删除文件,最终实现是NameNodeRpcServer.delete...

    大数据技术与应用实战
  • 网易新闻《娱乐圈画传》H5的动画技巧

    今天看到一个非常喜欢的H5,又是网易出品的!于是,我忍不住去研究了他的实现方式,有3个值得我们学习的地方,分别是逐帧动画,多种变换叠加的css动画,还有最亮的:...

    mixlab
  • 移植一个实时OS很难?那就手把手教你如何快速移植一个RT-Thread Nano吧!

    最近在学习RT-Thread的使用,同时也相当于在拿它评估做产品的软件开发周期,最终学习的目的也就是希望能在未来的项目上用起来,STM32CubeMX其实已经支...

    morixinguan
  • 好好编程-物流项目13【登录认证-shiro实现】

      我们已经完成了用户的CRUD操作。本文我们来介绍下基于Shiro的登录认证操作。

    用户4919348

扫码关注云+社区

领取腾讯云代金券