专栏首页悦思悦读Spark Tips3: 在Spark Streaming job中读取Kafka messages及其offsetRange

Spark Tips3: 在Spark Streaming job中读取Kafka messages及其offsetRange

在Spark Streaming job中读取Kafka topic(s)中的messages时,有时我们会需要同步记录下每次读取的messages的offsetRange。要达到这一目的,下面这两段代码(代码1和代码2)都是正确的,而且是等价的。

代码1(正确):

-----------------------

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(

jssc,

String.class,

String.class,

StringDecoder.class,

StringDecoder.class,

kafkaParams,

topicsSet

);

messages.foreachRDD(

new Function<JavaPairRDD<String, String>, Void>() {

@Override

public Void call(JavaPairRDD<String, String> rdd) throws Exception {

OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

JavaRDD<String> valueRDD = rdd.values();

long msgNum = processEachRDD(valueRDD, outputFolderPath, definedDuration);

if (msgNum > 0 && zkPathRoot!= null) {

writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);

}

return null;

}

});

代码2(正确):

-----------------------

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(

jssc,

String.class,

String.class,

StringDecoder.class,

StringDecoder.class,

kafkaParams,

topicsSet

);

final AtomicReference<OffsetRange[]> offsetRanges=new AtomicReference();

lines = messages.transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {

@Override

public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {

OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

offsetRanges.set(offsets);

return rdd;

}

}).map(new Function<Tuple2<String, String>, String>() {

@Override

public String call(Tuple2<String, String> tuple2) {

return tuple2._2();

}

});

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {

@Override

public Void call(JavaRDD<String> rdd) throws Exception {

long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);

if (msgNum > 0 && zkPathRoot!= null) {

OffsetRange[] offsets = offsetRanges.get();

writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);

}

return null;

}

});

但是要注意,下面这两段代码(代码3和代码4)是错误的,它们都会抛出一个exception:java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges

代码3(错误):

-----------------------

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(

jssc,

String.class,

String.class,

StringDecoder.class,

StringDecoder.class,

kafkaParams,

topicsSet

);

messages.transform(new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {

@Override

public JavaRDD<String> call(JavaPairRDD<String, String> rdd) throws Exception {

return rdd.values();

}

}).foreachRDD(new Function<JavaRDD<String>, Void>() {

@Override

public Void call(JavaRDD<String> rdd) throws Exception {

long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);

if (msgNum > 0 && zkPathRoot!= null) {

OffsetRange[] offsets = offsetRanges.get();

writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);

}

return null;

}

});

代码4(错误):

-----------------------

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(

jssc,

String.class,

String.class,

StringDecoder.class,

StringDecoder.class,

kafkaParams,

topicsSet

);

messages.map(new Function<Tuple2<String, String>, String>() {

@Override

public String call(Tuple2<String, String> tuple2) {

return tuple2._2();

}

}).foreachRDD(new Function<JavaRDD<String>, Void>() {

@Override

public Void call(JavaRDD<String> rdd) throws Exception {

long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);

if (msgNum > 0 && zkPathRoot!= null) {

OffsetRange[] offsets = offsetRanges.get();

writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);

}

return null;

}

});

本文分享自微信公众号 - 悦思悦读(yuesiyuedu),作者:YJL

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2015-11-26

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Spark Tips4: Kafka的Consumer Group及其在Spark Streaming中的“异动”(更新)

    按照Kafka官方的说法(http://kafka.apache.org/08/introduction.html),某一特定topic对于相同group id...

    叶锦鲤
  • Spark Tips 1: RDD的collect action 不适用于单个element size过大的情况

    collect是Spark RDD一个非常易用的action,通过collect可以轻易获得一个RDD当中所有的elements。当这些elements是Str...

    叶锦鲤
  • Spark Tips 2: 在Spark Streaming中均匀分配从Kafka directStream 中读出的数据

    下面这段code用于在Spark Streaming job中读取Kafka的message: ...... JavaPairInputDStream<Stri...

    叶锦鲤
  • 这样规范写代码,同事直呼“666”

    java思维导图
  • 关于SpringMVC中如何把查询数据全转成String类型

    上帝
  • 厉害了,关于String的10道经典面试题。

    1、String是基本数据类型吗? 2、String是可变的话? 3、怎么比较两个字符串的值一样,怎么比较两个字符串是否同一对象? 4、switch中可以使用S...

    Java技术栈
  • 大数据算法设计模式(2) - 左外链接(leftOuterJoin) spark实现

    左外链接(leftOuterJoin) spark实现 package com.kangaroo.studio.algorithms.join; impor...

    用户1225216
  • java中两个map比较

    ydymz
  • Java 中 String 类为什么要设计成不可变的?

    String 是 Java 中不可变的类,所以一旦被实例化就无法修改。不可变类的实例一旦创建,其成员变量的值就不能被修改。本文总结下 String 类设计成不可...

    用户3596197
  • Java——String类使用详解(实例化、字符串比较、匿名对象、两种实例化方法的区别)

    String类不是一个基本数据类型,它是一个类,这个类设计过程种加入了Java的特殊支持,其实例化形式有两种形式:

    Winter_world

扫码关注云+社区

领取腾讯云代金券