前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ZooKeeper节点数据量限制引起的Hadoop YARN ResourceManager崩溃原因分析(三)

ZooKeeper节点数据量限制引起的Hadoop YARN ResourceManager崩溃原因分析(三)

作者头像
九州暮云
发布2020-05-18 16:35:21
2.1K1
发布2020-05-18 16:35:21
举报
文章被收录于专栏:九州牧云九州牧云

这个问题又让我们碰到了,发生次数不频繁但是一旦发生就会造成ResourceManager服务崩溃、ZK注册watch过多等问题。不彻底解决这个问题心中一直是个梗,所以基于前两次的分析和阅读社区最新版Hadoop 3.2.1代码之后,给生产环境YARNpatch最终解决这个问题。对于疑难问题,每遇到一次就有一次不同的感悟,接下来是我本次分析和解决该问题的过程记录。前两次解决和分析该问题的记录如下:

环境

  • Hadoop版本:Apache Hadoop 2.6.3
  • ZooKeeper版本:ZooKeeper 3.4.10
  • 两个ResourceManager节点:主节点RM01,从节点RM02

问题原因

这个问题很难复现,前两次一直没找到产生该问题的原因,打了patch之后,我们在日志中发现,产生该问题主要是由于部分异常任务导致的,日志如下:

2020-04-28 10:05:54 INFO  org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore:768 - Application update attemptState data size for /rmstore/ZKRMStateRoot/RMAppRoot/application_1587969707206_16259/appattempt_1587969707206_16259_000001 is 20266528. Exceed the maximum allowed 3145728 size. ApplicationAttemptState info: ApplicationAttemptState{attemptId=appattempt_1587969707206_16259_000001, diagnostics='User class threw exception: java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 2.0 failed 4 times, most recent failure: Lost task 15.3 in stage 2.0 (TID 4224, datanode44.bi): java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificUnsafeProjection;Lorg/apache/spark/sql/catalyst/InternalRow;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificUnsafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.immutable.Set hset;
/* 009 */   private boolean hasNull;
/* 010 */   private UnsafeRow result;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder;
/* 012 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter;

当任务出现异常时,YARN会保存任务的异常信息,当异常信息很多时,YARNZK保存任务状态的数据量就会超过ZK的限制。从日志中可以看出,出现异常的Spark任务状态数据是20266528字节,也就是19MB,远远超过了我们所设置的3MB。在YARN监控界面上可以看到该任务的异常信息有20万行:

解决方案

由于有了前两次发现和解决问题以及源码理解的经验,所以这次解决问题就顺手的多,去年八月份解决该问题的最终方案是调整ZK服务端和YARN客户端的jute.maxbuffer参数值为3MB,也就是调整ZK中每个ZNode能保存的最大数据量为3MB。但是这样的方案有以下明显的缺点:

  • 使ZK中保存的数据量比较大,导致ZK JVM内存紧张,极端情况下会使ZK OOM,同时也会影响ZK数据读写、数据同步以及持久化效率
  • jute.maxbuffer属于硬配置的方式,为了使配置生效,还需要重启ZK服务和客户端YARN RM服务,对ZK服务以及依赖ZK的服务运维成本比较大。由于当前我们生产环境YARN使用的这套ZK集群还管理HBase、流式计算任务的元数据,所以重启影响还是比较大的

可以看出,通过修改jute.maxbuffer方式虽然也解决了问题,但是会对ZK服务和依赖ZK的服务有影响,运维成本也比较高。于是通过追踪社区issue和阅读Hadoop 3.2.1源码,我们采取通过在yarn-site.xml增加yarn.resourcemanager.zk-max-znode-size.bytes配置的方式来解决YARNZK写数据量超过ZK限制的问题,该配置是在Hadoop 2.9.0版本加入的。使用这种方式,我们不需要修改ZK服务端的配置,而只需要修改YARN服务端的配置并重启YARN就能限制YARNZK写入的数据量,而且也提高了ZK服务的可用性。打了patch后的代码逻辑超过数据量限制的任务状态数据直接被丢弃,并打印log日志,方便日后问题追溯。打了patch后的ZKRMStateStore主要代码如下(由于篇幅原因,其余代码省略):

public class ZKRMStateStore extends RMStateStore{
    
    private int zknodeLimit; // 保存ZNode节点数据量限制
    
     @Override
    public synchronized void initInternal(Configuration conf) throws Exception {
        // 其余部分省略
        // 获取yarn-site.xml中yarn.resourcemanager.zk-max-znode-size.bytes的值
        zknodeLimit = conf.getInt(YarnConfiguration.RM_ZK_ZNODE_SIZE_LIMIT_BYTES,
                YarnConfiguration.DEFAULT_RM_ZK_ZNODE_SIZE_LIMIT_BYTES);
    }
    
    @Override
    public synchronized void updateApplicationAttemptStateInternal(
            ApplicationAttemptId appAttemptId,
            ApplicationAttemptStateData attemptStateDataPB)
            throws Exception {
        String appIdStr = appAttemptId.getApplicationId().toString();
        String appAttemptIdStr = appAttemptId.toString();
        String appDirPath = getNodePath(rmAppRoot, appIdStr);
        String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Storing final state info for attempt: " + appAttemptIdStr
                    + " at: " + nodeUpdatePath);
        }
        byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();

        ApplicationAttemptState attemptState = getApplicationAttemptState(appAttemptId, attemptStateDataPB);
        // 判断要写入的任务尝试数据信息是否超过zknodeLimit变量的值,如果没有,就执行任务尝试数据更新操作。否则,只打印info信息,不执行任务尝试数据更新操作
        if (attemptStateData.length <= zknodeLimit) {
            if (existsWithRetries(nodeUpdatePath, true) != null) {
                setDataWithRetries(nodeUpdatePath, attemptStateData, -1);
            } else {
                createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
                        CreateMode.PERSISTENT);
                LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to"
                        + " update the application attempt state.");
            }
            LOG.info("Application update attemptState data size for " + nodeUpdatePath + " is "
                    + attemptStateData.length + ". The maximum allowed " + zknodeLimit + " size. ApplicationAttemptState info: " + attemptState.toString() + ". AppAttemptTokens length:" + attemptStateDataPB.getAppAttemptTokens().array().length
                    + ". See yarn.resourcemanager.zk-max-znode-size.bytes.");
        } else {
            LOG.info("Application update attemptState data size for " + nodeUpdatePath + " is "
                    + attemptStateData.length + ". Exceed the maximum allowed " + zknodeLimit + " size. ApplicationAttemptState info: " + attemptState.toString() + ". AppAttemptTokens length:" + attemptStateDataPB.getAppAttemptTokens().array().length
                    + ". See yarn.resourcemanager.zk-max-znode-size.bytes.");
        }
    }
}

问题总结

1、YARN使用ZK来实现故障状态恢复,这里的修改会不会影响正常任务的执行和状态恢复?

不会。经过线上一段时间的运行和我们使用zkdoctor监控的数据发现,YARN存储在ZK中的正常任务的状态数据一般不会超过512K,只有部分异常任务的异常信息数据会特别大,这个异常信息数据是引起YARNZK写数据量超过限制的根本原因。

YARN将共享状态存储系统定义成一个RMStateStore抽象类,以保存ResourceManager故障恢复后所必需的状态信息,这些信息都是一些基本数据类型的信息,没有特别复杂的数据类型,比如字节数组。ResourceManager也不会保存已经分配给每个ApplicationMaster的资源信息和每个NodeManager的资源使用信息,这些均可通过相应的心跳汇报机制重构出来。因此,ResourceManagerHA实现是非常轻量级的。涉及到任务状态的主要类如下:

(1)Application状态信息ApplicationState

 /**
   * State of an application application
   * 任务状态信息类
   */
  public static class ApplicationState {
    final ApplicationSubmissionContext context; // 任务描述信息content
    final long submitTime; // 任务提交时间
    final long startTime; // 任务开始时间
    final String user; // 任务提交人
    Map<ApplicationAttemptId, ApplicationAttemptState> attempts =
                  new HashMap<ApplicationAttemptId, ApplicationAttemptState>(); // 任务重试信息
    // fields set when application completes.
    RMAppState state; // 任务运行状态
    String diagnostics; // 任务异常诊断信息
    long finishTime; // 任务完成时间
    
    // 省略其他代码
  }

(2)Application对应的每个ApplicationAttempt信息ApplicationAttemptState

 /**
   * State of an application attempt
   * 任务尝试状态信息类
   */
  public static class ApplicationAttemptState {
    final ApplicationAttemptId attemptId; // 任务尝试ID
    final Container masterContainer; // 所在container的信息
    final Credentials appAttemptCredentials; // 安全token
    long startTime = 0; // 开始时间
    long finishTime = 0; // 结束时间
    // fields set when attempt completes
    RMAppAttemptState state; // 运行状态
    String finalTrackingUrl = "N/A"; // 任务运行日志查看地址
    String diagnostics; // 任务异常诊断信息
    int exitStatus = ContainerExitStatus.INVALID; // 任务退出状态
    FinalApplicationStatus amUnregisteredFinalStatus; // 任务最终状态
    long memorySeconds; // 任务消耗的内存总资源
    long vcoreSeconds; // 任务消耗的CPU总资源
    
    // 省略其他代码
  }

(3)安全令牌相关信息RMDTSecretManagerState

 /**
   * 安全令牌信息
   */
  public static class RMDTSecretManagerState {
    // DTIdentifier -> renewDate
    Map<RMDelegationTokenIdentifier, Long> delegationTokenState =
        new HashMap<RMDelegationTokenIdentifier, Long>(); // 授权令牌状态
    Set<DelegationKey> masterKeyState =
        new HashSet<DelegationKey>(); // master key状态
    int dtSequenceNumber = 0; // 序列号
    
    // 省略其他代码
 }

2、YARN出现异常时为什么会导致ZK中注册很多的watch?

YARN出现异常会进行故障转移,故障转移到standby节点,standby节点会调用RMStateloadState方法进行任务状态数据的恢复,loadState会调用ZKRMStateStoreloadRMAppState方法读取在ZK中保存的任务状态数据,在调用ZKgetData方法时会给任务状态节点和任务尝试状态节点注册watch,以监听任务状态的变化。由于任务状态节点和任务尝试状态节点是持久节点,不会因为ZK客户端连接失效而删除,且是一对多的关系,因此会导致watch数量很多。以下是加载任务状态的相关代码:

 private synchronized void loadRMAppState(RMState rmState) throws Exception {
        // 当/rmstore/ZKRMStateRoot/RMAppRoot/节点及其子节点被删除或创建时,watch被触发
        List<String> childNodes = getChildrenWithRetries(rmAppRoot, true);
        for (String childNodeName : childNodes) {
            String childNodePath = getNodePath(rmAppRoot, childNodeName);
            // 获取任务节点数据并注册watch,该watch当任务节点被删除或数据被更新时触发
            byte[] childData = getDataWithRetries(childNodePath, true);
            if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
                // application
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Loading application from znode: " + childNodeName);
                }
                ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
                ApplicationStateDataPBImpl appStateData =
                        new ApplicationStateDataPBImpl(
                                ApplicationStateDataProto.parseFrom(childData));
                // 获取任务数据
                ApplicationState appState =
                        new ApplicationState(appStateData.getSubmitTime(),
                                appStateData.getStartTime(),
                                appStateData.getApplicationSubmissionContext(),
                                appStateData.getUser(),
                                appStateData.getState(),
                                appStateData.getDiagnostics(), appStateData.getFinishTime());
                if (!appId.equals(appState.context.getApplicationId())) {
                    throw new YarnRuntimeException("The child node name is different " +
                            "from the application id");
                }
                rmState.appState.put(appId, appState);
                // 获取任务重试数据
                loadApplicationAttemptState(appState, appId);
            } else {
                LOG.info("Unknown child node with name: " + childNodeName);
            }
        }
    }

