
A look at Artemis's reconnectAttempts

code4it
Published 2020-02-24 09:46:31
Collected in the column: 码匠的流水账

This article looks at how Artemis handles reconnectAttempts on the client side.

reconnectAttempts

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java

public final class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener {

   //......

   public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration,
                                                    int reconnectAttempts) throws Exception {
      assertOpen();

      initialize();

      ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);

      addToConnecting(factory);
      try {
         try {
            factory.connect(reconnectAttempts);
         } catch (ActiveMQException e1) {
            //we need to make sure is closed just for garbage collection
            factory.close();
            throw e1;
         }
         addFactory(factory);
         return factory;
      } finally {
         removeFromConnecting(factory);
      }
   }

   //......
}
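  • ServerLocatorImpl's createSessionFactory method creates a ClientSessionFactoryImpl and then calls factory.connect(reconnectAttempts). The same reconnectAttempts value is passed to both the constructor and connect; if connect throws an ActiveMQException, the factory is closed and the exception is rethrown.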
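
The bookkeeping around factory.connect can be illustrated in isolation. The following is a minimal sketch using only the JDK, not Artemis code: the Factory interface and the two sets are hypothetical stand-ins for ClientSessionFactoryInternal, addToConnecting/removeFromConnecting and addFactory.

```java
import java.util.HashSet;
import java.util.Set;

final class FactoryLifecycleSketch {

    /** Hypothetical stand-in for ClientSessionFactoryInternal. */
    interface Factory {
        void connect(int reconnectAttempts) throws Exception;
        void close();
    }

    // Hypothetical stand-ins for the locator's "connecting" and "connected" bookkeeping.
    final Set<Factory> connecting = new HashSet<>();
    final Set<Factory> factories = new HashSet<>();

    Factory create(Factory factory, int reconnectAttempts) throws Exception {
        connecting.add(factory);
        try {
            try {
                factory.connect(reconnectAttempts);
            } catch (Exception e) {
                // Close the half-built factory before propagating the failure.
                factory.close();
                throw e;
            }
            factories.add(factory);
            return factory;
        } finally {
            // Runs on both success and failure, so nothing stays in "connecting".
            connecting.remove(factory);
        }
    }
}
```

The nested try keeps the two concerns separate: the inner catch cleans up the failed factory, while the outer finally guarantees the factory leaves the connecting set in every case.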

ClientSessionFactoryImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java

public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {

   //......

   public void connect(final int initialConnectAttempts) throws ActiveMQException {
      // Get the connection
      getConnectionWithRetry(initialConnectAttempts, null);

      if (connection == null) {
         StringBuilder msg = new StringBuilder("Unable to connect to server using configuration ").append(currentConnectorConfig);
         if (backupConfig != null) {
            msg.append(" and backup configuration ").append(backupConfig);
         }
         throw new ActiveMQNotConnectedException(msg.toString());
      }

   }

   private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) {
      if (!clientProtocolManager.isAlive())
         return;
      if (logger.isTraceEnabled()) {
         logger.trace("getConnectionWithRetry::" + reconnectAttempts +
                         " with retryInterval = " +
                         retryInterval +
                         " multiplier = " +
                         retryIntervalMultiplier, new Exception("trace"));
      }

      long interval = retryInterval;

      int count = 0;

      while (clientProtocolManager.isAlive()) {
         if (logger.isDebugEnabled()) {
            logger.debug("Trying reconnection attempt " + count + "/" + reconnectAttempts);
         }

         if (getConnection() != null) {
            if (oldConnection != null && oldConnection instanceof CoreRemotingConnection) {
               // transferring old connection version into the new connection
               ((CoreRemotingConnection)connection).setChannelVersion(((CoreRemotingConnection)oldConnection).getChannelVersion());
            }
            if (logger.isDebugEnabled()) {
               logger.debug("Reconnection successful");
            }
            return;
         } else {
            // Failed to get connection

            if (reconnectAttempts != 0) {
               count++;

               if (reconnectAttempts != -1 && count == reconnectAttempts) {
                  if (reconnectAttempts != 1) {
                     ActiveMQClientLogger.LOGGER.failedToConnectToServer(reconnectAttempts);
                  }

                  return;
               }

               if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {
                  ClientSessionFactoryImpl.logger.trace("Waiting " + interval + " milliseconds before next retry. RetryInterval=" + retryInterval + " and multiplier=" + retryIntervalMultiplier);
               }

               if (waitForRetry(interval))
                  return;

               // Exponential back-off
               long newInterval = (long) (interval * retryIntervalMultiplier);

               if (newInterval > maxRetryInterval) {
                  newInterval = maxRetryInterval;
               }

               interval = newInterval;
            } else {
               logger.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory");
               return;
            }
         }
      }
   }

   public boolean waitForRetry(long interval) {
      try {
         if (clientProtocolManager.waitOnLatch(interval)) {
            return true;
         }
      } catch (InterruptedException ignore) {
         throw new ActiveMQInterruptedException(createTrace);
      }
      return false;
   }

   //......
}  
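  • ClientSessionFactoryImpl's connect method delegates to getConnectionWithRetry and throws ActiveMQNotConnectedException if connection is still null afterwards. getConnectionWithRetry loops while clientProtocolManager.isAlive() and calls getConnection() on each iteration. If getConnection() returns null and reconnectAttempts is 0, the method returns without retrying. Otherwise it increments count, and when reconnectAttempts is not -1 and count equals reconnectAttempts it returns. If attempts remain, it waits via waitForRetry(interval): a true result means an early return, otherwise interval is multiplied by retryIntervalMultiplier, capped at maxRetryInterval, and the loop continues. waitForRetry itself waits via clientProtocolManager.waitOnLatch(interval).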
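
To make the counting and back-off concrete, here is a minimal JDK-only sketch of the loop for the case where every connection attempt fails. It is a simulation, not Artemis code: instead of sleeping it records the interval that would have been waited, it does not model waitForRetry returning true, and the safetyCap parameter is a hypothetical addition so that reconnectAttempts = -1 terminates.

```java
import java.util.ArrayList;
import java.util.List;

final class RetryLoopSketch {

    /**
     * Returns the waits (in ms) between attempts when every attempt fails.
     * safetyCap is a hypothetical bound on recorded waits; it stands in for
     * clientProtocolManager.isAlive() eventually becoming false.
     */
    static List<Long> simulateFailedConnect(int reconnectAttempts,
                                            long retryInterval,
                                            double retryIntervalMultiplier,
                                            long maxRetryInterval,
                                            int safetyCap) {
        List<Long> waits = new ArrayList<>();
        long interval = retryInterval;
        int count = 0;

        while (waits.size() < safetyCap) {
            // Assumption: getConnection() returned null here (attempt failed).
            if (reconnectAttempts == 0) {
                return waits; // retrying is not configured
            }
            count++;
            if (reconnectAttempts != -1 && count == reconnectAttempts) {
                return waits; // attempt budget used up
            }
            waits.add(interval); // stands in for waitForRetry(interval)

            // Exponential back-off, capped at maxRetryInterval.
            long newInterval = (long) (interval * retryIntervalMultiplier);
            if (newInterval > maxRetryInterval) {
                newInterval = maxRetryInterval;
            }
            interval = newInterval;
        }
        return waits;
    }
}
```

With reconnectAttempts = 3, retryInterval = 1000, retryIntervalMultiplier = 2.0 and maxRetryInterval = 3000, the sketch records waits of 1000 and 2000 ms: three connection attempts with two waits between them. With reconnectAttempts = 1 or 0 there is a single attempt and no wait.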

ActiveMQClientProtocolManager

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java

public class ActiveMQClientProtocolManager implements ClientProtocolManager {

   //......

   private final CountDownLatch waitLatch = new CountDownLatch(1);

   //......

   public boolean waitOnLatch(long milliseconds) throws InterruptedException {
      return waitLatch.await(milliseconds, TimeUnit.MILLISECONDS);
   }

   public void stop() {
      alive = false;

      synchronized (inCreateSessionGuard) {
         if (inCreateSessionLatch != null)
            inCreateSessionLatch.countDown();
      }

      Channel channel1 = getChannel1();
      if (channel1 != null) {
         channel1.returnBlocking();
      }

      waitLatch.countDown();

   }

   //......
}
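  • ActiveMQClientProtocolManager holds a CountDownLatch named waitLatch. waitOnLatch waits via waitLatch.await(milliseconds, TimeUnit.MILLISECONDS), which returns false on timeout and true once the latch has reached zero. The stop method calls waitLatch.countDown(), so stopping the protocol manager makes a pending waitOnLatch return true, which in turn makes getConnectionWithRetry return early.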
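
The latch behaviour can be checked with the JDK alone. The class below is a minimal sketch that copies only the waitLatch field and the two methods that touch it; everything else in ActiveMQClientProtocolManager is left out, and the class name is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class WaitLatchSketch {

    private final CountDownLatch waitLatch = new CountDownLatch(1);

    /** Returns true if stop() was called before the timeout elapsed, false on timeout. */
    boolean waitOnLatch(long milliseconds) throws InterruptedException {
        return waitLatch.await(milliseconds, TimeUnit.MILLISECONDS);
    }

    /** Releases any current or future waiter. */
    void stop() {
        waitLatch.countDown();
    }

    public static void main(String[] args) throws InterruptedException {
        WaitLatchSketch manager = new WaitLatchSketch();
        // Nobody has called stop() yet, so this wait times out.
        System.out.println("before stop: " + manager.waitOnLatch(50)); // prints "before stop: false"
        manager.stop();
        // The latch is at zero now, so await returns immediately.
        System.out.println("after stop: " + manager.waitOnLatch(50)); // prints "after stop: true"
    }
}
```

Because a CountDownLatch cannot be reset, every wait after stop() returns true immediately, so a stopped client does not go back to sleeping between retries.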

Summary

ClientSessionFactoryImpl's connect method delegates to getConnectionWithRetry, which loops on clientProtocolManager.isAlive() and calls getConnection(). When the result is null and reconnectAttempts is not 0, it increments count and returns once reconnectAttempts is not -1 and count equals reconnectAttempts. Between attempts it waits via waitForRetry(interval), returning early if that yields true and otherwise growing interval for the next round. waitForRetry relies on clientProtocolManager.waitOnLatch(interval).

doc

  • ClientSessionFactoryImpl

Originally published on 2020-02-12 on the WeChat official account 码匠的流水账.
