前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊artemis的QuorumManager

聊聊artemis的QuorumManager

原创
作者头像
code4it
修改2020-02-11 11:57:18
6460
修改2020-02-11 11:57:18
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下artemis的QuorumManager

ClusterTopologyListener

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClusterTopologyListener.java

代码语言:javascript
复制
public interface ClusterTopologyListener {
​
   /**
    * Triggered when a node joins the cluster.
    *
    * @param member
    * @param last   if the whole cluster topology is being transmitted (after adding the listener to
    *               the cluster connection) this parameter will be {@code true} for the last topology
    *               member.
    */
   void nodeUP(TopologyMember member, boolean last);
​
   /**
    * Triggered when a node leaves the cluster.
    *
    * @param eventUID
    * @param nodeID   the id of the node leaving the cluster
    */
   void nodeDown(long eventUID, String nodeID);
}
  • ClusterTopologyListener接口定义了nodeUP、nodeDown方法

QuorumManager

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumManager.java

代码语言:javascript
复制
public final class QuorumManager implements ClusterTopologyListener, ActiveMQComponent {
​
   private final ExecutorService executor;
​
   private final ClusterController clusterController;
​
   /**
    * all the current registered {@link org.apache.activemq.artemis.core.server.cluster.qourum.Quorum}'s
    */
   private final Map<String, Quorum> quorums = new HashMap<>();
​
   /**
    * any currently running runnables.
    */
   private final Map<QuorumVote, VoteRunnableHolder> voteRunnables = new HashMap<>();
​
   private final Map<SimpleString, QuorumVoteHandler> handlers = new HashMap<>();
​
   private boolean started = false;
​
   /**
    * this is the max size that the cluster has been.
    */
   private int maxClusterSize = 0;
​
   public QuorumManager(ExecutorService threadPool, ClusterController clusterController) {
      this.clusterController = clusterController;
      this.executor = threadPool;
   }
​
   /**
    * we start by simply creating the server locator and connecting in a separate thread
    *
    * @throws Exception
    */
   @Override
   public void start() throws Exception {
      if (started)
         return;
      started = true;
   }
​
   /**
    * stops the server locator
    *
    * @throws Exception
    */
   @Override
   public void stop() throws Exception {
      if (!started)
         return;
      synchronized (voteRunnables) {
         started = false;
         for (VoteRunnableHolder voteRunnableHolder : voteRunnables.values()) {
            for (VoteRunnable runnable : voteRunnableHolder.runnables) {
               runnable.close();
            }
         }
      }
      for (Quorum quorum : quorums.values()) {
         quorum.close();
      }
      quorums.clear();
   }
​
   /**
    * are we started
    *
    * @return
    */
   @Override
   public boolean isStarted() {
      return started;
   }
​
   /**
    * registers a {@link org.apache.activemq.artemis.core.server.cluster.qourum.Quorum} so that it can be notified of changes in the cluster.
    *
    * @param quorum
    */
   public void registerQuorum(Quorum quorum) {
      quorums.put(quorum.getName(), quorum);
      quorum.setQuorumManager(this);
   }
​
   /**
    * unregisters a {@link org.apache.activemq.artemis.core.server.cluster.qourum.Quorum}.
    *
    * @param quorum
    */
   public void unRegisterQuorum(Quorum quorum) {
      quorums.remove(quorum.getName());
   }
​
   /**
    * called by the {@link org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal} when the topology changes. we update the
    * {@code maxClusterSize} if needed and inform the {@link org.apache.activemq.artemis.core.server.cluster.qourum.Quorum}'s.
    *
    * @param topologyMember the topolgy changed
    * @param last           if the whole cluster topology is being transmitted (after adding the listener to
    *                       the cluster connection) this parameter will be {@code true} for the last topology
    */
   @Override
   public void nodeUP(TopologyMember topologyMember, boolean last) {
      final int newClusterSize = clusterController.getDefaultClusterSize();
      maxClusterSize = newClusterSize > maxClusterSize ? newClusterSize : maxClusterSize;
      for (Quorum quorum : quorums.values()) {
         quorum.nodeUp(clusterController.getDefaultClusterTopology());
      }
   }
​
   /**
    * notify the {@link org.apache.activemq.artemis.core.server.cluster.qourum.Quorum} of a topology change.
    *
    * @param eventUID
    * @param nodeID   the id of the node leaving the cluster
    */
   @Override
   public void nodeDown(long eventUID, String nodeID) {
      for (Quorum quorum : quorums.values()) {
         quorum.nodeDown(clusterController.getDefaultClusterTopology(), eventUID, nodeID);
      }
   }
​
   /**
    * returns the maximum size this cluster has been.
    *
    * @return max size
    */
   public int getMaxClusterSize() {
      return maxClusterSize;
   }
​
   /**
    * ask the quorum to vote within a specific quorum.
    *
    * @param quorumVote the vote to acquire
    */
   public void vote(final QuorumVote quorumVote) {
      List<VoteRunnable> runnables = new ArrayList<>();
      synchronized (voteRunnables) {
         if (!started)
            return;
         //send a vote to each node
         ActiveMQServerLogger.LOGGER.initiatingQuorumVote(quorumVote.getName());
         for (TopologyMemberImpl tm : clusterController.getDefaultClusterTopology().getMembers()) {
            //but not ourselves
            if (!tm.getNodeId().equals(clusterController.getNodeID().toString())) {
               Pair<TransportConfiguration, TransportConfiguration> pair = tm.getConnector();
​
               final TransportConfiguration serverTC = pair.getA();
​
               VoteRunnable voteRunnable = new VoteRunnable(serverTC, quorumVote);
​
               runnables.add(voteRunnable);
            }
         }
         if (runnables.size() > 0) {
            voteRunnables.put(quorumVote, new VoteRunnableHolder(quorumVote, runnables, runnables.size()));
​
            for (VoteRunnable runnable : runnables) {
               executor.submit(runnable);
            }
         } else {
            quorumVote.allVotesCast(clusterController.getDefaultClusterTopology());
         }
      }
   }
​
   /**
    * handle a vote received on the quorum
    *
    * @param handler the name of the handler to use for the vote
    * @param vote    the vote
    * @return the updated vote
    */
   public Vote vote(SimpleString handler, Vote vote) {
      QuorumVoteHandler quorumVoteHandler = handlers.get(handler);
      return quorumVoteHandler.vote(vote);
   }
​
   /**
    * must be called by the quorum when it is happy on an outcome. only one vote can take place at anyone time for a
    * specific quorum
    *
    * @param quorumVote the vote
    */
   public void voteComplete(QuorumVoteServerConnect quorumVote) {
      VoteRunnableHolder holder = voteRunnables.remove(quorumVote);
      if (holder != null) {
         for (VoteRunnable runnable : holder.runnables) {
            runnable.close();
         }
      }
   }
​
   /**
    * called to register vote handlers on the quorum
    *
    * @param quorumVoteHandler the vote handler
    */
   public void registerQuorumHandler(QuorumVoteHandler quorumVoteHandler) {
      handlers.put(quorumVoteHandler.getQuorumName(), quorumVoteHandler);
   }
​
   //......
}
  • QuorumManager实现了ClusterTopologyListener接口,它提供了registerQuorum方法用于注册quorum;其nodeUP方法会遍历quorums,挨个执行quorum.nodeUp(clusterController.getDefaultClusterTopology());其nodeDown方法会遍历quorums,挨个执行quorum.nodeDown(clusterController.getDefaultClusterTopology(), eventUID, nodeID);其vote方法会遍历clusterController.getDefaultClusterTopology().getMembers(),对于非clusterController.getNodeID()的创建VoteRunnable并添加到runnables中,对于runnables不为空的挨个提交到executor执行,否则执行quorumVote.allVotesCast(clusterController.getDefaultClusterTopology())

