首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊artemis的CriticalAnalyzerPolicy

聊聊artemis的CriticalAnalyzerPolicy

原创
作者头像
code4it
修改2020-02-11 11:22:11
2620
修改2020-02-11 11:22:11
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下artemis的CriticalAnalyzerPolicy

CriticalAnalyzerPolicy

activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerPolicy.java

public enum CriticalAnalyzerPolicy {
   HALT, SHUTDOWN, LOG;
​
   static {
      // for URI support on ClusterConnection
      BeanSupport.registerConverter(new CriticalAnalyzerPolicyConverter(), CriticalAnalyzerPolicy.class);
   }
​
   static class CriticalAnalyzerPolicyConverter implements Converter {
​
      @Override
      public <T> T convert(Class<T> type, Object value) {
         return type.cast(CriticalAnalyzerPolicy.valueOf(value.toString()));
      }
   }
​
}
  • CriticalAnalyzerPolicy定义了HALT, SHUTDOWN, LOG三个枚举值

initializeCriticalAnalyzer

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java

public class ActiveMQServerImpl implements ActiveMQServer {
​
   //......
​
   private void initializeCriticalAnalyzer() throws Exception {
​
      // Some tests will play crazy frequenceistop/start
      CriticalAnalyzer analyzer = this.getCriticalAnalyzer();
      if (analyzer == null) {
         if (configuration.isCriticalAnalyzer()) {
            // this will have its own ScheduledPool
            analyzer = new CriticalAnalyzerImpl();
         } else {
            analyzer = EmptyCriticalAnalyzer.getInstance();
         }
​
         this.analyzer = analyzer;
      }
​
      /* Calling this for cases where the server was stopped and now is being restarted... failback, etc...*/
      analyzer.clear();
​
      analyzer.setCheckTime(configuration.getCriticalAnalyzerCheckPeriod(), TimeUnit.MILLISECONDS).setTimeout(configuration.getCriticalAnalyzerTimeout(), TimeUnit.MILLISECONDS);
​
      if (configuration.isCriticalAnalyzer()) {
         analyzer.start();
      }
​
      CriticalAction criticalAction = null;
      final CriticalAnalyzerPolicy criticalAnalyzerPolicy = configuration.getCriticalAnalyzerPolicy();
      switch (criticalAnalyzerPolicy) {
​
         case HALT:
            criticalAction = criticalComponent -> {
​
               ActiveMQServerLogger.LOGGER.criticalSystemHalt(criticalComponent);
​
               threadDump();
               sendCriticalNotification(criticalComponent);
​
               Runtime.getRuntime().halt(70); // Linux systems will have /usr/include/sysexits.h showing 70 as internal software error
​
            };
            break;
         case SHUTDOWN:
            criticalAction = criticalComponent -> {
​
               ActiveMQServerLogger.LOGGER.criticalSystemShutdown(criticalComponent);
​
               threadDump();
​
               // on the case of a critical failure, -1 cannot simply means forever.
               // in case graceful is -1, we will set it to 30 seconds
               sendCriticalNotification(criticalComponent);
​
               // you can't stop from the check thread,
               // nor can use an executor
               Thread stopThread = new Thread() {
                  @Override
                  public void run() {
                     try {
                        ActiveMQServerImpl.this.stop();
                     } catch (Throwable e) {
                        logger.warn(e.getMessage(), e);
                     }
                  }
               };
               stopThread.start();
            };
            break;
         case LOG:
            criticalAction = criticalComponent -> {
               ActiveMQServerLogger.LOGGER.criticalSystemLog(criticalComponent);
               threadDump();
               sendCriticalNotification(criticalComponent);
            };
            break;
      }
​
      analyzer.addAction(criticalAction);
   }
​
   //......
}
  • initializeCriticalAnalyzer方法先获取CriticalAnalyzer,若为null则创建一个,其中configuration.isCriticalAnalyzer()为true时创建的是CriticalAnalyzerImpl,否则创建的是EmptyCriticalAnalyzer.getInstance();然后执行clear、设置checkTime,之后根据不同的criticalAnalyzerPolicy创建不同的criticalAction添加到analyzer;不同的criticalAnalyzerPolicy均会执行ActiveMQServerLogger.LOGGER.criticalSystemHalt(criticalComponent)、threadDump()以及sendCriticalNotification(criticalComponent),不同的是HALT还执行Runtime.getRuntime().halt(70),SHUTDOWN还执行ActiveMQServerImpl.this.stop(),而LOG没有额外其他操作

