前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >JMS--ActiveMQ的简单使用

JMS--ActiveMQ的简单使用

作者头像
宋先生
发布2019-07-18 14:12:19
1K0
发布2019-07-18 14:12:19
举报

一.消息中间件概述

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有 Producer(生产者)、Consumer(消费者)。消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。

1.1常见消息中间件

ActiveMQ

ActiveMQ是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持 JMS1.1 和J2EE 1.4 规范的 JMS Provider 实现。

RabbitMQ

AMQP 协议的领导实现,支持多种场景。淘宝的 MySQL 集群内部有使用它进行通讯,OpenStack 开源云平台的通信组件,最先在金融行业得到运用。

ZeroMQ

史上最快的消息队列系统。

Kafka

Apache 下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到 10W/s 的吞吐速率;完全的分布式系统。适合处理海量数据(消息丢失率较高)。

1.2应用场景

  • 异步处理
  • 应用解耦
  • 流量削峰
  • 消息通讯

二.JMS消息服务

JMS(Java Messaging Service)是 Java 平台上有关面向消息中间件的技术规范,它便于消息系统中的Java 应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。JMS 本身只定义了一系列的接口规范,是一种与厂商无关的 API,用来访问消息收发系统。它类似JDBC(java Database Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS则提供同样与厂商无关的访问方法,以访问消息收发服务。

2.1JMS消息模型

消息中间件一般有两种传递模式:点对点模式(P2P)和发布-订阅模式(Pub/Sub)。

2.1.1点对点模型

点对点模型(Pointer-to-Pointer):即生产者和消费者之间的消息往来。每个消息都被发送到特定的消息队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

特点
  • 每个消息只有一个消费者(一旦被消费,就不在消息队列中了)
  • 发送者和接收者之间没有依赖,直接发送,不管是否有消费者
  • 接收者成功接收消息后需向队列应答成功
2.1.2发布/订阅模型

发布/订阅(Publish-Subscribe):包含三个角色:主体(Topic),发布者(Publisher),订阅者(Subscriber),多个发布者将消息发送到 topic,系统将这些消息投递到订阅此 topic 的订阅者。发布者发送到 topic 的消息,只有订阅了 topic 的订阅者才会收到消息。topic 实现了发布和订阅,当你发布一个消息,所有订阅这个 topic 的服务都能得到这个消息。

特点
  • 每个消息可有有多个消费者
  • 发布者和订阅者之间有时间上的依赖
  • 针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态

2.2JMS编程模型

ConnectionFactory

创建Connection对象的工厂,针对两种不同的 jms 消息模型,分别有 QueueConnectionFactory 和TopicConnectionFactory 两种。

Destination

Destination 的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的 Destination 是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的 Destination 也是某个队列或主题(即消息来源)。所以,Destination 实际上就是两种类型的对象:Queue、Topic。

Connection

Connection 表示在客户端和 JMS 系统之间建立的链接(对 TCP/IP socket 的包装)。Connection 可以产生一个或多个 Session。

Session

Session 是我们对消息进行操作的接口,可以通过 session 创建生产者、消费者、消息等。Session 提供了事务的功能,如果需要使用 session 发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。

Producer

Producer(消息生产者):消息生产者由 Session 创建,并用于将消息发送到 Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。

Consumer

Consumer(消息消费者):消息消费者由 Session 创建,用于接收被发送到 Destination 的消息。两种类型:QueueReceiver 和 TopicSubscriber。可分别通过 session 的 createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以 session 的 creatDurableSubscriber 方法来创建持久化的订阅者。

MessageListener

消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的 onMessage 方法。EJB 中的 MDB(Message-Driven Bean)就是一种 MessageListener。

三.消息队列ActiveMQ

ActiveMQ 是由 Apache 出品的一款开源消息中间件,旨在为应用程序提供高效、可扩展、稳定、安全的企业级消息通信。它的设计目标是提供标准的、面向消息的、多语言的应用集成消息通信中间件。ActiveMQ 实现了JMS 1.1 并提供了很多附加的特性,比如 JMX 管理、主从管理、消息组通信、消息优先级、延迟接收消息、虚拟接收者、消息持久化、消息队列监控等等。

官网:http://activemq.apache.org/

解压安装后进入管理界面:localhost:8161 用户名密码均为:admin

3.1点对点模式

第一步:新建两个Maven工程并都导入activemq坐标
代码语言:javascript
复制
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.13.4</version>
</dependency>
第二步:编写消息生产者
代码语言:javascript
复制
/**
 * 生产消费者模式:生产者
 * @author Mr.song
 * @date 2019/05/24 20:50
 */
public class QueueProducerTest {

    public static void main(String[] args) throws Exception {
        //1.获取连接工厂
        ConnectionFactory connectionFactory = new
                ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //2.从工厂获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.通过连接获取会话: 参数1-是否支持事务,  参数2-消息的确认模式
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.使用会话创建队列的目的地
        Queue queue = session.createQueue("queue-demo");
        //6.创建消息的生产这对象
        MessageProducer producer = session.createProducer(queue);
        //7.创建消息内容(使用会话对象创建)
        TextMessage textMessage = session.createTextMessage("activeMQ的生产消费模型第一个消息来了");
        //8.发送消息
        producer.send(queue,textMessage);
        //9.释放资源
        producer.close();
        session.close();
        connection.close();
    }
}
第三步:编写消息消费者
代码语言:javascript
复制
/**
 * ActiveMQ的生产消费模式-消费者
 * @author Mr.song
 * @date 2019/05/25 15:15
 */
public class QueueConsumerTest {
    public static void main(String[] args) throws JMSException {
        //1.创建连接工厂
        ConnectionFactory factory = new
                ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //2.创建连接
        Connection connection = factory.createConnection();
        //3.启动连接
        connection.start();
        //4.创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建消息目的地
        Queue queue = session.createQueue("queue-demo");
        //6.创建消息的消费者
        MessageConsumer consumer = session.createConsumer(queue);
        //7.使用消费者接受消息:采用监听器轮询接受消息
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                //将message进行转换
                try {
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println("1号消费者:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //8.如需,可释放资源
//        consumer.close();
//        session.close();
//        connection.close();
    }
}

Tips: 创建session的两个参数:

  • 第一个 : 是否使用事务
  • 第二个 : 消息的确认模式
    • AUTO_ACKNOWLEDGE = 1 自动确认
    • CLIENT_ACKNOWLEDGE = 2 客户端手动确
    • DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
    • SESSION_TRANSACTED = 0 事务提交并确认
第四步:运行测试

3.2发布订阅模式

第一步:新建两个Maven工程并都导入activemq坐标
代码语言:javascript
复制
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.13.4</version>
</dependency>
第二步:编写消息生产者
代码语言:javascript
复制
/**
 * 发布订阅模式的发布者
 * @author Mr.song
 * @date 2019/05/25 15:36
 */
public class TopicProduceTest {

    public static void main(String[] args) throws JMSException {
        //1.获取连接工厂
        ConnectionFactory connectionFactory = new
                ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.开启连接
        connection.start();
        //4.获取会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建消息队列的目的地,创建的是发布订阅模型的队列
        Topic topic = session.createTopic("topic-demo");
        //6.创建消息的生产者对象
        MessageProducer producer = session.createProducer(topic);
        //7.创建消息内容
        TextMessage textMessage = session.createTextMessage("ActiveMQ的发布订阅模型消息来了");
        //8.发送消息,指定发布到哪个队列
        producer.send(topic,textMessage);
        //9.关闭资源
        producer.close();
        session.close();
        connection.close();
    }
}
第三步:编写消息消费者
代码语言:javascript
复制
/**
 * ActiveMQ发布订阅模式的消费者
 * @author Mr.song
 * @date 2019/05/25 15:49
 */
public class TopicConsumerTest {
    public static void main(String[] args) throws JMSException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new
                ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //2.创建连接
        Connection connection = connectionFactory.createConnection();
        //3.开启连接
        connection.start();
        //4.创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建消费者目的地
        Topic topic = session.createTopic("topic-demo");
        //6.创建消费者
        MessageConsumer consumer = session.createConsumer(topic);
        //7.使用消费者接受消息:使用监听器进行轮询
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                //进行消息转换
                try {
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println("订阅到了消息:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //8.如果需要,可以关闭资源
//        consumer.close();
//        session.close();
//        connection.close();
    }
}
第四步:运行测试

四.Spring整合JMS

ActiveMQ可以通过Spring的配置文件方式很容易嵌入到Spring应用中。

4.1点对点模式/发布订阅模式

第一步:创建Maven工程并导入相关坐标
代码语言:javascript
复制
<!-- activemq  start -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.11.2</version>
</dependency>
<dependency>
    <groupId>javax.jms</groupId>
    <artifactId>javax.jms-api</artifactId>
    <version>2.0.1</version>
</dependency>
<!-- activemq  end -->
<!-- spring 与 mq整合  start -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>5.0.4.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>5.0.4.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>4.13</version>
</dependency>
<!-- spring 与 mq整合  end -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-test</artifactId>
    <version>5.0.4.RELEASE</version>
</dependency>
第二步:编写消息监听器
代码语言:javascript
复制
/**
 * 生成消费模式,消息监听器
 * @author Mr.song
 * @date 2019/05/25 16:20
 */
@Component
public class QueueListener implements MessageListener {
    /**
     * 获取到消息进行相关的处理
     * @param message
     */
    @Override
    public void onMessage(Message message) {
        try {//1.消息转型
            MapMessage mapMessage = (MapMessage) message;
            String phone = mapMessage.getString("phone");
            String code = mapMessage.getString("code");
            System.out.println("消费者端得到的手机号及验证码是:"+phone+"=="+code);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

//==================================================
/**
 * 发布订阅模式,消息监听器
 * @author Mr.song
 * @date 2019/05/25 16:25
 */
@Component
public class TopicListener implements MessageListener {
    /**
     * 获取到消息进行相关的处理
     * @param message
     */
    @Override
    public void onMessage(Message message) {
        try {//1.消息转型和获取
            MapMessage mapMessage = (MapMessage) message;
            String phone = mapMessage.getString("phone");
            String code = mapMessage.getString("code");
            System.out.println("订阅者获得的手机号和验证码:"+phone+"=="+code);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
第三步:编写消息发布者配置文件(applicaitionContext-mq.xml)
代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
    http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- 1.配置连接工厂,ActiveMQ的连接工厂 -->
    <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616"
                           userName="admin" password="admin"/>
    <!-- 2.配置Spring支持会话缓存的连接工厂 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 注入供应商的连接工厂 -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
        <!-- 设置session缓存的大小: 100个会话 -->
        <property name="sessionCacheSize" value="100"/>
    </bean>

    <!--=================== 通过配置,选择点对点/发布订阅模式 ======================= -->
    
    <!-- 3.配置Spring提供的jms模板 : 点对点模式 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 注入连接工厂,Spring的那个 -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <!-- 指定是否是发布订阅模型:false即是点对点模式 -->
        <property name="pubSubDomain" value="false"/>
    </bean>

    <!-- 3.配置Spring提供的jms模板 : 发布订阅模式 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 注入工厂连接 -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <!-- 指定是否是发布订阅模型:true即是发布订阅模式 -->
        <property name="pubSubDomain" value="true"/>
    </bean>
</beans>
第四步:编写消息消费者配置文件(applicationContext-listener.xml)
代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms.xsd
    http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- 1.配置Spring容器启动时要扫描的包 -->
    <context:component-scan base-package="cn.dintalk.listener"/>

    <!-- 2.配置连接工厂:ActiveMQ的连接工厂 -->
    <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616"
                           userName="admin" password="admin"/>
    <!-- 3.配置Spring支持会话缓存的连接工厂 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 注入连接工厂:供应商提供的 -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
        <!-- 设置会话缓存大小: 100个会话 -->
        <property name="sessionCacheSize" value="100"/>
    </bean>

  <!-- ================== 配置不同模式下的消息监听器 =================== -->  
    
    <!-- 4.配置生产消费模式的监听器 -->
    <jms:listener-container destination-type="queue">
        <!-- 配置监听器类,和消息目的地 -->
        <jms:listener destination="spring-queue" ref="queueListener"/>
    </jms:listener-container>
    
    <!-- 5.配置发布订阅模式的监听器 -->
    <jms:listener-container destination-type="topic">
        <jms:listener destination="spring-topic" ref="topicListener"/>
    </jms:listener-container>
</beans>
第五步:编写消息生产者(两种模式都有)
代码语言:javascript
复制
/**
 * Spring整合ActiveMQ的 生产/发布测试类
 * @author Mr.song
 * @date 2019/05/25 16:34
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:applicationContext-mq.xml")
public class SpringActiveMQProducerTest {

    @Autowired //点对点模式的jms模板
    private JmsTemplate jmsQueueTemplate;
    @Autowired //发布订阅模式的jms模板
    private JmsTemplate jmsTopicTemplate;
  // 1.点对点模式
    @Test
    public void testQueueProducer(){
        jmsQueueTemplate.send("spring-queue", new MessageCreator() {
            /**
             * 消息生成器
             * @param session
             * @return
             * @throws JMSException
             */
            @Override
            public Message createMessage(Session session) throws JMSException {
                MapMessage mapMessage = session.createMapMessage();
                mapMessage.setString("phone","12345678901");
                mapMessage.setString("code","6542");
                return mapMessage;
            }
        });
    }
  //2.发布订阅模式
    @Test
    public void testTopicProducer(){
        jmsTopicTemplate.send("spring-topic", new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                MapMessage mapMessage = session.createMapMessage();
                mapMessage.setString("phone","12345678901");
                mapMessage.setString("code","5648");
                return mapMessage;
            }
        });
    }
}
第六步:编写消息消费者(注册监听器)(两种模式都有)
代码语言:javascript
复制
/**
 * @author Mr.song
 * @date 2019/05/25 17:42
 */
public class SpringActiveMQConsumerTest {
  //1.加载配置文件,注册消息监听器
    public static void main(String[] args) {
        ClassPathXmlApplicationContext ac = new
                ClassPathXmlApplicationContext("classpath:applicationContext-listener.xml");
        ac.start();
    }
}
第七步:运行测试
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-05-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 顶哥说 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一.消息中间件概述
    • 1.1常见消息中间件
      • ActiveMQ
      • RabbitMQ
      • ZeroMQ
      • Kafka
    • 1.2应用场景
    • 二.JMS消息服务
      • 2.1JMS消息模型
        • 2.1.1点对点模型
        • 特点
        • 2.1.2发布/订阅模型
        • 特点
      • 2.2JMS编程模型
        • ConnectionFactory
        • Destination
        • Connection
        • Session
        • Producer
        • Consumer
        • MessageListener
    • 三.消息队列ActiveMQ
      • 3.1点对点模式
        • 第一步:新建两个Maven工程并都导入activemq坐标
        • 第二步:编写消息生产者
        • 第三步:编写消息消费者
        • 第四步:运行测试
      • 3.2发布订阅模式
        • 第一步:新建两个Maven工程并都导入activemq坐标
        • 第二步:编写消息生产者
        • 第三步:编写消息消费者
        • 第四步:运行测试
    • 四.Spring整合JMS
      • 4.1点对点模式/发布订阅模式
        • 第一步:创建Maven工程并导入相关坐标
        • 第二步:编写消息监听器
        • 第三步:编写消息发布者配置文件(applicaitionContext-mq.xml)
        • 第四步:编写消息消费者配置文件(applicationContext-listener.xml)
        • 第五步:编写消息生产者(两种模式都有)
        • 第六步:编写消息消费者(注册监听器)(两种模式都有)
        • 第七步:运行测试
    相关产品与服务
    消息队列 CMQ 版
    消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档