前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【消息队列之rabbitmq】学习RabbitMQ必备品之一

【消息队列之rabbitmq】学习RabbitMQ必备品之一

作者头像
沁溪源
发布2021-12-20 20:25:29
7380
发布2021-12-20 20:25:29
举报
文章被收录于专栏:沁溪源沁溪源

一、基础知识

对于RabbitMQ其他知识体系,本文中就不仔细讲解了,先列出系列核概念,帮助大家建立知识体系;

在这里插入图片描述
在这里插入图片描述

Message 消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。 Publisher 消息的生产者,也是一个向交换器发布消息的客户端应用程序。 Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 Binding 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 Queue 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。 Channel 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。 Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。 Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。 Broker 表示消息队列服务器实体。

二、Rabbitmq消息发送模式

消息发送基本流程总结:生产者(Producer)发送->中间件->消费者(Consumer)接收消息

RabbitMQ包括五种队列模式,简单队列、工作队列、发布/订阅、路由、主题等。

1、简单队列

1)生产者将消息发送到队列,消费者从队列获取消息。 2)一个队列对应一个消费者。

在这里插入图片描述
在这里插入图片描述

2、工作队列

1)一个生产者,多个消费者。 2)一个消息发送到队列时,只能被一个消费者获取。 3)多个消费者并行处理消息,提升消息处理速度。 注意:channel.basicQos(1)表示同一时刻只发送一条消息给消费者。 消息模型在Web应用程序中特别有用,可以处理短的HTTP请求窗口中无法处理复杂的任务。

在这里插入图片描述
在这里插入图片描述

3、发布/订阅

将消息发送到交换机,队列从交换机获取消息,队列需要绑定到交换机。 1)一个生产者,多个消费者。 2)每一个消费者都有自己的一个队列。 3)生产者没有将消息直接发送到队列,而是发送到交换机。 4)每一个队列都要绑定到交换机。 5)生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的。 6)发布/订阅映射的交换机类型为“fanout”。 注意:交换机本身没有存储消息的能力,消息只能存储到队列中。

在这里插入图片描述
在这里插入图片描述

了解到这里,大家会熟悉了工作队列模式和发布/订阅模式,思考一下两者存在什么区别? 1、从模式图中,可以清楚明确到发布/订阅模式需要定义交换机exchange,而工作队列模式中并未明确定义交换机(那意味着底层没有使用交换机???不对。) 2、发布/订阅生产者是面向交换机发送消息; 工作队列模式生产者是面向队列发送消息(底层使用默认交换机)。 3、发布/订阅消费者需要设置队列和交换机的绑定; 工作队列中消费者开发者不需要设置绑定关系,底层会将队列绑定到默认的交换机; 第二点和第三点,在下面实践编码过程中,会明确定义其区别;

4、路由模式

1)路由模式的交换机类型为“direct”。 2)绑定队列到交换机时指定 key,即路由键,一个队列可以指定多个路由键。 3)生产者发送消息时指定路由键,这时,消息只会发送到绑定的key的对应队列中。

在这里插入图片描述
在这里插入图片描述

解释上图含义P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。 X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列 C1:消费者,其所在队列指定了需要routing key 为 error 的消息; C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

5、主题模式

1)每个消费者监听自己的队列,并且设置带统配符的routingkey,生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。 2)Routingkey一般都是有一个或者多个单词组成,多个单词之间以“.”分割; 3)主题模式的交换机类型为“topic” 4)通配符规则: #:匹配一个或多个词; *:匹配不多不少恰好1个词;

在这里插入图片描述
在这里插入图片描述

三、RabbitMQ交换机类型

1、Direct exchange

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。

在这里插入图片描述
在这里插入图片描述

2、Fanout exchange

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。

在这里插入图片描述
在这里插入图片描述

3、Topic exchange

topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词。

在这里插入图片描述
在这里插入图片描述

四、实战

针对以上知识体系,小编part仅针对部分进行演示,其他消息模式或者交换机类型,大家有兴趣的可以进行扩展;

1、运行环境

1、centos镜像或者已安装rabbitmq虚拟机、rabbitmq图形化界面; 2、JDK1.8+版本+maven; 3、依赖配置

代码语言:javascript
复制
<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.1</version>
        </dependency>

2、统一连接类

注意点:factory.setHost("****");配置成自己的服务器地址

代码语言:javascript
复制
package com.itwx.mq.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author wangxuan
 * @date 2021/12/15 11:03 上午
 * @describe
 */
public class ConnectionUtil {
    /**
     * 建立与RabbitMQ的连接
     *
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        //todo 配置成自己的服务器地址
        factory.setHost("****");
        //端口
        factory.setPort(5672);
        /**
         * 设置账号信息,用户名、密码、vhost
         */
        //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 通过工厂获取连接
        return factory.newConnection();
    }

}

3、简单队列消息模式

此种消息模式,底层选择rabbitmq默认的交换机类型,无需开发者自定义交换机、以及交换机和队列的绑定关系; 面向队列编程 故开发者需要定义的角色:队列;

  • 生产者
代码语言:javascript
复制
package com.itwx.mq.basic;

