前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ES-Hadoop 实践

ES-Hadoop 实践

原创
作者头像
franyang
修改2019-12-23 13:14:31
3.2K0
修改2019-12-23 13:14:31
举报

介绍

在大数据背景下,适用于不同场景下的框架、系统层出不穷,在批量数据计算上hadoop鲜有敌手,而在实时搜索领域es则是独孤求败,那如何能让数据同时结合两者优势呢?本文介绍的es-hadoop将做到这点。关于es-hadoop的使用在ethanbzhang之前的两篇文章《腾讯云EMR&Elasticsearch中使用ES-Hadoop之Spark篇》《腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇》中已经进行了一些介绍,本文一方面是对其内容的一些补充,另一方面也是对个人实践过程的一个总结。

hadoop是一个优秀的批量数据处理系统,能对数据进行复杂计算,但实时的(全文)搜索却比较困难;在这两点上ES与之几乎相反:很好的支持实时(全文)搜索,但只能用聚合查询进行简单的数据计算,也无法支持大批量的数据。

ES hadoop是一个ES对接hadoop生态的工具,它允许hadoop任务(比如MR、hive、pig、spark等)与ES交互,比如让hadoop以ES作为数据源进行计算、将计算的中间或结果数据存储到ES中等,这意味着它将能够整合Elasticsearch和hadoop各自的优势。

另外,ES-hadoop还提供插件Hadoop HDFS Repository Plugin允许将ES数据备份到hdfs或从其恢复。

实现

这部分将介绍ES-hadoop是如何将ES和hadoop的数据实体进行映射的。

在分布式系统中,扩展计算能力的一个关键因素是:并行,或者说是将一个任务划分成多个更小的任务,使他们同时运行在集群的不同节点上,读取数据的不同部分进行计算。这个概念对应hadoop是splits、对应spark是partition、对应Elasticsearch则是shards。简单来讲,更多的splits、partition或shards意味着能有更多的任务同时读数据源的不同部分并进行计算,提高了计算能力。

既然并行如此重要,那么在hadoop中使用es-hadoop与ES进行数据交互时,它仍然应该能够并行的读写数据的不同部分,否则计算能力将大大降低。

从ES读取数据

在spark、MR等系统中使用elasticsearch-hadoop从ES读取数据时,shard是一个关键的角色,因为elasticsearch-hadoop将为ES索引中的每个shard创建一个Hadoop InputSplit或Spark Partition。

大家看到这里可能会有疑问:es-hadoop是如何同时读取ES索引中不同shard数据的呢?这里也对其实现做简单的介绍。

通过文章Spark Core读取ES的分区问题分析中的源码分析了解到,当es-hadoop从ES读取索引数据时,它获取索引各个shard的信息,包括:shard id、所在节点id等,并创建对应的Spark partition;除了按shard创建parition的方式外,es-hadoop还允许按shard最大doc数创建partition(配置项:es.input.max.docs.per.partition),这意味着es-hadoop能将shard切分成更小的数据集对应于partition,这对shard容量过大的情况将非常适用。 但说到这里,其实还是没能解答疑问:如何同时获取不同shard的数据呢?通过阅读elasticsearch-hadoop源码我找到了答案:

在文件mr/src/main/java/org/elasticsearch/hadoop/rest/RestService.java:389createReader方法创建了PartitionReader,用于将读取ES shards生成partition。在方法中可以看到其构造了ES 查询请求:

代码语言:txt
复制
    public static PartitionReader createReader(Settings settings, PartitionDefinition partition, Log log) {
        // ...
        SearchRequestBuilder requestBuilder =
                new SearchRequestBuilder(clusterInfo.getMajorVersion(), includeVersion)
                        .resource(read)
                        // Overwrite the index name from the resource to be that of the concrete index in the partition definition
                        .indices(partition.getIndex())
                        .query(QueryUtils.parseQuery(settings))
                        .scroll(settings.getScrollKeepAlive())
                        .size(settings.getScrollSize())
                        .limit(settings.getScrollLimit())
                        .fields(SettingsUtils.determineSourceFields(settings))
                        .filters(QueryUtils.parseFilters(settings))
                        .shard(Integer.toString(partition.getShardId()))
                        .readMetadata(settings.getReadMetadata())
                        .local(true)
                        .preference(settings.getShardPreference())
                        .excludeSource(settings.getExcludeSource());

可以看到:

  1. 请求使用scroll API查询批量的数据,这是ES用来查询批量数据非常常见的用法,因为它能避免深分页问题。
  2. 使用preference参数指定要查询的某个shard,每个partition都从指定shard获取数据,这样就能做到从多个shard并行的获取数据。除此之外,preference 还能从指定节点、主副shard、本地节点等条件查询数据。

向ES写入数据

和读取类似的,es-hadoop能够将hadoop的splits或spark partition数据对应成shard并行的写入ES。

实践

这里以一个使用spark对es索引数据进行单词计数(wordcount)的使用示例,介绍es-hadoop中spark是如何操作es数据的。示例源码位于:https://github.com/yyff/es-spark-wordcount

生成es-hadooop配置

代码语言:txt
复制
        SparkConf sparkConf = new SparkConf().setAppName("wordcount")
                .setMaster("local[*]").set("es.index.auto.create", "true")
                .set("es.nodes", esHost).set("es.port", esPort).set("es.nodes.wan.only", "true")
                .set("es.net.http.auth.user", "elastic")
                .set("es.net.http.auth.pass", password);

详细配置见:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/mapreduce.html

使用spark native 读取ES数据

1、 调用JavaEsSpark.esRDD从索引查询中创建RDD

代码语言:txt
复制
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, indexName, query).values();

2、 统计词频

代码语言:txt
复制
        JavaPairRDD<String, Integer> counts = esRDD.flatMap(doc -> {
           String fieldValue = (String)doc.get(field);
           if (fieldValue == null) {
               fieldValue = "";
           }
           return Arrays.asList(fieldValue.split(" ")).iterator();
        }).mapToPair(word -> new Tuple2<String, Integer>(word, 1)).reduceByKey((x, y) -> x + y);

3、 结果输出到文件和ES

代码语言:txt
复制
        counts.saveAsTextFile(outDir);
        EsSpark.saveToEs(counts.rdd(), "spark-native");

4、 查看结果

  • 文件:
  • es 索引

使用spark sql 读取ES数据

1、 使用配置创建spark session

代码语言:txt
复制
        SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());//adapter
        SQLContext sql = new SQLContext(jsc);

2、 调用JavaEsSparkSQL.esDF从es sql查询中生成Dataset

代码语言:txt
复制
        Dataset<Row> ds = JavaEsSparkSQL.esDF(sql, indexName);
        ds = ds.select(ds.col(field));
        for (String cond : conditions) {
            ds = ds.where(cond);
        }

3、 统计词频

代码语言:txt
复制
        JavaPairRDD<String, Integer> counts = ds.javaRDD().flatMap(
                row -> {
                    String line = row.getAs(field);
                    if (line == null) {
                        line = "";
                    }
                    return Arrays.asList(line.split(" ")).iterator();
                }).mapToPair(word -> new Tuple2<String, Integer>(word, 1)).reduceByKey((x, y) -> x + y);

4、 结果输出到文件和ES,关闭session

代码语言:txt
复制
        counts.saveAsTextFile(outDir);
        EsSpark.saveToEs(counts.rdd(), "spark-sql");
        sparkSession.stop();

5、 查看结果

  • 文件
  • es 索引

总结

通过es-hadoop,ES可以作为MR、Hive、Spark等的数据源,这意味着什么呢?意味着对于既需要使用Spark等工具进行批量分析和计算、又需要使用ES做实时搜索的数据,比如常见的业务日志,可以只存在于ES中,而无需重复存储于HDFS等存储中,极大的节省了存储成本。

在使用方面,通过ES-hadoop的实现可以看到,ES的shard和hadoop splits、spark partition有着对应关系,因此对要用于hadoop分析的索引设置合理的分片数变得十分重要,因为这将充分利用hadoop的并行计算能力。

参考

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 介绍
  • 实现
    • 从ES读取数据
      • 向ES写入数据
      • 实践
        • 生成es-hadooop配置
          • 使用spark native 读取ES数据
            • 使用spark sql 读取ES数据
            • 总结
            • 参考
            相关产品与服务
            Elasticsearch Service
            腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档