VoteRunnable

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumManager.java

代码语言:javascript
复制
   private final class VoteRunnable implements Runnable {
​
      private final TransportConfiguration serverTC;
      private final QuorumVote quorumVote;
      private ClusterControl clusterControl;
​
      private VoteRunnable(TransportConfiguration serverTC, QuorumVote quorumVote) {
         this.serverTC = serverTC;
         this.quorumVote = quorumVote;
      }
​
      @Override
      public void run() {
         try {
            Vote vote;
            if (!started)
               return;
            //try to connect to the node i want to send a vote to
            clusterControl = clusterController.connectToNode(serverTC);
            clusterControl.authorize();
            //if we are successful get the vote and check whether we need to send it to the target server,
            //just connecting may be enough
​
            vote = quorumVote.connected();
            if (vote.isRequestServerVote()) {
               vote = clusterControl.sendQuorumVote(quorumVote.getName(), vote);
               quorumVote.vote(vote);
            } else {
               quorumVote.vote(vote);
            }
         } catch (Exception e) {
            Vote vote = quorumVote.notConnected();
            quorumVote.vote(vote);
         } finally {
            try {
               if (clusterControl != null) {
                  clusterControl.close();
               }
            } catch (Exception e) {
               //ignore
            }
            QuorumManager.this.votingComplete(quorumVote);
         }
      }
​
      public void close() {
         if (clusterControl != null) {
            clusterControl.close();
         }
      }
   }
  • VoteRunnable实现了Runnable接口,其run方法先执行quorumVote.connected(),对于vote.isRequestServerVote()的执行clusterControl.sendQuorumVote(quorumVote.getName(), vote),之后执行quorumVote.vote(vote)标记vote,最后执行QuorumManager.this.votingComplete(quorumVote)

