RabbitMQ入门-Routing直连模式

Hello World模式,告诉我们如何一对一发送和接收消息;

Work模式,告诉我们如何多管齐下高效的消费消息; Publish/Subscribe模式,告诉我们如何广播消息 那么有没有灵活强一点的既可以高效消费,又可以同时送达多个消费者的模式? 有,这就是Routing模式,我又称之为Direct直连模式。

Routing模式

  • 一个生产者P,一个交换机X,多个消息队列Q以及多个消费者C
  • 在Exchange和Queue中,我们看到了不同的规则,也就是Routing Key

显然从图中的说明,我们就知道这是一个log日志根据级别派发消息的例子。熟悉Log日志系统的应该都知道,一般的log系统分为error、info、warn和debug等。从图中我们可以看出,将日志级别为error的定向的派发到第一个消息队列,将error、warn和info级别的日志派发到第一个消息队列。

该模型首先实现了定向派发,而不再是订阅模式那种广播式的派发。同一条消息既可以派发给一个Queue,也可以同时派发给两个或者多个Queue,这就是该模式的灵活之处。下面来看看实例

发送端

/**
 * Created by jackie on 17/8/7.
 */
public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.3.161");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
        connection.close();
    }

    private static String getSeverity(String[] strings){
        if (strings.length < 1)
            return "info";
        return strings[0];
    }

    private static String getMessage(String[] strings){
        if (strings.length < 2)
            return "Hello World!";
        return joinStrings(strings, " ", 1);
    }

    private static String joinStrings(String[] strings, String delimiter, int startIndex) {
        int length = strings.length;
        if (length == 0 ) return "";
        if (length < startIndex ) return "";
        StringBuilder words = new StringBuilder(strings[startIndex]);
        for (int i = startIndex + 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}
  • String severity = getSeverity(argv);通过程序参数赋值给Routing Key,作为发送消息的规则
  • String message = getMessage(argv);通过程序参数赋值作为消息实体发送到Queue

在run configurations中配置argv

*第一个参数是要绑定key的名称,第二个参数是要发送的消息内容

  • 运行后,可以在RabbitMQ管理应用中看到exchange,但是此时没有绑定queue,所以即使发送消息也没有queue会存储或者消费。

接收端

/**
 * Created by jackie on 17/8/7.
 */
public class ReceiveLogsDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.3.161");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1){
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }

        for(String severity : argv){
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}
  • channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);表示使用的exchange类型为Direct类型
  • 绑定的queue的名称也是通过program arguments指定的

这里两个参数info和error表示绑定了两个routing key,即如果发送routing key为info的消息该队列能接收到,如果发送routing key为error,该队列也能收到

运行情况

启动接收端代码,我们可以看到生成了Queue名称为amq.gen-ugjKo6t4y0PXPwoh3CeubA的队列,同时有routingKey=info和routingKey=error的绑定到了Exchange上。

这时候起送发送端给routingkey为info发送消息“hello world”,我们可以看到在接收端确实能够收到消息“hello world”,同理,这时候发送routingkey为error的消息,该队列同样能够接收到,因为队列同时绑定了两个routing key

这个就是Routing直连模式。

如果您觉得阅读本文对您有帮助,请点一下“推荐”按钮,您的“推荐”将是我最大的写作动力!如果您想持续关注我的文章,请扫描二维码,关注JackieZheng的微信公众号,我会将我的文章推送给您,并和您一起分享我日常阅读过的优质文章。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏开发与安全

muduo网络库学习之Exception类、Thread 类封装中的知识点(重点讲pthread_atfork())

一、Exception类封装 class Exception : public std::exception ?  #include <execinfo.h>...

2380
来自专栏Jed的技术阶梯

Kafka 消费者旧版低级 API

Kafka 消费者总共有 3 种 API,新版 API、旧版高级 API、旧版低级 API,新版 API 是在 kafka 0.9 版本后增加的,推荐使用新版 ...

2053
来自专栏JavaEdge

高性能队列——Disruptor总论1 背景2 Java内置队列3 ArrayBlockingQueue的问题4 Disruptor的设计方案代码样例性能等待策略Log4j 2应用场景

这里所说的队列是系统内部的内存队列,而不是Kafka这样的分布式队列 Disruptor特性限于3.3.4

2073
来自专栏从零开始学自动化测试

python笔记9-多线程Threading之阻塞(join)和守护线程(setDaemon)

前言 今天小编YOYO请xiaoming和xiaowang吃火锅,吃完火锅的时候会有以下三种场景: - 场景一:小编(主)先吃完了,xiaoming(客)和xi...

3746
来自专栏蜉蝣禅修之道

EJBCA使用之注册用户及创建证书

2794
来自专栏SDNLAB

ODL应用开发之MD-SAL中级教程

1. 简介 本次我们从开始设计到最终完成一个应用的开发,主要设计datastore和RPC定义和实现。Opendaylight 开发使用了OSGi框架,OSGi...

6928
来自专栏腾讯云API

腾讯云API:用Python使用腾讯云API(机器翻译实例)

腾讯云API地址:https://cloud.tencent.com/document/api

1.6K2
来自专栏Java编程技术

使用数据库悲观锁实现不可重入的分布式锁

在同一个jvm进程中时,可以使用JUC提供的一些锁来解决多个线程竞争同一个共享资源时候的线程安全问题,但是当多个不同机器上的不同jvm进程共同竞争同一个共享资源...

751
来自专栏向前进

【笔记】HybridApp中使用Promise化的JS-Bridge

背景: HybridApp,前端采用JS-bridge的方式调用Native的接口,如获取设备信息、拍照、人脸识别等 前端封装了调用库,每次调用Native接口...

3464
来自专栏java 成神之路

RocketMQ 底层通信机制 源码分析

RocketMQ 底层通讯是使用Netty来实现的。 下面我们通过源码分析下RocketMQ是怎么利用Netty进行通讯的。

1522

扫码关注云+社区

领取腾讯云代金券