广播消费模式下,相同Consumer Group的每个Consumer实例都接收同一个Topic的全量消息。即每条消息都会被发送到Consumer Group中的每个Consumer进行消费;
集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊同一个Topic的消息。即每条消息只会被发送到Consumer Group中的某个Consumer;
Rocketmq提供三种方式可以发送普通消息:同步、异步、和单向发送:
发送方式对比:发送吞吐量,单向>异步>同步,但单向发送可靠性差存在丢失消息可能,选型根据实际需求确定;
private void sync() throws Exception {
//创建消息
Message message = new Message("topic_family", (" 同步发送 ").
getBytes());
//同步发送消息
SendResult sendResult = producer.send(message);
log.info("Product-同步发送-Product信息={}", sendResult);
}
/**
* 2、异步发送消息
*/
private void async() throws Exception {
//创建消息
Message message = new Message("topic_family", (" 异步发送 ").
getBytes());
//异步发送消息
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("Product-异步发送-输出信息={}", sendResult);
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
//补偿机制,根据业务情况进行使用,看是否进行重试
}
});
}
/**
* 3、单项发送消息
*/
private void oneWay() throws Exception {
//创建消息
Message message = new Message("topic_family", (" 单项发送 ").
getBytes());
//同步发送消息
producer.sendOneway(message);
}
消息的key是业务开发同学在发送消息之前自行指定的,通常会把具有业务含义,区分度高的字段作为消息的key,如用户id,订单id等;按Message Key查询消息,主要是基于RocketMQ的IndexFile索引文件来实现的;RocketMQ的索引文件逻辑结构,类似JDK中HashMap的实现;
查询过程:
按照Message Key查询消息的方式,RocketMQ的具体做法是,主要通过Broker端的QueryMessageProcessor业务处理器来查询,读取消息的过程就是用topic和key找到IndexFile索引文件中的一条记录,根据其中的commitLog offset从CommitLog文件中读取消息的实体内容。
MsgId总共16字节,包含消息存储主机地址,消息Commit Log offset,从MsgId中解析出Broker的地址和Commit Log的偏移地址,然后挄照存储格式所在位置消息buffer解析成一个完整的消息;Client端从MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封装成一个RPC请求后通过Remoting通信层发送(业务请求码:VIEW_MESSAGE_BY_ID);Broker端走的是QueryMessageProcessor,读取消息的过程用其中的commitLog offset和size去commitLog中找到真正的记录并解析成一个完整的消息返回;
//只订阅的消息TagA或TagC
consumer.subscribe("TagFilterTest","TagA || TagC");
基于 sql92 表达式消息过滤,其实是对消息的属性运用 sql 过滤表达式进行条件匹配,所以消息发送时应该调用 putUserProperty 方法设置消息属性:
//只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest",MessageSelector.bySql("a between 0 and 3");
// 使用 MessageFilter实现类,在服务器做消息过滤
String filterCode = MixAll.file2String("MessageFilterImpl.java");
consumer.subscribe("FilterTopicTest", "com.zqh.demo.filter.MessageFilterImpl",
filterCode);
RocketMQ的消息过滤方式有别于其他消息中间件,是在订阅时,再做过滤;
由于在消息服务端进行消息过滤是匹配消息tag的hashcode,导致服务端过滤并不十分准确,从服务端返回的消息最终不一定是消息消费者订阅的消息,导致服务端过滤并不十分准确,这样也会造成网络带宽的浪费(可使用基于MessageFilter实现类模式的消息过滤);
顺序消息分为全局顺序消息与部分顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可;顺序消费实际上有两个核心点,一个是生产者有序存储,另一个是消费者有序消费;
RocketMQ中生产者生产的消息会放置在某个队列中,基于队列先进先出的特性天然的可以保证存入队列的消息顺序和拉取的消息顺序是一致的,因此我们只需要保证一组相同的消息按照给定的顺序存入同一个队列中,就能保证生产者有序存储;
普通发送消息的模式下,生产者会采用轮询的方式将消费均匀的分发到不同的队列中,然后被不同的消费者消费,因为一组消息在不同的队列,此时就无法使用RocketMQ带来的队列有序特性来保证消息有序性了;
RockerMQ的MessageListener回调函数提供了两种消费模式,有序消费模式MessageListenerOrderly和并发消费模式MessageListenerConcurrently:
RocketMQ事务不同于Kafka事务,它是基于2PC的方案实现的分布式事务,分两阶段提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,两阶段提交 + 回查(RMQ事务消息是对经典的2PC分布式事务的实现):
半消息是一种特殊的消息类型,该状态的消息暂时不能被Consumer消费。当一条事务消息被成功投递到Broker上,但是Broker并没有接收到Producer发出的二次确认时,该事务消息就处于暂时不可被消费状态,该状态的事务消息被称为半消息;
由于网络抖动、Producer重启等原因,可能导致Producer向Broker发送的二次确认消息没有成功送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态(Commit 或 Rollback),可以看出Message Status Check主要用来解决分布式事务中的超时问题;
TransactionStatus.CommitTransaction:提交事务,它允许消费者消费此消息;
TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费;
TransactionStatus.Unknown:中间状态,它代表需要回查本地事务状态来决定是提交还是回滚事务;
/**
* 事务监听器,重写执行本地事务方法以及事务回查方法
*/
public class TransactionListenerImpl implements TransactionListener{
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg){
String msgKey = msg.getKeys();
switch (msgKey) {
case "Num0":
case "Num1":
// 明确回复回滚操作,消息将会被删除,不允许被消费。
return LocalTransactionState.ROLLBACK_MESSAGE;
case "Num8":
case "Num9":
// 消息无响应,代表需要回查本地事务状态来决定是提交还是回滚事务
return LocalTransactionState.UNKNOW;
default:
// 消息通过,允许消费者消费消息
return LocalTransactionState.COMMIT_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg){
System.out.println("回查本地事务状态,消息Key: " + msg.getKeys() + ",消息内容: "
+ new String(msg.getBody()));
// 需要根据业务,查询本地事务是否执行成功,这里直接返回COMMIT
return LocalTransactionState.COMMIT_MESSAGE;
}
}
事务消息的生产者跟我们之前的普通生产者的不同:
public class TransactionProducer{
public static void main(String[] args) throws MQClientException,
InterruptedException{
// 创建事务类型的生产者
TransactionMQProducer producer = new
TransactionMQProducer("transaction-producer-group");
// 设置NameServer的地址
producer.setNamesrvAddr("10.0.90.211:9876");
// 设置事务监听器
producer.setTransactionListener(new TransactionListenerImpl());
// 启动生产者
producer.start();
// 发送10条消息
for (int i = 0; i < 10; i++){
try {
Message msg = new Message("TransactionTopic", "",
("Hello RocketMQ Transaction Message" + i).
getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置消息Key
msg.setKeys("Num" + i);
// 使用事务方式发送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.println("sendResult = " + sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e){
e.printStackTrace();
}
}
// 阻塞,目的是为了在消息发送完成后才关闭生产者
Thread.sleep(10000);
producer.shutdown();
}
}
public class MQConsumer {
public static void main(String[] args) throws MQClientException {
// 创建DefaultMQPushConsumer类并设定消费者名称
DefaultMQPushConsumer mqPushConsumer = new
DefaultMQPushConsumer("consumer-group-test");
// 设置NameServer地址,如果是集群的话,使用分号;分隔开
mqPushConsumer.setNamesrvAddr("10.0.90.211:9876");
// 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
// 如果不是第一次启动,那么按照上次消费的位置继续消费
mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息,如果订阅该主题下的所有tag,
//则使用*
mqPushConsumer.subscribe("TransactionTopic", "*");
// 注册回调实现类来处理从broker拉取回来的消息
mqPushConsumer.registerMessageListener(new MessageListenerConcurrently(){
// 监听类实现MessageListenerConcurrently接口即可,重写consumeMessage
//方法接收数据
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {
MessageExt messageExt = msgList.get(0);
String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
System.out.println("消费者接收到消息: " + messageExt.toString()
+ "---消息内容为:" + body);
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
mqPushConsumer.start();
}
}
在RocketMQ事务消息的主要流程中,一阶段的消息如何对用户不可见。其中事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的。那么如何做到写入消息但是对用户不可见呢?RocketMQ事务消息的做法是:如果消息是half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC;由于消费组未订阅该主题,故消费端无法消费half类型的消息;
RocketMQ发送延时消息时先把消息按照延迟时间段发送到指定的队列中(rocketmq把每种延迟时间段的消息都存放到同一个队列中)然后通过一个定时器进行轮训这些队列,查看消息是否到期,如果到期就把这个消息发送到指定topic的队列中,这样的好处是同一队列中的消息延时时间是一致的,还有一个好处是这个队列中的消息时按照消息到期时间进行递增排序的,说的简单直白就是队列中消息越靠前的到期时间越早;
要求延时消息3小时才能消费,而RocketMQ的延时消息最大延时级别只支持延时2小时,怎么处理?
消费者负载均衡,指的是,将⼀个Topic下的多个Queue分配到不同Consumer实例的过程,负载均衡机制的本意是为了提升消息的并行消费能力;通过增加consumer实例去分摊queue的消费,可以起到水平扩展的消费能力的作用。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的queue将分配到其他实例上继续消费;
消费者平均分摊queue;
平均分摊每一条queue,只是以环状轮流分queue的形式;
集群模式下,queue都是只允许分配只一个实例,否则会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个queue只分给一个consumer实例,一个consumer实例可以允许同时分到不同的queue;
由于⼀个队列最多分配给⼀个消费者,因此当某个消费者组下的消费者实例数量大于队列的数量时,多余的消费者实例将分配不到任何队列,所以需要控制让queue的总数量大于等于consumer的数量;
在Broker中维护着多个Map集合,这些集合中动态存放着当前Topic中Queue的信息、Consumer Group中Consumer实例的信息。一旦发现消费者所订阅的Queue数量发生变化,或消费者组中消费者的数量发生变化,立即向Consumer Group中的每个实例发出Rebalance通知;
Consumer实例在接收到通知后会采用Queue分配算法自己获取到相应的Queue,即由Consumer实例自主进行Rebalance;
一个Topic中的Queue只能由Consumer Group中的一个Consumer进行消费,而一个Consumer可以同时消费多个Queue中的消息;那么Queue与Consumer间的配对关系是如何确定的,即Queue要分配给哪个Consumer进行消费,也是有算法策略的,这些策略是通过在创建Consumer时的构造器传进去的;
常见的有四种策略:平均分配、环形平均策略、一致性hash策略、同机房策略;
该算法是要根据avg = QueueCount / ConsumerCount的计算结果进行分配的。如果能够整除,则按顺序将avg个Queue逐个分配Consumer;如果不能整除,则将多余出的Queue按照Consumer顺序逐个分配;先计算好每个Consumer应该分得几个Queue,然后再依次将这些数量的Queue逐个分配个Consumer;
比如:有10个Queue,4个Consumer,那么每个Consumer可以分配到2个Queue,但是还有两个Queue是多余的,那么这两个Queue将依次按顺序分给Consumer-A,Consumer-B;
环形平均算法是指,根据消费者的顺序,依次在由queue队列组成的环形图中逐个分配,该算法不用事先计算每个Consumer需要分配几个Queue,直接一个一个分即可;
该算法会将consumer的hash值作为Node节点存放到hash环上,然后将queue的hash值也放到hash环上,通过顺时针方向,距离queue最近的那个consumer就是该queue要分配的consumer;该算法存在的问题:分配不均,其可以有效减少由于消费者组扩容或缩容所带来的大量的Rebalance;
根据queue的部署机房位置和consumer的位置,过滤出当前consumer相同机房的queue。然后按照平均分配策略或环形平均策略对同机房queue进行分配;如果没有同机房queue,则按照平均分配策略或环形平均策略对所有queue进行分配;
发送消息通过轮询队列的方式发送,每个队列接收平均的消息量,通过增加机器,可以水平扩展队列容量;另外也可以自定义方式选择发往哪个队列;
RocketMQ 是以consumer group+queue为单位是管理消费进度的,以一个consumer offset标记这个这个消费组在这条 queue 上的消费进度。如果某已存在的消费组出现了新消费实例的时候,依靠这个组的消费进度,就可以判断第一次是从哪里开始拉取的;
每次消息成功后,本地的消费进度会被更新,然后由定时器定时同步到broker,以此持久化消费进度。但是每次记录消费进度的时候,只会把一批消息中最小的offset值为消费进度值;
这钟方式和传统的一条message单独ack的方式有本质的区别。性能上提升的同时,会带来一个潜在的重复问题——由于消费进度只是记录了一个下标,就可能出现拉取了100条消息如2101-2200的消息,后面99条都消费结束了,只有2101消费一直没有结束的情况;在这种情况下,RocketMQ为了保证消息肯定被消费成功,消费进度只能维持在2101,直到2101也消费结束了,本地的消费进度才能标记2200消费结束了;在这种设计下,就有消费大量重复的风险。如2101在还没有消费完成的时候消费实例突然退出(机器断电,或者被 kill)。这条queue的消费进度还是维持在2101,当queue重新分配给新的实例的时候,新的实例从broker上拿到的消费进度还是维持在2101,这时候就会又从2101开始消费,2102-2200这批消息实际上已经被消费过还是会投递一次;
对于这个场景,RocketMQ暂时无能为力,所以业务必须要保证消息消费的幂等性,这也是RocketMQ官方多次强调的态度;