Elasitcsearch 底层系列 Lucene 内核解析之 Stored Fields

背景

Lucene 的 stored fields 主要用于行存文档需要保存的字段内容,每个文档的所有 stored fields 保存在一起,在查询请求需要返回字段原始值的时候使用。Elasticsearch(ES) 一些内置的字段默认开启了 store 属性,例如 _id、_source 字段。_id 字段用于标识文档,不能关闭。 _source 字段保存原始的文档 json 内容,可以关闭。用户定义的其它字段需要在 ES 的 mapping 中显示设置 field 的 store 属性为 true,该字段才会被 store。在查询的时候返回该字段的原始值。设置方法请参考:

https://www.elastic.co/guide/en/elasticsearch/reference/6.3/mapping-store.html

本文主要分析 stored fields 的三个流程及文件存储结构,基于 lucene 7.3.0 版本。

  • 基本框架
  • 文件结构
  • 写入流程
  • 读取流程
  • 合并(merge)流程

基本框架

进入各个流程之前,我们先来看一下 store fields 相关的类结构。下图中蓝色部分是写入相关的类,红色部分是读取相关的类。这里我们主要分析带压缩的场景,我们在 mapping 中指定的压缩模式例如 best_compress 主要是针对 store fields 的压缩。

读取主要是由中间的 CompressionStoredFieldsReader 类实现,主要的方法是 visitDocument,传递 docID 返回对应文档的 store fields。

写入和合并主要是由中间的 CompressionStoredFieldsWriter 类实现。合并过程也会调用读取的流程。

Store Fields Class Diagram

文件结构

Store fields 对应的 lucene 存储文件是 fdx、fdt。fdt 文件保存数据,fdx 保存 fdt 文件的索引数据。查询某个文档的 store field 时先在 fdx 中查询文档所在的文件偏移,再读取 fdt 文件的对应位置的内容。

Fdt 文件以 chunk 为单位写入,一个 chunk 最大60k,最多包含512个文档的 store fields。Fdx 文件会在逻辑上切分多个 chunk 为一个 block,一个 block 最多包含1024个 chunk。这样切分便于快速定位一个文档所在的 chunk 位置,准确的解压对应的 chunk 并提取文档的 store fields。

Fdt、fdx 文件均由三部分组成:文件头,数据部分,文件尾。

fdt、fdx 公共文件头
fdx 文件结构
fdt 文件结构

文档的 Store fields 查找获取方法:

  1. 先利用 DocID 根据每个 block 的 DocBase 二分查找定位文档属于哪个 block。
  2. 然后根据每个 chunk 的 DocBase 二分查找定位文档属于 block 中的哪个 chunk。 第n个 chunk 的 DocID 位置: DocBase + AvgChunkDocs * n + DocBaseDeltasn
  3. 得到属于哪个 chunk 之后,就可以通过这个 chunk 的 startPointer (StartPointerDeltachunk 编号) 计算出对应的文件位置。 第n个 chunk 的文档指针位置: StartPointerBase + AvgChunkSiez * n + StartPointerDeltasn

接下来分析代码的写入流程,看看这些数据是如何一块块写入文件的。

写入流程

写入流程

ES 通过调用 lucene 的 IndexWriter.updateDocument 对文档进行索引存储。该函数会调用 DefaultIndexChain.processDocument 函数,该函数是处理文档每个元素(store fields、doc value、倒排索引、point 等)写入流程的入口,stored field 是其中一个流程。接下来详细分析,写入流程可以和前面的文件结构部分对照着看会更清晰。

processDocument 函数中 strore fields 初始化调用链:

DefaultIndexingChain.processDocument() -> DefaultIndexingChain.startStoredFields() -> StoredFieldsConsumer.startDocument() -> StoredFieldsConsumer.initStoredFieldsWriter()

initStoredFieldsWriter 函数调用 Lucene50StoredFieldsFormat 初始化对应的 format:

new CompressingStoredFieldsFormat("Lucene50StoredFieldsHigh", CompressionMode.HIGH_COMPRESSION, 61440(chunkSize), 512(单个chunk最大文档数), 1024(blockSize));

initStoredFieldsWriter 函数调用 CompressingStoredFieldsWriter 构造函数初始化 store field writer 对象:

