前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Rabbitmq 简单介绍,安装和go客户端使用

Rabbitmq 简单介绍,安装和go客户端使用

作者头像
张琳兮
发布2019-09-16 13:46:03
1K0
发布2019-09-16 13:46:03
举报
文章被收录于专栏:首富手记首富手记

Rabbitmq 简单介绍,安装和go客户端使用

1,消息队列介绍

1.1 什么是消息队列?

消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。 消息队列,一般我们会简称他为MQ(Message Queue),消息队列可以简单的理解为:把要传输的数据放在队列中

image.png
image.png

说明:

  • Producer:消息生产者,负责产生和发送消息到 Broker;
  • Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue;
  • Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理;

1.2, 为什么要用消息队列?

1.2.1 应用结偶

比如在我们现在公司的业务常见中: 1,给客户打完电话之后我们需要根据通话录音进行打标签; 2,给客户打完电话之后我们需要给他发送短信 3,给客户打完电话之后我们需要发送他的通话给机器人,让机器人自学习 简单架构图如下:

image.png
image.png

如果没有消息队列,在A服务里面要写上3个API接口分别对应后面三个服务,突然有一天这个客户说我不需要发短信功能了,如果按照上面这种方式,我们就需要联系开发开始删代码,然后升级,刚升级好没几天,客户说我有要这个功能,那开发又要升级代码,这个时候开发集体离职了,(这每天干的完全是无用功) 但是如果有消息队列那就完全不一样了,就会变成下面这个样子:

image.png
image.png

A只需要写一个接口对接MQ了,后面不管是添加和删除都对A没有影响了,删除直接取消去消息就行了,大大减少了开发人员的工作量

1.2.2 异步处理

还拿上面那个场景来简述这个:A是公司的主要业务,打电话业务,BCD为非主要业务。 假设A调用BCD 接口需要50ms,那等A把所有接口调用完成之后需要150ms,对主业务消耗特别大,如果我们不用搭理BCD的处理,A直接把他交给消息队列,有消息队列去处理BCD,A只要把数据给消息队列就行了,那么A的压力就会很小,也不会影响主要业务流程,提高用户的

1.2.3 流量削峰

打个比方,我们目前有A B两个服务,A服务的OPS峰值为100W,但是B服务的OPS峰值只有10w,这个时候来了90w个请求,A服务能处理过来没问题,但是这个时候B服务直接就崩溃了

image.png
image.png

如果这个时候我们在A和B之间加一个rabbitmq,我们让B每次去取9w,这样B服务就不会挂了,

1.3,消费者怎么得到消息队列的数据?

  • 生产者将数据放到消息队列中,消息队列有数据了,主动叫消费者去拿(俗称push)
  • 消费者不断去轮训消息队列,看看有没有新的数据,如果有就消费(俗称pull)

2, RabbitMQ消息队列

2.1 RabbitMQ的优点

2.1.1 可靠性

RabbitMQ提供了多种技术可以让你在性能和可靠性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证实和高可用性机制。

2.1.2 灵活的路由

消息在到达队列前是通过交换机进行路由的。RabbitMQ为典型的路由逻辑提供了多种内置交换机类型。如果你有更复杂的路由需求,可以将这些交换机组合起来使用,你甚至可以实现自己的交换机类型,并且当做RabbitMQ的插件来使用。

2.1.3 集群

在相同局域网内的多个RabbitMQ服务器可以聚合在一起,作为一个独立的逻辑代理来使用

2.1.4 联合

对于服务器来说,他比集群需要更多的松散和非可靠链接,为此RabbitMQ提供了联合模型

2.1.5 高可用的队列

在同一个集群里,队列可以被镜像到多个机器中,以确保当前某些硬件出现故障后,你的消息仍然可以被使用

2.1.6 多协议

RabbitMQ支持多种消息协议的消息传递

2.1.7 广泛的客户端

只要是你能想到的编程语言几乎都有与其相适配的RabbitMQ客户端。

2.1.8 可视化管理工具

RabbitMQ附带了一个易于使用的可视化管理工具,它可以帮助你监控消息代理的每一个环节。

2.1.9 追踪

如果你的消息系统有异常行为,RabbitMQ还提供了追踪的支持,让你能够发现问题所在。

2.1.10 插件系统

RabbitMQ附带了各种各样的插件来对自己进行扩展。你甚至也可以写自己的插件来使用。

2.2 RabbitMQ 的概念模型

