前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【ES三周年】ElasticSearch 简要技术总结与Spark结合使用实践

【ES三周年】ElasticSearch 简要技术总结与Spark结合使用实践

原创
作者头像
大鹅
发布2023-02-14 16:03:23
1.6K0
发布2023-02-14 16:03:23
举报

ElasticSearch 简要技术总结

1. 总览

ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。它是一个实时的分布式搜索和分析引擎。它可以帮助你用几秒钟内搜索百万级别的数据。

ES是高度可伸缩的开源全文搜索和分析引擎。它可以实时地存储、搜索和分析大容量的数据。通常用作底层引擎/技术力量有复杂的搜索功能和需求的应用程序。

这是一些典型的应用场景:

  1. 在线网上商店允许客户搜索销售的产品。在这种情况下,可以使用ElasticSearch存储整个产品目录和库存,并为它们提供搜索和自动填充建议。
  2. 希望收集日志或交易数据,并且希望分析和挖掘此数据以查找趋势,统计信息,摘要或异常。在这种情况下,可以使用Logstash(Elasticsearch / Logstash / Kibana堆栈的一部分)来收集,聚合和解析数据,然后让Logstash将此数据提供给Elasticsearch。一旦数据在ElasticSearch中,就可以运行搜索和聚合来挖掘您感兴趣的任何信息。
  3. 运行价格警报平台,允许精通价格的客户指定一条规则,例如“我有兴趣购买特定的电子产品,如果小工具的价格在下个月内从任何供应商降至X美元以下,我希望收到通知” 。在这种情况下,可以刮取供应商价格,将其推入ElasticSearch并使用其反向搜索(Percolator)功能来匹配价格变动与客户查询,并最终在发现匹配后将警报推送给客户。
  4. 有分析/业务智能需求,并希望快速调查,分析,可视化并询问有关大量数据的特定问题(数百万或数十亿条记录)。在这种情况下,可以使用ElasticSearch存储数据,然后使用Kibana(Elasticsearch / Logstash / Kibana堆栈的一部分)构建自定义仪表板,以便可视化重要的数据。此外,可以使用ElasticSearch聚合功能对数据执行复杂的商业智能查询。

2. 基本概念

2.1 Node 与 Cluster

Elastic 本质上是一个分布式数据库,允许多台服务器协同工作,每台服务器可以运行多个 Elastic 实例。

单个 Elastic 实例称为一个节点(node)。一组节点构成一个集群(cluster)。

2.2 Index

Elastic 会索引所有字段,经过处理后写入一个反向索引(Inverted Index)。查找数据的时候,直接查找该索引。

所以,Elastic 数据管理的顶层单位就叫做 Index(索引)。它是单个数据库的同义词。每个 Index (即数据库)的名字必须是小写。

下面的命令可以查看当前节点的所有 Index。

代码语言:txt
复制
 $ curl -X GET 'http://localhost:9200/_cat/indices?v'

事实上,我们的数据被存储和索引在分片(shards)中,索引只是一个把一个或多个分片分组在一起的逻辑空间。然而,这只是一些内部细节——我们的程序完全不用关心分片。对于我们的程序而言,文档存储在索引(index)中。剩下的细节由Elasticsearch关心既可。

2.3 Document

Index 里面单条的记录称为 Document(文档)。许多条 Document 构成了一个 Index。

Document 使用 JSON 格式表示,下面是一个例子。

代码语言:txt
复制
 {
   "user": "张三",
   "title": "工程师",
   "desc": "数据库管理"
 }

同一个 Index 里面的 Document,不要求有相同的结构(scheme),但是最好保持相同,这样有利于提高搜索效率。

通常,我们可以认为对象(object)文档(document)是等价相通的。不过,他们还是有所差别:对象(Object)是一个JSON结构体——类似于哈希、hashmap、字典或者关联数组;对象(Object)中还可能包含其他对象(Object)。 在Elasticsearch中,文档(document)这个术语有着特殊含义。它特指最顶层结构或者根对象(root object)序列化成的JSON数据(以唯一ID标识并存储于Elasticsearch中)。

2.4 Type(将在ES6.0移除)

Document 可以分组,比如weather这个 Index 里面,可以按城市分组(北京和上海),也可以按气候分组(晴天和雨天)。这种分组就叫做 Type,它是虚拟的逻辑分组,用来过滤 Document。