CompressingStoredFieldsWriter 类
/** Sole constructor. */
  public CompressingStoredFieldsWriter(Directory directory, SegmentInfo si, String segmentSuffix, IOContext context,
      String formatName, CompressionMode compressionMode, int chunkSize, int maxDocsPerChunk, int blockSize) throws IOException {
    assert directory != null;
    this.segment = si.name; // segment_N 文件的编号 _N
    this.compressionMode = compressionMode; // 压缩模式,选 best_compress 此处为 HIGH_COMPRESSION
    this.compressor = compressionMode.newCompressor();
    this.chunkSize = chunkSize; // 默认的 chunkSize 为 60 * 1024 = 61440
    this.maxDocsPerChunk = maxDocsPerChunk; // 512
    this.docBase = 0;
    // 分配该 doc 的 store field buffer,实际大小会比 chunkSize 多1/8,因为每次扩容会扩原有大小的1/8
    this.bufferedDocs = new GrowableByteArrayDataOutput(chunkSize); // length=69120
    this.numStoredFields = new int[16]; // 长度是缓存的 doc 数量,自扩容,每个元素是单个 doc 中的所有 stored field 的总数量
    this.endOffsets = new int[16]; // 长度是缓存的 doc 数量,自扩容,每个元素是单个 doc 的所有 stored field 的 value 的总长度
    this.numBufferedDocs = 0; // flush 之前缓存的 doc 数量

    boolean success = false;
    // 创建对应的 fdt fdx 文件
    IndexOutput indexStream = directory.createOutput(IndexFileNames.segmentFileName(segment, segmentSuffix, FIELDS_INDEX_EXTENSION), 
                                                                     context);
    try {
      fieldsStream = directory.createOutput(IndexFileNames.segmentFileName(segment, segmentSuffix, FIELDS_EXTENSION),
                                                    context);

      final String codecNameIdx = formatName + CODEC_SFX_IDX;
      final String codecNameDat = formatName + CODEC_SFX_DAT;
      
       // 写 fdt, fdx 文件头
       // 内部的CODEC_MAGIC 32位模数:0x3fd76c17 主要用于校验文件完整性
      CodecUtil.writeIndexHeader(indexStream, codecNameIdx, VERSION_CURRENT, si.getId(), segmentSuffix);
      CodecUtil.writeIndexHeader(fieldsStream, codecNameDat, VERSION_CURRENT, si.getId(), segmentSuffix);
      assert CodecUtil.indexHeaderLength(codecNameDat, segmentSuffix) == fieldsStream.getFilePointer();
      assert CodecUtil.indexHeaderLength(codecNameIdx, segmentSuffix) == indexStream.getFilePointer();

      // 里面会写 fdx 文件 packedInts 版本号
      indexWriter = new CompressingStoredFieldsIndexWriter(indexStream, blockSize);
      indexStream = null;

      // 写 fdt 文件 chunkSize 和 packedInts 版本号
      fieldsStream.writeVInt(chunkSize);
      fieldsStream.writeVInt(PackedInts.VERSION_CURRENT);

      success = true;
    } finally {
      if (!success) {
        IOUtils.closeWhileHandlingException(fieldsStream, indexStream, indexWriter);
      }
    }
  }

上述初始化完毕之后,由 DefaultIndexChain processField() 函数处理 doc 中每个 store field,主要处理逻辑:

// Add stored fields:
    if (fieldType.stored()) {
      if (fp == null) {
      // 每个字段只会保存一个 PerField 对象。
        fp = getOrAddField(fieldName, fieldType, false);
      }
      if (fieldType.stored()) {
        String value = field.stringValue();
        if (value != null && value.length() > IndexWriter.MAX_STORED_STRING_LENGTH) {
          throw new IllegalArgumentException("stored field \"" + field.name() + "\" is too large (" + value.length() + " characters) to store");
        }
        try {
          // 写 stored field 进内存
          storedFieldsConsumer.writeField(fp.fieldInfo, field);
        } catch (Throwable th) {
          throw AbortingException.wrap(th);
        }
      }
    }

接下来分析 storedFieldsConsumer.writeField() 的执行过程。

由于 stored fields 会使用 LZ4 或者 DEFLATE 算法压缩(取决于codec compress选项的配置),因此 storedFieldsConsumer 会调用 CompressingStoredFieldsWriter 来完成写的动作。代码如下:

