Transaction Coordinator and Transaction Log:
/**
This specifies that the KafkaConsumer should only read non-transactional messages,
or committed transactional messages from its input topics.
*/
KafkaConsumer consumer = createKafkaConsumer(
    "bootstrap.servers", "localhost:9092",
    "group.id", "my-consumerGroup-id",
    "isolation.level", "read_committed");
consumer.subscribe(Collections.singleton("inputTopic"));
/**
Consume some records, start a transaction, process the consumed records,
write the processed records to the output topic, send the consumed offsets to
the offsets topic, and finally commit the transaction. With the guarantees mentioned
above, we know that the offsets and the output records will be committed as an atomic
unit.
*/
//initTransactions() must be called once, before the first transaction: it registers
//the transactional.id with the coordinator and fences any previous instance.
producer.initTransactions();
while (true) {
    ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
    producer.beginTransaction();
    for (ConsumerRecord record : records)
        producer.send(new ProducerRecord("outputTopic", record.key(), record.value()));
    //sendOffsetsToTransaction should be used when you need to commit consumed and
    //produced messages together, typically in a consume-transform-produce pattern.
    //currentOffsets(consumer) is a helper that maps each partition to the offset
    //of the last consumed record plus one.
    producer.sendOffsetsToTransaction(currentOffsets(consumer), "my-consumerGroup-id");
    producer.commitTransaction();
}
transactional.id plays a key role in Kafka's transaction mechanism: it is the parameter Kafka uses to fence out zombie producers. Producer transactions introduce a globally unique TransactionalId, and the PID obtained by the Producer is bound to that TransactionalId, so that after a restart the Producer can recover the PID of the transaction that is currently in progress.
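The fencing idea can be sketched with a broker-free toy model: besides the PID, the coordinator tracks an epoch per transactional.id and bumps it every time the same transactional.id re-registers, so requests carrying a stale epoch are rejected. The class and method names below (TransactionCoordinatorModel, register, tryWrite) are illustrative only, not Kafka's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of zombie fencing: each transactional.id maps to a (PID, epoch)
// pair; re-registering the same transactional.id keeps the PID but bumps the
// epoch, so writes from the older producer instance are rejected.
public class TransactionCoordinatorModel {
    record Session(long pid, short epoch) {}

    private final Map<String, Session> sessions = new HashMap<>();
    private long nextPid = 0;

    // Simulates initTransactions(): a known transactional.id keeps its PID
    // but receives a higher epoch, fencing the previous holder.
    public Session register(String transactionalId) {
        Session s = sessions.get(transactionalId);
        Session next = (s == null)
                ? new Session(nextPid++, (short) 0)
                : new Session(s.pid(), (short) (s.epoch() + 1));
        sessions.put(transactionalId, next);
        return next;
    }

    // Simulates a produce/commit request: only the latest epoch is accepted.
    public boolean tryWrite(String transactionalId, Session caller) {
        Session current = sessions.get(transactionalId);
        return current != null
                && current.pid() == caller.pid()
                && current.epoch() == caller.epoch();
    }

    public static void main(String[] args) {
        TransactionCoordinatorModel coordinator = new TransactionCoordinatorModel();
        Session old = coordinator.register("my-txn-id");   // first instance
        Session fresh = coordinator.register("my-txn-id"); // restarted instance
        System.out.println(coordinator.tryWrite("my-txn-id", old));   // zombie: rejected
        System.out.println(coordinator.tryWrite("my-txn-id", fresh)); // accepted
    }
}
```

Note how the restarted instance inherits the same PID: that is exactly why the epoch is needed to tell the two instances apart.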
So how, among the many producers spread across sessions (multiple producers write to the same Kafka cluster, and any of them may restart), do we choose globally consistent transactional.ids so that they do not interfere with one another?
There are two general approaches:
Using the transactional API, the user needs to configure transactional.id but not a ProducerId; Kafka internally generates and maintains globally unique ProducerIds;
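As a concrete sketch of this approach, the producer configuration below supplies only transactional.id; no ProducerId appears anywhere, because the broker assigns the PID when initTransactions() is called. The transactional.id value and bootstrap address are placeholders.

```java
import java.util.Properties;

public class TransactionalProducerConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // The only transaction-related setting the user supplies; it must be
        // stable across restarts so the coordinator can fence the old instance.
        props.put("transactional.id", "my-transactional-id");
        // Transactions require idempotence; note that no ProducerId is
        // configured here: the broker assigns the PID in initTransactions().
        props.put("enable.idempotence", "true");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println(props.getProperty("transactional.id"));
        // A real producer would then be created and initialized with:
        //   KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        //   producer.initTransactions();
    }
}
```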