前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊artemis的lastValueProperty

聊聊artemis的lastValueProperty

作者头像
code4it
发布2020-02-24 09:45:16
2590
发布2020-02-24 09:45:16
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下artemis的lastValueProperty

CoreMessage

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java

代码语言:javascript
复制
public class CoreMessage extends RefCountMessage implements ICoreMessage {

   //......

   public SimpleString getLastValueProperty() {
      return getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
   }

   @Override
   public Message setLastValueProperty(SimpleString lastValueName) {
      return putStringProperty(Message.HDR_LAST_VALUE_NAME, lastValueName);
   }

   //......
}
  • CoreMessage提供了getLastValueProperty、setLastValueProperty方法,设置的是message的Message.HDR_LAST_VALUE_NAME(_AMQ_LVQ_NAME)属性

MessageReferenceImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java

代码语言:javascript
复制
public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference, Runnable {

   //......

   public SimpleString getLastValueProperty() {
      SimpleString lastValue = message.getSimpleStringProperty(queue.getLastValueKey());
      if (lastValue == null) {
         lastValue = message.getLastValueProperty();
      }
      return lastValue;
   }

   //......
}
  • MessageReferenceImpl提供了getLastValueProperty方法,它先从message获取queue.getLastValueKey()的属性值,如果为null再读取message.getLastValueProperty()

LastValueQueue

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java

代码语言:javascript
复制
public class LastValueQueue extends QueueImpl {

   private final Map<SimpleString, HolderReference> map = new ConcurrentHashMap<>();
   private final SimpleString lastValueKey;

   //......

   @Override
   public synchronized void addTail(final MessageReference ref, final boolean direct) {
      if (scheduleIfPossible(ref)) {
         return;
      }
      final SimpleString prop = ref.getLastValueProperty();

      if (prop != null) {
         HolderReference hr = map.get(prop);

         if (hr != null) {
            // We need to overwrite the old ref with the new one and ack the old one

            replaceLVQMessage(ref, hr);

         } else {
            hr = new HolderReference(prop, ref);

            map.put(prop, hr);

            super.addTail(hr, direct);
         }
      } else {
         super.addTail(ref, direct);
      }
   }

   @Override
   public synchronized void addHead(final MessageReference ref, boolean scheduling) {
      // we first need to check redelivery-delay, as we can't put anything on headers if redelivery-delay
      if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
         return;
      }

      SimpleString lastValueProp = ref.getLastValueProperty();

      if (lastValueProp != null) {
         HolderReference hr = map.get(lastValueProp);

         if (hr != null) {
            if (scheduling) {
               // We need to overwrite the old ref with the new one and ack the old one

               replaceLVQMessage(ref, hr);
            } else {
               // We keep the current ref and ack the one we are returning

               super.referenceHandled(ref);

               try {
                  super.acknowledge(ref);
               } catch (Exception e) {
                  ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
               }
            }
         } else {
            hr = new HolderReference(lastValueProp, ref);

            map.put(lastValueProp, hr);

            super.addHead(hr, scheduling);
         }
      } else {
         super.addHead(ref, scheduling);
      }
   }

   private void replaceLVQMessage(MessageReference ref, HolderReference hr) {
      MessageReference oldRef = hr.getReference();

      referenceHandled(oldRef);
      super.refRemoved(oldRef);

      try {
         oldRef.acknowledge(null, AckReason.REPLACED, null);
      } catch (Exception e) {
         ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
      }

      hr.setReference(ref);
      addRefSize(ref);
      refAdded(ref);
   }

   //......  
}
  • LastValueQueue继承了QueueImpl,其addTail或addHead方法先通过ref.getLastValueProperty()找到lastValueProp,在从map获取HolderReference,对于HolderReference不为null的,执行replaceLVQMessage方法;该方法会替换到oldRef并对oldRef进行ack

小结

CoreMessage提供了getLastValueProperty、setLastValueProperty方法,设置的是message的Message.HDR_LAST_VALUE_NAME(_AMQ_LVQ_NAME)属性;MessageReferenceImpl提供了getLastValueProperty方法,它先从message获取queue.getLastValueKey()的属性值,如果为null再读取message.getLastValueProperty();LastValueQueue继承了QueueImpl,其addTail或addHead方法先通过ref.getLastValueProperty()找到lastValueProp,在从map获取HolderReference,对于HolderReference不为null的,执行replaceLVQMessage方法;该方法会替换到oldRef并对oldRef进行ack

doc

  • LastValueQueue
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-02-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • CoreMessage
  • MessageReferenceImpl
  • LastValueQueue
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档