首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

生产环境踩坑系列::Hive on Spark的connection timeout 问题

起因

7/16 凌晨,钉钉突然收到了一条告警,一个公司所有业务部门的组织架构表的 ETL 过程中,数据推送到 DIM 层的过程中出现异常,导致任务失败。

因为这个数据会影响到第二天所有大数据组对外的应用服务中组织架构基础数据,当然,我们的 Pla-nB 也不是吃素的,一旦出现错误,后面的权限管理模块与网关会自动配合切换前一天的最后一次成功处理到 DIM 中的组织架构数据,只会影响到在前一天做过组织架构变化的同事在系统上的操作,但是这个影响数量是可控的,并且我们会也有所有组织架构变化的审计数据,如果第二天这个推数的 ETL 修复不完的话,我们会手动按照审计数据对这些用户先进行操作,保证线上的稳定性。

技术架构

  • 集群:CDH 256G/64C 计算物理集群 X 18 台
  • 调度:dolphin
  • 数据抽取:datax
  • DIM 层数据库:Doris
  • Hive 版本:2.1.1

告警

告警策略现在是有机器人去捕捉 dolphin 的告警邮件,发到钉钉群里,dolphin 其实是可以获取到异常的,需要进行一系列的开发,但是担心复杂的调度过程会有任务监控的遗漏,导致告警丢失,这样就是大问题,所以简单粗暴,机器人代替人来读取邮件并发送告警到钉钉,这样只关注这个幸福来敲门的小可爱即可。

集群 log