所有 MQ 产品从模型抽象上来说都是一样的过程: 消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。

image.png
image.png

2.3 RabbitMQ基本概念

RabbitMQ基本流程图:

image.png
image.png
2.3.1 Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

2.3.2 Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

2.3.3 Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

2.3.4 Binding

绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

2.3.5 Queue

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

2.3.6 Connection

网络连接,比如一个TCP连接。

2.3.7 Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

2.3.8 Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

2.3.9 Virtual Host

虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

2.3.10 Broker

表示消息队列服务器实体。

2.4 安装RabbitMQ

2.4.1 Docker 安装RabbitMQ

注意获取镜像的时候要获取management版本的,不要获取last版本的,management版本的才带有管理界面。 我们访问docker镜像仓库然后查找RabbitMQ的镜像 找到我们合适的镜像版本

image.png
image.png

我们使用命令拉去

代码语言:javascript
复制
# docker pull rabbitmq:3.8-rc-management
# docker images
REPOSITORY                                                       TAG                 IMAGE ID            CREATED             SIZE
rabbitmq                                                         3.8-rc-management   90cce17c1af8        2 days ago          179MB

启动RabbitMQ:

代码语言:javascript
复制
docker run -d --hostname my-rabbit --name some-rabbit --net host -e RABBITMQ_DEFAULT_USER=zsf -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:3.8-rc-management

参数解释:

代码语言:javascript
复制
--hostname 指定docker容器内部的名称
--name。    指定docker容器的名称
--net host。容器内部使用主机的网络
-e RABBITMQ_DEFAULT_USER 设置rabbitmq管理界面的用户
-e RABBITMQ_DEFAULT_PASS 设置rabbitmq管理界面用户的密码

查看日志是否启动成功:

代码语言:javascript
复制
# docker logs -f some-rabbit
2019-09-12 01:25:40.742 [info] <0.8.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.742 [info] <0.8.0> Feature flags:   [ ] drop_unroutable_metric
2019-09-12 01:25:40.742 [info] <0.8.0> Feature flags:   [ ] empty_basic_get_metric
2019-09-12 01:25:40.742 [info] <0.8.0> Feature flags:   [ ] implicit_default_bindings
2019-09-12 01:25:40.742 [info] <0.8.0> Feature flags:   [ ] quorum_queue
2019-09-12 01:25:40.743 [info] <0.8.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.743 [info] <0.8.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.757 [info] <0.261.0> ra: meta data store initialised. 0 record(s) recovered
2019-09-12 01:25:40.757 [info] <0.266.0> WAL: recovering []
2019-09-12 01:25:40.758 [info] <0.270.0>
 Starting RabbitMQ 3.8.0-rc.1 on Erlang 22.0.7
 Copyright (C) 2007-2019 Pivotal Software, Inc.
 Licensed under the MPL.  See https://www.rabbitmq.com/

  ##  ##
  ##  ##      RabbitMQ 3.8.0-rc.1. Copyright (C) 2007-2019 Pivotal Software, Inc.
  ##########  Licensed under the MPL.  See https://www.rabbitmq.com/
  ######  ##
  ##########  Logs: <stdout>

              Starting broker...
2019-09-12 01:25:40.758 [info] <0.270.0>
 node           : rabbit@my-rabbit
 home dir       : /var/lib/rabbitmq
 config file(s) : /etc/rabbitmq/rabbitmq.conf
 cookie hash    : mhZwmGv/TzyC2kVekZ7Yvg==
 log(s)         : <stdout>
 database dir   : /var/lib/rabbitmq/mnesia/rabbit@my-rabbit