VoteRunnableHolder

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumManager.java

代码语言:javascript
复制
   private final class VoteRunnableHolder {
​
      private final QuorumVote quorumVote;
      private final List<VoteRunnable> runnables;
      private int size;
​
      private VoteRunnableHolder(QuorumVote quorumVote, List<VoteRunnable> runnables, int size) {
         this.quorumVote = quorumVote;
​
         this.runnables = runnables;
         this.size = size;
      }
​
      public synchronized void voteComplete() {
         size--;
         if (size <= 0) {
            quorumVote.allVotesCast(clusterController.getDefaultClusterTopology());
         }
      }
   }
  • VoteRunnableHolder的voteComplete会递减size,在最后size小于等于0时触发quorumVote.allVotesCast(clusterController.getDefaultClusterTopology()),标记所有投票已经发送出去

QuorumVoteMessage

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteMessage.java

代码语言:javascript
复制
public class QuorumVoteMessage extends PacketImpl {
​
   private SimpleString handler;
​
   private Vote vote;
​
   private ActiveMQBuffer voteBuffer;
​
   public QuorumVoteMessage() {
      super(QUORUM_VOTE);
   }
​
   public QuorumVoteMessage(SimpleString handler, Vote vote) {
      super(QUORUM_VOTE);
      this.handler = handler;
      this.vote = vote;
   }
​
   @Override
   public void encodeRest(ActiveMQBuffer buffer) {
      super.encodeRest(buffer);
      buffer.writeSimpleString(handler);
      vote.encode(buffer);
   }
​
   @Override
   public void decodeRest(ActiveMQBuffer buffer) {
      super.decodeRest(buffer);
      handler = buffer.readSimpleString();
      voteBuffer = ActiveMQBuffers.fixedBuffer(buffer.readableBytes());
      buffer.readBytes(voteBuffer);
   }
​
   public SimpleString getHandler() {
      return handler;
   }
​
   public Vote getVote() {
      return vote;
   }
​
   public void decode(QuorumVoteHandler voteHandler) {
      vote = voteHandler.decode(voteBuffer);
   }
​
​
   @Override
   public String toString() {
      StringBuffer buff = new StringBuffer(getParentString());
      buff.append("]");
      return buff.toString();
   }
​
   @Override
   public String getParentString() {
      StringBuffer buff = new StringBuffer(super.getParentString());
      buff.append(", vote=" + vote);
      buff.append(", handler=" + handler);
      return buff.toString();
   }
}
  • QuorumVoteMessage继承了PacketImpl,其type为QUORUM_VOTE

