Spark Streamming+Kafka提交offset实现有且仅有一次

前言

本文讲Spark Streamming使用Direct方式读取Kafka,并在输出(存储)操作之后提交offset到Kafka里实现程序读写操作有且仅有一次,即程序重启之后之前消费并且输出过的数据不再重复消费,接着上次消费的位置继续消费Kafka里的数据。 Spark Streamming+Kafka官方文档:http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

1、提交offset的程序

根据官方文档可知,在spark代码里可以获取对应的offset信息,并且可以提交offset存储到kafka中。 代码:

package com.dkl.leanring.spark.kafka

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.TaskContext

object KafkaOffsetDemo {
  def main(args: Array[String]) {

    //创建sparkConf
    val sparkConf = new SparkConf().setAppName("KafkaOffsetDemo").setMaster("local[2]")
    // 创建StreamingContext batch size 为 1秒
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "ambari.master.com:6667", //kafka集群地址
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "KafkaOffsetDemo", //消费者组名
      "auto.offset.reset" -> "earliest", //当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      "enable.auto.commit" -> (false: java.lang.Boolean)) //如果是true,则这个消费者的偏移量会在后台自动提交
    val topics = Array("top1") //消费主题
    //创建DStream,返回接收到的输入数据
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))
    // 打印获取到的数据,因为1秒刷新一次,所以数据长度大于0时才打印
    stream.foreachRDD(f => {

      if (f.count > 0) {
        println("=============================")
        println("打印获取到的kafka里的内容")
        f.foreach(f => {
          val value = f.value()
          println(value)

        })
        println("=============================")
        println("打印offset的信息")
        // offset
        val offsetRanges = f.asInstanceOf[HasOffsetRanges].offsetRanges

        //打印offset
        f.foreachPartition { iter =>
          val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
          println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        }
        println("=============================")
        // 等输出操作完成后提交offset
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

      }
    })
    //启动
    ssc.start()
    //等待停止
    ssc.awaitTermination()
  }
}

说明:

  • auto.offset.reset设置为earliest,即当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始,这样设置的目的是为了一开始可以获取到kafka对应主题下的所有的历史消息。
  • enable.auto.commit 设置为false,如果是true,则这个消费者的偏移量会在后台自动提交,这样设置目的是为了后面自己提交offset,因为如果虽然获取到了消息,但是后面的转化操作并将结果写到如hive中并没有完成程序就挂了的话,这样是不能将这次的offset提交的,这样就可以等程序重启之后接着上次失败的地方继续消费
  • group.id 是不能变得,也就是offset是和topic和group绑定的,如果换一个group的话,程序将从头消费所有的历史数据
  • 这个api是将offset存储到kakfa的一个指定的topic里,名字为__consumer_offsets,而不是zookeeper中2、测试程序 1、首先创建对应的topic 2、生产几条数据作为历史消息bin/kafka-console-producer.sh --broker-list ambari.master.com:6667 --topic top1

3、启动上面的程序 4、继续生产几条数据 接下来先看一下结果:

由图可得,这样可以将历史数据全部打印出来,并且后面实时增加的数据,也打印出来了,且可以看到offset是在增加的,最后一个offset是202,那么接下来测试一下程序重启之后是否会接着之前的数据继续消费呢 5、停止程序 6、生产几条数据 7、启动程序 看一下结果:

可以看出,程序确实是接着上次消费的地方消费的,为了证实这一点,我将earliest和offset圈了起来,从offset可以看到是从上次的202开始消费的。

3、关于offset过期时间

kafka offset默认的过期时间是一天,当上面的程序挂掉,一天之内没有重启,也就是一天之内没有保存新的offset的话,那么之前的offset就会被删除,再重启程序,就会从头开始消费kafka里的所有历史数据,这种情况是有问题的,所以可以通过设置offsets.retention.minutes自定义offset过期时间,该设置单位为分钟,默认为1440。 修改kafka的offset过期时间详细信息见:https://dongkelun.com/2018/06/21/modifyKafkaOffsetTime/

4、自己保存offset

可以通过自己保存offset的信息到数据库里,然后需要时再取出来,根据得到的offset信息消费kafka里的数据,这样就不用担心offset的过期的问题了,因为没有自己写代码实现,所以先给出官网的示例代码:

// The details depend on your data store, but the general idea looks like this

// begin from the the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
  new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  val results = yourCalculation(rdd)

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏小狼的世界

Gearman的问题分析与深入研究

Gearman作为一个优秀的分布式解决方案,已经被众多的公司或者团队所采用,我在之前的一篇文章中也有过介绍。但是目前对于woker的执行状态和结果监控,特别是放...

742
来自专栏Hadoop数据仓库

HAWQ技术解析(十六) —— 运维监控

        与任何IT系统一样,为了保证HAWQ集群的高可用和高性能,需要进行一系列监控与维护活动。本篇讨论HAWQ推荐的运维与监控活动。 一、推荐的监控与...

2118
来自专栏用户画像

4.1.6 文件系统基础

文件是以计算机硬盘为载体存储在计算机上的信息集合,它的形式很多样化,可以是文本文档、图片、程序等。

522
来自专栏铭毅天下

上线必备 | 高性能ES5.X部署配置清单

| 题记 | 网上当前(截止20170914)流传的各种配置文件普通存在以下问题: 1)版本低,不能和当前版本5.X匹配。 2)5.X的配置本身较1.X,2...

35911
来自专栏技术博文

file_get_contents()函数超时处理方法

file_get_contents — 将整个文件读入一个字符串 string file_get_contents ( string $filename [, ...

3607
来自专栏网络

Nginx 教程#1:基本概念

协作翻译 原文:Nginx Tutorial #1: Basic Concepts 链接:https://www.netguru.co/codestories/...

2068
来自专栏逸鹏说道

memcached安装及.NET中的Memcached.ClientLibrary使用详解

序言 吹吹牛逼先,借我你的20分钟,保证你在.net中使用memcached缓存数据,畅通无阻,提升数据读取效率,分担数据库压力,便不在话下。 本篇主要说下:m...

2707
来自专栏我是攻城师

kafka版本不一致导致的一个小问题(二)

3658
来自专栏大魏分享(微信公众号:david-share)

从PowerVM,KVM到Docker:存储池的配置与调优---第一篇(第1子篇)

作者说明: 针对虚拟化中存储池的配置,笔者将书写一个系列作品,介绍从PowerVM到KVM再到Docker中存储池的配置与调优。似乎看起来三种技术没有什么关联性...

2735
来自专栏大数据和云计算技术

MongoDB Compass--MongoDB DBA必备的管理工具

MongoDB Compass是MongoDB官网提供的一个集创建数据库、管理集合和文档、运行临时查询、评估和优化查询、性能图表、构建地理查询等功能为一体的...

3595

扫码关注云+社区