前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >spark加载数据到ES

spark加载数据到ES

作者头像
若与
发布2021-03-02 15:16:20
8880
发布2021-03-02 15:16:20
举报

在日常开发中一定会遇到,spark将计算好的数据load到es中,供后端同学查询使用。下面介绍一下spark写es的方式。 使用scala进行演示,对应的java自己google了。

spark写es需要使用到 对应的包es包。maven配置如下

代码语言:javascript
复制
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop</artifactId>
            <version>7.0.0</version>
        </dependency>

使用MAP方式

代码如下

代码语言:javascript
复制
package org.bigdata.es

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object D01 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("d01").setMaster("local[*]")
    conf.set("es.index.auto.create", "true")

    val sc: SparkContext = new SparkContext(conf)

    // map方式
    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
    sc.makeRDD(Seq(numbers, airports)).saveToEs("spark/docs")

  }
}

注意: 必须要导入 import org.elasticsearch.spark._, 不然,就没有 saveToEs方法了

下面介绍一下, org.elasticsearch.spark._导入的隐式函数

包对象中隐式函数

在 org.elasticsearch.spark._ 下面的包对象中有 一个隐式函数,将 RDD转成 SparkRDDFunctions

反编译成 java代码如下

代码语言:javascript
复制
package org.bigdata.es;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.runtime.BoxesRunTime;

public final class D01$ {
 public static final D01$ MODULE$;
 
 public void main(String[] args) {
   SparkConf conf = (new SparkConf()).setAppName("d01").setMaster("local[*]");
   conf.set("es.index.auto.create", "true");
   SparkContext sc = new SparkContext(conf);
   (new scala.Tuple2[3])[0] = scala.Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(scala.Predef$.MODULE$.ArrowAssoc("one"), BoxesRunTime.boxToInteger(1));
   (new scala.Tuple2[3])[1] = scala.Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(scala.Predef$.MODULE$.ArrowAssoc("two"), BoxesRunTime.boxToInteger(2));
   (new scala.Tuple2[3])[2] = scala.Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(scala.Predef$.MODULE$.ArrowAssoc("three"), BoxesRunTime.boxToInteger(3));
   Map numbers = (Map)scala.Predef$.MODULE$.Map().apply((Seq)scala.Predef$.MODULE$.wrapRefArray((Object[])new scala.Tuple2[3]));
   (new scala.Tuple2[2])[0] = scala.Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(scala.Predef$.MODULE$.ArrowAssoc("arrival"), "Otopeni");
   (new scala.Tuple2[2])[1] = scala.Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(scala.Predef$.MODULE$.ArrowAssoc("SFO"), "San Fran");
   Map airports = (Map)scala.Predef$.MODULE$.Map().apply((Seq)scala.Predef$.MODULE$.wrapRefArray((Object[])new scala.Tuple2[2]));
   (new Map[2])[0] = numbers;
   (new Map[2])[1] = airports;
   org.elasticsearch.spark.package$.MODULE$.sparkRDDFunctions(sc.makeRDD((Seq)scala.collection.Seq$.MODULE$.apply((Seq)scala.Predef$.MODULE$.wrapRefArray((Object[])new Map[2])), sc.makeRDD$default$2(), scala.reflect.ClassTag$.MODULE$.apply(Map.class)), scala.reflect.ClassTag$.MODULE$.apply(Map.class)).saveToEs("spark/docs");
 }
 
 private D01$() {
   MODULE$ = this;
 }
}

再给一下,其他的 写es的代码

使用样例类方式

代码语言:javascript
复制
package org.bigdata.es

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

object D02 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("d01").setMaster("local[*]")
    conf.set("es.index.auto.create", "true")

    val sc: SparkContext = new SparkContext(conf)
    val upcomingTrip: Trip = Trip("OTP", "SFO")
    val lastWeekTrip: Trip = Trip("MUC", "OTP")

    val rdd: RDD[Trip] = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
    EsSpark.saveToEs(rdd, "spark/docs", Map("es.mapping.id" -> "id"))
  }
}


// define a case class
case class Trip(departure: String, arrival: String)

使用字符串json方式

代码语言:javascript
复制
package org.bigdata.es

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._


object D03 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("d01").setMaster("local[*]")
    conf.set("es.index.auto.create", "true")

    val sc: SparkContext = new SparkContext(conf)

    val json1 = """{"reason" : "business", "airport" : "SFO"}"""
    val json2 = """{"participants" : 5, "airport" : "OTP"}"""
    sc.makeRDD(Seq(json1, json2)).saveToEs("spark/json-trips")

  }
}

动态index

代码语言:javascript
复制
package org.bigdata.es

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object D04 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("d01").setMaster("local[*]")
    conf.set("es.index.auto.create", "true")

    val sc: SparkContext = new SparkContext(conf)

    val game = Map(
      "media_type" -> "game",
      "title" -> "FF VI",
      "year" -> "1994")

    val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
    val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")

    sc.makeRDD(Seq(game, book, cd)).saveToEs("my-collection-{media_type}/doc")

  }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 使用MAP方式
  • 使用样例类方式
  • 使用字符串json方式
  • 动态index
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档