Lucene source code analysis: adding documents (Part 2)

Original article by LuceneReader, last modified 2020-02-10 17:14:22

In Lucene, documents are added with IndexWriter.addDocument. Here is sample code that adds a document:

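        // dir is a Directory opened elsewhere (not shown in the original snippet)
        IndexWriterConfig config = new IndexWriterConfig(new WhitespaceAnalyzer());
        // Do not pack newly flushed segments into a compound file
        config.setUseCompoundFile(false);
        // Flush a new segment once 2 documents are buffered in memory
        config.setMaxBufferedDocs(2);
        IndexWriter writer = new IndexWriter(dir, config);
        // A stored, tokenized field type that indexes docs, freqs, positions and
        // offsets, and also stores term vectors with positions and offsets
        FieldType type = new FieldType();
        type.setStored(true);
        type.setTokenized(true);
        type.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS);
        type.setStoreTermVectors(true);
        type.setStoreTermVectorPositions(true);
        type.setStoreTermVectorOffsets(true);
        type.freeze();
        // Build a single-field document and add it
        Document doc = new Document();
        doc.add(new Field("content", "one", type));
        writer.addDocument(doc);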

The previous article covered what Lucene does before adding or updating a document. This article covers how the document itself is processed.

Figure: the flow of adding a document in Lucene

1. Obtaining a ThreadState: source code analysis

Before processing a document, Lucene has to obtain a ThreadState. This is done in DocumentsWriterFlushControl.obtainAndLock():

  ThreadState obtainAndLock() {
    // Obtain a ThreadState from the DocumentsWriterPerThreadPool
    final ThreadState perThread = perThreadPool.getAndLock(Thread
        .currentThread(), documentsWriter);
    boolean success = false;
    try {
      // If this ThreadState already has an initialized DWPT whose deleteQueue
      // differs from the DocumentsWriter's current deleteQueue
      if (perThread.isInitialized() && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
        // There is a flush-all in process and this DWPT is
        // now stale -- enroll it for flush and try for
        // another DWPT:
        // A full flush (IndexWriter.flush) is in progress: if the DWPT bound to this
        // ThreadState holds any documents, add it to fullFlushBuffer; otherwise reset the DWPT
        addFlushableState(perThread);
      }
      success = true;
      // simply return the ThreadState even in a flush all case since we already hold the lock
      return perThread;
    } finally {
      if (!success) { // make sure we unlock if this fails
        perThreadPool.release(perThread);
      }
    }
  }
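The staleness check above comes down to an identity comparison between delete queues. The sketch below models it under simplifying assumptions. `DeleteQueue`, `PerThread`, `startFullFlush()` and `obtain()` are hypothetical names, not Lucene APIs, and all locking is left out. A full flush is simulated by installing a new queue. A per-thread state still bound to the old queue is then either buffered for flushing (if it holds documents) or reset, which follows the comment in the code above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the staleness check in obtainAndLock(); not Lucene code.
final class StaleCheckSketch {

    // Stand-in for the delete queue; only its identity matters here.
    static final class DeleteQueue {
        final long generation;
        DeleteQueue(long generation) { this.generation = generation; }
    }

    // Stand-in for a ThreadState; boundQueue == null means "no DWPT yet".
    static final class PerThread {
        DeleteQueue boundQueue;
        int numDocsInRAM;
    }

    DeleteQueue currentQueue = new DeleteQueue(0);
    final List<PerThread> fullFlushBuffer = new ArrayList<>();

    // A full flush installs a fresh delete queue; states on the old one become stale.
    void startFullFlush() {
        currentQueue = new DeleteQueue(currentQueue.generation + 1);
    }

    // Mirrors obtainAndLock(): enroll a stale state, then return it either way.
    PerThread obtain(PerThread perThread) {
        if (perThread.boundQueue != null && perThread.boundQueue != currentQueue) {
            if (perThread.numDocsInRAM > 0) {
                fullFlushBuffer.add(perThread);   // its buffered docs must be flushed
            } else {
                perThread.boundQueue = null;      // nothing buffered: just reset it
            }
        }
        return perThread;
    }
}
```

Comparing queue references instead of tracking a "flush in progress" flag per thread means a thread only needs the queue it already holds to detect that a full flush has started.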

obtainAndLock() gets the ThreadState from DocumentsWriterPerThreadPool.getAndLock():

  /** This method is used by DocumentsWriter/FlushControl to obtain a ThreadState to do an indexing operation (add/updateDocument). */
  // Obtain a ThreadState
  ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) {
    ThreadState threadState = null;
    synchronized (this) {
      // If freeList is empty, create a new ThreadState and add it to threadStates
      if (freeList.isEmpty()) {
        // ThreadState is already locked before return by this method:
        return newThreadState();
      } else {
        // Important that we are LIFO here! This way if number of concurrent indexing 
        // threads was once high, but has now reduced, we only use a
        // limited number of thread states:
        // LIFO (last in, first out): take the last ThreadState in freeList
        threadState = freeList.remove(freeList.size()-1);
        // If that ThreadState has no DWPT
        if (threadState.dwpt == null) {
          // This thread-state is not initialized, e.g. it
          // was just flushed. See if we can instead find
          // another free thread state that already has docs
          // indexed. This way if incoming thread concurrency
          // has decreased, we don't leave docs
          // indefinitely buffered, tying up RAM.  This
          // will instead get those thread states flushed,
          // freeing up RAM for larger segment flushes:
          // Scan freeList from the front for a ThreadState whose DWPT is non-null and
          // swap it with the uninitialized ThreadState just removed
          for(int i=0;i<freeList.size();i++) {
            ThreadState ts = freeList.get(i);
            if (ts.dwpt != null) {
              // Use this one instead, and swap it with
              // the un-initialized one:
              freeList.set(i, threadState);
              threadState = ts;
              break;
            }
          }
        }
      }
    }
   
    // This could take time, e.g. if the threadState is [briefly] checked for flushing:
    // Lock the selected ThreadState
    threadState.lock();

    return threadState;
  } 
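The selection policy in getAndLock() can be isolated from the locking. Below is a minimal sketch, assuming a hypothetical `Slot` class in place of ThreadState, where `hasDwpt` plays the role of `dwpt != null`. It takes the most recently released slot first (LIFO). If that slot has no DWPT, it prefers any free slot that already buffers documents and puts the empty one in that slot's position.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the free-list policy in getAndLock(); not Lucene code.
final class FreeListSketch {

    static final class Slot {
        final String name;
        boolean hasDwpt;   // stands in for threadState.dwpt != null
        Slot(String name, boolean hasDwpt) { this.name = name; this.hasDwpt = hasDwpt; }
    }

    final List<Slot> freeList = new ArrayList<>();
    private int created = 0;

    synchronized Slot acquire() {
        if (freeList.isEmpty()) {
            // Nothing to reuse: create a fresh slot (like newThreadState()).
            return new Slot("new-" + (created++), false);
        }
        // LIFO: reuse the most recently released slot first.
        Slot slot = freeList.remove(freeList.size() - 1);
        if (!slot.hasDwpt) {
            // Prefer a slot that already buffers docs so they do not sit in RAM forever.
            for (int i = 0; i < freeList.size(); i++) {
                Slot candidate = freeList.get(i);
                if (candidate.hasDwpt) {
                    freeList.set(i, slot);   // the empty slot takes the candidate's place
                    slot = candidate;
                    break;
                }
            }
        }
        return slot;
    }

    synchronized void release(Slot slot) {
        freeList.add(slot);
    }
}
```

LIFO reuse keeps a small working set of slots busy when concurrency drops. The swap then makes sure slots left holding documents are picked up again and eventually flushed.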

2. Updating the document: DocumentsWriterPerThread.updateDocument source code analysis

  public long updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
    testPoint("DocumentsWriterPerThread addDocument start");
    assert deleteQueue != null;
    // Increment the pendingNumDocs counter (reserve room for this document)
    reserveOneDoc();
    docState.doc = doc;
    docState.analyzer = analyzer;
    docState.docID = numDocsInRAM;
    if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
      infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name);
    }
    // Even on exception, the document is still added (but marked
    // deleted), so we don't need to un-reserve at that point.
    // Aborting exceptions will actually "lose" more than one
    // document, so the counter will be "wrong" in that case, but
    // it's very hard to fix (we can't easily distinguish aborting
    // vs non-aborting exceptions):
    boolean success = false;
    try {
      try {
        // Process the document
        consumer.processDocument();
      } finally {
        // Clear docState
        docState.clear();
      }
      success = true;
    } finally {
      if (!success) {
        // Processing failed: add this docID to pendingUpdates so it is handled at flush time
        // mark document as deleted
        deleteDocID(docState.docID);
        // Increment numDocsInRAM so the next document gets the next docID
        numDocsInRAM++;
      }
    }
    // After processing the document, update the deleteSlice
    return finishDocument(delTerm);
  }
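The docID bookkeeping above is easy to misread: a document that fails during processing still uses up its docID and is recorded as deleted, so later documents keep dense, increasing IDs. The sketch below shows this pattern with a hypothetical `DocIdSketch` class. A `Consumer` stands in for `consumer.processDocument()` and a plain list stands in for pendingUpdates. On success the counter is advanced after processing, which in Lucene happens inside finishDocument().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the docID handling in updateDocument(); not Lucene code.
final class DocIdSketch {
    int numDocsInRAM = 0;
    final List<Integer> deletedDocIds = new ArrayList<>();  // stand-in for pendingUpdates

    int addDocument(String doc, Consumer<String> processor) {
        int docID = numDocsInRAM;
        boolean success = false;
        try {
            processor.accept(doc);  // stands in for consumer.processDocument()
            success = true;
        } finally {
            if (!success) {
                deletedDocIds.add(docID);  // mark deleted; applied at flush time
                numDocsInRAM++;            // the failed doc still occupies this docID
            }
        }
        // Success path only (the exception above skips this): like finishDocument().
        numDocsInRAM++;
        return docID;
    }
}
```

For example, if the second of three documents throws, the documents receive docIDs 0, 1 and 2, with docID 1 recorded as deleted.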

3. DocConsumer.processDocument source code analysis

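This method processes each field of the document and produces its stored fields, inverted index, DocValues and points. The implementation shown is DefaultIndexingChain: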

  public void processDocument() throws IOException, AbortingException {

    // How many indexed field names we've seen (collapses
    // multiple field instances by the same name):
    int fieldCount = 0;

    long fieldGen = nextFieldGen++;

    // NOTE: we need two passes here, in case there are
    // multi-valued fields, because we must process all
    // instances of a given field at once, since the
    // analyzer is free to reuse TokenStream across fields
    // (i.e., we cannot have more than one TokenStream
    // running "at once"):
    // Prepare the inverted index (terms hash) for a new document
    termsHash.startDocument();
    // Start writing stored fields
    startStoredFields(docState.docID);

    boolean aborting = false;
    try {
      // Process each field in turn
      for (IndexableField field : docState.doc) {
        // Process one field: inverted index, term vectors, stored fields, DocValues and points
        fieldCount = processField(field, fieldGen, fieldCount);
      }
    } catch (AbortingException ae) {
      aborting = true;
      throw ae;
    } finally {
      if (aborting == false) {
        // Finish each indexed field name seen in the document:
        // Call finish() on each distinct field
        for (int i=0;i<fieldCount;i++) {
          fields[i].finish();
        }
        // Finish stored fields
        finishStoredFields();
      }
    }

    try {
      // Finish the inverted index for this document
      termsHash.finishDocument();
    } catch (Throwable th) {
      // Must abort, on the possibility that on-disk term
      // vectors are now corrupt:
      throw AbortingException.wrap(th);
    }
  }
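The `fieldGen`/`fieldCount` pair in processDocument() collapses repeated instances of a field name into one entry. The sketch below illustrates the generation-stamp technique this suggests. Each document takes a new generation number, and a per-field object is appended to `fields[]` only when its stored generation differs from the current one, so no per-document set has to be cleared. `FieldGenSketch`, `PerField` and `finished` are hypothetical names, and the sketch assumes the per-field object remembers the generation in which it was last seen.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the field-generation trick; not Lucene code.
final class FieldGenSketch {

    static final class PerField {
        final String name;
        long fieldGen = -1;  // generation in which this field was last seen
        int valueCount;      // values seen in the current document
        PerField(String name) { this.name = name; }
    }

    private final Map<String, PerField> byName = new HashMap<>();
    private PerField[] fields = new PerField[4];
    private long nextFieldGen = 0;
    final List<String> finished = new ArrayList<>();  // "name:count" per distinct field

    /** Returns the number of distinct field names in the document. */
    int processDocument(String[] fieldNames) {
        long fieldGen = nextFieldGen++;
        int fieldCount = 0;
        for (String name : fieldNames) {
            PerField pf = byName.computeIfAbsent(name, PerField::new);
            if (pf.fieldGen != fieldGen) {
                // First instance of this name in the current document.
                pf.fieldGen = fieldGen;
                pf.valueCount = 0;
                if (fieldCount == fields.length) {
                    fields = Arrays.copyOf(fields, fields.length * 2);
                }
                fields[fieldCount++] = pf;
            }
            pf.valueCount++;
        }
        // Second pass: "finish" each distinct field exactly once.
        finished.clear();
        for (int i = 0; i < fieldCount; i++) {
            finished.add(fields[i].name + ":" + fields[i].valueCount);
        }
        return fieldCount;
    }
}
```

For the field names `title, body, title`, the method returns 2 and records `title:2` and `body:1`.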

4. DocumentsWriterFlushControl.doAfterDocument source code analysis

After the document has been processed, DocumentsWriterFlushControl.doAfterDocument performs the post-processing.

  synchronized DocumentsWriterPerThread doAfterDocument(ThreadState perThread, boolean isUpdate) {
    try {
      // Commit the RAM used by this ThreadState: if it is flushPending, add the
      // delta to flushBytes, otherwise to activeBytes
      commitPerThreadBytes(perThread);
      // If this ThreadState is not flushPending
      if (!perThread.flushPending) {
        // For an update call FlushPolicy.onUpdate, otherwise onInsert
        if (isUpdate) {
          flushPolicy.onUpdate(this, perThread);
        } else {
          flushPolicy.onInsert(this, perThread);
        }
        // If the ThreadState is still not flushPending but uses more RAM than
        // hardMaxBytesPerDWPT, mark it flushPending
        if (!perThread.flushPending && perThread.bytesUsed > hardMaxBytesPerDWPT) {
          // Safety check to prevent a single DWPT exceeding its RAM limit. This
          // is super important since we can not address more than 2048 MB per DWPT
          setFlushPending(perThread);
        }
      }
      // Check out this ThreadState (the method is described below)
      return checkout(perThread, false);
    } finally {
      boolean stalled = updateStallState();
      assert assertNumDocsSinceStalled(stalled) && assertMemory();
    }
  }
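The RAM bookkeeping in doAfterDocument() can be modeled with two global counters. The sketch below is simplified. `State`, `afterDocument()` and `policyMaxBytes` are hypothetical, and the pluggable FlushPolicy is reduced to a single threshold that marks the current state pending. The new bytes are charged to `flushBytes` or `activeBytes` depending on whether the state is already pending. A state past the per-DWPT hard cap is forced into flush-pending, and marking a state pending moves its bytes from the active pool to the flush pool.

```java
// Hypothetical model of doAfterDocument()'s RAM bookkeeping; not Lucene code.
final class FlushAccountingSketch {

    static final class State {
        long dwptBytes;       // RAM currently held by the DWPT
        long bytesUsed;       // RAM already committed to the global counters
        boolean flushPending;
    }

    long activeBytes;
    long flushBytes;
    final long policyMaxBytes;       // hypothetical stand-in for the FlushPolicy's RAM limit
    final long hardMaxBytesPerDWPT;  // per-DWPT safety cap

    FlushAccountingSketch(long policyMaxBytes, long hardMaxBytesPerDWPT) {
        this.policyMaxBytes = policyMaxBytes;
        this.hardMaxBytesPerDWPT = hardMaxBytesPerDWPT;
    }

    void afterDocument(State st) {
        // Like commitPerThreadBytes(): charge the delta to the right pool.
        long delta = st.dwptBytes - st.bytesUsed;
        st.bytesUsed += delta;
        if (st.flushPending) {
            flushBytes += delta;
        } else {
            activeBytes += delta;
        }
        if (!st.flushPending) {
            // Simplified onInsert()/onUpdate(): too much active RAM marks this state pending.
            if (activeBytes > policyMaxBytes) {
                setFlushPending(st);
            }
            // Safety net: no single DWPT may exceed the hard cap.
            if (!st.flushPending && st.bytesUsed > hardMaxBytesPerDWPT) {
                setFlushPending(st);
            }
        }
    }

    private void setFlushPending(State st) {
        st.flushPending = true;
        activeBytes -= st.bytesUsed;  // move this state's bytes out of the active pool
        flushBytes += st.bytesUsed;   // ... and into the flush pool
    }
}
```

Keeping active and pending RAM in separate counters lets the flush policy reason only about memory that is still growing, while memory already queued for flushing is tracked on its own.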

checkout() works as follows:

  private DocumentsWriterPerThread checkout(ThreadState perThread, boolean markPending) {
    if (fullFlush) {
      // Full flush in progress: if this ThreadState is flushPending, move it to
      // blockedFlushes and return the next DWPT waiting to be flushed; otherwise return null
      if (perThread.flushPending) {
        checkoutAndBlock(perThread);
        return nextPendingFlush();
      } else {
        return null;
      }
    } else {
      // If requested, mark this ThreadState as flushPending
      if (markPending) {
        assert perThread.isFlushPending() == false;
        setFlushPending(perThread);
      }
      // Try to check out this ThreadState's DWPT for flushing
      return tryCheckoutForFlush(perThread);
    }
  }
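The branching in checkout() is easier to follow in a small model. In the sketch below, strings stand in for DWPTs, and `State`, `blockedFlushes`, `flushQueue` and the helper methods are simplified hypothetical versions, not Lucene's real classes. During a full flush, a flush-pending state is parked in `blockedFlushes` and whatever is already queued is handed back. Outside a full flush, the state is optionally marked pending, and its DWPT is checked out only if the state is flush-pending.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of checkout(); strings stand in for DWPTs. Not Lucene code.
final class CheckoutSketch {

    static final class State {
        String dwpt;
        boolean flushPending;
        State(String dwpt, boolean flushPending) {
            this.dwpt = dwpt;
            this.flushPending = flushPending;
        }
    }

    boolean fullFlush;
    final Deque<String> blockedFlushes = new ArrayDeque<>();
    final Deque<String> flushQueue = new ArrayDeque<>();

    String checkout(State st, boolean markPending) {
        if (fullFlush) {
            if (st.flushPending) {
                checkoutAndBlock(st);       // park the DWPT until the full flush proceeds
                return nextPendingFlush();  // hand back something already queued, if any
            }
            return null;
        }
        if (markPending) {
            st.flushPending = true;
        }
        return tryCheckoutForFlush(st);
    }

    private void checkoutAndBlock(State st) {
        blockedFlushes.add(st.dwpt);
        reset(st);
    }

    private String nextPendingFlush() {
        return flushQueue.poll();  // null when nothing is queued
    }

    // Only a flush-pending state gives up its DWPT; the state is reset for reuse.
    private String tryCheckoutForFlush(State st) {
        if (!st.flushPending) {
            return null;
        }
        String dwpt = st.dwpt;
        reset(st);
        return dwpt;
    }

    private void reset(State st) {
        st.dwpt = null;
        st.flushPending = false;
    }
}
```

A non-null return value means the calling indexing thread should go on to flush that DWPT itself. A null return means it has nothing to flush right now.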

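This article walked through how a document is processed without going into how individual fields are handled. Later articles will describe the file formats for each field type. The next article, https://cloud.tencent.com/developer/article/1579684, covers what happens after a document has been processed.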

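Original content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization. It may not be reproduced without permission.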

In case of infringement, contact cloudcommunity@tencent.com to have it removed.
