前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊artemis的individualAcknowledge

聊聊artemis的individualAcknowledge

原创
作者头像
code4it
修改2020-02-10 17:35:52
6190
修改2020-02-10 17:35:52
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下artemis的individualAcknowledge

acknowledge

activemq-artemis-2.11.0/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java

代码语言:javascript
复制
public class ActiveMQMessage implements javax.jms.Message {
​
   //......
​
   public void acknowledge() throws JMSException {
      if (session != null) {
         try {
            if (session.isClosed()) {
               throw ActiveMQClientMessageBundle.BUNDLE.sessionClosed();
            }
            if (individualAck) {
               message.individualAcknowledge();
            }
            if (clientAck || individualAck) {
               session.commit(session.isBlockOnAcknowledge());
            }
         } catch (ActiveMQException e) {
            throw JMSExceptionHelper.convertFromActiveMQException(e);
         }
      }
   }
​
   //......
}
  • ActiveMQMessage的acknowledge方法对于individualAck为true的会单独执行message.individualAcknowledge()

message.individualAcknowledge

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java

代码语言:javascript
复制
public class ClientMessageImpl extends CoreMessage implements ClientMessageInternal {
​
   //......
​
   public ClientMessageImpl individualAcknowledge() throws ActiveMQException {
      if (consumer != null) {
         consumer.individualAcknowledge(this);
      }
​
      return this;
   }
​
   //......
}
  • ClientMessageImpl的individualAcknowledge方法会执行consumer.individualAcknowledge(this)

consumer.individualAcknowledge

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

代码语言:javascript
复制
public final class ClientConsumerImpl implements ClientConsumerInternal {
​
   //......
​
   public void individualAcknowledge(ClientMessage message) throws ActiveMQException {
      if (lastAckedMessage != null) {
         flushAcks();
      }
​
      session.individualAcknowledge(this, message);
   }
​
   public void flushAcks() throws ActiveMQException {
      if (lastAckedMessage != null) {
         if (logger.isTraceEnabled()) {
            logger.trace(this + "::FlushACK acking lastMessage::" + lastAckedMessage);
         }
         doAck(lastAckedMessage);
      }
   }
​
   private void doAck(final ClientMessageInternal message) throws ActiveMQException {
      ackBytes = 0;
​
      lastAckedMessage = null;
​
      if (logger.isTraceEnabled()) {
         logger.trace(this + "::Acking message " + message);
      }
​
      session.acknowledge(this, message);
   }
​
   //......
}
  • ClientConsumerImpl的individualAcknowledge,对于lastAckedMessage不为null的先执行flushAcks,最后执行session.individualAcknowledge

session.individualAcknowledge

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java

代码语言:javascript
复制
public final class ClientSessionImpl implements ClientSessionInternal, FailureListener {
​
   //......
​
   public void individualAcknowledge(final ClientConsumer consumer, final Message message) throws ActiveMQException {
      // if we're pre-acknowledging then we don't need to do anything
      if (preAcknowledge) {
         return;
      }
​
      checkClosed();
​
      startCall();
      try {
​
         sessionContext.sendACK(true, blockOnAcknowledge, consumer, message);
      } finally {
         endCall();
      }
   }
​
   //......
}
  • ClientSessionImpl的individualAcknowledge方法通过sessionContext.sendACK来发送ack

sessionContext.sendACK

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java

代码语言:javascript
复制
public class ActiveMQSessionContext extends SessionContext {
​
   //......
​
   public void sendACK(boolean individual,
                       boolean block,
                       final ClientConsumer consumer,
                       final Message message) throws ActiveMQException {
      PacketImpl messagePacket;
      if (individual) {
         messagePacket = new SessionIndividualAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block);
      } else {
         messagePacket = new SessionAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block);
      }
​
      if (block) {
         sessionChannel.sendBlocking(messagePacket, PacketImpl.NULL_RESPONSE);
      } else {
         sessionChannel.sendBatched(messagePacket);
      }
   }
​
   //......
}   
  • ActiveMQSessionContext的sendACK方法对于individual为true的创建的是SessionIndividualAcknowledgeMessage,最后通过sessionChannel.sendBlocking或者sessionChannel.sendBatched方法发送消息

ServerConsumerImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java

代码语言:javascript
复制
public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
​
   //......