@Override
  public void writeField(FieldInfo info, IndexableField field)
      throws IOException {

    // 保存当前正在写的文档中 store field 的数量
    ++numStoredFieldsInDoc;

    int bits = 0;
    final BytesRef bytes;
    final String string;

    // 获取 store field 的类型,int 型 bits 用于记录类型
    Number number = field.numericValue();
    if (number != null) {
      if (number instanceof Byte || number instanceof Short || number instanceof Integer) {
        bits = NUMERIC_INT;
      } else if (number instanceof Long) {
        bits = NUMERIC_LONG;
      } else if (number instanceof Float) {
        bits = NUMERIC_FLOAT;
      } else if (number instanceof Double) {
        bits = NUMERIC_DOUBLE;
      } else {
        throw new IllegalArgumentException("cannot store numeric type " + number.getClass());
      }
      string = null;
      bytes = null;
    } else {
      bytes = field.binaryValue();
      if (bytes != null) {
        bits = BYTE_ARR;
        string = null;
      } else {
        bits = STRING;
        string = field.stringValue();
        if (string == null) {
          throw new IllegalArgumentException("field " + field.name() + " is stored but does not have binaryValue, stringValue nor numericValue");
        }
      }
    }

    // TYPE_BITS用于保存 bits 的位数(上面的 bits 最大的类型为 NUMERIC_DOUBLE = 0x05 需要3位保存),
    // infoAndBits 中最后 TYPE_BITS 位用于保存类型,前面的高位保存这个 field 的编号(所有 field 在 doc 中都是顺序编号的)。
    final long infoAndBits = (((long) info.number) << TYPE_BITS) | bits;
    bufferedDocs.writeVLong(infoAndBits);

    // 根据 field 的类型将 value 写入 buffer
    if (bytes != null) {
      bufferedDocs.writeVInt(bytes.length);
      bufferedDocs.writeBytes(bytes.bytes, bytes.offset, bytes.length);
    } else if (string != null) {
      bufferedDocs.writeString(string);
    } else {
      if (number instanceof Byte || number instanceof Short || number instanceof Integer) {
        bufferedDocs.writeZInt(number.intValue());
      } else if (number instanceof Long) {
        writeTLong(bufferedDocs, number.longValue());
      } else if (number instanceof Float) {
        writeZFloat(bufferedDocs, number.floatValue());
      } else if (number instanceof Double) {
        writeZDouble(bufferedDocs, number.doubleValue());
      } else {
        throw new AssertionError("Cannot get here");
      }
    }
  }

所有字段写入 buffer 完毕后会调用 CompressingStoredFieldsWriter finishDocument() 更新统计索引数据,及判断是否触发 flush。

  @Override
  public void finishDocument() throws IOException {
    // 判断文档数是否超长,超长则扩容
    if (numBufferedDocs == this.numStoredFields.length) { 
      final int newLength = ArrayUtil.oversize(numBufferedDocs + 1, 4);
      this.numStoredFields = Arrays.copyOf(this.numStoredFields, newLength);
      endOffsets = Arrays.copyOf(endOffsets, newLength);
    }
    // 保存文档中的 stored field 数量及该文档 stored field value 总大小的偏移量。
    this.numStoredFields[numBufferedDocs] = numStoredFieldsInDoc;
    numStoredFieldsInDoc = 0;
    // 这里的 endOffsets 记录 bufferedDocs 中每个 doc 的n个 store fields 的结束位置
    endOffsets[numBufferedDocs] = bufferedDocs.getPosition();
    ++numBufferedDocs;
    // 检查总大小是否大于一个 chunkSize 61440,以及是否大于单个 chunkSize 最大文档数512,任意条件满足立即触发 flush.
    if (triggerFlush()) {
      flush();
    }
  }

flush 由上层函数控制,周期性或者在 heap 使用较多的时候触发,调用链:

DefaultIndexingChain.flush(控制所有对象如 doc value,store field,point 等的 flush) -> storedFieldsConsumer.flush() -> CompressingStoredFieldsWriter.finish()

