前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >面试系列之-rocketmq消息机制

面试系列之-rocketmq消息机制

作者头像
用户4283147
发布2022-12-29 20:07:34
1.1K0
发布2022-12-29 20:07:34
举报
文章被收录于专栏:对线JAVA面试对线JAVA面试

rocketmq消息消费模式

广播模式

广播消费模式下,相同Consumer Group的每个Consumer实例都接收同一个Topic的全量消息。即每条消息都会被发送到Consumer Group中的每个Consumer进行消费;

集群模式(默认)

集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊同一个Topic的消息。即每条消息只会被发送到Consumer Group中的某个Consumer;

消息进度保存
  • 广播模式: 消费进度保存在consumer端。因为广播模式下consumer group中每个consumer都会消费所有消息,但它们的消费进度是不同。所以consumer各自保存各自的消费进度;
  • 集群模式: 消费进度保存在broker中。consumer group中的所有consumer共同消费同一个Topic中的消息,同一条消息只会被消费一次。消费进度会参与到了消费的负载均衡中,故消费进度是需要共享的;

消息发送类型

Rocketmq提供三种方式可以发送普通消息:同步、异步、和单向发送:

  • 同步:发送方发送消息后,收到服务端响应后才发送下一条消息
  • 异步:发送一条消息后,不等服务端返回就可以继续发送消息或者后续任务处理,发送方通过回调接口接收服务端响应,并处理响应结果。
  • OneWay:发送方发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不需要应答。

发送方式对比:发送吞吐量,单向>异步>同步,但单向发送可靠性差存在丢失消息可能,选型根据实际需求确定;

代码语言:javascript
复制
private void sync() throws Exception {
        //创建消息
        Message message = new Message("topic_family", ("  同步发送  ").
                                      getBytes());
        //同步发送消息
        SendResult sendResult = producer.send(message);
        log.info("Product-同步发送-Product信息={}", sendResult);
    }
 
    /**
     * 2、异步发送消息
     */
    private void async() throws Exception {
        //创建消息
        Message message = new Message("topic_family", ("  异步发送  ").
                                      getBytes());
        //异步发送消息
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("Product-异步发送-输出信息={}", sendResult);
            }
 
            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
                //补偿机制,根据业务情况进行使用,看是否进行重试
            }
        });
    }
 
    /**
     * 3、单项发送消息
     */
    private void oneWay() throws Exception {
        //创建消息
        Message message = new Message("topic_family", (" 单项发送 ").
                                      getBytes());
        //同步发送消息
        producer.sendOneway(message);
    }

rocketmq消息查询

按照Message Key查询

消息的key是业务开发同学在发送消息之前自行指定的,通常会把具有业务含义,区分度高的字段作为消息的key,如用户id,订单id等;按Message Key查询消息,主要是基于RocketMQ的IndexFile索引文件来实现的;RocketMQ的索引文件逻辑结构,类似JDK中HashMap的实现;

查询过程:

  1. 根据查询的key的hashcode%slotNum得到具体的槽的位置(slotNum是一个索引文件里面包含的最大槽的数目,例如图中所示slotNum=5000000);
  2. 根据slotValue(slot位置对应的值)查找到索引项列表的最后一项(倒序排列,slotValue总是指向最新的一个索引项);
  3. 遍历索引项列表返回查询时间范围内的结果集(默认一次最大返回的32条记录);
  4. Hash冲突:寻找key的slot位置时相当亍执行了两次散列函数,一次key的hash,一次key的hash值取模,因此这里存在两次冲突的情况:第一种,key的hash值不同但模数相同,此时查询的时候会在比较一次key的hash值(每个索引项保存了key的hash值),过滤掉hash值不相等的项;第二种,hash值相等但key不等,出于性能的考虑冲突的检测放到客户端处理(key的原始值是存储在消息文件中的,避免对数据文件的解析),客户端比较一次消息体的key是否相同;
  5. 存储:为了节省空间索引项中存储的时间是时间差值(存储时间-开始时间,开始时间存储在索引文件头中),整个索引文件是定长的,结构也是固定的;