不同的 Type 应该有相似的结构(schema),举例来说,id字段不能在这个组是字符串,在另一个组是数值。这是与关系型数据库的表的一个区别。性质完全不同的数据(比如productslogs)应该存成两个 Index,而不是一个 Index 里面的两个 Type(虽然可以做到)。

下面的命令可以列出每个 Index 所包含的 Type。

代码语言:txt
复制
 $ curl 'localhost:9200/_mapping?pretty=true'

在应用中,我们使用对象表示一些“事物”,例如一个用户、一篇博客、一个评论,或者一封邮件。每个对象都属于一个类(class),这个类定义了属性或与对象关联的数据。user类的对象可能包含姓名、性别、年龄和Email地址。

在关系型数据库中,我们经常将相同类的对象存储在一个表里,因为它们有着相同的结构。同理,在Elasticsearch中,我们使用相同类型(type)的文档表示相同的“事物”,因为他们的数据结构也是相同的。

每个类型(type)都有自己的映射(mapping)或者结构定义,就像传统数据库表中的列一样。所有类型下的文档被存储在同一个索引下,但是类型的映射(mapping)会告诉Elasticsearch不同的文档如何被索引。

3. 数据操作

3.1 新增记录

向指定的 /Index/Type 发送 PUT 请求,就可以在 Index 里面新增一条记录。比如,向/accounts/person发送请求,就可以新增一条人员记录。

代码语言:txt
复制
 $ curl -X PUT 'localhost:9200/accounts/person/1' -d '
 {
   "user": "张三",
   "title": "工程师",
   "desc": "数据库管理"
 }' 
 

服务器返回的 JSON 对象,会给出 Index、Type、Id、Version 等信息。

代码语言:txt
复制
 {
   "_index":"accounts",
   "_type":"person",
   "_id":"1",
   "_version":1,
   "result":"created",
   "_shards":{"total":2,"successful":1,"failed":0},
   "created":true
 }

如果你仔细看,会发现请求路径是/accounts/person/1,最后的1是该条记录的 Id。它不一定是数字,任意字符串(比如abc)都可以。

新增记录的时候,也可以不指定 Id,这时要改成 POST 请求。

代码语言:txt
复制
 $ curl -X POST 'localhost:9200/accounts/person' -d '
 {
   "user": "李四",
   "title": "工程师",
   "desc": "系统管理"
 }'

上面代码中,向/accounts/person发出一个 POST 请求,添加一个记录。这时,服务器返回的 JSON 对象里面,_id字段就是一个随机字符串。

代码语言:txt
复制
 {
  "_index":"accounts",
   "_type":"person",
   "_id":"AV3qGfrC6jMbsbXb6k1p",
   "_version":1,
   "result":"created",
   "_shards":{"total":2,"successful":1,"failed":0},
   "created":true
 }

如果没有先创建 Index(这个例子是accounts),直接执行上面的命令,Elastic 也不会报错,而是直接生成指定的 Index

3.2 查看记录

/Index/Type/Id发出 GET 请求,就可以查看这条记录。

代码语言:txt
复制
 $ curl 'localhost:9200/accounts/person/1?pretty=true'

上面代码请求查看/accounts/person/1这条记录,URL 的参数pretty=true表示以易读的格式返回。

返回的数据中,found字段表示查询成功,_source字段返回原始记录。

代码语言:txt
复制
 {
   "_index" : "accounts",
   "_type" : "person",
   "_id" : "1",
   "_version" : 1,
   "found" : true,
   "_source" : {
     "user" : "张三",
     "title" : "工程师",
     "desc" : "数据库管理"
   }
 } 

如果 Id 不正确,就查不到数据,found字段就是false

代码语言:txt
复制
 $ curl 'localhost:9200/weather/beijing/abc?pretty=true'
 {
   "_index" : "accounts",
   "_type" : "person",
   "_id" : "abc",
   "found" : false
 } 
3.3 删除记录

删除记录就是发出 DELETE 请求

代码语言:txt
复制
 $ curl -X DELETE 'localhost:9200/accounts/person/1'
3.4 更新记录

更新记录就是使用 PUT 请求,重新发送一次数据

