前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >kafka手动监听主题

kafka手动监听主题

作者头像
星痕
发布于 2020-05-18 08:20:51
发布于 2020-05-18 08:20:51
80100
代码可运行
举报
文章被收录于专栏:JAVA后端开发JAVA后端开发
运行总次数:0
代码可运行

很多人作kafka消费时,都快速的使用注解@KafkaListener进行监听。 但我现在有个需求,是要动态的手动监听。 实现代码如下: 1.手动编写监听类

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class MessageQueueKafkaConsumerListener implements MessageListener<String,String> {

    private final IMessageQueueConsumerService messageQueueConsumerService;

    public MessageQueueKafkaConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
        this.messageQueueConsumerService = messageQueueConsumerService;
    }


    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        messageQueueConsumerService.receiveMessage(data.value());
    }
}

2.配置监听

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Component
public class MessageQueueKafkaConsumerServiceFactory  implements InitializingBean {

    @Autowired
    KafkaProperties kafkaProperties;

    private final List<IMessageQueueConsumerService> messageQueueConsumerServices;

    @Autowired
    public MessageQueueKafkaConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList) {
        messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
                messageQueueConsumerService.support("kafka")).collect(Collectors.toList());
    }




    private KafkaMessageListenerContainer<Integer, String> createContainer(
            ContainerProperties containerProps) {
        Map<String, Object> props = kafkaProperties.buildConsumerProperties();
        DefaultKafkaConsumerFactory<Integer, String> cf =
                new DefaultKafkaConsumerFactory<>(props);
        return new KafkaMessageListenerContainer<>(cf, containerProps);
    }


    @Override
    public void afterPropertiesSet() throws Exception {
        messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
            ContainerProperties containerProps = new ContainerProperties(messageQueueConsumerService.topic());

            containerProps.setMessageListener(new MessageQueueKafkaConsumerListener(messageQueueConsumerService)
            );
            KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
            container.setBeanName(messageQueueConsumerService.topic() + "kafkaListener");

            container.start();

        });

    }
}

