抱歉,你查看的文章不存在

activeMq消息转投rabbitMq研究

在研究activemq转投消息到rabbitmq的过程中还是发现了很多有趣的细节。 消息发送端分为PERSISTENT与NON_PERSISTENT,该类型表示是否持久化消息到数据库中。

  1. Activemq默认使用kahaDB。我大Q9使用的也是kahaDB。当然也是支持mysql等数据库的。

具体配置在${activemq.base}/conf/activemq.xml中。

<persistenceAdapter>
   <kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
<!--
   The systemUsage controls the maximum amount of space the broker will
   use before slowing down producers. For more information, see:
   http://activemq.apache.org/producer-flow-control.html
   -->
<systemUsage>
   <systemUsage>
      <memoryUsage>
         <memoryUsage limit="4 gb"/>
      </memoryUsage>
      <storeUsage>
         <storeUsage limit="15 gb"/>
      </storeUsage>
      <tempUsage>
         <tempUsage limit="100 mb"/>
      </tempUsage>
   </systemUsage>
</systemUsage>

和大家理解不一样的地方是NON_PERSISTENT是会使用文件作为存储介质的。主要是为了防止内存挤爆。当发送者发送过快或者接受者处理过慢都会导致使用大量内存。此时将消息临时存储在临时文件中(swap)。

  1. 对于PERSISTENT与NON_PERSISTENT区别在于是否在mq服务器重启后能够正常发送消息。PERSISTENT的消息在服务器重启后依然能够将message发送出去。

如果服务端的topic没有订阅者该消息将被直接丢弃。

  1. 消费者的持久化则有一定区别。当为queue的时候,若客户端不在线等到某个客户端消费了该消息时则会将该消息删除。当为topic时,若客户端未设置subscriptionDurable,则该客户端必须要在线才能收到订阅。当客户端设置subscriptionDurable为true时,则服务器会保存该消息直到被所有的订阅者均消费一次(消费是指服务器收到ack回复)

消息发送端

消息接收端

可靠性及因素

PERSISTENT

queue receiver/durable subscriber

消费一次且仅消费一次。可靠性最好,但是占用服务器资源比较多。

PERSISTENT

non-durable subscriber

最多消费一次。这是由于non-durable subscriber决定的,如果消费端宕机或其他问题导致与JMS服务器断开连接,等下次再联上JMS服务器时的一系列消息,不为之保留。

NON_PERSISTENT

queue receiver/durable subscriber

最多消费一次。这是由于服务器的宕机会造成消息丢失

NON_PERSISTENT

non-durable subscriber

最多消费一次。这是由于服务器的宕机造成消息丢失,也可能是由于non-durable subscriber的性质所决定

服务端可以根据clientId及durableSubscriptionName来辨别指定的订阅者以便将该订阅者尚未消费的消息供消费。

记得在设置subscriptionDurable时候也需要设置durableSubscriptionName如下注释。否则该名称会变成listener的名称。

    /**  * Set whether to make the subscription durable. The durable subscription name  * to be used can be specified through the "durableSubscriptionName" property.  * <p>Default is "false". Set this to "true" to register a durable subscription,  * typically in combination with a "durableSubscriptionName" value (unless  * your message listener class name is good enough as subscription name).  * <p>Only makes sense when listening to a topic (pub-sub domain).  * @see #setDurableSubscriptionName  */
``` plain
如下为一配置示例
```xml
    <bean id="jmsbillChoiceOfAviationContainer"    class="org.springframework.jms.listener.DefaultMessageListenerContainer">    <property name="connectionFactory" ref="expressJmsFactory" />    <property name="destination" ref="expressDssScanTopicDestination" />    <property name="messageListener" ref="billChoiceOfAviationTopicListener" />    <property name="durableSubscriptionName" value="billChoiceOfAviation" />    <property name="clientId" value="billChoiceOfAviationClient" />    <property name="subscriptionDurable" value="true" /> </bean>

请注意在配置destination属性是topic。该类还有一个属性为destinationName。配置该属性时会默认为queue。

在设置destination时执行如下,可以看到还有setPubSubDomain(true);才会表示为订阅模式否则为点对点模式

/**  * Set the destination to receive messages from.  * <p>Alternatively, specify a "destinationName", to be dynamically  * resolved via the {@link org.springframework.jms.support.destination.DestinationResolver}.  * <p>Note: The destination may be replaced at runtime, with the listener  * container picking up the new destination immediately (works e.g. with  * DefaultMessageListenerContainer, as long as the cache level is less than  * CACHE_CONSUMER). However, this is considered advanced usage; use it with care!  * @see #setDestinationName(String)  */ public void setDestination(Destination destination) {    Assert.notNull(destination, "'destination' must not be null");    this.destination = destination;    if (destination instanceof Topic && !(destination instanceof Queue)) {       // Clearly a Topic: let's set the "pubSubDomain" flag accordingly.       setPubSubDomain(true);    } }

因此在设置destinationName属性时如果是topic需要增加

<property name="pubSubDomain" value="true"/>
  1. rabbitMQ和activeMq很大的不同在于rabbitMq的消息发送完全基于queue。

在Rabbit MQ中,无论是生产者发送消息还是消费者接受消息,都首先需要声明一个MessageQueue。这就存在一个问题,是生产者声明还是消费者声明呢?要解决这个问题,首先需要明确:

l 消费者是无法订阅或者获取不存在的MessageQueue中信息。

l 消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。

在明白了上述两点以后,就容易理解如果是消费者去声明Queue,就有可能会出现在声明Queue之前,生产者已发送的消息被丢弃的隐患。如果应用能够通过消息重发的机制允许消息丢失,则使用此方案没有任何问题。但是如果不能接受该方案,这就需要无论是生产者还是消费者,在发送或者接受消息前,都需要去尝试建立消息队列。这里有一点需要明确,如果客户端尝试建立一个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建立成功的

当rabbitmq的生产者发送消息出来后该消息会发送到指定的exchange中。Exchange分为如下几种常用类型:direct, fanout,topic

1) fanout 所有bind到此exchange的queue都可以接收消息

通常此处routingkey为””

不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。

Channel channel = connection.createChannel();  
channel.exchangeDeclare("exchangeName", "direct"); //direct fanout topic  
channel.queueDeclare("queueName");  
channel.queueBind("queueName", "exchangeName", "routingKey");  
  
byte[] messageBodyBytes = "hello world".getBytes();  
//需要绑定路由键  
channel.basicPublish("exchangeName", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);

2) direct 通过routingKey和exchange决定的那个唯一的queue可以接收消息

