本实战项目使用 Structured Streaming 来实时的分析处理用户对广告点击的行为数据.
使用代码的方式持续的生成数据, 然后写入到 kafka 中.
然后Structured Streaming 负责从 kafka 消费数据, 并对数据根据需求进行分析.
模拟出来的数据格式:
时间戳,地区,城市,用户 id,广告 id
1566035129449,华南,深圳,101,2
// 启动 zookeeper 和 Kafka
[bigdata@hadoop002 zookeeper-3.4.10]$ bin/start-allzk.sh
[bigdata@hadoop002 kafka]$ bin/start-kafkaall.sh
在 kafka 中创建topic: ads_log0814
[bigdata@hadoop002 kafka]$ bin/kafka-topics.sh --zookeeper hadoop002:2181 --create --topic ads_log0814 --partitions 2 --replication-factor 2
创建模块spark-realtime
在模块的 pom.xml 中添加如下依赖:
// 尽量与Kafka版本保持一致
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.2</version>
</dependency>
RandomNumUtil
用于生成随机数
package com.buwenbuhuo.data.mock.util
import java.util.Random
import scala.collection.mutable
/**
*
* @author 不温卜火
* @create 2020-08-14 12:12
* MyCSDN : https://buwenbuhuo.blog.csdn.net/
* 随机生成整数的工具类
*
*/
object RandomNumUtil {
  val random = new Random()

  /**
   * Returns a random Int in the inclusive range [from, to].
   *
   * @param from lower bound (inclusive)
   * @param to   upper bound (inclusive)
   * @return a uniformly distributed Int in [from, to]
   * @throws IllegalArgumentException if from > to
   */
  def randomInt(from: Int, to: Int): Int = {
    if (from > to) throw new IllegalArgumentException(s"from = $from 应该小于 to = $to")
    // nextInt(bound) yields [0, bound); shifting by `from` covers [from, to]
    random.nextInt(to - from + 1) + from
  }

  /**
   * Returns a random Long in the inclusive range [from, to].
   *
   * @param from lower bound (inclusive)
   * @param to   upper bound (inclusive)
   * @return a Long in [from, to]
   * @throws IllegalArgumentException if from > to
   */
  def randomLong(from: Long, to: Long): Long = {
    if (from > to) throw new IllegalArgumentException(s"from = $from 应该小于 to = $to")
    // Math.floorMod always returns a non-negative remainder for a positive divisor.
    // The previous `random.nextLong().abs % n` was broken for Long.MinValue,
    // whose .abs is still negative, producing results outside [from, to].
    Math.floorMod(random.nextLong(), to - from + 1) + from
  }

  /**
   * Generates `count` random Ints in [from, to].
   *
   * @param from    lower bound (inclusive)
   * @param to      upper bound (inclusive)
   * @param count   how many values to produce
   * @param canReat whether duplicates are allowed (default true)
   * @return a List of `count` values; without duplicates the order is unspecified
   * @throws IllegalArgumentException if duplicates are disallowed and count
   *                                  exceeds the range size (previously this
   *                                  case looped forever)
   */
  def randomMultiInt(from: Int, to: Int, count: Int, canReat: Boolean = true): List[Int] = {
    if (canReat) {
      List.fill(count)(randomInt(from, to))
    } else {
      val rangeSize = to - from + 1
      // Guard against an unsatisfiable request: the old while-loop would spin forever.
      require(count <= rangeSize, s"count = $count 不能大于区间大小 $rangeSize")
      val set: mutable.Set[Int] = mutable.Set[Int]()
      while (set.size < count) {
        set += randomInt(from, to)
      }
      set.toList
    }
  }

  def main(args: Array[String]): Unit = {
    println(randomMultiInt(1, 15, 10))
    // NOTE: the original demo asked for 10 distinct values from [1, 8],
    // which is impossible and hung forever; use a satisfiable request.
    println(randomMultiInt(1, 8, 8, false))
  }
}
RandomOptions
用于生成带有比重的随机选项
package com.buwenbuhuo.data.mock.util
import scala.collection.mutable.ListBuffer
/**
*
* @author 不温卜火
* @create 2020-08-14 12:12
* MyCSDN : https://buwenbuhuo.blog.csdn.net/
*
* 根据提供的值和比重, 来创建RandomOptions对象.
* 然后可以通过getRandomOption来获取一个随机的预定义的值
*
*/
object RandomOptions {
  /**
   * Builds a RandomOptions[T] from (value, weight) pairs.
   * Each value is stored `weight` times, so indexing uniformly over
   * [0, totalWeight) picks values proportionally to their weights.
   *
   * @param opts (value, weight) pairs; weights are positive integers
   * @return a populated RandomOptions[T]
   */
  def apply[T](opts: (T, Int)*): RandomOptions[T] = {
    val randomOptions = new RandomOptions[T]()
    // Sum of all weights. Replaces the deprecated symbolic fold
    // `(0 /: opts)(_ + _._2)`, which was removed from modern Scala style.
    randomOptions.totalWeight = opts.map(_._2).sum
    opts.foreach {
      // Expand each value `weight` times into the options buffer.
      case (value, weight) => randomOptions.options ++= (1 to weight).map(_ => value)
    }
    randomOptions
  }

