前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ六种工作模式

RabbitMQ六种工作模式

作者头像
你的益达
发布2020-09-30 10:20:42
4370
发布2020-09-30 10:20:42
举报
文章被收录于专栏:阿伟的个人博客

1.hello world模式,单生产者单消费者

2.Work queue 生产者传入多个消费者进行处理,每条消息只能被一个消费者拿到。

3.发布订阅模式

将消息以某种规则发给消费者。该模式下多了一个交换器,该交换器会把消息复制多个副本传入多个队列中,c1 c2会获的相同的信息.

4.Routing 路由器模式

相当于有选择的发布订阅模式,会根据消费者的要求将满足条件的消息发送给对应的消费者。但是该模式的匹配是精准匹配,不支持模糊匹配。

5.Topic 主题模式

与路由器模式相比,支持模糊匹配。

6.RPC 远程调用

由于第一种模式只是作为demo,第六种RPC模式一般不用,后续重点介绍其他四种模式。

工作队列模式

多个消息的情况下,工作队列会将消息发送给不同的消费者。并且可以根据自身处理消息的速度来控制接受消息的数量。

其适合在集群环境中做异步处理的认为,能最大地发挥每一台服务器的性能。

多消费者时默认采用轮询的方式为每个消费者分配任务,当各个服务器处理能力不均匀时,容易造成消息堆积,性能低下。

可以使用如下方式:

代码语言:javascript
复制
channel.basicQos(1);

只有该消费者处理完一个消息收到确认后再对其发送。处理完一个取一个。

例如实现12306短信通知服务,当购票成功后,需要给用户发送一条购票成功的短信。由于我们发现对于服务器而言发送短信属于支线任务,其的结果对主线影响不大,因此可以采用工作队列模式,执行主业务的服务器把发短信任务发送给MQ,其他服务器从MQ中取出任务发送短信。

实现代码如下:

代码语言:javascript
复制
public class OrderSystem {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getConnection();

        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        for(int i = 100; i < 200; i++){
            SMS sms = new SMS("乘客" + i, "15891001" + i, "购票成功");
            String smsJson = new Gson().toJson(sms);
            channel.basicPublish("", RabbitConstant.QUEUE_SMS,null, smsJson.getBytes());
        }
        System.out.println("发送成功");
        channel.close();
        connection.close();
    }
}
/**
 * 从消息队列中取出任务 发短信
 */
public class SMSSender {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        // 保证Qos
        channel.basicQos(1);
        channel.queueDeclare(RabbitConstant.QUEUE_SMS,false, false, false, null);
        channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String sms = new String(body);
                System.out.println("短信发送成功" + sms);
                // 消息的签收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
发布/订阅模式

该模式下生产者不再与队列绑定,而是将数据发送至“交换机”中,交换机无差别的将所有消息送入与之绑定的队列,进而供消费者使用。因此各个消费者拿到的消息完全相同,交换机的类型为“fanout”。

该模式特别适合类似“天气预报”发布的场景。首先由气象局将天气预报送入交换机,然后交换机根据队列绑定情况将天气预报发送到“百度”,“新浪”等门户网站的队列中。

代码语言:javascript
复制
public class WeatherBureau {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER,"",null,"天气测试".getBytes());
        channel.close();
        connection.close();
    }
}
代码语言:javascript
复制
public class Baidu {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU,false, false, false, null);
        // 将队列绑定到交换机上
        // 队列名 交换机名 路由key
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU,false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("百度收到信息" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
Routing路由模式

发布订阅模式中交换机是无条件将消息发送到与其绑定的队列中。二路由模式中交换机根据Routing Key将消息筛选后发送给消费者队列。路由模式下交换机的模式为direct。

假设百度只关心北京的天气,新浪只关心西安的天气,代码如下:

代码语言:javascript
复制
public class WeatherBureau {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getConnection();
        Map<String, String> map = new HashMap<>();
        map.put("西安_20200927","晴");
        map.put("西安_20200928","阴");
        map.put("北京_20200927","小雨");
        map.put("北京_20200928","大雨");
        Channel channel = connection.createChannel();
        for(Map.Entry<String, String> entry : map.entrySet()){
            // 以entry的key作为路由key 起一个数据筛选的作用
            channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, entry.getKey(),
                    null, entry.toString().getBytes());
        }
        channel.close();
        connection.close();
    }
}
代码语言:javascript
复制
public class Baidu {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU,false, false, false, null);
        // 将队列绑定到交换机上
        // 队列名 交换机名 路由key 以 "北京_20200927"作为key进行筛选消息
        channel.queueBind(RabbitConstant.QUEUE_BAIDU,
                RabbitConstant.EXCHANGE_WEATHER_ROUTING, "北京_20200927");
        channel.queueBind(RabbitConstant.QUEUE_BAIDU,
                RabbitConstant.EXCHANGE_WEATHER_ROUTING, "北京_20200928");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU,false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("百度收到信息" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
代码语言:javascript
复制
public class Sina {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SINA,false, false, false, null);
        // 将队列绑定到交换机上
        // 队列名 交换机名 路由key
        channel.queueBind(RabbitConstant.QUEUE_SINA,
                RabbitConstant.EXCHANGE_WEATHER_ROUTING, "西安_20200927");
        channel.queueBind(RabbitConstant.QUEUE_SINA,
                RabbitConstant.EXCHANGE_WEATHER_ROUTING, "西安_20200928");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SINA,false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Sina收到信息" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
Topic主题模式

在路由模式的基础上提供了对路由key的模糊匹配功能。

规则如下:

代码语言:javascript
复制
 *  匹配单个关键字
 #  匹配所有关键字
 关键字书写用.隔开

主题模式下交换机的类型为topic。

对于如下场景,新浪只接受西安的天气,百度只接受9月27日的消息。

代码语言:javascript
复制
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getConnection();
        Map<String, String> map = new HashMap<>();
        map.put("中国.西安.20200927","晴");
        map.put("中国.西安.20200928","阴");
        map.put("中国.北京.20200927","小雨");
        map.put("中国.北京.20200928","大雨");
        Channel channel = connection.createChannel();
        for(Map.Entry<String, String> entry : map.entrySet()){
            // 以entry的key作为路由key 起一个数据筛选的作用
            channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, entry.getKey(),
                    null, entry.toString().getBytes());
        }
        channel.close();
        connection.close();
    }
代码语言:javascript
复制
// sina只接受西安的天气
public class Sina {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SINA,false, false, false, null);
        // 将队列绑定到交换机上
        // 队列名 交换机名 路由key
        channel.queueBind(RabbitConstant.QUEUE_SINA,
                RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.西安.*");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SINA,false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Sina收到信息" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
代码语言:javascript
复制
// 假设百度只接受9月27号的数据
public class Baidu {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU,false, false, false, null);
        // 将队列绑定到交换机上
        // 队列名 交换机名 路由key
        channel.queueBind(RabbitConstant.QUEUE_BAIDU,
                RabbitConstant.EXCHANGE_WEATHER_TOPIC, "#.20200927");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU,false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("百度收到信息" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-09-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 工作队列模式
  • 发布/订阅模式
  • Routing路由模式
  • Topic主题模式
相关产品与服务
短信
腾讯云短信(Short Message Service,SMS)可为广大企业级用户提供稳定可靠,安全合规的短信触达服务。用户可快速接入,调用 API / SDK 或者通过控制台即可发送,支持发送验证码、通知类短信和营销短信。国内验证短信秒级触达,99%到达率;国际/港澳台短信覆盖全球200+国家/地区,全球多服务站点,稳定可靠。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档