
Notes on the duplicateProperty of Artemis Messages

Original article · Author: code4it · Last modified 2020-02-10 17:07:54 · Column: 码匠的流水账

This article looks at how Artemis handles a message's duplicateProperty.

Message

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java

public interface Message {

   //......

   // No duplicate ID by default; implementations such as CoreMessage override this
   default Object getDuplicateProperty() {
      return null;
   }

   // Normalizes the duplicate ID (SimpleString, String or byte[]) to a byte array
   default byte[] getDuplicateIDBytes() {
      Object duplicateID = getDuplicateProperty();

      if (duplicateID == null) {
         return null;
      } else {
         if (duplicateID instanceof SimpleString) {
            return ((SimpleString) duplicateID).getData();
         } else if (duplicateID instanceof String) {
            return new SimpleString(duplicateID.toString()).getData();
         } else {
            return (byte[]) duplicateID;
         }
      }
   }

   //......
}
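  • The Message interface defines getDuplicateProperty and getDuplicateIDBytes. getDuplicateProperty returns null by default; getDuplicateIDBytes reads the value of getDuplicateProperty and converts it to a byte array: a SimpleString yields its underlying data, a String is first wrapped in a SimpleString, and anything else is cast to byte[]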
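The conversion described above can be tried out without an Artemis dependency. The following is a minimal sketch, not Artemis code: the class DuplicateIdBytesSketch and the method toDuplicateIdBytes are hypothetical names, the SimpleString branch is left out, and UTF-16LE is only an assumed stand-in for whatever byte layout SimpleString produces.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-in for Message.getDuplicateIDBytes(); not Artemis code.
final class DuplicateIdBytesSketch {

   // Accepts a String, a byte[] or null and normalizes the value to byte[].
   // Assumption: UTF-16LE stands in for SimpleString's internal encoding.
   static byte[] toDuplicateIdBytes(Object duplicateId) {
      if (duplicateId == null) {
         return null; // no duplicate ID set, so there is nothing to check later
      }
      if (duplicateId instanceof String) {
         return ((String) duplicateId).getBytes(StandardCharsets.UTF_16LE);
      }
      return (byte[]) duplicateId; // already raw bytes, returned as-is
   }

   public static void main(String[] args) {
      byte[] fromString = toDuplicateIdBytes("order-1");
      byte[] fromBytes = toDuplicateIdBytes(new byte[] {1, 2, 3});

      System.out.println(fromString.length);            // 14: two bytes per character
      System.out.println(Arrays.toString(fromBytes));   // [1, 2, 3]
      System.out.println(toDuplicateIdBytes(null) == null); // true
   }
}
```

As in the original method, any other type of value would fail the final cast with a ClassCastException.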

CoreMessage

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java

public class CoreMessage extends RefCountMessage implements ICoreMessage {

   //......

   // The duplicate ID is stored as an ordinary message property
   @Override
   public Object getDuplicateProperty() {
      return getObjectProperty(Message.HDR_DUPLICATE_DETECTION_ID);
   }

   //......
}
  • CoreMessage implements ICoreMessage, which in turn extends Message. Its getDuplicateProperty method returns the value of the Message.HDR_DUPLICATE_DETECTION_ID property

checkDuplicateID

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java

public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory {
​
   //......
​
   private final ConcurrentMap<SimpleString, DuplicateIDCache> duplicateIDCaches = new ConcurrentHashMap<>();
​
   //......   
​
   public RoutingStatus route(final Message message,
                              final RoutingContext context,
                              final boolean direct,
                              boolean rejectDuplicates,
                              final Binding bindingMove) throws Exception {
​
      RoutingStatus result;
      // Sanity check
      if (message.getRefCount() > 0) {
         throw new IllegalStateException("Message cannot be routed more than once");
      }
​
      final SimpleString address = context.getAddress(message);
​
      setPagingStore(address, message);
​
      AtomicBoolean startedTX = new AtomicBoolean(false);
​
      applyExpiryDelay(message, address);
​
      if (context.isDuplicateDetection() && !checkDuplicateID(message, context, rejectDuplicates, startedTX)) {
         return RoutingStatus.DUPLICATED_ID;
      }
​
      message.clearInternalProperties();
​
      Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
​
      AddressInfo addressInfo = addressManager.getAddressInfo(address);
​
      //......
​
      if (server.hasBrokerMessagePlugins()) {
         server.callBrokerMessagePlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, result));
      }
​
      return result;
   }
​
   private boolean checkDuplicateID(final Message message,
                                    final RoutingContext context,
                                    boolean rejectDuplicates,
                                    AtomicBoolean startedTX) throws Exception {
      // Check the DuplicateCache for the Bridge first
​
      Object bridgeDup = message.removeExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID);
      if (bridgeDup != null) {
         // if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one
         byte[] bridgeDupBytes = (byte[]) bridgeDup;
​
         DuplicateIDCache cacheBridge = getDuplicateIDCache(BRIDGE_CACHE_STR.concat(context.getAddress(message).toString()));
​
         if (context.getTransaction() == null) {
            context.setTransaction(new TransactionImpl(storageManager));
            startedTX.set(true);
         }
​
         if (!cacheBridge.atomicVerify(bridgeDupBytes, context.getTransaction())) {
            context.getTransaction().rollback();
            startedTX.set(false);
            message.decrementRefCount();
            return false;
         }
      } else {
         // if used BridgeDuplicate, it's not going to use the regular duplicate
         // since this would break redistribution (re-setting the duplicateId)
         byte[] duplicateIDBytes = message.getDuplicateIDBytes();
​
         DuplicateIDCache cache = null;
​
         boolean isDuplicate = false;
​
         if (duplicateIDBytes != null) {
            cache = getDuplicateIDCache(context.getAddress(message));
​
            isDuplicate = cache.contains(duplicateIDBytes);
​
            if (rejectDuplicates && isDuplicate) {
               ActiveMQServerLogger.LOGGER.duplicateMessageDetected(message);
​
               String warnMessage = "Duplicate message detected - message will not be routed. Message information:" + message.toString();
​
               if (context.getTransaction() != null) {
                  context.getTransaction().markAsRollbackOnly(new ActiveMQDuplicateIdException(warnMessage));
               }
​
               message.decrementRefCount();
​
               return false;
            }
         }
​
         if (cache != null && !isDuplicate) {
            if (context.getTransaction() == null) {
               // We need to store the duplicate id atomically with the message storage, so we need to create a tx for this
               context.setTransaction(new TransactionImpl(storageManager));
​
               startedTX.set(true);
            }
​
            cache.addToCache(duplicateIDBytes, context.getTransaction(), startedTX.get());
         }
      }
​
      return true;
   }
​
   public DuplicateIDCache getDuplicateIDCache(final SimpleString address) {
      DuplicateIDCache cache = duplicateIDCaches.get(address);
​
      if (cache == null) {
         cache = new DuplicateIDCacheImpl(address, idCacheSize, storageManager, persistIDCache);
​
         DuplicateIDCache oldCache = duplicateIDCaches.putIfAbsent(address, cache);
​
         if (oldCache != null) {
            cache = oldCache;
         }
      }
​
      return cache;
   }
​
   //......
}
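  • When context.isDuplicateDetection() is true, PostOfficeImpl's route method calls checkDuplicateID and returns RoutingStatus.DUPLICATED_ID immediately if that call returns false. If the message carries a bridge duplicate ID (bridgeDup is not null), checkDuplicateID verifies it against a bridge-specific cache with atomicVerify and ignores the regular duplicate ID. Otherwise it obtains duplicateIDBytes from message.getDuplicateIDBytes(); if that is not null, it fetches the DuplicateIDCache for the address via getDuplicateIDCache (backed by duplicateIDCaches) and checks whether the cache contains duplicateIDBytes. If it does and rejectDuplicates is true, the method returns false. If cache is not null and isDuplicate is false, it calls cache.addToCache(duplicateIDBytes, context.getTransaction(), startedTX.get()) to record the ID, creating a transaction first when the context has none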
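The non-bridge branch of this check can be modelled in a few lines. The sketch below is a simplified illustration under stated assumptions, not the Artemis implementation: DuplicateCheckSketch and its members are hypothetical names, an unbounded set stands in for DuplicateIDCache, and transactions and persistence are left out entirely.

```java
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified model of the per-address duplicate check; illustrative names only.
final class DuplicateCheckSketch {

   // One set of seen IDs per address, created lazily on first use
   private final ConcurrentMap<String, Set<ByteBuffer>> cachesByAddress = new ConcurrentHashMap<>();

   // Returns true when the message may be routed, false when it is rejected as a duplicate.
   boolean checkDuplicateId(String address, byte[] duplicateIdBytes, boolean rejectDuplicates) {
      if (duplicateIdBytes == null) {
         return true; // message carries no duplicate ID: nothing to check
      }

      Set<ByteBuffer> cache = cachesByAddress.computeIfAbsent(address, a -> ConcurrentHashMap.newKeySet());

      // byte[] compares by identity, so wrap a copy in a ByteBuffer to compare by content
      ByteBuffer key = ByteBuffer.wrap(duplicateIdBytes.clone());
      boolean isDuplicate = cache.contains(key);

      if (rejectDuplicates && isDuplicate) {
         return false;
      }
      if (!isDuplicate) {
         cache.add(key); // remember the ID for later sends to this address
      }
      return true;
   }

   public static void main(String[] args) {
      DuplicateCheckSketch postOffice = new DuplicateCheckSketch();
      byte[] id = {42};

      System.out.println(postOffice.checkDuplicateId("orders", id, true)); // true: first send
      System.out.println(postOffice.checkDuplicateId("orders", id, true)); // false: resend to the same address
      System.out.println(postOffice.checkDuplicateId("audit", id, true));  // true: same ID, different address
   }
}
```

The ByteBuffer wrapper plays the role that ByteArrayHolder plays in the original code: it gives the byte array content-based equals and hashCode so it can serve as a map or set key.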

handleDuplicateIds

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java

public class PostOfficeJournalLoader implements JournalLoader {

   //......

   // Restores the duplicate IDs recovered from the journal, one cache per address
   public void handleDuplicateIds(Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception {
      for (Map.Entry<SimpleString, List<Pair<byte[], Long>>> entry : duplicateIDMap.entrySet()) {
         SimpleString address = entry.getKey();

         DuplicateIDCache cache = postOffice.getDuplicateIDCache(address);

         if (configuration.isPersistIDCache()) {
            cache.load(entry.getValue());
         }
      }
   }

   //......
}
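  • PostOfficeJournalLoader's handleDuplicateIds method iterates over the duplicate IDs recovered for each address and, when configuration.isPersistIDCache() is true, calls cache.load(entry.getValue()) to load them into that address's cache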

DuplicateIDCache

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java

public interface DuplicateIDCache {

   boolean contains(byte[] duplicateID);

   boolean atomicVerify(byte[] duplID, Transaction tx) throws Exception;

   void addToCache(byte[] duplicateID) throws Exception;

   void addToCache(byte[] duplicateID, Transaction tx) throws Exception;

   /**
    * it will add the data to the cache.
    * If TX == null it won't use a transaction.
    * if instantAdd=true, it won't wait a transaction to add on the cache which is needed on the case of the Bridges
    */
   void addToCache(byte[] duplicateID, Transaction tx, boolean instantAdd) throws Exception;

   void deleteFromCache(byte[] duplicateID) throws Exception;

   void load(List<Pair<byte[], Long>> theIds) throws Exception;

   void load(Transaction tx, byte[] duplID);

   void clear() throws Exception;

   List<Pair<byte[], Long>> getMap();
}
  • DuplicateIDCache defines methods such as contains, atomicVerify, addToCache, deleteFromCache, load and clear

DuplicateIDCacheImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java

public class DuplicateIDCacheImpl implements DuplicateIDCache {
​
   private static final Logger logger = Logger.getLogger(DuplicateIDCacheImpl.class);
​
   // ByteHolder, position
   private final Map<ByteArrayHolder, Integer> cache = new ConcurrentHashMap<>();
​
   private final SimpleString address;
​
   // Note - deliberately typed as ArrayList since we want to ensure fast indexed
   // based array access
   private final ArrayList<Pair<ByteArrayHolder, Long>> ids;
​
   private int pos;
​
   private final int cacheSize;
​
   private final StorageManager storageManager;
​
   private final boolean persist;
​
   public DuplicateIDCacheImpl(final SimpleString address,
                               final int size,
                               final StorageManager storageManager,
                               final boolean persist) {
      this.address = address;
​
      cacheSize = size;
​
      ids = new ArrayList<>(size);
​
      this.storageManager = storageManager;
​
      this.persist = persist;
   }
​
   @Override
   public void load(final List<Pair<byte[], Long>> theIds) throws Exception {
      long txID = -1;
​
      // If we have more IDs than cache size, we shrink the first ones
      int deleteCount = theIds.size() - cacheSize;
      if (deleteCount < 0) {
         deleteCount = 0;
      }
​
      for (Pair<byte[], Long> id : theIds) {
         if (deleteCount > 0) {
            if (txID == -1) {
               txID = storageManager.generateID();
            }
            if (logger.isTraceEnabled()) {
               logger.trace("DuplicateIDCacheImpl::load deleting id=" + describeID(id.getA(), id.getB()));
            }
​
            storageManager.deleteDuplicateIDTransactional(txID, id.getB());
            deleteCount--;
         } else {
            ByteArrayHolder bah = new ByteArrayHolder(id.getA());
​
            Pair<ByteArrayHolder, Long> pair = new Pair<>(bah, id.getB());
​
            cache.put(bah, ids.size());
​
            ids.add(pair);
            if (logger.isTraceEnabled()) {
               logger.trace("DuplicateIDCacheImpl::load loading id=" + describeID(id.getA(), id.getB()));
            }
         }
​
      }
​
      if (txID != -1) {
         storageManager.commit(txID);
      }
​
      pos = ids.size();
​
      if (pos == cacheSize) {
         pos = 0;
      }
​
   }
​
   public boolean contains(final byte[] duplID) {
      boolean contains = cache.get(new ByteArrayHolder(duplID)) != null;
​
      if (contains) {
         logger.trace("DuplicateIDCacheImpl(" + this.address + ")::constains found a duplicate " + describeID(duplID, 0));
      }
      return contains;
   }
​
   public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception {
      long recordID = -1;
​
      if (tx == null) {
         if (persist) {
            recordID = storageManager.generateID();
            storageManager.storeDuplicateID(address, duplID, recordID);
         }
​
         addToCacheInMemory(duplID, recordID);
      } else {
         if (persist) {
            recordID = storageManager.generateID();
            storageManager.storeDuplicateIDTransactional(tx.getID(), address, duplID, recordID);
​
            tx.setContainsPersistent();
         }
​
         if (logger.isTraceEnabled()) {
            logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(duplID, recordID) + ", tx=" + tx);
         }
​
​
         if (instantAdd) {
            tx.addOperation(new AddDuplicateIDOperation(duplID, recordID, false));
         } else {
            // For a tx, it's important that the entry is not added to the cache until commit
            // since if the client fails then resends the tx we don't want it to get rejected
            tx.afterStore(new AddDuplicateIDOperation(duplID, recordID, true));
         }
      }
   }
​
   private synchronized void addToCacheInMemory(final byte[] duplID, final long recordID) {
      if (logger.isTraceEnabled()) {
         logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding " + describeID(duplID, recordID));
      }
​
      ByteArrayHolder holder = new ByteArrayHolder(duplID);
​
      cache.put(holder, pos);
​
      Pair<ByteArrayHolder, Long> id;
​
      if (pos < ids.size()) {
         // Need fast array style access here -hence ArrayList typing
         id = ids.get(pos);
​
         // The id here might be null if it was explicit deleted
         if (id.getA() != null) {
            if (logger.isTraceEnabled()) {
               logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory removing excess duplicateDetection " + describeID(id.getA().bytes, id.getB()));
            }
​
            cache.remove(id.getA());
​
            // Record already exists - we delete the old one and add the new one
            // Note we can't use update since journal update doesn't let older records get
            // reclaimed
​
            if (id.getB() != null) {
               try {
                  storageManager.deleteDuplicateID(id.getB());
               } catch (Exception e) {
                  ActiveMQServerLogger.LOGGER.errorDeletingDuplicateCache(e);
               }
            }
         }
​
         id.setA(holder);
​
         // The recordID could be negative if the duplicateCache is configured to not persist,
         // -1 would mean null on this case
         id.setB(recordID >= 0 ? recordID : null);
​
         if (logger.isTraceEnabled()) {
            logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory replacing old duplicateID by " + describeID(id.getA().bytes, id.getB()));
         }
​
         holder.pos = pos;
      } else {
         id = new Pair<>(holder, recordID >= 0 ? recordID : null);
​
         if (logger.isTraceEnabled()) {
            logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding new duplicateID " + describeID(id.getA().bytes, id.getB()));
         }
​
         ids.add(id);
​
         holder.pos = pos;
      }
​
      if (pos++ == cacheSize - 1) {
         pos = 0;
      }
   }
​
   //......
}
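  • DuplicateIDCacheImpl implements DuplicateIDCache. Its load method loads the IDs into cache, a map keyed by ByteArrayHolder; if more IDs are supplied than cacheSize allows, the first surplus entries are deleted from storage instead of being loaded. contains wraps the duplID byte array in a ByteArrayHolder and looks it up in cache. addToCache calls addToCacheInMemory directly when tx is null; otherwise it registers an AddDuplicateIDOperation on the transaction, via tx.addOperation when instantAdd is true or via tx.afterStore when it is false. addToCacheInMemory adds the entry to cache and keeps the cache within cacheSize by overwriting the slot at pos, which wraps back to 0 after reaching cacheSize - 1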
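The fixed-size, overwrite-the-oldest behaviour of addToCacheInMemory is easier to see in isolation. Below is a minimal sketch of the same idea with persistence, logging and explicit deletion removed; RingIdCacheSketch and its members are hypothetical names, and ByteBuffer is used here in place of ByteArrayHolder. Unlike the original, this sketch ignores an ID that is already cached, an assumption made to keep the slot list and the index consistent.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative fixed-size duplicate-ID cache; a simplified model, not DuplicateIDCacheImpl.
final class RingIdCacheSketch {

   private final Map<ByteBuffer, Integer> index = new HashMap<>(); // ID -> slot number
   private final List<ByteBuffer> slots;                            // slot number -> ID
   private final int capacity;
   private int pos;                                                 // next slot to write

   RingIdCacheSketch(int capacity) {
      if (capacity <= 0) {
         throw new IllegalArgumentException("capacity must be positive");
      }
      this.capacity = capacity;
      this.slots = new ArrayList<>(capacity);
   }

   synchronized boolean contains(byte[] id) {
      return index.containsKey(ByteBuffer.wrap(id));
   }

   synchronized void add(byte[] id) {
      ByteBuffer key = ByteBuffer.wrap(id.clone());
      if (index.containsKey(key)) {
         return; // assumption of this sketch: re-adding a known ID is a no-op
      }

      if (pos < slots.size()) {
         // The slot is already occupied: evict the ID that was written there a full lap ago
         index.remove(slots.get(pos));
         slots.set(pos, key);
      } else {
         // The cache is still filling up: append a new slot
         slots.add(key);
      }
      index.put(key, pos);

      // Wrap around after the last slot so the next write overwrites the oldest entry
      pos = (pos + 1) % capacity;
   }

   public static void main(String[] args) {
      RingIdCacheSketch cache = new RingIdCacheSketch(2);
      cache.add(new byte[] {1});
      cache.add(new byte[] {2});
      cache.add(new byte[] {3}); // overwrites the slot that held {1}

      System.out.println(cache.contains(new byte[] {1})); // false
      System.out.println(cache.contains(new byte[] {2})); // true
      System.out.println(cache.contains(new byte[] {3})); // true
   }
}
```

One consequence of this design is that duplicate detection only covers the most recent cacheSize IDs per address; an ID resent after it has been overwritten is no longer recognized.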

AddDuplicateIDOperation

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java

   private final class AddDuplicateIDOperation extends TransactionOperationAbstract {

      final byte[] duplID;

      final long recordID;

      // Ensures the ID is added to the in-memory cache at most once
      volatile boolean done;

      // true: add in afterCommit; false: add in beforeCommit
      private final boolean afterCommit;

      AddDuplicateIDOperation(final byte[] duplID, final long recordID, boolean afterCommit) {
         this.duplID = duplID;
         this.recordID = recordID;
         this.afterCommit = afterCommit;
      }

      private void process() {
         if (!done) {
            addToCacheInMemory(duplID, recordID);

            done = true;
         }
      }

      @Override
      public void afterCommit(final Transaction tx) {
         if (afterCommit) {
            process();
         }
      }

      @Override
      public void beforeCommit(Transaction tx) throws Exception {
         if (!afterCommit) {
            process();
         }
      }

      @Override
      public List<MessageReference> getRelatedMessageReferences() {
         return null;
      }
   }
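  • AddDuplicateIDOperation extends TransactionOperationAbstract. Depending on its afterCommit flag, process runs in exactly one of the two callbacks: in afterCommit when the flag is true, in beforeCommit when it is false. process calls addToCacheInMemory(duplID, recordID) at most once, guarded by the done flag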
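The effect of the afterCommit flag on ordering can be shown with a toy transaction. This is a sketch under stated assumptions: CommitPhaseSketch, Tx, Operation and AddIdOperation are hypothetical names, and the toy Tx runs every beforeCommit callback, then the commit, then every afterCommit callback, which is a simplification of what a real transaction does.

```java
import java.util.ArrayList;
import java.util.List;

// Toy transaction model showing the effect of the afterCommit flag; illustrative names only.
final class CommitPhaseSketch {

   interface Operation {
      void beforeCommit();
      void afterCommit();
   }

   static final class Tx {
      private final List<Operation> operations = new ArrayList<>();

      void addOperation(Operation op) {
         operations.add(op);
      }

      void commit(List<String> log) {
         for (Operation op : operations) {
            op.beforeCommit();
         }
         log.add("commit");
         for (Operation op : operations) {
            op.afterCommit();
         }
      }
   }

   static final class AddIdOperation implements Operation {
      private final String id;
      private final boolean afterCommit;
      private final List<String> log;
      private boolean done;

      AddIdOperation(String id, boolean afterCommit, List<String> log) {
         this.id = id;
         this.afterCommit = afterCommit;
         this.log = log;
      }

      // Records the ID once, no matter how often the callbacks fire
      private void process() {
         if (!done) {
            log.add("add " + id);
            done = true;
         }
      }

      @Override
      public void beforeCommit() {
         if (!afterCommit) {
            process();
         }
      }

      @Override
      public void afterCommit() {
         if (afterCommit) {
            process();
         }
      }
   }

   static List<String> run() {
      List<String> log = new ArrayList<>();
      Tx tx = new Tx();
      tx.addOperation(new AddIdOperation("instant-id", false, log)); // like instantAdd = true
      tx.addOperation(new AddIdOperation("deferred-id", true, log)); // like instantAdd = false
      tx.commit(log);
      return log;
   }

   public static void main(String[] args) {
      System.out.println(run()); // [add instant-id, commit, add deferred-id]
   }
}
```

Deferring the add until after the commit matches the comment in addToCache: if the client fails and resends the transaction, its ID must not already be in the cache.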

Summary

  • CoreMessage implements ICoreMessage, which extends Message; its getDuplicateProperty method returns the value of the Message.HDR_DUPLICATE_DETECTION_ID property
  • When context.isDuplicateDetection() is true, PostOfficeImpl's route method calls checkDuplicateID and returns RoutingStatus.DUPLICATED_ID if it returns false. When bridgeDup is null, checkDuplicateID takes duplicateIDBytes from message.getDuplicateIDBytes(), looks up the address's DuplicateIDCache through getDuplicateIDCache, and returns false if the ID is already cached and rejectDuplicates is true; if the ID is not yet cached, it is added with cache.addToCache(duplicateIDBytes, context.getTransaction(), startedTX.get())
  • PostOfficeJournalLoader's handleDuplicateIds method calls cache.load(entry.getValue()) when configuration.isPersistIDCache() is true

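Original content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.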