本文简述如何结合 Spark Streaming 和 Kakfa 来做实时计算。截止目前(2016-03-27)有两种方式:
两种方式在编程模型、运行特性、语义保障方面均不相同,让我们进一步说明。
如果你对 Receivers 没有概念,请先移步:揭开Spark Streaming神秘面纱② - ReceiverTracker 与数据导入
这种方法使用一个 Receiver 来接收数据。在该 Receiver 的实现中使用了 Kafka high-level consumer API。Receiver 从 kafka 接收的数据将被存储到 Spark executor 中,随后启动的 job 将处理这些数据。
在默认配置下,该方法失败后会丢失数据(保存在 executor 内存里的数据在 application 失败后就没了),若要保证数据不丢失,需要启用 WAL(即预写日志�至 HDFS、S3等),这样再失败后可以从日志文件中恢复数据。WAL 相关内容请参见:http://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications
接下来讨论如何在 streaming application 中应用这种方法。使用 KafkaUtils.createStream
,实例代码如下:
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
需要注意的点:
StorageLevel.MEMORY_AND_DISK_SER
或 StorageLevel.MEMORY_AND_DISK_SER_2
自 Spark-1.3.0 起,提供了不需要 Receiver 的方法。替代了使用 receivers 来接收数据,该方法定期查询每个 topic+partition 的 lastest offset,并据此决定每个 batch 要接收的 offsets 范围。需要注意的是,该特性在 Spark-1.3(Scala API)是实验特性。
该方式相比使用 Receiver 的方式有以下好处:
当然,方式二相比于方式一也有缺陷,即不会自动更新消费的 offsets 至 Zookeeper,从而一些监控工具就无法看到消费进度。方式二需要自行保存消费的 offsets,这在 topic 新增 partition 时会变得更加麻烦。
下面来说说怎么使用方式二,示例如下:
import org.apache.spark.streaming.kafka._
val directKafkaStream = KafkaUtils.createDirectStream[
[key class], [value class], [key decoder class], [value decoder class] ](
streamingContext, [map of Kafka parameters], [set of topics to consume])
Kafka 参数中,需要指定 metadata.broker.list
或 bootstrap.servers
。默认会从每个 topic 的每个 partition 的 lastest offset 开始消费,也可以通过将 auto.offset.reset
设置为 smallest
来从每个 topic 的每个 partition 的 smallest offset 开始消费。
使用其他重载的 KafkaUtils.createDirectStream
函数也支持从任意 offset 消费数据。另外,如果你想在每个 bath 内获取消费的 offset,可以按下面的方法做:
// Hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array[OffsetRange]()
directKafkaStream.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map {
...
}.foreachRDD { rdd =>
for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
...
}
你可以用上面的方法获取 offsets 并保存到Zookeeper或数据库中等。
需要注意的是,RDD partition 与 Kafka partition 的一一对应关系在shuffle或repartition之后将不复存在( 如reduceByKey() 或 window() ),所以要获取 offset 需要在此之前。
另一个需要注意的是,由于方式二不使用 Receiver,所以任何 Receiver 相关的配置,即spark.streaming.receiver.*
均不生效,需要转而使用 spark.streaming.kafka.*
。一个重要的参数是 spark.streaming.kafka.maxRatePerPartition
,用来控制每个 partition 每秒能接受的数据条数的上限。