前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何Spark的shuffle移植到自己业务

如何Spark的shuffle移植到自己业务

作者头像
Spark学习技巧
发布2020-05-06 17:06:28
6550
发布2020-05-06 17:06:28
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧

1.ExternalSorter简介

ExternalSorter是用来排序及聚合key-value类型的数据。首先使用分区器将数据按照key进行分区,然后使用自定义的排序器在一个分区内对数据key进行排序。可以生成适合shuffle读取的分区文件。

如果禁用combiner,那么value的输入和输出类型要一致。

注意:ExternalSorter是一个比较通用的排序器,在sort-based shuffle中,可以用一些配置控制其一些特性,比如块儿压缩可以通过配置 spark.shuffle.compress来开启及关闭.假如在non-shuffle场景下使用了ExternalSorter,可能会需要重新读取该配置。

构造函数如下:

代码语言:javascript
复制
private[spark] class ExternalSorter[K, V, C](
  context: TaskContext,
  aggregator: Option[Aggregator[K, V, C]] = None,
  partitioner: Option[Partitioner] = None,
  ordering: Option[Ordering[K]] = None,
  serializer: Serializer = SparkEnv.get.serializer)
extends Spillable[WritablePartitionedPairCollection[K, C]](context.taskMemoryManager())

参数介绍:

代码语言:javascript
复制
 * @param aggregator optional Aggregator with combine functions to use for merging data
 * @param partitioner optional Partitioner; if given, sort by partition ID and then key
 * @param ordering optional Ordering to sort keys within each partition; should be a total ordering
 * @param serializer serializer to use when spilling to disk
  • aggregator用来完成聚合操作。
  • partitioner就是shuffle的算子的分区器。也是一个maptask,写数据输出给哪个reducer,由该分区器决定。
  • ordering排序器,可选,对key进行排序。
  • serializer用来在写入数据到磁盘的时候对数据进行序列化,读数据的时候要用他进行反序列化。

注意,假如设置了ordering参数,那么就必然会对数据进行按key排序,所以一定是要在需要排序的时候才设置。比如,在一个不需要map端合并的map操作中,为了避免不必要的排序,需要将ordering参数设置为None。另一方面,假如需要map端合并,那相对于none指定排序器会更加高效。

使用该类的步骤

  1. 实例化一个ExternalSorter。
  2. 调用insertAll(),并传入records数据集。
  3. 触发排序及合并。可以使用iterator()去对元素进行迭代排序或聚合。也可以调用writePartitionedFile()函数,创建已经排序或者聚合的文件,该文件适用于spark sort shuffle。

2.ExternalSorter的工作原理

首先,数据会不停的写入内存缓存区中,假如需要按照key对value进行聚合,则使用的是PartitionedAppendOnlyMap;假如不需要按照key对value进行聚合则使用PartitionedPairBuffer。在内存buffer内部,我们需要按照partition ID对元素进行排序,假如设置了key排序也会按照key对元素进行排序。为了避免频繁调用分区器,会在存储record的时候也存储partition ID 。

其次,假如缓存区达到了内存限制,就会将其溢写到磁盘存为一个文件。这些文件,首先会按照partition ID进行排序,假如需要聚合的话也会按照key或者key的hashcode进行排序。对于每个文件,我们会记录在内存里的时候每个分区有多少元素,所以没必要为每个元素写入partition ID。

然后,当用户调用iterator或者file输出函数的时候,已经溢写的文件就会连同内存的数据一起合并,会使用与前面相同的排序器。如果需要按照key对元素聚合,要么使用设置的排序器进行全局排序,要么读取有相同hashcode的key,然后对相同key的value进行聚合操作。

最后,当调用stop()函数的时候会删除所有的中间结果文件。

3.案例

其实我们可以直接使用ExternalSorter,实际上就是一个map操作,使用指定的分区器,对数据按照key进行分区,然后会在同一个分区内使用聚合和排序算子,对key进行排序及聚合操作。

3.1 实例化ExternalSorter

代码语言:javascript
复制
val size = 400

val sparkConf = new SparkConf()
sparkConf.setMaster("local")
sparkConf.setAppName(this.getClass.getCanonicalName)

sparkConf.set("spark.shuffle.manager", "sort")
sparkConf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 40).toString)

val sc = new SparkContext(sparkConf)

val context = SparkUtils.fakeTaskContext(sc)

val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
val ord = implicitly[Ordering[Int]]

//    // Both aggregator and ordering
val sorter = new ExternalSorter[Int, Int, Int](
  context, Some(agg), Some(new HashPartitioner(4)),Some(ord))

上面agg和ord是聚合器和排序器,两者均可以自定义,也可以设置为None,浪尖这里给了最简单的案例:key,value及聚合后的结果都是Int类型。

3.2 插入数据

浪尖这里是400条数据,key的范围是0-39,value范围是0-399.

代码语言:javascript
复制
val elements = (0 until size).iterator.map { i => (i % 40, i) }
sorter.insertAll(elements)

