When a Spark Streaming job reads messages from Kafka topic(s), we sometimes need to record the offsetRange of each batch of messages as it is read. Both of the snippets below (code 1 and code 2) achieve this correctly, and they are equivalent.
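All four snippets assume the same surrounding setup: a JavaStreamingContext (jssc), a kafkaParams map, a topicsSet, plus variables such as outputFolderPath, zkPathRoot, zkClient and definedDuration, and the helper methods processEachRDD and writeOffsetToZookeeper. That setup is not part of the original post; the minimal sketch below only exists to make the snippets self-contained, and the class name, broker list, topic name and batch interval in it are placeholder assumptions.

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import scala.Tuple2;

// Hypothetical driver skeleton; all values below are placeholders.
public class KafkaOffsetDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("KafkaOffsetDemo");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "broker1:9092,broker2:9092");

        Set<String> topicsSet = new HashSet<String>(Arrays.asList("my-topic"));

        // ... one of the code snippets below goes here ...

        jssc.start();
        jssc.awaitTermination();
    }
}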
Code 1 (correct):
-----------------------
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
);

messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    @Override
    public Void call(JavaPairRDD<String, String> rdd) throws Exception {
        // The RDD handed to foreachRDD is still the KafkaRDD produced by the
        // direct stream, so the cast to HasOffsetRanges succeeds here.
        OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        JavaRDD<String> valueRDD = rdd.values();
        long msgNum = processEachRDD(valueRDD, outputFolderPath, definedDuration);
        if (msgNum > 0 && zkPathRoot != null) {
            writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
        }
        return null;
    }
});
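processEachRDD above is the job's own business logic and its implementation is not shown in the original post. Purely for illustration, a minimal stand-in that persists the batch and reports its size might look like the following; the save-as-text-file behaviour, the output path layout and the type of definedDuration are assumptions.

// Hypothetical stand-in for the job's processing step: persist the batch and
// return the message count so the caller knows whether to commit offsets.
// The output naming scheme and the (unused) definedDuration parameter are assumptions.
private static long processEachRDD(JavaRDD<String> rdd, String outputFolderPath, long definedDuration) {
    long msgNum = rdd.count();
    if (msgNum > 0) {
        rdd.saveAsTextFile(outputFolderPath + "/" + System.currentTimeMillis());
    }
    return msgNum;
}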
Code 2 (correct):
-----------------------
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
);

// Captures the offset ranges in transformToPair() so they can be used later
// inside foreachRDD().
final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>();

JavaDStream<String> lines = messages.transformToPair(
        new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
            @Override
            public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
                // Here rdd is still the KafkaRDD, so the cast succeeds.
                OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                offsetRanges.set(offsets);
                return rdd;
            }
        }).map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);
        if (msgNum > 0 && zkPathRoot != null) {
            OffsetRange[] offsets = offsetRanges.get();
            writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
        }
        return null;
    }
});
Note, however, that the two snippets below (code 3 and code 4) are wrong: both throw an exception, java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges. The reason is that only the RDD produced directly by the Kafka direct stream is a KafkaRDD, which implements HasOffsetRanges; once the stream has gone through transform() or map(), the RDD that foreachRDD sees is an ordinary MapPartitionsRDD, so the cast fails.
Code 3 (wrong):
-----------------------
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
);

messages.transform(new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {
    @Override
    public JavaRDD<String> call(JavaPairRDD<String, String> rdd) throws Exception {
        return rdd.values();
    }
}).foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        // Throws ClassCastException: after transform() this rdd wraps a
        // MapPartitionsRDD, not the KafkaRDD, and no longer implements HasOffsetRanges.
        OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);
        if (msgNum > 0 && zkPathRoot != null) {
            writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
        }
        return null;
    }
});
Code 4 (wrong):
-----------------------
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
);

messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
}).foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        // Throws ClassCastException: after map() this rdd wraps a
        // MapPartitionsRDD, not the KafkaRDD, and no longer implements HasOffsetRanges.
        OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);
        if (msgNum > 0 && zkPathRoot != null) {
            writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
        }
        return null;
    }
});
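For completeness, writeOffsetToZookeeper is also not shown in the original post. A rough sketch of what such a helper might do, assuming zkClient is an org.I0Itec.zkclient.ZkClient and that each partition's end offset is stored under a per-partition znode (the path layout and payload format are assumptions, not the original implementation):

import org.I0Itec.zkclient.ZkClient;
import org.apache.spark.streaming.kafka.OffsetRange;

// Hypothetical helper: store every partition's untilOffset under
// <zkPathRoot>/<topic>/<partition>. How the payload is serialized depends on
// the ZkSerializer the ZkClient was constructed with.
private static void writeOffsetToZookeeper(ZkClient zkClient, String zkPathRoot, OffsetRange[] offsets) {
    for (OffsetRange o : offsets) {
        String path = zkPathRoot + "/" + o.topic() + "/" + o.partition();
        if (!zkClient.exists(path)) {
            zkClient.createPersistent(path, true); // create parent znodes as needed
        }
        zkClient.writeData(path, String.valueOf(o.untilOffset()));
    }
}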