A Look at Artemis's DelayedAddRedistributor

Original article
Author: code4it
Last modified: 2020-02-21 10:09:04
Published in the column: 码匠的流水账

This article takes a look at DelayedAddRedistributor in Artemis.

addRedistributor

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {
​
   //......
​
   public synchronized void addRedistributor(final long delay) {
      clearRedistributorFuture();
​
      if (redistributor != null) {
         // Just prompt delivery
         deliverAsync();
      }
​
      if (delay > 0) {
         if (consumers.isEmpty()) {
            DelayedAddRedistributor dar = new DelayedAddRedistributor(executor);
​
            redistributorFuture = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS);
         }
      } else {
         internalAddRedistributor(executor);
      }
   }
​
   //......
}
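  • QueueImpl's addRedistributor first cancels any pending redistributorFuture and, if a redistributor already exists, prompts delivery through deliverAsync. When delay is greater than 0 and the queue has no consumers, it creates a DelayedAddRedistributor and schedules it to run after delay milliseconds; when delay is 0 or less, it calls internalAddRedistributor directly.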
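
The schedule-then-cancel pattern described above can be sketched outside Artemis as follows. This is a minimal sketch, not the Artemis implementation: DelayedAddSketch, hasConsumers, added and isPending are hypothetical names introduced here, and the "redistributor" is reduced to a counter.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the scheduling logic in QueueImpl.addRedistributor.
// All names here are illustrative and are not Artemis API.
class DelayedAddSketch {
   private final ScheduledExecutorService scheduler;
   private ScheduledFuture<?> pending;               // plays the role of redistributorFuture
   final AtomicInteger added = new AtomicInteger();  // counts how often the action ran
   boolean hasConsumers;                             // plays the role of !consumers.isEmpty()

   DelayedAddSketch(ScheduledExecutorService scheduler) {
      this.scheduler = scheduler;
   }

   synchronized void add(long delayMillis) {
      clearPending();                                // a new request supersedes a pending one
      if (delayMillis > 0) {
         if (!hasConsumers) {
            pending = scheduler.schedule(this::runDelayed, delayMillis, TimeUnit.MILLISECONDS);
         }
      } else {
         internalAdd();                              // no delay: act immediately
      }
   }

   // Runs on the scheduler thread; takes the same monitor as add().
   private synchronized void runDelayed() {
      internalAdd();
      clearPending();
   }

   private void internalAdd() {
      if (!hasConsumers) {                           // re-check: a consumer may have arrived meanwhile
         added.incrementAndGet();
      }
   }

   synchronized void clearPending() {
      ScheduledFuture<?> future = pending;
      pending = null;
      if (future != null) {
         future.cancel(false);                       // false: do not interrupt a run in progress
      }
   }

   synchronized boolean isPending() {
      return pending != null;
   }
}
```

Calling add(0) runs the action on the caller's thread, while add(5000) followed by clearPending() before the delay elapses leaves the action unexecuted. The condition is re-checked inside internalAdd because the state may change between scheduling and execution.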

DelayedAddRedistributor

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {
​
   //......
​
   private class DelayedAddRedistributor implements Runnable {
​
      private final ArtemisExecutor executor1;
​
      DelayedAddRedistributor(final ArtemisExecutor executor) {
         this.executor1 = executor;
      }
​
      @Override
      public void run() {
         synchronized (QueueImpl.this) {
            internalAddRedistributor(executor1);
​
            clearRedistributorFuture();
         }
      }
   }
​
   private void internalAddRedistributor(final ArtemisExecutor executor) {
      // create the redistributor only once if there are no local consumers
      if (consumers.isEmpty() && redistributor == null) {
         if (logger.isTraceEnabled()) {
            logger.trace("QueueImpl::Adding redistributor on queue " + this.toString());
         }
​
         redistributor = (new ConsumerHolder(new Redistributor(this, storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE)));
​
         redistributor.consumer.start();
​
         deliverAsync();
      }
   }
​
   private void clearRedistributorFuture() {
      ScheduledFuture<?> future = redistributorFuture;
      redistributorFuture = null;
      if (future != null) {
         future.cancel(false);
      }
   }
​
   public void deliverAsync() {
      deliverAsync(false);
   }
​
   private void deliverAsync(boolean noWait) {
      if (scheduledRunners.get() < MAX_SCHEDULED_RUNNERS) {
         scheduledRunners.incrementAndGet();
         checkDepage(noWait);
         try {
            getExecutor().execute(deliverRunner);
         } catch (RejectedExecutionException ignored) {
            // no-op
            scheduledRunners.decrementAndGet();
         }
      }
   }
​
   //......
}
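  • DelayedAddRedistributor implements the Runnable interface. Its run method, synchronized on the enclosing QueueImpl, first calls internalAddRedistributor and then clearRedistributorFuture. internalAddRedistributor acts only when the queue has no consumers and no redistributor exists yet: it creates a Redistributor wrapped in a ConsumerHolder, calls redistributor.consumer.start(), and finally calls deliverAsync. deliverAsync submits the DeliverRunner to the queue's executor, provided fewer than MAX_SCHEDULED_RUNNERS runners are already scheduled, and gives the slot back if the executor rejects the task.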
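
The throttling in deliverAsync can be illustrated in isolation. The sketch below is a hypothetical reduction, not Artemis code: the limit of 2 is an illustrative value, and it assumes the runner releases its slot when it starts, which the excerpt above does not show.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reduction of QueueImpl.deliverAsync; names mirror the excerpt
// but this class is not part of Artemis.
class DeliverThrottleSketch {
   static final int MAX_SCHEDULED_RUNNERS = 2;   // illustrative value only

   final AtomicInteger scheduledRunners = new AtomicInteger();
   final AtomicInteger deliveries = new AtomicInteger();
   private final Executor executor;

   // Assumption: the runner frees its slot as soon as it starts running.
   private final Runnable deliverRunner = () -> {
      scheduledRunners.decrementAndGet();
      deliveries.incrementAndGet();
   };

   DeliverThrottleSketch(Executor executor) {
      this.executor = executor;
   }

   void deliverAsync() {
      // Skip the submission when enough runners are already waiting to execute.
      if (scheduledRunners.get() < MAX_SCHEDULED_RUNNERS) {
         scheduledRunners.incrementAndGet();
         try {
            executor.execute(deliverRunner);
         } catch (RejectedExecutionException ignored) {
            // The runner will never run, so give the slot back.
            scheduledRunners.decrementAndGet();
         }
      }
   }
}
```

With an executor that only queues tasks, five consecutive deliverAsync calls enqueue two runners; the remaining calls are dropped because a queued runner will pick up the pending work anyway.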

ConsumerHolder

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

   protected static class ConsumerHolder<T extends Consumer> implements PriorityAware {
​
      ConsumerHolder(final T consumer) {
         this.consumer = consumer;
      }
​
      final T consumer;
​
      LinkedListIterator<MessageReference> iter;
​
      private void resetIterator() {
         if (iter != null) {
            iter.close();
         }
         iter = null;
      }
​
      private Consumer consumer() {
         return consumer;
      }
​
      @Override
      public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         ConsumerHolder<?> that = (ConsumerHolder<?>) o;
         return Objects.equals(consumer, that.consumer);
      }
​
      @Override
      public int hashCode() {
         return Objects.hash(consumer);
      }
​
      @Override
      public int getPriority() {
         return consumer.getPriority();
      }
   }
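  • ConsumerHolder implements the PriorityAware interface; its getPriority method returns consumer.getPriority(). Its equals and hashCode delegate to the wrapped consumer, so two holders wrapping the same consumer compare as equal.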
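
A practical consequence of delegating equals and hashCode is that a freshly created holder can be used to find or remove an existing one in a collection. The following sketch shows this with hypothetical miniature types (PrioritySketch, HolderSketch) that stand in for PriorityAware and ConsumerHolder; it makes no claim about how QueueImpl itself looks holders up.

```java
import java.util.Objects;

// Hypothetical miniature of PriorityAware; not the Artemis interface.
interface PrioritySketch {
   int getPriority();
}

// Hypothetical miniature of ConsumerHolder: a wrapper whose identity and
// priority are those of the wrapped consumer.
final class HolderSketch<T extends PrioritySketch> implements PrioritySketch {
   final T consumer;

   HolderSketch(T consumer) {
      this.consumer = consumer;
   }

   @Override
   public boolean equals(Object o) {
      if (this == o) return true;
      if (o == null || getClass() != o.getClass()) return false;
      return Objects.equals(consumer, ((HolderSketch<?>) o).consumer);
   }

   @Override
   public int hashCode() {
      return Objects.hash(consumer);
   }

   @Override
   public int getPriority() {
      return consumer.getPriority();   // the holder has no priority of its own
   }
}
```

Given a list that contains new HolderSketch<>(c), calling list.remove(new HolderSketch<>(c)) with the same consumer c removes that entry, even though the two holder objects are distinct.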

Redistributor

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java

public class Redistributor implements Consumer {
​
   private boolean active;
​
   private final StorageManager storageManager;
​
   private final PostOffice postOffice;
​
   private final Executor executor;
​
   private final int batchSize;
​
   private final Queue queue;
​
   private int count;
​
   private final long sequentialID;
​
   // a Flush executor here is happening inside another executor.
   // what may cause issues under load. Say you are running out of executors for cases where you don't need to wait at all.
   // So, instead of using a future we will use a plain ReusableLatch here
   private ReusableLatch pendingRuns = new ReusableLatch();
​
   public Redistributor(final Queue queue,
                        final StorageManager storageManager,
                        final PostOffice postOffice,
                        final Executor executor,
                        final int batchSize) {
      this.queue = queue;
​
      this.sequentialID = storageManager.generateID();
​
      this.storageManager = storageManager;
​
      this.postOffice = postOffice;
​
      this.executor = executor;
​
      this.batchSize = batchSize;
   }
​
   @Override
   public long sequentialID() {
      return sequentialID;
   }
​
   @Override
   public Filter getFilter() {
      return null;
   }
​
   @Override
   public String debug() {
      return toString();
   }
​
   @Override
   public String toManagementString() {
      return "Redistributor[" + queue.getName() + "/" + queue.getID() + "]";
   }
​
   @Override
   public void disconnect() {
      //noop
   }
​
   public synchronized void start() {
      active = true;
   }
​
   @Override
   public synchronized HandleStatus handle(final MessageReference reference) throws Exception {
      if (!active) {
         return HandleStatus.BUSY;
      } else if (reference.getMessage().getGroupID() != null) {
         //we shouldn't redistribute with message groups return NO_MATCH so other messages can be delivered
         return HandleStatus.NO_MATCH;
      }
​
      final Transaction tx = new TransactionImpl(storageManager);
​
      final Pair<RoutingContext, Message> routingInfo = postOffice.redistribute(reference.getMessage(), queue, tx);
​
      if (routingInfo == null) {
         tx.rollback();
         return HandleStatus.BUSY;
      }
​
      if (!reference.getMessage().isLargeMessage()) {
​
         postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);
​
         ackRedistribution(reference, tx);
      } else {
         active = false;
         executor.execute(new Runnable() {
            @Override
            public void run() {
               try {
​
                  postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);
​
                  ackRedistribution(reference, tx);
​
                  synchronized (Redistributor.this) {
                     active = true;
​
                     count++;
​
                     queue.deliverAsync();
                  }
               } catch (Exception e) {
                  try {
                     tx.rollback();
                  } catch (Exception e2) {
                     // Nothing much we can do now
                     ActiveMQServerLogger.LOGGER.failedToRollback(e2);
                  }
               }
            }
         });
      }
​
      return HandleStatus.HANDLED;
   }
​
   //......
}
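  • Redistributor implements the Consumer interface. Its start method sets active to true. Its handle method returns HandleStatus.BUSY while active is false, and HandleStatus.NO_MATCH for messages that carry a group ID. Otherwise it calls postOffice.redistribute(reference.getMessage(), queue, tx) to obtain routingInfo; if that is null, it rolls back the transaction and returns HandleStatus.BUSY. For a message that is not a large message it calls postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false) and ackRedistribution(reference, tx) inline. For a large message it sets active to false and runs the same two calls on the executor, setting active back to true afterwards. In both cases it returns HandleStatus.HANDLED.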
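
The decision flow of handle can be condensed into a small state machine. This is a sketch under stated assumptions: StatusSketch, MsgSketch and RedistributorSketch are hypothetical types, the routing function stands in for postOffice.redistribute, and transactions and acknowledgements are reduced to a counter.

```java
import java.util.concurrent.Executor;
import java.util.function.Function;

// Hypothetical stand-ins for HandleStatus and a message; not Artemis types.
enum StatusSketch { HANDLED, NO_MATCH, BUSY }

final class MsgSketch {
   final String groupId;   // null when the message belongs to no group
   final boolean large;

   MsgSketch(String groupId, boolean large) {
      this.groupId = groupId;
      this.large = large;
   }
}

// Hypothetical reduction of Redistributor.handle.
class RedistributorSketch {
   private boolean active;
   private final Executor executor;
   private final Function<MsgSketch, String> router;   // returns null when there is no target
   int routed;                                         // stands in for processRoute + ack

   RedistributorSketch(Executor executor, Function<MsgSketch, String> router) {
      this.executor = executor;
      this.router = router;
   }

   synchronized void start() {
      active = true;
   }

   synchronized boolean isActive() {
      return active;
   }

   synchronized StatusSketch handle(MsgSketch msg) {
      if (!active) {
         return StatusSketch.BUSY;
      }
      if (msg.groupId != null) {
         return StatusSketch.NO_MATCH;      // grouped messages are not redistributed
      }
      if (router.apply(msg) == null) {
         return StatusSketch.BUSY;          // the original rolls back its transaction here
      }
      if (!msg.large) {
         routed++;                          // small message: route inline
      } else {
         active = false;                    // refuse further work until the large message is done
         executor.execute(() -> {
            synchronized (this) {
               routed++;
               active = true;
            }
         });
      }
      return StatusSketch.HANDLED;
   }
}
```

Note that a large message is reported as HANDLED before the asynchronous routing has finished; until that task completes, every further handle call returns BUSY.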

DeliverRunner

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {
​
   //......
​
   private final class DeliverRunner implements Runnable {
​
      @Override
      public void run() {
         try {
            // during the transition between paging and nonpaging, we could have this using a different executor
            // and at this short period we could have more than one delivery thread running in async mode
            // this will avoid that possibility
            // We will be using the deliverRunner instance as the guard object to avoid multiple threads executing
            // an asynchronous delivery
            enterCritical(CRITICAL_DELIVER);
            boolean needCheckDepage = false;
            try {
               deliverLock.lock();
               try {
                  needCheckDepage = deliver();
               } finally {
                  deliverLock.unlock();
               }
            } finally {
               leaveCritical(CRITICAL_DELIVER);
            }
​
            if (needCheckDepage) {
               enterCritical(CRITICAL_CHECK_DEPAGE);
               try {
                  checkDepage(true);
               } finally {
                  leaveCritical(CRITICAL_CHECK_DEPAGE);
               }
            }
​
         } catch (Exception e) {
            ActiveMQServerLogger.LOGGER.errorDelivering(e);
         }
      }
   }
​
   private synchronized HandleStatus handle(final MessageReference reference, final Consumer consumer) {
      HandleStatus status;
      try {
         status = consumer.handle(reference);
      } catch (Throwable t) {
         ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference);
​
         // If the consumer throws an exception we remove the consumer
         try {
            removeConsumer(consumer);
         } catch (Exception e) {
            ActiveMQServerLogger.LOGGER.errorRemovingConsumer(e);
         }
         return HandleStatus.BUSY;
      }
​
      if (status == null) {
         throw new IllegalStateException("ClientConsumer.handle() should never return null");
      }
​
      return status;
   }
​
   //......
}
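  • DeliverRunner implements the Runnable interface. Its run method calls deliver while holding deliverLock; deliver in turn calls handle, which calls consumer.handle(reference). If the consumer throws, handle removes that consumer and returns HandleStatus.BUSY. When redistributor is not null, the consumer in question is redistributor.consumer.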
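
The guard around consumer.handle can be sketched as follows. This is a heavily simplified illustration: the deliver loop here simply offers each reference to every consumer in order, which is an assumption of the sketch and not how Artemis selects consumers, and all type names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-ins for HandleStatus and Consumer; not Artemis types.
enum DeliveryStatusSketch { HANDLED, NO_MATCH, BUSY }

interface ConsumerSketch {
   DeliveryStatusSketch handle(String reference) throws Exception;
}

// Hypothetical reduction of DeliverRunner plus QueueImpl.handle.
class DeliveryLoopSketch implements Runnable {
   private final ReentrantLock deliverLock = new ReentrantLock();
   final List<ConsumerSketch> consumers = new ArrayList<>();
   final List<String> references = new ArrayList<>();
   final List<String> delivered = new ArrayList<>();

   @Override
   public void run() {
      deliverLock.lock();                  // only one delivery pass at a time
      try {
         deliver();
      } finally {
         deliverLock.unlock();
      }
   }

   // Simplified loop: offer each reference to the consumers in list order.
   private void deliver() {
      Iterator<String> it = references.iterator();
      while (it.hasNext()) {
         String ref = it.next();
         // Iterate over a snapshot because handle() may remove a consumer.
         for (ConsumerSketch consumer : new ArrayList<>(consumers)) {
            if (handle(ref, consumer) == DeliveryStatusSketch.HANDLED) {
               delivered.add(ref);
               it.remove();
               break;
            }
         }
      }
   }

   synchronized DeliveryStatusSketch handle(String reference, ConsumerSketch consumer) {
      DeliveryStatusSketch status;
      try {
         status = consumer.handle(reference);
      } catch (Throwable t) {
         consumers.remove(consumer);       // a consumer that throws is dropped
         return DeliveryStatusSketch.BUSY;
      }
      if (status == null) {
         throw new IllegalStateException("handle() should never return null");
      }
      return status;
   }
}
```

With one consumer that always throws and one that always returns HANDLED, a single run() removes the failing consumer on its first exception and delivers every reference through the remaining one.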

Summary

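QueueImpl's addRedistributor creates and schedules a DelayedAddRedistributor when delay is greater than 0 and the queue has no consumers; otherwise, when delay is 0 or less, it calls internalAddRedistributor directly. DelayedAddRedistributor implements the Runnable interface; its run method first calls internalAddRedistributor and then clearRedistributorFuture. internalAddRedistributor creates the Redistributor and its ConsumerHolder, calls redistributor.consumer.start(), and finally calls deliverAsync to schedule the DeliverRunner. DeliverRunner also implements Runnable; its run method calls deliver, which calls handle, which in turn calls consumer.handle(reference). When redistributor is not null, that consumer is redistributor.consumer. For a message that is not a large message, its handle method calls postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false) and ackRedistribution(reference, tx), then returns HandleStatus.HANDLED.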

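Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.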
