前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >rabbitmq主题订阅

rabbitmq主题订阅

作者头像
止术
发布2020-09-15 10:11:40
1650
发布2020-09-15 10:11:40
举报
文章被收录于专栏:求道求道

一、topic 模式交换机

上一篇文章讲述了关于直接连接交换机根据key找到对应队列的方式,实现特殊消息特殊队列消费的目的,但是事实上,生产环境下,对于消息的复杂性远不是这样就能够解决的!比如:你要监控有个用户的操作行为,用户的操作行为太多了 增删改查,如果一个一个的写难免会有遗漏,这个时候,我们可以用通配符 user.* 轻松解决!这就是mq的主题模式

这里的交换机类型为 topic 模式的,他更像direct模式,只不过direct是单个匹配,而topic是通配符匹配

  • *:代表一个字符
  • #:代表多个字符

他的用法极其类似于direct 模式,我们不多说了,直接看代码

二、主要代码

消息生产者:消息生产者,在发送消息的时候需要指定消息类型

String msg = "醉卧沙场君莫笑";
//关注第二个参数
channel.basicPublish(EXCHANGE_NAME,"huangfu.del",null,msg.getBytes());

消息消费者消息消费者,在绑定交换机的时候需要指定通配符

//绑定交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"huangfu.#");

三、详细代码

消息生产者

package com.topics;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.util.MqConnection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 发布订阅模式
 * 主题模式
 * @author huangfu
 */
public class TopicsSend {
    private static String EXCHANGE_NAME = "topic";
    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = MqConnection.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        String msg = "醉卧沙场君莫笑";
        channel.basicPublish(EXCHANGE_NAME,"huangfu.del",null,msg.getBytes());
        System.out.println("send:"+msg);
        channel.close();
        connection.close();
    }
}

消费者1

package com.topics;

import com.rabbitmq.client.*;
import com.util.MqConnection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Administrator
 */
public class TopicsRecv {
    private static String QUEUE_NAME = "topics";
    private static String EXCHANGE_NAME = "topic";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = MqConnection.getConnection();
        final Channel channel = connection.createChannel();

        //声明对垒
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //绑定交换机
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"huangfu.add");

        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body,"UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}

消费者2

package com.topics;

import com.rabbitmq.client.*;
import com.util.MqConnection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Administrator
 */
public class TopicsRecv2 {
    private static String QUEUE_NAME = "topics2";
    private static String EXCHANGE_NAME = "topic";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = MqConnection.getConnection();
        final Channel channel = connection.createChannel();

        //声明对垒
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //绑定交换机
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"huangfu.#");

        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body,"UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-10-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 JAVA程序狗 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 二、主要代码
  • 三、详细代码
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档