按照Message Key查询消息的方式,RocketMQ的具体做法是,主要通过Broker端的QueryMessageProcessor业务处理器来查询,读取消息的过程就是用topic和key找到IndexFile索引文件中的一条记录,根据其中的commitLog offset从CommitLog文件中读取消息的实体内容。

按照Message Id查询

MsgId总共16字节,包含消息存储主机地址,消息Commit Log offset,从MsgId中解析出Broker的地址和Commit Log的偏移地址,然后挄照存储格式所在位置消息buffer解析成一个完整的消息;Client端从MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封装成一个RPC请求后通过Remoting通信层发送(业务请求码:VIEW_MESSAGE_BY_ID);Broker端走的是QueryMessageProcessor,读取消息的过程用其中的commitLog offset和size去commitLog中找到真正的记录并解析成一个完整的消息返回;

rocketmq消息过滤

表达式过滤tag方式
代码语言:javascript
复制
//只订阅的消息TagA或TagC 
consumer.subscribe("TagFilterTest","TagA || TagC");
表达式过滤sql92方式

基于 sql92 表达式消息过滤,其实是对消息的属性运用 sql 过滤表达式进行条件匹配,所以消息发送时应该调用 putUserProperty 方法设置消息属性:

代码语言:javascript
复制
//只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest",MessageSelector.bySql("a between 0 and 3");
MessageFilter类过滤方式
代码语言:javascript
复制
// 使用 MessageFilter实现类,在服务器做消息过滤
String filterCode = MixAll.file2String("MessageFilterImpl.java");
consumer.subscribe("FilterTopicTest", "com.zqh.demo.filter.MessageFilterImpl", 
				   filterCode);
Rocket MQ消息过滤是发生在服务端还是客户端
  1. 消息生产者把消息发送并存储到Rocket MQ的broker上,NameServer用来发现和更新broker;
  2. 消费者启动时会启动PullMessageService 线程,PullMessageService线程不断地从内部的队列中取 PullRequest,然后使用PullRequest作为请求去拉取消息;
  3. PullRequest中的消息处理队列ProcessQueue是MessageQueue在消费端的重现、快照;PullMessageService使用消费者(DefaultMQPushConsumerImpl)从消息服务器默认每次拉取32条消息,按消息的队列偏移量存放在ProcessQueue中,然后消费者再将消息提交到消息消费线程池中(提交 ConsumeRequest),消息成功消费后从ProcessQueue中移除;
过滤时机一:Rocket MQ服务端(Borker)接收到拉取消息命令(RequestCode.PULL_MESSAGE),会对消息进行过滤
过滤时机二:Rocket MQ消费端(一般是业务应用)拉取到消息后对消息的处理

RocketMQ的消息过滤方式有别于其他消息中间件,是在订阅时,再做过滤;

  1. 在Broker端进行Message Tag比对,先遍历Consume Queue,如果存储的Message Tag不订阅的 Message Tag不符合,则跳过,继续比对下一个,符合则传输给Consumer;注意:Message Tag是字符串形式,Consume Queue中存储的是其对应的hashcode,比对时也是比对hashcode;
  2. Consumer收到过滤后的消息后,同样也要执行在Broker端的操作,但是比对的是真实的Message Tag字符串,而不是 Hashcode;
为什么基于表达式tag会在客户端再进行一次过滤
  1. Message Tag存储Hashcode,是为了在Consume Queue定长方式存储,节约空间;
  2. 过滤过程中不会访问Commit Log数据,可以保证堆积情况下也能高效过滤;
  3. 即使存在Hash冲突,也可以在Consumer端进行修正,保证万无一失;

由于在消息服务端进行消息过滤是匹配消息tag的hashcode,导致服务端过滤并不十分准确,从服务端返回的消息最终不一定是消息消费者订阅的消息,导致服务端过滤并不十分准确,这样也会造成网络带宽的浪费(可使用基于MessageFilter实现类模式的消息过滤);

rocketmq顺序消息

生产者顺序发送

顺序消息分为全局顺序消息与部分顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可;顺序消费实际上有两个核心点,一个是生产者有序存储,另一个是消费者有序消费;

RocketMQ中生产者生产的消息会放置在某个队列中,基于队列先进先出的特性天然的可以保证存入队列的消息顺序和拉取的消息顺序是一致的,因此我们只需要保证一组相同的消息按照给定的顺序存入同一个队列中,就能保证生产者有序存储;