CompressingStoredFieldsWriter finish 函数逻辑:

  @Override
  public void finish(FieldInfos fis, int numDocs) throws IOException {
    if (numBufferedDocs > 0) { 
      flush(); // 真正的store fields flush 逻辑在内部的 flush 函数处理,下面会描述。
      numDirtyChunks++; // incomplete: we had to force this flush
    } else {
      assert bufferedDocs.getPosition() == 0;
    }
    if (docBase != numDocs) {
      throw new RuntimeException("Wrote " + docBase + " docs, finish called with numDocs=" + numDocs);
    }
    // 处理 fdx 的 finish 流程,写 block, 写文件尾。
    indexWriter.finish(numDocs, fieldsStream.getFilePointer());
    // 写 fdt 文件尾,包括 chunk 数量,被使用的 chunk 数量,以及尾模数等。
    fieldsStream.writeVLong(numChunks);
    fieldsStream.writeVLong(numDirtyChunks);
    CodecUtil.writeFooter(fieldsStream);
    assert bufferedDocs.getPosition() == 0;
  }

CompressingStoredFieldsWriter 内部的 flush 函数主要有两个作用:

  1. 根据压缩模式压缩缓存文档的 Store Fields。
  2. 写入磁盘(close前在缓冲区)。

flush 函数的触发时机有以下几种情况:

  1. 整个 index flush 周期到了触发。
  2. segement merge 的时候触发。merge 之前会 flush 所有内存中的 Store Fields。
  3. 写一个文档结束的时候(调用 finishDocument() 时),当一个 chunk 缓存的 doc 数量超过最大值(512)时触发。
  4. 写一个文档结束的时候(调用 finishDocument() 时),当一个 chunk 缓存的 doc 超过 chunkSize(61440)时触发。
  private void flush() throws IOException {
    //写 fdt 的索引文件 fdx,此函数计算并记录 block、chunk、以及相关 delta 的关系。后面详述。
    indexWriter.writeIndex(numBufferedDocs, fieldsStream.getFilePointer());

    // transform end offsets into lengths
    final int[] lengths = endOffsets;
    for (int i = numBufferedDocs - 1; i > 0; --i) {
      lengths[i] = endOffsets[i] - endOffsets[i - 1];
      assert lengths[i] >= 0;
    }
    final boolean sliced = bufferedDocs.getPosition() >= 2 * chunkSize;
    // 写 chunk 头
    // docBase: chunk 中的起始文档编号
    // numBufferedDocs: chunk 中缓存的文档数
    // numStoredFields: 数组,保存每个文档中的 store field 数量
    // lengths: 数组,保存每个文档中的 store field 长度
    // sliced: 是否有 chunk 切片(跨 chunk)。
    writeHeader(docBase, numBufferedDocs, numStoredFields, lengths, sliced);

    // compress stored fields to fieldsStream
    if (sliced) {
      // big chunk, slice it
      for (int compressed = 0; compressed < bufferedDocs.getPosition(); compressed += chunkSize) {
        compressor.compress(bufferedDocs.getBytes(), compressed, Math.min(chunkSize, bufferedDocs.getPosition() - compressed), fieldsStream);
      }
    } else {
      // 压缩 stored fields value 并写入文件,压缩是按 chunk 来压缩的
      compressor.compress(bufferedDocs.getBytes(), 0, bufferedDocs.getPosition(), fieldsStream);
    }

    // reset
    docBase += numBufferedDocs;
    numBufferedDocs = 0;
    bufferedDocs.reset();
    numChunks++;
  }

在上面的 private flush 函数中调用的 writeIndex(这里的 Index 指的是 fdt 文件的 index fdx)函数:

   void writeIndex(int numDocs, long startPointer) throws IOException {
    if (blockChunks == blockSize) { //判断一个 block 是否已经满了1024个 chunk,满了就写 block。
      writeBlock();
      reset();
    }

    if (firstStartPointer == -1) {
      // 一个 block 中的 store field 的起始位置(除开 header 等信息)
      firstStartPointer = maxStartPointer = startPointer;
    }
    assert firstStartPointer > 0 && startPointer >= firstStartPointer;

    docBaseDeltas[blockChunks] = numDocs; //当前 chunk 中的文档数量
    startPointerDeltas[blockChunks] = startPointer - maxStartPointer; //当前 chunk 的偏移量

    ++blockChunks; // block 中的 chunk 编号
    blockDocs += numDocs; // block 中的文档数
    totalDocs += numDocs; // 总的文档数
    maxStartPointer = startPointer; // block 中的最大偏移量
  }

