前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊artemis的handleConnectionFailure

聊聊artemis的handleConnectionFailure

作者头像
code4it
发布2020-02-24 14:20:32
3430
发布2020-02-24 14:20:32
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下artemis的handleConnectionFailure

handleConnectionFailure

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java

代码语言:javascript
复制
public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {

   //......

   private void handleConnectionFailure(final Object connectionID,
                                        final ActiveMQException me,
                                        String scaleDownTargetNodeID) {
      try {
         failoverOrReconnect(connectionID, me, scaleDownTargetNodeID);
      } catch (ActiveMQInterruptedException e1) {
         // this is just a debug, since an interrupt is an expected event (in case of a shutdown)
         logger.debug(e1.getMessage(), e1);
      } catch (Throwable t) {
         ActiveMQClientLogger.LOGGER.unableToHandleConnectionFailure(t);
         //for anything else just close so clients are un blocked
         close();
         throw t;
      }
   }

   private void failoverOrReconnect(final Object connectionID,
                                    final ActiveMQException me,
                                    String scaleDownTargetNodeID) {
      ActiveMQClientLogger.LOGGER.failoverOrReconnect(connectionID, me);

      for (ClientSessionInternal session : sessions) {
         SessionContext context = session.getSessionContext();
         if (context instanceof ActiveMQSessionContext) {
            ActiveMQSessionContext sessionContext = (ActiveMQSessionContext) context;
            if (sessionContext.isKilled()) {
               setReconnectAttempts(0);
            }
         }
      }

      Set<ClientSessionInternal> sessionsToClose = null;
      if (!clientProtocolManager.isAlive())
         return;
      Lock localFailoverLock = lockFailover();
      try {
         if (connection == null || !connection.getID().equals(connectionID) || !clientProtocolManager.isAlive()) {
            // We already failed over/reconnected - probably the first failure came in, all the connections were failed
            // over then an async connection exception or disconnect
            // came in for one of the already exitLoop connections, so we return true - we don't want to call the
            // listeners again

            return;
         }

         if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {
            logger.trace("Client Connection failed, calling failure listeners and trying to reconnect, reconnectAttempts=" + reconnectAttempts);
         }

         callFailoverListeners(FailoverEventType.FAILURE_DETECTED);
         // We call before reconnection occurs to give the user a chance to do cleanup, like cancel messages
         callSessionFailureListeners(me, false, false, scaleDownTargetNodeID);

         // Now get locks on all channel 1s, whilst holding the failoverLock - this makes sure
         // There are either no threads executing in createSession, or one is blocking on a createSession
         // result.

         // Then interrupt the channel 1 that is blocking (could just interrupt them all)

         // Then release all channel 1 locks - this allows the createSession to exit the monitor

         // Then get all channel 1 locks again - this ensures the any createSession thread has executed the section and
         // returned all its connections to the connection manager (the code to return connections to connection manager
         // must be inside the lock

         // Then perform failover

         // Then release failoverLock

         // The other side of the bargain - during createSession:
         // The calling thread must get the failoverLock and get its' connections when this is
         // locked.
         // While this is still locked it must then get the channel1 lock
         // It can then release the failoverLock
         // It should catch ActiveMQException.INTERRUPTED in the call to channel.sendBlocking
         // It should then return its connections, with channel 1 lock still held
         // It can then release the channel 1 lock, and retry (which will cause locking on
         // failoverLock
         // until failover is complete

         if (reconnectAttempts != 0) {

            if (clientProtocolManager.cleanupBeforeFailover(me)) {

               // Now we absolutely know that no threads are executing in or blocked in
               // createSession,
               // and no
               // more will execute it until failover is complete

               // So.. do failover / reconnection

               RemotingConnection oldConnection = connection;

               connection = null;

               Connector localConnector = connector;
               if (localConnector != null) {
                  try {
                     localConnector.close();
                  } catch (Exception ignore) {
                     // no-op
                  }
               }

               cancelScheduledTasks();

               connector = null;

               reconnectSessions(oldConnection, reconnectAttempts, me);

               if (oldConnection != null) {
                  oldConnection.destroy();
               }

               if (connection != null) {
                  callFailoverListeners(FailoverEventType.FAILOVER_COMPLETED);
               }
            }
         } else {
            RemotingConnection connectionToDestory = connection;
            if (connectionToDestory != null) {
               connectionToDestory.destroy();
            }
            connection = null;
         }

         if (connection == null) {
            synchronized (sessions) {
               sessionsToClose = new HashSet<>(sessions);
            }
            callFailoverListeners(FailoverEventType.FAILOVER_FAILED);
            callSessionFailureListeners(me, true, false, scaleDownTargetNodeID);
         }
      } finally {
         localFailoverLock.unlock();
      }

      // This needs to be outside the failover lock to prevent deadlock
      if (connection != null) {
         callSessionFailureListeners(me, true, true);
      }
      if (sessionsToClose != null) {
         // If connection is null it means we didn't succeed in failing over or reconnecting
         // so we close all the sessions, so they will throw exceptions when attempted to be used

         for (ClientSessionInternal session : sessionsToClose) {
            try {
               session.cleanUp(true);
            } catch (Exception cause) {
               ActiveMQClientLogger.LOGGER.failedToCleanupSession(cause);
            }
         }
      }
   }

   //......
}
  • handleConnectionFailure方法会调用failoverOrReconnect方法,该方法会先遍历sessions,对于sessionContext.isKilled()为true的执行setReconnectAttempts(0);之后执行lockFailover(),然后reconnectAttempts不为0的执行reconnectSessions,最后执行localFailoverLock.unlock()

reconnectSessions

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java

