专栏首页码匠的流水账聊聊artemis的SessionConsumerFlowCreditMessage
原创

聊聊artemis的SessionConsumerFlowCreditMessage

本文主要研究一下artemis的SessionConsumerFlowCreditMessage

SessionConsumerFlowCreditMessage

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionConsumerFlowCreditMessage.java

public class SessionConsumerFlowCreditMessage extends PacketImpl {
​
   private long consumerID;
   private int credits;
​
   public SessionConsumerFlowCreditMessage(final long consumerID, final int credits) {
      super(SESS_FLOWTOKEN);
      this.consumerID = consumerID;
      this.credits = credits;
   }
​
   public SessionConsumerFlowCreditMessage() {
      super(SESS_FLOWTOKEN);
   }
​
   // Public --------------------------------------------------------
​
   public long getConsumerID() {
      return consumerID;
   }
​
   public int getCredits() {
      return credits;
   }
​
   @Override
   public void encodeRest(final ActiveMQBuffer buffer) {
      buffer.writeLong(consumerID);
      buffer.writeInt(credits);
   }
​
   @Override
   public void decodeRest(final ActiveMQBuffer buffer) {
      consumerID = buffer.readLong();
      credits = buffer.readInt();
   }
​
   @Override
   public String toString() {
      return getParentString() + ", consumerID=" + consumerID + ", credits=" + credits + "]";
   }
​
   @Override
   public int hashCode() {
      final int prime = 31;
      int result = super.hashCode();
      result = prime * result + (int) (consumerID ^ (consumerID >>> 32));
      result = prime * result + credits;
      return result;
   }
​
   @Override
   public boolean equals(Object obj) {
      if (this == obj)
         return true;
      if (!super.equals(obj))
         return false;
      if (!(obj instanceof SessionConsumerFlowCreditMessage))
         return false;
      SessionConsumerFlowCreditMessage other = (SessionConsumerFlowCreditMessage) obj;
      if (consumerID != other.consumerID)
         return false;
      if (credits != other.credits)
         return false;
      return true;
   }
}
  • SessionConsumerFlowCreditMessage继承了PacketImpl,其type为SESS_FLOWTOKEN

ServerSessionPacketHandler

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java

public class ServerSessionPacketHandler implements ChannelHandler {
​
   //......
​
   private volatile AtomicInteger availableCredits = new AtomicInteger(0);
​
   //......
​
   private void onMessagePacket(final Packet packet) {
      if (logger.isTraceEnabled()) {
         logger.trace("ServerSessionPacketHandler::handlePacket," + packet);
      }
      final byte type = packet.getType();
      switch (type) {
         case SESS_SEND: {
            onSessionSend(packet);
            break;
         }
         case SESS_ACKNOWLEDGE: {
            onSessionAcknowledge(packet);
            break;
         }
         case SESS_PRODUCER_REQUEST_CREDITS: {
            onSessionRequestProducerCredits(packet);
            break;
         }
         case SESS_FLOWTOKEN: {
            onSessionConsumerFlowCredit(packet);
            break;
         }
         default:
            // separating a method for everything else as JIT was faster this way
            slowPacketHandler(packet);
            break;
      }
   }
​
   private void onSessionConsumerFlowCredit(Packet packet) {
      this.storageManager.setContext(session.getSessionContext());
      try {
         Packet response = null;
         boolean requiresResponse = false;
         try {
            SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage) packet;
            session.receiveConsumerCredits(message.getConsumerID(), message.getCredits());
         } catch (ActiveMQIOErrorException e) {
            response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
         } catch (ActiveMQXAException e) {
            response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
         } catch (ActiveMQQueueMaxConsumerLimitReached e) {
            response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
         } catch (ActiveMQException e) {
            response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
         } catch (Throwable t) {
            response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
         }
         sendResponse(packet, response, false, false);
      } finally {
         this.storageManager.clearContext();
      }
   }
​
   //......
}
  • onMessagePacket方法在type为SESS_FLOWTOKEN时执行onSessionConsumerFlowCredit方法;该方法执行的是session.receiveConsumerCredits以及sendResponse方法

ServerSessionImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java

public class ServerSessionImpl implements ServerSession, FailureListener {
​
   //......
​
   public void receiveConsumerCredits(final long consumerID, final int credits) throws Exception {
      ServerConsumer consumer = locateConsumer(consumerID);
​
      if (consumer == null) {
         logger.debug("There is no consumer with id " + consumerID);
​
         return;
      }
​
      consumer.receiveCredits(credits);
   }
​
   //......
}
  • receiveConsumerCredits方法执行的是consumer.receiveCredits方法

ServerConsumerImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java

public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
​
   //......
​
   public void receiveCredits(final int credits) {
      if (credits == -1) {
         if (logger.isDebugEnabled()) {
            logger.debug(this + ":: FlowControl::Received disable flow control message");
         }
         // No flow control
         availableCredits = null;
​
         // There may be messages already in the queue
         promptDelivery();
      } else if (credits == 0) {
         // reset, used on slow consumers
         logger.debug(this + ":: FlowControl::Received reset flow control message");
         availableCredits.set(0);
      } else {
         int previous = availableCredits.getAndAdd(credits);
​
         if (logger.isDebugEnabled()) {
            logger.debug(this + "::FlowControl::Received " +
                            credits +
                            " credits, previous value = " +
                            previous +
                            " currentValue = " +
                            availableCredits.get());
         }
​
         if (previous <= 0 && previous + credits > 0) {
            if (logger.isTraceEnabled()) {
               logger.trace(this + "::calling promptDelivery from receiving credits");
            }
            promptDelivery();
         }
      }
   }
​
   public void promptDelivery() {
      // largeMessageDeliverer is always set inside a lock
      // if we don't acquire a lock, we will have NPE eventually
      if (largeMessageDeliverer != null) {
         resumeLargeMessage();
      } else {
         forceDelivery();
      }
   }
​
   private void forceDelivery() {
      if (browseOnly) {
         messageQueue.getExecutor().execute(browserDeliverer);
      } else {
         messageQueue.deliverAsync();
      }
   }
​
   //......
}
  • receiveCredits方法在credits为-1时设置availableCredits为null,然后执行promptDelivery方法;在credits为0时设置availableCredits为0;其他情况执行availableCredits.getAndAdd(credits);promptDelivery方法主要是执行resumeLargeMessage或者forceDelivery方法

小结

SessionConsumerFlowCreditMessage继承了PacketImpl,其type为SESS_FLOWTOKEN;ServerSessionPacketHandler的onMessagePacket方法在type为SESS_FLOWTOKEN时执行onSessionConsumerFlowCredit方法;该方法执行的是session.receiveConsumerCredits以及sendResponse方法;receiveConsumerCredits方法在receiveCredits方法在credits为-1时设置availableCredits为null,然后执行promptDelivery方法;在credits为0时设置availableCredits为0;其他情况执行availableCredits.getAndAdd(credits)

doc

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 聊聊artemis的SessionConsumerFlowCreditMessage

    本文主要研究一下artemis的SessionConsumerFlowCreditMessage

    codecraft
  • 聊聊artemis的gracefulShutdownEnabled

    activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis...

    codecraft
  • 聊聊artemis的gracefulShutdownEnabled

    activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis...

    codecraft
  • 聊聊artemis的SessionConsumerFlowCreditMessage

    本文主要研究一下artemis的SessionConsumerFlowCreditMessage

    codecraft
  • android PakageManagerService启动流程分析

    PakageManagerService的启动流程图 ? 1.PakageManagerService概述 PakageManagerService是andro...

    xiangzhihong
  • 如何通过恶意宏劫持桌面快捷方式提供后门

    多年以来,一直都有攻击者使用恶意宏来传播恶意软件,并且还设计出了各种方法来让这种技术变得更加有效。近期,研究人员观察到了一种更加隐蔽的基于宏的攻击活动,在这个攻...

    FB客服
  • 【每周一坑】缩小图片尺寸

    之前我们的题目大多偏向解决数学问题,今天来一道偏应用的: 我们知道,通常来说一张图片的分辨率越高,它就越清晰,但文件占用的空间就越大。有时候我们并不需要那么高的...

    Crossin先生
  • OC 密码验证(正则+连续输入+输入过于简单判断)

    GuangdongQi
  • 01背包问题(动态规划)python实现

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

    后端技术漫谈
  • Python中模块

           模块对我来说,感觉就像亲属或者朋友已经走过的路,他们已经趟过的浑水、掉过的坑、践行过的路线,全部提供给你,在你需要的时候请求帮助,借鉴他们的解决方...

    用户2398817

扫码关注云+社区

领取腾讯云代金券