A Look at maxDeliveryAttempts in Artemis

code4it
Published 2020-02-24 09:40:47
Collected in the column 码匠的流水账

This article examines how maxDeliveryAttempts works in Artemis.

maxDeliveryAttempts

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java

public class AddressSettings implements Mergeable<AddressSettings>, Serializable, EncodingSupport {

   //......

   public static final int DEFAULT_MAX_DELIVERY_ATTEMPTS = 10;

   private Integer maxDeliveryAttempts = null;

   private SimpleString deadLetterAddress = null;

   //......

   public int getMaxDeliveryAttempts() {
      return maxDeliveryAttempts != null ? maxDeliveryAttempts : AddressSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS;
   }

   public AddressSettings setMaxDeliveryAttempts(final int maxDeliveryAttempts) {
      this.maxDeliveryAttempts = maxDeliveryAttempts;
      return this;
   }

   public SimpleString getDeadLetterAddress() {
      return deadLetterAddress;
   }

   public AddressSettings setDeadLetterAddress(final SimpleString deadLetterAddress) {
      this.deadLetterAddress = deadLetterAddress;
      return this;
   }

   //......
}  
  • AddressSettings defines the maxDeliveryAttempts and deadLetterAddress properties. When maxDeliveryAttempts is null, getMaxDeliveryAttempts returns AddressSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS, which is 10.
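
The field is a boxed Integer rather than an int, which lets the getter tell "never configured" (null) apart from an explicitly set value such as 0. The following is a minimal sketch of that fallback pattern only; AddressSettingsSketch is a hypothetical stand-in written for illustration, not the real Artemis class.

```java
// Hypothetical stand-in for AddressSettings, reduced to the one property discussed above.
final class AddressSettingsSketch {

   public static final int DEFAULT_MAX_DELIVERY_ATTEMPTS = 10;

   // null means "not configured", so the getter falls back to the default
   private Integer maxDeliveryAttempts = null;

   public int getMaxDeliveryAttempts() {
      return maxDeliveryAttempts != null ? maxDeliveryAttempts : DEFAULT_MAX_DELIVERY_ATTEMPTS;
   }

   // Fluent setter, same shape as in the excerpt above
   public AddressSettingsSketch setMaxDeliveryAttempts(final int maxDeliveryAttempts) {
      this.maxDeliveryAttempts = maxDeliveryAttempts;
      return this;
   }

   public static void main(String[] args) {
      // Not configured: the default applies
      System.out.println(new AddressSettingsSketch().getMaxDeliveryAttempts()); // prints 10
      // Explicit value wins over the default
      System.out.println(new AddressSettingsSketch().setMaxDeliveryAttempts(3).getMaxDeliveryAttempts()); // prints 3
      // An explicit 0 is kept as 0 and is not replaced by the default
      System.out.println(new AddressSettingsSketch().setMaxDeliveryAttempts(0).getMaxDeliveryAttempts()); // prints 0
   }
}
```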

checkRedelivery

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {

   //......

   public Pair<Boolean, Boolean> checkRedelivery(final MessageReference reference,
                                  final long timeBase,
                                  final boolean ignoreRedeliveryDelay) throws Exception {
      Message message = reference.getMessage();

      if (internalQueue) {
         if (logger.isTraceEnabled()) {
            logger.trace("Queue " + this.getName() + " is an internal queue, no checkRedelivery");
         }
         // no DLQ check on internal queues
         return new Pair<>(true, false);
      }

      if (!internalQueue && reference.isDurable() && isDurable() && !reference.isPaged()) {
         storageManager.updateDeliveryCount(reference);
      }

      AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());

      int maxDeliveries = addressSettings.getMaxDeliveryAttempts();
      int deliveryCount = reference.getDeliveryCount();

      // First check DLA
      if (maxDeliveries > 0 && deliveryCount >= maxDeliveries) {
         if (logger.isTraceEnabled()) {
            logger.trace("Sending reference " + reference + " to DLA = " + addressSettings.getDeadLetterAddress() + " since ref.getDeliveryCount=" + reference.getDeliveryCount() + "and maxDeliveries=" + maxDeliveries + " from queue=" + this.getName());
         }
         boolean dlaResult = sendToDeadLetterAddress(null, reference, addressSettings.getDeadLetterAddress());

         return new Pair<>(false, dlaResult);
      } else {
         // Second check Redelivery Delay
         long redeliveryDelay = addressSettings.getRedeliveryDelay();
         if (!ignoreRedeliveryDelay && redeliveryDelay > 0) {
            redeliveryDelay = calculateRedeliveryDelay(addressSettings, deliveryCount);

            if (logger.isTraceEnabled()) {
               logger.trace("Setting redeliveryDelay=" + redeliveryDelay + " on reference=" + reference);
            }

            reference.setScheduledDeliveryTime(timeBase + redeliveryDelay);

            if (!reference.isPaged() && reference.isDurable() && isDurable()) {
               storageManager.updateScheduledDeliveryTime(reference);
            }
         }

         decDelivering(reference);

         return new Pair<>(true, false);
      }
   }

   //......
}
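  • The checkRedelivery method of QueueImpl compares deliveryCount with maxDeliveries. When maxDeliveries is greater than 0 and deliveryCount is greater than or equal to maxDeliveries, it calls sendToDeadLetterAddress and returns a Pair whose first element is false. Otherwise it applies the redelivery delay (if one is configured and not ignored), calls decDelivering and returns Pair(true, false). Internal queues skip the check entirely.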
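
The dead-letter condition can be isolated as a small predicate. The sketch below copies only the boolean expression from checkRedelivery; the class and method names are hypothetical, and the redelivery-delay branch is left out. Note that the `maxDeliveries > 0` guard means a value of 0 or below never triggers the dead-letter path in this code.

```java
// Hypothetical helper that isolates the dead-letter condition from checkRedelivery.
final class RedeliveryDecisionSketch {

   // Same expression as in the excerpt: maxDeliveries > 0 && deliveryCount >= maxDeliveries
   public static boolean shouldSendToDeadLetterAddress(final int maxDeliveries, final int deliveryCount) {
      return maxDeliveries > 0 && deliveryCount >= maxDeliveries;
   }

   public static void main(String[] args) {
      System.out.println(shouldSendToDeadLetterAddress(10, 9));   // prints false: one attempt left
      System.out.println(shouldSendToDeadLetterAddress(10, 10));  // prints true: limit reached
      System.out.println(shouldSendToDeadLetterAddress(0, 100));  // prints false: guard disables the check
      System.out.println(shouldSendToDeadLetterAddress(-1, 100)); // prints false: guard disables the check
   }
}
```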

sendToDeadLetterAddress

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {
  
   //......

   private boolean sendToDeadLetterAddress(final Transaction tx,
                                        final MessageReference ref,
                                        final SimpleString deadLetterAddress) throws Exception {
      if (deadLetterAddress != null) {
         Bindings bindingList = postOffice.lookupBindingsForAddress(deadLetterAddress);

         if (bindingList == null || bindingList.getBindings().isEmpty()) {
            ActiveMQServerLogger.LOGGER.messageExceededMaxDelivery(ref, deadLetterAddress);
            ref.acknowledge(tx, AckReason.KILLED, null);
         } else {
            ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
            move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null);
            return true;
         }
      } else {
         ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name);

         ref.acknowledge(tx, AckReason.KILLED, null);
      }

      return false;
   }

   private void move(final Transaction originalTX,
                     final SimpleString address,
                     final Binding binding,
                     final MessageReference ref,
                     final boolean rejectDuplicate,
                     final AckReason reason,
                     final ServerConsumer consumer) throws Exception {
      Transaction tx;

      if (originalTX != null) {
         tx = originalTX;
      } else {
         // if no TX we create a new one to commit at the end
         tx = new TransactionImpl(storageManager);
      }

      Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED);

      copyMessage.setAddress(address);

      postOffice.route(copyMessage, tx, false, rejectDuplicate, binding);

      acknowledge(tx, ref, reason, consumer);

      if (originalTX == null) {
         tx.commit();
      }
   }

   //......
}
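  • When deadLetterAddress is set and has at least one binding, sendToDeadLetterAddress calls move, which routes a copy of the message to deadLetterAddress and acknowledges the original reference with AckReason.KILLED, and then returns true. When deadLetterAddress is null or has no bindings, the reference is only acknowledged with AckReason.KILLED, so the message is dropped, and the method returns false.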
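
The three branches of sendToDeadLetterAddress can be summarized as a small decision function. This is a simplified model, not Artemis code: the Outcome enum, the route method and the use of a plain list of binding names in place of Bindings are all assumptions made for illustration.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical model of the three branches in sendToDeadLetterAddress.
final class DeadLetterRoutingSketch {

   public enum Outcome {
      MOVED_TO_DLA,        // copy routed to the dead letter address, original acked as KILLED
      DROPPED_NO_BINDINGS, // address configured but nothing bound to it, original acked as KILLED
      DROPPED_NO_DLA       // no dead letter address configured, original acked as KILLED
   }

   // bindings stands in for the result of the binding lookup; it may be null or empty
   public static Outcome route(final String deadLetterAddress, final List<String> bindings) {
      if (deadLetterAddress == null) {
         return Outcome.DROPPED_NO_DLA;
      }
      if (bindings == null || bindings.isEmpty()) {
         return Outcome.DROPPED_NO_BINDINGS;
      }
      return Outcome.MOVED_TO_DLA;
   }

   public static void main(String[] args) {
      System.out.println(route(null, null));                                    // prints DROPPED_NO_DLA
      System.out.println(route("DLQ", Collections.<String>emptyList()));        // prints DROPPED_NO_BINDINGS
      System.out.println(route("DLQ", Collections.singletonList("DLQ.queue"))); // prints MOVED_TO_DLA
   }
}
```

Only the last outcome corresponds to the method returning true in the excerpt; in the other two the message is not kept anywhere.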

incrementDeliveryCount

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java

public class ServerConsumerImpl implements ServerConsumer, ReadyListener {

   //......

   public HandleStatus handle(final MessageReference ref) throws Exception {
      // available credits can be set back to null with a flow control option.
      AtomicInteger checkInteger = availableCredits;
      if (callback != null && !callback.hasCredits(this) || checkInteger != null && checkInteger.get() <= 0) {
         if (logger.isDebugEnabled()) {
            logger.debug(this + " is busy for the lack of credits. Current credits = " +
                            availableCredits +
                            " Can't receive reference " +
                            ref);
         }

         return HandleStatus.BUSY;
      }

      synchronized (lock) {
         // If the consumer is stopped then we don't accept the message, it
         // should go back into the
         // queue for delivery later.
         // TCP-flow control has to be done first than everything else otherwise we may lose notifications
         if ((callback != null && !callback.isWritable(this, protocolContext)) || !started || transferring) {
            return HandleStatus.BUSY;
         }

         // If there is a pendingLargeMessage we can't take another message
         // This has to be checked inside the lock as the set to null is done inside the lock
         if (largeMessageDeliverer != null) {
            if (logger.isDebugEnabled()) {
               logger.debug(this + " is busy delivering large message " +
                               largeMessageDeliverer +
                               ", can't deliver reference " +
                               ref);
            }
            return HandleStatus.BUSY;
         }
         final Message message = ref.getMessage();

         if (!message.acceptsConsumer(sequentialID())) {
            return HandleStatus.NO_MATCH;
         }

         if (filter != null && !filter.match(message)) {
            if (logger.isTraceEnabled()) {
               logger.trace("Reference " + ref + " is a noMatch on consumer " + this);
            }
            return HandleStatus.NO_MATCH;
         }

         if (logger.isTraceEnabled()) {
            logger.trace("ServerConsumerImpl::" + this + " Handling reference " + ref);
         }
         if (!browseOnly) {
            if (!preAcknowledge) {
               deliveringRefs.add(ref);
            }

            ref.handled();

            ref.setConsumerId(this.id);

            ref.incrementDeliveryCount();

            // If updateDeliveries = false (set by strict-update),
            // the updateDeliveryCountAfterCancel would still be updated after c
            if (strictUpdateDeliveryCount && !ref.isPaged()) {
               if (ref.getMessage().isDurable() && ref.getQueue().isDurable() &&
                  !ref.getQueue().isInternalQueue() &&
                  !ref.isPaged()) {
                  storageManager.updateDeliveryCount(ref);
               }
            }

            if (preAcknowledge) {
               if (message.isLargeMessage()) {
                  // we must hold one reference, or the file will be deleted before it could be delivered
                  ((LargeServerMessage) message).incrementDelayDeletionCount();
               }

               // With pre-ack, we ack *before* sending to the client
               ref.getQueue().acknowledge(ref, this);
               acks++;
            }

            if (message.isLargeMessage() && this.supportLargeMessage) {
               largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage) message, ref);
            }

         }

         pendingDelivery.countUp();

         return HandleStatus.HANDLED;
      }
   }

   //......
}
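  • Unless the consumer is browseOnly, the handle method of ServerConsumerImpl calls ref.incrementDeliveryCount() to increase deliveryCount. When strictUpdateDeliveryCount is set, the reference is not paged, the message and the queue are both durable, and the queue is not internal, it also calls storageManager.updateDeliveryCount(ref).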
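
Putting the increment in handle together with the check in checkRedelivery gives a rough picture of how many times a message is delivered before it is dead-lettered. The simulation below is a sketch under two assumptions that the excerpts do not show directly: every delivery fails, and checkRedelivery runs once after each failed delivery. All names are hypothetical.

```java
// Hypothetical simulation: count deliveries until the dead-letter condition holds.
final class DeliveryLoopSketch {

   // Returns the number of deliveries made before dead-lettering,
   // or -1 if the condition never holds within attemptCap deliveries.
   public static int deliveriesBeforeDeadLetter(final int maxDeliveries, final int attemptCap) {
      int deliveryCount = 0;
      while (deliveryCount < attemptCap) {
         // handle(): the count goes up on every delivery to a non-browsing consumer
         deliveryCount++;
         // assumed: the consumer fails, and the checkRedelivery condition is evaluated
         if (maxDeliveries > 0 && deliveryCount >= maxDeliveries) {
            return deliveryCount;
         }
      }
      return -1;
   }

   public static void main(String[] args) {
      System.out.println(deliveriesBeforeDeadLetter(3, 100));  // prints 3
      System.out.println(deliveriesBeforeDeadLetter(10, 100)); // prints 10
      System.out.println(deliveriesBeforeDeadLetter(0, 100));  // prints -1
   }
}
```

Under these assumptions the default of 10 means a message is handed to consumers ten times in total, not ten times after the first attempt.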

updateDeliveryCount

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java

public abstract class AbstractJournalStorageManager extends CriticalComponentImpl implements StorageManager {

   //......

   public void updateDeliveryCount(final MessageReference ref) throws Exception {
      // no need to store if it's the same value
      // otherwise the journal will get OME in case of lots of redeliveries
      if (ref.getDeliveryCount() == ref.getPersistedCount()) {
         return;
      }

      ref.setPersistedCount(ref.getDeliveryCount());
      DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(), ref.getDeliveryCount());

      readLock();
      try {
         messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, getContext(syncNonTransactional));
      } finally {
         readUnLock();
      }
   }

   //......
}
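  • The updateDeliveryCount method of AbstractJournalStorageManager returns immediately when deliveryCount equals persistedCount. Otherwise it sets persistedCount to deliveryCount and appends an UPDATE_DELIVERY_COUNT record to the message journal.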
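
Both handle and checkRedelivery may call updateDeliveryCount for the same reference, and the persistedCount comparison keeps the second call from writing a duplicate record. The sketch below models only that guard; the class is hypothetical and a simple counter stands in for the journal append.

```java
// Hypothetical model of the persistedCount guard in updateDeliveryCount.
final class DeliveryCountStoreSketch {

   private int deliveryCount;
   private int persistedCount;
   private int journalWrites; // stands in for appended journal records

   public void incrementDeliveryCount() {
      deliveryCount++;
   }

   public void updateDeliveryCount() {
      // no need to store if it's the same value
      if (deliveryCount == persistedCount) {
         return;
      }
      persistedCount = deliveryCount;
      journalWrites++;
   }

   public int getJournalWrites() {
      return journalWrites;
   }

   public static void main(String[] args) {
      DeliveryCountStoreSketch ref = new DeliveryCountStoreSketch();
      ref.incrementDeliveryCount();
      ref.updateDeliveryCount(); // first call writes
      ref.updateDeliveryCount(); // second call is skipped, count unchanged
      System.out.println(ref.getJournalWrites()); // prints 1
      ref.incrementDeliveryCount();
      ref.updateDeliveryCount(); // count changed, so this writes again
      System.out.println(ref.getJournalWrites()); // prints 2
   }
}
```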

Summary

AddressSettings defines the maxDeliveryAttempts and deadLetterAddress properties; when maxDeliveryAttempts is null, getMaxDeliveryAttempts returns AddressSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS, which is 10. The checkRedelivery method of QueueImpl compares deliveryCount with maxDeliveries and calls sendToDeadLetterAddress when maxDeliveries is greater than 0 and deliveryCount is greater than or equal to maxDeliveries. When the dead letter address has bindings, sendToDeadLetterAddress calls move to route the message to deadLetterAddress with AckReason.KILLED.

doc

  • QueueImpl
Originally published on 2020-01-27 via the WeChat public account 码匠的流水账.
