前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊artemis消息的推拉模式

聊聊artemis消息的推拉模式

原创
作者头像
code4it
修改2020-02-10 15:32:30
1K0
修改2020-02-10 15:32:30
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下artemis消息的推拉模式

拉模式

receive

activemq-artemis-2.11.0/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java

代码语言:javascript
复制
public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscriber {
​
   //......
​
   @Override
   public Message receive() throws JMSException {
      return getMessage(0, false);
   }
​
   @Override
   public Message receive(final long timeout) throws JMSException {
      return getMessage(timeout, false);
   }
​
   @Override
   public Message receiveNoWait() throws JMSException {
      return getMessage(0, true);
   }
​
   private ActiveMQMessage getMessage(final long timeout, final boolean noWait) throws JMSException {
      try {
         ClientMessage coreMessage;
​
         if (noWait) {
            coreMessage = consumer.receiveImmediate();
         } else {
            coreMessage = consumer.receive(timeout);
         }
​
         ActiveMQMessage jmsMsg = null;
​
         if (coreMessage != null) {
            ClientSession coreSession = session.getCoreSession();
            boolean needSession = ackMode == Session.CLIENT_ACKNOWLEDGE ||
               ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE ||
               coreMessage.getType() == ActiveMQObjectMessage.TYPE;
​
            if (coreMessage.getRoutingType() == null) {
               coreMessage.setRoutingType(destination.isQueue() ? RoutingType.ANYCAST : RoutingType.MULTICAST);
            }
            if (session.isEnable1xPrefixes()) {
               jmsMsg = ActiveMQCompatibleMessage.createMessage(coreMessage, needSession ? coreSession : null, options);
            } else {
               jmsMsg = ActiveMQMessage.createMessage(coreMessage, needSession ? coreSession : null, options);
            }
​
            try {
               jmsMsg.doBeforeReceive();
            } catch (IndexOutOfBoundsException ioob) {
               ((ClientSessionInternal) session.getCoreSession()).markRollbackOnly();
               // In case this exception happen you will need to know where it happened.
               // it has been a bug here in the past, and this was used to debug it.
               // nothing better than keep it for future investigations in case it happened again
               IndexOutOfBoundsException newIOOB = new IndexOutOfBoundsException(ioob.getMessage() + "@" + jmsMsg.getCoreMessage());
               newIOOB.initCause(ioob);
               ActiveMQClientLogger.LOGGER.unableToGetMessage(newIOOB);
               throw ioob;
            }
​
            // We Do the ack after doBeforeReceive, as in the case of large messages, this may fail so we don't want messages redelivered
            // https://issues.jboss.org/browse/JBPAPP-6110
            if (session.getAcknowledgeMode() == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE) {
               jmsMsg.setIndividualAcknowledge();
            } else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
               jmsMsg.setClientAcknowledge();
               coreMessage.acknowledge();
            } else {
               coreMessage.acknowledge();
            }
         }
​
         return jmsMsg;
      } catch (ActiveMQException e) {
         ((ClientSessionInternal) session.getCoreSession()).markRollbackOnly();
         throw JMSExceptionHelper.convertFromActiveMQException(e);
      } catch (ActiveMQInterruptedException e) {
         ((ClientSessionInternal) session.getCoreSession()).markRollbackOnly();
         throw JMSExceptionHelper.convertFromActiveMQException(e);
      }
   }
​
   //......
}
  • ActiveMQMessageConsumer的receive方法最后调用的是getMessage方法,它对于session.getAcknowledgeMode()为ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE执行jmsMsg.setIndividualAcknowledge(),其余的都执行coreMessage.acknowledge()

acknowledge

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java

代码语言:javascript
复制
public class ClientMessageImpl extends CoreMessage implements ClientMessageInternal {
​
   //......
​
   public ClientMessageImpl acknowledge() throws ActiveMQException {
      if (consumer != null) {
         consumer.acknowledge(this);
      }
​
      return this;
   }
​
   //......
}
  • ClientMessageImpl的acknowledge方法执行的是consumer.acknowledge(this)

