SparkStreaming+Kafka 实现基于缓存的实时wordcount程序

前言

本文利用SparkStreaming和Kafka实现基于缓存的实时wordcount程序,什么意思呢,因为一般的SparkStreaming的wordcount程序比如官网上的,只能统计最新时间间隔内的每个单词的数量,而不能将历史的累加起来,本文是看了教程之后,自己实现了一下kafka的程序,记录在这里。其实没什么难度,只是用了一个updateStateByKey算子就能实现,因为第一次用这个算子,所以正好学习一下。

1、数据

数据是我随机在kafka里生产的几条,单词以空格区分开

2、kafka topic

首先在kafka建一个程序用到topic:UpdateStateBykeyWordCount

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic UpdateStateBykeyWordCount

3、创建checkpoint的hdfs目录

我的目录为:/spark/dkl/kafka/wordcount_checkpoint

hadoop fs -mkdir -p /spark/dkl/kafka/wordcount_checkpoint

4、Spark代码

启动下面的程序

package com.dkl.leanring.spark.kafka

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.Seconds
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
object UpdateStateBykeyWordCount {

  def main(args: Array[String]): Unit = {
    //初始化,创建SparkSession
    val spark = SparkSession.builder().appName("sskt").master("local[2]").enableHiveSupport().getOrCreate()
    //初始化,创建sparkContext
    val sc = spark.sparkContext
    //初始化,创建StreamingContext,batchDuration为1秒
    val ssc = new StreamingContext(sc, Seconds(5))

    //开启checkpoint机制
    ssc.checkpoint("hdfs://ambari.master.com:8020/spark/dkl/kafka/wordcount_checkpoint")

    //kafka集群地址
    val server = "ambari.master.com:6667"

    //配置消费者
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> server, //kafka集群地址
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "UpdateStateBykeyWordCount", //消费者组名
      "auto.offset.reset" -> "latest", //latest自动重置偏移量为最新的偏移量   earliest 、none
      "enable.auto.commit" -> (false: java.lang.Boolean)) //如果是true,则这个消费者的偏移量会在后台自动提交
    val topics = Array("UpdateStateBykeyWordCount") //消费主题

    //基于Direct方式创建DStream
    val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    //开始执行WordCount程序

    //以空格为切分符切分单词,并转化为 (word,1)形式
    val words = stream.flatMap(_.value().split(" ")).map((_, 1))
    val wordCounts = words.updateStateByKey(
      //每个单词每次batch计算的时候都会调用这个函数
      //第一个参数为每个key对应的新的值,可能有多个,比如(hello,1)(hello,1),那么values为(1,1)
      //第二个参数为这个key对应的之前的状态
      (values: Seq[Int], state: Option[Int]) => {

        var newValue = state.getOrElse(0)
        values.foreach(newValue += _)
        Option(newValue)

      })
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()

  }

}

5、生产几条数据

随便写几条即可

bin/kafka-console-producer.sh --broker-list ambari.master.com:6667 --topic UpdateStateBykeyWordCount

6、结果

根据结果可以看到,历史的单词也被统计打印出来了

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Jed的技术阶梯

Kafka 自定义分区器

(1) 如果键值为 null,并且使用了默认的分区器,那么记录将被随机地发送到主题内各个可用的分区上。分区器使用轮询(Round Robin)算法将消息均衡地分...

1522
来自专栏文渊之博

pyspark 内容介绍(一)

pyspark 包介绍 子包 pyspark.sql module pyspark.streaming module pyspark.ml package py...

5466
来自专栏奇点大数据

用SparkStreaming做奇怪的事

作者:尹会生 无需授权即可转载,甚至无需保留以上版权声明 Spark Steaming 是非常著名的流式计算工具,这次用它来搞一个奇葩的需求:开发给定一个日志...

29110
来自专栏null的专栏

Hive——巧用transform处理复杂的字符串问题

相比于Map-Reduce,Hive对数据的处理相对简单,但是Hive本身提供的函数,对于处理复杂的字符串问题,就显得不是很方便,此时,可以借助transfor...

3975
来自专栏祝威廉

Structured Streaming如何实现Parquet存储目录按时间分区

StreamingPro现在支持以SQL脚本的形式写Structured Streaming流式程序了: mlsql-stream。不过期间遇到个问题,我希望按...

1071
来自专栏岑玉海

Spark源码系列(八)Spark Streaming实例分析

这一章要讲Spark Streaming,讲之前首先回顾下它的用法,具体用法请参照《Spark Streaming编程指南》。 Example代码分析 val ...

3117
来自专栏散尽浮华

Linux下针对服务器网卡流量和磁盘的监控脚本

1544
来自专栏大内老A

WCF技术剖析之十八:消息契约(Message Contract)和基于消息契约的序列化

在本篇文章中,我们将讨论WCF四大契约(服务契约、数据契约、消息契约和错误契约)之一的消息契约(Message Contract)。服务契约关注于对服务操作的描...

3515
来自专栏Jed的技术阶梯

SparkStreaming 写数据到 HBase,由于共用连接造成的数据丢失问题

有如下程序,SparkStreaming 读取 Kafka 中的数据,经过处理后,把数据写入到 Hbase 中

4932
来自专栏伦少的博客

spark基本概念(便于自己随时查阅--摘自Spark快速大数据分析)

转载请务必注明原创地址为:http://dongkelun.com/2018/01/23/sparkBasicConcept/

3768

扫码关注云+社区