前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Rabbit-使用

Rabbit-使用

作者头像
Liusy
修改2020-09-01 17:25:55
6430
修改2020-09-01 17:25:55
举报
文章被收录于专栏:Liusy01

何为消息中间件?

     消息中间件是在消息的传输过程中保存消息的容器。消息中间件在将消息从它的源中传递到它的目标时充当中间人的作用。队列的主要目的是提供路由并保证消息的传递。可作为系统间的通信中间件。

 目前主流的消息中间件有Kafka、RabbitMQ、ActiveMQ、RocketMQ。今天梳理一下RabbitMQ的简单使用。

2

RabbitMQ简介

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端。

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

3

安装

1、由于RabbitMQ使用Erlang编写,所以需要先装Erlang环境。

(1)先获取Erlang安装包

代码语言:javascript
复制
wget http://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm

(2)利用rpm进行安装

代码语言:javascript
复制
rpm -ivh erlang-19.0.4-1.el7.centos.x86_64.rpm

安装完之后使用erl -version查看版本号,可以显示版本号,则说明安装成功

2、安装RabbitMQ

(1)使用wget获取包

代码语言:javascript
复制
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.4.1/rabbitmq-server-3.4.1-1.noarch.rpm

(2)安装

代码语言:javascript
复制
rpm -ive rabbitmq-server-3.4.1-1.noarch.rpm

安装之后查看版本

(3)设置开机启动

代码语言:javascript
复制
chkconfig rabbitmq-server on

(4)创建配置文件

代码语言:javascript
复制
1、进入/etc/rabbitmq目录
cd /etc/rabbitmq/
2、将默认目录中的配置文件拷贝至/etc/rabbitmq目录下 
cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/
3、然后将配置文件改名
mv rabbitmq.config.example rabbitmq.config

(5)开启远程访问

代码语言:javascript
复制
1、打开配置文件
vim rabbitmq.config
2、找到这一行
 %% {loopback_users, []},
3、将百分号和最后面逗号去除
{loopback_users, []}

(6)开启web界面管理工具

代码语言:javascript
复制
rabbitmq-plugins enable rabbitmq_management

开启之后可以用15672访问

【注:如开了防火墙,需开放此端口】

代码语言:javascript
复制
/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEP
/etc/rc.d/init.d/iptables save

默认用户名密码都是guest

(7)启停命令

代码语言:javascript
复制
service rabbitmq-server start
service rabbitmq-server stop
service rabbitmq-server restart

4

角色及Virtual Host配置

一、角色

可在此页面添加用户,同时配置相应的角色,RabbitMQ提供了五种角色,分别是:  (1)超级管理员(administrator) 可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。  (2)监控者(monitoring) 可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)  (3)策略制定者(policymaker) 可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。  (4)普通管理者(management) 仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。  (5)其他 无法登陆管理控制台,通常就是普通的生产者和消费者。

我在这给每个角色创建一个用户

二、Virtual Host

像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。RabbitMQ也有类似的权限管理;在RabbitMQ中可以虚拟消息服务器Virtual Host,每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。相当于mysql的db。Virtual Name一般以/开头。

(1)创建Virtual Host
(2)设置Virtual Host权限

1、在Virtual Host点击相应的Virtual Host

2、设置访问权限

  • user:用户名
  • configure :一个正则表达式,用户对符合该正则表达式的所有资源拥有 configure 操作的权限
  • write:一个正则表达式,用户对符合该正则表达式的所有资源拥有 write 操作的权限
  • read:一个正则表达式,用户对符合该正则表达式的所有资源拥有 read 操作的权限

5

工作队列模式

准备工作:

引入Rabbitmq的依赖

代码语言:javascript
复制
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.3</version>
</dependency>

创建获取连接的公共类:

代码语言:javascript
复制
public class ConnectionUtils {
    public static Connection getConn() throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory connFactory = new ConnectionFactory();
        //设置Rabbitmq的host
        connFactory.setHost("192.168.197.100");
        //设置Rabbitmq的服务端口
        connFactory.setPort(5672);
        //设置虚拟主机名称
        connFactory.setVirtualHost("virtual_lsy_1");
        //设置用户名
        connFactory.setUsername("guest");
        //设置密码
        connFactory.setPassword("guest");
        //获取连接
        Connection connection = connFactory.newConnection();
        return connection;
    }
}

