前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇

腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇

原创
作者头像
ethanzhang
修改于 2020-04-11 01:57:24
修改于 2020-04-11 01:57:24
5.4K1
举报

腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇

腾讯云EMR&Elasticsearch中使用ES-Hadoop之Spark篇

Hadoop/Spark读写ES之性能调优

ES-Hadoop 是 Elastic 官方推出的一个用于对接 Hadoop 生态的工具,使得用户可以使用 Mapreduce(MR)、SparkHive 等工具处理 ES 上的数据。众所周知,Hadoop 生态的长处是处理大规模数据集,但是其缺点也很明显,就是当用于交互式分析时,查询时延会比较长。而 ES 在这几个方面的能力很强,对于如 ad-hoc 查询,基本可以做到秒级。ES-Hadoop 的推出提供了一种组合两者优势的可能性。使用 ES-Hadoop,用户只需要对自己代码做出很小的改动,即可以快速处理存储在 ES 中的数据,并且能够享受到 ES 带来的加速效果。

ES-Hadoop
ES-Hadoop

利用ES-Hadoop 组件,可以将 ES 作为 MR/Spark/Hive 等大数据处理引擎的“数据源”,在大数据计算存储分离的架构中扮演存储的角色。这和 MySQL/PG/MongoDB 等其他数据源并无差异。但相对于其他数据源, ES 具有更灵活的全文检索能力,更快的数据选择过滤能力以及快速的UI展示报表的能力。这些能力正是分析引擎最为关键的能力之一。

下面我们将通过特定案例,介绍如何在腾讯云 EMR 和 腾讯云 Elasticsearch 中使用 ES-Hadoop。

资源准备

购买腾讯云EMR,并勾选hive,spark等组件,以备使用。购买腾讯云Elasticsearch。

数据准备

这里以Elasticsearch官方标准的workbench测试数据http_logs为例,包含的字段如下:

代码语言:txt
AI代码解释
复制
clientip "10.123.123.43"
request: "GET /images/102328s.gif HTTP/1.1"
status: 2xx
size: 80

测试数据写入方法:

代码语言:txt
AI代码解释
复制
pip install esrally
esrally --target-hosts=10.0.4.17:9200 --distribution-version=5.6.4 --track=http_logs --pipeline=benchmark-only --challenge=append-no-conflicts-index-only
#10.0.4.17:9200 为es的vpc内网ip

ES-Hadoop关键配置项说明

代码语言:txt
AI代码解释
复制
'es.nodes' = '10.0.0.17',
'es.port' = '9200',
'es.nodes.wan.only' = 'true',
'es.index.auto.create' = 'false',
'es.resource' = 'logs-201998/type',
'es.read.metadata' = 'true',
'es.mapping.names' = 'uid:_metadata._id,clientip:clientip,request:request,status,status',
'es.index.read.missing.as.empty' = 'true',
'es.input.use.sliced.partitions' = 'false',
'es.input.max.docs.per.partition' = '100000000'
1. es.nodes

配置ES集群的节点ip,腾讯云ES给用户提供了带负载均衡功能的vpc内网vip,这里结合es.nodes.wan.only这个配置项同时使用。

2. es.port

配置ES集群的端口号。

3. es.nodes.wan.only

设置为true,开启ES集群在云上使用vip进行连接,不进行节点嗅探。

4. es.index.auto.create

如通过Hadoop组件向ES集群写入数据,是否自动创建不存在的index。

5. es.resource

指定要读写的index和type

6. es.mapping.names

表字段与ES的索引字段的字段名映射

7. es.read.metadata

如操作的ES字段涉及到_id之类的内部字段,需要将这个配置开启。

8. es.input.max.docs.per.partition

配置单个partition的最大文档数。在执行hive sql的过程中,需要限制mapper的数目,否则ES会面临多个scroll切片的查询,造成CPU打满,影响集群的性能和稳定性。这里需要根据ES索引中数据的总数来灵活的设置。

9. es.input.use.sliced.partitions

更多的ES-Hadoop配置项请参考官方文档https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html

在hive中创建ES外部表