写 block 信息。触发写 block 信息的场景有以下几种:

  1. block 中的 chunkSize 达到了最大值1024.
  2. 整个 index flush 周期到了触发。在上述的 finish 函数中调用。
  private void writeBlock() throws IOException {
    assert blockChunks > 0;
    fieldsIndexOut.writeVInt(blockChunks);

    // The trick here is that we only store the difference from the average start
    // pointer or doc base, this helps save bits per value.
    // And in order to prevent a few chunks that would be far from the average to
    // raise the number of bits per value for all of them, we only encode blocks
    // of 1024 chunks at once
    // See LUCENE-4512

    // doc bases
    final int avgChunkDocs;
    if (blockChunks == 1) {
      avgChunkDocs = 0;
    } else {
      // 这里减掉最后一个 chunk 的数量 因为最后一个有可能未满
      // (block 中的文档总数 - 最后一个chunk的文档数) / block 中的 chunk 数量
      avgChunkDocs = Math.round((float) (blockDocs - docBaseDeltas[blockChunks - 1]) / (blockChunks - 1));
    }
    // block 的起始 doc 编号 segment 总文档数 - 当前 block 中的文档数
    fieldsIndexOut.writeVInt(totalDocs - blockDocs); // docBase
    fieldsIndexOut.writeVInt(avgChunkDocs);
    int docBase = 0;
    long maxDelta = 0;
    // 计算 chunk 最大文档数
    for (int i = 0; i < blockChunks; ++i) {
      final int delta = docBase - avgChunkDocs * i;
      maxDelta |= zigZagEncode(delta);
      docBase += docBaseDeltas[i];
    }

    // 最大文档数需要多少位存储
    final int bitsPerDocBase = PackedInts.bitsRequired(maxDelta);
    fieldsIndexOut.writeVInt(bitsPerDocBase);
    PackedInts.Writer writer = PackedInts.getWriterNoHeader(fieldsIndexOut,
        PackedInts.Format.PACKED, blockChunks, bitsPerDocBase, 1);
    docBase = 0;
    // 写每个 chunk 文档数 delta
    for (int i = 0; i < blockChunks; ++i) {
      final long delta = docBase - avgChunkDocs * i;
      assert PackedInts.bitsRequired(zigZagEncode(delta)) <= writer.bitsPerValue();
      writer.add(zigZagEncode(delta));
      docBase += docBaseDeltas[i];
    }
    writer.finish();

    // 该 block 在 fdx 文件的起始位置指针
    // start pointers
    fieldsIndexOut.writeVLong(firstStartPointer);
    final long avgChunkSize;
    if (blockChunks == 1) {
      avgChunkSize = 0;
    } else {
      avgChunkSize = (maxStartPointer - firstStartPointer) / (blockChunks - 1);
    }
    // chunk 的平均大小
    fieldsIndexOut.writeVLong(avgChunkSize);
    long startPointer = 0;
    maxDelta = 0; //最大 chunk 的大小
    for (int i = 0; i < blockChunks; ++i) {
      startPointer += startPointerDeltas[i];
      final long delta = startPointer - avgChunkSize * i;
      maxDelta |= zigZagEncode(delta);
    }

    // 最大 chunk 大小需要多少位存储
    final int bitsPerStartPointer = PackedInts.bitsRequired(maxDelta);
    fieldsIndexOut.writeVInt(bitsPerStartPointer);
    writer = PackedInts.getWriterNoHeader(fieldsIndexOut, PackedInts.Format.PACKED,
        blockChunks, bitsPerStartPointer, 1);
    startPointer = 0;
    // 写每个 chunk 大小的偏移
    for (int i = 0; i < blockChunks; ++i) {
      startPointer += startPointerDeltas[i];
      final long delta = startPointer - avgChunkSize * i;
      assert PackedInts.bitsRequired(zigZagEncode(delta)) <= writer.bitsPerValue();
      writer.add(zigZagEncode(delta));
    }
    writer.finish();
  }

读取流程

读的过程比较简单,index open 的时候会初始化 CompressingStoredFieldsReade对象,该类负责 store field 的读取。

初始化 CompressingStoredFieldsReader 的时候,会同时初始化一个 CompressingStoredFieldsIndexReader,该类负责 fdx 即索引文件的读取。

CompressingStoredFieldsIndexReader 初始化过程中,会读取 fdx 文件全部内容,读取 fdt 文件的头尾信息。中间真正 store fields 的内容不会直接读取,而是在该类中如下 visitDocument 函数根据 docID 计算出指定位置读取。