1、简单模式 HelloWorld 一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)

生产者:

代码语言:javascript
复制
public class SimpleProduct {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConn();
        //创建通道
        Channel channel = connection.createChannel();
        //声明队列,(队列名称,是否持久化,是否独占本连接,是否自动删除,附加参数)
        channel.queueDeclare("queue_lsy_1",true,false,false,null);
        //创建消息
        String str = "这是一个消息";
        //消息发送(交换机[默认Default Exchage],路由key[简单模式可以传递队列名称],消息其它属性,消息内容)
        channel.basicPublish("","queue_lsy_1",null,str.getBytes("utf-8"));
        //关闭资源
        channel.close();
        connection.close();
    }
}

消费者:

代码语言:javascript
复制
public class SimpleComsumer {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //获取连接
        Connection connection = ConnectionUtils.getConn();
        //创建通道
        final Channel channel = connection.createChannel();

        channel.queueDeclare("queue_lsy_1",true,false,false,null);

        //创建消费者
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * @param consumerTag 消费者标签,在channel.basicConsume时候可以指定
             * @param envelope 信封,消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             * @param properties  属性信息(生产者的发送时指定)
             * @param body 消息内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //获取routingkey
                String routingKey = envelope.getRoutingKey();
                //获取路由
                String exchange = envelope.getExchange();
                //消息id
                long deliveryTag = envelope.getDeliveryTag();
                String msg = new String(body);
                System.out.println(
                        "routingKey:" + routingKey +
                                ",exchange:" + exchange +
                                ",deliveryTag:" + deliveryTag +
                                ",message:" + msg);

                //手动确认消息是否已被消费
//                channel.basicAck(deliveryTag,true);
            }
        };

        /**
         * 消息消费
         * 参数1:队列名称
         * 参数2:是否自动应答,true为自动应答[mq接收到回复会删除消息],设置为false则需要手动应答
         * 参数3:消息接收到后回调
         */
         channel.basicConsume("queue_lsy_1", true, consumer);
    }
}

需要注意的是,如果队列在RabbitMQ中不存在,启动消费者会报错。所以先要启动生产者创建队列,或者直接在页面上创建队列。

运行结果:

2、工作队列模式 Work Queue 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)

工作队列模式就是简单模式,只不过是一个队列有多个消费者,一个消息只能被一个消费者消费。

3、发布订阅模式 Publish/subscribe 需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列,消息可分发到各个队列中,一个消息可被多个消费者消费。

在发布订阅模型中,多了一个e(exchange)角色,而且过程略有变化。 P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机) C:消费者,消息的接受者,会一直等待消息到来。 Queue:消息队列,接收消息、缓存消息。 Exchange:交换机,图中的e。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。

    Fanout:广播,将消息交给所有绑定到交换机的队列     Direct:定向,把消息交给符合指定routing key 的队列     Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列 Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

生产者:声明一个名为“fanout_exchange”的exchange,将消息发送到交换器中,发送10个消息给交换器中

代码语言:javascript
复制
public class FanoutProduct {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnectionUtils.getConn();
        Channel channel = conn.createChannel();
        channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT);
        for (int i = 0; i < 10; i++) {
            String str = "这是第"+i+"条信息";
            channel.basicPublish("fanout_exchange","",null,str.getBytes("utf-8"));
        }
        channel.close();
        conn.close();
    }
}

消费者1:声明一个队列fanout_queue_1,绑定fanout_exchange交换机

代码语言:javascript
复制
public class FanoutConsumer1 {


    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnectionUtils.getConn();
        Channel channel = conn.createChannel();

        channel.queueDeclare("fanout_queue_1",true,false,true,null);
         //队列名称,交换器名称,routing_key
        channel.queueBind("fanout_queue_1","fanout_exchange","");

        //创建消费者
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println(msg);
            }
        };
        channel.basicConsume("fanout_queue_1",true,consumer);
    }

}

