A look at scalecube-cluster's MembershipProtocol

This article takes a look at MembershipProtocol in scalecube-cluster.

MembershipProtocol

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocol.java

/**
 * Cluster Membership Protocol component responsible for managing information about existing members
 * of the cluster.
 */
public interface MembershipProtocol {
​
  /**
   * Starts running cluster membership protocol. After started it begins to receive and send cluster
   * membership messages
   */
  Mono<Void> start();
​
  /** Stops running cluster membership protocol and releases occupied resources. */
  void stop();
​
  /** Listen changes in cluster membership. */
  Flux<MembershipEvent> listen();
​
  /**
   * Returns list of all members of the joined cluster. This will include all cluster members
   * including local member.
   *
   * @return all members in the cluster (including local one)
   */
  Collection<Member> members();
​
  /**
   * Returns list of all cluster members of the joined cluster excluding local member.
   *
   * @return all members in the cluster (excluding local one)
   */
  Collection<Member> otherMembers();
​
  /**
   * Returns local cluster member which corresponds to this cluster instance.
   *
   * @return local member
   */
  Member member();
​
  /**
   * Returns cluster member with given id or null if no member with such id exists at joined
   * cluster.
   *
   * @return member by id
   */
  Optional<Member> member(String id);
​
  /**
   * Returns cluster member by given address or null if no member with such address exists at joined
   * cluster.
   *
   * @return member by address
   */
  Optional<Member> member(Address address);
}
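  • The MembershipProtocol interface defines the start, stop, listen, members, otherMembers, and member methods; note that the Javadoc of both member(...) lookups says "or null", while the signatures actually return Optional<Member>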

MembershipProtocolImpl

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

public final class MembershipProtocolImpl implements MembershipProtocol {
​
  private static final Logger LOGGER = LoggerFactory.getLogger(MembershipProtocolImpl.class);
​
  private enum MembershipUpdateReason {
    FAILURE_DETECTOR_EVENT,
    MEMBERSHIP_GOSSIP,
    SYNC,
    INITIAL_SYNC,
    SUSPICION_TIMEOUT
  }
​
  // Qualifiers
​
  public static final String SYNC = "sc/membership/sync";
  public static final String SYNC_ACK = "sc/membership/syncAck";
  public static final String MEMBERSHIP_GOSSIP = "sc/membership/gossip";
​
  private final Member localMember;
​
  // Injected
​
  private final Transport transport;
  private final MembershipConfig config;
  private final List<Address> seedMembers;
  private final FailureDetector failureDetector;
  private final GossipProtocol gossipProtocol;
  private final MetadataStore metadataStore;
  private final CorrelationIdGenerator cidGenerator;
​
  // State
​
  private final Map<String, MembershipRecord> membershipTable = new HashMap<>();
  private final Map<String, Member> members = new HashMap<>();
​
  // Subject
​
  private final FluxProcessor<MembershipEvent, MembershipEvent> subject =
      DirectProcessor.<MembershipEvent>create().serialize();
  private final FluxSink<MembershipEvent> sink = subject.sink();
​
  // Disposables
  private final Disposable.Composite actionsDisposables = Disposables.composite();
​
  // Scheduled
  private final Scheduler scheduler;
  private final Map<String, Disposable> suspicionTimeoutTasks = new HashMap<>();
​
  /**
   * Creates new instantiates of cluster membership protocol with given transport and config.
   *
   * @param localMember local cluster member
   * @param transport cluster transport
   * @param failureDetector failure detector
   * @param gossipProtocol gossip protocol
   * @param metadataStore metadata store
   * @param config membership config parameters
   * @param scheduler scheduler
   * @param cidGenerator correlation id generator
   */
  public MembershipProtocolImpl(
      Member localMember,
      Transport transport,
      FailureDetector failureDetector,
      GossipProtocol gossipProtocol,
      MetadataStore metadataStore,
      MembershipConfig config,
      Scheduler scheduler,
      CorrelationIdGenerator cidGenerator) {
​
    this.transport = Objects.requireNonNull(transport);
    this.config = Objects.requireNonNull(config);
    this.failureDetector = Objects.requireNonNull(failureDetector);
    this.gossipProtocol = Objects.requireNonNull(gossipProtocol);
    this.metadataStore = Objects.requireNonNull(metadataStore);
    this.localMember = Objects.requireNonNull(localMember);
    this.scheduler = Objects.requireNonNull(scheduler);
    this.cidGenerator = Objects.requireNonNull(cidGenerator);
​
    // Prepare seeds
    seedMembers = cleanUpSeedMembers(config.getSeedMembers());
​
    // Init membership table with local member record
    membershipTable.put(localMember.id(), new MembershipRecord(localMember, ALIVE, 0));
​
    // fill in the table of members with local member
    members.put(localMember.id(), localMember);
​
    actionsDisposables.addAll(
        Arrays.asList(
            // Listen to incoming SYNC and SYNC ACK requests from other members
            transport
                .listen() //
                .publishOn(scheduler)
                .subscribe(this::onMessage, this::onError),
​
            // Listen to events from failure detector
            failureDetector
                .listen()
                .publishOn(scheduler)
                .subscribe(this::onFailureDetectorEvent, this::onError),
​
            // Listen to membership gossips
            gossipProtocol
                .listen()
                .publishOn(scheduler)
                .subscribe(this::onMembershipGossip, this::onError)));
  }
​
  @Override
  public Flux<MembershipEvent> listen() {
    return subject.onBackpressureBuffer();
  }
​
  @Override
  public Mono<Void> start() {
    // Make initial sync with all seed members
    return Mono.create(
        sink -> {
          // In case no members at the moment just schedule periodic sync
          if (seedMembers.isEmpty()) {
            schedulePeriodicSync();
            sink.success();
            return;
          }
          // If seed addresses are specified in config - send initial sync to those nodes
          LOGGER.debug("Making initial Sync to all seed members: {}", seedMembers);
​
          //noinspection unchecked
          Mono<Message>[] syncs =
              seedMembers
                  .stream()
                  .map(
                      address -> {
                        String cid = cidGenerator.nextCid();
                        return transport
                            .requestResponse(prepareSyncDataMsg(SYNC, cid), address)
                            .filter(this::checkSyncGroup);
                      })
                  .toArray(Mono[]::new);
​
          // Process initial SyncAck
          Flux.mergeDelayError(syncs.length, syncs)
              .take(1)
              .timeout(Duration.ofMillis(config.getSyncTimeout()), scheduler)
              .publishOn(scheduler)
              .flatMap(message -> onSyncAck(message, true))
              .doFinally(
                  s -> {
                    schedulePeriodicSync();
                    sink.success();
                  })
              .subscribe(
                  null,
                  ex -> LOGGER.info("Exception on initial SyncAck, cause: {}", ex.toString()));
        });
  }
​
  @Override
  public void stop() {
    // Stop accepting requests, events and sending sync
    actionsDisposables.dispose();
​
    // Cancel remove members tasks
    for (String memberId : suspicionTimeoutTasks.keySet()) {
      Disposable future = suspicionTimeoutTasks.get(memberId);
      if (future != null && !future.isDisposed()) {
        future.dispose();
      }
    }
    suspicionTimeoutTasks.clear();
​
    // Stop publishing events
    sink.complete();
  }
​
  @Override
  public Collection<Member> members() {
    return new ArrayList<>(members.values());
  }
​
  @Override
  public Collection<Member> otherMembers() {
    return new ArrayList<>(members.values())
        .stream()
        .filter(member -> !member.equals(localMember))
        .collect(Collectors.toList());
  }
​
  @Override
  public Member member() {
    return localMember;
  }
​
  @Override
  public Optional<Member> member(String id) {
    return Optional.ofNullable(members.get(id));
  }
​
  @Override
  public Optional<Member> member(Address address) {
    return new ArrayList<>(members.values())
        .stream()
        .filter(member -> member.address().equals(address))
        .findFirst();
  }
​
  //......
​
}
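  • MembershipProtocolImpl implements the MembershipProtocol interface; it defines the MembershipUpdateReason enum (FAILURE_DETECTOR_EVENT, MEMBERSHIP_GOSSIP, SYNC, INITIAL_SYNC, SUSPICION_TIMEOUT)
  • The MembershipProtocolImpl constructor puts the local member into membershipTable (as ALIVE with incarnation 0) and into members, then subscribes to three streams: transport.listen(), which triggers onMessage; failureDetector.listen(), which triggers onFailureDetectorEvent; and gossipProtocol.listen(), which triggers onMembershipGossip
  • When seedMembers.isEmpty(), the start method only calls schedulePeriodicSync, which runs doSync every syncInterval; otherwise it sends SYNC to every seed member via transport.requestResponse, passes the first reply that arrives within syncTimeout to onSyncAck, and calls schedulePeriodicSync in doFinally whether or not a reply arrived. The stop method disposes actionsDisposables, disposes every future in suspicionTimeoutTasks, and completes the sink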
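
The "first reply wins, bounded by a timeout" behaviour of start can be sketched without Reactor. The following is a minimal plain-Java model, not scalecube code: InitialSyncSketch, firstReply, and the use of String for addresses and replies are assumptions made for illustration, and CompletableFuture stands in for the Mono returned by transport.requestResponse. Like Flux.mergeDelayError followed by take(1), it ignores individual failures as long as one seed answers.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical plain-Java model of the initial sync in start(); not scalecube code. */
final class InitialSyncSketch {

  /**
   * Sends a request to every seed and returns the first successful reply, or empty when
   * there are no seeds, every request fails, or nothing arrives within timeoutMillis.
   */
  static Optional<String> firstReply(
      List<String> seeds,
      Function<String, CompletableFuture<String>> request,
      long timeoutMillis) {
    if (seeds.isEmpty()) {
      return Optional.empty();
    }
    CompletableFuture<String> first = new CompletableFuture<>();
    AtomicInteger pendingFailures = new AtomicInteger(seeds.size());
    for (String seed : seeds) {
      request
          .apply(seed)
          .whenComplete(
              (reply, error) -> {
                if (error == null) {
                  first.complete(reply); // only the first completion takes effect
                } else if (pendingFailures.decrementAndGet() == 0) {
                  first.completeExceptionally(error); // fail only once all seeds failed
                }
              });
    }
    try {
      return Optional.of(first.get(timeoutMillis, TimeUnit.MILLISECONDS));
    } catch (Exception e) {
      // Timeout, failure of all seeds, or interruption: treat as "no initial SyncAck".
      return Optional.empty();
    }
  }

  public static void main(String[] args) {
    // Seed "a" never answers; seed "b" answers immediately.
    Function<String, CompletableFuture<String>> request =
        seed ->
            seed.equals("a")
                ? new CompletableFuture<String>()
                : CompletableFuture.completedFuture("syncAck from " + seed);
    System.out.println(firstReply(Arrays.asList("a", "b"), request, 200));
    System.out.println(firstReply(Collections.<String>emptyList(), request, 200));
  }
}
```

In the real method the caller does not block: the reply is handled asynchronously, and schedulePeriodicSync runs in doFinally regardless of the outcome, so a node whose seeds are all unreachable still starts and keeps retrying through the periodic sync.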

onMessage

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

public final class MembershipProtocolImpl implements MembershipProtocol {
  //......
​
  private void onMessage(Message message) {
    if (checkSyncGroup(message)) {
      if (SYNC.equals(message.qualifier())) {
        onSync(message).subscribe(null, this::onError);
      }
      if (SYNC_ACK.equals(message.qualifier())) {
        if (message.correlationId() == null) { // filter out initial sync
          onSyncAck(message, false).subscribe(null, this::onError);
        }
      }
    }
  }
​
  private boolean checkSyncGroup(Message message) {
    if (message.data() instanceof SyncData) {
      SyncData syncData = message.data();
      return config.getSyncGroup().equals(syncData.getSyncGroup());
    }
    return false;
  }
​
  /** Merges incoming SYNC data, merges it and sending back merged data with SYNC_ACK. */
  private Mono<Void> onSync(Message syncMsg) {
    return Mono.defer(
        () -> {
          LOGGER.debug("Received Sync: {}", syncMsg);
          return syncMembership(syncMsg.data(), false)
              .doOnSuccess(
                  avoid -> {
                    Message message = prepareSyncDataMsg(SYNC_ACK, syncMsg.correlationId());
                    Address address = syncMsg.sender();
                    transport
                        .send(address, message)
                        .subscribe(
                            null,
                            ex ->
                                LOGGER.debug(
                                    "Failed to send SyncAck: {} to {}, cause: {}",
                                    message,
                                    address,
                                    ex.toString()));
                  });
        });
  }
​
  private Mono<Void> onSyncAck(Message syncAckMsg, boolean onStart) {
    return Mono.defer(
        () -> {
          LOGGER.debug("Received SyncAck: {}", syncAckMsg);
          return syncMembership(syncAckMsg.data(), onStart);
        });
  }
​
  private Mono<Void> syncMembership(SyncData syncData, boolean onStart) {
    return Mono.defer(
        () -> {
          MembershipUpdateReason reason =
              onStart ? MembershipUpdateReason.INITIAL_SYNC : MembershipUpdateReason.SYNC;
          return Mono.whenDelayError(
              syncData
                  .getMembership()
                  .stream()
                  .filter(r1 -> !r1.equals(membershipTable.get(r1.id())))
                  .map(r1 -> updateMembership(r1, reason))
                  .toArray(Mono[]::new));
        });
  }
​
  //......
​
}
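  • onMessage first uses checkSyncGroup to verify that the message belongs to this node's syncGroup; it then calls onSync if message.qualifier() is SYNC, and calls onSyncAck if it is SYNC_ACK and the correlationId is null (a SYNC_ACK that carries a correlationId is the reply to the initial sync, which start already handles through requestResponse)
  • onSync calls syncMembership and, on success, sends a SYNC_ACK back to the sender; onSyncAck also calls syncMembership but sends nothing back
  • syncMembership walks the membership records in syncData, skips those equal to the local membershipTable entry, and calls updateMembership for each remaining one, using INITIAL_SYNC as the reason when onStart is true and SYNC otherwise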
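
The routing described above is easy to get wrong around the correlationId check, so here it is as a small decision function. This is a hypothetical model: SyncDispatchSketch, Handler, and route are names invented for illustration, a null messageGroup stands for "the payload is not SyncData", and only the two qualifier strings are taken from the listing.

```java
/** Hypothetical model of the routing in onMessage; not scalecube API. */
final class SyncDispatchSketch {

  enum Handler {
    ON_SYNC,
    ON_SYNC_ACK,
    IGNORED
  }

  // Qualifier values copied from MembershipProtocolImpl.
  static final String SYNC = "sc/membership/sync";
  static final String SYNC_ACK = "sc/membership/syncAck";

  /**
   * Decides which handler a message reaches.
   *
   * @param messageGroup sync group carried by the message, or null if it carries no SyncData
   */
  static Handler route(
      String qualifier, String correlationId, String messageGroup, String localGroup) {
    // checkSyncGroup: wrong payload type or foreign sync group means the message is dropped.
    if (messageGroup == null || !localGroup.equals(messageGroup)) {
      return Handler.IGNORED;
    }
    if (SYNC.equals(qualifier)) {
      return Handler.ON_SYNC;
    }
    // A SyncAck with a correlation id answers the initial sync and is consumed elsewhere.
    if (SYNC_ACK.equals(qualifier) && correlationId == null) {
      return Handler.ON_SYNC_ACK;
    }
    return Handler.IGNORED;
  }

  public static void main(String[] args) {
    System.out.println(route(SYNC, null, "default", "default"));
    System.out.println(route(SYNC_ACK, null, "default", "default"));
    System.out.println(route(SYNC_ACK, "cid-1", "default", "default"));
    System.out.println(route(SYNC, null, "other", "default"));
  }
}
```

The correlationId check keeps the reply to the initial sync from being merged twice: once by the requestResponse pipeline in start and once more by the general listener.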

onFailureDetectorEvent

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

public final class MembershipProtocolImpl implements MembershipProtocol {
  //......
​
  /** Merges FD updates and processes them. */
  private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) {
    MembershipRecord r0 = membershipTable.get(fdEvent.member().id());
    if (r0 == null) { // member already removed
      return;
    }
    if (r0.status() == fdEvent.status()) { // status not changed
      return;
    }
    LOGGER.debug("Received status change on failure detector event: {}", fdEvent);
    if (fdEvent.status() == ALIVE) {
      // TODO: Consider to make more elegant solution
      // Alive won't override SUSPECT so issue instead extra sync with member to force it spread
      // alive with inc + 1
      Message syncMsg = prepareSyncDataMsg(SYNC, null);
      Address address = fdEvent.member().address();
      transport
          .send(address, syncMsg)
          .subscribe(
              null,
              ex ->
                  LOGGER.debug(
                      "Failed to send {} to {}, cause: {}", syncMsg, address, ex.toString()));
    } else {
      MembershipRecord record =
          new MembershipRecord(r0.member(), fdEvent.status(), r0.incarnation());
      updateMembership(record, MembershipUpdateReason.FAILURE_DETECTOR_EVENT)
          .subscribe(null, this::onError);
    }
  }
​
  //......
​
}
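  • onFailureDetectorEvent returns early if the member is no longer in membershipTable or if its status has not changed. If the reported status is ALIVE, it sends a SYNC message to fdEvent.member().address(): as the code comment notes, ALIVE won't override SUSPECT, so the extra sync prompts that member to spread ALIVE with an increased incarnation. Otherwise it calls updateMembership with MembershipUpdateReason.FAILURE_DETECTOR_EVENT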
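
The branches of onFailureDetectorEvent can be condensed into one small decision function. This is a sketch under assumptions: FailureDetectorEventSketch, Status, Action, and decide are illustrative names rather than scalecube types, and a null known status stands for "no record in membershipTable".

```java
/** Hypothetical model of the branches in onFailureDetectorEvent; not scalecube API. */
final class FailureDetectorEventSketch {

  enum Status {
    ALIVE,
    SUSPECT,
    DEAD
  }

  enum Action {
    IGNORE,
    SEND_SYNC,
    UPDATE_MEMBERSHIP
  }

  /**
   * Maps the locally known status and the status reported by the failure detector to the
   * action taken.
   *
   * @param known status in the local membership table, or null if the member was removed
   */
  static Action decide(Status known, Status reported) {
    if (known == null) {
      return Action.IGNORE; // member already removed
    }
    if (known == reported) {
      return Action.IGNORE; // status not changed
    }
    if (reported == Status.ALIVE) {
      // ALIVE is not written locally; the peer is asked to re-announce itself instead.
      return Action.SEND_SYNC;
    }
    return Action.UPDATE_MEMBERSHIP;
  }

  public static void main(String[] args) {
    System.out.println(decide(Status.ALIVE, Status.SUSPECT));
    System.out.println(decide(Status.SUSPECT, Status.ALIVE));
    System.out.println(decide(Status.SUSPECT, Status.SUSPECT));
    System.out.println(decide(null, Status.SUSPECT));
  }
}
```

Note that in the UPDATE_MEMBERSHIP branch the real code builds the new record with the existing incarnation r0.incarnation(), so the failure detector changes only the status and never the incarnation.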

onMembershipGossip

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

public final class MembershipProtocolImpl implements MembershipProtocol {
  //......
​
  /** Merges received membership gossip (not spreading gossip further). */
  private void onMembershipGossip(Message message) {
    if (MEMBERSHIP_GOSSIP.equals(message.qualifier())) {
      MembershipRecord record = message.data();
      LOGGER.debug("Received membership gossip: {}", record);
      updateMembership(record, MembershipUpdateReason.MEMBERSHIP_GOSSIP)
          .subscribe(null, this::onError);
    }
  }
​
  //......
​
}
  • onMembershipGossip handles only messages whose message.qualifier() is MEMBERSHIP_GOSSIP, calling updateMembership on the received record with MembershipUpdateReason.MEMBERSHIP_GOSSIP

updateMembership

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

public final class MembershipProtocolImpl implements MembershipProtocol {
  //......
​
  /**
   * Try to update membership table with the given record.
   *
   * @param r1 new membership record which compares with existing r0 record
   * @param reason indicating the reason for updating membership table
   */
  private Mono<Void> updateMembership(MembershipRecord r1, MembershipUpdateReason reason) {
    return Mono.defer(
        () -> {
          Objects.requireNonNull(r1, "Membership record can't be null");
          // Get current record
          MembershipRecord r0 = membershipTable.get(r1.id());
​
          // Check if new record r1 overrides existing membership record r0
          if (!r1.isOverrides(r0)) {
            return Mono.empty();
          }
​
          // If received updated for local member then increase incarnation and spread Alive gossip
          if (r1.member().id().equals(localMember.id())) {
            int currentIncarnation = Math.max(r0.incarnation(), r1.incarnation());
            MembershipRecord r2 =
                new MembershipRecord(localMember, r0.status(), currentIncarnation + 1);
​
            membershipTable.put(localMember.id(), r2);
​
            LOGGER.debug(
                "Local membership record r0: {}, but received r1: {}, "
                    + "spread with increased incarnation r2: {}",
                r0,
                r1,
                r2);
​
            spreadMembershipGossip(r2)
                .subscribe(
                    null,
                    ex -> {
                      // on-op
                    });
            return Mono.empty();
          }
​
          // Update membership
          if (r1.isDead()) {
            membershipTable.remove(r1.id());
          } else {
            membershipTable.put(r1.id(), r1);
          }
​
          // Schedule/cancel suspicion timeout task
          if (r1.isSuspect()) {
            scheduleSuspicionTimeoutTask(r1);
          } else {
            cancelSuspicionTimeoutTask(r1.id());
          }
​
          // Emit membership and regardless of result spread gossip
          return emitMembershipEvent(r0, r1)
              .doFinally(
                  s -> {
                    // Spread gossip (unless already gossiped)
                    if (reason != MembershipUpdateReason.MEMBERSHIP_GOSSIP
                        && reason != MembershipUpdateReason.INITIAL_SYNC) {
                      spreadMembershipGossip(r1)
                          .subscribe(
                              null,
                              ex -> {
                                // no-op
                              });
                    }
                  });
        });
  }
​
  private Mono<Void> spreadMembershipGossip(MembershipRecord record) {
    return Mono.defer(
        () -> {
          Message msg = Message.withData(record).qualifier(MEMBERSHIP_GOSSIP).build();
          LOGGER.debug("Spead membreship: {} with gossip", msg);
          return gossipProtocol
              .spread(msg)
              .doOnError(
                  ex ->
                      LOGGER.debug(
                          "Failed to spread membership: {} with gossip, cause: {}",
                          msg,
                          ex.toString()))
              .then();
        });
  }
​
  private void scheduleSuspicionTimeoutTask(MembershipRecord record) {
    long suspicionTimeout =
        ClusterMath.suspicionTimeout(
            config.getSuspicionMult(), membershipTable.size(), config.getPingInterval());
    suspicionTimeoutTasks.computeIfAbsent(
        record.id(),
        id ->
            scheduler.schedule(
                () -> onSuspicionTimeout(id), suspicionTimeout, TimeUnit.MILLISECONDS));
  }
​
  private void onSuspicionTimeout(String memberId) {
    suspicionTimeoutTasks.remove(memberId);
    MembershipRecord record = membershipTable.get(memberId);
    if (record != null) {
      LOGGER.debug("Declare SUSPECTED member {} as DEAD by timeout", record);
      MembershipRecord deadRecord =
          new MembershipRecord(record.member(), DEAD, record.incarnation());
      updateMembership(deadRecord, MembershipUpdateReason.SUSPICION_TIMEOUT)
          .subscribe(null, this::onError);
    }
  }
​
  private void cancelSuspicionTimeoutTask(String memberId) {
    Disposable future = suspicionTimeoutTasks.remove(memberId);
    if (future != null && !future.isDisposed()) {
      future.dispose();
    }
  }
​
  private Mono<Void> emitMembershipEvent(MembershipRecord r0, MembershipRecord r1) {
    return Mono.defer(
        () -> {
          final Member member = r1.member();
​
          if (r1.isDead()) {
            members.remove(member.id());
            // removed
            return Mono.fromRunnable(
                () -> {
                  Map<String, String> metadata = metadataStore.removeMetadata(member);
                  sink.next(MembershipEvent.createRemoved(member, metadata));
                });
          }
​
          if (r0 == null && r1.isAlive()) {
            members.put(member.id(), member);
            // added
            return metadataStore
                .fetchMetadata(member)
                .doOnSuccess(
                    metadata -> {
                      metadataStore.updateMetadata(member, metadata);
                      sink.next(MembershipEvent.createAdded(member, metadata));
                    })
                .onErrorResume(TimeoutException.class, e -> Mono.empty())
                .then();
          }
​
          if (r0 != null && r0.incarnation() < r1.incarnation()) {
            // updated
            return metadataStore
                .fetchMetadata(member)
                .doOnSuccess(
                    metadata1 -> {
                      Map<String, String> metadata0 =
                          metadataStore.updateMetadata(member, metadata1);
                      sink.next(MembershipEvent.createUpdated(member, metadata0, metadata1));
                    })
                .onErrorResume(TimeoutException.class, e -> Mono.empty())
                .then();
          }
​
          return Mono.empty();
        });
  }
​
  //......
​
}
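  • updateMembership first checks whether the incoming record r1 overrides the existing record r0 and returns immediately if it does not. If r1 concerns the local member, it stores a new record r2 whose incarnation is max(r0, r1) + 1, spreads it with spreadMembershipGossip, and returns. Otherwise it updates membershipTable (removing the entry on isDead, putting r1 otherwise), calls scheduleSuspicionTimeoutTask on isSuspect or cancelSuspicionTimeoutTask otherwise, and finally calls emitMembershipEvent followed by spreadMembershipGossip, except that the gossip is skipped when the reason is MEMBERSHIP_GOSSIP or INITIAL_SYNC
  • scheduleSuspicionTimeoutTask computes suspicionTimeout and registers a delayed task for record.id() unless suspicionTimeoutTasks already holds one; onSuspicionTimeout first removes the task from suspicionTimeoutTasks and then, if the member is still in membershipTable, calls updateMembership with a DEAD record and MembershipUpdateReason.SUSPICION_TIMEOUT; cancelSuspicionTimeoutTask likewise removes the task from suspicionTimeoutTasks and disposes the future
  • emitMembershipEvent keeps members and metadataStore in step with the change and publishes the matching event: on isDead it removes the member, calls metadataStore.removeMetadata(member), and emits a removed event; for a previously unknown ALIVE member it adds the member, fetches and stores its metadata, and emits an added event; for a known member with a higher incarnation it calls metadataStore.updateMetadata(member, metadata) and emits an updated event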
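
Two rules inside updateMembership are worth isolating: how the local node refutes a record about itself, and which update reasons lead to further gossip. The sketch below restates just those two rules from the listing; UpdateMembershipSketch, refutedIncarnation, and spreadsGossip are hypothetical names, and only the Reason constants mirror the MembershipUpdateReason enum.

```java
/** Hypothetical restatement of two rules from updateMembership; not scalecube API. */
final class UpdateMembershipSketch {

  // Mirrors the constants of MembershipUpdateReason.
  enum Reason {
    FAILURE_DETECTOR_EVENT,
    MEMBERSHIP_GOSSIP,
    SYNC,
    INITIAL_SYNC,
    SUSPICION_TIMEOUT
  }

  /**
   * Incarnation the local member announces after receiving an overriding record about itself:
   * one more than the larger of its own and the received incarnation.
   */
  static int refutedIncarnation(int localIncarnation, int receivedIncarnation) {
    return Math.max(localIncarnation, receivedIncarnation) + 1;
  }

  /** Whether an applied update about another member is gossiped further. */
  static boolean spreadsGossip(Reason reason) {
    // Updates that arrived via gossip, or during the initial sync, are not re-spread.
    return reason != Reason.MEMBERSHIP_GOSSIP && reason != Reason.INITIAL_SYNC;
  }

  public static void main(String[] args) {
    // Local record has incarnation 0; a peer reports this node with incarnation 2.
    System.out.println(refutedIncarnation(0, 2));
    for (Reason reason : Reason.values()) {
      System.out.println(reason + " -> " + spreadsGossip(reason));
    }
  }
}
```

Raising the incarnation above both values is what lets the refuting record win on other nodes, since the local branch keeps the local status r0.status() and changes only the incarnation.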

schedulePeriodicSync

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

public final class MembershipProtocolImpl implements MembershipProtocol {
  //......
​
  private void schedulePeriodicSync() {
    int syncInterval = config.getSyncInterval();
    actionsDisposables.add(
        scheduler.schedulePeriodically(
            this::doSync, syncInterval, syncInterval, TimeUnit.MILLISECONDS));
  }
​
  private void doSync() {
    Optional<Address> addressOptional = selectSyncAddress();
    if (!addressOptional.isPresent()) {
      return;
    }
​
    Address address = addressOptional.get();
    Message message = prepareSyncDataMsg(SYNC, null);
    LOGGER.debug("Send Sync: {} to {}", message, address);
    transport
        .send(address, message)
        .subscribe(
            null,
            ex ->
                LOGGER.debug(
                    "Failed to send Sync: {} to {}, cause: {}", message, address, ex.toString()));
  }
​
  private Optional<Address> selectSyncAddress() {
    List<Address> addresses =
        Stream.concat(seedMembers.stream(), otherMembers().stream().map(Member::address))
            .collect(Collectors.collectingAndThen(Collectors.toSet(), ArrayList::new));
    Collections.shuffle(addresses);
    if (addresses.isEmpty()) {
      return Optional.empty();
    } else {
      int i = ThreadLocalRandom.current().nextInt(addresses.size());
      return Optional.of(addresses.get(i));
    }
  }
​
  private Message prepareSyncDataMsg(String qualifier, String cid) {
    List<MembershipRecord> membershipRecords = new ArrayList<>(membershipTable.values());
    SyncData syncData = new SyncData(membershipRecords, config.getSyncGroup());
    return Message.withData(syncData)
        .qualifier(qualifier)
        .correlationId(cid)
        .sender(localMember.address())
        .build();
  }
​
  //......
}
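  • schedulePeriodicSync registers doSync with scheduler.schedulePeriodically so that it runs every syncInterval; doSync first calls selectSyncAddress to pick a random address from the seed members and the other known members, then builds the sync message with prepareSyncDataMsg and sends it with transport.send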
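
The target selection in selectSyncAddress can be modelled in plain Java as follows. This is a sketch, not the library code: SelectSyncAddressSketch, candidates, and select are hypothetical names, addresses are plain strings, and the Random is passed in so the choice can be controlled. The sketch also leaves out the Collections.shuffle call from the listing, on the reasoning that picking a uniformly random index already gives every candidate the same chance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;

/** Hypothetical model of selectSyncAddress; addresses are plain strings here. */
final class SelectSyncAddressSketch {

  /** Union of seed addresses and other members' addresses, without duplicates. */
  static List<String> candidates(Collection<String> seeds, Collection<String> others) {
    Set<String> unique = new LinkedHashSet<>(seeds);
    unique.addAll(others);
    return new ArrayList<>(unique);
  }

  /** Picks one candidate uniformly at random, or empty if there is nobody to sync with. */
  static Optional<String> select(
      Collection<String> seeds, Collection<String> others, Random random) {
    List<String> addresses = candidates(seeds, others);
    if (addresses.isEmpty()) {
      return Optional.empty();
    }
    return Optional.of(addresses.get(random.nextInt(addresses.size())));
  }

  public static void main(String[] args) {
    List<String> seeds = Arrays.asList("10.0.0.1:4801", "10.0.0.2:4801");
    List<String> others = Arrays.asList("10.0.0.2:4801", "10.0.0.3:4801");
    System.out.println(candidates(seeds, others));
    System.out.println(select(seeds, others, new Random()));
    System.out.println(
        select(Collections.<String>emptyList(), Collections.<String>emptyList(), new Random()));
  }
}
```

Because seed addresses stay in the candidate set, a seed can still be chosen as a sync target after it has been removed from members.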

Summary

  • The MembershipProtocol interface defines the start, stop, listen, members, otherMembers, and member methods; MembershipProtocolImpl implements it and defines the MembershipUpdateReason enum (FAILURE_DETECTOR_EVENT, MEMBERSHIP_GOSSIP, SYNC, INITIAL_SYNC, SUSPICION_TIMEOUT)
  • The constructor subscribes to transport.listen() (onMessage), failureDetector.listen() (onFailureDetectorEvent), and gossipProtocol.listen() (onMembershipGossip); start sends SYNC to every seed member via transport.requestResponse, hands the first reply to onSyncAck, and in every case calls schedulePeriodicSync so that doSync runs every syncInterval; stop disposes the subscriptions and the futures in suspicionTimeoutTasks and completes the sink
  • onMessage checks the syncGroup with checkSyncGroup, then calls onSync for SYNC and onSyncAck for a SYNC_ACK without correlationId; onSync calls syncMembership and replies with SYNC_ACK on success, while onSyncAck calls syncMembership without replying; syncMembership calls updateMembership for every received record that differs from the local one
  • onFailureDetectorEvent sends a SYNC message to fdEvent.member().address() when a member's status changes to ALIVE, and otherwise calls updateMembership with MembershipUpdateReason.FAILURE_DETECTOR_EVENT; onMembershipGossip calls updateMembership with MembershipUpdateReason.MEMBERSHIP_GOSSIP for MEMBERSHIP_GOSSIP messages
  • updateMembership ignores records that do not override the existing one, answers records about the local member by spreading a record with an increased incarnation, and otherwise updates membershipTable, schedules or cancels the suspicion timeout task, calls emitMembershipEvent, and spreads the record via spreadMembershipGossip unless the reason is MEMBERSHIP_GOSSIP or INITIAL_SYNC
  • schedulePeriodicSync registers doSync to run every syncInterval; doSync uses selectSyncAddress to pick a random target, builds the sync message with prepareSyncDataMsg, and sends it with transport.send

The start method of MembershipProtocolImpl registers the doSync task (run every syncInterval), which sends a SYNC message carrying the full list of membershipRecords to a randomly selected member. On receiving SYNC, onMessage runs syncMembership and replies with SYNC_ACK on success; on receiving SYNC_ACK it also runs syncMembership. Both onFailureDetectorEvent and onMembershipGossip can trigger updateMembership to update membershipTable, spreading membership gossip when necessary.

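Original-work notice: this article is published on the Cloud+ Community (云+社区) with the author's authorization and may not be reproduced without permission.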
