首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Kafka怎么不丢数据?

​ 在Kafka中broker一般有多个,它们组成一个高容错的集群。Broker的主要职责是接受producer和consumer的请求,并把消息持久化到本地磁盘。Broker以topic为单位将消息分布到不同的分区,每个分区可以有多个副本,通过数据冗余的方式实现容错。当partition存在多个副本时,只有一个是leader对外提供读写请求,其它均是follower不对外提供读写服务,只是同步leader中的数据,并在leader出现问题时,通过选举算法将其中的某一个提升为leader。

Broker以追加方式将消息写到磁盘文件,且每个分区中的消息被赋予了唯一整数标识偏移量offset,broker仅提供基于offset的读取方式,不维护各consumer当前已消费消息的offset值,而是由consumer各自维护当前offset。Consumer向broker读取数据时发送起始offset值,broker将之后的消息流式发送过去,Broker中保存的数据是有有效期的,一旦超过了有效期,对应的数据将被移除以释放磁盘空间,只要数据在有效期内,consumer可以重复消费。

Kafka Broker能够保证同一topic下同一partition内部的消息是有序的,但无法保证partition之间的消息全局有序,这意味着一个consumer读取某个topic下多个分区的消息时,和写入的顺序不一致。

一、存储

    Broker中以/tmp/kafka_logs来设置消息文件存储目录,如果有多个分区将会在/tmp/kafka_logs下生成多个partition目录,partition目录的命名规则:topic名_分区序号。每个partition物理上是由多个segment组成,segment文件由index和log文件组成,每个log文件的大小由log.segment.bytes控制,写满后两个文件的命名规则:当前文件最小的offset值为名,20位数字字符长度,不足用0补充,例如:00000000000000000123.index。索引文件中的元数据指向数据文件中message的物理偏移位置,例如:该index文件中元数据[6,856]表示在分区中第123+6=129个消息,物理偏移offset是865。

    每个分区内部的数据都是有序的,用一个offset来标识每一条数据的位置,但只是仅限于分区内的有序,offset不是message在分区中的实际存储位置,而是逻辑上的一个值,它唯一确定分区中的一条Message:

  • 相对offset:是该segment中的相对offset
  • Position:表示该条Message在数据文件中的绝对位置

每次有消息进入就往log文件中写入,那么就会造成大量的磁盘随机写入,所以引入数据缓存,将要写入的数据先缓存起来再统一写入,从而提升写入效率。kafka采用OS级别缓存pageCache,OS将闲置的memory用作disk caching,当数据写入时OS将数据写入pageCache,同时标记该page为dirty,当读取数据时,先从pageCache中查找,如果没有查到(发生缺页)则去磁盘中读取,由于数据在内存,存在系统down机内存数据丢失的风险。而对于broker来说数据只需要存储到内存,如果OS缓存失效就会导致kafka客户端commit的数据丢失,可以通过log.flush.interval.messages(一定的条数)和log.flush.interval.ms(一定的时间)设置强制写入到磁盘,所以kafka保证它存在于多个replica内存中,不保证被持久化到磁盘。

二、切分文件

index和log文件会存在多个文件,切分规则如下:

  • 当前日志分段文件的大小超过了 log.segment.bytes 配置的值(默认1GB)
  • 当前日志分段中消息的最大时间戳与系统的时间戳的差值大于log.roll.ms或log.roll.hours 的值(7天)
  • 索引文件的大小达到 log.index.size.max.bytes 配置的值默认值(10MB)

三、清理

清理日志实际上是清理过期的segment或者日志文件太大了需要删除最旧的数据,使得整体日志文件大小不超过指定的值,以维持日志文件的固定大小。log.cleanup.policy=delete表示采用删除日志的清理机制,默认5分钟执行一次日志删除,清理日志有三种策略:

  • 基于时间