代码语言:javascript
复制
Log Type: stderr
Log Upload Time: Fri Jul 16 01:27:46 +0800 2021
Log Length: 10569
/*
* 提示:该行代码过长,系统自动注释不进行高亮。一键复制会移除系统注释 
* SLF4J: Class path contains multiple SLF4J bindings.SLF4J: Found binding in [jar:file:/data7/yarn/nm/usercache/dolphinscheduler/filecache/8096/__spark_libs__6065796770539359217.zip/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]21/07/16 01:27:43 INFO util.SignalUtils: Registered signal handler for TERM21/07/16 01:27:43 INFO util.SignalUtils: Registered signal handler for HUP21/07/16 01:27:43 INFO util.SignalUtils: Registered signal handler for INT21/07/16 01:27:43 INFO spark.SecurityManager: Changing view acls to: yarn,dolphinscheduler21/07/16 01:27:43 INFO spark.SecurityManager: Changing modify acls to: yarn,dolphinscheduler21/07/16 01:27:43 INFO spark.SecurityManager: Changing view acls groups to: 21/07/16 01:27:43 INFO spark.SecurityManager: Changing modify acls groups to: 21/07/16 01:27:43 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(yarn, dolphinscheduler); groups with view permissions: Set(); users  with modify permissions: Set(yarn, dolphinscheduler); groups with modify permissions: Set()21/07/16 01:27:43 INFO yarn.ApplicationMaster: ApplicationAttemptId: appattempt_1625364172078_3093_00000121/07/16 01:27:43 INFO yarn.ApplicationMaster: Starting the user application in a separate Thread21/07/16 01:27:43 INFO yarn.ApplicationMaster: Waiting for spark context initialization...21/07/16 01:27:43 INFO client.RemoteDriver: Connecting to HiveServer2 address: hadoop-task-1.bigdata.xx.com:2417321/07/16 01:27:44 INFO conf.HiveConf: Found configuration file file:/data8/yarn/nm/usercache/dolphinscheduler/filecache/8097/__spark_conf__.zip/__hadoop_conf__/hive-site.xml21/07/16 01:27:44 ERROR yarn.ApplicationMaster: User class threw exception: java.util.concurrent.ExecutionException: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: hadoop-task-1.bigdata.xx.com/10.25.15.104:24173java.util.concurrent.ExecutionException: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: hadoop-task-1.bigdata.XX.com/10.25.15.104:24173  at io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:41)  at org.apache.hive.spark.client.RemoteDriver.<init>(RemoteDriver.java:155)  at org.apache.hive.spark.client.RemoteDriver.main(RemoteDriver.java:559)  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)  at java.lang.reflect.Method.invoke(Method.java:498)  at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:673)Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: hadoop-task-1.bigdata.xx.com/10.25.15.104:24173  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)  at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:715)  at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:323)  at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633)  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)  at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)  at java.lang.Thread.run(Thread.java:748)Caused by: java.net.ConnectException: Connection refused  ... 10 more21/07/16 01:27:44 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 13, (reason: User class threw exception: java.util.concurrent.ExecutionException: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: hadoop-task-1.bigdata.dd.com/10.25.15.104:24173  at io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:41)  at org.apache.hive.spark.client.RemoteDriver.<init>(RemoteDriver.java:155)  at org.apache.hive.spark.client.RemoteDriver.main(RemoteDriver.java:559)  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)  at java.lang.reflect.Method.invoke(Method.java:498)  at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:673)Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: hadoop-task-1.bigdata.xx.com/10.25.15.104:24173  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)  at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:715)  at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:323)  at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633)  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)  at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)  at java.lang.Thread.run(Thread.java:748)Caused by: java.net.ConnectException: Connection refused  ... 10 more)21/07/16 01:27:44 ERROR yarn.ApplicationMaster: Uncaught exception: org.apache.spark.SparkException: Exception thrown in awaitResult:   at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)  at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:447)  at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:275)  at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:805)  at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:804)  at java.security.AccessController.doPrivileged(Native Method)  at javax.security.auth.Subject.doAs(Subject.java:422)  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)  at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:804)  at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)Caused by: java.util.concurrent.ExecutionException: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: hadoop-task-1.bigdata.xx.com/10.25.15.104:24173  at io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:41)  at org.apache.hive.spark.client.RemoteDriver.<init>(RemoteDriver.java:155)  at org.apache.hive.spark.client.RemoteDriver.main(RemoteDriver.java:559)  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)  at java.lang.reflect.Method.invoke(Method.java:498)  at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:673)Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: hadoop-task-1.bigdata.xx.com/10.25.15.104:24173  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)  at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:715)  at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:323)  at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633)  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)  at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)  at java.lang.Thread.run(Thread.java:748)Caused by: java.net.ConnectException: Connection refused  ... 10 more21/07/16 01:27:44 INFO yarn.ApplicationMaster: Deleting staging directory hdfs://lbc/user/dolphinscheduler/.sparkStaging/application_1625364172078_309321/07/16 01:27:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable21/07/16 01:27:44 INFO util.ShutdownHookManager: Shutdown hook called
*/

原因分析

运维角度

资源负载

运维同学的思路从来都是先看负载,其实这套 18 个计算节点的集群已经平稳运行一段时间了,当天晚上推送的这个时间段的任务并行度其实也很低,Yarn 的每个队列也都做了隔离,我们在 dolphin 上面的任务也通过直接抓 dolphin 的 Mysql 数据库直接获取到了所有的运行计划和实际执行计划,所以因为资源跑满导致的问题不太令人信服,并且也没有支持这个观点的证据。

网络

一般ConnectionTimeOut都会去网络同学那里打交换机的 log,一通筛查,但是其实如果不是严重的网络情况,这也是比较难以发现问题的,果然,网络同学的回复是一切正常。

开发角度

程序 Bug

这个 ETL 的过程运行了几个星期了,一直正常,我们从 dolphin 的邮件监控,钉钉机器人监控,任务的 footprint 监控等等都一直跟踪,程序 bug 的可能性不高。之所以没说一定不是程序 bug 的原因是,ETL 的过程本身从数据源的数据类型,数据集以及突发的一些变化可能会影响到后续的整体数据搬移的过程,也许是一些考不到的点在这个时间点突然间发力,导致问题,这样也需要对程序的健壮性进行增强。对程序进行了初步筛查,排除了程序的问题。

