A Look at Artemis's MessageLoadBalancingType

This article examines MessageLoadBalancingType in Artemis.

MessageLoadBalancingType

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/MessageLoadBalancingType.java

public enum MessageLoadBalancingType {
   OFF("OFF"), STRICT("STRICT"), ON_DEMAND("ON_DEMAND");

   static {
      // for URI support on ClusterConnection
      BeanSupport.registerConverter(new MessageLoadBalancingTypeConverter(), MessageLoadBalancingType.class);
   }

   static class MessageLoadBalancingTypeConverter implements Converter {

      @Override
      public <T> T convert(Class<T> type, Object value) {
         return type.cast(MessageLoadBalancingType.getType(value.toString()));
      }
   }

   private String type;

   MessageLoadBalancingType(final String type) {
      this.type = type;
   }

   public String getType() {
      return type;
   }

   public static MessageLoadBalancingType getType(String string) {
      if (string.equals(OFF.getType())) {
         return MessageLoadBalancingType.OFF;
      } else if (string.equals(STRICT.getType())) {
         return MessageLoadBalancingType.STRICT;
      } else if (string.equals(ON_DEMAND.getType())) {
         return MessageLoadBalancingType.ON_DEMAND;
      } else {
         return null;
      }
   }
}
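  • The MessageLoadBalancingType enum defines three values: OFF, STRICT, and ON_DEMAND. Its static getType(String) method maps a string to the matching value by exact comparison and returns null when nothing matches.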
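The lookup rule in getType(String) is easy to overlook: it compares strings exactly, returns null for anything it does not recognize, and would throw a NullPointerException for a null argument because it calls string.equals. The following is a minimal sketch of that behaviour using a simplified stand-in enum. LoadBalancingMode, fromType, and parseOrDefault are hypothetical names introduced for illustration and are not part of Artemis.

```java
class LoadBalancingModeDemo {
   // Hypothetical helper: fall back to a caller-supplied default instead of
   // passing null (or an unknown string) further down the call chain.
   static LoadBalancingMode parseOrDefault(String s, LoadBalancingMode fallback) {
      LoadBalancingMode m = (s == null) ? null : LoadBalancingMode.fromType(s);
      return m != null ? m : fallback;
   }

   public static void main(String[] args) {
      System.out.println(LoadBalancingMode.fromType("ON_DEMAND"));        // ON_DEMAND
      System.out.println(LoadBalancingMode.fromType("on_demand"));        // null (case-sensitive)
      System.out.println(parseOrDefault("bogus", LoadBalancingMode.OFF)); // OFF
   }
}

// Simplified stand-in for MessageLoadBalancingType; not the Artemis class itself.
enum LoadBalancingMode {
   OFF("OFF"), STRICT("STRICT"), ON_DEMAND("ON_DEMAND");

   private final String type;

   LoadBalancingMode(String type) {
      this.type = type;
   }

   String getType() {
      return type;
   }

   // Same matching rule as the original: exact string comparison, null when nothing matches.
   // Like the original, this throws NullPointerException when s is null.
   static LoadBalancingMode fromType(String s) {
      for (LoadBalancingMode m : values()) {
         if (s.equals(m.getType())) {
            return m;
         }
      }
      return null;
   }
}
```

Callers that take the string from configuration may want a guard like parseOrDefault, since a null result would otherwise surface later as a NullPointerException far from the place where the value was read.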

PostOfficeImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java

public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory {

   //......

   public Pair<RoutingContext, Message> redistribute(final Message message,
                                                     final Queue originatingQueue,
                                                     final Transaction tx) throws Exception {
      Bindings bindings = addressManager.getBindingsForRoutingAddress(originatingQueue.getAddress());

      if (bindings != null && bindings.allowRedistribute()) {
         // We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message
         // arrived the target node
         // as described on https://issues.jboss.org/browse/JBPAPP-6130
         Message copyRedistribute = message.copy(storageManager.generateID());
         copyRedistribute.setAddress(originatingQueue.getAddress());

         if (tx != null) {
            tx.addOperation(new TransactionOperationAbstract() {
               @Override
               public void afterRollback(Transaction tx) {
                  try {
                     //this will cause large message file to be
                     //cleaned up
                     copyRedistribute.decrementRefCount();
                  } catch (Exception e) {
                     logger.warn("Failed to clean up message: " + copyRedistribute);
                  }
               }
            });
         }

         RoutingContext context = new RoutingContextImpl(tx);

         boolean routed = bindings.redistribute(copyRedistribute, originatingQueue, context);

         if (routed) {
            return new Pair<>(context, copyRedistribute);
         }
      }

      return null;
   }

   //......
}
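  • PostOfficeImpl's redistribute method calls bindings.allowRedistribute() to decide whether to redistribute. If it returns true, the method copies the message under a newly generated ID, registers a rollback hook that decrements the copy's reference count when a transaction is present, and calls bindings.redistribute. It returns the routing context together with the copy only when a binding was actually routed to, and null otherwise.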
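The copy-then-route flow with its rollback hook can be modelled in a few lines. This is a toy sketch under stated assumptions, not the Artemis implementation: Msg, Tx, and the two boolean parameters are hypothetical stand-ins for Message, Transaction, bindings.allowRedistribute(), and the result of bindings.redistribute(...).

```java
import java.util.ArrayList;
import java.util.List;

class RedistributeCopyDemo {
   // Hypothetical stand-in for a message with a reference count.
   static final class Msg {
      final long id;
      int refCount = 1;

      Msg(long id) {
         this.id = id;
      }

      Msg copy(long newId) {
         return new Msg(newId);
      }
   }

   // Hypothetical stand-in for a transaction that collects after-rollback hooks.
   static final class Tx {
      private final List<Runnable> afterRollback = new ArrayList<>();

      void addAfterRollback(Runnable hook) {
         afterRollback.add(hook);
      }

      void rollback() {
         afterRollback.forEach(Runnable::run);
      }
   }

   private static long nextId = 100;

   // Returns the routed copy, or null when redistribution is not allowed
   // or no target accepted the copy.
   static Msg redistribute(Msg original, boolean allowRedistribute, boolean targetAccepts, Tx tx) {
      if (!allowRedistribute) {
         return null;
      }
      // Route a copy with a fresh ID, never the original message.
      Msg copy = original.copy(nextId++);
      if (tx != null) {
         // Registered before routing, so a rollback releases the copy in every case.
         tx.addAfterRollback(() -> copy.refCount--);
      }
      return targetAccepts ? copy : null;
   }

   public static void main(String[] args) {
      Tx tx = new Tx();
      Msg original = new Msg(1);
      Msg copy = redistribute(original, true, true, tx);
      System.out.println(copy.id != original.id); // true
      tx.rollback();
      System.out.println(copy.refCount);          // 0
      System.out.println(redistribute(original, false, true, null)); // null
   }
}
```

The point the model makes is ordering: the cleanup hook is attached as soon as the copy exists, so the copy is released on rollback whether or not a target was found.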

BindingsImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java

public final class BindingsImpl implements Bindings {

   //......

   @Override
   public boolean allowRedistribute() {
      return messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND);
   }

   @Override
   public boolean redistribute(final Message message,
                               final Queue originatingQueue,
                               final RoutingContext context) throws Exception {
      if (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) {
         return false;
      }

      if (logger.isTraceEnabled()) {
         logger.trace("Redistributing message " + message);
      }

      SimpleString routingName = originatingQueue.getName();

      List<Binding> bindings = routingNameBindingMap.get(routingName);

      if (bindings == null) {
         // The value can become null if it's concurrently removed while we're iterating - this is expected
         // ConcurrentHashMap behaviour!
         return false;
      }

      Integer ipos = routingNamePositions.get(routingName);

      int pos = ipos != null ? ipos.intValue() : 0;

      int length = bindings.size();

      int startPos = pos;

      Binding theBinding = null;

      // TODO - combine this with similar logic in route()
      while (true) {
         Binding binding;
         try {
            binding = bindings.get(pos);
         } catch (IndexOutOfBoundsException e) {
            // This can occur if binding is removed while in route
            if (!bindings.isEmpty()) {
               pos = 0;
               startPos = 0;
               length = bindings.size();

               continue;
            } else {
               break;
            }
         }

         pos = incrementPos(pos, length);

         Filter filter = binding.getFilter();

         boolean highPrior = binding.isHighAcceptPriority(message);

         if (highPrior && binding.getBindable() != originatingQueue && (filter == null || filter.match(message))) {
            theBinding = binding;

            break;
         }

         if (pos == startPos) {
            break;
         }
      }

      routingNamePositions.put(routingName, pos);

      if (theBinding != null) {
         theBinding.route(message, context);

         return true;
      } else {
         return false;
      }
   }

   //......
}
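  • BindingsImpl's allowRedistribute method returns true only when messageLoadBalancingType is ON_DEMAND. Its redistribute method checks messageLoadBalancingType again and returns false immediately for STRICT or OFF. Otherwise it walks the bindings registered under the originating queue's name in round-robin order, starting from the position saved in routingNamePositions, and picks the first binding that has high accept priority for the message, is not the originating queue, and either has no filter or has a filter that matches. It then calls route on that binding and returns true; if no binding qualifies, it returns false.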
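The round-robin selection can be sketched in isolation as follows. This is a simplified, single-threaded model and not the Artemis code: Target, accepts, and pick are hypothetical names, the accepts predicate stands in for the combined isHighAcceptPriority and filter checks, and the handling of bindings removed concurrently is left out.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class RoundRobinRedistributeDemo {
   // Hypothetical stand-in for a binding: a name plus an acceptance test for a message.
   static final class Target {
      final String name;
      final Predicate<String> accepts;

      Target(String name, Predicate<String> accepts) {
         this.name = name;
         this.accepts = accepts;
      }
   }

   // Last position per routing name, playing the role of routingNamePositions.
   private final Map<String, Integer> positions = new HashMap<>();

   // Picks the next acceptable target for routingName, skipping origin.
   // Returns null when a full pass over the list finds no candidate.
   Target pick(String routingName, List<Target> targets, Target origin, String message) {
      if (targets.isEmpty()) {
         return null;
      }
      int pos = positions.getOrDefault(routingName, 0) % targets.size();
      int start = pos;
      Target chosen = null;
      do {
         Target t = targets.get(pos);
         pos = (pos + 1) % targets.size(); // advance and wrap around
         if (t != origin && t.accepts.test(message)) {
            chosen = t;
            break;
         }
      } while (pos != start);
      positions.put(routingName, pos); // remember where the next call should resume
      return chosen;
   }

   public static void main(String[] args) {
      Target a = new Target("a", m -> true);
      Target b = new Target("b", m -> true);
      Target c = new Target("c", m -> true);
      List<Target> targets = List.of(a, b, c);

      RoundRobinRedistributeDemo demo = new RoundRobinRedistributeDemo();
      // With "a" as the originating queue, the picks alternate between the other two.
      System.out.println(demo.pick("q", targets, a, "msg").name); // b
      System.out.println(demo.pick("q", targets, a, "msg").name); // c
      System.out.println(demo.pick("q", targets, a, "msg").name); // b
   }
}
```

Saving the position even when no target is chosen mirrors the original, which writes routingNamePositions unconditionally after the loop.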

Summary

The MessageLoadBalancingType enum defines three values: OFF, STRICT, and ON_DEMAND. PostOfficeImpl's redistribute method calls bindings.allowRedistribute() to decide whether to redistribute and, if so, calls bindings.redistribute. BindingsImpl's allowRedistribute method returns true only when messageLoadBalancingType is ON_DEMAND. Its redistribute method checks messageLoadBalancingType and returns false immediately for STRICT or OFF; otherwise it selects a binding in round-robin order and calls that binding's route method.
