
Spark Tips 3: Reading Kafka messages and their offsetRange in a Spark Streaming job

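When a Spark Streaming job reads messages from one or more Kafka topics, we sometimes also need to record the offsetRange of the messages read in each batch. The two snippets below (code 1 and code 2) both achieve this correctly, and they are equivalent.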

Code 1 (correct):

-----------------------

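// Fragment: jssc, kafkaParams, topicsSet, zkClient, zkPathRoot, outputFolderPath,
// definedDuration, processEachRDD and writeOffsetToZookeeper are defined elsewhere.
import kafka.serializer.StringDecoder;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
    jssc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    topicsSet
);

messages.foreachRDD(
    new Function<JavaPairRDD<String, String>, Void>() {
        @Override
        public Void call(JavaPairRDD<String, String> rdd) throws Exception {
            // Read the offsets first, while rdd is still the RDD produced by the direct stream.
            OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            JavaRDD<String> valueRDD = rdd.values();
            long msgNum = processEachRDD(valueRDD, outputFolderPath, definedDuration);
            // Record the offsets only after the batch has been processed.
            if (msgNum > 0 && zkPathRoot != null) {
                writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
            }
            return null;
        }
    });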
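The helper writeOffsetToZookeeper is not shown in this post. As a rough illustration of what "recording the offsetRange" could mean, the sketch below maps each range to a path and a value that such a helper might store. Everything in it is an assumption made for illustration: ToyOffsetRange is a hypothetical stand-in that only mirrors the four fields of Spark's OffsetRange (topic, partition, fromOffset, untilOffset), the path layout root/topic/partition is invented, and no ZooKeeper client is involved.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for Spark's OffsetRange; only its four fields are modeled.
class ToyOffsetRange {
    final String topic;
    final int partition;
    final long fromOffset;   // inclusive
    final long untilOffset;  // exclusive

    ToyOffsetRange(String topic, int partition, long fromOffset, long untilOffset) {
        this.topic = topic;
        this.partition = partition;
        this.fromOffset = fromOffset;
        this.untilOffset = untilOffset;
    }
}

class OffsetLayout {
    // Assumed layout: "<root>/<topic>/<partition>" -> untilOffset.
    // untilOffset is exclusive, so it is the offset the next batch would start from.
    static Map<String, Long> toZkEntries(String zkPathRoot, ToyOffsetRange[] offsets) {
        Map<String, Long> entries = new LinkedHashMap<>();
        for (ToyOffsetRange r : offsets) {
            entries.put(zkPathRoot + "/" + r.topic + "/" + r.partition, r.untilOffset);
        }
        return entries;
    }

    public static void main(String[] args) {
        ToyOffsetRange[] offsets = {
            new ToyOffsetRange("events", 0, 10L, 25L),
            new ToyOffsetRange("events", 1, 7L, 7L)
        };
        // Print the path/value pairs a real helper might write.
        for (Map.Entry<String, Long> e : toZkEntries("/offsets", offsets).entrySet()) {
            System.out.println(e.getKey() + " = " + e.getValue());
        }
    }
}
```

A real helper would write each entry through its ZooKeeper client instead of printing it; storing untilOffset rather than fromOffset is one possible choice that lets a restarted job resume from the first unread message.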

Code 2 (correct):

-----------------------

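// Fragment: jssc, kafkaParams, topicsSet, zkClient, zkPathRoot, outputFolderPath,
// definedDuration, processEachRDD and writeOffsetToZookeeper are defined elsewhere.
import java.util.concurrent.atomic.AtomicReference;
import kafka.serializer.StringDecoder;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import scala.Tuple2;

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
    jssc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    topicsSet
);

// Holder that carries the offsets from the transform step to the output step.
final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();

JavaDStream<String> lines = messages.transformToPair(
    new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
        @Override
        public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
            // First operation on the stream: rdd is still the RDD produced by the direct stream.
            OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            offsetRanges.set(offsets);
            return rdd;
        }
    }).map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);
        if (msgNum > 0 && zkPathRoot != null) {
            // Use the offsets captured in transformToPair; no cast is needed here.
            OffsetRange[] offsets = offsetRanges.get();
            writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
        }
        return null;
    }
});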

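Note, however, that the following two snippets (code 3 and code 4) are wrong. Both throw an exception: java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges. HasOffsetRanges is implemented only by the RDD that createDirectStream produces directly; once values() or map() has been applied, the result is a MapPartitionsRDD, so the offsets must be read before any such transformation.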

Code 3 (wrong):

-----------------------

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
    jssc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    topicsSet
);

messages.transform(new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {
    @Override
    public JavaRDD<String> call(JavaPairRDD<String, String> rdd) throws Exception {
        // values() returns a MapPartitionsRDD; the offsets are not read before this point.
        return rdd.values();
    }
}).foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);
        if (msgNum > 0 && zkPathRoot != null) {
            // ClassCastException: rdd.rdd() is a MapPartitionsRDD, not a HasOffsetRanges.
            OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
        }
        return null;
    }
});

Code 4 (wrong):

-----------------------

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
    jssc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    topicsSet
);

messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        // map() produces a MapPartitionsRDD; the offsets are not read before this point.
        return tuple2._2();
    }
}).foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);
        if (msgNum > 0 && zkPathRoot != null) {
            // ClassCastException: rdd.rdd() is a MapPartitionsRDD, not a HasOffsetRanges.
            OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
        }
        return null;
    }
});
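The reason the cast fails in code 3 and code 4 can be modeled in plain Java, without Spark. The sketch below is only an analogy: ToyHasOffsetRanges, ToyRdd, ToyKafkaRdd and ToyMappedRdd are hypothetical stand-ins, not Spark classes. They mimic the one relationship that matters here, namely that only the source RDD implements the offsets interface and that map() returns an object of a different class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for HasOffsetRanges.
interface ToyHasOffsetRanges {
    String offsetRanges();
}

// Hypothetical stand-in for a generic RDD.
class ToyRdd<T> {
    final List<T> data;

    ToyRdd(List<T> data) { this.data = data; }

    // Like RDD.map: the result is a new object of a different class.
    <U> ToyRdd<U> map(Function<T, U> f) {
        List<U> out = new ArrayList<>();
        for (T t : data) {
            out.add(f.apply(t));
        }
        return new ToyMappedRdd<>(out);
    }
}

// Plays the role of the RDD produced by the direct stream: it carries offsets.
class ToyKafkaRdd<T> extends ToyRdd<T> implements ToyHasOffsetRanges {
    ToyKafkaRdd(List<T> data) { super(data); }

    @Override
    public String offsetRanges() { return "partition 0: [0, " + data.size() + ")"; }
}

// Plays the role of MapPartitionsRDD: it does not implement the interface.
class ToyMappedRdd<T> extends ToyRdd<T> {
    ToyMappedRdd(List<T> data) { super(data); }
}

class CastDemo {
    // Returns true if the cast to ToyHasOffsetRanges succeeds at runtime.
    static boolean canReadOffsets(ToyRdd<?> rdd) {
        try {
            ((ToyHasOffsetRanges) rdd).offsetRanges();
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ToyRdd<String> source = new ToyKafkaRdd<>(Arrays.asList("a", "b"));
        ToyRdd<String> mapped = source.map(String::toUpperCase);
        System.out.println(canReadOffsets(source)); // true: cast before map, as in code 1 and 2
        System.out.println(canReadOffsets(mapped)); // false: cast after map, as in code 3 and 4
    }
}
```

In this model the cast compiles in both cases and only fails at runtime, which matches the behavior described above: the fix is to do the cast on the first RDD, before values() or map() is applied.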
