首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Kafka是如何检查文件完整性的

Kafka的存储层是顺序文件,如果你要是开发一个应用,自己写文件来保存状态,那么有一件事情你就必须关注:这个文件我写的完整么?文件有没有损坏?说到这里很多同学可能想到了checksum,就是类似使用MD5等哈希算法计算文件的哈希,下次读取的时候再算一遍,看看哈希是否对的上,那么Kafka又是怎么来确保文件的完整性的呢?

其实在前一篇文章中,已经提到过,Kafka启动的时候会检查每个目录是否有一个.kafka_cleanshutdown文件,如果存在这个文件说明Kafka是走正常的关闭逻辑关闭的,否则认为是异常关闭,异常关闭的时候会走一个恢复逻辑。

那么首先看Kafka的关闭逻辑。

Kafka.scala这个类里注册一个shutdown hook,在正常关闭的时候走shutdown逻辑。

Runtime.getRuntime().addShutdownHook(new Thread() {

override def run() = {

kafkaServerStartable.shutdown

}

})

这里我们最关注的是LogManager的shutdown。

log.flush很好理解,我们都知道在Linux,如果不是O_Direct这种方式打开的文件,write都是先写到page cache里的,Linux会在后台将内容flush到物理存储介质,比如磁盘。而如果调用flush方法则会强制同步的flush到磁盘,但是一般为了提高写性能,在接收消息的时候不会同步flush,所以Kafka停止的时候就会强制flush一次了。每次flush的时候Kafka都会更新一个叫recoveryPoint的offset到当前offset。这个recoveryPoint就代表每个partition哪个offset之前的已经flush到磁盘了,这个在每个日志目录里有一个recovery-point-offset-checkpoint的文件,这个文件会周期性刷新,记录最新的各个partition的recoverPoint。

然后是close就不说了。

做完这些事情如果没有任何问题后,Kafka会在每个日志创建一个.kafka_cleanshutdown的标识文件,文件内容是空的,就是起一个标识的作用,表示Kafka是走正常关闭逻辑。

以上就是Kafka的停止逻辑了,遗憾的是我们并没有看到Kafka写了什么checksum之类的,这个我们下文还会再次提到。

现在回到启动逻辑,在上一篇文章里我们稍微提到了,Kafka启动的时候如果检查日志目录里没有.kafka_cleanshutdown这个文件,就会进入一个recover逻辑。这个逻辑还是比较重的,不过还好的是他不是对所有日志文件都进行recover,还记得前面提到的recoveryPoint么,启动的时候会读取这个文件,这个文件里记录的就是上一次flush到磁盘时候的offset,Kafka会从这个offset开始recover。

那我们再来看这个recover逻辑主要干了啥呢,首先会把对应LogSegment的.index干掉,然后就开始遍历了,遍历消息就是顺序往下读取一个个消息了,Kafka中每个消息格式如下:

offset(8 bytes)

size(4 bytes)

MessageHeader(14 bytes)

Crc(4 bytes)

Magic(1 byte)

Attributes(1 byte)

KeySize(4 bytes)

ValueSize(4 bytes)

读取消息之后这个header里不是有一个crc么,这相当于每个消息的checksum了,这里会计算一下读取的buffer的crc和header里记录的crc相比较,如果不想等则校验不通过。那么这里其实就是两个校验逻辑:消息的大小是否大于配置的最大消息大小,crc校验是否通过。如果这里校验不通过则这个LogSegment就截断了,后面的整个都不要了。

不仅如此,假设这个partition有10个log文件,检查到第8个的时候有不合法的消息,那么从这个消息开始位置到这个log文件末尾内容全部丢弃,而且第9, 10两个log也删掉。嗯,这样删掉怎么办呢?如果这个topic有多个replicas,那是可以同步过来的。

recover逻辑执行完毕后,我们就可以将.kafka_cleanshutdown标识文件删掉了。

上面差不多就是Kafka确保文件完整的整个机制了,这个机制非常依赖flush操作,Kafka好像认为只要flush到磁盘,那么文件就不会损坏了。但其实这种机制是不够的,比如:

在Kafka正常停止后,人为破坏log文件,虽然有crc,但是仍然无法检查出来。因为正常关闭的,Kafka根本就不走recover逻辑

如果正常关闭后,磁盘出现故障,导致文件损坏,那么Kafka也是不会进行检测的,原理同上(这个在我们实际中确实碰到了)。

所以如果碰到上面这两种情况就比较悲剧了,因为启动的时候Kafka根本没有进行检查,而实际消费消息的时候又碰到文件损坏,那就会出错了,不过如果真遇到这种情况,我们是否就可以停止kafka,然后是删除.kafka_cleanshutdown文件,让Kafka做一次自检呢?

另外不知道为啥kafka不做完整文件的checksum呢?

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180307G0ISGR00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券