推模式

handleMessage

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

代码语言:javascript
复制
public final class ClientConsumerImpl implements ClientConsumerInternal {
​
   //......
​
   public synchronized void handleMessage(final ClientMessageInternal message) throws Exception {
      if (closing) {
         // This is ok - we just ignore the message
         return;
      }
​
      if (message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) {
         handleCompressedMessage(message);
      } else {
         handleRegularMessage(message);
      }
   }
​
   private void handleRegularMessage(ClientMessageInternal message) {
      if (message.getAddress() == null) {
         message.setAddress(queueInfo.getAddress());
      }
​
      message.onReceipt(this);
​
      if (!ackIndividually && message.getPriority() != 4 && !message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
         // We have messages of different priorities so we need to ack them individually since the order
         // of them in the ServerConsumerImpl delivery list might not be the same as the order they are
         // consumed in, which means that acking all up to won't work
         ackIndividually = true;
      }
​
      // Add it to the buffer
      buffer.addTail(message, message.getPriority());
​
      if (handler != null) {
         // Execute using executor
         if (!stopped) {
            queueExecutor();
         }
      } else {
         notify();
      }
   }
​
   private void queueExecutor() {
      if (logger.isTraceEnabled()) {
         logger.trace(this + "::Adding Runner on Executor for delivery");
      }
​
      sessionExecutor.execute(runner);
   }
​
   //......
​
}
  • ClientConsumerImpl的handleRegularMessage会执行buffer.addTail(message, message.getPriority())以及queueExecutor(),queueExecutor通过sessionExecutor执行Runner

callOnMessage

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

代码语言:javascript
复制
public final class ClientConsumerImpl implements ClientConsumerInternal {
​
   //......
​
   private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<>(ClientConsumerImpl.NUM_PRIORITIES);
​
   //......   
​
   private class Runner implements Runnable {
​
      @Override
      public void run() {
         try {
            callOnMessage();
         } catch (Exception e) {
            ActiveMQClientLogger.LOGGER.onMessageError(e);
​
            lastException = e;
         }
      }
   }
​
   private void callOnMessage() throws Exception {
      if (closing || stopped) {
         return;
      }
​
      session.workDone();
​
      // We pull the message from the buffer from inside the Runnable so we can ensure priority
      // ordering. If we just added a Runnable with the message to the executor immediately as we get it
      // we could not do that
​
      ClientMessageInternal message;
​
      // Must store handler in local variable since might get set to null
      // otherwise while this is executing and give NPE when calling onMessage
      MessageHandler theHandler = handler;
​
      if (theHandler != null) {
         if (rateLimiter != null) {
            rateLimiter.limit();
         }
​
         failedOver = false;
​
         synchronized (this) {
            message = buffer.poll();
         }
​
         if (message != null) {
            if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
               //Ignore, this could be a relic from a previous receiveImmediate();
               return;
            }
​
            boolean expired = message.isExpired();
​
            flowControlBeforeConsumption(message);
​
            if (!expired) {
               if (logger.isTraceEnabled()) {
                  logger.trace(this + "::Calling handler.onMessage");
               }
               final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
                  @Override
                  public ClassLoader run() {
                     ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();
​
                     Thread.currentThread().setContextClassLoader(contextClassLoader);
​
                     return originalLoader;
                  }
               });
​
               onMessageThread = Thread.currentThread();
               try {
                  theHandler.onMessage(message);
               } finally {
                  try {
                     AccessController.doPrivileged(new PrivilegedAction<Object>() {
                        @Override
                        public Object run() {
                           Thread.currentThread().setContextClassLoader(originalLoader);
                           return null;
                        }
                     });
                  } catch (Exception e) {
                     ActiveMQClientLogger.LOGGER.failedPerformPostActionsOnMessage(e);
                  }
​
                  onMessageThread = null;
               }
​
               if (logger.isTraceEnabled()) {
                  logger.trace(this + "::Handler.onMessage done");
               }
​
               if (message.isLargeMessage()) {
                  message.discardBody();
               }
            } else {
               session.expire(this, message);
            }
​
            // If slow consumer, we need to send 1 credit to make sure we get another message
            if (clientWindowSize == 0) {
               startSlowConsumer();
            }
         }
      }
   }
