首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ客户端PUSH消费概览【源码笔记】

RocketMQ客户端PUSH消费概览【源码笔记】

作者头像
瓜农老梁
发布2019-08-05 13:32:21
7490
发布2019-08-05 13:32:21
举报
文章被收录于专栏:瓜农老梁瓜农老梁
一、问题描述

PUSH消费整体流程是怎么样的?

二、PUSH消费流程概览
1.从客户端示例开始
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("Jodie_topic_1023", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20170422221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
2.客户端PUSH消费流程概览

概览流程1

概览流程2

小结:PUSH消费的主要内容实例化DefaultMQPushConsumer,注册MessageQueue分配策略;初始化订阅数据并存入缓存;注册消费监听用于回调处理消息;创建并启动MQClientInstance实例;向Broker发送心跳等工作。

3.参数校验哪些内容?

小结:参数校验中MessageListener只能为顺序消费或者并发消费两种模式;消费最小线程consumeThreadMin取值需要小于1000即最多1000个消费线程;由于为无界队consumeThreadMax设置无效。

4.MQClientInstance初始化与启动

小结:MQClientInstance初始化启动连带一系列线程类的启动。例如:PullMessageService、RebalanceService等以及通过Netty建立TCP通道。

三、总结

本篇文章主要对PUSH消费启动有个整体的印象,在分析消息拉取/并发消费/顺序消费/负载均衡时再来看各个类的具体职责。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-07-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 瓜农老梁 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、问题描述
  • 二、PUSH消费流程概览
    • 1.从客户端示例开始
      • 2.客户端PUSH消费流程概览
        • 3.参数校验哪些内容?
          • 4.MQClientInstance初始化与启动
          • 三、总结
          相关产品与服务
          负载均衡
          负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档