专栏首页腾讯云Elasticsearch Service腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇
原创

腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇

腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇

腾讯云EMR&Elasticsearch中使用ES-Hadoop之Spark篇

Hadoop/Spark读写ES之性能调优

ES-Hadoop 是 Elastic 官方推出的一个用于对接 Hadoop 生态的工具,使得用户可以使用 Mapreduce(MR)、Spark、Hive 等工具处理 ES 上的数据。众所周知,Hadoop 生态的长处是处理大规模数据集,但是其缺点也很明显,就是当用于交互式分析时,查询时延会比较长。而 ES 在这几个方面的能力很强,对于如 ad-hoc 查询,基本可以做到秒级。ES-Hadoop 的推出提供了一种组合两者优势的可能性。使用 ES-Hadoop,用户只需要对自己代码做出很小的改动,即可以快速处理存储在 ES 中的数据,并且能够享受到 ES 带来的加速效果。

ES-Hadoop

利用ES-Hadoop 组件,可以将 ES 作为 MR/Spark/Hive 等大数据处理引擎的“数据源”,在大数据计算存储分离的架构中扮演存储的角色。这和 MySQL/PG/MongoDB 等其他数据源并无差异。但相对于其他数据源, ES 具有更灵活的全文检索能力,更快的数据选择过滤能力以及快速的UI展示报表的能力。这些能力正是分析引擎最为关键的能力之一。

下面我们将通过特定案例,介绍如何在腾讯云 EMR 和 腾讯云 Elasticsearch 中使用 ES-Hadoop。

资源准备

购买腾讯云EMR,并勾选hive,spark等组件,以备使用。购买腾讯云Elasticsearch。

数据准备

这里以Elasticsearch官方标准的workbench测试数据http_logs为例,包含的字段如下:

clientip "10.123.123.43"
request: "GET /images/102328s.gif HTTP/1.1"
status: 2xx
size: 80

测试数据写入方法:

pip install esrally
esrally --target-hosts=10.0.4.17:9200 --distribution-version=5.6.4 --track=http_logs --pipeline=benchmark-only --challenge=append-no-conflicts-index-only
#10.0.4.17:9200 为es的vpc内网ip

ES-Hadoop关键配置项说明

'es.nodes' = '10.0.0.17',
'es.port' = '9200',
'es.nodes.wan.only' = 'true',
'es.index.auto.create' = 'false',
'es.resource' = 'logs-201998/type',
'es.read.metadata' = 'true',
'es.mapping.names' = 'uid:_metadata._id,clientip:clientip,request:request,status,status',
'es.index.read.missing.as.empty' = 'true',
'es.input.use.sliced.partitions' = 'false',
'es.input.max.docs.per.partition' = '100000000'

1. es.nodes

配置ES集群的节点ip,腾讯云ES给用户提供了带负载均衡功能的vpc内网vip,这里结合es.nodes.wan.only这个配置项同时使用。

2. es.port

配置ES集群的端口号。

3. es.nodes.wan.only

设置为true,开启ES集群在云上使用vip进行连接,不进行节点嗅探。

4. es.index.auto.create

如通过Hadoop组件向ES集群写入数据,是否自动创建不存在的index。

5. es.resource

指定要读写的index和type

6. es.mapping.names

表字段与ES的索引字段的字段名映射

7. es.read.metadata

如操作的ES字段涉及到_id之类的内部字段,需要将这个配置开启。

8. es.input.max.docs.per.partition

配置单个partition的最大文档数。在执行hive sql的过程中,需要限制mapper的数目,否则ES会面临多个scroll切片的查询,造成CPU打满,影响集群的性能和稳定性。这里需要根据ES索引中数据的总数来灵活的设置。

9. es.input.use.sliced.partitions

更多的ES-Hadoop配置项请参考官方文档https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html

在hive中创建ES外部表

1. 登录EMR的master节点,切换至hadoop用户

su - hadoop

2. 下载ES-Hadoop组件jar包

wget http://download.elastic.co/hadoop/elasticsearch-hadoop-5.6.4.zip

3. 启动hive-cli,加载ES-Hadoop组件jar包

hive:->  add jar file:///home/hadoop/elasticsearch-hadoop-5.6.4/dist/elasticsearch-hadoop-hive-5.6.4.jar;

4. 创建外部表

create database if not exists tmp;
drop table tmp.tmp_es;
create external table tmp.tmp_es (uid varchar(255), clientip varchar(255), request varchar(1024), status int)
    STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.nodes' = '10.0.4.17',
            'es.port'='9200',
            'es.index.auto.create' = 'false',
            'es.resource' ='logs-201998/type',
            'es.read.metadata' = 'true',
            'es.mapping.names' = 'uid:_metadata._id,clientip:clientip,request:request,status,status',
            'es.nodes.wan.only'='true',
            'es.index.read.missing.as.empty'='true',
            'es.input.use.sliced.partitions'='false',
            'es.input.max.docs.per.partition'='100000000'
);

这里以官方测试数据http_logs为例,使用logs-201998这个index,映射了4个字段_id, clientip, request, status。因为索引文档总量为100w+,设置单partition最大文档数为100000000, 期望mapper数保持在5个以内。

5. 查询ES外部表

select * from tmp.tmp_es limit 10;