上面 的IMessageQueueConsumerService是我自定义处理监听的真实类 至此,手动监听代码已完成.

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
通用的消息队列(redis,kafka,rabbitmq)--消费者篇
上篇我写了一个通用的消息队列(redis,kafka,rabbitmq)--生产者篇,这次写一个消费者篇. 1.消费者的通用调用类:
星痕
2020/06/19
1.2K0
Kafka从入门到进阶
如果想学习Java工程化、高性能及分布式、深入浅出。微服务、Spring,MyBatis,Netty源码分析的朋友可以加我的Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给大家。
java架构师
2018/12/28
1.1K0
Spring Kafka:@KafkaListener 单条或批量处理消息
来源:csdn.net/ldw201510803006/article/details/116176711 消息监听容器 1、KafkaMessageListenerContainer 由spring提供用于监听以及拉取消息,并将这些消息按指定格式转换后交给由@KafkaListener注解的方法处理,相当于一个消费者; 看看其整体代码结构: 图片 可以发现其入口方法为doStart(), 往上追溯到实现了SmartLifecycle接口,很明显,由spring管理其start和stop操作; Liste
程序猿DD
2022/05/05
2.3K0
Spring Kafka:@KafkaListener 单条或批量处理消息
Spring Boot Kafka概览、配置及优雅地实现发布订阅
本文属于翻译,转载注明出处,欢迎关注微信小程序小白AI博客 微信公众号小白AI或者网站 https://xiaobaiai.net
别打名名
2019/12/24
15.7K0
Spring Boot Kafka概览、配置及优雅地实现发布订阅
聊聊spring对kafka的集成方式
除了官方的java api类库外,spring生态中又额外包装了很多,这里一一简单介绍下。
code4it
2018/09/17
3K0
【kafka异常】使用Spring-kafka遇到的坑
有想进滴滴LogI开源用户群的加我个人微信: jjdlmn_ 进群(备注:进群) 群里面主要交流 kakfa、es、agent、LogI-kafka-manager、等等相关技术; 群内有专人解答你的问题 对~ 相关技术领域的解答人员都有; 你问的问题都会得到回应
石臻臻的杂货铺[同名公众号]
2021/07/14
6.3K0
Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现
在实际应用中,往往需要根据业务需求动态开启/关闭Kafka消费者监听。例如,在某些时间段内,可能需要暂停对某个Topic的消费,或者在某些条件下才开启对某个Topic的消费。
小小工匠
2023/06/04
4.5K0
Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现
springboot中使用kafka
kafka 的事务是从0.11 版本开始支持的,kafka 的事务是基于 Exactly Once 语义的,它能保证生产或消费消息在跨分区和会话的情况下要么全部成功要么全部失败
六个核弹
2021/07/26
3.1K0
spring整合kafka(配置文件方式 消费者)
Kafka官方文档有 https://docs.spring.io/spring-kafka/reference/htmlsingle/
大数据流动
2019/08/08
1.3K0
聊聊KafkaListener的实现机制
org/springframework/kafka/annotation/KafkaListener.java
code4it
2023/10/22
7870
聊聊在springboot项目中如何配置多个kafka消费者
不知道大家有没有遇到这样的场景,就是一个项目中要消费多个kafka消息,不同的消费者消费指定kafka消息。遇到这种场景,我们可以通过kafka的提供的api进行配置即可。但很多时候我们会使用spring-kafka来简化开发,可是spring-kafka原生的配置项并没提供多个kafka配置,因此本文就来聊聊如何将spring-kafka进行改造,使之能支持多个kafka配置
lyb-geek
2022/08/02
5.9K0
聊聊在springboot项目中如何配置多个kafka消费者
【真实生产案例】SpringBoot 整合 Kafka 实现数据高吞吐
在上篇文章中,我们详细的介绍了 kafka 的架构模型,在集群环境中,kafka 可以通过设置分区数来加快数据的消费速度。
Java极客技术
2022/12/02
1.2K0
Java进阶实录—pringmvc+kafka分布式消息中间件集成方案
Honghu的消息服务平台已经抛弃了之前的ActiveMQ,改用高吞吐量比较大的Kafka分布式消息中间件方案: kafka消息平台使用spring+kafka的集成方案, 详情如下: 1 . 使用最高版本2.1.0.RELEASE集成jar包:spring-integration-kafka 2 . Zookeeper、Kafka分布式集群使用init.properties配置化方案。
慕容千语
2019/06/11
4380
spring集成kafka
一、添加依赖项 compile 'org.springframework.kafka:spring-kafka:1.2.2.RELEASE' 二、发消息(生产者) 2.1 xml配置 1 <?xml
菩提树下的杨过
2018/01/18
7740
springboot kafka集成(实现producer和consumer)
本文介绍如何在springboot项目中集成kafka收发message。 1、先解决依赖 springboot相关的依赖我们就不提了,和kafka相关的只依赖一个spring-kafka集成包 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.1.RELEASE</ve
用户1225216
2018/03/05
3.5K0
Apache Kafka - ConsumerInterceptor 实战 (1)
ConsumerInterceptor是Kafka中的一个重要组件,它允许开发人员在Kafka消费者端拦截和修改消息的处理过程。ConsumerInterceptor可以用于实现各种功能,从消息监控到数据转换和错误处理,为开发人员提供了更大的灵活性和可定制性。
小小工匠
2023/05/27
9690
Apache Kafka - ConsumerInterceptor 实战 (1)
微服务同时接入多个Kafka
kafkaOneTemplate 定义第一个Kafka的高级模板,用来发送消息 kafkaOneContainerFactory 消费监听容器,配置在@KafkaListener中, producerFactory 生产者工厂 consumerFactory 消费者工厂 producerConfigs 生产者配置 consumerConfigs 消费者配置
阿提说说
2022/11/18
1.3K0
微服务同时接入多个Kafka
搭建单体SpringBoot项目 集成Kafka消息队列
通过@Configuration、@EnableKafka,声明Config并且打开KafkaTemplate能力。
郭顺发
2023/07/07
5470
kafka 结合springboot实战--第三节
kafka 消费者可以将消费到的消息转发到指定的主题中去,比如一条消息需要经过多次流转加工才能走完整个业务流程,需要多个consumer来配合完成。转发代码示例如下:
六个核弹
2022/12/23
4270
@KafkaListener和@KafkaListeners的使用
@KafkaListeners是@KafkaListener的Container Annotation,这也是jdk8的新特性之一,注解可以重复标注。
小勇DW3
2019/11/14
5.5K0
@KafkaListener和@KafkaListeners的使用
推荐阅读
相关推荐
通用的消息队列(redis,kafka,rabbitmq)--消费者篇
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文