首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >spark-streaming-kafka包源码分析

spark-streaming-kafka包源码分析

作者头像
sanmutongzi
发布2020-03-04 15:47:25
5900
发布2020-03-04 15:47:25
举报
文章被收录于专栏:stream processstream process

转载请注明原创地址 http://www.cnblogs.com/dongxiao-yang/p/5443789.html

最近由于使用sparkstreaming的同学需要对接到部门内部的的kafka集群,由于官方的spark-streaming-kafka包和现有公司的kafka集群权限系统无法对接,需要研究下spark-streaming-kafka包原有代码以便改造,本文研究的代码版本为spark在github的tag的v1.6.1版本。

官方给出的JavaKafkaWordCount以及KafkaWordCount代码里产生kafka-streaming消费流数据的调用代码分别如下

 JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);


 val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

可以看到无论是java还是scala调用的都是KafkaUtils内重载实现的createStream方法。

object KafkaUtils {
  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * @param ssc       StreamingContext object
   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
   * @param groupId   The group id for this consumer
   * @param topics    Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                  in its own thread
   * @param storageLevel  Storage level to use for storing the received objects
   *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream(
      ssc: StreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: Map[String, Int],
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[(String, String)] = {
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
      "zookeeper.connection.timeout.ms" -> "10000")
    createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics, storageLevel)
  }

  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * @param ssc         StreamingContext object
   * @param kafkaParams Map of kafka configuration parameters,
   *                    see http://kafka.apache.org/08/configuration.html
   * @param topics      Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                    in its own thread.
   * @param storageLevel Storage level to use for storing the received objects
   * @tparam K type of Kafka message key
   * @tparam V type of Kafka message value
   * @tparam U type of Kafka message key decoder
   * @tparam T type of Kafka message value decoder
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Map[String, Int],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[(K, V)] = {
    val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
  }

  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
   * @param jssc      JavaStreamingContext object
   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
   * @param groupId   The group id for this consumer
   * @param topics    Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                  in its own thread
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream(
      jssc: JavaStreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: JMap[String, JInt]
    ): JavaPairReceiverInputDStream[String, String] = {
    createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*))
  }

  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * @param jssc      JavaStreamingContext object
   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..).
   * @param groupId   The group id for this consumer.
   * @param topics    Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                  in its own thread.
   * @param storageLevel RDD storage level.
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream(
      jssc: JavaStreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: JMap[String, JInt],
      storageLevel: StorageLevel
    ): JavaPairReceiverInputDStream[String, String] = {
    createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*),
      storageLevel)
  }

  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * @param jssc      JavaStreamingContext object
   * @param keyTypeClass Key type of DStream
   * @param valueTypeClass value type of Dstream
   * @param keyDecoderClass Type of kafka key decoder
   * @param valueDecoderClass Type of kafka value decoder
   * @param kafkaParams Map of kafka configuration parameters,
   *                    see http://kafka.apache.org/08/configuration.html
   * @param topics  Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                in its own thread
   * @param storageLevel RDD storage level.
   * @tparam K type of Kafka message key
   * @tparam V type of Kafka message value
   * @tparam U type of Kafka message key decoder
   * @tparam T type of Kafka message value decoder
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
      jssc: JavaStreamingContext,
      keyTypeClass: Class[K],
      valueTypeClass: Class[V],
      keyDecoderClass: Class[U],
      valueDecoderClass: Class[T],
      kafkaParams: JMap[String, String],
      topics: JMap[String, JInt],
      storageLevel: StorageLevel
    ): JavaPairReceiverInputDStream[K, V] = {
    implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
    implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)

    implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass)
    implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass)

    createStream[K, V, U, T](
      jssc.ssc,
      kafkaParams.asScala.toMap,
      Map(topics.asScala.mapValues(_.intValue()).toSeq: _*),
      storageLevel)
  }

其中java相关的第三个和第四个createStream调用了第一个createStream,而第一个createStream最后调用的是第二个createStream,所以所有的rdd数据流都是从下面这句代码产生的:

new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)

查看KafkaInputDStream类定义,发现获取receiver有两种类型:KafkaReceiver和ReliableKafkaReceiver。

  def getReceiver(): Receiver[(K, V)] = {
    if (!useReliableReceiver) {
      new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    } else {
      new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    }
  }

其中,KafkaReceiver实现比较简单,调用的是kafka的high level api产生数据流,产生的每个线程的数据流都被放到一个线程池由单独的线程来消费

val topicMessageStreams = consumerConnector.createMessageStreams(
  topics, keyDecoder, valueDecoder)

 ReliableKafkaReceiver是结合了spark的预写日志(Write Ahead Logs)功能,开启这个功能需要设置sparkconf属性 spark.streaming.receiver.writeAheadLog.enable为真(默认值是假)

这个receiver会把收到的kafka数据首先存储到日志上,然后才会向kafka提交offset,这样保证了在driver程序出现问题的时候不会丢失kafka数据。

参考文章 Spark Streaming容错的改进和零数据丢失

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016-05-10 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档