本文主要研究一下 Artemis 的 MessageLoadBalancingType 枚举,以及它如何影响消息的 redistribute(重新分发)。
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/MessageLoadBalancingType.java
/**
 * Message load-balancing modes, each identified by a textual name.
 *
 * <p>Loading this enum registers a {@link MessageLoadBalancingTypeConverter} with
 * {@code BeanSupport} so that a textual value can be converted into a constant.
 */
public enum MessageLoadBalancingType {
   OFF("OFF"), STRICT("STRICT"), ON_DEMAND("ON_DEMAND");

   static {
      // for URI support on ClusterConnection
      BeanSupport.registerConverter(new MessageLoadBalancingTypeConverter(), MessageLoadBalancingType.class);
   }

   /**
    * Converter that turns a value's {@code toString()} form into a {@link MessageLoadBalancingType}.
    */
   static class MessageLoadBalancingTypeConverter implements Converter {

      /**
       * Converts {@code value} by looking up its string form with {@link MessageLoadBalancingType#getType(String)}.
       *
       * @param type  the class the result is cast to
       * @param value the value to convert; must not be {@code null} because {@code toString()} is invoked on it
       * @return the matching constant cast to {@code type}, or {@code null} when the name is unknown
       */
      @Override
      public <T> T convert(Class<T> type, Object value) {
         return type.cast(MessageLoadBalancingType.getType(value.toString()));
      }
   }

   // Immutable: assigned once in the constructor and never changed afterwards.
   private final String type;

   MessageLoadBalancingType(final String type) {
      this.type = type;
   }

   /**
    * @return the textual name of this constant
    */
   public String getType() {
      return type;
   }

   /**
    * Looks up the constant whose textual name equals {@code string} (exact, case-sensitive match).
    *
    * @param string the name to look up; may be {@code null}
    * @return the matching constant, or {@code null} when {@code string} is {@code null} or matches no constant
    */
   public static MessageLoadBalancingType getType(String string) {
      // Iterating over values() keeps the lookup in sync with the declared constants, and
      // calling equals on the (never-null) constant name makes a null argument fall through to null.
      for (MessageLoadBalancingType candidate : values()) {
         if (candidate.type.equals(string)) {
            return candidate;
         }
      }
      return null;
   }
}
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory {
//......
/**
 * Attempts to redistribute {@code message} away from {@code originatingQueue}.
 *
 * <p>When the bindings for the queue's address exist and allow redistribution, a copy of the
 * message (with a newly generated ID and the queue's address) is routed through
 * {@code bindings.redistribute}.
 *
 * @param message          the message to redistribute
 * @param originatingQueue the queue the message currently belongs to
 * @param tx               the transaction to route within; may be {@code null}
 * @return the routing context paired with the message copy when the copy was routed;
 *         {@code null} when there are no bindings, redistribution is not allowed, or nothing was routed
 * @throws Exception if copying or routing the message fails
 */
public Pair<RoutingContext, Message> redistribute(final Message message,
                                                  final Queue originatingQueue,
                                                  final Transaction tx) throws Exception {
   Bindings bindings = addressManager.getBindingsForRoutingAddress(originatingQueue.getAddress());
   if (bindings != null && bindings.allowRedistribute()) {
      // We have to copy the message and store it separately, otherwise we may lose remote bindings
      // in case of restart before the message arrives at the target node,
      // as described on https://issues.jboss.org/browse/JBPAPP-6130
      Message copyRedistribute = message.copy(storageManager.generateID());
      copyRedistribute.setAddress(originatingQueue.getAddress());
      if (tx != null) {
         tx.addOperation(new TransactionOperationAbstract() {
            @Override
            public void afterRollback(Transaction tx) {
               try {
                  // this will cause large message file to be cleaned up
                  copyRedistribute.decrementRefCount();
               } catch (Exception e) {
                  // Best-effort cleanup: do not propagate, but keep the cause in the log.
                  logger.warn("Failed to clean up message: " + copyRedistribute, e);
               }
            }
         });
      }
      RoutingContext context = new RoutingContextImpl(tx);
      boolean routed = bindings.redistribute(copyRedistribute, originatingQueue, context);
      if (routed) {
         return new Pair<>(context, copyRedistribute);
      }
   }
   return null;
}
//......
}
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
public final class BindingsImpl implements Bindings {
//......
/**
 * Reports whether these bindings permit redistribution.
 *
 * @return {@code true} only when the configured load-balancing type is {@code ON_DEMAND}
 */
@Override
public boolean allowRedistribute() {
   final boolean onDemand = messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND);
   return onDemand;
}
/**
 * Routes {@code message} to another binding that shares the originating queue's routing name.
 *
 * <p>Starting from the position remembered for that routing name, the bindings are scanned in
 * order (wrapping around) for the first one that has high accept priority for the message, is not
 * bound to {@code originatingQueue} itself, and whose filter is absent or matches the message.
 * The position reached is stored back before returning.
 *
 * @param message          the message to route
 * @param originatingQueue the queue the message comes from; its binding is never selected
 * @param context          the routing context handed to the selected binding
 * @return {@code true} if a binding was selected and asked to route the message; {@code false}
 *         when the load-balancing type is STRICT or OFF, when no bindings exist for the routing
 *         name, or when no binding qualifies
 * @throws Exception if filter matching or routing fails
 */
@Override
public boolean redistribute(final Message message,
                            final Queue originatingQueue,
                            final RoutingContext context) throws Exception {
   if (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) {
      return false;
   }

   if (logger.isTraceEnabled()) {
      logger.trace("Redistributing message " + message);
   }

   final SimpleString routingName = originatingQueue.getName();
   final List<Binding> candidates = routingNameBindingMap.get(routingName);
   if (candidates == null) {
      // The value can become null if it's concurrently removed while we're iterating - this is expected
      // ConcurrentHashMap behaviour!
      return false;
   }

   // Resume from the position stored for this routing name, or from the start when none is stored.
   int cursor = 0;
   final Integer storedPosition = routingNamePositions.get(routingName);
   if (storedPosition != null) {
      cursor = storedPosition;
   }

   int firstCursor = cursor;
   int count = candidates.size();
   Binding chosen = null;

   // TODO - combine this with similar logic in route()
   for (;;) {
      final Binding candidate;
      try {
         candidate = candidates.get(cursor);
      } catch (IndexOutOfBoundsException e) {
         // This can occur if binding is removed while in route
         if (candidates.isEmpty()) {
            break;
         }
         // Restart the scan from the beginning against the list's current size.
         cursor = 0;
         firstCursor = 0;
         count = candidates.size();
         continue;
      }

      cursor = incrementPos(cursor, count);

      final Filter filter = candidate.getFilter();
      final boolean preferred = candidate.isHighAcceptPriority(message);
      final boolean eligible = preferred
         && candidate.getBindable() != originatingQueue
         && (filter == null || filter.match(message));

      if (eligible) {
         chosen = candidate;
         break;
      }

      // Back at the starting position: every binding has been considered once.
      if (cursor == firstCursor) {
         break;
      }
   }

   routingNamePositions.put(routingName, cursor);

   if (chosen == null) {
      return false;
   }
   chosen.route(message, context);
   return true;
}
//......
}
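小结:MessageLoadBalancingType 枚举定义了 OFF、STRICT、ON_DEMAND 三个枚举值;PostOfficeImpl 的 redistribute 方法在 bindings 不为 null 且 bindings.allowRedistribute() 返回 true 时,先复制一份消息,再执行 bindings.redistribute,路由成功则返回 Pair<RoutingContext, Message>,否则返回 null;BindingsImpl 的 allowRedistribute 方法在 messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND) 时返回 true;其 redistribute 方法会先校验 messageLoadBalancingType,若为 STRICT 或 OFF 则直接返回 false;之后以 round robin 的方式遍历 routingName 对应的 bindings,选取第一个 isHighAcceptPriority 为 true、不是 originatingQueue 自身且 filter 为 null 或匹配该消息的 binding,然后执行该 binding 的 route 方法。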