Lucene Add-Document Source Code Analysis (Part 1)

Author: LuceneReader
Last modified: 2020-02-10 17:14:35

In Lucene, a document can be added, updated, or deleted. This article walks through how Lucene adds a document and analyzes the source code behind that flow.

Documents are added through the IndexWriter.addDocument method. We start with sample code that adds a document.

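        // Note: dir is assumed to be an already opened org.apache.lucene.store.Directory
        IndexWriterConfig config = new IndexWriterConfig(new WhitespaceAnalyzer());
        // Keep each segment's files separate instead of packing them into a compound file
        config.setUseCompoundFile(false);
        // Trigger a flush once 2 documents are buffered in memory
        config.setMaxBufferedDocs(2);
        IndexWriter writer = new IndexWriter(dir, config);
        // Field type: stored, tokenized, full postings, and term vectors with positions and offsets
        FieldType type = new FieldType();
        type.setStored(true);
        type.setTokenized(true);
        type.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS);
        type.setStoreTermVectors(true);
        type.setStoreTermVectorPositions(true);
        type.setStoreTermVectorOffsets(true);
        type.freeze();
        // Build a document with a single field and add it to the index
        Document doc = new Document();
        doc.add(new Field("content", "one", type));
        writer.addDocument(doc);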

Flowchart for adding a single document

[Figure: Lucene add-document flowchart]

We first cover the processing that happens before a document is added. The Lucene source for this pre-add step is shown below.

  private boolean preUpdate() throws IOException, AbortingException {
    ensureOpen();
    boolean hasEvents = false;
    // Check whether any DWPT is waiting to be flushed
    if (flushControl.anyStalledThreads() || (flushControl.numQueuedFlushes() > 0 && config.checkPendingFlushOnUpdate)) {
      // Help out flushing any queued DWPTs so we can un-stall:
      do {
        // Try pick up pending threads here if possible
        DocumentsWriterPerThread flushingDWPT;
        // Take each pending DWPT in turn and flush it
        while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
          // Don't push the delete here since the update could fail!
          // Flush this flushingDWPT
          hasEvents |= doFlush(flushingDWPT);
        }
        
        flushControl.waitIfStalled(); // block if stalled
      } while (flushControl.numQueuedFlushes() != 0); // still queued DWPTs try help flushing
    }
    return hasEvents;
  }
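
To make the shape of this loop easier to see, here is a minimal single-threaded sketch of the "help flush" pattern. It is not Lucene code: the class HelpFlushLoopSketch, its methods, and the use of strings in place of DWPTs are all hypothetical, and the blocking waitIfStalled step is only marked by a comment.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, single-threaded model of the "help flush" loop in preUpdate.
// Strings stand in for DWPTs; all names here are hypothetical.
final class HelpFlushLoopSketch {
    private final Deque<String> queuedFlushes = new ArrayDeque<>();
    private int flushedCount;

    void enqueue(String pendingWriter) {
        queuedFlushes.add(pendingWriter);
    }

    int flushedCount() {
        return flushedCount;
    }

    // Stand-in for flushControl.nextPendingFlush(): null when nothing is pending.
    private String nextPendingFlush() {
        return queuedFlushes.poll();
    }

    // Stand-in for doFlush(dwpt): pretend every flush produces an event.
    private boolean doFlush(String writer) {
        flushedCount++;
        return true;
    }

    // Drains every queued writer and reports whether any flush happened.
    boolean helpFlush() {
        boolean hasEvents = false;
        do {
            String flushing;
            while ((flushing = nextPendingFlush()) != null) {
                hasEvents |= doFlush(flushing);
            }
            // The real code calls flushControl.waitIfStalled() here and may block.
        } while (!queuedFlushes.isEmpty());
        return hasEvents;
    }

    public static void main(String[] args) {
        HelpFlushLoopSketch sketch = new HelpFlushLoopSketch();
        sketch.enqueue("dwpt-1");
        sketch.enqueue("dwpt-2");
        System.out.println(sketch.helpFlush() + " " + sketch.flushedCount());
    }
}
```

The outer do/while matters in the real code because other threads can queue new flushes while the current thread is flushing. In this single-threaded sketch it simply runs once.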

1. DocumentsWriterFlushControl.anyStalledThreads

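DocumentsWriterFlushControl decides whether indexing is currently stalled by checking the stalled flag in its DocumentsWriterStallControl member. When multiple threads that share the same IndexWriter add or update documents concurrently, each thread obtains a ThreadState. After the DWPT held by that ThreadState finishes processing a document, the memory the document consumed is added to flushBytes if the DWPT has reached the flush condition, and to activeBytes otherwise.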
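
The bookkeeping described above can be sketched roughly as follows. This is an illustrative model, not the Lucene implementation: ByteAccountingSketch and its method names are hypothetical. The markFlushPending step, which moves a writer's bytes from the active pool to the flush pool, is our assumption based on the source comment "We are pending so all memory is already moved to flushBytes" shown later in this article.

```java
// Hypothetical model of the activeBytes / flushBytes counters.
final class ByteAccountingSketch {
    long activeBytes; // memory held by writers that are still indexing
    long flushBytes;  // memory held by writers that are waiting to flush

    // After a document is processed, credit its memory to the matching pool.
    void commitBytes(long delta, boolean flushPending) {
        if (flushPending) {
            flushBytes += delta;
        } else {
            activeBytes += delta;
        }
    }

    // Assumption: when a writer becomes flush-pending, everything it has
    // accumulated so far moves from the active pool to the flush pool.
    void markFlushPending(long writerBytes) {
        activeBytes -= writerBytes;
        flushBytes += writerBytes;
    }

    public static void main(String[] args) {
        ByteAccountingSketch accounting = new ByteAccountingSketch();
        accounting.commitBytes(100, false); // first document, writer still active
        accounting.commitBytes(50, false);  // second document, writer still active
        accounting.markFlushPending(150);   // writer reaches its flush condition
        System.out.println(accounting.activeBytes + " " + accounting.flushBytes);
    }
}
```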

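Concurrent adds and updates from many threads increase memory pressure, while flushing a segment relieves it. If documents are added or updated faster than segments can be flushed, and the indexing threads are not throttled, the process can easily run out of memory (OOM). Lucene stalls the indexing threads when the following condition holds.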

    /*
     * we block indexing threads if net byte grows due to slow flushes
     * yet, for small ram buffers and large documents we can easily
     * reach the limit without any ongoing flushes. we need to ensure
     * that we don't stall/block if an ongoing or pending flush can
     * not free up enough memory to release the stall lock.
     */
     stall = (activeBytes + flushBytes) > limit && activeBytes < limit && !closed;
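
To see how this condition behaves, the sketch below evaluates the same boolean expression for a few inputs. The class name StallPredicateSketch and the 32-unit limit are hypothetical values chosen for illustration. Only the expression itself comes from the source above.

```java
// Evaluates the stall expression from the snippet above for sample inputs.
final class StallPredicateSketch {
    static boolean shouldStall(long activeBytes, long flushBytes, long limit, boolean closed) {
        return (activeBytes + flushBytes) > limit && activeBytes < limit && !closed;
    }

    public static void main(String[] args) {
        long limit = 32; // arbitrary illustrative limit

        // Total exceeds the limit and flushing can bring memory back under it: stall.
        System.out.println(shouldStall(10, 30, limit, false));

        // Active bytes alone exceed the limit: no flush can free enough memory, so do not stall.
        System.out.println(shouldStall(40, 0, limit, false));

        // Total is under the limit: no need to stall.
        System.out.println(shouldStall(10, 10, limit, false));
    }
}
```

The second case is the one the source comment warns about: with a small RAM buffer and large documents, blocking would not help because no pending flush could release the stall.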

2. IndexWriterConfig.checkPendingFlushOnUpdate

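This flag controls whether a thread that adds or updates a document also checks for segments waiting to be flushed. If it is set to true, the thread helps process those pending flushes.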

/** 
  if an indexing thread should check for pending flushes on update 
  in order to help out on a full flush
  */
protected volatile boolean checkPendingFlushOnUpdate = true;
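
Putting the two inputs together, the entry condition of preUpdate can be modeled as a small predicate. PreUpdateGateSketch and shouldHelpFlush are hypothetical names. The boolean expression mirrors the if statement in preUpdate shown earlier.

```java
// Models the entry condition of preUpdate as a pure function.
final class PreUpdateGateSketch {
    static boolean shouldHelpFlush(boolean anyStalledThreads,
                                   int numQueuedFlushes,
                                   boolean checkPendingFlushOnUpdate) {
        return anyStalledThreads || (numQueuedFlushes > 0 && checkPendingFlushOnUpdate);
    }

    public static void main(String[] args) {
        // Queued flushes exist and the flag is on: the indexing thread helps.
        System.out.println(shouldHelpFlush(false, 3, true));
        // Same queue, flag off: the indexing thread skips the help-out step.
        System.out.println(shouldHelpFlush(false, 3, false));
        // Stalled threads force the help-out step regardless of the flag.
        System.out.println(shouldHelpFlush(true, 0, false));
    }
}
```

The flag only affects the second branch. When threads are stalled, the indexing thread helps flush no matter how the flag is set. As far as we know the flag is changed through IndexWriterConfig (setCheckPendingFlushUpdate in the versions we have seen), but treat that method name as an assumption and check the Javadoc of your Lucene version.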

2. DocumentsWriterFlushControl.nextPendingFlush explained

2.1 nextPendingFlush takes out the DWPT of one segment that is waiting to be flushed

  DocumentsWriterPerThread nextPendingFlush() {
    int numPending;
    boolean fullFlush;
    synchronized (this) {
      final DocumentsWriterPerThread poll;
      // Take a DWPT that was added to flushQueue
      if ((poll = flushQueue.poll()) != null) {
        // Update the thread stall flag
        updateStallState();
        return poll;
      }
      fullFlush = this.fullFlush;
      numPending = this.numPending;
    }
    // If at least one ThreadState is marked flushPending and no full flush (an explicitly requested flush) is in progress
    if (numPending > 0 && !fullFlush) { // don't check if we are doing a full flush
      // Scan the ThreadStates; for each one marked flushPending, try to check out
      // its DWPT and return that DWPT so it can be flushed
      final int limit = perThreadPool.getActiveThreadStateCount();
      for (int i = 0; i < limit && numPending > 0; i++) {
        final ThreadState next = perThreadPool.getThreadState(i);
        if (next.flushPending) {
          final DocumentsWriterPerThread dwpt = tryCheckoutForFlush(next);
          if (dwpt != null) {
            return dwpt;
          }
        }
      }
    }
    return null;
  }
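
The two-tier lookup above (queue first, then a scan of flush-pending states) can be modeled with plain collections. This is a simplified sketch under our own assumptions: NextPendingFlushSketch, State, and the helper methods are hypothetical, strings stand in for DWPTs, and the per-state locking of the real code is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simplified model of the two-tier lookup in nextPendingFlush.
// Strings stand in for DWPTs; every name below is hypothetical.
final class NextPendingFlushSketch {
    static final class State {
        final String writer;
        boolean flushPending;

        State(String writer, boolean flushPending) {
            this.writer = writer;
            this.flushPending = flushPending;
        }
    }

    private final Queue<String> flushQueue = new ArrayDeque<>();
    private final List<State> states = new ArrayList<>();
    private boolean fullFlush;
    private int numPending;

    synchronized void enqueue(String writer) {
        flushQueue.add(writer);
    }

    synchronized void addState(String writer, boolean flushPending) {
        states.add(new State(writer, flushPending));
        if (flushPending) {
            numPending++;
        }
    }

    synchronized void setFullFlush(boolean fullFlush) {
        this.fullFlush = fullFlush;
    }

    String nextPendingFlush() {
        final int pending;
        final boolean full;
        final List<State> snapshot;
        synchronized (this) {
            // Tier 1: writers that were already queued for flushing win.
            String queued = flushQueue.poll();
            if (queued != null) {
                return queued;
            }
            pending = numPending;
            full = fullFlush;
            snapshot = new ArrayList<>(states);
        }
        // Tier 2: scan for flush-pending states, but not during a full flush.
        if (pending > 0 && !full) {
            for (State state : snapshot) {
                String writer = tryCheckout(state);
                if (writer != null) {
                    return writer;
                }
            }
        }
        return null;
    }

    private synchronized String tryCheckout(State state) {
        if (!state.flushPending) {
            return null;
        }
        state.flushPending = false;
        numPending--;
        return state.writer;
    }
}
```

As in the original, the scan runs outside the monitor and each checkout re-enters it, so the lock is held only for short critical sections.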

2.2 Next, we walk through the source of the tryCheckoutForFlush method in detail

synchronized DocumentsWriterPerThread tryCheckoutForFlush(ThreadState perThread) {
   // If this ThreadState is flushPending, try to check out the DWPT it holds
   return perThread.flushPending ? internalTryCheckOutForFlush(perThread) : null;
}

  private DocumentsWriterPerThread internalTryCheckOutForFlush(ThreadState perThread) {
    assert Thread.holdsLock(this);
    assert perThread.flushPending;
    try {
      // We are pending so all memory is already moved to flushBytes
      // Adds and updates run concurrently, so another thread may already hold
      // this ThreadState's lock; if the lock cannot be acquired, return null
      if (perThread.tryLock()) {
        try {
          // Proceed only if the DWPT in this ThreadState has been initialized
          if (perThread.isInitialized()) {
            assert perThread.isHeldByCurrentThread();
            final DocumentsWriterPerThread dwpt;
            final long bytes = perThread.bytesUsed; // do that before
                                                         // replace!
            // Reset this ThreadState
            dwpt = perThreadPool.reset(perThread);
            assert !flushingWriters.containsKey(dwpt) : "DWPT is already flushing";
            // Record the flushing DWPT to reduce flushBytes in doAfterFlush
            // Remember how much memory this DWPT uses so doAfterFlush can account for it
            flushingWriters.put(dwpt, Long.valueOf(bytes));
            // One fewer DWPT is waiting to be flushed
            numPending--; // write access synced
            return dwpt;
          }
        } finally {
          perThread.unlock();
        }
      }
      return null;
    } finally {
      updateStallState();
    }
  }
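
The key idea in this method is a non-blocking checkout: try the lock, and give up immediately if an indexing thread holds it. Below is a minimal sketch of that pattern with java.util.concurrent.locks.ReentrantLock. CheckoutSketch, Slot, and their members are hypothetical stand-ins for the flush control, the ThreadState, and its fields. The reset performed here (clearing the writer, the byte count, and the pending flag) is our assumption about what perThreadPool.reset does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of the non-blocking checkout; strings stand in for DWPTs.
final class CheckoutSketch {
    // The slot is itself a lock, so tryLock() can be called on it directly.
    static final class Slot extends ReentrantLock {
        String writer;        // null models "DWPT not initialized"
        boolean flushPending;
        long bytesUsed;
    }

    private final Map<String, Long> flushingWriters = new HashMap<>();
    private int numPending;

    synchronized void markPending(Slot slot) {
        slot.flushPending = true;
        numPending++;
    }

    synchronized int numPending() {
        return numPending;
    }

    synchronized Long flushingBytes(String writer) {
        return flushingWriters.get(writer);
    }

    synchronized String tryCheckoutForFlush(Slot slot) {
        if (!slot.flushPending) {
            return null;
        }
        if (!slot.tryLock()) {
            return null; // another thread is using this slot; do not wait for it
        }
        try {
            if (slot.writer == null) {
                return null; // nothing to flush yet
            }
            String writer = slot.writer;
            // Read the byte count before the reset wipes it.
            flushingWriters.put(writer, slot.bytesUsed);
            // Assumed reset semantics: the slot is emptied for reuse.
            slot.writer = null;
            slot.bytesUsed = 0;
            slot.flushPending = false;
            numPending--;
            return writer;
        } finally {
            slot.unlock();
        }
    }
}
```

Returning null on a failed tryLock keeps the flushing thread from blocking behind an indexing thread. The slot stays flush-pending, so a later call can pick it up.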

The above covers what Lucene does before adding or updating a document. The next article, https://cloud.tencent.com/developer/article/1579685, covers the document-processing logic in detail.

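Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.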

In case of infringement, please contact cloudcommunity@tencent.com to have it removed.
