NiFi使用预写日志来跟踪FlowFiles(即数据记录)在系统中流动时的变化。该预写日志跟踪FlowFiles本身的更改,例如FlowFile的属性(组成元数据的键/值对)及其状态,再比如FlowFile所属的Connection /Queue。
在这里,我们将描述用于实现此功能的实现细节和算法。
什么是预写日志
预写日志(WAL,Write Ahead Log)是关系型数据库中用于实现事务性和持久性的一系列技术,ARIES是WAL系列技术常用的算法。简单来说就是,做一个操作之前先讲这件事情记录下来。
为什么要使用WAL
可以为非内存型数据提升极高的效率,真正的执行操作可能数据量会比较大,操作比较繁琐,并且写数据不一定是顺序写,所以如果每一次操作都要等待结果flush到可靠存储(比如磁盘)中才执行下一步操作的话,效率就太低了。换一种思路,如果我们在做真正的操作之前,先将这件事记录下来,持久化到可靠存储中(因为日志一般很小,并且是顺序写,效率很高),然后再去执行真正的操作。
保证了数据的完整性,在硬盘数据不损坏的情况下,预写式日志允许存储系统在崩溃后能够在日志的指导下恢复到崩溃前的状态,避免数据丢失
Apache NiFi的 Write-Ahead Log 实现
术语定义
- SerDe: 序列化/反序列化记录以及更新记录的接口
- TransactionID Generator: 是一个AtomicLong,用于在编写以编辑每个交易的日志或snapshot时指示交易ID
Writing to the Write-Ahead Log
- 验证记录是否已恢复('restored flag'设置为true)。如果不是,则抛出IllegalStateException
- 获取repo共享锁 (read lock)
- 声明一个当前未使用的分区
- 增加AtomicLong和mod的分区数 -> partitionIndex
- 尝试声明(获得写锁定)partition [partitionIndex]。如果不成功,返回???。
- 如果没有用于编辑日志的输出流,创建输出流并编写SerDe类名称和版本
- 获取ID(增量AtomicLong)并写入编辑日志
- 将更新写入分区
- 序列化更新内容到record
- 如果有更多记录,则写入TransactionContinue标记;返回上一步,否则到下一步
- 写事务提交标记
- 更新全局记录Map以保存最新版本的记录
- 释放分区声明
- 释放共享锁
Checkpointing the Write-Ahead Log
- 获取互斥锁(写锁),是的任何分区无法被更新
- 创建.partial文件
- 编写SerDe类名称和版本
- 写入当前的最大事务ID
- 在全局记录Map中写入记录数
- 对于每个记录,序列化记录
- 关闭.partial文件的输出流
- 删除当前的'snapshot'文件
- 将.partial文件重命名为'snapshot'
- 清除所有分区/编辑日志:对于每个分区:
- 关闭文件输出流
- 创建新的输出流到文件,指明Truncate,而不是append。
- 编写SerDe类名称和版本
- 释放写锁
Restoring from the Write-Ahead Log
- 获取互斥锁(写锁),以便无法更新任何分区
- 从snapshot还原
- 如果两个文件都不存在,则没有要还原的snapshot。移至4。
- 如果只有snapshot文件,我们在不创建snapshot的情况下向下面步骤继续执行。
- 如果.partial文件存在且snapshot存在,则在创建snapshot时会崩溃。所以要删除.partial文件。
- 如果只有.partial文件存在,我们在创建.partial文件并删除snapshot之后再将.partial文件重命名为snapshot。
- 检查snapshot和.partial文件
- 打开InputStream到snapshot文件
- 读取SerDe类名称和版本
- 读取最大事务ID
- 读取snapshot中的记录数
- 对于snapshot中的每个记录,反序列化记录并更新全局记录Map
- 通过设置为从snapshot读取的最大事务ID来更新TransactionID生成器(原子长)+ 1
- 对于每个分区:
- 阅读交易ID。
- 如果是EOF,请完成还原分区。
- 如果交易ID小于交易ID生成器的值,请读取该交易的数据并丢弃。转到 3-1
- 确定哪个分区读取的最小事务ID大于或等于TransactionID生成器。
- 从分区还原事务(调用SerDe#deserializeRecord,包括用于写入文件的SerDe的版本。这样,如果实现发生更改,我们仍然可以还原数据)。
- 检查还原是否成功
- 如果成功,请更新全局记录Map以反映已还原记录的新状态。
将TransactionID生成器更新为在第5步骤中恢复的事务的TransactionID+1。从编辑日志中读取下一个事务ID。
- 如果未成功(意外的EOF),则放弃事务并提醒EOF。
- 重复4-6,直到所有分区都已还原。
- 如果有任何分区表明出现意外的EOF,则在更正此分区之前,我们无法写入该分区, 因此在允许任何更新之前执行Checkpoint. 这将导致编辑日志被删除。如果无法检查点,则抛出IOException,指示还原失败。确保释放写锁定!
- 对于每个分区,打开输出流以进行追加。
- 将 'restored' 标志设置为true
- 释放写锁
参考:
https://blog.csdn.net/winwill2012/article/details/71719106
https://cwiki.apache.org/confluence/display/NIFI/NiFi%27s+Write-Ahead+Log+Implementation