本文主要研究一下artemis的handleConnectionFailure
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {
//......
/**
 * Reacts to a failed connection by delegating to
 * {@link #failoverOrReconnect(Object, ActiveMQException, String)}.
 *
 * <p>An {@code ActiveMQInterruptedException} is only logged at debug level; any other
 * {@code Throwable} is logged, followed by a call to {@code close()}, and is then rethrown.
 *
 * @param connectionID          identifier of the connection that failed
 * @param me                    the exception describing the failure
 * @param scaleDownTargetNodeID forwarded unchanged to {@code failoverOrReconnect}
 */
private void handleConnectionFailure(final Object connectionID,
                                     final ActiveMQException me,
                                     String scaleDownTargetNodeID) {
    try {
        failoverOrReconnect(connectionID, me, scaleDownTargetNodeID);
    } catch (ActiveMQInterruptedException interrupted) {
        // An interrupt is an expected event (e.g. during shutdown), hence debug level only.
        logger.debug(interrupted.getMessage(), interrupted);
    } catch (Throwable failure) {
        ActiveMQClientLogger.LOGGER.unableToHandleConnectionFailure(failure);
        // For anything else just close, so that clients are unblocked, then propagate.
        close();
        throw failure;
    }
}
/**
 * Handles the failure of the connection identified by {@code connectionID}: either reconnects
 * the sessions (when {@code reconnectAttempts != 0}) or destroys the connection, and notifies
 * failover and session failure listeners along the way.
 *
 * <p>Outline, as implemented below:
 * <ol>
 * <li>Log the event, and call {@code setReconnectAttempts(0)} if any session's context is a
 * killed {@code ActiveMQSessionContext}.</li>
 * <li>Return immediately if {@code clientProtocolManager} is not alive.</li>
 * <li>Under the lock returned by {@code lockFailover()}: return if the failure does not refer
 * to the current connection; otherwise notify listeners, then reconnect or destroy the
 * connection; if no connection is left, snapshot the sessions and notify listeners of the
 * failed failover.</li>
 * <li>After releasing the lock: notify session failure listeners if a connection exists, and
 * clean up the snapshotted sessions if there are any.</li>
 * </ol>
 *
 * @param connectionID          compared against {@code connection.getID()} to detect stale failures
 * @param me                    the failure, passed to the listeners and to {@code reconnectSessions}
 * @param scaleDownTargetNodeID passed through to {@code callSessionFailureListeners}
 */
private void failoverOrReconnect(final Object connectionID,
final ActiveMQException me,
String scaleDownTargetNodeID) {
ActiveMQClientLogger.LOGGER.failoverOrReconnect(connectionID, me);
// A killed session context disables reconnection: reconnect attempts are reset to 0.
// NOTE(review): sessions is iterated here without the synchronized (sessions) guard that is
// used further down when copying it - confirm the declared collection type makes this safe.
for (ClientSessionInternal session : sessions) {
SessionContext context = session.getSessionContext();
if (context instanceof ActiveMQSessionContext) {
ActiveMQSessionContext sessionContext = (ActiveMQSessionContext) context;
if (sessionContext.isKilled()) {
setReconnectAttempts(0);
}
}
}
// Stays null unless no connection is left at the end of the locked section below.
Set<ClientSessionInternal> sessionsToClose = null;
if (!clientProtocolManager.isAlive())
return;
Lock localFailoverLock = lockFailover();
try {
if (connection == null || !connection.getID().equals(connectionID) || !clientProtocolManager.isAlive()) {
// We already failed over/reconnected - probably the first failure came in, all the connections were failed
// over, then an async connection exception or disconnect came in for one of the connections that had
// already been handled, so we simply return - we don't want to call the listeners again
return;
}
if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {
logger.trace("Client Connection failed, calling failure listeners and trying to reconnect, reconnectAttempts=" + reconnectAttempts);
}
callFailoverListeners(FailoverEventType.FAILURE_DETECTED);
// We call before reconnection occurs to give the user a chance to do cleanup, like cancel messages
callSessionFailureListeners(me, false, false, scaleDownTargetNodeID);
// Now get locks on all channel 1s, whilst holding the failoverLock - this makes sure
// There are either no threads executing in createSession, or one is blocking on a createSession
// result.
// Then interrupt the channel 1 that is blocking (could just interrupt them all)
// Then release all channel 1 locks - this allows the createSession to exit the monitor
// Then get all channel 1 locks again - this ensures that any createSession thread has executed the section and
// returned all its connections to the connection manager (the code to return connections to connection manager
// must be inside the lock)
// Then perform failover
// Then release failoverLock
// The other side of the bargain - during createSession:
// The calling thread must get the failoverLock and get its connections when this is
// locked.
// While this is still locked it must then get the channel1 lock
// It can then release the failoverLock
// It should catch ActiveMQException.INTERRUPTED in the call to channel.sendBlocking
// It should then return its connections, with channel 1 lock still held
// It can then release the channel 1 lock, and retry (which will cause locking on
// failoverLock
// until failover is complete)
if (reconnectAttempts != 0) {
// If cleanupBeforeFailover returns false nothing is done here and connection is left as it was.
if (clientProtocolManager.cleanupBeforeFailover(me)) {
// Now we absolutely know that no threads are executing in or blocked in
// createSession,
// and no
// more will execute it until failover is complete
// So.. do failover / reconnection
RemotingConnection oldConnection = connection;
connection = null;
Connector localConnector = connector;
if (localConnector != null) {
try {
localConnector.close();
} catch (Exception ignore) {
// no-op
}
}
cancelScheduledTasks();
connector = null;
// reconnectSessions leaves the connection field non-null only when reconnection succeeded.
reconnectSessions(oldConnection, reconnectAttempts, me);
if (oldConnection != null) {
oldConnection.destroy();
}
if (connection != null) {
callFailoverListeners(FailoverEventType.FAILOVER_COMPLETED);
}
}
} else {
// reconnectAttempts == 0: no reconnection, just destroy the current connection.
RemotingConnection connectionToDestory = connection;
if (connectionToDestory != null) {
connectionToDestory.destroy();
}
connection = null;
}
if (connection == null) {
// No connection left: take a snapshot of the sessions so they can be cleaned up once the lock is released.
synchronized (sessions) {
sessionsToClose = new HashSet<>(sessions);
}
callFailoverListeners(FailoverEventType.FAILOVER_FAILED);
callSessionFailureListeners(me, true, false, scaleDownTargetNodeID);
}
} finally {
localFailoverLock.unlock();
}
// This needs to be outside the failover lock to prevent deadlock
if (connection != null) {
callSessionFailureListeners(me, true, true);
}
if (sessionsToClose != null) {
// If connection is null it means we didn't succeed in failing over or reconnecting
// so we close all the sessions, so they will throw exceptions when attempted to be used
for (ClientSessionInternal session : sessionsToClose) {
try {
session.cleanUp(true);
} catch (Exception cause) {
// A failure to clean up one session is logged and does not stop the remaining clean-ups.
ActiveMQClientLogger.LOGGER.failedToCleanupSession(cause);
}
}
}
}
//......
}
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {
//......
/**
 * Tries to obtain a new connection and, if that succeeds, moves every current session onto it.
 *
 * <p>On return the {@code connection} field is {@code null} when no connection could be
 * obtained or when any session's {@code handleFailover} returned {@code false}.
 *
 * @param oldConnection     the connection that failed; its failure listeners and ID generator
 *                          sequence are carried over to the new connection
 * @param reconnectAttempts forwarded to {@code getConnectionWithRetry}
 * @param cause             the failure, handed to each session's {@code handleFailover}
 */
private void reconnectSessions(final RemotingConnection oldConnection,
                               final int reconnectAttempts,
                               final ActiveMQException cause) {
    // Work on a copy taken under the sessions monitor.
    final HashSet<ClientSessionInternal> snapshot;
    synchronized (sessions) {
        snapshot = new HashSet<>(sessions);
    }

    for (ClientSessionInternal candidate : snapshot) {
        candidate.preHandleFailover(connection);
    }

    getConnectionWithRetry(reconnectAttempts, oldConnection);

    if (connection == null) {
        if (!clientProtocolManager.isAlive()) {
            ActiveMQClientLogger.LOGGER.failedToConnectToServer();
        }
        return;
    }

    // Merge listeners: everything already on the new connection, plus all old listeners
    // except the old DelegatingFailureListener.
    List<FailureListener> previousListeners = oldConnection.getFailureListeners();
    List<FailureListener> mergedListeners = new ArrayList<>(connection.getFailureListeners());
    for (FailureListener previous : previousListeners) {
        if (!(previous instanceof DelegatingFailureListener)) {
            mergedListeners.add(previous);
        }
    }
    connection.setFailureListeners(mergedListeners);

    // This used to be done inside failover; it needs to be done on the protocol.
    CoreRemotingConnection freshCore = (CoreRemotingConnection) connection;
    CoreRemotingConnection staleCore = (CoreRemotingConnection) oldConnection;
    freshCore.syncIDGeneratorSequence(staleCore.getIDGeneratorSequence());

    for (ClientSessionInternal candidate : snapshot) {
        if (candidate.handleFailover(connection, cause)) {
            continue;
        }
        // One session could not be failed over: give up on the new connection altogether.
        connection.destroy();
        this.connection = null;
        return;
    }
}
/**
 * Repeatedly calls {@link #getConnection()} until it yields a connection, the attempts are
 * used up, {@code waitForRetry} returns {@code true}, or the protocol manager is no longer alive.
 *
 * <p>The wait between attempts starts at {@code retryInterval}, is multiplied by
 * {@code retryIntervalMultiplier} after each wait, and is capped at {@code maxRetryInterval}.
 *
 * @param reconnectAttempts {@code 0} means a single attempt without retry, {@code -1} means no
 *                          limit on the number of attempts, any other value is the number of
 *                          failed attempts after which the method gives up
 * @param oldConnection     if it is a {@code CoreRemotingConnection}, its channel version is
 *                          copied onto the new connection
 */
private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) {
    if (!clientProtocolManager.isAlive()) {
        return;
    }

    if (logger.isTraceEnabled()) {
        logger.trace("getConnectionWithRetry::" + reconnectAttempts +
                         " with retryInterval = " +
                         retryInterval +
                         " multiplier = " +
                         retryIntervalMultiplier, new Exception("trace"));
    }

    long interval = retryInterval;
    int failedAttempts = 0;

    while (clientProtocolManager.isAlive()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Trying reconnection attempt " + failedAttempts + "/" + reconnectAttempts);
        }

        if (getConnection() != null) {
            if (oldConnection instanceof CoreRemotingConnection) {
                // Transfer the old connection's channel version onto the new connection.
                CoreRemotingConnection previous = (CoreRemotingConnection) oldConnection;
                ((CoreRemotingConnection) connection).setChannelVersion(previous.getChannelVersion());
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Reconnection successful");
            }
            return;
        }

        // Failed to get a connection.
        if (reconnectAttempts == 0) {
            logger.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory");
            return;
        }

        failedAttempts++;

        if (reconnectAttempts != -1 && failedAttempts == reconnectAttempts) {
            if (reconnectAttempts != 1) {
                ActiveMQClientLogger.LOGGER.failedToConnectToServer(reconnectAttempts);
            }
            return;
        }

        if (logger.isTraceEnabled()) {
            logger.trace("Waiting " + interval + " milliseconds before next retry. RetryInterval=" + retryInterval + " and multiplier=" + retryIntervalMultiplier);
        }

        if (waitForRetry(interval)) {
            return;
        }

        // Exponential back-off, capped at maxRetryInterval.
        long scaled = (long) (interval * retryIntervalMultiplier);
        interval = scaled > maxRetryInterval ? maxRetryInterval : scaled;
    }
}
/**
 * Returns the current connection, establishing a new one under {@code connectionLock} when
 * none exists yet.
 *
 * <p>A newly established connection is stored in the {@code connection} field before the
 * failover check, because the check needs the field to be set in order to receive its reply.
 * If the check fails or throws, the new connection is destroyed and the field is reset to
 * {@code null}.
 *
 * @return the existing connection; otherwise the result of {@code establishNewConnection()}
 *         (which may be {@code null}); or {@code null} when the protocol manager is not alive
 *         or the failover check rejected the new connection
 * @throws IllegalStateException if this factory is closed
 */
public RemotingConnection getConnection() {
    if (closed) {
        throw new IllegalStateException("ClientSessionFactory is closed!");
    }
    if (!clientProtocolManager.isAlive()) {
        return null;
    }
    synchronized (connectionLock) {
        if (connection != null) {
            // a connection already exists, so returning the same one
            return connection;
        }

        // Named distinctly from the field to avoid shadowing it.
        RemotingConnection newConnection = establishNewConnection();
        this.connection = newConnection;

        // We check if we can actually connect. We do it here because, to receive the reply,
        // this.connection has to be non-null; on failure make sure to reset this.connection to null.
        if (newConnection != null && liveNodeID != null) {
            try {
                if (!clientProtocolManager.checkForFailover(liveNodeID)) {
                    newConnection.destroy();
                    this.connection = null;
                    return null;
                }
            } catch (ActiveMQException e) {
                newConnection.destroy();
                this.connection = null;
                return null;
            }
        }

        if (newConnection != null && serverLocator.getAfterConnectInternalListener() != null) {
            serverLocator.getAfterConnectInternalListener().onConnection(this);
        }

        if (serverLocator.getTopology() != null) {
            if (newConnection != null) {
                if (logger.isTraceEnabled()) {
                    logger.trace(this + "::Subscribing Topology");
                }
                clientProtocolManager.sendSubscribeTopology(serverLocator.isClusterConnection());
            }
        } else {
            // Fix: the identity hash is taken of the locator itself. Previously the closing
            // parenthesis was misplaced, so the hash of a temporary concatenated String was
            // logged and the " had no topology" text never appeared in the message.
            logger.debug("serverLocator@" + System.identityHashCode(serverLocator) + " had no topology");
        }

        return newConnection;
    }
}
//......
}
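ClientSessionFactoryImpl 的 handleConnectionFailure 方法会调用 failoverOrReconnect 方法。该方法先遍历 sessions,对 sessionContext.isKilled() 为 true 的执行 setReconnectAttempts(0);之后执行 lockFailover() 获取锁:当 reconnectAttempts 不为 0 且 clientProtocolManager.cleanupBeforeFailover(me) 返回 true 时执行 reconnectSessions,当 reconnectAttempts 为 0 时则直接销毁当前 connection 并将其置为 null;最后在 finally 中执行 localFailoverLock.unlock()。reconnectSessions 则通过 getConnectionWithRetry 循环调用 getConnection(),重试间隔从 retryInterval 开始,每次乘以 retryIntervalMultiplier,且不超过 maxRetryInterval。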