本文主要研究一下artemis的HAManager
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAManager.java
public interface HAManager extends ActiveMQComponent {
/**
* return the current backup servers
*
* @return the backups
*/
Map<String, ActiveMQServer> getBackupServers();
}
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/StandaloneHAManager.java
public class StandaloneHAManager implements HAManager {
Map<String, ActiveMQServer> servers = new HashMap<>();
boolean isStarted = false;
@Override
public Map<String, ActiveMQServer> getBackupServers() {
return servers;
}
@Override
public void start() throws Exception {
if (isStarted)
return;
isStarted = true;
}
@Override
public void stop() throws Exception {
if (!isStarted)
return;
isStarted = false;
}
@Override
public boolean isStarted() {
return isStarted;
}
}
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedHAManager.java
public class ColocatedHAManager implements HAManager {
private final ColocatedPolicy haPolicy;
private final ActiveMQServer server;
private final Map<String, ActiveMQServer> backupServers = new HashMap<>();
private boolean started;
public ColocatedHAManager(ColocatedPolicy haPolicy, ActiveMQServer activeMQServer) {
this.haPolicy = haPolicy;
server = activeMQServer;
}
/**
* starts the HA manager.
*/
@Override
public void start() {
if (started)
return;
server.getActivation().haStarted();
started = true;
}
/**
* stop any backups
*/
@Override
public void stop() {
for (ActiveMQServer activeMQServer : backupServers.values()) {
try {
activeMQServer.stop();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorStoppingServer(e);
}
}
backupServers.clear();
started = false;
}
@Override
public boolean isStarted() {
return started;
}
public synchronized boolean activateBackup(int backupSize,
String journalDirectory,
String bindingsDirectory,
String largeMessagesDirectory,
String pagingDirectory,
SimpleString nodeID) throws Exception {
if (backupServers.size() >= haPolicy.getMaxBackups() || backupSize != backupServers.size()) {
return false;
}
if (haPolicy.getBackupPolicy().isSharedStore()) {
return activateSharedStoreBackup(journalDirectory, bindingsDirectory, largeMessagesDirectory, pagingDirectory);
} else {
return activateReplicatedBackup(nodeID);
}
}
/**
* return the current backup servers
*
* @return the backups
*/
@Override
public Map<String, ActiveMQServer> getBackupServers() {
return backupServers;
}
/**
* send a request to a live server to start a backup for us
*
* @param connectorPair the connector for the node to request a backup from
* @param backupSize the current size of the requested nodes backups
* @param replicated
* @return true if the request wa successful.
* @throws Exception
*/
public boolean requestBackup(Pair<TransportConfiguration, TransportConfiguration> connectorPair,
int backupSize,
boolean replicated) throws Exception {
ClusterController clusterController = server.getClusterManager().getClusterController();
try
(
ClusterControl clusterControl = clusterController.connectToNode(connectorPair.getA());
) {
clusterControl.authorize();
if (replicated) {
return clusterControl.requestReplicatedBackup(backupSize, server.getNodeID());
} else {
return clusterControl.requestSharedStoreBackup(backupSize, server.getConfiguration().getJournalLocation().getAbsolutePath(), server.getConfiguration().getBindingsLocation().getAbsolutePath(), server.getConfiguration().getLargeMessagesLocation().getAbsolutePath(), server.getConfiguration().getPagingLocation().getAbsolutePath());
}
}
}
private synchronized boolean activateSharedStoreBackup(String journalDirectory,
String bindingsDirectory,
String largeMessagesDirectory,
String pagingDirectory) throws Exception {
Configuration configuration = server.getConfiguration().copy();
ActiveMQServer backup = server.createBackupServer(configuration);
try {
int portOffset = haPolicy.getBackupPortOffset() * (backupServers.size() + 1);
String name = "colocated_backup_" + backupServers.size() + 1;
//make sure we don't restart as we are colocated
haPolicy.getBackupPolicy().setRestartBackup(false);
//set the backup policy
backup.setHAPolicy(haPolicy.getBackupPolicy());
updateSharedStoreConfiguration(configuration, name, portOffset, haPolicy.getExcludedConnectors(), journalDirectory, bindingsDirectory, largeMessagesDirectory, pagingDirectory, haPolicy.getBackupPolicy().getScaleDownPolicy() == null);
backupServers.put(configuration.getName(), backup);
backup.start();
} catch (Exception e) {
backup.stop();
ActiveMQServerLogger.LOGGER.activateSharedStoreSlaveFailed(e);
return false;
}
ActiveMQServerLogger.LOGGER.activatingSharedStoreSlave();
return true;
}
/**
* activate a backup server replicating from a specified node.
*
* decline and the requesting server can cast a re vote
*
* @param nodeID the id of the node to replicate from
* @return true if the server was created and started
* @throws Exception
*/
private synchronized boolean activateReplicatedBackup(SimpleString nodeID) throws Exception {
final TopologyMember member;
try {
member = server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeID.toString());
if (!Objects.equals(member.getBackupGroupName(), haPolicy.getBackupPolicy().getBackupGroupName())) {
return false;
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.activateReplicatedBackupFailed(e);
return false;
}
Configuration configuration = server.getConfiguration().copy();
ActiveMQServer backup = server.createBackupServer(configuration);
try {
int portOffset = haPolicy.getBackupPortOffset() * (backupServers.size() + 1);
String name = "colocated_backup_" + backupServers.size() + 1;
//make sure we don't restart as we are colocated
haPolicy.getBackupPolicy().setRestartBackup(false);
//set the backup policy
backup.setHAPolicy(haPolicy.getBackupPolicy());
updateReplicatedConfiguration(configuration, name, portOffset, haPolicy.getExcludedConnectors(), haPolicy.getBackupPolicy().getScaleDownPolicy() == null);
backup.addActivationParam(ActivationParams.REPLICATION_ENDPOINT, member);
backupServers.put(configuration.getName(), backup);
backup.start();
} catch (Exception e) {
backup.stop();
ActiveMQServerLogger.LOGGER.activateReplicatedBackupFailed(e);
return false;
}
ActiveMQServerLogger.LOGGER.activatingReplica(nodeID);
return true;
}
//......
}
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java
public class ColocatedPolicy implements HAPolicy<LiveActivation> {
/*live stuff*/
private boolean requestBackup = ActiveMQDefaultConfiguration.isDefaultHapolicyRequestBackup();
private int backupRequestRetries = ActiveMQDefaultConfiguration.getDefaultHapolicyBackupRequestRetries();
private long backupRequestRetryInterval = ActiveMQDefaultConfiguration.getDefaultHapolicyBackupRequestRetryInterval();
private int maxBackups = ActiveMQDefaultConfiguration.getDefaultHapolicyMaxBackups();
private int backupPortOffset = ActiveMQDefaultConfiguration.getDefaultHapolicyBackupPortOffset();
/*backup stuff*/
private List<String> excludedConnectors = new ArrayList<>();
private BackupPolicy backupPolicy;
private HAPolicy<LiveActivation> livePolicy;
//......
}
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/BackupPolicy.java
public abstract class BackupPolicy implements HAPolicy<Activation> {
protected ScaleDownPolicy scaleDownPolicy;
protected boolean restartBackup = ActiveMQDefaultConfiguration.isDefaultRestartBackup();
public ScaleDownPolicy getScaleDownPolicy() {
return scaleDownPolicy;
}
public void setScaleDownPolicy(ScaleDownPolicy scaleDownPolicy) {
this.scaleDownPolicy = scaleDownPolicy;
}
@Override
public boolean isBackup() {
return true;
}
@Override
public String getScaleDownClustername() {
return null;
}
@Override
public String getScaleDownGroupName() {
return getScaleDownPolicy() != null ? getScaleDownPolicy().getGroupName() : null;
}
public boolean isRestartBackup() {
return restartBackup;
}
public void setRestartBackup(boolean restartBackup) {
this.restartBackup = restartBackup;
}
}
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreSlavePolicy.java
public class SharedStoreSlavePolicy extends BackupPolicy {
private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown();
private boolean allowAutoFailBack = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback();
private boolean isWaitForActivation = ActiveMQDefaultConfiguration.isDefaultWaitForActivation();
//this is how we act once we have failed over
private SharedStoreMasterPolicy sharedStoreMasterPolicy;
public SharedStoreSlavePolicy() {
}
public SharedStoreSlavePolicy(boolean failoverOnServerShutdown,
boolean restartBackup,
boolean allowAutoFailBack,
ScaleDownPolicy scaleDownPolicy) {
this.failoverOnServerShutdown = failoverOnServerShutdown;
this.restartBackup = restartBackup;
this.allowAutoFailBack = allowAutoFailBack;
this.scaleDownPolicy = scaleDownPolicy;
}
@Deprecated
public long getFailbackDelay() {
return -1;
}
@Deprecated
public void setFailbackDelay(long failbackDelay) {
}
public boolean isFailoverOnServerShutdown() {
return failoverOnServerShutdown;
}
public void setFailoverOnServerShutdown(boolean failoverOnServerShutdown) {
this.failoverOnServerShutdown = failoverOnServerShutdown;
}
public SharedStoreMasterPolicy getSharedStoreMasterPolicy() {
if (sharedStoreMasterPolicy == null) {
sharedStoreMasterPolicy = new SharedStoreMasterPolicy(failoverOnServerShutdown, isWaitForActivation);
}
return sharedStoreMasterPolicy;
}
public void setSharedStoreMasterPolicy(SharedStoreMasterPolicy sharedStoreMasterPolicy) {
this.sharedStoreMasterPolicy = sharedStoreMasterPolicy;
}
@Override
public boolean isSharedStore() {
return true;
}
@Override
public boolean canScaleDown() {
return scaleDownPolicy != null;
}
public boolean isAllowAutoFailBack() {
return allowAutoFailBack;
}
public void setAllowAutoFailBack(boolean allowAutoFailBack) {
this.allowAutoFailBack = allowAutoFailBack;
}
public void setIsWaitForActivation(boolean isWaitForActivation) {
this.isWaitForActivation = isWaitForActivation;
}
@Override
public Activation createActivation(ActiveMQServerImpl server,
boolean wasLive,
Map<String, Object> activationParams,
ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) {
return new SharedStoreBackupActivation(server, this);
}
@Override
public String getBackupGroupName() {
return null;
}
}
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java
public class ReplicaPolicy extends BackupPolicy {
private String clusterName;
private int maxSavedReplicatedJournalsSize = ActiveMQDefaultConfiguration.getDefaultMaxSavedReplicatedJournalsSize();
private String groupName = null;
private boolean restartBackup = ActiveMQDefaultConfiguration.isDefaultRestartBackup();
//used if we create a replicated policy for when we become live.
private boolean allowFailback = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback();
private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout();
/*
* what quorum size to use for voting
* */
private int quorumSize;
/*
* whether or not this live broker should vote to remain live
* */
private boolean voteOnReplicationFailure;
private ReplicatedPolicy replicatedPolicy;
private final NetworkHealthCheck networkHealthCheck;
private int voteRetries;
private long voteRetryWait;
private final int quorumVoteWait;
private long retryReplicationWait;
//......
@Override
public boolean isRestartBackup() {
return restartBackup;
}
@Override
public void setRestartBackup(boolean restartBackup) {
this.restartBackup = restartBackup;
}
@Override
public boolean isSharedStore() {
return false;
}
//......
}
HAManager继承了ActiveMQComponent接口,它定义了getBackupServers方法;StandaloneHAManager实现了HAManager接口,其getBackupServers方法返回空map;ColocatedHAManager实现了HAManager接口,其getBackupServers方法返回backupServers;activateSharedStoreBackup方法以及activateReplicatedBackup方法都会通过server.createBackupServer(configuration)创建backup,然后添加到backupServers;activateBackup方法则根据haPolicy.getBackupPolicy()来选择执行activateSharedStoreBackup或者是activateReplicatedBackup方法