rabbitmq:3.9.2
spring-boot-starter-amqp:2.3.0.RELEASE
0 <= n <= 2^32-1
ms, 约 49 天rabbitmq_delayed_message_exchange
插件解决, https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
终端启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
代码中配置
/**
*
* @author https://www.skypyb.com/
*/
@Configuration public class RabbitBindConfig {
public final static String SKYPYB_DELAY_EXCHANGE = "skypyb-delay-exchange";
public final static String SKYPYB_DELAY_QUEUE = "skypyb-delay-queue";
public final static String SKYPYB_DELAY_KEY = "skypyb.key.delay";
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
//自定义交换机
return new CustomExchange(SKYPYB_DELAY_EXCHANGE, "x-delayed-message", false, true, args);
}
@Bean
public Queue delayQueue() {
return new Queue(SKYPYB_DELAY_QUEUE, false, false, true);
}
@Bean
public Binding bindingDelayExchangeAndQueue() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(SKYPYB_DELAY_KEY).noargs();
}
}
消息重新入列是在队列头部
Demo验证
@RabbitHandler
public void process(Map<String, Object> testMessage, Channel channel, Message message) throws Exception {
Thread.sleep(3000);
String data = testMessage.get("messageData").toString();
log.info("DirectReceiver消费者收到消息: {}", data);
if (data.contains("2")) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} else {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
消费者端配置手动确认
消费线程数为1
生产者发送6条消息, messageData分别为1, 2, 3, 4, 5, 6
prefetch设为100
, 消费情况如下
DirectReceiver消费者收到消息: 1
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 3
DirectReceiver消费者收到消息: 4
DirectReceiver消费者收到消息: 5
DirectReceiver消费者收到消息: 6
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
...
prefetch设为3
, 消费情况如下
DirectReceiver消费者收到消息: 1
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 3
DirectReceiver消费者收到消息: 4
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 5
DirectReceiver消费者收到消息: 6
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
...
prefetch设为1
, 消费情况如下
DirectReceiver消费者收到消息: 1
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
...
事务与异步确认机制是冲突的, 只能启用其中一个
配置
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//启用通道事务性
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
@Bean("rabbitTransactionManager")
public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
使用 发送方法上加注解
@Transactional(rollbackFor = Exception.class, transactionManager = "rabbitTransactionManager")
开启事务模式之后,RabbitMQ 生产者发送消息会多出几个步骤:
配置
@Bean
public AbstractRabbitListenerContainerFactory<SimpleMessageListenerContainer>
configure(AbstractRabbitListenerContainerFactory<SimpleMessageListenerContainer> factory) {
factory.setChannelTransacted(true);
return factory;
}
使用 同生产者一致
消费者若启用事务, 则spring.rabbitmq.listener.simple.acknowledge-mode最好为auto, 若为manual, rollback后, 会阻塞当前消费者, 消息一直为unacked状态
//缓存模式 缓存channel
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
//最多缓存25个channel
connectionFactory.setChannelCacheSize(25);
//channel超时等待 当ChannelCheckoutTimeout的值大于0的时候,ChannelCacheSize的值就是最大的channel数量了,一旦从缓存中获取不到channel,等待ChannelCheckoutTimeout毫秒后,如果还是获取不到的,就会抛AmqpTimeoutException
connectionFactory.setChannelCheckoutTimeout(1000);
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。