Spark Streaming 中使用 zookeeper 保存 offset 并重用 Java版

最近中使用spark Streaming +kafka,由于涉及到金额,所以需要保证at only one, 而网上关于java版的kafka offset回写zk的资料少之又少,于是总结一下,希望可以为广大使用java的友友们提供参考!这里采用的是Direct Approach的方式.

各位看官,接下来让我们一起开始 offset的冒险之旅吧!(基于 spark 1.6 kafka 0.8)

    import kafka.common.TopicAndPartition;
    import kafka.message.MessageAndMetadata;
    import kafka.serializer.StringDecoder;
    import kafka.utils.ZKGroupTopicDirs;
    import org.I0Itec.zkclient.ZkClient;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import scala.Tuple2;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;


    /**
     * Created by shengjk1 on 2016/10/8.
     * Blog Address:http://blog.csdn.net/jsjsjs1789
     */

    public static JavaStreamingContext createContext() {

        final SparkConf conf = new SparkConf().setAppName("scan");
        final JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(60));
        final HiveContext sqlContext = new HiveContext(jssc.sc());
        final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();

        //checkpoint失败后会自动重启,会造成数据丢失
        //sqlContext.setConf("hive.optimize.ppd", "false");
        //jssc.checkpoint("hdfs://centos11:9000/cp");

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "centos11:9092");
        Set<String> topics = new HashSet<>();
        topics.add("20161008");


        final String zkServer="centos12:2181";
        //ZkClient zkClient = new ZkClient(zkServer, 60 * 1000, 60 * 1000);
        ZkClient zkClient=new ZkClient(zkServer);
        final String topic="20161008";


        //kafka groupid
        ZKGroupTopicDirs zgt=new ZKGroupTopicDirs("test-consumer-group",topic);
        final String zkTopicPath=zgt.consumerOffsetDir();

        int countChildren=zkClient.countChildren(zkTopicPath);


        Map<TopicAndPartition,Long> fromOffsets=new HashMap<>();

        //每一个children都是一个partition
        if(countChildren>0){
            for (int i = 0; i < countChildren; i++) {
                String path=zkTopicPath+"/"+i;

                logger.info("========================zk地址 "+path);
                String offset=zkClient.readData(path);
                TopicAndPartition  topicAndPartition=new TopicAndPartition(topic,i);
                fromOffsets.put(topicAndPartition,Long.parseLong(offset));
            }

            KafkaUtils.createDirectStream(
                    jssc,
                    String.class,
                    String.class,
                    StringDecoder.class,
                    StringDecoder.class,
                    String.class,
                    kafkaParams,
                    fromOffsets,
                    new Function<MessageAndMetadata<String, String>, String>() {
                        public String call(MessageAndMetadata<String, String> v1) throws Exception {
                            return v1.message();
                        }
                    }
            ).transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
                @Override
                public JavaRDD<String> call(JavaRDD<String> v1) throws Exception {
                    OffsetRange[] offsets = ((HasOffsetRanges) v1.rdd()).offsetRanges();
                    logger.info("=======================offsetsoffsetsoffsetsoffsets  "+offsets);
                    offsetRanges.set(offsets);
                    return v1;
                }
            }).foreachRDD(new VoidFunction<JavaRDD<String>>() {
                @Override
                public void call(JavaRDD<String> stringJavaRDD1) throws Exception {


                    //业务处理
                    ......


                    //回写zk
                    ZkClient zkClient=new ZkClient(zkServer);
                    OffsetRange[] offsets = offsetRanges.get();
                    if (null != offsets) {
                        logger.info("scan ===================zk开始更新 offsets" + offsets.length);
                        ZKGroupTopicDirs zgt = new ZKGroupTopicDirs("test-consumer-group", topic);
                        String zkTopicPath = zgt.consumerOffsetDir();
                        for (OffsetRange o : offsets) {
                            String zkPath = zkTopicPath + "/" + o.partition();
                            ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset() + "");
                            logger.info("scan ===================zk更新完成 path: " + zkPath);
                        }
                        zkClient.close();
                    }

                }
            });
        }else{
            KafkaUtils.createDirectStream(jssc,
                    String.class,
                    String.class,
                    StringDecoder.class,
                    StringDecoder.class,
                    kafkaParams,
                    topics)
                    .transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
                @Override
                public JavaPairRDD<String, String> call(JavaPairRDD<String, String> v1) throws Exception {
                    OffsetRange[] offsets = ((HasOffsetRanges) v1.rdd()).offsetRanges();

                    logger.info("=elseelseelseelseelseelse======================offsetsoffsetsoffsetsoffsets  "+offsets);

                    offsetRanges.set(offsets);
                    return v1;
                }

            }).map(new Function<Tuple2<String,String>, String>() {
                @Override
                public String call(Tuple2<String, String> v1) throws Exception {
                    return v1._2();
                }
            }).foreachRDD(new VoidFunction<JavaRDD<String>>() {
                @Override
                public void call(JavaRDD<String> stringJavaRDD) throws Exception {


                    //业务处理
                    ......



                    //回写zk
                    ZkClient zkClient=new ZkClient(zkServer);
                    OffsetRange[] offsets = offsetRanges.get();
                    if (null != offsets) {
                        logger.info("scan ===================zk开始更新 offsets" + offsets.length);
                        for (OffsetRange o : offsets) {
                            String zkPath = zkTopicPath + "/" + o.partition();
                            ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset() + "");
                            logger.info("scan ===================zk更新完成 path: " + zkPath);
                        }
                        zkClient.close();
                    }

                }
            });

