首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

RocketMQ(一):推拉消费模型客户端实践

MQ有很多成熟产品,以RocketMQ作为切入点,成本较低。MQ主要角色为:生产者、消费者、消息服务端。 本文先来看看消费者的实现。现在通用的消费模型中,有推和拉两种模型。...调用业务消费代码,实现了 MessageListenerConcurrently 的接口; push的消费模式,前端代码简单,由服务端进行推送数据,能够在消息到达时及时处理,省去了客户端无谓的轮询类操作...服务端调用 // org.apache.rocketmq.client.consumer.DefaultMQPullConsumer#pullBlockIfNotFound @Override...Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_")); } // 创建消费客户端实例...客户端进行消费消息,基本都是一此连接的管理,复杂度都不高。主要还是MQ服务端功能,值得深入。 欲知后事如何,且听下文分解。 ---- ? —END—

1.1K10
您找到你想要的搜索结果了吗?
是的
没有找到

RocketMQ客户端消费--ProcessQueue处理队列【源码笔记】

this.processQueue.isLockExpired())) 小结:@1&@2中可以看出lastLockTimestamp在顺序消费时向Broker请求对队列加锁成功后设置的时间戳;REBALANCE_LOCK_MAX_LIVE_TIME由参数rocketmq.client.rebalance.lockMaxLiveTime...case CONSUME_PASSIVELY: pq.setDropped(true); } 小结:lastPullTimestamp每次拉取消息都会更新时间戳;PULL_MAX_IDLE_TIME由rocketmq.client.pull.pullMaxIdleTime...msgs.isEmpty()) { //客户端消费消息 status = messageListener.consumeMessage(Collections.unmodifiableList(msgs...), context); } 小结:顺序消费时通过ProcessQueue#takeMessags获取特定数量的消息(默认1条)并传给客户端Listener进行处理。...case SUCCESS: //清空msgTreeMapTemp commitOffset = consumeRequest.getProcessQueue().commit(); 小结:在顺序消费客户端处理消息状态为成功时

2K50

记在使用rocketmq client客户端过程中踩到的坑

前言 最近项目中使用阿里的rocketmq来做消息队列,具体怎么使用rocketmq不在本文讨论范围之内,其相关帮助文档可以参考如下链接 https://help.aliyun.com/product...spm=a2c4g.11186623.6.540.afd02578y4vHe4 本文主要记录在使用rocketmq client时,遇到的一些坑,作者采用的客户端版本是4.2 踩到的坑 1、No route...的相应端口,或者加入相应的可以访问rocketmq的ip 5、topic的长度过长 这个有待验证 2、connect to failed 产生的原因: rocketmq默认开启了vip...通道 解决方案: 在客户端代码层面加入 producer.setVipChannelEnabled(false); consumer.setVipChannelEnabled(false); 3、Send...spm=5176.789006189.3.6.UbsCt3 3、如果是使用虚拟机,可能虚拟机中的网络太多,rocketMQ在自动识别网络的时候识别错误。

14.6K31

RocketMQ客户端PUSH消费--并发消费与顺序消费【源码笔记】

小结:ConsumeMessageService并发消费(ConsumeMessageConcurrentlyService)主要工作交给Listener(客户端传入)进行处理,并对处理结果进行统计和处理...4.会存在Broker加锁过期了客户端还在处理该队列的情况吗? 2.Broker端队列加锁流程 ?...小结:顺序消费时对Broker端队列加锁防止该队列在特定时间内(一次默认60秒)被分配给其他clientId处理;Broker端加锁了,一次加锁失效时长为60秒;不存在Broker加锁过期了客户端还在处理该队列的情况...,Broker加锁时长为60秒,而客户端加锁时长为30秒,当客户端加锁时长失效时会重新请求Broker加锁并更新时间戳,从而可以持续延长加锁时间。

2.8K60

优秀的 RocketMQ 可视化管理工具 GUI 客户端

优秀的 RocketMQ 可视化管理工具 GUI 客户端官网地址:http://www.redisant.cn/rocketmq快速查看所有 RocketMQ 集群,包括Brokers、Topics和Consumers...查看消费者订阅了哪些主题,以及消息队列被分配给了哪些消费者;当出现消息积压时,RocketMQ Assistant 帮您快速定位问题创建普通消息、延迟消息、顺序消息;配合数据模板和定时器,您可以一次发送数千条消息进行性能测试...浮点类型消息创建和删除主题、重置消费者偏移量以及其他管理功能根据消息ID或消息Key追踪消息,了解消息从生产、存储到消费的详细过程支持权限控制列表(ACL)多标签页管理,同时打开多个连接快速连接到您的 RocketMQ...集群并开始工作RocketMQ Assistant 支持ACL认证,支持 TLS 连接;支持 RocketMQ 4.x, 5.0图片实时查看您的 RocketMQ 健康指标查看 Broker 运行时配置...,支持 Prometheus 格式的服务端、生产者、消费者 Metrics 指标图片支持丰富的数据格式RocketMQ Assistant 会自动识别并格式化不同的数据格式,包括Text、JSON、XML

82330

RocketMQ

还有个重要功能是挡住前端的数据洪峰,保证后端系统的稳定性,这就要求消息中间件具有一定的消息堆积能力 rocket相比其他mq的优点 能够保证严格的消息顺序 提供丰富的消息拉取模式 实时的消息订阅机制 亿级消息堆积能力 rocketmq...使用同步复制 可以保证消息100%不丢失(但是性能下降10%) rocketmq节点 Name Server Broker 部署相对复杂,Broker 分为 Master 与 Slave 一个 Master...由于 RocketMQ 所有消息都是持久化的,所以如果按照优先级来排序,开销会非常大,因此 RocketMQ 没有特意支持消息优先级,但是可以通过变通的方式实现类似功能,即单独配置一个优先级高的队列,和一个普通优先级的队列...每个优先级可以用不同的 topic 表示,发消息时,指定不同的 topic 来表示优先级,这种方式可以解决绝大部分的优先级问题,但是对业务的优先级精确性做了妥协 刷盘方式 rocketmq 建议使用多主多从同步复制

2.2K10

RocketMQ

RocketMQ 基本概念 消息模型 RocketMQ由Producer、Broker、Consumer组成 Producer 生产消息,同步/异步发送,顺序/单向发送。...为什么RocketMQ没有这么做 因为RocketMQ 是java 实现的,要是缓存过多消息,GC是很严重的问题。...所以多文件并发写入,性能比RocketMQ好。 RocketMQ只有一个commitLog物理文件,单文件写入,性能比KafKa差。...不支持分布式事务消息 RocketMQ支持分布式事务 消息过滤 kafka不支持代理端消息过滤 RocketMQ支持代理端消息过滤 KafKa不支持延迟消息,而RocketMQ支持 重点 ActiveMQ...,线上关闭 autoCreateTopicEnable=true #是否允许broker自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #接受客户端连接的监听端口

1.1K30

RocketMQ详解(2)——RocketMQ核心概念

RocketMQ详解(2)——RocketMQ核心概念 一. RocketMQ专业术语 Producer 消息生产者,负责产生消息,一般由业务系统负责产生消息。...RocketMQ的消费方式 广播消息 一条消息被多个Consumer消费,即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每一个Consumer...在JMS规范中,类似于P2P模型,但是RocketMQ的集群消费功能大于等于JMS的P2P消费。...因为集群消费模式下,RocketMQ单个Consumer Group内的消费类似于P2P,但是一个Topic/Queue可以被多个Consumer Group消费。...在RocketMQ中,该顺序主要指局部顺序,即一类消费为满足顺序性,必须Producer单线程发送,且发送到同一个队列,这样Consumer就可以按照Producer的发送顺序来消费消息。

1.3K20

RocketMQ

本文参考 消息存储 不会永久保存消息文件,而是启用文件过期策略,在磁盘空间不足或在凌晨4点删除过期文件,文件默认保存72小时,删除时不会判断该文件上的消息是否被消费...fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq...发送请求到broker获取consumer的客户端ID. Broker中为什么会持有该消费组的所有消费者信息?...Broker默认每10s持久化一次 广播模式: 保存在消费者客户端....master汇报 消息消费者向master拉取消息时,如果消息消费者内存中存在消息消费进度时,master会尝试跟新消息消费进度 读写分离 master负责读写,slave可以为读,也可以什么都不做 RocketMQ

2.1K30
领券