3.3 触发输出计算

可以按照分区将数据输出到console或者缓存到一个scala集合里。

代码语言:javascript
复制
sorter.partitionedIterator
.map(p => (p._1, p._2))
.filter(p=> p._1 == 0)
.flatMap(p=>p._2)
.foreach(println)

浪尖这里是获取了partition ID的为0的数据,并输出,结果如下:

代码语言:javascript
复制
(0,1800)
(4,1840)
(8,1880)
(12,1920)
(16,1960)
(20,2000)
(24,2040)
(28,2080)
(32,2120)
(36,2160)

这个计算过程,中间数据会落地到磁盘里的,触发溢写操作的的配置参数是:

代码语言:javascript
复制
sparkConf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 40).toString)

浪尖这里测试方便,达到10条就会触发刷磁盘,临时文件会在调用sorter.stop()之后删除。

要想看是否有中间文件,操作方法也很简单,spark的blockmanager提供了接口:

代码语言:javascript
复制
val beforeCleanUp = SparkUtils.getBlockManager(sc).diskBlockManager.getAllFiles().size
println(beforeCleanUp)
sorter.stop()
val afterCleanUp = SparkUtils.getBlockManager(sc).diskBlockManager.getAllFiles().size
println(afterCleanUp)

结果如下:

代码语言:javascript
复制
36
0

3.4 输出到文件

shuffle中间结果肯定是输出到blockmanager管理的,也是可以落地到磁盘,浪尖这里也给出让其落地磁盘操作案例。

代码语言:javascript
复制
sorter.partitionedIterator.map(p => (p._1, p._2)).filter(p=> p._1 == 0).flatMap(p=>p._2).foreach(println)

val outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "",new File("data/"))

// outputFile.deleteOnExit()
sorter.writePartitionedFile(ShuffleBlockId(0,0,0),outputFile).foreach(println)

执行之后会在工程的data目录下生成文件,文件是unsaferow及序列化的,不可以直接查看。

3.5 读取溢写文件

sorter的writePartitionedFile方法,返回值是一个数组,数组的下标是 partition ID,元素是该分区数据的大小。读数据的时候由于sorter会将所有的分区数据写入同一个数据文件,其实spark shuffle里还有一个索引文件,浪尖这里是测试用的所有没有索引文件。

代码语言:javascript
复制
[271,271,271,271]

浪尖这里分区器是四个分区,数据设计的比较均匀,所以每个分区数据大小很均匀,都是271.

读取最后一个分区数据的方法,浪尖没有参照源码,给出比较简单的读取方法有兴趣的可以去源码里找找:

代码语言:javascript
复制
val serializer = SparkEnv.get.serializer.newInstance()
val input = new FileInputStream(new File("data/test-unsafe-row-serializer-spill7127803973846287207"))
input.skip(271+271+271)
val deserializer = serializer.deserializeStream(input)
try {
  val rows = deserializer.asKeyValueIterator
  while (rows.hasNext)
  {
    val (key,value)=rows.next();
    println(key+":"+value)
  }
} catch {
  case ex: Exception => {
    ex.printStackTrace() // 打印到标准err
    System.err.println("exception===>: ...")  // 打印到标准err
  }
}

结果,第一个元素是key,第二个是聚合后的value,可以看到分区特点也很均匀key差值是4,由于排序的原因,所以key也是递增的,这是由于浪尖这个给的hashpartitioner分区数为4,且给了排序器的原因。

代码语言:javascript
复制
3:1830
7:1870
11:1910
15:1950
19:1990
23:2030
27:2070
31:2110
35:2150
39:2190

浪尖这里读取分区文件的时候由于分区segment之间有分隔符,所以会抛异常,而中止,这正好是给我们结束契机。

4. 代码补充

自己的类要包路径是org.apache.spark.文章提到的工具类是:

代码语言:javascript
复制
package org.apache.spark

import java.util.Properties

import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.storage.BlockManager

object SparkUtils {
  def fakeTaskContext(sc: SparkContext): TaskContext = {
    val env = sc.env
    val taskMemoryManager = new TaskMemoryManager(env.memoryManager, 0)
    new TaskContextImpl(
      stageId = 0,
      stageAttemptNumber = 0,
      partitionId = 0,
      taskAttemptId = 0,
      attemptNumber = 0,
      taskMemoryManager = taskMemoryManager,
      localProperties = new Properties,
      metricsSystem = env.metricsSystem)
  }

  def getBlockManager(sc:SparkContext):BlockManager = {
    sc.env.blockManager
  }
}

5.总结

这个思路主要来源于知识星球之前有人问过浪尖,数据集比较大,写分布式spark程序集成到自己的任务里有比较麻烦,所以想问问浪尖有没有好思路。

浪尖想自己实现基于磁盘的排序算法,实际上重复造轮子太复杂了,而且性能不知如何,所以想到利用spark shuffle的基于磁盘的排序操作,把它拿出来,然后使用起来。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-04-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档