前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ 安装延迟交换机delay-exchange

RabbitMQ 安装延迟交换机delay-exchange

作者头像
王小明_HIT
发布2020-03-11 13:30:09
9370
发布2020-03-11 13:30:09
举报
文章被收录于专栏:程序员奇点

安装RabbitMQ 延迟交换机 在三台节点上安装延迟交换机插件 1.进入RabbitMQ 插件目录

代码语言:javascript
复制
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.12/plugins

2.下载延迟插件安装包

代码语言:javascript
复制
wget https://dl.bintray.com/rabbitmq/community-plugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip

3.解压延迟插件安装包

代码语言:javascript
复制
unzip rabbitmq_delayed_message_exchange--3.6.x.zip

将RAM 节点修改成DISK 节点 在 RAM 节点上,将节点修改成磁盘节点

代码语言:javascript
复制
rabbitmqctl stop_app

rabbitmqctl change_cluster_node_type disk

rabbitmqctl start_app

启动延迟交换机插件 在三台节点上启动延迟交换机插件

代码语言:javascript
复制
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

测试代码

Producer:

代码语言:javascript
复制
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class Producer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHost("172.31.1.135");
        connectionFactory.setUsername("xx");
        connectionFactory.setPassword("xx");
        connectionFactory.setPort();
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "delay-exchange";
        String routingkey = "delay.delay";

        String queueName = "delay_queueName";
        //x-delayed-message 声明
        Map<String,Object> map =new HashMap<>();
        map.put("x-delayed-type", "direct");
        channel.exchangeDeclare(exchangeName, "x-delayed-message", true, false, map);
        //注意:arguments需要声明在队列上,声明在交换机上是不会起作用的。
        channel.queueDeclare(queueName, true, false, false, map);
        channel.queueBind(queueName,exchangeName,routingkey);
        for (int i = ; i < ; i++) {
             // deliveryMode=2 持久化,expiration 消息有效时间
            String msg = "delayed payload".getBytes("UTF-8") +" "+new Date().getTime();
            byte[] messageBodyBytes = msg.getBytes();
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("x-delay", );
            AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
            channel.basicPublish(exchangeName, routingkey, props.build(), messageBodyBytes);
        }
    }
}

Consumer:

代码语言:javascript
复制
import java.io.IOException;
import java.util.Date;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;


public class Consumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHost("172.31.1.135");
        connectionFactory.setPort();
        connectionFactory.setUsername("xxx");
        connectionFactory.setPassword("xxx");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String queueName = "delay_queueName";
        channel.queueDeclare(queueName,true,false,false,null);

        channel.basicConsume(queueName, false, "myConsumer Tag", new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                  String routingKey = envelope.getRoutingKey();
                  String convernType = properties.getContentType();
                  long deliveryTag = envelope.getDeliveryTag();
                  System.out.println("routingKey:"+routingKey+",convernType:"+convernType+",deliveryTag:"+deliveryTag+",Msg body:"+new String(body)+ " "+new Date().getTime());
                  channel.basicAck(deliveryTag, false);
            }

        });
    }
}

执行结果:

代码语言:javascript
复制
routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@c703b  
routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@cbc42f  
routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@c2f  
routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@c703b  
routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@cbc42f  
routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@c2f  

欢迎关注:【程序员开发者社区】

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-03-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员奇点 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档