开源工具

一般程序也没有问题的话,就是一个很可怕的消息了。我们需要从 log 中排查开源工具的执行流,然后分析步骤,从出问题的地方开始分析导致问题发生的各种可能性,最主要的是,这个问题可能是一个无法重现的问题。一般如果分析到这里,就需要有对开源架构非常了解的同学或者是对开源框架运行原理相对熟悉的同学出手了,当然,也有那种从没有跟踪过这一块源代码的同学非常有兴趣也可以从头开始调查。本次需要从开源工具的架构来分析问题出在哪里了。

问题分析

从 log 上看,本次出问题的地方是 Hive on Spark 的运行过程中,HQL 已经变成了 Spark 任务,在 AM 中初始化了 Driver 的线程。关于 Driver 启动和 Executor 的关系我也想整理一套文章,有空发出来。

定位出问题的地方

最关键重要的两条 log 如下

代码语言:javascript
复制
21/07/16 01:27:43 INFO yarn.ApplicationMaster: Starting the user application in a separate Thread
21/07/16 01:27:43 INFO yarn.ApplicationMaster: Waiting for spark context initialization...

这两句的代码的来源分别是来自org.apache.spark.deploy.yarn.ApplicationMaster.scalarunDriver方法与startUserApplication方法

  • runDriver
