前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >客快物流大数据项目(五十四):初始化Spark流式计算程序

客快物流大数据项目(五十四):初始化Spark流式计算程序

作者头像
Lansonli
发布2022-03-04 10:02:51
8820
发布2022-03-04 10:02:51
举报
文章被收录于专栏:Lansonli技术博客

目录

初始化Spark流式计算程序

一、SparkSql参数调优设置 

1、设置会话时区

2、​​​​​​​设置读取文件时单个分区可容纳的最大字节数

3、设置合并小文件的阈值

4、​​​​​​​设置 join 或aggregate洗牌(shuffle)数据时使用的分区数

5、​​​​​​​设置执行 join 操作时能够广播给所有 worker 节点的最大字节大小

二、测试数据是否可以消费成功

初始化Spark流式计算程序

实现步骤:

  • etl模块realtime目录创建 App 单例对象,初始化 spark 运行环境
  • 创建main方法
  • 编写代码
    • 初始化spark环境参数
    • 消费kafka的ogg数据
    • 消费kafka的canal数据
    • 打印kafka的数据

参考代码:

代码语言:javascript
复制
package cn.it.logistics.etl.realtime

import cn.it.logistics.common.Configuration
import org.apache.commons.lang.SystemUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 测试消费kafka的数据
 * 1)物流相关的数据
 * 2)客户关系管理系统的数据
 */
object App {
  /**
   * 入口函数
   *
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)初始化spark的运行环境
     * 2)判断当前的运行环境(local/linux运行环境)
     * 3)创建sparkSession对象
     * 4)初始化物流topic数据的连接参数
     * 5)初始化客户关系系统topic数据的连接参数
     * 6)消费oracle->ogg->kafka的topic数据
     * 7)消费mysql->canal->kafka的topic数据
     * 8)启动运行等待停止
     */
    //1)初始化spark的运行环境
    val conf: SparkConf = new SparkConf()
      //设置应用的名称
      .set("spark.app.name", this.getClass.getSimpleName)
      //设置时区
      .set("spark.sql.session.timeZone", "Asia/Shanghai")
      //设置单个分区可容纳的最大字节数,默认是128M, 等同于block块的大小
      .set("spark.sql.files.maxPartitionBytes", "134217728")
      //设置合并小文件的阈值,避免每个小文件占用一个分区的情况
      .set("spark.sql.files.openCostInBytes", "134217728")
      //设置join或者shuffle的时候使用的分区数,默认情况下分区数是200
      .set("spark.sql.shuffle.partitions", "600")
      //设置join操作时可以广播到worker节点的最大字节大小,可以避免shuffer操作
      .set("spark.sql.autoBroadcastJoinThreshold", "67108864")

    //2)判断当前的运行环境(local/linux运行环境)
    if (SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC) {
      //本地环境LOCAL_HADOOP_HOME
      System.setProperty("hadoop.home.dir", Configuration.LOCAL_HADOOP_HOME)
      //设置运行环境和checkpoint路径
      conf.set("spark.master", "local[*]").set("spark.sql.streaming.checkpointLocation", Configuration.sparkAppWinCheckpointDir)
    } else {
      //生产环境
      conf.set("spark.master", "yarn").set("spark.sql.streaming.checkpointLocation", Configuration.sparkAppDfsCheckpointDir)
    }

    //3)创建sparkSession对象
    val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //4)初始化物流topic数据的连接参数
    val logisticsKafkaParams: Map[String, String] = Map[String, String](
      "kafka.bootstrap.servers" -> Configuration.kafkaAddress,
      "subscribe" -> Configuration.kafkaLogisticsTopic,
      "group.id" -> "logistics",
      //表示数据丢失以后(topic被删除,或者offset不存在可用的范围的时候)
      "failOnDataLoss" -> "false"
    )

    //5)初始化客户关系系统topic数据的连接参数
    val crmKafkaParams: Map[String, String] = Map[String, String](
      "kafka.bootstrap.servers" -> Configuration.kafkaAddress,
      "subscribe" -> Configuration.kafkaCrmTopic,
      "group.id" -> "logistics",
      //表示数据丢失以后(topic被删除,或者offset不存在可用的范围的时候)
      "failOnDataLoss" -> "false"
    )

    //导入隐士转换
    import sparkSession.implicits._

