前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >干货|SpringBoot JMS(ActiveMQ)API实践应用详解

干货|SpringBoot JMS(ActiveMQ)API实践应用详解

作者头像
浅羽技术
发布2020-12-07 11:11:47
4100
发布2020-12-07 11:11:47
举报
文章被收录于专栏:浅羽技术

前言

Active是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。AC-tiveMQ使用Apache提供的授权,任何人都可以对其实现代码进行修改。

ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。ActiveMQ实现了JMS标准并提供了很多附加的特性。本文将带大家详细介绍ActiveMQ的API的使用。

公众号:「浅羽的IT小屋」

1. JMS的概念?

「什么是JMS呢:」

  • JMS---------JAVA Message Service
  • JAVA的消息服务,是sun公司提供的接口,只是一个规范,这个规范就类似于JDBC是一样的,使用的时候是需要当前规范的实现产品的。

「JMS能干什么呢:」

  • 能够将信息发布到目的地
  • 可以从目的地来消费这个消息
2、两种通信模型

「队列的通信概念:」

  • 特点:当我们同一个队列有多个消费者的时候,多个消费者的数据之和才是原来队列中的所有数据
  • 队列的通信模型最大的适用场景:流量的消峰,高并发的处理

「主题的通信模型:」

  • 特点:当我们队列有多个消费者的时候,那么这多个消费者消费到的数据是一样的
  • 主题消费者通信模型的适用场景:微服务下服务之间的异步通信

3. MQ的实现产品

「实现产品:」

  • ActiveMQ
  • RabbitMQ
  • RockerMQ
  • Kafka(这个设计的初衷是做分布式的日志的,后来因为日志有严格的顺序问题,这个时候人们就用Kafka来做消息队列了)
4、JMS中常见的名词

「常见的名词:」

  • ActiveMQConnectionFactory:这个是创建连接的工厂
  • ConnectionFactory:连接的工厂
  • Connection:连接JAVA对MQ的一个连接
  • Destination:目的地
  • 生产者(Producer)
  • 消费者(Consumer)
  • Session:会话(每一次对MQ的操作都称为一次会话)
  • Queue:队列
  • Topic:主题
5、什么是消息队列

「消息队列简单的说就是用来存放临时数据的地方:」

  • 生产者----------->存储介质上
  • 消费者----------->存储介质上

「消息队列类似于快递公司:」

  • 你可以将东西交给快递公司
  • 目标人也可以从快递公司去取东西

6. ActiveMQ是什么

「含义:」

  • ActiveMQ就是一个JMS的实现产品,它能够实现JMS下的所有功能
7、ActiveMQ能干什么

「主要作用:」

  • 流量消峰处理
  • 微服务下模块的异步通信
  • 处理高并发下的订单
  • 处理第三方平台的高并发
  • 协助消息表可以完成分布式事务的最终一致性

8、ActiveMQ的安装

「ActiveMQ的安装和配置:」

代码语言:javascript
复制
 1、官网下载Linux版的ActiveMQ(最新版本为5.13.4)
       https://activemq.apache.org/download.html

       2、解压安装
      tar -zxvf apache-activemq-5.13.4-bin.tar.gz

       3、配置(这里采用默认配置,无需修改)
      vim /usr/lical/activemq-1/conf/activemq.xml

       4、启动
      cd /usr/local/activemq-1/bin
./activemq start

      5、打开管理界面(管理界面可以查看并管理所有队列及消息)
         http://192.168.1.100:8161

       启动成功后,可以浏览 http://localhost:8161/admin/
       默认用户名、密码:admin/admin
       管理界面是用jetty做容器的,如果想修改管理界面的端口,可以编辑../conf/jetty.xml,找到下面这一段:
      <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port       number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8161"/>
     </bean>
       用户名/密码是在 ../conf/jetty-realm.properties 里,比如要增加一个管理员jimmy/123456,可参考下面修改:
       1
2
3admin: admin, admin
jimmy: 123456, admin
user: user, user
       注:管理界面有一个小坑,ActiveMQ 5.13.2与jdk1.8兼容性有点问题,如果使用jdk1.8,管理界面进入Queues标签页时,偶尔会报错,但是并不影响消息正常收发,只是无法从界面上查看队列情况,如果出现该问题,可将jdk版本降至1.7,同时最好清空data目录下的所有数据,再重启activemq即可。