普通发送消息的模式下,生产者会采用轮询的方式将消费均匀的分发到不同的队列中,然后被不同的消费者消费,因为一组消息在不同的队列,此时就无法使用RocketMQ带来的队列有序特性来保证消息有序性了;

  1. RocketMQ支持生产者在投放消息的时候自定义投放策略,我们实现一个MessageQueueSelector接口,使用Hash取模法来保证同一个订单在同一个队列中就行了,即通过订单ID%队列数量得到该ID的订单所投放的队列在队列列表中的索引,然后该订单的所有消息都会被投放到这个队列中;生产者发送消息的方法中就有一些添加队列选择器的方法,保证消息发送顺序;
  2. 顺序消息必须使用同步发送的方式才能保证生产者发送的消息有序;实际上采用队列选择器的方法不能保证消息的严格顺序,我们的目的是将消息发送到同一个队列中,如果某个broker挂了,那么队列就会减少一部分,如果采用取余的方式投递,将可能导致同一个业务中的不同消息被发送到不同的队列中,导致同一个业务的不同消息被存入不同的队列中,短暂的造成部分消息无序。同样的如果增加了服务器,那么也会造成短暂的造成部分消息无序;
消费者顺序消费

RockerMQ的MessageListener回调函数提供了两种消费模式,有序消费模式MessageListenerOrderly和并发消费模式MessageListenerConcurrently:

  • 在消费的时候,还需要保证消费者注册MessageListenerOrderly类型的回调接口实现顺序消费,如果消费者采用Concurrently并行消费,则仍然不能保证消息消费顺序;
  • 每一个消费者的的消费端都是采用线程池实现多线程消费的模式,即消费端是多线程消费。虽然MessageListenerOrderly被称为有序消费模式,但是仍然是使用的线程池去消费消息;
  • MessageListenerConcurrently是拉取到新消息之后就提交到线程池去消费;MessageListenerOrderly则是通过加分布式锁和本地锁保证同时只有一条线程去消费一个队列上的数据;
MessageListenerOrderly的加锁机制
  • 消费者在进行某个队列的消息拉取时首先向Broker服务器申请队列锁,如果申请到琐,则拉取消息,否则放弃消息拉取,等到下一个队列负载周期(20s)再试。这一个锁使得一个MessageQueue同一个时刻只能被一个消费客户端消费,防止因为队列负载均衡导致消息重复消费;
  • 假设消费者对messageQueue的加锁已经成功,那么会开始拉取消息,拉取到消息后同样会提交到消费端的线程池进行消费。但在本地消费之前,会先获取该messageQueue对应的锁对象,每一个messageQueue对应一个锁对象,获取到锁对象后,使用synchronized阻塞式的申请线程级独占锁。这一个锁使得来自同一个messageQueue的消息在本地的同一个时刻只能被一个消费客户端中的一个线程顺序的消费;
  • 在本地加synchronized锁成功之后,还会判断如果是广播模式,则直接进行消费,如果是集群模式,则判断如果messagequeue没有锁住或者锁过期(默认30000ms),那么延迟100ms后再次尝试向Broker申请锁定messageQueue,锁定成功后重新提交消费请求;
MessageListenerOrderly顺序消费存在的问题
  1. 使用了很多的锁,降低了吞吐量;
  2. 前一个消息消费阻塞时后面消息都会被阻塞。如果遇到消费失败的消息,会自动对当前消息进行重试(每次间隔时间为1秒),无法自动跳过,重试最大次数是Integer.MAX_VALUE,这将导致当前队列消费暂停,因此通常需要设定有一个最大消费次数,以及处理好所有可能的异常情况;
顺序消息缺陷
  • 发送顺序消息无法利用集群FailOver特性;
  • 消费顺序消息的并行度依赖于队列数量;
  • 队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题;
  • 遇到消息失败的消息,无法跳过,当前队列消费暂停;

rocketmq事务消息

