整理 | 纯粹技术分享
这篇文章主要讲述 Kafka 事务性相关原理,从 Kafka EOS 语义、幂等性、事务性等几个方面阐述。
Kafka EOS 语义
EOS(Exactly Once Semantics,精确一次处理语义)是从 Kafka 0.11.0.0 版本开始支持的,之前版本中只支持 At Least Once 和 At Most Once 语义,并不支持 Exactly Once 语义。
因为在很多要求严格的场景下,如使用 Kafka 处理交易数据,Exactly Once 语义是必须的。我们可以通过让下游系统具有幂等性来配合 Kafka 的 At Least Once 语义来间接实现 Exactly Once 语义。但是也存在一些问题:
因此,Kafka 本身对Exactly Once语义的支持就非常必要。
Kafka 幂等性
在说 Kafka 的事务之前,先要说一下 Kafka 中幂等(Idempotent)的实现。幂等和事务是 Kafka 0.11.0.0 版本引入的两个特性,以此来实现 EOS 语义。
Kafka 幂等性是 Producer 端的特性,为了实现生产端幂等性,Kafka 引入了 Producer ID(即PID)和 Sequence Number。
Broker 端在缓存中保存了这 Sequence Numbler,对于接收的每条消息,如果其序号比 Broker 缓存中序号大于1则接受它,否则将其丢弃。这样就可以实现了消息重复提交了。幂等涉及的参数是 enable.idempotence,默认为 false,开启需要设置为 ture。
但是,这种只能保证单个 Producer 对于单会话单 Partition 的 Exactly Once 语义。不能保证同一个 Producer 一个 topic 不同的 Partition 幂等。
Kafka 事务性
Kafka 事务支持
正是因为 Kafka Idempotent 不提供跨多个 Partition 和跨会话场景下的保证,因此,我们是需要一种更强的事务保证,能够原子处理多个 Partition 的写入操作,数据要么全部写入成功,要么全部失败,这就是 Kafka Transactions,即Kafka 事务。
Kafka 事务 API
producer提供了initTransactions,beginTransaction,sendOffsetsToTransaction,commitTransaction,abortTransaction 五个事务方法。
/** * 初始化事务。需要注意的有: * 1、前提 * 需要保证transation.id属性被配置。 * 2、这个方法执行逻辑是: * (1)Ensures any transactions initiated by previous instances of the producer with the same * transactional.id are completed. If the previous instance had failed with a transaction in * progress, it will be aborted. If the last transaction had begun completion, * but not yet finished, this method awaits its completion. * (2)Gets the internal producer id and epoch, used in all future transactional * messages issued by the producer. * */ public void initTransactions();
/** * 开启事务 */ public void beginTransaction() throws ProducerFencedException ;
/** * 为消费者提供的在事务内提交偏移量的操作 */ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException ;
/** * 提交事务 */ public void commitTransaction() throws ProducerFencedException;
/** * 放弃事务,类似回滚事务的操作 */ public void abortTransaction() throws ProducerFencedException ;
相关属性配置
使用 Kafka 的事务 API 时的一些注意事项:
Kafka 事务示例
以下是 Producer 事务使用示例:
Properties props = new Properties();
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "ProducerTranscationnalExample");
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "test-transactional");
props.put("acks", "all");
KafkaProducer producer = new KafkaProducer(props);
producer.initTransactions();
try { String msg = "matt test";
producer.beginTransaction();
producer.send(new ProducerRecord(topic, "0", msg.toString()));
producer.send(new ProducerRecord(topic, "1", msg.toString()));
producer.send(new ProducerRecord(topic, "2", msg.toString()));
producer.commitTransaction();} catch (ProducerFencedException e1) { e1.printStackTrace();
producer.close();} catch (KafkaException e2) { e2.printStackTrace();
producer.abortTransaction();}producer.close();
Kafka 幂等与事务的关系
事务属性实现前提是幂等性,即在配置事务属性 transaction id 时,必须还得配置幂等性;但是幂等性是可以独立使用的,不需要依赖事务属性。
参数组合情况:
参考链接:
https://www.codercto.com/a/36351.html
http://www.heartthinkdo.com/?p=2040#5