代码语言:javascript
复制
     private def runDriver(): Unit = {
        addAmIpFilter(None)
        userClassThread = startUserApplication()

        // This a bit hacky, but we need to wait until the spark.driver.port property has
        // been set by the Thread executing the user class.
        logInfo("Waiting for spark context initialization...")
  • startUserApplication
代码语言:javascript
复制
    /**
       * Start the user class, which contains the spark driver, in a separate Thread.
       * If the main routine exits cleanly or exits with System.exit(N) for any N
       * we assume it was successful, for all other cases we assume failure.
       *
       * Returns the user thread that was started.
       */
      private def startUserApplication(): Thread = {
        logInfo("Starting the user application in a separate Thread")

这两个方法即 yarn-cluster 模式下启动用户提交的 spark 运行的 jar 文件的过程,在用户提交的代码中是原生应该是处理数据的代码,即各种算子的计算,根据 shuffle 算子进行 Stage 划分,遇到 Action 算子则进行任务提交等等。

分析可能的原因

解读问题

而就在 Driver 线程运行的过程中却有一行这样的错误:

代码语言:javascript
复制
21/07/16 01:27:43 INFO yarn.ApplicationMaster: Waiting for spark context initialization...
21/07/16 01:27:43 INFO client.RemoteDriver: Connecting to HiveServer2 address: hadoop-task-1.bigdata.xx.com:24173
21/07/16 01:27:44 INFO conf.HiveConf: Found configuration file file:/data8/yarn/nm/usercache/dolphinscheduler/filecache/8097/__spark_conf__.zip/__hadoop_conf__/hive-site.xml
21/07/16 01:27:44 ERROR yarn.ApplicationMaster: User class threw exception: java.util.concurrent.ExecutionException: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: hadoop-task-1.bigdata.xx.com/10.25.15.104:24173
java.util.concurrent.ExecutionException: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: hadoop-task-1.bigdata.XX.com/10.25.15.104:24173

我分析问题从来都是有依据的从大到小的去有目的的收缩,并剔除不可能的选项,从可能的问题中精确最终答案。

从以上的 log 中可以分析出

  1. Driver 已经启动了
  2. 在 Driver 中进行了连接到 HiveServer2 的连接
  3. 而这个连接发生了 ConnectionTimeout 的错误

表象原因

从 log 中解读出来的错误就是,Driver 启动后,Driver 线程里面与 HiveServer2,也就是 Hive 的 Server 进行的连接,在连接的时候出现了 timeout,导致任务失败,到这里具体问题出在哪里就知道了,那么下一个问题就是 Why?

Hive on spark 是什么处理机制?为什么会在 Driver 线程中去连接 HiveServer2 的服务?这个处理过程因为什么会导致 timeout 呢?带着问题进行深入分析,只能去源代码中一探究竟

深入分析

Hive on Spark(下称 HOS)机制

Hive on Spark,即 Hive 的 SQL(HQL)的执行过程从默认的 MapReduce 变成 Spark 引擎来实现,利用 Spark 的速度优势与计算能力解决原生 MR 笨重的实现

Hive on Spark 的实现架构

这里需要一幅图(来源于网络,跟我我对源代码的解读,这个架构是正确的)

这个大的结构图先有个大体印象即可,后续分析每一块细节的时候再回头来理解会更简单

Hive on Spark 细节技术点

入口:SparkTask

可以理解成,HQL 提交在 Hive 的 client 端,提交 HQL 后,经过一系列的转换变成 spark 的任务,整体开始向 Spark 任务转换的起始位置就是SparkTask,至于从哪里如何调用到SparkTask的,我暂时还没有细致研究,后续有需要或者有小伙伴有兴趣我们一起探讨跟踪这部分的逻辑。

与上面的架构图呼应,整个对一个 HQL 任务的提交(不算后续的 Job 的监控)其实就是

  • Session 的创建
  • Session 的 submit

这两个步骤的调用大体流程

  • SparkSession 的获取的一系列调用过程
  • sparkSession = SparkUtilities.getSparkSession(conf, sparkSessionManager)
  • sparkSession = sparkSessionManager.getSession(sparkSession, conf, true);
  • existingSession.open(conf);
  • hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf);
  • return new RemoteHiveSparkClient(hiveconf, sparkConf);
  • createRemoteClient();
  • remoteClient = SparkClientFactory.createClient(conf, hiveConf);
  • return new SparkClientImpl(server, sparkConf, hiveConf);
  • this.driverThread = startDriver(rpcServer, clientId, secret);
  • 扔 Driver 的 jar 到 spark 集群
  • spark-submit —class xxxx.jar ... 的处理
  • sparkSession.submit 的一系列调用过程
  • SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
  • return hiveSparkClient.execute(driverContext, sparkWork);
  • RemoteHiveSparkClient】 return submit(driverContext, sparkWork);
  • JobHandle<Serializable> jobHandle = remoteClient.submit(job);
  • SparkClientImpl】 return protocol.submit(job);
  • ClientProtocol】 final io.netty.util.concurrent.Future<Void> rpc = driverRpc.call(new JobRequest(jobId, job));

初始化:SparkSession

众所周知,一个离线的 spark 任务是用户首先编写一个 User Class,然后达成 jar 包,把这个 jar 包投入到 spark 集群中即可,一般生产环境上,我们会使用—master yarn —deploy-mode cluster 的 yarn 的提交方式。

一直以来,我理解的 HOS 中提交一个 HQL 就是解析成一个 spark 的 job,提交到 spark 集群即可,但是这个 job 每次都是打成一个 jar 包,或者整体打成一个 jar 包来提交么?这块一直没有细致的研究,其实细想起来,每次打个包是个多么愚蠢的设计,看过 SparkSession 的实现后,可以理解到,HOS 本身的设计架构其实是这样的

  • 先回顾一下 spark 提交任务的简单模型
  • 加上 HOS 的过程则为

一个初始化 SparkSessin 的过程居然完成了提交一个 User Class 到 spark 集群的过程,而且这个过程其实非常的巧妙。

在 HiveServer2(HS2)与 spark 集群建立连接

因为 HQL 是提交到 HS2 的服务器,HS2 解析 HQL 并转换成为 sparkTask 并执行一系列的处理,如上图所示,在 HS2 中利用SparkClientFactory.initialize首先建立了一个 Netty 的 Server,,然后通过SparkClientFactory.createClient初始化了SparkClientImpl,并且在SparkClientImpl的构造函数中调用了startDriver方法,在这个方法中完成了spark-submit的操作,代码片段如下

代码语言:javascript
复制
if (sparkHome != null) {
        argv.add(new File(sparkHome, "bin/spark-submit").getAbsolutePath());
      } else {
        LOG.info("No spark.home provided, calling SparkSubmit directly.");
        argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath());

        if (master.startsWith("local") || master.startsWith("mesos") || master.endsWith("-client") || master.startsWith("spark")) {
          String mem = conf.get("spark.driver.memory");
          if (mem != null) {
            argv.add("-Xms" + mem);
            argv.add("-Xmx" + mem);
          }

          String cp = conf.get("spark.driver.extraClassPath");
          if (cp != null) {
            argv.add("-classpath");
            argv.add(cp);
          }

可以看到拼装了一个带有bin/spark-submit的 cmd

回顾一下一般提交一个 User Class 中代码的形式

  • 一般提交的 User Class 的样子
  • 一般,我们提交到 spark 集群的 User Class(jar 文件)都是一段代码文件体,比如以下的代码片段,一段 wordcount。
  • 其中
  • reduceByKey是一个 Shuffle 算子,切分出来 2 个 stage
  • saveAsTextFile是一个 Action 算子,会提交整个 job
代码语言:javascript
复制
    object WordCount {
      def main(args: Array[String]): Unit = {        // code        val conf = new SparkConf().setAppName("WordCount")        val sc = new SparkContext()
        //read        val line: RDD[String] = sc.textFile(args(0))
        // flatmap        val words: RDD[String] = line.flatMap(_.split(" "))
        val wordAndOne = words.map((_, 1))
        val wordAndCount = wordAndOne.reduceByKey(_ + _)
        // save to hdfs        wordAndCount.saveAsTextFile(args(1))
        // close        sc.stop()
      }
    }

这个过程的精妙之处

  • 提交的 User Class(RemoteDriver.java)本身是一个 Netty 的 client
  • RemoteDriver 被条到 Spark 集群中,会启动一个 Netty client,去连接到 HS2 的 Netty Server,如图,这个 Netty Server 前述构建的时间点了
  • 提交的 HQL 即在 Netty Server 与 Netty Client(已经提交到 Spark 集群中的 RemoteDriver)的通信
  • 从这幅图可以看出(猜想)来,提交的 HQL 通过这两个 Netty 间的服务传递到 Spark 集群内部,从而实现在集群内的计算处理

巡查可疑的问题点

从上面的错误 log 中可以看出,就是在 Driver 线程中启动了 RemoteDriver 后,反向连接 HS2 造成了 timeout,也就是上图中的这个 Netty Rpc 连接过程中造成了 timeout,需要再细看一下这个过程是如何处理的

  • RemoteDriver 的代码细节
  • 在初始化的过程中,有这样的一段代码,就是在初始化 Netty 的 client,这里有一行注释非常的亮眼,其实这行注释已经提醒我们注意 time out,因为,本身 Rpc.createClient 返回的是一个 Promise,而这里又进一步进行了 get 的同步调用阻塞的获取到 clientRpc。在这个获取过程中如果控制不好却是容易造成 timeout
代码语言:javascript
复制
    // The RPC library takes care of timing out this.
        this.clientRpc = Rpc.createClient(mapConf, egroup, serverAddress, serverPort,
          clientId, secret, protocol).get();
        this.running = true;

  • RemoteDriver 中调用Rpc.createClient的代码细节
  • 我直接在代码中进行标注解释,这里,构建 client 的 bootstrap 的过程中,使用到了一个
  • int connectTimeoutMs = (int) rpcConf.getConnectTimeoutMs();
  • 这个来源的 timeout 常量
代码语言:javascript
复制
     public static Promise<Rpc> createClient(
        Map<String, String> config,
        final NioEventLoopGroup eloop,
        String host,
        int port,
        final String clientId,
        final String secret,
        final RpcDispatcher dispatcher) throws Exception {
      final RpcConfiguration rpcConf = new RpcConfiguration(config);

      // client端连接Netty server端的timeout时长
      int connectTimeoutMs = (int) rpcConf.getConnectTimeoutMs();

      final ChannelFuture cf = new Bootstrap()
          .group(eloop)
          .handler(new ChannelInboundHandlerAdapter() { })
          .channel(NioSocketChannel.class)
          .option(ChannelOption.SO_KEEPALIVE, true)

          // 在这里被设置
          .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs)
          .connect(host, port);

      final Promise<Rpc> promise = eloop.next().newPromise();
      final AtomicReference<Rpc> rpc = new AtomicReference<Rpc>();

      // Set up a timeout to undo everything.
      final Runnable timeoutTask = new Runnable() {
        @Override
        public void run() {
          promise.setFailure(new TimeoutException("Timed out waiting for RPC server connection."));
        }
      };
      final ScheduledFuture<?> timeoutFuture = eloop.schedule(timeoutTask,
          rpcConf.getServerConnectTimeoutMs(), TimeUnit.MILLISECONDS);

      // The channel listener instantiates the Rpc instance when the connection is established,
      // and initiates the SASL handshake.
      cf.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture cf) throws Exception {
          if (cf.isSuccess()) {
            SaslClientHandler saslHandler = new SaslClientHandler(rpcConf, clientId, promise,
              timeoutFuture, secret, dispatcher);
            Rpc rpc =createRpc(rpcConf, saslHandler, (SocketChannel) cf.channel(), eloop);
            saslHandler.rpc = rpc;
            saslHandler.sendHello(cf.channel());
          } else {
            promise.setFailure(cf.cause());
          }
        }
      });

      // Handle cancellation of the promise.
      promise.addListener(new GenericFutureListener<Promise<Rpc>>() {
        @Override
        public void operationComplete(Promise<Rpc> p) {
          if (p.isCancelled()) {
            cf.cancel(true);
          }
        }
      });

      return promise;
    }

  • 追踪这个可以的 timeout 时长
  • 可以看出,这个 timeout 时长的 default 值为 1000ms,一旦 client 端到 server 端连接超过 1s,则直接会出现 timeout 错误,也就是本文最初描述的 timeout
