
A Look at scalecube-cluster's FailureDetector

This article examines the FailureDetector in scalecube-cluster.

FailureDetector

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetector.java

/**
 * Failure Detector component responsible for monitoring availability of other members in the
 * cluster. This interface is supposed to be used internally as part cluster membership protocol. It
 * doesn't specify that particular node is failed, but just provide information that either it is
 * suspected or trusted at current moment of time. So it is up to cluster membership or other top
 * level component to define when suspected member is actually failed.
 */
public interface FailureDetector {

  /**
   * Starts running failure detection algorithm. After started it begins to receive and send ping
   * messages.
   */
  void start();

  /** Stops running failure detection algorithm and releases occupied resources. */
  void stop();

  /** Listens for results of ping checks (ALIVE/SUSPECT) done periodically by failure detector. */
  Flux<FailureDetectorEvent> listen();
}
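  • FailureDetector defines three methods. start begins sending and receiving ping messages, stop halts detection and releases resources, and listen returns a Flux of the ALIVE/SUSPECT results of the periodic ping checks.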
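The Javadoc leaves the decision about when a SUSPECT member is really dead to a higher-level component. The following minimal JDK-only sketch shows what such a consumer could look like. It applies a hypothetical policy: a member counts as failed after `threshold` consecutive SUSPECT events, and any ALIVE event resets its count. `SuspicionTracker` and `threshold` are invented for this example, and `MemberStatus` here is a simplified stand-in. scalecube's own membership protocol applies its own rules, not this counter.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for scalecube's MemberStatus (only the two values the detector emits).
enum MemberStatus { ALIVE, SUSPECT }

// Hypothetical higher-level policy: failed after `threshold` consecutive SUSPECT events.
final class SuspicionTracker {
  private final int threshold;
  private final Map<String, Integer> consecutiveSuspects = new HashMap<>();

  SuspicionTracker(int threshold) {
    if (threshold <= 0) {
      throw new IllegalArgumentException("threshold must be positive");
    }
    this.threshold = threshold;
  }

  /** Feeds one detector result; returns true once the member should be treated as failed. */
  boolean onEvent(String memberId, MemberStatus status) {
    if (status == MemberStatus.ALIVE) {
      consecutiveSuspects.remove(memberId); // any successful probe clears the suspicion
      return false;
    }
    int count = consecutiveSuspects.merge(memberId, 1, Integer::sum);
    return count >= threshold;
  }
}
```

In a real setup, the (member, status) pairs would be fed from the FailureDetectorEvent stream returned by `listen()`.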

FailureDetectorImpl

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java

public final class FailureDetectorImpl implements FailureDetector {

  private static final Logger LOGGER = LoggerFactory.getLogger(FailureDetectorImpl.class);

  // Qualifiers

  public static final String PING = "sc/fdetector/ping";
  public static final String PING_REQ = "sc/fdetector/pingReq";
  public static final String PING_ACK = "sc/fdetector/pingAck";

  // Injected

  private final Member localMember;
  private final Transport transport;
  private final FailureDetectorConfig config;
  private final CorrelationIdGenerator cidGenerator;

  // State

  private long currentPeriod = 0;
  private List<Member> pingMembers = new ArrayList<>();
  private int pingMemberIndex = 0; // index for sequential ping member selection

  // Disposables

  private final Disposable.Composite actionsDisposables = Disposables.composite();

  // Subject
  private final FluxProcessor<FailureDetectorEvent, FailureDetectorEvent> subject =
      DirectProcessor.<FailureDetectorEvent>create().serialize();

  private final FluxSink<FailureDetectorEvent> sink = subject.sink();

  // Scheduled
  private final Scheduler scheduler;

  /**
   * Creates new instance of failure detector with given transport and settings.
   *
   * @param localMember local cluster member
   * @param transport cluster transport
   * @param membershipProcessor membership event processor
   * @param config failure detector settings
   * @param scheduler scheduler
   * @param cidGenerator correlationId generator
   */
  public FailureDetectorImpl(
      Member localMember,
      Transport transport,
      Flux<MembershipEvent> membershipProcessor,
      FailureDetectorConfig config,
      Scheduler scheduler,
      CorrelationIdGenerator cidGenerator) {

    this.localMember = Objects.requireNonNull(localMember);
    this.transport = Objects.requireNonNull(transport);
    this.config = Objects.requireNonNull(config);
    this.scheduler = Objects.requireNonNull(scheduler);
    this.cidGenerator = Objects.requireNonNull(cidGenerator);

    // Subscribe
    actionsDisposables.addAll(
        Arrays.asList(
            membershipProcessor //
                .publishOn(scheduler)
                .subscribe(this::onMemberEvent, this::onError),
            transport
                .listen() //
                .publishOn(scheduler)
                .subscribe(this::onMessage, this::onError)));
  }

  @Override
  public void start() {
    actionsDisposables.add(
        scheduler.schedulePeriodically(
            this::doPing,
            config.getPingInterval(),
            config.getPingInterval(),
            TimeUnit.MILLISECONDS));
  }

  @Override
  public void stop() {
    // Stop accepting requests and sending pings
    actionsDisposables.dispose();

    // Stop publishing events
    sink.complete();
  }

  @Override
  public Flux<FailureDetectorEvent> listen() {
    return subject.onBackpressureBuffer();
  }

  //......

}
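  • FailureDetectorImpl implements the FailureDetector interface. It defines three message qualifiers, PING, PING_REQ and PING_ACK, and keeps a pingMembers list of the members it can probe.
  • The FailureDetectorImpl constructor subscribes to membershipProcessor and routes each MembershipEvent to onMemberEvent. It also subscribes to transport.listen() and routes each incoming Message to onMessage. Both streams are moved onto the injected scheduler with publishOn(scheduler).
  • start uses scheduler.schedulePeriodically to register the doPing task, which runs every pingInterval (5000 ms by default). Because the initial delay is also pingInterval, the first ping waits one full interval. stop calls actionsDisposables.dispose() and sink.complete(), and listen() returns subject.onBackpressureBuffer().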
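The start()/stop() lifecycle can be demonstrated without Reactor. The following sketch uses a JDK ScheduledExecutorService as a stand-in for the injected Scheduler. scheduleAtFixedRate takes the same (task, initialDelay, period, unit) arguments as schedulePeriodically, so passing the interval twice reproduces the "first ping after one full interval" behavior. `PeriodicPinger`, `awaitPings` and the latch are hypothetical scaffolding, not part of scalecube.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// JDK-only sketch of start()/stop(): a periodic doPing on a single-threaded scheduler.
final class PeriodicPinger {
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final long pingIntervalMillis;
  private final CountDownLatch pingsToAwait;
  // Only touched on the scheduler thread, so a plain long is enough (like currentPeriod).
  private long currentPeriod = 0;
  private ScheduledFuture<?> pingTask;

  PeriodicPinger(long pingIntervalMillis, int pingsToAwait) {
    this.pingIntervalMillis = pingIntervalMillis;
    this.pingsToAwait = new CountDownLatch(pingsToAwait);
  }

  void start() {
    // Same shape as schedulePeriodically(this::doPing, interval, interval, MILLISECONDS).
    pingTask = scheduler.scheduleAtFixedRate(
        this::doPing, pingIntervalMillis, pingIntervalMillis, TimeUnit.MILLISECONDS);
  }

  void stop() {
    if (pingTask != null) {
      pingTask.cancel(false); // stop sending pings
    }
    scheduler.shutdown(); // release the thread, roughly like actionsDisposables.dispose()
  }

  /** Test helper: waits for the requested number of pings; false on timeout or interrupt. */
  boolean awaitPings(long timeoutMillis) {
    try {
      return pingsToAwait.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  boolean isStopped() {
    return scheduler.isShutdown();
  }

  private void doPing() {
    currentPeriod++; // a real detector would now select a member and send PING
    pingsToAwait.countDown();
  }
}
```

The design point this mirrors is that the periodic task runs on one thread. In scalecube, publishOn(scheduler) also routes the membership and transport callbacks there. Assuming the injected scheduler is single-threaded, fields like currentPeriod and pingMembers are then only touched from that thread, which would explain why FailureDetectorImpl gets away with a plain ArrayList and long.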
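The event side of the class follows a simple pattern: sink.next in publishPingResult, sink.complete in stop, and listen returning a Flux. The JDK's SubmissionPublisher is a rough analogue of this pattern. Like the serialized DirectProcessor, it is hot: subscribers only see events submitted after they subscribe, and close() signals completion much as sink.complete() does. `DetectorEvents` and its methods are hypothetical names, and events are plain strings here for brevity.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Consumer;

// Rough JDK analogue of FailureDetectorImpl's subject/sink pair (hypothetical class).
final class DetectorEvents implements AutoCloseable {
  // Hot publisher: late subscribers do not receive earlier events.
  private final SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

  /** Like listen(): registers a consumer; the future completes after close() is delivered. */
  CompletableFuture<Void> listen(Consumer<String> consumer) {
    return publisher.consume(consumer);
  }

  /** Like publishPingResult(): emits one "member STATUS" event to current subscribers. */
  void publish(String memberId, String status) {
    publisher.submit(memberId + " " + status);
  }

  /** Like sink.complete() in stop(): completes every subscriber. */
  @Override
  public void close() {
    publisher.close();
  }
}
```

Keep one difference in mind. SubmissionPublisher buffers a bounded number of items per subscriber (Flow.defaultBufferSize() by default), and submit blocks when a buffer is full. onBackpressureBuffer() buffers without a bound.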

onMemberEvent

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java

public final class FailureDetectorImpl implements FailureDetector {
  //......

  private void onMemberEvent(MembershipEvent event) {
    Member member = event.member();
    if (event.isRemoved()) {
      pingMembers.remove(member);
    }
    if (event.isAdded()) {
      // insert member into random positions
      int size = pingMembers.size();
      int index = size > 0 ? ThreadLocalRandom.current().nextInt(size) : 0;
      pingMembers.add(index, member);
    }
  }

  //......
}
  • onMemberEvent updates pingMembers according to the MembershipEvent. A removal event removes the member, and an add event inserts the member at a random index.
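onMemberEvent's bookkeeping can be reproduced with a plain list. The following sketch stores member ids instead of Member objects. It also takes an injected java.util.Random in place of ThreadLocalRandom.current(), purely so the behavior is reproducible with a fixed seed. `PingMemberList` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Sketch of onMemberEvent's list maintenance, with member ids standing in for Member objects.
final class PingMemberList {
  private final List<String> pingMembers = new ArrayList<>();
  private final Random random;

  PingMemberList(Random random) {
    this.random = random; // injectable so tests can use a fixed seed
  }

  void onAdded(String member) {
    int size = pingMembers.size();
    // Same expression as the source: nextInt(size) yields 0..size-1,
    // so the new member goes in front of an existing element and is never appended.
    int index = size > 0 ? random.nextInt(size) : 0;
    pingMembers.add(index, member);
  }

  void onRemoved(String member) {
    pingMembers.remove(member); // List.remove(Object): removes the first equal element
  }

  List<String> snapshot() {
    return new ArrayList<>(pingMembers);
  }
}
```

The original expression has one subtle detail. nextInt(size) never returns size, so the first member to join stays at the tail until it is removed. nextInt(size + 1) would make every position reachable. The effect is small in practice, because selectPingMember reshuffles the whole list after each full pass.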

onMessage

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java

public final class FailureDetectorImpl implements FailureDetector {
  //......

  private void onMessage(Message message) {
    if (isPing(message)) {
      onPing(message);
    } else if (isPingReq(message)) {
      onPingReq(message);
    } else if (isTransitPingAck(message)) {
      onTransitPingAck(message);
    }
  }

  private boolean isPing(Message message) {
    return PING.equals(message.qualifier());
  }

  private boolean isPingReq(Message message) {
    return PING_REQ.equals(message.qualifier());
  }

  private boolean isTransitPingAck(Message message) {
    return PING_ACK.equals(message.qualifier())
        && message.<PingData>data().getOriginalIssuer() != null;
  }

  /** Listens to PING message and answers with ACK. */
  private void onPing(Message message) {
    long period = this.currentPeriod;
    LOGGER.trace("Received Ping[{}]", period);
    PingData data = message.data();
    if (!data.getTo().id().equals(localMember.id())) {
      LOGGER.warn(
          "Received Ping[{}] to {}, but local member is {}", period, data.getTo(), localMember);
      return;
    }
    String correlationId = message.correlationId();
    Message ackMessage =
        Message.withData(data)
            .qualifier(PING_ACK)
            .correlationId(correlationId)
            .sender(localMember.address())
            .build();
    Address address = data.getFrom().address();
    LOGGER.trace("Send PingAck[{}] to {}", period, address);
    transport
        .send(address, ackMessage)
        .subscribe(
            null,
            ex ->
                LOGGER.debug(
                    "Failed to send PingAck[{}] to {}, cause: {}", period, address, ex.toString()));
  }

  /** Listens to PING_REQ message and sends PING to requested cluster member. */
  private void onPingReq(Message message) {
    long period = this.currentPeriod;
    LOGGER.trace("Received PingReq[{}]", period);
    PingData data = message.data();
    Member target = data.getTo();
    Member originalIssuer = data.getFrom();
    String correlationId = message.correlationId();
    PingData pingReqData = new PingData(localMember, target, originalIssuer);
    Message pingMessage =
        Message.withData(pingReqData)
            .qualifier(PING)
            .correlationId(correlationId)
            .sender(localMember.address())
            .build();
    Address address = target.address();
    LOGGER.trace("Send transit Ping[{}] to {}", period, address);
    transport
        .send(address, pingMessage)
        .subscribe(
            null,
            ex ->
                LOGGER.debug(
                    "Failed to send transit Ping[{}] to {}, cause: {}",
                    period,
                    address,
                    ex.toString()));
  }

  /**
   * Listens to ACK with message containing ORIGINAL_ISSUER then converts message to plain ACK and
   * sends it to ORIGINAL_ISSUER.
   */
  private void onTransitPingAck(Message message) {
    long period = this.currentPeriod;
    LOGGER.trace("Received transit PingAck[{}]", period);
    PingData data = message.data();
    Member target = data.getOriginalIssuer();
    String correlationId = message.correlationId();
    PingData originalAckData = new PingData(target, data.getTo());
    Message originalAckMessage =
        Message.withData(originalAckData)
            .qualifier(PING_ACK)
            .correlationId(correlationId)
            .sender(localMember.address())
            .build();
    Address address = target.address();
    LOGGER.trace("Resend transit PingAck[{}] to {}", period, address);
    transport
        .send(address, originalAckMessage)
        .subscribe(
            null,
            ex ->
                LOGGER.debug(
                    "Failed to resend transit PingAck[{}] to {}, cause: {}",
                    period,
                    address,
                    ex.toString()));
  }

  //......

}
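  • onMessage dispatches on the message qualifier and, for PING_ACK, on whether originalIssuer is set. It then calls onPing, onPingReq or onTransitPingAck. onPing first checks that the ping is addressed to the local member; if it is not, onPing logs a warning and drops it. Otherwise it replies to the PingData's from member with a PING_ACK that carries the same PingData and correlationId. onPingReq sends a PING to the target named in the PING_REQ, with originalIssuer set to the requesting member. onTransitPingAck takes the ack that comes back from such a relayed ping, turns it into a plain PING_ACK, and forwards it to originalIssuer.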
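The three handlers are easiest to understand together. The following in-memory replay traces the indirect path. A asks B (PING_REQ) to probe C, and B pings C with originalIssuer = A. C echoes the PingData back to B as a PING_ACK. B sees originalIssuer and forwards a plain PING_ACK to A. This is a simplified sketch, not scalecube code. Here PingData holds member ids, the qualifier strings are shortened, `Envelope` and `PingRelaySimulation` are hypothetical, and a queue replaces the transport and correlationIds.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified stand-in for PingData: member ids instead of Member objects.
final class PingData {
  final String from;
  final String to;
  final String originalIssuer; // null unless the ping is relayed on someone's behalf

  PingData(String from, String to, String originalIssuer) {
    this.from = from;
    this.to = to;
    this.originalIssuer = originalIssuer;
  }
}

// A message delivered to `destination`, carrying a qualifier and PingData.
final class Envelope {
  final String qualifier;
  final String destination;
  final PingData data;

  Envelope(String qualifier, String destination, PingData data) {
    this.qualifier = qualifier;
    this.destination = destination;
    this.data = data;
  }
}

// In-memory replay of the message handling in onMessage.
final class PingRelaySimulation {
  static final String PING = "ping";
  static final String PING_REQ = "pingReq";
  static final String PING_ACK = "pingAck";

  final List<String> trace = new ArrayList<>();     // every delivery, in order
  final List<String> plainAcks = new ArrayList<>(); // acks that reach their requester

  void run(Envelope first) {
    Deque<Envelope> queue = new ArrayDeque<>();
    queue.add(first);
    while (!queue.isEmpty()) {
      Envelope m = queue.poll();
      trace.add(m.qualifier + " -> " + m.destination);
      Envelope next = handle(m);
      if (next != null) {
        queue.add(next);
      }
    }
  }

  private Envelope handle(Envelope m) {
    String self = m.destination;
    PingData d = m.data;
    switch (m.qualifier) {
      case PING: // onPing: drop misaddressed pings, otherwise echo the same data back to `from`
        return d.to.equals(self) ? new Envelope(PING_ACK, d.from, d) : null;
      case PING_REQ: // onPingReq: ping the target, remembering who originally asked
        return new Envelope(PING, d.to, new PingData(self, d.to, d.from));
      case PING_ACK:
        if (d.originalIssuer != null) { // onTransitPingAck: forward a plain ack to the issuer
          return new Envelope(PING_ACK, d.originalIssuer, new PingData(d.originalIssuer, d.to, null));
        }
        // Plain ack: in scalecube this completes the pending requestResponse via correlationId.
        plainAcks.add(self + " got ack for " + d.to);
        return null;
      default:
        return null;
    }
  }
}
```

Starting the simulation with a PING_REQ from A to B for C yields the deliveries `pingReq -> B`, `ping -> C`, `pingAck -> B` and `pingAck -> A`, and the only plain ack arrives at A. A PING whose `to` does not match the receiving node is dropped, mirroring the warning in onPing.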

doPing

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java

public final class FailureDetectorImpl implements FailureDetector {
  //......

  private void doPing() {
    // Increment period counter
    long period = currentPeriod++;

    // Select ping member
    Member pingMember = selectPingMember();
    if (pingMember == null) {
      return;
    }

    // Send ping
    String cid = cidGenerator.nextCid();
    PingData pingData = new PingData(localMember, pingMember);
    Message pingMsg =
        Message.withData(pingData)
            .qualifier(PING)
            .correlationId(cid)
            .sender(localMember.address())
            .build();

    LOGGER.trace("Send Ping[{}] to {}", period, pingMember);
    Address address = pingMember.address();
    transport
        .requestResponse(pingMsg, address)
        .timeout(Duration.ofMillis(config.getPingTimeout()), scheduler)
        .publishOn(scheduler)
        .subscribe(
            message -> {
              LOGGER.trace("Received PingAck[{}] from {}", period, pingMember);
              publishPingResult(period, pingMember, MemberStatus.ALIVE);
            },
            ex -> {
              LOGGER.debug(
                  "Failed to get PingAck[{}] from {} within {} ms",
                  period,
                  pingMember,
                  config.getPingTimeout());

              final int timeLeft = config.getPingInterval() - config.getPingTimeout();
              final List<Member> pingReqMembers = selectPingReqMembers(pingMember);

              if (timeLeft <= 0 || pingReqMembers.isEmpty()) {
                LOGGER.trace("No PingReq[{}] occurred", period);
                publishPingResult(period, pingMember, MemberStatus.SUSPECT);
              } else {
                doPingReq(currentPeriod, pingMember, pingReqMembers, cid);
              }
            });
  }

  private Member selectPingMember() {
    if (pingMembers.isEmpty()) {
      return null;
    }
    if (pingMemberIndex >= pingMembers.size()) {
      pingMemberIndex = 0;
      Collections.shuffle(pingMembers);
    }
    return pingMembers.get(pingMemberIndex++);
  }

  private List<Member> selectPingReqMembers(Member pingMember) {
    if (config.getPingReqMembers() <= 0) {
      return Collections.emptyList();
    }
    List<Member> candidates = new ArrayList<>(pingMembers);
    candidates.remove(pingMember);
    if (candidates.isEmpty()) {
      return Collections.emptyList();
    }
    Collections.shuffle(candidates);
    boolean selectAll = candidates.size() < config.getPingReqMembers();
    return selectAll ? candidates : candidates.subList(0, config.getPingReqMembers());
  }

  private void doPingReq(
      long period, final Member pingMember, final List<Member> pingReqMembers, String cid) {
    Message pingReqMsg =
        Message.withData(new PingData(localMember, pingMember))
            .qualifier(PING_REQ)
            .correlationId(cid)
            .sender(localMember.address())
            .build();
    LOGGER.trace("Send PingReq[{}] to {} for {}", period, pingReqMembers, pingMember);

    Duration timeout = Duration.ofMillis(config.getPingInterval() - config.getPingTimeout());
    pingReqMembers.forEach(
        member ->
            transport
                .requestResponse(pingReqMsg, member.address())
                .timeout(timeout, scheduler)
                .publishOn(scheduler)
                .subscribe(
                    message -> {
                      LOGGER.trace(
                          "Received transit PingAck[{}] from {} to {}",
                          period,
                          message.sender(),
                          pingMember);
                      publishPingResult(period, pingMember, MemberStatus.ALIVE);
                    },
                    throwable -> {
                      LOGGER.trace(
                          "Timeout getting transit PingAck[{}] from {} to {} within {} ms",
                          period,
                          pingReqMembers,
                          pingMember,
                          timeout);
                      publishPingResult(period, pingMember, MemberStatus.SUSPECT);
                    }));
  }

  private void publishPingResult(long period, Member member, MemberStatus status) {
    LOGGER.debug("Member {} detected as {} at [{}]", member, status, period);
    sink.next(new FailureDetectorEvent(member, status));
  }

  //......

}
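  • doPing first increments currentPeriod and calls selectPingMember to pick the next pingMember. It then builds the PingData and sends a PING through transport.requestResponse with a pingTimeout timeout. If the ack arrives in time, it calls publishPingResult with MemberStatus.ALIVE. On failure or timeout, it picks pingReqMembers with selectPingReqMembers. If config.getPingInterval() - config.getPingTimeout() is less than or equal to 0, or pingReqMembers is empty, it calls publishPingResult(period, pingMember, MemberStatus.SUSPECT). Otherwise it falls back to doPingReq.
  • selectPingMember walks pingMembers in order rather than picking at random. When pingMemberIndex reaches pingMembers.size(), it resets the index to 0 and calls Collections.shuffle(pingMembers). It then returns the member at pingMemberIndex and increments the index. selectPingReqMembers copies pingMembers into a new list and removes pingMember to get the candidates. It then calls Collections.shuffle(candidates) and returns the first config.getPingReqMembers() candidates, or all of them if there are fewer.
  • doPingReq sends pingReqMsg to each member in pingReqMembers, with a timeout of pingInterval - pingTimeout. Each transit PingAck that comes back triggers publishPingResult(period, pingMember, MemberStatus.ALIVE), and each error or timeout triggers publishPingResult(period, pingMember, MemberStatus.SUSPECT). publishPingResult pushes a FailureDetectorEvent into the sink.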
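Member selection can be tested on its own. The following sketch mirrors selectPingMember and selectPingReqMembers over a list of member ids. It passes an injected Random to Collections.shuffle so the behavior is reproducible. `PingSelection` and the explicit `k` parameter, which stands in for config.getPingReqMembers(), are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Sketch of selectPingMember / selectPingReqMembers over member ids.
final class PingSelection {
  private final List<String> pingMembers;
  private final Random random;
  private int pingMemberIndex = 0;

  PingSelection(List<String> members, Random random) {
    this.pingMembers = new ArrayList<>(members);
    this.random = random;
  }

  /** Round-robin over the list; reshuffles once a full pass has been made. */
  String selectPingMember() {
    if (pingMembers.isEmpty()) {
      return null;
    }
    if (pingMemberIndex >= pingMembers.size()) {
      pingMemberIndex = 0;
      Collections.shuffle(pingMembers, random);
    }
    return pingMembers.get(pingMemberIndex++);
  }

  /** Up to k randomly chosen helpers, never including the probed member itself. */
  List<String> selectPingReqMembers(String pingMember, int k) {
    if (k <= 0) {
      return Collections.emptyList();
    }
    List<String> candidates = new ArrayList<>(pingMembers);
    candidates.remove(pingMember);
    Collections.shuffle(candidates, random);
    return candidates.size() <= k ? candidates : new ArrayList<>(candidates.subList(0, k));
  }
}
```

Because the index only resets after a full pass, every member is probed exactly once per pass before the order is reshuffled. This bounds how long a dead member can go unprobed, at least while membership does not change. Purely random selection would not give that guarantee. The very first pass follows the insertion order produced by onMemberEvent, because the shuffle only happens once the index reaches the end.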
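The timing logic in doPing reduces to a small decision table. The direct PING gets pingTimeout, and the indirect round gets whatever remains of pingInterval. The following sketch writes that decision as a pure function. `PingOutcome`, `Action` and the method names are hypothetical, and the asynchronous parts (requestResponse and the timeouts themselves) are left out.

```java
import java.util.List;

// Sketch of the decision doPing makes once the direct PING has succeeded or failed.
final class PingOutcome {
  enum Action { PUBLISH_ALIVE, PUBLISH_SUSPECT, SEND_PING_REQ }

  /**
   * @param directAckReceived whether requestResponse(PING) completed within pingTimeout
   * @param pingIntervalMillis stands in for config.getPingInterval()
   * @param pingTimeoutMillis stands in for config.getPingTimeout()
   * @param pingReqMembers helpers returned by selectPingReqMembers
   */
  static Action afterDirectPing(
      boolean directAckReceived,
      int pingIntervalMillis,
      int pingTimeoutMillis,
      List<String> pingReqMembers) {
    if (directAckReceived) {
      return Action.PUBLISH_ALIVE;
    }
    // The indirect round only gets the part of the interval the direct ping did not use.
    int timeLeft = pingIntervalMillis - pingTimeoutMillis;
    if (timeLeft <= 0 || pingReqMembers.isEmpty()) {
      return Action.PUBLISH_SUSPECT;
    }
    return Action.SEND_PING_REQ;
  }

  /** Timeout applied to each PING_REQ in doPingReq. */
  static long pingReqTimeoutMillis(int pingIntervalMillis, int pingTimeoutMillis) {
    return pingIntervalMillis - pingTimeoutMillis;
  }
}
```

The original code has two further details. In doPingReq, each helper's response calls publishPingResult on its own. With several pingReqMembers, a single probe can therefore produce several events for the same member, for example one ALIVE and one SUSPECT. Also, doPing passes currentPeriod, which has already been incremented, to doPingReq instead of period. As a result, the PingReq log lines appear to carry the next period number.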

Summary

  • FailureDetector defines the start, stop and listen methods, and FailureDetectorImpl implements it. FailureDetectorImpl defines three message qualifiers (PING, PING_REQ, PING_ACK) and keeps a pingMembers list. Its constructor subscribes to membershipProcessor, handled by onMemberEvent, and to transport.listen(), handled by onMessage. start registers doPing with scheduler.schedulePeriodically to run every pingInterval (5000 ms by default). stop calls actionsDisposables.dispose() and sink.complete(), and listen() returns subject.onBackpressureBuffer().
  • onMemberEvent removes members from pingMembers or inserts them at a random index, depending on the MembershipEvent. onMessage dispatches on the qualifier and on whether originalIssuer is set, and calls one of three handlers. onPing answers with a PING_ACK, and onPingReq sends a PING to the member named in the PING_REQ. onTransitPingAck forwards the ack of that relayed ping back to originalIssuer.
  • doPing increments currentPeriod, picks the next pingMember with selectPingMember, builds the PingData, and sends it via transport.requestResponse. On success it publishes ALIVE. On failure it selects pingReqMembers with selectPingReqMembers. If config.getPingInterval() - config.getPingTimeout() is less than or equal to 0, or there are no pingReqMembers, it publishes SUSPECT; otherwise it falls back to doPingReq.


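Original content statement: this article is published on the Tencent Cloud+ Community with the author's authorization and may not be reproduced without permission.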
