前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Streaming 项目实战(1) | 生成随机数据并写入到Kafka中

Spark Streaming 项目实战(1) | 生成随机数据并写入到Kafka中

作者头像
不温卜火
发布2020-10-28 17:37:37
2.6K0
发布2020-10-28 17:37:37
举报
文章被收录于专栏:不温卜火不温卜火

本实战项目使用 Structured Streaming 来实时的分析处理用户对广告点击的行为数据.

一. 数据生成方式

  使用代码的方式持续的生成数据, 然后写入到 kafka 中.

  然后Structured Streaming 负责从 kafka 消费数据, 并对数据根据需求进行分析.

二. 数据生成模块

模拟出来的数据格式:

代码语言:javascript
复制
时间戳,地区,城市,用户 id,广告 id
1566035129449,华南,深圳,101,2

1. 开启集群

代码语言:javascript
复制
// 启动 zookeeper 和 Kafka
[bigdata@hadoop002 zookeeper-3.4.10]$ bin/start-allzk.sh 
[bigdata@hadoop002 kafka]$ bin/start-kafkaall.sh 
3
3

2. 创建 Topic

在 kafka 中创建topic: ads_log0814

代码语言:javascript
复制
[bigdata@hadoop002 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop002:9092 -topic ads_log0814
9
9

3. 产生循环不断的数据到指定的 topic

创建模块spark-realtime模块

  • 1. 导入依赖:
代码语言:javascript
复制
// 尽量与Kafka版本保持一致

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.2</version>
</dependency>
4
4
  • 2. 工具类: RandomNumUtil 用于生成随机数
代码语言:javascript
复制
package com.buwenbuhuo.data.mock.util
import java.util.Random

import scala.collection.mutable

/**
 *
 *  @author 不温卜火
 *  @create 2020-08-14 12:12
 *  MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *  随机生成整数的工具类
 *
 */
object RandomNumUtil {
  val random = new Random()

  /**
   * 返回一个随机的整数 [from, to]
   *
   * @param from
   * @param to
   * @return
   */
  def randomInt(from: Int, to: Int): Int = {
    if (from > to) throw new IllegalArgumentException(s"from = $from 应该小于 to = $to")
    // [0, to - from)  + from [form, to -from + from ]
    random.nextInt(to - from + 1) + from
  }

  /**
   * 随机的Long  [from, to]
   *
   * @param from
   * @param to
   * @return
   */
  def randomLong(from: Long, to: Long): Long = {
    if (from > to) throw new IllegalArgumentException(s"from = $from 应该小于 to = $to")
    random.nextLong().abs % (to - from + 1) + from
  }

  /**
   * 生成一系列的随机值
   *
   * @param from
   * @param to
   * @param count
   * @param canReat 是否允许随机数重复
   */
  def randomMultiInt(from: Int, to: Int, count: Int, canReat: Boolean = true): List[Int] = {
    if (canReat) {
      (1 to count).map(_ => randomInt(from, to)).toList
    } else {
      val set: mutable.Set[Int] = mutable.Set[Int]()
      while (set.size < count) {
        set += randomInt(from, to)
      }
      set.toList
    }
  }


  def main(args: Array[String]): Unit = {
    println(randomMultiInt(1, 15, 10))
    println(randomMultiInt(1, 8, 10, false))
  }
}
5
5
  • 3. 工具类: RandomOptions 用于生成带有比重的随机选项
代码语言:javascript
复制
package com.buwenbuhuo.data.mock.util
import scala.collection.mutable.ListBuffer
/**
 *
    @author 不温卜火
 *  @create 2020-08-14 12:12
 *  MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 * 根据提供的值和比重, 来创建RandomOptions对象.
 * 然后可以通过getRandomOption来获取一个随机的预定义的值

 *
 */
object RandomOptions {
  def apply[T](opts: (T, Int)*): RandomOptions[T] = {
    val randomOptions = new RandomOptions[T]()
    randomOptions.totalWeight = (0 /: opts) (_ + _._2) // 计算出来总的比重
    opts.foreach {
      case (value, weight) => randomOptions.options ++= (1 to weight).map(_ => value)
    }
    randomOptions
  }


  def main(args: Array[String]): Unit = {
    // 测试
    val opts = RandomOptions(("张三", 10), ("李四", 30), ("ww", 20))

    println(opts.getRandomOption())
    println(opts.getRandomOption())
    println(opts.getRandomOption())
    println(opts.getRandomOption())
    println(opts.getRandomOption())
  }
}

// 工程师 10  程序猿 10  老师 20
class RandomOptions[T] {
  var totalWeight: Int = _
  var options = ListBuffer[T]()
  /**
   * 获取随机的 Option 的值
   *
   * @return
   */
  def getRandomOption() = {
    options(RandomNumUtil.randomInt(0, totalWeight - 1))
  }
}
6
6
  • 4. 样例类: CityInfo
代码语言:javascript
复制
package com.buwenbuhuo.data.mock.bean

/**
 * 城市表
 *
 * @param city_id   城市 id
 * @param city_name 城市名
 * @param area      城市区域
 */
case class CityInfo(city_id: Long,
                    city_name: String,
                    area: String)
  • 5. 生成模拟数据: MockRealtimeData
代码语言:javascript
复制
package com.buwenbuhuo.data.mock
import java.util.Properties

import com.buwenbuhuo.data.mock.bean.CityInfo
import com.buwenbuhuo.data.mock.util.{RandomNumUtil, RandomOptions}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.collection.mutable.ArrayBuffer

/**
 *
 *  @author 不温卜火
 *  @create 2020-08-14 12:12
 *  MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */


/**
 * 生成实时的模拟数据
 */
object MockRealtimeData {
  /*
  数据格式:
  timestamp area city userid adid
  某个时间点 某个地区 某个城市 某个用户 某个广告

  */
  def mockRealTimeData(): ArrayBuffer[String] = {
    // 存储模拟的实时数据
    val array = ArrayBuffer[String]()
    // 城市信息
    val randomOpts = RandomOptions(
      (CityInfo(1, "北京", "华北"), 30),
      (CityInfo(2, "上海", "华东"), 30),
      (CityInfo(3, "广州", "华南"), 10),
      (CityInfo(4, "深圳", "华南"), 20),
      (CityInfo(5, "杭州", "华中"), 10))
    (1 to 50).foreach {
      i => {
        val timestamp = System.currentTimeMillis()
        val cityInfo = randomOpts.getRandomOption()

        val area = cityInfo.area
        val city = cityInfo.city_name
        val userid = RandomNumUtil.randomInt(100, 105)
        val adid = RandomNumUtil.randomInt(1, 5)
        // 拼接成字符串
        array += s"$timestamp,$area,$city,$userid,$adid"
        Thread.sleep(10)
      }
    }
    array
  }

  def createKafkaProducer: KafkaProducer[String, String] = {
    val props: Properties = new Properties
    // Kafka服务端的主机名和端口号
    props.put("bootstrap.servers", "hadoop002:9092,hadoop003:9092,hadoop004:9092")
    // key序列化
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // value序列化
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }

  def main(args: Array[String]): Unit = {
    val topic = "ads_log0814"
    // top生产者
    val producer: KafkaProducer[String, String] = createKafkaProducer
    while (true) {
      mockRealTimeData().foreach {
        msg => {
          producer.send(new ProducerRecord(topic, msg))
          Thread.sleep(100)
        }
      }
      Thread.sleep(1000)
    }
  }
}
  • 6. 先看一下随机生成的数据
代码语言:javascript
复制
// 这时候需要注释MockRealtimeData中的这两行代码
7
7
8
8

4. 确认 kafka 中数据是否生成成功

9
9

  本次的分享就到这里了

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-08-22 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一. 数据生成方式
  • 二. 数据生成模块
    • 1. 开启集群
      • 2. 创建 Topic
        • 3. 产生循环不断的数据到指定的 topic
          • 4. 确认 kafka 中数据是否生成成功
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档