摘要: 原创出处 http://www.iocoder.cn/RocketMQ/install/ 「芋道源码」欢迎转载,保留摘要,谢谢!
推荐阅读如下 RocketMQ 文章:
在开始搭建 RocketMQ 服务之前,我们先来对它做下简单的了解。
RocketMQ 是阿里巴巴在 2012 年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于 2017 年 9 月 25 日成为 Apache 的顶级项目。作为经历过多次阿里巴巴双十一这种“超级工程”的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用。
如下是 RocketMQ 产生的原因:
淘宝内部的交易系统使用了淘宝自主研发的 Notify 消息中间件,使用 MySQL 作为消息存储媒介,可完全水平扩容。 为了进一步降低成本,我们认为存储部分可以进一步优化。2011 年初,Linkin 开源了 Kafka 这个优秀的消息中间件,淘宝中间件团队在对 Kafka 做过充分 Review 之后, Kafka 无限消息堆积,高效的持久化速度吸引了我们。 但是,同时发现这个消息系统主要定位于日志传输,对于使用在淘宝交易、订单、充值等场景下还有诸多特性不满足,为此我们重新用 Java 语言编写了 RocketMQ ,定位于非日志的可靠消息传输(日志场景也 OK)。 目前 RocketMQ 在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理, binglog 分发等场景。
在开始之前,胖友先认真阅读如下两个文档:
如下图所示:
整体流程
目前 RocketMQ 4 的中文文档很少,所以英文不太好的胖友,后续推荐看看如下资料:
可以参考 《Apache RocketMQ —— Quick Start》 文章。
本小节,我们会部署一套 RocketMQ 最小化的单机环境,包括一个 RocketMQ Namesrv 和 Broker 服务。部署完成之后,我们会测试消息的发送与消费。下面,让我们逐步开始。
需要安装如下软件:
因为我们准备直接编译 RocketMQ 源码,构建出 RocketMQ 软件包。
打开 RocketMQ release_notes 页面,我们可以看到 RocketMQ 所有的发布版本。这里,我们选择最新的 RocketMQ 4.6.0 版本。点击进入该版本的发布页面后,我们可以看到两种发布版本:
一般情况下,我们可以直接使用 Binary 版本,它是 RocketMQ 已经编译好,可以直接使用的 RocketMQ 软件包。
这里,我们想带着胖友们编译一次 RocketMQ 源码,所以使用 Source 版本。下面,我们开始下载 RocketMQ 4.6.0 Source 源码。命令行操作如下:
# 下载
$ wget wget http://mirror.bit.edu.cn/apache/rocketmq/4.6.0/rocketmq-all-4.6.0-source-release.zip
# 解压
$ unzip rocketmq-all-4.6.0-source-release.zip
使用 Maven 编译 RocketMQ 源码。命令行操作如下:
# 进入 RocketMQ 源码目录
$ cd rocketmq-all-4.6.0-source-release
# Maven 编译 RocketMQ ,并跳过测试。耐心等待...
$ mvn -Prelease-all -DskipTests clean install -U
编译完成,在我们进入 distribution 目录下,就可以看到 RocketMQ 的发布包了。命令行操作如下:
# 进入 distribution 目录下
$ cd distribution/target/rocketmq-4.6.0/rocketmq-4.6.0
# 打印目录
$ ls
40 -rwxr-xr-x 1 yunai staff 17336 Nov 19 20:59 LICENSE
8 -rwxr-xr-x 1 yunai staff 1338 Nov 19 20:59 NOTICE
16 -rwxr-xr-x 1 yunai staff 4225 Nov 19 20:59 README.md
0 drwxr-xr-x 6 yunai staff 192 Dec 3 12:48 benchmark # 性能基准测试
0 drwxr-xr-x 30 yunai staff 960 Nov 19 20:59 bin # 执行脚本
0 drwxr-xr-x 12 yunai staff 384 Nov 19 20:59 conf # 配置文件
0 drwxr-xr-x 36 yunai staff 1152 Dec 3 12:48 lib # RocketMQ jar 包
启动一个 RocketMQ Namesrv 服务。命令行操作如下:
nohup sh bin/mqnamesrv &
启动完成后,查看日志。
# 查看 Namesrv 日志。
$ tail -f ~/logs/rocketmqlogs/namesrv.log
2019-12-03 12:58:04 INFO main - The Name Server boot success. serializeType=JSON
~/logs/rocketmqlogs/namesrv.log
。如果想要自定义,可以通过 conf/logback_namesrv.xml
配置文件来进行修改。在 conf
目录下,RocketMQ 提供了多种 Broker 的配置文件:
broker.conf
:单主,异步刷盘。2m/
:双主,异步刷盘。2m-2s-async/
:两主两从,异步复制,异步刷盘。2m-2s-sync/
:两主两从,同步复制,异步刷盘。dledger/
:Dledger 集群,至少三节点。这里,我们只启动一个 RocketMQ Broker 服务,所以使用 broker.conf
配置文件。命令行操作如下:
nohup sh bin/mqbroker -c conf/broker.conf -n 127.0.0.1:9876 &
-c
参数,配置读取的主 Broker 配置。-n
参数,设置 RocketMQ Namesrv 地址。bin/runbroker.sh
脚本,将 Broker JVM 内存调小。如下:
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g"
启动完成后,查看日志。
tail -f ~/logs/rocketmqlogs/broker.log
2019-12-03 14:27:07 INFO main - The broker[broker-a, 192.168.3.44:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
~/logs/rocketmqlogs/broker.log
。如果想要自定义,可以通过 conf/logback_broker.xml
配置文件来进行修改。? 至此,我们已经完成了 RocketMQ 单机部署。下面,我们开始进行下消息的发送和消费的测试。
通过使用 bin/tools.sh
工具类,实现测试发送消息。命令行操作如下:
# 设置 Namesrv 服务器的地址
export NAMESRV_ADDR=127.0.0.1:9876
# 执行生产者 Producer 发送测试消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
如果发送成功,我们会看到大量成功的发送日志。
SendResult [sendStatus=SEND_OK, msgId=FE800000000000004F2B5386138462F500000D7163610D67E7F100F4, offsetMsgId=C0A8032C00002A9F000000000000D7EE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=61]
SendResult [sendStatus=SEND_OK, msgId=FE800000000000004F2B5386138462F500000D7163610D67E7F200F5, offsetMsgId=C0A8032C00002A9F000000000000D8D1, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=61]
sendStatus=SEND_OK
状态,说明消息都发送成功了。通过使用 bin/tools.sh
工具类,实现测试消费消息。命令行操作如下:
# 设置 Namesrv 服务器的地址
export NAMESRV_ADDR=127.0.0.1:9876
# 执行消费者 Consumer 消费测试消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
如果消费成功,我们会看到大量成功的消费日志。
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=2, storeSize=227, queueOffset=131, sysFlag=0, bornTimestamp=1575354513732, bornHost=/192.168.3.44:55510, storeTimestamp=1575354513733, storeHost=/192.168.3.44:10911, msgId=C0A8032C00002A9F000000000001D1FC, commitLogOffset=119292, bodyCRC=1549304357, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=145, CONSUME_START_TIME=1575354867104, UNIQ_KEY=FE800000000000004F2B5386138462F500000D7163610D67E944020E, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53, 50, 54], transactionId='null'}]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=2, storeSize=227, queueOffset=130, sysFlag=0, bornTimestamp=1575354513729, bornHost=/192.168.3.44:55510, storeTimestamp=1575354513729, storeHost=/192.168.3.44:10911, msgId=C0A8032C00002A9F000000000001CE70, commitLogOffset=118384, bodyCRC=1530218044, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=145, CONSUME_START_TIME=1575354867103, UNIQ_KEY=FE800000000000004F2B5386138462F500000D7163610D67E941020A, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53, 50, 50], transactionId='null'}]]
ConsumeMessageThread_4
和 ConsumeMessageThread_3
线程名,我们可以看出,目前是进行并发消费消息。在生产环境下,必须搭建 RocketMQ 高可用集群,不然简直是找死。艿艿有个项目抠门了下,只搭建了一主一从,在一次主挂掉之后,因为 RocketMQ 不支持主从切换,就发生了线上事故。一般 RocketMQ 的集群部署方案推荐如下:
因为在 《性能测试 —— RocketMQ 基准测试》 的 「5. 搭建集群」 小节中,我们已经详细描述了如何搭建一个一主一从的 RocketMQ 单集群。胖友可以参考该文,搭建一个二主两从的 RocketMQ 双集群。?
下面,艿艿额外放送下 RocketMQ 实现高可用的原理。感兴趣的胖友,可以瞅一瞅。
RocketMQ 集群
? 1. Producer
? 2. Consumer
? 3. Namesrv
? 4. Broker
在 RocketMQ 拓展项目(rocketmq-externals) 中,包含了 RocketMQ Console 项目,是 RocketMQ 的图形化管理控制台,提供 Broker 集群信息查看,Topic 管理,Producer、Consumer 信息展示,消息查询等等常用功能。
虽然说,我们也可以使用 RocketMQ 提供的 CLI Admin Tool 工具,实现上述的查询与管理的功能,但是命令行的方式对操作人员的要求稍高一些。当然,在 RocketMQ Console 无法满足我们更精细化的管理的需求的时候,我们还是会使用 CLI Admin Tool 工具。
下面,让我们来搭建一个 RocketMQ Console 控制台。
将 rocketmq-externals 仓库的代码,克隆到本地。操作流程如下:
# 克隆代码
$ git clone https://github.com/apache/rocketmq-externals.git
# 进入 Console 目录
$ cd rocketmq-console
如果胖友需要自定义 RocketMQ Console 的配置,可以进入该项目下的 src/main/resources/
目录下,进行相应的配置文件修改。例如说,设置 RocketMQ Namesrv 地址,开启 RocketMQ Console 的登陆访问。
这里,我们修改 src/main/resources/application.properties
配置文件,通过设置 rocketmq.config.namesrvAddr=127.0.0.1:9876
配置项,设置 RocketMQ Namesrv 的地址。
使用 Maven 编译 RocketMQ Console 源码。命令行操作如下:
# 编译
$ mvn clean package -Dmaven.test.skip=true
直接以 jar
的方式,启动控制台。注意,控制台使用 8080 端口。命令行操作如下:
nohup java -jar target/rocketmq-console-ng-1.0.1.jar &
启动完成后,查看日志。
$ tail -f nohup.out
[2019-12-03 16:05:19.349] INFO Tomcat started on port(s): 8080 (http)
[2019-12-03 16:05:19.354] INFO Started App in 5.341 seconds (JVM running for 6.104)
使用浏览器,访问 http://127.0.0.1:8080/ 地址,我们就可以看到 RocketMQ Console 的界面。如下图:
更多的使用指南,胖友可以后续看看 《RocketMQ Console —— 使用文档》 。
本小节,我们来看看如何使用生产者 Producer 发送消息,和消费者 Consumer 消费消息。
在 Rocketmq 仓库的 example 目录下,提供了 RocketMQ 示例。本小节,我们主要来看看 qucikstart 这个最简示例。
Producer 类,提供生产者 Producer 发送消息的最简示例。代码如下:
// Producer.java
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
/*
* Instantiate with a producer group name.
*/
// <1.1> 创建 DefaultMQProducer 对象
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// <1.2> 设置 RocketMQ Namesrv 地址
producer.setNamesrvAddr("127.0.0.1:9876");
/*
* Specify name server addresses.
* <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/
/*
* Launch the instance.
*/
// <1.3> 启动 producer 生产者
producer.start();
for (int i = 0; i < 1000; i++) {
try {
/*
* Create a message instance, specifying topic, tag and message body.
*/
// <2.1> 创建 Message 消息
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
/*
* Call send message to deliver message to one of brokers.
*/
// <2.2> 同步发送消息
SendResult sendResult = producer.send(msg);
// <2.3> 打印发送结果
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
/*
* Shut down once the producer instance is not longer in use.
*/
// <3> 关闭 producer 生产者
producer.shutdown();
}
}
<1>
处,初始化一个 Producer 生产者。<1.1>
处,创建 DefaultMQProducer 对象,这里设置的生产者分组是 "please_rename_unique_group_name"
。<1.2>
处,设置 设置 producer
的 RocketMQ Namesrv 地址。这里,是艿艿额外添加的代码。<1.3>
处,启动 producer
生产者。<2>
处,使用 Producer 发送 1000 条消息。<2.1>
处,创建 Message 消息。这里设置了其 Topic 为 "TopicTest"
,Tag 为 TagA
、消息体 Body 为 "Hello RocketMQ"
的二进制数组。<2.2>
处,调用生产者的 #send(Message msg)
方法,同步发送消息,等待发送结果。RocketMQ Producer 一共有三种发送消息的方式,除了我们这里看到的同步发送消息之外,还有异步发送消息(可见 AsyncProducer 示例),和 Oneway 发送消息。<2.3>
处,打印发送结果。<3>
处,关闭 producer
生产者。执行 #main(args)
方法,开始发送消息。在控制台上,可以看到如下内容:
# 发送日志,省略另外 999 条日志
SendResult [sendStatus=SEND_OK, msgId=240E00E0F0931BB3FCEC071C1CDE61A8000018B4AAC20E79E06A03E7, offsetMsgId=C0A82BF000002A9F000000000008EE72, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=645]
# 关闭 Producer 日志
19:27:48.339 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
19:27:48.340 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.43.240:10911] result: true
Consumer 类,提供消费者 Consumer 消费消息的最简示例。代码如下:
// Consumer.java
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
/*
* Instantiate with specified consumer group name.
*/
// <1> 创建 DefaultMQPushConsumer 对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// <2> 设置 RocketMQ Namesrv 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
/*
* Specify name server addresses.
* <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/
/*
* Specify where to start in case the specified consumer group is a brand new one.
*/
// <3> 设置消费进度,从 Topic 最初位置开始
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/*
* Subscribe one more more topics to consume.
*/
// <4> 订阅 TopicTest 主题
consumer.subscribe("TopicTest", "*");
/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
// <5> 添加消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 返回成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/*
* Launch the consumer instance.
*/
// <6> 启动 producer 消费者
consumer.start();
// 打印 Consumer 启动完成
System.out.printf("Consumer Started.%n");
}
}
<1>
处,创建 DefaultMQPushConsumer 对象,这里设置的消费者分组是 "please_rename_unique_group_name"
。注意,消费者分组的概念:
FROM 概念(Concept)
同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。
要注意的是,消费者组的消费者实例必须订阅完全相同的 Topic 。
RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。<2>
处,设置 consumer
的 RocketMQ Namesrv 地址。这里,是艿艿额外添加的代码。<3>
处,设置一个新的消费集群,初始的消费进度。目前有三个选项:CONSUME_FROM_FIRST_OFFSET
:每个 Topic 队列的第一条消息。CONSUME_FROM_LAST_OFFSET
:每个 Topic 队列的最后一条消息。CONSUME_FROM_TIMESTAMP
:每个 Topic 队列的指定时间开始的消息。<4>
处,设置订阅 "TopicTest"
主题的消息。有一定一定要注意!!!消费者组的消费者实例必须订阅完全相同的 Topic + Tag 。<5>
处,添加消息监听器。这里我们采用的是 MessageListenerConcurrently 并发消费消息的监听器。如果胖友需要实现顺序消费消息,需要使用 MessageListenerOrderly 顺序消费的监听器。<6>
处,启动 consumer
消费者。此时,Consumer 就开始正式的消费消息啦。。执行 #main(args)
方法,开始消费消息。在控制台上,可以看到如下内容:
# 消费者启动成功
Consumer Started.
# 消费内容
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=2, storeSize=225, queueOffset=645, sysFlag=0, bornTimestamp=1575373846053, bornHost=/192.168.43.240:52717, storeTimestamp=1575373846058, storeHost=/192.168.43.240:10911, msgId=C0A82BF000002A9F000000000008EF55, commitLogOffset=585557, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=646, CONSUME_START_TIME=1575373846067, UNIQ_KEY=240E00E0F0931BB3FCEC071C1CDE61A8000018B4AAC20E8EE6250000, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=3, storeSize=225, queueOffset=645, sysFlag=0, bornTimestamp=1575373846060, bornHost=/192.168.43.240:52717, storeTimestamp=1575373846061, storeHost=/192.168.43.240:10911, msgId=C0A82BF000002A9F000000000008F036, commitLogOffset=585782, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=646, CONSUME_START_TIME=1575373846067, UNIQ_KEY=240E00E0F0931BB3FCEC071C1CDE61A8000018B4AAC20E8EE62C0001, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]]
ConsumeMessageThread_1
和 ConsumeMessageThread_2
线程名,我们可以看出,目前是进行并发消费消息。在 《芋道 Spring Boot 分布式消息队列 RocketMQ 入门》 中,我们来详细学习如何在 Spring Boot 中,整合并使用 RocketMQ 。? 会方便很多。
在如下的文章中,我们来详细学习如何在 Spring Cloud 中,整合并使用 RocketMQ 。? 更加方便。
至此,我们已经完成了 RocketMQ 的入门。个人建议的话,对于初学 RocketMQ 的胖友,一定要认真仔细去读读 「1.4 更多文档」 推荐的内容,艿艿一路踩着“坑”过来,希望胖友能够走的平稳一些。很多时候,我们踩坑的原因,是因为我们没有认真仔细阅读相关的文档,在没有完全入门的情况下,匆匆忙忙将一个中间件就上线了。
这里,在额外推荐一些内容: