Kafka 的事务功能是为了支持在分布式环境中实现原子性操作而设计的。它允许生产者在发送消息时确保消息的完整性和一致性,特别是在需要多条消息作为一个整体进行处理的场景中。以下是 Kafka 事务的主要概念和功能介绍。
事务相关概念
事务的基本概念
原子性:事务中的所有操作要么全部成功,要么全部失败。Kafka 确保在事务中发送的消息要么被成功写入到主题中,要么不写入。
一致性:事务在执行前后,数据的状态应该保持一致。
隔离性:事务之间的操作是相互独立的,一个事务的执行不应影响其他事务的执行。
持久性:一旦事务被提交,其结果是永久性的,即使系统崩溃也不会丢失。
事务的工作流程
Kafka 的事务工作流程主要包括以下几个步骤:
1. 启动事务:生产者在发送消息之前调用
initTransactions()
方法来初始化事务。2. 发送消息:生产者可以发送多条消息到一个或多个主题,这些消息会被标记为事务性消息。
3. 提交或中止事务:
提交事务:如果所有消息都成功发送,生产者调用
commitTransaction()
方法来提交事务,所有消息将被写入到 Kafka。中止事务:如果在发送过程中发生错误,生产者可以调用
abortTransaction()
方法来中止事务,所有消息将不会被写入。事务的配置
要使用 Kafka 的事务功能,您需要在生产者配置中设置以下参数:
transactional.id
:每个事务性生产者都需要一个唯一的标识符。这个 ID 用于标识事务的所有消息。acks
:设置为 all 以确保所有副本都确认消息。enable.idempotence
:设置为 true
以启用幂等性,确保消息不会被重复发送。事务的限制
性能开销:使用事务会引入额外的性能开销,因为需要进行更多的协调和确认。
事务超时:Kafka 对事务有超时限制,默认情况下为 60 秒。如果事务在此时间内未提交或中止,将会被自动中止。
消费者的处理:消费者在处理事务性消息时需要注意,只有在事务提交后,消费者才能看到这些消息。
事务使用示例
producer
import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Properties;public class TransactionalProducerDemo {public static void main(String[] args) {// Kafka 配置Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka broker 地址props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id"); // 事务 IDprops.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 启用幂等性// 创建 Kafka 生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 初始化事务producer.initTransactions();try {// 开始事务producer.beginTransaction();// 发送消息for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key-" + i, "value-" + i);RecordMetadata metadata = producer.send(record).get(); // 发送消息并等待确认System.out.printf("Sent message: key=%s, value=%s, partition=%d, offset=%d%n",record.key(), record.value(), metadata.partition(), metadata.offset());}// 提交事务producer.commitTransaction();System.out.println("Transaction committed successfully.");} catch (Exception e) {// 如果发生异常,回滚事务producer.abortTransaction();System.err.println("Transaction aborted due to an error: " + e.getMessage());} finally {// 关闭生产者producer.close();}}}
consumer
import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.consumer.ConsumerRecords;import java.time.Duration;import java.util.Collections;import java.util.Properties;public class TransactionalConsumerDemo {public static void main(String[] args) {// Kafka 配置Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka broker 地址props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); // 消费者组 IDprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // 只读取已提交的事务消息// 创建 Kafka 消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("my-topic"));try {while (true) {// 拉取消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Consumed message: key=%s, value=%s, partition=%d, offset=%d%n",record.key(), record.value(), record.partition(), record.offset());}}} catch (Exception e) {e.printStackTrace();} finally {// 关闭消费者consumer.close();}}}
Kafka事务管理
在 Kafka 中,事务管理涉及到多个组件和数据结构,以确保事务的原子性和一致性。事务信息的内存占用主要与以下几个方面有关:
事务 ID 和 Producer ID
事务 ID:每个事务都有一个唯一的事务 ID,用于标识该事务。事务 ID 是由生产者在发送消息时指定的,通常是一个字符串。
Producer ID:每个生产者在连接到 Kafka 时会被分配一个唯一的 Producer ID。这个 ID 用于标识生产者的消息,并确保消息的顺序性和幂等性。
事务状态管理
Kafka 使用一个称为 事务状态日志 的内部主题来管理事务的状态。这个日志记录了每个事务的状态(如进行中、已提交、已中止)以及与该事务相关的消息。事务状态日志的管理涉及以下几个方面:
内存中的数据结构:Kafka 在内存中维护一个数据结构(例如哈希表或映射),用于存储当前活动的事务信息。这些信息包括事务 ID、Producer ID、事务状态、时间戳等。
持久化存储:事务状态日志会被持久化到磁盘,以确保在 Kafka 服务器重启或故障恢复时能够恢复事务状态。
事务信息的内存占用
事务信息的内存占用主要取决于以下两个因素:
活动事务的数量:当前正在进行的事务数量直接影响内存占用。每个活动事务都会在内存中占用一定的空间。
事务的元数据:每个事务的元数据(例如事务 ID、Producer ID、状态等)也会占用内存。具体的内存占用量取决于这些元数据的大小。
事务的清理
为了防止内存占用过高,Kafka 会根据配置的过期时间定期检查并清理已完成的事务,默认保留 7 天,过期删除。
事务常见的 FullGC / OOM 问题
从事务管理可以看出,事务信息会占用大量内存。其中影响事务信息占用内存大小的最直接的两个因素就是:事务 ID 的数量和 Producer ID 的数量。
其中事务 ID 的数量指的是客户端往 Broker 初始化、提交事务的数量,这个与客户端的事务新增提交频率强相关。
Producer ID 指的是 Broker 内每个 Topic 分区存储的 producer 状态信息,因此 Producer ID 的数量与 broker 的分区数量强相关。
在事务场景中,事务 ID和 Producer ID 强绑定,如果同一个和事务 ID 绑定的 Producer ID 往 broker 内所有的分区都发送消息,那么一个 broker 内的 Producer ID 的数量理论上最多能达到事务 ID 数量与 broker 内分区数量的乘积。假设一个实例下的事务 ID 数量为 t,一个 broker 下的分区数量为 p,那么 Producer ID 的数量最大能达到 t * p。
说明:
因此,假设一个 broker 下的事务 ID 数量为 t,平均事务内存占用大小为 tb,一个 broker 下的分区数量为 p,平均一个 Producer ID 占用大小为 pb,那么该 broker 内存中关于事务信息占用的内存大小为:t * tb + t * p * pb。
可以看出有两种场景可能会导致内存占用暴涨:
客户端频繁往实例初始化新增提交新的事务 ID。
同一个事务 ID 往多个分区发送数据,Producer ID 的叉乘数量会上涨的非常恐怖,很容易将内存打满。
说明:
因此,无论是对 Flink 客户端还是自己实现的事务 producer,都尽量避免这两种场景。例如对于 Flink,可以适当降低 checkpoint 的频率,以减小由于事务 ID 前缀+随机串计算的事务 ID 变化的频率。另外就是尽量保证同一个事务 ID 往同一个分区发送数据。
Flink使用事务注意事项
对于 Flink 有以下优化手段,来保证事务信息不会急剧膨胀:
客户端优化参数:Flink 加大
checkpoint
间隔Flink 生产任务可优化
sink.partitioner
为 Fixed
模式