A Look at Artemis's MessageLoadBalancingType

code4it
Published 2020-02-24 09:47:41
This article is part of the column: 码匠的流水账

This article takes a look at Artemis's MessageLoadBalancingType.

MessageLoadBalancingType

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/MessageLoadBalancingType.java

public enum MessageLoadBalancingType {
   OFF("OFF"), STRICT("STRICT"), ON_DEMAND("ON_DEMAND");

   static {
      // for URI support on ClusterConnection
      BeanSupport.registerConverter(new MessageLoadBalancingTypeConverter(), MessageLoadBalancingType.class);
   }

   static class MessageLoadBalancingTypeConverter implements Converter {

      @Override
      public <T> T convert(Class<T> type, Object value) {
         return type.cast(MessageLoadBalancingType.getType(value.toString()));
      }
   }

   private String type;

   MessageLoadBalancingType(final String type) {
      this.type = type;
   }

   public String getType() {
      return type;
   }

   public static MessageLoadBalancingType getType(String string) {
      if (string.equals(OFF.getType())) {
         return MessageLoadBalancingType.OFF;
      } else if (string.equals(STRICT.getType())) {
         return MessageLoadBalancingType.STRICT;
      } else if (string.equals(ON_DEMAND.getType())) {
         return MessageLoadBalancingType.ON_DEMAND;
      } else {
         return null;
      }
   }
}
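  • The MessageLoadBalancingType enum defines three values: OFF, STRICT, and ON_DEMAND. Its static getType(String) method maps a string to the matching value by exact comparison and returns null for any other string.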
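
The lookup behaviour is easy to miss when reading the enum, so here is a minimal sketch of it. LoadBalancingMode is a hypothetical stand-in (not an Artemis class) that mirrors only getType(String), so that it compiles without the Artemis jars; the converter registration is left out.

```java
// Hypothetical stand-in for MessageLoadBalancingType; mirrors only the lookup logic.
enum LoadBalancingMode {
   OFF("OFF"), STRICT("STRICT"), ON_DEMAND("ON_DEMAND");

   private final String type;

   LoadBalancingMode(String type) {
      this.type = type;
   }

   String getType() {
      return type;
   }

   // Exact, case-sensitive match; unknown names yield null.
   // As in the original, a null argument throws NullPointerException.
   static LoadBalancingMode getType(String string) {
      for (LoadBalancingMode mode : values()) {
         if (string.equals(mode.type)) {
            return mode;
         }
      }
      return null;
   }
}

class LoadBalancingLookupDemo {
   public static void main(String[] args) {
      System.out.println(LoadBalancingMode.getType("ON_DEMAND")); // prints ON_DEMAND
      System.out.println(LoadBalancingMode.getType("on_demand")); // prints null
   }
}
```

Because an unrecognised string produces null rather than an exception, a caller of this kind of lookup would probably want to null-check the result before using it.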

PostOfficeImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java

public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory {

   //......

   public Pair<RoutingContext, Message> redistribute(final Message message,
                                                     final Queue originatingQueue,
                                                     final Transaction tx) throws Exception {
      Bindings bindings = addressManager.getBindingsForRoutingAddress(originatingQueue.getAddress());

      if (bindings != null && bindings.allowRedistribute()) {
         // We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message
         // arrived the target node
         // as described on https://issues.jboss.org/browse/JBPAPP-6130
         Message copyRedistribute = message.copy(storageManager.generateID());
         copyRedistribute.setAddress(originatingQueue.getAddress());

         if (tx != null) {
            tx.addOperation(new TransactionOperationAbstract() {
               @Override
               public void afterRollback(Transaction tx) {
                  try {
                     //this will cause large message file to be
                     //cleaned up
                     copyRedistribute.decrementRefCount();
                  } catch (Exception e) {
                     logger.warn("Failed to clean up message: " + copyRedistribute);
                  }
               }
            });
         }

         RoutingContext context = new RoutingContextImpl(tx);

         boolean routed = bindings.redistribute(copyRedistribute, originatingQueue, context);

         if (routed) {
            return new Pair<>(context, copyRedistribute);
         }
      }

      return null;
   }

   //......
}
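  • PostOfficeImpl's redistribute method looks up the bindings for the originating queue's address and uses bindings.allowRedistribute() to decide whether to redistribute. If so, it copies the message under a newly generated ID, sets the copy's address, and calls bindings.redistribute; it returns a Pair of the routing context and the copy when the copy was routed, and null otherwise.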
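
The gate-then-copy flow can be sketched as follows. Every type here (Mode, Msg, RedistributeGateSketch) is hypothetical and deliberately tiny; the transaction handling, the storage manager, and the actual routing step from the real method are omitted, and the ID counter starting at 100 is an arbitrary choice for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

class RedistributeGateSketch {
   enum Mode { OFF, STRICT, ON_DEMAND }

   // Hypothetical minimal message: an id plus the address it is routed to.
   static final class Msg {
      final long id;
      final String address;

      Msg(long id, String address) {
         this.id = id;
         this.address = address;
      }
   }

   // Stands in for storageManager.generateID(); the start value is arbitrary.
   private final AtomicLong idGenerator = new AtomicLong(100);
   private final Mode mode;

   RedistributeGateSketch(Mode mode) {
      this.mode = mode;
   }

   // Plays the role of bindings.allowRedistribute().
   boolean allowRedistribute() {
      return mode == Mode.ON_DEMAND;
   }

   // Returns a re-addressed copy with a fresh id, or null when
   // redistribution is not allowed (the original also returns null).
   Msg redistribute(Msg message, String queueAddress) {
      if (!allowRedistribute()) {
         return null;
      }
      return new Msg(idGenerator.incrementAndGet(), queueAddress);
   }

   public static void main(String[] args) {
      Msg original = new Msg(1, "orders");
      Msg blocked = new RedistributeGateSketch(Mode.STRICT).redistribute(original, "orders");
      Msg copy = new RedistributeGateSketch(Mode.ON_DEMAND).redistribute(original, "orders");
      System.out.println(blocked == null);        // prints true
      System.out.println(copy.id != original.id); // prints true
   }
}
```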

BindingsImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java

public final class BindingsImpl implements Bindings {

   //......

   @Override
   public boolean allowRedistribute() {
      return messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND);
   }

   @Override
   public boolean redistribute(final Message message,
                               final Queue originatingQueue,
                               final RoutingContext context) throws Exception {
      if (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) {
         return false;
      }

      if (logger.isTraceEnabled()) {
         logger.trace("Redistributing message " + message);
      }

      SimpleString routingName = originatingQueue.getName();

      List<Binding> bindings = routingNameBindingMap.get(routingName);

      if (bindings == null) {
         // The value can become null if it's concurrently removed while we're iterating - this is expected
         // ConcurrentHashMap behaviour!
         return false;
      }

      Integer ipos = routingNamePositions.get(routingName);

      int pos = ipos != null ? ipos.intValue() : 0;

      int length = bindings.size();

      int startPos = pos;

      Binding theBinding = null;

      // TODO - combine this with similar logic in route()
      while (true) {
         Binding binding;
         try {
            binding = bindings.get(pos);
         } catch (IndexOutOfBoundsException e) {
            // This can occur if binding is removed while in route
            if (!bindings.isEmpty()) {
               pos = 0;
               startPos = 0;
               length = bindings.size();

               continue;
            } else {
               break;
            }
         }

         pos = incrementPos(pos, length);

         Filter filter = binding.getFilter();

         boolean highPrior = binding.isHighAcceptPriority(message);

         if (highPrior && binding.getBindable() != originatingQueue && (filter == null || filter.match(message))) {
            theBinding = binding;

            break;
         }

         if (pos == startPos) {
            break;
         }
      }

      routingNamePositions.put(routingName, pos);

      if (theBinding != null) {
         theBinding.route(message, context);

         return true;
      } else {
         return false;
      }
   }

   //......
}
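  • BindingsImpl's allowRedistribute method returns true only when messageLoadBalancingType is ON_DEMAND. Its redistribute method checks messageLoadBalancingType again and returns false immediately for STRICT or OFF. Otherwise it walks the bindings registered under the originating queue's routing name in round-robin order, starting from the position saved in routingNamePositions, and picks the first binding that has high accept priority for the message, is not bound to the originating queue, and whose filter (if any) matches. If such a binding is found, it calls binding.route and returns true; otherwise it returns false.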
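
The round-robin scan described above can be illustrated with a minimal sketch. RoundRobinPickSketch and its pick method are hypothetical names; a plain Predicate stands in for the three checks (accept priority, not the originating queue, filter match), and the handling of bindings removed concurrently (the IndexOutOfBoundsException branch in the original) is not modelled.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class RoundRobinPickSketch {
   // Plays the role of the position stored in routingNamePositions.
   private int position = 0;

   // Scans at most one full lap starting at the saved position and
   // returns the index of the first accepted candidate, or -1 if none.
   int pick(List<String> candidates, Predicate<String> accepts) {
      int length = candidates.size();
      if (length == 0) {
         return -1;
      }
      int pos = position % length;
      int startPos = pos;
      int chosen = -1;
      do {
         int current = pos;
         pos = (pos + 1) % length; // wraps around, like incrementPos(pos, length)
         if (accepts.test(candidates.get(current))) {
            chosen = current;
            break;
         }
      } while (pos != startPos);
      position = pos; // the next call resumes after the last inspected candidate
      return chosen;
   }

   public static void main(String[] args) {
      RoundRobinPickSketch picker = new RoundRobinPickSketch();
      List<String> queues = Arrays.asList("local", "remoteA", "remoteB");
      Predicate<String> notLocal = q -> !q.equals("local");
      System.out.println(picker.pick(queues, notLocal)); // prints 1
      System.out.println(picker.pick(queues, notLocal)); // prints 2
      System.out.println(picker.pick(queues, notLocal)); // prints 1
   }
}
```

Saving the position after each call is what spreads successive messages across the eligible bindings instead of always sending them to the first match.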

Summary

The MessageLoadBalancingType enum defines three values: OFF, STRICT, and ON_DEMAND. PostOfficeImpl's redistribute method uses bindings.allowRedistribute() to decide whether to redistribute and, if so, calls bindings.redistribute. BindingsImpl's allowRedistribute method returns true only when messageLoadBalancingType is ON_DEMAND; its redistribute method checks messageLoadBalancingType and returns false immediately for STRICT or OFF, and otherwise selects a binding in round-robin fashion and calls binding.route on it.

doc

  • MessageLoadBalancingType
Originally published 2020-02-16 on the 码匠的流水账 WeChat official account.

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • MessageLoadBalancingType
  • PostOfficeImpl
  • BindingsImpl
  • 小结
  • doc
相关产品与服务
云数据库 Redis®
腾讯云数据库 Redis®(TencentDB for Redis®)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档