专栏首页IT技术小咖ActiveMQ 客户端的开发

ActiveMQ 客户端的开发

上篇文章 ActiveMQ 服务器的部署 实现了 ActiveMQ 服务器的部署,本文分别以官方 API、Spring、SpringBoot 三种方式,实现 ActiveMQ 消息的生成者和消费者。

1.基于官方 API

ActiveMQ 官方实现了 JMS 接口,但使用很繁琐,不建议直接使用。 https://mvnrepository.com/artifact/org.apache.activemq/activemq-client

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.15.3</version>
</dependency>

2.基于 Spring 开发

采用 spring.xml 配置的方式,比 JMS 简单,但也比较繁琐。 参见 《Spring实战》/ 12.Spring 消息

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>${spring-version}</version>
</dependency>

3.基于 SpringBoot 开发

IDEA 新建模块的 Spring Initializr 向导中,选中 I/O, JMS(ActiveMQ); 相应的 starter 是 spring-boot-starter-activemq

3.1 依赖包 pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

3.2 配置文件 application.properties

# ActiveMQ 地址
spring.activemq.broker-url=tcp://centos:61616
# 参数 spring.jms.pub-sub-domain 用于指定消息模型是否为发布/订阅方式
# 默认情况下(false),是点对占方式(queue),如果要使用发布/订阅方式(topic),必须设置为 true
spring.jms.pub-sub-domain=false

3.3 应用类 MessageApplication.java

其中定义了 4 个 Bean:

  • queue/topic: MessageProducer 中使用,作为消息发送的目标(分别是 queue/topic);
  • containerFactoryQueue/containerFactoryTopic: MessageConsumer 中使用,以便同时启用两种消息模型。

需要特别说明的是:

  • 只启用“点对点”模型:可配置 spring.jms.pub-sub-domain=false
  • 只启用“发布/订阅”模型:可配置 spring.jms.pub-sub-domain=false
  • 若要同时启用两种消息模型:必须定义 containerFactory Bean 并在消费者的 @JmsListener 中引用。
@SpringBootApplication
public class MessageApplication {
    @Bean
    public Queue queue() { // javax.jms.Queue
        return new ActiveMQQueue("queueName");
    }
    @Bean
    public Topic topic() { // javax.jms.Topic
        return new ActiveMQTopic("topicName");
    }
    @Bean
    public JmsListenerContainerFactory<?> containerFactoryQueue(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }
    @Bean
    public JmsListenerContainerFactory<?> containerFactoryTopic(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(connectionFactory);
        bean.setPubSubDomain(true);
        return bean;
    }
    public static void main(String[] args) {
        SpringApplication.run(MessageApplication.class, args);
    }
}

3.4 生产类 MessageProducer.java

@Component
@EnableScheduling
public class MessageProducer {
    @Resource
    private Queue queue; // javax.jms.Queue
    @Resource
    private Topic topic; // javax.jms.Topic
    @Resource
    private JmsMessagingTemplate messagingTemplate;
    @Scheduled(fixedDelay = 3000)
    public void sendQueue() { // 每隔 3 秒,发送一个“点对点”消息
        messagingTemplate.convertAndSend(queue, "Hi, queue!");
    }
    @Scheduled(fixedDelay = 6000)
    public void sendTopic() { // 每隔 6 秒,发送一个“发布/订阅”消息
        messagingTemplate.convertAndSend(topic, "Hi, topic!");
    }
}

3.5 消费类 MessageConsumer.java

@Component
public class MessageConsumer {
    private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
    @JmsListener(destination = "queueName", containerFactory = "containerFactoryQueue")
    public void receiveQueue(String message) {
        logger.info("receiveQueue: " + message);
    }
    @JmsListener(destination = "topicName", containerFactory = "containerFactoryTopic")
    public void receiveTopic(String message) {
        logger.info("receiveTopic: " + message);
    }
}

3.6 运行结果

14:43:27.981  INFO  [MessageConsumer.java:22] - receiveTopic: Hi, topic!
14:43:27.993  INFO  [MessageConsumer.java:17] - receiveQueue: Hi, queue!
14:43:31.028  INFO  [MessageConsumer.java:17] - receiveQueue: Hi, queue!
14:43:34.008  INFO  [MessageConsumer.java:22] - receiveTopic: Hi, topic!
14:43:34.048  INFO  [MessageConsumer.java:17] - receiveQueue: Hi, queue!
14:43:37.169  INFO  [MessageConsumer.java:17] - receiveQueue: Hi, queue!

同时,可在 http://centos:8161/admin 中看到相关信息。

作者:王克锋

出处:https://kefeng.wang/2017/10/18/activemq-development/

本文分享自微信公众号 - IT技术小咖(IT-arch),作者:王克锋

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-05-13

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • JVM故障分析及性能优化实战(IV)——jstack生成的Thread Dump日志线程状态

    前面文章中只分析了Thread Dump日志文件的结构,今天针对日志文件中 Java EE middleware, third party & custom a...

    IT技术小咖
  • 如何保证消息队列的高可用?

    RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。

    IT技术小咖
  • JVM故障分析及性能优化实战(V)——常见的Thread Dump日志案例分析

    我们在上篇文章中详细描述了Thread Dump中Native Thread和JVM Thread线程的各种状态及描述,今天总结分析的一些原则,并详细列举一些案...

    IT技术小咖
  • 这次要说不明白immutable类,我就怎么地

    收到读者小 R 的私信后,我就总感觉自己有一种义不容辞的责任,非要把 immutable 类说明白,否则我就怎么地——你说了算!

    沉默王二
  • 你真的看懂了登上Nature 封面的清华“天机”芯片?

    近日Nature杂志封面刊登了清华类脑计算团队的最新成果:天机芯片以及由其操控的自行车。(详见《“天机”今登Nature封面:清华施路平团队发布全球首款异构融合...

    新智元
  • Android Binder 分析系列——原理(下)

    图中我把相关的 jni 里面类也画了出来,这样就能更加明显的看到 java 层和 native 层的对应关系。

    open
  • CodeForces 938A Word Correction

          题意是输入长度为n的字符串,如果'a''e''i''o''u''y'这些字母中有两个是相连的,就删除后面的那一个,然后输出最后的结果。我的思路就是开...

    Ch_Zaqdt
  • 使用代码删除IBASE object component

    需求: 将包含object ZJERRY0903C1的object component 从IBASE 中移除:

    Jerry Wang
  • 这是一份你们需要的Windows版深度学习软件安装指南

    选自Github 机器之心编译 参与:蒋思源、刘晓坤 本文从最基本的依赖项开始,依次配置了 VS 2015、Anaconda 4.4.0、CUDA 8.0.61...

    机器之心
  • 这是一份你们需要的Windows版深度学习软件安装指南

    该配置版本最后更新的日期是今年七月,该更新版本允许本地使用 3 个不同的 GPU 加速后端,并添加对 MKL BLAS 库的支持。

    华章科技

扫码关注云+社区

领取腾讯云代金券