前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ延迟消费和重复消费

RabbitMQ延迟消费和重复消费

作者头像
allsmallpig
发布2021-02-25 14:51:37
2.3K0
发布2021-02-25 14:51:37
举报
文章被收录于专栏:allsmallpi博客

转载自 https://blog.csdn.net/quliuwuyiz/article/details/79301054

使用RabbitMQ实现延迟任务 场景一:物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时。

场景二:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。

延迟任务的模型如下图:

基于 RabbitMQ 实现的分布式延迟重试队列 场景一:在消费该消息的时候,发现条件不满足,需要等待30分钟,重新消费该消息,再次判断是否满足条件,如果满足则消费该消息,如果不满足,则再等待30分钟。这样的场景通过mq队列来实现。

在消息队列的监听过程中,先判断条件是否满足,满足,则直接消费。不满足,则将该消息发送到上图的死信队列,但是在死信队列失效之后,需要重新转发到当前队列进行消费就可以实现该功能。

基本概念如下: 消息的TTL ( Time to Live ) 和 DLX (Dead Letter Exchange)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。

可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。只是expiration字段是字符串参数,所以要写个int类型的字符串:

byte[] messageBodyBytes = "Hello, world!".getBytes(); AMQP.BasicProperties properties = new AMQP.BasicProperties(); properties.setExpiration("60000"); channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes); 当上面的消息扔到队列中后,过了60秒,如果没有被消费,它就死了。不会被消费者消费到。这个消息后面的,没有“死掉”的消息对顶上来,被消费者消费。死信在队列中并不会被删除和释放,它会被统计到队列的消息数中去。单靠死信还不能实现延迟任务,还要靠Dead Letter Exchange。

Dead Letter Exchanges Exchage的概念在这里就不在赘述,可以从这里进行了解。一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。

1. 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。

2. 上面的消息的TTL到了,消息过期了。

3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

package com.test.sender.delay;

import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map;

import javax.annotation.Resource;

import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.PropertySource; import org.springframework.stereotype.Component;

import com.drools.model.MQPushErrorFlow;

