学习
实践
活动
专区
工具
TVP
写文章
专栏首页非典型程序猿Elasticsearch数据更新全方位解析
原创

Elasticsearch数据更新全方位解析

导言

“需求是人类进步的阶梯“,这是著名的我们当前项目产品经理在提出最后一次需求时说出的话(为什么是最后一句话, 因为被祭天了)。

前段时间在项目的crm存储部分,为了满足大量自定义的搜索功能,选择了使用了ES作为后端存储介质。期间随着需求的变更对ES存储数据更新方式也多方面进行了了解,本着好记性不如烂笔头,记录在此。

完整更新

需求:每次完全替换crm中某document的所有的数据

方案:通过插入更新

ES API:index

在ES中,通过提供一个document ID,index API可以实现插入一条新的document,或者更新一条已有的document到ES中。至于document存在与否的判断,则是根据给到的document ID来判定。因此,通过index API来对已有的文档实现更新,其实是进行了一次reindex操作从而实现。

下面的操作,是向ES的index crm中插入一条数据document ID为1的数据:

PUT crm/1
{
    "crm_id": 12001,
    "user_name":"王五",
    "age": 30,
    "mobile":"13600000001",
}

我们的开发语言为golang,在go语言中,我们采用了https://github.com/olivere/elastic中的Elastic库来实现与ES的交互,对应的代码如下:

type CrmData struct {
	CrmId     int           `json:"crm_id"`
	UserName  string        `json:"user_name"`
	Age       int           `json:"age"`
	Mobile    string        `json:"mobile"`
}

crmData1 := CrmData{CrmId: 12001, UserName: "王五", Age: 30, Mobile:"13600000001"}

// 添加一份document
indexResult, err := esClient.Index().
		Index("crm").
		Id("1").
		BodyJson(&crmData1).
		Do(context.TODO())
	if err != nil {
		//error code
}

需要注意的是,如上所提到,这种方式的更新存在两个局限性或者说特性:

1. 只能实现对一个document ID的整个内容进行全部的更新,如果需要更新document中的部分field,无法实现;

2. 必须事先知道document的ID*。

*请注意,在大多数ES操作中,若无业务需求,建议不指定document ID,因为在ES中插入数据时,通过指定ID进行index的操作比起使用随机ID的性能要稍微差些。

部分更新

开发完上述需求没两天,将被祭天的产品经理又来了。这次是客户的后端数据更新,希望可以往每一个会员数据中插入一些新的字段。也就是需要我们支持到,通过document ID实现对某文档的部分field更新。不过,兵来将挡水来土掩,问题不大。

需求:更新ES中某document的部分数据

方案:通过update更新

ES API:update

ES中的update API支持到根据用户提供的脚本去实现更新一份document的功能,而脚本语言的支持极大地提升了我们对数据进行更新的灵活性。

举个例子,对于上述插入到ES中的第一条用户crm数据,现在,客户的crm数据更新了会员等级制度,也就是需要对ES中每一条docuemnt数据增加一份会员等级信息(类似的需求诸如:随着新的一年的到来,需要对用户的年龄增加一岁),对此,在ES中可以操作如下:

POST crm/1/_update
{
    "script" : {
        "source": "ctx._source.level=params.level;ctx._source.age+= params.count",
        "lang": "painless",
        "params" : {
            "level": 2
            "count" : 1
        }
    }
}

上述的操作中,首先,我们通过document ID=1来实现对应的document的索引,其次,通过script字段中的内容,来实现对具体field的更新。在update的script字段中:

  • source是将要执行的脚本内容;
  • lang表示的是当前脚本的语言*;
  • param则是脚本执行的参数;

*注:lang支持多种语言,关于本文使用的painless脚本的语言等,不是本文介绍的内容,需要了解的朋友,可以去这儿参考详情:https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-scripting-painless.html。

在update API实现的逻辑中,其实可以理解为三步操作:

  • index:根据document ID去索引中获取到对应的document快照信息;
  • update:根据script脚本来更新document;
  • reindex:将更新后的document重新写回到索引;