9. ActiveMQ的API的使用

「AcatveMQ的API使用:」

  • 队列的使用(生产者)
代码语言:javascript
复制
package com.qy.mq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.Message;

import javax.jms.*;

/**
 * @Auther: qianyu
 * @Date: 2020/11/04 14:12
 * @Description:生产者
 */
public class Producer {

    //准备发布的这个地址
    private  static final String PATH="tcp://10.7.182.87:61616";
    //ActiveMQ下的用户名
    private static final String userName="admin";
    //ActiveMQ下的密码
    private static final String password="admin";

    public static void main(String[] args) throws JMSException {
        //第一步:创建连接的工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);
        //通过这个工厂获取连接
        Connection connection = activeMQConnectionFactory.createConnection();
        //第三步:打开这个连接
        connection.start();
        //第四步:创建操作MQ的这个会话
        /**
         * 第一个参数:是否使用事务
         * 第二个参数:客户端的应答模式
         *     第一种:自动应答
         *     第二种:客户端手动应答
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //需要发送消息的目的地(queue操作的是队列)
        Destination destination=session.createQueue("wqqq");
        //生产者来生产这个消息
        //要有生产者
        MessageProducer messageProducer = session.createProducer(destination);

        //发送很多的消息到消息队列中去
//        for (int i=0;i<100;i++){
            //需要准备发送的消息
//            TextMessage textMessage = session.createTextMessage("我是浅羽:"+i);
            //研究消息的类型

          /*  BytesMessage bytesMessage = session.createBytesMessage();
            bytesMessage.setByteProperty("www",new Byte("123"));

            //接下来就可以发送消息了
            messageProducer.send(bytesMessage);*/

        //创建map类型的message
        /*MapMessage mapMessage = session.createMapMessage();
        mapMessage.setInt("www1",123);

        messageProducer.send(mapMessage);*/

        ObjectMessage objectMessage = session.createObjectMessage(new User(1, "qianyu", "123"));
        messageProducer.send(objectMessage);

//        }
    }

}



  • 队列的使用(消费者)
代码语言:javascript
复制
package com.qy.mq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.Serializable;

/**
 * @Auther: qianyu
 * @Date: 2020/11/04 14:13
 * @Description:消费者
 */
public class Consumer {

    //准备发布的这个地址
    private  static final String PATH="tcp://10.7.182.87:61616";
    //ActiveMQ下的用户名
    private static final String userName="admin";
    //ActiveMQ下的密码
    private static final String password="admin";

    public static void main(String[] args) throws JMSException {
        //第一步:创建连接的工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);
        //通过这个工厂获取连接
        Connection connection = activeMQConnectionFactory.createConnection();
        //第三步:打开这个连接
        connection.start();
        //第四步:创建操作MQ的这个会话
        /**
         * 第一个参数:是否使用事务
         * 第二个参数:客户端的应答模式
         *     第一种:自动应答
         *     第二种:客户端手动应答
         */
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        //需要发送消息的目的地(queue操作的是队列)
        Destination destination=session.createQueue("wqqq");
        //创建我们的消费者了
        MessageConsumer messageConsumer = session.createConsumer(destination);
        //接下来就可以接收我们的消息了
        //Message message = messageConsumer.receive();

        //接收消息并指定这个超时的时间
//      Message message = messageConsumer.receive(5000);

        //接收消息没有就不等待了 直接over了  不接收了
//      Message message = messageConsumer.receiveNoWait();

        //给定当前的路径设置监听器
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {

               /* BytesMessage bytesMessage= (BytesMessage) message;

                try {
                    System.out.println("获取到的数据是:"+bytesMessage.getByteProperty("www"));
                } catch (JMSException e) {
                    e.printStackTrace();
                }*/


               /*MapMessage mapMessage= (MapMessage) message;

                try {
                    System.out.println("获取到的数据是:"+mapMessage.getInt("www1"));
                } catch (JMSException e) {
                    e.printStackTrace();
                }*/

               //测试对象类型的消息的发送和接收
               ObjectMessage objectMessage= (ObjectMessage) message;
                try {
                    User user = (User) objectMessage.getObject();

                    System.out.println("接收到的数据是:"+user);
                } catch (JMSException e) {
                    e.printStackTrace();
                }


              /*  //我们知道是一个字符串类型的消息
                TextMessage textMessage= (TextMessage) message;
                //接下来就可以打印这个消息了
                try {
                    System.out.println("消费者1---接收到的消息是:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }*/

                try {
                    //这句话就表示的是客户端来手动的进行应答
                    message.acknowledge();
                } catch (JMSException e) {
                    e.printStackTrace();
                }

            }
        });
    }
}




  • 主题模型的生产者
