前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka —— 如何保证消息不会丢失

Kafka —— 如何保证消息不会丢失

原创
作者头像
solve
修改2019-11-26 18:21:46
1.4K0
修改2019-11-26 18:21:46
举报
文章被收录于专栏:大数据技术栈大数据技术栈

前言

Kafka 提供了数据高可靠的特性,

但是如果使用不当,

你可能无法享受到这一特性,

今天我们就来看看如何正确的使用Kafka 保证数据的不会丢失吧!

生产者的正确的消息发送方式

Kafka为生产者生产消息提供了一个 send(msg) 方法,

另有一个重载的方法send(msg, callback),

  • send(msg) 该方法可以将一条消息发送出去, 但是对发送出去的消息没有掌控能力, 无法得知其最后是不是到达了Kafka, 所以这是一种不可靠的发送方式, 但是也因为客户端只需要负责发送, 所以具有较好的性能。 Future<RecordMetadata> future = producer.send(record) 上面的示例代码也可以看到,send返回的是一个 Future, 也就是说其实你是可以 Future.get()获取返回值的, 但这种同步的方式,基本上可以说是不会用到。
  • send(msg, callback) 该方法可以将一条消息发送出去, 并且可以从callback回调中得到该条消息的发送结果, 并且callback是异步回调, 所以在兼具性能的情况下, 也对消息具有比较好的掌控。
代码语言:txt
复制
    ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
    producer.send(myRecord,
               new Callback() {
                   public void onCompletion(RecordMetadata metadata, Exception e) {
                       if(e != null) {
                          e.printStackTrace();
                       } else {
                          System.out.println("The offset of the record we just sent is: "
                          + metadata.offset());
                       }
                   }
               });
  • 综上,如果要使数据不丢失, 首先你就的使用 send(msg, callback)来发送消息, 绝大多数情况下,我也建议你这么做。

生产者的配置

当我们通过 send(msg, callback) 是不是就意味着消息一定不丢失了呢?

答案明显是:不是的

我们接着上面,

send(msg, callback)里面 callback返回的成功,

到底是不是真的确保消息万无一失了?

其实这个返回的成功也是可以在生产者配置的:

代码语言:txt
复制
 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
//*******重点*****************
 props.put("acks", "all");
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for (int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
 producer.close();

这段代码是生产者发送消息的一个例子,

其中没使用callback主要是这里callback不是重点,

我们的重点是props.put("acks", "all");

这个acks配置属性就是我们callback成功的具体含义:

  • acks=0 acks = 0如果设置为零,那么生产者将完全不会管服务器是否收到消息。 该记录将立即添加到套接字缓冲区中并视为已发送。 并且重试配置不会生效(因为客户端通常不会知道任何故障)。 返回值的偏移量将始终等于 -1。 该方式具有最大的吞吐量, 一般建议直接配合 send(msg)使用。
  • acks=1 当leader接受到消息就会直接给客户端返回成功, 一般情况下这种模式都能很好的保证数据的不丢失, 只有在laeder接受到数据, 然后还没来得及同步到follower, 就挂掉了才会导致数据的丢失, 这种概率还是比较小的。 这也是默认的选择方式, 兼具较好的吞吐和较高的可靠性
  • acks=all 或者 acks=-1 当leader接受到消息,并同步到了一定数量的follower, 才向生产者发生成功的消息, 同步到的follower数量由 broker 端的 min.insync.replicas 决定 除非一些不可抗力因素, 这种方式基本可以确保数据的完全不丢失。

Broker 端的配置

其实到这里,生产者端基本已经做好了数据不丢失的大部分准备,

但是有些东西是要配合 Broker 端一起,

才能达到预期的不丢失数据的,

比如我们上面说到的

  • min.insync.replicas 配置 我们上面知道了, 当 生产者 acks = -1 的时候, 写入的副本数就必须 >= min.insync.replicas 数, 当达不到这个要求的时候, 生产者端会收到一个either NotEnoughReplicas or NotEnoughReplicasAfterAppend的异常。 所以我们这个参数必须不能大于 replication.factor 副本数。 否则生产者将无法写入任何数据, 一般建议 replication.factor 数要大于 min.insync.replicas, 比如3个机器的集群,设置 replication.factor = 3, 那么设置 min.insync.replicas = 2 就可以了, 这样既保证了数据写入的时候有一个副本的冗余, 也能保证在一些情况下, 某台Broker宕机导致数据无法达到3个副本时, 依然可以正常写入数据。
  • unclean.leader.election.enable 这里 Broker 端还有一个重要的配置就是 unclean.leader.election.enable = false 这个配置代表着一些数据落后比较多的 follower, 是否能在leader宕机后被选举成新的 leader 如果你设置成 true, 很明显,如果这样的follower成为新leader, 就会造成最新的一部分数据丢失掉,

重试 retries

上面已经基本完成了不丢数据的方方面面了,

但是有些东西不是我们能控制的,

比如 网络抖动 等不可抗拒的因素,

这时候重试次数就很关键了,

配置合适的retries重试次数,

和 合适的retry.backoff.ms重试间隔时间,

将为我们的数据发送提供更高的稳定性,

当然如果实在发送不成功,怎么办呢?

一般我们也可以把发送不成功的数据保存在一个日志文件,

如果数据很重要,那就发送警告信息,

人工干预一下。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 生产者的正确的消息发送方式
  • 生产者的配置
  • Broker 端的配置
  • 重试 retries
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档