安装RabbitMQ 延迟交换机 在三台节点上安装延迟交换机插件 1.进入RabbitMQ 插件目录
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.12/plugins
2.下载延迟插件安装包
wget https://dl.bintray.com/rabbitmq/community-plugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip
3.解压延迟插件安装包
unzip rabbitmq_delayed_message_exchange--3.6.x.zip
将RAM 节点修改成DISK 节点 在 RAM 节点上,将节点修改成磁盘节点
rabbitmqctl stop_app
rabbitmqctl change_cluster_node_type disk
rabbitmqctl start_app
启动延迟交换机插件 在三台节点上启动延迟交换机插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
测试代码:
Producer:
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setHost("172.31.1.135");
connectionFactory.setUsername("xx");
connectionFactory.setPassword("xx");
connectionFactory.setPort();
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "delay-exchange";
String routingkey = "delay.delay";
String queueName = "delay_queueName";
//x-delayed-message 声明
Map<String,Object> map =new HashMap<>();
map.put("x-delayed-type", "direct");
channel.exchangeDeclare(exchangeName, "x-delayed-message", true, false, map);
//注意:arguments需要声明在队列上,声明在交换机上是不会起作用的。
channel.queueDeclare(queueName, true, false, false, map);
channel.queueBind(queueName,exchangeName,routingkey);
for (int i = ; i < ; i++) {
// deliveryMode=2 持久化,expiration 消息有效时间
String msg = "delayed payload".getBytes("UTF-8") +" "+new Date().getTime();
byte[] messageBodyBytes = msg.getBytes();
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", );
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish(exchangeName, routingkey, props.build(), messageBodyBytes);
}
}
}
Consumer:
import java.io.IOException;
import java.util.Date;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setHost("172.31.1.135");
connectionFactory.setPort();
connectionFactory.setUsername("xxx");
connectionFactory.setPassword("xxx");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String queueName = "delay_queueName";
channel.queueDeclare(queueName,true,false,false,null);
channel.basicConsume(queueName, false, "myConsumer Tag", new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String convernType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("routingKey:"+routingKey+",convernType:"+convernType+",deliveryTag:"+deliveryTag+",Msg body:"+new String(body)+ " "+new Date().getTime());
channel.basicAck(deliveryTag, false);
}
});
}
}
执行结果:
routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@c703b
routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@cbc42f
routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@c2f
routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@c703b
routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@cbc42f
routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@c2f
欢迎关注:【程序员开发者社区】