这次真的忽略了一些ActiveMQ内心的娇艳

首先ActiveMQ有俩种消息队列模式:点对点和发布订阅,这俩种都有不可替代的应用场景,前者适用于消息唯一传递的业务,后者适用于分布式环境下进行多面数据同步的操作。

其次一些关于它的官方简介和安装步骤我就不占博客园数据库的内存了,写了也没啥鸟用,用烂的朋友想要提取点儿精华,没接触过的朋友请先安装一个玩玩点对点和发布订阅模式吧(http://www.cnblogs.com/1315925303zxz/p/6377551.html),理解一下这俩种机制的区别和出现消息临界值时的特性,我下面也放一些我前期用于测试的Demo,其中总结了一些他们二者的主要区别,都是实战中必须要考虑的因素可以参考:

假设:存在一个消息生产者、多个消费者,分别在点对点和发布订阅模式下进行消息获取,当出现消息临界值的时候都有什么现象?这些需要朋友你自己体会,我能做的只有送上代码供各位测试了。

消息生产者

 1 public class ProducerDemo2 {
 2     
 3     private static Random r = new Random();
 4 
 5     public static void main(String[] args) {
 6         ConnectionFactory connectionFactory; // 连接工厂
 7         Connection connection = null; // 连接
 8         Session session; // 会话 接受或者发送消息的线程
 9         Destination destination; // 消息的目的地
10         MessageProducer messageProducer;//消息生产者
11         // 实例化连接工厂
12         connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://10.0.40.73:61616");    //用户名、密码、连接地址
13         try {
14             // 通过连接工厂获取连接
15             connection = connectionFactory.createConnection();
16             // 启动连接
17             connection.start();
18             // 创建session
19             session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);        //true:支持事务    false:不支持事务
20             // 创建/选择一个消息队列
21             destination = session.createQueue("BMW");    //点对点
22             //destination = session.createTopic("ToyotaYQ");    //发布订阅
23             //创建消息生产者
24             messageProducer = session.createProducer(destination);
25             //创建一条文本消息 
26             int num = r.nextInt(100);
27             TextMessage message = session.createTextMessage("ActiveMQ 生产者2发送消息"+num);
28             System.out.println("生产者2发送消息:"+num);
29             //通过消息生产者发出消息 
30             messageProducer.send(message);
31             //提交
32             session.commit();
33         } catch (JMSException e) {
34             e.printStackTrace();
35         }
36     }
37 }

消息消费者(可以copy多个进行消息争抢测试)

 1 public class ConsumerDemo2 {
 2 
 3     // private static final String USERNAME =
 4     // ActiveMQConnection.DEFAULT_USER;//默认连接用户名(amdin)
 5     // private static final String PASSWORD =
 6     // ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码(admin)
 7     // private static final String BROKEURL =
 8     // ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址(tcp://localhost:61616)
 9 
10     public static void main(String[] args) {
11         ConnectionFactory connectionFactory; // 连接工厂
12         Connection connection = null; // 连接
13         Session session; // 会话 接受或者发送消息的线程
14         Destination destination; // 消息的目的地
15         MessageConsumer messageConsumer; // 消息的消费者
16         // 实例化连接工厂
17         connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://10.0.40.73:61616");    //用户名、密码、连接地址
18         try {
19             // 通过连接工厂获取连接
20             connection = connectionFactory.createConnection();
21             // 启动连接
22             connection.start();
23             // 创建session
24             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
25             // 创建/选择一个消息队列
26             destination = session.createQueue("BMW");            //点对点模式
27             //destination = session.createTopic("ToyotaYQ");    //发布订阅模式
28             // 创建消息消费者
29             messageConsumer = session.createConsumer(destination);
30 
31             while (true) {
32                  //设置消费者接收消息的时间(3s)
33                 TextMessage textMessage = (TextMessage)messageConsumer.receive(3000);
34                 if(textMessage == null){
35                     System.out.println("没有消息");
36                 }else{
37                     System.out.println(textMessage.getText());
38                     //System.exit(0);
39                 }
40             }
41         } catch (JMSException e) {
42             e.printStackTrace();
43         }
44     }
45 }

注意事项:

1、生产者发送消息时必须支持事务,就是创建消息会话的时候设置为true,否则会出现“javax.jms.IllegalStateException: Not a transacted session”异常。消费者可以不支持。

2、点对点模式下,同一时刻只能有一个消费者从队列中获取消息内容,如果存在多个消费者,则会出现消息争抢现象直到消息全部抢完,处于阻塞状态,如果再有消息被放进来时,接着会进行争抢,但是只会有一个消费者获取到消息,不会出现多个消费者抢到消息的情况。

3、点对点模式下,生产者发送消息时,消费者可以处于离线状态,当消费者再次运行时可以接收到历史消息;但是在发布订阅模式下,消费者必须处于运行状态获取消息,历史消息也是不会被获取到的。

