前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇

腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇

原创
作者头像
ethanzhang
修改2020-04-11 09:57:24
5.2K1
修改2020-04-11 09:57:24
举报

腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇

腾讯云EMR&Elasticsearch中使用ES-Hadoop之Spark篇

Hadoop/Spark读写ES之性能调优

ES-Hadoop 是 Elastic 官方推出的一个用于对接 Hadoop 生态的工具,使得用户可以使用 Mapreduce(MR)、Spark、Hive 等工具处理 ES 上的数据。众所周知,Hadoop 生态的长处是处理大规模数据集,但是其缺点也很明显,就是当用于交互式分析时,查询时延会比较长。而 ES 在这几个方面的能力很强,对于如 ad-hoc 查询,基本可以做到秒级。ES-Hadoop 的推出提供了一种组合两者优势的可能性。使用 ES-Hadoop,用户只需要对自己代码做出很小的改动,即可以快速处理存储在 ES 中的数据,并且能够享受到 ES 带来的加速效果。

ES-Hadoop
ES-Hadoop

利用ES-Hadoop 组件,可以将 ES 作为 MR/Spark/Hive 等大数据处理引擎的“数据源”,在大数据计算存储分离的架构中扮演存储的角色。这和 MySQL/PG/MongoDB 等其他数据源并无差异。但相对于其他数据源, ES 具有更灵活的全文检索能力,更快的数据选择过滤能力以及快速的UI展示报表的能力。这些能力正是分析引擎最为关键的能力之一。

下面我们将通过特定案例,介绍如何在腾讯云 EMR 和 腾讯云 Elasticsearch 中使用 ES-Hadoop。

资源准备

购买腾讯云EMR,并勾选hive,spark等组件,以备使用。购买腾讯云Elasticsearch。

数据准备

这里以Elasticsearch官方标准的workbench测试数据http_logs为例,包含的字段如下:

代码语言:txt
复制
clientip "10.123.123.43"
request: "GET /images/102328s.gif HTTP/1.1"
status: 2xx
size: 80

测试数据写入方法:

代码语言:txt
复制
pip install esrally
esrally --target-hosts=10.0.4.17:9200 --distribution-version=5.6.4 --track=http_logs --pipeline=benchmark-only --challenge=append-no-conflicts-index-only
#10.0.4.17:9200 为es的vpc内网ip

ES-Hadoop关键配置项说明

代码语言:txt
复制
'es.nodes' = '10.0.0.17',
'es.port' = '9200',
'es.nodes.wan.only' = 'true',
'es.index.auto.create' = 'false',
'es.resource' = 'logs-201998/type',
'es.read.metadata' = 'true',
'es.mapping.names' = 'uid:_metadata._id,clientip:clientip,request:request,status,status',
'es.index.read.missing.as.empty' = 'true',
'es.input.use.sliced.partitions' = 'false',
'es.input.max.docs.per.partition' = '100000000'
1. es.nodes

配置ES集群的节点ip,腾讯云ES给用户提供了带负载均衡功能的vpc内网vip,这里结合es.nodes.wan.only这个配置项同时使用。

2. es.port

配置ES集群的端口号。

3. es.nodes.wan.only

设置为true,开启ES集群在云上使用vip进行连接,不进行节点嗅探。

4. es.index.auto.create

如通过Hadoop组件向ES集群写入数据,是否自动创建不存在的index。

5. es.resource

指定要读写的index和type

6. es.mapping.names

表字段与ES的索引字段的字段名映射

7. es.read.metadata

如操作的ES字段涉及到_id之类的内部字段,需要将这个配置开启。

8. es.input.max.docs.per.partition

配置单个partition的最大文档数。在执行hive sql的过程中,需要限制mapper的数目,否则ES会面临多个scroll切片的查询,造成CPU打满,影响集群的性能和稳定性。这里需要根据ES索引中数据的总数来灵活的设置。

9. es.input.use.sliced.partitions

更多的ES-Hadoop配置项请参考官方文档https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html

在hive中创建ES外部表

