首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊artemis的ExpiryScanner

聊聊artemis的ExpiryScanner

作者头像
code4it
发布2020-02-24 09:37:11
3960
发布2020-02-24 09:37:11
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下artemis的ExpiryScanner

startExpiryScanner

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java

public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory {

   //......

   private ExpiryReaper expiryReaperRunnable;

   //......

   public synchronized void startExpiryScanner() {
      if (expiryReaperPeriod > 0) {
         if (expiryReaperRunnable != null)
            expiryReaperRunnable.stop();
         expiryReaperRunnable = new ExpiryReaper(server.getScheduledPool(), server.getExecutorFactory().getExecutor(), expiryReaperPeriod, TimeUnit.MILLISECONDS, false);

         expiryReaperRunnable.start();
      }
   }

   public synchronized void stop() throws Exception {
      started = false;

      managementService.removeNotificationListener(this);

      if (expiryReaperRunnable != null)
         expiryReaperRunnable.stop();

      if (addressQueueReaperRunnable != null)
         addressQueueReaperRunnable.stop();

      addressManager.clear();

      queueInfos.clear();
   }

   //......
}
  • PostOfficeImpl的startExpiryScanner方法会执行expiryReaperRunnable.start();其stop方法会执行expiryReaperRunnable.stop()

ExpiryReaper

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java

   private final class ExpiryReaper extends ActiveMQScheduledComponent {

      ExpiryReaper(ScheduledExecutorService scheduledExecutorService,
                   Executor executor,
                   long checkPeriod,
                   TimeUnit timeUnit,
                   boolean onDemand) {
         super(scheduledExecutorService, executor, checkPeriod, timeUnit, onDemand);
      }

      @Override
      public void run() {
         // The reaper thread should be finished case the PostOffice is gone
         // This is to avoid leaks on PostOffice between stops and starts
         for (Queue queue : getLocalQueues()) {
            try {
               queue.expireReferences();
            } catch (Exception e) {
               ActiveMQServerLogger.LOGGER.errorExpiringMessages(e);
            }
         }
      }
   }
  • ExpiryReaper继承了ActiveMQScheduledComponent,其run方法会遍历localQueue挨个执行queue.expireReferences()

expireReferences

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {

   //......

   private final ExpiryScanner expiryScanner = new ExpiryScanner();

   //......

   public void expireReferences() {
      if (isExpirationRedundant()) {
         return;
      }

      if (!queueDestroyed && expiryScanner.scannerRunning.get() == 0) {
         expiryScanner.scannerRunning.incrementAndGet();
         getExecutor().execute(expiryScanner);
      }
   }

   //......
}
  • QueueImpl的expireReferences方法会往executor提交执行expiryScanner

ExpiryScanner

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

   class ExpiryScanner implements Runnable {

      public AtomicInteger scannerRunning = new AtomicInteger(0);

      @Override
      public void run() {

         boolean expired = false;
         boolean hasElements = false;
         int elementsExpired = 0;

         LinkedList<MessageReference> expiredMessages = new LinkedList<>();
         synchronized (QueueImpl.this) {
            if (queueDestroyed) {
               return;
            }

            if (logger.isDebugEnabled()) {
               logger.debug("Scanning for expires on " + QueueImpl.this.getName());
            }

            LinkedListIterator<MessageReference> iter = iterator();

            try {
               while (postOffice.isStarted() && iter.hasNext()) {
                  hasElements = true;
                  MessageReference ref = iter.next();
                  if (ref.getMessage().isExpired()) {
                     incDelivering(ref);
                     expired = true;
                     expiredMessages.add(ref);
                     iter.remove();

                     if (++elementsExpired >= MAX_DELIVERIES_IN_LOOP) {
                        logger.debug("Breaking loop of expiring");
                        scannerRunning.incrementAndGet();
                        getExecutor().execute(this);
                        break;
                     }
                  }
               }
            } finally {
               try {
                  iter.close();
               } catch (Throwable ignored) {
               }
               scannerRunning.decrementAndGet();
               logger.debug("Scanning for expires on " + QueueImpl.this.getName() + " done");

            }
         }

         if (!expiredMessages.isEmpty()) {
            Transaction tx = new TransactionImpl(storageManager);
            for (MessageReference ref : expiredMessages) {
               if (tx == null) {
                  tx = new TransactionImpl(storageManager);
               }
               try {
                  expire(tx, ref);
                  refRemoved(ref);
               } catch (Exception e) {
                  ActiveMQServerLogger.LOGGER.errorExpiringReferencesOnQueue(e, ref);
               }
            }

            try {
               tx.commit();
            } catch (Exception e) {
               ActiveMQServerLogger.LOGGER.unableToCommitTransaction(e);
            }
            logger.debug("Expired " + elementsExpired + " references");


         }

         // If empty we need to schedule depaging to make sure we would depage expired messages as well
         if ((!hasElements || expired) && pageIterator != null && pageIterator.tryNext() > 0) {
            scheduleDepage(true);
         }
      }
   }
  • ExpiryScanner实现了Runnable接口,其run方法会遍历MessageReference的iterator,挨个判断message是否expired,若为true则执行incDelivering并且添加到expiredMessages以及从iterator中移除,之后递增expiredMessages并且判断是否大于等于MAX_DELIVERIES_IN_LOOP(1000),若为true则提交到executor执行并跳出循环;之后遍历expiredMessages,挨个执行expire(tx, ref)以及refRemoved(ref),最后执行tx.commit()

