前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >再次研究消息队列记的笔记——activemq

再次研究消息队列记的笔记——activemq

作者头像
全栈程序员站长
发布2022-06-30 11:00:51
3460
发布2022-06-30 11:00:51
举报
文章被收录于专栏:全栈程序员必看

文章目录

分布式事务–消息队列

1.思考

sso服务

用户服务

日志服务

购物服务(购物车合并)

短信服务

订单服务

库存服务

物流服务

如何让这么多的服务并行执行?【涉及到分布式事务:为了保证数据的一致性】

2.分布式事务

分布式事务:在分布式环境下,如何保证数据一致性

分布式事务会涉及到性能太低的一个通病。

方案:

  • xa协议下的两段式提交
  • xa两段式提交的进阶版:tcc
  • 基于消息,采用最终一致性策略的分布式事务

LNC 分布式框架. 分布式事务理论基础:CPA理论、BASE理论

3.XA协议

XA协议:数据库与事务管理器的一个标准。

在xa协议下,提交一个事务需要经过两个阶段

阶段一:预备提交

阶段二:提交

再次研究消息队列记的笔记------activemq
再次研究消息队列记的笔记------activemq

4.TCC

需要在业务层实现,try,confirm,和cancle的接口。

再次研究消息队列记的笔记------activemq
再次研究消息队列记的笔记------activemq

5.消息队列

在一个事务正在进行的同时,发出消息给其他的业务,如果消息发送失败,或者消息的执行失败,则回滚消息,重复执行,反复执行失败后,记录失败信息,后期补充性的处理;在消息系统中开启事务,消息的事务是指,保证消息被正常消费,否则回滚的一种机制

补偿机制:日志记录,定时器在某个时间再执行(重试执行)

重复执行,需要考虑幂等性处理逻辑。

6.疑问

  • 如何确保消息发送成功? 消息应答模式?
  • 消息发送失败如何处理?
  • 消息事务?
  • 消息幂等性如何处理?
  • 消息阻塞?死信队列。

消息队列

1.消息产品

RabbitMQ 、 Kafka、ActiveMQ

  • RabbitMQ的协议是AMQP(Advanced Message Queueing Protoco);AMQP通用行较强,非java环境经常使用,传输内容就是标准字符串。RabbitMQ用Erlang开发
  • ActiveMQ使用的是JMS(Java Messaging Service )协议,JMS是针对Java体系的传输协议,队列两端必须有JVM,所以如果开发环境都是java的话推荐使用ActiveMQ,可以用Java的一些对象进行传递比如Map、BLob、Stream等。ActiveMQ也支持AMQP协议。
  • Kafka性能超过ActiveMQ等传统MQ工具,集群扩展性好;Kafka在传输过程中可能会出现消息重复的情况,不保证发送顺序,没有消息事务功能;一般使用kafka处理大数据日志。
再次研究消息队列记的笔记------activemq
再次研究消息队列记的笔记------activemq

2.ActiveMQ

2.1 整合activemq

只需要加入整合依赖

代码语言:javascript
复制
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-activemq</artifactId>
   <exclusions>
      <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
   </exclusions>
</dependency>

<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-pool</artifactId>
   <version>5.15.2</version>
   <exclusions>
      <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
   </exclusions>
</dependency>

2.2 队列消息

Queue 队列模式

Topic 发布订阅模式

Consumer 使用监听器监听MQ上是否有消息。

2.2.1 生产者
代码语言:javascript
复制
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;

import javax.jms.*;

public class Product {
   
    public static void main(String[] args) {
   
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.7:61616");
        try {
   
            Connection connection = connectionFactory.createConnection();
            connection.start();
            //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED); //Session.SESSION.TRASACTED 开启消息事务
            Queue testqueue = session.createQueue("TEST1");

            MessageProducer producer = session.createProducer(testqueue);
            TextMessage textMessage = new ActiveMQTextMessage();
            textMessage.setText("今天天气真好!我想出去走一走");
            producer.setDeliveryMode(DeliveryMode.PERSISTENT); //消息持久化
            producer.send(textMessage);
            session.commit();
            connection.close();
        }catch (Exception e){
   
            e.printStackTrace();
        }

    }
}
2.2.2 消费者
代码语言:javascript
复制
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Consumer {
   
    public static void main(String[] args) {
   
        ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.1.7:61616");
        try {
   
            Connection connection = connect.createConnection();
            connection.start();
            //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination testqueue = session.createQueue("TEST1");

            MessageConsumer consumer = session.createConsumer(testqueue);
            consumer.setMessageListener(new MessageListener() {
   
                @Override
                public void onMessage(Message message) {
   
                    if(message instanceof TextMessage){
   
                        try {
   
                            String text = ((TextMessage) message).getText();
                            System.out.println(text);

                           // session.rollback();
                        } catch (JMSException e) {
   
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                             session.rollback();
                        }
                    }
                }
            });

        }catch (Exception e){
   
            e.printStackTrace();;
        }
    }
}

2.3 消息事务

producer提交时的事务

事务开启

只执行send并不会提交到队列中,只有当执行session.commit()时,消息才被真正的提交到队列中。

事务不开启

只要执行send,就进入到队列中。

consumer 接收时的事务

事务开启,签收必须写Session.SESSION_TRANSACTED

收到消息后,消息并没有真正的被消费。消息只是被锁住。一旦出现该线程死掉、抛异常,或者程序执行了session.rollback()那么消息会释放,重新回到队列中被别的消费端再次消费。

事务不开启,签收方式选择Session.AUTO_ACKNOWLEDGE

只要调用comsumer.receive方法 ,自动确认。

事务不开启,签收方式选择Session.CLIENT_ACKNOWLEDGE

需要客户端执行 message.acknowledge(),否则视为未提交状态,线程结束后,其他线程还可以接收到。 这种方式跟事务模式很像,区别是不能手动回滚,而且可以单独确认某个消息。

事务不开启,签收方式选择Session.DUPS_OK_ACKNOWLEDGE

在Topic模式下做批量签收时用的,可以提高性能。但是某些情况消息可能会被重复提交,使用这种模式的consumer要可以处理重复提交的问题。

2.4 消息持久化

通过producer.setDeliveryMode(DeliveryMode.PERSISTENT) 进行设置

持久化的好处就是当activemq宕机的话,消息队列中的消息不会丢失。非持久化会丢失。但是会消耗一定的性能。

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/100743.html原文链接:https://javaforall.cn

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021年7月1,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 分布式事务–消息队列
    • 1.思考
      • 2.分布式事务
        • 3.XA协议
          • 4.TCC
            • 5.消息队列
              • 6.疑问
              • 消息队列
                • 1.消息产品
                  • 2.ActiveMQ
                    • 2.1 整合activemq
                    • 2.2 队列消息
                    • 2.3 消息事务
                    • 2.4 消息持久化
                相关产品与服务
                消息队列 CMQ 版
                消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档