ClusterControllerChannelHandler

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java

代码语言:javascript
复制
   private final class ClusterControllerChannelHandler implements ChannelHandler {
​
      private final Channel clusterChannel;
      private final Acceptor acceptorUsed;
      private final CoreRemotingConnection remotingConnection;
      private final ChannelHandler channelHandler;
      boolean authorized = false;
​
      private ClusterControllerChannelHandler(Channel clusterChannel,
                                              Acceptor acceptorUsed,
                                              CoreRemotingConnection remotingConnection,
                                              ChannelHandler channelHandler) {
         this.clusterChannel = clusterChannel;
         this.acceptorUsed = acceptorUsed;
         this.remotingConnection = remotingConnection;
         this.channelHandler = channelHandler;
      }
​
      @Override
      public void handlePacket(Packet packet) {
         if (!isStarted()) {
            if (channelHandler != null) {
               channelHandler.handlePacket(packet);
            }
            return;
         }
​
         if (!authorized) {
            if (packet.getType() == PacketImpl.CLUSTER_CONNECT) {
               ClusterConnection clusterConnection = acceptorUsed.getClusterConnection();
​
               //if this acceptor isn't associated with a cluster connection use the default
               if (clusterConnection == null) {
                  clusterConnection = server.getClusterManager().getDefaultConnection(null);
               }
​
               ClusterConnectMessage msg = (ClusterConnectMessage) packet;
​
               if (server.getConfiguration().isSecurityEnabled() && !clusterConnection.verify(msg.getClusterUser(), msg.getClusterPassword())) {
                  clusterChannel.send(new ClusterConnectReplyMessage(false));
               } else {
                  authorized = true;
                  clusterChannel.send(new ClusterConnectReplyMessage(true));
               }
            }
         } else {
            if (packet.getType() == PacketImpl.NODE_ANNOUNCE) {
               NodeAnnounceMessage msg = (NodeAnnounceMessage) packet;
​
               Pair<TransportConfiguration, TransportConfiguration> pair;
               if (msg.isBackup()) {
                  pair = new Pair<>(null, msg.getConnector());
               } else {
                  pair = new Pair<>(msg.getConnector(), msg.getBackupConnector());
               }
               if (logger.isTraceEnabled()) {
                  logger.trace("Server " + server + " receiving nodeUp from NodeID=" + msg.getNodeID() + ", pair=" + pair);
               }
​
               if (acceptorUsed != null) {
                  ClusterConnection clusterConn = acceptorUsed.getClusterConnection();
                  if (clusterConn != null) {
                     String scaleDownGroupName = msg.getScaleDownGroupName();
                     clusterConn.nodeAnnounced(msg.getCurrentEventID(), msg.getNodeID(), msg.getBackupGroupName(), scaleDownGroupName, pair, msg.isBackup());
                  } else {
                     logger.debug("Cluster connection is null on acceptor = " + acceptorUsed);
                  }
               } else {
                  logger.debug("there is no acceptor used configured at the CoreProtocolManager " + this);
               }
            } else if (packet.getType() == PacketImpl.QUORUM_VOTE) {
               QuorumVoteMessage quorumVoteMessage = (QuorumVoteMessage) packet;
               QuorumVoteHandler voteHandler = quorumManager.getVoteHandler(quorumVoteMessage.getHandler());
               if (voteHandler == null) {
                  ActiveMQServerLogger.LOGGER.noVoteHandlerConfigured();
                  return;
               }
               quorumVoteMessage.decode(voteHandler);
               ActiveMQServerLogger.LOGGER.receivedQuorumVoteRequest(quorumVoteMessage.getVote().toString());
               Vote vote = quorumManager.vote(quorumVoteMessage.getHandler(), quorumVoteMessage.getVote());
               ActiveMQServerLogger.LOGGER.sendingQuorumVoteResponse(vote.toString());
               clusterChannel.send(new QuorumVoteReplyMessage(quorumVoteMessage.getHandler(), vote));
            } else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT) {
               ScaleDownAnnounceMessage message = (ScaleDownAnnounceMessage) packet;
               //we don't really need to check as it should always be true
               if (server.getNodeID().equals(message.getTargetNodeId())) {
                  server.addScaledDownNode(message.getScaledDownNodeId());
               }
            } else if (channelHandler != null) {
               channelHandler.handlePacket(packet);
            }
         }
      }
