专栏首页最新最全的大数据技术体系物流项目中SparkSQL的相关调优

物流项目中SparkSQL的相关调优

实时ETL开发之流计算程序【编程】

编写完成从Kafka消费数据,打印控制台上,其中创建SparkSession实例对象时,需要设置参数值。

package cn.itcast.logistics.etl.realtime

import cn.itcast.logistics.common.Configuration
import org.apache.commons.lang3.SystemUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 编写StructuredStreaming程序,实时从Kafka消息数据(物流相关数据和CRM相关数据),打印控制台Console
	 * 1. 初始化设置Spark Application配置
	 * 2. 判断Spark Application运行模式进行设置
	 * 3. 构建SparkSession实例对象
	 * 4. 初始化消费物流Topic数据参数
	 * 5. 消费物流Topic数据,打印控制台
	 * 6. 初始化消费CRM Topic数据参数
	 * 7. 消费CRM Topic数据,打印控制台
	 * 8. 启动流式应用,等待终止
 */
object LogisticsEtlApp {
	
	def main(args: Array[String]): Unit = {
		// step1. 构建SparkSession实例对象,设置相关属性参数值
		// 1. 初始化设置Spark Application配置
		val sparkConf = new SparkConf()
    		.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
			.set("spark.sql.session.timeZone", "Asia/Shanghai")
			.set("spark.sql.files.maxPartitionBytes", "134217728")
			.set("spark.sql.files.openCostInBytes", "134217728")
			.set("spark.sql.shuffle.partitions", "3")
			.set("spark.sql.autoBroadcastJoinThreshold", "67108864")
		// 2. 判断Spark Application运行模式进行设置
		if (SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC) {
			//本地环境LOCAL_HADOOP_HOME
			System.setProperty("hadoop.home.dir", Configuration.LOCAL_HADOOP_HOME)
			//设置运行环境和checkpoint路径
			sparkConf
				.set("spark.master", "local[3]")
				.set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_WIN_CHECKPOINT_DIR)
		} else {
			//生产环境
			sparkConf
				.set("spark.master", "yarn")
				.set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_DFS_CHECKPOINT_DIR)
		}
		// 3. 构建SparkSession实例对象
		val spark: SparkSession = SparkSession.builder()
    		.config(sparkConf)
			.getOrCreate()
		import spark.implicits._
		
		// step2. 从Kafka实时消费数据,设置Kafka Server地址和Topic名称
		// step3. 将ETL转换后数据打印到控制台,启动流式应用
		// 4. 初始化消费物流Topic数据参数
		val logisticsDF: DataFrame = spark.readStream
			.format("kafka")
			.option("kafka.bootstrap.servers", "node2.itcast.cn:9092")
			.option("subscribe", "logistics")
			.option("maxOffsetsPerTrigger", "100000")
			.load()
		// 5. 消费物流Topic数据,打印控制台
		logisticsDF.writeStream
			.queryName("query-logistics-console")
			.outputMode(OutputMode.Append())
			.format("console")
			.option("numRows", "10")
			.option("truncate", "false")
			.start()
		
		// 6. 初始化消费CRM Topic数据参数
		val crmDF: DataFrame = spark.readStream
			.format("kafka")
			.option("kafka.bootstrap.servers", "node2.itcast.cn:9092")
			.option("subscribe", "crm")
			.option("maxOffsetsPerTrigger", "100000")
			.load()
		// 7. 消费CRM Topic数据,打印控制
		crmDF.writeStream
			.queryName("query-crm-console")
			.outputMode(OutputMode.Append())
			.format("console")
			.option("numRows", "10")
			.option("truncate", "false")
			.start()
		
		// step4. 流式应用启动以后,等待终止,关闭资源
		// 8. 启动流式应用,等待终止
		spark.streams.active.foreach(query => println("启动Query:" + query.name))
		spark.streams.awaitAnyTermination()
	}
	
}

SparkSQL 参数调优设置:

  • 1)、设置会话时区:set("spark.sql.session.timeZone", "Asia/Shanghai")
  • 2)、设置读取文件时单个分区可容纳的最大字节数 set("spark.sql.files.maxPartitionBytes", "134217728")
  • 3)、设置合并小文件的阈值:set("spark.sql.files.openCostInBytes", "134217728")
  • 4)、设置 shuffle 分区数:set("spark.sql.shuffle.partitions", "4")
  • 5)、设置执行 join 操作时能够广播给所有 worker 节点的最大字节大小 set("spark.sql.autoBroadcastJoinThreshold", "67108864")