在上述的第一步和第三步执行时,update操作都会应用到ES内部的version以实现版本控制,从而保证document在更新的过程中没有发生改变。因此,需要注意的是,ES的update API依然是需要对文档做一次完全的reindex操作,而不是直接去修改原始document。但是,update API所能做的是减少了网络交互次数,当然这比起我们自己通过index获取数据并在业务代码中更新再写回到ES来实现,大大的减少了版本冲突的概率。

上面提到,在update 中第一步和第三步执行时,update操作都会应用到ES内部的version以实现版本控制。那么,冲突了我们如何解决呢?

在遇到版本冲突问题时,ES将会返回409 Conflict HTTP错误码。因此,当遇到409后,为了保证数据的最终插入,我们就必须要考虑到retry机制。为了实现冲突后的retry,有两种方案来实现:

1. 业务代码自定义

通过识别409错误,在业务代码中,跟据自己的需求来进行retry。因为是自定义的逻辑,所以我们可以任意的操作retry的回退策略,以及retry的内容等;

2. retry_on_conflict

通过在参数中指定来实现retry_on_conflict来实现,如下所示:

POST crm/_doc/1/_update?retry_on_conflict=3
{
    "script" : {
        "source": "ctx._source.level=params.level;ctx._source.age+= params.count",
        "lang": "painless",
        "params" : {
            "level": 2
            "count" : 1
        }
    }
}

上述的操作,结合retry_on_conflict,当ES的update过程中检测到version发生了变化,则会在之后尝试更新三次,在golang中对应的代码:

update := esClient.Update().
		Index("crm").
		Id("1").
		Script(elastic.NewScript("ctx._source.level=level;ctx._source.age+= params.count").
		Params(map[string]interface{}{"level": 2, "count":1}).
		Lang("painless")).
		RetryOnConflict(3)

根据上面的描述,我们可以发现,通过update API来进行更新的方式,对于那些不依赖执行顺序的更新行为显然更为合适,比如counter累加这类,再依赖于强大脚本语言的支持,使得更新的功能更加丰富。但是,这一切的前提依然依赖于对document ID的已知。

通过搜索更新

当我还沉浸于不费吹灰之力又解决了一个需求的时候,产品经理端了端镜框,站了过来,并慢慢踱步而来。你没猜错,新的需求又来了!这次是来自于产品经理自己的产品升级需求,需要对用户的crm数据做出画像功能,同时对所有crm会员数据进行标签分组。

关于标签功能的设计,不是本文的重点内容,我们不在此花更多的篇幅介绍,简化下来,可以用一句话来说明需求:将年纪介于30-40岁的会员添加一个【高购买力人群】的标签。虽然这句话很短,但是根据我们目前的方案来看,这可就为难了。在没办法知道符合这个年龄段所有人群在crm中的document ID的前提下,我们通过上述两份方案均无法实现,或者换句话说,如果需要知道这些ID,那就需要额外的存储(如MySQL)来记录这些所有的ID信息,而这最后,就会演变成将数据完全的存储到了MySQL中,显然是不可取的。也就是说无论是通过index或者update方案,一开始就是行不通的。这时候,我们就需要第三种更新方式了。

需求:需要将年纪介于30-40岁的会员添加一个【高购买力人群】的标签

方案:通过搜索更新来实现

ES API:update_by_query

update_by_query,顾名思义,这种更新方式,即通过查询再更新。对应上述的需求,为了实现对年龄在30-40之间的会员添加标签,在ES中,我们通过update_by_query中的query和script来实现先查询再更新的机制:

POST crm/_update_by_query?conflicts=proceed
{
  "script": {
    "source": "ctx._source.tag.add(param.tag)",
    "lang": "painless",
    "param":{
      "tag" : 10
    }
  },
  "query": {
    "term": {
       "range" : {
            "age" : {
                "gte" : 30,
                "lte" : 40
            }
       }
    }
  }
}