我们生产环境设置在ZK中保存2万个任务状态信息,发生问题时监控发现YARNZK注册了10几万的watch。由于ZKwatch信息是用HashMapkeyZNode节点的pathvalue是注册在ZNode上的watch集合)保存的,因此大量的watch会使这个HashMap成为JVM中的一个大对象,这个大对象会一直保存在ZK的服务器端不会被回收,直到YARN被动删除或者更新任务状态数据时才会移除相应节点的watchZK服务端保存watch信息的HashMap的元素数量才会相应减少。这是一个比较缓慢的过程,在这个过程中,ZK很可能因为JVM GC问题响应缓慢甚至出现OOM。去年就由于YARN出现问题往ZK注册很多watch导致ZK OOM,继而影响到依赖ZKHBase服务出现异常。因此,我们在打patch的基础上,将YARN迁移到一套独立的ZK集群,这套ZK集群只为YARN服务,从而提高大数据基础服务的可用性。

我们监控和统计发现,正常情况下,YARNZK中注册的watch很少,基本上都是运行时的任务状态数据节点的watch,因此不会对ZK产生太大压力。

3、YARN向ZK写任务状态异常为什么会触发YARN故障转移?

ZKRMStateStoreZK交互的方法里,都会调用ZKRMStateStore.ZKAction类的runWithRetries方法进行重试,正常情况下不需要重试。如果发生异常才会触发重试逻辑,默认重试1000次,当重试1000次之后,会使用throw方式给上层调用者抛出异常,凡是以下方法都有可能抛出异常:

异常会被RMStateStorenotifyStoreOperationFailed方法捕捉到,该方法很简单,主要进行以下逻辑判断:

  • 如果YARN开启了HA,则触发故障转移操作
  • 如果没有开启HA,则判断YARN是否开启了快速失败特性,则触发RMFatalEventType.STATE_STORE_OP_FAILED事件,退出进程
  • 如果以上两个条件都不满足,则打印warn信息

该方法具体代码如下:

 /**
   * 该方法通知RM存储操作失败,参数是引起操作失败的异常信息
   * This method is called to notify the ResourceManager that the store
   * operation has failed.
   * @param failureCause the exception due to which the operation failed
   */
  protected void notifyStoreOperationFailed(Exception failureCause) {
    LOG.error("State store operation failed ", failureCause);
    // 如果开启了HA,则执行故障转移操作
    if (HAUtil.isHAEnabled(getConfig())) {
      LOG.warn("State-store fenced ! Transitioning RM to standby");
      Thread standByTransitionThread =
          new Thread(new StandByTransitionThread());
      standByTransitionThread.setName("StandByTransitionThread Handler");
      standByTransitionThread.start();
    } else if (YarnConfiguration.shouldRMFailFast(getConfig())) { // 如果没有开启HA,则判断有没有开启快速失败
      LOG.fatal("Fail RM now due to state-store error!");
      rmDispatcher.getEventHandler().handle(
          new RMFatalEvent(RMFatalEventType.STATE_STORE_OP_FAILED,
              failureCause));
    } else { // 否则,打印跳过存储异常警告信息
      LOG.warn("Skip the state-store error.");
    }
  }

参考资料

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 环境
  • 问题原因
  • 解决方案
  • 问题总结
  • 参考资料
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档