前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ入门-Topic模式

RabbitMQ入门-Topic模式

作者头像
JackieZheng
发布2018-01-16 11:11:32
9040
发布2018-01-16 11:11:32
举报
文章被收录于专栏:JackieZhengJackieZheng

上篇《RabbitMQ入门-Routing直连模式》我们介绍了可以定向发送消息,并可以根据自定义规则派发消息。看起来,这个Routing模式已经算灵活的了,但是,这还不够,我们还有更加多样灵活的Topic模式。

Topic模式

image.png
image.png
  • 模型组成相较前几种没有什么变化,一个生产者P,一个交换机X,多个消息队列Q以及多个消费者C
  • 在Exchange派发消息到消息队列Queue所用的规则不同,我们看到了有符号"*"以及"#",可以认为是通配符
  • "*"用于匹配一个单词,比如"a","abc"等;"#"用于匹配0个或者多个单词,比如"", "abc", "abc.def"等

发送端

代码语言:javascript
复制
/**
 * Created by jackie on 17/8/7.
 */
public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.3.161");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
        connection.close();
    }

    private static String getSeverity(String[] strings){
        if (strings.length < 1)
            return "info";
        return strings[0];
    }

    private static String getMessage(String[] strings){
        if (strings.length < 2)
            return "Hello World!";
        return joinStrings(strings, " ", 1);
    }

    private static String joinStrings(String[] strings, String delimiter, int startIndex) {
        int length = strings.length;
        if (length == 0 ) return "";
        if (length < startIndex ) return "";
        StringBuilder words = new StringBuilder(strings[startIndex]);
        for (int i = startIndex + 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}
  • channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);这里指定的Exchagne模式为Topic模式
  • 通过String routingKey = getRouting(argv);实现在Program arguments中填写routing key参数
  • 通过String message = getMessage(argv);实现在Program arguments中填写发送的消息

这时候我们给Program argument赋值如下,并启动发送端程序

image.png
image.png

程序运行完,可以在RabbitMQ管理应用中看到名为“topic_logs”的Exchange。

接收端

代码语言:javascript
复制
/**
 * Created by jackie on 17/8/7.
 */
public class ReceiveLogsDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.3.161");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1){
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }

        for(String severity : argv){
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}
  • 和Routing模式异曲同工,声明与发送端一样的Exchange名称
  • 通过Program arguments得到的routing key的输入参数,并将其与Exchange绑定,这时候就可以使用灵活的通配符了

运行情况

我们将启动两个消费者,并分别制定两套Routing key的规则。 第一个消费者

image.png
image.png

第二个消费者

image.png
image.png

启动两个消费者后,使用发送端发送一条消息,我们可以发现两个消费者都通过Routing key规则派发到了消息

31ff000395b3dedc07a2
31ff000395b3dedc07a2

注意:实际上如果Routing key写成了“#”表示能够接受所有的消息,类似广播模式。 这就是Topic模式,到此为止,几大主要RabbitMQ模式已经讲完了。你是否对于RabbitMQ有了一个基本的了解了?

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017-08-08 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Topic模式
  • 发送端
  • 运行情况
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档