A Look at Artemis's MessageLoadBalancingType

Original
code4it
Modified 2020-02-17 11:46:32
Collected in the column: 码匠的流水账

This article looks at MessageLoadBalancingType in Artemis and at where the broker consults it when redistributing messages.

MessageLoadBalancingType

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/MessageLoadBalancingType.java

public enum MessageLoadBalancingType {
   OFF("OFF"), STRICT("STRICT"), ON_DEMAND("ON_DEMAND");
​
   static {
      // for URI support on ClusterConnection
      BeanSupport.registerConverter(new MessageLoadBalancingTypeConverter(), MessageLoadBalancingType.class);
   }
​
   static class MessageLoadBalancingTypeConverter implements Converter {
​
      @Override
      public <T> T convert(Class<T> type, Object value) {
         return type.cast(MessageLoadBalancingType.getType(value.toString()));
      }
   }
​
   private String type;
​
   MessageLoadBalancingType(final String type) {
      this.type = type;
   }
​
   public String getType() {
      return type;
   }
​
   public static MessageLoadBalancingType getType(String string) {
      if (string.equals(OFF.getType())) {
         return MessageLoadBalancingType.OFF;
      } else if (string.equals(STRICT.getType())) {
         return MessageLoadBalancingType.STRICT;
      } else if (string.equals(ON_DEMAND.getType())) {
         return MessageLoadBalancingType.ON_DEMAND;
      } else {
         return null;
      }
   }
}
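  • The MessageLoadBalancingType enum defines three values: OFF, STRICT and ON_DEMAND. Its static getType(String) does an exact, case-sensitive match against those names and returns null for anything else.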
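Because getType(String) returns null rather than throwing on an unknown name, callers need a null check. The following is a minimal sketch of that lookup contract using a stand-in enum; LoadBalancingSketch and its parse helper are hypothetical names introduced here for illustration and are not part of Artemis.

```java
import java.util.Optional;

// Hypothetical stand-in that mirrors the lookup logic of the enum quoted above;
// it is not the Artemis class itself.
enum LoadBalancingSketch {
   OFF("OFF"), STRICT("STRICT"), ON_DEMAND("ON_DEMAND");

   private final String type;

   LoadBalancingSketch(String type) {
      this.type = type;
   }

   String getType() {
      return type;
   }

   // Same contract as the quoted getType(String): exact, case-sensitive match,
   // null when nothing matches, NullPointerException when the argument is null.
   static LoadBalancingSketch getType(String string) {
      for (LoadBalancingSketch candidate : values()) {
         if (string.equals(candidate.getType())) {
            return candidate;
         }
      }
      return null;
   }

   // Hypothetical convenience wrapper (an assumption, not Artemis API) that
   // turns both null input and unknown names into an empty Optional.
   static Optional<LoadBalancingSketch> parse(String string) {
      return string == null ? Optional.empty() : Optional.ofNullable(getType(string));
   }
}
```

With this sketch, parse("ON_DEMAND") holds ON_DEMAND, while parse("on_demand") is empty because the comparison is case-sensitive.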

PostOfficeImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java

public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory {
​
   //......
​
   public Pair<RoutingContext, Message> redistribute(final Message message,
                                                     final Queue originatingQueue,
                                                     final Transaction tx) throws Exception {
      Bindings bindings = addressManager.getBindingsForRoutingAddress(originatingQueue.getAddress());
​
      if (bindings != null && bindings.allowRedistribute()) {
         // We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message
         // arrived the target node
         // as described on https://issues.jboss.org/browse/JBPAPP-6130
         Message copyRedistribute = message.copy(storageManager.generateID());
         copyRedistribute.setAddress(originatingQueue.getAddress());
​
         if (tx != null) {
            tx.addOperation(new TransactionOperationAbstract() {
               @Override
               public void afterRollback(Transaction tx) {
                  try {
                     //this will cause large message file to be
                     //cleaned up
                     copyRedistribute.decrementRefCount();
                  } catch (Exception e) {
                     logger.warn("Failed to clean up message: " + copyRedistribute);
                  }
               }
            });
         }
​
         RoutingContext context = new RoutingContextImpl(tx);
​
         boolean routed = bindings.redistribute(copyRedistribute, originatingQueue, context);
​
         if (routed) {
            return new Pair<>(context, copyRedistribute);
         }
      }
​
      return null;
   }
​
   //......
}
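  • PostOfficeImpl's redistribute method uses bindings.allowRedistribute() to decide whether redistribution is permitted. If it is, the method copies the message under a new ID and calls bindings.redistribute; it returns the routing context paired with the copy when routing succeeds, and null otherwise.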
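The control flow above has three ways to end in null (no bindings, redistribution not allowed, nothing routed) and one way to succeed. A minimal sketch of that gate, assuming a hypothetical BindingsSketch interface and plain strings in place of Artemis messages, might look like this:

```java
// Hypothetical, simplified model of the gate in PostOfficeImpl.redistribute;
// none of these types are Artemis classes.
final class RedistributeGateSketch {

   interface BindingsSketch {
      boolean allowRedistribute();

      boolean redistribute(String messageCopy);
   }

   // Test helper: bindings whose two answers are fixed up front.
   static BindingsSketch fixed(final boolean allow, final boolean routed) {
      return new BindingsSketch() {
         @Override
         public boolean allowRedistribute() {
            return allow;
         }

         @Override
         public boolean redistribute(String messageCopy) {
            return routed;
         }
      };
   }

   // Returns the copied message when it was routed, null in every other case.
   static String redistribute(BindingsSketch bindings, String message) {
      if (bindings != null && bindings.allowRedistribute()) {
         // The real method copies the message under a freshly generated ID.
         String copy = "copy-of-" + message;
         if (bindings.redistribute(copy)) {
            return copy;
         }
      }
      return null;
   }
}
```

The caller therefore cannot tell from the null alone why redistribution did not happen; it only learns that the original message stays where it is.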

BindingsImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java

public final class BindingsImpl implements Bindings {
​
   //......
​
   @Override
   public boolean allowRedistribute() {
      return messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND);
   }
​
   @Override
   public boolean redistribute(final Message message,
                               final Queue originatingQueue,
                               final RoutingContext context) throws Exception {
      if (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) {
         return false;
      }
​
      if (logger.isTraceEnabled()) {
         logger.trace("Redistributing message " + message);
      }
​
      SimpleString routingName = originatingQueue.getName();
​
      List<Binding> bindings = routingNameBindingMap.get(routingName);
​
      if (bindings == null) {
         // The value can become null if it's concurrently removed while we're iterating - this is expected
         // ConcurrentHashMap behaviour!
         return false;
      }
​
      Integer ipos = routingNamePositions.get(routingName);
​
      int pos = ipos != null ? ipos.intValue() : 0;
​
      int length = bindings.size();
​
      int startPos = pos;
​
      Binding theBinding = null;
​
      // TODO - combine this with similar logic in route()
      while (true) {
         Binding binding;
         try {
            binding = bindings.get(pos);
         } catch (IndexOutOfBoundsException e) {
            // This can occur if binding is removed while in route
            if (!bindings.isEmpty()) {
               pos = 0;
               startPos = 0;
               length = bindings.size();
​
               continue;
            } else {
               break;
            }
         }
​
         pos = incrementPos(pos, length);
​
         Filter filter = binding.getFilter();
​
         boolean highPrior = binding.isHighAcceptPriority(message);
​
         if (highPrior && binding.getBindable() != originatingQueue && (filter == null || filter.match(message))) {
            theBinding = binding;
​
            break;
         }
​
         if (pos == startPos) {
            break;
         }
      }
​
      routingNamePositions.put(routingName, pos);
​
      if (theBinding != null) {
         theBinding.route(message, context);
​
         return true;
      } else {
         return false;
      }
   }
​
   //......
}
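  • BindingsImpl's allowRedistribute method returns true only when messageLoadBalancingType is ON_DEMAND. Its redistribute method checks messageLoadBalancingType again and returns false immediately for STRICT or OFF. Otherwise it scans the bindings registered under the originating queue's routing name in round-robin order, starting from the saved position, and picks the first binding that has high accept priority, is not the originating queue itself, and either has no filter or has a filter that matches the message. It then saves the new position and calls route on the chosen binding, or returns false if a full lap found no candidate.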
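The round-robin scan can be sketched in isolation. The version below is a simplified model, not the Artemis implementation: bindings are plain strings, the eligibility test is a Predicate standing in for the priority, originating-queue and filter checks, and the wrap-around step assumes that incrementPos advances by one and resets to 0 at the end of the list. RoundRobinSketch, Result and pick are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified model of the round-robin scan; not Artemis code.
final class RoundRobinSketch {

   // Outcome of one scan: index of the chosen binding (-1 if none)
   // and the position the next scan should resume from.
   static final class Result {
      final int chosen;
      final int nextPos;

      Result(int chosen, int nextPos) {
         this.chosen = chosen;
         this.nextPos = nextPos;
      }
   }

   static Result pick(List<String> bindings, int savedPos, Predicate<String> eligible) {
      int length = bindings.size();
      if (length == 0) {
         return new Result(-1, 0);
      }
      // Restart from 0 if the saved position is stale because the list shrank.
      int startPos = savedPos < length ? savedPos : 0;
      int pos = startPos;
      int chosen = -1;
      while (true) {
         int current = pos;
         pos = (pos + 1) % length; // assumed behaviour of incrementPos: advance and wrap
         if (eligible.test(bindings.get(current))) {
            chosen = current;
            break;
         }
         if (pos == startPos) {
            break; // completed a full lap without finding a candidate
         }
      }
      return new Result(chosen, pos);
   }

   public static void main(String[] args) {
      List<String> bindings = Arrays.asList("local", "remoteA", "remoteB");
      Predicate<String> notLocal = name -> !name.equals("local");
      Result first = pick(bindings, 0, notLocal);
      Result second = pick(bindings, first.nextPos, notLocal);
      System.out.println(bindings.get(first.chosen) + ", " + bindings.get(second.chosen));
   }
}
```

Saving nextPos after every scan, including unsuccessful ones, is what spreads consecutive redistributed messages across the eligible bindings instead of always sending them to the first match.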

Summary

The MessageLoadBalancingType enum defines three values: OFF, STRICT and ON_DEMAND. PostOfficeImpl's redistribute method asks bindings.allowRedistribute() whether redistribution is permitted and, if so, calls bindings.redistribute. BindingsImpl's allowRedistribute returns true only for ON_DEMAND; its redistribute method returns false immediately for STRICT or OFF, and otherwise selects a binding in round-robin order and calls binding.route on it.

doc

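Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.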