代码语言:javascript
复制
public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {

   //......

   private void reconnectSessions(final RemotingConnection oldConnection,
                                  final int reconnectAttempts,
                                  final ActiveMQException cause) {
      HashSet<ClientSessionInternal> sessionsToFailover;
      synchronized (sessions) {
         sessionsToFailover = new HashSet<>(sessions);
      }

      for (ClientSessionInternal session : sessionsToFailover) {
         session.preHandleFailover(connection);
      }

      getConnectionWithRetry(reconnectAttempts, oldConnection);

      if (connection == null) {
         if (!clientProtocolManager.isAlive())
            ActiveMQClientLogger.LOGGER.failedToConnectToServer();

         return;
      }

      List<FailureListener> oldListeners = oldConnection.getFailureListeners();

      List<FailureListener> newListeners = new ArrayList<>(connection.getFailureListeners());

      for (FailureListener listener : oldListeners) {
         // Add all apart from the old DelegatingFailureListener
         if (listener instanceof DelegatingFailureListener == false) {
            newListeners.add(listener);
         }
      }

      connection.setFailureListeners(newListeners);

      // This used to be done inside failover
      // it needs to be done on the protocol
      ((CoreRemotingConnection) connection).syncIDGeneratorSequence(((CoreRemotingConnection) oldConnection).getIDGeneratorSequence());

      for (ClientSessionInternal session : sessionsToFailover) {
         if (!session.handleFailover(connection, cause)) {
            connection.destroy();
            this.connection = null;
            return;
         }
      }
   }

   private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) {
      if (!clientProtocolManager.isAlive())
         return;
      if (logger.isTraceEnabled()) {
         logger.trace("getConnectionWithRetry::" + reconnectAttempts +
                         " with retryInterval = " +
                         retryInterval +
                         " multiplier = " +
                         retryIntervalMultiplier, new Exception("trace"));
      }

      long interval = retryInterval;

      int count = 0;

      while (clientProtocolManager.isAlive()) {
         if (logger.isDebugEnabled()) {
            logger.debug("Trying reconnection attempt " + count + "/" + reconnectAttempts);
         }

         if (getConnection() != null) {
            if (oldConnection != null && oldConnection instanceof CoreRemotingConnection) {
               // transferring old connection version into the new connection
               ((CoreRemotingConnection)connection).setChannelVersion(((CoreRemotingConnection)oldConnection).getChannelVersion());
            }
            if (logger.isDebugEnabled()) {
               logger.debug("Reconnection successful");
            }
            return;
         } else {
            // Failed to get connection

            if (reconnectAttempts != 0) {
               count++;

               if (reconnectAttempts != -1 && count == reconnectAttempts) {
                  if (reconnectAttempts != 1) {
                     ActiveMQClientLogger.LOGGER.failedToConnectToServer(reconnectAttempts);
                  }

                  return;
               }

               if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {
                  ClientSessionFactoryImpl.logger.trace("Waiting " + interval + " milliseconds before next retry. RetryInterval=" + retryInterval + " and multiplier=" + retryIntervalMultiplier);
               }

               if (waitForRetry(interval))
                  return;

               // Exponential back-off
               long newInterval = (long) (interval * retryIntervalMultiplier);

               if (newInterval > maxRetryInterval) {
                  newInterval = maxRetryInterval;
               }

               interval = newInterval;
            } else {
               logger.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory");
               return;
            }
         }
      }
   }

   public RemotingConnection getConnection() {
      if (closed)
         throw new IllegalStateException("ClientSessionFactory is closed!");
      if (!clientProtocolManager.isAlive())
         return null;
      synchronized (connectionLock) {
         if (connection != null) {
            // a connection already exists, so returning the same one
            return connection;
         } else {
            RemotingConnection connection = establishNewConnection();

            this.connection = connection;

            //we check if we can actually connect.
            // we do it here as to receive the reply connection has to be not null
            //make sure to reset this.connection == null
            if (connection != null && liveNodeID != null) {
               try {
                  if (!clientProtocolManager.checkForFailover(liveNodeID)) {
                     connection.destroy();
                     this.connection = null;
                     return null;
                  }
               } catch (ActiveMQException e) {
                  connection.destroy();
                  this.connection = null;
                  return null;
               }
            }

            if (connection != null && serverLocator.getAfterConnectInternalListener() != null) {
               serverLocator.getAfterConnectInternalListener().onConnection(this);
            }

            if (serverLocator.getTopology() != null) {
               if (connection != null) {
                  if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {
                     logger.trace(this + "::Subscribing Topology");
                  }
                  clientProtocolManager.sendSubscribeTopology(serverLocator.isClusterConnection());
               }
            } else {
               logger.debug("serverLocator@" + System.identityHashCode(serverLocator + " had no topology"));
            }

            return connection;
         }
      }
   }

   //......
}
  • reconnectSessions方法首先执行getConnectionWithRetry,然后挨个将oldListeners添加到新的connection中,最后遍历sessionsToFailover执行session.handleFailover(connection, cause),对于返回false的执行connection.destroy()然后return;getConnectionWithRetry方法通过getConnection()获取新连接并赋值给connection,如果connection为null则进行重试直到reconnectAttempts小于等于0,重试时通过waitForRetry(interval)来控制重试的间隔

小结

ClientSessionFactoryImpl的handleConnectionFailure方法会调用failoverOrReconnect方法,该方法会先遍历sessions,对于sessionContext.isKilled()为true的执行setReconnectAttempts(0);之后执行lockFailover(),然后reconnectAttempts不为0的执行reconnectSessions,最后执行localFailoverLock.unlock()

doc

  • ClientSessionFactoryImpl
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-02-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • handleConnectionFailure
  • reconnectSessions
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档