​
   //......
}   
  • Runner的run方法会执行callOnMessage方法,它会从buffer.poll()消息,如果不为null且非expired则执行theHandler.onMessage(message)

onMessage

activemq-artemis-2.11.0/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java

代码语言:javascript
复制
public class JMSMessageListenerWrapper implements MessageHandler {
​
   private final ConnectionFactoryOptions options;
   private final ActiveMQConnection connection;
​
   private final ActiveMQSession session;
​
   private final MessageListener listener;
​
   private final ClientConsumer consumer;
​
   private final boolean transactedOrClientAck;
​
   private final boolean individualACK;
​
   private final boolean clientACK;
​
   protected JMSMessageListenerWrapper(final ConnectionFactoryOptions options,
                                       final ActiveMQConnection connection,
                                       final ActiveMQSession session,
                                       final ClientConsumer consumer,
                                       final MessageListener listener,
                                       final int ackMode) {
      this.options = options;
​
      this.connection = connection;
​
      this.session = session;
​
      this.consumer = consumer;
​
      this.listener = listener;
​
      transactedOrClientAck = (ackMode == Session.SESSION_TRANSACTED || ackMode == Session.CLIENT_ACKNOWLEDGE) || session.isXA();
​
      individualACK = (ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
​
      clientACK = (ackMode == Session.CLIENT_ACKNOWLEDGE);
   }
​
   /**
    * In this method we apply the JMS acknowledgement and redelivery semantics
    * as per JMS spec
    */
   @Override
   public void onMessage(final ClientMessage message) {
      ActiveMQMessage msg;
​
      if (session.isEnable1xPrefixes()) {
         msg = ActiveMQCompatibleMessage.createMessage(message, session.getCoreSession(), options);
      } else {
         msg = ActiveMQMessage.createMessage(message, session.getCoreSession(), options);
      }
​
      if (individualACK) {
         msg.setIndividualAcknowledge();
      }
​
      if (clientACK) {
         msg.setClientAcknowledge();
      }
​
      try {
         msg.doBeforeReceive();
      } catch (Exception e) {
         ActiveMQJMSClientLogger.LOGGER.errorPreparingMessageForReceipt(msg.getCoreMessage().toString(), e);
         return;
      }
​
      if (transactedOrClientAck) {
         try {
            message.acknowledge();
         } catch (ActiveMQException e) {
            ((ClientSessionInternal) session.getCoreSession()).markRollbackOnly();
            ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e);
         }
      }
​
      try {
         connection.getThreadAwareContext().setCurrentThread(false);
         listener.onMessage(msg);
      } catch (RuntimeException e) {
         // See JMS 1.1 spec, section 4.5.2
​
         ActiveMQJMSClientLogger.LOGGER.onMessageError(e);
​
         if (!transactedOrClientAck) {
            try {
               if (individualACK) {
                  message.individualAcknowledge();
               }
​
               session.getCoreSession().rollback(true);
​
               session.setRecoverCalled(true);
            } catch (Exception e2) {
               ActiveMQJMSClientLogger.LOGGER.errorRecoveringSession(e2);
            }
         }
      } finally {
         connection.getThreadAwareContext().clearCurrentThread(false);
      }
      if (!session.isRecoverCalled() && !individualACK) {
         try {
            // We don't want to call this if the consumer was closed from inside onMessage
            if (!consumer.isClosed() && !transactedOrClientAck) {
               message.acknowledge();
            }
         } catch (ActiveMQException e) {
            ((ClientSessionInternal) session.getCoreSession()).markRollbackOnly();
            ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e);
         }
      }
​
      session.setRecoverCalled(false);
   }
}
  • onMessage方法在transactedOrClientAck为true时会执行message.acknowledge();在触发listener.onMessage(msg)之后会在非session.isRecoverCalled()且非individualACK且非consumer.isClosed()且非transactedOrClientAck时执行message.acknowledge()

