前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ之特性和运维详解

RocketMQ之特性和运维详解

作者头像
IT架构圈
发布2021-10-11 10:26:20
9030
发布2021-10-11 10:26:20
举报
文章被收录于专栏:IT架构圈

rocketmq的producer 和 Consumer的特性比较了解。

Producer详解(一)

•① 普通消息

涉及到的类 org.apache.rocketmq.client.impl.CommunicationMode org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl

•② 定时消息

消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。固定精度:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 涉及到的类org.apache.rocketmq.store.config.MessageStoreConfig#messageDelayLevel

•③ 顺序消息

涉及到的类 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl 在分布式队列实现排序,这个需求怎么做?优先级:订单号20180206(queu1 )订单号20180207(queue2) 排序+优先级(分片)

•④ 事物消息

Consumer详解(二)

•① Pull&push模式

DefaultMQPullConsumer DefaultMQPushConsumer

•② 消费模型

org.apache.rocketmq.common.protocol.heartbeat.MessageModel#BROADCASTING org.apache.rocketmq.common.protocol.heartbeat.MessageModel#CLUSTERING

•③ 消费选择

org.apache.rocketmq.common.consumer.ConsumeFromWhere#CONSUME_FROM_LAST_OFFSET org.apache.rocketmq.common.consumer.ConsumeFromWhere#CONSUME_FROM_FIRST_OFFSET org.apache.rocketmq.common.consumer.ConsumeFromWhere#CONSUME_FROM_TIMESTAMP

•④ 重试策略

Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer 消费消息失败

•⑤ 消息重复幂

RocketMQ无法避免消息重复,所以如果业务对消费重复非常敏感,务必要在业务层面去重

•⑥ 消息回溯

回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能。RocketMQ支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯。使用命令:sh mqadmin resetOffsetByTime -g xx -s yy -t tt

•⑦ Producer|consumer Group:

1.可以通过运维工具(上次说过的rocketmq-console)查询这个组下有多少Producer实例。2.可通过运维工具查询这个组下的消费进度,多少个Consumer实例。3.事务消息,如果Producer意外宕机,Broker会主动回调Producer Group中的任意一台机器确认事务状态。4.集群模式,一个Consumer Group下的多个Consumer均摊消费消息;广播模式,group无意义。

NameServer(三)

•① 作用

1.每个Broker启动的时候会向Namesrv发送注册请求,Namesrv接收Broker的请求注册路由信息,NameServer保存活跃的broker列表,包括Master和Slave。2.用来保存所有topic和该topic所有队列的列表。3.NameServer用来保存所有broker的Filter列表。4.接收client(Producer和Consumer)的请求根据某个topic获取所有到broker的路由信息。总结:维护路由信息、维护broker数据。

•② 启动过程

org.apache.rocketmq.namesrv.NamesrvStartup org.apache.rocketmq.namesrv.NamesrvController#initialize org.apache.rocketmq.namesrv.kvconfig.KVConfigManager#load org.apache.rocketmq.remoting.netty.NettyRemotingServer(初始化远程服务、线程池服务)

初始化过程执行

1.读取配置文件2.调用初始化方法#1083.注册broker处理接受消费和消费消息等功能4.定时去扫描不活跃的broker

Broker(四)

•① 作用

消息接受、存储(恢复)、与各节点之间通讯。

•② 启动过程

org.apache.rocketmq.broker.BrokerStartup#createBrokerController#210 org.apache.rocketmq.common.ConfigManager#load#加载文件(store/config)内容 org.apache.rocketmq.store.DefaultMessageStore#load加载磁盘文件内容 org.apache.rocketmq.remoting.netty.NettyRemotingServer初始化通信层 线程池 org.apache.rocketmq.broker.BrokerController#registerProcessor 注册远程服务

•③ DefaultMessageStore

持久核心处理类

1.FlushConsumeQueueService >逻辑队列刷盘服务2.CleanCommitLogService 清理物理文件服务,定期清理72小时之前的物理文件。3.CleanConsumeQueueService 定期清理在逻辑队列中的物理偏移量小于commitlog中的最小物理偏移量的数据4.HAService 用于commitlog数据的主5.ScheduleMessageService 用于监控延迟消息,并到期后执行6.TransactionStateService 用于事务消息状态文件

初始化过程执行

1.初始化>createBrokerController>initialize

2.加载topic、消费进度、订阅关系

3.加载本地消息messageStore.load

4.远程通讯NettyRemotingServer、加载处理消息类。

5.初始化发送线程池sendMessageExecutor、拉取消息线程池(pullMessageExecutor)、管理Broker线程池(adminBrokerExecutor)、客户端管理线程池(clientManageExecutor)。

6.注册事件处理器,包括发送消息事件处理器(SendMessageProcessor)、拉取消息事件处理器、查询消息事件处理器(QueryMessageProcessor,包括客户端的心跳事件、注销事件、获取消费者列表事件、更新更新和查询消费进度consumerOffset)、客户端管理事件处理器、结束事务处理器(EndTransactionProcessor)、默认事件处理器(AdminBrokerProcessor)。

运维(五)

参看源码:jms里面的rocket运维

PS:特性和运维只是了解下,看下github源码中的文档,里面介绍的更加详细。

点击👆卡片,共同学习共同进步,我的坚持你的收获。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-09-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 IT架构圈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Producer详解(一)
  • Consumer详解(二)
  • NameServer(三)
  • Broker(四)
  • 运维(五)
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档