​
   }
  • ClusterControllerChannelHandler实现了ChannelHandler接口,其handlePacket方法在接收到type为PacketImpl.QUORUM_VOTE的数据时会执行quorumManager.vote(quorumVoteMessage.getHandler(), quorumVoteMessage.getVote()),然后将返还的vote包装为QuorumVoteReplyMessage响应回去

QuorumVoteHandler

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteHandler.java

代码语言:javascript
复制
public interface QuorumVoteHandler {
​
   /**
    * @param vote
    * @return
    */
   Vote vote(Vote vote);
​
   /**
    * the name of the quorum vote
    *
    * @return the name
    */
   SimpleString getQuorumName();
​
   Vote decode(ActiveMQBuffer voteBuffer);
}
  • QuorumVoteHandler定义了vote、getQuorumName、decode方法

ServerConnectVoteHandler

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConnectVoteHandler.java

代码语言:javascript
复制
public class ServerConnectVoteHandler implements QuorumVoteHandler {
   private final ActiveMQServerImpl server;
​
   public ServerConnectVoteHandler(ActiveMQServerImpl server) {
      this.server = server;
   }
​
   @Override
   public Vote vote(Vote vote) {
      ServerConnectVote serverConnectVote = (ServerConnectVote) vote;
      String nodeid = serverConnectVote.getNodeId();
      try {
         TopologyMemberImpl member = server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeid);
​
         if (member != null && member.getLive() != null) {
            ActiveMQServerLogger.LOGGER.nodeFoundInClusterTopology(nodeid);
            return new ServerConnectVote(nodeid, (Boolean) vote.getVote(), member.getLive().toString());
         }
         ActiveMQServerLogger.LOGGER.nodeNotFoundInClusterTopology(nodeid);
      } catch (Exception e) {
         e.printStackTrace();
      }
      return new ServerConnectVote(nodeid, !((Boolean) vote.getVote()), null);
   }
​
   @Override
   public SimpleString getQuorumName() {
      return QuorumVoteServerConnect.LIVE_FAILOVER_VOTE;
   }
​
   @Override
   public Vote decode(ActiveMQBuffer voteBuffer) {
      ServerConnectVote vote = new ServerConnectVote();
      vote.decode(voteBuffer);
      return vote;
   }
}
  • ServerConnectVoteHandler实现了QuorumVoteHandler接口,其vote方法根据nodeid获取topology的member,判断其是否alive,返回新的ServerConnectVote,其getQuorumName返回QuorumVoteServerConnect.LIVE_FAILOVER_VOTE

小结

QuorumManager提供了两个vote方法,只有QuorumVote参数的方法用于发起quorumVote,它会遍历clusterController.getDefaultClusterTopology().getMembers()去发起QuorumVoteMessage;另外一个方法vote方法是ClusterControllerChannelHandler接收到QuorumVoteMessage执行的方法,它将委托给了QuorumVoteHandler,然后响应QuorumVoteReplyMessage回去;VoteRunnable接收到QuorumVoteReplyMessage时会执行quorumVote.vote(vote)来统计投票

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ClusterTopologyListener
  • QuorumManager
  • VoteRunnable
  • VoteRunnableHolder
  • QuorumVoteMessage
  • ClusterControllerChannelHandler
  • QuorumVoteHandler
  • ServerConnectVoteHandler
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档