2019-09-12 01:25:40.768 [info] <0.270.0> Running boot step pre_boot defined by app rabbit
2019-09-12 01:25:40.768 [info] <0.270.0> Running boot step rabbit_core_metrics defined by app rabbit
2019-09-12 01:25:40.769 [info] <0.270.0> Running boot step rabbit_alarm defined by app rabbit
2019-09-12 01:25:40.771 [info] <0.276.0> Memory high watermark set to 11998 MiB (12581668454 bytes) of 29997 MiB (31454171136 bytes) total
2019-09-12 01:25:40.774 [info] <0.278.0> Enabling free disk space monitoring
2019-09-12 01:25:40.774 [info] <0.278.0> Disk free limit set to 50MB
2019-09-12 01:25:40.776 [info] <0.270.0> Running boot step code_server_cache defined by app rabbit
2019-09-12 01:25:40.776 [info] <0.270.0> Running boot step file_handle_cache defined by app rabbit
2019-09-12 01:25:40.776 [info] <0.281.0> Limiting to approx 1048476 file handles (943626 sockets)
2019-09-12 01:25:40.776 [info] <0.282.0> FHC read buffering:  OFF
2019-09-12 01:25:40.776 [info] <0.282.0> FHC write buffering: ON
2019-09-12 01:25:40.777 [info] <0.270.0> Running boot step worker_pool defined by app rabbit
2019-09-12 01:25:40.777 [info] <0.271.0> Will use 4 processes for default worker pool
2019-09-12 01:25:40.777 [info] <0.271.0> Starting worker pool 'worker_pool' with 4 processes in it
2019-09-12 01:25:40.777 [info] <0.270.0> Running boot step database defined by app rabbit
2019-09-12 01:25:40.777 [info] <0.270.0> Node database directory at /var/lib/rabbitmq/mnesia/rabbit@my-rabbit is empty. Assuming we need to join an existing cluster or initialise from scratch...
2019-09-12 01:25:40.777 [info] <0.270.0> Configured peer discovery backend: rabbit_peer_discovery_classic_config
2019-09-12 01:25:40.777 [info] <0.270.0> Will try to lock with peer discovery backend rabbit_peer_discovery_classic_config
2019-09-12 01:25:40.777 [info] <0.270.0> Peer discovery backend does not support locking, falling back to randomized delay
2019-09-12 01:25:40.777 [info] <0.270.0> Peer discovery backend rabbit_peer_discovery_classic_config does not support registration, skipping randomized startup delay.
2019-09-12 01:25:40.777 [info] <0.270.0> All discovered existing cluster peers:
2019-09-12 01:25:40.777 [info] <0.270.0> Discovered no peer nodes to cluster with
2019-09-12 01:25:40.779 [info] <0.43.0> Application mnesia exited with reason: stopped
2019-09-12 01:25:40.807 [info] <0.270.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
2019-09-12 01:25:40.825 [info] <0.270.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
2019-09-12 01:25:40.825 [info] <0.270.0> Feature flag `drop_unroutable_metric`: supported, attempt to enable...
2019-09-12 01:25:40.825 [info] <0.270.0> Feature flag `drop_unroutable_metric`: mark as enabled=state_changing
2019-09-12 01:25:40.834 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.834 [info] <0.270.0> Feature flags:   [~] drop_unroutable_metric
2019-09-12 01:25:40.834 [info] <0.270.0> Feature flags:   [ ] empty_basic_get_metric
2019-09-12 01:25:40.834 [info] <0.270.0> Feature flags:   [ ] implicit_default_bindings
2019-09-12 01:25:40.834 [info] <0.270.0> Feature flags:   [ ] quorum_queue
2019-09-12 01:25:40.834 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.834 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.841 [info] <0.270.0> Feature flag `drop_unroutable_metric`: mark as enabled=true
2019-09-12 01:25:40.850 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.850 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.850 [info] <0.270.0> Feature flags:   [ ] empty_basic_get_metric
2019-09-12 01:25:40.850 [info] <0.270.0> Feature flags:   [ ] implicit_default_bindings
2019-09-12 01:25:40.850 [info] <0.270.0> Feature flags:   [ ] quorum_queue
2019-09-12 01:25:40.850 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.850 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.857 [info] <0.270.0> Feature flag `empty_basic_get_metric`: supported, attempt to enable...
2019-09-12 01:25:40.857 [info] <0.270.0> Feature flag `empty_basic_get_metric`: mark as enabled=state_changing
2019-09-12 01:25:40.866 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.866 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.866 [info] <0.270.0> Feature flags:   [~] empty_basic_get_metric
2019-09-12 01:25:40.866 [info] <0.270.0> Feature flags:   [ ] implicit_default_bindings
2019-09-12 01:25:40.866 [info] <0.270.0> Feature flags:   [ ] quorum_queue
2019-09-12 01:25:40.866 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.866 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.873 [info] <0.270.0> Feature flag `empty_basic_get_metric`: mark as enabled=true
2019-09-12 01:25:40.883 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.883 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.883 [info] <0.270.0> Feature flags:   [x] empty_basic_get_metric
2019-09-12 01:25:40.883 [info] <0.270.0> Feature flags:   [ ] implicit_default_bindings
2019-09-12 01:25:40.883 [info] <0.270.0> Feature flags:   [ ] quorum_queue
2019-09-12 01:25:40.883 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.883 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.890 [info] <0.270.0> Feature flag `implicit_default_bindings`: supported, attempt to enable...
2019-09-12 01:25:40.890 [info] <0.270.0> Feature flag `implicit_default_bindings`: mark as enabled=state_changing
2019-09-12 01:25:40.899 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.899 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.899 [info] <0.270.0> Feature flags:   [x] empty_basic_get_metric
2019-09-12 01:25:40.899 [info] <0.270.0> Feature flags:   [~] implicit_default_bindings
2019-09-12 01:25:40.899 [info] <0.270.0> Feature flags:   [ ] quorum_queue
2019-09-12 01:25:40.899 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.899 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.906 [info] <0.270.0> Waiting for Mnesia tables for 30000 ms, 0 retries left
2019-09-12 01:25:40.906 [info] <0.270.0> Feature flag `implicit_default_bindings`: mark as enabled=true
2019-09-12 01:25:40.916 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.916 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.916 [info] <0.270.0> Feature flags:   [x] empty_basic_get_metric
2019-09-12 01:25:40.916 [info] <0.270.0> Feature flags:   [x] implicit_default_bindings
2019-09-12 01:25:40.916 [info] <0.270.0> Feature flags:   [ ] quorum_queue
2019-09-12 01:25:40.916 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.916 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.922 [info] <0.270.0> Feature flag `quorum_queue`: supported, attempt to enable...
2019-09-12 01:25:40.922 [info] <0.270.0> Feature flag `quorum_queue`: mark as enabled=state_changing
2019-09-12 01:25:40.931 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.931 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.931 [info] <0.270.0> Feature flags:   [x] empty_basic_get_metric
2019-09-12 01:25:40.931 [info] <0.270.0> Feature flags:   [x] implicit_default_bindings
2019-09-12 01:25:40.931 [info] <0.270.0> Feature flags:   [~] quorum_queue
2019-09-12 01:25:40.931 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.932 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.938 [info] <0.270.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
2019-09-12 01:25:40.939 [info] <0.270.0> Feature flag `quorum_queue`:   migrating Mnesia table rabbit_queue...
2019-09-12 01:25:40.943 [info] <0.270.0> Feature flag `quorum_queue`:   migrating Mnesia table rabbit_durable_queue...
2019-09-12 01:25:40.947 [info] <0.270.0> Feature flag `quorum_queue`:   Mnesia tables migration done
2019-09-12 01:25:40.947 [info] <0.270.0> Feature flag `quorum_queue`: mark as enabled=true
2019-09-12 01:25:40.956 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.956 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.956 [info] <0.270.0> Feature flags:   [x] empty_basic_get_metric
2019-09-12 01:25:40.956 [info] <0.270.0> Feature flags:   [x] implicit_default_bindings
2019-09-12 01:25:40.956 [info] <0.270.0> Feature flags:   [x] quorum_queue
2019-09-12 01:25:40.956 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.956 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.963 [info] <0.270.0> Feature flag `virtual_host_metadata`: supported, attempt to enable...
2019-09-12 01:25:40.963 [info] <0.270.0> Feature flag `virtual_host_metadata`: mark as enabled=state_changing
2019-09-12 01:25:40.972 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.972 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.972 [info] <0.270.0> Feature flags:   [x] empty_basic_get_metric
2019-09-12 01:25:40.972 [info] <0.270.0> Feature flags:   [x] implicit_default_bindings
2019-09-12 01:25:40.972 [info] <0.270.0> Feature flags:   [x] quorum_queue
2019-09-12 01:25:40.972 [info] <0.270.0> Feature flags:   [~] virtual_host_metadata
2019-09-12 01:25:40.972 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.979 [info] <0.270.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
2019-09-12 01:25:40.983 [info] <0.270.0> Feature flag `virtual_host_metadata`: mark as enabled=true
2019-09-12 01:25:40.992 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.992 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.992 [info] <0.270.0> Feature flags:   [x] empty_basic_get_metric
2019-09-12 01:25:40.992 [info] <0.270.0> Feature flags:   [x] implicit_default_bindings
2019-09-12 01:25:40.992 [info] <0.270.0> Feature flags:   [x] quorum_queue
2019-09-12 01:25:40.992 [info] <0.270.0> Feature flags:   [x] virtual_host_metadata
2019-09-12 01:25:40.992 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.999 [info] <0.270.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
2019-09-12 01:25:41.018 [info] <0.270.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
2019-09-12 01:25:41.018 [info] <0.270.0> Peer discovery backend rabbit_peer_discovery_classic_config does not support registration, skipping registration.
2019-09-12 01:25:41.018 [info] <0.270.0> Running boot step database_sync defined by app rabbit
2019-09-12 01:25:41.018 [info] <0.270.0> Running boot step feature_flags defined by app rabbit
2019-09-12 01:25:41.018 [info] <0.270.0> Running boot step codec_correctness_check defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step external_infrastructure defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_registry defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_auth_mechanism_cr_demo defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_queue_location_random defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_event defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_auth_mechanism_amqplain defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_auth_mechanism_plain defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_exchange_type_direct defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_exchange_type_fanout defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_exchange_type_headers defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_exchange_type_topic defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_mirror_queue_mode_all defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_mirror_queue_mode_exactly defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_mirror_queue_mode_nodes defined by app rabbit
2019-09-12 01:25:41.020 [info] <0.270.0> Running boot step rabbit_priority_queue defined by app rabbit
2019-09-12 01:25:41.020 [info] <0.270.0> Priority queues enabled, real BQ is rabbit_variable_queue
2019-09-12 01:25:41.020 [info] <0.270.0> Running boot step rabbit_queue_location_client_local defined by app rabbit
2019-09-12 01:25:41.020 [info] <0.270.0> Running boot step rabbit_queue_location_min_masters defined by app rabbit
2019-09-12 01:25:41.020 [info] <0.270.0> Running boot step kernel_ready defined by app rabbit
2019-09-12 01:25:41.020 [info] <0.270.0> Running boot step rabbit_sysmon_minder defined by app rabbit
2019-09-12 01:25:41.020 [info] <0.270.0> Running boot step rabbit_epmd_monitor defined by app rabbit
2019-09-12 01:25:41.047 [info] <0.270.0> Running boot step guid_generator defined by app rabbit
2019-09-12 01:25:41.194 [info] <0.270.0> Running boot step rabbit_node_monitor defined by app rabbit
2019-09-12 01:25:41.194 [info] <0.518.0> Starting rabbit_node_monitor
2019-09-12 01:25:41.194 [info] <0.270.0> Running boot step delegate_sup defined by app rabbit
2019-09-12 01:25:41.195 [info] <0.270.0> Running boot step rabbit_memory_monitor defined by app rabbit
2019-09-12 01:25:41.195 [info] <0.270.0> Running boot step core_initialized defined by app rabbit
2019-09-12 01:25:41.195 [info] <0.270.0> Running boot step upgrade_queues defined by app rabbit
2019-09-12 01:25:41.213 [info] <0.270.0> message_store upgrades: 1 to apply
2019-09-12 01:25:41.213 [info] <0.270.0> message_store upgrades: Applying rabbit_variable_queue:move_messages_to_vhost_store
2019-09-12 01:25:41.213 [info] <0.270.0> message_store upgrades: No durable queues found. Skipping message store migration
2019-09-12 01:25:41.213 [info] <0.270.0> message_store upgrades: Removing the old message store data
2019-09-12 01:25:41.213 [info] <0.270.0> message_store upgrades: All upgrades applied successfully
2019-09-12 01:25:41.231 [info] <0.270.0> Running boot step rabbit_connection_tracking defined by app rabbit
2019-09-12 01:25:41.231 [info] <0.270.0> Running boot step rabbit_connection_tracking_handler defined by app rabbit
2019-09-12 01:25:41.231 [info] <0.270.0> Running boot step rabbit_exchange_parameters defined by app rabbit
2019-09-12 01:25:41.231 [info] <0.270.0> Running boot step rabbit_mirror_queue_misc defined by app rabbit
2019-09-12 01:25:41.231 [info] <0.270.0> Running boot step rabbit_policies defined by app rabbit
2019-09-12 01:25:41.232 [info] <0.270.0> Running boot step rabbit_policy defined by app rabbit
2019-09-12 01:25:41.232 [info] <0.270.0> Running boot step rabbit_queue_location_validator defined by app rabbit
2019-09-12 01:25:41.232 [info] <0.270.0> Running boot step rabbit_quorum_memory_manager defined by app rabbit
2019-09-12 01:25:41.232 [info] <0.270.0> Running boot step rabbit_vhost_limit defined by app rabbit
2019-09-12 01:25:41.232 [info] <0.270.0> Running boot step rabbit_mgmt_reset_handler defined by app rabbitmq_management
2019-09-12 01:25:41.232 [info] <0.270.0> Running boot step rabbit_mgmt_db_handler defined by app rabbitmq_management_agent
2019-09-12 01:25:41.232 [info] <0.270.0> Management plugin: using rates mode 'basic'
2019-09-12 01:25:41.233 [info] <0.270.0> Running boot step recovery defined by app rabbit
2019-09-12 01:25:41.233 [info] <0.270.0> Running boot step load_definitions defined by app rabbitmq_management
2019-09-12 01:25:41.233 [info] <0.270.0> Running boot step empty_db_check defined by app rabbit
2019-09-12 01:25:41.233 [info] <0.270.0> Adding vhost '/' (description: 'Default virtual host')
2019-09-12 01:25:41.238 [info] <0.559.0> Making sure data directory '/var/lib/rabbitmq/mnesia/rabbit@my-rabbit/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L' for vhost '/' exists
2019-09-12 01:25:41.241 [info] <0.559.0> Starting message stores for vhost '/'
2019-09-12 01:25:41.241 [info] <0.563.0> Message store "628WB79CIFDYO9LJI6DKMI09L/msg_store_transient": using rabbit_msg_store_ets_index to provide index
2019-09-12 01:25:41.241 [info] <0.559.0> Started message store of type transient for vhost '/'
2019-09-12 01:25:41.242 [info] <0.566.0> Message store "628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent": using rabbit_msg_store_ets_index to provide index
2019-09-12 01:25:41.242 [warning] <0.566.0> Message store "628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent": rebuilding indices from scratch
2019-09-12 01:25:41.242 [info] <0.559.0> Started message store of type persistent for vhost '/'
2019-09-12 01:25:41.243 [info] <0.270.0> Creating user 'zsf'
2019-09-12 01:25:41.244 [info] <0.270.0> Setting user tags for user 'zsf' to [administrator]
2019-09-12 01:25:41.244 [info] <0.270.0> Setting permissions for 'zsf' in '/' to '.*', '.*', '.*'
2019-09-12 01:25:41.245 [info] <0.270.0> Running boot step rabbit_looking_glass defined by app rabbit
2019-09-12 01:25:41.245 [info] <0.270.0> Running boot step rabbit_core_metrics_gc defined by app rabbit
2019-09-12 01:25:41.245 [info] <0.270.0> Running boot step background_gc defined by app rabbit
2019-09-12 01:25:41.245 [info] <0.270.0> Running boot step connection_tracking defined by app rabbit
2019-09-12 01:25:41.246 [info] <0.270.0> Setting up a table for connection tracking on this node: 'tracked_connection_on_node_rabbit@my-rabbit'
2019-09-12 01:25:41.247 [info] <0.270.0> Setting up a table for per-vhost connection counting on this node: 'tracked_connection_per_vhost_on_node_rabbit@my-rabbit'
2019-09-12 01:25:41.247 [info] <0.270.0> Running boot step routing_ready defined by app rabbit
2019-09-12 01:25:41.247 [info] <0.270.0> Running boot step pre_flight defined by app rabbit
2019-09-12 01:25:41.247 [info] <0.270.0> Running boot step notify_cluster defined by app rabbit
2019-09-12 01:25:41.247 [info] <0.270.0> Running boot step networking defined by app rabbit
2019-09-12 01:25:41.295 [info] <0.612.0> started TCP listener on [::]:5672
2019-09-12 01:25:41.318 [info] <0.270.0> Running boot step cluster_name defined by app rabbit
2019-09-12 01:25:41.318 [info] <0.270.0> Running boot step direct_client defined by app rabbit
2019-09-12 01:25:41.359 [info] <0.662.0> Management plugin: HTTP (non-TLS) listener started on port 15672
2019-09-12 01:25:41.359 [info] <0.768.0> Statistics database started.
2019-09-12 01:25:41.359 [info] <0.767.0> Starting worker pool 'management_worker_pool' with 3 processes in it
 completed with 3 plugins.
