前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ActiveMQ消息传递的两种方式

ActiveMQ消息传递的两种方式

作者头像
用户2146856
发布2018-05-18 15:52:56
7460
发布2018-05-18 15:52:56
举报
文章被收录于专栏:帅小子的日常帅小子的日常

1.什么是ActiveMQ?

  ActiveMQ是apache提供的开源的,实现消息传递的一个中间插件,可以和spring整合,是目前最流行的开源消息总线,ActiveMQ是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。较相似的还有rabbitMQ和kafka等,都是最为消息传递的插件

2.ActiveMQ传递消息的两种方式

前提:需要引入activemq的jar包

点对点方式(PTP):一个消费者对应一个生产者

发布/订阅模式(Publish/Sub):一个生产者产生消息发送后,可以被多个消费者进行接收。

JMS定义了五种消息正文格式,以及消息的调用类型,允许发送和接收一些不同类型的数据,提供现有消息格式的一些级别的兼容性。

StreamMessage:--JAVA原始的数据流

TextMessage:一个字符串对象

ObjectMessage:一个系列化的java对象

BytesMessage:一个字节对象

MapMessage:key/value方式的键值对

(1)点对点的方式(PTP)

  即:一个消息的生产者对应一个消费者

生产者(Producer)实现步骤:

第一步:创建一个ConnectionFactory对象,将服务端activemq的 ip 和 port 作为构造参数传递

第二步:通过第一步创建的工厂对象获得连接对象Connection

第三步:开启连接,直接调用connection对象的start方法即可

第四步:创建一个Session对象,通过connection对象创建

第五步:通过Session对象创建一个Destination对象(该对象有两种方式:topic和quene),这里使用quene

第六步:通过Session对象创建一个生产者Producer对象

第七步:创建Message对象,这里使用TextMessage对象,设置消息内容

第八步:使用创建的生产者对象Producer发送消息

第九步:关闭资源(Producer对象,Connection对象,Session对象)

代码语言:javascript
复制
@Test
    public void testQueueProducer() throws Exception {
        // 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
        //brokerURL服务器的ip及端口号
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://ip地址:61616");
        // 第二步:使用ConnectionFactory对象创建一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:开启连接,调用Connection对象的start方法。
        connection.start();
        // 第四步:使用Connection对象创建一个Session对象。
        //第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
        //第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
        //参数:队列的名称。
        Queue queue = session.createQueue("test-queue");
        // 第六步:使用Session对象创建一个Producer对象。
        MessageProducer producer = session.createProducer(queue);
        // 第七步:创建一个Message对象,创建一个TextMessage对象。
        /*TextMessage message = new ActiveMQTextMessage();
        message.setText("hello activeMq,this is my first test.");*/
        TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
        // 第八步:使用Producer对象发送消息。
        producer.send(textMessage);
        // 第九步:关闭资源。
        producer.close();
        session.close();
        connection.close();
    }

消费者实现:

第一步:创建一个ConnectionFactory对象,将服务端activemq的 ip 和 port 作为构造参数传递

第二步:通过第一步创建的工厂对象获得连接对象Connection

第三步:开启连接,直接调用connection对象的start方法即可

第四步:创建一个Session对象,通过connection对象创建

第五步:创建一个Destination对象,使用quene,需要和生产者的quene一致

第六步:创建一个消费者对象

第七步:接收消息

第八步:打印接收的消息

第九步:关闭资源

消费者的代码:

代码语言:javascript
复制
@Test
    public void testQueueConsumer() throws Exception {
        // 第一步:创建一个ConnectionFactory对象。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
        // 第二步:从ConnectionFactory对象中获得一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:开启连接。调用Connection对象的start方法。
        connection.start();
        // 第四步:使用Connection对象创建一个Session对象。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
        Queue queue = session.createQueue("test-queue");
        // 第六步:使用Session对象创建一个Consumer对象。
        MessageConsumer consumer = session.createConsumer(queue);
        // 第七步:接收消息。
        consumer.setMessageListener(new MessageListener() {
            
            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = null;
                    //取消息的内容
                    text = textMessage.getText();
                    // 第八步:打印消息。
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //等待键盘输入
        System.in.read();
        // 第九步:关闭资源
        consumer.close();
        session.close();
        connection.close();
    }

(2)订阅发布方式传递消息:Topic  

补充:由于topic传递消息的特点是,一个生产者可以有多个消费者,生产者生产的消息在没有被消费者消费之前,并不会将消息持久化到activemq的服务端,发送的消息会自动消失。所以 测试的时候需要先创建消费者对象,然后在发送消息,防止消息丢失。

生产者实现步骤:

步骤和PTP的方式完全一样,不同的是在创建Destination对象的时候,需要创建topic对象

直接上代码:

代码语言:javascript
复制
@Test
    public void testTopicProducer() throws Exception {
        // 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
        // brokerURL服务器的ip及端口号
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://IP地址:61616");
        // 第二步:使用ConnectionFactory对象创建一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:开启连接,调用Connection对象的start方法。
        connection.start();
        // 第四步:使用Connection对象创建一个Session对象。
        // 第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
        // 第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个topic对象。
        // 参数:话题的名称。
        Topic topic = session.createTopic("test-topic");
        // 第六步:使用Session对象创建一个Producer对象。
        MessageProducer producer = session.createProducer(topic);
        // 第七步:创建一个Message对象,创建一个TextMessage对象。
        /*
         * TextMessage message = new ActiveMQTextMessage(); message.setText(
         * "hello activeMq,this is my first test.");
         */
        TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");
        // 第八步:使用Producer对象发送消息。
        producer.send(textMessage);
        // 第九步:关闭资源。
        producer.close();
        session.close();
        connection.close();
    }

消费者实现的步骤:

步骤和PTP消费者实现的步骤一样,唯一不同的是在创建Destination对象的时候,创建topic对象,同时要和发布订阅的生产者的topic一致

消费者代码:

代码语言:javascript
复制
@Test
    public void testTopicConsumer() throws Exception {
        // 第一步:创建一个ConnectionFactory对象。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://IP地址:61616");
        // 第二步:从ConnectionFactory对象中获得一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:开启连接。调用Connection对象的start方法。
        connection.start();
        // 第四步:使用Connection对象创建一个Session对象。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
        Topic topic = session.createTopic("test-topic");
        // 第六步:使用Session对象创建一个Consumer对象。
        MessageConsumer consumer = session.createConsumer(topic);
        // 第七步:接收消息。
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = null;
                    // 取消息的内容
                    text = textMessage.getText();
                    // 第八步:打印消息。
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        System.out.println("topic的消费端03。。。。。");
        // 等待键盘输入
        System.in.read();
        // 第九步:关闭资源
        consumer.close();
        session.close();
        connection.close();
    }

总结:两种传递消息的方式的异同

相同点:实现步骤基本一样,大同小异

不同点:PTP传递消息的方法,消息的生产者发送以后,消息会持久化在activemq的服务端,如果该消息给消费者消费,在服务端持久化的消息也就同时被删除。

发布订阅传递消息的方法:消息的生产者发送消息以后,如果没有消费者消费,消息不会持久化在activemq的客户端,会立即消失。如果创建的消息被消费,会的activemq的服务端显示消息相关内容。这一点和PTP刚好相反。

注意:发布订阅传递消息的方式:也是可以实现消息持久化在服务端的,需要消费者首先在activemq的服务端订阅消息(注册),将消费者客户端的ID(作为唯一标识,因为可以有多个消费者)和消息的ID传递给服务端即可。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-04-28 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档