Spring消息之JMS.

 一、概念

  • 异步消息简介

    与远程调用机制以及REST接口类似,异步消息也是用于应用程序之间通信的。

    RMI、Hessian、Burlap、HTTP invoker和Web服务在应用程序之间的通信机制是同步的,即客户端应用程序直接与远程服务相交互,并且一直等到远程过程完成后才继续执行。而消息是异步发送的,客户端不需要等待服务处理消息,甚至不需要等待消息投递完成。客户端发送消息,然后继续执行,这是因为客户端假定服务最终可以收到并处理这条消息。

  • 优缺点

优点:

    1. 异步通信。客户端无需等待服务端的响应,节省时间,提升客户端的效率。
    2. 面向消息与解耦。客户端不需要与特定的方法签名绑定,任何可以处理数据的队列或主题订阅者都可以处理由客户端发送的消息,而客户端不必了解远程服务的任何规范。
    3. 位置独立。由于客户端并不直接与服务端通信,而是把消息交由消息代理。因此,只要服务能够从队列或主题中获取消息即可,消息客户端根本不需要关注服务来自哪里。而且可以使用服务器集群监听同一个消息代理提升服务器负载。

缺点:

    1. 增加复杂度。毫无疑问,消息代理这个东西是多出来的,需要维护成本。
    2. 暂时的不一致性。异步消息方式可以确保最终的一致性,但是可能存在客户端把消息给了消息队列,而服务端暂时还没处理这个队列导致的暂时不一致性问题。
  • 应用场景
    1. 客户端并不需要服务端的反馈,诸如此类的非核心流程异步化处理。
    2. 流量削峰。比如很多的秒杀场景,用户的请求,服务器接收后,首先写入消息队列,接着再根据业务做后续处理。
    3. 日志处理。将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。
    4. 消息通讯。消息队列一般都内置了高效的通信机制,因此也可以用于单纯的消息通讯,比如实现点对点消息队列或者聊天室等。
  • 消息模型

点对点消息模型

在点对点模型中,每一条消息都有一个发送者和一个接收者,如图17.3所示。当消息代理得到消息时,它将消息放入一个队列中。当接收者请求队列中的下一条消息时,消息会从队列中取出,并投递给接收者。因为消息投递后会从队列中删除,这样就可以保证消息只能投递给一个接收者。

发布-订阅消息模型

 在发布—订阅消息模型中,消息会发送给一个主题。与队列类似,多个接收者都可以监听一个主题。但是,与队列不同的是,消息不再是只投递给一个接收者,而是主题的所有订阅者都会接收到此消息的副本,如图17.4所示。

二、集成实现JMS

    Java消息服务(Java Message Service ,JMS)是一个Java标准,定义了使用消息代理的通用API。借助JMS,所有遵从规范的实现都使用通用的接口,这就类似于JDBC为数据库操作提供了通用的接口一样。

    Spring通过基于模板的抽象为JMS功能提供了支持,这个模板也就是JmsTemplate。使用JmsTemplate,能够非常容易地在消息生产方发送队列和主题消息,在消费消息的那一方,也能够非常容易地接收这些消息。Spring还提供了消息驱动POJO的理念:这是一个简单的Java对象,它能够以异步的方式响应队列或主题上到达的消息。

    接下来让我们来看看在Spring中如何集成实现JMS:

  •  搭建消息代理

    我们首先需要一个消息代理,作为客户端和服务端通信的中介。ActiveMQ是一个伟大的开源消息代理产品,也是使用JMS进行异步消息传递的最佳选择。下载地址:http://activemq.apache.org/ ,下载完成后解压缩到本地硬盘,在bin目录下,我们可以看到为各种操作系统所创建的对应子目录。在这些子目录下,我们可以找到用于启动ActiveMQ的脚本。

    启动好ActiveMQ后,添加如下的 pom 依赖:

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.3</version>
        </dependency>
  • 建立连接工厂、消息目的地

连接工厂:

    <!--1、ActiveMQ 工厂 2、amq命名空间方式 3、默认监听端口61616 4、默认用户名:admin 密码:admin -->
    <amq:connectionFactory id="connectionFactory" 
                           brokerURL="tcp://localhost:61616" 
                           userName="admin" 
                           password="admin"/>

消息目的地:

消息目的地又分为 队列 和 主题 两种:

    <!--1、定义消息目的地,可以是队列或者主题两种方式 2、借助physicalName属性指定消息通道的名称-->
    <amq:queue id="queueDestination" physicalName="queueName"/>
    <amq:topic id="topicDestination" physicalName="topicName"/>
  • 使用 JmsTemplate

