相当于一个ActiveMQ服务实例
Broker其实就是实现了用代码的形式启动了ActiveMQ将MQ嵌入到java代码中,以便随时用随时启动,在用的时候再去启动这样能节约资源,也保证了可靠性。
1、 先将ActiveMQ根目录下conf文件夹中的activemq.xml复制一份并重命名为activemq02.xml
命令(cp activemq.xml activemq02.xml)
2、启动activemq02.xml,默认启动的是activemq.xml
命令(./activemq start xbean:file:/usr/local/activeMQ/apache-activemq-5.15.11/conf/activemq02.xml)
用ActiveMQ Broker作为独立的消息服务器来构建java应用。ActiveMQ也支持在虚拟机中通信,基于嵌入式的broker,能够无缝的集成其他java应用
1、pom.xml中引入包
<!--activemq-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<!--fastjson-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.9</version>
</dependency>
2、broker代码
import org.apache.activemq.broker.BrokerService;
/**
* @ProjectName: springbootActiveMQ
* @Package: cn.**.test
* @Author: huat
* @Date: 2020/1/10 16:04
* @Version: 1.0
*/
public class EmbedBroker {
public static void main(String[] args) throws Exception {
//ActiveMQ也支持在虚拟机中通信,嵌入broker
BrokerService brokerService=new BrokerService();
//将activeMQ嵌入到java程序中
brokerService.setUseJmx(true);
//现在是将activeMQ嵌入到java程序中,所以使用本机
brokerService.addConnector("tcp://127.0.0.1:61616");
//启动程序
brokerService.start();
}
}
3、队列生产者代码
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ProjectName: springbootActiveMQ
* @Package: cn.**.test
* @Author: huat
* @Date: 2020/1/2 17:04
* @Version: 1.0
*/
public class ActiveMQTest {
//url路径
private static final String ACTRIVE_URL="tcp://127.0.0.1:61616";
//队列名称
private static final String QUEUE_NAME="queue01";
public static void main(String[] args) {
//1、创建连接工厂
//如果账号密码没有修改的话,账号密码默认均为admin
ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
try {
//2、通过连接工厂获取连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3、创建session会话
//里面会有两个参数,第一个为事物,第二个是签收
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//4、创建目的地(具体是队列还是主题),这里是创建队列
Queue queue=session.createQueue(QUEUE_NAME);
//5、创建消息生产者,队列模式
MessageProducer messageProducer = session.createProducer(queue);
//6、通过messageProducer生产三条消息发送到MQ消息队列中
for (int i=0;i<3;i++){
//7、创建消息
TextMessage textMessage = session.createTextMessage("msg----->" + i);//创建一个文本消息
//8、通过messageProducer发送给mq
messageProducer.send(textMessage);
//9、数据非持久化
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
}
messageProducer.close();
session.commit();
session.close();
connection.close();
System.out.println("消息发送成功");
} catch (JMSException e) {
e.printStackTrace();
}
}
}
3、队列消费者代码
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ProjectName: springbootActiveMQ
* @Package: cn.**.test
* @Author: huat
* @Date: 2020/1/3 8:47
* @Version: 1.0
*/
public class ActiveMQConsumer {
//url路径
private static final String ACTRIVE_URL="tcp://127.0.0.1:61616";
//队列名称
private static final String QUEUE_NAME="queue01";
public static void main(String[] args) {
//1、创建连接工厂
//如果账号密码没有修改的话,账号密码默认均为admin
ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
//如果账号密码修改的话
//第一个参数为账号,第二个为密码,第三个为请求的url
//ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
try {
//2、通过连接工厂获取连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3、创建session会话
//里面会有两个参数,第一个为事物,第二个是签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4、这里接受的queue的名称要和发送者的一致
Queue queue = session.createQueue(QUEUE_NAME);
//5、创建消费者
MessageConsumer consumer = session.createConsumer(queue);
//6、通过监听的方式消费消息
while(true){
//MessageConsumer 调用的receive方法为同步调用,在消息到达之前一直阻塞线程
//用什么格式发送,这里就用什么格式接受
//receive等待消息,不限制时间
TextMessage message=(TextMessage)consumer.receive();
//receive带参数等待消息,限制时间,单位毫秒
//TextMessage message=(TextMessage)consumer.receive(4000L);
if(null != message){
System.out.println("接受的消息为------>"+message.getText());
}else{
break;
}
}
//7、闭资源
consumer.close();
session.close();
connection.close();
}catch (Exception e){
e.printStackTrace();
}
}
}