前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java与RabbitMQ(五)Rabbirmq JAVA编程之Publish/Subscribe(发布/订阅)

Java与RabbitMQ(五)Rabbirmq JAVA编程之Publish/Subscribe(发布/订阅)

作者头像
青山师
发布2023-05-04 20:10:14
2750
发布2023-05-04 20:10:14
举报

本系列教程主要针对使用Java语言进行Rabbitmq的相关编程。阅读前请确认已经安装过rabbit服务。关于如何安装rabbitmq,请参考如何使用rabbitmq.

发布/订阅(Publish/Subscribe)

(using the Java Client) 在上一节中我们创建了一个工作队列。工作队列的背后的思想是:每个任务都准确的分发给每一个消费者处理。在本节中,我们将会做和第二节完全不同的事 —— 我们会将一个消息传送给多个消费者。这种模式就是我们常说的发布/订阅模式。

为了举例说明这种模式,我们将会建立一个简单的日志记录系统。也是由两个程序组成 —— 第一个程序发送消息、第二个程序接收消息并打印。

在这个日志记录系统中,每个运行中的消费者都会接收到消息。这样的话我们可以运行一个消费者接收消息并将日志记录到磁盘中;同时让另外一个运行中的消费者接收消息并打印输出

本质上来看,发布出去的日志消息将会广播给每一个接收者。

交换器(Exchanges)

在前面的教程部分中,我们发送、接收消息都是来自一个队列中的。现在是时候介绍一下RabbitMQ中完整的消息模型了。

我们迅速温习一下前面我们学到的东西:

  • 一个生产者就是一个发送消息的用户程序。
  • 一个队列就是一个存储消息的缓冲区。
  • 一个消费者就是一个接收消息的用户程序。

RabbitMQ消息模型的核心思想就是生产者不会直接发送消息到队列中。实际上,生产者甚至不知道消息是否发送到一个队列中去了。

相反,生产者只能将消息发送给一个交换器(exchange)。交换是一个简单的事情。在一边交换器从生产者接收消息,另外一边交换器将消息推送到队列中去。 交换器必须知道如何处理它所接收到的消息:是添加消息到一个特定的队列中去?还是添加消息到许多的队列中去?或者还是丢弃消息。交换器的类型定义了这些规则。

这里写图片描述
这里写图片描述

RabbitMQ给我们提供了几种交换器的类型:direct,topic, headers 和fanout。我们先来看看最后一种类型 —— fanout(广播)。我们可以创建一个fanout类型的交换器,暂且叫它logs吧:

代码语言:javascript
复制
//创建一个交换器,并指定名称为logs,类型为广播型(fanout)
channel.exchangeDeclare("logs", "fanout");

列出交换器 你可以通过 rabbitmqctl list_exchanges 命令查看交换器列表: $ sudo rabbitmqctl list_exchanges Listing exchanges ... direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic logs fanout ...done. 在这个结果集列表中有一些 amq.*.exchanges 和默认未命名的交换器。这些都是RabbitMQ默认创建的,但是你未必能用到它们。 无名的交换器 在前面部分教程中,我们根本不知道交换器,但是还是可以发送消息到队列中。最大的可能就是我们使用了默认的交换器,我们使用的是空串(”“)。 回想之前我们发送消息: channel.basicPublish("","hello",null,message.getBytes()); 发送消息时第一个参数就是指定交换器的名称。空串表示使用一个默认的交换器或者使用一个无名交换器:如果存在routingKey(第二个参数),消息会路由到routingKey指定的队列中去。

现在,我们发送消息时可以指定到我们定义的交换器中去了:

代码语言:javascript
复制
channel.basicPublish("logs","",null,message.getBytes());

临时队列(Temporary queues)

你应该还记得在前面的学习内容中我们使用的都是指定了名称的队列(还记得hellotask_queuem吗?)。使用命名的队列对我们来说是很关键的 —— 我们需要告知worker去同一个队列中获取消息。当你想要在生产者和消费者之间共享消息的时候,为队列指定一个名称是极其重要的

但这并不是我们当前的日志系统的场景。我们想接收到所有的日志消息,而不仅仅是它们中的一个子集。我们感兴趣的是当前流动中的而不是旧的消息。为了解决这个问题,我们需要做两件事。

首先,任何时候我们连接RabbitMQ时均指定一个新的、空的队列。为了解决这件事我们创建一个随机名称的队列,或者其他更好的方式 —— 可以让服务器帮我们选择一个随机名称给我们的队列。

然后,一旦我们断开消费者的连接,队列需要自动删除。

在Java客户端,我们可以使用无参的 queueDeclare() 创建一个非持久化的、 专用的、 自动删除的拥有自动生成的名称队列:

代码语言:javascript
复制
String queueName = channel.queueDeclare().getQueue();

在上面的代码中 queueName 包含了一个随机的队列名称。它有点类似 amq.gen-JzTY20BRgKO-HjmUJj0wLg 这样的形式。

绑定(Bindings)

这里写图片描述
这里写图片描述

我们已经创建了一个广播式的交换器和队列。现在我们需要告诉交换器发送消息到我们的队列中去。 交换器与队列之间的关系我们就称之为一个绑定(binding).

代码语言:javascript
复制
channel.queueBind(queueName, "logs", "");

从现在开始叫logs的交换器将会将接收到的消息传递给我们的队列。

查看绑定列表 你可以使用 rabbitmqctl list_bindings 查看绑定列表。

汇总

这里写图片描述
这里写图片描述

在生产者程序中,发送日志消息,和前面的并没有太多不同。最主要的改变就是我们发送消息到logs交换器而不是之前未命名的交换器中。发送消息的时候,我们需要提供一个routingKey ,但如果是广播式(fanout )的交换器的话,这个routingKey 的值会被忽略掉。这里是 EmitLog.java 程序:

代码语言:javascript
复制
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
    //...
}

( 猛戳,EmitLog.java 源码在这里 ^_*)

正如你所看到的,连接创建之后,我们声明了交换器。这一个步骤是很有必要的因为发送消息给一个不存在的交换器是禁止的。

如果没有队列与交换器绑定,交换其中的消息会被丢弃掉。如果没有消费者接收消息,我们可以安全的把消息丢弃掉。

这是 ReceiveLogs.java 的代码:

代码语言:javascript
复制
import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogs {
  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}

( 猛戳,ReceiveLogs.java 源码在这里 ^_*)

在eclipse/idea中运行两个ReceiveLogs进程,然后运行EmitLog的main方法,可以看到第一个ReceiveLogs输出:

代码语言:javascript
复制
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'

第二个ReceiveLogs输出也是:

代码语言:javascript
复制
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'

这就是发布/订阅模式,在本节中我们让所有消费者都获得了广播而来的消息。

原文链接:http://www.rabbitmq.com/tutorials/tutorial-three-java.html

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016-10-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 发布/订阅(Publish/Subscribe)
  • 交换器(Exchanges)
  • 临时队列(Temporary queues)
  • 绑定(Bindings)
  • 汇总
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档