2019-09-12 01:25:41.439 [info] <0.8.0> Server startup complete; 3 plugins started.
 * rabbitmq_management
 * rabbitmq_management_agent
 * rabbitmq_web_dispatch

查看端口是否启动完全:

代码语言:javascript
复制
# netstat -ntalp | grep 5672
tcp        0      0 0.0.0.0:25672           0.0.0.0:*               LISTEN      6423/beam.smp
tcp        0      0 0.0.0.0:15672           0.0.0.0:*               LISTEN      6423/beam.smp
tcp6       0      0 :::5672                 :::*                    LISTEN      6423/beam.smp
//15672 web界面管理接口
//5672  业务端口,客户端操作连接的端口

浏览器访问管理界面

2.5 RabbitMQ使用中的一些概念

在上面的RabbitMQ的基本流程图里面我们可以看到,RabbitMQ的整体工作流程是,生产者产生数据交给RabbitMQ,然后RabbitMQ通过Exchange更具规则来选择绑定到那个队列(Queues)中,然后消费者在到对应的队列里面去取数据,我们下面就来讲解下Exchange的四种类型

2.5.1 Exchange的四种类型
2.5.1.1 direct精准匹配
image.png
image.png

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。

2.5.1.2 fanout 广播
image.png
image.png

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。

