腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇

腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇

腾讯云EMR&Elasticsearch中使用ES-Hadoop之Spark篇

ES-Hadoop 是 Elastic 官方推出的一个用于对接 Hadoop 生态的工具,使得用户可以使用 Mapreduce(MR)、Spark、Hive 等工具处理 ES 上的数据。众所周知,Hadoop 生态的长处是处理大规模数据集,但是其缺点也很明显,就是当用于交互式分析时,查询时延会比较长。而 ES 在这几个方面的能力很强,对于如 ad-hoc 查询,基本可以做到秒级。ES-Hadoop 的推出提供了一种组合两者优势的可能性。使用 ES-Hadoop,用户只需要对自己代码做出很小的改动,即可以快速处理存储在 ES 中的数据,并且能够享受到 ES 带来的加速效果。

ES-Hadoop

利用ES-Hadoop 组件,可以将 ES 作为 MR/Spark/Hive 等大数据处理引擎的“数据源”,在大数据计算存储分离的架构中扮演存储的角色。这和 MySQL/PG/MongoDB 等其他数据源并无差异。但相对于其他数据源, ES 具有更灵活的全文检索能力,更快的数据选择过滤能力以及快速的UI展示报表的能力。这些能力正是分析引擎最为关键的能力之一。

下面我们将通过特定案例,介绍如何在腾讯云 EMR 和 腾讯云 Elasticsearch 中使用 ES-Hadoop。

资源准备

购买腾讯云EMR,并勾选hive,spark等组件,以备使用。购买腾讯云Elasticsearch。

数据准备

这里以Elasticsearch官方标准的workbench测试数据http_logs为例,包含的字段如下:

clientip "10.123.123.43"
request: "GET /images/102328s.gif HTTP/1.1"
status: 2xx
size: 80

测试数据写入方法:

pip install esrally
esrally --target-hosts=10.0.4.17:9200 --distribution-version=5.6.4 --track=http_logs --pipeline=benchmark-only --challenge=append-no-conflicts-index-only
#10.0.4.17:9200 为es的vpc内网ip

ES-Hadoop关键配置项说明

'es.nodes' = '10.0.0.17',
'es.port' = '9200',
'es.nodes.wan.only' = 'true',
'es.index.auto.create' = 'false',
'es.resource' = 'logs-201998/type',
'es.read.metadata' = 'true',
'es.mapping.names' = 'uid:_metadata._id,clientip:clientip,request:request,status,status',
'es.index.read.missing.as.empty' = 'true',
'es.input.use.sliced.partitions' = 'false',
'es.input.max.docs.per.partition' = '100000000'

1. es.nodes

配置ES集群的节点ip,腾讯云ES给用户提供了带负载均衡功能的vpc内网vip,这里结合es.nodes.wan.only这个配置项同时使用。

2. es.port

配置ES集群的端口号。

3. es.nodes.wan.only

设置为true,开启ES集群在云上使用vip进行连接,不进行节点嗅探。

4. es.index.auto.create

如通过Hadoop组件向ES集群写入数据,是否自动创建不存在的index。

5. es.resource

指定要读写的index和type

6. es.mapping.names

表字段与ES的索引字段的字段名映射

7. es.read.metadata

如操作的ES字段涉及到_id之类的内部字段,需要将这个配置开启。

8. es.input.max.docs.per.partition

配置单个partition的最大文档数。在执行hive sql的过程中,需要限制mapper的数目,否则ES会面临多个scroll切片的查询,造成CPU打满,影响集群的性能和稳定性。这里需要根据ES索引中数据的总数来灵活的设置。

9. es.input.use.sliced.partitions

更多的ES-Hadoop配置项请参考官方文档https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html

在hive中创建ES外部表

1. 登录EMR的master节点,切换至hadoop用户

su - hadoop

2. 下载ES-Hadoop组件jar包

wget http://download.elastic.co/hadoop/elasticsearch-hadoop-5.6.4.zip

3. 启动hive-cli,加载ES-Hadoop组件jar包

hive:->  add jar file:///home/hadoop/elasticsearch-hadoop-5.6.4/dist/elasticsearch-hadoop-hive-5.6.4.jar;

4. 创建外部表

create database if not exists tmp;
drop table tmp.tmp_es;
create external table tmp.tmp_es (uid varchar(255), clientip varchar(255), request varchar(1024), status int)
    STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.nodes' = '10.0.4.17',
            'es.port'='9200',
            'es.index.auto.create' = 'false',
            'es.resource' ='logs-201998/type',
            'es.read.metadata' = 'true',
            'es.mapping.names' = 'uid:_metadata._id,clientip:clientip,request:request,status,status',
            'es.nodes.wan.only'='true',
            'es.index.read.missing.as.empty'='true',
            'es.input.use.sliced.partitions'='false',
            'es.input.max.docs.per.partition'='100000000'
);

这里以官方测试数据http_logs为例,使用logs-201998这个index,映射了4个字段_id, clientip, request, status。因为索引文档总量为100w+,设置单partition最大文档数为100000000, 期望mapper数保持在5个以内。

5. 查询ES外部表

select * from tmp.tmp_es limit 10;

6. 写入ES外部表或将ES索引中的数据导入到hive的内部表

# 写入外部表
insert into tmp.tmp_es values ('sfasfsdf', '10.0.0.11', 'sdfsfa', 200);