    //6)消费oracle->ogg->kafka的topic数据
    val logisticsDF: DataFrame = sparkSession.readStream.format("kafka").options(logisticsKafkaParams).load().selectExpr("CAST(value AS STRING)").as[String].toDF()

    //7)消费mysql->canal->kafka的topic数据
    val crmDF: DataFrame = sparkSession.readStream.format("kafka").options(crmKafkaParams).load().selectExpr("CAST(value AS STRING)").as[String].toDF()

    //输出数据
   logisticsDF.writeStream.outputMode(OutputMode.Update()).format("console").queryName("logistics").start()
    crmDF.writeStream.outputMode(OutputMode.Update()).format("console").queryName("crm").start()

    //8)启动运行等待停止
    val stream = sparkSession.streams
    //stream.active:获取当前活动流式查询的列表
    stream.active.foreach(query => println(s"准备启动的查询:${query.name}"))
    //线程阻塞,等待终止
    stream.awaitAnyTermination()
  }
}

一、SparkSql参数调优设置 

1、​​​​​​​设置会话时区

会话本地时区的ID

代码语言:javascript
复制
.set("spark.sql.session.timeZone", "Asia/Shanghai")

会话时区使用配置'spark.sql.session.timeZone'设置,如果未设置,将默认为JVM系统本地时区

2、​​​​​​​设置读取文件时单个分区可容纳的最大字节数

读取文件时单个分区可容纳的最大字节数,默认128M,等同于Block块大小

代码语言:javascript
复制
.set("spark.sql.files.maxPartitionBytes", "134217728")

3、设置合并小文件的阈值

用相同时间内可以扫描的数据的大小来衡量打开一个文件的开销。当将多个文件写入同一个分区的时候该参数有用。

该值设置大一点有好处,有小文件的分区会比大文件分区处理速度更快(优先调度),默认是4M

说直白一些这个参数就是合并小文件的阈值,小于这个阈值的文件将会合并,防止太多单个小文件占一个分区情况。

代码语言:javascript
复制
.set("spark.sql.files.openCostInBytes", "134217728")

4、​​​​​​​设置 join 或aggregate洗牌(shuffle)数据时使用的分区数

对于SparkSQL,还有一个比较重要的参数,就是shuffle时候的Task数量,通过spark.sql.shuffle.partitions来调节。调节的基础是spark集群的处理能力和要处理的数据量,spark的默认值是200。Task过多,会产生很多的任务启动开销,Task多少,每个Task的处理时间过长,容易straggle(掉队)

代码语言:javascript
复制
.set("spark.sql.shuffle.partitions", "600")

5、​​​​​​​设置执行 join 操作时能够广播给所有 worker 节点的最大字节大小

对于broadcast join模式,会将小于spark.sql.autoBroadcastJoinThreshold值(默认为10M)的表广播到其他计算节点,不走shuffle过程,所以会更加高效。

代码语言:javascript
复制
.set("spark.sql.autoBroadcastJoinThreshold", "67108864")

否则会报如下错误:

Exception in thread “broadcast-exchange-0” java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes 

原因:

从问题来分析说是内存溢出了,也就是说明广播内存不够用,即使不断设整任务的内存资源,无论是executor还是driver的内存都分配多一倍了,但是还是不起作用。

所以这个配置的最大字节大小是用于当执行连接时,该表将广播到所有工作节点。通过将此值设置为-1,广播可以被禁用。

二、测试数据是否可以消费成功

测试步骤:

  • 启动docker并启动Order和Mysql数据库(包含OGG服务和Canal-server服务
  • 启动造数程序(位于logistics-generate项目下的cn.it.logistics.generate.App类
  • 启动App单例对象
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022/03/03 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 初始化Spark流式计算程序
    • 一、SparkSql参数调优设置 
      • 1、​​​​​​​设置会话时区
      • 2、​​​​​​​设置读取文件时单个分区可容纳的最大字节数
      • 3、设置合并小文件的阈值
      • 4、​​​​​​​设置 join 或aggregate洗牌(shuffle)数据时使用的分区数
      • 5、​​​​​​​设置执行 join 操作时能够广播给所有 worker 节点的最大字节大小
    • 二、测试数据是否可以消费成功
    相关产品与服务
    数据库
    云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档