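A look at Artemis's individualAcknowledge

This post walks through how individualAcknowledge works in Artemis, following the call chain from the JMS client down to the server.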

acknowledge

activemq-artemis-2.11.0/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java

public class ActiveMQMessage implements javax.jms.Message {
​
   //......
​
   public void acknowledge() throws JMSException {
      if (session != null) {
         try {
            if (session.isClosed()) {
               throw ActiveMQClientMessageBundle.BUNDLE.sessionClosed();
            }
            if (individualAck) {
               message.individualAcknowledge();
            }
            if (clientAck || individualAck) {
               session.commit(session.isBlockOnAcknowledge());
            }
         } catch (ActiveMQException e) {
            throw JMSExceptionHelper.convertFromActiveMQException(e);
         }
      }
   }
​
   //......
}
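  • ActiveMQMessage.acknowledge() calls message.individualAcknowledge() when individualAck is true; then, when either clientAck or individualAck is set, it calls session.commit(session.isBlockOnAcknowledge())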
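
The branching in acknowledge() can be tried out without a broker. The following is a minimal sketch, not Artemis code: the class MessageAckSketch and the recorded call names are hypothetical, the session != null check is left out, and the closed-session case is modeled with a plain IllegalStateException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of ActiveMQMessage.acknowledge(); it records calls instead of making them.
class MessageAckSketch {
   final List<String> calls = new ArrayList<>();
   private final boolean clientAck;
   private final boolean individualAck;
   private final boolean sessionClosed;

   MessageAckSketch(boolean clientAck, boolean individualAck, boolean sessionClosed) {
      this.clientAck = clientAck;
      this.individualAck = individualAck;
      this.sessionClosed = sessionClosed;
   }

   void acknowledge() {
      if (sessionClosed) {
         // The real method signals this case with a JMSException.
         throw new IllegalStateException("session closed");
      }
      if (individualAck) {
         // Only individual mode acks this one message explicitly.
         calls.add("message.individualAcknowledge");
      }
      if (clientAck || individualAck) {
         // Both client and individual mode finish with a session commit.
         calls.add("session.commit");
      }
   }

   public static void main(String[] args) {
      MessageAckSketch individual = new MessageAckSketch(false, true, false);
      individual.acknowledge();
      System.out.println("individual: " + individual.calls);

      MessageAckSketch client = new MessageAckSketch(true, false, false);
      client.acknowledge();
      System.out.println("client: " + client.calls);
   }
}
```

In this model, individual mode records two calls and client mode records only the commit; with neither flag set, acknowledge() does nothing.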

message.individualAcknowledge

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java

public class ClientMessageImpl extends CoreMessage implements ClientMessageInternal {
​
   //......
​
   public ClientMessageImpl individualAcknowledge() throws ActiveMQException {
      if (consumer != null) {
         consumer.individualAcknowledge(this);
      }
​
      return this;
   }
​
   //......
}
  • ClientMessageImpl.individualAcknowledge() delegates to consumer.individualAcknowledge(this) when the message has a consumer, and returns the message itself

consumer.individualAcknowledge

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

public final class ClientConsumerImpl implements ClientConsumerInternal {
​
   //......
​
   public void individualAcknowledge(ClientMessage message) throws ActiveMQException {
      if (lastAckedMessage != null) {
         flushAcks();
      }
​
      session.individualAcknowledge(this, message);
   }
​
   public void flushAcks() throws ActiveMQException {
      if (lastAckedMessage != null) {
         if (logger.isTraceEnabled()) {
            logger.trace(this + "::FlushACK acking lastMessage::" + lastAckedMessage);
         }
         doAck(lastAckedMessage);
      }
   }
​
   private void doAck(final ClientMessageInternal message) throws ActiveMQException {
      ackBytes = 0;
​
      lastAckedMessage = null;
​
      if (logger.isTraceEnabled()) {
         logger.trace(this + "::Acking message " + message);
      }
​
      session.acknowledge(this, message);
   }
​
   //......
}
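  • ClientConsumerImpl.individualAcknowledge() first calls flushAcks() if lastAckedMessage is not null, which sends the pending ack through session.acknowledge and clears lastAckedMessage; it then calls session.individualAcknowledge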
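
The ordering matters here: a pending ack is sent before the individual one. A minimal sketch of that ordering follows, assuming a hypothetical bufferAck method to stand in for whatever sets lastAckedMessage (the excerpt above does not show it); the session is replaced by a list that records what would be sent.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the flush-then-individual-ack ordering in ClientConsumerImpl.
class ConsumerAckOrderSketch {
   // Records what would be handed to the session, in order.
   final List<String> sent = new ArrayList<>();
   // Pending ack that has not been sent yet; null when there is none.
   private Long lastAckedMessage;

   // Hypothetical helper: remembers an ack without sending it.
   void bufferAck(long messageID) {
      lastAckedMessage = messageID;
   }

   void individualAcknowledge(long messageID) {
      if (lastAckedMessage != null) {
         flushAcks();
      }
      sent.add("session.individualAcknowledge:" + messageID);
   }

   void flushAcks() {
      if (lastAckedMessage != null) {
         long toAck = lastAckedMessage;
         lastAckedMessage = null; // cleared before sending, as in doAck
         sent.add("session.acknowledge:" + toAck);
      }
   }

   public static void main(String[] args) {
      ConsumerAckOrderSketch consumer = new ConsumerAckOrderSketch();
      consumer.bufferAck(5);
      consumer.individualAcknowledge(9);
      System.out.println(consumer.sent);
   }
}
```

A second individualAcknowledge call on the same object sends only the individual ack, because the pending one has already been flushed.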

session.individualAcknowledge

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java

public final class ClientSessionImpl implements ClientSessionInternal, FailureListener {
​
   //......
​
   public void individualAcknowledge(final ClientConsumer consumer, final Message message) throws ActiveMQException {
      // if we're pre-acknowledging then we don't need to do anything
      if (preAcknowledge) {
         return;
      }
​
      checkClosed();
​
      startCall();
      try {
​
         sessionContext.sendACK(true, blockOnAcknowledge, consumer, message);
      } finally {
         endCall();
      }
   }
​
   //......
}
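  • ClientSessionImpl.individualAcknowledge() returns immediately when preAcknowledge is set; otherwise it checks that the session is open and sends the ack via sessionContext.sendACK(true, blockOnAcknowledge, consumer, message)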

sessionContext.sendACK

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java

public class ActiveMQSessionContext extends SessionContext {
​
   //......
​
   public void sendACK(boolean individual,
                       boolean block,
                       final ClientConsumer consumer,
                       final Message message) throws ActiveMQException {
      PacketImpl messagePacket;
      if (individual) {
         messagePacket = new SessionIndividualAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block);
      } else {
         messagePacket = new SessionAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block);
      }
​
      if (block) {
         sessionChannel.sendBlocking(messagePacket, PacketImpl.NULL_RESPONSE);
      } else {
         sessionChannel.sendBatched(messagePacket);
      }
   }
​
   //......
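}
  • ActiveMQSessionContext.sendACK() builds a SessionIndividualAcknowledgeMessage when individual is true and a SessionAcknowledgeMessage otherwise; it then sends the packet with sessionChannel.sendBlocking if block is true, or with sessionChannel.sendBatched if not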
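
The two client-side decisions (skip on pre-acknowledge, then choose packet type and send mode) can be put together in one small model. This is a sketch under assumptions: SessionAckSketch is hypothetical, consumers and messages are reduced to numeric IDs, and the channel is a list of strings rather than a real connection.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the session -> sessionContext hand-off; no networking involved.
class SessionAckSketch {
   // What would be handed to the session channel, in order.
   final List<String> wire = new ArrayList<>();
   private final boolean preAcknowledge;
   private final boolean blockOnAcknowledge;

   SessionAckSketch(boolean preAcknowledge, boolean blockOnAcknowledge) {
      this.preAcknowledge = preAcknowledge;
      this.blockOnAcknowledge = blockOnAcknowledge;
   }

   void individualAcknowledge(long consumerID, long messageID) {
      if (preAcknowledge) {
         return; // pre-acknowledging sessions send nothing
      }
      sendACK(true, blockOnAcknowledge, consumerID, messageID);
   }

   void sendACK(boolean individual, boolean block, long consumerID, long messageID) {
      // The individual flag picks the packet type.
      String packet = (individual ? "SessionIndividualAcknowledgeMessage" : "SessionAcknowledgeMessage")
         + "(consumer=" + consumerID + ", message=" + messageID + ")";
      // The block flag picks the send mode.
      wire.add((block ? "sendBlocking " : "sendBatched ") + packet);
   }

   public static void main(String[] args) {
      SessionAckSketch session = new SessionAckSketch(false, true);
      session.individualAcknowledge(1, 42);
      System.out.println(session.wire);
   }
}
```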

ServerConsumerImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java

public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
​
   //......
​
   public synchronized void acknowledge(Transaction tx, final long messageID) throws Exception {
      if (browseOnly) {
         return;
      }
​
      // Acknowledge acknowledges all refs delivered by the consumer up to and including the one explicitly
      // acknowledged
​
      // We use a transaction here as if the message is not found, we should rollback anything done
      // This could eventually happen on retries during transactions, and we need to make sure we don't ACK things we are not supposed to acknowledge
​
      boolean startedTransaction = false;
​
      if (tx == null) {
         startedTransaction = true;
         tx = new TransactionImpl(storageManager);
      }
​
      try {
​
         MessageReference ref;
         do {
            synchronized (lock) {
               ref = deliveringRefs.poll();
            }
​
            if (logger.isTraceEnabled()) {
               logger.trace("ACKing ref " + ref + " on tx= " + tx + ", consumer=" + this);
            }
​
            if (ref == null) {
               ActiveMQIllegalStateException ils = ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID, messageQueue.getName());
               tx.markAsRollbackOnly(ils);
               throw ils;
            }
​
            ref.acknowledge(tx, this);
​
            acks++;
         }
         while (ref.getMessageID() != messageID);
​
         if (startedTransaction) {
            tx.commit();
         }
      } catch (ActiveMQException e) {
         if (startedTransaction) {
            tx.rollback();
         } else {
            tx.markAsRollbackOnly(e);
         }
         throw e;
      } catch (Throwable e) {
         ActiveMQServerLogger.LOGGER.errorAckingMessage((Exception) e);
         ActiveMQException activeMQIllegalStateException = new ActiveMQIllegalStateException(e.getMessage());
         if (startedTransaction) {
            tx.rollback();
         } else {
            tx.markAsRollbackOnly(activeMQIllegalStateException);
         }
         throw activeMQIllegalStateException;
      }
   }
​
   public synchronized void individualAcknowledge(Transaction tx, final long messageID) throws Exception {
      if (browseOnly) {
         return;
      }
​
      boolean startedTransaction = false;
​
      if (logger.isTraceEnabled()) {
         logger.trace("individualACK messageID=" + messageID);
      }
​
      if (tx == null) {
         if (logger.isTraceEnabled()) {
            logger.trace("individualACK starting new TX");
         }
         startedTransaction = true;
         tx = new TransactionImpl(storageManager);
      }
​
      try {
​
         MessageReference ref;
         ref = removeReferenceByID(messageID);
​
         if (logger.isTraceEnabled()) {
            logger.trace("ACKing ref " + ref + " on tx= " + tx + ", consumer=" + this);
         }
​
         if (ref == null) {
            ActiveMQIllegalStateException ils = new ActiveMQIllegalStateException("Cannot find ref to ack " + messageID);
            tx.markAsRollbackOnly(ils);
            throw ils;
         }
​
         ref.acknowledge(tx, this);
​
         acks++;
​
         if (startedTransaction) {
            tx.commit();
         }
      } catch (ActiveMQException e) {
         if (startedTransaction) {
            tx.rollback();
         } else if (tx != null) {
            tx.markAsRollbackOnly(e);
         }
         throw e;
      } catch (Throwable e) {
         ActiveMQServerLogger.LOGGER.errorAckingMessage((Exception) e);
         ActiveMQIllegalStateException hqex = new ActiveMQIllegalStateException(e.getMessage());
         if (startedTransaction) {
            tx.rollback();
         } else if (tx != null) {
            tx.markAsRollbackOnly(hqex);
         }
         throw hqex;
      }
​
   }
​
   //......
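}
  • ServerConsumerImpl.individualAcknowledge() removes only the reference with the given messageID from deliveringRefs (via removeReferenceByID) and then calls ref.acknowledge(tx, this). The ordinary acknowledge() method instead loops on deliveringRefs.poll(), calling ref.acknowledge(tx, this) on each reference, and leaves the loop only after it has acknowledged the reference with the given messageID; it therefore acknowledges every message delivered up to and including that one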
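
The difference between the two server-side methods is easiest to see on a small queue. The sketch below is a simplified model, not the Artemis implementation: DeliveringRefsSketch is hypothetical, references are reduced to message IDs, and transactions (including the rollback that the real code performs when a reference is missing) are not modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for ServerConsumerImpl; only the deliveringRefs handling is modeled.
class DeliveringRefsSketch {
   // IDs delivered to the consumer but not yet acknowledged, in delivery order.
   private final Deque<Long> deliveringRefs = new ArrayDeque<>();

   void deliver(long messageID) {
      deliveringRefs.add(messageID);
   }

   // Cumulative ack: poll from the head until the requested ID has been acked.
   List<Long> acknowledge(long messageID) {
      List<Long> acked = new ArrayList<>();
      Long ref;
      do {
         ref = deliveringRefs.poll();
         if (ref == null) {
            // The real code marks the transaction rollback-only here.
            throw new IllegalStateException("no reference for " + messageID);
         }
         acked.add(ref);
      } while (ref != messageID);
      return acked;
   }

   // Individual ack: remove only the matching ID and leave earlier ones pending.
   List<Long> individualAcknowledge(long messageID) {
      Iterator<Long> it = deliveringRefs.iterator();
      while (it.hasNext()) {
         if (it.next() == messageID) {
            it.remove();
            return Collections.singletonList(messageID);
         }
      }
      throw new IllegalStateException("Cannot find ref to ack " + messageID);
   }

   List<Long> pending() {
      return new ArrayList<>(deliveringRefs);
   }

   public static void main(String[] args) {
      DeliveringRefsSketch cumulative = new DeliveringRefsSketch();
      DeliveringRefsSketch individual = new DeliveringRefsSketch();
      for (long id = 1; id <= 4; id++) {
         cumulative.deliver(id);
         individual.deliver(id);
      }
      System.out.println("cumulative acked " + cumulative.acknowledge(3) + ", pending " + cumulative.pending());
      System.out.println("individual acked " + individual.individualAcknowledge(3) + ", pending " + individual.pending());
   }
}
```

With messages 1 to 4 delivered, acknowledging message 3 cumulatively acks 1, 2 and 3 and leaves 4 pending, while acknowledging it individually acks only 3 and leaves 1, 2 and 4 pending.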

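Summary

ActiveMQMessage.acknowledge() calls message.individualAcknowledge() when individualAck is true. ClientMessageImpl.individualAcknowledge() delegates to consumer.individualAcknowledge(this). ClientConsumerImpl.individualAcknowledge() first calls flushAcks() if lastAckedMessage is not null, then calls session.individualAcknowledge. ClientSessionImpl.individualAcknowledge() sends the ack through sessionContext.sendACK. ActiveMQSessionContext.sendACK() builds a SessionIndividualAcknowledgeMessage when individual is true and sends it with sessionChannel.sendBlocking or sessionChannel.sendBatched. On the server, ServerConsumerImpl.individualAcknowledge() removes just the matching reference from deliveringRefs and acknowledges it, whereas acknowledge() acknowledges every delivered reference up to and including the given messageID.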
