首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

在Spring Kafka中设置isolation.level = read_committed (Spring Boot版本1.5.18和spring Kafka版本1.3.8)

在Spring Kafka中,设置isolation.level = read_committed是为了确保消费者只读取已提交的消息。这个配置项可以用于保证消费者只消费到已经被事务提交的消息,从而避免读取到未提交的消息。

isolation.level是Kafka的一个重要配置项,用于控制消费者的隔离级别。在Spring Kafka中,可以通过设置isolation.level = read_committed来将隔离级别设置为只读取已提交的消息。

read_committed隔离级别是Kafka事务的一部分,它确保只有已经被事务提交的消息才会被消费者读取。这对于一些对消息顺序和一致性要求较高的应用场景非常重要。

设置isolation.level = read_committed的优势包括:

  1. 数据一致性:只读取已提交的消息可以确保消费者获取到的数据是一致的,避免了读取到未提交的消息导致的数据不一致问题。
  2. 顺序性:读取已提交的消息可以保证消息的顺序性,避免了读取到未提交的消息导致的消息顺序错乱问题。
  3. 可靠性:通过只读取已提交的消息,可以提高应用的可靠性,避免了读取到未提交的消息导致的数据错误或重复消费的问题。

在Spring Boot版本1.5.18和spring Kafka版本1.3.8中,可以通过以下方式设置isolation.level = read_committed:

代码语言:txt
复制
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // 设置isolation.level为read_committed
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

在上述代码中,通过设置ConsumerConfig.ISOLATION_LEVEL_CONFIG属性为"read_committed",即可将隔离级别设置为只读取已提交的消息。

推荐的腾讯云相关产品和产品介绍链接地址:

  • 腾讯云消息队列 CKafka:https://cloud.tencent.com/product/ckafka
  • 腾讯云云原生容器服务 TKE:https://cloud.tencent.com/product/tke
  • 腾讯云数据库 TencentDB:https://cloud.tencent.com/product/cdb
  • 腾讯云云服务器 CVM:https://cloud.tencent.com/product/cvm
  • 腾讯云云安全中心:https://cloud.tencent.com/product/ssc
  • 腾讯云音视频处理:https://cloud.tencent.com/product/mps
  • 腾讯云人工智能:https://cloud.tencent.com/product/ai
  • 腾讯云物联网平台:https://cloud.tencent.com/product/iotexplorer
  • 腾讯云移动开发:https://cloud.tencent.com/product/mobdev
  • 腾讯云对象存储 COS:https://cloud.tencent.com/product/cos
  • 腾讯云区块链服务:https://cloud.tencent.com/product/baas
  • 腾讯云元宇宙:https://cloud.tencent.com/product/mu
页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

「首席架构师看Event Hub」KafkaSpring 深入挖掘 -第1部分

消息转换器bean推断要转换为方法签名的参数类型的类型。 转换器自动“信任”类型。Spring Boot自动将转换器配置到侦听器容器。...本例,我们将在两端使用消息转换器(以及StringSerializerStringDeserializer)。...同样,Spring Boot会自动将消息转换器配置到容器。下面是应用程序片段的生产端类型映射。...transaction-id-prefix: tx. consumer: properties: isolation.level: read_committed 当使用spring-kafka 1.3...x或更高版本支持事务的kafka-clients版本(0.11或更高版本),@KafkaListener方法执行的任何KafkaTemplate操作都将参与事务,而侦听器容器将在提交事务之前向事务发送偏移量

1.4K40

Flink exactly-once系列实践之KafkaToKafka

env.setParallelism(4); //3.设置CK状态后端 CkAndStateBacked.setCheckPointAndStateBackend...=read_committed(默认值为read_uncommitted) sourceProperties.put("isolation.level", "read_committed...这里设置read_committed(默认为read_uncommitted) 这里可以看到以你CheckPoint设置的时间,来批量展示kafka生产者的消息。...0.11.0.0版本发生变化),分区数可以通过 offsets.topic.num.partitions 参数设置,默认值为50,开启事务性的情况下就会首先会获得一个全局的TransactionCoordinator...) 当flink任务出现异常的情况下,kafka会把以及提交但是未标记可以消费的数据直接销毁,或者正常的情况下,会正式提交(本质是修改消息的标志位),之后对于消费者开启isolation.level

27310

Spring Boot Kafka概览、配置及优雅地实现发布订阅

*作为前缀的配置参数),Spring Boot中使用Kafka特别简单。并且Spring Boot还提供了一个嵌入式Kafka代理方便做测试。...本文尽量做到阐述逻辑清晰,主要路线就是全局介绍Spring Kafka的主要功能及重点配置,而Spring BootSpring Kafka进一步简化配置,通过Spring BootKafka几大注解实现发布订阅功能...Boot启用Kafka必须的,Spring Boot附带了Spring Kafka的自动配置,因此不需要使用显式的@EnableKafka。...5.3 基于自定义配置发布订阅实现 上面是简单的通过Spring Boot依赖的Spring Kafka配置即可快速实现发布订阅功能,这个时候我们是无法程序操作这些配置的,因此这一小节就是利用我们之前...Spring Kafka的发送消息接收消息功能,其他包括Spring Kafka Stream的简单介绍,以及Spring Boot如何通过三种方式去实现Kafka的发布订阅功能,涉及了Kafka

15.2K72

2017年终总结

