前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >客快物流大数据项目(五十六): 编写SparkSession对象工具类

客快物流大数据项目(五十六): 编写SparkSession对象工具类

作者头像
Lansonli
发布2022-03-07 10:48:02
3980
发布2022-03-07 10:48:02
举报
文章被收录于专栏:Lansonli技术博客

编写SparkSession对象工具类

后续业务开发过程中,每个子业务(kudu、es、clickhouse等等)都会创建SparkSession对象,以及初始化开发环境,因此将环境初始化操作封装成工具类,方便后续使用

实现步骤:

  • 公共模块scala目录的common程序包下创建 SparkUtils 单例对象
  • 实现方法:创建SparkConf对象
  • 实现方法:预定义当前环境的运行模式
  • 实现方法:创建获取SparkSession对象
代码语言:javascript
复制
package cn.it.logistics.common

import org.apache.commons.lang.SystemUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * spark操作的工具类
 */
object SparkUtils {
  /**
   * 创建sparkConf对象
   */
  lazy val sparkConf = (appName:String) =>{
    val conf: SparkConf = new SparkConf()
      //设置应用的名称
      .set("spark.app.name", this.getClass.getSimpleName)
      //设置时区
      .set("spark.sql.session.timeZone", "Asia/Shanghai")
      //设置单个分区可容纳的最大字节数,默认是128M, 等同于block块的大小
      .set("spark.sql.files.maxPartitionBytes", "134217728")
      //设置合并小文件的阈值,避免每个小文件占用一个分区的情况
      .set("spark.sql.files.openCostInBytes", "134217728")
      //设置join或者shuffle的时候使用的分区数,默认情况下分区数是200
      .set("spark.sql.shuffle.partitions", "600")
      //设置join操作时可以广播到worker节点的最大字节大小,可以避免shuffer操作
      .set("spark.sql.autoBroadcastJoinThreshold", "67108864")

    //返回sparkConf对象
    conf
  }

  /**
   * 预定义可用于window和linux中的运行模式
   */
  lazy val autoSettingEnv = (sparkConf:SparkConf) =>{
    //本地运行环境
    if(SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC){
      //本地环境LOCAL_HADOOP_HOME
      System.setProperty("hadoop.home.dir", Configuration.LOCAL_HADOOP_HOME)
      //设置运行环境和checkpoint路径
      sparkConf.set("spark.master", "local[*]")
        .set("spark.sql.streaming.checkpointLocation", Configuration.sparkAppWinCheckpointDir)
        .set("spark.sql.warehouse.dir", Configuration.sparkAppWinDataDir)
    }else{
      //集群运行环境(生产环境)
      //生产环境
      sparkConf.set("spark.master", "yarn")
        .set("spark.sql.streaming.checkpointLocation", Configuration.sparkAppDfsCheckpointDir)
        .set("spark.sql.warehouse.dir", Configuration.sparkAppDfsDataDir)
    }

    //返回sparkConf对象
    sparkConf
  }

  /**
   * 创建sparkSession对象
   * @param sparkConf
   */
  def getSparkSession(sparkConf: SparkConf) = {
    SparkSession.builder().config(sparkConf).getOrCreate()
  }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022/03/05 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 编写SparkSession对象工具类
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档