前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka 的事务到底长啥样?

Kafka 的事务到底长啥样?

作者头像
大数据技术架构
发布2019-08-16 15:49:49
1.5K0
发布2019-08-16 15:49:49
举报
作者 | 来自网络

整理 | 纯粹技术分享

这篇文章主要讲述 Kafka 事务性相关原理,从 Kafka EOS 语义、幂等性、事务性等几个方面阐述。

Kafka EOS 语义

EOS(Exactly Once Semantics,精确一次处理语义)是从 Kafka 0.11.0.0 版本开始支持的,之前版本中只支持 At Least Once 和 At Most Once 语义,并不支持 Exactly Once 语义。

因为在很多要求严格的场景下,如使用 Kafka 处理交易数据,Exactly Once 语义是必须的。我们可以通过让下游系统具有幂等性来配合 Kafka 的 At Least Once 语义来间接实现 Exactly Once 语义。但是也存在一些问题:

  • 该方案要求下游系统支持幂等操作,限制了 Kafka 的适用场景
  • 实现门槛相对较高,需要用户对 Kafka 的工作机制非常了解
  • 对于 Kafka Stream 而言,Kafka 本身即是自己的下游系统,但 Kafka 在 0.11.0.0 版本之前不具有幂等发送能力

因此,Kafka 本身对Exactly Once语义的支持就非常必要。

Kafka 幂等性

在说 Kafka 的事务之前,先要说一下 Kafka 中幂等(Idempotent)的实现。幂等和事务是 Kafka 0.11.0.0 版本引入的两个特性,以此来实现 EOS 语义。

Kafka 幂等性是 Producer 端的特性,为了实现生产端幂等性,Kafka 引入了 Producer ID(即PID)和 Sequence Number。

  • PID:每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个PID 对用户完全是透明的。
  • Sequence Numbler:对于每个 PID,该 Producer 发送到每个 Partition 的数据都有对应的序列号,这些序列号是从0开始单调递增的。

Broker 端在缓存中保存了这 Sequence Numbler,对于接收的每条消息,如果其序号比 Broker 缓存中序号大于1则接受它,否则将其丢弃。这样就可以实现了消息重复提交了。幂等涉及的参数是 enable.idempotence,默认为 false,开启需要设置为 ture。

但是,这种只能保证单个 Producer 对于单会话单 Partition 的 Exactly Once 语义。不能保证同一个 Producer 一个 topic 不同的 Partition 幂等。

Kafka 事务性

Kafka 事务支持

正是因为 Kafka Idempotent 不提供跨多个 Partition 和跨会话场景下的保证,因此,我们是需要一种更强的事务保证,能够原子处理多个 Partition 的写入操作,数据要么全部写入成功,要么全部失败,这就是 Kafka Transactions,即Kafka 事务。

Kafka 事务 API

producer提供了initTransactions,beginTransaction,sendOffsetsToTransaction,commitTransaction,abortTransaction 五个事务方法。

代码语言:javascript
复制
    /**     * 初始化事务。需要注意的有:     * 1、前提     * 需要保证transation.id属性被配置。     * 2、这个方法执行逻辑是:     *   (1)Ensures any transactions initiated by previous instances of the producer with the same     *      transactional.id are completed. If the previous instance had failed with a transaction in     *      progress, it will be aborted. If the last transaction had begun completion,     *      but not yet finished, this method awaits its completion.     *    (2)Gets the internal producer id and epoch, used in all future transactional     *      messages issued by the producer.     *     */    public void initTransactions();
    /**     * 开启事务     */    public void beginTransaction() throws ProducerFencedException ;
    /**     * 为消费者提供的在事务内提交偏移量的操作     */    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,                                         String consumerGroupId) throws ProducerFencedException ;
    /**     * 提交事务     */    public void commitTransaction() throws ProducerFencedException;
    /**     * 放弃事务,类似回滚事务的操作     */    public void abortTransaction() throws ProducerFencedException ;

相关属性配置

使用 Kafka 的事务 API 时的一些注意事项:

  • 需要消费者的自动模式设置为 false,并且不能子再手动的进行执行consumer#commitSync或者consumer#commitAsyc。
  • 生产者配置 transactional.id 属性。
  • 生产者不需要再配置 enable.idempotence,因为如果配置了transaction.id,则此时 enable.idempotence 会被设置为true。
  • 消费者需要配置 isolation.level 属性,有两个可选值:"read_committed","read_uncommitted",默认"read_uncommitted"。

Kafka 事务示例

以下是 Producer 事务使用示例:

代码语言:javascript
复制
Properties props = new Properties();
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "ProducerTranscationnalExample");
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "test-transactional");
props.put("acks", "all");
KafkaProducer producer = new KafkaProducer(props);
producer.initTransactions();
try {    String msg = "matt test";
    producer.beginTransaction();
    producer.send(new ProducerRecord(topic, "0", msg.toString()));
    producer.send(new ProducerRecord(topic, "1", msg.toString()));
    producer.send(new ProducerRecord(topic, "2", msg.toString()));
    producer.commitTransaction();} catch (ProducerFencedException e1) {    e1.printStackTrace();
    producer.close();} catch (KafkaException e2) {    e2.printStackTrace();
    producer.abortTransaction();}producer.close();

Kafka 幂等与事务的关系

事务属性实现前提是幂等性,即在配置事务属性 transaction id 时,必须还得配置幂等性;但是幂等性是可以独立使用的,不需要依赖事务属性。

  • 幂等性引入了 Porducer ID(还有 Sequence Numbler)。
  • 事务属性引入了 Transaction Id 属性。

参数组合情况:

  • enable.idempotence = true,transactional.id不设置:只支持幂等性。
  • enable.idempotence = true,transactional.id设置:支持事务属性和幂等性
  • enable.idempotence = false,transactional.id不设置:没有事务属性和幂等性的kafka
  • enable.idempotence = false,transactional.id设置:无法获取到PID,此时会报错

参考链接:

Kafka EOS 之事务性实现:

https://www.codercto.com/a/36351.html

Kafka生产者事务和幂等:

http://www.heartthinkdo.com/?p=2040#5

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-06-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Kafka EOS 之事务性实现:
    • Kafka生产者事务和幂等:
    相关产品与服务
    区块链
    云链聚未来,协同无边界。腾讯云区块链作为中国领先的区块链服务平台和技术提供商,致力于构建技术、数据、价值、产业互联互通的区块链基础设施,引领区块链底层技术及行业应用创新,助力传统产业转型升级,推动实体经济与数字经济深度融合。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档