代码语言:txt
复制
 $ curl -X PUT 'localhost:9200/accounts/person/1' -d '
 {
     "user" : "张三",
     "title" : "工程师",
     "desc" : "数据库管理,软件开发"
 }' 
 
 {
   "_index":"accounts",
   "_type":"person",
   "_id":"1",
   "_version":2,
   "result":"updated",
   "_shards":{"total":2,"successful":1,"failed":0},
   "created":false
 }

上面代码中,我们将原始数据从"数据库管理"改成"数据库管理,软件开发"。 返回结果里面,有几个字段发生了变化

可以看到,记录的 Id 没变,但是版本(version)从1变成2,操作类型(result)从created变成updatedcreated字段变成false,因为这次不是新建记录

Elasticsearch是一个分布式系统。当documents被创建、更新或者删除,其新版本会被复制到集群的其它节点。Elasticsearch既是异步的(asynchronous )也是同步的(concurrent),其含义是复制请求都是并行发送的,但是到达目的地的顺序是无序的。Elasticsearch系统需要一种方法使得老版本的文档永远都无法覆盖新的版本。

每当文档被改变的时候,文档中的_version将会被增加(+1)。Elasticsearch使用_version确保所有的修改都会按照正确的顺序执行。如果文档旧的版本在新的版本之后到达,它会被简单的忽略。

4. 数据查询

4.1 返回所有记录

使用 GET 方法,直接请求/Index/Type/_search,就会返回所有记录。

代码语言:txt
复制
 
 $ curl 'localhost:9200/accounts/person/_search'
 
 {
   "took":2,
   "timed_out":false,
   "_shards":{"total":5,"successful":5,"failed":0},
   "hits":{
     "total":2,
     "max_score":1.0,
     "hits":[
       {
         "_index":"accounts",
         "_type":"person",
         "_id":"AV3qGfrC6jMbsbXb6k1p",
         "_score":1.0,
         "_source": {
           "user": "李四",
           "title": "工程师",
           "desc": "系统管理"
         }
       },
       {
         "_index":"accounts",
         "_type":"person",
         "_id":"1",
         "_score":1.0,
         "_source": {
           "user" : "张三",
           "title" : "工程师",
           "desc" : "数据库管理,软件开发"
         }
       }
     ]
   }
 }
 

上面代码中,返回结果的 took字段表示该操作的耗时(单位为毫秒),timed_out字段表示是否超时,hits字段表示命中的记录,里面子字段的含义如下。

  • total:返回记录数,本例是2条。
  • max_score:最高的匹配程度,本例是1.0
  • hits:返回的记录组成的数组。

返回的记录中,每条记录都有一个_score字段,表示匹配的程序,默认是按照这个字段降序排列。

4.2 全文搜索

Elastic 的查询非常特别,使用自己的查询语法,要求 GET 请求带有数据体

代码语言:txt
复制
 
 $ curl 'localhost:9200/accounts/person/_search'  -d '
 {
   "query" : { "match" : { "desc" : "软件" }}
 }'
 

上面代码使用 Match 查询,指定的匹配条件是desc字段里面包含"软件"这个词,返回结果如下

代码语言:txt
复制
 
 {
   "took":3,
   "timed_out":false,
   "_shards":{"total":5,"successful":5,"failed":0},
   "hits":{
     "total":1,
     "max_score":0.28582606,
     "hits":[
       {
         "_index":"accounts",
         "_type":"person",
         "_id":"1",
         "_score":0.28582606,
         "_source": {
           "user" : "张三",
           "title" : "工程师",
           "desc" : "数据库管理,软件开发"
         }
       }
     ]
   }
 }
 

Elastic 默认一次返回10条结果,可以通过size字段改变这个设置。

代码语言:txt
复制
 
 $ curl 'localhost:9200/accounts/person/_search'  -d '
 {
   "query" : { "match" : { "desc" : "管理" }},
   "size": 1
 }'
 

上面代码指定,每次只返回一条结果。

还可以通过from字段,指定位移。

代码语言:txt
复制
 
 $ curl 'localhost:9200/accounts/person/_search'  -d '
 {
   "query" : { "match" : { "desc" : "管理" }},
   "from": 1,
   "size": 1
 }'
 

上面代码指定,从位置1开始(默认是从位置0开始),只返回一条结果。

4.3 逻辑运算

如果有多个搜索关键字, Elastic 认为它们是or关系

代码语言:txt
复制
 
 $ curl 'localhost:9200/accounts/person/_search'  -d '
 {
   "query" : { "match" : { "desc" : "软件 系统" }}
 }'
 