1. 登录EMR的master节点,切换至hadoop用户
代码语言:txt
复制
su - hadoop
2. 下载ES-Hadoop组件jar包
代码语言:txt
复制
wget http://download.elastic.co/hadoop/elasticsearch-hadoop-5.6.4.zip
3. 启动hive-cli,加载ES-Hadoop组件jar包
代码语言:txt
复制
hive:->  add jar file:///home/hadoop/elasticsearch-hadoop-5.6.4/dist/elasticsearch-hadoop-hive-5.6.4.jar;
4. 创建外部表
代码语言:txt
复制
create database if not exists tmp;
drop table tmp.tmp_es;
create external table tmp.tmp_es (uid varchar(255), clientip varchar(255), request varchar(1024), status int)
    STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.nodes' = '10.0.4.17',
            'es.port'='9200',
            'es.index.auto.create' = 'false',
            'es.resource' ='logs-201998/type',
            'es.read.metadata' = 'true',
            'es.mapping.names' = 'uid:_metadata._id,clientip:clientip,request:request,status,status',
            'es.nodes.wan.only'='true',
            'es.index.read.missing.as.empty'='true',
            'es.input.use.sliced.partitions'='false',
            'es.input.max.docs.per.partition'='100000000'
);

这里以官方测试数据http_logs为例,使用logs-201998这个index,映射了4个字段_id, clientip, request, status。因为索引文档总量为100w+,设置单partition最大文档数为100000000, 期望mapper数保持在5个以内。

5. 查询ES外部表
代码语言:txt
复制
select * from tmp.tmp_es limit 10;
6. 写入ES外部表或将ES索引中的数据导入到hive的内部表
代码语言:txt
复制
# 写入外部表
insert into tmp.tmp_es values ('sfasfsdf', '10.0.0.11', 'sdfsfa', 200);
7. ES索引中的数据导入到hive的内部表
代码语言:txt
复制
# 将hive内部表中的数据导入至ES外部表
drop table tmp.tmp_hive;
create table tmp.tmp_hive (uid varchar(255), clientip varchar(255), request varchar(1024), status int);
insert into tmp.tmp_hive select * from tmp.tmp_es;

通过MapReduce任务向ES写入数据

如一些较复杂的分析任务,不适合使用hive sql完成。下面这个例子演示了如何通过MR任务,读取HDFS上的JSON文件,并写入ES集群。

1. 新增数据

我们新增一条http log,clientip设置为特殊的255.255.255.255。写入到http_log.json,并上传至HDFS的/es-hadoop目录。

代码语言:txt
复制
{"@timestamp": 895435190,"clientip": "255.255.255.255","request": "GET /images/102328s.gif HTTP/1.1","status": 200,"size": 802}
2. MR代码

编译打包如下代码为esmr-1.0-SNAPSHOT.jar,编写Mapper,读取HDFS上的json文件,并在map阶段通过EsOutputFormat写入。

  • 在设置中关闭map 和 reduce 的推测执行机制
  • 设置es.input.json为true,将源文件按json来解析。

完整工程代码可以在这里下载

代码语言:txt
复制
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.elasticsearch.hadoop.mr.EsOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WriteToEsWithMR extends Configured implements Tool {

    public static class EsMapper extends Mapper<Object, Text, NullWritable, Text> {
        private Text doc = new Text();

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            if (value.getLength() > 0) {
                doc.set(value);
                System.out.println(value);
                context.write(NullWritable.get(), doc);
            }
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        conf.setBoolean("mapreduce.map.speculative", false);
        conf.setBoolean("mapreduce.reduce.speculative", false);
        conf.set("es.nodes", "10.0.4.17:9200");
        conf.set("es.nodes.wan.only", "true");
        conf.set("es.resource", "logs-201998/type");
        conf.set("es.input.json", "yes");

        Job job = Job.getInstance(conf);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(EsOutputFormat.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setJarByClass(WriteToEsWithMR.class);
        job.setMapperClass(EsMapper.class);

        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new WriteToEsWithMR(), args);
        System.exit(ret);
    }
}
3. 执行
代码语言:txt
复制
hadoop jar esmr-1.0-SNAPSHOT.jar /es-hadoop
4. 验证

通过API或kibana查询clientip为255.255.255.255的记录

代码语言:txt
复制
GET logs-201998/_search
{
  "query": {
    "match": {
      "clientip":"255.255.255.255"
    }
  }
}

结语

本文以腾讯云上的EMR和Elasticsearch为例,介绍了如何通过ES强大的ES-Hadoop组件,在hive和MR上进行数据的查询和写入。可以方便用户将Elasticsearch与Hadoop生态组件结合起来,提供更灵活的分析能力。下一篇将为大家介绍ES-Hadoop之Spark篇的内容,将为大家进一步介绍在spark中如果读取和写入ES数据,敬请期待。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 资源准备
  • 数据准备
  • ES-Hadoop关键配置项说明
  • 在hive中创建ES外部表
  • 通过MapReduce任务向ES写入数据
  • 结语
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档