前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Core读取ES的分区问题分析

Spark Core读取ES的分区问题分析

作者头像
Spark学习技巧
发布2019-06-20 17:37:24
1.5K0
发布2019-06-20 17:37:24
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧

ES也是比较火热,在日志数据分析,规则分析等确实很方便,说实话用es stack 浪尖觉得可以解决很多公司的数据分析需求。极客时间下周一要上线新的ES课程,有需要的暂时别购买,到时候还找浪尖返现吧。

写这篇文章的原因是前两天星球球友去面试,面试管问了一下,Spark 分析ES的数据,生成的RDD分区数跟什么有关系呢?

稍微猜测一下就能想到跟分片数有关,但是具体是什么关系呢?

可想的具体关系可能是以下两种:

1).就像KafkaRDD的分区与kafka topic分区数的关系一样,一对一。

2).ES支持游标查询,那么是不是也可以对比较大的分片进行拆分成多个RDD分区呢?

那么下面浪尖带着大家翻一下源码看看具体情况。

1.Spark Core读取ES

ES官网直接提供的有elasticsearch-hadoop 插件,对于ES 7.x,hadoop和Spark版本支持如下:

代码语言:javascript
复制
hadoop2Version  = 2.7.1
hadoop22Version = 2.2.0
spark13Version = 1.6.2
spark20Version = 2.3.0

浪尖这了采用的ES版本是7.1.1,测试用的Spark版本是2.3.1,没有问题。整合es和spark,导入相关依赖有两种方式:

a,导入整个elasticsearch-hadoop包

代码语言:javascript
复制
 <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-hadoop</artifactId>
      <version>7.1.1</version>
    </dependency>

b,只导入spark模块的包

代码语言:javascript
复制
<dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-spark-20_2.11</artifactId>
      <version>7.1.1</version>
    </dependency>

浪尖这里为了测试方便,只是在本机起了一个单节点的ES实例,简单的测试代码如下:

代码语言:javascript
复制

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions

object es2sparkrdd {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName)

    conf.set(ConfigurationOptions.ES_NODES, "127.0.0.1")
    conf.set(ConfigurationOptions.ES_PORT, "9200")
    conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
    conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
    conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")
//    conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)
//    conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd)
    conf.set("es.write.rest.error.handlers", "ignoreConflict")
    conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")

    val sc = new SparkContext(conf)
    import org.elasticsearch.spark._

    sc.esRDD("posts").foreach(each=>{
      each._2.keys.foreach(println)
    })
    sc.esJsonRDD("posts").foreach(each=>{
      println(each._2)
    })

    sc.stop()
  }
}

可以看到Spark Core读取RDD主要有两种形式的API:

a,esRDD。这种返回的是一个tuple2的类型的RDD,第一个元素是id,第二个是一个map,包含ES的document元素。

代码语言:javascript
复制
RDD[(String, Map[String, AnyRef])]

b,esJsonRDD。这种返回的也是一个tuple2类型的RDD,第一个元素依然是id,第二个是json字符串。

代码语言:javascript
复制
RDD[(String, String)]

虽然是两种类型的RDD,但是RDD都是ScalaEsRDD类型。

要分析Spark Core读取ES的并行度,只需要分析ScalaEsRDD的getPartitions函数即可。

2.源码分析

首先导入源码https://github.com/elastic/elasticsearch-hadoop这个是gradle工程,可以直接导入idea,然后切换到7.x版本即可。

废话少说直接找到ScalaEsRDD,发现gePartitions是在其父类实现的,方法内容如下:

代码语言:javascript
复制
override def getPartitions: Array[Partition] = {
    esPartitions.zipWithIndex.map { case(esPartition, idx) =>
      new EsPartition(id, idx, esPartition)
    }.toArray
  }

esPartitions是一个lazy型的变量:

代码语言:javascript
复制
@transient private[spark] lazy val esPartitions = {
    RestService.findPartitions(esCfg, logger)
  }

这种声明原因是什么呢?

lazy+transient的原因大家可以考虑一下。

RestService.findPartitions方法也是仅是创建客户端获取分片等信息,然后调用,分两种情况调用两个方法。

代码语言:javascript
复制
final List<PartitionDefinition> partitions;
//            5.x及以后版本 同时没有配置es.input.max.docs.per.partition
if (clusterInfo.getMajorVersion().onOrAfter(EsMajorVersion.V_5_X) && settings.getMaxDocsPerPartition() != null) {
     partitions = findSlicePartitions(client.getRestClient(), settings, mapping, nodesMap, shards, log);
} else {
     partitions = findShardPartitions(settings, mapping, nodesMap, shards, log);
}       

a).findSlicePartitions

这个方法其实就是在5.x及以后的ES版本,同时配置了

代码语言:javascript
复制
es.input.max.docs.per.partition

以后,才会执行,实际上就是将ES的分片按照指定大小进行拆分,必然要先进行分片大小统计,然后计算出拆分的分区数,最后生成分区信息。具体代码如下:

代码语言:javascript
复制
long numDocs;
if (readResource.isTyped()) {
    numDocs = client.count(index, readResource.type(), Integer.toString(shardId), query);
} else {
    numDocs = client.countIndexShard(index, Integer.toString(shardId), query);
}
int numPartitions = (int) Math.max(1, numDocs / maxDocsPerPartition);
for (int i = 0; i < numPartitions; i++) {
    PartitionDefinition.Slice slice = new PartitionDefinition.Slice(i, numPartitions);
    partitions.add(new PartitionDefinition(settings, resolvedMapping, index, shardId, slice, locations));
}

实际上分片就是用游标的方式,对_doc进行排序,然后按照分片计算得到的分区偏移进行数据的读取,组装过程是SearchRequestBuilder.assemble方法来实现的。

这个其实个人觉得会浪费一定的性能,假如真的要ES结合Spark的话,建议合理设置分片数。

b).findShardPartitions方法

这个方法没啥疑问了就是一个RDD分区对应于ES index的一个分片。

代码语言:javascript
复制
PartitionDefinition partition = new PartitionDefinition(settings, resolvedMapping, index, shardId,
locationList.toArray(new String[0]));
partitions.add(partition);

3.总结

以上就是Spark Core读取ES数据的时候分片和RDD分区的对应关系分析,默认情况下是一个es 索引分片对应Spark RDD的一个分区。假如分片数过大,且ES版本在5.x及以上,可以配置参数

代码语言:javascript
复制
es.input.max.docs.per.partition

进行拆分。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-06-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档