实战上线后踩过的坑以及解决方案:

1、用户订单入库成功后发送到MQ中的订单消息丢失,出现处理订单遗漏的情况?

解决方案1:打开消息持久开关。

因为Activemq支持两种消息传送模式:

PERSISTENT (持久消息)该模式是activemq默认的传送方式,此模式下可以保证消息只会被成功传送一次和成功使用一次,消息具有可靠性。在消息传递到目标消费者,在消费者没有成功应答前,消息不会丢失。所以很自然的,需要一个地方来持久性存储。如果消息消费者在进行消费过程发生失败,则消息会被再次投递;

 NON_PERSISTENT(非持久消息)该模式适用于消息不重要的,可以接受消息丢失的哪一类消息,这种消息只会被投递一次,消息不会在持久性存储中存储,也不会保证消息丢失后的重新投递。

与spring整合使用ActiveMQ配置文件如下:

  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <!-- deliveryMode, priority, timeToLive 的开关,要生效,必须配置为true,默认false-->
        <property name="explicitQosEnabled" value="true" />  
        <!-- 【发送模式  DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久】--> 
        <property name="deliveryMode" value="2" /> 
    </bean>

解决方案2:设置消息重发机制。

ActiveMQ针对消息丢失情况提供了消息重发机制,假设消息发送失败,为了解决这一尴尬局面,我们可以在实际项目中配置消息重发机制,以防万一。

  <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.136.139:61616"/>
        <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />  <!-- 引用重发机制 --> 
    </bean>
    
    <!-- ActiveMQ消息发送失败后的重发机制 -->
    <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">  
        <!--是否在每次尝试重新发送失败后,增长这个等待时间 -->  
        <property name="useExponentialBackOff" value="true"></property>  
        <!--重发次数,默认为6次   这里设置为1次 -->  
        <property name="maximumRedeliveries" value="1"></property>  
        <!--重发时间间隔,默认为1秒 -->  
        <property name="initialRedeliveryDelay" value="1000"></property>  
        <!--第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value -->  
        <property name="backOffMultiplier" value="2"></property>  
        <!--最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第   
                                   二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。 -->  
        <property name="maximumRedeliveryDelay" value="1000"></property>  
    </bean>

前天任务调度服务上线后,出现消息丢失的情况,根据以上解决方案是否能够根治这种情况,本人也是不太能够保证,希望稍后能在留言区得到各位老兄的帮助。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏青玉伏案

Java中的网络编程

​  Java中的网路编程主要是Java的Socket编程,属于JavaEE中的高级的部分,以下内容是对java网路编程的一个小结,代码都是经过编译调试的   ...

20960
来自专栏青玉伏案

Java基本语法练习

1.编写程序,求100以内的全部素数。 实验源码: public class FirstClass { public static void main(St...

32790
来自专栏青玉伏案

JavaSE高级之GUI编程

 下面主要用到了java中的swing进行界面设计,当然java的GUI不如C#的设计的好看,不过原理还是要会的。 1. GUI Graphical User ...

38650
来自专栏青玉伏案

JavaSE高级之集合类

​下面的内容是对java中的集合类进行的总结,过段时间会分享java的网路编程,多线程等内容,欢迎批评指正。 1.Java中的集合是用来存放对象的,即集合是对象...

25290
来自专栏liulun

kotlin web开发教程【一】从零搭建kotlin与spring boot开发环境

IDEA中文输入法的智能提示框不会跟随光标的问题 我用的开发工具是IDEA image.png 这个版本的IDEA有一个问题; 就是中文输入法的智能提示框不...

42960
来自专栏编程

写好Java代码的30条经验总结

无可厚非你是一名程序员,但你真的是一个优秀的程序员吗?答案可不一定了。想要成为一个优秀的程序员,有着良好的代码编写习惯是必不可少的。下面就让我们来看看Java代...

24350
来自专栏企鹅号快讯

安利这15个编程小游戏,边玩边学!

这年头,不学点编程都不敢出门。学习编程的方法有太多,比如编程类教科书,在线互动课程,线上编程指导等。其中,最有趣的就是编程类游戏。今天,Uni酱就来谈谈:那些可...

2.2K80
来自专栏青玉伏案

Java中的多线程

1、 线程中的主要方法     a) isAlive() 判断线程是否还活着,即线程是否未终止     b) getPriority() 获得线程的优先级   ...

18560
来自专栏计算机视觉与深度学习基础

java在acm中大数运算教程

import java.io.*; import java.util.*; public class Main { public static void ...

24590
来自专栏青玉伏案

JDBC与JAVA数据库编程

一、JDBC的概念 1、 JDBC (Java DataBase Connectivity) Java数据库连接     a) 主要提供java数据库应用程序...

21760

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励