前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka参数使用

kafka参数使用

作者头像
leobhao
发布2022-06-28 18:33:04
5240
发布2022-06-28 18:33:04
举报
文章被收录于专栏:涓流涓流

producer参数

producer发送消息到partition的过程:

客户端先将消息写入内存缓存, 多个消息形成一个个Batch, 然后send线程将多个Batch打包成一个request发送到kafka服务器上。

acks

acks参数是用来确认消息是否发送成功的。

(1) acks=0

acks设置为0时,producer只要将消息发送出去,还未写入leader partition当磁盘,即认为消息发送成功。这个情况很容易造成消息丢失

(2) acks=1

这是kafka的默认配置,当acks设置为1的时候,只有 leader partition 接收到消息并成功写入磁盘才算发送成功。但是这种情况并不能保证高可用,如果leader写入了磁盘,follower还未来得及同步leader就宕机了,这样消息也会丢失

(3) acks=all

acks等于all的情况下,只有在 ISR 同步列表里面的 全部follower把消息同步过去,才算发送成功。

buffer.memory

内存缓冲区大小,这个参数到默认大小是 32MB

Kafka的客户端发送数据到服务器,一般都是要经过缓冲的,也就是说,你通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集成一个一个的Batch,再发送到Broker上去的。

如果这个内存设置太小,kafka写入消息很快将内存写满,就会阻塞用户线程无法往里面写入消息。

batch.size

多少个消息可以打成一个batch, 这个参数默认是16KB

加大这个参数可以提高吞吐率,如果将参数改为32KB甚至更大,一个request包就可以放入更多的消息。但是如果设置的过大,发送的消息就会有延迟,没法及时凑满一个batch

linger.ms

当一个batch被创建后,过了linger.ms毫秒后,无论batch有没有凑满,都会将这个batch发送出去。这就避免了一个batch无法凑满,导致消息延迟和堆积。

max.request.size

这个参数决定了每次发送给Kafka服务器请求的最大大小,同时也会限制你一条消息的最大大小也不能超过这个参数设置的值。默认是1MB

如果业务中的消息都是大的报文,就需要适当调整这个参数了

retries和retries.backoff.ms

这两个参数决定了一个请求失败了可以重试几次,每次重试的间隔是多少毫秒。

consumer参数

在consumer参数说明前,先说明consumer的提交消息和offset的保存

consumer 提交(commit)与位移(offset)

consumer调用 poll 时,会返回没有消费当消息。当消息从broker返回消费者时,broker并不跟踪这些消息是否被消费者接收到;Kafka让消费者自身来管理消费的位移,并向消费者提供更新位移的接口,这种更新位移方式称为提交(commit)

rebalance 重平衡

在正常情况下,消费者会发送分区的提交信息到Kafka,Kafka进行记录。当消费者宕机或者新消费者加入时,Kafka会进行重平衡(rebalance),这会导致消费者负责之前并不属于它的分区。重平衡完成后,消费者会重新获取分区的位移。这时就可能产生两种情况:

  1. 假如一个消费者在重平衡前后都负责某个分区,如果提交位移比之前实际处理的消息位移要小,那么会导致消息重复消费
  2. 假如在重平衡前某个消费者拉取分区消息,在进行消息处理前提交了位移,但还没完成处理宕机了,然后Kafka进行重平衡,新的消费者负责此分区并读取提交位移,此时会“丢失”消息

可以看出,提交offset的方式对消息消费记录有着比较大对影响。

consumer 提交的方式
  1. 自动提交

这种方式让消费者来管理位移,应用本身不需要显式操作。当我们将enable.auto.commit设置为true,那么消费者会在poll方法调用后每隔5秒(由auto.commit.interval.ms指定)提交一次位移。和很多其他操作一样,自动提交也是由poll()方法来驱动的;在调用poll()时,消费者判断是否到达提交时间,如果是则提交上一次poll返回的最大位移。

