实例在order微服务中发送确认
// 创建订单之后给restaurant发消息
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
// 配置channel,开启确认模式
channel.confirmSelect();
//单条同步确认机制
/*if (channel.waitForConfirms()) {
log.info("RabbitMQ confirm success");
} else {
log.info("RabbitMQ confirm failed");
}*/
// 异步同步确认机制
ConfirmListener confirmListener = new ConfirmListener() {
@Override
public void handleAck(long l, boolean b) throws IOException {
log.info("Ack deliveryTag:{},mutiple:{}", l, b);
// 消息发送成功
}
@Override
public void handleNack(long l, boolean b) throws IOException {
log.info("Nack deliveryTag:{},mutiple:{}", l, b);
// 消息发送失败
}
};
channel.addConfirmListener(confirmListener);
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
//(exchange,routingKey,消息特殊参数,消息体本身(字节))
channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
log.info("message sent");
// 发送多条消息
/*for (int i = 0; i < 10; i++) {
channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
log.info("message sent");
}
Thread.sleep(10000);*/
} catch (InterruptedException e) {
e.printStackTrace();
}
示例在restaurant微服务中无法被路由
DeliverCallback deliverCallback = (consumerTag, message) -> {
String messageBody = new String(message.getBody());
log.info("deliverCallback:messageBody:{}", messageBody);
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(host);
try {
// 消息发表序列化
OrderMessageDTO orderMessageDTO = objectMapper.readValue(messageBody,
OrderMessageDTO.class);
// 查根据id数据库
ProductPO productPO = productDao.selsctProduct(orderMessageDTO.getProductId());
log.info("onMessage:productPO:{}", productPO);
RestaurantPO restaurantPO = restaurantDao.selsctRestaurant(productPO.getRestaurantId());
log.info("onMessage:restaurantPO:{}", restaurantPO);
// 校验是否可以下订单
if (ProductStatusEnum.AVALIABLE == productPO.getStatus() &&
RestaurantStatusEnum.OPEN == restaurantPO.getStatus()) {
orderMessageDTO.setConfirmed(true);
orderMessageDTO.setPrice(productPO.getPrice());
} else {
orderMessageDTO.setConfirmed(false);
}
log.info("sendMessage:restaurantOrderMessageDTO:{}", orderMessageDTO);
// 校验完订单回消息给订单微服务
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
/*channel.addReturnListener(new ReturnListener() {
*//**
* 1. channel.basicPublish第三个参数Mandatory为true后
* 2. channel添加ReturnListener,
* 3. 当消息没有被路由后,会调用handleReturn方法
*//*
@Override
public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
log.info("Message Return: replyCode: {},replyText: {},exchange: {},routingKey: {},properties: {},body : {}", i, s, s1, s2, basicProperties, new String(bytes));
// 除了打印log可以加别的业务操作
}
});*/
channel.addReturnListener(new ReturnCallback() {
@Override
public void handle(Return aReturn) {
/**
* Return aReturn中的字段和上面方法相同
*/
log.info("Message Return: returnMessage{}", aReturn);
// 除了打印log可以加别的业务操作
}
});
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
channel.basicPublish("exchange.order.restaurant", "key.order", true, null, messageToSend.getBytes());
// 如果channel关闭则不能接收返回,睡眠1s
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (JsonProcessingException | TimeoutException e) {
e.printStackTrace();
}
};
自动ACK:消费端收到消息后,会自动签收消息 手动ACK:消费端收到消息后,不会自动签收消息,需要我们在业务代码中显式签收消息
单条手动ACK: multiple=false 多条手动ACK: multiple=true (推荐使用单条ACK)
若设置了重回队列,消息被NACK之后,会返回队列末尾,等待进一步被处理 一般不建议开启重回队列,因为第一次处理异常的消息,再次处理,基本上也是异常
实现步骤:
/*channel.basicConsume("queue.restaurant",
true,
deliverCallback,
consumerTag -> {
});*/
// 处理消息手动ack
channel.basicConsume("queue.restaurant",
false,
deliverCallback,
consumerTag -> {
});
// 使用全局的channel进行消费者确认
// 手动签收消息(单条签收)
// channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
// 多条消息手动签收(5条消息全部签收一次)
if (message.getEnvelope().getDeliveryTag() % 5 == 0) {
channel.basicAck(message.getEnvelope().getDeliveryTag(), true);
}
// 手动拒收消息
// channel.basicNack(message.getEnvelope().getDeliveryTag(), false, true);
业务高峰期,有个微服务崩溃了,崩溃期间队列挤压了大量消息,微服务上线后,收到大量并发消息。 将同样多的消息推给能力不同的副本,会导致部分副本异常。
实战:
// 开启qos消费端限流(一个消费端最多推送多少未确认消息,剩下的状态是ready状态,可以进行多消费端进行接收)
channel.basicQos(2);
// 处理消息手动ack
channel.basicConsume("queue.restaurant",
false,
deliverCallback,
consumerTag -> {
});
while (true) {
Thread.sleep(100000);
}
for (int i = 0; i < 50; i++) {
channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
log.info("message sent");
}
未开启qos限流前(消息全部推送,造成消费端消息挤压,无法一次性接收,并且全处于unacked状态,其他消费端也无法抢占资源)
开启qos限流后(消息全部推送,无法一次性接收,并且全处于ready状态,其他消费端可以抢占资源形成'负载均衡'的效果)
实战
// 设置单条消息的过期时间(时间到期后消息会被清除)
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("15000").build();
channel.basicPublish("exchange.order.restaurant", "key.restaurant", properties, messageToSend.getBytes());
// 给整个队列设置过期消息时长,如果该队列里面在设置时间内没有消费完的消息会自动清除
Map<String, Object> args = new HashMap<>(16);
args.put("x-message-ttl", 150000);
// 声明队列
channel.queueDeclare(
"queue.restaurant",
true,
false,
false,
args);
代码实现
// 声明接收私信消息的死信交换机和死信队列
channel.exchangeDeclare(
"exchange.dlx",
BuiltinExchangeType.TOPIC,
true,
false,
null);
channel.queueDeclare(
"queue.dlx",
true,
false,
false,
null
);
// 绑定死信交换机和队列
channel.queueBind("queue.dlx", "exchange.dlx", "#");
// 声明交换机
channel.exchangeDeclare(
"exchange.order.restaurant",
BuiltinExchangeType.DIRECT,
true,
false,
null);
// 给整个队列设置过期消息时长,如果该队列里面在设置时间内没有消费完的消息会自动清除或者丢到死信队列
Map<String, Object> args = new HashMap<>(16);
args.put("x-message-ttl", 15000);
// 设置队列最长长度(超过队列长度将消息丢到死信队列)
args.put("x-max-length", 5);
args.put("x-dead-letter-exchange", "exchange.dlx");
// 声明交队列
channel.queueDeclare(
"queue.restaurant",
true,
false,
false,
args);
// 声明绑定关系
channel.queueBind(
"queue.restaurant",
"exchange.order.restaurant",
"key.restaurant");
或者在消费端手动签收消息时,拒收消息时(channel.basicNack),会将消息丢到死信队列