RocketMQ事务不同于Kafka事务,它是基于2PC的方案实现的分布式事务,分两阶段提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,两阶段提交 + 回查(RMQ事务消息是对经典的2PC分布式事务的实现):

  • 一阶段:发送half消息;
  • 二阶段:根据half消息发送结果以及本地事务执行结果决定发送commit或rollback;
  • 回查:broker端通过定时任务,默认以1分钟为回查频率,对half消息存储队列(RMQ_SYS_TRANS_HALF_TOPIC)及半消息处理队列(RMQ_SYS_TRANS_OP_HALF_TOPIC存储已经提交或者回滚的消息)中的消息进行比较,对需要进行回查的half消息发送给客户端进行回查;根据回查结果最终决定对半消息进行commit/rollback操作;
事务消息的两个拓展
Half(Prepare) Message——半消息(预处理消息)

半消息是一种特殊的消息类型,该状态的消息暂时不能被Consumer消费。当一条事务消息被成功投递到Broker上,但是Broker并没有接收到Producer发出的二次确认时,该事务消息就处于暂时不可被消费状态,该状态的事务消息被称为半消息;

Message Status Check——消息状态回查

由于网络抖动、Producer重启等原因,可能导致Producer向Broker发送的二次确认消息没有成功送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态(Commit 或 Rollback),可以看出Message Status Check主要用来解决分布式事务中的超时问题;

rocketmq事务消息执行流程
  1. Producer向Broker端发送Half Message;
  2. Broker ACK,Half Message发送成功;
  3. Producer执行本地事务;
  4. 本地事务完毕,根据事务的状态,Producer向Broker发送二次确认消息,确认该Half Message的Commit或者Rollback状态;Broker收到二次确认消息后,对于Commit状态,则直接发送到Consumer端执行消费逻辑,而对于Rollback则直接标记为失败,一段时间后清除,并不会发给Consumer;正常情况下,到此分布式事务已经完成,剩下要处理的就是超时问题,即一段时间后Broker仍没有收到Producer的二次确认消息;
  5. 针对超时状态,Broker主动向Producer发起消息回查;
  6. Producer处理回查消息,返回对应的本地事务的执行结果;
  7. Broker针对回查消息的结果,执行Commit或Rollback操作,同4;
消息的三个状态

TransactionStatus.CommitTransaction:提交事务,它允许消费者消费此消息;

TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费;

TransactionStatus.Unknown:中间状态,它代表需要回查本地事务状态来决定是提交还是回滚事务;

RocketMQ事务消息使用案例
1. 定义消息监听器
  • executeLocalTransaction:执行本地事务;
  • checkLocalTransaction:回查本地事务状态,根据这次回查的结果来决定此次事务是提交还是回滚;
