我们消息中心是负责承载各个业务(比如电商,物流,营销中心,券中心,会员中心,积分中心,停车场等等)的消息发送需求,那么消息呢就可能有延迟需求,比如物流到货后十五分钟进行一次邀请评价的需求。而且做个消息延迟发送,我们不可能让业务自己去写消息啥时候发送,到发送时间了再调我们接口这种逻辑,这样不合理,我们需要做比较强大的消息中心功能。
那么延迟消息的实现有多种多样的实现,我们前阵子想实现延迟消息,对此做了一定的讨论,最后发现许多方案要么不支持分布式项目,要么平白的对机器性能损耗比较大,要么可能存在系统崩溃数据丢失的风险,最后我们采用了RabbitMQ的方式实现延迟队列。
上面提到的消息死亡有几种类型 消息被拒绝(basic.reject / basic.nack),并且requeue = false 消息TTL过期 队列达到最大长度
RabbitMQ支持给队列内的消息设置过期时间和给消息单独过期时间,那么结合死信队列我们就可以做到消息的延迟发送了; 大概是以下步骤 1.创建延迟队列并设置消息的过期时间,绑定一个死信队列 2.不创建该队列的消费者,让其内部消息根据过期时间自动过期 3.创建死信队列的消费者,使其每次消费死亡的消息;
死信队列结构图
看到之前有的人写的博文写的比较复杂,还把交换机写进来了,其实完全没必要,死信队列根本上只是队列之间的绑定以及数据交换,具体代码就不说了,因为重点不在这里;
因此这种方式直接pass了;
自己版本对应的ez文件类型插件
。rabbitmq_delayed_message_exchange
下载完放入rabbitmq 的plugins文件下#查看插件目录
rabbitmq-plugins list
#安装延迟队列插件 (rabbitmq-plugins enable 插件名)
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
这个优点就是解决了上面方式的缺点。。。让消息延迟性绑定到消息本身上,使的每个消息有自己的过期时间;
根本上来说:我们指定其是delay类型不过是决定了其什么时候投递到指定路由队列
我们目前有两种不同类型的延迟,因此我这里用的topic模式结合延迟插件实现延迟队列;
topic结合delay插件实现延迟消息架构
如上所示,我们延迟的消息首先都放置到Mq里,然后延迟时间到了之后呢就会被路由到指定队列上去;
这么做有个小问题,如果我们延迟消息过多的话,那么必然存在着rabbitmq挤压消息,占用空间的问题,当然解决方案也比较简单
如果我们消息非常非常多,可以把消息分发区间划的更细点,比如只存储每12小时的消息,甚至只存储每个小时要发送的消息,这都是完全OK的;
代码也很简单,这里提供一个绑定了两种业务的延迟队列的小demo: 延迟队列配置:
@Configuration
public class DelayedRabbitMQConfig {
/**
* 声明延时队列hangfire
* 不设置TTL
*/
@Bean("hDelayQueue")
public Queue hangfireDelayQueue() {
return new Queue(BaseDict.DELAY_QUEUE_NAME_H);
}
/**
* 业务调用延迟队列
*
*/
@Bean("mDelayQueue")
public Queue moduleDelayQueue() {
return new Queue(BaseDict.DELAY_QUEUE_NAME_M);
}
/**
* 延迟交换机
*
*/
@Bean(name = "delayExchange")
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>(2);
args.put("x-delayed-type", "topic");
return new CustomExchange(BaseDict.DELAY_EXCHANGE_DISPATCHER, "x-delayed-message", true, false, args);
}
@Bean
public Binding hangfireBinding(@Qualifier("hDelayQueue") Queue queue,
@Qualifier("delayExchange") CustomExchange customExchange) {
return BindingBuilder.bind(queue).to(customExchange).with(BaseDict.DELAY_QUEUE_ROUTING_KEY_H).noargs();
}
@Bean
public Binding moduleBinding(@Qualifier("mDelayQueue") Queue queue,
@Qualifier("delayExchange") CustomExchange customExchange) {
return BindingBuilder.bind(queue).to(customExchange).with(BaseDict.DELAY_QUEUE_ROUTING_KEY_M).noargs();
}
}
消息生产
@PostMapping("addMsgToMDelayQueue")
public void addMsgToMQueue(@RequestBody XXX request) {
sendDelayMsg(BaseDict.DELAY_EXCHANGE_DISPATCHER,BaseDict.DELAY_QUEUE_ROUTING_KEY_M, JSONObject.toJSONString(request), 10*24*60*60*1000);
}
@PostMapping("addMsgToHDelay")
public void addMsgToHQueue(@RequestBody List<AAA> request) {
sendDelayMsg(BaseDict.DELAY_EXCHANGE_DISPATCHER,BaseDict.DELAY_QUEUE_ROUTING_KEY_H, JSONObject.toJSONString(request), 10*24*60*60*1000);
}
/**
* 消息发送到延迟交换机上
*/
public void sendDelayMsg(String exchange,String routingKey, String msg, Integer delayTime) {
rabbitTemplate.convertAndSend(exchange, routingKey, msg, a -> {
a.getMessageProperties().setDelay(delayTime);
return a;
});
}
消息消费
@RabbitListener(queues = BaseDict.DELAY_QUEUE_NAME_H, concurrency = "10-30")
public void hangfireReceive(Message message, Channel channel) throws IOException {
final List<AAA> sendDetails = JSONObject.parseArray(JSONObject.toJSONString(message.getBody()), AAA.class);
Optional.ofNullable(sendDetails).ifPresent(item->item.forEach(System.out::println));
}
@RabbitListener(queues = BaseDict.DELAY_QUEUE_NAME_M, concurrency = "10-20")
public void receiveQ(Message message, Channel channel) throws IOException {
final XXX request = JSONObject.parseObject(message.getBody(), XXX.class);
System.out.println("消费"+request.toString());
}
我们在第一次使用这个延迟插件的时候做了一个压测,大约100W数据量的延迟会导致内存和Cpu使用量的急速上升,查了一些文档没搞明白后,去了官网看了下,发现其对此有以下解释,大致是讲目前这个延迟插件还不足以支持那么大的数据量,建议数据量不要太大 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/issues/72
因此我们如果想用好延迟插件,目前来说需要做一些额外的配合,尽量使其延时最近的数据,并且数据量维持到一个比较低的程度