6. 写入ES外部表或将ES索引中的数据导入到hive的内部表

# 写入外部表
insert into tmp.tmp_es values ('sfasfsdf', '10.0.0.11', 'sdfsfa', 200);

7. ES索引中的数据导入到hive的内部表

# 将hive内部表中的数据导入至ES外部表
drop table tmp.tmp_hive;
create table tmp.tmp_hive (uid varchar(255), clientip varchar(255), request varchar(1024), status int);
insert into tmp.tmp_hive select * from tmp.tmp_es;

通过MapReduce任务向ES写入数据

如一些较复杂的分析任务,不适合使用hive sql完成。下面这个例子演示了如何通过MR任务,读取HDFS上的JSON文件,并写入ES集群。

1. 新增数据

我们新增一条http log,clientip设置为特殊的255.255.255.255。写入到http_log.json,并上传至HDFS的/es-hadoop目录。

{"@timestamp": 895435190,"clientip": "255.255.255.255","request": "GET /images/102328s.gif HTTP/1.1","status": 200,"size": 802}

2. MR代码

编译打包如下代码为esmr-1.0-SNAPSHOT.jar,编写Mapper,读取HDFS上的json文件,并在map阶段通过EsOutputFormat写入。

  • 在设置中关闭map 和 reduce 的推测执行机制
  • 设置es.input.json为true,将源文件按json来解析。

完整工程代码可以在这里下载

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.elasticsearch.hadoop.mr.EsOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WriteToEsWithMR extends Configured implements Tool {

    public static class EsMapper extends Mapper<Object, Text, NullWritable, Text> {
        private Text doc = new Text();

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            if (value.getLength() > 0) {
                doc.set(value);
                System.out.println(value);
                context.write(NullWritable.get(), doc);
            }
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        conf.setBoolean("mapreduce.map.speculative", false);
        conf.setBoolean("mapreduce.reduce.speculative", false);
        conf.set("es.nodes", "10.0.4.17:9200");
        conf.set("es.nodes.wan.only", "true");
        conf.set("es.resource", "logs-201998/type");
        conf.set("es.input.json", "yes");

        Job job = Job.getInstance(conf);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(EsOutputFormat.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setJarByClass(WriteToEsWithMR.class);
        job.setMapperClass(EsMapper.class);

        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new WriteToEsWithMR(), args);
        System.exit(ret);
    }
}

3. 执行

hadoop jar esmr-1.0-SNAPSHOT.jar /es-hadoop

4. 验证

通过API或kibana查询clientip为255.255.255.255的记录

GET logs-201998/_search
{
  "query": {
    "match": {
      "clientip":"255.255.255.255"
    }
  }
}

结语

本文以腾讯云上的EMR和Elasticsearch为例,介绍了如何通过ES强大的ES-Hadoop组件,在hive和MR上进行数据的查询和写入。可以方便用户将Elasticsearch与Hadoop生态组件结合起来,提供更灵活的分析能力。下一篇将为大家介绍ES-Hadoop之Spark篇的内容,将为大家进一步介绍在spark中如果读取和写入ES数据,敬请期待。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Hadoop/Spark读写ES之性能调优

    腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇

    ethanzhang
  • 腾讯云EMR&Elasticsearch中使用ES-Hadoop之Spark篇

    腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇

    ethanzhang
  • 腾讯云ES数据备份恢复使用教程(视频)

    在腾讯云Elasticsearch集群上通过COS进行索引数据的备份与恢复,教程包含:

    ethanzhang
  • 在生成藏头诗的应用体会elaticsearch的运用场景

    最近在学习elasticsearch,作为一个萌新,对es的理解还仅仅停留在它的数据库功能上,更同时摆脱不了关系数据库的影响。在翻阅了es的入门书,看些网上博客...

    逆回十六夜
  • Python如何把Spark数据写入ElasticSearch

    这里以将Apache的日志写入到ElasticSearch为例,来演示一下如何使用Python将Spark数据导入到ES中。

    砸漏
  • 动态规划解决01背包问题

    一、问题描述:有n 个物品,它们有各自的重量和价值,现有给定容量的背包,如何让背包里装入的物品具有最大的价值总和?

    Christal_R
  • 自然语言处理起源:马尔科夫和香农的语言建模实验

    1913 年,俄国数学家安德烈·安德烈耶维奇·马尔科夫(Andrey Andreyevich Markov)坐在他圣彼得堡的书房里,手里拿着当时的文学巨著——普...

    机器之心
  • Maven项目遇到的BUG汇总

    Maven使用的过程中,可能会遇到各种各样的问题。 下面介绍项目中遇到的两个BUG.

    白凡
  • 原创的《Pandas120题》和 《NumPy80题》.pdf

    Pandas与NumPy都是Python数据分析中的利器,但是对着官方文档学习是十分枯燥且低效的方式,因此我精心挑选了200个Python数据处理中的常用操作,...

    朱小五
  • 【云+社区年度征文】Docker教程(3)—Docker安装Tomcat,Nginx与可视化工具Portainer

    前面我们讲了Docker的安装使用,以及一些常用的命令。俗话说只说不练假把式,接下来我们使用安装Docker安装一些常用的镜像,练习一下Docker命令的使用。

    AlbertYang

扫码关注云+社区

领取腾讯云代金券