前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊artemis ClientConsumer的handleRegularMessage

聊聊artemis ClientConsumer的handleRegularMessage

原创
作者头像
code4it
修改2020-01-19 10:23:06
2680
修改2020-01-19 10:23:06
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下artemis ClientConsumer的handleRegularMessage

handleRegularMessage

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

代码语言:javascript
复制
public final class ClientConsumerImpl implements ClientConsumerInternal {
​
   //......
​
   private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<>(ClientConsumerImpl.NUM_PRIORITIES);
​
   private final Runner runner = new Runner();
​
   private volatile MessageHandler handler;
​
   //......
​
   private void handleRegularMessage(ClientMessageInternal message) {
      if (message.getAddress() == null) {
         message.setAddress(queueInfo.getAddress());
      }
​
      message.onReceipt(this);
​
      if (!ackIndividually && message.getPriority() != 4 && !message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
         // We have messages of different priorities so we need to ack them individually since the order
         // of them in the ServerConsumerImpl delivery list might not be the same as the order they are
         // consumed in, which means that acking all up to won't work
         ackIndividually = true;
      }
​
      // Add it to the buffer
      buffer.addTail(message, message.getPriority());
​
      if (handler != null) {
         // Execute using executor
         if (!stopped) {
            queueExecutor();
         }
      } else {
         notify();
      }
   }
​
   private void queueExecutor() {
      if (logger.isTraceEnabled()) {
         logger.trace(this + "::Adding Runner on Executor for delivery");
      }
​
      sessionExecutor.execute(runner);
   }
​
   //......
}
  • ClientConsumerImpl的handleRegularMessage方法先执行buffer.addTail(message, message.getPriority()),之后对于handler不为null的会执行queueExecutor(),否则执行notify();queueExecutor方法是通过sessionExecutor执行runner

Runner

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

代码语言:javascript
复制
public final class ClientConsumerImpl implements ClientConsumerInternal {
​
   //......
​
   private class Runner implements Runnable {
​
      @Override
      public void run() {
         try {
            callOnMessage();
         } catch (Exception e) {
            ActiveMQClientLogger.LOGGER.onMessageError(e);
​
            lastException = e;
         }
      }
   }
​
   private void callOnMessage() throws Exception {
      if (closing || stopped) {
         return;
      }
​
      session.workDone();
​
      // We pull the message from the buffer from inside the Runnable so we can ensure priority
      // ordering. If we just added a Runnable with the message to the executor immediately as we get it
      // we could not do that
​
      ClientMessageInternal message;
​
      // Must store handler in local variable since might get set to null
      // otherwise while this is executing and give NPE when calling onMessage
      MessageHandler theHandler = handler;
​
      if (theHandler != null) {
         if (rateLimiter != null) {
            rateLimiter.limit();
         }
​
         failedOver = false;
​
         synchronized (this) {
            message = buffer.poll();
         }
​
         if (message != null) {
            if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
               //Ignore, this could be a relic from a previous receiveImmediate();
               return;
            }
​
            boolean expired = message.isExpired();
​
            flowControlBeforeConsumption(message);
​
            if (!expired) {
               if (logger.isTraceEnabled()) {
                  logger.trace(this + "::Calling handler.onMessage");
               }
               final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
                  @Override
                  public ClassLoader run() {
                     ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();
​
                     Thread.currentThread().setContextClassLoader(contextClassLoader);
​
                     return originalLoader;
                  }
               });
​
               onMessageThread = Thread.currentThread();
               try {
                  theHandler.onMessage(message);
               } finally {
                  try {
                     AccessController.doPrivileged(new PrivilegedAction<Object>() {
                        @Override
                        public Object run() {
                           Thread.currentThread().setContextClassLoader(originalLoader);
                           return null;
                        }
                     });
                  } catch (Exception e) {
                     ActiveMQClientLogger.LOGGER.failedPerformPostActionsOnMessage(e);
                  }
​
                  onMessageThread = null;
               }
​
               if (logger.isTraceEnabled()) {
                  logger.trace(this + "::Handler.onMessage done");
               }
​
               if (message.isLargeMessage()) {
                  message.discardBody();
               }
            } else {
               session.expire(this, message);
            }
​
            // If slow consumer, we need to send 1 credit to make sure we get another message
            if (clientWindowSize == 0) {
               startSlowConsumer();
            }
         }
      }
   }
​
   private void flowControlBeforeConsumption(final ClientMessageInternal message) throws ActiveMQException {
      // Chunk messages will execute the flow control while receiving the chunks
      if (message.getFlowControlSize() != 0) {
         // on large messages we should discount 1 on the first packets as we need continuity until the last packet
         flowControl(message.getFlowControlSize(), !message.isLargeMessage());
      }
   }
​
   public void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws ActiveMQException {
      if (clientWindowSize >= 0) {
         creditsToSend += messageBytes;
​
         if (creditsToSend >= clientWindowSize) {
            if (clientWindowSize == 0 && discountSlowConsumer) {
               if (logger.isTraceEnabled()) {
                  logger.trace(this + "::FlowControl::Sending " + creditsToSend + " -1, for slow consumer");
               }
​
               // sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be
               // always buffering one after received the first message
               final int credits = creditsToSend - 1;
​
               creditsToSend = 0;
​
               if (credits > 0) {
                  sendCredits(credits);
               }
            } else {
               if (logger.isDebugEnabled()) {
                  logger.debug("Sending " + messageBytes + " from flow-control");
               }
​
               final int credits = creditsToSend;
​
               creditsToSend = 0;
​
               if (credits > 0) {
                  sendCredits(credits);
               }
            }
         }
      }
   }
​
​
   //......
}   
  • Runner实现了Runnable接口,其run方法执行callOnMessage();该方法对于rateLimiter不为null会执行rateLimiter.limit();之后执行buffer.poll()获取ClientMessageInternal,若不为null,则执行flowControlBeforeConsumption(message),对于非expired的会执行theHandler.onMessage(message)方法;对于clientWindowSize为0的则执行startSlowConsumer();flowControlBeforeConsumption方法会执行flowControl方法,该方法会计算credits,然后执行sendCredits(credits)

小结

ClientConsumerImpl的handleRegularMessage方法先执行buffer.addTail(message, message.getPriority()),之后对于handler不为null的会执行queueExecutor(),否则执行notify();queueExecutor方法是通过sessionExecutor执行runner;Runner实现了Runnable接口,其run方法执行callOnMessage();该方法对于rateLimiter不为null会执行rateLimiter.limit();之后执行buffer.poll()获取ClientMessageInternal进行处理

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • handleRegularMessage
  • Runner
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档