前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Rabbit MQ Topics 通配符模式 交换机 Topic

Rabbit MQ Topics 通配符模式 交换机 Topic

作者头像
收心
发布2022-01-14 10:07:17
2650
发布2022-01-14 10:07:17
举报
文章被收录于专栏:Java实战博客

依旧如此 看下 官网介绍:https://www.rabbitmq.com/tutorials/tutorial-five-java.html

官方说的很好理解,我说下我的看法。

  • *(星号)可以代替一个单词。
  • #(井号)可以替代零个或多个单词。

通配符模式 实际是对路由的拓展。 使用 * 和 # 来代替指定的路由key,大大提高了 路由的灵活性 下面要实现 在交换机类型为 Topic 中 有2个队列 队列1 的路由key是 *.orange 队列2 的路由key是 black.# 测试 往 路由key 为a.orange、black.a.b 分别发送一条消息

生产者代码

代码语言:javascript
复制
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Topicsprovider {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1 创建 连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("zanglikun");
        connectionFactory.setPassword("zanglikun");
        connectionFactory.setVirtualHost("/govbuy");
        connectionFactory.setHost("118.31.127.248");
        connectionFactory.setPort(5672);

        //2 连接 连接
        Connection connection = connectionFactory.newConnection();

        //3 通过连接 创建信道
        Channel channel = connection.createChannel();
        // 设置 队列名称
        String quotoName1 = "test_topic_queue1";
        String quotoName2 = "test_topic_queue2";
        // 让信道与队列进行声明(绑定)
        channel.queueDeclare(quotoName1,true,false,false,null);
        channel.queueDeclare(quotoName2,true,false,false,null);
        //4 设置交换机名称
        String exchangeName = "test_topic";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);

        //5 队列1的绑定 有 orange black green
        channel.queueBind(quotoName1,exchangeName,"*.orange");
        channel.queueBind(quotoName2,exchangeName,"black.#");
        String orange = "Orange:消息 XXX";
        String black = "Black: 消息 XXX";

        //6 发送数据 一共发送3条数据
        channel.basicPublish(exchangeName,"a.orange",null,orange.getBytes());
        channel.basicPublish(exchangeName,"black.a.b",null,black.getBytes());

        //7 释放资源
        channel.close();
        connection.close();

    }
}

消费者代码

消费者1

代码语言:javascript
复制
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Topicsconsumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1 创建连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置相关参数
        connectionFactory.setHost("118.31.127.248");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("zanglikun");
        connectionFactory.setPassword("zanglikun");
        connectionFactory.setVirtualHost("/govbuy");

        //2 建立连接
        Connection connection = connectionFactory.newConnection();

        //3 创建信道
        Channel channel = connection.createChannel();

        // 声明队列名称
        String queue1 = "test_topic_queue1";


        //6 创建接收回调
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };

        //接收消息
        channel.basicConsume(queue1,true,consumer);

    }
}

消费者2

代码语言:javascript
复制
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Topicsconsumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1 创建连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置相关参数
        connectionFactory.setHost("118.31.127.248");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("zanglikun");
        connectionFactory.setPassword("zanglikun");
        connectionFactory.setVirtualHost("/govbuy");

        //2 建立连接
        Connection connection = connectionFactory.newConnection();

        //3 创建信道
        Channel channel = connection.createChannel();

        // 声明队列名称
        String queue1 = "test_topic_queue2";


        //6 创建接收回调
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };

        //接收消息
        channel.basicConsume(queue1,true,consumer);

    }
}

特殊说明: 解决问题的光鲜,藏着磕Bug的痛苦。 万物皆入轮回,谁也躲不掉! 以上文章,均是我实际操作,写出来的笔记资料,不会出现全文盗用别人文章!烦请各位,请勿直接盗用!

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档