Elasticsearch 7.0.0~7.6.2 bug: slow refresh after a large number of document updates

Original
Author: 杨松柏
Last modified: 2022-06-05 21:16:10
Column: Elasticsearch实战

1 Background

During routine operations, a user reported that data could no longer be written to ES, with write rejections and a growing write queue; cluster health was red. Based on these symptoms, the following indicators were examined:

  • tasks: as shown in Figure 1-1, the task list contains a large number of refresh and write tasks, all running for a very long time.
Figure 1-1 Running tasks in ES
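Long-running tasks like these can be listed directly with the cat tasks API (a sketch; the parameters are optional):

```
GET _cat/tasks?v&detailed
```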
  • translog: cluster health was red because some index shards were stuck in the initializing state. In our experience, when shards stay in initializing without hitting any shard allocation limit, slow recovery is usually dominated by translog replay. Checking the translog files confirmed this: they were extremely large, as shown in Figure 1-2.
Figure 1-2 Translog files of one shard of one index
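Translog size can also be checked without shell access to the data directory, via the index stats API (the index name test is illustrative):

```
GET test/_stats/translog
```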
  • flush: under normal conditions a single translog file never grows this large, so flush activity was checked next (flush triggers translog rollover). Flush turned out to be stuck and unable to run; flush latency is shown in Figure 1-3.
Figure 1-3 Flush latency on the ES nodes
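Cumulative flush counts and total flush time are available from the same stats API (index name illustrative):

```
GET test/_stats/flush
```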
  • Thread dumps: since refresh, write, and flush tasks were all abnormally slow, jstack was used to capture thread states and see where the threads were stuck. The key stack frames are as follows:
"elasticsearch[node-10][flush][T#5]" #193 daemon prio=5 os_prio=0 cpu=247.85ms elapsed=9924.36s tid=0x00007f0bd4002800 nid=0x4e9 waiting for monitor entry  [0x00007f0a53af9000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.lucene.index.IndexWriter.prepareCommitInternal(IndexWriter.java:3197)
        - waiting to lock <0x00000004b91838d8> (a java.lang.Object)
        - locked <0x00000004b91838c0> (a java.lang.Object)
        at org.apache.lucene.index.IndexWriter.commitInternal(IndexWriter.java:3445)
        - locked <0x00000004b91838c0> (a java.lang.Object)
        at org.apache.lucene.index.IndexWriter.commit(IndexWriter.java:3410)
        at org.elasticsearch.index.engine.InternalEngine.commitIndexWriter(InternalEngine.java:2446)
        at org.elasticsearch.index.engine.InternalEngine.flush(InternalEngine.java:1780)
        at org.elasticsearch.index.shard.IndexShard.flush(IndexShard.java:1092)
        at org.elasticsearch.index.shard.IndexShard$6.doRun(IndexShard.java:3138)
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:773)
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@13.0.1/ThreadPoolExecutor.java:1128)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@13.0.1/ThreadPoolExecutor.java:628)
        at java.lang.Thread.run(java.base@13.0.1/Thread.java:830)

"elasticsearch[node-10][refresh][T#3]" #236 daemon prio=5 os_prio=0 cpu=2113167.75ms elapsed=9552.19s tid=0x00007f0bd430c000 nid=0x729 runnable  [0x00007f0b413d4000]
   java.lang.Thread.State: RUNNABLE
        at org.apache.lucene.index.FrozenBufferedUpdates.applyDocValuesUpdates(FrozenBufferedUpdates.java:564)
        at org.apache.lucene.index.FrozenBufferedUpdates.applyDocValuesUpdates(FrozenBufferedUpdates.java:451)
        at org.apache.lucene.index.FrozenBufferedUpdates.apply(FrozenBufferedUpdates.java:421)
        at org.apache.lucene.index.FrozenBufferedUpdates.forceApply(FrozenBufferedUpdates.java:249)
        at org.apache.lucene.index.FrozenBufferedUpdates.tryApply(FrozenBufferedUpdates.java:159)
        at org.apache.lucene.index.IndexWriter.lambda$publishFrozenUpdates$3(IndexWriter.java:2592)
        at org.apache.lucene.index.IndexWriter$$Lambda$4640/0x0000000801952440.process(Unknown Source)
        at org.apache.lucene.index.IndexWriter.processEvents(IndexWriter.java:5116)
        at org.apache.lucene.index.IndexWriter.getReader(IndexWriter.java:507)
        - locked <0x00000004b91838d8> (a java.lang.Object)
        at org.apache.lucene.index.StandardDirectoryReader.doOpenFromWriter(StandardDirectoryReader.java:297)
        at org.apache.lucene.index.StandardDirectoryReader.doOpenIfChanged(StandardDirectoryReader.java:272)
        at org.apache.lucene.index.StandardDirectoryReader.doOpenIfChanged(StandardDirectoryReader.java:262)
        at org.apache.lucene.index.FilterDirectoryReader.doOpenIfChanged(FilterDirectoryReader.java:112)
        at org.apache.lucene.index.DirectoryReader.openIfChanged(DirectoryReader.java:165)
        at org.elasticsearch.index.engine.ElasticsearchReaderManager.refreshIfNeeded(ElasticsearchReaderManager.java:66)
        at org.elasticsearch.index.engine.ElasticsearchReaderManager.refreshIfNeeded(ElasticsearchReaderManager.java:40)
        at org.apache.lucene.search.ReferenceManager.doMaybeRefresh(ReferenceManager.java:176)
        at org.apache.lucene.search.ReferenceManager.maybeRefreshBlocking(ReferenceManager.java:253)
        at org.elasticsearch.index.engine.InternalEngine$ExternalReaderManager.refreshIfNeeded(InternalEngine.java:339)
        at org.elasticsearch.index.engine.InternalEngine$ExternalReaderManager.refreshIfNeeded(InternalEngine.java:321)
        at org.apache.lucene.search.ReferenceManager.doMaybeRefresh(ReferenceManager.java:176)
        at org.apache.lucene.search.ReferenceManager.maybeRefresh(ReferenceManager.java:225)
        at org.elasticsearch.index.engine.InternalEngine.refresh(InternalEngine.java:1606)
        at org.elasticsearch.index.engine.InternalEngine.maybeRefresh(InternalEngine.java:1585)
        at org.elasticsearch.index.shard.IndexShard.scheduledRefresh(IndexShard.java:3241)
        at org.elasticsearch.index.IndexService.maybeRefreshEngine(IndexService.java:791)
        at org.elasticsearch.index.IndexService.access$200(IndexService.java:104)
        at org.elasticsearch.index.IndexService$AsyncRefreshTask.runInternal(IndexService.java:925)
        at org.elasticsearch.common.util.concurrent.AbstractAsyncTask.run(AbstractAsyncTask.java:144)
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:703)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@13.0.1/ThreadPoolExecutor.java:1128)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@13.0.1/ThreadPoolExecutor.java:628)
        at java.lang.Thread.run(java.base@13.0.1/Thread.java:830)

The stack traces show that the refresh thread is stuck in FrozenBufferedUpdates.applyDocValuesUpdates while holding lock 0x00000004b91838d8, and the flush thread is blocked waiting on that same lock.

  • CPU flame graph: the flame graph likewise shows that org.apache.lucene.index.FrozenBufferedUpdates#applyDocValuesUpdates dominates, accounting for 98% of CPU time; see Figure 1-4.
Figure 1-4 Flame graph of an ES node

2 Root cause

Q1: Why is refresh slow, or even stuck?

A1: Elasticsearch introduced the soft_deletes feature in 6.5.0, and from 7.0.0 onward it defaults to true. However, it carries a bug (ES #52146, LUCENE-9228): if an index receives a large number of updates, refresh becomes very slow or even hangs. The bug was fixed in 7.7.0; see references [1], [2] and [4] for details.
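To confirm the effective value on a live index, defaults have to be requested explicitly, since soft_deletes is usually not set per index (index name illustrative):

```
GET test/_settings?include_defaults=true
```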

Q2: Why did a single translog file exceed 13GB?

A2: Because refresh was slow and never released its lock, flush was blocked waiting on it and could not run. Since flush is what triggers translog rollover and cleanup, the translog files kept growing larger and larger.

Q3: Why did writes time out, the write queue back up, and writes get rejected?

A3: Indexing a document also ends up calling FrozenBufferedUpdates.applyDocValuesUpdates, which consumes a huge amount of time and slows writes down; see the following stack trace:

"elasticsearch[node-3][write][T#5]" #232 daemon prio=5 os_prio=0 cpu=5315909.50ms elapsed=31254.63s tid=0x00007f99d8104800 nid=0x6f4d runnable  [0x00007f992dfdc000]
   java.lang.Thread.State: RUNNABLE
	at org.apache.lucene.index.FrozenBufferedUpdates.applyDocValuesUpdates(FrozenBufferedUpdates.java:568)
	at org.apache.lucene.index.FrozenBufferedUpdates.applyDocValuesUpdates(FrozenBufferedUpdates.java:451)
	at org.apache.lucene.index.FrozenBufferedUpdates.apply(FrozenBufferedUpdates.java:421)
	at org.apache.lucene.index.FrozenBufferedUpdates.forceApply(FrozenBufferedUpdates.java:249)
	at org.apache.lucene.index.FrozenBufferedUpdates.tryApply(FrozenBufferedUpdates.java:159)
	at org.apache.lucene.index.IndexWriter.lambda$publishFrozenUpdates$3(IndexWriter.java:2592)
	at org.apache.lucene.index.IndexWriter$$Lambda$4484/0x0000000801927040.process(Unknown Source)
	at org.apache.lucene.index.IndexWriter.processEvents(IndexWriter.java:5116)
	at org.apache.lucene.index.IndexWriter.updateDocument(IndexWriter.java:1597)
	at org.apache.lucene.index.IndexWriter.softUpdateDocument(IndexWriter.java:1654)
	at org.elasticsearch.index.engine.InternalEngine.updateDocs(InternalEngine.java:1254)
	at org.elasticsearch.index.engine.InternalEngine.indexIntoLucene(InternalEngine.java:1085)
	at org.elasticsearch.index.engine.InternalEngine.index(InternalEngine.java:929)
	at org.elasticsearch.index.shard.IndexShard.index(IndexShard.java:811)
	at org.elasticsearch.index.shard.IndexShard.applyIndexOperation(IndexShard.java:783)
	at org.elasticsearch.index.shard.IndexShard.applyIndexOperationOnPrimary(IndexShard.java:740)
	at org.elasticsearch.action.bulk.TransportShardBulkAction.executeBulkItemRequest(TransportShardBulkAction.java:258)
	at org.elasticsearch.action.bulk.TransportShardBulkAction$2.doRun(TransportShardBulkAction.java:161)
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
	at org.elasticsearch.action.bulk.TransportShardBulkAction.performOnPrimary(TransportShardBulkAction.java:193)
	at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:118)
	at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:79)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryShardReference.perform(TransportReplicationAction.java:917)
	at org.elasticsearch.action.support.replication.ReplicationOperation.execute(ReplicationOperation.java:108)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.runWithPrimaryShardReference(TransportReplicationAction.java:394)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.lambda$doRun$0(TransportReplicationAction.java:316)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction$$Lambda$4128/0x0000000801886c40.accept(Unknown Source)
	at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:63)
	at org.elasticsearch.index.shard.IndexShard.lambda$wrapPrimaryOperationPermitListener$22(IndexShard.java:2796)
	at org.elasticsearch.index.shard.IndexShard$$Lambda$4111/0x000000080187d840.accept(Unknown Source)
	at org.elasticsearch.action.ActionListener$3.onResponse(ActionListener.java:113)
	at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:285)
	at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:237)
	at org.elasticsearch.index.shard.IndexShard.acquirePrimaryOperationPermit(IndexShard.java:2770)
	at org.elasticsearch.action.support.replication.TransportReplicationAction.acquirePrimaryOperationPermit(TransportReplicationAction.java:858)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.doRun(TransportReplicationAction.java:312)
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
	at org.elasticsearch.action.support.replication.TransportReplicationAction.handlePrimaryRequest(TransportReplicationAction.java:275)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$$Lambda$2741/0x00000008014e2c40.messageReceived(Unknown Source)
	at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:63)
	at org.elasticsearch.transport.TransportService$7.doRun(TransportService.java:752)
	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:773)
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@13.0.1/ThreadPoolExecutor.java:1128)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@13.0.1/ThreadPoolExecutor.java:628)
	at java.lang.Thread.run(java.base@13.0.1/Thread.java:830)

3 Workarounds

The long-term fix is to upgrade Elasticsearch to 7.7.0 or later.

3.1 Handling dirty metadata

If some nodes end up with dirty metadata, i.e. a shard copy on a node believes it is the primary because it holds the most recent data (while the primary for that shard has already been allocated on another node), the symptom is nodes repeatedly joining and leaving the cluster. To handle this: stop the problematic nodes (their stale shard data prevents them from consuming cluster metadata), set the number of replicas of the affected indices to 0, then restart those nodes.

3.2 Truncating the translog

When shards are stuck in the initializing state because the translog is too large to replay, the only way to recover is to truncate the translog.

  • Truncate the translog by specifying its directory:
bin/elasticsearch-shard remove-corrupted-data   --dir /es_home/storage/data/nodes/0/indices/UmZrtruITFyNyTngUlpvng/22/translog/ --truncate-clean-translog
  • Truncate by index name and shard id:
bin/elasticsearch-shard remove-corrupted-data --index test --shard-id 22 --truncate-clean-translog

The node must be stopped before its translog files can be truncated. To keep its shards from being reallocated to other nodes while it is down, apply the following settings first:

//rebalance and allocation are restricted so that dirty data and shard movement do not interfere with recovery
PUT _cluster/settings
{
  "transient": {
    //disable rebalancing
    "cluster.routing.rebalance.enable": "none",
    //only allow primaries of newly created indices to be allocated
    "cluster.routing.allocation.enable": "new_primaries"
  }
}

After truncation, restart the node and run the reroute command:

POST /_cluster/reroute
{
  "commands" : [
    {
      "allocate_stale_primary" : {
        "index" : "test",
        "shard" : 22,
        "node" : "wW6OXqAqT1e6w7w8kc7222",
       //must be set to true to accept data loss
        "accept_data_loss" : true
      }
    }
  ]
}

3.3 Workarounds for slow refresh

  • Lower the index refresh frequency: set index.refresh_interval to 1s.
  • Disable index.soft_deletes.enabled: it defaults to true; set it to false. See reference [2] for what soft_deletes is.
  • Throttle write concurrency: to keep refresh from getting stuck under heavy concurrent updates, have users moderately reduce their write rate.
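As a sketch of the first two adjustments: the refresh interval is a dynamic setting, while soft deletes can only be disabled in the settings of a newly created index (index names illustrative):

```
PUT test/_settings
{
  "index.refresh_interval": "1s"
}

PUT test_v2
{
  "settings": {
    "index.soft_deletes.enabled": false
  }
}
```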

Because soft_deletes can only be set at index creation time, the index must be reindexed. Once the reindex completes, take a snapshot of the original index, delete it, and set the original index name as an alias of the reindexed index (so that users do not need to change their code).
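The delete-and-alias step can be done atomically with the aliases API (names illustrative; this assumes the snapshot of the original index has already been taken, since remove_index deletes it):

```
POST _aliases
{
  "actions": [
    { "remove_index": { "index": "test" } },
    { "add": { "index": "test_v2", "alias": "test" } }
  ]
}
```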

Reindexing 570GB (about 3.6 billion documents) took roughly 5 hours; use slices to speed the reindex up:

POST _reindex?wait_for_completion=false&scroll=10m&slices=20
{
  "source": {
    "index": "test",
    "size": 9000
  },
  "dest": {
    "index": "test_v2"
  }
}

⚠️ Note: to keep any job from accidentally writing to the original index during the reindex, make it read-only: PUT test/_settings { "index.blocks.read_only": true }

4 Code analysis

4.1 Call chain

As shown in Figure 4-1, refresh and flush (light red blocks) compete for locks, namely fullFlushLock and the IndexWriter lock.

Figure 4-1 refresh and flush call chain

The yellow block in Figure 4-1 marks the code that becomes extremely slow, or even hangs, after a large number of updates.

Let's look at the logic of the code that gets stuck, org.apache.lucene.index.FrozenBufferedUpdates#applyDocValuesUpdates:

  private static long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState segState,
                                            Map<String, FieldUpdatesBuffer> updates,
                                            long delGen,
                                            boolean segmentPrivateDeletes) throws IOException {

    // TODO: we can process the updates per DV field, from last to first so that
    // if multiple terms affect same document for the same field, we add an update
    // only once (that of the last term). To do that, we can keep a bitset which
    // marks which documents have already been updated. So e.g. if term T1
    // updates doc 7, and then we process term T2 and it updates doc 7 as well,
    // we don't apply the update since we know T1 came last and therefore wins
    // the update.
    // We can also use that bitset as 'liveDocs' to pass to TermEnum.docs(), so
    // that these documents aren't even returned.
    long updateCount = 0;
    // We first write all our updates private, and only in the end publish to the ReadersAndUpdates */
    final List<DocValuesFieldUpdates> resolvedUpdates = new ArrayList<>();
    //in ES this map has only one key: __soft_deletes
    for (Map.Entry<String, FieldUpdatesBuffer> fieldUpdate : updates.entrySet()) {
      String updateField = fieldUpdate.getKey();
      DocValuesFieldUpdates dvUpdates = null;
      FieldUpdatesBuffer value = fieldUpdate.getValue();
      boolean isNumeric = value.isNumeric();
      FieldUpdatesBuffer.BufferedUpdateIterator iterator = value.iterator();
      FieldUpdatesBuffer.BufferedUpdate bufferedUpdate;
      TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, false);
      //the core problem: this loop does not deduplicate terms; from 7.7.0 on, the same term is applied only once
      while ((bufferedUpdate = iterator.next()) != null) {
        // TODO: we traverse the terms in update order (not term order) so that we
        // apply the updates in the correct order, i.e. if two terms update the
        // same document, the last one that came in wins, irrespective of the
        // terms lexical order.
        // we can apply the updates in terms order if we keep an updatesGen (and
        // increment it with every update) and attach it to each NumericUpdate. Note
        // that we cannot rely only on docIDUpto because an app may send two updates
        // which will get same docIDUpto, yet will still need to respect the order
        // those updates arrived.
        // TODO: we could at least *collate* by field?
        final DocIdSetIterator docIdSetIterator = termDocsIterator.nextTerm(bufferedUpdate.termField, bufferedUpdate.termValue);
        if (docIdSetIterator != null) {
          final int limit;
          if (delGen == segState.delGen) {
            assert segmentPrivateDeletes;
            limit = bufferedUpdate.docUpTo;
          } else {
            limit = Integer.MAX_VALUE;
          }
          final BytesRef binaryValue;
          final long longValue;
          if (bufferedUpdate.hasValue == false) {
            longValue = -1;
            binaryValue = null;
          } else {
            longValue = bufferedUpdate.numericValue;
            binaryValue = bufferedUpdate.binaryValue;
          }
           termDocsIterator.getDocs();
          if (dvUpdates == null) {
            if (isNumeric) {
              if (value.hasSingleValue()) {
                dvUpdates = new NumericDocValuesFieldUpdates
                    .SingleValueNumericDocValuesFieldUpdates(delGen, updateField, segState.reader.maxDoc(),
                    value.getNumericValue(0));
              } else {
                dvUpdates = new NumericDocValuesFieldUpdates(delGen, updateField, value.getMinNumeric(),
                    value.getMaxNumeric(), segState.reader.maxDoc());
              }
            } else {
              dvUpdates = new BinaryDocValuesFieldUpdates(delGen, updateField, segState.reader.maxDoc());
            }
            resolvedUpdates.add(dvUpdates);
          }
          final IntConsumer docIdConsumer;
          final DocValuesFieldUpdates update = dvUpdates;
          if (bufferedUpdate.hasValue == false) {
            docIdConsumer = doc -> update.reset(doc);
          } else if (isNumeric) {
            docIdConsumer = doc -> update.add(doc, longValue);
          } else {
            docIdConsumer = doc -> update.add(doc, binaryValue);
          }
          final Bits acceptDocs = segState.rld.getLiveDocs();
          if (segState.rld.sortMap != null && segmentPrivateDeletes) {
            // This segment was sorted on flush; we must apply seg-private deletes carefully in this case:
            int doc;
            while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
              if (acceptDocs == null || acceptDocs.get(doc)) {
                // The limit is in the pre-sorted doc space:
                if (segState.rld.sortMap.newToOld(doc) < limit) {
                  docIdConsumer.accept(doc);
                  updateCount++;
                }
              }
            }
          } else {
            int doc;
            while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
              if (doc >= limit) {
                break; // no more docs that can be updated for this term
              }
              if (acceptDocs == null || acceptDocs.get(doc)) {
                docIdConsumer.accept(doc);
                updateCount++;
              }
            }
          }
        }
      }
    }

    // now freeze & publish:
    for (DocValuesFieldUpdates update : resolvedUpdates) {
      if (update.any()) {
        update.finish();
        segState.rld.addDVUpdate(update);
      }
    }

    return updateCount;
  }

Debugging applyDocValuesUpdates with breakpoints and comparing against the post-7.7.0 code shows that the problem lies in the line bufferedUpdate = iterator.next(): terms are not deduplicated before being applied. If a document is soft-updated 100k times, this produces on the order of 100k × 100k iterations [1].
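The cost difference can be sketched in plain Java (hypothetical class and method names; this only models iteration counts, not Lucene's actual data structures): before 7.7.0 every buffered update walks the postings of its term, so N updates of the same document cost on the order of N × N doc visits, whereas deduplicating terms reduces this to one walk per distinct term.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;

// Toy model of applyDocValuesUpdates: every buffered update walks the
// postings of its term; duplicates of the same term repeat the walk.
public class DedupSketch {
    // pre-7.7.0 behavior: one postings walk per buffered update, duplicates included
    static long docVisitsWithoutDedup(List<String> bufferedTerms, int docsPerTerm) {
        long visits = 0;
        for (String term : bufferedTerms) {
            visits += docsPerTerm; // a full walk for every occurrence of the term
        }
        return visits;
    }

    // 7.7.0+ behavior: each distinct term is applied only once
    static long docVisitsWithDedup(List<String> bufferedTerms, int docsPerTerm) {
        return (long) new LinkedHashSet<>(bufferedTerms).size() * docsPerTerm;
    }

    public static void main(String[] args) {
        // 100k soft updates of the same _id term, each matching ~100k docs
        List<String> terms = Collections.nCopies(100_000, "_id:1");
        System.out.println(docVisitsWithoutDedup(terms, 100_000)); // 10000000000
        System.out.println(docVisitsWithDedup(terms, 100_000));    // 100000
    }
}
```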

The debug test code is as follows:

package com.dirk.soft.delete;

import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.*;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Date;

/**
 * Created by
 *
 * @Author : yang
 * @create 2022/6/5 09:49
 */
public class SoftDeletesTest1 {
    private final static String indexPath = "/Users/yang/workspace/github/lucene-learn/docpath";
    private Directory dir = null;
    {
        try {
            dir = FSDirectory.open(Paths.get(indexPath));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //index storage directory
    //kept as a field so the IDE highlights the variable
    private IndexWriter indexWriter;
    IndexWriterConfig indexWriterConfig;
    public void doIndexAndSearch() throws Exception {
        indexWriterConfig = new IndexWriterConfig(new WhitespaceAnalyzer());
        indexWriterConfig.setSoftDeletesField("__soft_deletes");
        indexWriterConfig.setUseCompoundFile(true);
        indexWriterConfig.setMergePolicy(NoMergePolicy.INSTANCE);
        indexWriter = new IndexWriter(dir, indexWriterConfig);
        Document doc;
        // document 0
        doc = new Document();
        doc.add(new StringField("_id", "0", Field.Store.YES));
        doc.add(new NumericDocValuesField("docValuesFiled", 2));
        doc.add(new StringField("_version", "0", Field.Store.YES));
        indexWriter.addDocument(doc);
        // document 1
        doc = new Document();
        doc.add(new StringField("_id", "1", Field.Store.YES));
        doc.add(new NumericDocValuesField("docValuesFiled", 3));
        doc.add(new StringField("_version", "1", Field.Store.YES));
        indexWriter.addDocument(doc);
        // first soft update
        Document newDoc = new Document();
        newDoc.add(new StringField("_id", "1", Field.Store.YES));
        newDoc.add(new StringField("_version", "2", Field.Store.YES));
        indexWriter.softUpdateDocument(new Term("_id", "1"), newDoc, new NumericDocValuesField("__soft_deletes", 3));
        // second soft update
        newDoc = new Document();
        newDoc.add(new StringField("_id", "1", Field.Store.YES));
        newDoc.add(new StringField("_version", "3", Field.Store.YES));
        indexWriter.softUpdateDocument(new Term("_id", "1"), newDoc, new NumericDocValuesField("__soft_deletes", 3));
        // third soft update
        newDoc = new Document();
        newDoc.add(new StringField("_id", "1", Field.Store.YES));
        newDoc.add(new StringField("_version", "4", Field.Store.YES));
        indexWriter.softUpdateDocument(new Term("_id", "2"), newDoc, new NumericDocValuesField("__soft_deletes", 3));
        indexWriter.commit();
        DirectoryReader readerBeforeMerge = DirectoryReader.open(indexWriter);
        ScoreDoc[] scoreDocs = (new IndexSearcher(readerBeforeMerge)).search(new MatchAllDocsQuery(), 100).scoreDocs;
        for (ScoreDoc scoreDoc : scoreDocs) {
            System.out.println("time:"+new Date() +";  docId: 文档" + scoreDoc.doc +
                    ", FieldValue of Field abc: " + readerBeforeMerge.document(scoreDoc.doc).get("_id") +
                    ",_version:"+readerBeforeMerge.document(scoreDoc.doc).get("_version"));
        }
    }

    public static void main(String[] args) throws Exception{
        SoftDeletesTest1 test = new SoftDeletesTest1();
        test.doIndexAndSearch();
    }
}

Execution result:

time:Sun Jun 05 17:22:15 CST 2022;  docId: 文档0, FieldValue of Field abc: 0,_version:0
time:Sun Jun 05 17:22:15 CST 2022;  docId: 文档3, FieldValue of Field abc: 1,_version:3
time:Sun Jun 05 17:22:15 CST 2022;  docId: 文档4, FieldValue of Field abc: 1,_version:4

Debugging applyDocValuesUpdates is shown in Figures 4-2 and 4-3.

Figure 4-2 applyDocValuesUpdates debug (1)
Figure 4-3 applyDocValuesUpdates debug (2)

5 Local reproduction

5.1 Test code

org.elasticsearch.index.engine.EngineTestCase#testRefresh

public void  testRefresh(){
        long startTime = System.currentTimeMillis();
        int i  = 0 ;
        while (i++<100000) {
            try {
                //update the document with _id 1 100,000 times
                engine.index(indexForDoc(createParsedDoc("1", null)));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        logger.info("updating took: {}ms", (System.currentTimeMillis() - startTime));
        startTime = System.currentTimeMillis();
        engine.refresh("test", randomFrom(Engine.SearcherScope.values()), randomBoolean());
        logger.info("refresh took: {}ms", (System.currentTimeMillis() - startTime));
    }

5.2 Test results

The test was run against both es7.5.2 and es7.10.2:

  • 100k updates, refresh time: es7.5.2: 7 of 10 runs finished within 1s, the other 3 took 30s+; es7.10.2: all 10 runs finished within 1s
  • 500k updates, refresh time: es7.5.2: Suite timeout exceeded (>= 1200000 msec); es7.10.2: all 10 runs finished within 3s

5.3 Open questions

Why do the 100k and 500k update runs on 7.5.2 sometimes finish quickly and sometimes take very long? By the reasoning above, refresh should consistently be slow.

6 References

【1】 A summary of several recently encountered issues

【2】 What are soft-deletes in Elasticsearch 7.4

【3】 Clearing the translog

【4】 soft-delete can cause continuous write queue buildup

【5】 elasticsearch translog recovery stuck at a certain percentage, leaving the index unrecoverable

【6】 Lucene soft deletes

Original statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

For infringement concerns, contact cloudcommunity@tencent.com for removal.