CriticalAnalyzer

activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java

public interface CriticalAnalyzer extends ActiveMQComponent {
​
   default void clear() {
   }
​
   default int getNumberOfComponents() {
      return 0;
   }
​
   boolean isMeasuring();
​
   void add(CriticalComponent component);
​
   void remove(CriticalComponent component);
​
   CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit);
​
   long getCheckTimeNanoSeconds();
​
   CriticalAnalyzer setTimeout(long timeout, TimeUnit unit);
​
   long getTimeout(TimeUnit unit);
​
   long getTimeoutNanoSeconds();
​
   CriticalAnalyzer addAction(CriticalAction action);
​
   void check();
}
  • CriticalAnalyzer接口定义了setCheckTime、addAction、check等方法

EmptyCriticalAnalyzer

activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/EmptyCriticalAnalyzer.java

public class EmptyCriticalAnalyzer implements CriticalAnalyzer {
​
   private static final EmptyCriticalAnalyzer instance = new EmptyCriticalAnalyzer();
​
   public static EmptyCriticalAnalyzer getInstance() {
      return instance;
   }
​
   private EmptyCriticalAnalyzer() {
   }
​
   @Override
   public void add(CriticalComponent component) {
​
   }
​
   @Override
   public void remove(CriticalComponent component) {
​
   }
​
   @Override
   public boolean isMeasuring() {
      return false;
   }
​
   @Override
   public void start() throws Exception {
​
   }
​
   @Override
   public void stop() throws Exception {
​
   }
​
   @Override
   public long getTimeoutNanoSeconds() {
      return 0;
   }
​
   @Override
   public boolean isStarted() {
      return false;
   }
​
   @Override
   public CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit) {
      return this;
   }
​
   @Override
   public long getCheckTimeNanoSeconds() {
      return 0;
   }
​
   @Override
   public CriticalAnalyzer setTimeout(long timeout, TimeUnit unit) {
      return this;
   }
​
   @Override
   public long getTimeout(TimeUnit unit) {
      return 0;
   }
​
   @Override
   public CriticalAnalyzer addAction(CriticalAction action) {
      return this;
   }
​
   @Override
   public void check() {
​
   }
}
  • EmptyCriticalAnalyzer实现了CriticalAnalyzer接口,其方法都是空操作

CriticalAnalyzerImpl

activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java

public class CriticalAnalyzerImpl implements CriticalAnalyzer {
​
   private final Logger logger = Logger.getLogger(CriticalAnalyzer.class);
​
   private volatile long timeoutNanoSeconds;
​
   // one minute by default.. the server will change it for sure
   private volatile long checkTimeNanoSeconds = TimeUnit.SECONDS.toNanos(60);
​
   private final ActiveMQScheduledComponent scheduledComponent;
​
   private final AtomicBoolean running = new AtomicBoolean(false);
​
   public CriticalAnalyzerImpl() {
      // this will make the scheduled component to start its own pool
​
      /* Important: The scheduled component should have its own thread pool...
       *  otherwise in case of a deadlock, or a starvation of the server the analyzer won't pick up any
       *  issues and won't be able to shutdown the server or halt the VM
       */
      this.scheduledComponent = new ActiveMQScheduledComponent(null, null, checkTimeNanoSeconds, TimeUnit.NANOSECONDS, false) {
         @Override
         public void run() {
            logger.trace("Checking critical analyzer");
            check();
         }
      };
​
   }
​
   @Override
   public void clear() {
      actions.clear();
      components.clear();
   }
​
   private CopyOnWriteArrayList<CriticalAction> actions = new CopyOnWriteArrayList<>();
​
   private final ConcurrentHashSet<CriticalComponent> components = new ConcurrentHashSet<>();
​
   @Override
   public int getNumberOfComponents() {
      return components.size();
   }
​
   @Override
   public boolean isMeasuring() {
      return true;
   }
​
   @Override
   public void add(CriticalComponent component) {
      components.add(component);
   }
​
   @Override
   public void remove(CriticalComponent component) {
      components.remove(component);
   }
​
   @Override
   public CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit) {
      this.checkTimeNanoSeconds = unit.toNanos(timeout);
      this.scheduledComponent.setPeriod(timeout, unit);
      return this;
   }
