前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊artemis的SlowConsumerReaperRunnable

聊聊artemis的SlowConsumerReaperRunnable

原创
作者头像
code4it
修改2020-02-10 15:54:05
3180
修改2020-02-10 15:54:05
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下artemis的SlowConsumerReaperRunnable

SlowConsumerPolicy

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/SlowConsumerPolicy.java

代码语言:javascript
复制
public enum SlowConsumerPolicy {
   KILL, NOTIFY;
​
   public static SlowConsumerPolicy getType(int type) {
      switch (type) {
         case 0:
            return KILL;
         case 1:
            return NOTIFY;
         default:
            return null;
      }
   }
}
  • SlowConsumerPolicy定义了KILL、NOTIFY两个枚举值

SlowConsumerReaperRunnable

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

代码语言:javascript
复制
   private final class SlowConsumerReaperRunnable implements Runnable {
​
      private final SlowConsumerPolicy policy;
      private final float threshold;
      private final long checkPeriod;
​
      private SlowConsumerReaperRunnable(long checkPeriod, float threshold, SlowConsumerPolicy policy) {
         this.checkPeriod = checkPeriod;
         this.policy = policy;
         this.threshold = threshold;
      }
​
      @Override
      public void run() {
         float queueRate = getRate();
         long queueMessages = getMessageCount();
​
         if (logger.isDebugEnabled()) {
            logger.debug(getAddress() + ":" + getName() + " has " + queueMessages + " message(s) and " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
         }
​
​
         if (consumers.size() == 0) {
            logger.debug("There are no consumers, no need to check slow consumer's rate");
            return;
         } else {
            float queueThreshold = threshold * consumers.size();
​
            if (queueRate < queueThreshold && queueMessages < queueThreshold) {
               if (logger.isDebugEnabled()) {
                  logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
               }
               return;
            }
         }
​
         for (ConsumerHolder consumerHolder : consumers) {
            Consumer consumer = consumerHolder.consumer();
            if (consumer instanceof ServerConsumerImpl) {
               ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
               float consumerRate = serverConsumer.getRate();
               if (consumerRate < threshold) {
                  RemotingConnection connection = null;
                  ActiveMQServer server = ((PostOfficeImpl) postOffice).getServer();
                  RemotingService remotingService = server.getRemotingService();
​
                  for (RemotingConnection potentialConnection : remotingService.getConnections()) {
                     if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) {
                        connection = potentialConnection;
                     }
                  }
​
                  serverConsumer.fireSlowConsumer();
​
                  if (connection != null) {
                     ActiveMQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(), threshold, consumerRate);
                     if (policy.equals(SlowConsumerPolicy.KILL)) {
                        connection.killMessage(server.getNodeID());
                        remotingService.removeConnection(connection.getID());
                        connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress()));
                     } else if (policy.equals(SlowConsumerPolicy.NOTIFY)) {
                        TypedProperties props = new TypedProperties();
​
                        props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, getConsumerCount());
​
                        props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, address);
​
                        props.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS, SimpleString.toSimpleString(connection.getRemoteAddress()));
​
                        if (connection.getID() != null) {
                           props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME, SimpleString.toSimpleString(connection.getID().toString()));
                        }
​
                        props.putLongProperty(ManagementHelper.HDR_CONSUMER_NAME, serverConsumer.getID());
​
                        props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, SimpleString.toSimpleString(serverConsumer.getSessionID()));
​
                        Notification notification = new Notification(null, CoreNotificationType.CONSUMER_SLOW, props);
​
                        ManagementService managementService = ((PostOfficeImpl) postOffice).getServer().getManagementService();
                        try {
                           managementService.sendNotification(notification);
                        } catch (Exception e) {
                           ActiveMQServerLogger.LOGGER.failedToSendSlowConsumerNotification(notification, e);
                        }
                     }
                  }
               }
            }
         }
      }
   }
  • SlowConsumerReaperRunnable实现了Runnable接口,其run方法会遍历consumers,对于ServerConsumerImply在其consumerRate小于threshold时执行serverConsumer.fireSlowConsumer();之后对于connection不为null的根据policy进行不同的处理,若为SlowConsumerPolicy.KILL则执行connection.killMessage(server.getNodeID())、connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress())),若为SlowConsumerPolicy.NOTIFY则构建NotificationType为CoreNotificationType.CONSUMER_SLOW的notification执行managementService.sendNotification(notification)

fireSlowConsumer

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java

代码语言:javascript
复制
public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
​
   //......
​
   public void fireSlowConsumer() {
      if (slowConsumerListener != null) {
         slowConsumerListener.onSlowConsumer(this);
      }
   }
​
   //......
}
  • fireSlowConsumer执行的是slowConsumerListener.onSlowConsumer(this)方法

SlowConsumerDetection

activemq-artemis-2.11.0/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java

代码语言:javascript
复制
   class SlowConsumerDetection implements SlowConsumerDetectionListener {
​
      @Override
      public void onSlowConsumer(ServerConsumer consumer) {
         if (consumer.getProtocolData() != null && consumer.getProtocolData() instanceof AMQConsumer) {
            AMQConsumer amqConsumer = (AMQConsumer) consumer.getProtocolData();
            ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(amqConsumer.getOpenwireDestination());
            ActiveMQMessage advisoryMessage = new ActiveMQMessage();
            try {
               advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, amqConsumer.getId().toString());
               protocolManager.fireAdvisory(context, topic, advisoryMessage, amqConsumer.getId(), null);
            } catch (Exception e) {
               ActiveMQServerLogger.LOGGER.warn("Error during method invocation", e);
            }
         }
      }
   }
  • SlowConsumerDetection实现了SlowConsumerDetectionListener接口,其onSlowConsumer方法执行的是protocolManager.fireAdvisory方法

RemotingConnectionImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java

代码语言:javascript
复制
public class RemotingConnectionImpl extends AbstractRemotingConnection implements CoreRemotingConnection {
​
   //......
​
   public void killMessage(SimpleString nodeID) {
      if (channelVersion < DisconnectConsumerWithKillMessage.VERSION_INTRODUCED) {
         return;
      }
      Channel clientChannel = getChannel(1, -1);
      DisconnectConsumerWithKillMessage response = new DisconnectConsumerWithKillMessage(nodeID);
​
      clientChannel.send(response, -1);
   }
​
   public void fail(final ActiveMQException me, String scaleDownTargetNodeID) {
      synchronized (failLock) {
         if (destroyed) {
            return;
         }
​
         destroyed = true;
      }
​
      if (!(me instanceof ActiveMQRemoteDisconnectException)) {
         ActiveMQClientLogger.LOGGER.connectionFailureDetected(transportConnection.getRemoteAddress(), me.getMessage(), me.getType());
      }
​
      try {
         transportConnection.forceClose();
      } catch (Throwable e) {
         ActiveMQClientLogger.LOGGER.failedForceClose(e);
      }
​
      // Then call the listeners
      callFailureListeners(me, scaleDownTargetNodeID);
​
      callClosingListeners();
​
      internalClose();
​
      for (Channel channel : channels.values()) {
         channel.returnBlocking(me);
      }
   }
​
   //......
}   
  • killMessage方法构造DisconnectConsumerWithKillMessage并通过clientChannel.send方法;fail方法则执行transportConnection.forceClose()、callFailureListeners、callClosingListeners、internalClose以及channel.returnBlocking方法

sendNotification

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java

代码语言:javascript
复制
public class ManagementServiceImpl implements ManagementService {
​
   //......
​
   public void sendNotification(final Notification notification) throws Exception {
      if (logger.isTraceEnabled()) {
         logger.trace("Sending Notification = " + notification +
                         ", notificationEnabled=" + notificationsEnabled +
                         " messagingServerControl=" + messagingServerControl);
      }
      // This needs to be synchronized since we need to ensure notifications are processed in strict sequence
      synchronized (this) {
         if (messagingServerControl != null && notificationsEnabled) {
            // We also need to synchronize on the post office notification lock
            // otherwise we can get notifications arriving in wrong order / missing
            // if a notification occurs at same time as sendQueueInfoToQueue is processed
            synchronized (postOffice.getNotificationLock()) {
​
               // First send to any local listeners
               for (NotificationListener listener : listeners) {
                  try {
                     listener.onNotification(notification);
                  } catch (Exception e) {
                     // Exception thrown from one listener should not stop execution of others
                     ActiveMQServerLogger.LOGGER.errorCallingNotifListener(e);
                  }
               }
​
               // start sending notification *messages* only when server has initialised
               // Note at backup initialisation we don't want to send notifications either
               // https://jira.jboss.org/jira/browse/HORNETQ-317
               if (messagingServer == null || !messagingServer.isActive()) {
                  if (logger.isDebugEnabled()) {
                     logger.debug("ignoring message " + notification + " as the server is not initialized");
                  }
                  return;
               }
​
               long messageID = storageManager.generateID();
​
               Message notificationMessage = new CoreMessage(messageID, 512);
​
               // Notification messages are always durable so the user can choose whether to add a durable queue to
               // consume them in
               notificationMessage.setDurable(true);
               notificationMessage.setAddress(managementNotificationAddress);
​
               if (notification.getProperties() != null) {
                  TypedProperties props = notification.getProperties();
                  props.forEach(notificationMessage::putObjectProperty);
               }
​
               notificationMessage.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(notification.getType().toString()));
​
               notificationMessage.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
​
               if (notification.getUID() != null) {
                  notificationMessage.putStringProperty(new SimpleString("foobar"), new SimpleString(notification.getUID()));
               }
​
               postOffice.route(notificationMessage, false);
            }
         }
      }
   }
​
   //......
}
  • sendNotification方法会回调listeners的onNotification方法,之后通过postOffice.route(notificationMessage, false)发送notificationMessage

小结

SlowConsumerReaperRunnable实现了Runnable接口,其run方法会遍历consumers,对于ServerConsumerImply在其consumerRate小于threshold时执行serverConsumer.fireSlowConsumer();之后对于connection不为null的根据policy进行不同的处理,若为SlowConsumerPolicy.KILL则执行connection.killMessage(server.getNodeID())、connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress())),若为SlowConsumerPolicy.NOTIFY则构建NotificationType为CoreNotificationType.CONSUMER_SLOW的notification执行managementService.sendNotification(notification)

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • SlowConsumerPolicy
  • SlowConsumerReaperRunnable
  • fireSlowConsumer
  • SlowConsumerDetection
  • RemotingConnectionImpl
  • sendNotification
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档