​
   public synchronized void acknowledge(Transaction tx, final long messageID) throws Exception {
      if (browseOnly) {
         return;
      }
​
      // Acknowledge acknowledges all refs delivered by the consumer up to and including the one explicitly
      // acknowledged
​
      // We use a transaction here as if the message is not found, we should rollback anything done
      // This could eventually happen on retries during transactions, and we need to make sure we don't ACK things we are not supposed to acknowledge
​
      boolean startedTransaction = false;
​
      if (tx == null) {
         startedTransaction = true;
         tx = new TransactionImpl(storageManager);
      }
​
      try {
​
         MessageReference ref;
         do {
            synchronized (lock) {
               ref = deliveringRefs.poll();
            }
​
            if (logger.isTraceEnabled()) {
               logger.trace("ACKing ref " + ref + " on tx= " + tx + ", consumer=" + this);
            }
​
            if (ref == null) {
               ActiveMQIllegalStateException ils = ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID, messageQueue.getName());
               tx.markAsRollbackOnly(ils);
               throw ils;
            }
​
            ref.acknowledge(tx, this);
​
            acks++;
         }
         while (ref.getMessageID() != messageID);
​
         if (startedTransaction) {
            tx.commit();
         }
      } catch (ActiveMQException e) {
         if (startedTransaction) {
            tx.rollback();
         } else {
            tx.markAsRollbackOnly(e);
         }
         throw e;
      } catch (Throwable e) {
         ActiveMQServerLogger.LOGGER.errorAckingMessage((Exception) e);
         ActiveMQException activeMQIllegalStateException = new ActiveMQIllegalStateException(e.getMessage());
         if (startedTransaction) {
            tx.rollback();
         } else {
            tx.markAsRollbackOnly(activeMQIllegalStateException);
         }
         throw activeMQIllegalStateException;
      }
   }
​
   public synchronized void individualAcknowledge(Transaction tx, final long messageID) throws Exception {
      if (browseOnly) {
         return;
      }
​
      boolean startedTransaction = false;
​
      if (logger.isTraceEnabled()) {
         logger.trace("individualACK messageID=" + messageID);
      }
​
      if (tx == null) {
         if (logger.isTraceEnabled()) {
            logger.trace("individualACK starting new TX");
         }
         startedTransaction = true;
         tx = new TransactionImpl(storageManager);
      }
​
      try {
​
         MessageReference ref;
         ref = removeReferenceByID(messageID);
​
         if (logger.isTraceEnabled()) {
            logger.trace("ACKing ref " + ref + " on tx= " + tx + ", consumer=" + this);
         }
​
         if (ref == null) {
            ActiveMQIllegalStateException ils = new ActiveMQIllegalStateException("Cannot find ref to ack " + messageID);
            tx.markAsRollbackOnly(ils);
            throw ils;
         }
​
         ref.acknowledge(tx, this);
​
         acks++;
​
         if (startedTransaction) {
            tx.commit();
         }
      } catch (ActiveMQException e) {
         if (startedTransaction) {
            tx.rollback();
         } else if (tx != null) {
            tx.markAsRollbackOnly(e);
         }
         throw e;
      } catch (Throwable e) {
         ActiveMQServerLogger.LOGGER.errorAckingMessage((Exception) e);
         ActiveMQIllegalStateException hqex = new ActiveMQIllegalStateException(e.getMessage());
         if (startedTransaction) {
            tx.rollback();
         } else if (tx != null) {
            tx.markAsRollbackOnly(hqex);
         }
         throw hqex;
      }
​
   }
​
   //......
}   
  • ServerConsumerImpl的individualAcknowledge方法先根据messageID将该消息从deliveringRefs中移除,然后执行ref.acknowledge(tx, this);而普通的acknowledge方法则是不断循环执行deliveringRefs.poll(),然后执行ref.acknowledge(tx, this),直到取到指定messageID的ref才跳出循环

小结

ActiveMQMessage的acknowledge方法对于individualAck为true的会单独执行message.individualAcknowledge();ClientMessageImpl的individualAcknowledge方法会执行consumer.individualAcknowledge(this);ClientConsumerImpl的individualAcknowledge,对于lastAckedMessage不为null的先执行flushAcks,最后执行session.individualAcknowledge;ClientSessionImpl的individualAcknowledge方法通过sessionContext.sendACK来发送ack;ActiveMQSessionContext的sendACK方法对于individual为true的创建的是SessionIndividualAcknowledgeMessage,最后通过sessionChannel.sendBlocking或者sessionChannel.sendBatched方法发送消息

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • acknowledge
  • message.individualAcknowledge
  • consumer.individualAcknowledge
  • session.individualAcknowledge
  • sessionContext.sendACK
  • ServerConsumerImpl
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档