专栏首页求道rabbitmq路由

rabbitmq路由

一、RabbitMq Routing 介绍

上一篇文章写到了消息在交换机内部进行广播式发送,每一个与其绑定的队列都会收到一个相同的消息,这就是fanout类型的交换机,那么早碰见类似这样一种场景的情况下:现在与交换机绑定的有三条队列,每一个队列都代表一个日志级别 error,debug,info 当只有error级别的错误被生产出来之后才会通知到 error 队列中,其他队列不推送!

有这样一个需求之后,我们就应该想到交换机的另外一个类型:direct 他的路由算法很简单,在消息被生产出来之后,会赋予一个路由秘钥,消息队列也会被赋予一个绑定秘钥消息进入其绑定秘钥和消息的路由秘钥完全匹配的队列!

上图就是路由的概览图,队列的绑定key是可以重复的 也就是说如上图 三个队列的绑定key都可以为error,如果三个队列的绑定key,那么他就是一个另类的fanout交换机类型了!

二、RabbitMq Routing代码实现

消息生产者消息生产者,在创建交换机的时候需要指定交换机类型为direct

channel.exchangeDeclare(EXCHANGE_NAME,"direct");

消息生产者:**然后在发送消息的时候需要指定 routingKey 表明这个消息的类型 **

String routingKey = "error";
channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());

消息消费者:消息消费者,在绑定交换机的时候需要指定 binding key

//绑定交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");

完整代码实现

消息生产者

package com.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.util.MqConnection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 路由  发布订阅模式
 * @author huangfu
 *
 * 路由转发
 * 根据  routingKey  进行转发
 *
 * 缺陷:路由表必须明确
 */
public class RoutingSend {
    private static String EXCHANGE_NAME = "routing";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = MqConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");

        String msg = "醉卧沙场君莫笑";
        String routingKey = "error";
        channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
        System.out.println("send:"+msg);
        channel.close();
        connection.close();

    }
}

消费者1

package com.routing;

import com.rabbitmq.client.*;
import com.util.MqConnection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 * @author huangfu
 */
public class RoutingRecv {
    private static String EXCHANGE_NAME = "routing";
    private static String QUEUE_NAME = "routing";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = MqConnection.getConnection();
        final Channel channel = connection.createChannel();
        //声明队列  不持久化
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //每次只发送一条
        channel.basicQos(1);
        //绑定交换机
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body,"UTF-8"));
                System.out.println("[1] done");
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}

消费者2

package com.routing;

import com.rabbitmq.client.*;
import com.util.MqConnection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 * @author huangfu
 */
public class RoutingRecv2 {
    private static String EXCHANGE_NAME = "routing";
    private static String QUEUE_NAME = "routing2";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = MqConnection.getConnection();
        final Channel channel = connection.createChannel();
        //声明队列  不持久化
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //每次只发送一条
        channel.basicQos(1);
        //绑定交换机
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"debug");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body,"UTF-8"));
                System.out.println("[2] done");
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}

本文分享自微信公众号 - JAVA程序狗(javacxg),作者:皇甫嗷嗷叫

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-10-12

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • rabbitmq主题订阅

    上一篇文章讲述了关于直接连接交换机根据key找到对应队列的方式,实现特殊消息特殊队列消费的目的,但是事实上,生产环境下,对于消息的复杂性远不是这样就能够解决的!...

    止术
  • 如何创建一个与Servlet-api完全解耦和的管理员后台操作日志监控

    在日常开发系统后台时,需要针对管理员操作进行监控,如果使用Spring这一套技术体系,使用AOP切面编程+自定义注解不妨是一个好办法,但是在使用这一套体系的同时...

    止术
  • rabbitmq发布订阅

    对的,以前我们发送消息是直接由生产者将消息发送到队列,可是这种方式官方是不推荐的!

    止术
  • Oracle 学习笔记

    前言 本贴内容纪录Oracle课程中的学习笔记,和Oracle的课后作业,以及数据库相关课程的学习笔记,笔记部分使用实例代码记录,不记详细语法。 用户管理...

    李郑
  • 纯JavaScript实现的MQTT智能门锁

    JavaScript实现的MQTT Demo,可通过Hbuilder IDE进行App打包,也可直接部署到Web服务器上。

    小锋学长
  • 家用可视门铃居然会泄露WiFi密码?

    近日Pen Test Partners公司发现门铃竟然可以泄露WiFi密码。 容易遭到恶意攻击者利用 通常人们安装这款可视的门铃,该门铃基于“物联网”环境下,可...

    FB客服
  • 腾讯医疗的 AI 联合实验室,互联网医院的未来?

    知晓君
  • (七十二)c#Winform自定义控件-雷达图

    GitHub:https://github.com/kwwwvagaa/NetWinformControl

    冰封一夏
  • Python递归函数特点及原理解析

    砸漏
  • 教你用clusterProfiler实现其它来源富集结果的可视化

    在对功能富集分析的结果进行可视化的时候,大家肯定都听过Y叔的R包clusterProfiler,这个包可以说是富集分析结果可视化的神器,不仅画出来的图好看,而且...

    阿凡亮

扫码关注云+社区

领取腾讯云代金券