1. 登录EMR的master节点,切换至hadoop用户
代码语言:txt
AI代码解释
复制
su - hadoop
2. 下载ES-Hadoop组件jar包
代码语言:txt
AI代码解释
复制
wget http://download.elastic.co/hadoop/elasticsearch-hadoop-5.6.4.zip
3. 启动hive-cli,加载ES-Hadoop组件jar包
代码语言:txt
AI代码解释
复制
hive:->  add jar file:///home/hadoop/elasticsearch-hadoop-5.6.4/dist/elasticsearch-hadoop-hive-5.6.4.jar;
4. 创建外部表
代码语言:txt
AI代码解释
复制
create database if not exists tmp;
drop table tmp.tmp_es;
create external table tmp.tmp_es (uid varchar(255), clientip varchar(255), request varchar(1024), status int)
    STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.nodes' = '10.0.4.17',
            'es.port'='9200',
            'es.index.auto.create' = 'false',
            'es.resource' ='logs-201998/type',
            'es.read.metadata' = 'true',
            'es.mapping.names' = 'uid:_metadata._id,clientip:clientip,request:request,status,status',
            'es.nodes.wan.only'='true',
            'es.index.read.missing.as.empty'='true',
            'es.input.use.sliced.partitions'='false',
            'es.input.max.docs.per.partition'='100000000'
);

这里以官方测试数据http_logs为例,使用logs-201998这个index,映射了4个字段_id, clientip, request, status。因为索引文档总量为100w+,设置单partition最大文档数为100000000, 期望mapper数保持在5个以内。

5. 查询ES外部表
代码语言:txt
AI代码解释
复制
select * from tmp.tmp_es limit 10;
6. 写入ES外部表或将ES索引中的数据导入到hive的内部表
代码语言:txt
AI代码解释
复制
# 写入外部表
insert into tmp.tmp_es values ('sfasfsdf', '10.0.0.11', 'sdfsfa', 200);
7. ES索引中的数据导入到hive的内部表
代码语言:txt
AI代码解释
复制
# 将hive内部表中的数据导入至ES外部表
drop table tmp.tmp_hive;
create table tmp.tmp_hive (uid varchar(255), clientip varchar(255), request varchar(1024), status int);
insert into tmp.tmp_hive select * from tmp.tmp_es;

通过MapReduce任务向ES写入数据

如一些较复杂的分析任务,不适合使用hive sql完成。下面这个例子演示了如何通过MR任务,读取HDFS上的JSON文件,并写入ES集群。

1. 新增数据

我们新增一条http log,clientip设置为特殊的255.255.255.255。写入到http_log.json,并上传至HDFS的/es-hadoop目录。

代码语言:txt
AI代码解释
复制
{"@timestamp": 895435190,"clientip": "255.255.255.255","request": "GET /images/102328s.gif HTTP/1.1","status": 200,"size": 802}
2. MR代码

编译打包如下代码为esmr-1.0-SNAPSHOT.jar,编写Mapper,读取HDFS上的json文件,并在map阶段通过EsOutputFormat写入。

  • 在设置中关闭map 和 reduce 的推测执行机制
  • 设置es.input.json为true,将源文件按json来解析。

完整工程代码可以在这里下载

代码语言:txt
AI代码解释
复制
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.elasticsearch.hadoop.mr.EsOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WriteToEsWithMR extends Configured implements Tool {

    public static class EsMapper extends Mapper<Object, Text, NullWritable, Text> {
        private Text doc = new Text();

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            if (value.getLength() > 0) {
                doc.set(value);
                System.out.println(value);
                context.write(NullWritable.get(), doc);
            }
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        conf.setBoolean("mapreduce.map.speculative", false);
        conf.setBoolean("mapreduce.reduce.speculative", false);
        conf.set("es.nodes", "10.0.4.17:9200");
        conf.set("es.nodes.wan.only", "true");
        conf.set("es.resource", "logs-201998/type");
        conf.set("es.input.json", "yes");

        Job job = Job.getInstance(conf);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(EsOutputFormat.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setJarByClass(WriteToEsWithMR.class);
        job.setMapperClass(EsMapper.class);

        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new WriteToEsWithMR(), args);
        System.exit(ret);
    }
}
3. 执行
代码语言:txt
AI代码解释
复制
hadoop jar esmr-1.0-SNAPSHOT.jar /es-hadoop
4. 验证