配置log.retention.hours/minutes/ms(默认7天)参数来设定清除的时间,日志任务会清理超过指定阈值时间的segment文件,但是并不是根据最近修改时间(lastModifiedTime)来计算,而是根据Segment日志中最大的时间戳(largestTimeStamp)来计算,因为segment日志的lastModifiedTime可以被修改(分区副本进行了rebalance),lastModifiedTime并不能真实地反映出日志分段在磁盘的保留时间。

  • 基于日志大小

配置log.retention.bytes参数来设定清除的大小,日志任务会清理超过指定阈值大小的segment文件,该参数是日志文件的总大小,并不是单个Segment文件的大小。首先计算日志文件的总大小size和log.retention.size的差值diff,即需要删除的日志总大小,然后从日志文件中查找可删除的文件集合deletableSegments,之后就执行删除操作。

  • 基于起始偏移量

基于日志起始偏移量(logStartOffset)的删除策略依据是某segment日志的下一个segment的offset是否>=logStartOffset,若是则加入deletablesSegments。

上图中第1个和第2个Segment会加入deletablesSegments集合,然后被删除。删除策略是以topic为级别的,所以不同的topic可以设置不同的删除策略。

三、压缩

日志压缩确保Kafka会为一个topic分区数据日志中保留message key的最后一个值,它解决了应用crash或应用在操作期间重启来重新加载缓存的场景。

Kafka日志压缩机制是细粒度key级别的保留机制,而不是基于时间的粗粒度。压缩后的offset可能是不连续的,比如上图中没有5和7,因为这些offset的消息被merge了,当需要消费这些offset消息时,将会拿到比这个offset大的offset对应的消息。

日志压缩提供了如下保证:

  • 所有跟上消费的consumer能消费到所有写入的消息,这些消息有连续的序列号。Topic的min.compaction.lag.ms用于保证消息写入多久后才会被压缩
  • 消息的顺序会被保留。压缩不会重排序消息,只是移除其中一部分
  • 消息的offset不会变更。这是消息在日志中的永久标志
  • 任何从头开始处理日志的consumer至少会拿到每个key的最终状态

Kafka支持GZIP、Snappy、LZ4 三种压缩算法,可通过compression.type 设定具体的压缩算法。通过设置log.cleaner.enable=true启用cleaner(默认false),log.cleanup.policy=compact启用压缩策略(默认delete)。压缩算法是要占用很大一部分cpu资源的并且耗时也是不小的,而压缩的目的很大程度上是为了提升网络传输的性能。

四、分区

    Kafka中可以将Topic从物理上划分成一个或多个分区(Partition),被分布在集群中的多个broker上。

分区机制

Producer在生产数据时可以为每条消息指定Key,消息被发送到broker时会根据分区规则选择被存储到哪一个分区,如果分区规则设置的合理,那么所有的消息将会被均匀的分布到不同的分区中,这样就实现了负载均衡和水平扩展。

分区规则

默认情况下,Kafka根据消息的key来进行分区的分配,即hash(key) % numPartitions,如下图所示:

其中numPartitions就是Tpoic的分区总数。

在创建Topic时候可以配置num.partitions来指定默认的分区数。为topic创建分区时,分区数最好是broker数量的整数倍,这样才能使一个topic的分区均匀的分布在Kafka集群中。如果消息key为null时,那么producer将会把这条消息发送给随机的一个partition,然后把这个分区加入到缓存中以备后续的null key直接使用,但是会每隔10分钟清除缓存。当往Broker发送消息时修改了topic的分区数,producer可以在最多topic.metadata.refresh.interval.ms的时间之后动态感知到分区数的变化,并且可以将数据发送到新添加的分区中。

确定分区

通过将topic的消息分散到多个broker的多个分区,理论上一个topic的分区越多,整个集群的吞吐量就越大,但是每个分区都是自身的消耗,所以并不是分区越多越好,但是可以遵循一定的步骤来确定分区数:

