适用于需要数据索引量不大的场景,当索引量过大时需要使用 ES、Solr 等全文搜索服务器实现搜索功能。
本文旨在分享 Lucene 搜索引擎的源码阅读和功能开发中的经验,Lucene 采用 7.3.1 版本。
索引的生成分为两个部分:
1. 创建阶段:
2. 搜索阶段:
索引创建及搜索流程如下图所示:
Lucene 的基础层次结构由索引、段、文档、域、词五个部分组成。正向索引的生成即为基于 Lucene 的基础层次结构一级一级处理文档并分解域存储词的过程。
索引文件层级关系如图 1 所示:
Lucene 全文索引的核心是基于倒排索引实现的快速索引机制。
倒排索引原理如图 2 所示,倒排索引简单来说就是基于分析器将文本内容进行分词后,记录每个词出现在哪篇文章中,从而通过用户输入的搜索词查询出包含该词的文章。
问题:上述倒排索引使用时每次都需要将索引词加载到内存中,当文章数量较多,篇幅较长时,索引词可能会占用大量的存储空间,加载到内存后内存损耗较大。
解决方案:从 Lucene4 开始,Lucene 采用了 FST 来减少索引词带来的空间消耗。
FST(Finite StateTransducers),中文名有限状态机转换器。其主要特点在于以下四点:
具体存储方式如图 3 所示:
倒排索引相关文件包含.tip、.tim 和.doc 这三个文件,其中:
Lucene 利用倒排索引定位需要查询的文档号,通过文档号搜索出文件后,再利用词权重等信息对文档排序后返回。
文件格式如图 4 所示:
上文主要讲解 Lucene 的工作原理,下文将阐述 Java 中 Lucene 执行索引、查询等操作的相关代码。
Lucene 项目中文本的解析,存储等操作均由 IndexWriter 类实现,IndexWriter 文件主要由 Directory 和 IndexWriterConfig 两个类构成,其中:
Directory:用于指定存放索引文件的目录类型。既然要对文本内容进行搜索,自然需要先将这些文本内容及索引信息写入到目录里。Directory 是一个抽象类,针对索引的存储允许有多种不同的实现。常见的存储方式一般包括存储有本地(FSDirectory),内存(RAMDirectory)等。 IndexWriterConfig:用于指定 IndexWriter 在文件内容写入时的相关配置,包括 OpenMode 索引构建模式、Similarity 相关性算法等。
IndexWriter 具体是如何操作索引的呢?让我们来简单分析一下 IndexWriter 索引操作的相关源码。
a. Lucene 会为每个文档创建 ThreadState 对象,对象持有 DocumentWriterPerThread 来执行文件的增删改操作;
ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) { ThreadState threadState = null; synchronized (this) { if (freeList.isEmpty()) { // 如果不存在已创建的空闲ThreadState,则新创建一个 return newThreadState(); } else { // freeList后进先出,仅使用有限的ThreadState操作索引 threadState = freeList.remove(freeList.size()-1);
// 优先使用已经初始化过DocumentWriterPerThread的ThreadState,并将其与当前 // ThreadState换位,将其移到队尾优先使用 if (threadState.dwpt == null) { for(int i=0;i<freeList.size();i++) { ThreadState ts = freeList.get(i); if (ts.dwpt != null) { freeList.set(i, threadState); threadState = ts; break; } } } } } threadState.lock(); return threadState;}
复制代码
b. 索引文件的插入:DocumentWriterPerThread 调用 DefaultIndexChain 下的 processField 来处理文档中的每个域,processField 方法是索引链的核心执行逻辑。通过用户对每个域设置的不同的 FieldType 进行相应的索引、分词、存储等操作。FieldType 中比较重要的是 indexOptions:
// 构建倒排表
if (fieldType.indexOptions() != IndexOptions.NONE) { fp = getOrAddField(fieldName, fieldType, true); boolean first = fp.fieldGen != fieldGen; // field具体的索引、分词操作 fp.invert(field, first);
if (first) { fields[fieldCount++] = fp; fp.fieldGen = fieldGen; }} else { verifyUnIndexedFieldType(fieldName, fieldType);}
// 存储该field的storeFieldif (fieldType.stored()) { if (fp == null) { fp = getOrAddField(fieldName, fieldType, false); } if (fieldType.stored()) { String value = field.stringValue(); if (value != null && value.length() > IndexWriter.MAX_STORED_STRING_LENGTH) { throw new IllegalArgumentException("stored field \"" + field.name() + "\" is too large (" + value.length() + " characters) to store"); } try { storedFieldsConsumer.writeField(fp.fieldInfo, field); } catch (Throwable th) { throw AbortingException.wrap(th); } }}
// 建立DocValue(通过文档查询文档下包含了哪些词)DocValuesType dvType = fieldType.docValuesType();if (dvType == null) { throw new NullPointerException("docValuesType must not be null (field: \"" + fieldName + "\")");}if (dvType != DocValuesType.NONE) { if (fp == null) { fp = getOrAddField(fieldName, fieldType, false); } indexDocValue(fp, dvType, field);}if (fieldType.pointDimensionCount() != 0) { if (fp == null) { fp = getOrAddField(fieldName, fieldType, false); } indexPoint(fp, field);}
复制代码
c. 解析 Field 首先需要构造 TokenStream 类,用于产生和转换 token 流,TokenStream 有两个重要的派生类 Tokenizer 和 TokenFilter,其中 Tokenizer 用于通过 java.io.Reader 类读取字符,产生 Token 流,然后通过任意数量的 TokenFilter 来处理这些输入的 Token 流,具体源码如下:
// invert:对Field进行分词处理首先需要将Field转化为TokenStreamtry (TokenStream stream = tokenStream = field.tokenStream(docState.analyzer, tokenStream))// TokenStream在不同分词器下实现不同,根据不同分词器返回相应的TokenStreamif (tokenStream != null) { return tokenStream;} else if (readerValue() != null) { return analyzer.tokenStream(name(), readerValue());} else if (stringValue() != null) { return analyzer.tokenStream(name(), stringValue());}
public final TokenStream tokenStream(final String fieldName, final Reader reader) { // 通过复用策略,如果TokenStreamComponents中已经存在Component则复用。 TokenStreamComponents components = reuseStrategy.getReusableComponents(this, fieldName); final Reader r = initReader(fieldName, reader); // 如果Component不存在,则根据分词器创建对应的Components。 if (components == null) { components = createComponents(fieldName); reuseStrategy.setReusableComponents(this, fieldName, components); } // 将java.io.Reader输入流传入Component中。 components.setReader(r); return components.getTokenStream();}
复制代码
d. 根据 IndexWriterConfig 中配置的分词器,通过策略模式返回分词器对应的分词组件,针对不同的语言及不同的分词需求,分词组件存在很多不同的实现。
以 StandardAnalyzer(标准分词器)为例:
// 标准分词器创建Component过程,涵盖了标准分词处理器、Term转化小写、常用词过滤三个功能protected TokenStreamComponents createComponents(final String fieldName) { final StandardTokenizer src = new StandardTokenizer(); src.setMaxTokenLength(maxTokenLength); TokenStream tok = new StandardFilter(src); tok = new LowerCaseFilter(tok); tok = new StopFilter(tok, stopwords); return new TokenStreamComponents(src, tok) { @Override protected void setReader(final Reader reader) { src.setMaxTokenLength(StandardAnalyzer.this.maxTokenLength); super.setReader(reader); } };}
复制代码
e. 在获取 TokenStream 之后通过 TokenStream 中的 incrementToken 方法分析并获取属性,再通过 TermsHashPerField 下的 add 方法构建倒排表,最终将 Field 的相关数据存储到类型为 FreqProxPostingsArray 的 freqProxPostingsArray 中,以及 TermVectorsPostingsArray 的 termVectorsPostingsArray 中,构成倒排表;
// 以LowerCaseFilter为例,通过其下的increamentToken将Token中的字符转化为小写public final boolean incrementToken() throws IOException { if (input.incrementToken()) { CharacterUtils.toLowerCase(termAtt.buffer(), 0, termAtt.length()); return true; } else return false;}
复制代码
try (TokenStream stream = tokenStream = field.tokenStream(docState.analyzer, tokenStream)) { // reset TokenStream stream.reset(); invertState.setAttributeSource(stream); termsHashPerField.start(field, first); // 分析并获取Token属性 while (stream.incrementToken()) { …… try { // 构建倒排表 termsHashPerField.add(); } catch (MaxBytesLengthExceededException e) { …… } catch (Throwable th) { throw AbortingException.wrap(th); } } ……}
复制代码
a. Lucene 下文档的删除,首先将要删除的 Term 或 Query 添加到删除队列中;
synchronized long deleteTerms(final Term... terms) throws IOException { // TODO why is this synchronized? final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; // 文档删除操作是将删除的词信息添加到删除队列中,根据flush策略进行删除 long seqNo = deleteQueue.addDelete(terms); flushControl.doOnDelete(); lastSeqNo = Math.max(lastSeqNo, seqNo); if (applyAllDeletes(deleteQueue)) { seqNo = -seqNo; } return seqNo;}
复制代码
b. 根据 Flush 策略触发删除操作;
private boolean applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException { // 判断是否满足删除条件 --> onDelete if (flushControl.getAndResetApplyAllDeletes()) { if (deleteQueue != null) { ticketQueue.addDeletes(deleteQueue); } // 指定执行删除操作的event putEvent(ApplyDeletesEvent.INSTANCE); // apply deletes event forces a purge return true; } return false;}
复制代码
public void onDelete(DocumentsWriterFlushControl control, ThreadState state) { // 判断并设置是否满足删除条件 if ((flushOnRAM() && control.getDeleteBytesUsed() > 1024*1024*indexWriterConfig.getRAMBufferSizeMB())) { control.setApplyAllDeletes(); if (infoStream.isEnabled("FP")) { infoStream.message("FP", "force apply deletes bytesUsed=" + control.getDeleteBytesUsed() + " vs ramBufferMB=" + indexWriterConfig.getRAMBufferSizeMB()); } }}
复制代码
文档的更新就是一个先删除后插入的过程,本文就不再做更多赘述。
文档写入到一定数量后,会由某一线程触发 IndexWriter 的 Flush 操作,生成段并将内存中的 Document 信息写到硬盘上。Flush 操作目前仅有一种策略:FlushByRamOrCountsPolicy。FlushByRamOrCountsPolicy 主要基于两种策略自动执行 Flush 操作:
其中 activeBytes() 为 dwpt 收集的索引所占的内存量,deleteByteUsed 为删除的索引量。
@Overridepublic void onInsert(DocumentsWriterFlushControl control, ThreadState state) { // 根据文档数进行Flush if (flushOnDocCount() && state.dwpt.getNumDocsInRAM() >= indexWriterConfig .getMaxBufferedDocs()) { // Flush this state by num docs control.setFlushPending(state); // 根据内存使用量进行Flush } else if (flushOnRAM()) {// flush by RAM final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d); final long totalRam = control.activeBytes() + control.getDeleteBytesUsed(); if (totalRam >= limit) { if (infoStream.isEnabled("FP")) { infoStream.message("FP", "trigger flush: activeBytes=" + control.activeBytes() + " deleteBytes=" + control.getDeleteBytesUsed() + " vs limit=" + limit); } markLargestWriterPending(control, state, totalRam); } }}
复制代码
将内存信息写入索引库。
索引的 Flush 分为主动 Flush 和自动 Flush,根据策略触发的 Flush 操作为自动 Flush,主动 Flush 的执行与自动 Flush 有较大区别,关于主动 Flush 本文暂不多做赘述。需要了解的话可以跳转链接。
索引 Flush 时每个 dwpt 会单独生成一个 segment,当 segment 过多时进行全文检索可能会跨多个 segment,产生多次加载的情况,因此需要对过多的 segment 进行合并。
段合并的执行通过 MergeScheduler 进行管理。mergeScheduler 也包含了多种管理策略,包括 NoMergeScheduler、SerialMergeScheduler 和 ConcurrentMergeScheduler。
1) merge 操作首先需要通过 updatePendingMerges 方法根据段的合并策略查询需要合并的段。段合并策略分为很多种,本文仅介绍两种 Lucene 默认使用的段合并策略:TieredMergePolicy 和 LogMergePolicy。
private synchronized boolean updatePendingMerges(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments) throws IOException {
final MergePolicy.MergeSpecification spec; // 查询需要合并的段 if (maxNumSegments != UNBOUNDED_MAX_MERGE_SEGMENTS) { assert trigger == MergeTrigger.EXPLICIT || trigger == MergeTrigger.MERGE_FINISHED : "Expected EXPLICT or MERGE_FINISHED as trigger even with maxNumSegments set but was: " + trigger.name();
spec = mergePolicy.findForcedMerges(segmentInfos, maxNumSegments, Collections.unmodifiableMap(segmentsToMerge), this); newMergesFound = spec != null; if (newMergesFound) { final int numMerges = spec.merges.size(); for(int i=0;i<numMerges;i++) { final MergePolicy.OneMerge merge = spec.merges.get(i); merge.maxNumSegments = maxNumSegments; } } } else { spec = mergePolicy.findMerges(trigger, segmentInfos, this); } // 注册所有需要合并的段 newMergesFound = spec != null; if (newMergesFound) { final int numMerges = spec.merges.size(); for(int i=0;i<numMerges;i++) { registerMerge(spec.merges.get(i)); } } return newMergesFound;}
复制代码
2)通过 ConcurrentMergeScheduler 类中的 merge 方法创建用户合并的线程 MergeThread 并启动。
@Overridepublic synchronized void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { …… while (true) { …… // 取出注册的后选段 OneMerge merge = writer.getNextMerge(); boolean success = false; try { // 构建用于合并的线程MergeThread final MergeThread newMergeThread = getMergeThread(writer, merge); mergeThreads.add(newMergeThread);
updateIOThrottle(newMergeThread.merge, newMergeThread.rateLimiter);
if (verbose()) { message(" launch new thread [" + newMergeThread.getName() + "]"); } // 启用线程 newMergeThread.start(); updateMergeThreads();
success = true; } finally { if (!success) { writer.mergeFinish(merge); } } }}
复制代码
3)通过 doMerge 方法执行 merge 操作;
public void merge(MergePolicy.OneMerge merge) throws IOException { …… try { // 用于处理merge前缓存任务及新段相关信息生成 mergeInit(merge); // 执行段之间的merge操作 mergeMiddle(merge, mergePolicy); mergeSuccess(merge); success = true; } catch (Throwable t) { handleMergeException(t, merge); } finally { // merge完成后的收尾工作 mergeFinish(merge) }……}
复制代码
Lucene 想要执行搜索首先需要将索引段加载到内存中,由于加载索引库的操作非常耗时,因此仅有当索引库产生变化时需要重新加载索引库。
加载索引库分为加载段信息和加载文档信息两个部分:
1)加载段信息:
2)加载文档信息:
索引库加载完成后需要 IndexReader 封装进 IndexSearch,IndexSearch 通过用户构造的 Query 语句和指定的 Similarity 文本相似度算法(默认 BM25)返回用户需要的结果。通过 IndexSearch.search 方法实现搜索功能。
搜索:Query 包含多种实现,包括 BooleanQuery、PhraseQuery、TermQuery、PrefixQuery 等多种查询方法,使用者可根据项目需求构造查询语句
排序:IndexSearch 除了通过 Similarity 计算文档相关性分值排序外,也提供了 BoostQuery 的方式让用户指定关键词分值,定制排序。Similarity 相关性算法也包含很多种不同的相关性分值计算实现,此处暂不做赘述,读者有需要可自行网上查阅。
Lucene 作为全文索引工具包,为中小型项目提供了强大的全文检索功能支持,但 Lucene 在使用的过程中存在诸多问题:
Lucene 使用时存在诸多限制,使用起来也不那么方便,当数据量增大时还是尽量选择 ElasticSearch 等分布式搜索服务器作为搜索功能的实现方案。
作者:vivo 互联网服务器团队-Qian Yulun
领取专属 10元无门槛券
私享最新 技术干货