代码语言:javascript
复制
     long getConnectTimeoutMs() {
      String value = config.get(HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname);
      return value != null ? Integer.parseInt(value) :DEFAULT_CONF.getTimeVar(
        HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS);
    }

    SPARK_RPC_CLIENT_CONNECT_TIMEOUT("hive.spark.client.connect.timeout",
          "1000ms", new TimeValidator(TimeUnit.MILLISECONDS),
          "Timeout for remote Spark driver in connecting back to Hive client."),

结案

如上文分析所示,在 Driver 线程中连接到 HS2 的 Server 的过程中,timeout 的常量被设置成了 1s,一旦超过 1s,则会出现 timeout 错误。这个 1s 本身设置的过短,很容易出现问题,所以提高这个 timeout 常量的设置即可解决问题,提高稳定性。

官方解答

其实这个问题,我已经最初搜索到了官方的一个 bug fix

参看:

https://issues.apache.org/jira/browse/HIVE-16794

https://issues.apache.org/jira/secure/attachment/12872466/HIVE-16794.patch

之所以没有一开始就按照这个 issue 修改或者做 Hive 升级是想详细的再研究一下这个问题的本质,以及 HOS 的基础原理

后续

HOS 的基础过程在这次 trouble shooting 中做了简单的回顾,后续会针对 RemoteDriver 是如何向 spark 提交 job 的,并且 job 又是如何从 HS2 的 Netty Server 端生成并传入到 RemoteDriver 的做详细的说明,

  • 发表于:
  • 本文为 InfoQ 中文站特供稿件
  • 首发地址https://www.infoq.cn/article/3fa3f101c3598ff4b5abd989e
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券