首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

必读:再讲Spark与kafka 0.8.2.1+整合

Kafka在0.8和0.10版本引入了新的消费者API,所以sparkStreaming与kafka的整合提供了两个包。如果请根据你的集群选用正确的包。注意,跟0.8和后期的版本0.9及0.10是兼容的,但是0.10整合是不兼容之前的版本的。

包与版本特性之间的对应关系如下:

本文主要讲述sparkStreaming与kafka 0.8.2.1+版本整合,要求kafka集群的版本是0.8.2.1或者更高版本。

基于Receiver的方式

这种方式使用一个Receiver来接受数据。Receiver是使用kafka的高级消费者API来实现的。所有的Receiver从kafka里面接受数据,然后存储于Executors,sparkStreaming再生成任务来处理数据。

然而,默认配置的情况,这种方式在失败的情况下有可能丢失数据,为了确保零数据丢失,可以配置预写日志(WAL,从spark1.2引入)。这会将Receiver接收到的数据写入分布式文件系统,如hdfs,所以所有的数据可以在从失败恢复运行的时候加载到。

导包(MVN或者sbt):

artifactId = spark-streaming-kafka-0-8_2.11

version = 2.2.1

测试代码如下:

valsparkConf =newSparkConf().setAppName("KafkaWordCount")

valssc =newStreamingContext(sparkConf,Seconds(2))

ssc.checkpoint("checkpoint")

valtopics ="topic1,topic2 1"

valtopicMap = topics.split(",").map((_,numThreads.toInt)).toMap

vallines = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap).map(_._2)

valwords = lines.flatMap(_.split(" "))

valwordCounts = words.map(x => (x,1L))

.reduceByKeyAndWindow(_ + _,_ - _,Minutes(10),Seconds(2),2)

wordCounts.print()

ssc.start()

ssc.awaitTermination()

注意事项:

1,打包的时候spark-streaming-kafka-0-8对应的jar包一定要带上。

2,消费的kafka分区和生成的RDD分区并不是一一对应的。所以,增加KafkaUtils.createStream()命令中topic指定的分区,也即map里面topic名字对应的value,只会增加消费该命令创建的Receiver的内部消费者线程数目,不会增加spark处理数据的并行度,恰当线程数会增加Receiver的接收数据的速度。

3,KafkaUtils.createStream()命令执行只会创建一个Receiver,我们可以结合消费的topic分区和group名称来多创建几个Receiver,来增加接收数据的并行度。

4,如果你启动了预写日志,日志存储系统时hdfs,日志已经会被存储副本。所以,可以设置存储等级为StorageLevel.MEMORY_AND_DISK_SER.

Direct Approach

在spark1.3以后引入了一种新的sparkStreamingapi,新的api回自己在driver内部维护一个偏移,然后自动计算指定的topic+partition该批次需要拉去数据的范围,然后从kafka拉去数据来计算。不同于基于Receiver的方式,direct模式不会将偏移记录到Zookeeper,以保证故障恢复从上次偏移处消费消息。Direct模式你可以通过Checkpoint或者自己编写工具来实现偏移的维护,保证数据消费不丢失。

这种方式相比于基于Receiver的方式有以下优势:

1,简化并行度:不需要创建多个kafkastream,然后union他们。使用directStream,sparkstreaming生成的RDD分区和kafka的分区是一一对应的,这种方式理解起来更简单而且便于调优。

2,高效:基于Receiver的方式要保证数据不丢失,必须启用预写日志。这个行为实际上是非常抵消的,数据会被复制两次,一次是kafka集群,一次是预写日志。Direct方式解决了这个问题,由于没有Receiver,故而也不需要预写日志。只要你kafka里面存有数据,那么消息就可以从kafka里面恢复。

3,仅一次消费语义:基于Receiver的会把偏移提交到Zookeeper。这种方式结合预写日志能保证数据不丢失,也即是最少一次消费语义,但是有几率导致消费者在存在失败的情况下消费消息两次。比如,消息处理并经过存储之后,但是偏移并没有提交到Zookeeper,这个时候发生故障了,那么恢复之后,就会按照Zookeeper上的偏移再一次消费数据并处理,导致消息重复处理。但是direct方式偏移不会提交到Zookeeper,是sparkstreaming在driver使用内存变量加Checkpoint进行追踪的,所以尽管会存在任务失败,但是仍然能保证消费的一次处理。

注意,由于direct方式不会提交偏移到Zookeeper,所以,基于Zookeeper的kafka监控工具就不能监控到sparkstreaming的消费情况。然而,你可以自己讲偏移提交道Zookeeper,来满足你的需求。

导包(MVN或者sbt):

artifactId = spark-streaming-kafka-0-8_2.11

version = 2.2.1

测试代码如下:

valArray(brokers,topics) = args

// Create context with 2 second batch interval

valsparkConf =newSparkConf().setAppName("DirectKafkaWordCount")

valssc =newStreamingContext(sparkConf,Seconds(2))

// Create direct kafka stream with brokers and topics

valtopicsSet = topics.split(",").toSet

valkafkaParams =Map[String,String]("metadata.broker.list"-> brokers)

valmessages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](

ssc,kafkaParams,topicsSet)

// Get the lines, split them into words, count the words and print

vallines = messages.map(_._2)

valwords = lines.flatMap(_.split(" "))

valwordCounts = words.map(x => (x,1L)).reduceByKey(_ + _)

wordCounts.print()

// Start the computation

ssc.start()

ssc.awaitTermination()

关于自己编代码提交到Zookeeper,限于篇幅的原因,不在这里啰嗦。

调优限速

现实系统中会有流量尖峰,比如淘宝的双十一,那一秒钟的流量,大的吓人,假如有sparkstreaming处理的话,会有可能导致消息不能及时处理,甚至出现故障,应对这种流量尖峰,sparkstreaming内部实现了一个控制器,基于PID,具体PID的概念是啥,请自行百度。

这里只是想介绍两个主要的参数:

通过我们压测我们的sparkstreaming任务每秒钟最大消费处理的消息数,然后使用这两个参数限消费消息的速率,来避免高峰期一批次消费过量消息导致应用不正常执行。

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180317G01JV000?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券