前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark难点 | Join的实现原理

Spark难点 | Join的实现原理

作者头像
王知无-import_bigdata
发布2019-11-18 00:33:23
1.5K0
发布2019-11-18 00:33:23
举报

Join背景

当前SparkSQL支持三种join算法:Shuffle Hash Join、Broadcast Hash Join以及Sort Merge Join。其中前两者归根到底都属于Hash Join,只不过载Hash Join之前需要先Shuffle还是先Broadcast。其实,Hash Join算法来自于传统数据库,而Shuffle和Broadcast是大数据在分布式情况下的概念,两者结合的产物。因此可以说,大数据的根就是传统数据库。Hash Join是内核。

Spark Join的分类和实现机制

上图是Spark Join的分类和使用。

Hash Join

先来看看这样一条SQL语句:select * from order,item where item.id = order.i_id,参与join的两张表是order和item,join key分别是item.id以及order.i_id。现在假设Join采用的是hash join算法,整个过程会经历三步:

  • 确定Build Table以及Probe Table:这个概念比较重要,Build Table会被构建成以join key为key的hash table,而Probe Table使用join key在这张hash table表中寻找符合条件的行,然后进行join链接。Build表和Probe表是Spark决定的。通常情况下,小表会被作为Build Table,较大的表会被作为Probe Table。
  • 构建Hash Table:依次读取Build Table(item)的数据,对于每一条数据根据Join Key(item.id)进行hash,hash到对应的bucket中(类似于HashMap的原理),最后会生成一张HashTable,HashTable会缓存在内存中,如果内存放不下会dump到磁盘中。
  • 匹配:生成Hash Table后,在依次扫描Probe Table(order)的数据,使用相同的hash函数(在spark中,实际上就是要使用相同的partitioner)在Hash Table中寻找hash(join key)相同的值,如果匹配成功就将两者join在一起。
Broadcast Hash Join

当Join的一张表很小的时候,使用broadcast hash join。

Broadcast Hash Join的条件有以下几个:

  • 被广播的表需要小于spark.sql.autoBroadcastJoinThreshold所配置的信息,默认是10M;
  • 基表不能被广播,比如left outer join时,只能广播右表。

broadcast hash join可以分为两步:

  • broadcast阶段:将小表广播到所有的executor上,广播的算法有很多,最简单的是先发给driver,driver再统一分发给所有的executor,要不就是基于bittorrete的p2p思路;
  • hash join阶段:在每个executor上执行 hash join,小表构建为hash table,大表的分区数据匹配hash table中的数据。
Sort Merge Join

当两个表都非常大时,SparkSQL采用了一种全新的方案来对表进行Join,即Sort Merge Join。这种方式不用将一侧数据全部加载后再进行hash join,但需要在join前将数据进行排序。

首先将两张表按照join key进行重新shuffle,保证join key值相同的记录会被分在相应的分区,分区后对每个分区内的数据进行排序,排序后再对相应的分区内的记录进行连接。可以看出,无论分区有多大,Sort Merge Join都不用把一侧的数据全部加载到内存中,而是即用即丢;因为两个序列都有有序的,从头遍历,碰到key相同的就输出,如果不同,左边小就继续取左边,反之取右边。从而大大提高了大数据量下sql join的稳定性。

整个过程分为三个步骤:

  • shuffle阶段:将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理
  • sort阶段:对单个分区节点的两表数据,分别进行排序
  • merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,否则继续取更小一边的key。

经过上文的分析,很明显可以得出这几种join的代价关系:cost(Broadcast Hash Join)< cost(Shuffle Hash Join) < cost(Sort Merge Join),数据仓库设计时最好避免大表与大表的join查询,SparkSQL也可以根据内存资源、带宽资源适量将参数spark.sql. autoBroadcastJoinThreshold调大,让更多join实际执行为Broadcast Hash Join。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-11-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Spark Join的分类和实现机制
    • Hash Join
      • Broadcast Hash Join
        • Sort Merge Join
        相关产品与服务
        数据库
        云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档