前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >Arrow HashJoin限制

Arrow HashJoin限制

作者头像
公众号guangcity
发布于 2023-09-02 02:42:09
发布于 2023-09-02 02:42:09
21600
代码可运行
举报
文章被收录于专栏:光城(guangcity)光城(guangcity)
运行总次数:0
代码可运行

Arrow HashJoin限制

0.背景

最近在测试一些大数据量的HashJoin计算,例如:用户层设置batch数为600000,那么会导致crash。本节将会通过调试,来一步步学习SwissJoin(HashJoin的内部实现)的分区逻辑。

1.crash 点

本次crash点为SwissJoin分区排序。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// crash point
ARROW_DCHECK(num_rows > 0 && num_rows <= (1 << 15));

为了研究这个值,于是写下个这篇文章。

2.布隆过滤器

自Arrow 9.0版本后,新增HashJoin的BF功能,在执行计划上新增PrepareToProduce接口,该接口会在每个节点开始Produce之前做一些准备工作,目前只用在HashJoin节点,这里会对BloomFilter的上下文做一些初始化,例如:线程数、调度器、注册build端finish任务、probe端任务等等。

值得注意的是,对于HashJoin的线程数为:CPU + IO + 1

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
size_t num_threads = (GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1);

随后build端StartProducing->BuildHashTable,依次继续下面的任务,BuildTask->MergeTask->ScanTask。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  void InitTaskGroups() {
    task_group_build_ = scheduler_->RegisterTaskGroup(
        [this](size_t thread_index, int64_t task_id) -> Status {
          return BuildTask(thread_index, task_id);
        },
        [this](size_t thread_index) -> Status { return BuildFinished(thread_index); });
    task_group_merge_ = scheduler_->RegisterTaskGroup(
        [this](size_t thread_index, int64_t task_id) -> Status {
          return MergeTask(thread_index, task_id);
        },
        [this](size_t thread_index) -> Status { return MergeFinished(thread_index); });
    task_group_scan_ = scheduler_->RegisterTaskGroup(
        [this](size_t thread_index, int64_t task_id) -> Status {
          return ScanTask(thread_index, task_id);
        },
        [this](size_t thread_index) -> Status { return ScanFinished(thread_index); });
  }

3.BuildTask

  • 哈希计算

在这个里面会去做一些指令集的Hash。

  • 数据分区

在数据Join时,通常会将具有相同哈希值的元素分配到同一个分区中,以便进行后续的连接操作。对于每个哈希值,这段代码会计算其所属的分区,并将其对应的行索引保存在 locals.batch_prtn_row_ids 中。分区的范围和索引信息保存在 locals.batch_prtn_ranges 中。

  • 更新哈希

在完成数据分区后,代码会对 locals.batch_hashes 中的哈希值进行更新,以清除已经用于分区的高位比特位。这样做是为了后续将哈希值用于建立哈希表时能够直接使用这些低位比特位,从而减少哈希冲突的可能性。

取高位计算分区:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
return locals.batch_hashes[i] >> (31 - log_num_prtns_) >> 1;

移除已经参与分区计算的高位:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 locals.batch_hashes[i] <<= log_num_prtns_;
  • 数据插入

它会根据分区信息从 key_batchpayload_batch_maybe_null 中选择相应的数据,并将其插入到该分区对应的哈希表中。

4.数据分区

4.1 分区逻辑

以代码中注释为例:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 输入数组: [5, 7, 2, 3, 5, 4]
 分区数: 3
 分区算法: [&in_arr] (int row_id) { return in_arr[row_id] / 3; }
 输出位置映射算法: [&out_arr] (int row_id, int pos) { out_arr[pos] = row_id; }

执行分区操作后,我们的到:out_arr: [2, 5, 3, 5, 4, 7] prtn_ranges: [0, 1, 5, 6]

  • out_arr执行的流程为:

按照分区算法,对数组每个元素除以3的到分区数组[1, 2, 0, 1, 1, 1],排序的到[0, 1, 1, 1, 1, 2],随后映射到输入数组的每一条记录上,便得到了[2, 5, 3, 5, 4, 7] 。

  • prtn_ranges执行的流程为:

我们在上面已经得到了[0, 1, 1, 1, 1 2],那么就可以计算每个分区在排序后数组中的起始位置,便得到了[0, 1, 5, 6]。

SwissJoin里面的实际分区算法:右移31-log_num_prtns_,以获取哈希值的高 log_num_prtns_ 位作为分区 ID,最后右移1位向下取整。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
return locals.batch_hashes[i] >> (31 - log_num_prtns_) >> 1;

4.2 分区crash调试流程

为了方便查看上下文,这里通过gdb详细的打印了一些必要信息。

应用层传入13个batch,总行数1500036,依次batch数量为120254、120414、119563等等。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
(gdb) p build_side_batches_
$21 = {row_count_ = 1500036, batches_ = std::vector of length 13, capacity 16 = {{values = std::vector of length 2, capacity 2 = {{
          static kUnknownLength = -1, 
          value = ...
(gdb) p build_side_batches_.batches_[0].ToString ()
$24 = "ExecBatch\n    # Rows: 120254\n    0: Array[1,8,11,13,18,27,30,32,36,40,...,599964,599968,599972,599974,599984,599985,599990,599991,599992,599999]\n    1: Array[1,8,11,13,18,27,30,32,36,40,...,599964,599"...
(gdb) p build_side_batches_.batches_[1].ToString ()
$25 = "ExecBatch\n    # Rows: 120414\n    0: Array[600011,600021,600028,600029,600032,600035,600037,600044,600048,600051,...,1199968,1199970,1199972,1199973,1199978,1199988,1199990,1199996,1199997,1199999]\n   "...
(gdb) p build_side_batches_.batches_[2].ToString ()
$26 = "ExecBatch\n    # Rows: 119563\n    0: Array[1200001,1200012,1200019,1200023,1200026,1200029,1200030,1200043,1200053,1200065,...,1799973,1799976,1799986,1799988,1799989,1799991,1799994,1799996,1799998,18"..
(gdb) p num_rows_
$35 = 1500036
(gdb) p dop_
$38 = 105
(gdb) p bit_util::Log2(105)
$39 = 7
(gdb) p bit_util::Log2(num_rows_ / (1 << 18))
$41 = 3
(gdb) p num_prtns_
$42 = 8
(gdb) p thread_states_[0]
$47 = {batch_hashes = std::vector of length 120254, capacity 120254 = {2977511326, 2362293233, 2688180940, 53399816, 2072962591, 3117799855, 
    3443687562, 808906694, 4129082687, 3154357176, 536287733, 2539073292, 1238460074, 1634524813, 2679361821, 1704636310, 729910543, 
    2390031435, 1774747807, 4120198289, 3145472522, 1828082088, 3830867903, 2530254429, 1555528918, 1881416625, 3900979400, 1266198276, 
    ......

dop_可以理解为线程数,这里计算出来是105,计算来自PrepareToProduce

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  // TODO(ARROW-15732)
  // Each side of join might have an IO thread being called from. Once this is fixed
  // we will change it back to just the CPU's thread pool capacity.
  // 105 = 96 + 8 + 1
  size_t num_threads = (GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1);

分区初始化:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
constexpr int64_t min_num_rows_per_prtn = 1 << 18;
log_num_prtns_ =
    std::min(bit_util::Log2(dop_),
             bit_util::Log2(bit_util::CeilDiv(num_rows, min_num_rows_per_prtn)));
num_prtns_ = 1 << log_num_prtns_;

所以这里是

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
log_num_prtns_ = min(7, 3) = 3;
num_prtns_ = 1 << 3 = 8;

划分为8个分区,每个分区里面的行数限制为1 << 15,分区数也是1 << 15。

在这里我们第一个batch行数为120254,超过了1 << 15,就crash了,所以我们应该限制每个batch的数量不超过1 << 15。

5.总结

理解三个控制参数:

  • 控制分区数量不要太多 1 << 18 = 262144
  • 控制分区内的行数不会超过1 << 15 = 32768
  • 分区数不得超过 1 << 15 = 32768

也就是整表限制 <= 2^30

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-07-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 光城 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
如何在Apache Arrow中定位与解决问题
最近在执行sql时做了一些batch变更,出现了一个 crash问题,底层使用了apache arrow来实现。本节将会从0开始讲解如何调试STL源码crash问题,在这篇文章中以实际工作中resize导致crash为例,引出如何进行系统性分析,希望可以帮助大家~
公众号guangcity
2024/04/03
1910
如何在Apache Arrow中定位与解决问题
C++如何排查并发编程死锁问题?
最近在Apache arrow里面写一个支持并行的算子:nested loop join,然后既然涉及到并行,这里就会遇到大家常说的死锁问题,假设你碰到了死锁问题,如何调试与定位呢?
公众号guangcity
2024/04/15
4840
HBase使用HashTable/SyncTable工具同步集群数据
复制(在上一篇博客文章中介绍)已经发布了一段时间,并且是Apache HBase最常用的功能之一。使集群与不同的对等方复制数据是非常常见的部署,无论是作为DR策略还是简单地作为在生产/临时/开发环境之间复制数据的无缝方式。尽管这是使不同的HBase数据库在亚秒级延迟内保持同步的有效方法,但是复制仅对启用该功能后所摄取的数据进行操作。这意味着复制部署中涉及的所有集群上的所有现有数据仍将需要以其他某种方式在同级之间进行复制。有很多工具可用于同步不同对等集群上的现有数据。Snapshots、BulkLoad、CopyTable是此类工具的知名示例,以前的Cloudera博客文章中都提到了这些示例。HashTable/SyncTable,详细介绍了它的一些内部实现逻辑,使用它的利弊以及如何与上述其他数据复制技术进行比较。
大数据杂货铺
2020/11/09
1.7K0
Yolov8 pose关键点检测 C++ GPU部署:ONNXRuntime cuda部署
💡💡💡本文摘要:本文提供了YOLOv8 pose关键点检测 c++部署方式,ONNX Runtime CUDA和cpu部署
AI小怪兽
2024/08/14
1.3K0
ClusterFuzz的bot源码(fuzz task)阅读
之后执行中的src/local/butler/run_bot.py中的execute
用户1423082
2024/12/31
330
PostgreSQL 哈希链接 和 哈希聚合
在PostgreSQL中,表和表之间进行关联关系的情况下,在等值链接中,两个表如果一个是大表一个是小表,PostgreSQL 更倾向与使用 hash join 的方式来解决问题。主要的原因在于通过hash join 会利用内存来进行等值链接的对比针对这种链接的方式,效率更高,
AustinDatabases
2024/03/02
2960
PostgreSQL  哈希链接 和 哈希聚合
Apache Arrow Acero执行引擎
对于许多复杂的计算,在内存或计算时间内,连续的计算函数的直接调用都是不可行的。为了更加有效的提高资源使用率、促进多批数据的消费,Arrow提供了一套流式执行引擎,称为Acero。
公众号guangcity
2023/09/02
6670
Apache Arrow Acero执行引擎
如何实现比PyTorch快6倍的Permute/Transpose算子?
无论是在统治NLP届的Transformer,还是最近视觉领域的新秀Vision Transformer,我们都能在模型中看到Transpose/Permute算子的身影,特别是在多头注意力机制(Multi-Head Attention)中,需要该算子来改变数据维度排布。
BBuf
2021/11/12
1.4K0
如何实现比PyTorch快6倍的Permute/Transpose算子?
诡异宕机,为何分区表会匹配到错误的表名?
本文从一个工单引入,讲述了 MySQL Bug#115352 从故障定位、Bug 提交到修复的全过程。
爱可生开源社区
2025/03/27
640
诡异宕机,为何分区表会匹配到错误的表名?
TiDB 源码阅读系列文章(十一)Index Lookup Join
在介绍 Index Lookup Join 之前,我们首先看一下什么是 Nested Loop Join。
PingCAP
2018/06/28
4K0
Postgresql源码(64)查询执行——子模块Executor(2)执行前的数据结构和执行过程
上一篇说明了执行的框架,本篇深入分析执行细节。测试用例不变,还是分析之前的case。
mingjie
2022/08/03
6800
Postgresql源码(64)查询执行——子模块Executor(2)执行前的数据结构和执行过程
[源码解析] NVIDIA HugeCTR,GPU版本参数服务器--- (6) --- Distributed hash表
在这系列文章中,我们介绍了 HugeCTR,这是一个面向行业的推荐系统训练框架,针对具有模型并行嵌入和数据并行密集网络的大规模 CTR 模型进行了优化。
罗西的思考
2022/05/09
7560
[源码解析] NVIDIA HugeCTR,GPU版本参数服务器--- (6) --- Distributed hash表
大厂技术实现 | 多目标优化及应用(含代码实现)@推荐与计算广告系列
推荐,搜索,计算广告是互联网公司最普及最容易商业变现的方向,也是算法发挥作用最大的一些方向,前沿算法的突破和应用可以极大程度驱动业务增长,这个系列咱们就聊聊这些业务方向的技术和企业实践。本期主题为多目标学习优化落地(附『实现代码』和『微信数据集』)
ShowMeAI
2021/10/21
2.2K1
大厂技术实现 | 多目标优化及应用(含代码实现)@推荐与计算广告系列
企业安全建设之路:端口扫描(下)
0x00、前言 在企业安全建设过程当中,我们也不断在思考,做一个什么样的端口扫描才能企业业务需求。同时,伴随着企业私有云、混合云以及公有云业务部署环境的不断变化,我们适当也要对扫描策略做调整。前期的
FB客服
2018/02/28
1.4K0
企业安全建设之路:端口扫描(下)
从源码来分析kafka生产者原理
源码学习是一种挺好的方式,不过根据我的经验最好是先学习大致的原理,再回头看源码,更能抓住重点。
崩天的勾玉
2024/03/12
1450
从源码来分析kafka生产者原理
单细胞分析工具--ECAUGT提取hECA数据
首先可根据表型信息(meta.data)筛选目标细胞群,常用的两个条件是器官(organ)与细胞(cell_type)类型
生信技能树jimmy
2023/02/16
3370
单细胞分析工具--ECAUGT提取hECA数据
libtorch系列教程3:优雅地训练MNIST分类模型
在这篇文章中,我们对如何使用Libtorch进行MNIST分类模型的训练和测试进行详细描述。首先会浏览官方MNIST示例,然后对其进行模块化重构,为后续别的模型的训练提供 codebase。
王云峰
2023/10/23
5670
Tensorflow - tfrecords 文件的创建
这里主要提供了 Tensorflow 创建 tfrecords 文件的辅助函数,以用于图像分类、检测和关键点定位.
AIHGF
2019/03/11
1.7K0
MySQL8.0 优化器介绍(二)
join在MySQL 是一个如此重要的章节,毫不夸张的说,everything is a join。
GreatSQL社区
2023/08/10
2450
MySQL8.0 优化器介绍(二)
线上MySQL的自增id用尽怎么办?
MySQL的自增id都定义了初始值,然后不断加步长。虽然自然数没有上限,但定义了表示这个数的字节长度,计算机存储就有上限。比如,无符号整型(unsigned int)是4个字节,上限就是2^32 - 1。那自增id用完,会怎么样?
JavaEdge
2021/10/18
2.1K0
推荐阅读
相关推荐
如何在Apache Arrow中定位与解决问题
更多 >
领券
社区富文本编辑器全新改版!诚邀体验~
全新交互,全新视觉,新增快捷键、悬浮工具栏、高亮块等功能并同时优化现有功能,全面提升创作效率和体验
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文