创建一个只有1个分区的topic,然后测试这个topic的producer吞吐量和consumer吞吐量,假设它们的值分别是Tp和Tc,单位可以是MB/s,总的目标吞吐量是Tt,那么分区数 =  Tt / max(Tp, Tc)。

消费均衡

Consumer要确定从哪一个分区去取数据来消费,选择规则如下:

五、副本

分区的副本被称为replica,每个分区可以有多个副本,并且在副本集中会存在一个leader和多个follower,均匀的分布在多个broker。当leader节点挂掉之后,会从副本集中重新选出一个副本作为leader继续提供服务实现故障转移。副本的个数可以通过broker配置文件来设定,leader处理所有的read-write请求,follower只需要与leader同步数据即可。

副本同步中有ISR副本的概念,ISR副本是leader和所有能够与leader保持基本同步的follower副本集合,由leader维护。由于消息复制延迟受到最慢同步副本的限制,因此快速检测出慢副本并将其从 ISR 中删除非常重要。如果follow副本和leader数据同步速度过慢导致【落后太多】,该follower将会被剔除出ISR副本,由min.insync.replicas设置ISR中follower的最小个数。

同步机制

Producer将消息发送到partition的leader上并写入其本地log后,其他follower将从leader pull数据,producer需要等待request.required.acks个副本同步完成才算成功提交消息,该参数有如下配置:

  • acks=0 生产者无需等待leader返回确认,但是无法保证消息是否被leader收到
  • acks=1 生产者需要等待leader副本成功写入日志。
  • acks=-1 leader副本和所有follower都写入日志才会向producer发送确认信息

副本同步的主要参数:

num.replica.fetchers:从一个broker同步数据的拉取线程数,可增加该broker的IO并行度。默认值:1

replica.fetch.wait.max.ms:对于follower replica而言,每个fetch请求的最大间隔时间,这个值应该比replica.lag.time.max.ms要小,否则对于吞吐量特别低的topic可能会导致ISR频繁抖动。默认值:500

replica.lag.time.max.ms:超时时间,即当follower在该时间内没有发送fetch请求或者在这个时间内没有追上leader当前的log end offset,则将这个follower从ISR中移除。默认值:10000(10S)

replica.fetch.min.bytes:每次fetch请求最少拉取的数据量,如果不满足这个条件,那么要等待 replica.fetch.wait.max.ms。默认值:1

不同步的原因:

  • 慢副本

在一定周期时间内follower不能追赶上leader。最常见的原因之一是I/O瓶颈导致follower追加复制消息速度慢于从leader拉取速度。

  • 卡住副本

在一定周期时间内follower停止从leader拉取请求。follower replica卡住了是由于GC暂停或follower失效或死亡。

  • 新启动副本

当用户给topic增加副本因子时,新的follower不在同步副本列表中,直到他们完全赶上了leader日志。

宕机恢复

  • 少部分副本宕机

当leader宕机了,会从follower选择一个作为leader,当宕机的旧leader重新恢复时,会把之前commit的数据清空,重新从新leader中pull数据。

  • 全部副本宕机

当全部副本宕机了有两种恢复方式:

等待ISR集合中的副本恢复后选举leader。等待时间较长,降低可用性

选择第一个恢复的副本作为新的leader,无论是否在ISR中,但是并未包含之前leader commit的数据,因此造成数据丢失

offset存储

Kafka新版本已默认将消费的offset迁入到了 Kafka 一个名为 __consumer_offsets 的Topic中。以消费的Group、Topic、Partition做为组合 Key。所有的消费offset都提交写入到上述的Topic中。因为这部分消息是非常重要,以至于是不能容忍丢数据的,所以消息的 acking 级别设置为了 -1,生产者等到所有的 ISR 都收到消息后才会得到ACK。

  • 发表于:
  • 本文为 InfoQ 中文站特供稿件
  • 首发地址https://www.infoq.cn/article/98858eb1dc7224840bcae52db
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券