Spark Tips 1: RDD的collect action 不适用于单个element size过大的情况

collect是Spark RDD一个非常易用的action,通过collect可以轻易获得一个RDD当中所有的elements。当这些elements是String类型的时候,可以轻易将整个RDD转化成一个List<String>,简直不要太好用。

不过等一等,这么好用的action有一个弱点,它不适合size比较的element。举个例子来说吧。请看下面这段代码:

... ...

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(

jssc,

String.class,

String.class,

StringDecoder.class,

StringDecoder.class,

kafkaParams,

topicsSet

);

JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {

@Override

public String call(Tuple2<String, String> tuple2) {

return tuple2._2();

}

});

lines.foreachRDD(new Function<JavaRDD<String>, Void>(){

@Override

public Void call(JavaRDD<String> strJavaRDD) throws Exception {

List<String> messages = strJavaRDD.collect();

List<String> sizeStrs = new ArrayList<String>();

for (String message: messages) {

if (message== null)

continue;

String logStr = "message size is " + message.length();

strs.add(logStr);

}

saveToLog(outputLogPath, strs);

return null;

}

});

... ...

上述这段代码当Kafka中单个message(也就是)的size很小(比如200Bytes)的时候,运行得很好。可是当单个message size变大到一定程度(例如10MB),就会抛出以下异常:

sparkDriver-akka.actor.default-dispatcher-18 2015-10-15 21:52:28,606 ERROR JobSc

heduler - Error running job streaming job 1444971120000 ms.0

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 238.0 failed 4 times, most recent failure: Lost task 0.3 in stage 238.0 (TID421, 127.0.0.1): ExecutorLostFailure (executor 123 lost)

Driver stacktrace:

at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1215)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1204)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)

at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1203)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)

at scala.Option.foreach(Option.scala:236)

at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)

at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1404)

at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1365)

at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

原因很简单,collect()无法handle“大数据”。对于10MB size这样的单条message。我们可以用下面这段代码替代上面最后一部分:

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {

@Override

public Void call(JavaRDD<String> strJavaRDD) throws Exception {

JavaRDD<String> sizeRDD = strJavaRDD.map(new Function<String, String>() {

@Override

public String call(String message) throws Exception {

if (message == null)

return null;

String logStr = "Message size is " + message.length();

return logStr;

}

});

List<String> sizeStrs = sizeRDD.collect();

saveToLog(outputLogPat, sizeStrs);

return null;

}

});

原文发布于微信公众号 - 悦思悦读(yuesiyuedu)

原文发表时间:2015-10-16

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏cnblogs

Javascript的原型继承,说清楚

     一直以来对Javascript的原型、原型链、继承等东西都只是会用和了解,但没有深入去理解这门语言关于继承这方面的本质和特点。闲暇之余做的理解和总...

1809
来自专栏小古哥的博客园

JavaScript对象创建的九种方式

1、标准创建对象模式 1 var person = new Object(); 2 person.name = "Nicholas"; 3 person.age...

3027
来自专栏码匠的流水账

springboot2自定义statsd指标前缀

springboot2引入了micrometer,1.x版本的spring.metrics.export.statsd.prefix在2版本中已经被标记为废弃,...

1452
来自专栏悦思悦读

Spark Tips3: 在Spark Streaming job中读取Kafka messages及其offsetRange

在Spark Streaming job中读取Kafka topic(s)中的messages时,有时我们会需要同步记录下每次读取的messages的offse...

53312
来自专栏PPV课数据科学社区

【学习】七天搞定SAS(二):基本操作(判断、运算、基本函数)

SAS生成新变量 SAS支持基本的加减乘除,值得一提的是它的**代表指数,而不是^。 * Modify homegarden data set with ass...

4714
来自专栏菩提树下的杨过

无限级分类(非递归算法/存储过程版/GUID主键)完整数据库示例_(2)插入记录

-- ======================================== -- Author:  <杨俊明,jimmy.yang@cntvs.c...

2029
来自专栏听Allen瞎扯淡

Spark 的惰性运算

作者的意图很简单,就是将RDD中的数据转换为新的数据格式,并统计非法数据的个数。咋一看代码,似乎没有什么问题,可是,这段代码真的能得到正确的结果么?答案是否定的...

7171
来自专栏Hongten

python开发_calendar

如果你用过linux,你可能知道在linux下面的有一个强大的calendar功能,即日历

1312
来自专栏扎心了老铁

spark三种连接join

本文主要介绍spark join相关操作。 讲述spark连接相关的三个方法join,left-outer-join,right-outer-join,在这之前...

3758
来自专栏函数式编程语言及工具

SDP(7):Cassandra- Cassandra-Engine:Streaming

  akka在alpakka工具包里提供了对cassandra数据库的streaming功能。简单来讲就是用一个CQL-statement读取cassandra...

3436

扫码关注云+社区

领取腾讯云代金券