专栏首页腾讯云Elasticsearch Service腾讯云EMR&Elasticsearch中使用ES-Hadoop之Spark篇
原创

腾讯云EMR&Elasticsearch中使用ES-Hadoop之Spark篇

腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇

腾讯云EMR&Elasticsearch中使用ES-Hadoop之Spark篇

Hadoop/Spark读写ES之性能调优

在上一篇中,我们介绍了在Hadoop和hive中做ES数据的导入导出。本篇我们介绍在Spark下使用ES-Hadoop的例子

*注:资源准备、数据准备以及ES-Hadoop关键配置项说明请参考上一篇中的内容

Spark 读取 ES 数据

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;


import java.util.Map;

public class ReadFromESBySpark {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("my-app").clone()
                .set("es.nodes", "10.0.4.17")
                .set("es.port", "9200")
                .set("es.nodes.wan.only", "true")
                .set("es.input.use.sliced.partitions", "false")
                .set("es.input.max.docs.per.partition", "100000000");

        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaPairRDD<String, Map<String, Object>> rdd = JavaEsSpark.esRDD(sc, "logs-201998/type", "?q=clientip:247.37.0.0");

        for (Map<String, Object> item : rdd.values().collect()) {
            System.out.println(item);
        }

        sc.stop();
    }
}

通过JavaEsSpark.esRDD(sc, "logs-201998/type", "?q=clientip:247.37.0.0")方法从ES集群的索引logs-201998/type中,查询query为?q=clientip:247.37.0.0,返回JavaPairRDD

通过 Spark RDD 写入 ES

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.spark_project.guava.collect.ImmutableList;
import org.spark_project.guava.collect.ImmutableMap;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import java.util.Map;
import java.util.List;

public class WriteToESUseRDD {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("my-app").clone()
                .set("es.nodes", "10.0.4.17")
                .set("es.port", "9200")
                .set("es.nodes.wan.only", "true")
                .set("es.batch.size.bytes", "30MB")
                .set("es.batch.size.entries", "20000")
                .set("es.batch.write.refresh", "false")
                .set("es.batch.write.retry.count", "50")
                .set("es.batch.write.retry.wait", "500s")
                .set("es.http.timeout", "5m")
                .set("es.http.retries", "50")
                set("es.action.heart.beat.lead", "50s");

        JavaSparkContext sc = new JavaSparkContext(conf);

        Map<String, ?> logs = ImmutableMap.of("clientip", "255.255.255.254",
                "request", "POST /write/using_spark_rdd HTTP/1.1",
                "status", 200,"size", 802,
                "@timestamp", 895435190);

        List<Map<String, ?>> list = ImmutableList.of(logs);

        JavaRDD<Map<String, ?>> javaRDD = sc.parallelize(list);

        JavaEsSpark.saveToEs(javaRDD, "logs-201998/type");

        sc.stop();
    }
}

构建JavaRDD,通过JavaEsSpark.saveToEs写入。

通过 Spark Streaming 写入 ES

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.Seconds;

import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.spark_project.guava.collect.ImmutableList;
import org.spark_project.guava.collect.ImmutableMap;


import java.util.Map;
import java.util.LinkedList;
import java.util.Queue;

public class WriteToESUseSparkStreaming {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("my-app").clone()
                .set("es.nodes", "10.0.4.17")
                .set("es.port", "9200")
                .set("es.nodes.wan.only","true")
                .set("es.batch.size.bytes", "30MB")
                .set("es.batch.size.entries", "20000")
                .set("es.batch.write.refresh", "false")
                .set("es.batch.write.retry.count", "50")
                .set("es.batch.write.retry.wait", "500s")
                .set("es.http.timeout", "5m")
                .set("es.http.retries", "50")
                set("es.action.heart.beat.lead", "50s");

        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext jssc = new JavaStreamingContext(sc, Seconds.apply(1));

        Map<String, ?> logs = ImmutableMap.of("clientip", "255.255.255.253", "request", "POST /write/using_spark_streaming HTTP/1.1");
        JavaRDD<Map<String, ?>> javaRDD = sc.parallelize(ImmutableList.of(logs));

        Queue<JavaRDD<Map<String, ?>>> microbatches = new LinkedList<>();
        microbatches.add(javaRDD);
        JavaDStream<Map<String, ?>> javaDStream = jssc.queueStream(microbatches);

        JavaEsSparkStreaming.saveToEs(javaDStream, "logs-201998/type");

        sc.stop();
    }
}

构建JavaRDDJavaDStream,通过调用JavaEsSparkStreaming.saveToEs写入。

执行

wget http://central.maven.org/maven2/org/elasticsearch/elasticsearch-spark-20_2.11/5.6.4/elasticsearch-spark-20_2.11-5.6.4.jar
spark-submit --jars elasticsearch-spark-20_2.11-5.6.4.jar --class "ReadFromESBySpark" esspark-1.0-SNAPSHOT.jar

通过--jars参数,载入elasticsearch-spark

总结

相比于Hadoop,Spark与ES的交互有更多的方式,包括RDD,Spark Streaming,还有文中未涉及到的DataSet与Spark SQL的模式等等。本位未列出scale版的相关代码,可以参考Elastic官方文档进行实际的演练。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 应用AI芯片加速 Hadoop 3.0 纠删码的计算性能

    在保证可靠性的前提下如何提高存储利用率已成为当前 DFS 应用的主要问题之一。

    ethanzhang
  • 腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇

    腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇

    ethanzhang
  • Hadoop/Spark读写ES之性能调优

    腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇

    ethanzhang
  • 重磅发布 | 基于Spark训练线性回归模型 实战入门教程

    最开始接触分布式计算框架的是Hadoop中的MapReduce,虽然开发起来很复杂(Map与Reduce都要有相应的实现类)但是我也成功的启动了第一个“Hell...

    double
  • spark2.0.1安装部署及使用jdbc连接基于hive的sparksql

    复制一份spark-env.sh.template,改名为spark-env.sh。然后编辑spark-env.sh

    尚浩宇
  • Win7 Eclipse 搭建spark java1.8编译环境,JavaRDD的helloworld例子

    Win7 Eclipse 搭建spark java1.8编译环境,JavaRDD的helloworld例子:

    马克java社区
  • 使用aop加解密http接口

    最近在写一个小程序接口,由于安全性比较高,因此需要给请求参数和响应进行加密处理。如果在每个方法上都加密解密,那样代码就显得太繁琐了而且工作量会加大。所以,我们会...

    Java旅途
  • 在线音乐社区研究报告:跟你分享歌单的都是什么人?

    导读:音乐社区已成趋势,年轻人的心思谁来猜?极光JIGUANG发布《国内在线音乐社区研究报告》,从市场概况、行业分析、音乐社区代表及未来发展等多层面解读音乐社区...

    CDA数据分析师
  • 大数据生涯感悟

      不知不觉,毕业一年半了,从实习开始接触大数据技术。那时懵懂的我,不对,应该说懵逼的我在想,卧槽,这是啥这么牛逼,我都不会啊。。。啥都不会完蛋了。。即便现在也...

    用户3003813
  • ViewPager做出广告轮播特效

    Xiaolei123

扫码关注云+社区

领取腾讯云代金券