7. ES索引中的数据导入到hive的内部表

# 将hive内部表中的数据导入至ES外部表
drop table tmp.tmp_hive;
create table tmp.tmp_hive (uid varchar(255), clientip varchar(255), request varchar(1024), status int);
insert into tmp.tmp_hive select * from tmp.tmp_es;

通过MapReduce任务向ES写入数据

如一些较复杂的分析任务,不适合使用hive sql完成。下面这个例子演示了如何通过MR任务,读取HDFS上的JSON文件,并写入ES集群。

1. 新增数据

我们新增一条http log,clientip设置为特殊的255.255.255.255。写入到http_log.json,并上传至HDFS的/es-hadoop目录。

{"@timestamp": 895435190,"clientip": "255.255.255.255","request": "GET /images/102328s.gif HTTP/1.1","status": 200,"size": 802}

2. MR代码

编译打包如下代码为esmr-1.0-SNAPSHOT.jar,编写Mapper,读取HDFS上的json文件,并在map阶段通过EsOutputFormat写入。

  • 在设置中关闭map 和 reduce 的推测执行机制
  • 设置es.input.json为true,将源文件按json来解析。

完整工程代码可以在这里下载

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.elasticsearch.hadoop.mr.EsOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WriteToEsWithMR extends Configured implements Tool {

    public static class EsMapper extends Mapper<Object, Text, NullWritable, Text> {
        private Text doc = new Text();

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            if (value.getLength() > 0) {
                doc.set(value);
                System.out.println(value);
                context.write(NullWritable.get(), doc);
            }
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        conf.setBoolean("mapreduce.map.speculative", false);
        conf.setBoolean("mapreduce.reduce.speculative", false);
        conf.set("es.nodes", "10.0.4.17:9200");
        conf.set("es.nodes.wan.only", "true");
        conf.set("es.resource", "logs-201998/type");
        conf.set("es.input.json", "yes");

        Job job = Job.getInstance(conf);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(EsOutputFormat.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setJarByClass(WriteToEsWithMR.class);
        job.setMapperClass(EsMapper.class);

        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new WriteToEsWithMR(), args);
        System.exit(ret);
    }
}

3. 执行

hadoop jar esmr-1.0-SNAPSHOT.jar /es-hadoop

4. 验证

通过API或kibana查询clientip为255.255.255.255的记录

GET logs-201998/_search
{
  "query": {
    "match": {
      "clientip":"255.255.255.255"
    }
  }
}

结语

本文以腾讯云上的EMR和Elasticsearch为例,介绍了如何通过ES强大的ES-Hadoop组件,在hive和MR上进行数据的查询和写入。可以方便用户将Elasticsearch与Hadoop生态组件结合起来,提供更灵活的分析能力。下一篇将为大家介绍ES-Hadoop之Spark篇的内容,将为大家进一步介绍在spark中如果读取和写入ES数据,敬请期待。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Spark学习技巧

SparkStreaming如何解决小文件问题

1.3K20
来自专栏数据和云

Oracle Database 12.2新特性详解

在2015年旧金山的Oracle OpenWorld大会上,Oracle发布了Database 12.2的Beta版本,虽然Beta版本只对部分用户开放,但是大...

42160
来自专栏用户画像

4.3.4 磁盘组织与管理

在磁盘上进行一次读写操作花费的时间由寻道时间,延迟时间和传输时间决定。其中寻道时间是将磁头移动到指定磁道所需要的时间。延迟时间是磁头定位到某个磁道的扇区(块号)...

8820
来自专栏数据和云

Oracle Database 12.2新特性详解

在2015年旧金山的Oracle OpenWorld大会上,Oracle发布了Database 12.2的Beta版本,虽然Beta版本只对部分用户开放,但是大...

35530
来自专栏Hadoop实操

Cloudera Labs中的Phoenix

Cloudera Labs在2016-06-27宣布打包了Apache Phoenix项目,版本为4.7.0,并基于CDH5.7.0。安装依旧是大家熟悉的Par...

96190
来自专栏知识分享

关于IAR开发STM32配置

因为自己要学Msp430还有ZigBee发现IAR真的挺好用,,,所以以后想着就用IAR写单片机程序,, 这次配置我不会把程序的配置弄得有条理,分开文件夹存放,...

35740
来自专栏CSDN技术头条

腾讯大规模Hadoop集群实践

TDW(Tencent distributed Data Warehouse,腾讯分布式数据仓库)基于开源软件Hadoop和Hive进行构建,打破了传统数据仓库...

57770
来自专栏xingoo, 一个梦想做发明家的程序员

[看图说话] 基于Spark UI性能优化与调试——初级篇

Spark有几种部署的模式,单机版、集群版等等,平时单机版在数据量不大的时候可以跟传统的java程序一样进行断电调试、但是在集群上调试就比较麻烦了...远程断...

45250
来自专栏MongoDB中文社区

9月.精华文章推荐

1.《GDPR: Impact to Your Data Management Landscape:Part 3 》

12920
来自专栏黑泽君的专栏

FIFO、UART、ALE解释

FIFO存储器 FIFO是英文First In First Out 的缩写,是一种先进先出的数据缓存器。

14530

扫码关注云+社区

领取腾讯云代金券