重试机制提高了消息发送的成功率。...默认机制 sendLatencyFaultEnable=false,消息发送选择队列调用以下方法: org.apache.rocketmq.client.impl.producer.TopicPublishInfo...下面我会从源码的角度详细地分析rocketmq是如何实现在一定时间内规避故障broker的,从发送消息方法源码看出,在发送完消息,会调用updateFaultItem方法: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl...(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); 发送消息时捕捉到异常同样会调用updateFaultItem方法,它是延迟机制的核心方法...,isolation表示broker是否需要规避,所以消息成功发送表示broker无需规避,消息发送失败时表示broker发生故障了需要规避。
概述 Producer 发送消息,RocketMQ 提供了三种模式。...2、异步发送 Producer 首先构建一个向 broker 发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果。...是2,所以除了正常调用一次外,发送消息如果失败了会重试2次。...异步发送:不会重试(调用总次数等于1) 2、循环执行发送消息 如果发送的消息未成功发送,则循环继续发送,直到发送的次数达到 timesTotal 。...4、调用 sendKernelImpl 方法进行发送消息 5、如果发送失败,则continue,继续循环发送,发送成功则直接 return 返回 ---- 同步发送原理 RocketMQ 通讯是使用 Netty
笔者将发送消息流程简化如下: 获取主题发布信息; 根据路由算法选择一个消息队列,也就是 selectOneMessageQueue方法; 调用 sendKernelImpl发放消息对象,封装成发送结果对象...发送消息 通过路由机制选择一个 messageQueue 之后,调用实例客户端 API 发送消息。...(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); 发送消息时捕捉到异常同样会调用 updateFaultItem 方法: endTimestamp..., true); endTimestamp - beginTimestampPrev等于消息发送耗时,如果成功发送第三个参数传的是 false ,发送失败传 true。...RocketMQ 版服务端判定消息产生的顺序性是参照同一生产者发送消息的时序。不同生产者、不同线程并发产生的消息,云消息队列 RocketMQ 版服务端无法判定消息的先后顺序。
producer会轮询向指定topic的mq集合发送消息。 consumer有两种消费模式:集群消费和广播消费。...二、消息发送 生产者Demo 首先给出代码, package com.javadu.chapter8rocketmq.message; import org.apache.rocketmq.client.exception.MQBrokerException...消息发送源码分析 在RocketMQ中的client模块的包结构如下,可以看出,作者并没有将接口的定义和实现放在一个包下(这在我们的业务应用中是常见的做法,不一定合理)。...,如下图所示,DefaultMQProducer提供了很多发送消息的方法,可以实现同步发消息、异步发消息、指定消息队列、OneWay消息、事务消息等。...(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); //根据消息发送模式,对消息发送结果做不同的处理
rocketmq-remoting-4.6.0-sources.jar!...} //...... } sendDefaultImpl方法内部会通过selectOneMessageQueue选择MessageQueue,然后通过sendKernelImpl来执行消息发送...null); } //...... } sendSelectImpl方法通过MessageQueueSelector来选择MessageQueue,在通过sendKernelImpl发送消息之前会判断是否超时...DefaultMQProducerImpl的sendDefaultImpl方法内部会通过selectOneMessageQueue选择MessageQueue,然后通过sendKernelImpl来执行消息发送...call timeout");sendSelectImpl方法通过MessageQueueSelector来选择MessageQueue,在通过sendKernelImpl发送消息之前会判断是否超时,
序 本文主要研究一下rocketmq的RemotingTooMuchRequestException RemotingTooMuchRequestException rocketmq-remoting-...} //...... } sendDefaultImpl方法内部会通过selectOneMessageQueue选择MessageQueue,然后通过sendKernelImpl来执行消息发送...null); } //...... } sendSelectImpl方法通过MessageQueueSelector来选择MessageQueue,在通过sendKernelImpl发送消息之前会判断是否超时...DefaultMQProducerImpl的sendDefaultImpl方法内部会通过selectOneMessageQueue选择MessageQueue,然后通过sendKernelImpl来执行消息发送...call timeout");sendSelectImpl方法通过MessageQueueSelector来选择MessageQueue,在通过sendKernelImpl发送消息之前会判断是否超时,
序 本文主要研究一下rocketmq的retryTimesWhenSendAsyncFailed pubsunmode.png DefaultMQProducerImpl rocketmq-client...sendKernelImpl方法根据communicationMode做不同的处理,如果是ASYNC,则通过mQClientFactory.getMQClientAPIImpl().sendMessage来发送消息返回...sendMessageAsync方法进行重试,如果出现InterruptedException、RemotingTooMuchRequestException时会再次调用onExceptionImpl,...sendKernelImpl方法根据communicationMode做不同的处理,如果是ASYNC,则通过mQClientFactory.getMQClientAPIImpl().sendMessage来发送消息返回...sendMessageAsync方法进行重试,调用过程中出现异常会根据异常类型再次执行onExceptionImpl方法 doc DefaultMQProducerImpl
(存储消息在《RocketMQ 源码分析 —— Message 存储》解析) ? Producer发送消息全局顺序图 2、Producer 发送消息 ?...// 循环调用发送消息,直到成功 34: for (; times < timesTotal; times++) { 35: String lastBrokerName...第 25 行 :获取 Topic路由信息, 详细解析见:DefaultMQProducerImpl#tryToFindTopicPublishInfo() 第 30 & 34 行 :计算调用发送消息到成功为止的最大次数...同步或异步发送消息会调用多次,默认配置为3次。...第 29 至 39 行 :消息格式与大小校验。 第 47 行 :调用 CommitLong 进行存储,详细逻辑见:《RocketMQ 源码分析 —— Message 存储》
序 本文主要研究一下rocketmq的retryTimesWhenSendAsyncFailed DefaultMQProducerImpl rocketmq-client-4.5.2-sources.jar...sendKernelImpl方法根据communicationMode做不同的处理,如果是ASYNC,则通过mQClientFactory.getMQClientAPIImpl().sendMessage来发送消息返回...sendMessageAsync方法进行重试,如果出现InterruptedException、RemotingTooMuchRequestException时会再次调用onExceptionImpl,...sendKernelImpl方法根据communicationMode做不同的处理,如果是ASYNC,则通过mQClientFactory.getMQClientAPIImpl().sendMessage来发送消息返回...sendMessageAsync方法进行重试,调用过程中出现异常会根据异常类型再次执行onExceptionImpl方法 doc DefaultMQProducerImpl
中获取 没有再从NameServer中请求获取 依然没有则使用默认topic(TBW102)的路由信息 2.选择一个MessageQueue进行发送 3.组装requestHeader发送消息...设置客户端MsgId 超过4K消息压缩设置压缩消息标记 设置事务消息标记 判断发送前钩子执行 消息发送完钩子执行 private SendResult sendDefaultImpl...final MessageQueue mq, //将消息发送到该队列上 final CommunicationMode communicationMode, //消息发送模式...requestHeader.setFlag(msg.getFlag());//消息标记,RocketMQ中不做处理 requestHeader.setProperties...MessageConst.PROPERTY_MAX_RECONSUME_TIMES); } } //调用通道发送消息
MQClientException, RemotingException, MQBrokerException, InterruptedException { //同步、异步、oneway默认都会调用...mq.getBrokerName()); if (null == brokerAddr) { /* 找不到则尝试去抓取,因此第一次发送消息可能会有比较大延时...context, true, producer); } } } }); } 这里又会去调用...channelFuture.cause()); } } else { /* 超时时间内未建立连接成功...发送端这里逻辑还是比较简单的,提交一条消息后就通过Netty发送到Broker,而Kafa的会更复杂一点,Kafka这里会做一个合并,客户端提交是放到一个内存队列,然后有一个Sender线程负责根据当前的状态决定是否发送消息
因此,本篇主要从一条消息发送为切入点,详细阐述在RocketMQ这款分布式消息队列中发送一条普通消息的大致流程和细节。...,主要基于RocketMQ-Client模块将消息发送至RocketMQ的主节点。...,RocketMQ就会调用sendKernelImpl()方法发送消息(该方法为,通过RocketMQ的Remoting通信模块真正发送消息的核心)。...数据包发送对应的Broker上,默认发送超时间为3s; (5)这里,真正调用RocketMQ的Remoting通信模块完成消息发送是在MQClientAPIImpl实例sendMessageSync(...sendResult对象; (7)发送返回后,调用updateFaultItem更新Broker代理服务器的可用时间; (8)对于异常情况,且标志位—retryAnotherBrokerWhenNotStoreOK
MQ介绍 ##1.1 为什么要用MQ 消息队列是一种“先进先出”的数据结构 其应用场景主要包含以下3个方面 应用解耦 系统的耦合性越高,容错性就越低。.../img/支付组件图.png)] 用户请求支付系统 支付系统调用第三方支付平台API进行发起支付流程 用户通过第三方支付平台支付成功后,第三方支付平台回调通知支付系统 支付系统调用订单服务修改订单状态...支付系统调用积分服务添加积分 支付系统调用日志服务记录日志 1.2 问题分析 问题1 用户提交订单后,扣减库存成功、扣减优惠券成功、使用余额成功,但是在确认订单操作失败,需要对库存、库存、余额进行回退。...1.6.1 消费幂等的必要性 在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况: 发送时消息重复 当一条消息已被成功发送到服务端并完成持久化.../img/发送批量消息.png)] 批量消息发送是将同一个主题的多条消息一起打包发送到消息服务端,减少网络调用次数,提高网络传输效率。
进行消息发送的过程首先会准备好路由信息,最终是由netty完成的,也即使用nettyRemotingClient来实现的。...1.发送消息 /** * Send message in synchronous mode.... *发送消息使用同步模式,仅当发送生产者消息完成后这个方法才返回 * Warn: this method has internal retry-mechanism...结果可能会有多个消息发送给broker。所以需要解决潜在的重复问 * @param msg Message to send....oneMessageQueue //消息发送端采用重试机制,由retryTimesWhenSendFailed指定同步方式重试次数,异步重试机制在收到消息发送结果后执行回调之前进行重试 //选择队列的两种方式
那么,对于一款消息队列来说,如何做到消息的高效发送与接收是重点和关键** 一、RocketMQ中Remoting通信模块概览 RocketMQ消息队列的整体部署架构如下图所示: ?...RocketMQ中其他的组件(如client、nameServer、broker在进行消息的发送和接收时均使用这两个组件) 2、消息的协议设计与编码解码 在Client和Server之间完成一次消息发送时...,需要对发送的消息进行一个协议约定,因此就有必要自定义RocketMQ的消息协议。...f) throws Exception { if (f.isSuccess()) { //如果发送消息成功给...在Client端发送异步消息时候(rocketmq-client模块最终调用sendMessageAsync方法时),会将InvokeCallback的接口注入,而在Server端的异步线程由上面所讲的业务线程池真正执行后
MQ 我们这里选型的是 RocketMQ 中间件,为什么选用 RocketMQ,可以看前面的文章(消息中间件能干什么?RabbitMQ、Kafka、RocketMQ正确选型姿势)有详细的分析。...其次,订单系统就会向 RocketMQ 中发送个一个订单已支付的消息,接着关联系统就会各自业务处理,如: 积分系统从 RocketMQ 里获取消息,然后依据消息去累加积分; 营销系统会从 RocketMQ...当我们发送一条订单支付成功消息到 RocketMQ 里的时候,他会依据自己的负载均衡算法以及容错算法将消息发送到其中一个Broker中去。如果想了解更细节的算法以及如何分配,后面再分享出来哈。...其他耦合系统改造 通过上面对于订单系统自身的改造,主要保留了状态更新以及库存扣减,然后将支付成功消息推送到 RocketMQ 里去,也完成了相应的代码编写,那接下来,我们就需要在关联系统中来获取订单系统发送来的消息...现在改造完之后,只要支付成功,订单系统现在只需要更新订单状态 30 ms + 扣减库存 80 ms + 发送订单消息到RocketMq z中10 ms ,总共大概 120 ms 就可以了,完美的控制在
一笔支付成功之后,音箱没有发出收款成功的播报。一切流程排查下来之后,这才发现原来 MQ 消费端没有正常在消费消息。...MQ 消费端应用没有异常,但是无法正常消费 MQ 控制台发送消息,消费端可以成功消费消息 排查问题 刚开始排查的时候,由于没有任何异常业务日志可以定位问题,所以问题排查起来十分困难。...为什么 mq 控制台重新发送的消息消费者可以收到? rocketmq 控制台重新发送消息代码如下: ?...MessageService 将会把消息的元数据封装一个CONSUME_MESSAGE_DIRECTLY类型的请求,接着调用 rocketmq 提供的 admin API,给 rocketmq broker...这对于生产者来说,可能是一个致命的问题,因为消息生产者通常消息发送延时要低。 这种情况下,我们就可以将消息发送到 VIP 端口,从而降低消息发送的延时。
为了应对这个场景,最终我们引入了阿里云的RocketMq,RocketMq可以处理可以处理很多消息堆积,并且服务的稳定不挂也可以由阿里云保证。...引入了RocketMq了之后,的确解决了队列堆积导致消息队列宕机的问题。...在RocketMq中,普通消息和顺序消息有没有什么办法提升消息消费速度? 消息失败重试次数怎么设置较为合理?顺序消息和普通消息有不同吗? 2....Sync: 同步发送,需要关心结果,根据结果判断是否需要进行重试,然后回到Step3。 可以看见Rocketmq发送普通消息的流程比较清晰简单,下面来看看顺序消息。...对RocketMq熟悉的小伙伴会发现,它其实并没有提供顺序消息发送相关的API,但是在阿里云的RocketMq版本提供了顺序消息的API,原理比较简单,其实也是对现有API的一个封装: SendResult
二、Queue in Topic 对于RocketMQ而言,Topic只是一个逻辑上的概念,真正的消息存储其实是在Topic中的Queue中。想一想,为什么RocketMQ要这要设计呢?...rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息。 rocketmq-client:提供发送、接受消息的客户端API。...img 如上图所示,消息数据独立存储,业务和消息解耦,实质上消息的发送有2次,一条是转账消息,另一条是确认消息。 到这里,我们先来看看基于RocketMQ的代码: ?...事务消息是否对消费者可见,完全由事务返回给RMQ的状态码决定(状态码的本质也是一条消息)。 ? ? 生产者发送了2条消息给RMQ,有一条本地事务执行成功,有一条本地事务执行失败。...假设,我们发送一条转账事务消息给RMQ,成功后回调本地事务,DB减操作成功,刚准备给RMQ一个确认消息,此时突然断电,或者网络抖动,使得这条确认消息没有发送出去。
Queue in Topic 对于RocketMQ而言,Topic只是一个逻辑上的概念,真正的消息存储其实是在Topic中的Queue中。想一想,为什么RocketMQ要这要设计呢?...rocketmq模块 rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息。...rocketmq-client:提供发送、接受消息的客户端API。...运行结果 生产者发送了2条消息给RMQ,有一条本地事务执行成功,有一条本地事务执行失败。...假设,我们发送一条转账事务消息给RMQ,成功后回调本地事务,DB减操作成功,刚准备给RMQ一个确认消息,此时突然断电,或者网络抖动,使得这条确认消息没有发送出去。
领取专属 10元无门槛券
手把手带您无忧上云