SparkStreaming+Kafka 实现统计基于缓存的实时uv

前言

本文利用SparkStreaming+Kafka实现实时的统计uv,即独立访客,一个用户一天内访问多次算一次,这个看起来要对用户去重,其实只要按照WordCount的思路,最后输出key的数量即可,所以可以利用SparkStreaming+Kafka 实现基于缓存的实时wordcount程序,这里稍加改动,如果uv数量增加的话就打印uv的数量(key的数量)。

1、数据

数据是我随机在kafka里生产的几条,用户以空格区分开(因为用的之前单词统计的程序)

2、kafka topic

首先在kafka建一个程序用到topic:KafkaUV

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic KafkaUV

3、创建checkpoint的hdfs目录

我的目录为:/spark/dkl/kafka/UV_checkpoint

hadoop fs -mkdir -p /spark/dkl/kafka/UV_checkpoint

4、Spark代码

启动下面的程序

package com.dkl.leanring.spark.kafka

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.Seconds
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaUV {
  def main(args: Array[String]): Unit = {
    //初始化,创建SparkSession
    val spark = SparkSession.builder().appName("KafkaUV").master("local[2]").enableHiveSupport().getOrCreate()
    //初始化,创建sparkContext
    val sc = spark.sparkContext
    //初始化,创建StreamingContext,batchDuration为5秒
    val ssc = new StreamingContext(sc, Seconds(5))

    //开启checkpoint机制
    ssc.checkpoint("hdfs://ambari.master.com:8020/spark/dkl/kafka/UV_checkpoint")

    //kafka集群地址
    val server = "ambari.master.com:6667"

    //配置消费者
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> server, //kafka集群地址
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "UpdateStateBykeyWordCount", //消费者组名
      "auto.offset.reset" -> "latest", //latest自动重置偏移量为最新的偏移量   earliest 、none
      "enable.auto.commit" -> (false: java.lang.Boolean)) //如果是true,则这个消费者的偏移量会在后台自动提交
    val topics = Array("KafkaUV") //消费主题

    //基于Direct方式创建DStream
    val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    //开始执行WordCount程序

    //以空格为切分符切分单词,并转化为 (word,1)形式
    val words = stream.flatMap(_.value().split(" ")).map((_, 1))
    val wordCounts = words.updateStateByKey(
      //每个单词每次batch计算的时候都会调用这个函数
      //第一个参数为每个key对应的新的值,可能有多个,比如(hello,1)(hello,1),那么values为(1,1)
      //第二个参数为这个key对应的之前的状态
      (values: Seq[Int], state: Option[Int]) => {

        var newValue = state.getOrElse(0)
        values.foreach(newValue += _)
        Option(newValue)

      })

    //共享变量,便于后面的比较是否用新的uv
    val accum = sc.longAccumulator("uv")

    wordCounts.foreachRDD(rdd => {

      //如果uv增加
      if (rdd.count > accum.value) {
        //打印uv
        println(rdd.count)
        //将共享变量的值更新为新的uv
        accum.add(rdd.count - accum.value)
      }
    })

    ssc.start()
    ssc.awaitTermination()

  }

}

5、生产几条数据

随便写几条即可

bin/kafka-console-producer.sh --broker-list ambari.master.com:6667 --topic KafkaUV

6、结果

根据结果可以看到,既做到了历史消息用户的累计,也做到了用户的去重

本文由 董可伦 发表于 伦少的博客 ,采用署名-非商业性使用-禁止演绎 3.0进行许可。

非商业转载请注明作者及出处。商业转载请联系作者本人。

本文标题:SparkStreaming+Kafka 实现统计基于缓存的实时uv

本文链接:https://dongkelun.com/2018/06/25/KafkaUV/

--------------------

作者联系方式:

QQ:1412359494

微信:dongkelun

--------------------

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Spark学习技巧

spark调优系列之内存和GC调优

本文基于spark1.6讲解。 一,基本概述 调优内存的使用主要有三个方面的考虑:对象的内存占用量(你可能希望整个数据集都适合内存),访问这些数据的开销,垃圾...

7149
来自专栏CSDN技术头条

Spark Block存储管理分析

Apache Spark中,对Block的查询、存储管理,是通过唯一的Block ID来进行区分的。所以,了解Block ID的生成规则,能够帮助我们了解Blo...

22410
来自专栏祝威廉

Spark Streaming + Spark SQL 实现配置化ETL流程

通常而言,你可能会因为要走完上面的流程而构建了一个很大的程序,比如一个main方法里上百行代码,虽然在开发小功能上足够便利,但是复用度更方面是不够的,而且不利于...

1893
来自专栏CDA数据分析师

专栏 | Learning Spark (Python版) 学习笔记(二)----键值对、数据读取与保存、共享特性

本来应该上周更新的,结果碰上五一,懒癌发作,就推迟了 = =。以后还是要按时完成任务。废话不多说,第四章-第六章主要讲了三个内容:键值对、数据读取与保存与Spa...

2089
来自专栏24k

Spark wordcount开发并提交到单机(伪分布式)运行

1654
来自专栏大内老A

WCF技术剖析之二十一:WCF基本异常处理模式[中篇]

通过WCF基本的异常处理模式[上篇], 我们知道了:在默认的情况下,服务端在执行某个服务操作时抛出的异常(在这里指非FaultException异常),其相关的...

20210
来自专栏大内老A

ASP.NET MVC基于标注特性的Model验证:一个Model,多种验证规则

对于Model验证,理想的设计应该是场景驱动的,而不是Model(类型)驱动的,也就是对于同一个Model对象,在不同的使用场景中可能具有不同的验证规则。举个简...

17710
来自专栏分布式系统进阶

Influxdb Cluster下的数据写入

3.2 调用w.MetaClient.CreateShardGroup, 如果ShardGroup存在直接返回ShardGroup信息,如果不存在创建,创建过程...

1362
来自专栏岑玉海

Spark调优

因为Spark是内存当中的计算框架,集群中的任何资源都会让它处于瓶颈,CPU、内存、网络带宽。通常,内存足够的情况之下,网络带宽是瓶颈,这时我们就需要进行一些调...

3818
来自专栏肖力涛的专栏

Spark踩坑记:共享变量

如果我们想在节点之间共享一份变量,比如一份公共的配置项,该怎么办呢?Spark为我们提供了两种特定的共享变量,来完成节点间变量的共享。 本文首先简单的介绍spa...

1K1

扫码关注云+社区