
A Look at Hazelcast's PhiAccrualFailureDetector

This article takes a look at Hazelcast's PhiAccrualFailureDetector.

FailureDetector

hazelcast-3.12-sources.jar!/com/hazelcast/internal/cluster/fd/FailureDetector.java

/**
 * Failure detector tracks heartbeats of a member and decides liveness/availability of the member.
 */
public interface FailureDetector {

    /**
     * Notifies this failure detector about received heartbeat message from the tracked member.
     *
     * @param timestamp timestamp of heartbeat message in milliseconds
     */
    void heartbeat(long timestamp);

    /**
     * Returns true if the tracked member is considered as alive/available.
     * @param timestamp timestamp in milliseconds
     * @return true if the member is alive
     */
    boolean isAlive(long timestamp);

    /**
     * Returns the last heartbeat timestamp for the tracked member.
     * @return heartbeat timestamp in milliseconds
     */
    long lastHeartbeat();

    /**
     * Returns suspicion level about the tracked member. Returned value is mostly implementation dependent.
     * <code>0</code> indicates no suspicion at all.
     * @param timestamp timestamp in milliseconds
     * @return suspicion level
     */
    double suspicionLevel(long timestamp);
}
  • The FailureDetector interface defines four methods: heartbeat, isAlive, lastHeartbeat, and suspicionLevel.
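The interface only pins down that a suspicion level of 0 means "no suspicion"; everything else is left to the implementation. To make the contract concrete, here is a deliberately naive sketch: DeadlineFailureDetector is a hypothetical class (not part of Hazelcast) that treats a member as dead once a fixed timeout has elapsed since its last heartbeat. The interface is copied so the sketch compiles without Hazelcast on the classpath.

```java
/** Copy of the interface above, so this sketch compiles on its own. */
interface FailureDetector {
    void heartbeat(long timestamp);

    boolean isAlive(long timestamp);

    long lastHeartbeat();

    double suspicionLevel(long timestamp);
}

/** Hypothetical fixed-timeout detector used only to illustrate the contract. */
class DeadlineFailureDetector implements FailureDetector {
    private static final long NO_HEARTBEAT = -1;

    private final long timeoutMillis;
    private volatile long lastHeartbeatMillis = NO_HEARTBEAT;

    DeadlineFailureDetector(long timeoutMillis) {
        if (timeoutMillis <= 0) {
            throw new IllegalArgumentException("timeoutMillis must be > 0: " + timeoutMillis);
        }
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public void heartbeat(long timestamp) {
        lastHeartbeatMillis = timestamp;
    }

    @Override
    public long lastHeartbeat() {
        return lastHeartbeatMillis;
    }

    // Suspicion grows linearly with silence: 0 right after a heartbeat, 1.0 at the deadline
    @Override
    public double suspicionLevel(long timestamp) {
        long last = lastHeartbeatMillis;
        if (last == NO_HEARTBEAT) {
            return 0.0; // nothing seen yet: no suspicion, the same convention phi() uses
        }
        return Math.max(0, timestamp - last) / (double) timeoutMillis;
    }

    @Override
    public boolean isAlive(long timestamp) {
        return suspicionLevel(timestamp) < 1.0;
    }
}
```

This suspicion level ignores how regular or jittery the heartbeats have been, which is exactly what PhiAccrualFailureDetector takes into account.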

PhiAccrualFailureDetector

hazelcast-3.12-sources.jar!/com/hazelcast/internal/cluster/fd/PhiAccrualFailureDetector.java

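package com.hazelcast.internal.cluster.fd;

import java.util.LinkedList;

import static com.hazelcast.util.Preconditions.checkNotNegative;
import static com.hazelcast.util.Preconditions.checkPositive;

/**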
 * Port of Akka's PhiAccrualFailureDetector.scala
 * <p>
 * Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their paper.
 * <p>
 * The suspicion level of failure is given by a value called φ (phi).
 * The basic idea of the φ failure detector is to express the value of φ on a scale that
 * is dynamically adjusted to reflect current network conditions. A configurable
 * threshold is used to decide if <code>φ</code> is considered to be a failure.
 * <p>
 * The value of <code>φ</code> is calculated as:
 * <p>
 * <code>
 * <pre>
 * φ = -log10(1 - F(timeSinceLastHeartbeat))
 * </pre>
 * </code>
 * where F is the cumulative distribution function of a normal distribution with mean
 * and standard deviation estimated from historical heartbeat inter-arrival times.
 */
public class PhiAccrualFailureDetector implements FailureDetector {

    static final long NO_HEARTBEAT_TIMESTAMP = -1;

    private final double threshold;
    private final double minStdDeviationMillis;
    private final long acceptableHeartbeatPauseMillis;

    private final HeartbeatHistory heartbeatHistory;
    private volatile long lastHeartbeatMillis = NO_HEARTBEAT_TIMESTAMP;

    /**
     * @param threshold                      A low threshold is prone to generate many wrong suspicions but ensures
     *                                       a quick detection in the event of a real crash. Conversely, a high threshold
     *                                       generates fewer mistakes but needs more time to detect actual crashes
     * @param maxSampleSize                  Number of samples to use for calculation of mean and standard deviation of
     *                                       inter-arrival times.
     * @param minStdDeviationMillis          Minimum standard deviation to use for the normal distribution used when
     *                                       calculating phi. Too low standard deviation might result in too much sensitivity
     *                                       for sudden, but normal, deviations in heartbeat inter arrival times.
     * @param acceptableHeartbeatPauseMillis Duration corresponding to number of potentially lost/delayed
     *                                       heartbeats that will be accepted before considering it to be an anomaly.
     *                                       This margin is important to be able to survive sudden, occasional, pauses
     *                                       in heartbeat arrivals, due to, for example, garbage collection or network drops.
     * @param firstHeartbeatEstimateMillis   Bootstrap the stats with heartbeats that correspond to this duration,
     *                                       with a rather high standard deviation (since the environment is unknown
     *                                       in the beginning)
     */
    public PhiAccrualFailureDetector(double threshold, int maxSampleSize, double minStdDeviationMillis,
            long acceptableHeartbeatPauseMillis, long firstHeartbeatEstimateMillis) {

        this.threshold = checkPositive(threshold, "Threshold must be positive: " + threshold);
        this.minStdDeviationMillis = checkPositive(minStdDeviationMillis, "Minimum standard deviation must be positive: "
                + minStdDeviationMillis);

        this.acceptableHeartbeatPauseMillis = checkNotNegative(acceptableHeartbeatPauseMillis,
                "Acceptable heartbeat pause millis must be >= 0: " + acceptableHeartbeatPauseMillis);

        checkPositive(firstHeartbeatEstimateMillis, "First heartbeat value must be > 0: " + firstHeartbeatEstimateMillis);

        heartbeatHistory = new HeartbeatHistory(maxSampleSize);
        firstHeartbeat(firstHeartbeatEstimateMillis);
    }

    // guess statistics for first heartbeat,
    // important so that connections with only one heartbeat become unavailable
    // bootstrap with 2 entries with rather high standard deviation
    @SuppressWarnings("checkstyle:magicnumber")
    private void firstHeartbeat(long firstHeartbeatEstimateMillis) {
        long stdDeviationMillis = firstHeartbeatEstimateMillis / 4;
        heartbeatHistory.add(firstHeartbeatEstimateMillis - stdDeviationMillis);
        heartbeatHistory.add(firstHeartbeatEstimateMillis + stdDeviationMillis);
    }

    private double ensureValidStdDeviation(double stdDeviationMillis) {
        return Math.max(stdDeviationMillis, minStdDeviationMillis);
    }

    /**
     * The suspicion level of the accrual failure detector.
     *
     * If a connection does not have any records in failure detector then it is
     * considered healthy.
     */
    private double phi(long timestampMillis) {
        long timeDiffMillis;
        double meanMillis;
        double stdDeviationMillis;

        synchronized (heartbeatHistory) {
            long lastTimestampMillis = lastHeartbeatMillis;
            if (lastTimestampMillis == NO_HEARTBEAT_TIMESTAMP) {
                return 0.0;
            }

            timeDiffMillis = timestampMillis - lastTimestampMillis;
            meanMillis = heartbeatHistory.mean();
            stdDeviationMillis = ensureValidStdDeviation(heartbeatHistory.stdDeviation());
        }

        return phi(timeDiffMillis, meanMillis + acceptableHeartbeatPauseMillis, stdDeviationMillis);
    }

    /**
     * Calculation of phi, derived from the Cumulative distribution function for
     * N(mean, stdDeviation) normal distribution, given by
     * 1.0 / (1.0 + math.exp(-y * (1.5976 + 0.070566 * y * y)))
     * where y = (x - mean) / standard_deviation
     * This is an approximation defined in β Mathematics Handbook (Logistic approximation).
     * Error is 0.00014 at +- 3.16
     * The calculated value is equivalent to -log10(1 - CDF(y))
     */
    @SuppressWarnings("checkstyle:magicnumber")
    private static double phi(long timeDiffMillis, double meanMillis, double stdDeviationMillis) {
        double y = (timeDiffMillis - meanMillis) / stdDeviationMillis;
        double e = Math.exp(-y * (1.5976 + 0.070566 * y * y));
        if (timeDiffMillis > meanMillis) {
            return -Math.log10(e / (1.0 + e));
        } else {
            return -Math.log10(1.0 - 1.0 / (1.0 + e));
        }
    }

    @Override
    public boolean isAlive(long timestampMillis) {
        double phi = phi(timestampMillis);
        return phi < threshold;
    }

    @Override
    public void heartbeat(long timestampMillis) {
        synchronized (heartbeatHistory) {
            long lastTimestampMillis = getAndSetLastHeartbeat(timestampMillis);
            if (lastTimestampMillis == NO_HEARTBEAT_TIMESTAMP) {
                return;
            }

            if (isAlive(timestampMillis)) {
                heartbeatHistory.add(timestampMillis - lastTimestampMillis);
            }
        }
    }

    private long getAndSetLastHeartbeat(long timestampMillis) {
        long lastTimestampMillis = lastHeartbeatMillis;
        lastHeartbeatMillis = timestampMillis;
        return lastTimestampMillis;
    }

    @Override
    public long lastHeartbeat() {
        return lastHeartbeatMillis;
    }

    @Override
    public double suspicionLevel(long timestamp) {
        return phi(timestamp);
    }

    /**
     * Holds the heartbeat statistics for a specific member.
     * It is capped by the number of samples specified in `maxSampleSize`.
     *
     * The stats (mean, variance, stdDeviation) are not defined for
     * an empty HeartbeatHistory (the double division by zero yields NaN).
     */
    private static class HeartbeatHistory {
        private final int maxSampleSize;
        private final LinkedList<Long> intervals = new LinkedList<Long>();
        private long intervalSum;
        private long squaredIntervalSum;

        HeartbeatHistory(int maxSampleSize) {
            if (maxSampleSize < 1) {
                throw new IllegalArgumentException("Sample size must be >= 1 : " + maxSampleSize);
            }
            this.maxSampleSize = maxSampleSize;
        }

        double mean() {
            return (double) intervalSum / intervals.size();
        }

        double variance() {
            double mean = mean();
            return ((double) squaredIntervalSum / intervals.size()) - (mean * mean);
        }

        double stdDeviation() {
            return Math.sqrt(variance());
        }

        void add(long interval) {
            if (intervals.size() >= maxSampleSize) {
                dropOldest();
            }
            intervals.add(interval);
            intervalSum += interval;
            squaredIntervalSum += pow2(interval);
        }

        private void dropOldest() {
            long dropped = intervals.pollFirst();
            intervalSum -= dropped;
            squaredIntervalSum -= pow2(dropped);
        }

        private static long pow2(long x) {
            return x * x;
        }
    }
}
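  • PhiAccrualFailureDetector implements the FailureDetector interface; it is a Java port of Akka's PhiAccrualFailureDetector.scala.
  • φ (phi) is the suspicion level that is compared with a threshold to decide whether the member has failed. It is computed as φ = -log10(1 - CDF(timeSinceLastHeartbeat)), where CDF is the cumulative distribution function of a normal distribution whose mean and standard deviation are estimated from the recorded heartbeat inter-arrival times.
  • The phi method approximates CDF(y) with the logistic approximation from the β Mathematics Handbook (the error is 0.00014 at y = ±3.16): CDF(y) ≈ 1.0 / (1.0 + exp(-y * (1.5976 + 0.070566 * y * y))), where y = (x - mean) / standard_deviation.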
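To get a feel for the approximation, the sketch below evaluates the logistic CDF at y = 1, 2, 3 and compares it with the exact standard normal values Φ(1) ≈ 0.841345, Φ(2) ≈ 0.977250 and Φ(3) ≈ 0.998650. It also reads φ back as a probability: since φ = -log10(1 - CDF), under the fitted normal model 10^-φ is the probability that a live member's heartbeat would arrive even later than the current gap, so a threshold of 10 corresponds to 10^-10. LogisticCdf, cdf, phi and tailProbability are illustrative names, not Hazelcast API.

```java
/**
 * Sketch: the logistic approximation of the standard normal CDF used by phi(),
 * compared with exact reference values. Class and method names are illustrative.
 */
public class LogisticCdf {
    // CDF(y) ~= 1 / (1 + exp(-y * (1.5976 + 0.070566 * y^2))), constants from the Javadoc above
    static double cdf(double y) {
        return 1.0 / (1.0 + Math.exp(-y * (1.5976 + 0.070566 * y * y)));
    }

    // phi = -log10(1 - CDF(y)); this literal form is adequate for moderate y
    static double phi(double y) {
        return -Math.log10(1.0 - cdf(y));
    }

    // Reading phi back as a probability: 1 - CDF = 10^(-phi)
    static double tailProbability(double phi) {
        return Math.pow(10.0, -phi);
    }

    public static void main(String[] args) {
        double[] ys = {1.0, 2.0, 3.0};
        // Exact standard normal CDF values Phi(1), Phi(2), Phi(3)
        double[] exact = {0.8413447461, 0.9772498681, 0.9986501020};
        for (int i = 0; i < ys.length; i++) {
            System.out.printf("y=%.0f approx=%.6f exact=%.6f error=%.2e phi=%.3f%n",
                    ys[i], cdf(ys[i]), exact[i], Math.abs(cdf(ys[i]) - exact[i]), phi(ys[i]));
        }
        // A threshold of 10 corresponds to a tail probability of 1e-10
        System.out.println("phi=10 -> " + tailProbability(10.0));
    }
}
```

The errors land on the order of 1e-5 to 1e-4, consistent with the 0.00014 bound quoted in the Javadoc; at y = 0 the approximation gives exactly 0.5, so φ = log10(2) ≈ 0.301.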

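For CDF(y), when x > mean (so y > 0 and e < 1) the code computes φ = -Math.log10(e / (1.0 + e)); when x <= mean (y <= 0 and e >= 1) it uses -Math.log10(1.0 - 1.0 / (1.0 + e)). The two expressions are algebraically identical, since 1 - 1/(1 + e) = e/(1 + e), so the split is most likely a floating-point safeguard: for large y, e is tiny, 1.0 / (1.0 + e) rounds to 1.0 and the subtraction cancels to 0 (φ would become Infinity), while for very negative y, e can overflow to Infinity and e / (1.0 + e) would evaluate to NaN.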
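The two branches can be compared directly. The sketch below (PhiBranches and its methods are illustrative names) evaluates both forms of -log10(1 - CDF(y)) at a moderate, a large, and a very negative y, and then applies the same branching as phi(); since the standard deviation is positive, timeDiffMillis > meanMillis is equivalent to y > 0.

```java
/**
 * Compares the two algebraically equivalent forms of -log10(1 - CDF(y)) used in phi().
 * Class and method names are illustrative.
 */
public class PhiBranches {
    // The e term of the logistic approximation: CDF(y) = 1 / (1 + e)
    static double expTerm(double y) {
        return Math.exp(-y * (1.5976 + 0.070566 * y * y));
    }

    // Form used when timeDiff > mean (y > 0): 1 - CDF written as e / (1 + e)
    static double tailForm(double y) {
        double e = expTerm(y);
        return -Math.log10(e / (1.0 + e));
    }

    // Form used when timeDiff <= mean (y <= 0): 1 - CDF written literally
    static double complementForm(double y) {
        double e = expTerm(y);
        return -Math.log10(1.0 - 1.0 / (1.0 + e));
    }

    // Branching as in PhiAccrualFailureDetector.phi
    static double phi(double y) {
        return y > 0 ? tailForm(y) : complementForm(y);
    }

    public static void main(String[] args) {
        for (double y : new double[] {1.0, 10.0, -30.0}) {
            System.out.printf("y=%6.1f tailForm=%s complementForm=%s phi=%s%n",
                    y, tailForm(y), complementForm(y), phi(y));
        }
    }
}
```

Both forms agree at y = 1 (about 0.80). At y = 10, 1.0 / (1.0 + e) rounds to exactly 1.0, so the literal complement gives Infinity while e / (1.0 + e) gives about 37.6. At y = -30, e overflows to Infinity, so e / (1.0 + e) is NaN while the complement form returns 0 (printed as -0.0). Choosing the form by the sign of y keeps φ finite and well defined at both ends.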

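  • isAlive computes phi for the given timestampMillis and compares it with threshold (10 by default in Hazelcast); the member is considered alive only if phi is below the threshold.
  • heartbeat stores timestampMillis as the new last heartbeat and, if isAlive(timestampMillis) holds, adds the interval timestampMillis - lastTimestampMillis to heartbeatHistory (the very first heartbeat only records the timestamp). Note that getAndSetLastHeartbeat has already overwritten lastHeartbeatMillis when isAlive runs, so phi is evaluated with timeDiffMillis = 0 and comes out no larger than log10(2) ≈ 0.301; with the default threshold the check therefore effectively always passes, and even the interval that follows a long pause is recorded.
  • The implementation also adds the acceptableHeartbeatPauseMillis parameter: the mean passed to the final phi calculation is meanMillis + acceptableHeartbeatPauseMillis, which shifts the distribution to the right so that pauses of up to roughly that length do not raise φ much.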
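To see how threshold, acceptableHeartbeatPauseMillis and the update order in heartbeat play out, here is a condensed, single-threaded re-implementation of the code above. MiniPhiDetector is a hypothetical name, synchronization and most argument checks are omitted, and all numbers are made up for illustration. With firstHeartbeatEstimateMillis = 1000, the bootstrap samples 750 and 1250 give a mean of 1000 ms and a standard deviation of 250 ms.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical, single-threaded condensation of PhiAccrualFailureDetector for experiments. */
public class MiniPhiDetector {
    private static final long NO_HEARTBEAT = -1;

    private final double threshold;
    private final int maxSampleSize;
    private final double minStdDeviationMillis;
    private final long acceptableHeartbeatPauseMillis;
    private final Deque<Long> intervals = new ArrayDeque<>();
    private long intervalSum;
    private long squaredIntervalSum;
    private long lastHeartbeatMillis = NO_HEARTBEAT;

    public MiniPhiDetector(double threshold, int maxSampleSize, double minStdDeviationMillis,
                           long acceptableHeartbeatPauseMillis, long firstHeartbeatEstimateMillis) {
        if (maxSampleSize < 1) {
            throw new IllegalArgumentException("Sample size must be >= 1: " + maxSampleSize);
        }
        this.threshold = threshold;
        this.maxSampleSize = maxSampleSize;
        this.minStdDeviationMillis = minStdDeviationMillis;
        this.acceptableHeartbeatPauseMillis = acceptableHeartbeatPauseMillis;
        // Same bootstrap as firstHeartbeat(): estimate -/+ estimate/4
        long std = firstHeartbeatEstimateMillis / 4;
        add(firstHeartbeatEstimateMillis - std);
        add(firstHeartbeatEstimateMillis + std);
    }

    // Sliding window with running sums, as in HeartbeatHistory.add/dropOldest
    private void add(long interval) {
        if (intervals.size() >= maxSampleSize) {
            long dropped = intervals.pollFirst();
            intervalSum -= dropped;
            squaredIntervalSum -= dropped * dropped;
        }
        intervals.addLast(interval);
        intervalSum += interval;
        squaredIntervalSum += interval * interval;
    }

    public int sampleCount() {
        return intervals.size();
    }

    public double mean() {
        return (double) intervalSum / intervals.size();
    }

    private double stdDeviation() {
        double m = mean();
        double variance = (double) squaredIntervalSum / intervals.size() - m * m;
        return Math.max(Math.sqrt(variance), minStdDeviationMillis); // ensureValidStdDeviation
    }

    public double phi(long nowMillis) {
        if (lastHeartbeatMillis == NO_HEARTBEAT) {
            return 0.0; // no heartbeat yet: considered healthy
        }
        long timeDiff = nowMillis - lastHeartbeatMillis;
        double shiftedMean = mean() + acceptableHeartbeatPauseMillis; // the pause shifts the mean
        double y = (timeDiff - shiftedMean) / stdDeviation();
        double e = Math.exp(-y * (1.5976 + 0.070566 * y * y));
        return timeDiff > shiftedMean
                ? -Math.log10(e / (1.0 + e))
                : -Math.log10(1.0 - 1.0 / (1.0 + e));
    }

    public boolean isAlive(long nowMillis) {
        return phi(nowMillis) < threshold;
    }

    // Same order as the Hazelcast code: lastHeartbeatMillis is overwritten BEFORE isAlive runs
    public void heartbeat(long nowMillis) {
        long last = lastHeartbeatMillis;
        lastHeartbeatMillis = nowMillis;
        if (last == NO_HEARTBEAT) {
            return;
        }
        if (isAlive(nowMillis)) {
            add(nowMillis - last);
        }
    }
}
```

With threshold 10 and no acceptable pause, φ after a heartbeat at 0 is about 0.301 at 1000 ms, about 4.7 at 2000 ms (alive) and about 21 at 3000 ms (suspected); an acceptable pause of 1000 ms shifts the curve by one second, so 3000 ms gives about 4.7 again. Feeding heartbeats at 1000 and then 60000 shows the update-order effect: isAlive(60000) is false just before the second call, yet the 59000 ms interval is still recorded (the sample count goes from 3 to 4 and the mean jumps to 15500 ms).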

Summary

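  • The FailureDetector interface defines the heartbeat, isAlive, lastHeartbeat, and suspicionLevel methods; PhiAccrualFailureDetector implements it as a Java port of Akka's PhiAccrualFailureDetector.scala.
  • φ (phi) is the suspicion level used to decide whether a member has failed; it is computed as φ = -log10(1 - CDF(timeSinceLastHeartbeat)), where CDF is the cumulative distribution function of the normal distribution.
  • In Akka's implementation, which Hazelcast ports, the phi method approximates CDF(y) with the logistic approximation from the β Mathematics Handbook (the error is 0.00014 at y = ±3.16): CDF(y) ≈ 1.0 / (1.0 + exp(-y * (1.5976 + 0.070566 * y * y))), where y = (x - mean) / standard_deviation. It also adds the acceptableHeartbeatPauseMillis parameter, so the mean passed to the final phi calculation is meanMillis + acceptableHeartbeatPauseMillis.
  • isAlive computes phi for timestampMillis and compares it with threshold (10 by default in Hazelcast); only a phi below the threshold counts as alive.
  • heartbeat adds the interval timestampMillis - lastTimestampMillis to heartbeatHistory only if isAlive(timestampMillis) holds; because lastHeartbeatMillis has already been updated at that point, the check sees a zero gap and effectively always passes with the default threshold.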

doc

  • PhiAccrualFailureDetector.scala
  • PhiAccrualFailureDetector.java
  • A logistic approximation to the cumulative normal distribution - Core

This article was shared from the WeChat official account 码匠的流水账 (geek_luandun).

Originally published: 2019-04-30
