Hive查询最终转换为MapReduce操作,所以要先了解MapReduce数据倾斜问题。
MapReduce程序执行时,reduce节点大部分执行完毕,但是有一个或者几个reduce节点运行很慢,导致整个程序的处理时间很长,这是因为某一个key的条数比其他key多很多(有时是百倍或者千倍之多),这条key所在的reduce节点所处理的数据量比其他节点就大很多,从而导致某几个节点迟迟运行不完,此称之为数据倾斜。 在map端和reduce端都有可能发生数据倾斜。在map端的数据倾斜会让多样化的数据集的处理效率更低。在reduce端的数据倾斜常常来源于MapReduce的默认分区器。Reduce数据倾斜一般是指map的输出数据中存在数据频率倾斜的状况,也就是部分输出键的数据量远远大于其它的输出键。
常见的数据倾斜有以下几类:
解决MapReduce数据倾斜思路有两类:
(1)reduce 端的隐患在 map 端就解决
方法1:Combine 使用Combine可以大量地减小数据频率倾斜和数据大小倾斜。在可能的情况下,combine的目的就是聚合并精简数据。在加个combiner函数,加上combiner相当于提前进行reduce,就会把一个mapper中的相同key进行了聚合,减少shuffle过程中数据量,以及reduce端的计算量。这种方法可以有效的缓解数据倾斜问题,但是如果导致数据倾斜的key 大量分布在不同的mapper的时候,这种方法就不是很有效了。
方法2:map端join join 操作中,使用 map join 在 map 端就先进行 join ,免得到reduce 时卡住。
方法3: group 能先进行 group 操作的时候先进行 group 操作,把 key 先进行一次 reduce,之后再进行 count 或者 distinct count 操作。
(2)对 key 的操作,以减缓reduce 的压力
因为map阶段对数据处理方法不当,或者说Key设计不当,导致大量数据聚集到某个key下。这个问题再《数据结构》的hash算法中有详细解决办法(增大数组容量,选择恰当素数)。所以大家很快想到一个解决办法,重新设计key,使得数据均匀分不到每个key下面(通常需要增加reduce数),这样reduce节点收到的数据也相对均匀。
这里提供一个解决办法,自定义Partitioner,可以将key均匀分布。
package cn.hadron.mr.ncdc;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
public class MyPartitioner extends HashPartitioner<Weather, DoubleWritable> {
// 执行时间越短越好
public int getPartition(Weather key, DoubleWritable value, int numReduceTasks) {
// 根据年份分区
return key.getYear() % numReduceTasks;
}
}
在发现了倾斜数据的存在之后,就很有必要诊断造成数据倾斜的那些键。有一个简便方法就是在代码里实现追踪每个键的最大值。为了减少追踪量,可以设置数据量阀值,只追踪那些数据量大于阀值的键,并输出到日志中。
private int MAXVAL=100;
public void reduce(Text key, Iterator<Text> values,OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
int i = 0;
while (values.hasNext()) {
values.next();
i++;
}
if (i> MAXVAL) {
log.info("Received " + i + " values for key " + key);
}
}
Hive产生数据倾斜的原因
解决办法
(1)调参
hive.map.aggr=true
Map端部分聚合,相当于Combiner
hive.groupby.skewindata=true
有数据倾斜的时候进行负载均衡,当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。
(2)SQL调优
使map的输出数据更均匀的分布到reduce中去,是我们的最终目标。由于Hash算法的局限性,按key Hash会或多或少的造成数据倾斜。大量经验表明数据倾斜的原因是人为的建表疏忽或业务逻辑可以规避的。在此给出较为通用的步骤:
1、采样log表,哪些user_id比较倾斜,得到一个结果表tmp1。由于对计算框架来说,所有的数据过来,他都是不知道数据分布情况的,所以采样是并不可少的。
2、数据的分布符合社会学统计规则,贫富不均。倾斜的key不会太多,就像一个社会的富人不多,奇特的人不多一样。所以tmp1记录数会很少。把tmp1和users做map join生成tmp2,把tmp2读到distribute file cache。这是一个map过程。
3、map读入users和log,假如记录来自log,则检查user_id是否在tmp2里,如果是,输出到本地文件a,否则生成