主要是研究了0.8版本的一些使用实现细节,另外研究了0.9版本与0.8版本的一些区别,还有就是今年主推的kafka stream,后续可能基于1.0版本再去深入了解。...聊聊spring的async注解 聊聊TaskExecutor的spring托管 springboot的diskSpaceHealthIndicator springboot扩展tomcat的executor...data jpa数据变更审计 javer的表结构设计 使用envers记录数据变更版本 聊聊spring jdbc的RowMapper spring-data-jpa设置fetchsize spring-data-jpa...的auditor设置 spring-data-jpa软删除方案 spring data jpa 使用native sql实例 jpa存储byte到postgresql mybatis spring boot...prometheus 输出hystrix指标到dropwizard metrics Prometheus的架构及持久化 通过jmx监控docker的java应用 使用pcp监控spring boot

1.7K10

springboot中使用kafka

kafka 事务 kafka 的事务是从0.11 版本开始支持的,kafka 的事务是基于 Exactly Once 语义的,它能保证生产或消费消息跨分区和会话的情况下要么全部成功要么全部失败 生产者事务...,该组件还会将事务状态持久化到kafka一个内部的 Topic 。...当参数设置read_committed,则消费者不能消费到未commit的消息。...可能会给多个topic发送消息,需要保证消息要么全部发送成功要么全部发送失败(操作的原子性); 消费者 消费一个topic,然后做处理再发到另一个topic,这个消费转发的动作应该在同一事物; 如果下游消费者只有等上游消息事务提交以后才能读到...需要配置属性: spring.kafka.producer.acks=-1 spring.kafka.producer.transaction-id-prefix=kafka_tx 当激活事务时 kafkaTemplate

2.9K20

SpringBoot3集成Kafka

组件选择与boot框架spring相同的依赖,即6.0.10版本spring-kafka最近的版本3.0.8符合; 但是该版本使用的是kafka-clients组件的3.3.2版本Spring...文档的kafka模块,明确说明spring-boot:3.1要使用kafka-clients:3.4,所以从spring-kafka组件中排除掉,重新依赖kafka-clients组件; <dependency...properties: max.poll.interval.ms: 3600000 四、基础用法 1、消息生产 模板类KafkaTemplate用于执行高级的操作,封装各种消息发送的方法,该方法...")); // 发送消息 kafkaTemplate.send("boot-kafka-topic","boot-kafka-key",msgBody);...e.printStackTrace(); } return "OK" ; } } 2、消息消费 编写消息监听类,通过KafkaListener注解控制监听的具体信息,实现消息生产消费的方法测试后

72820

Spring Cloud Config 配置中心实践过程,你需要了解这些细节!

Spring Cloud Config 基本概念 Spring Cloud Config 用来为分布式系统的基础设施微服务应用提供集中化的外部配置支持。...客户端:微服务架构各个微服务应用基础设施,通过指定配置中心管理应用资源与业务相关的配置内容,启动时从配置中心获取和加载配置信息 SCC作用: 实现了对服务端客户端中环境变量属性配置的抽象映射。...注意:{label} 参数很特别,如果 GIT 分支标签包含 “/”,那么 {label} 参数 HTTP 的 URL 应用使用 “(_)” 替代,以避免改变了 URI 含义,指向到其他 URI... B 版本,未实际测试过,存储到临时目录 /tmp/config-repo-随机数目录下。 为了避免一些不可预知的问题,我们设置一个固定的本地GIT仓库目录。...from' in value "${from}" 2)多仓库目录注意事项: 这种方式不能设置参数 spring.cloud.config.server.git.force-pull=true spring.cloud.config.server.git.clone-on-start

1.2K20

IDEA导入Spring-kafka项目Gradle编译失败

前言 最近在弄kafka相关的东东,因为是spring boot工程,所以用到了Spring-kafka,一个包含了kafka-producerkafka-consumer自动装配的依赖。...为了进一步研究spring是如何封装的kafka官方客户端的细节,所以从github上拉到了源码准备研究下,导入到IDEA时,因为Spring-kafka工程使用的是Gradle,导入时就编译失败了...Spring-kafka地址:https://github.com/spring-projects/spring-kafka 异常信息如下: java.lang.AbstractMethodError...,IDEA的环境下如果不配置本地的Gradle版本,默认会去拉最新的Gradle版本5.x。...直接解压到本地目录,然后打开IDEA的设置,找到如下箭头所指向的配置 Gradle home选中自己刚解压的目录即可,提交后会自动触发再次编译

20230

基于Java、Kafka、ElasticSearch的搜索框架的设计与实现

事务提交后使用JkesKafkaProducer发送SaveEvent的实体到KafkaKafka会使用我们提供的JkesJsonSerializer序列化指定的数据,然后发送到Kafka。...Spring Boot Application,使用docker打包为镜像 查询服务提供多版本API,用于API进化兼容 查询服务解析json请求,进行一些预处理后,使用ElasticSearch Java...一致的配置模型 EventSupport处理事件的细节,保存删除数据时生成相应事件存放到EventContainer,事务提交回滚时处理相应的事件 SearchPlatformTransactionManager...包装了客户端的事务管理器,事务提交回滚时加入了回调hook audit包提供了一个简单的AuditedEntity父类,方便添加审计功能,版本信息可用于结合ElasticSearch的版本机制保证不会索引过期文档数据...我们没有直接使用ElasticSearch进行查询,因为我们需要在后续版本使用机器学习进行搜索排序,而直接与ElasticSearch进行耦合,会增加搜索排序的接入难度 查询服务是一个Spring Boot

2.1K10
领券