需要注意到,这种方式可能会导致消息重复消费。假如,某个消费者poll消息后,应用正在处理消息,在3秒后Kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。

  1. 提交当前位移

设置auto.commit.offset为false,那么应用需要自己通过调用commitSync()来主动提交位移,该方法会提交poll返回的最后位移。

为了避免消息丢失,我们应当在完成业务逻辑后才提交位移。而如果在处理消息时发生了重平衡,那么只有当前poll的消息会重复消费。下面是一个自动提交的代码样例:

代码语言:javascript
复制
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        // 处理业务逻辑.......
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        log.error("commit failed", e)
    }
}
  1. 异步提交

手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的APIcommitAsync()。以下为使用异步提交的方式,应用发了一个提交请求然后立即返回:

代码语言:javascript
复制
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s,
        offset = %d, customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
    }
    // 异步提交
    consumer.commitAsync();
}
fetch.message.max.bytes

这个参数是Consumer每次发起获取消息请求的时候,会发往给broker端指导broker端每次读取partition日志时的最大消息长度。这个值越大有利于减少日志读取的次数,提升broker端获取数据的速度,但越大占的内存也越大。

fetch.min.bytes

这个参数允许消费者指定从broker读取消息时最小的数据量。当消费者从broker读取消息时,如果数据量小于这个阈值,broker会等待直到有足够的数据,然后才返回给消费者。对于写入量不高的主题来说,这个参数可以减少broker和消费者的压力,因为减少了往返的时间。而对于有大量消费者的主题来说,则可以明显减轻broker压力

enable.auto.commit

这个参数指定了消费者是否自动提交消费位移,默认为true。如果需要减少重复消费或者数据丢失,你可以设置为false

最佳实践

消息不丢失保证
  1. 不要使用producer.send(msg),而要使用producer.send(msg, callback)。记住,一定要使用带有回调通知的send方法。
  2. 设置acks = all。acks是Producer的一个参数,代表了你对“已提交”消息的定义。如果设置成all,则表明所有副本Broker都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
  3. 设置retries为一个较大的值。这里的retries同样是Producer的参数,对应前面提到的Producer自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了retries > 0的Producer能够自动重试消息发送,避免消息丢失。
  4. 设置unclean.leader.election.enable = false。这是Broker端的参数,它控制的是哪些Broker有资格竞选分区的Leader。如果一个Broker落后原先的Leader太多,那么它一旦成为新的Leader,必然会造成消息的丢失。故一般都要将该参数设置成false,即不允许这种情况的发生。
  5. 设置replication.factor >= 3。这也是Broker端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
  6. 设置min.insync.replicas > 1。这依然是Broker端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于1可以提升消息持久性。在实际环境中千万不要使用默认值1。
  7. 确保replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成replication.factor = min.insync.replicas + 1。
  8. 确保消息消费完成再提交。Consumer端有个参数enable.auto.commit,最好把它设置成false,并采用手动提交位移的方式。就像前面说的,这对于单Consumer多线程处理的场景而言是至关重要的。

参考资料

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-05-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • producer参数
    • acks
      • buffer.memory
        • batch.size
          • linger.ms
            • max.request.size
              • retries和retries.backoff.ms
              • consumer参数
                • consumer 提交(commit)与位移(offset)
                  • rebalance 重平衡
                  • consumer 提交的方式
                • fetch.message.max.bytes
                  • fetch.min.bytes
                    • enable.auto.commit
                    • 最佳实践
                      • 消息不丢失保证
                      • 参考资料
                      相关产品与服务
                      批量计算
                      批量计算(BatchCompute,Batch)是为有大数据计算业务的企业、科研单位等提供高性价比且易用的计算服务。批量计算 Batch 可以根据用户提供的批处理规模,智能地管理作业和调动其所需的最佳资源。有了 Batch 的帮助,您可以将精力集中在如何分析和处理数据结果上。
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档