前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊artemis JMSBridge的QualityOfServiceMode

聊聊artemis JMSBridge的QualityOfServiceMode

原创
作者头像
code4it
修改2020-02-10 17:13:41
2700
修改2020-02-10 17:13:41
举报

本文主要研究一下artemis JMSBridge的QualityOfServiceMode

QualityOfServiceMode

activemq-artemis-2.11.0/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/QualityOfServiceMode.java

public enum QualityOfServiceMode {
   AT_MOST_ONCE(0), DUPLICATES_OK(1), ONCE_AND_ONLY_ONCE(2);
​
   private final int value;
​
   QualityOfServiceMode(final int value) {
      this.value = value;
   }
​
   public int intValue() {
      return value;
   }
​
   public static QualityOfServiceMode valueOf(final int value) {
      if (value == AT_MOST_ONCE.value) {
         return AT_MOST_ONCE;
      }
      if (value == DUPLICATES_OK.value) {
         return DUPLICATES_OK;
      }
      if (value == ONCE_AND_ONLY_ONCE.value) {
         return ONCE_AND_ONLY_ONCE;
      }
      throw new IllegalArgumentException("invalid QualityOfServiceMode value: " + value);
   }
​
}
  • QualityOfServiceMode定义了三个枚举值,分别是AT_MOST_ONCE、DUPLICATES_OK、ONCE_AND_ONLY_ONCE

sendBatchNonTransacted

activemq-artemis-2.11.0/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java

public final class JMSBridgeImpl implements JMSBridge {
​
   //......
​
   private void sendBatchNonTransacted() {
      try {
         if (qualityOfServiceMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE || (qualityOfServiceMode == QualityOfServiceMode.AT_MOST_ONCE && maxBatchSize > 1)) {
            // We client ack before sending
​
            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
               ActiveMQJMSBridgeLogger.LOGGER.trace("Client acking source session");
            }
​
            messages.getLast().acknowledge();
​
            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
               ActiveMQJMSBridgeLogger.LOGGER.trace("Client acked source session");
            }
         }
​
         boolean exHappened;
​
         do {
            exHappened = false;
            try {
               sendMessages();
            } catch (TransactionRolledbackException e) {
               ActiveMQJMSBridgeLogger.LOGGER.transactionRolledBack(e);
               exHappened = true;
            }
         }
         while (exHappened);
​
         if (maxBatchSize > 1) {
            // The sending session is transacted - we need to commit it
​
            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
               ActiveMQJMSBridgeLogger.LOGGER.trace("Committing target session");
            }
​
            targetSession.commit();
​
            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
               ActiveMQJMSBridgeLogger.LOGGER.trace("Committed target session");
            }
         }
​
         if (qualityOfServiceMode == QualityOfServiceMode.DUPLICATES_OK) {
            // We client ack after sending
​
            // Note we could actually use Session.DUPS_OK_ACKNOWLEDGE here
            // For a slightly less strong delivery guarantee
​
            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
               ActiveMQJMSBridgeLogger.LOGGER.trace("Client acking source session");
            }
​
            messages.getLast().acknowledge();
​
            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
               ActiveMQJMSBridgeLogger.LOGGER.trace("Client acked source session");
            }
         }
      } catch (Exception e) {
         if (!stopping) {
            ActiveMQJMSBridgeLogger.LOGGER.bridgeAckError(e, bridgeName);
         }
​
         // We don't call failure otherwise failover would be broken with ActiveMQ
         // We let the ExceptionListener to deal with failures
​
         if (connectedSource) {
            try {
               sourceSession.recover();
            } catch (Throwable ignored) {
            }
         }
​
      } finally {
         // Clear the messages
         messages.clear();
​
      }
   }
​
   //......
}
  • JMSBridgeImpl的sendBatchNonTransacted方法在qualityOfServiceMode为ONCE_AND_ONLY_ONCE或者AT_MOST_ONCE且maxBatchSize大于1的时候先执行messages.getLast().acknowledge();之后使用一个while循环执行sendMessages,循环在没有TransactionRolledbackException异常时会终止;最后在qualityOfServiceMode为DUPLICATES_OK的时候执行messages.getLast().acknowledge()

小结

QualityOfServiceMode定义了三个枚举值,分别是AT_MOST_ONCE、DUPLICATES_OK、ONCE_AND_ONLY_ONCE;JMSBridgeImpl的sendBatchNonTransacted方法在qualityOfServiceMode为ONCE_AND_ONLY_ONCE或者AT_MOST_ONCE(且maxBatchSize大于1)的时候在sendMessages之前先执行ack(如果异常在ack与sendMessages之间,则消息可能丢失;由于ONCE_AND_ONLY_ONCE需要local transaction或者JTA处理,在没有事务情况下与AT_MOST_ONCE相同);而对于qualityOfServiceMode为DUPLICATES_OK的在sendMessages之后执行ack(如果异常在sendMessages与ack之间,则异常之后,client端由于没有收到ack会再次发送消息,可能造成重复)

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • QualityOfServiceMode
  • sendBatchNonTransacted
  • 小结
  • doc
相关产品与服务
批量计算
批量计算(BatchCompute,Batch)是为有大数据计算业务的企业、科研单位等提供高性价比且易用的计算服务。批量计算 Batch 可以根据用户提供的批处理规模,智能地管理作业和调动其所需的最佳资源。有了 Batch 的帮助,您可以将精力集中在如何分析和处理数据结果上。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档