前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ 系列(2) —— 用 java 连接 RabbitMQ

RabbitMQ 系列(2) —— 用 java 连接 RabbitMQ

作者头像
求和小熊猫
发布2020-12-16 15:55:53
发布2020-12-16 15:55:53
1.1K0
举报
文章被收录于专栏:∑小熊猫的博客

RabbitMQ 的相关概念

RabbitMQ 作为一个消息中间件,整体上采用了生产者与消费者模型,主要负责接收,存储和转发消息。

生产者和消费者

RabbitMQ 从宏观上可以视为

其中:

  • Producer: 生产者,负责创建消息,并将消息发布到 RabbitMQ 中
  • Broker: 消息中间件服务节点
  • Consumer: 消费者负责订阅队列 并从队列上接收消息。

其详细的工作流程可视为:

RabbitMQ 的架构模型

RabbitMQ 的整体架构可以入下图所示

队列

队列是 Rabbit MQ 的内部对象,用于存储消息。多个消费者可以订阅同一队列

交换器

交换器主要负责将生成者消息投递到队列中。

在 RabbitMQ 中,要想使用 交换器将消息头送到正确的队列上,就需要使用 BindingKey 和 RoutingKey。 BindingKey 就是 交换器和队列之间的固定通路,而 RoutingKey 就是消息选择那些通路进行投送的规则。

交换器的类型

  • fanout: 将消息发送到所有与该交换器绑定的队列上
  • deirect: 指定某一条BindingKey,将消息直接发送到队列上
  • topic: 根据自设定的路由规则将消息投送到队列中
  • headers: 不依赖路由键投递消息而是根据消息的内容进行消息投送。

使用 java 连接 RabbitMQ 的简答案例

前期准备

默认情况下 Rabbit MQ 默认的用户名和密码为 “guest”,但是该账户只能通过本地访问,因此需要创建 一个远程访问的用户,并设置权限

代码语言:javascript
复制
# 为 RabbitMQ 创建一个新的用户
# 用户名为 root 密码为 root123
rabbitmqctl add_user root root123
# 为 root 用户设置所有权限
rabbitmqctl set_permission -p/ root ".*" ".*" ".*"
# 设置 root 用户为管理员
rabbitmqctl set_user_tags root administrator

生产者与消费的Demo

Step1: 通过 maven 引入相关包

首先需要引入 rabbitmq-client 和 rabbitmq 客户端所依赖的 slf4j 包

代码语言:javascript
复制
<dependencies>
    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.7.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.26</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.26</version>
        <scope>test</scope>
    </dependency>
</dependencies>

生产者相关代码

代码语言:javascript
复制
public class RabbitProducer {
    private static final String EXCHANGE_NAME = "exchange_demo";
    private static final String ROUTING_KEY = "routingkey_demo";
    private static final String QUEUE_NAME = "queue_demo";
    private static final String IP_ADDRESS = "192.168.0.0"; // 服务器所在id即可
    private static final int PORT = 5672;
    public static void main(String[] args) throws IOException, TimeoutException,InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername("root");
        factory.setPassword("root123");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建信道
        Channel channel = connection.createChannel();
        // 创建一个持久化,非排他的、非自动删除的队列
        channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null);
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        System.out.println(channel.isOpen());
        // 将交换器与队列通过路由键绑定
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
        channel.close();
        connection.close();
    }
}

Step3: 消费者相关代码

代码语言:javascript
复制
public class RabbitConsumer {
    private static final String QUEUE_NAME = "queue_demo";
    private static final String IP_ADDRESS = "192.168.0.0";
    private static final int PORT = 5672;
    public static void main(String[] args) throws IOException, TimeoutException,InterruptedException {
        Address[] addresses = new Address[]{
                new Address(IP_ADDRESS,PORT)
        };
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("root");
        factory.setPassword("root123");
        Connection connection = factory.newConnection(addresses);
        final Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("recv: message: " + new String(body));
                try{
                    TimeUnit.SECONDS.sleep(1);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(QUEUE_NAME,consumer);
        TimeUnit.SECONDS.sleep(5);
        channel.close();
        connection.close();
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/12/12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RabbitMQ 的相关概念
    • 生产者和消费者
      • RabbitMQ 的架构模型
      • 使用 java 连接 RabbitMQ 的简答案例
        • 前期准备
          • 生产者与消费的Demo
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档