我面临一个问题,在hadoop/纱线集群上运行spark -它在本地模式下运行良好,但是在集群模式下由于这个空指针异常而失败--我在本地和集群中都使用了SLAS1.6.2和scala 2.10.6,这个应用程序是一个流应用程序,来自kakfa的流数据,下面是我获得空指针的代码,我可以获得一些批的数据,但是对于某些批,我得到了空指针,因为空指针堆积了,作业失败了--这里是它失败DevMain.scala的代码片段
Line 1 val lines: DStream[String,Array[Byte]] = myConsumer.createDefaultStream()
Line 2 val keyDeLines = lines.map(lme.aParser);
这是createDefaultStream()
def createDefaultStream(): DStream[(String,Array[Byte])] = {
val consumerConfProps = List("zookeeper.connect","group.id","zookeeper.connection.timeout.ms")
val kafkaConf = Utils.getSubProps(props,consumerConfProps)
val topicArray = props.getProperty("topics").split(",")
val topicMap = {
topicArray.map((_, props.getProperty("numthreads").toInt)).toMap
}
KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc,
kafkaConf,
topicMap,
StorageLevel.MEMORY_ONLY_SER
)
我是lme.parser
def aParser(x: (String,Array[Byte])): Option[Map[String,Any]] = {
logInfo("Entered lme: ")
val decodeTry = Injection.invert(x._2)
decodeTry match {
case Failure(e) => {
logInfo(s"Could not decode binary data: " + e.getStackTrace)
None
}
case Success(eventPojo) => {
val bs: String = eventPojo.toString
logInfo("json: " + bs)
}
}
在我将日志记录到line1 of lme.parser的空指针情况下,代码永远不会输入‘lme.parser’函数
这是堆叠痕迹
java.lang.NullPointerException
at DevMain$$anonfun$5.apply(DevMain.scala:2)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1631)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
我刚开始在集群上运行星星之火,请给我指出正确的方向。
注意:我知道在地图中它试图在dstream行的元素上迭代--当空的时候它失败了--但是从我在d蒸汽空批上所做的读取来看,不应该导致失败--请纠正我,如果我错了,请纠正我,如果我做错了,我在这方面做了一些研究,有些人指出,在火花代码中,从java迭代器转换到scala迭代器是失败的,另一些人已经指出这可能是火花..not序列化代码中的一个错误,确定该怎么做。
发布于 2016-12-22 22:54:26
我的看法是,它是因为数据loss.The生产者发送消息和消费者接收消息,但是当网络流量或内存中的数据不齐平时,数据可能是loss.you可以在一个主题逐个主题的基础上设置这个复制因子,另一个原因也会导致数据丢失。
https://stackoverflow.com/questions/41295916
复制