其中,query字段,表示我们query的条件*,根据该条件,ES将找到对应的document。(*注:ES中query的语法,不是本文讨论的主要内容,将不赘述。)

通过上述命令,ES将首先执行query语句找到对应的document,再根据script中的语法去更新相应的字段(上面示例中假设tag=10表示的是【高购买力人群】)。

细心的朋友会发现,在上述请求中我们还指定的conflicts=proceed参数。当update_by_query执行的时候,也应用了ES内部的version以支持到版本控制 ,也就是说,我们在执行过程可能会出现版本冲突的问题。默认情况下,update_by_query在遇到版本冲突问题时,同样返回409错误码,如果需求场景是不介意版本冲突的,那么可以按照上文那样,通过指定conflicts=proceed,从而当出现版本冲突时,ES将会继续执行更新的操作。至于Retry机制,则可以采用一样的方案来实现。上述介绍的部份,对应在golang中的代码如下:

res, err := esClient.UpdateByQuery("crm").
		Query(elastic.NewRangeQuery("created").Gte(30).Lte(40))).
		Script(elastic.NewScriptInline("ctx._source.tag.add(param.tag)")).
		Param(map[string]interface{}{"tag": 10}).
		ProceedOnVersionConflict().
		Do(context.TODO())
	if err != nil {
		t.Fatal(err)
	}

批量操作

在实际代码运行过程中,我们很有可能面对的场景是批量更新。如果通过for循环来一条条操作自然是可以实现的,但是网络来回导致的性能问题惨不忍睹。这时候我们便需要使用ES的buck AP来实现批量操作。在golang代码中,一个bulk update的操作大概如下所示:

func addTagToEs(es *elastic.Client, esIndex string, tagId int, esIds []string) (err error) {
	var res *elastic.BulkResponse
	bulkReq := es.Bulk()
	for _, esId := range esIds {
	  // 创建bulk的request
		scriptStr := "if (ctx._source.user_tag.contains(params.tagId)==false) { ctx._source.user_tag.add(params.tagId) } "
		updateReq := elastic.NewBulkUpdateRequest().
			Index(esIndex).
			Type(TYPE_POPULATION).
			Id(esId).
			RetryOnConflict(3).
			Script(elastic.NewScriptInline(scriptStr).Param("tagId", tagId))
		bulkReq = bulkReq.Add(updateReq)

		// 执行批量添加
		if bulkReq.NumberOfActions() >= inited.EsBulksize {
			res, err = bulkReq.Do(context.TODO())
			if err != nil {
				log.LoggerWrapperWithCaller().Errorf(err.Error())
				return
			}
			if res.Errors {
				rep := res.Failed()
				for _, v := range rep {
					err = fmt.Errorf("Failed: %v, error msg: %v", *v, *(v.Error))
					log.LoggerWrapperWithCaller().Errorf(err.Error())
				}
				err = fmt.Errorf("bulk insert commit failed")
				log.LoggerWrapperWithCaller().Errorf(err.Error())
				return
			}
		}
	}

	// 将剩余的继续添加
	if bulkReq.NumberOfActions() > 0 {
		res, err = bulkReq.Do(context.TODO())
		if err != nil {
			log.LoggerWrapperWithCaller().Errorf(err.Error())
			return
		}
		if res.Errors {
			rep := res.Failed()
			for _, v := range rep {
				err = fmt.Errorf("Failed: %v, error msg: %v", *v, *(v.Error))
				log.LoggerWrapperWithCaller().Errorf(err.Error())
			}
			err = fmt.Errorf("bulk insert commit failed")
			log.LoggerWrapperWithCaller().Errorf(err.Error())
			return
		}
	}

	return
}

疑难杂症

在开发过程中,尤其是批量操作update过程中,很多人可能会遇到了一个script compliation 限制的错误:

Too many dynamic script compilations within one minute