isExpired

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java

public interface Message {

   //......

   default boolean isExpired() {
      if (getExpiration() == 0) {
         return false;
      }

      return System.currentTimeMillis() - getExpiration() >= 0;
   }

   //......

}      
  • Message的isExpired方法判断expiration是否为0,若为0则返回false,否则判断当前时间与expiration的差值是否大于等于0,若为true则返回true

expire

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {

   //......

   private void expire(final Transaction tx, final MessageReference ref) throws Exception {
      SimpleString expiryAddress = addressSettingsRepository.getMatch(address.toString()).getExpiryAddress();

      if (expiryAddress != null) {
         Bindings bindingList = postOffice.lookupBindingsForAddress(expiryAddress);

         if (bindingList == null || bindingList.getBindings().isEmpty()) {
            ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress);
            acknowledge(tx, ref, AckReason.EXPIRED, null);
         } else {
            move(expiryAddress, tx, ref, true, true);
         }
      } else {
         if (!printErrorExpiring) {
            printErrorExpiring = true;
            // print this only once
            ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoQueue(name);
         }

         acknowledge(tx, ref, AckReason.EXPIRED, null);
      }

      if (server != null && server.hasBrokerMessagePlugins()) {
         ExpiryLogger expiryLogger = (ExpiryLogger)tx.getProperty(TransactionPropertyIndexes.EXPIRY_LOGGER);
         if (expiryLogger == null) {
            expiryLogger = new ExpiryLogger();
            tx.putProperty(TransactionPropertyIndexes.EXPIRY_LOGGER, expiryLogger);
            tx.addOperation(expiryLogger);
         }

         expiryLogger.addExpiry(address, ref);
      }

   }

   //......
}
  • expire方法先获取expiryAddress,之后通过postOffice.lookupBindingsForAddress(expiryAddress)获取bindingList,若不为null则执行move操作

move

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {

   //......

   private void move(final SimpleString toAddress,
                     final Transaction tx,
                     final MessageReference ref,
                     final boolean expiry,
                     final boolean rejectDuplicate,
                     final long... queueIDs) throws Exception {
      Message copyMessage = makeCopy(ref, expiry);

      copyMessage.setAddress(toAddress);

      if (queueIDs != null && queueIDs.length > 0) {
         ByteBuffer buffer = ByteBuffer.allocate(8 * queueIDs.length);
         for (long id : queueIDs) {
            buffer.putLong(id);
         }
         copyMessage.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), buffer.array());
      }

      postOffice.route(copyMessage, tx, false, rejectDuplicate);

      if (expiry) {
         acknowledge(tx, ref, AckReason.EXPIRED, null);
      } else {
         acknowledge(tx, ref);
      }

   }

   //......
}
  • move方法先使用makeCopy进行拷贝得到copyMessage,然后通过postOffice.route(copyMessage, tx, false, rejectDuplicate)进行重新路由,最后执行acknowledge方法,若expiry为true则reason为AckReason.EXPIRED

小结

  • ExpiryScanner实现了Runnable接口,其run方法会遍历MessageReference的iterator,挨个判断message是否expired,若为true则执行incDelivering并且添加到expiredMessages以及从iterator中移除;之后遍历expiredMessages,挨个执行expire(tx, ref)以及refRemoved(ref),最后执行tx.commit()
  • expire方法expire方法先获取expiryAddress,之后通过postOffice.lookupBindingsForAddress(expiryAddress)获取bindingList,若不为null则执行move操作
  • move方法先使用makeCopy进行拷贝得到copyMessage,然后通过postOffice.route(copyMessage, tx, false, rejectDuplicate)进行重新路由,最后执行acknowledge方法,若expiry为true则reason为AckReason.EXPIRED

doc

  • QueueImpl
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-01-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • startExpiryScanner
  • ExpiryReaper
  • expireReferences
  • ExpiryScanner
  • isExpired
  • expire
  • move
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档