投递主要针对生产端,什么是生产端的可靠性投递?
BAT/TMD 互联网大厂解决方案,看具体业务和并发量
消息落库步骤:
流程的示意图如上所示,比如我下单成功了,这是进行step1,对我的业务数据进行入库,业务数据入库完毕(这里要特别注意一定要保证业务数据入库)再对要发送的消息进行入库,图中采用了两个数据库,可以根据实际业务场景来确定是否采用两个数据库,如果采用了两个数据库,有人可能就像到了采用分布式事务来保证数据的一致性,但是在大型互联网中,基本很少采用事务,都是采用补偿机制。
对业务数据和消息入库完毕就进入setp2,发送消息到MQ服务上,按照正常的流程就是消费者监听到该消息,就根据唯一id修改该消息的状态为已消费,并给一个确认应答ack到Listener。如果出现意外情况,消费者未接收到或者Listener接收确认时发生网络闪断,接收不到,这时候就需要用到我们的分布式定时任务来从msg数据库抓取那些超时了还未被消费的消息,重新发送一遍。重试机制里面要设置重试次数限制,因为一些外部的原因导致一直发送失败的,不能重试太多次,要不然会拖垮整个服务。例如重试三次还是失败的,就把消息的status设置成2,然后通过补偿机制,人工去处理。实际生产中,这种情况还是比较少的,但是你不能没有这个补偿机制,要不然就做不到可靠性了。
数据库库表结构:订单表和消息记录表
-- 表 order 订单结构
CREATE TABLE IF NOT EXISTS `t_order` (
`id` varchar(128) NOT NULL, -- 订单ID
`name` varchar(128), -- 订单名称 其他业务熟悉忽略
`message_id` varchar(128) NOT NULL, -- 消息唯一ID
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- 表 broker_message_log 消息记录结构
CREATE TABLE IF NOT EXISTS `broker_message_log` (
`message_id` varchar(128) NOT NULL, -- 消息唯一ID
`message` varchar(4000) DEFAULT NULL, -- 消息内容
`try_count` int(4) DEFAULT '0', -- 重试次数
`status` varchar(10) DEFAULT '', -- 消息投递状态 0 投递中 1 投递成功 2 投递失败
`next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', -- 下一次重试时间 或 超时时间
`create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', -- 创建时间
`update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', -- 更新时间
PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
延迟投递:
回想第一种方案,生产端既要对业务数据入库,又要对消息数据入库,这种设计在高并发场景下,真的合适吗?这时候需要我们的第二种方案了,流程图如下。
upstream Server就是我们的上游服务,也就是生产者,生产者将业务数据入库成功后,生成两条消息,一条是立即发送出去给到下游服务 downstream Server的,一条是延迟消息给到 补偿服务 callback Server的。
正常情况下,下游服务监听到这个即时的消息,会发送一条消息给到callback Server,注意这里不是采用第一种方案里面的返回ack方式,而是发送了一条消息给回去。
callback Server监听到这个消息,知道了刚才有一条消息消费成功了,然后把这个持久化到数据库中,当上游服务发送的延迟消息到达callback Server时,callback Server就会去数据库查询,刚才下游服务是否有处理过这个对应的消息,如果其msg DB里面有这个记录就说明这条消息是已经被消费了,如果不存在这个记录,那么callback Server就会发起一个RPC请求给到上游服务,告诉上游服务,你刚才这个消息没发送成功,需要重新发送一遍,上游服务就重新发送即时和延迟的两条消息出去,按照之前的流程继续走一遍。
虽然第二种方案也是无法做到100%的可靠传递,在特别极端的情况,还是需要定时任务和补偿机制进行辅助的。但是第二种方案的核心是减少数据库操作,这个点很重要!
在高并发场景下,我考虑的不是百分百的可靠性了,而是考虑可用性,性能能否扛得住这个流量,所以我能减少一次数据库操作就减少一次。我上游服务减少了一次数据库操作,我的服务性能相对而言就提高了一些,而且又能把异步callback Server补偿服务解耦出来。
结论
这两种方案都是可行的,需要根据实际业务来进行选择,大型的超高并发的场景会选择第二种方案,普通的就采用第一种即可。
幂等就是一个操作,不论执行多少次,产生的效果和返回的结果都是一样的。
消费端-幂等性保障
在海里订单产生的业务高峰期,如何避免消息的重复消费问题?
消费端实现幂等性,就意味着,我们的消息永远不会消费多次,即使我们收到多条一样的消息。
业界主流的幂等性操作:
select + insert机制 并发不高的后台系统,或者一些任务JOB,为了支持幂等,支持重复执行,简单的处理方法是,先查询下一些关键数据,判断是否已经执行过,再进行业务处理,就可以了。注意:核心高并发流程不要用这种方法;
利用 Redis 的原子性实现 Redis的操作之所以是原子性的,是因为Redis是单线程的。 如果存在相同的key会,将旧数据覆盖掉
使用Redis进行幂等,需要考虑的问题? 第一:我们是否要进行数据落地,如果落地的话,关键解决的问题是数据库和Redis缓存如何做到原子性? 第二:如果不进行落地,那么都存储在缓存中,如何设置定时同步策略?
理解Confirm 消息确认机制:
如何实现Confirm 确认消息?
代码实例: https://blog.csdn.net/ctwy291314/article/details/80534604
不可路由
的消息!不可达
的消息,就要使用Return Listener!在基础API中有一个关键项配置:
代码实例 https://blog.csdn.net/m0_37743948/article/details/82864452
prefetchSize
:单条消息的大小限制,0代表不限制
prefetchCount
:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
global
:true\false 是否将上面应用于channel,简单点说,就是上面限制是channel级别还是consumer级别// 1 限流方式 第一件事就是autoAck设置为false
// channel.basicQos(perfetchSize, prefetchCount, global);
channel.basicQos(0, 1, false);// 一条一条处理
channel.basicConsume(queueName, false, new MyConsumer(channel);// 手动签收
生存时间
DLX也是一个正常的exchange和一般的Exchange没有区别,它能在任何队列上被指定,实际上就是设置某个队列的属性。 当这个队列有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。 可以监听这个队列中的消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前的immediate参数的功能。
Exchange: dlx.exchange
Queue: dlx.queue
RoutingKey: #
然后我们进行正常的声明交换机、队列、绑定,只不过我们需要在队列上加上一个参数即可:arguments.put("x-dead-letter-exchange","dlx.exchange");
这样消息在过期、requeue、队列在到达最大长度时,消息就可以直接路由到死信队列!