处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。

Channel channel = connection.createChannel();  
channel.exchangeDeclare("exchangeName", "direct"); //direct fanout topic  
channel.queueDeclare("queueName");  
channel.queueBind("queueName", "exchangeName", "routingKey");  
  
byte[] messageBodyBytes = "hello world".getBytes();  
//需要绑定路由键  
channel.basicPublish("exchangeName", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);

3) topic 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息 表达式符号说明:#代表一个或多个字符,*代表任何字符 例:#.a会匹配a.a,aa.a,aaa.a等 *.a会匹配a.a,b.a,c.a等 注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout

Channel channel = connection.createChannel();  
channel.exchangeDeclare("exchangeName", "topic"); //direct fanout topic  
channel.queueDeclare("queueName");  
channel.queueBind("queueName", "exchangeName", "routingKey.*");  
  
byte[] messageBodyBytes = "hello world".getBytes();  
channel.basicPublish("exchangeName", "routingKey.one", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);

Rabbitmq和activemq区别很大的一点是在于当生产者发送消息给topic时,activemq是将该消息广播至该处所有的订阅者(包括离线持久订阅者),而rabbitmq的消息在发送时不一样是还需要配合routingkey。只有符合表达式的订阅者才会被转发。二订阅者依旧是关注被转发的queue,符合该表达式的消息会被转发至对应的queue中,这样客户端消费者才可以消费到。

因此想要持久化订阅topic在rabbitmq中还需要对应在exchange中增加一个名称唯一的queue来进行转发。

如下如果多个listener需要订阅该topic,则需要每个listener对应一个不同的queue,以便转发。

<rabbit:topic-exchange  id="expressDssScanTopicExchange" name="expressDssScanTopicExchange" durable="true" >       <rabbit:bindings>           <rabbit:binding queue="express.scan" pattern="express.dssScan"/>    <rabbit:binding queue="express.dss" pattern="express.dssScan"/>       </rabbit:bindings>   </rabbit:topic-exchange>

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

编辑于

后端之路

0 篇文章0 人订阅

相关文章

来自专栏Seebug漏洞平台

TP-LINK WR941N路由器研究

作者:Hcamael@知道创宇404实验室 之前看到了一个CVE, CVE-2017-13772 是TP-Link WR940N后台的RCE, 手头上正好有一个...

3176
来自专栏aoho求索

基于可靠消息方案的分布式事务(四):接入Lottor服务

在上一篇文章中,通过Lottor Sample介绍了快速体验分布式事务Lottor。本文将会介绍如何将微服务中的生产方和消费方服务接入Lottor。

2721
来自专栏Elson's web

webpack4实用配置指南-上手篇

算起来已经有3到4个项目的webpack构建打包经历。然而每次搭建起来还是有新手既视感,比较捉急。工具用法的东西,好记性不如烂笔头,有必要系统梳理下webpac...

1.9K14
来自专栏程序员宝库

vue-cli 脚手架中 webpack 配置基础文件详解

vue-cli是构建vue单页应用的脚手架,输入一串指定的命令行从而自动生成vue.js+wepack的项目模板。这其中webpack发挥了很大的作用,它使得我...

2663
来自专栏王亚昌的专栏

【Zookeeper】Leader选举机制示例(异步API)

    上一篇文章中介绍了如何用同步API实现Leader选举机制,本文也借用本一个场景,简单介绍异步API的使用。管理异步API的使用,可以方便大家在一些单进...

891
来自专栏黄Java的地盘

提高代码质量——使用Jest和Sinon给已有的代码添加单元测试

在日常的功能开发中,我们的代码测试都依赖于自己或者QA进行测试。这些操作不仅费时费力,而且还依赖开发者自身的驱动。在开发一些第三方依赖的库时,我们也没有办法给第...

1950
来自专栏刘望舒

Android系统启动流程(二)解析Zygote进程

前言 上一篇文章我们分析了init进程,init进程中主要做了三件事,其中一件就是创建了Zygote进程,那么Zygote进程是什么,它做了哪些事呢?这篇文章会...

2328
来自专栏程序手艺人

nghttp2 - HTTP/2 C Library 简明教程(一)

43812
来自专栏Java进阶之路

消息队列_RabbitMQ

2010
来自专栏小怪聊职场

爬虫架构|利用Kafka处理数据推送问题(2)

56812

扫码关注云+社区

领取腾讯云代金券