上面代码搜索的是软件 or 系统

如果要执行多个关键词的and搜索,必须使用布尔查询

代码语言:txt
复制
 
 $ curl 'localhost:9200/accounts/person/_search'  -d '
 {
   "query": {
     "bool": {
       "must": [
         { "match": { "desc": "软件" } },
         { "match": { "desc": "系统" } }
       ]
     }
   }
 }'

5. ES与Spark整合

5.1 Maven配置

引入对应依赖

代码语言:txt
复制
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-spark-13_2.10</artifactId>
  <version>5.0.1</version>
</dependency>
5.2 写入Map对象
代码语言:java
复制
// 1
import org.apache.spark.api.java.JavaSparkContext;                              
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
// 2
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;                        
...
// 3
SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);                              
// 4
Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);                   
Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");
// 5
JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
// 6
JavaEsSpark.saveToEs(javaRDD, "spark/docs");      

下面是代码解释:

步骤

具体含义

1

Spark imports

2

elasticsearch-hadoop imports

3

运行Spark

4

使用了GuavaImmutable* 方法简化 Map, List 的创建

5

创建 RDD

6

保存到ES中,Index为spark/docs

5.3 写入JSON对象

我们可以直接将Json字符串写入到ElasticSearch中,如下:

代码语言:java
复制
String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}";  
String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}";

JavaSparkContext jsc = ...
JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2));
JavaEsSpark.saveJsonToEs(stringRDD, "spark/json-trips");             
5.4 Spark Streaming 写入数据

Java有一个专用的类,它提供与EsSparkStreaming类似的功能,即包org.elasticsearch.spark.streaming.api.java中的JavaEsSparkStreaming(类似于Spark的Java API的包):

代码语言:java
复制
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;                                              
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;

import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;         
...

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);                              
JavaStreamingContext jssc = new JavaSparkStreamingContext(jsc, Seconds.apply(1));

Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);                   
Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
Queue<JavaRDD<Map<String, ?>>> microbatches = new LinkedList<>();
microbatches.add(javaRDD);                                                      
JavaDStream<Map<String, ?>> javaDStream = jssc.queueStream(microbatches);

JavaEsSparkStreaming.saveToEs(javaDStream, "spark/docs");                       

jssc.start()                                                                    
5.5 读取数据

Java有一个专用的JavaPairRDD,返回的Tuple2值(或第二个元素)将文档作为java.util集合返回。

代码语言:java
复制
import org.apache.spark.api.java.JavaSparkContext;               
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;             
...

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);               

JavaPairRDD<String, Map<String, Object>> esRDD =
                        JavaEsSpark.esRDD(jsc, "radio/artists"); 
5.6 其他操作

我们还可以将JavaBean 或者Spark SQL中的DataFrame存入到ES中,具体可以参考https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#CO47-3

Ref

1 https://www.elastic.co/guide/en/elasticsearch/reference/6.0/_basic_concepts.html (官方文档)

2 http://www.ruanyifeng.com/blog/2017/08/elasticsearch.html (阮一峰写的简要教程)

3 https://www.elastic.co/guide/cn/elasticsearch/guide/cn/index.html (中文文档,较旧)

4https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html (ES与Spark整合 官方文档)

5https://www.iteblog.com/archives/1728.html (Spark+ES 实践博客)

6http://wiki.jikexueyuan.com/project/elasticsearch-definitive-guide-cn/ (极客学院中文教程)

7https://www.iteblog.com/archives/1741.html (ElasticSearch查询例子)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ElasticSearch 简要技术总结
    • 1. 总览
      • 2. 基本概念
        • 2.1 Node 与 Cluster
        • 2.2 Index
        • 2.3 Document
        • 2.4 Type(将在ES6.0移除)
      • 3. 数据操作
        • 3.1 新增记录
        • 3.2 查看记录
        • 3.3 删除记录
        • 3.4 更新记录
      • 4. 数据查询
        • 4.1 返回所有记录
        • 4.2 全文搜索
        • 4.3 逻辑运算
      • 5. ES与Spark整合
        • 5.1 Maven配置
        • 5.2 写入Map对象
        • 5.3 写入JSON对象
        • 5.4 Spark Streaming 写入数据
        • 5.5 读取数据
        • 5.6 其他操作
      • Ref
      相关产品与服务
      Elasticsearch Service
      腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档