前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka 结合springboot实战--第二节

kafka 结合springboot实战--第二节

作者头像
六个核弹
发布2022-12-23 20:44:21
7360
发布2022-12-23 20:44:21
举报

生产者事务

Spring-kafka自动注册的KafkaTemplate实例是不具有事务消息发送能力的。需要在 application.properties 配置属性:

代码语言:javascript
复制
spring.kafka.producer.acks=-1
spring.kafka.producer.transaction-id-prefix=kafka_tx

当激活事务时 kafkaTemplate 就只能发送事务消息了,发送非事务的消息会报异常。发送事务消息的方法有两种,一种是通过 kafkaTemplate.executeInTransaction 实现,一种是通过 spring的注解 @Transactional 来实现,代码示例:

代码语言:javascript
复制
    @Scheduled(cron = "*/15 * * * * ?")
    public void sendTrans() {
      kafkaTemplate.executeInTransaction(t ->{
          t.send("xxxxx","test1");
          t.send("xxxxx","test2");
          return true;
      }
          );
    }

    @Scheduled(cron = "*/15 * * * * ?")
    @Transactional(rollbackFor = Exception.class)
    public void sendFoo() {
        kafkaTemplate.send("topic_input", "test");

    }

消费者Ack

消费者消息消息可以自动确认,也可以通过手动确认,开启手动首先需要关闭自动提交,然后设置下consumer的消费模式:

代码语言:javascript
复制
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual

配置完成之后我们需要对消费者监听器做一点小改动:

代码语言:javascript
复制
    @KafkaListener( topics = "topic_input")
    public void listen(ConsumerRecord<?, String> record, Acknowledgment ack) {
        System.out.println(record.value());
        ack.acknowledge();
    }

如你所见,我们可以通过 Acknowledgment.acknowledge() 来手动的确认消息的消费,不确认就不算消费成功,监听器会再次收到这个消息。对于某些业务场景这个功能还是很必要的,比如消费消息的同时导致写库异常,数据库回滚,那么消息也不应该被ack。

消费者监听器生命周期控制

消费者监听器有三个生命周期:启动、停止、继续;如果我们想控制消费者监听器生命周期,需要修改@KafkaListenerautoStartup 属性为false, 并给监听器 id 属性赋值 然后通过KafkaListenerEndpointRegistry 控制id 对应的监听器的启动停止继续:

代码语言:javascript
复制
import org.springframework.stereotype.Service;
@Service
public class test {
    @Autowired
    KafkaListenerEndpointRegistry listenerRegistry;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Scheduled(cron = "*/15 * * * * ?")
    @Transactional
    public void testListener(){
        if (i==20){
            listenerRegistry.getListenerContainer("listener1").start();
        }
        System.out.println("生产者生产消息"+i++);
        kafkaTemplate.send("test","xxx"+i);
    }

     @KafkaListener( id = "listener1",topics = "test",autoStartup ="false" )
    public void testStart(ConsumerRecord<?, String> record){
        System.out.println(record.value());
    }


}

通过观察窗口输出就能看到,生产者生产了20条数据后消费者监听器才开始启动消费。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-03-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 六个核弹 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 生产者事务
  • 消费者Ack
  • 消费者监听器生命周期控制
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档