
A Look at Artemis's reconnectAttempts

Original
code4it
Last modified 2020-02-13 10:01:33
Included in the column: 码匠的流水账

This article looks at how Artemis handles reconnectAttempts.

reconnectAttempts

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java

public final class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener {

   //......

   public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration,
                                                    int reconnectAttempts) throws Exception {
      assertOpen();

      initialize();

      ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);

      addToConnecting(factory);
      try {
         try {
            factory.connect(reconnectAttempts);
         } catch (ActiveMQException e1) {
            //we need to make sure is closed just for garbage collection
            factory.close();
            throw e1;
         }
         addFactory(factory);
         return factory;
      } finally {
         removeFromConnecting(factory);
      }
   }

   //......
}
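  • The createSessionFactory method of ServerLocatorImpl creates a ClientSessionFactoryImpl and then calls factory.connect(reconnectAttempts). If connect throws an ActiveMQException, the factory is closed and the exception is rethrown; otherwise the factory is registered through addFactory. Either way, removeFromConnecting(factory) runs in the finally block.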
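The try/catch/finally bookkeeping described above can be sketched in plain Java. This is only an illustration under stated assumptions, not Artemis code: LocatorBookkeepingSketch, Factory, create and the two sets are hypothetical stand-ins, and an IllegalStateException plays the role of ActiveMQException.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of the bookkeeping in createSessionFactory; not the Artemis class. */
class LocatorBookkeepingSketch {

   /** Hypothetical stand-in for a session factory. */
   static class Factory {
      final boolean reachable;
      boolean closed;

      Factory(boolean reachable) {
         this.reachable = reachable;
      }

      void connect() {
         if (!reachable) {
            // stands in for the ActiveMQException thrown by the real connect
            throw new IllegalStateException("unable to connect");
         }
      }

      void close() {
         closed = true;
      }
   }

   final Set<Factory> connecting = new HashSet<>();
   final Set<Factory> factories = new HashSet<>();

   Factory create(boolean reachable) {
      Factory factory = new Factory(reachable);
      connecting.add(factory);
      try {
         try {
            factory.connect();
         } catch (IllegalStateException e) {
            factory.close();   // release the half-built factory
            throw e;           // the caller still sees the failure
         }
         factories.add(factory);   // only reached after a successful connect
         return factory;
      } finally {
         connecting.remove(factory);   // runs on success and on failure
      }
   }

   public static void main(String[] args) {
      LocatorBookkeepingSketch locator = new LocatorBookkeepingSketch();
      locator.create(true);
      try {
         locator.create(false);
      } catch (IllegalStateException expected) {
         System.out.println("second create failed: " + expected.getMessage());
      }
      System.out.println("registered factories: " + locator.factories.size());
      System.out.println("still connecting: " + locator.connecting.size());
   }
}
```

In this model a failed connect leaves nothing behind: the factory is closed, never registered, and removed from the connecting set.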

ClientSessionFactoryImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java

public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {

   //......

   public void connect(final int initialConnectAttempts) throws ActiveMQException {
      // Get the connection
      getConnectionWithRetry(initialConnectAttempts, null);

      if (connection == null) {
         StringBuilder msg = new StringBuilder("Unable to connect to server using configuration ").append(currentConnectorConfig);
         if (backupConfig != null) {
            msg.append(" and backup configuration ").append(backupConfig);
         }
         throw new ActiveMQNotConnectedException(msg.toString());
      }

   }

   private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) {
      if (!clientProtocolManager.isAlive())
         return;
      if (logger.isTraceEnabled()) {
         logger.trace("getConnectionWithRetry::" + reconnectAttempts +
                         " with retryInterval = " +
                         retryInterval +
                         " multiplier = " +
                         retryIntervalMultiplier, new Exception("trace"));
      }

      long interval = retryInterval;

      int count = 0;

      while (clientProtocolManager.isAlive()) {
         if (logger.isDebugEnabled()) {
            logger.debug("Trying reconnection attempt " + count + "/" + reconnectAttempts);
         }

         if (getConnection() != null) {
            if (oldConnection != null && oldConnection instanceof CoreRemotingConnection) {
               // transferring old connection version into the new connection
               ((CoreRemotingConnection)connection).setChannelVersion(((CoreRemotingConnection)oldConnection).getChannelVersion());
            }
            if (logger.isDebugEnabled()) {
               logger.debug("Reconnection successful");
            }
            return;
         } else {
            // Failed to get connection

            if (reconnectAttempts != 0) {
               count++;

               if (reconnectAttempts != -1 && count == reconnectAttempts) {
                  if (reconnectAttempts != 1) {
                     ActiveMQClientLogger.LOGGER.failedToConnectToServer(reconnectAttempts);
                  }

                  return;
               }

               if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {
                  ClientSessionFactoryImpl.logger.trace("Waiting " + interval + " milliseconds before next retry. RetryInterval=" + retryInterval + " and multiplier=" + retryIntervalMultiplier);
               }

               if (waitForRetry(interval))
                  return;

               // Exponential back-off
               long newInterval = (long) (interval * retryIntervalMultiplier);

               if (newInterval > maxRetryInterval) {
                  newInterval = maxRetryInterval;
               }

               interval = newInterval;
            } else {
               logger.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory");
               return;
            }
         }
      }
   }

   public boolean waitForRetry(long interval) {
      try {
         if (clientProtocolManager.waitOnLatch(interval)) {
            return true;
         }
      } catch (InterruptedException ignore) {
         throw new ActiveMQInterruptedException(createTrace);
      }
      return false;
   }

   //......
}
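  • The connect method of ClientSessionFactoryImpl mainly delegates to getConnectionWithRetry, and throws ActiveMQNotConnectedException if connection is still null afterwards.
  • getConnectionWithRetry loops while clientProtocolManager.isAlive() is true and calls getConnection() on each pass. A non-null result ends the method; a null result with reconnectAttempts equal to 0 also ends it, without any retry.
  • Otherwise count is incremented, and the method returns once reconnectAttempts is not -1 and count equals reconnectAttempts. With -1 this exit is never taken, so the loop keeps retrying.
  • Before the next attempt, waitForRetry(interval) waits. If it returns true the method returns early; otherwise interval is multiplied by retryIntervalMultiplier, capped at maxRetryInterval, and the loop continues.
  • waitForRetry itself waits through clientProtocolManager.waitOnLatch(interval).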
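To make the counting rules above concrete, here is a minimal sketch of the loop's exit conditions in plain Java. It is not the Artemis implementation: RetryLoopSketch, attempts and the tryConnect callback are hypothetical names, the isAlive() check and the wait between attempts are left out, and tryConnect merely stands in for getConnection() != null.

```java
import java.util.function.BooleanSupplier;

/** Simplified, hypothetical model of the retry loop's exit conditions; not the Artemis class. */
class RetryLoopSketch {

   /**
    * Returns how many connection attempts are made before the loop exits.
    * tryConnect stands in for "getConnection() != null".
    */
   static int attempts(int reconnectAttempts, BooleanSupplier tryConnect) {
      int count = 0;   // failed attempts so far, as in the original loop
      int tries = 0;   // total attempts, tracked here only for illustration
      while (true) {
         tries++;
         if (tryConnect.getAsBoolean()) {
            return tries;   // connected
         }
         if (reconnectAttempts == 0) {
            return tries;   // retrying is not configured
         }
         count++;
         if (reconnectAttempts != -1 && count == reconnectAttempts) {
            return tries;   // retry budget used up
         }
         // the real code waits here and then grows the interval
      }
   }

   public static void main(String[] args) {
      BooleanSupplier alwaysFails = () -> false;
      System.out.println(attempts(0, alwaysFails)); // 1
      System.out.println(attempts(1, alwaysFails)); // 1
      System.out.println(attempts(3, alwaysFails)); // 3

      // With -1 the loop only ends on success; here the 5th attempt succeeds.
      int[] calls = {0};
      System.out.println(attempts(-1, () -> ++calls[0] == 5)); // 5
   }
}
```

Under these assumptions a reconnectAttempts of 1 yields the same number of attempts as 0: a single try with no wait in between.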

ActiveMQClientProtocolManager

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java

public class ActiveMQClientProtocolManager implements ClientProtocolManager {

   //......

   private final CountDownLatch waitLatch = new CountDownLatch(1);

   //......

   public boolean waitOnLatch(long milliseconds) throws InterruptedException {
      return waitLatch.await(milliseconds, TimeUnit.MILLISECONDS);
   }

   public void stop() {
      alive = false;

      synchronized (inCreateSessionGuard) {
         if (inCreateSessionLatch != null)
            inCreateSessionLatch.countDown();
      }

      Channel channel1 = getChannel1();
      if (channel1 != null) {
         channel1.returnBlocking();
      }

      waitLatch.countDown();

   }

   //......
}
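  • ActiveMQClientProtocolManager holds a CountDownLatch named waitLatch. The waitOnLatch method waits on it with waitLatch.await(milliseconds, TimeUnit.MILLISECONDS), and the stop method sets alive to false and calls waitLatch.countDown(), so a waitOnLatch call that is pending or made after stop returns true.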
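The two possible results of waitOnLatch follow from the documented behaviour of CountDownLatch.await(long, TimeUnit): it returns false when the timeout elapses first and true once the count has reached zero. The sketch below shows that with the JDK alone; LatchWaitSketch and demo are hypothetical names, and the 50 ms timeout is an arbitrary value chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical, stripped-down model of the latch-based wait; not the Artemis class. */
class LatchWaitSketch {

   private final CountDownLatch waitLatch = new CountDownLatch(1);

   boolean waitOnLatch(long milliseconds) throws InterruptedException {
      return waitLatch.await(milliseconds, TimeUnit.MILLISECONDS);
   }

   void stop() {
      waitLatch.countDown();
   }

   /** Returns { result before stop(), result after stop() }. */
   static boolean[] demo() {
      LatchWaitSketch manager = new LatchWaitSketch();
      try {
         boolean beforeStop = manager.waitOnLatch(50);   // times out: latch still at 1
         manager.stop();
         boolean afterStop = manager.waitOnLatch(50);    // returns at once: latch at 0
         return new boolean[] {beforeStop, afterStop};
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException(e);
      }
   }

   public static void main(String[] args) {
      boolean[] results = demo();
      System.out.println("before stop: " + results[0]); // false
      System.out.println("after stop: " + results[1]);  // true
   }
}
```

Read against waitForRetry, a false result means the full interval passed and the next attempt may proceed, while true means the manager was stopped and retrying should end.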

Summary

ClientSessionFactoryImpl.connect mainly runs getConnectionWithRetry, which loops on clientProtocolManager.isAlive() and calls getConnection(). When that returns null and reconnectAttempts is not 0, count is incremented and the attempt is retried; the method returns once reconnectAttempts is not -1 and count equals reconnectAttempts. Between attempts, waitForRetry(interval) waits through clientProtocolManager.waitOnLatch(interval): a true result makes the method return early, otherwise interval is updated for the next pass.
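The interval update mentioned in the summary can be worked through with concrete numbers. The sketch below repeats the multiply-then-cap arithmetic from getConnectionWithRetry in isolation; BackoffSketch and intervals are hypothetical names, and the values 1000, 2.0 and 5000 are example inputs, not defaults taken from Artemis.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that replays the back-off arithmetic; not part of Artemis. */
class BackoffSketch {

   /** Returns the wait times in milliseconds used before each of the first `waits` retries. */
   static List<Long> intervals(long retryInterval, double retryIntervalMultiplier,
                               long maxRetryInterval, int waits) {
      List<Long> result = new ArrayList<>();
      long interval = retryInterval;   // the first wait uses retryInterval as-is
      for (int i = 0; i < waits; i++) {
         result.add(interval);

         // same steps as in the loop: multiply, then cap at maxRetryInterval
         long newInterval = (long) (interval * retryIntervalMultiplier);
         if (newInterval > maxRetryInterval) {
            newInterval = maxRetryInterval;
         }
         interval = newInterval;
      }
      return result;
   }

   public static void main(String[] args) {
      // prints [1000, 2000, 4000, 5000, 5000]
      System.out.println(intervals(1000, 2.0, 5000, 5));
   }
}
```

With a multiplier of 1.0 the interval stays constant, so the cap only matters when the multiplier is greater than 1.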

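Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com for removal.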