A Look at Artemis's transactionTimeoutScanPeriod

code4it
Published 2020-02-24 09:45:02
Part of the column: 码匠的流水账

This article looks at how Artemis uses the transactionTimeoutScanPeriod setting.

transactionTimeoutScanPeriod

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java

public class ConfigurationImpl implements Configuration, Serializable {

   //......

   private long transactionTimeoutScanPeriod = ActiveMQDefaultConfiguration.getDefaultTransactionTimeoutScanPeriod();

   //......

   @Override
   public long getTransactionTimeoutScanPeriod() {
      return transactionTimeoutScanPeriod;
   }

   @Override
   public ConfigurationImpl setTransactionTimeoutScanPeriod(final long period) {
      transactionTimeoutScanPeriod = period;
      return this;
   }

   //......
}
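  • ConfigurationImpl defines the transactionTimeoutScanPeriod property, which defaults to 1000 (milliseconds), and exposes it through getTransactionTimeoutScanPeriod and the fluent setter setTransactionTimeoutScanPeriod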
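
Because the setter returns the configuration object itself, it can be chained with other setters. The sketch below mirrors that fluent pattern with a stand-in class so that it runs on a plain JDK. ScanPeriodConfigSketch is hypothetical and is not Artemis code; the 1000 ms default is copied from the note above, and the 300000 ms transactionTimeout default is an assumption used only to give the chain a second setter.

```java
// Stand-in for the fluent setter style of ConfigurationImpl (hypothetical, not Artemis code).
final class ScanPeriodConfigSketch {

   // Mirrors the 1000 ms default described in the article.
   private long transactionTimeoutScanPeriod = 1000L;

   // Assumed default, present only so the example can chain two setters.
   private long transactionTimeout = 300_000L;

   long getTransactionTimeoutScanPeriod() {
      return transactionTimeoutScanPeriod;
   }

   long getTransactionTimeout() {
      return transactionTimeout;
   }

   ScanPeriodConfigSketch setTransactionTimeoutScanPeriod(final long period) {
      transactionTimeoutScanPeriod = period;
      return this; // returning this is what makes chaining possible
   }

   ScanPeriodConfigSketch setTransactionTimeout(final long timeout) {
      transactionTimeout = timeout;
      return this;
   }

   public static void main(String[] args) {
      ScanPeriodConfigSketch config = new ScanPeriodConfigSketch()
         .setTransactionTimeout(60_000L)
         .setTransactionTimeoutScanPeriod(500L);
      System.out.println(config.getTransactionTimeout() + " ms timeout, scanned every "
         + config.getTransactionTimeoutScanPeriod() + " ms");
   }
}
```

With the real ConfigurationImpl the call shape would be the same, since the setter shown above also returns ConfigurationImpl.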

ActiveMQServerImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java

public class ActiveMQServerImpl implements ActiveMQServer {

   //......
    
   synchronized boolean initialisePart1(boolean scalingDown) throws Exception {
    
      //......

      resourceManager = new ResourceManagerImpl((int) (configuration.getTransactionTimeout() / 1000), configuration.getTransactionTimeoutScanPeriod(), scheduledPool);

      //......
   }

   //......
}
  • ActiveMQServerImpl's initialisePart1 creates the ResourceManagerImpl from configuration.getTransactionTimeout() (divided by 1000, so milliseconds become whole seconds), configuration.getTransactionTimeoutScanPeriod(), and scheduledPool
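
The first constructor argument is worth a closer look: the division is integer division followed by an int cast, so any sub-second part of the configured timeout is dropped. A minimal sketch of that arithmetic follows; the class and method names are hypothetical, and only the expression itself comes from the code above.

```java
// Reproduces the (int) (timeout / 1000) expression from initialisePart1 (names are hypothetical).
final class TimeoutConversionSketch {

   // Milliseconds to whole seconds; the fractional part is truncated, not rounded.
   static int toDefaultTimeoutSeconds(final long transactionTimeoutMillis) {
      return (int) (transactionTimeoutMillis / 1000);
   }

   public static void main(String[] args) {
      for (long millis : new long[] {300_000L, 1_500L, 999L}) {
         System.out.println(millis + " ms -> " + toDefaultTimeoutSeconds(millis) + " s");
      }
   }
}
```

A timeout of 1500 ms therefore reaches ResourceManagerImpl as 1 second, and anything below 1000 ms becomes 0.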

ResourceManagerImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/ResourceManagerImpl.java

public class ResourceManagerImpl implements ResourceManager {

   private final ConcurrentMap<Xid, Transaction> transactions = new ConcurrentHashMap<>();

   private final List<HeuristicCompletionHolder> heuristicCompletions = new ArrayList<>();

   private final int defaultTimeoutSeconds;

   private boolean started = false;

   private TxTimeoutHandler task;

   private final long txTimeoutScanPeriod;

   private final ScheduledExecutorService scheduledThreadPool;

   public ResourceManagerImpl(final int defaultTimeoutSeconds,
                              final long txTimeoutScanPeriod,
                              final ScheduledExecutorService scheduledThreadPool) {
      this.defaultTimeoutSeconds = defaultTimeoutSeconds;
      this.txTimeoutScanPeriod = txTimeoutScanPeriod;
      this.scheduledThreadPool = scheduledThreadPool;
   }

   // ActiveMQComponent implementation

   @Override
   public int size() {
      return transactions.size();
   }

   @Override
   public void start() throws Exception {
      if (started) {
         return;
      }
      task = new TxTimeoutHandler();
      Future<?> future = scheduledThreadPool.scheduleAtFixedRate(task, txTimeoutScanPeriod, txTimeoutScanPeriod, TimeUnit.MILLISECONDS);
      task.setFuture(future);

      started = true;
   }

   @Override
   public void stop() throws Exception {
      if (!started) {
         return;
      }
      if (task != null) {
         task.close();
      }

      started = false;
   }

   //......
}
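  • ResourceManagerImpl implements the ResourceManager interface; its start method creates a TxTimeoutHandler and schedules it with scheduleAtFixedRate, using txTimeoutScanPeriod (in milliseconds) as both the initial delay and the period; its stop method calls task.close(), which cancels the scheduled task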
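
Since the same value is passed as the initial delay and as the period, the first scan does not run at startup but one full scan period later. The sketch below schedules a trivial task in the same way on a plain JDK executor and measures the wait before its first run. FixedRateScanSketch and millisUntilFirstRun are hypothetical names; only the scheduleAtFixedRate and cancel(false) calls match the code above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Schedules a task the way start() does and measures the wait before the first run (hypothetical sketch).
final class FixedRateScanSketch {

   static long millisUntilFirstRun(final long scanPeriodMillis) {
      ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
      CountDownLatch firstRun = new CountDownLatch(1);
      long start = System.nanoTime();
      // Same shape as in start(): the period doubles as the initial delay.
      ScheduledFuture<?> future = pool.scheduleAtFixedRate(firstRun::countDown, scanPeriodMillis, scanPeriodMillis, TimeUnit.MILLISECONDS);
      try {
         firstRun.await();
         return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException("interrupted while waiting for the first scan", e);
      } finally {
         // Same call as TxTimeoutHandler.close(): a run already in progress is not interrupted.
         future.cancel(false);
         pool.shutdownNow();
      }
   }

   public static void main(String[] args) {
      System.out.println("first run after about " + millisUntilFirstRun(200L) + " ms");
   }
}
```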

TxTimeoutHandler

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/ResourceManagerImpl.java

   private class TxTimeoutHandler implements Runnable {

      private boolean closed = false;

      private Future<?> future;

      @Override
      public void run() {
         if (closed) {
            return;
         }

         Set<Transaction> timedoutTransactions = new HashSet<>();

         long now = System.currentTimeMillis();

         for (Transaction tx : transactions.values()) {

            if (tx.hasTimedOut(now, defaultTimeoutSeconds)) {
               Transaction removedTX = removeTransaction(tx.getXid());
               if (removedTX != null) {
                  ActiveMQServerLogger.LOGGER.timedOutXID(removedTX.getXid());
                  timedoutTransactions.add(removedTX);
               }
            }
         }

         for (Transaction failedTransaction : timedoutTransactions) {
            try {
               failedTransaction.rollback();
            } catch (Exception e) {
               ActiveMQServerLogger.LOGGER.errorTimingOutTX(e, failedTransaction.getXid());
            }
         }
      }

      synchronized void setFuture(final Future<?> future) {
         this.future = future;
      }

      void close() {
         if (future != null) {
            future.cancel(false);
         }

         closed = true;
      }

   }
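  • TxTimeoutHandler implements Runnable; its run method iterates over transactions and calls tx.hasTimedOut(now, defaultTimeoutSeconds) on each one; a transaction that has timed out is removed via removeTransaction(tx.getXid()) and collected, and once the loop finishes each collected transaction is rolled back, with any rollback failure logged rather than propagated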
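
The two-phase shape of run (first remove and collect, then roll back) can be reproduced on a plain JDK. The following is a simplified model, not the Artemis implementation: SketchTx is a hypothetical stand-in for Transaction, String ids replace Xid, and its hasTimedOut assumes that a transaction expires once more than defaultTimeoutSeconds have elapsed since its creation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of one TxTimeoutHandler pass (hypothetical types, not Artemis code).
final class TimeoutScanSketch {

   // Stand-in for Transaction: an id, a creation time, and a flag set by rollback().
   static final class SketchTx {
      final String id;
      final long createTime;
      boolean rolledBack;

      SketchTx(final String id, final long createTime) {
         this.id = id;
         this.createTime = createTime;
      }

      // Assumed semantics: expired once more than defaultTimeoutSeconds have passed since creation.
      boolean hasTimedOut(final long now, final int defaultTimeoutSeconds) {
         return now > createTime + defaultTimeoutSeconds * 1000L;
      }

      void rollback() {
         rolledBack = true;
      }
   }

   final Map<String, SketchTx> transactions = new ConcurrentHashMap<>();

   // One scan pass; returns the ids of the transactions that were rolled back.
   List<String> scan(final long now, final int defaultTimeoutSeconds) {
      List<SketchTx> timedOut = new ArrayList<>();
      // Phase 1: remove expired entries; a null result means someone else already removed it.
      for (SketchTx tx : transactions.values()) {
         if (tx.hasTimedOut(now, defaultTimeoutSeconds)) {
            SketchTx removed = transactions.remove(tx.id);
            if (removed != null) {
               timedOut.add(removed);
            }
         }
      }
      // Phase 2: roll back only what this pass actually removed.
      List<String> rolledBackIds = new ArrayList<>();
      for (SketchTx tx : timedOut) {
         tx.rollback();
         rolledBackIds.add(tx.id);
      }
      return rolledBackIds;
   }

   public static void main(String[] args) {
      TimeoutScanSketch manager = new TimeoutScanSketch();
      manager.transactions.put("old", new SketchTx("old", 0L));
      manager.transactions.put("fresh", new SketchTx("fresh", 9_000L));
      System.out.println("rolled back: " + manager.scan(10_000L, 5));
      System.out.println("still tracked: " + manager.transactions.keySet());
   }
}
```

Removing the entry from the map before rolling back means a transaction that was completed concurrently (and is therefore no longer in the map) is skipped instead of being rolled back by the scanner.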

Summary

ActiveMQServerImpl's initialisePart1 creates the ResourceManagerImpl from configuration.getTransactionTimeout(), configuration.getTransactionTimeoutScanPeriod(), and scheduledPool. ResourceManagerImpl implements the ResourceManager interface; its start method creates a TxTimeoutHandler and schedules it at a fixed rate of txTimeoutScanPeriod. TxTimeoutHandler implements Runnable; its run method iterates over transactions, calls tx.hasTimedOut(now, defaultTimeoutSeconds) on each, removes the timed-out ones via removeTransaction(tx.getXid()), and then rolls them back one by one.
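
One practical consequence of this design is that a timed-out transaction is only rolled back at the next scan, so the rollback can lag the timeout by up to one scan period. The sketch below works through that arithmetic under simplifying assumptions that are not taken from the Artemis source: scans fire exactly at multiples of the scan period after start, a transaction counts as timed out only when the scan time is strictly greater than its creation time plus the timeout, and all inputs are non-negative with a positive period. The names are hypothetical.

```java
// Arithmetic sketch of when a timed-out transaction is first noticed (hypothetical names).
final class ScanLatencySketch {

   // Returns the time (ms since start) of the first scan that sees the transaction as timed out.
   static long firstScanAfterTimeout(final long createMillis, final int timeoutSeconds, final long scanPeriodMillis) {
      long deadline = createMillis + timeoutSeconds * 1000L;
      // Smallest k such that k * period is strictly greater than the deadline.
      long tickIndex = deadline / scanPeriodMillis + 1;
      return tickIndex * scanPeriodMillis;
   }

   public static void main(String[] args) {
      long create = 250L;
      int timeoutSeconds = 5;
      long period = 1000L;
      long scanAt = firstScanAfterTimeout(create, timeoutSeconds, period);
      long lag = scanAt - (create + timeoutSeconds * 1000L);
      System.out.println("rolled back at " + scanAt + " ms, " + lag + " ms after the timeout");
   }
}
```

A smaller scan period shortens this lag at the cost of running the scan over all tracked transactions more often.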

doc

  • ResourceManagerImpl

Originally published on 2020-02-06 on the 码匠的流水账 WeChat official account.