代码语言:javascript
复制
package com.qy.mq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @Auther: qianyu
 * @Date: 2020/11/04 15:17
 * @Description:
 */
public class Producer {
    //准备发布的这个地址
    private  static final String PATH="tcp://10.7.182.87:61616";
    //ActiveMQ下的用户名
    private static final String userName="admin";
    //ActiveMQ下的密码
    private static final String password="admin";

    public static void main(String[] args) throws JMSException {
        //第一步:创建连接的工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);
        //通过这个工厂获取连接
        Connection connection = activeMQConnectionFactory.createConnection();
        //第三步:打开这个连接
        connection.start();
        //第四步:创建操作MQ的这个会话
        /**
         * 第一个参数:是否使用事务
         * 第二个参数:客户端的应答模式
         *     第一种:自动应答
         *     第二种:客户端手动应答
         */
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        //需要发送消息的目的地(下面创建的就应该是主题模型的地址)
        Destination destination=session.createTopic("topic222");
        //生产者来生产这个消息
        //要有生产者
        MessageProducer messageProducer = session.createProducer(destination);

        //发送很多的消息到消息队列中去
        for (int i=0;i<100;i++){
            //需要准备发送的消息
            TextMessage textMessage = session.createTextMessage("我是浅羽:"+i);
            //接下来就可以发送消息了
            messageProducer.send(textMessage);
        }
    }
}




  • 主题模型的消费者
代码语言:javascript
复制
package com.qy.mq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @Auther: qianyu
 * @Date: 2020/11/04 15:19
 * @Description:
 */
public class Consumer {

    //准备发布的这个地址
    private  static final String PATH="tcp://10.7.182.87:61616";
    //ActiveMQ下的用户名
    private static final String userName="admin";
    //ActiveMQ下的密码
    private static final String password="admin";

    public static void main(String[] args) throws JMSException {
        //第一步:创建连接的工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);
        //通过这个工厂获取连接
        Connection connection = activeMQConnectionFactory.createConnection();
        //第三步:打开这个连接
        connection.start();
        //第四步:创建操作MQ的这个会话
        /**
         * 第一个参数:是否使用事务
         * 第二个参数:客户端的应答模式
         *     第一种:自动应答
         *     第二种:客户端手动应答
         */
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        //需要发送消息的目的地(queue操作的是队列)
        Destination destination=session.createTopic("topic222");
        //创建我们的消费者了
        MessageConsumer messageConsumer = session.createConsumer(destination);
        //接下来就可以接收我们的消息了

        //给定当前的路径设置监听器
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {

                //我们知道是一个字符串类型的消息
                TextMessage textMessage= (TextMessage) message;
                //接下来就可以打印这个消息了
                try {
                    System.out.println("消费者1---接收到的消息是:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }

                try {
                    message.acknowledge();
                } catch (JMSException e) {
                    e.printStackTrace();
                }

            }
        });
    }
}




本篇关于ActiveMQ的介绍就先到这里结束了,后续会出更多关于ActiveMQ系列更多文章,谢谢大家支持!

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-11-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浅羽的IT小屋 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
    • 1. JMS的概念?
      • 2、两种通信模型
        • 4、JMS中常见的名词
          • 5、什么是消息队列
            • 7、ActiveMQ能干什么
            相关产品与服务
            消息队列 CMQ 版
            消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档