之前说到了activeMQ的一些基本用法,本文将介绍activeMQ如何与spring以及spring boot整合。
1、需要的依赖:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.9.3</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.9</version>
</dependency>
2、applicationContext.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!-- 1、开启自动扫描 -->
<context:component-scan base-package="com.zhu.study"/>
<!-- 2、配置连接 -->
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.x.xx:61616"/>
</bean>
</property>
<property name="maxConnections" value="100"/>
</bean>
<!-- 3、配置目的地,队列 -->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="spring-active-queue"/>
</bean>
<!-- 配置目的地,主题 -->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-active-topic"/>
</bean>
<!-- 4、配置spring提供的jms模板 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory"/>
<property name="defaultDestination" ref="destinationQueue"/>
<!--<property name="defaultDestination" ref="destinationTopic"/>-->
<property name="messageConverter">
<!-- 做消息类型转换的 -->
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
</beans>
配置很简单,就是四步:
如果想将目的地由queue换成topic,只需要在配置jmsTemplate的时候,将defaultDestination指向你上面配置的topic即可,然后启动时先启动消费者,其他任何地方不用改。
3、生产者:
@Service
public class Produce {
@Autowired
private JmsTemplate jmsTemplate;
public static void main(String[] args){
// 1、加载配置
ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
// 2、获取bean
Produce produce = context.getBean(Produce.class);
// 3、调用jmsTemplate发送消息
/*produce.jmsTemplate.send(new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage("spring整合activeMQ");
return message;
}
});*/
// lambda方式:编程口诀(拷贝小括号,写死右箭头,落地大括号)
produce.jmsTemplate.send((Session session) -> {
TextMessage message = session.createTextMessage("spring整合activeMQ");
return message;
});
System.out.println("activemq send success!");
}
}
4、消费者:
@Service
public class Consumer {
@Autowired
private JmsTemplate jmsTemplate;
public static void main(String[] args){
// 1、加载配置
ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
// 2、获取bean
Consumer consumer = context.getBean(Consumer.class);
// 3、消费消息
String result = (String)consumer.jmsTemplate.receiveAndConvert();
System.out.println("消费者收到消息:" + result);
}
}
这样就搞定了。不过上面说了,如果把目的地换成topic要先启动消费者。其实还可以配置监听程序,这样就不需要手动启动消费者了,消费者会一直处于待命状态。先写一个监听的类。
5、监听程序:
@Component
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
if (null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("从activemq收到消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
然后在applicationContext.xml中配置:
<!-- 5、配置监听程序 -->
<bean id="jmsListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsFactory"/>
<property name="destination" ref="destinationTopic"/>
<property name="messageListener" ref="myMessageListener"/>
</bean>
这样就只需启动生产者即可了,监听程序会自动监听,收到消息就会进行消费。
首先新建一个springboot项目,用来编写生产者代码。 1、需要的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
2、application.yml:
server:
port: 6666
spring:
activemq:
broker-url: tcp://192.168.x.xx:61616
user: admin
password: admin
# false = Queue; true = Topic
jms:
pub-sub-domain: false
#队列名称
myqueue: boot-activemq
上面配置的是队列,要用主题的话,把上面的false改成true。
3、配置类:
@Component
@EnableJms // 这个注解必不可少
public class ConfigBean {
@Value("${myqueue}")
private String myQueue;
// 1、创建队列
@Bean
public Queue queue(){
return new ActiveMQQueue(myQueue);
}
}
这个配置类主要就是创建了一个队列,队列名从配置文件中读取。
4、生产者: 这里主要有两种生产模式,一种是触发投递,一种是定时投递。触发投递就是程序启动后,满足某个条件才会去调用发送消息的方法;定时投递就是相当于一个定时任务。
@Component
public class Produce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
// 触发投递
public void produceMsg(){
String message = "springboot整合activemq成功";
jmsMessagingTemplate.convertAndSend(queue,message);
System.out.println("消息触发投递成功");
}
// 定时生产,每隔5秒中向MQ发送一次消息(还需在启动类上加上@EnableScheduling注解)
@Scheduled(fixedDelay = 5000)
public void produceMsgScheduled(){
String message = "定时投递消息";
jmsMessagingTemplate.convertAndSend(queue,message);
System.out.println("消息定时投递成功");
}
}
注意定时投递需要在启动类上加@EnableScheduling注解!要测试定时投递,直接运行spring boot的启动类就好了,就可以看到每隔5秒“消息定时投递成功”就会被打印一次。要测试触发投递,就需要我们手动地去调用produceMsg方法,可以写个如下的测试类:
@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqspringbootApplicationTests {
@Autowired
private Produce produce;
@Test
public void contextLoads() {
produce.produceMsg();
}
}
运行这个测试类就可以看到会打印出“消息触发投递成功”,然后程序就会停止。
5、消费者: 我们知道消费消息有两种方式,一种是用receive方法,还有就是监听。用receive方法和spring中的一样,这里讲如何配置监听。
@Component
public class Queue_Consumer {
@JmsListener(destination = "${myqueue}")
public void recevice(TextMessage message) throws Exception{
System.out.println("消费者收到消息:" + message.getText());
}
}
没错,就是这么简单!在spring中还需要我们自己新建监听类,然后配置到配置文件中,在springboot中,一个注解就搞定了!
6、发布订阅: 上面用的是队列,主题的使用方法如下:
pub-sub-domain
的值改为true。@Bean
public Topic topic(){
return new ActiveMQTopic(myTopic);
}
可以看出,springboot整合activemq比spring整个它简单很多!