@toc
最近在Elastic官网blog的这篇文章引起了许多人的注意——Investigative analysis of disjointed data in Elasticsearch with the Siren Platform (利用Siren平台对Elasticsearch中的非联接(disjointed)数据进行调查分析)
这里的亮点莫过于文章里面提到的对于disjointed
数据的调查分析。
其实对于很多国内的资深的ES玩家来说,Elasticsearch是真真正正的被玩出了花样:
无论是互联网企业还是各种金融机构,都会把Elasticsearch用在BI分析、多维分析、大数据流处理等等国外ES玩家较少涉足的场景。通过结合Elasticsearch本身首屈一指的全文检索和模糊查询能力、结构化和非结构化数据的处理能力,灵活的字段组合查询能力、以及丰富的数据聚合功能、再加上大数据和高并发的支持,使得Elasticsearch在大数据多维分析,OLAP等场景应用越来越广泛。
但这里,当我们把Elasticsearch作为一个数据仓库或者一个数据湖去使用的时候,不同数据源的join关联分析,往往会是我们使用Elasticsearch时候的一大痛点。
基于Elasticsearch分布式集群的特性,其实是很难做到一个高效且可扩展的分布式join操作的,所以现在Elasticsearch只支持join type
和 nested type
两种有限的join场景,并且这两种join的效率都不是特别高(nested type
查询性能比标准查询的慢5~10倍, join type
则是慢了10 ~ 100倍,参见tune-for-search-speed),甚至对于join type
还存在限制parent和child必须在一个分片上面的限制。
因此,通常情况下,我们得到的建议都是做数据的denormalize(反规范化),做一张大宽表,把所有的字段都放置在一张'表'里面,以获得对查询和聚合的性能。但其实这种大宽表带来的问题也是明显的,当我们要更新某个字段值时,需要重新索引整个文档,其带来的痛点包括:
因此,很多用户还是非常希望Elasticsearch有一个更好的实现数据联合分析的能力的,包括:
但可惜的是,这里没有银弹,在分布式系统的前提下,我们还是需要在不同的场景中,权衡性能、存储成本、数据更新成本之间的关系,选择一个局部最优解来作为解决方案。
基于上面的原因,也难怪大家会对这个Siren Federate 插件比较感兴趣了。大家可以查看Siren Federate的官网简单了解。
他们是这样描述的:
Siren Federate插件通过以下主要功能扩展了Elasticsearch:
这里面,我觉得大家最感兴趣的应该是两点:
在这篇文章里面,我们带大家初探 Siren Federate
安装相对简单,和普通的插件安装是一样的。从官网下载对应版本的安装包,使用elasticsearch-plugin
进行安装
$ ./bin/elasticsearch-plugin install file:///PATH-TO-SIREN-FEDERATE-PLUGIN/siren-federate-7.9.3-21.2-proguard-plugin.zip
-> Downloading file:///PATH-TO-SIREN-FEDERATE-PLUGIN/siren-federate-7.9.3-21.2-proguard-plugin.zip
[=================================================] 100%
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@ WARNING: plugin requires additional permissions @
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
* java.io.FilePermission cloudera.properties read
* java.io.FilePermission simba.properties read
* java.lang.RuntimePermission accessClassInPackage.sun.misc
* java.lang.RuntimePermission accessClassInPackage.sun.misc.*
* java.lang.RuntimePermission accessClassInPackage.sun.security.provider
* java.lang.RuntimePermission accessDeclaredMembers
* java.lang.RuntimePermission createClassLoader
* java.lang.RuntimePermission getClassLoader
...
See http://docs.oracle.com/javase/8/docs/technotes/guides/security/permissions.html
for descriptions of what these permissions allow and the associated risks.
Continue with installation? [y/N]y
-> Installed siren-federate
重新启动ES,可看到如下的日志,表明Siren Federate加载成功
[2017-04-11T10:42:02,209][INFO ][o.e.p.PluginsService ] [etZuTTn] loaded plugin [siren-federate]
也可以通过调用siren/<index>/_search
相关接口,确认
这个是比较有意思的功能,我们还是先来看官方关于该功能的描述:
Siren Federate提供了一个名为
connector
的模块,该模块透明地将数据表从外部数据库系统映射到Elasticsearch中的“虚拟索引”。该connector
的API允许你把外部数据库系统注册为数据源。目前支持两种类型的数据源:JDBC和Elasticsearch。注册数据源后,可以将外部数据库系统中的表映射到虚拟索引。使用Elasticsearch API将请求发送到虚拟索引时,例如Mapping或Search API,该请求被connector
模块拦截。该请求将转换为外部数据库方言,并针对外部数据库执行。外部数据库的结果将映射到Elasticsearch方言,并作为Elasticsearch响应返回。
这里的实际意义在于,如果能在ES里面直接将外部的数据表映射为虚拟索引,我们就可以实现跨数据源的分析。
目前Siren Federate支持的外部数据源如下:
Name | JDBC class |
---|---|
PostgreSQL | org.postgresql.Driver |
MySQL | com.mysql.jdbc.Driver |
Oracle 12c+ | oracle.jdbc.OracleDriver |
Spark SQL 2.2+ | com.simba.spark.jdbc41.Driver |
Neo4j | org.neo4j.jdbc.http.HttpDriver |
接下来,我们添加一个MySQL数据源试试。首先,我们要通过API创建一个MySQL数据源。
要配置JDBC数据源,需要完成以下步骤:
elasticsearch.yml
文件并添加以下设置:
node.attr.connector.jdbc:true
├── jdbc-drivers
│ ├── mysql
│ │ └── mysql-connector-java-8.0.21.jar
│ └── pg
│ └── postgresql-42.2.18.jarelasticsearch/config/
或/etc/elasticsearch/
中创建该目录。这里最好是按照每个数据库再创建一个子目录通过Connector
相关的API进行相关数据源对象的创建,分别是Datasource API
和Virtual index API
。
首先使用Datasource API
来创建数据源对象,_siren/connector/datasource/<id>
:
PUT _siren/connector/datasource/mysql
{
"jdbc": {
"username": "root",
"password": "changeme",
"driver": "com.mysql.jdbc.Driver",
"url": "jdbc:mysql://cdb-bibxdjok.cd.tencentcdb.com:10130/dev?useLegacyDatetimeCode=false",
"properties": {
"ssl": true
}
}
}
注意这里需要填入的参数,特别是driver
和url
。我们可以通过GET _siren/connector/datasource
来查看输入规范:
{
"Dremio" : {
"driverClassName" : "com.dremio.jdbc.Driver",
"defaultURL" : "jdbc:dremio:direct={{host}}:{{port}}{{databasename}}",
"defaultPort" : 31010,
"validationQuery" : "SELECT 1 AS N",
"disclaimer" : """This is a sample connection string, see the <a target="_blank" rel="noopener noreferrer" href="https://docs.dremio.com/drivers/dremio-jdbc-driver.html">Dremio JDBC documentation</a> for further information.""",
"virtualIndexSupported" : true,
"ingestionSupported" : true
},
"Impala" : {
"driverClassName" : "com.cloudera.impala.jdbc41.Driver",
"defaultURL" : "jdbc:impala://{{host}}:{{port}}/default;UseNativeQuery=1",
"defaultPort" : 21050,
"validationQuery" : "SELECT 1 AS N",
"disclaimer" : """This is a sample connection string, see the <a target="_blank" rel="noopener noreferrer" href="https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_jdbc.html">Impala JDBC documentation</a> for further information.""",
"virtualIndexSupported" : true,
"ingestionSupported" : true
},
"MySQL" : {
"driverClassName" : "com.mysql.jdbc.Driver",
"defaultURL" : "jdbc:mysql://{{host}}:{{port}}{{databasename}}?useLegacyDatetimeCode=false&useCursorFetch=true",
"defaultPort" : 3306,
"validationQuery" : "SELECT 1 AS N",
"disclaimer" : """This is a sample connection string, see the <a target="_blank" rel="noopener noreferrer" href="https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference.html">MySQL Connector/J documentation</a> for further information.""",
"virtualIndexSupported" : true,
"ingestionSupported" : true
},
"PostgreSQL" : {
"driverClassName" : "org.postgresql.Driver",
"defaultURL" : "jdbc:postgresql://{{host}}:{{port}}{{databasename}}",
"defaultPort" : 5432,
"validationQuery" : "SELECT 1 AS N",
"disclaimer" : """This is a sample connection string, see the <a target="_blank" rel="noopener noreferrer" href="https://jdbc.postgresql.org/documentation/94/connect.html">PostgreSQL JDBC documentation</a> for further information.""",
"virtualIndexSupported" : true,
"ingestionSupported" : true
},
"SQL Server 2017" : {
"driverClassName" : "com.microsoft.sqlserver.jdbc.SQLServerDriver",
"defaultURL" : "jdbc:sqlserver://{{host}}:{{port}}{{databasename}}",
"defaultPort" : 1433,
"validationQuery" : "SELECT 1 AS N",
"disclaimer" : """This is a sample connection string, see the <a target="_blank" rel="noopener noreferrer" href="https://docs.microsoft.com/en-us/sql/connect/jdbc/building-the-connection-url">SQL Server JDBC documentation</a> for further information.""",
"virtualIndexSupported" : true,
"ingestionSupported" : true
},
"SAP ASE 15.7" : {
"driverClassName" : "net.sourceforge.jtds.jdbc.Driver",
"defaultURL" : "jdbc:jtds:sybase://{{host}}:{{port}}{{databasename}}",
"defaultPort" : 5000,
"validationQuery" : "SELECT 1 AS N",
"disclaimer" : """This is a sample connection string, see the <a target="_blank" rel="noopener noreferrer" href="http://jtds.sourceforge.net/faq.html#urlFormat">jTDS documentation</a> for further information.If your setup is using the jConnect JDBC Driver, see the <a target ="_blank" rel="noopener noreferrer" href="https://wiki.scn.sap.com/wiki/display/SYBCON/jConnect+Driver+Overview">jConnect documentation</a> for further information.""",
"virtualIndexSupported" : true,
"ingestionSupported" : true
},
"Oracle Database 12c" : {
"driverClassName" : "oracle.jdbc.OracleDriver",
"defaultURL" : "jdbc:oracle:thin:@//{{host}}:{{port}}{{databasename}}",
"defaultPort" : 1521,
"validationQuery" : "SELECT 1 AS N FROM DUAL",
"disclaimer" : """This is a sample connection string, see the <a target="_blank" rel="noopener noreferrer" href="https://docs.oracle.com/cd/E11882_01/java.112/e16548/urls.htm#JJDBC08200">Oracle JDBC documentation</a> for further information.""",
"virtualIndexSupported" : true,
"ingestionSupported" : true
},
"Spark SQL 2.2" : {
"driverClassName" : "com.simba.spark.jdbc41.Driver",
"defaultURL" : "jdbc:spark://{{host}}:{{port}}{{databasename}};UseNativeQuery=1",
"defaultPort" : 10000,
"validationQuery" : "SELECT 1 AS N",
"disclaimer" : """This is a sample connection string, see the <a target="_blank" rel="noopener noreferrer" href="https://www.simba.com/products/Spark/doc/JDBC_InstallGuide/content/jdbc/sp/using/connectionurl.htm">Simba Spark JDBC documentation</a> for further information.""",
"virtualIndexSupported" : true,
"ingestionSupported" : true
},
"Presto" : {
"driverClassName" : "com.facebook.presto.jdbc.PrestoDriver",
"defaultURL" : "jdbc:presto://{{host}}:{{port}}{{databasename}}",
"defaultPort" : 8080,
"validationQuery" : "SELECT 1 AS N",
"disclaimer" : """This is a sample connection string, see the <a target="_blank" rel="noopener noreferrer" href="https://prestodb.io/docs/current/installation/jdbc.html">Presto JDBC documentation</a> for further information.""",
"virtualIndexSupported" : true,
"ingestionSupported" : true
},
"Neo4j" : {
"driverClassName" : "org.neo4j.jdbc.http.HttpDriver",
"defaultURL" : "jdbc:neo4j:http://{{host}}:{{port}}",
"defaultPort" : 7474,
"validationQuery" : "START n=NODE(*) MATCH (n) RETURN COUNT(n)",
"disclaimer" : """This is a sample connection string, see the <a target="_blank" rel="noopener noreferrer" href="http://neo4j-contrib.github.io/neo4j-jdbc/">Neo4j JDBC Driver Documentation</a> for further information.""",
"virtualIndexSupported" : false,
"ingestionSupported" : true
}
}
我们可以通过_siren/connector/datasource/<id>/_validate
API来确认数据源是否创建有效:
POST _siren/connector/datasource/mysql/_validate
结果:
{
"_id" : "mysql",
"found" : true,
"valid" : true
}
在实际体验中,这种远程链接还是会因为网络不稳定的情况而出现失败。
当完成数据源的创建和校验后,我们需要将该数据源映射到虚拟索引上。
PUT /_siren/connector/index/virtual_category
{
"datasource": "mysql",
"resource": "Categories"
}
virtual_category
是我们创建的虚拟索引的名字。 datasource
是我们之前创建的MySQL数据resource
是我们需要指定的数据库
Categories
数据表。然后我们可以像访问然后我们可以像访问普通索引一样访问这个虚拟索引:
这个虚拟索引和普通索引看起来区别也不大,我们通过这个虚拟索引,直接读取到了MySQL上面的数据。
但最主要的问题有两个:
Siren Federate支持对虚拟索引的以下操作:
目前,不支持涉及虚拟索引和普通Elasticsearch索引混合的搜索请求(例如使用通配符时),并且会被拒绝;但是可以发出包含普通Elasticsearch索引和虚拟索引请求的msearch请求。
在创建虚拟索引时,插件会创建一个空的Elasticsearch索引,以实现与Search Guard和Elastic X-Pack的互操作性;如果已经存在一个与虚拟索引同名的Elasticsearch索引,并且它不是空的,那么虚拟索引的创建将失败。
删除虚拟索引时,对应的Elasticsearch索引不会被删除。
该插件将JDBC类型转换为其最接近的Elasticsearch对应项:
目前,Siren Federate不支持特定于数据源的全文本搜索功能,因此所有这些查询都是特定于keyword字段的
Siren Federate的join
本质上类似于Elasticsearch的join
类型的parent-child功能:它们在查询时执行连接。但是,它们之间存在重要区别:
join
模型需要将数据模型反规范化为分层形式。这限制了数据建模的灵活性,并可能导致数据冗余。Siren Federate没有此数据建模约束,可以进行数据归一化。没有“一刀切”的解决方案,您需要充分了解自己的要求才能选择最合适的解决方案。作为基本规则,如果您的数据模型和数据关系是纯分层的(或可以映射到纯分层的模型),则父子模型可能更合适。另一方面,如果您需要查询数据关系的两个方向,则Siren Federate连接可能更合适。
Siren Federate使用join查询子句扩展了Elasticsearch Query DSL,该子句允许基于联接条件在两组文档之间执行联接操作。要创建复杂的查询计划,您可以使用布尔运算符(例如,AND
,OR
或NOT
),并自由组合和嵌套多个join查询子句。
连接条件基于两个字段之间的相等运算符,当文档具有指定字段的相等值时,将满足连接条件。这两个字段必须具有相同的数据类型。支持数字和文本字段。
Siren Federate当前支持两种类型的联接操作:(左)半联接(semi join)和内部联接(inner join)。join操作是在内存中的分布式计算层之上实现的,该层随群集中可用节点的数量而横向扩展。join操作也可以纵向扩展——根据计算机中可用的CPU内核数量进行扩展。
在执行join操作期间,来自文档的投影字段会在网络上洗牌(shuffle) 并存储在内存中。投影的字段使用Apache Arrow以列格式编码,并存储在堆外内存中,因此减少了其对堆内存的影响。
semi join的主要场景是基于第二组文档B筛选第一组文档A。两套文档A和B之间的semi join返回满足联接条件的A的文档,带有B的文档。这等效于SQL中的EXISTS()
运算符。
semi join用于根据第二组文档B来筛选一组文档A,A和B两组文档之间的semi join将返回A中满足连接条件的文档(使用B文档的过滤条件),这相当于SQL中的EXISTS()运算符。
举例,假设有以下的索引(article, company)和数据:
$ curl -H 'Content-Type: application/json' -XPUT 'http://localhost:9200/article'
$ curl -H 'Content-Type: application/json' -XPUT 'http://localhost:9200/article/_mapping' -d '
{
"properties": {
"mentions": {
"type": "keyword"
}
}
}
'
$ curl -H 'Content-Type: application/json' -XPUT 'http://localhost:9200/company'
$ curl -H 'Content-Type: application/json' -XPUT 'http://localhost:9200/company/_mapping' -d '
{
"properties": {
"id": {
"type": "keyword"
}
}
}
'
$ curl -H 'Content-Type: application/json' -XPUT 'http://localhost:9200/_bulk?pretty&refresh=true' -d '
{ "index" : { "_index" : "article", "_id" : "1" } }
{ "title" : "The NoSQL database glut", "mentions" : ["1", "2"] }
{ "index" : { "_index" : "article", "_id" : "2" } }
{ "title" : "Graph Databases Seen Connecting the Dots", "mentions" : [] }
{ "index" : { "_index" : "article", "_id" : "3" } }
{ "title" : "How to determine which NoSQL DBMS best fits your needs", "mentions" : ["2", "4"] }
{ "index" : { "_index" : "article", "_id" : "4" } }
{ "title" : "MapR ships Apache Drill", "mentions" : ["4"] }
{ "index" : { "_index" : "company", "_id" : "1" } }
{ "id": "1", "name" : "Elastic" }
{ "index" : { "_index" : "company", "_id" : "2" } }
{ "id": "2", "name" : "Orient Technologies" }
{ "index" : { "_index" : "company", "_id" : "3" } }
{ "id": "3", "name" : "Cloudera" }
{ "index" : { "_index" : "company", "_id" : "4" } }
{ "id": "4", "name" : "MapR" }
在这里
article
(文章)里面包含了title
(文章的名字),以及mention
(提到的公司)company
(公司)里面包含了id
和name
(公司的名字)如果这时候我们想查询提到了特定公司名字的文章,我们是需要联合这两个数据源来进行查询的。我们article
把看作是A文档,company
看做是B文档,结合我们上面提到的semi join的概念,可以通过以下方式进行查询:
$ curl -H 'Content-Type: application/json' 'http://localhost:9200/siren/article/_search?pretty' -d '{
"query" : {
"join" : {
"indices" : ["company"],
"on" : ["mentions", "id"],
"request" : {
"query" : {
"term" : {
"name" : "orient"
}
}
}
}
}
}'
左边是 article
, 通过搜索API指定: /siren/article/_search
右边是company
,通过body里面的"indices"
指定,query的条件是——名字包含有orient
的公司,并通过符合这个条件的公司的id去筛选对应的文章 "on" : ["mentions", "id"],
通过以上的方式,我们可以得到以下结果:
{
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [ {
"_index" : "article",
"_id" : "1",
"_score" : 1.0,
"_source":{ "title" : "The NoSQL database glut", "mentions" : ["1", "2"] }
}, {
"_index" : "article",
"_id" : "3",
"_score" : 1.0,
"_source":{ "title" : "How to determine which NoSQL DBMS best fits your needs", "mentions" : ["2", "4"] }
} ]
}
}
公司{ "id": "2", "name" : "Orient Technologies" }
名字中包含了Orient
,有两个文章提到了'2'。
这种semi join的优势在于,我们在没有使用大宽表的情况下,可以关联两张数据表进行关联分析。寻找里面基于特定字段的交集。这个在安全分析等场景非常的有用。
inner join使任意字段(包括脚本字段和文档的分数)从一组文档B中 "投射 "出来,并与一组文档A "结合"。join的结果是由文档集B的投影字段增强的文档集A。
当需要在位于多个数据源中的许多不同记录上实现视图时,这种内连接非常有用。
在日志分析、网络威胁检查和情报调查中,拥有关于特定实体的多样化记录事件是很常见的,这些事件分布在多个数据源中。例如,一个用户可以与一个或多个会话相关联,一个会话可以与一个或多个事件相关联,如登录、注销、未经授权的操作等。从一组不同的记录中很难回答诸如 "找到所有在t时间登录的用户 "或 "找到所有显示不正常在线活动的用户 "这样的问题。
在这种情况下,内联接可以实现多个事件的收集,并将其归纳到一个特定的上下文中,以便进一步分析。
举例,假设我们有两个索引company
和people
$ curl -H 'Content-Type: application/json' -XPUT 'http://localhost:9200/_bulk?pretty' -d '
{ "index" : { "_index" : "company", "_type" : "company", "_id" : "1" } }
{ "id": 1, "name" : "Acme" }
{ "index" : { "_index" : "company", "_type" : "company", "_id" : "2" } }
{ "id": 2, "name" : "Bueno" }
{ "index" : { "_index" : "people", "_type" : "person", "_id" : "1" } }
{ "id" : 1, "name" : "Alice", "age" : 31, "gender" : "Female", "employed_by" : 1 }
{ "index" : { "_index" : "people", "_type" : "person", "_id" : "2" } }
{ "id" : 2, "name" : "Bob", "age" : 42, "gender" : "Male", "employed_by" : 2 }
{ "index" : { "_index" : "people", "_type" : "person", "_id" : "3" } }
{ "id" : 3, "name" : "Carol", "age" : 26, "gender" : "Female", "employed_by" : 1 }
'
假设将这两个索引进行join,来检索包含所有员工年龄的公司列表。可以使用下面的请求:
$ curl -H 'Content-Type: application/json' 'http://localhost:9200/siren/company/_search?pretty' -d '{
"query" : {
"join" : {
"indices" : ["people"],
"on" : ["id", "employed_by"],
"request" : {
"project" : [
{ "field" : { "name" : "age", "alias" : "employee_age" } }
],
"query" : {
"match_all" : {}
}
}
}
},
"script_fields" : {
"employees_age" : {
"script" : "doc.employee_age"
}
}
}'
可得到如下结果:
{
"hits" : {
"total" : 2,
"max_score" : 0.0,
"hits" : [
{
"_index" : "company",
"_type" : "company",
"_id" : "2",
"_score" : 0.0,
"fields" : {
"employees_age" : [
42
]
}
},
{
"_index" : "company",
"_type" : "company",
"_id" : "1",
"_score" : 0.0,
"fields" : {
"employees_age" : [
26,
31
]
}
}
]
}
}
inner join把两个所有的数据结合的起来。得到一个组合值。
Siren Federate的分布式join有以下特点或限制:
这功能听起来也是比较有趣的。但很不幸的是,官网上没有提及相关的用法。在实际测试中,也是不支持的。
可以运行,但因为虚拟索引不支持script,所以字段映射(project)是不支持的
Siren Federate提供了这些功能。无论是join,还是外部数据源,实际上都是有性能约束的。并且在实际的应用中,有非常多的限制。这个插件实际上只能使用在他们自家的平台上面(Siren Platform),并且是高度定制化的。只适用于特定分析场景的(关联分析多,但并发少;数据静止,事后追踪)。
使用下来。给我的感觉是,它在性能上面无法支撑大数据多维分析和OLAP的场景,因为大量缓存数据。也不适合实时流式分析,但在以下场景,还是有其价值的。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。