专栏首页JAVA后端开发通用的消息队列(redis,kafka,rabbitmq)--消费者篇

通用的消息队列(redis,kafka,rabbitmq)--消费者篇

上篇我写了一个通用的消息队列(redis,kafka,rabbitmq)--生产者篇,这次写一个消费者篇. 1.消费者的通用调用类:

/**
 * 消息队列处理的handle
 * @author starmark
 * @date 2020/5/1  上午10:56
 */
public interface IMessageQueueConsumerService {


    /**
     * 处理消息队列的消息
     * @param message 消息
     */
    void receiveMessage(String message);

    /**
     * 返回监听的topic
     * @return 主题
     */
    String topic();

    /**
     *
     * @param consumerType 消费者类型
     * @return 是否支持该消费者类者
     */
    boolean support(String consumerType);
}

只要实现该类的接口就可以实现监听, redis的消费端,有两个类,如下:

/**
 * @author starmark
 * @date 2020/5/2  下午3:05
 */
public class MessageQueueRedisConsumerListener implements MessageListener {

    private IMessageQueueConsumerService messageQueueConsumerService;

    public MessageQueueRedisConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
        this.messageQueueConsumerService = messageQueueConsumerService;
    }


    @Override
    public void onMessage(Message message, byte[] pattern) {
        messageQueueConsumerService.receiveMessage(message.toString());
    }
}

/**
 * 消息队列服务端的监听
 *
 * @author starmark
 * @date 2020/5/1  上午10:55
 */
@Service
public class MessageQueueRedisConsumerServiceFactory {


    private List<IMessageQueueConsumerService> messageQueueConsumerServices;

    @Autowired
    public MessageQueueRedisConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList) {
        messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
                messageQueueConsumerService.support("redis")).collect(Collectors.toList());
    }

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
            MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(
                    new MessageQueueRedisConsumerListener(messageQueueConsumerService));
            messageListenerAdapter.afterPropertiesSet();
            container.addMessageListener(messageListenerAdapter, new PatternTopic(messageQueueConsumerService.topic()));

        });

        return container;
    }


}

kafka消费者也有两个类,如下:

/**
 * @author starmark
 * @date 2020/5/2  下午3:05
 */
public class MessageQueueKafkaConsumerListener implements MessageListener<String,String> {

    private final IMessageQueueConsumerService messageQueueConsumerService;

    public MessageQueueKafkaConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
        this.messageQueueConsumerService = messageQueueConsumerService;
    }


    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        messageQueueConsumerService.receiveMessage(data.value());
    }
}

/**
 * 消息队列服务端的监听
 *
 * @author starmark
 * @date 2020/5/1  上午10:55
 */
@Component
public class MessageQueueKafkaConsumerServiceFactory  implements InitializingBean {

    @Autowired
    KafkaProperties kafkaProperties;

    private final List<IMessageQueueConsumerService> messageQueueConsumerServices;

    @Autowired
    public MessageQueueKafkaConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList) {
        messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
                messageQueueConsumerService.support("kafka")).collect(Collectors.toList());
    }




    private KafkaMessageListenerContainer<Integer, String> createContainer(
            ContainerProperties containerProps) {
        Map<String, Object> props = kafkaProperties.buildConsumerProperties();
        DefaultKafkaConsumerFactory<Integer, String> cf =
                new DefaultKafkaConsumerFactory<>(props);
        return new KafkaMessageListenerContainer<>(cf, containerProps);
    }


    @Override
    public void afterPropertiesSet() {
        messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
            ContainerProperties containerProps = new ContainerProperties(messageQueueConsumerService.topic());

            containerProps.setMessageListener(new MessageQueueKafkaConsumerListener(messageQueueConsumerService)
            );
            KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
            container.setBeanName(messageQueueConsumerService.topic() + "kafkaListener");

            container.start();

        });

    }
}

这些类都是实现动态监听某个主题.

rabbitmq就有点复杂,因为他要求建了queue才能实现监听,我现在这个代码,如果生产者没有创建队列,会自动帮生产者创建该主题的队列。其实这是不对的,但不这么做,无法实现监听.

/**
 * @author starmark
 * @date 2020/5/2  下午3:05
 */
public class MessageQueueRabbitmqConsumerListener implements MessageListener  {

    private final IMessageQueueConsumerService messageQueueConsumerService;

    public MessageQueueRabbitmqConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
        this.messageQueueConsumerService = messageQueueConsumerService;
    }


    @Override
    public void onMessage(Message message) {

        messageQueueConsumerService.receiveMessage(new String(message.getBody()));
    }

}

@Component
public class MessageQueueRabbitmqConsumerServiceFactory implements InitializingBean {

    //自动注入RabbitTemplate模板类
    @Autowired
    private RabbitTemplate rabbitTemplate;

    private final ConfigurableApplicationContext applicationContext;
    private final List<IMessageQueueConsumerService> messageQueueConsumerServices;
    private final ConnectionFactory connectionFactory;

    @Autowired
    public MessageQueueRabbitmqConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList, ConfigurableApplicationContext applicationContext, ConnectionFactory connectionFactory) {
        messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
                messageQueueConsumerService.support("rabbitmq")).collect(Collectors.toList());
        this.applicationContext = applicationContext;
        this.connectionFactory = connectionFactory;

    }


    @Override
    public void afterPropertiesSet() {
        messageQueueConsumerServices.forEach(messageQueueConsumerService -> {

            this.registerBean(messageQueueConsumerService.topic(), messageQueueConsumerService.topic());
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setConsumerStartTimeout(6000L);
        ;
            //设置监听的队列名,
            String[] types = {messageQueueConsumerService.topic()};
            container.setQueueNames(types);
            container.setMessageListener(new MessageQueueRabbitmqConsumerListener(messageQueueConsumerService));
            container.start();
        });

    }


    private void registerBean(String name, Object... args) {
        if (applicationContext.containsBean(name)) {
            return;
        }
        BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(Queue.class);
        if (args.length > 0) {
            for (Object arg : args) {
                beanDefinitionBuilder.addConstructorArgValue(arg);
            }
        }
        BeanDefinition beanDefinition = beanDefinitionBuilder.getRawBeanDefinition();

        BeanDefinitionRegistry beanFactory = (BeanDefinitionRegistry) applicationContext.getBeanFactory();
        beanFactory.registerBeanDefinition(name, beanDefinition);

    }
}

至此,通用的消息队列已完成,这个只能满足一般情况的使用 . 如果要更高端的使用,直接使用其原生的api会更好.

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • redis实现消息队列

    消息队列一般都会想到kafka,rabbitmq,Rockermq, 其实,给你印像做缓存的Redis也是能做消息队列.

    星痕
  • kafka手动监听主题

    很多人作kafka消费时,都快速的使用注解@KafkaListener进行监听。 但我现在有个需求,是要动态的手动监听。 实现代码如下: 1.手动编写监听...

    星痕
  • flowable实现流程全局事件

    最近在研究flowable,发现这个东东虽说是activiti的升级版,但感觉还是没有a5的好用。 项目中需要实现一个全局事件,实现如下:

    星痕
  • 零基础用Python | 实现12306火车票实时查询

    大家都用过12306,今天我们来学习写一个Python程序,要求在命令行敲一行命令来获得你想要的火车票信息。 1 接口设置 先给这个小应用起个名字吧,既然及查询...

    小小科
  • SpringBoot根据条件注入Bean@Condition用法

    @Condition:这个注解在Spring4中引入,其主要作用就是判断条件是否满足,从而决定是否初始化并向容器注册Bean!

    时光_赌徒
  • scadnano:一个基于浏览器的、易于编写脚本的DNA纳米结构设计工具(CS ET)

    我们在https://scadnano.org/中详细介绍了*scadnano*("scriptable cadnano"的缩写),它是一个用于设计合成DNA结...

    Elva
  • 空洞卷积的设计原理以及tensorflow和mxnet框架实现

    空洞卷积(atrous convolutions)又名扩张卷积(dilated convolutions),现在在深度学习视觉领域应用非常广泛,在目标检测、图像...

    小草AI
  • 卷积神经网络(CNN)

    假设给定一张图(可能是字母X或者字母O),通过CNN即可识别出是X还是O,如下图所示

    CristianoC
  • 在 Windows 系统上启用远程应用

    需要一个远程桌面 App 进行演示, 安装 Windows 远程桌面服务太折腾, 需要安装域控制器, 再部署一整套的远程服务, 太折腾了, 如果只是演示的话, ...

    beginor
  • 一个人竟然撸了一个微博 App

    今天在Github的时候,又又又发现了一款厉害的源码,于是记录下来,推荐给你们哦。

    逆锋起笔

扫码关注云+社区

领取腾讯云代金券