首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

RocketMQ消息发送的高可用设计

重试机制提高了消息发送成功率。...默认机制 sendLatencyFaultEnable=false,消息发送选择队列调用以下方法: org.apache.rocketmq.client.impl.producer.TopicPublishInfo...下面我会从源码的角度详细地分析rocketmq是如何实现在一定时间内规避故障broker的,从发送消息方法源码看出,在发送消息,会调用updateFaultItem方法: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl...(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); 发送消息时捕捉到异常同样会调用updateFaultItem方法,它是延迟机制的核心方法...,isolation表示broker是否需要规避,所以消息成功发送表示broker无需规避,消息发送失败时表示broker发生故障了需要规避。

1.2K31
您找到你想要的搜索结果了吗?
是的
没有找到

聊聊 RokcetMQ 生产者

笔者将发送消息流程简化如下: 获取主题发布信息; 根据路由算法选择一个消息队列,也就是 selectOneMessageQueue方法; 调用 sendKernelImpl发放消息对象,封装成发送结果对象...发送消息 通过路由机制选择一个 messageQueue 之后,调用实例客户端 API 发送消息。...(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); 发送消息时捕捉到异常同样会调用 updateFaultItem 方法: endTimestamp..., true); endTimestamp - beginTimestampPrev等于消息发送耗时,如果成功发送第三个参数传的是 false ,发送失败传 true。...RocketMQ 版服务端判定消息产生的顺序性是参照同一生产者发送消息的时序。不同生产者、不同线程并发产生的消息,云消息队列 RocketMQ 版服务端无法判定消息的先后顺序。

26650

RocketMQ学习-消息发布和订阅

producer会轮询向指定topic的mq集合发送消息。 consumer有两种消费模式:集群消费和广播消费。...二、消息发送 生产者Demo 首先给出代码, package com.javadu.chapter8rocketmq.message; import org.apache.rocketmq.client.exception.MQBrokerException...消息发送源码分析 在RocketMQ中的client模块的包结构如下,可以看出,作者并没有将接口的定义和实现放在一个包下(这在我们的业务应用中是常见的做法,不一定合理)。...,如下图所示,DefaultMQProducer提供了很多发送消息的方法,可以实现同步发消息、异步发消息、指定消息队列、OneWay消息、事务消息等。...(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); //根据消息发送模式,对消息发送结果做不同的处理

5.8K20

消息中间件—RocketMQ消息发送