为了消除冗余和重复的JMS代码,Spring 给出的解决方案就是JmsTemplate。JmsTemplate可以创建连接、获得会话以及发送和接收消息。这使得我们可以专注于构建要发送的消息或者处理接收到的消息。另外,JmsTemplate可以处理所有抛出的笨拙的JMSException异常。

    <!--1、jmsTemplate 2、defaultDestination 定义了默认的消息目的地 3、messageConverter 消息转换器 -->
    <bean id="jmsTemplate"
          class="org.springframework.jms.core.JmsTemplate"
          p:connectionFactory-ref="connectionFactory"
          p:defaultDestination-ref="queueDestination"
          p:messageConverter-ref="messageConverter"/>

    <!--MessageConvert-->
    <bean id="messageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"/>

JmsTemplate 可以非常的简单的实现消息的发送和接收功能,让我们来看看吧!

发送消息(convertAndSend):

    @Autowired
    private JmsOperations jmsOperations;

    /**
     * jmsOperations.convertAndSend() 方法,"queueName" 不填写,用默认的 Destination
     */
    @Test
    public void convertAndSend(){
        Map<String ,Object> map = new HashMap<>(16);
        map.put("java", "java");
        map.put("python", "python");
        map.put("c++", "c++");
        jmsOperations.convertAndSend("queueName", map);
    }

接收消息(receiveAndConvert):

    @Autowired
    private JmsOperations jmsOperations;

    /**
     * jmsOperations 的 receiveAndConvert() 方法
     */
    @Test
    public void receiveAndConvert(){
        Map<String, Object> map = (Map) jmsOperations.receiveAndConvert("queueName");
    }

这里有几点需要说明一下;

1、除了 convertAndSend()  和 receiveAndConvert() 方法,JmsTempalte 还支持 send() 和 receive() 方法来发送和接收消息,就是写起来麻烦点,还要自己处理 JMSException。可参考我的源码~

2、convertAndSend() 和 receiveAndConvert() 方法 如果不指定 消息通道名称,即上面的 "queueName"。采用JmsTemplate 默认设置的,即 defaultDestination 关联的消息目的地中的消息通道。

3、convertAndSend() 和 receiveAndConvert() 方法 能便捷的实现 发送和接收消息功能,原因是 消息转换器 !发送时,JmsTemplate 先把消息内容转换成对应Message;接收时,JmsTemplate 再把对应Message 转换回消息内容。JmsTemplate 定义了多个消息转换器。如上,我用了 SimpleMessageConverter 转换器,也就是 JmsTemplate 中默认使用的转换器(不设置用的就是这个转换器)。如果需要,还可自定义转换器呢!

  • 创建消息监听器

使用JmsTemplate接收消息的最大缺点在于receive()和receiveAndConvert()方法都是同步的。这意味着接收者必须耐心等待消息的到来,因此这些方法会一直被阻塞,直到有可用消息(或者直到超时)。同步接收异步发送的消息,是不是感觉很怪异?

如果一发送消息就能被对应的方法处理,岂不美哉?

    <jms:listener-container connection-factory="connectionFactory">
        <jms:listener destination="queueName" ref="queueMessageHandler" method="handle"/>
        <jms:listener destination="topicName" ref="topicMessageHandler" method="handle"/>
    </jms:listener-container>

在这里,我们在消息监听器容器中包含了消息监听器。消息监听器容器(message listener container)是一个特殊的bean,它可以监控JMS目的地并等待消息到达。一旦有消息到达,它取出消息,然后把消息传给任意一个对此消息感兴趣的消息监听器。注意!关键词 任意一个 !说明即使多个消息监听器监听同一个消息通道,仍然只会有一个消息监听器执行!!另外,destination 指的是消息通道的名称,并不是JMS目的地的 id 。ref 连接的是 Spring 的 bean 。methon 指的是这个bean中处理这个 消息的方法,需要注意的是 这个方法的形参!如果放入消息通道的数据类型是 字符串的话,那这个方法的形参也要用字符串接收;如果放入消息通道的数据类型是 集合的话,那这个方法的形参也要用对应集合类型接收。

三、使用基于消息的RPC 

    为了支持基于消息的RPC,Spring提供了JmsInvokerServiceExporter,它可以把bean导出为基于消息的服务;同时,为客户端提供了JmsInvokerProxyFactoryBean来使用这些服务。

  • 导出基于JMS的服务