通过API或kibana查询clientip为255.255.255.255的记录

代码语言:txt
AI代码解释
复制
GET logs-201998/_search
{
  "query": {
    "match": {
      "clientip":"255.255.255.255"
    }
  }
}

结语

本文以腾讯云上的EMR和Elasticsearch为例,介绍了如何通过ES强大的ES-Hadoop组件,在hive和MR上进行数据的查询和写入。可以方便用户将Elasticsearch与Hadoop生态组件结合起来,提供更灵活的分析能力。下一篇将为大家介绍ES-Hadoop之Spark篇的内容,将为大家进一步介绍在spark中如果读取和写入ES数据,敬请期待。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
1 条评论
热度
最新
牛批
牛批
回复回复点赞举报
推荐阅读
编辑精选文章
换一批
腾讯云EMR&Elasticsearch中使用ES-Hadoop之Spark篇
腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇
ethanzhang
2018/12/29
8.8K0
腾讯云EMR&Elasticsearch中使用ES-Hadoop之Spark篇
Hadoop/Spark读写ES之性能调优
腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇
ethanzhang
2020/04/09
5.6K0
Hadoop/Spark读写ES之性能调优
kerberos+ranger场景下使用hive操作ES集群数据
使用hive对ES的数据进行增查改,通过kerberos+ranger完成不同用户对于不同表,列的细粒度权限控制。
沈小翊
2023/07/18
9002
kerberos+ranger场景下使用hive操作ES集群数据
使用Hive读写ElasticSearch中的数据
ElasticSearch已经可以与YARN、Hadoop、Hive、Pig、Spark、Flume等大数据技术框架整合起来使用,尤其是在添加数据的时候,可以使用分布式任务来添加索引数据,尤其是在数据平台上,很多数据存储在Hive中,使用Hive操作ElasticSearch中的数据,将极大的方便开发人员。这里记录一下Hive与ElasticSearch整合,查询和添加数据的配置使用过程。基于Hive0.13.1、Hadoop-cdh5.0、ElasticSearch 2.1.0。
Java架构师必看
2021/08/12
1.5K0
大厂程序员为了更好的满足历史数据的保存和检索往往选择这种神操作!
ElasticSearch是一款开源的非常火爆的文档索引引擎, 大小公司都比较青睐的一款做日志检索、分析、查询的工具。
云存储
2019/12/11
5330
大厂程序员为了更好的满足历史数据的保存和检索往往选择这种神操作!
Hive如何创建elasticsearch外部表
Elasticsearch 是一个开源的分布式搜索和分析引擎,建立在 Apache Lucene 基础上。它提供了一个可扩展的、实时的搜索和分析平台,用于处理和分析大规模的结构化和非结构化数据。 在类实时读写与全文检索上有极大的优势。
空洞的盒子
2023/11/27
1.3K1
ES-Hadoop 实践
在大数据背景下,适用于不同场景下的框架、系统层出不穷,在批量数据计算上hadoop鲜有敌手,而在实时搜索领域es则是独孤求败,那如何能让数据同时结合两者优势呢?本文介绍的es-hadoop将做到这点。关于es-hadoop的使用在ethanbzhang之前的两篇文章《腾讯云EMR&Elasticsearch中使用ES-Hadoop之Spark篇》和《腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇》中已经进行了一些介绍,本文一方面是对其内容的一些补充,另一方面也是对个人实践过程的一个总结。
franyang
2019/12/03
3.5K0
ES-Hadoop 实践
Elasticsearch与Hive的数据互导
首先先下载一个叫"elasticsearch-hadoop-hive"的JAR包,放到相应路径下:https://jar-download.com/artifacts/org.elasticsearch/elasticsearch-hadoop-hive
dandelion1990
2019/06/27
6.5K2
Hive-ES配置实例
配置过程如下: 1)添加jar包 add jar file:///path/to/elasticsearch-hadoop-hive-5.2.0.jar; 2)创建一个hive-es对应表 create external table tmp.es_guo_test( imp_date string, group_code string, member_uin string, uin_flag bigint, ex_flag bigint, ower_f
YG
2018/05/23
5560
hive读写ES集群及Role权限控制
1.下载elasticsearch-hadoop-hive-xxx.jar包,版本要与ES集群对应
沈小翊
2023/11/15
3300
大数据技术之_11_HBase学习_02_HBase API 操作 + HBase 与 Hive 集成 + HBase 优化
注意:truncate,清空表数据,实际底层操作是先使表不可用(下线),然后删除表,最后根据表信息重新创建一张新表。
黑泽君
2019/03/15
1.9K0
大数据技术之_11_HBase学习_02_HBase API 操作 + HBase 与 Hive 集成 + HBase 优化
Spark读写ES最佳实践
更换代码中公网ip为内网ip,选择maven assembly plugin进行打包,上传带依赖的jar包到EMR上,运行"ReadES"
沈小翊
2023/11/14
7840
orc文件格式对常用系统的支持
1、Hive支持 创建表时指定orc格式即可: create table tmp.orc_test(id bigint, name string, age int) stored as orc TBLPROPERTIES('orc.compress'='SNAPPY') 压缩格式有"SNAPPY"和 "ZLIB"两种,需要哪种格式指定即可。 2、SPARK支持 Spark读: df = spark.read.orc("/tmp/test/orc_data") # 读出来的数据是一个dataframe
YG
2018/11/22
2.2K0
腾讯云EMR&Elasticsearch中 使用ES-Hadoop&云HDFS进行数据交换和备份
腾讯云EMR和ES是两款非常火热的大数据分析产品,长期以来一直是分别在客户场景下使用的,不过随着云上CHDFS产品的上线,以及ES-Hadoop等插件的完善,两者结合使用有了比较成熟的方案,下面就介绍一下相关使用的方式:
说云时间
2019/12/06
1.4K0
腾讯云EMR&Elasticsearch中 使用ES-Hadoop&云HDFS进行数据交换和备份
hadoop系列之MR的经典代码案例一
七、MapReduce经典案例 1、网站分析案例 1)分析 省份访问 procinceId --> Key 1 -->Value <procinceId,lis
Spark学习技巧
2018/01/31
2.2K0
hadoop系列之MR的经典代码案例一
Spark SQL读写 ES7.x 及问题总结
ES官方提供了对spark的支持,可以直接通过spark读写es,具体可以参考ES Spark Support文档(https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark)
大数据真好玩
2021/01/26
3.6K0
Hadoop基础教程-第7章 MapReduce进阶(7.6 MapReduce 二次排序)
MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的。在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需求。所谓二次排序,先按第1个字段进行排序,然后对第1个字段相同的数据,再按第2个字段进行排序。
程裕强
2022/05/06
2160
HBase快速入门系列(7) | 官方HBase-MapReduce与自定义
1. 官方HBase-MapReduce 1.查看HBase的MapReduce任务的执行 [bigdata@hadoop002 hbase]$ bin/hbase mapredcp 上图标记处为
不温卜火
2020/10/28
8250
HBase快速入门系列(7) | 官方HBase-MapReduce与自定义
万法归宗之Hadoop编程无界限
记录下,散仙今天的工作以及遇到的问题和解决方案,俗话说,好记性不如烂笔头,写出来文章,供大家参考,学习和点评,进步,才是王道 ,废话不多说,下面切入主题: 先介绍下需求: 散仙要处理多个类似表的txt数据,当然只有值,列名什么的全部在xml里配置了,然后加工这些每个表的每一行数据,生成特定的格式基于ASCII码1和ASCII码2作为分隔符的一行数据,ASCII2作为字段名和字段值的分隔符,ASCII1作为字段和字段之间的分隔符,每解析一个txt文件时,都要获取文件名,然后与xml中的schema信息
我是攻城师
2018/05/11
8110
快速学习-HBaseAPI操作
通过HBase的相关JavaAPI,我们可以实现伴随HBase操作的MapReduce过程,比如使用MapReduce将数据从本地文件系统导入到HBase的表中,比如我们从HBase中读取一些原始数据后使用MapReduce做数据分析。
cwl_java
2020/02/21
4720
相关推荐
腾讯云EMR&Elasticsearch中使用ES-Hadoop之Spark篇
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文