前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka与Spark Streaming整合

Kafka与Spark Streaming整合

原创
作者头像
haimingli
发布2021-01-13 00:29:41
4690
发布2021-01-13 00:29:41
举报
文章被收录于专栏:kafka消息队列kafka消息队列

Kafka与Spark Streaming整合

概述

Spark Streaming是一个可扩展,高吞吐,容错能力强的实时流式处理处理系统。一般的系统架构图是,数据从一个源点,经过Sparing Streaming处理,最后汇聚到一个系统。Spark Streaming的数据来源可以非常丰富,比如Kafka, Flume, Twitter, ZeroMQ, Kinesis 或者是任何的TCP sockets程序。对于数据的处理,Spark Streaming提供了非常丰富的高级api,例如map,redue,joini和窗口函数等等。数据处理完成后,可以存储到其他地方,比如文件系统,对象存储,数据库。典型的数据处理流程图:

概念简介

RDD:Resilient Distributed Datasets,弹性分部署数据集,支持两种操作:转换(transformation)从现有的数据集创建一个新的数据集;动作(actions)在数据集上运行计算后,返回一个值给驱动程序。在这里简单理解为某个时间片的数据集合即可。

DStream:和RDD概念有点类似,是RDD的集合,代表着整个数据流。简单来说Spark Streaming中的数据量就是DStream,然后每个时间片的数据就是RDD。

Kafka与Spark Streaming整合

整合方式

Kafka与Spark Streaming整合,首先需要从Kafka读取数据过来,读取数据有两种方式

  • 方法一:Receiver-based 这种方式使用一个Receiver接收Kafka的消息,如果使用默认的配置,存在丢数据的风险,因为这种方式会把从kafka接收到的消息存放到Spark的exectors,然后再启动streaming作业区处理,如果exectors挂了,那么消息可能就丢失了。可以通过开启Write Ahead Logs来保证数据的可靠性(Spark 1.2后开始支持),这种方式和大多数存储系统的Write Ahead Logs类似,Spark会把接收到的消息及kafka消息偏移存放到分布式文件系统中(例如HDFS),如果运行期间出现了故障,那么这些信息会被用于故障恢复。
  • 方法二:Direc 这种方式是Spark 1.3引入的,Spark会创建和Kafka partition一一对应的的RDD分区,然后周期性的去轮询获取分区信息,这种方式和Receier-based不一样的是,它不需要Write Ahead Logs,而是通过check point的机制记录kafka的offset,通过check point机制,保证Kafka中的消息不会被遗漏。这种方式相对于第一种方式有多种优点,一是天然支持并发,建了了和Kafka的partition分区对应的RDD分区,第二点是更高效,不需要write ahead logs,减少了写磁盘次数,第三种优点是可以支持exactly-once语义,通过checkpoint机制记录了kafka的offset,而不是通过zk或者kafka来记录offset能避免分布式系统中数据不一致的问题,从而能支持exactly-once语义,当然这里不是说这样就完全是exactly-once了,还需要消费端配合做消息幂等或事物处理。这种模式是较新的模式,推荐使用该模式,第一种方式已经逐步被淘汰。

整合示例

下面使用一个示例,展示如何整合Kafka和Spark Streaming,这个例子中,使用一个生产者不断往Kafka随机发送数字,然后通过Spark Streaming统计时间片段内数字之和。

1.往kafka随机发送数字代码:

代码语言:txt
复制
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "spark-producer-demo-client");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String,String> producer = new KafkaProducer<>(properties);

        Random random = new Random();
        while (true) {
            int value = random.nextInt(10);
            ProducerRecord<String, String> message =
                    new ProducerRecord<>(topic, value+"");
            producer.send(message, (recordMetadata, e) -> {
                if (recordMetadata != null) {
                    System.out.println(recordMetadata.topic() + "-" + recordMetadata.partition() + ":" +
                            recordMetadata.offset());
                }
            });
            TimeUnit.SECONDS.sleep(1);

2.提交到spark集群代码,用于统计2秒时间间隔的数字之和,这里我们使用的是Direct模式:

代码语言:txt
复制
 def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]")
      .setAppName("StreamingWithKafka")
    val ssc = new StreamingContext(sparkConf, Seconds(2)) // 1
    ssc.checkpoint(checkpointDir)

    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
        classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
        classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> group,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false:java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, // 2
      Subscribe[String, String](List(topic), kafkaParams))
    val value = stream.map(record => {
      val intVal = Integer.valueOf(record.value())
      println(intVal)
      intVal
    }).reduce(_+_)

    value.print()

    ssc.start
    ssc.awaitTermination
  }

上面代码中1处的代码表示聚合处理的时间分片为2秒,计算两秒内的随机数之和。2处的代码用于指定spark执行器上面的kafka consumer分区分配策略,一共有三种类型,PreferConsistent是最常用的,表示订阅主题的分区均匀分配到执行器上面,然后还有PreferBrokers,这种机制是优先分配到和broker相同机器的执行器上,还有一种是PreferFixed,这种是手动配置,用的比较少。上面程序每次计算2秒时间间隔内的数字之和,输入会类似如下:

代码语言:txt
复制
3
4
...

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Kafka与Spark Streaming整合
    • 概述
      • 概念简介
    • Kafka与Spark Streaming整合
      • 整合方式
        • 整合示例
        相关产品与服务
        消息队列 CKafka 版
        消息队列 CKafka 版(TDMQ for CKafka)是一个分布式、高吞吐量、高可扩展性的消息系统,100%兼容开源 Kafka API 2.4、2.8、3.2 版本。CKafka 基于发布/订阅模式,通过消息解耦,使生产者和消费者异步交互,无需彼此等待。CKafka 具有高可用、数据压缩、同时支持离线和实时数据处理等优点,适用于日志压缩收集、监控数据聚合、流式数据集成等场景。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档