@Component @PropertySource(value = "classpath:riskConfigMq.properties") public class LifsInCompleteDataOneSend { private static final Log log = LogFactory.getLog(LifsInCompleteDataOneConfig.class);

private static final String DELAY_QUEUE_PER_QUEUE_TTL_NAME_FIRST = "delay_queue_per_queue_lifs_ttl"; // TTL配置在队列上的缓冲队列。 private static final String DELAY_QUEUE_PER_QUEUE_ROUTING_KEY_FIRST = "delay_queue_per_queue_lifs_routing_key"; // TTL配置在队列上的缓冲队列。

private static final Integer QUEUE_EXPIRATION_FIRST = 30000;

/** * 消息队列业务名称 */ @Value("${lifs.consumer.pushServiceName}") private String pushServiceName;

/** * 订阅平台名称 */ @Value("${lifs.consumer.platformName}") private String platformName;

/** * 消息队列一个业务使用的队列的数量 */ @Value("${lifs.consumer.queueShardingCount}") private Integer queueShardingCount;

/** * 交换机的名称,共用lifs监听的交换机 */ @Value("

/** * 底层需要使用的真实发送对象,每个发送对象都需要对应一个 */ @Resource(name = "lnCompleteDataOneRabbitTemplate") private RabbitTemplate rabbitTemplate;

@Bean public Queue delayQueueFirstTTL() { Map arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", exchangeName); arguments.put("x-dead-letter-routing-key", getDirectRoutingKey(pushServiceName, 0, platformName)); arguments.put("x-message-ttl", QUEUE_EXPIRATION_FIRST); Queue queue = new Queue(DELAY_QUEUE_PER_QUEUE_TTL_NAME_FIRST, true, false, false, arguments); log.info("第一次延迟队列名称: " + DELAY_QUEUE_PER_QUEUE_TTL_NAME_FIRST + "  延期之后的转发的routingKey: " + getDirectRoutingKey(pushServiceName, 0, platformName) + "  exchange: " + exchangeName); /* * Queue queue = QueueBuilder.durable(DELAY_QUEUE_PER_QUEUE_TTL) // * delay_queue_per_queue_ttl .withArgument("x-dead-letter-exchange",DELAY_EXCHANGE_NAME) * .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) * .withArgument("x-message-ttl",QUEUE_EXPIRATION).build();  * Queue queue =new Queue(DELAY_QUEUE_PER_QUEUE_TTL,true); */ return queue; }

@Bean public Binding lnCompleteDataOneBinding() { return BindingBuilder.bind(delayQueueFirstTTL()).to(lnCompleteDataOneExchange()).with(DELAY_QUEUE_PER_QUEUE_ROUTING_KEY_FIRST); }

@Bean(name = "lnCompleteDataOneExchange") public DirectExchange lnCompleteDataOneExchange() { return new DirectExchange(exchangeName); }

    private String getDirectRoutingKey(String pushServiceName, int shardingIndex, String platformName) {         return String.format("%s.%d.%s", pushServiceName, shardingIndex, platformName);     }     @Bean(name = "delayQueueFirstListenerContainer")     public String delayQueueFirstListenerContainer(@Qualifier("lnCompleteDataOneConnectionFactory") ConnectionFactory connectionFactory) {     Queue queue = delayQueueFirstTTL();     RabbitAdmin ra = new RabbitAdmin(connectionFactory);         ra.declareExchange(lnCompleteDataOneExchange());         ra.declareQueue(queue);         ra.declareBinding(lnCompleteDataOneBinding());         log.info("delayQueueFirstListenerContainer: queueName" + queue.getName() + "  exchangeName: " + lnCompleteDataOneExchange().getName() + " routingKey: " + DELAY_QUEUE_PER_QUEUE_ROUTING_KEY_FIRST);         return "";     } /** * 自动生成uuid调用发送方法 *  * @param dto * @param routingId */ public String send(String message) { DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); log.info("延迟半分钟的队列中接受消息的时间: " + df.format(new Date()) + "\n消息的內容:" + message);

rabbitTemplate.convertAndSend(DELAY_QUEUE_PER_QUEUE_TTL_NAME_FIRST, message); // 向队列里面发送消息,第一个参数是队列名称,第二个参数是内容

return "sender delay"; }

}

package com.test.sender.delay;

import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.PropertySource; import org.springframework.stereotype.Component; import com.framework.mq.common.RabbitConfig;

@Component @PropertySource(value = "classpath:riskConfigMq.properties") public class LifsInCompleteDataOneConfig {

/** * MQ服务地址和端口号 */ @Value("{rabbitmq.password}") private String password; /** * MQ的虚拟主机 */ @Value("{rabbitmq.publisherConfirms}") private boolean publisherConfirms; /** * 缓存的channel的数量 */ @Value("

/** * 交换机的名称,共用lifs监听的交换机 */ @Value("${lifs.consumer.exchangeName}") private String exchangeName;

/** * 注入RabbitConfig对象 * @return */ @Bean(name = "lnCompleteDataOneRabbitConfig") public RabbitConfig rabbitConfig() { return new RabbitConfig(addresses, username, password, virtualHost, publisherConfirms, channelCacheSize, connectionCacheSize, exchangeName); }

/** * 注入连接工厂对象 *  * @param rabbitConfig 之前注入的 @RabbitConfig 对象 * @return */ @Bean(name = "lnCompleteDataOneConnectionFactory") public ConnectionFactory connectionFactory( @Qualifier(value = "lnCompleteDataOneRabbitConfig") RabbitConfig rabbitConfig) { return rabbitConfig.getConnectionFactory(); }

/** * 注入的 @RabbitTemplate 对象 *  * @param connectionFactory * @return */ @Bean(name = "lnCompleteDataOneRabbitTemplate") RabbitTemplate rabbitTemplate( @Qualifier("lnCompleteDataOneConnectionFactory") ConnectionFactory connectionFactory) {

return new RabbitTemplate(connectionFactory); } }

在初次监听消息队列的地方

在业务代码中,判断条件是否满足,如果不满足,赋值incompleteDataFlagResult=1,在第二次重试的时候,如果还不满足,则赋值incompleteDataFlagResult=2,如果满足,则赋值incompleteDataFlagResult=200,直接消费,并发送回调的mq。

if(incompleteDataFlagResult==1){ //推进到等待30秒过期的队列 lifsInCompleteDataOneSend.send(JSONObject.toJSONString(request));

}else if(incompleteDataFlagResult==2){  //推进到等待60秒过期的队列

lifsInCompleteDataTwoSend.send(JSONObject.toJSONString(request));

}else if(incompleteDataFlagResult==3){ // 进行保存,需要手工处理 InstallmentRequestFailure installmentRequestFailure = new InstallmentRequestFailure(); installmentRequestFailureService.save(installmentRequestFailure); }else if(incompleteDataFlagResult==200){ lifsPushSender.send(request, customerId); }

---------------------  作者:quliuwuyiz  来源:CSDN  原文:https://blog.csdn.net/quliuwuyiz/article/details/79301054  版权声明:本文为博主原创文章,转载请附上博文链接!

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018/10/25 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档