最大的一个问题是代码重复率太高,尝试过代码重构,即返回JavaInputDStream和JavaPairDStream的一个父类,但最后失败了,会报一些类型转化错误,也尝试过使用同一个zkClient,这样的话,ZkClient需要final,而org.I0Itec.zkclient.ZkClient包下的zkCLient是没有办法序列化的,有时间的话可以自己写一个zkClient,zkClient部分的重复代码应该可以解决掉。 也参考过scala的相应代码,它之所以没有重复代码,在于scala可以自动判断返回值得类型。

解决代码重复简单粗暴的方法就是进行代码的封装了。

指定partition进行消费:

Map<TopicAndPartition, Long> fromOffsets = new HashMap<>();
fromOffsets.put(new TopicAndPartition("test1", 0), 5800L);
fromOffsets.put(new TopicAndPartition("test1", 1), 6600L);

JavaInputDStream<String> inputDStream = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        String.class,
        params,
        fromOffsets,
        (Function<MessageAndMetadata<String, String>, String>) MessageAndMetadata::message
);

注意: 1.特别是对于SparkStreaming连接kafka仅仅checkpoints也会导致数据丢失,无法保证at only one。此处着重说明一下若是因为spark代码导致的失败,checkpoints可以保证at only one,但若spark代码执行完毕由于插入数据库时程序失败,即使checkpoint也无法保证at only one 2.此版本更新offset处给予kafka-0.8,而kafka-0.9应该这样:

new ZkUtils(zkClient, null, false).updatePersistentPath(zkPath, o.untilOffset() + "", ZooDefs.Ids.OPEN_ACL_UNSAFE);

3.简易版的,可参考

http://blog.csdn.net/jsjsjs1789/article/details/79253044

参考: http://spark.apache.org/docs/latest/streaming-kafka-integration.html http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏数据小魔方

Julia语言初体验

最近MIT发布的julia 1.0.0版,据传整合了C、Python、R等诸多语言特色,是数据科学领域又一把顶级利器。

2.9K20
来自专栏about云

spark2的SparkSession思考与总结2:SparkSession有哪些函数及作用是什么

问题导读 1.spark SparkSession包含哪些函数? 2.创建DataFrame有哪些函数? 3.创建DataSet有哪些函数? 上一篇spa...

81950
来自专栏数据科学与人工智能

【Spark研究】Lambda表达式让Spark编程更容易

近日,Databricks官方网站发表了一篇博文,用示例说明了lambda表达式如何让Spark编程更容易。文章开头即指出,Spark的主要目标之一是使编写大数...

34150
来自专栏猿天地

spring-data-mongodb mapreduce使用

今天主要介绍下在框架中如何使用mapreduce,不涉及到mapreduce的使用讲解 这边主要的js代码都将写在js文件中,放在classpath下面统一维护...

43960
来自专栏架构之路

Spring 数据库连接(Connection)绑定线程(Thread)的实现

最近在看spring事务的时候在想一个问题:spring中的很多bean都是单例的,是非状态的,而数据库连接是一种有状态的对象,所以spring一定在创建出co...

39830
来自专栏jeremy的技术点滴

Java开发小技巧_02

43940
来自专栏Spark学习技巧

textFile构建RDD的分区及compute计算策略

1,textFile A),第一点,就是输入格式,key,value类型及并行度的意义。 def textFile( path: String, mi...

26770
来自专栏一个会写诗的程序员的博客

《Kotlin 程序设计》第七章 Kotlin 编译过程分析第七章 Kotlin 编译过程分析

http://mp.weixin.qq.com/s/lEFRH523W7aNWUO1QE6ULQ

21020
来自专栏Spark生态圈

[spark] Task执行流程

在文章TaskScheduler 任务提交与调度源码解析 中介绍了Task在executor上的逻辑分配,调用TaskSchedulerImpl的resourc...

20810
来自专栏Java成神之路

Java微信开发_Exception_01_The type org.xmlpull.v1.XmlPullParser cannot be resolved. It is indirectly ref

这个异常是在做微信开发时出现的,在引入了XStream的jar包之后,还是出现了如下错误信息:

10930

扫码关注云+社区

领取腾讯云代金券