【EJB学习笔记】——JMS和消息驱动Bean

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/huyuyang6688/article/details/50996458

  认识消息驱动Bean之前,先了解一下JMS。

JMS


  JMS(Java Message Service):java消息服务,客户端与服务端之间可以通过JSM服务进行消息的异步传输(消息的发送和消息的接收不是同时进行的,即发送了消息后,不需要等待消息的返回就可以继续执行),客户端只管发送,不需要考虑服务端什么时候处理。因此,如果客户端与服务端对消息发送和接收对时间相关不是很严格的话,用JMS可以很大程度上提高性能。

  JMS支持两种消息模型:Point-to-Point(P2P)和Publish/Subscribe(Pub/Sub)。

点对点模型(P2P)

  生产者(发送者)异步把消息发送到队列,消费者(接受者)从队列中获取消息。消息在被消费或超时之前,始终保持在消息队列中。

  特点:   1、生产者和消费者之间没有时间依赖性,无论消费者是否收到消息,都不影响生产者发送消息;   2、消费者收到消息后需要向队列反馈;   3、适用于每条消息都需要被消费者消费的场景。

发布/订阅模型(Pub/Sub)

  与P2P不同的是,一个生产者把消息发布后,这些消息可以传送给多个消费者。

  特点:每条消息可以有多个消费者。

消息驱动Bean(以下简称MDB)


  在上面的JMS介绍中了解了异步消息,消息驱动Bean可以看做是异步消息的消费者。

  实现消息驱动Bean,需要在JBoss的安装目录(jboss-5.0.1.GA\server\default\deploy)下添加一个配置文件:

  xxx-service.xml

<?xml version="1.0" encoding="UTF-8" ?> 
<server> 
   <!-- Queue,name:Queue的名称 -->
   <mbean code="org.jboss.mq.server.jmx.Queue" name="jboss.org.destination:server=Queue,name=myqueue" > 
     <!-- JNDI名称 -->
     <attribute name="JNDIName">queue/myqueue</attribute> 
     <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends> 
   </mbean> 
   <!-- Topic,name:Topic的名称-->
   <mbean code="org.jboss.mq.server.jmx.Topic" name="jboss.org.destination:server=Topic,name=mytopic" > 
     <!-- JNDI名称 -->
     <attribute name="JNDIName">topic/mytopic</attribute> 
     <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends> 
   </mbean> 
</server>

实现P2P模式的消息驱动Bean

  服务端

  MyQueueMDBBean.java

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

@MessageDriven(
    activationConfig={
            @ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Queue"),
            @ActivationConfigProperty(propertyName="destination",propertyValue="queue/myqueue")
    }
)
public class MyQueueMDBBean implements MessageListener{
    static int i=0;
    @Override
    public void onMessage(Message msg) {

        TextMessage textMessage=(TextMessage)msg;
        try {
            System.out.println("【MyQueueMDBBean】消息"+(i++)+"被接收了。TextMessage:"+textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

  对以上代码的说明:

  用@MessageDriven注解来定义消息驱动Bean,如果查看EJB的源码会发现,MessageDriven中有一个数组类型的变量activationConfig:

ActivationConfigProperty[] activationConfig() default {}; 

  所以这里需要为activationConfig赋值:

activationConfig={
            @ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Queue"),
            @ActivationConfigProperty(propertyName="destination",propertyValue="topic/mytopic")
    }

  destinationType属性值为javax.jms.Topic说明此MDB实现的是P2P模式的消息服务;destination属性值为topic/mytopic表示此MDB的消息来源,也表示生产者的发送消息的目的地,jndi地址为topic/mytopic,这个可以在xxx-service.xml中自定义。

  客户端

import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;

public class MyQueueMDBBeanClient {
     public static void main(String[] args) throws Exception {
        InitialContext context=new InitialContext();

        //获取QueueConnectionFactory
        QueueConnectionFactory factory=(QueueConnectionFactory)context.lookup("ConnectionFactory");

        //创建QueueConnection对象
        QueueConnection connection=factory.createQueueConnection();

        //创建QueueSession对象,第一个参数表示事务自动提交,第二个参数表示一旦消息被正确送达,将自动发回响应
        QueueSession session=connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

        //获取Destination对象
        Queue queue=(Queue)context.lookup("queue/myqueue");

        //创建文本消息
        TextMessage msg=session.createTextMessage("Hello world!");

        //创建发送者
        QueueSender sender=session.createSender(queue);

        //发送信息
        for(int i=0;i<10;i++){
            sender.send(msg);
            System.out.println("消息"+i+"已经发送");
        }

        //关闭会话
        session.close();
        connection.close();
    }
}

  客户端执行结果

消息0已经发送
消息1已经发送
消息2已经发送
消息3已经发送
消息4已经发送
消息5已经发送
消息6已经发送
消息7已经发送
消息8已经发送
消息9已经发送

  EJB执行结果

13:39:30,045 INFO  [STDOUT] 【MyQueueMDBBean】消息4被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT] 【MyQueueMDBBean】消息6被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT] 【MyQueueMDBBean】消息8被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT] 【MyQueueMDBBean】消息9被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT] 【MyQueueMDBBean】消息2被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT] 【MyQueueMDBBean】消息7被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT] 【MyQueueMDBBean】消息5被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT] 【MyQueueMDBBean】消息0被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT] 【MyQueueMDBBean】消息1被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT] 【MyQueueMDBBean】消息3被接收了。TextMessage:Hello world!

  从结果可以看出,发送消息的时候是有序的,但是MDB接收消息不一定是有序的。

