A look at Artemis's callFailoverTimeout

Original article
Author: code4it
Last modified: 2020-02-18 09:27:05
Published in the column: 码匠的流水账

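This article looks at how Artemis's callFailoverTimeout travels from the client session factory to the channel, and what it controls during failover.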

establishNewConnection

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java

public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {
​
   //......
​
   protected RemotingConnection establishNewConnection() {
      Connection transportConnection = createTransportConnection();
​
      if (transportConnection == null) {
         if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {
            logger.trace("Neither backup or live were active, will just give up now");
         }
         return null;
      }
​
      RemotingConnection newConnection = clientProtocolManager.connect(transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors, new SessionFactoryTopologyHandler());
​
      newConnection.addFailureListener(new DelegatingFailureListener(newConnection.getID()));
​
      schedulePing();
​
      if (logger.isTraceEnabled()) {
         logger.trace("returning " + newConnection);
      }
​
      return newConnection;
   }
​
   //......
}
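  • ClientSessionFactoryImpl's establishNewConnection creates the RemotingConnection through clientProtocolManager.connect, passing callTimeout and callFailoverTimeout along; it returns null if no transport connection could be created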

ActiveMQClientProtocolManager

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java

public class ActiveMQClientProtocolManager implements ClientProtocolManager {
​
   //......
​
   public RemotingConnection connect(Connection transportConnection,
                                     long callTimeout,
                                     long callFailoverTimeout,
                                     List<Interceptor> incomingInterceptors,
                                     List<Interceptor> outgoingInterceptors,
                                     TopologyResponseHandler topologyResponseHandler) {
      this.connection = new RemotingConnectionImpl(createPacketDecoder(), transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors, executor);
​
      this.topologyResponseHandler = topologyResponseHandler;
​
      getChannel0().setHandler(new Channel0Handler(connection));
​
      sendHandshake(transportConnection);
​
      return connection;
   }
​
   //......
}
  • ActiveMQClientProtocolManager's connect method creates a RemotingConnectionImpl, handing callTimeout and callFailoverTimeout to its constructor

RemotingConnectionImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java

public class RemotingConnectionImpl extends AbstractRemotingConnection implements CoreRemotingConnection {
​
   //......
​
   private final long blockingCallTimeout;
​
   private final long blockingCallFailoverTimeout;
​
   //......
​
   public RemotingConnectionImpl(final PacketDecoder packetDecoder,
                                 final Connection transportConnection,
                                 final long blockingCallTimeout,
                                 final long blockingCallFailoverTimeout,
                                 final List<Interceptor> incomingInterceptors,
                                 final List<Interceptor> outgoingInterceptors,
                                 final Executor connectionExecutor) {
      this(packetDecoder, transportConnection, blockingCallTimeout, blockingCallFailoverTimeout, incomingInterceptors, outgoingInterceptors, true, null, connectionExecutor);
   }
​
   private RemotingConnectionImpl(final PacketDecoder packetDecoder,
                                  final Connection transportConnection,
                                  final long blockingCallTimeout,
                                  final long blockingCallFailoverTimeout,
                                  final List<Interceptor> incomingInterceptors,
                                  final List<Interceptor> outgoingInterceptors,
                                  final boolean client,
                                  final SimpleString nodeID,
                                  final Executor connectionExecutor) {
      super(transportConnection, connectionExecutor);
​
      this.packetDecoder = packetDecoder;
​
      this.blockingCallTimeout = blockingCallTimeout;
​
      this.blockingCallFailoverTimeout = blockingCallFailoverTimeout;
​
      this.incomingInterceptors = incomingInterceptors;
​
      this.outgoingInterceptors = outgoingInterceptors;
​
      this.client = client;
​
      this.nodeID = nodeID;
​
      transportConnection.setProtocolConnection(this);
​
      if (logger.isTraceEnabled()) {
         logger.trace("RemotingConnectionImpl created: " + this);
      }
   }
​
   @Override
   public long getBlockingCallTimeout() {
      return blockingCallTimeout;
   }
​
   @Override
   public long getBlockingCallFailoverTimeout() {
      return blockingCallFailoverTimeout;
   }
​
   //......
}   
  • RemotingConnectionImpl stores the value in the final field blockingCallFailoverTimeout and exposes it through getBlockingCallFailoverTimeout()
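
The hand-off traced so far can be condensed into a small sketch. The three nested classes below are hypothetical stand-ins for ClientSessionFactoryImpl, ActiveMQClientProtocolManager and RemotingConnectionImpl; they keep only the two timeout parameters and drop everything else, so this illustrates the call chain and is not the Artemis code.

```java
// Hypothetical stand-ins for the three Artemis classes above; only the timeout hand-off is kept.
final class TimeoutPropagationSketch {

    // Plays the role of RemotingConnectionImpl: both timeouts end up in final fields.
    static final class Connection {
        private final long blockingCallTimeout;
        private final long blockingCallFailoverTimeout;

        Connection(long blockingCallTimeout, long blockingCallFailoverTimeout) {
            this.blockingCallTimeout = blockingCallTimeout;
            this.blockingCallFailoverTimeout = blockingCallFailoverTimeout;
        }

        long getBlockingCallTimeout() {
            return blockingCallTimeout;
        }

        long getBlockingCallFailoverTimeout() {
            return blockingCallFailoverTimeout;
        }
    }

    // Plays the role of ActiveMQClientProtocolManager: connect() builds the connection.
    static final class ProtocolManager {
        Connection connect(long callTimeout, long callFailoverTimeout) {
            return new Connection(callTimeout, callFailoverTimeout);
        }
    }

    // Plays the role of ClientSessionFactoryImpl: it owns the configured values.
    static final class SessionFactory {
        private final long callTimeout;
        private final long callFailoverTimeout;
        private final ProtocolManager protocolManager = new ProtocolManager();

        SessionFactory(long callTimeout, long callFailoverTimeout) {
            this.callTimeout = callTimeout;
            this.callFailoverTimeout = callFailoverTimeout;
        }

        Connection establishNewConnection() {
            return protocolManager.connect(callTimeout, callFailoverTimeout);
        }
    }

    public static void main(String[] args) {
        // The values 30000 and 5000 are arbitrary and chosen only for the demonstration.
        Connection connection = new SessionFactory(30000L, 5000L).establishNewConnection();
        System.out.println("failover timeout seen by the connection: "
            + connection.getBlockingCallFailoverTimeout());
    }
}
```

The point of the sketch is that the value is fixed once the connection is constructed: whatever the factory was configured with is what the channel later reads through the getter.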

waitForFailOver

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java

public final class ChannelImpl implements Channel {
​
   //......
​
   private final Lock lock = new ReentrantLock();
​
   private final Condition sendCondition = lock.newCondition();
​
   private final Condition failoverCondition = lock.newCondition();
​
   private boolean failingOver;
​
   //......
​
   private void waitForFailOver(String timeoutMsg) {
      try {
         if (connection.getBlockingCallFailoverTimeout() < 0) {
            while (failingOver) {
               failoverCondition.await();
            }
         } else if (!ConcurrentUtil.await(failoverCondition, connection.getBlockingCallFailoverTimeout())) {
            logger.debug(timeoutMsg);
         }
      } catch (InterruptedException e) {
         throw new ActiveMQInterruptedException(e);
      }
   }
​
   private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) {
      if (invokeInterceptors(packet, interceptors, connection) != null) {
         return false;
      }
​
      synchronized (sendLock) {
         packet.setChannelID(id);
​
         if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
            packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
         }
​
         if (logger.isTraceEnabled()) {
            logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
         }
​
         ActiveMQBuffer buffer = packet.encode(connection);
​
         lock.lock();
​
         try {
            if (failingOver) {
               waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send");
            }
​
            // Sanity check
            if (transferring) {
               throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover();
            }
​
            if (resendCache != null && packet.isRequiresConfirmations()) {
               addResendPacket(packet);
            }
​
         } finally {
            lock.unlock();
         }
​
         if (logger.isTraceEnabled()) {
            logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id);
         }
​
         checkReconnectID(reconnectID);
​
         //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
         //As the send could block if the response cache cannot add, preventing responses to be handled.
         if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
            while (!responseAsyncCache.add(packet)) {
               try {
                  Thread.sleep(1);
               } catch (Exception e) {
                  // Ignore
               }
            }
         }
​
         // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
         // buffer is full, preventing any incoming buffers being handled and blocking failover
         try {
            connection.getTransportConnection().write(buffer, flush, batch);
         } catch (Throwable t) {
            //If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full.
            //The client would get still know about this as the exception bubbles up the call stack instead.
            if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
               responseAsyncCache.remove(packet.getCorrelationID());
            }
            throw t;
         }
         return true;
      }
   }
​
   @Override
   public void lock() {
      if (logger.isTraceEnabled()) {
         logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " lock channel " + this);
      }
      lock.lock();
​
      reconnectID.incrementAndGet();
​
      failingOver = true;
​
      lock.unlock();
   }
​
   @Override
   public void unlock() {
      if (logger.isTraceEnabled()) {
         logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " unlock channel " + this);
      }
      lock.lock();
​
      failingOver = false;
​
      failoverCondition.signalAll();
​
      lock.unlock();
   }
​
   //......
}
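  • ChannelImpl's waitForFailOver branches on connection.getBlockingCallFailoverTimeout(): when the value is negative it waits on failoverCondition for as long as failingOver is true; when the value is greater than or equal to 0 it calls ConcurrentUtil.await(failoverCondition, connection.getBlockingCallFailoverTimeout()) and, if that returns false, only logs timeoutMsg at debug level before returning. The send method calls waitForFailOver when failingOver is true; the lock method sets failingOver to true (and increments reconnectID), and the unlock method sets failingOver to false and calls failoverCondition.signalAll()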
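
The gate described above can be reproduced with nothing but java.util.concurrent. The sketch below is a simplified illustration, not the Artemis implementation: FailoverGate, beginFailover, endFailover and awaitFailover are hypothetical names, the timed branch uses Condition.awaitNanos in a loop instead of ConcurrentUtil.await, and the method returns a boolean rather than logging. It assumes the same convention as the source, where a negative timeout means waiting without a time limit.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified stand-in for the failover gate in ChannelImpl.
final class FailoverGate {
    private final Lock lock = new ReentrantLock();
    private final Condition failoverCondition = lock.newCondition();
    private final long failoverTimeoutMillis; // negative: wait without a time limit
    private boolean failingOver;              // guarded by lock

    FailoverGate(long failoverTimeoutMillis) {
        this.failoverTimeoutMillis = failoverTimeoutMillis;
    }

    // Counterpart of ChannelImpl.lock(): mark the start of failover.
    void beginFailover() {
        lock.lock();
        try {
            failingOver = true;
        } finally {
            lock.unlock();
        }
    }

    // Counterpart of ChannelImpl.unlock(): clear the flag and wake every waiting sender.
    void endFailover() {
        lock.lock();
        try {
            failingOver = false;
            failoverCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Returns true if failover is no longer in progress when the caller proceeds,
    // false if the timed wait elapsed while failover was still running.
    boolean awaitFailover() {
        lock.lock();
        try {
            if (failoverTimeoutMillis < 0) {
                while (failingOver) {
                    failoverCondition.await();
                }
                return true;
            }
            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(failoverTimeoutMillis);
            while (failingOver && remainingNanos > 0) {
                remainingNanos = failoverCondition.awaitNanos(remainingNanos);
            }
            return !failingOver;
        } catch (InterruptedException e) {
            // The source wraps this in ActiveMQInterruptedException; a plain runtime exception is used here.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        FailoverGate gate = new FailoverGate(100L); // 100 ms is an arbitrary demo value
        gate.beginFailover();
        System.out.println("proceeded with failover finished: " + gate.awaitFailover());
        gate.endFailover();
        System.out.println("proceeded with failover finished: " + gate.awaitFailover());
    }
}
```

One design difference is worth noting: the sketch rechecks the flag in a loop during the timed wait so that a spurious wakeup does not end the wait early, whereas the listing above delegates that concern to ConcurrentUtil.await.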

ActiveMQSessionContext

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java

public class ActiveMQSessionContext extends SessionContext {
​
   //......
​
   private final Channel sessionChannel;
​
   //......
​
   @Override
   public void lockCommunications() {
      sessionChannel.lock();
   }
​
   @Override
   public void releaseCommunications() {
      sessionChannel.setTransferring(false);
      sessionChannel.unlock();
   }
​
   //......
}
  • ActiveMQSessionContext's lockCommunications method calls sessionChannel.lock(), while releaseCommunications first calls sessionChannel.setTransferring(false) and then sessionChannel.unlock()

ClientSessionImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java

public final class ClientSessionImpl implements ClientSessionInternal, FailureListener {
​
   //......
​
   public void preHandleFailover(RemotingConnection connection) {
      // We lock the channel to prevent any packets to be added to the re-send
      // cache during the failover process
      //we also do this before the connection fails over to give the session a chance to block for failover
      sessionContext.lockCommunications();
   }
​
   public boolean handleFailover(final RemotingConnection backupConnection, ActiveMQException cause) {
      boolean suc = true;
​
      synchronized (this) {
         if (closed) {
            return true;
         }
​
         boolean resetCreditManager = false;
​
         try {
​
            // TODO remove this and encapsulate it
​
            boolean reattached = sessionContext.reattachOnNewConnection(backupConnection);
​
            if (!reattached) {
​
               // We change the name of the Session, otherwise the server could close it while we are still sending the recreate
               // in certain failure scenarios
               // For instance the fact we didn't change the name of the session after failover or reconnect
               // was the reason allowing multiple Sessions to be closed simultaneously breaking concurrency
               this.name = UUIDGenerator.getInstance().generateStringUUID();
​
               sessionContext.resetName(name);
​
               Map<ConsumerContext, ClientConsumerInternal> clonedConsumerEntries = cloneConsumerEntries();
​
               for (ClientConsumerInternal consumer : clonedConsumerEntries.values()) {
                  consumer.clearAtFailover();
               }
​
               // The session wasn't found on the server - probably we're failing over onto a backup server where the
               // session won't exist or the target server has been restarted - in this case the session will need to be
               // recreated,
               // and we'll need to recreate any consumers
​
               // It could also be that the server hasn't been restarted, but the session is currently executing close,
               // and
               // that
               // has already been executed on the server, that's why we can't find the session- in this case we *don't*
               // want
               // to recreate the session, we just want to unblock the blocking call
               if (!inClose && mayAttemptToFailover) {
                  sessionContext.recreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge);
​
                  for (Map.Entry<ConsumerContext, ClientConsumerInternal> entryx : clonedConsumerEntries.entrySet()) {
​
                     ClientConsumerInternal consumerInternal = entryx.getValue();
                     synchronized (consumerInternal) {
                        if (!consumerInternal.isClosed()) {
                           sessionContext.recreateConsumerOnServer(consumerInternal, entryx.getKey().getId(), started);
                        }
                     }
                  }
​
                  if ((!autoCommitAcks || !autoCommitSends) && workDone) {
                     // this is protected by a lock, so we can guarantee nothing will sneak here
                     // while we do our work here
                     rollbackOnly = true;
                  }
                  if (currentXID != null) {
                     sessionContext.xaFailed(currentXID);
                     rollbackOnly = true;
                  }
​
                  // Now start the session if it was already started
                  if (started) {
                     for (ClientConsumerInternal consumer : clonedConsumerEntries.values()) {
                        consumer.clearAtFailover();
                        consumer.start();
                     }
​
                     sessionContext.restartSession();
                  }
​
                  resetCreditManager = true;
               }
​
               sessionContext.returnBlocking(cause);
            }
         } catch (Throwable t) {
            ActiveMQClientLogger.LOGGER.failedToHandleFailover(t);
            suc = false;
         } finally {
            sessionContext.releaseCommunications();
         }
​
         if (resetCreditManager) {
            synchronized (producerCreditManager) {
               producerCreditManager.reset();
            }
​
            // Also need to send more credits for consumers, otherwise the system could hand with the server
            // not having any credits to send
         }
      }
​
      HashMap<String, String> metaDataToSend;
​
      synchronized (metadata) {
         metaDataToSend = new HashMap<>(metadata);
      }
​
      sessionContext.resetMetadata(metaDataToSend);
​
      return suc;
​
   }
​
   //......
}   
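  • ClientSessionImpl's preHandleFailover method calls sessionContext.lockCommunications(). The handleFailover method calls sessionContext.releaseCommunications() in its finally block, so the channel is unlocked whether reattaching or recreating the session and its consumers succeeds or throws. The exception is a session that is already closed: handleFailover then returns true before entering the try block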
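
The ordering of lock, recovery work and release can be shown in isolation. This is a minimal sketch under stated assumptions: Communications, RecoveryWork and failover are hypothetical names, and the two separate Artemis methods (preHandleFailover and handleFailover) are folded into one method purely to make the try/finally shape visible.

```java
// Hypothetical sketch of the lock -> recover -> release ordering; not Artemis API.
final class FailoverLifecycleSketch {

    // Stands in for the session context's lockCommunications/releaseCommunications pair.
    interface Communications {
        void lock();

        void release();
    }

    // Stands in for reattaching or recreating the session on the new connection.
    interface RecoveryWork {
        void run() throws Exception;
    }

    // Returns true when the recovery work completed, false when it threw.
    static boolean failover(Communications communications, RecoveryWork work) {
        communications.lock(); // what preHandleFailover does
        boolean succeeded = true;
        try {
            work.run();
        } catch (Throwable t) {
            // handleFailover logs the failure and reports it through its return value
            succeeded = false;
        } finally {
            communications.release(); // runs on both the success and the failure path
        }
        return succeeded;
    }

    public static void main(String[] args) {
        Communications printing = new Communications() {
            @Override
            public void lock() {
                System.out.println("communications locked");
            }

            @Override
            public void release() {
                System.out.println("communications released");
            }
        };
        System.out.println("succeeded: " + failover(printing, () -> { }));
        System.out.println("succeeded: " + failover(printing, () -> {
            throw new IllegalStateException("simulated recovery failure");
        }));
    }
}
```

Because the release sits in finally, senders blocked in waitForFailOver are signalled even when recovery fails, which is why a failed failover does not leave them waiting on the condition.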

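Summary

RemotingConnectionImpl holds the blockingCallFailoverTimeout property, which originates from the callFailoverTimeout that ClientSessionFactoryImpl passes to clientProtocolManager.connect. ChannelImpl's waitForFailOver waits without a time limit while failingOver is true when the value is negative, and calls ConcurrentUtil.await(failoverCondition, connection.getBlockingCallFailoverTimeout()) when it is greater than or equal to 0. The send method calls waitForFailOver when failingOver is true; the channel's lock method sets failingOver to true, and its unlock method sets failingOver to false and signals the waiting senders.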

doc

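Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com to have it removed.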