  def main(args: Array[String]): Unit = {
    // Smoke test: "李四" (weight 30) should appear roughly half the time.
    val opts = RandomOptions(("张三", 10), ("李四", 30), ("ww", 20))
    println(opts.getRandomOption())
    println(opts.getRandomOption())
    println(opts.getRandomOption())
    println(opts.getRandomOption())
    println(opts.getRandomOption())
  }
}
// 工程师 10 程序猿 10 老师 20
/**
 * Weighted option pool: holds each value as many times as its weight,
 * so a uniform random index draws values proportionally to weight.
 * Instances are populated by the companion `RandomOptions.apply`.
 */
class RandomOptions[T] {
  // Sum of all weights == options.size once populated.
  var totalWeight: Int = _
  // Flattened pool: each value repeated `weight` times.
  var options = ListBuffer[T]()

  /**
   * Draws one value at random, weighted by the configured proportions.
   *
   * @return a randomly selected option
   */
  def getRandomOption() = {
    val index = RandomNumUtil.randomInt(0, totalWeight - 1)
    options(index)
  }
}
CityInfo
package com.buwenbuhuo.data.mock.bean
/**
 * City record used by the mock-data generator.
 *
 * @param city_id   city id
 * @param city_name city name
 * @param area      region the city belongs to (e.g. 华南)
 */
case class CityInfo(city_id: Long,
city_name: String,
area: String)
MockRealtimeData
package com.buwenbuhuo.data.mock
import java.util.Properties
import com.buwenbuhuo.data.mock.bean.CityInfo
import com.buwenbuhuo.data.mock.util.{RandomNumUtil, RandomOptions}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.collection.mutable.ArrayBuffer
/**
*
* @author 不温卜火
* @create 2020-08-14 12:12
* MyCSDN : https://buwenbuhuo.blog.csdn.net/
*
*/
/**
* 生成实时的模拟数据
*/
object MockRealtimeData {
  /*
  Record format:
  timestamp area city userid adid
  e.g. 1566035129449,华南,深圳,101,2
  */

  /**
   * Produces one batch of 50 simulated ad-click records.
   * Cities are drawn with fixed weights; user ids from [100, 105],
   * ad ids from [1, 5]. Sleeps 10 ms between records so timestamps vary.
   *
   * @return the batch as CSV-style strings "timestamp,area,city,userid,adid"
   */
  def mockRealTimeData(): ArrayBuffer[String] = {
    // Buffer holding the simulated records for this batch.
    val array = ArrayBuffer[String]()
    // Weighted city pool: weight 30 ≈ 30% of records, etc.
    val randomOpts = RandomOptions(
      (CityInfo(1, "北京", "华北"), 30),
      (CityInfo(2, "上海", "华东"), 30),
      (CityInfo(3, "广州", "华南"), 10),
      (CityInfo(4, "深圳", "华南"), 20),
      (CityInfo(5, "杭州", "华中"), 10))
    (1 to 50).foreach { _ =>
      val timestamp = System.currentTimeMillis()
      val cityInfo = randomOpts.getRandomOption()
      val area = cityInfo.area
      val city = cityInfo.city_name
      val userid = RandomNumUtil.randomInt(100, 105)
      val adid = RandomNumUtil.randomInt(1, 5)
      // Join the fields into one CSV record.
      array += s"$timestamp,$area,$city,$userid,$adid"
      Thread.sleep(10)
    }
    array
  }

  /**
   * Builds a String/String KafkaProducer targeting the cluster brokers.
   *
   * @return a configured KafkaProducer; caller is responsible for closing it
   */
  def createKafkaProducer: KafkaProducer[String, String] = {
    val props: Properties = new Properties
    // Kafka broker list (host:port).
    props.put("bootstrap.servers", "hadoop002:9092,hadoop003:9092,hadoop004:9092")
    // Key serializer.
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // Value serializer.
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }

  def main(args: Array[String]): Unit = {
    val topic = "ads_log0814"
    // Topic producer.
    val producer: KafkaProducer[String, String] = createKafkaProducer
    // The loop below never exits normally, so flush/close the producer on
    // JVM shutdown; otherwise records buffered in the producer are lost.
    sys.addShutdownHook(producer.close())
    while (true) {
      mockRealTimeData().foreach { msg =>
        producer.send(new ProducerRecord(topic, msg))
        Thread.sleep(100)
      }
      Thread.sleep(1000)
    }
  }
}
// 这时候需要注释MockRealtimeData中的这两行代码
本次的分享就到这里了