前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >面试系列-kafka事务控制

面试系列-kafka事务控制

作者头像
用户4283147
发布2022-12-29 20:03:51
6390
发布2022-12-29 20:03:51
举报
文章被收录于专栏:对线JAVA面试对线JAVA面试

kafka事务机制

  • kafka的事务机制,是kafka实现端到端有且仅有一次语义(end-to-end EOS)的基础;事务涉及到 transactional producer 和transactional consumer, 两者配合使用,才能实现端到端有且仅有一次的语义(end-to-end EOS),producer和consumer是解耦的,也可以使用非transactional的consumer来消费transactional producer生产的消息,但此时就丢失了事务ACID的支持;
  • 通过事务机制,kafka可以实现对多个topic的多个partition的原子性的写入,即处于同一个事务内的所有消息,不管最终需要落地到哪个topic的哪个partition, 最终结果都是要么全部写成功,要么全部写失败(Atomic multi-partition writes);kafka的事务机制,在底层依赖于幂等生产者,幂等生产者是kafka事务的必要不充分条件;
  • 事实上,开启kafka事务时,kafka会自动开启幂等生产者;

kafka事务支持的设计原理

Transaction Coordinator和Transaction Log:

  • transaction coordinator是kafka broker内部的一个模块,transaction coordinator负责对分区写操作进行控制,而transaction log是kakfa的一个内部topic, 所以kafka可以通过内部的复制协议和选举机制(replication protocol and leader election processes),来确保transaction coordinator的可用性和transaction state的持久性;transaction log topic内部存储的只是事务的最新状态和其相关元数据信息,kafka producer生产的原始消息,仍然是只存储在kafka producer指定的topic中;
  • Procedure就是和Transaction Coordinator交互获得TransactionID对应的任务状态。Transaction Coordinator还负责将事务写入kafka内部的一个topic,这样即使整个服务重启,由于事务状态得到保存,正在进行的事务状态可以得到恢复,从而继续进行;

kafka事务机制下读写流程

  • kafka生产者通过initTransactions API将 transactional.id注册到 transactional coordinator:此时,此时 coordinator会关闭所有有相同transactional.id 且处于pending状态的事务,同时也会递增epoch来屏蔽僵尸生产者 (zombie producers). 该操作对每个 producer session只执行一次(producer.initTransaction());
  • kafka生产者通过beginTransaction API开启事务,并通过send API发送消息到目标topic:此时消息对应的 partition会首先被注册到transactional coordinator,然后producer按照正常流程发送消息到目标topic,且在发送消息时内部会通过校验屏蔽掉僵尸生产者(zombie producers are fenced out.(producer.beginTransaction();producer.send()*N;);
  • kafka生产者通过commitTransaction API提交事务或通过abortTransaction API回滚事务:此时会向 transactional coordinator提交请求,开始两阶段提交协议 (producer.commitTransaction();producer.abortTransaction(););
    • 在两阶段提交协议的第一阶段,transactional coordinator 更新内存中的事务状态为 “prepare_commit”,并将该状态持久化到transaction log中;
    • 在两阶段提交协议的第二阶段, coordinator首先写transaction marker标记到目标topic的目标partition,这里的transaction marker,就是我们上文说的控制消息,控制消息共有两种类型:commit和abort,分别用来表征事务已经成功提交或已经被成功终止;
    • 在两阶段提交协议的第二阶段,coordinator在向目标topic的目标partition写完控制消息后,会更新事务状态为“commited” 或“abort”, 并将该状态持久化到transaction log中;
  • kafka消费者消费消息时可以指定具体的读隔离级别,当指定使用read_committed隔离级别时,在内部会使用存储在目标topic-partition中的事务控制消息,来过滤掉没有提交的消息,包括回滚的消息和尚未提交的消息;kafka消费者消费消息时也可以指定使用read_uncommitted隔离级别,此时目标topic-partition中的所有消息都会被返回,不会进行过滤;

kafka事务在应用程序的使用

配置修改
  • producer 配置项更改:
    • enable.idempotence = true
    • acks = “all”
    • retries > 1 (preferably MAX_INT)
    • transactional.id = ‘some unique id’
  • consumer 配置项更改:
    • 根据需要配置 isolation.level为 “read_committed”, 或 “read_uncommitted”;
程序层
代码语言:javascript
复制
/**
This specifies that the KafkaConsumer should only read non-transactional messages, 
or committed transactional messages from its input topics. 
*/
KafkaConsumer consumer = createKafkaConsumer(
  “bootstrap.servers”, “localhost:9092”,
  “group.id”, “my-consumerGroup-id”,
  "isolation.level", "read_committed");
consumer.subscribe(Collections.singleton(“inputTopic”));
/**
Consume some records, start a transaction, process the consumed records, 
write the processed records to the output topic, send the consumed offsets to 
the offsets topic, and finally commit the transaction. With the guarantees mentioned 
above, we know that the offsets and the output records will be committed as an atomic 
unit.
*/
while (true) {
  ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
  producer.beginTransaction();
  for (ConsumerRecord record : records)
    producer.send(new ProducerRecord(“outputTopic”, record));
    //This method should be used when you need to batch consumed and produced messages 
    //together, typically in a consume-transform-produce pattern.
    producer.sendOffsetsToTransaction(currentOffsets(consumer), my-consumerGroup-id);
  producer.commitTransaction();
}

kafka全局一致的transactional.id维护

transactional.id在kafka的事务机制中扮演了关键的角色,kafka正是基于该参数来过滤掉僵尸生产者的 (fencing out zombies);生产者事务引入了一个全局唯一的TransactionId,将Procedure获得的PID和TransactionID绑定,这样Producer重启后就可以获得当前正在进行事务的PID;

那么如何在跨session的众多producer中 (向同一个kafka集群中生产消息的producer有多个,这些producer还有可能会重启),选用一个全局一致的transactional.id,以互不影响呢?

大体的思路有两种:

  • 一是通过一个统一的外部存储,来记录生产者使用的transactional.id和该生产者涉及到的topic-partition之间的映射关系;
  • 二是通过某些静态编码机制来生成一个全局唯一的transactional.id;

使用transactional API,用户需要配置transactional.id,但不需要配置ProducerId,Kafka内部会自动生成并维护一个全局唯一的ProducerIds;

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-11-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 对线JAVA面试 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • kafka事务机制
  • kafka事务支持的设计原理
  • kafka事务机制下读写流程
  • kafka事务在应用程序的使用
    • 配置修改
      • 程序层
      • kafka全局一致的transactional.id维护
      相关产品与服务
      对象存储
      对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档