专栏首页求道rabbitmq主题订阅

rabbitmq主题订阅

一、topic 模式交换机

上一篇文章讲述了关于直接连接交换机根据key找到对应队列的方式,实现特殊消息特殊队列消费的目的,但是事实上,生产环境下,对于消息的复杂性远不是这样就能够解决的!比如:你要监控有个用户的操作行为,用户的操作行为太多了 增删改查,如果一个一个的写难免会有遗漏,这个时候,我们可以用通配符 user.* 轻松解决!这就是mq的主题模式

这里的交换机类型为 topic 模式的,他更像direct模式,只不过direct是单个匹配,而topic是通配符匹配

  • *:代表一个字符
  • #:代表多个字符

他的用法极其类似于direct 模式,我们不多说了,直接看代码

二、主要代码

消息生产者:消息生产者,在发送消息的时候需要指定消息类型

String msg = "醉卧沙场君莫笑";
//关注第二个参数
channel.basicPublish(EXCHANGE_NAME,"huangfu.del",null,msg.getBytes());

消息消费者消息消费者,在绑定交换机的时候需要指定通配符

//绑定交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"huangfu.#");

三、详细代码

消息生产者

package com.topics;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.util.MqConnection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 发布订阅模式
 * 主题模式
 * @author huangfu
 */
public class TopicsSend {
    private static String EXCHANGE_NAME = "topic";
    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = MqConnection.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        String msg = "醉卧沙场君莫笑";
        channel.basicPublish(EXCHANGE_NAME,"huangfu.del",null,msg.getBytes());
        System.out.println("send:"+msg);
        channel.close();
        connection.close();
    }
}

消费者1

package com.topics;

import com.rabbitmq.client.*;
import com.util.MqConnection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Administrator
 */
public class TopicsRecv {
    private static String QUEUE_NAME = "topics";
    private static String EXCHANGE_NAME = "topic";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = MqConnection.getConnection();
        final Channel channel = connection.createChannel();

        //声明对垒
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //绑定交换机
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"huangfu.add");

        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body,"UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}

消费者2

package com.topics;

import com.rabbitmq.client.*;
import com.util.MqConnection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Administrator
 */
public class TopicsRecv2 {
    private static String QUEUE_NAME = "topics2";
    private static String EXCHANGE_NAME = "topic";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = MqConnection.getConnection();
        final Channel channel = connection.createChannel();

        //声明对垒
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //绑定交换机
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"huangfu.#");

        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body,"UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}

本文分享自微信公众号 - JAVA程序狗(javacxg),作者:皇甫嗷嗷叫

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-10-14

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • rabbitmq路由

    上一篇文章写到了消息在交换机内部进行广播式发送,每一个与其绑定的队列都会收到一个相同的消息,这就是fanout类型的交换机,那么早碰见类似这样一种场景的情况下:...

    止术
  • rabbitmq发布订阅

    对的,以前我们发送消息是直接由生产者将消息发送到队列,可是这种方式官方是不推荐的!

    止术
  • 如何创建一个与Servlet-api完全解耦和的管理员后台操作日志监控

    在日常开发系统后台时,需要针对管理员操作进行监控,如果使用Spring这一套技术体系,使用AOP切面编程+自定义注解不妨是一个好办法,但是在使用这一套体系的同时...

    止术
  • 消息队列——RabbitMQ的基本使用及高级特性

    Rabbit是基于AMQP协议并使用Erlang开发的开源消息队列中间件,它支持多种语言的客户端,也是目前市面上使用比较广泛的一种消息队列,因此学习并掌握它是非...

    夜勿语
  • 微信小程序开发之前期需求研讨

    接下来要做一个 个人的小程序,名称为:申霖 - 博客,微信小程序账号已经注册了,前期的准备工作也都做好了,下面来说一下产品的设计吧!

    小白程序猿
  • Android断点续传下载器JarvisDownloader的示例

    熟悉漫威电影的人都知道Jarvis,他是钢铁侠的智能管家,帮助钢铁侠制造装甲、分析大量数据、协助建模等各种智能工作,可惜在复联2中,Jarvis与灵魂宝石共同结...

    砸漏
  • 盘一盘 Python 系列 3 - SciPy

    SciPy 是 Python 里处理科学计算 (scientific computing) 的包,使用它遇到问题可访问它的官网 (https://www.sci...

    用户5753894
  • 用Kotlin破解Android版微信小游戏-跳一跳成果跳一跳思路源码使用方法参考来源Android 插件 免PC

    iOSDevLog
  • 大数据那些事(11):复活的LSM-Tree--BigTable的\b系统实现(修)

    修正一些小错误。 BigTable是一个非常复杂的系统,发表的论文面面俱到,但是每个方面都写得并不是很清楚。所幸Google开源了LevelDB这个Key-Va...

    用户1564362
  • PHP实现财务审核通过后返现金额到客户的功能

    有这么一个返现的系统,当前端客户发起提现的时候,后端就要通过审核这笔返现订单,才可以返现到客户的账号里。

    砸漏

扫码关注云+社区

领取腾讯云代金券