textFile构建RDD的分区及compute计算策略

1,textFile

A),第一点,就是输入格式,key,value类型及并行度的意义。

def textFile(
    path: String,
 minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
 //输入文件的格式TextInputFormat,key的类型LongWritable ,value的类型Text
  //最小分区数defaultMinPartitions
 hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
 minPartitions).map(pair => pair._2.toString).setName(path)
}

并行度

conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))

真正意义是啥?实际是决定我们goalSize的值。并不决定我们的分区数。

B),hadoopRDD的getPartition方法。

主要是获取分片的过程通过调用FileInputFormat.getSplits方法来实现分片。主要有一下几个步骤:

1) ,获取所有 FileStatus

FileStatus[] files = listStatus(job);

ListStatus方法里面:

1,判断是否需要递归

boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false);

2,接着是创建路径过滤器,筛选掉一些我们不需要的文件,入以_,.开头的

List<PathFilter> filters = new ArrayList<PathFilter>();
filters.add(hiddenFileFilter);
PathFilter jobFilter = getInputPathFilter(job);
if (jobFilter != null) {
  filters.add(jobFilter);
}
PathFilter inputFilter = new MultiPathFilter(filters);

3,根据mapreduce.input.fileinputformat.list-status.num-threads决定是并发还是单线程

FileStatus[] result;
int numThreads = job
    .getInt(
        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS,
 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);

Stopwatch sw = new Stopwatch().start();
if (numThreads == 1) {
  List<FileStatus> locatedFiles = singleThreadedListStatus(job, dirs, inputFilter, recursive); 
 result = locatedFiles.toArray(new FileStatus[locatedFiles.size()]);
} else {
  Iterable<FileStatus> locatedFiles = null;
  try {
 
    LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
        job, dirs, recursive, inputFilter, false);
 locatedFiles = locatedFileStatusFetcher.getFileStatuses();
 } catch (InterruptedException e) {
 throw new IOException("Interrupted while getting file statuses");
 }
  result = Iterables.toArray(locatedFiles, FileStatus.class);
}

2) ,获取目标分片goalsize和最小minsize

long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
  FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

3) ,判断文件是否支持切分,不压缩或者压缩方式为BZip2Codec支持切分

protected boolean isSplitable(FileSystem fs, Path file) {
 final CompressionCodec codec = compressionCodecs.getCodec(file);
  if (null == codec) {
 return true;
 }
 return codec instanceof SplittableCompressionCodec;
}

支持切分就进行切分分片,切分分片大小为

Math.max(minSize, Math.min(maxSize, blockSize));

不支持切分的话就直接返回一个文件一个分片

最终,用InputSplit构建HadoopPartition

C),接着进入compute方法

重点掌握根据指定分片获取reader

reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

实际上是在TextInputFormat构建了

 new LineRecordReader(job, (FileSplit) genericSplit,
 recordDelimiterBytes);

还有就是识别不同系统的过程,比如hdfs ,本地file,tachyon。

final FileSystem fs = file.getFileSystem(job);

里面会根据uri获取scheme,然后构建为"fs." + scheme + ".impl" 通过反射的到相应的对象。

clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);

类加载器为Configuration对象里面初始化的

private ClassLoader classLoader;
{
 classLoader = Thread.currentThread().getContextClassLoader();
  if (classLoader == null) {
 classLoader = Configuration.class.getClassLoader();
 }
}

而此,configuration对象是在compute方法中通过jobConf = getJobConf()获得的实际是

从Driver端发送过来的。

val conf: Configuration = broadcastedConf.value.value

由此可以得到结论是tachyon使用是依赖,必须方法系统类加载器的Classpath中去

原文发布于微信公众号 - Spark学习技巧(bigdatatip)

原文发表时间:2017-12-30

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏码匠的流水账

聊聊sentinel的DegradeSlot

com/alibaba/csp/sentinel/slots/block/degrade/DegradeSlot.java

1451
来自专栏码匠的流水账

聊聊eureka server的response cache

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/resources/ApplicationResource....

1143
来自专栏Spark生态圈

[spark] DAGScheduler 提交stage源码解析

DAGScheduler在划分完Stage后([spark] DAGScheduler划分stage源码解析 ),将会通过submitStage(finalSt...

1113
来自专栏架构之路

Spring 数据库连接(Connection)绑定线程(Thread)的实现

最近在看spring事务的时候在想一个问题:spring中的很多bean都是单例的,是非状态的,而数据库连接是一种有状态的对象,所以spring一定在创建出co...

3853
来自专栏码匠的流水账

聊聊sentinel的SimpleHttpCommandCenter

sentinel-transport-simple-http-0.1.1-sources.jar!/com/alibaba/csp/sentinel/trans...

911
来自专栏码匠的流水账

spring security oauth2使用redis存储token

本文就来讲述一下spring security oauth2使用redis来存储token的配置及在redis中的存储结构

1.4K0
来自专栏小勇DW3

Mybatis使用动态代理实现拦截器功能

  拦截器顾名思义为拦截某个功能的一个武器,在众多框架中均有“拦截器”。这个Plugin有什么用呢?或者说拦截器有什么用呢?可以想想拦截器是怎么实现的。Plug...

4692
来自专栏杂烩

Eclipse下Hadoop的MapReduce开发之MapReduce编写

    先说下业务需求吧,有个系统日志文件,记录系统的运行信息,其中包含DEBUG、INFO、WARN、ERROR四个级别的日志,现在想要看到所有级别各有多少条...

1309
来自专栏码字搬砖

Spark Streaming 中使用 zookeeper 保存 offset 并重用 Java版

最近中使用spark Streaming +kafka,由于涉及到金额,所以需要保证at only one, 而网上关于java版的kafka offset...

3302
来自专栏数据结构与算法

BZOJ1269: [AHOI2006]文本编辑器editor

Descriptio 这些日子,可可不和卡卡一起玩了,原来可可正废寝忘食的想做一个简单而高效的文本编辑器。你能帮助他吗? 为了明确任务目标,可可对“文本编辑器...

2907

扫码关注云+社区

领取腾讯云代金券