acknowledge

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java

代码语言:javascript
复制
public class ClientMessageImpl extends CoreMessage implements ClientMessageInternal {
    
    //......
​
   public ClientMessageImpl acknowledge() throws ActiveMQException {
      if (consumer != null) {
         consumer.acknowledge(this);
      }
​
      return this;
   }
​
    //......
​
}
  • acknowledge方法执行的是consumer.acknowledge(this)方法

ClientConsumerImpl.acknowledge

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

代码语言:javascript
复制
public final class ClientConsumerImpl implements ClientConsumerInternal {
​
   //......
​
   public void acknowledge(final ClientMessage message) throws ActiveMQException {
      ClientMessageInternal cmi = (ClientMessageInternal) message;
​
      if (ackIndividually) {
         individualAcknowledge(message);
      } else {
​
         ackBytes += message.getEncodeSize();
​
         if (logger.isTraceEnabled()) {
            logger.trace(this + "::acknowledge ackBytes=" + ackBytes + " and ackBatchSize=" + ackBatchSize + ", encodeSize=" + message.getEncodeSize());
         }
​
         if (ackBytes >= ackBatchSize) {
            if (logger.isTraceEnabled()) {
               logger.trace(this + ":: acknowledge acking " + cmi);
            }
            doAck(cmi);
         } else {
            if (logger.isTraceEnabled()) {
               logger.trace(this + ":: acknowledge setting lastAckedMessage = " + cmi);
            }
            lastAckedMessage = cmi;
         }
      }
   }
​
   private void doAck(final ClientMessageInternal message) throws ActiveMQException {
      ackBytes = 0;
​
      lastAckedMessage = null;
​
      if (logger.isTraceEnabled()) {
         logger.trace(this + "::Acking message " + message);
      }
​
      session.acknowledge(this, message);
   }
​
   //......
}
  • ClientConsumerImpl的acknowledge方法执行的是doAck方法,而doAck方法执行的是session.acknowledge(this, message)

ClientSessionImpl.acknowledge

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java

代码语言:javascript
复制
public final class ClientSessionImpl implements ClientSessionInternal, FailureListener {
​
   //......
​
   public void acknowledge(final ClientConsumer consumer, final Message message) throws ActiveMQException {
      // if we're pre-acknowledging then we don't need to do anything
      if (preAcknowledge) {
         return;
      }
​
      checkClosed();
      if (logger.isDebugEnabled()) {
         logger.debug("client ack messageID = " + message.getMessageID());
      }
​
      startCall();
      try {
         sessionContext.sendACK(false, blockOnAcknowledge, consumer, message);
      } finally {
         endCall();
      }
   }
​
   //......
}
  • ClientSessionImpl的acknowledge方法通过sessionContext.sendACK来发送ack

小结

  • ActiveMQMessageConsumer的receive采用的是拉模式,它对于session.getAcknowledgeMode()为ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE执行jmsMsg.setIndividualAcknowledge(),其余的都执行coreMessage.acknowledge();ClientMessageImpl的acknowledge方法执行的是consumer.acknowledge(this)
  • ClientConsumerImpl的handleMessage采用的是推模式,它会执行buffer.addTail(message, message.getPriority())以及queueExecutor(),queueExecutor通过sessionExecutor执行Runner;Runner的run方法会执行callOnMessage方法,它会从buffer.poll()消息,如果不为null且非expired则执行theHandler.onMessage(message);最后触发的是执行的是consumer.acknowledge(this)方法
  • ClientConsumerImpl的acknowledge方法执行的是doAck方法,而doAck方法执行的是session.acknowledge(this, message);ClientSessionImpl的acknowledge方法通过sessionContext.sendACK来发送ack

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 拉模式
    • receive
      • acknowledge
      • 推模式
        • handleMessage
          • callOnMessage
            • onMessage
              • acknowledge
              • ClientConsumerImpl.acknowledge
                • ClientSessionImpl.acknowledge
                • 小结
                • doc
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档