2.5.1.3 topic 正则匹配
image.png
image.png

topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词。

2.5.1.4 headers 根据头部匹配(几乎不用)

2.6 使用go客户端操作rabbitMQ

2.6.1 生产者代码
代码语言:javascript
复制
package send

import (
    "log"
    "rabbitmq/src/github.com/streadway/amqp" //根据实际情况来定
    "strconv"
    "time"
)

//创建一个返回错误打印日志的函数
func FailOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}
func Send() {
    //打开一个连接
    conn, err := amqp.Dial("amqp://zsf:123456@172.16.1.194:5672")
    FailOnError(err, "failed to connect to RabbitMQ")
    defer conn.Close()

    //打开一个通道
    ch, err := conn.Channel()
    FailOnError(err, "failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "test number",
        false,
        false,
        false,
        false,
        nil,
    )
    FailOnError(err, "Failed to declare a queue")

    for i := 0; i < 60; i++ {
        body := "hello, ZhangShouFu  " + strconv.Itoa(i)
        err = ch.Publish(
            "",
            q.Name,
            false,
            false,
            amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(body),
            })
        FailOnError(err, "Failed to publish a message")
        time.Sleep(1 * time.Second)
    }

}
2.6.2 消费者代码
代码语言:javascript
复制
package receive