@Override
  public void visitDocument(int docID, StoredFieldVisitor visitor)
      throws IOException {

    // 根据 docID 偏移量,注意此处 ID 为 segment level 偏移量,获取 doc 对象,该 doc 包含其所有 store fields.
    // 该函数内部调用后面有描述。
    final SerializedDocument doc = document(docID);

    // 遍历 doc 中所有的 fields,取出对应位置的 field,放到 FieldsVisitor的 Map<String, List<Object>> fieldsValues; 对象中。
    // FieldsVisitor 对象继承了上面的 StoredFieldVisitor,是具体的实现类。
    for (int fieldIDX = 0; fieldIDX < doc.numStoredFields; fieldIDX++) {
      final long infoAndBits = doc.in.readVLong();
      final int fieldNumber = (int) (infoAndBits >>> TYPE_BITS);
      final FieldInfo fieldInfo = fieldInfos.fieldInfo(fieldNumber);

      final int bits = (int) (infoAndBits & TYPE_MASK);
      assert bits <= NUMERIC_DOUBLE: "bits=" + Integer.toHexString(bits);

      // 判断 field 是否需要提取,如果不需要提取就会跳过。
      switch(visitor.needsField(fieldInfo)) {
        case YES:
          readField(doc.in, visitor, fieldInfo, bits);
          break;
        case NO:
          if (fieldIDX == doc.numStoredFields - 1) {// don't skipField on last field value; treat like STOP
            return;
          }
          skipField(doc.in, bits);
          break;
        case STOP:
          return;
      }
    }
  }

上面的 visitDocument 调用的 document 函数细节:

  SerializedDocument document(int docID) throws IOException {
    // 这里的 state 对象保存了当前已读取的所有 block 中的 chunk 信息
    // 如果当前内存中的 state 已包含当前 doc,则直接走后面提取流程。
    if (state.contains(docID) == false) {
      // 根据 docID 计算 fdx 保存的内容,获取 docID 对应 chunk 在 fdt 文件偏移量并跳转,下面详述
      fieldsStream.seek(indexReader.getStartPointer(docID));
      // 从上面 seek 的位置开始读取该 block 的 chunk 文件内容,填充到 state,即 state 就包含了 docID 所在 chunk 的整个内容。
      state.reset(docID);
    }
    assert state.contains(docID);
    // 根据 docID 从 block 中取出具体的文档包含 store field 内容。见下面的 blockState.document 函数。
    return state.document(docID);
  }

获取 fdt 文件中的 block 及 chunk 的文件偏移量的过程:

 long getStartPointer(int docID) {
    if (docID < 0 || docID >= maxDoc) {
      throw new IllegalArgumentException("docID out of range [0-" + maxDoc + "]: " + docID);
    }
    // 二分法查找 docID 所属 block
    final int block = block(docID);
    // 二分法查找 docID 所属 block 中的 chunk
    final int relativeChunk = relativeChunk(block, docID - docBases[block]);
    // 返回该 chunk 的文件偏移量
    return startPointers[block] + relativeStartPointer(block, relativeChunk);
  }

blockState.document() 函数的细节:

   SerializedDocument document(int docID) throws IOException {
      if (contains(docID) == false) {
        throw new IllegalArgumentException();
      }

      final int index = docID - docBase; // 此处 docBase 为该 chunk 的起始文档编号,算出来的 index 为该文档在该 chunk 的位置。
      final int offset = offsets[index]; // offset 保存了每个文档 store fields 文件长度的偏移,在前面的 state.reset 中读取的。
      final int length = offsets[index+1] - offset; // 这里算出来的 length 就是该文档实际的文件长度
      final int totalLength = offsets[chunkDocs]; // 最后一个文档的长度就是当前 chunk 的总长度
      final int numStoredFields = this.numStoredFields[index]; // 该文档 store fields 的数量

      final DataInput documentInput;
      if (length == 0) {
        // empty
        documentInput = new ByteArrayDataInput();
      } else if (merging) {
        // already decompressed
        documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset + offset, length);
      } else if (sliced) {
        // 跨 chunk 切片的流程
        fieldsStream.seek(startPointer);
        decompressor.decompress(fieldsStream, chunkSize, offset, Math.min(length, chunkSize - offset), bytes);
        ......
        };
      } else {
        // 跳转到该 chunk 的其实文件位置
        fieldsStream.seek(startPointer);
	    // decompress 函数会读取该完整 chunk 的内容并解压,然后提取 length 长度(即该文档 store fields 实际大小)
        decompressor.decompress(fieldsStream, totalLength, offset, length, bytes);
        assert bytes.length == length; // 确保解压读取的长度和 fdx 保存的长度一致
        documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset, bytes.length);
      }

      return new SerializedDocument(documentInput, length, numStoredFields);
    }

  }

