前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊artemis的QuorumVote

聊聊artemis的QuorumVote

作者头像
code4it
发布2020-02-24 09:45:47
3640
发布2020-02-24 09:45:47
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下artemis的QuorumVote

QuorumVote

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVote.java

代码语言:javascript
复制
public abstract class QuorumVote<V extends Vote, T> {

   private SimpleString name;

   public QuorumVote(SimpleString name) {
      this.name = name;
   }

   /**
    * called by the {@link org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager} when one of the nodes in the quorum is
    * successfully connected to. The QuorumVote can then decide whether or not a decision can be made with just that information.
    *
    * @return the vote to use
    */
   public abstract Vote connected();

   /**
    * called by the {@link org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager} fails to connect to a node in the quorum.
    * The QuorumVote can then decide whether or not a decision can be made with just that information however the node
    * cannot cannot be asked.
    *
    * @return the vote to use
    */
   public abstract Vote notConnected();

   /**
    * called by the {@link org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager} when a vote can be made, either from the
    * cluster or decided by itself.
    *
    * @param vote the vote to make.
    */
   public abstract void vote(V vote);

   /**
    * get the decion of the vote
    *
    * @return the voting decision
    */
   public abstract T getDecision();

   /**
    * called by the {@link org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager} when all the votes have been cast and received.
    *
    * @param voteTopology the topology of where the votes were sent.
    */
   public abstract void allVotesCast(Topology voteTopology);

   /**
    * the name of this quorum vote, used for identifying the correct {@link org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteHandler}
    *
    * @return the name of the wuorum vote
    */
   public SimpleString getName() {
      return name;
   }
}
  • QuorumVote是个抽象类,定义了connected、notConnected、vote、getDecision、allVotesCast抽象方法

QuorumVoteServerConnect

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java

代码语言:javascript
复制
public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boolean> {

   public static final SimpleString LIVE_FAILOVER_VOTE = new SimpleString("LiveFailoverQuorumVote");
   private final CountDownLatch latch;
   private final String targetNodeId;
   private final String liveConnector;

   private int votesNeeded;

   private int total = 0;

   private boolean decision = false;

   // Is this the live requesting to stay live, or a backup requesting to become live.
   private boolean requestToStayLive = false;

   /**
    * live nodes | remaining nodes |  majority   | votes needed
    * 1      |       0         |     0       |      0
    * 2      |       1         |     1       |      1
    * n      |    r = n-1      |   n/2 + 1   |   n/2 + 1 rounded
    * 3      |       2         |     2.5     |      2
    * 4      |       3         |      3      |      3
    * 5      |       4         |     3.5     |      3
    * 6      |       5         |      4      |      4
    */
   public QuorumVoteServerConnect(int size, String targetNodeId, boolean requestToStayLive, String liveConnector) {
      super(LIVE_FAILOVER_VOTE);
      this.targetNodeId = targetNodeId;
      this.liveConnector = liveConnector;
      double majority;
      if (size <= 2) {
         majority = ((double) size) / 2;
      } else {
         //even
         majority = ((double) size) / 2 + 1;
      }
      //votes needed could be say 2.5 so we add 1 in this case
      votesNeeded = (int) majority;
      latch = new CountDownLatch(votesNeeded);
      if (votesNeeded == 0) {
         decision = true;
      }
      this.requestToStayLive = requestToStayLive;
   }

   public QuorumVoteServerConnect(int size, String targetNodeId) {
      this(size, targetNodeId, false, null);
   }
   /**
    * if we can connect to a node
    *
    * @return
    */
   @Override
   public Vote connected() {
      return new ServerConnectVote(targetNodeId, requestToStayLive, null);
   }
   /**
    * if we cant connect to the node
    *
    * @return
    */
   @Override
   public Vote notConnected() {
      return new BooleanVote(false);
   }

   /**
    * live nodes | remaining nodes |  majority   | votes needed
    * 1      |       0         |     0       |      0
    * 2      |       1         |     1       |      1
    * n      |    r = n-1      |   n/2 + 1   |   n/2 + 1 rounded
    * 3      |       2         |     2.5     |      2
    * 4      |       3         |      3      |      3
    * 5      |       4         |     3.5     |      3
    * 6      |       5         |      4      |      4
    *
    * @param vote the vote to make.
    */
   @Override
   public synchronized void vote(ServerConnectVote vote) {
      if (decision)
         return;
      if (!requestToStayLive && vote.getVote()) {
         total++;
         latch.countDown();
         if (total >= votesNeeded) {
            decision = true;
         }//do the opposite, if it says there is a node connected it means the backup has come live
      } else if (requestToStayLive && vote.getVote()) {
         total++;
         latch.countDown();
         if (liveConnector != null && !liveConnector.equals(vote.getTransportConfiguration())) {
            ActiveMQServerLogger.LOGGER.qourumBackupIsLive(liveConnector);
            return;
         }
         if (total >= votesNeeded) {
            decision = true;
         }
      }
   }

   @Override
   public void allVotesCast(Topology voteTopology) {
      while (latch.getCount() > 0) {
         latch.countDown();
      }
   }

   @Override
   public Boolean getDecision() {
      return decision;
   }

   public void await(int latchTimeout, TimeUnit unit) throws InterruptedException {
      ActiveMQServerLogger.LOGGER.waitingForQuorumVoteResults(latchTimeout, unit.toString().toLowerCase());
      if (latch.await(latchTimeout, unit))
         ActiveMQServerLogger.LOGGER.receivedAllQuorumVotes();
      else
         ActiveMQServerLogger.LOGGER.timeoutWaitingForQuorumVoteResponses();
   }

   public boolean isRequestToStayLive() {
      return requestToStayLive;
   }
}
  • QuorumVoteServerConnect继承了QuorumVote,其构造器根据size初始化votesNeeded及decision;其connected方法返回ServerConnectVote;其notConnected方法返回BooleanVote(false);其vote方法对于ServerConnectVote的vote为true的递增total,同时latch.countDown(),对于total大于等于votesNeeded的更新decision为true;其allVotesCast方法则循环latch.countDown()

小结

QuorumVote是个抽象类,定义了connected、notConnected、vote、getDecision、allVotesCast抽象方法;QuorumVoteServerConnect继承了QuorumVote,其构造器根据size初始化votesNeeded及decision;其connected方法返回ServerConnectVote;其notConnected方法返回BooleanVote(false);其vote方法对于ServerConnectVote的vote为true的递增total,同时latch.countDown(),对于total大于等于votesNeeded的更新decision为true;其allVotesCast方法则循环latch.countDown()

doc

  • QuorumVoteServerConnect
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-02-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • QuorumVote
  • QuorumVoteServerConnect
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档