前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Tips3: 在Spark Streaming job中读取Kafka messages及其offsetRange

Spark Tips3: 在Spark Streaming job中读取Kafka messages及其offsetRange

作者头像
叶锦鲤
发布2018-03-15 10:47:50
1.5K0
发布2018-03-15 10:47:50
举报
文章被收录于专栏:悦思悦读悦思悦读悦思悦读

在Spark Streaming job中读取Kafka topic(s)中的messages时,有时我们会需要同步记录下每次读取的messages的offsetRange。要达到这一目的,下面这两段代码(代码1和代码2)都是正确的,而且是等价的。

代码1(正确):

-----------------------

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(

jssc,

String.class,

String.class,

StringDecoder.class,

StringDecoder.class,

kafkaParams,

topicsSet

);

messages.foreachRDD(

new Function<JavaPairRDD<String, String>, Void>() {

@Override

public Void call(JavaPairRDD<String, String> rdd) throws Exception {

OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

JavaRDD<String> valueRDD = rdd.values();

long msgNum = processEachRDD(valueRDD, outputFolderPath, definedDuration);

if (msgNum > 0 && zkPathRoot!= null) {

writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);

}

return null;

}

});

代码2(正确):

-----------------------

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(

jssc,

String.class,

String.class,

StringDecoder.class,

StringDecoder.class,

kafkaParams,

topicsSet

);

final AtomicReference<OffsetRange[]> offsetRanges=new AtomicReference();

lines = messages.transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {

@Override

public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {

OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

offsetRanges.set(offsets);

return rdd;

}

}).map(new Function<Tuple2<String, String>, String>() {

@Override

public String call(Tuple2<String, String> tuple2) {

return tuple2._2();

}

});

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {

@Override

public Void call(JavaRDD<String> rdd) throws Exception {

long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);

if (msgNum > 0 && zkPathRoot!= null) {

OffsetRange[] offsets = offsetRanges.get();

writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);

}

return null;

}

});

但是要注意,下面这两段代码(代码3和代码4)是错误的,它们都会抛出一个exception:java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges

代码3(错误):

-----------------------

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(

jssc,

String.class,

String.class,

StringDecoder.class,

StringDecoder.class,

kafkaParams,

topicsSet

);

messages.transform(new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {

@Override

public JavaRDD<String> call(JavaPairRDD<String, String> rdd) throws Exception {

return rdd.values();

}

}).foreachRDD(new Function<JavaRDD<String>, Void>() {

@Override

public Void call(JavaRDD<String> rdd) throws Exception {

long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);

if (msgNum > 0 && zkPathRoot!= null) {

OffsetRange[] offsets = offsetRanges.get();

writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);

}

return null;

}

});

代码4(错误):

-----------------------

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(

jssc,

String.class,

String.class,

StringDecoder.class,

StringDecoder.class,

kafkaParams,

topicsSet

);

messages.map(new Function<Tuple2<String, String>, String>() {

@Override

public String call(Tuple2<String, String> tuple2) {

return tuple2._2();

}

}).foreachRDD(new Function<JavaRDD<String>, Void>() {

@Override

public Void call(JavaRDD<String> rdd) throws Exception {

long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);

if (msgNum > 0 && zkPathRoot!= null) {

OffsetRange[] offsets = offsetRanges.get();

writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);

}

return null;

}

});

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2015-11-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 智汇AI 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档