消费者2与消费者1代码相同,只是队列名称不一致。

运行结果:

消费者1收到10个消息

消费者2收到10个消息

4、路由模式 Routing 需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

生产者发送消息时指定routing key,比如A,交换机就会把消息发送到A的队列。

生产者:

代码语言:javascript
复制
public class RoutingProduct {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnectionUtils.getConn();
        Channel channel = conn.createChannel();
        channel.exchangeDeclare("routing_exchange", BuiltinExchangeType.DIRECT);
        for (int i = 0; i < 10; i++) {
            String routingKey = "";
            int i1 = i % 5;
            if (i1 == 0) {
                routingKey = "routing_key_a";
            } else {
                routingKey = "routing_key_b";
            }
            String str = "这是第"+i+"条信息";
            channel.basicPublish("routing_exchange",routingKey,null,str.getBytes("utf-8"));
        }
        channel.close();
        conn.close();
    }
}

如果i%5等于0,就发送到routingKey为routing_key_a绑定的队列下,否则就发送到routing_key_b的绑定队列下

消费者:队列routing_queue_1绑定routing_key_a,routing_queue_2绑定routing_key_b

代码语言:javascript
复制
public class RoutingConsumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnectionUtils.getConn();
        Channel channel = conn.createChannel();

        channel.queueDeclare("routing_queue_1",true,false,true,null);
        channel.queueBind("routing_queue_1","routing_exchange","routing_key_a");

        //创建消费者
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println(msg);
            }
        };

        channel.basicConsume("routing_queue_1",true,consumer);

    }

}


public class RoutingConsumer2 {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnectionUtils.getConn();
        Channel channel = conn.createChannel();

        channel.queueDeclare("routing_queue_2",true,false,true,null);
        channel.queueBind("routing_queue_2","routing_exchange","routing_key_b");

        //创建消费者
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println(msg);
            }
        };

        channel.basicConsume("routing_queue_2",true,consumer);

    }

}

运行结果:

消费者1收到两条消息

消费者2收到8条消息 

5、通配符模式 Topic 需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

通配符规则: #:匹配一个或多个词 *:匹配不多不少恰好1个词

比如:

a.A.b会匹配到*.A.*

B.Y会匹配到B.#

生产者:

代码语言:javascript
复制
public class TopicProduct {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnectionUtils.getConn();
        Channel channel = conn.createChannel();

        channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC);
        for (int i = 0; i < 5; i++) {
            String routingKey = "";
            int i1 = i % 2;
            if (i1 == 0) {
                routingKey = "topic.a";
            } else {
                routingKey = "topic.b";
            }
            String message = "这是第"+i+"条信息";
            channel.basicPublish("topic_exchange",routingKey,null,message.getBytes("utf-8"));
        }
        channel.close();
        conn.close();

    }

}

将i%2等于0的发送到与routing_key为topic.a绑定队列中,否则发送到与routing_key为topic.b绑定队列中。

消费者:

代码语言:javascript
复制
public class TopicConsumer1 {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnectionUtils.getConn();
        Channel channel = conn.createChannel();

        channel.queueDeclare("topic_queue1",true,false,false,null);
        //队列绑定交换机与路由key
        channel.queueBind("topic_queue1", "topic_exchange", "topic.*");
        //创建消费者
        Consumer callback = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println(message);
            }
        };
        channel.basicConsume("topic_queue1",true,callback);

    }

}

public class TopicConsumer2 {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnectionUtils.getConn();
        Channel channel = conn.createChannel();

        channel.queueDeclare("topic_queue2",true,false,false,null);

        //队列绑定路由key
        channel.queueBind("topic_queue2", "topic_exchange", "topic.b");
        //创建消费者
        Consumer callback = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println(message);
            }
        };
        channel.basicConsume("topic_queue2",true,callback);

    }

}

消费者1绑定的routing_key是topic.*,消费者 2绑定 的routing_key是topic.b。

运行结果:

生产者推送了5条消息

消费者1的routing_key匹配到所有消息。

消费者2的routing_key匹配到两条消息。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-07-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Liusy01 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • (1)创建Virtual Host
  • (2)设置Virtual Host权限
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档