Groupby和reduce是大数据领域常见的算子,但是很多同学应该对其背后机制不甚了解。本文将从源码入手,为大家解析Flink中Groupby和reduce的原理,看看他们在背后做了什么。
探究的原因是想到了几个问题 :
为了便于大家理解,我们先总结下,对于一个Groupby + Reduce的操作,Flink做了如下处理:
这样之前的疑问就基本得到了解释。
MapReduce是一种编程模型,用于大规模数据集的并行运算。概念 "Map(映射)"和"Reduce(归约)" 是它们的主要思想,其是从函数式编程语言,矢量编程语言里借来的特性。
我们目前使用的Flink,Spark都出自于MapReduce,所以我们有必有追根溯源,看看MapReduce是如何区分各个阶段的。
如果把MapReduce细分,可以分为一下几大过程:
Combine是我们需要特殊注意的。在mapreduce中,map多,reduce少。在reduce中由于数据量比较多,所以我们干脆在map阶段中先把自己map里面的数据归类,这样到了reduce的时候就减轻了压力。
Combine可以理解为是在map端的reduce的操作,对单个map任务的输出结果数据进行合并的操作。combine是对一个map的,而reduce合并的对象是对于多个map。
map函数操作所产生的键值对会作为combine函数的输入,经combine函数处理后再送到reduce函数进行处理,减少了写入磁盘的数据量,同时也减少了网络中键值对的传输量。在Map端,用户自定义实现的Combine优化机制类Combiner在执行Map端任务的节点本身运行,相当于对map函数的输出做了一次reduce。
集群上的可用带宽往往是有限的,产生的中间临时数据量很大时就会出现性能瓶颈,因此应该尽量避免Map端任务和Reduce端任务之间大量的数据传输。使用Combine机制的意义就在于使Map端输出更紧凑,使得写到本地磁盘和传给Reduce端的数据更少。
Partition是分割map每个节点的结果,按照key分别映射给不同的reduce,mapreduce使用哈希HashPartitioner帮我们归类了。这个我们也可以自定义。
这里其实可以理解归类。我们对于错综复杂的数据归类。比如在动物园里有牛羊鸡鸭鹅,他们都是混在一起的,但是到了晚上他们就各自牛回牛棚,羊回羊圈,鸡回鸡窝。partition的作用就是把这些数据归类。只不过是在写程序的时候,
在经过mapper的运行后,我们得知mapper的输出是这样一个key/value对: key是“aaa”, value是数值1。因为当前map端只做加1的操作,在reduce task里才去合并结果集。假如我们知道这个job有3个reduce task,到底当前的“aaa”应该交由哪个reduce task去做呢,是需要立刻决定的。
MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce task的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。
在我们的例子中,假定 “aaa”经过Partitioner后返回0,也就是这对值应当交由第一个reducer来处理。
shuffle就是map和reduce之间的过程,包含了两端的combine和partition。它比较难以理解,因为我们摸不着,看不到它。它属于mapreduce的框架,编程的时候,我们用不到它。
Shuffle的大致范围就是:怎样把map task的输出结果有效地传送到reduce端。也可以这样理解, Shuffle描述着数据从map task输出到reduce task输入的这段过程。
简单地说,reduce task在执行之前的工作就是不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge,最终形成一个文件作为reduce task的输入文件。
我们以Flink的KMeans算法作为样例,具体摘要如下:
public class WordCountExampleReduce {
DataStream ds;
public static void main(String[] args) throws Exception {
//构建环境
final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
//通过字符串构建数据集
DataSet<String> text = env.fromElements(
"Who‘s there?",
"I think I hear them. Stand, ho! Who‘s there?");
//分割字符串、按照key进行分组、统计相同的key个数
DataSet<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.groupBy(0)
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2) throws Exception {
return new Tuple2(value1.f0, value1.f1 + value2.f1);
}
});
//打印
wordCounts.print();
}
//分割字符串的方法
public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
for (String word : line.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
输出是:
(hear,1)
(ho!,1)
(them.,1)
(I,2)
(Stand,,1)
(Who‘s,2)
(there?,2)
(think,1)
首先,我们从Flink基本JAVA API来入手开始挖掘。
我们需要留意的是:GroupBy并没有对应的Operator。GroupBy只是生成DataSet转换的一个中间步骤或者辅助步骤。
GroupBy功能的基类是Grouping,其只是DataSet转换的一个中间步骤。其几个主要成员是:
// Grouping is an intermediate step for a transformation on a grouped DataSet.
public abstract class Grouping<T> {
protected final DataSet<T> inputDataSet;
protected final Keys<T> keys;
protected Partitioner<?> customPartitioner;
}
Grouping并没有任何业务相关的API,具体API都是在其派生类中,比如UnsortedGrouping。
我们代码中对应的就是UnsortedGrouping类。我们看到它提供了很多业务API,比如:sum,max,min,reduce,aggregate,reduceGroup,combineGroup.....
回到我们的示例,groupBy做了如下操作
.groupBy(0).reduce(new CentroidAccumulator())
返回的是ReduceOperator。这就对应了前面我们提到的,groupBy只是中间步骤,reduce才能返回一个Operator。public class UnsortedGrouping<T> extends Grouping<T> {
// groupBy返回一个UnsortedGrouping
public UnsortedGrouping<T> groupBy(int... fields) {
return new UnsortedGrouping<>(this, new Keys.ExpressionKeys<>(fields, getType()));
}
// reduce返回一个ReduceOperator
public ReduceOperator<T> reduce(ReduceFunction<T> reducer) {
return new ReduceOperator<T>(this, inputDataSet.clean(reducer), Utils.getCallLocationName());
}
}
对于业务来说,reduce才是真正有意义的逻辑算子。
从前文的函数调用和ReduceOperator定义可以看出,.groupBy(0).reduce()
的调用结果是生成一个ReduceOperator,而 UnsortedGrouping 被设置为 ReduceOperator 的 grouper 成员变量,作为辅助操作。
public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOperator<IN>> {
private final ReduceFunction<IN> function;
private final Grouping<IN> grouper; // UnsortedGrouping被设置在这里,后续reduce操作中会用到。
public ReduceOperator(Grouping<IN> input, ReduceFunction<IN> function,
String defaultName) {
this.function = function;
this.grouper = input; // UnsortedGrouping被设置在这里,后续reduce操作中会用到。
this.hint = CombineHint.OPTIMIZER_CHOOSES; // 优化时候会用到。
}
}
让我们顺着Flink程序执行阶段继续看看系统都做了些什么。
程序执行的第一步是:当程序运行时候,首先会根据java API的结果来生成执行plan。
public JobClient executeAsync(String jobName) throws Exception {
final Plan plan = createProgramPlan(jobName);
}
其中重要的函数是translateToDataFlow,因为在translateToDataFlow方法中,会从批处理Java API模块中operators包往核心模块中operators包的转换。
对于我们的示例程序,在生成 Graph时,translateToDataFlow会生成一个 SingleInputOperator,为后续runtime使用。下面是代码缩减版。
protected org.apache.flink.api.common.operators.SingleInputOperator<?, IN, ?> translateToDataFlow(Operator<IN> input) {
......
// UnsortedGrouping中的keys被取出,
else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
// reduce with field positions
ReduceOperatorBase<IN, ReduceFunction<IN>> po =
new ReduceOperatorBase<>(function, operatorInfo, logicalKeyPositions, name);
po.setCustomPartitioner(grouper.getCustomPartitioner());
po.setInput(input);
po.setParallelism(getParallelism()); // 没有并行度的变化
return po;//translateToDataFlow会生成一个 SingleInputOperator,为后续runtime使用
}
}
}
我们代码最终生成的执行计划如下,我们可以看出来,执行计划基本符合我们的估计:简单的从输入到输出。中间有意义的算子其实只有Reduce。
GenericDataSourceBase ——> FlatMapOperatorBase ——> ReduceOperatorBase ——> GenericDataSinkBase
具体在代码中体现如下是:
plan = {Plan@1296}
sinks = {ArrayList@1309} size = 1
0 = {GenericDataSinkBase@1313} "collect()"
formatWrapper = {UserCodeObjectWrapper@1315}
input = {ReduceOperatorBase@1316} "ReduceOperatorBase - Reduce at main(WordCountExampleReduceCsv.java:25)"
hint = {ReduceOperatorBase$CombineHint@1325} "OPTIMIZER_CHOOSES"
customPartitioner = null
input = {FlatMapOperatorBase@1326} "FlatMapOperatorBase - FlatMap at main(WordCountExampleReduceCsv.java:23)"
input = {GenericDataSourceBase@1339} "at main(WordCountExampleReduceCsv.java:20) (org.apache.flink.api.java.io.TextInputFormat)"
程序执行的第二步是:Flink对于Plan会继续优化,生成Optimized Plan。其核心代码位于PlanTranslator.compilePlan 函数,这里得到了Optimized Plan。
这个编译的过程不作任何决策与假设,也就是说作业最终如何被执行早已被优化器确定,而编译也是在此基础上做确定性的映射。所以我们将集中精力看如何优化plan。
private JobGraph compilePlan(Plan plan, Configuration optimizerConfiguration) {
Optimizer optimizer = new Optimizer(new DataStatistics(), optimizerConfiguration);
OptimizedPlan optimizedPlan = optimizer.compile(plan);
JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(optimizerConfiguration);
return jobGraphGenerator.compileJobGraph(optimizedPlan, plan.getJobId());
}
在内部调用plan的accept方法遍历它。accept会挨个在每个sink上调用accept。对于每个sink会先preVisit,然后 postVisit。
这里优化时候有几个注意点:
combinerStrategy = DriverStrategy.SORTED_PARTIAL_REDUCE;
public ReduceNode(ReduceOperatorBase<?, ?> operator) { DriverStrategy combinerStrategy; switch(operator.getCombineHint()) { case OPTIMIZER_CHOOSES: combinerStrategy = DriverStrategy.SORTED_PARTIAL_REDUCE; break; } }
生成的优化执行计划如下,我们可以看到,这时候设置了并行度,也把reduce分割成两段,一段是SORTED_PARTIAL_REDUCE,一段是SORTED_REDUCE。
Data Source ——> FlatMap ——> Reduce(SORTED_PARTIAL_REDUCE) ——> Reduce(SORTED_REDUCE) ——> Data Sink
具体在代码中体现如下是:
optimizedPlan = {OptimizedPlan@1506}
allNodes = {HashSet@1510} size = 5
0 = {SourcePlanNode@1512} "Data Source "at main(WordCountExampleReduceCsv.java:20) (org.apache.flink.api.java.io.TextInputFormat)" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
parallelism = 4
1 = {SingleInputPlanNode@1513} "FlatMap "FlatMap at main(WordCountExampleReduceCsv.java:23)" : FLAT_MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
parallelism = 4
2 = {SingleInputPlanNode@1514} "Reduce "Reduce at main(WordCountExampleReduceCsv.java:25)" : SORTED_REDUCE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
parallelism = 4
3 = {SinkPlanNode@1515} "Data Sink "collect()" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
parallelism = 4
4 = {SingleInputPlanNode@1516} "Reduce "Reduce at main(WordCountExampleReduceCsv.java:25)" : SORTED_PARTIAL_REDUCE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
parallelism = 4
程序执行的第三步是:建立JobGraph。LocalExecutor.execute中会生成JobGraph。Optimized Plan 经过优化后生成了 JobGraph, JobGraph是提交给 JobManager 的数据结构。
主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
JobGraph是唯一被Flink的数据流引擎所识别的表述作业的数据结构,也正是这一共同的抽象体现了流处理和批处理在运行时的统一。
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
final JobGraph jobGraph = getJobGraph(pipeline, configuration);
}
我们可以看出来,这一步形成了一个Operator Chain:
CHAIN DataSource -> FlatMap -> Combine (Reduce)
于是我们看到,Reduce(SORTED_PARTIAL_REDUCE)和其上游合并在一起。
具体在程序中打印出来:
jobGraph = {JobGraph@1739} "JobGraph(jobId: 30421d78d7eedee6be2c5de39d416eb7)"
taskVertices = {LinkedHashMap@1742} size = 3
{JobVertexID@1762} "e2c43ec0df647ea6735b2421fb7330fb" -> {InputOutputFormatVertex@1763} "CHAIN DataSource (at main(WordCountExampleReduceCsv.java:20) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCountExampleReduceCsv.java:23)) -> Combine (Reduce at main(WordCountExampleReduceCsv.java:25)) (org.apache.flink.runtime.operators.DataSourceTask)"
{JobVertexID@1764} "2de11f497e827e48dda1d63b458dead7" -> {JobVertex@1765} "Reduce (Reduce at main(WordCountExampleReduceCsv.java:25)) (org.apache.flink.runtime.operators.BatchTask)"
{JobVertexID@1766} "2bee17f2c86aa1e9439e3dedea58007b" -> {InputOutputFormatVertex@1767} "DataSink (collect()) (org.apache.flink.runtime.operators.DataSinkTask)"
Job提交之后,就是程序正式运行了。这里实际上涉及到了三次排序,
这里是第一次排序。
当一批数据处理完成之后,在ChainedFlatMapDriver中调用到close函数进行发送数据给下游。
public void close() {
this.outputCollector.close();
}
Operator Chain会调用到ChainedReduceCombineDriver.close
public void close() {
// send the final batch
try {
switch (strategy) {
case SORTED_PARTIAL_REDUCE:
sortAndCombine(); // 我们是在这里
break;
case HASHED_PARTIAL_REDUCE:
reduceFacade.emit();
break;
}
} catch (Exception ex2) {
throw new ExceptionInChainedStubException(taskName, ex2);
}
outputCollector.close();
dispose(false);
}
sortAndCombine中先排序,然后做combine,最后会不断发送数据。
private void sortAndCombine() throws Exception {
final InMemorySorter<T> sorter = this.sorter;
if (!sorter.isEmpty()) {
sortAlgo.sort(sorter); // 这里会先排序
final TypeSerializer<T> serializer = this.serializer;
final TypeComparator<T> comparator = this.comparator;
final ReduceFunction<T> function = this.reducer;
final Collector<T> output = this.outputCollector;
final MutableObjectIterator<T> input = sorter.getIterator();
if (objectReuseEnabled) {
......
} else {
T value = input.next();
// 这里就是combine
// iterate over key groups
while (running && value != null) {
comparator.setReference(value);
T res = value;
// iterate within a key group
while ((value = input.next()) != null) {
if (comparator.equalToReference(value)) {
// same group, reduce
res = function.reduce(res, value);
} else {
// new key group
break;
}
}
output.collect(res); //发送数据
}
}
}
}
最后发送给哪个下游,是由OutputEmitter.selectChannel决定的。有如下几种决定方式:
hash-partitioning, broadcasting, round-robin, custom partition functions。这里采用的是PARTITION_HASH。
每个task都会把同样字符串统计结果发送给同样的下游ReduceDriver。这就保证了下游Reducer一定不会出现统计出错。
public final int selectChannel(SerializationDelegate<T> record) {
switch (strategy) {
...
case PARTITION_HASH:
return hashPartitionDefault(record.getInstance(), numberOfChannels);
...
}
}
private int hashPartitionDefault(T record, int numberOfChannels) {
int hash = this.comparator.hash(record);
return MathUtils.murmurHash(hash) % numberOfChannels;
}
具体调用栈:
hash:50, TupleComparator (org.apache.flink.api.java.typeutils.runtime)
hash:30, TupleComparator (org.apache.flink.api.java.typeutils.runtime)
hashPartitionDefault:187, OutputEmitter (org.apache.flink.runtime.operators.shipping)
selectChannel:147, OutputEmitter (org.apache.flink.runtime.operators.shipping)
selectChannel:36, OutputEmitter (org.apache.flink.runtime.operators.shipping)
emit:60, ChannelSelectorRecordWriter (org.apache.flink.runtime.io.network.api.writer)
collect:65, OutputCollector (org.apache.flink.runtime.operators.shipping)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
sortAndCombine:254, ChainedReduceCombineDriver (org.apache.flink.runtime.operators.chaining)
close:266, ChainedReduceCombineDriver (org.apache.flink.runtime.operators.chaining)
close:40, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
close:88, ChainedFlatMapDriver (org.apache.flink.runtime.operators.chaining)
invoke:215, DataSourceTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
这里是第二次排序。
在 BatchTask中,会先Sort, Merge输入,然后才会交由Reduce来具体完成过。sort & merge操作具体是在UnilateralSortMerger类中完成的。
getIterator:646, UnilateralSortMerger (org.apache.flink.runtime.operators.sort)
getInput:1110, BatchTask (org.apache.flink.runtime.operators)
prepare:95, ReduceDriver (org.apache.flink.runtime.operators)
run:474, BatchTask (org.apache.flink.runtime.operators)
invoke:369, BatchTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
UnilateralSortMerger是一个full fledged sorter,它实现了一个多路merge sort。其内部的逻辑被划分到三个线程上(read, sort, spill),这三个线程彼此之间通过一系列blocking queues来构成了一个闭环。
其内存通过MemoryManager分配,所以这个组件不会超过给其分配的内存。
该类主要变量摘录如下:
public class UnilateralSortMerger<E> implements Sorter<E> {
// ------------------------------------------------------------------------
// Threads
// ------------------------------------------------------------------------
/** The thread that reads the input channels into buffers and passes them on to the merger. */
private final ThreadBase<E> readThread;
/** The thread that merges the buffer handed from the reading thread. */
private final ThreadBase<E> sortThread;
/** The thread that handles spilling to secondary storage. */
private final ThreadBase<E> spillThread;
// ------------------------------------------------------------------------
// Memory
// ------------------------------------------------------------------------
/** The memory segments used first for sorting and later for reading/pre-fetching
* during the external merge. */
protected final List<MemorySegment> sortReadMemory;
/** The memory segments used to stage data to be written. */
protected final List<MemorySegment> writeMemory;
/** The memory manager through which memory is allocated and released. */
protected final MemoryManager memoryManager;
// ------------------------------------------------------------------------
// Miscellaneous Fields
// ------------------------------------------------------------------------
/**
* Collection of all currently open channels, to be closed and deleted during cleanup.
*/
private final HashSet<FileIOChannel> openChannels;
/**
* The monitor which guards the iterator field.
*/
protected final Object iteratorLock = new Object();
/**
* The iterator to be returned by the sort-merger. This variable is null, while receiving and merging is still in
* progress and it will be set once we have < merge factor sorted sub-streams that will then be streamed sorted.
*/
protected volatile MutableObjectIterator<E> iterator; // 如果大家经常调试,就会发现driver中的input都是这个兄弟。
private final Collection<InMemorySorter<?>> inMemorySorters;
}
ReadingThread:这种线程持续读取输入,然后把数据放入到一个待排序的buffer中。The thread that consumes the input data and puts it into a buffer that will be sorted.
SortingThread : 这种线程对于上游填充好的buffer进行排序。The thread that sorts filled buffers.
SpillingThread:这种线程进行归并操作。The thread that handles the spilling of intermediate results and sets up the merging. It also merges the channels until sufficiently few channels remain to perform the final streamed merge.
UnilateralSortMerger有一个特殊变量:
protected volatile MutableObjectIterator<E> iterator;
这个变量就是最终sort-merger的输出。如果大家调试过算子,就会发现这个变量就是具体算子的输入input类型。最终算子的输入就是来自于此。
这里是第三次排序,我们可以看出来reduce是怎么和groupby一起运作的。
.groupBy(0)
,ReduceDriver就是单纯获取输入的第一个数值 T value = input.next();
comparator.setReference(value);
因为groubBy只是指定按照第一个位置比较,没有指定具体key数值,所以这个value就是key了。此处记为while (1)
,代码中有注解。while (2)
while (2)
之后,代码依然在 while (1)
,此时value是新值,所以继续在 while (1)
中运行 。把value继续赋于比较算子 comparator.setReference(value);
,于是进行新的key比较。public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
@Override
public void run() throws Exception {
final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
// cache references on the stack
final MutableObjectIterator<T> input = this.input;
final TypeSerializer<T> serializer = this.serializer;
final TypeComparator<T> comparator = this.comparator;
final ReduceFunction<T> function = this.taskContext.getStub();
final Collector<T> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
if (objectReuseEnabled) {
......
} else {
// 针对 `.groupBy(0)`,ReduceDriver就是单纯获取输入的第一个数值 `T value = input.next();`
T value = input.next();
// while (1)
// iterate over key groups
while (this.running && value != null) {
numRecordsIn.inc();
// 把value赋于比较算子,这个value就是key了。
comparator.setReference(value);
T res = value;
// while (2)
// iterate within a key group,循环比较这个key
while ((value = input.next()) != null) {
numRecordsIn.inc();
if (comparator.equalToReference(value)) {
// same group, reduce,如果下一个数值是同一个key,就reduce
res = function.reduce(res, value);
} else {
// new key group,如果下一个数值不是同一个key,就跳出循环,放弃比较。
break;
}
}
// 把reduce结果输出
output.collect(res);
}
}
}
}
mapreduce里的shuffle 里的 sort merge 和combine
实战录 | Hadoop Mapreduce shuffle之Combine探讨