首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

SparkSQL执行时参数优化

近期接手了不少大数据表任务调度补数据的工作,补数时发现资源消耗异常的大且运行速度却不怎么给力. 发现根本原因在于sparkSQL配置有诸多问题,解决后总结出来就当抛砖引玉了....2G 以下为SparkSQL调优相关设置 以下列表中动态资源分配相关不建议使用 //1.下列Hive参数对Spark同样起作用。...//2.运行行为 set spark.sql.autoBroadcastJoinThreshold; // 大表 JOIN 小表,小表做广播的阈值 set spark.dynamicAllocation.enabled...后,最小的分区数 set spark.Hadoop.mapreduce.input.fileinputformat.split.maxsize; //当几个stripe的大小大于该值时,会合并到一个task...set spark.sql.windowExec.buffer.spill.threshold; //当用户的SQL中包含窗口函数时,并不会把一个窗口中的所有数据全部读进内存,而是维护一个缓存池,当池中的数据条数大于该参数表示的阈值时

1.4K10

客快物流大数据项目(五十四):初始化Spark流式计算程序

topic数据 * 8)启动运行等待停止 */ //1)初始化spark的运行环境 val conf: SparkConf = new SparkConf()...//设置join操作时可以广播到worker节点的最大字节大小,可以避免shuffer操作 .set("spark.sql.autoBroadcastJoinThreshold", "67108864...、​​​​​​​设置执行 join 操作时能够广播给所有 worker 节点的最大字节大小 对于broadcast join模式,会将小于spark.sql.autoBroadcastJoinThreshold...值(默认为10M)的表广播到其他计算节点,不走shuffle过程,所以会更加高效。...所以这个配置的最大字节大小是用于当执行连接时,该表将广播到所有工作节点。通过将此值设置为-1,广播可以被禁用。