把bean导出为基于消息的服务,利用的是Spring的 JmsInvokerServiceExporter,如下:

    <bean id="jmsServer"
          class="org.springframework.jms.remoting.JmsInvokerServiceExporter"
          p:serviceInterface="org.springframework.message.activemq.rpc.JmsServer"
          p:service-ref="jmsServerImpl"/>

    这个bean的属性描述了导出的服务应该是什么样子的。service-ref 属性设置为 jmsServerImpl 的引用,它是远程服务的实现。同时,serviceInterface 属性设置为远程服务对外提供接口的全限定类名。

JmsInvokerServiceExporter 可以充当JMS监听器来进行服务间的通信。即客户端 调用这个服务的时候,就可以立即 用这个服务的实现 来处理客户端的调用啦!因为我们监听了这个服务!如下:

    <jms:listener-container connection-factory="connectionFactory">
        <!--利用jms监听器导出消息服务-->
        <jms:listener destination="sparta" ref="jmsServer"/>
    </jms:listener-container>

我们为JMS监听器容器指定了连接工厂,所以它能够知道如何连接消息代理,而<jms:listener>声明指定了远程消息的目的地。

  • 使用基于JMS的服务

JmsInvokerProxyFactoryBean 是一个远程代理工厂bean,代理了通过JmsInvokerServiceExporter所导出的JMS服务。它隐藏了访问远程服务的细节,并提供一个易用的接口,通过该接口客户端与远程服务进行交互。

    <!--远程代理工厂 bean ,供客户端访问-->
    <bean id="jmsServerProxy" class="org.springframework.jms.remoting.JmsInvokerProxyFactoryBean"
          p:serviceInterface="org.springframework.message.activemq.rpc.JmsServer"
          p:connectionFactory-ref="connectionFactory"
          p:queueName="sparta"/>

    对于serviceInterface,指定了代理应该通过 JmsServer 接口暴露功能。queueName 指定要连接的消息代理的名称。

测试类:

    @Test
    public void test01(){
        ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
        JmsServer service = (JmsServer)context.getBean("jmsServerProxy");
        service.doServer("Hello Message");
    }

tips:使用基于消息的RPC,只研究了用法。还没想到用在什么场景~  各位指教?

演示代码下载:https://github.com/JMCuixy/SpringMessage

参考资料:《Spring 实战第四版》

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏linux系统运维

原 主动模式和被动模式,添加监控主机,添加

1582
来自专栏电光石火

修改Centos服务器主机名称

Centos服务器安装好之后,默认的主机名为:localhost.localdomain,为了便与管理,我们需要对服务器主机名称进行修改,此修改生效涉及到两个配...

1662
来自专栏繁花云

Centos升级系统到最新内核

2360
来自专栏IT技术精选文摘

Kafka Topic架构-复制、故障切换和并行处理

本文介绍了Kafka主题的架构,并讨论了分区,如何做故障切换和并行处理。 Kafka Topic,日志和分区 回想一下,Kafka Topic是一个命名的记录...

3167
来自专栏KaliArch

Kafka-manager部署

1.1 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。

3155
来自专栏mini188

在Openfire上弄一个简单的推送系统

推送系统 说是推送系统有点大,其实就是一个消息广播功能吧。作用其实也就是由服务端接收到消息然后推送到订阅的客户端。 思路 对于推送最关键的是服务端向客户端发送数...

23610
来自专栏高爽的专栏

共同父域下的单点登录

      单点登录(Single Sign On),简称为SSO,SSO不仅在企业级开发很常用,在互联网中更是大行其道。随便举几个例子,比如我们登录新浪微博后...

2150
来自专栏专注研发

kafka安装与测试

Producer即生产者,向Kafka集群发送消息,在发送消息之前,会对消息进行分类,即Topic, Topic即主题,通过对消息指定主题可以将消息分类,消费...

1501
来自专栏云计算教程系列

如何在Ubuntu 16.04上使用PM2和Nginx开发Node.js TCP服务器应用程序

Node.js是一个流行的开源JavaScript运行时环境,它基于Chrome的V8 Javascript引擎构建。Node.js用于构建服务器端和网络应用程...

2123
来自专栏JavaEdge

2018-08-02

hibernate执行更新需要较长时间,因此需要等待,否则无法获得更新后字段

922

扫码关注云+社区