通过搜索不难发现,当我们使用script功能的时候,在ES中需要对该脚本进行编译,但是ES对脚本编译有个限制的配置,可以通过下面操作来修改该解析上限的配置:

PUT /_cluster/settings
{
  "transient": {
    "script.max_compilations_per_minute": 40
  }
}

然而,对于大批量的数据更新通过这样的配置显然是治标不治本(比如一次更新上万条数据,这在ES这中存储介质中已经是很小很小的量了)。经过进一步分析,原来每当遇到一条不同的脚本时,ES都需要单独的编译解析,因此,当进行bulk update时,每一个脚本都实时编译的话,很快就会达到了上述的上限。

因此第一个需要注意的问题,bulk update操作时的脚本一定要尽量保持变化较少。

其实,对于上述问题,在大多数的更新场景中都是可以满足的。在大多数的更新场景中,我们更多都是去更新相同的字段,也因此批量更新的脚本基本都是同一个(或者几个),但仅仅是这样却并不能完全解决问题,在批量更新脚本编写的过程中,依然会有个坑等着你踩。

第二个需要注意的问题,就是脚本的编写了。这需要我们了解一个知识背景:ES如何认定一个脚本是否需要实时编译的。以一个简单例子举例,当更新用户会员年龄信息时,可能会是【1-100】岁之间的任意整数,如果时20岁,对应的脚本应该如下:

"script" : {
        "source": "ctx._source.age=20",
        "lang": "painless"
    }

按照这种方式就意味着,每一个年龄都会需要一条脚本对应着,那么最多会有100条脚本,从而在批量更新年龄的时候就很大概率会超过上面说到的编译脚本的限制。

那么有没有解决方法呢?当然是有的,这时候我们就需要结合script中的param功能,同样的操作,下面这条脚本,无论年龄在【0-100】中如何变化,对于ES中都只会在第一次进行解析:ctx._source.age=params.age这个脚本,之后便无需再次解析:

"script" : {
        "source": "ctx._source.age=params.age",
        "lang": "painless",
        "params" : {
            "age": 20
        }
    }
 

通过观察上述两种写法,不难发现,当脚本中有常数变量时,ES会实时编译脚本,这时候可以通过结合script中param的功能,设法将脚本中的变量通过param传递进去,从而从根本上解决脚本编译解析限制的问题。

原创声明,本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

登录 后参与评论
0 条评论

