1. 下载 ActiveMQ , 到 activemq 官网上下载 Windows 或者 Linux 对应的安装包
2. 安装 ActiveMQ
Windows 系统下, 解压 apache-activemq-5.15.3-bin.zip; bin 目录下有 win32 和 win64 两个文件夹, 请根据自己的系统位数选择. 以 64 位系统为例, 右键 win64 目录下的 activemq.bat 并选择 "以管理员身份运行", 即可在命令行窗口中启动 ActiveMQ. 另外, 也可以双击 InstallService.bat 将其安装为 Windows 服务后再运行.
Linux 系统下, 下载 apache-activemq-5.15.3-bin.tar.gz 并解压, 然后在解压后的目录中执行 bin/activemq start 启动.
测试是否安装成功: 访问 http://localhost:8161 , 如果可以访问就说明安装成功.
3. 创建简单的 spring-boot 项目引入 ActiveMQ
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
4. 写实验代码
队列模式 (多个消费者分摊消费队列中的消息, 每条消息只会被其中一个消费者消费)
package com.codingos.springboot.activemq.queue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Queue-mode producer.
 *
 * <p>Connects to the broker at {@code tcp://localhost:61616} and sends 100
 * text messages ("Message 0" .. "Message 99") to the queue {@code queue-test},
 * echoing each one to standard output.
 */
public class AppProducer {
    private static final String url = "tcp://localhost:61616";
    private static final String queueName = "queue-test";

    /**
     * Sends the test messages and then closes the connection.
     *
     * @param args command-line arguments (unused)
     * @throws JMSException if connecting, creating JMS objects, or sending fails
     */
    public static void main(String[] args) throws JMSException {
        // 1. Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        // 2. Create the Connection
        Connection connection = connectionFactory.createConnection();
        try {
            // 3. Start the connection
            connection.start();
            // 4. Create the session (arg 1: transacted or not, arg 2: auto-acknowledge mode)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5. Create the target queue
            Destination destination = session.createQueue(queueName);
            // 6. Create the producer
            MessageProducer producer = session.createProducer(destination);
            for (int i = 0; i < 100; i++) {
                // 7. Create the message
                TextMessage textMessage = session.createTextMessage("Message " + i);
                // 8. Send the message
                producer.send(textMessage);
                System.out.println("发送消息: " + textMessage.getText());
            }
        } finally {
            // 9. Close the connection even when a step above throws, so the
            //    connection is not leaked on failure.
            connection.close();
        }
    }
}
package com.codingos.springboot.activemq.queue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Queue-mode consumer.
 *
 * <p>Connects to the broker at {@code tcp://localhost:61616}, registers a
 * listener on the queue {@code queue-test}, and prints the text of every
 * text message it receives.
 */
public class AppConsumer {
    private static final String url = "tcp://localhost:61616";
    private static final String queueName = "queue-test";

    /**
     * Sets up the connection and registers the message listener.
     *
     * <p>On success the connection is deliberately left open so the listener
     * can keep receiving after this method returns.
     *
     * @param args command-line arguments (unused)
     * @throws JMSException if connecting or creating any JMS object fails
     */
    public static void main(String[] args) throws JMSException {
        // 1. Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        // 2. Create the Connection
        Connection connection = connectionFactory.createConnection();
        boolean listening = false;
        try {
            // 3. Start the connection
            connection.start();
            // 4. Create the session (arg 1: transacted or not, arg 2: auto-acknowledge mode)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5. Create the target queue
            Destination destination = session.createQueue(queueName);
            // 6. Create the consumer
            MessageConsumer consumer = session.createConsumer(destination);
            // 7. Register the listener
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    // Guard the cast: a non-text message would otherwise throw
                    // ClassCastException out of the listener.
                    if (!(message instanceof TextMessage)) {
                        System.err.println("Ignoring non-text message: " + message);
                        return;
                    }
                    try {
                        System.out.println("接收消息: "+((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            listening = true;
        } finally {
            // Only clean up when setup failed; a successfully registered
            // listener needs the connection to stay open.
            if (!listening) {
                connection.close();
            }
        }
    }
}
主题模式 (每个订阅者都会收到主题下的全部消息; 注意订阅者需要先于发布者启动, 否则收不到订阅之前发布的消息)
package com.codingos.springboot.activemq.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Topic-mode publisher.
 *
 * <p>Connects to the broker at {@code tcp://localhost:61616} and publishes 100
 * text messages ("Message 0" .. "Message 99") to the topic {@code topic-test},
 * echoing each one to standard output.
 */
public class AppProducer {
    private static final String url = "tcp://localhost:61616";
    private static final String topicName = "topic-test";

    /**
     * Publishes the test messages and then closes the connection.
     *
     * @param args command-line arguments (unused)
     * @throws JMSException if connecting, creating JMS objects, or sending fails
     */
    public static void main(String[] args) throws JMSException {
        // 1. Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        // 2. Create the Connection
        Connection connection = connectionFactory.createConnection();
        try {
            // 3. Start the connection
            connection.start();
            // 4. Create the session (arg 1: transacted or not, arg 2: auto-acknowledge mode)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5. Create the target topic
            Destination destination = session.createTopic(topicName);
            // 6. Create the publisher
            MessageProducer producer = session.createProducer(destination);
            for (int i = 0; i < 100; i++) {
                // 7. Create the message
                TextMessage textMessage = session.createTextMessage("Message " + i);
                // 8. Send the message
                producer.send(textMessage);
                System.out.println("发送消息: " + textMessage.getText());
            }
        } finally {
            // 9. Close the connection even when a step above throws, so the
            //    connection is not leaked on failure.
            connection.close();
        }
    }
}
package com.codingos.springboot.activemq.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Topic-mode subscriber.
 *
 * <p>Connects to the broker at {@code tcp://localhost:61616}, registers a
 * listener on the topic {@code topic-test}, and prints the text of every
 * text message it receives.
 */
public class AppConsumer {
    private static final String url = "tcp://localhost:61616";
    private static final String topicName = "topic-test";

    /**
     * Sets up the connection and registers the message listener.
     *
     * <p>On success the connection is deliberately left open so the listener
     * can keep receiving after this method returns.
     *
     * @param args command-line arguments (unused)
     * @throws JMSException if connecting or creating any JMS object fails
     */
    public static void main(String[] args) throws JMSException {
        // 1. Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        // 2. Create the Connection
        Connection connection = connectionFactory.createConnection();
        boolean listening = false;
        try {
            // 3. Start the connection
            connection.start();
            // 4. Create the session (arg 1: transacted or not, arg 2: auto-acknowledge mode)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5. Create the target topic
            Destination destination = session.createTopic(topicName);
            // 6. Create the subscriber
            MessageConsumer consumer = session.createConsumer(destination);
            // 7. Register the listener
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    // Guard the cast: a non-text message would otherwise throw
                    // ClassCastException out of the listener.
                    if (!(message instanceof TextMessage)) {
                        System.err.println("Ignoring non-text message: " + message);
                        return;
                    }
                    try {
                        System.out.println("接收消息: "+((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            listening = true;
        } finally {
            // Only clean up when setup failed; a successfully registered
            // listener needs the connection to stay open.
            if (!listening) {
                connection.close();
            }
        }
    }
}