本文参与 腾讯云自媒体分享计划 ,欢迎热爱写作的你一起参与!
本文分享自作者个人站点/博客:https://blog.csdn.net/xianyu120复制
如有侵权,请联系 cloudcommunity@tencent.com 删除。
登录 后参与评论
0 条评论

相关文章

  • SQL调优和诊断工具之SQL 相关的动态视图

    动态视图可以通过查询相关的动态视图,来查看最近执行过SQL的依然还内存中的执行计划和一些统计信息。

    SQLplusDB
  • 深度学习中的文本分类方法汇总相关代码及调优trick

    Fasttext是Facebook推出的一个便捷的工具,包含文本分类和词向量训练两个功能。

    大鹅
  • delete相关的pl/sql调优(r4笔记第87天)

    今天开发找到我,说有个问题想征求一下我的意见。 问题的大体意思是,对目前环境中的两个表,我们就叫做表a,表b吧,他说根据一个时间字段去判断是否为5天前的记录,...

    jeanron100
  • 0基础学习大数据路线,0基础大数据开发课程大纲

    随着大数据炒的越来越火热,很多大学已经陆续开设了大数据相关课程。0基础学习大数据路线是什么呢?加米谷大数据理论+代码+实战+实操的独有课程体系,下面是加米谷的0...

    加米谷大数据
  • 循序渐进调优union相关的sql(r2笔记23天)

    今天在生产中发现一条sql语句消耗了大量的cpu资源。使用top -c来查看。 PID USER PR NI VIRT RES SHR S...

    jeanron100
  • SparkSQL的应用实践和优化实战

    场景描述:面对大量复杂的数据分析需求,提供一套稳定、高效、便捷的企业级查询分析服务具有重大意义。本次演讲介绍了字节跳动基于SparkSQL建设大数据查询统一服务...

    王知无-import_bigdata
  • 客快物流大数据项目(五十四):初始化Spark流式计算程序

    4、​​​​​​​设置 join 或aggregate洗牌(shuffle)数据时使用的分区数

    Lanson
  • Spark性能调优九之常用算子调优

            前面介绍了很多关于Spark性能的调优手段,今天来介绍一下Spark性能调优的最后一个点,就是关于Spark中常用算子的调优。废话不多说,直接进...

    z小赵
  • 【视频】大数据实战工具Spark 共64讲

    学习目标 1. 学习Spark配置,掌握Spark集群部署; 2. 学习RDD和Scala,掌握Spark调优和应用开发; 3. 掌握Spark Streami...

    小莹莹
  • 被虐后,分享一点点JVM调优原理相关的知识和经验

    首先我们要对上述的内容有一定的了解,从全局出发。看了上图,在调优中我们能做的也就是对运行时数据区进行一些操作,然后选择执行引擎用何种垃圾收集器对垃圾进行回收。

    全栈程序员站长
  • Note_Logistics_Day01(客快物流项目概述及Docker入门)

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RadClEeU-1625444773065)(/img/161551669086...

    ChinaManor
  • kylin调优,项目中错误总结,知识点总结,kylin jdbc driver + 数据库连接池druid + Mybatis项目中的整合,shell脚本执行kylin restapi 案例

    该机制用于数据的容错和恢复: 每个HRegionServer中都有一个HLog对象,HLog是一个实现Write Ahead Log的类,在每次用户操...

    Java架构师必看
  • 入门大数据必读

    Spark学习技巧
  • 2018年Java学习体系

    一、JavaSE 1、Java开发环境搭建 2、Java基础语法 3、Java面向对象 4、异常 5、数组/算法 6、常用类 7、集合/数据结构 8、IO流 9...

    用户1220053
  • 转型【数仓开发】该怎么学

    知道概念—>学习理论—>大量练习—>逐渐清晰—>再大量练习—>清晰—>熟练运用—>融汇贯通

    数据仓库践行者
  • sparksql调优之第一弹

    1,jvm调优 这个是扯不断,理还乱。建议能加内存就加内存,没事调啥JVM,你都不了解JVM和你的任务数据。 spark调优系列之内存和GC调优 2,内存调优...

    Spark学习技巧
  • 十年项目经验面试官亲传大数据面试__大数据面试独孤九剑

    本项目涉及的业务数据包括订单、运输、仓储、搬运装卸等物流环节中涉及的数据、信息。由于多年的积累、庞大的用户群,每日的订单数上千万,传统的数据处理技术已无法满足企...

    ChinaManor
  • 这可能是你见过大数据岗位最全,最规范的面试准备大纲 !(建议收藏)

    本篇博客所分享的知识非常硬核,建议各位看官(尤其是大数据专业的同学啊),赶紧搬好小板凳,带好西瓜,我们边看边吃瓜。

    大数据梦想家

扫码关注腾讯云开发者

领取腾讯云代金券