import (
    "log"
    "rabbitmq/src/github.com/streadway/amqp"
    "rabbitmq/src/send"
    "time"
)

//创建一个返回错误打印日志的函数

func Receive() {
    conn, err := amqp.Dial("amqp://zsf:123456@172.16.1.194:5672/")
    send.FailOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    send.FailOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "test number", // name
        false,         // durable
        false,         // delete when usused
        false,         // exclusive
        false,         // no-wait
        nil,           // arguments
    )
    send.FailOnError(err, "Failed to declare a queue")
    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    send.FailOnError(err, "Failed to register a consumer")

    forever := make(chan bool)
    for i := 0; i < 60; i++ {
        go func() {
            for d := range msgs {
                log.Printf("Received a message: %s", d.Body)
            }
        }()
        time.Sleep(5 * time.Second)
    }
    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}
2.6.3 主函数
代码语言:javascript
复制
package main

import (
    "rabbitmq/src/receive"
    "rabbitmq/src/send"
)

func main() {
    send.Send()
    receive.Receive()
}

运行主函数之后,到rabbitmq管理界面能看到我们刚才创建的队列

image.png
image.png

参考: https://www.rabbitmq.com/ https://rabbitmq.mr-ping.com/installation/Installing_on_Debian_Ubuntu.html https://www.jianshu.com/p/79ca08116d57

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-09-13 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Rabbitmq 简单介绍,安装和go客户端使用
    • 1,消息队列介绍
      • 1.1 什么是消息队列?
      • 1.2, 为什么要用消息队列?
      • 1.3,消费者怎么得到消息队列的数据?
    • 2, RabbitMQ消息队列
      • 2.1 RabbitMQ的优点
      • 2.2 RabbitMQ 的概念模型
      • 2.3 RabbitMQ基本概念
      • 2.4 安装RabbitMQ
      • 2.5 RabbitMQ使用中的一些概念
      • 2.6 使用go客户端操作rabbitMQ
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档