代码语言:javascript
复制
/**
* 事务监听器,重写执行本地事务方法以及事务回查方法
*/
public class TransactionListenerImpl implements TransactionListener{
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg){
        String msgKey = msg.getKeys();
        switch (msgKey) {
            case "Num0":
            case "Num1":
                // 明确回复回滚操作,消息将会被删除,不允许被消费。
                return LocalTransactionState.ROLLBACK_MESSAGE;
            case "Num8":
            case "Num9":
                // 消息无响应,代表需要回查本地事务状态来决定是提交还是回滚事务
                return LocalTransactionState.UNKNOW;
            default:
                // 消息通过,允许消费者消费消息
                return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
 
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg){
        System.out.println("回查本地事务状态,消息Key: " + msg.getKeys() + ",消息内容: "
						   + new String(msg.getBody()));
        // 需要根据业务,查询本地事务是否执行成功,这里直接返回COMMIT
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
2. 定义消息生产者

事务消息的生产者跟我们之前的普通生产者的不同:

  • 需创建事务类型的生产者TransactionMQProducer;
  • 需调用setTransactionListener()方法设置事务监听器;
  • 使用sendMessageInTransaction()以事务方式发送消息;
代码语言:javascript
复制
public class TransactionProducer{
    public static void main(String[] args) throws MQClientException, 
	InterruptedException{
        // 创建事务类型的生产者
        TransactionMQProducer producer = new 
			TransactionMQProducer("transaction-producer-group");
        // 设置NameServer的地址
        producer.setNamesrvAddr("10.0.90.211:9876");
        // 设置事务监听器
        producer.setTransactionListener(new TransactionListenerImpl());
        // 启动生产者
        producer.start();
        // 发送10条消息
        for (int i = 0; i < 10; i++){
            try {
                Message msg = new Message("TransactionTopic", "", 
										  ("Hello RocketMQ Transaction Message" + i).
										  getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 设置消息Key
                msg.setKeys("Num" + i);
                // 使用事务方式发送消息
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.println("sendResult = " + sendResult);
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e){
                e.printStackTrace();
            }
        }
        // 阻塞,目的是为了在消息发送完成后才关闭生产者
        Thread.sleep(10000);
        producer.shutdown();
    }
}
3. 定义消息消费者
代码语言:javascript
复制
public class MQConsumer {
    public static void main(String[] args) throws MQClientException {
		// 创建DefaultMQPushConsumer类并设定消费者名称
		DefaultMQPushConsumer mqPushConsumer = new 
			DefaultMQPushConsumer("consumer-group-test");
		// 设置NameServer地址,如果是集群的话,使用分号;分隔开
		mqPushConsumer.setNamesrvAddr("10.0.90.211:9876");
		// 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
		// 如果不是第一次启动,那么按照上次消费的位置继续消费
		mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
		// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息,如果订阅该主题下的所有tag,
		//则使用*
		mqPushConsumer.subscribe("TransactionTopic", "*");

		// 注册回调实现类来处理从broker拉取回来的消息
		mqPushConsumer.registerMessageListener(new MessageListenerConcurrently(){
			// 监听类实现MessageListenerConcurrently接口即可,重写consumeMessage
			//方法接收数据
			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, 
															ConsumeConcurrentlyContext consumeConcurrentlyContext) {
				MessageExt messageExt = msgList.get(0);
				String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
				System.out.println("消费者接收到消息: " + messageExt.toString() 
								   + "---消息内容为:" + body);
				// 标记该消息已经被成功消费
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});
		// 启动消费者实例
		mqPushConsumer.start();
    }
}
RocketMQ事务消息原理
基本思想

在RocketMQ事务消息的主要流程中,一阶段的消息如何对用户不可见。其中事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的。那么如何做到写入消息但是对用户不可见呢?RocketMQ事务消息的做法是:如果消息是half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC;由于消费组未订阅该主题,故消费端无法消费half类型的消息;

事务回查的基本思想
  • Broker会启动一个消息回查的定时任务,定时从事务消息queue中读取所有待反查的消息。针对每个需要反查的半消息,Broker会给对应的Producer发一个要求执行事务状态反查的RPC请求,然后根据RPC返回响应中的反查结果,来决定这个半消息是需要提交还是回滚,或者后续继续来反查。最后提交或者回滚事务,将半消息标记为已处理状态【将消息存储在主题为:RMQ_SYS_TRANS_OP_HALF_TOPIC的主题中,代表这些消息已经被处理(提交或回滚)】。 如果是提交事务,就把半消息从半消息队列中复制到该消息真正的topic和queue中; 如果是回滚事务,则什么都不做;
  • rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息;
RocketMQ事务消息使用限制
  1. 事务消息不支持延时消息和批量消息;
  2. 事务性消息可能不止一次被检查或消费,所以消费者端需要做好消费幂等;
  3. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为15次(即默认只会回查15次),我们可以通过Broker配置文件的transactionCheckMax参数来修改此限制。如果已经检查某条消息超过N次的话( N = transactionCheckMax ), 则Broker将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionCheckListener类来修改这个行为;
  4. 事务消息将在Broker配置文件中的参数transactionMsgTimeout这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性CHECK_IMMUNITY_TIME_IN_SECONDS来改变这个限制,该参数优先于transactionMsgTimeout参数;
  5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过RocketMQ本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制;
  6. 事务消息的生产者ID不能与其他类型消息的生产者ID共享;与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者ID查询到消费者;

rocketmq延时消息

  • RocketMQ支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息;
  • 预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;
  • 在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间;
  • broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到达时,会把消息重新存储到对应的topic的queue里面;
具体原理

RocketMQ发送延时消息时先把消息按照延迟时间段发送到指定的队列中(rocketmq把每种延迟时间段的消息都存放到同一个队列中)然后通过一个定时器进行轮训这些队列,查看消息是否到期,如果到期就把这个消息发送到指定topic的队列中,这样的好处是同一队列中的消息延时时间是一致的,还有一个好处是这个队列中的消息时按照消息到期时间进行递增排序的,说的简单直白就是队列中消息越靠前的到期时间越早;

执行过程
  1. Broker收到消息后会将消息写入CommitLog,在写入时会判断消息DELAY属性是否大于0;
  2. 大于0,CommitLog写入时并没有直接写入,而是把Topic改为SCHEDULE_TOPIC_XXXX,把queueId改为延时级别减1;因为延时级别有18个,所以这里有18个队列;
  3. 调度消息:延时消息写入后,会有一个调度任务不停地拉取这些延时消息,ScheduleMessageService的load()方法会加载一个delayLevelTable(ConcurrentHashMap类型),key保存延时级别(从1开始),value保存延时时间(单位是 ms);load()方法结束后,创建了一个有18个核心线程的定时线程池,然后遍历delayLevelTable,创建18个任务(DeliverDelayedMessageTimerTask)进行每个延时级别的任务调度;
  4. 消息从CommitLog转发到ConsumeQueue时,会判断是否是延时消息(Topic = SCHEDULE_TOPIC_XXXX 并且延时级别大于0),如果是延时消息,就会修改tagsCode值为消息投递的时间戳,而tagsCode原值是tag的HashCode;而ScheduleMessageService调度线程将消息从ConsumeQueue重新投递到原始队列中时,会把tagsCode再次修改为tag的HashCode;
  • Commit Log Offset:记录在CommitLog中的位置;
  • Size:记录消息的大小;
  • Message Tag HashCode:记录消息Tag的哈希值,用于消息过滤。特别的,对于延迟消息,这个字段记录的是消息的投递时间戳;这也是为什么java中hashCode方法返回一个int型,只占用4个字节,而这里Message Tag HashCode字段却设计成8个字节的原因;
quest

要求延时消息3小时才能消费,而RocketMQ的延时消息最大延时级别只支持延时2小时,怎么处理?

  • 在Broker上修改messageDelayLevel的默认配置;
  • 在客户端缓存msgId,先设置延时级别是18(2h),当客户端拉取到消息后首先判断有没有缓存,如果有缓存则再次发送延时消息,这次延时级别是17(1h),如果没有缓存则进行消费;
总流程
  • producer端设置消息delayLevel延迟级别,消息属性DELAY中存储了对应了延时级别;
  • broker端收到消息后,判断延时消息延迟级别,如果大于0,则备份消息原始topic,queueId,并将消息topic改为延时消息队列特定topic(SCHEDULE_TOPIC),queueId改为延时级别-1;
  • mq服务端ScheduleMessageService中,为每一个延迟级别单独设置一个定时器,定时(每隔1秒)拉取对应延迟级别的消费队列;
  • 根据消费偏移量offset从commitLog中解析出对应消息;
  • 从消息tagsCode中解析出消息应当被投递的时间,与当前时间做比较,判断是否应该进行投递;
  • 若到达了投递时间,则构建一个新的消息,并从消息属性中恢复出原始的topic,queueId,并清除消息延迟属性,从新进行消息投递;
优缺点
  • 优点:设计简单,把所有相同延迟时间的消息都先放到一个队列中,定时扫描,可以保证消息消费的有序性;
  • 缺点:定时器采用了timer,timer是单线程运行,如果延迟消息数量很大的情况下,可能单线程处理不过来,造成消息到期后也没有发送出去的情况;
  • 改进点:可以在每个延迟队列上各采用一个timer,或者使用timer进行扫描,加一个线程池对消息进行处理,这样可以提供效率;

rocketmq消息负载均衡

消费者负载均衡

消费者负载均衡,指的是,将⼀个Topic下的多个Queue分配到不同Consumer实例的过程,负载均衡机制的本意是为了提升消息的并行消费能力;通过增加consumer实例去分摊queue的消费,可以起到水平扩展的消费能力的作用。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的queue将分配到其他实例上继续消费;

分配算法
AllocateMessageQueueAveragely

消费者平均分摊queue;

AllocateMessageQueueAveragelyByCircle

平均分摊每一条queue,只是以环状轮流分queue的形式;

集群模式下,queue都是只允许分配只一个实例,否则会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个queue只分给一个consumer实例,一个consumer实例可以允许同时分到不同的queue;

消费者Rebalance限制

由于⼀个队列最多分配给⼀个消费者,因此当某个消费者组下的消费者实例数量大于队列的数量时,多余的消费者实例将分配不到任何队列,所以需要控制让queue的总数量大于等于consumer的数量;

Rebalance危害
  • 消费突刺: 由于Rebalance可能导致重复消费,如果需要重复消费的消息过多,或者因为Rebalance暂停时间过长从而导致积压了部分消息。那么有可能会导致在Rebalance结束之后瞬间需要消费很多消息。 在只有一个Consumer时,其负责消费所有Queue;在新增了一个Consumer后会触发Rebalance的发生。此时原Consumer就需要暂停部分队列的消费,等到这些队列分配给新的Consumer后,这些暂停消费的队列才能继续被消费;
  • 消费重复: Consumer 在消费新分配给自己的队列时,必须接着之前Consumer 提交的消费进度的offset继续消费。然而默认情况下,offset是异步提交的,如果异步提交失败了,就会导致提交到Broker的offset与Consumer实际消费的消息并不一致。这个不一致的差值就是可能会重复消费的消息(Consumer被踢出消费组,可能还没有提交offset,Rebalance时会Partition重新分配其它Consumer,会造成重复消费,虽有幂等操作但耗费消费资源,亦增加集群压力);
  1. 同步提交: consumer提交了其消费完毕的一批消息的offset给broker后,需要等待broker的成功ACK。当收到ACK后,consumer才会继续获取并消费下一批消息。在等待ACK期间,consumer是阻塞的;
  2. 异步提交: consumer提交了其消费完毕的一批消息的offset给broker后,不需要等待broker的成功ACK。consumer可以直接获取并消费下一批消息;
  • 消息丢失: 假如在重平衡前某个消费者拉取分区消息,在进行消息处理前提交了位移,但还没完成处理宕机了,然后 Kafka 进行重平衡,新的消费者负责此分区并读取提交位移,此时会“丢失”消息;
Rebalance原因及过程
Rebalance产生的原因
  • 消费者所订阅Topic的Queue数量发生变化;
  • 消费者组中消费者的数量发生变化;
Rebalance过程

在Broker中维护着多个Map集合,这些集合中动态存放着当前Topic中Queue的信息、Consumer Group中Consumer实例的信息。一旦发现消费者所订阅的Queue数量发生变化,或消费者组中消费者的数量发生变化,立即向Consumer Group中的每个实例发出Rebalance通知;

Consumer实例在接收到通知后会采用Queue分配算法自己获取到相应的Queue,即由Consumer实例自主进行Rebalance;

Queue分配算法

一个Topic中的Queue只能由Consumer Group中的一个Consumer进行消费,而一个Consumer可以同时消费多个Queue中的消息;那么Queue与Consumer间的配对关系是如何确定的,即Queue要分配给哪个Consumer进行消费,也是有算法策略的,这些策略是通过在创建Consumer时的构造器传进去的;

常见的有四种策略:平均分配、环形平均策略、一致性hash策略、同机房策略;

平均分配策略

该算法是要根据avg = QueueCount / ConsumerCount的计算结果进行分配的。如果能够整除,则按顺序将avg个Queue逐个分配Consumer;如果不能整除,则将多余出的Queue按照Consumer顺序逐个分配;先计算好每个Consumer应该分得几个Queue,然后再依次将这些数量的Queue逐个分配个Consumer;

比如:有10个Queue,4个Consumer,那么每个Consumer可以分配到2个Queue,但是还有两个Queue是多余的,那么这两个Queue将依次按顺序分给Consumer-A,Consumer-B;

环形平均策略

环形平均算法是指,根据消费者的顺序,依次在由queue队列组成的环形图中逐个分配,该算法不用事先计算每个Consumer需要分配几个Queue,直接一个一个分即可;

一致性hash策略

该算法会将consumer的hash值作为Node节点存放到hash环上,然后将queue的hash值也放到hash环上,通过顺时针方向,距离queue最近的那个consumer就是该queue要分配的consumer;该算法存在的问题:分配不均,其可以有效减少由于消费者组扩容或缩容所带来的大量的Rebalance;

同机房策略

根据queue的部署机房位置和consumer的位置,过滤出当前consumer相同机房的queue。然后按照平均分配策略或环形平均策略对同机房queue进行分配;如果没有同机房queue,则按照平均分配策略或环形平均策略对所有queue进行分配;

生产者负载均衡

发送消息通过轮询队列的方式发送,每个队列接收平均的消息量,通过增加机器,可以水平扩展队列容量;另外也可以自定义方式选择发往哪个队列;

rocketmq消息ACK机制

RocketMQ 是以consumer group+queue为单位是管理消费进度的,以一个consumer offset标记这个这个消费组在这条 queue 上的消费进度。如果某已存在的消费组出现了新消费实例的时候,依靠这个组的消费进度,就可以判断第一次是从哪里开始拉取的;

每次消息成功后,本地的消费进度会被更新,然后由定时器定时同步到broker,以此持久化消费进度。但是每次记录消费进度的时候,只会把一批消息中最小的offset值为消费进度值;

这钟方式和传统的一条message单独ack的方式有本质的区别。性能上提升的同时,会带来一个潜在的重复问题——由于消费进度只是记录了一个下标,就可能出现拉取了100条消息如2101-2200的消息,后面99条都消费结束了,只有2101消费一直没有结束的情况;在这种情况下,RocketMQ为了保证消息肯定被消费成功,消费进度只能维持在2101,直到2101也消费结束了,本地的消费进度才能标记2200消费结束了;在这种设计下,就有消费大量重复的风险。如2101在还没有消费完成的时候消费实例突然退出(机器断电,或者被 kill)。这条queue的消费进度还是维持在2101,当queue重新分配给新的实例的时候,新的实例从broker上拿到的消费进度还是维持在2101,这时候就会又从2101开始消费,2102-2200这批消息实际上已经被消费过还是会投递一次;

对于这个场景,RocketMQ暂时无能为力,所以业务必须要保证消息消费的幂等性,这也是RocketMQ官方多次强调的态度;

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-11-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 对线JAVA面试 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • rocketmq消息消费模式
    • 广播模式
      • 集群模式(默认)
        • 消息进度保存
        • 消息发送类型
        • rocketmq消息查询
          • 按照Message Key查询
            • 按照Message Id查询
            • rocketmq消息过滤
              • 表达式过滤tag方式
                • 表达式过滤sql92方式
                  • MessageFilter类过滤方式
                    • Rocket MQ消息过滤是发生在服务端还是客户端
                      • 过滤时机一:Rocket MQ服务端(Borker)接收到拉取消息命令(RequestCode.PULL_MESSAGE),会对消息进行过滤
                      • 过滤时机二:Rocket MQ消费端(一般是业务应用)拉取到消息后对消息的处理
                      • 为什么基于表达式tag会在客户端再进行一次过滤
                  • rocketmq顺序消息
                    • 生产者顺序发送
                      • 消费者顺序消费
                        • MessageListenerOrderly的加锁机制
                        • MessageListenerOrderly顺序消费存在的问题
                      • 顺序消息缺陷
                      • rocketmq事务消息
                        • 事务消息的两个拓展
                          • Half(Prepare) Message——半消息(预处理消息)
                          • Message Status Check——消息状态回查
                        • rocketmq事务消息执行流程
                          • 消息的三个状态
                            • RocketMQ事务消息使用案例
                              • 1. 定义消息监听器
                              • 2. 定义消息生产者
                              • 3. 定义消息消费者
                            • RocketMQ事务消息原理
                              • 基本思想
                              • 事务回查的基本思想
                            • RocketMQ事务消息使用限制
                            • rocketmq延时消息
                              • 具体原理
                                • 执行过程
                                  • quest
                                    • 总流程
                                      • 优缺点
                                      • rocketmq消息负载均衡
                                        • 消费者负载均衡
                                          • 分配算法
                                          • 消费者Rebalance限制
                                          • Rebalance原因及过程
                                          • Queue分配算法
                                        • 生产者负载均衡
                                        • rocketmq消息ACK机制
                                        相关产品与服务
                                        消息队列 CMQ 版
                                        消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
                                        领券
                                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档