首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >org.apache.spark.SparkException:Job由于阶段故障而中止:java.lang.NullPointerException

org.apache.spark.SparkException:Job由于阶段故障而中止:java.lang.NullPointerException
EN

Stack Overflow用户
提问于 2016-12-23 05:54:44
回答 1查看 2.9K关注 0票数 1

我面临一个问题,在hadoop/纱线集群上运行spark -它在本地模式下运行良好,但是在集群模式下由于这个空指针异常而失败--我在本地和集群中都使用了SLAS1.6.2和scala 2.10.6,这个应用程序是一个流应用程序,来自kakfa的流数据,下面是我获得空指针的代码,我可以获得一些批的数据,但是对于某些批,我得到了空指针,因为空指针堆积了,作业失败了--这里是它失败DevMain.scala的代码片段

代码语言:javascript
运行
复制
 Line 1 val lines: DStream[String,Array[Byte]] = myConsumer.createDefaultStream()

 Line 2 val keyDeLines = lines.map(lme.aParser);

这是createDefaultStream()

代码语言:javascript
运行
复制
def createDefaultStream(): DStream[(String,Array[Byte])] = {

    val consumerConfProps = List("zookeeper.connect","group.id","zookeeper.connection.timeout.ms")
    val kafkaConf = Utils.getSubProps(props,consumerConfProps)
    val topicArray = props.getProperty("topics").split(",")
    val topicMap = {
      topicArray.map((_, props.getProperty("numthreads").toInt)).toMap
    }

    KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc,
      kafkaConf,
      topicMap,
      StorageLevel.MEMORY_ONLY_SER
    )

我是lme.parser

代码语言:javascript
运行
复制
def aParser(x: (String,Array[Byte])): Option[Map[String,Any]] = {

    logInfo("Entered lme: ")


    val decodeTry = Injection.invert(x._2)
    decodeTry match {
      case Failure(e) => {
        logInfo(s"Could not decode binary data: " + e.getStackTrace)
        None
      }
      case Success(eventPojo) => {

        val bs: String = eventPojo.toString

        logInfo("json: " + bs)
      }
}

在我将日志记录到line1 of lme.parser的空指针情况下,代码永远不会输入‘lme.parser’函数

这是堆叠痕迹

代码语言:javascript
运行
复制
java.lang.NullPointerException 
at DevMain$$anonfun$5.apply(DevMain.scala:2)

at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1631)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

我刚开始在集群上运行星星之火,请给我指出正确的方向。

注意:我知道在地图中它试图在dstream行的元素上迭代--当空的时候它失败了--但是从我在d蒸汽空批上所做的读取来看,不应该导致失败--请纠正我,如果我错了,请纠正我,如果我做错了,我在这方面做了一些研究,有些人指出,在火花代码中,从java迭代器转换到scala迭代器是失败的,另一些人已经指出这可能是火花..not序列化代码中的一个错误,确定该怎么做。

EN

回答 1

Stack Overflow用户

发布于 2016-12-23 06:54:26

我的看法是,它是因为数据loss.The生产者发送消息和消费者接收消息,但是当网络流量或内存中的数据不齐平时,数据可能是loss.you可以在一个主题逐个主题的基础上设置这个复制因子,另一个原因也会导致数据丢失。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/41295916

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档