合并(merge)流程

shard 中 segment 数量达到一定值的时候就会触发 merge 流程。

stored field merge 主要在 CompressingStoredFieldsWriter 的 merge 函数中完成。该函数接受一个 MergeState mergeState 对象,其包含了merge 需要的各个 segment 文件的 store field reader 对象。合并逻辑主要是遍历所有 segment 的 store fields reader 对象,读取所有 doc 的 store fields 重新走写入流程。

  public int merge(MergeState mergeState) throws IOException {
  
  ......
  // 主要的 merge 逻辑:
  // 遍历每个文件对应的 CompressingStoredFieldsIndexReader 对象,处理如下逻辑:
        } else {
        // optimized merge, we copy serialized (but decompressed) bytes directly
        // even on simple docs (1 stored field), it seems to help by about 20%
        
        // if the format is older, its always handled by the naive merge case above
        assert matchingFieldsReader.getVersion() == VERSION_CURRENT;
        // 检查文件的完整性
        matchingFieldsReader.checkIntegrity();

        // maxDo c为该 segment 中文档总数,docID 在 segment 中从0顺序递增分布。
        for (int docID = 0; docID < maxDoc; docID++) {
          if (liveDocs != null && liveDocs.get(docID) == false) {
            continue;
          }
          // 根据 docID 获取文档内容(包含 store fields,已解压)
          SerializedDocument doc = matchingFieldsReader.document(docID);
          startDocument();
          // 直接拷贝到新的 buffer 中合并保存
          bufferedDocs.copyBytes(doc.in, doc.length);
          numStoredFieldsInDoc = doc.numStoredFields;//字段数量
          finishDocument(); //结束文档,同写入逻辑
          ++docCount;
        }
      }
    }
    // 结束合并文档并写入磁盘,同写入逻辑
    finish(mergeState.mergeFieldInfos, docCount);

至此,Store Fields 的写入、读取、合并流程及其文件数据结构就分析完了,本文只分析了正常流程,暂未考虑其它异常分支流程。欢迎各位提出意见,一起交流学习!

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Elasticsearch实验室

Elasitcsearch 底层系列 Lucene 内核解析之 Stored Fields

Lucene 的 stored fields 主要用于行存文档需要保存的字段内容,每个文档的所有 stored fields 保存在一起,在查询请求需要返回字段...

6165
来自专栏JadePeng的技术博客

Angular快速学习笔记(4) -- Observable与RxJS

8402
来自专栏用户2442861的专栏

web.xml配置详解

1、web.xml学名叫部署描述符文件,是在Servlet规范中定义的,是web应用的配置文件。

3671
来自专栏分布式系统进阶

Kafka消息的磁盘存储Kafka源码分析-汇总

可以看到使用FileMessageSet来操作Log文件, 使用OffsetIndex来操作Index文件

2432
来自专栏jeremy的技术点滴

py3_cookbook_notes_03

2823
来自专栏JavaEdge

高性能队列——Disruptor总论1 背景2 Java内置队列3 ArrayBlockingQueue的问题4 Disruptor的设计方案代码样例性能等待策略Log4j 2应用场景

这里所说的队列是系统内部的内存队列,而不是Kafka这样的分布式队列 Disruptor特性限于3.3.4

2113
来自专栏IT可乐

mybatis源码解读(三)——数据源的配置

1353
来自专栏软件开发

前端MVC Vue2学习总结(七)——ES6与Module模块化、Vue-cli脚手架搭建、开发、发布项目与综合示例

使用vue-cli可以规范项目,提高开发效率,但是使用vue-cli时需要一些ECMAScript6的知识,特别是ES6中的模块管理内容,本章先介绍ES6中的基...

1616
来自专栏cnblogs

knockoutjs 上自己实现的flux

在knockoutjs 上实现 Flux 单向数据流 状态机,主要解决多个组件之间对数据的耦合问题。 一、其实简单 flux的设计理念和实现方案,很大程度上人借...

2208
来自专栏码云1024

MFC多线程

4486

扫码关注云+社区

领取腾讯云代金券