92331
  • 您找到你想要的搜索结果了吗?
    是的
    没有找到

    SparkSQL的3种Join实现

    被广播的表需要小于spark.sql.autoBroadcastJoinThreshold所配置的值,默认是10M (或者加了broadcast join的hint) 2....这个方案只能用于广播较小的表,否则数据的冗余传输就远大于shuffle的开销;另外,广播时需要将被广播的表现collect到driver端,当频繁有广播出现时,对driver的内存也是一个考验。...SparkSQL规定broadcast hash join执行的基本条件为被广播小表必须小于参数spark.sql.autoBroadcastJoinThreshold,默认为10M。...分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold所配置的值,默认是10M 2. 基表不能被广播,比如left outer join时,只能广播右表 3....经过上文的分析,可以明确每种Join算法都有自己的适用场景,数据仓库设计时最好避免大表与大表的join查询,SparkSQL也可以根据内存资源、带宽资源适量将参数spark.sql.autoBroadcastJoinThreshold

    3.5K30

    Spark join种类(>3种)及join选择依据

    做过Spark/flink流处理的应该都用过一种流表和维表的join,维表对于Spark来说可以是driver端获取后广播到每个Executor,然后在executor端执行流表task的时候join,...Flink进行维表join可以用的方式比较多了,比如直接open方法里从外部加载的静态hashmap,这种就无法更新,因为Flink不像Spark可以每个批次或者若干批次加载一次维表。...上面所说的就是比较常见的hashjoin的简单表达,将维表通过join的条件key构建为一个hashtable,就拿java 8的HashMap来说吧,就是一个数组+链表(链表过长会变为红黑树),数组下标就是...先判断,假设join的表统计信息现实,一张表大小大于0,且小于等于用户配置的自动广播阈值则,采用广播。...假设两张表都满足广播需求,选最小的。

    1K30

    Spark SQL如何选择join策略

    在了解join策略选择之前,首先看几个先决条件: 1. build table的选择 Hash Join的第一步就是根据两表之中较小的那一个构建哈希表,这个小表就叫做build table,大表则称为...build table的条件,join类型需满足(第1种是在业务开发中写的SQL主要适配的): 1....满足什么条件的表才能被广播 如果一个表的大小小于或等于参数spark.sql.autoBroadcastJoinThreshold(默认10M)配置的值,那么就可以广播该表。...,我们也可以通过直接在Spark SQL中显示使用hint方式(/*+ BROADCAST(small_table) */),直接指定要广播的表,源码如下: private def canBroadcastByHints...size小于spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions(默认200)时,即可构造本地HashMap plan.stats.sizeInBytes

    1.2K20

    Spark Adaptive Execution调研

    这种做法就是MapJoin,在Spark中,也叫做BroadcastHashJoin。原理是将小表数据以broadcast变量加载到内存,然后广播到各个Executor上,直接在map中做join。...假设表A(1M)和表B(4G)做join时,并已经进行了Shuffle Write,转换成BroadcastHashJoin的过程如下: 将表A的数据加载成broadcast 假设上游表B有5个partition...如果不设置该参数,该阈值与 spark.sql.autoBroadcastJoinThreshold 的值相等 3、自动处理数据倾斜 还是在Shuffle Write之后解决问题。...比如表A和表B做join,表A在shuffle write完,partition 0有4G的数据,其他partition都只有1,200M。...如果一个 Partition 的大小大于 spark.sql.adaptive.skewedPartitionSizeThreshold 的同时大于各 Partition 大小中位数与该因子的乘积,或者行数大于

    1.9K10

    工作经验分享:Spark调优【优化后性能提升1200%】

    背景 业务数据不断增大, Spark运行时间越来越长, 从最初的半小时到6个多小时 某日Spark程序运行6.5个小时后, 报“Too large frame...”的异常 org.apache.spark.shuffle.FetchFailedException...抛出异常的原因 Spark uses custom frame decoder (TransportFrameDecoder) which does not support frames larger...异常,就是发生在业务数据处理的最后一步left join操作 2.2....“脏数据”(非法数据) 业务无关的数据 3.分析join操作, 左右表的特征, 判断是否可以进行小表广播 broadcast (1)这样可避免shuffle操作,特别是当大表特别大 (2)默认情况下,...join时候, 如果表的数据量低于spark.sql.autoBroadcastJoinThreshold参数值时(默认值为10 MB), spark会自动进行broadcast, 但也可以通过强制手动指定广播

    1.9K10

    【大数据】Spark优化经验&案例--数据倾斜

    十秒看完 1.业务处理中存在复杂的多表关联和计算逻辑(原始数据达百亿数量级) 2.优化后,spark计算性能提升了约12倍(6h-->30min) 3.最终,业务的性能瓶颈存在于ES写入(计算结果,ES...背景 业务数据不断增大, Spark运行时间越来越长, 从最初的半小时到6个多小时 某日Spark程序运行6.5个小时后, 报“Too large frame...”的异常 org.apache.spark.shuffle.FetchFailedException...异常,就是发生在业务数据处理的最后一步left join操作 2.2....非法数据) 业务无关的数据 分析join操作, 左右表的特征, 判断是否可以进行小表广播 broadcast 这样可避免shuffle操作,特别是当大表特别大 默认情况下, join时候, 如果表的数据量低于...spark.sql.autoBroadcastJoinThreshold参数值时(默认值为10 MB), spark会自动进行broadcast, 但也可以通过强制手动指定广播 visitor_df.join

    3.1K85

    Spark 3.0 新特性 之 自适应查询与分区动态裁剪

    说起这个可以先回想下Spark的发展历史,在1.x时代Spark通过RDD的编程形成DAG图,这个阶段可以说没啥优化完全是按照规则来执行;在2.x时代,引入了代价计算,Spark会通过提前进行代价计算,...像Spark会配置一个参数 spark.sql.autoBroadcastJoinThreshold 来决定小于这个配置的表就认为是小表,然后采用广播策略(默认10MB)。...一般广播的套路是把小表拷贝到driver端,然后分发到每个executor工作节点上,因此如果表的数据太大,会导致来回复制的数据太多,性能低下,因此BHJ仅适用于广播小表。...比如某个表初始的时候15M,达不到广播join的要求,但是该表在查询过程中有个filter条件可以让表仅保留8M的有效数据,此时就可以采用广播join了。...其他方面由于工作内容涉及的不多,因此就先不过多整理了,感兴趣可以去官网或者观看上面的分享视频。需要额外一提的是,官方文档也有两个很重要的调整: 1 增加了SQL相关的文档 ?

    1.6K30

    sparksql调优之第一弹

    1,jvm调优 这个是扯不断,理还乱。建议能加内存就加内存,没事调啥JVM,你都不了解JVM和你的任务数据。 spark调优系列之内存和GC调优 2,内存调优 缓存表 spark2....+采用: spark.catalog.cacheTable("tableName")缓存表,spark.catalog.uncacheTable("tableName")解除缓存。 spark 1....属性 默认值 描述 spark.sql.broadcastTimeout 300 广播等待超时时间,单位秒 spark.sql.autoBroadcastJoinThreshold 10485760 (...设置为-1可以禁止该功能。当前统计信息仅支持Hive Metastore表 广播的变量的使用其实,有时候没啥用处。在任务超多,夸stage使用数据的时候才能凸显其真正作用。...任务一趟跑完了,其实广播不广播无所谓了。。。 4,分区数据的调控 分区设置spark.sql.shuffle.partitions,默认是200.

    3K80

    Adaptive Execution 让 Spark SQL 更高效更智能

    另外,如果小 RDD 过大,无法存于 Executor 内存中,则无法使用 BroadcastJoin 对于基础表的 Join,可在生成执行计划前,直接通过 HDFS 获取各表的大小,从而判断是否适合使用...但对于中间表的 Join,无法提前准确判断中间表大小从而精确判断是否适合使用 BroadcastJoin 《Spark SQL 性能优化再进一步 CBO 基于代价的优化》一文介绍的 CBO 可通过表的统计信息与各操作对数据统计信息的影响...Stage 0 每个 Task 的 Shuffle Write 数据,同时与广播得到的 Stage 1 的全量数据进行 Join 注:广播数据存于每个 Executor 中,其上所有 Task 共享,...如果不设置该参数,该阈值与 spark.sql.autoBroadcastJoinThreshold 的值相等 除了本文所述 SortMergeJoin 转 BroadcastJoin,Adaptive...如果一个 Partition 的大小大于 spark.sql.adaptive.skewedPartitionSizeThreshold 的同时大于各 Partition 大小中位数与该因子的乘积,或者行数大于

    1K10

    【Spark重点难点08】Spark3.0中的AQE和DPP小总结

    由于这个原因,当 Spark 估计参加 join 的表数据量小于广播大小的阈值时,其会将 Join 策略调整为 Broadcast Hash Join。...比如下面这个例子,右表的实际大小为15M,而在该场景下,经过filter过滤后,实际参与join的数据大小为8M,小于了默认broadcast阈值10M,应该被广播。...我们可以看下这个场景,Table A join Table B,其中Table A的partition A0数据远大于其他分区。...如果不做这个优化,SMJ将会产生4个tasks并且其中一个执行时间远大于其他。经优化,这个join将会有5个tasks,但每个task执行耗时差不多相同,因此个整个查询带来了更好的性能。...但是使用DPP的前提条件比较苛刻,需要满足以下条件: 事实表必须是分区表 只支持等值Join 维度表过滤之后的数据必须小于广播阈值:spark.sql.autoBroadcastJoinThreshold

    2.9K41

    Spark调优 | 不可避免的 Join 优化

    广播到每个计算节点,然后将buildIter放到hash表中,如下图所示。...这个不用我们担心,spark sql自动帮我们完成,当buildIter的估计大小不超过参数spark.sql.autoBroadcastJoinThreshold设定的值(默认10M),那么就会自动采用...hash join实现 除了上面两种join实现方式外,spark还提供了hash join实现方式,在shuffle read阶段不对记录排序,反正来自两格表的具有相同key的记录会在同一个分区,只是在分区内不排序...: buildIter总体估计大小超过spark.sql.autoBroadcastJoinThreshold设定的值,即不满足broadcast join条件; 开启尝试使用hash join的开关,...spark.sql.join.preferSortMergeJoin=false; 每个分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold设定的值,即shuffle

    4.6K20

    Spark SQL 之 Join 实现

    广播到每个计算节点,然后将buildIter放到hash表中,如下图所示。...这个不用我们担心,spark sql自动帮我们完成,当buildIter的估计大小不超过参数spark.sql.autoBroadcastJoinThreshold设定的值(默认10M),那么就会自动采用...hash join实现 除了上面两种join实现方式外,spark还提供了hash join实现方式,在shuffle read阶段不对记录排序,反正来自两格表的具有相同key的记录会在同一个分区,只是在分区内不排序...: buildIter总体估计大小超过spark.sql.autoBroadcastJoinThreshold设定的值,即不满足broadcast join条件 开启尝试使用hash join的开关,spark.sql.join.preferSortMergeJoin...=false 每个分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold设定的值,即shuffle read阶段每个分区来自buildIter的记录要能放到内存中

    9.5K1111

    【硬刚大数据】从零到大数据专家面试篇之SparkSQL篇

    2.容易导致task数过多,如果超过参数spark.driver.maxResultSize的配置(默认1g),会抛出类似如下的异常,影响任务的处理 Caused by: org.apache.spark.SparkException...满足什么条件的表才能被广播 如果一个表的大小小于或等于参数spark.sql.autoBroadcastJoinThreshold(默认10M)配置的值,那么就可以广播该表。...它的工作方式是循环从一张表(outer table)中读取数据,然后访问另一张表(inner table,通常有索引),将outer表中的每一条数据与inner表中的数据进行join,类似一个嵌套的循环并且在循环的过程中进行数据的比对校验是否满足一定条件...而Spark SQL中的BroadcastNestedLoopJoin就类似于Nested Loop Join,只不过加上了广播表(build table)而已。...Spark SQL是否产生了笛卡尔积 以join语句不指定on条件产生笛卡尔积的SQL为例: -- test_partition1和test_partition2是Hive分区表 select * from

    2.4K30
    领券