专栏首页伦少的博客Spark Streamming+Kafka提交offset实现有且仅有一次

Spark Streamming+Kafka提交offset实现有且仅有一次

前言

本文讲Spark Streamming使用Direct方式读取Kafka,并在输出(存储)操作之后提交offset到Kafka里实现程序读写操作有且仅有一次,即程序重启之后之前消费并且输出过的数据不再重复消费,接着上次消费的位置继续消费Kafka里的数据。 Spark Streamming+Kafka官方文档:http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

1、提交offset的程序

根据官方文档可知,在spark代码里可以获取对应的offset信息,并且可以提交offset存储到kafka中。 代码:

package com.dkl.leanring.spark.kafka

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.TaskContext

object KafkaOffsetDemo {
  def main(args: Array[String]) {

    //创建sparkConf
    val sparkConf = new SparkConf().setAppName("KafkaOffsetDemo").setMaster("local[2]")
    // 创建StreamingContext batch size 为 1秒
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "ambari.master.com:6667", //kafka集群地址
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "KafkaOffsetDemo", //消费者组名
      "auto.offset.reset" -> "earliest", //当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      "enable.auto.commit" -> (false: java.lang.Boolean)) //如果是true,则这个消费者的偏移量会在后台自动提交
    val topics = Array("top1") //消费主题
    //创建DStream,返回接收到的输入数据
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))
    // 打印获取到的数据,因为1秒刷新一次,所以数据长度大于0时才打印
    stream.foreachRDD(f => {

      if (f.count > 0) {
        println("=============================")
        println("打印获取到的kafka里的内容")
        f.foreach(f => {
          val value = f.value()
          println(value)

        })
        println("=============================")
        println("打印offset的信息")
        // offset
        val offsetRanges = f.asInstanceOf[HasOffsetRanges].offsetRanges

        //打印offset
        f.foreachPartition { iter =>
          val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
          println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        }
        println("=============================")
        // 等输出操作完成后提交offset
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

      }
    })
    //启动
    ssc.start()
    //等待停止
    ssc.awaitTermination()
  }
}

说明:

  • auto.offset.reset设置为earliest,即当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始,这样设置的目的是为了一开始可以获取到kafka对应主题下的所有的历史消息。
  • enable.auto.commit 设置为false,如果是true,则这个消费者的偏移量会在后台自动提交,这样设置目的是为了后面自己提交offset,因为如果虽然获取到了消息,但是后面的转化操作并将结果写到如hive中并没有完成程序就挂了的话,这样是不能将这次的offset提交的,这样就可以等程序重启之后接着上次失败的地方继续消费
  • group.id 是不能变得,也就是offset是和topic和group绑定的,如果换一个group的话,程序将从头消费所有的历史数据
  • 这个api是将offset存储到kakfa的一个指定的topic里,名字为__consumer_offsets,而不是zookeeper中2、测试程序 1、首先创建对应的topic 2、生产几条数据作为历史消息bin/kafka-console-producer.sh --broker-list ambari.master.com:6667 --topic top1

3、启动上面的程序 4、继续生产几条数据 接下来先看一下结果:

由图可得,这样可以将历史数据全部打印出来,并且后面实时增加的数据,也打印出来了,且可以看到offset是在增加的,最后一个offset是202,那么接下来测试一下程序重启之后是否会接着之前的数据继续消费呢 5、停止程序 6、生产几条数据 7、启动程序 看一下结果:

可以看出,程序确实是接着上次消费的地方消费的,为了证实这一点,我将earliest和offset圈了起来,从offset可以看到是从上次的202开始消费的。

3、关于offset过期时间

kafka offset默认的过期时间是一天,当上面的程序挂掉,一天之内没有重启,也就是一天之内没有保存新的offset的话,那么之前的offset就会被删除,再重启程序,就会从头开始消费kafka里的所有历史数据,这种情况是有问题的,所以可以通过设置offsets.retention.minutes自定义offset过期时间,该设置单位为分钟,默认为1440。 修改kafka的offset过期时间详细信息见:https://dongkelun.com/2018/06/21/modifyKafkaOffsetTime/

4、自己保存offset

可以通过自己保存offset的信息到数据库里,然后需要时再取出来,根据得到的offset信息消费kafka里的数据,这样就不用担心offset的过期的问题了,因为没有自己写代码实现,所以先给出官网的示例代码:

// The details depend on your data store, but the general idea looks like this

// begin from the the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
  new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  val results = yourCalculation(rdd)

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 在线水平扩展/收缩验证

    地址:http://10.111.24.194:8080/#/main/hosts 界面如下:

    董可伦
  • Spark架构原理

    董可伦
  • spark ML之特征处理(1)

    转载请务必注明原创地址为:https://dongkelun.com/2018/05/17/sparkMlFeatureProcessing1/

    董可伦
  • Kafka到底有几个Offset?——Kafka核心之偏移量机制

    Kafka是由LinkIn开源的实时数据处理框架,目前已经更新到2.3版本。不同于一般的消息中间件,Kafka通过数据持久化和磁盘读写获得了极高的吞吐量...

    用户6070864
  • Kafka+Spark Streaming管理offset的几种方法

    场景描述:Kafka配合Spark Streaming是大数据领域常见的黄金搭档之一,主要是用于数据实时入库或分析。为了应对可能出现的引起Streaming程序...

    暴走大数据
  • Kafka+Spark Streaming管理offset的几种方法

    场景描述:Kafka配合Spark Streaming是大数据领域常见的黄金搭档之一,主要是用于数据实时入库或分析。为了应对可能出现的引起Streaming程序...

    王知无
  • Kafka到底有几个Offset?——Kafka核心之偏移量机制

    ​ Kakfa的Offset机制是其最核心机制之一,由于API对于部分功能的实现,我们有时并没有手动去设置Offset,那么Kafka到底有几个Offset呢?

    实时计算
  • Kafka的消费积压监控-Burrow

    扫帚的影子
  • spark streaming窗口及聚合操作后如何管理offset

    对于spark streaming来说窗口操作之后,是无法管理offset的,因为offset的存储于HasOffsetRanges,只有kafkaRDD继承了...

    Spark学习技巧
  • LintCode-8.旋转字符串

    乍一看的思路是建立新的数组往里面填,空间复杂度O(n),时间复杂度O(1)。看了一下“挑战“是空间复杂度O(1),那么就是循环咯(没百度,可能有更6的办法)。

    悠扬前奏

扫码关注云+社区

领取腾讯云代金券