因此,本篇主要从一条消息发送为切入点,详细阐述在RocketMQ这款分布式消息队列中发送一条普通消息的大致流程和细节。...,主要基于RocketMQ-Client模块将消息发送RocketMQ的主节点。...,RocketMQ就会调用sendKernelImpl()方法发送消息(该方法为,通过RocketMQ的Remoting通信模块真正发送消息的核心)。...数据包发送对应的Broker上,默认发送超时间为3s; (5)这里,真正调用RocketMQ的Remoting通信模块完成消息发送是在MQClientAPIImpl实例sendMessageSync(...sendResult对象; (7)发送返回后,调用updateFaultItem更新Broker代理服务器的可用时间; (8)对于异常情况,且标志位—retryAnotherBrokerWhenNotStoreOK

2.3K30

RocketMQ

MQ介绍 ##1.1 为什么要用MQ 消息队列是一种“先进先出”的数据结构 其应用场景主要包含以下3个方面 应用解耦 系统的耦合性越高,容错性就越低。.../img/支付组件图.png)] 用户请求支付系统 支付系统调用第三方支付平台API进行发起支付流程 用户通过第三方支付平台支付成功后,第三方支付平台回调通知支付系统 支付系统调用订单服务修改订单状态...支付系统调用积分服务添加积分 支付系统调用日志服务记录日志 1.2 问题分析 问题1 用户提交订单后,扣减库存成功、扣减优惠券成功、使用余额成功,但是在确认订单操作失败,需要对库存、库存、余额进行回退。...1.6.1 消费幂等的必要性 在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ消息有可能会出现重复,这个重复简单可以概括为以下情况: 发送消息重复 当一条消息已被成功发送到服务端并完成持久化.../img/发送批量消息.png)] 批量消息发送是将同一个主题的多条消息一起打包发送消息服务端,减少网络调用次数,提高网络传输效率。

81210

分布式消息队列 RocketMQ 源码分析 —— RPC 通信(一)

那么,对于一款消息队列来说,如何做到消息的高效发送与接收是重点和关键** 一、RocketMQ中Remoting通信模块概览 RocketMQ消息队列的整体部署架构如下图所示: ?...RocketMQ中其他的组件(如client、nameServer、broker在进行消息发送和接收时均使用这两个组件) 2、消息的协议设计与编码解码 在Client和Server之间完成一次消息发送时...,需要对发送消息进行一个协议约定,因此就有必要自定义RocketMQ消息协议。...f) throws Exception { if (f.isSuccess()) { //如果发送消息成功给...在Client端发送异步消息时候(rocketmq-client模块最终调用sendMessageAsync方法时),会将InvokeCallback的接口注入,而在Server端的异步线程由上面所讲的业务线程池真正执行后

67920

客服MM被投诉说下单耗时很长,老板下令必须控制在1秒以内

MQ 我们这里选型的是 RocketMQ 中间件,为什么选用 RocketMQ,可以看前面的文章(消息中间件能干什么?RabbitMQ、Kafka、RocketMQ正确选型姿势)有详细的分析。...其次,订单系统就会向 RocketMQ发送个一个订单已支付的消息,接着关联系统就会各自业务处理,如: 积分系统从 RocketMQ 里获取消息,然后依据消息去累加积分; 营销系统会从 RocketMQ...当我们发送一条订单支付成功消息RocketMQ 里的时候,他会依据自己的负载均衡算法以及容错算法将消息发送到其中一个Broker中去。如果想了解更细节的算法以及如何分配,后面再分享出来哈。...其他耦合系统改造 通过上面对于订单系统自身的改造,主要保留了状态更新以及库存扣减,然后将支付成功消息推送到 RocketMQ 里去,也完成了相应的代码编写,那接下来,我们就需要在关联系统中来获取订单系统发送来的消息...现在改造完之后,只要支付成功,订单系统现在只需要更新订单状态 30 ms + 扣减库存 80 ms + 发送订单消息RocketMq z中10 ms ,总共大概 120 ms 就可以了,完美的控制在

42520

上线之后,消息收不到了!

一笔支付成功之后,音箱没有发出收款成功的播报。一切流程排查下来之后,这才发现原来 MQ 消费端没有正常在消费消息。...MQ 消费端应用没有异常,但是无法正常消费 MQ 控制台发送消息,消费端可以成功消费消息 排查问题 刚开始排查的时候,由于没有任何异常业务日志可以定位问题,所以问题排查起来十分困难。...为什么 mq 控制台重新发送消息消费者可以收到? rocketmq 控制台重新发送消息代码如下: ?...MessageService 将会把消息的元数据封装一个CONSUME_MESSAGE_DIRECTLY类型的请求,接着调用 rocketmq 提供的 admin API,给 rocketmq broker...这对于生产者来说,可能是一个致命的问题,因为消息生产者通常消息发送延时要低。 这种情况下,我们就可以将消息发送到 VIP 端口,从而降低消息发送的延时。

1K21

深入理解RocketMq普通消息和顺序消息使用,原理,优化

为了应对这个场景,最终我们引入了阿里云的RocketMqRocketMq可以处理可以处理很多消息堆积,并且服务的稳定不挂也可以由阿里云保证。...引入了RocketMq了之后,的确解决了队列堆积导致消息队列宕机的问题。...在RocketMq中,普通消息和顺序消息有没有什么办法提升消息消费速度? 消息失败重试次数怎么设置较为合理?顺序消息和普通消息有不同吗? 2....Sync: 同步发送,需要关心结果,根据结果判断是否需要进行重试,然后回到Step3。 可以看见Rocketmq发送普通消息的流程比较清晰简单,下面来看看顺序消息。...对RocketMq熟悉的小伙伴会发现,它其实并没有提供顺序消息发送相关的API,但是在阿里云的RocketMq版本提供了顺序消息的API,原理比较简单,其实也是对现有API的一个封装: SendResult

2.7K10

RocketMQ实战(三)分布式事务

二、Queue in Topic 对于RocketMQ而言,Topic只是一个逻辑上的概念,真正的消息存储其实是在Topic中的Queue中。想一想,为什么RocketMQ要这要设计呢?...rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息rocketmq-client:提供发送、接受消息的客户端API。...img 如上图所示,消息数据独立存储,业务和消息解耦,实质上消息发送有2次,一条是转账消息,另一条是确认消息。 到这里,我们先来看看基于RocketMQ的代码: ?...事务消息是否对消费者可见,完全由事务返回给RMQ的状态码决定(状态码的本质也是一条消息)。 ? ? 生产者发送了2条消息给RMQ,有一条本地事务执行成功,有一条本地事务执行失败。...假设,我们发送一条转账事务消息给RMQ,成功后回调本地事务,DB减操作成功,刚准备给RMQ一个确认消息,此时突然断电,或者网络抖动,使得这条确认消息没有发送出去。

40910

RocketMQ实战(三):分布式事务关于多Master多Slave的说明Queue in Topic初步认识RocketMQ的核心模块Order MessageTransaction Messag

Queue in Topic 对于RocketMQ而言,Topic只是一个逻辑上的概念,真正的消息存储其实是在Topic中的Queue中。想一想,为什么RocketMQ要这要设计呢?...rocketmq模块 rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息。...rocketmq-client:提供发送、接受消息的客户端API。...运行结果 生产者发送了2条消息给RMQ,有一条本地事务执行成功,有一条本地事务执行失败。...假设,我们发送一条转账事务消息给RMQ,成功后回调本地事务,DB减操作成功,刚准备给RMQ一个确认消息,此时突然断电,或者网络抖动,使得这条确认消息没有发送出去。

1.2K40
领券