相关文章

  • Python更新Elasticsearch数据方法大全

    Elasticsearch 是一个实时的分布式搜索分析引擎,它能让你以前所未有的速度和规模,去探索你的数据。它被用作全文检索、结构化搜索、分析以及这三个功能的组...

    Python编程与实战
  • 深度数据全方位解析:冰桶挑战

    大数据文摘
  • 通过StreamSets实时更新数据至ElasticSearch

    网上许多关于StreamSets增量更新的教程几乎都是单单INSERT操作,这使得目标数据库会出现重复数据,而实际需求上我们往往更多是需要INSERT加UPDA...

    Qwe7
  • 【日志服务CLS】Nginx日志数据全方位大解析

    Nginx 是一个高性能的HTTP和反向代理web服务器,透过Nginx日志可以挖掘非常大的价值,比如诊断调优网站,监控网站稳定性,运营数据统计等。今天我们一起...

    日志服务CLS小助手
  • 《Elasticsearch 源码解析与优化实战》第6章:数据模型

    Elasticsearch(ES)可用于全文检索、日志分析、指标分析、APM等众多场景,而且搭建部署容易,后期弹性扩容、故障处理简单。ES在一定程度上实现了一套...

    HLee
  • 数据湖应用解析:Spark on Elasticsearch一致性问题

    在《Java虚拟机规范》的规定里,除了程序计数器外,虚拟机内存的其他几个运行时区域都有发生 OutOfMemoryError 异常的可能。

    不会飞的小鸟
  • Elasticsearch写入数据的过程是什么样的?以及是如何快速更新索引数据的?

    最近面试过程中遇到问Elasticsearch的问题不少,这次总结一下,然后顺便也了解一下Elasticsearch内部是一个什么样的结构,毕竟总不能就只了解个...

    纪莫
  • 数据库[分库分表]中间件 Sharding-JDBC 源码分析 —— SQL 解析(五)之更新SQL解析

    本文主要基于 Sharding-JDBC 1.5.0 正式版 1. 概述 2. UpdateStatement 3. #parse() 3.1 #skipBet...

    芋道源码
  • RedisJson 横空出世,性能碾压 ES 和 MongoDB !

    点击上方“芋道源码”,选择“设为星标” 管她前浪,还是后浪? 能浪的浪,才是好浪! 每天 10:33 更新文章,每天掉亿点点头发... 源码精品专栏 原创 |...

    芋道源码
  • RedisJson 横空出世,比 ES 快7 倍,惊爆了!

    点击上方“芋道源码”,选择“设为星标” 管她前浪,还是后浪? 能浪的浪,才是好浪! 每天 10:33 更新文章,每天掉亿点点头发... 源码精品专栏 原创 |...

    芋道源码
  • 10 月数据库排名:“三大王”无人能敌,PostgreSQL 紧随其后

    DB-Engines最近发布了2020年10月份的数据库排名。该网站根据数据库管理系统的受欢迎程度对其进行排名,实时统计了359种数据库的排名指数。前10名的排...

    逆锋起笔
  • 深入解析:你听说过Oracle数据库的更新重启动吗?

    杨廷琨 云和恩墨高级咨询顾问, ITPUB Oracle 数据库管理版版主 ,人称 “杨长老”,十数年如一日坚持进行 Oracle 技术研究与写作,号称 ...

    数据和云
  • Elasticsearch进阶教程:轻松构造一个全方位的信息检索系统

    搜索,已经成为我们生活中必不可少的一个重要部分,无论我们是在网上冲浪、工作办公、还是私人文件的处理,都需要一个搜索框方便我们快速找到所需的信息。而当我们的任务是...

    点火三周
  • 重要!!Elasticsearch 安全加固指南

    如果没有第三方加固,1.X——6.8之前版本,Elasticsearch 都属于“裸奔”状态。

    铭毅天下
  • Elasticsearch教程 | 第二篇:常见的参数配置

    Elasticsearch 提供了良好的默认值,并且只需要很少的配置。可以使用集群更新设置API在正在运行的集群上更改大多数设置 。

    一点博客
  • RedisJson发布官方性能报告,性能碾压ES和Mongo

    近期官网给出了RedisJson(RedisSearch)的性能测试报告,可谓碾压其他NoSQL,下面是核心的报告内容,先上结论:

    码农架构
  • RedisJson 横空出世,性能碾压ES和Mongo!

    近期官网给出了RedisJson(RedisSearch)的性能测试报告,可谓碾压其他NoSQL,下面是核心的报告内容,先上结论:

    Java团长
  • 基于Java、Kafka、ElasticSearch的搜索框架的设计与实现

    Jkes是一个基于Java、Kafka、ElasticSearch的搜索框架。Jkes提供了注解驱动的JPA风格的对象/文档映射,使用REST API用于文档搜...

    哲洛不闹
  • ELK介绍

    运维人员需要对系统和业务日志进行精准把控,便于分析系统和业务状态。日志分布在不同的服务器上,传统的使用传统的方法依次登录每台服务器查看日志,既繁琐又效率低下。所...

    星哥玩云

作者介绍

netkiddy
  • 《技思广益 · 腾讯技术人原创集》签约作者

腾讯高级后台开发

腾讯 · 高级后台开发 (已认证)

专栏

精选专题

活动推荐

关注

腾讯云开发者公众号
10元无门槛代金券
洞察腾讯核心技术
剖析业界实践案例
腾讯云开发者公众号二维码

扫码关注腾讯云开发者

领取腾讯云代金券