​
   @Override
   public long getCheckTimeNanoSeconds() {
      if (checkTimeNanoSeconds == 0) {
         checkTimeNanoSeconds = getTimeout(TimeUnit.NANOSECONDS) / 2;
      }
      return checkTimeNanoSeconds;
   }
​
   @Override
   public CriticalAnalyzer setTimeout(long timeout, TimeUnit unit) {
      if (checkTimeNanoSeconds <= 0) {
         this.setCheckTime(timeout / 2, unit);
      }
      this.timeoutNanoSeconds = unit.toNanos(timeout);
      return this;
   }
​
   @Override
   public long getTimeout(TimeUnit unit) {
      if (timeoutNanoSeconds == 0) {
         timeoutNanoSeconds = TimeUnit.MINUTES.toNanos(2);
      }
      return unit.convert(timeoutNanoSeconds, TimeUnit.NANOSECONDS);
   }
​
   @Override
   public long getTimeoutNanoSeconds() {
      return timeoutNanoSeconds;
   }
​
   @Override
   public CriticalAnalyzer addAction(CriticalAction action) {
      this.actions.add(action);
      return this;
   }
​
   @Override
   public void check() {
      boolean retry = true;
      while (retry) {
         try {
            for (CriticalComponent component : components) {
​
               if (component.isExpired(timeoutNanoSeconds)) {
                  fireAction(component);
                  // no need to keep running if there's already a component failed
                  return;
               }
            }
            retry = false; // got to the end of the list, no need to retry
         } catch (ConcurrentModificationException dontCare) {
            // lets retry on the loop
         }
      }
   }
​
   private void fireAction(CriticalComponent component) {
      for (CriticalAction action : actions) {
         try {
            action.run(component);
         } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
         }
      }
​
      actions.clear();
   }
​
   @Override
   public void start() {
      scheduledComponent.start();
​
   }
​
   @Override
   public void stop() {
      scheduledComponent.stop();
   }
​
   @Override
   public boolean isStarted() {
      return scheduledComponent.isStarted();
   }
}
  • CriticalAnalyzerImpl的构造器会创建ActiveMQScheduledComponent调度执行check方法;clear方法会清空actions及components,setCheckTime方法会更新checkTimeNanoSeconds及scheduledComponent.setPeriod;check方法会挨个遍历components,判断component.isExpired(timeoutNanoSeconds),若为true则执行fireAction(component)并返回;fireAction方法则遍历actions,挨个执行action.run(component),最后清空actions

小结

CriticalAnalyzerPolicy定义了HALT, SHUTDOWN, LOG三个枚举值;ActiveMQServerImpl的initializeCriticalAnalyzer方法先获取CriticalAnalyzer,若为null则创建一个,其中configuration.isCriticalAnalyzer()为true时创建的是CriticalAnalyzerImpl,否则创建的是EmptyCriticalAnalyzer.getInstance();然后执行clear、设置checkTime,之后根据不同的criticalAnalyzerPolicy创建不同的criticalAction添加到analyzer;不同的criticalAnalyzerPolicy均会执行ActiveMQServerLogger.LOGGER.criticalSystemHalt(criticalComponent)、threadDump()以及sendCriticalNotification(criticalComponent),不同的是HALT还执行Runtime.getRuntime().halt(70),SHUTDOWN还执行ActiveMQServerImpl.this.stop(),而LOG没有额外其他操作

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • CriticalAnalyzerPolicy
  • initializeCriticalAnalyzer
  • CriticalAnalyzer
    • EmptyCriticalAnalyzer
      • CriticalAnalyzerImpl
      • 小结
      • doc
      相关产品与服务
      日志服务
      日志服务(Cloud Log Service,CLS)是腾讯云提供的一站式日志服务平台,提供了从日志采集、日志存储到日志检索,图表分析、监控告警、日志投递等多项服务,协助用户通过日志来解决业务运维、服务监控、日志审计等场景问题。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档