前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ之Pull Consumer负载均衡拉取正确姿势?

RocketMQ之Pull Consumer负载均衡拉取正确姿势?

作者头像
斯武丶风晴
发布2019-07-02 17:33:33
2.9K0
发布2019-07-02 17:33:33
举报
文章被收录于专栏:龙首琴剑庐龙首琴剑庐

RocketMQ分布式消息中间件Pull模式下,我们的一般消费步骤如下:

1. 读取 topic的消息队列message queue的信息; 2. 按队列去拉取一定数目的消息; 3.(持久化message queue的消费进度consume offset)

首先有一些关键概念,我们需要理清楚:

consume offset 是基于topic 以及 消费组 consumer group的,意思是什么?

意思就是当采用集群消费模型(CLUSTERING),我们的consume offset 默认是存储在broker服务器上的config/consumerOffset.json文件。

代码语言:javascript
复制
{
	"offsetTable":{
		"TopicTest@pullConsumerGroupTest":{0:1578,1:1578,2:1578,3:1578
		}
	}
}

(TopicTest为某个topic,初始化为4个队列, pullConsumerGroupTest为某个消费组)

那么当同一个consumer group的多个consumer instance默认是共享一个消费进度。

那么问题来了,我们同一个consumer group的多个consumer instance在第一个步骤的时候,如何快速感知和分配合适的消息队列message queue,给每一个consumer instance消费呢??

---即 负载均衡问题如何解决?

废话不多说,贴个代码:

代码语言:javascript
复制
public Set<MessageQueue> fetchMessageQueuesForPullOperation(String topic)
			throws MQClientException, InterruptedException {
		DefaultMQPullConsumer pullConsumer; // please init
		long fetchQueueTimeoutMillis = 5000l;
		long fetchQueueNextDelayTimeMillis = 200l;

		Set<MessageQueue> msgQueues = null;
		switch (pullConsumer.getMessageModel()) {
			case BROADCASTING:
				msgQueues = pullConsumer.fetchSubscribeMessageQueues(topic);
				break;
			case CLUSTERING:
				msgQueues = pullConsumer.fetchMessageQueuesInBalance(topic);
				// 未获取到负载均衡的时候,等待fetchQueueNextDelayTimeMillis毫秒重新获取,直到超时
				long timeout = 0L;
				while (CollectionUtils.isEmpty(msgQueues) && timeout < fetchQueueTimeoutMillis) {
					Thread.sleep(fetchQueueNextDelayTimeMillis);
					timeout += fetchQueueNextDelayTimeMillis;
					msgQueues = pullConsumer.fetchMessageQueuesInBalance(topic);
				}
				break;
			default:
				break;
		}
		return msgQueues;
	}

当然还有一种内置的

代码语言:javascript
复制
MQPullConsumerScheduleService

也是可以实现。不同的是这个是回调模式。

by 斯武丶风晴 https://my.oschina.net/langxSpirit

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档