一 RocketMQ介绍
rocketmq是阿里巴巴开源的一款分布式的消息中间件,他源于jms规范但是不遵守jms规范。对于分布式只一点,如果你了用过其他mq并且了解过rocketmq,就知道rocketmq天生就是分布式的,可以说是broker、provider、consumer等各种分布式。
二 RocketMQ优点:
1、 rmq去除对zk的依赖
2、 rmq支持异步和同步两种方式刷磁盘
3、 rmq单机支持的队列或者topic数量是5w
4、 rmq支持消息重试
5、 rmq支持严格按照一定的顺序发送消息
6、 rmq支持定时发送消息
7、 rmq支持根据消息ID来进行查询
8、 rmq支持根据某个时间点进行消息的回溯
9、 rmq支持对消息服务端的过滤
10、 rmq消费并行度:顺序消费 取决于queue数量,乱序消费 取决于consumer数量
三 启动RocketMQ
安装ROcketMQ请参考《RocketMQ在windows环境下的安装与配置》
启动namesrv
启动brokerserver
四 创建项目
(1)pom.xml文件如下
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>spring-cloud</groupId>
<artifactId>sc-rocketmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>sc-rocketmq</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>
</project>
(2)消费者
package test;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group");
consumer.setNamesrvAddr("localhost:9876");
consumer.setInstanceName("rmq-instance");
consumer.subscribe("log-topic", "user-tag");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消费者消费数据:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
(3)提供者
package test;
import java.io.Serializable;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import com.alibaba.fastjson.JSON;
/**
* @Function 消息生产者
*/
public class Producer {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("test-group");
producer.setNamesrvAddr("localhost:9876");
producer.setInstanceName("rmq-instance");
producer.start();
try {
for (int i = 0; i < 100; i++) {
User user = new User();
user.setLoginName("abc" + i);
user.setPwd(String.valueOf(i));
Message message = new Message("log-topic", "user-tag", JSON.toJSONString(user).getBytes());
System.out.println("生产者发送消息:" + JSON.toJSONString(user));
producer.send(message);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
/**
* 发送用户消息
*/
static class User implements Serializable {
private String loginName;
private String pwd;
public String getLoginName() {
return loginName;
}
public void setLoginName(String loginName) {
this.loginName = loginName;
}
public String getPwd() {
return pwd;
}
public void setPwd(String pwd) {
this.pwd = pwd;
}
}
}
接下来先启动消费者,然后再启动生产者,看一下效果
生产者控制台发送消息:
消费者控制台消费消息
Spring Cloud 2.x系列之集成RocketMQ:
https://blog.csdn.net/qq_18603599/article/details/81172866
https://blog.csdn.net/wd2014610/article/details/81781109