import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @author wangxuan
 * @date 2021/12/15 11:12 上午
 * @describe 生产者发送消息
 */
public class ProviderSend {
//    private final static String EXCHANGE_NAME = "wx_test_exchange";
    private final static String QUEUE_NAME = "wx_test_queue";

    /**
     * 基本消息模式,暂时不定义交换器
     * @param argv
     * @throws Exception
     */
    public static void main(String[] argv) throws Exception {
        // 1、获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 2、从连接中创建通道,使用通道才能完成消息相关的操作
        Channel channel = connection.createChannel();
        // 3、声明(创建)队列
        /**
         * 参数明细
         * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
         * 1、queue 队列名称
         * 2、durable 是否持久化,如果持久化,mq重启后队列还在
         * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
         * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
         * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 4、消息内容
        String message = "Hello rabbitmq!";
        // 向指定的队列中发送消息
        /**
         * 参数明细:
         * String exchange, String routingKey, BasicProperties props, byte[] body
         * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
         * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
         * 3、props,消息的属性
         * 4、body,消息内容
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("provider send:" + message);
        //关闭通道和连接(资源关闭最好用try-catch-finally语句处理)
        try {
            channel.close();
            connection.close();
        } catch (Exception e) {

        }
    }
}
  • 消费者
代码语言:javascript
复制
package com.itwx.mq.basic;

import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author wangxuan
 * @date 2021/12/15 11:44 上午
 * @describe
 */
public class Consumer {
    private final static String QUEUE_NAME = "wx_test_queue";

    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
        Channel channel = connection.createChannel();
        // 声明队列
        /**
         * 参数明细
         * 参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
         * 1、queue 队列名称
         * 2、durable 是否持久化,如果持久化,mq重启后队列还在
         * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
         * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
         * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //实现消费方法
        DefaultConsumer consumer = new DefaultConsumer(channel){
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            /**
             * 当接收到消息后此方法将被调用
             * @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
             * @param envelope 信封,通过envelope
             * @param properties 消息属性
             * @param body 消息内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //TODO 手动抛异常,造成消息丢失现象
                //int i= 1 / 0;
                //交换机
                String exchange = envelope.getExchange();
                //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                // body 即消息体
                String msg = new String(body,"utf-8");
                System.out.println("consumer receive message:" + msg + ",messageId:" + deliveryTag + ",exchange name:" + exchange);
            }
        };

        // 监听队列,第二个参数:是否自动进行消息确认。
        //参数:String queue, boolean autoAck, Consumer callback
        /**
         * 参数明细:
         * 1、queue 队列名称
         * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
         * 3、callback,消费方法,当消费者接收到消息要执行的方法
         */
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

4、发布/订阅消息模式—交换机fanout类型

发布订阅上面基础概念我们介绍其与工作队列消息模式的区别,故面向交换机编程; 开发者需要定义的角色:交换机、队列、路由key、交换机和队列之间的绑定;

  • 生产者
代码语言:javascript
复制
package com.itwx.mq.fanout;

import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;

/**
 * @author wangxuan
 * @date 2021/12/15 5:03 下午
 * @describe
 */
public class ProviderFanout {

    private final static String EXCHANGE_NAME = "test_fanout_exchange";

    public static void main(String[] args) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明exchange,指定类型为fanout
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 消息内容
        String message = "hello rabbitmq, fanout type message!";
        // 发布消息到Exchange
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println("provider fanout send message:" + message);

        channel.close();
        connection.close();
    }

}
  • 消费者1
代码语言:javascript
复制
package com.itwx.mq.fanout;

import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author wangxuan
 * @date 2021/12/15 5:11 下午
 * @describe
 */
public class EmailFanoutConsumer {
    private final static String QUEUE_NAME = "test_email_fanout_queue";//邮件队列

    private final static String EXCHANGE_NAME = "test_fanout_exchange";

    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /**
         * 思考一个问题,如果队列未绑定交换机会发生什么?
         */
        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println("email received : " + msg + "!");
            }
        };
        // 监听队列,自动返回完成
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}
  • 消费者2
代码语言:javascript
复制
package com.itwx.mq.fanout;

import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author wangxuan
 * @date 2021/12/15 5:03 下午
 * @describe
 */
public class SmsFanoutConsumer {
    private final static String QUEUE_NAME = "test_sms_fanout_queue";//短信队列

    private final static String EXCHANGE_NAME = "test_fanout_exchange";

    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println("sms received : " + msg + "!");
            }
        };
        // 监听队列,自动返回完成
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

代码存在一些思考,大家可以验证一下,避免做伸手党!

5、路由模式—交换机direct类型

  • 生产者
代码语言:javascript
复制
package com.itwx.mq.direct;

import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @author wangxuan
 * @date 2021/12/15 5:56 下午
 * @describe
 */
public class ProviderDirect {
    private final static String EXCHANGE_NAME = "test_direct_exchange";

    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明exchange,指定类型为direct
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 消息内容,
        String message = "欢迎您使用rabbitmq消息中间件,如有疑问及时反馈";
        // 发送消息,并且指定routing key 为:sms,只有短信服务能接收到消息
        channel.basicPublish(EXCHANGE_NAME, "sms", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}
  • 消费者1
代码语言:javascript
复制
package com.itwx.mq.direct;

import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author wangxuan
 * @date 2021/12/15 5:57 下午
 * @describe
 */
public class EmailDirectConsumer {
    private final static String QUEUE_NAME = "direct_exchange_queue_email";//邮件队列
    private final static String EXCHANGE_NAME = "test_direct_exchange";

    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机,同时指定需要订阅的routing key。可以指定多个
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");//指定接收发送方指定routing key为email的消息

        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println(" email received : " + msg + "!");
            }
        };
        // 监听队列,自动ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}
  • 消费者2
代码语言:javascript
复制
package com.itwx.mq.direct;

import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author wangxuan
 * @date 2021/12/15 5:57 下午
 * @describe
 */
public class SmsDirectConsumer {

    private final static String QUEUE_NAME = "direct_exchange_queue_sms";//短信队列
    private final static String EXCHANGE_NAME = "test_direct_exchange";

    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机,同时指定需要订阅的routing key。可以指定多个
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "sms");//指定接收发送方指定routing key为sms的消息
        //channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");

        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println(" sms received : " + msg + "!");
            }
        };
        // 监听队列,自动ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

6、主题模式—交换机topic类型

  • 生产者
代码语言:javascript
复制
package com.itwx.mq.topic;

import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @author wangxuan
 * @date 2021/12/15 6:21 下午
 * @describe
 */
public class ProviderTopic {
    private final static String EXCHANGE_NAME = "test_topic_exchange";

    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明exchange,指定类型为topic
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 消息内容
        String message = "发送手机验证码,登录信息已发送邮件通知";
        // 发送消息,并且指定routing key为:quick.orange.rabbit
        channel.basicPublish(EXCHANGE_NAME, "phone.verification_code.email.login.notice", null, message.getBytes());
        System.out.println(" [动物描述:] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}
  • 消费者1
代码语言:javascript
复制
package com.itwx.mq.topic;

import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author wangxuan
 * @date 2021/12/15 6:21 下午
 * @describe
 */
public class TopicConsumerOne {
    private final static String QUEUE_NAME = "topic_exchange_queue_Q1";
    private final static String EXCHANGE_NAME = "test_topic_exchange";

    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机,同时指定需要订阅的routing key
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.verification_code.#");

        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println(" [消费者1] received : " + msg + "!");
            }
        };
        // 监听队列,自动ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
  • 消费者2
代码语言:javascript
复制
package com.itwx.mq.topic;

import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author wangxuan
 * @date 2021/12/15 6:22 下午
 * @describe
 */
public class TopicConsumerTwo {
    private final static String QUEUE_NAME = "topic_exchange_queue_Q2";
    private final static String EXCHANGE_NAME = "test_topic_exchange";

    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机,同时指定需要订阅的routing key
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.email");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "#.login.#");

        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println(" [消费者2] received : " + msg + "!");
            }
        };
        // 监听队列,自动ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

五、Rabbit Management

对于上面的实战,大家需要结合图形化界面验证更便于理解,而且在日常开发过程也方便排查问题,故小编顺便简单介绍一下Rabbit Management! 登录地址:http://******:15672 账户密码:一般均是默认值guest/guest

1、connections管理

介绍各个服务器的连接信息;

2、channel管理

建立在连接基础上的通道,实际开发中链接应为全局变量,通道为线程级;

3、exchange管理

如图:大家可以进行查询已定义的交换机信息;

在这里插入图片描述
在这里插入图片描述

如果想要在图形化界面新增交换机,如图:

在这里插入图片描述
在这里插入图片描述

里面的参数,与编码中的含义一致,根据自己需求设定交换机类型、是否持久化、自动删除等;

Queue管理

在这里插入图片描述
在这里插入图片描述

点击具体队列名字,进入详情设置:绑定交换机、路由key,以及消息内容等;

在这里插入图片描述
在这里插入图片描述

其他的细节属性,大家可以仔细摸索,简单的英文名字,很明确见名知其义;

六、参考资料

1、rabbitmq基础概念知识 2、rabbitmq官网 3、最详细的RabbitMQ介绍

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-12-19 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、基础知识
  • 二、Rabbitmq消息发送模式
    • 1、简单队列
      • 2、工作队列
        • 3、发布/订阅
          • 4、路由模式
            • 5、主题模式
            • 三、RabbitMQ交换机类型
              • 1、Direct exchange
                • 2、Fanout exchange
                  • 3、Topic exchange
                  • 四、实战
                    • 1、运行环境
                      • 2、统一连接类
                        • 3、简单队列消息模式
                          • 4、发布/订阅消息模式—交换机fanout类型
                            • 5、路由模式—交换机direct类型
                              • 6、主题模式—交换机topic类型
                              • 五、Rabbit Management
                                • 1、connections管理
                                  • 2、channel管理
                                    • 3、exchange管理
                                      • Queue管理
                                      • 六、参考资料
                                      相关产品与服务
                                      消息队列 CMQ 版
                                      消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
                                      领券
                                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档