在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis 来存放的定时轮询,大家都知道 redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级。
// 官方允许是 0-255 之间 此处设置10 允许优化级范围0-10 不要设置过大 浪费CPU与内存
arguments.put("x-max-priority", 10);
// 设置优先级, 不得高于 x-max-priority 设置的值
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties()
.builder().priority(5).build();
/**
* 优先级生产者
*/
public static void priorityProducer() throws Exception {
// 创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 工厂IP 连接RabbitMq的队列
factory.setHost("localhost");
// 用户名
factory.setUsername("zjh");
// 密码
factory.setPassword("zjh");
// 创建连接
Connection connection = factory.newConnection();
// 获取信道
Channel channel = connection.createChannel();
/*
生成一个队列
1.队列名称
2.队列里面的消息是否持久化,默认情况消息存储在内存中
3.该队列是否只提供一个消费者进行消费 是否进行消息共享,true可以多个消费者消费
4.是否自动删除 最后一个消费者断开连接以后 该队列是否自动删除 true自动删除 false不自动删除
5.其他参数
*/
Map<String, Object> arguments = new HashMap<>(8);
// 官方允许是 0-255 之间 此处设置10 允许优化级范围0-10 不要设置过大 浪费CPU与内存
arguments.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
// 设置循环次数
long length = 11;
for (int i = 1; i < length; i++) {
String message = "info" + i;
// 设置等于五的优先级高
if (i == 5){
// 设置优先级, 不得高于 x-max-priority 设置的值
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties()
.builder().priority(5).build();
// 发送消息
channel.basicPublish("", QUEUE_NAME, basicProperties, message.getBytes());
} else {
// 发送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
}
System.out.println("消息发送完毕");
}
/**
* @author zjh
*/
public class Consumer {
/**
* 队列名称
*/
public static final String QUEUE_NAME = "hello";
/**
* 接收消息
*/
public static void main(String[] args) throws Exception {
// 创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 工厂IP 连接RabbitMq的队列
factory.setHost("localhost");
// 用户名
factory.setUsername("zjh");
// 密码
factory.setPassword("zjh");
// 创建连接
Connection connection = factory.newConnection();
// 获取信道
Channel channel = connection.createChannel();
// 声明接收消息回调
DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println(new String(message.getBody()));
// 声明取消接收消息回调
CancelCallback cancelCallback = consumerTag -> System.out.println("消息消费被中断");
/*
消费者消费信息
1.消费哪个队列
2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答
3.消费者未成功消费的回调
4.消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}