实现Pub/Sub模式的消息驱动Bean

  服务端

  MyTopicMDBBean1.java

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

@MessageDriven(
    activationConfig={
            @ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Topic"),
            @ActivationConfigProperty(propertyName="destination",propertyValue="topic/mytopic")
    }
)
public class MyTopicMDBBean1 implements MessageListener{
    @Override
    public void onMessage(Message msg) {    
        TextMessage textMessage=(TextMessage)msg;
        try {
            System.out.println("【MyTopicMDBBean1】消息被接收了。TextMessage:"+textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

  MyTopicMDBBean2.java、MyTopicMDBBean3.java

  MyTopicMDBBean2.java、MyTopicMDBBean3.java的代码与MyTopicMDBBean1.java的实现一模一样。

  客户端

import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.InitialContext;

public class MyTopicMDBBeanClient {
     public static void main(String[] args) throws Exception {
            InitialContext context=new InitialContext();

            //获取QueueConnectionFactory
            TopicConnectionFactory factory=(TopicConnectionFactory)context.lookup("ConnectionFactory");

            //创建QueueConnection对象
            TopicConnection connection=factory.createTopicConnection();

            //创建QueueSession对象,第一个参数表示事务自动提交,第二个参数表示一旦消息被正确送达,将自动发回响应
            TopicSession session=connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);

            //获取Destination对象
            Topic topic=(Topic)context.lookup("topic/mytopic");

            //创建文本消息
            TextMessage msg=session.createTextMessage("Hello world!");

            //创建发布者
            TopicPublisher publisher=session.createPublisher(topic);

            //发布信息
            publisher.publish(msg);
            System.out.println("消息已经发布");           

            //关闭会话
            session.close();    
            connection.close();
        }
}

  客户端执行结果

消息已经发布

  EJB执行结果

14:16:24,287 INFO  [STDOUT] 【MyTopicMDBBean01】消息被接收了。TextMessage:Hello world!
14:16:24,287 INFO  [STDOUT] 【MyTopicMDBBean03】消息被接收了。TextMessage:Hello world!
14:16:24,288 INFO  [STDOUT] 【MyTopicMDBBean02】消息被接收了。TextMessage:Hello world!

  这种场景类似于,我新发表了一篇博客,订阅我博客的人都会收到RSS推送。


【 转载请注明出处——胡玉洋《【EJB学习笔记】——JMS和消息驱动Bean》】

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏惨绿少年

zabbix 2.2.20 安装详解(Centos6.9)

环境说明 [root@centos ~]# cat /etc/redhat-release CentOS release 6.9 (Final) [root@...

25600
来自专栏北京马哥教育

如何快速截取某段时间内的日志

在排除故障时,需要分析发生故障的原因,避免再次出现同样的问题,需要对日志进行分析,截取故障前后时间段的日志。通常用正则表达式来获取某段时间的内的日志,比较繁琐。...

30460
来自专栏编程

ActiveMQ消息队列的使用及应用

ActiveMQ消息队列的使用及应用 这里就不说怎么安装了,直接解压出来就行了。 谢绝转载,作者保留所有权力 目录: 复制代码 一:JMQ的两种消息模式 1.1...

36070
来自专栏Java开发者杂谈

ActiveMQ专题2: 持久化

​ 前面一篇AMQ专题中,我们发现对于Topic这种类型的消息,即使将deliveryMode设置为持久化,只要生产者在消费者之前启动。消息生产者发布的消息还是...

12230
来自专栏积累沉淀

干货--JMS(java消息服务)整合Spring项目案例

Sprng-jms消息服务小项目 所需的包: spring的基础包 spring-jms-xx包 spring-messag...

579100
来自专栏Petrichor的专栏

macOS: 没有移动硬盘的 写权限

Note: 这里以希捷(seagate)硬盘为例。其他牌子移动硬盘写权限的问题可以举一反三。

46740
来自专栏帅小子的日常

ActiveMQ消息传递的两种方式

41190
来自专栏菩提树下的杨过

ActiveMQ笔记(1):编译、安装、示例代码

一、编译 虽然ActiveMQ提供了发布版本,但是建议同学们自己下载源代码编译,以后万一有坑,还可以尝试自己改改源码。 1.1 https://github.c...

33050
来自专栏difcareer的技术笔记

ubuntu14.04编译Android4.4源码

安装JDK: 因为我是编译android4.4,故jdk6即可满足。ubuntu14.04不支持apt-get安装,我们来手动安装一下:

9530
来自专栏Java开发者杂谈

ActiveMQ专题1: 入门实例

​ 从上面的代码可以看出,生产者和消费者的处理流程大致相同。存在很多重复代码,不难发现可以抽取出公共的代码来使得代码更加简洁。

13020

扫码关注云+社区

领取腾讯云代金券