A Look at Artemis's HAManager

code4it
Published 2020-02-24 09:47:25
Collected in the column: 码匠的流水账

This article examines HAManager in ActiveMQ Artemis, based on the 2.11.0 source.

HAManager

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAManager.java

public interface HAManager extends ActiveMQComponent {

   /**
    * return the current backup servers
    *
    * @return the backups
    */
   Map<String, ActiveMQServer> getBackupServers();
}
  • HAManager extends the ActiveMQComponent interface and declares a single method, getBackupServers.

StandaloneHAManager

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/StandaloneHAManager.java

public class StandaloneHAManager implements HAManager {

   Map<String, ActiveMQServer> servers = new HashMap<>();

   boolean isStarted = false;

   @Override
   public Map<String, ActiveMQServer> getBackupServers() {
      return servers;
   }

   @Override
   public void start() throws Exception {
      if (isStarted)
         return;
      isStarted = true;
   }

   @Override
   public void stop() throws Exception {
      if (!isStarted)
         return;
      isStarted = false;
   }

   @Override
   public boolean isStarted() {
      return isStarted;
   }
}
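  • StandaloneHAManager implements HAManager. Its getBackupServers method returns the servers map, which this class never populates, so the map is always empty; start and stop only toggle the isStarted flag.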
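
The start/stop guards above can be sketched in isolation as follows. This is only an illustration: Component and StandaloneSketch are hypothetical stand-ins rather than Artemis types, and the map holds plain Object values instead of ActiveMQServer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration; these are not Artemis classes.
class StandaloneLifecycleSketch {

   // Mirrors the three lifecycle methods used in the excerpt above.
   interface Component {
      void start();
      void stop();
      boolean isStarted();
   }

   static class StandaloneSketch implements Component {
      // Never written to by this class, so callers always see an empty map.
      private final Map<String, Object> servers = new HashMap<>();
      private boolean started;

      Map<String, Object> getBackupServers() {
         return servers;
      }

      @Override
      public void start() {
         if (started) {
            return; // already started: a second call is a no-op
         }
         started = true;
      }

      @Override
      public void stop() {
         if (!started) {
            return; // already stopped: nothing to do
         }
         started = false;
      }

      @Override
      public boolean isStarted() {
         return started;
      }
   }

   public static void main(String[] args) {
      StandaloneSketch manager = new StandaloneSketch();
      manager.start();
      manager.start(); // idempotent
      System.out.println(manager.isStarted());                  // true
      System.out.println(manager.getBackupServers().isEmpty()); // true
      manager.stop();
      System.out.println(manager.isStarted());                  // false
   }
}
```

Since a standalone broker hosts no colocated backups, an empty map with a trivial lifecycle is all that the interface requires here.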

ColocatedHAManager

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedHAManager.java

public class ColocatedHAManager implements HAManager {

   private final ColocatedPolicy haPolicy;

   private final ActiveMQServer server;

   private final Map<String, ActiveMQServer> backupServers = new HashMap<>();

   private boolean started;

   public ColocatedHAManager(ColocatedPolicy haPolicy, ActiveMQServer activeMQServer) {
      this.haPolicy = haPolicy;
      server = activeMQServer;
   }

   /**
    * starts the HA manager.
    */
   @Override
   public void start() {
      if (started)
         return;

      server.getActivation().haStarted();

      started = true;
   }

   /**
    * stop any backups
    */
   @Override
   public void stop() {
      for (ActiveMQServer activeMQServer : backupServers.values()) {
         try {
            activeMQServer.stop();
         } catch (Exception e) {
            ActiveMQServerLogger.LOGGER.errorStoppingServer(e);
         }
      }
      backupServers.clear();
      started = false;
   }

   @Override
   public boolean isStarted() {
      return started;
   }

   public synchronized boolean activateBackup(int backupSize,
                                              String journalDirectory,
                                              String bindingsDirectory,
                                              String largeMessagesDirectory,
                                              String pagingDirectory,
                                              SimpleString nodeID) throws Exception {
      if (backupServers.size() >= haPolicy.getMaxBackups() || backupSize != backupServers.size()) {
         return false;
      }
      if (haPolicy.getBackupPolicy().isSharedStore()) {
         return activateSharedStoreBackup(journalDirectory, bindingsDirectory, largeMessagesDirectory, pagingDirectory);
      } else {
         return activateReplicatedBackup(nodeID);
      }
   }

   /**
    * return the current backup servers
    *
    * @return the backups
    */
   @Override
   public Map<String, ActiveMQServer> getBackupServers() {
      return backupServers;
   }

   /**
    * send a request to a live server to start a backup for us
    *
    * @param connectorPair the connector for the node to request a backup from
    * @param backupSize    the current size of the requested nodes backups
    * @param replicated
    * @return true if the request was successful.
    * @throws Exception
    */
   public boolean requestBackup(Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                                int backupSize,
                                boolean replicated) throws Exception {
      ClusterController clusterController = server.getClusterManager().getClusterController();
      try (ClusterControl clusterControl = clusterController.connectToNode(connectorPair.getA())) {
         clusterControl.authorize();
         if (replicated) {
            return clusterControl.requestReplicatedBackup(backupSize, server.getNodeID());
         } else {
            return clusterControl.requestSharedStoreBackup(backupSize, server.getConfiguration().getJournalLocation().getAbsolutePath(), server.getConfiguration().getBindingsLocation().getAbsolutePath(), server.getConfiguration().getLargeMessagesLocation().getAbsolutePath(), server.getConfiguration().getPagingLocation().getAbsolutePath());

         }
      }
   }

   private synchronized boolean activateSharedStoreBackup(String journalDirectory,
                                                          String bindingsDirectory,
                                                          String largeMessagesDirectory,
                                                          String pagingDirectory) throws Exception {
      Configuration configuration = server.getConfiguration().copy();
      ActiveMQServer backup = server.createBackupServer(configuration);
      try {
         int portOffset = haPolicy.getBackupPortOffset() * (backupServers.size() + 1);
         String name = "colocated_backup_" + backupServers.size() + 1;
         //make sure we don't restart as we are colocated
         haPolicy.getBackupPolicy().setRestartBackup(false);
         //set the backup policy
         backup.setHAPolicy(haPolicy.getBackupPolicy());
         updateSharedStoreConfiguration(configuration, name, portOffset, haPolicy.getExcludedConnectors(), journalDirectory, bindingsDirectory, largeMessagesDirectory, pagingDirectory, haPolicy.getBackupPolicy().getScaleDownPolicy() == null);

         backupServers.put(configuration.getName(), backup);
         backup.start();
      } catch (Exception e) {
         backup.stop();
         ActiveMQServerLogger.LOGGER.activateSharedStoreSlaveFailed(e);
         return false;
      }
      ActiveMQServerLogger.LOGGER.activatingSharedStoreSlave();
      return true;
   }

   /**
    * activate a backup server replicating from a specified node.
    *
    * decline and the requesting server can cast a re vote
    *
    * @param nodeID the id of the node to replicate from
    * @return true if the server was created and started
    * @throws Exception
    */
   private synchronized boolean activateReplicatedBackup(SimpleString nodeID) throws Exception {
      final TopologyMember member;
      try {
         member = server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeID.toString());
         if (!Objects.equals(member.getBackupGroupName(), haPolicy.getBackupPolicy().getBackupGroupName())) {
            return false;
         }
      } catch (Exception e) {
         ActiveMQServerLogger.LOGGER.activateReplicatedBackupFailed(e);
         return false;
      }
      Configuration configuration = server.getConfiguration().copy();
      ActiveMQServer backup = server.createBackupServer(configuration);
      try {
         int portOffset = haPolicy.getBackupPortOffset() * (backupServers.size() + 1);
         String name = "colocated_backup_" + backupServers.size() + 1;
         //make sure we don't restart as we are colocated
         haPolicy.getBackupPolicy().setRestartBackup(false);
         //set the backup policy
         backup.setHAPolicy(haPolicy.getBackupPolicy());
         updateReplicatedConfiguration(configuration, name, portOffset, haPolicy.getExcludedConnectors(), haPolicy.getBackupPolicy().getScaleDownPolicy() == null);
         backup.addActivationParam(ActivationParams.REPLICATION_ENDPOINT, member);
         backupServers.put(configuration.getName(), backup);
         backup.start();
      } catch (Exception e) {
         backup.stop();
         ActiveMQServerLogger.LOGGER.activateReplicatedBackupFailed(e);
         return false;
      }
      ActiveMQServerLogger.LOGGER.activatingReplica(nodeID);
      return true;
   }

   //......
}
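  • ColocatedHAManager implements HAManager; its getBackupServers method returns backupServers.
  • activateSharedStoreBackup and activateReplicatedBackup both create a backup via server.createBackupServer(configuration), register it in backupServers, and then start it; if that fails, the backup is stopped and the method returns false.
  • activateBackup first rejects the request when the backup limit (haPolicy.getMaxBackups()) has been reached or when backupSize does not match the current number of backups, then checks haPolicy.getBackupPolicy().isSharedStore() to choose between activateSharedStoreBackup and activateReplicatedBackup.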
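
To make a few of the expressions in the excerpt easier to reason about, here is a small sketch that isolates them. ColocatedBackupSketch and its method names are hypothetical, and the offset value 100 is just a sample input; only the expressions themselves are copied from the code above.

```java
// Hypothetical helper that isolates expressions from the excerpt; not Artemis code.
class ColocatedBackupSketch {

   // Same condition as the first check in activateBackup: refuse when the
   // limit is reached or when the requester's view of the backup count is stale.
   static boolean accepts(int currentBackups, int maxBackups, int requestedBackupSize) {
      return !(currentBackups >= maxBackups || requestedBackupSize != currentBackups);
   }

   // Same arithmetic as the excerpt: each new backup is shifted by one more offset step.
   static int portOffset(int backupPortOffset, int currentBackups) {
      return backupPortOffset * (currentBackups + 1);
   }

   // Same expression as the excerpt: "+" is left-associative, so the int is
   // appended to the string first and the literal 1 is appended as a second digit.
   static String nameAsWritten(int currentBackups) {
      return "colocated_backup_" + currentBackups + 1;
   }

   // With parentheses the addition happens before the concatenation.
   static String nameWithParentheses(int currentBackups) {
      return "colocated_backup_" + (currentBackups + 1);
   }

   public static void main(String[] args) {
      System.out.println(accepts(0, 1, 0));       // true
      System.out.println(accepts(1, 1, 1));       // false: limit reached
      System.out.println(accepts(0, 2, 1));       // false: size mismatch
      System.out.println(portOffset(100, 0));     // 100
      System.out.println(nameAsWritten(0));       // colocated_backup_01
      System.out.println(nameWithParentheses(0)); // colocated_backup_1
   }
}
```

The names produced by the expression as written are still unique per backup, so the extra digit is harmless for the map key; whether the "01" form is intentional is not something the excerpt tells us.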

ColocatedPolicy

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java

public class ColocatedPolicy implements HAPolicy<LiveActivation> {

   /*live stuff*/
   private boolean requestBackup = ActiveMQDefaultConfiguration.isDefaultHapolicyRequestBackup();

   private int backupRequestRetries = ActiveMQDefaultConfiguration.getDefaultHapolicyBackupRequestRetries();

   private long backupRequestRetryInterval = ActiveMQDefaultConfiguration.getDefaultHapolicyBackupRequestRetryInterval();

   private int maxBackups = ActiveMQDefaultConfiguration.getDefaultHapolicyMaxBackups();

   private int backupPortOffset = ActiveMQDefaultConfiguration.getDefaultHapolicyBackupPortOffset();

   /*backup stuff*/
   private List<String> excludedConnectors = new ArrayList<>();

   private BackupPolicy backupPolicy;

   private HAPolicy<LiveActivation> livePolicy;

   //......
}
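  • ColocatedPolicy implements HAPolicy<LiveActivation>. It holds the live-side settings (requestBackup, backupRequestRetries, backupRequestRetryInterval, maxBackups, backupPortOffset) together with the excludedConnectors, backupPolicy, and livePolicy properties.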

BackupPolicy

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/BackupPolicy.java

public abstract class BackupPolicy implements HAPolicy<Activation> {

   protected ScaleDownPolicy scaleDownPolicy;
   protected boolean restartBackup = ActiveMQDefaultConfiguration.isDefaultRestartBackup();

   public ScaleDownPolicy getScaleDownPolicy() {
      return scaleDownPolicy;
   }

   public void setScaleDownPolicy(ScaleDownPolicy scaleDownPolicy) {
      this.scaleDownPolicy = scaleDownPolicy;
   }

   @Override
   public boolean isBackup() {
      return true;
   }

   @Override
   public String getScaleDownClustername() {
      return null;
   }

   @Override
   public String getScaleDownGroupName() {
      return getScaleDownPolicy() != null ? getScaleDownPolicy().getGroupName() : null;
   }

   public boolean isRestartBackup() {
      return restartBackup;
   }

   public void setRestartBackup(boolean restartBackup) {
      this.restartBackup = restartBackup;
   }
}
  • BackupPolicy is an abstract class that implements HAPolicy<Activation>; its isBackup method returns true. The two subclasses covered here are SharedStoreSlavePolicy and ReplicaPolicy.

SharedStoreSlavePolicy

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreSlavePolicy.java

public class SharedStoreSlavePolicy extends BackupPolicy {

   private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown();

   private boolean allowAutoFailBack = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback();

   private boolean isWaitForActivation = ActiveMQDefaultConfiguration.isDefaultWaitForActivation();

   //this is how we act once we have failed over
   private SharedStoreMasterPolicy sharedStoreMasterPolicy;

   public SharedStoreSlavePolicy() {
   }

   public SharedStoreSlavePolicy(boolean failoverOnServerShutdown,
                                 boolean restartBackup,
                                 boolean allowAutoFailBack,
                                 ScaleDownPolicy scaleDownPolicy) {
      this.failoverOnServerShutdown = failoverOnServerShutdown;
      this.restartBackup = restartBackup;
      this.allowAutoFailBack = allowAutoFailBack;
      this.scaleDownPolicy = scaleDownPolicy;
   }

   @Deprecated
   public long getFailbackDelay() {
      return -1;
   }

   @Deprecated
   public void setFailbackDelay(long failbackDelay) {
   }

   public boolean isFailoverOnServerShutdown() {
      return failoverOnServerShutdown;
   }

   public void setFailoverOnServerShutdown(boolean failoverOnServerShutdown) {
      this.failoverOnServerShutdown = failoverOnServerShutdown;
   }

   public SharedStoreMasterPolicy getSharedStoreMasterPolicy() {
      if (sharedStoreMasterPolicy == null) {
         sharedStoreMasterPolicy = new SharedStoreMasterPolicy(failoverOnServerShutdown, isWaitForActivation);
      }
      return sharedStoreMasterPolicy;
   }

   public void setSharedStoreMasterPolicy(SharedStoreMasterPolicy sharedStoreMasterPolicy) {
      this.sharedStoreMasterPolicy = sharedStoreMasterPolicy;
   }

   @Override
   public boolean isSharedStore() {
      return true;
   }

   @Override
   public boolean canScaleDown() {
      return scaleDownPolicy != null;
   }

   public boolean isAllowAutoFailBack() {
      return allowAutoFailBack;
   }

   public void setAllowAutoFailBack(boolean allowAutoFailBack) {
      this.allowAutoFailBack = allowAutoFailBack;
   }

   public void setIsWaitForActivation(boolean isWaitForActivation) {
      this.isWaitForActivation = isWaitForActivation;
   }

   @Override
   public Activation createActivation(ActiveMQServerImpl server,
                                      boolean wasLive,
                                      Map<String, Object> activationParams,
                                      ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) {
      return new SharedStoreBackupActivation(server, this);
   }

   @Override
   public String getBackupGroupName() {
      return null;
   }
}
  • SharedStoreSlavePolicy extends BackupPolicy; its isSharedStore method returns true, and createActivation returns a SharedStoreBackupActivation.
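
One detail worth a closer look is getSharedStoreMasterPolicy, which creates the master policy on first use and then reuses it. The sketch below reproduces that shape with hypothetical stand-ins (MasterPolicy, SlavePolicy). It assumes the master policy's constructor simply stores the flag it receives, which the excerpt does not show.

```java
// Hypothetical stand-ins; the real SharedStoreMasterPolicy is not shown in this article.
class LazyMasterPolicySketch {

   static class MasterPolicy {
      final boolean failoverOnServerShutdown;

      // Assumption: the constructor simply stores the flag it is given.
      MasterPolicy(boolean failoverOnServerShutdown) {
         this.failoverOnServerShutdown = failoverOnServerShutdown;
      }
   }

   static class SlavePolicy {
      private boolean failoverOnServerShutdown;
      private MasterPolicy masterPolicy;

      void setFailoverOnServerShutdown(boolean value) {
         this.failoverOnServerShutdown = value;
      }

      // Same shape as getSharedStoreMasterPolicy: create on first use, then reuse.
      MasterPolicy getMasterPolicy() {
         if (masterPolicy == null) {
            masterPolicy = new MasterPolicy(failoverOnServerShutdown);
         }
         return masterPolicy;
      }
   }

   public static void main(String[] args) {
      SlavePolicy slave = new SlavePolicy();
      slave.setFailoverOnServerShutdown(true);
      MasterPolicy first = slave.getMasterPolicy();
      slave.setFailoverOnServerShutdown(false); // does not touch the cached object
      MasterPolicy second = slave.getMasterPolicy();
      System.out.println(first == second);                 // true
      System.out.println(second.failoverOnServerShutdown); // true
   }
}
```

Under that assumption, a setter called after the first getter call would not be reflected in the cached master policy, so the order of configuration calls can matter.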

ReplicaPolicy

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java

public class ReplicaPolicy extends BackupPolicy {

   private String clusterName;

   private int maxSavedReplicatedJournalsSize = ActiveMQDefaultConfiguration.getDefaultMaxSavedReplicatedJournalsSize();

   private String groupName = null;

   private boolean restartBackup = ActiveMQDefaultConfiguration.isDefaultRestartBackup();

   //used if we create a replicated policy for when we become live.
   private boolean allowFailback = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback();

   private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout();

   /*
   * what quorum size to use for voting
   * */
   private int quorumSize;

   /*
   * whether or not this live broker should vote to remain live
   * */
   private boolean voteOnReplicationFailure;

   private ReplicatedPolicy replicatedPolicy;

   private final NetworkHealthCheck networkHealthCheck;

   private int voteRetries;

   private long voteRetryWait;

   private final int quorumVoteWait;

   private long retryReplicationWait;

   //......

   @Override
   public boolean isRestartBackup() {
      return restartBackup;
   }

   @Override
   public void setRestartBackup(boolean restartBackup) {
      this.restartBackup = restartBackup;
   }

   @Override
   public boolean isSharedStore() {
      return false;
   }

   //......  
}
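  • ReplicaPolicy extends BackupPolicy; its isSharedStore method returns false. It also declares its own restartBackup field and overrides isRestartBackup and setRestartBackup to use it.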
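
The relationship between the two subclasses and the branch in activateBackup can be sketched with stand-in types. Policy, SharedStorePolicy, and ReplicaLikePolicy below are hypothetical and only mirror the shape of the excerpts; the sketch also shows what it means for a subclass to redeclare restartBackup as ReplicaPolicy does.

```java
// Hypothetical stand-ins that mirror the shape of the excerpts; not Artemis classes.
class BackupPolicySketch {

   abstract static class Policy {
      protected boolean restartBackup = true; // placeholder default

      boolean isRestartBackup() {
         return restartBackup;
      }

      void setRestartBackup(boolean value) {
         this.restartBackup = value;
      }

      abstract boolean isSharedStore();
   }

   static class SharedStorePolicy extends Policy {
      @Override
      boolean isSharedStore() {
         return true;
      }
   }

   static class ReplicaLikePolicy extends Policy {
      // Hides the inherited field, as ReplicaPolicy does in the excerpt.
      private boolean restartBackup = true;

      @Override
      boolean isRestartBackup() {
         return restartBackup;
      }

      @Override
      void setRestartBackup(boolean value) {
         this.restartBackup = value;
      }

      @Override
      boolean isSharedStore() {
         return false;
      }

      // Exposes the hidden superclass field for the demonstration only.
      boolean inheritedRestartBackup() {
         return super.restartBackup;
      }
   }

   // Same branch as activateBackup: the policy type picks the activation path.
   static String activationPath(Policy policy) {
      return policy.isSharedStore() ? "shared-store" : "replicated";
   }

   public static void main(String[] args) {
      ReplicaLikePolicy replica = new ReplicaLikePolicy();
      replica.setRestartBackup(false);
      System.out.println(replica.isRestartBackup());               // false
      System.out.println(replica.inheritedRestartBackup());        // true
      System.out.println(activationPath(replica));                 // replicated
      System.out.println(activationPath(new SharedStorePolicy())); // shared-store
   }
}
```

Because the getter and setter are overridden together, callers such as setRestartBackup(false) in the colocated manager read and write the subclass field consistently; the inherited field is simply left unused in this sketch.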

Summary

HAManager extends the ActiveMQComponent interface and declares the getBackupServers method. StandaloneHAManager implements HAManager, and its getBackupServers method returns a map that is never populated. ColocatedHAManager also implements HAManager, and its getBackupServers method returns backupServers. Its activateSharedStoreBackup and activateReplicatedBackup methods both create a backup via server.createBackupServer(configuration) and add it to backupServers, while activateBackup uses haPolicy.getBackupPolicy().isSharedStore() to decide which of the two to run.

doc

  • HAManager

Originally published on 2020-02-15 via the WeChat official account 码匠的流水账.