JMS消息中间件之ActiveMQ学习

1、下载

下载二进制bin文件:http://activemq.apache.org/activemq-5132-release.html

下载源码:

2、启动:

解压任意路径:

启动后:

3、访问:

访问http://localhost:8161/admin/  用户名&密码:admin

4、主要应用:

5、点对点消息发送&接收

首先是producer方:

package com.activemq.test;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息生产者
 * 
 * @author Administrator
 *
 */
public class JMSProducer {

    private static final String USERNAME = ActiveMQConnectionFactory.DEFAULT_USER; // 默认连接用户
    private static final String PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; // 默认连接密码
    private static final String BROKERURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; // 默认消息总线

    private static final int SENDNUM = 10; // 发送消息总量

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = null; // 连接工厂
        Connection connection = null; // 连接对象

        Session session = null; // 会话级session,接收或发送消息的线程
        Destination destination = null; // 消息发送的目的地

        MessageProducer messageProducer = null; // 消息生产者

        connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD,
                JMSProducer.BROKERURL);
        try {
            connection = connectionFactory.createConnection();

            connection.start(); // 启动连接

            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建session,true表示添加事务

            destination = session.createQueue("FirstQueue"); // 创建消息队列

            messageProducer = session.createProducer(destination); // 创建消息生产者
            
            sendMessage(session, messageProducer);
            
            session.commit();            //commit提交
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            if(connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    /**
     * 发送消息
     * 
     * @param session
     * @param messageProducer
     * @throws Exception
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
        for (int i = 0; i < SENDNUM; i++) {    
            TextMessage textMessage = session.createTextMessage("Active MQ消息"+i);        //文本消息
            System.out.println("发送消息: Active MQ消息"+i);
            messageProducer.send(textMessage);
        }
    }

}

然后是消费方实现,主要有两种,一种是直接receive方法接收消息,一种是通过监听实现:

receive:

package com.activemq.test;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者
 * 普通receive方式接收消息
 * @author Administrator
 *
 */
public class JMSConsumer {
    
    private static final String USERNAME = ActiveMQConnectionFactory.DEFAULT_USER; // 默认连接用户
    private static final String PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; // 默认连接密码
    private static final String BROKERURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; // 默认消息总线
    public static void main(String[] args) {
        
        ConnectionFactory connectionFactory = null; // 连接工厂
        Connection connection = null; // 连接对象

        Session session = null; // 会话级session,接收或发送消息的线程
        Destination destination = null; // 消息发送的目的地
        
        MessageConsumer messageConsumer = null;        //消息消费者
        
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD,
                JMSConsumer.BROKERURL);
        
        try {
            connection = connectionFactory.createConnection();
            
            connection.start(); // 启动连接
            
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建session,false表示不添加事务
            
            destination = session.createQueue("FirstQueue"); // 创建消息队列

            messageConsumer = session.createConsumer(destination); // 创建消息消费者
            
            while (true) {
                TextMessage textMessage = (TextMessage)messageConsumer.receive(100000);        //接收消息(文本消息)
                if(textMessage != null){
                    System.out.println("接收到的消息:"+textMessage.getText());
                }else{
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }
    
}

监听方式:

package com.activemq.test;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 消息监听
 * @author Administrator
 *
 */
public class Listener implements MessageListener{

    @Override
    public void onMessage(Message message) {
        
        try {
            System.out.println("接收到的消息:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }
}
package com.activemq.test;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者
 * 普通receive方式接收消息
 * @author Administrator
 *
 */
public class JMSConsumer2 {
    
    private static final String USERNAME = ActiveMQConnectionFactory.DEFAULT_USER; // 默认连接用户
    private static final String PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; // 默认连接密码
    private static final String BROKERURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; // 默认消息总线
    public static void main(String[] args) {
        
        ConnectionFactory connectionFactory = null; // 连接工厂
        Connection connection = null; // 连接对象

        Session session = null; // 会话级session,接收或发送消息的线程
        Destination destination = null; // 消息发送的目的地
        
        MessageConsumer messageConsumer = null;        //消息消费者
        
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD,
                JMSConsumer2.BROKERURL);
        
        try {
            connection = connectionFactory.createConnection();
            
            connection.start(); // 启动连接
            
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建session,false表示不添加事务
            
            destination = session.createQueue("FirstQueue"); // 创建消息队列

            messageConsumer = session.createConsumer(destination); // 创建消息消费者
            
            messageConsumer.setMessageListener(new Listener());        //注册监听
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }
    
}

几轮测试下来,消费生产记录:

6、发布订阅模式

新建订阅1:

package com.activemq.test2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者    -- 消息订阅者1
 * 普通receive方式接收消息
 * @author Administrator
 *
 */
public class JMSConsumer {
    
    private static final String USERNAME = ActiveMQConnectionFactory.DEFAULT_USER; // 默认连接用户
    private static final String PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; // 默认连接密码
    private static final String BROKERURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; // 默认消息总线
    public static void main(String[] args) {
        
        ConnectionFactory connectionFactory = null; // 连接工厂
        Connection connection = null; // 连接对象

        Session session = null; // 会话级session,接收或发送消息的线程
        Destination destination = null; // 消息发送的目的地
        
        MessageConsumer messageConsumer = null;        //消息消费者
        
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD,
                JMSConsumer.BROKERURL);
        
        try {
            connection = connectionFactory.createConnection();
            
            connection.start(); // 启动连接
            
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建session,false表示不添加事务
            
            destination = session.createTopic("SecondTopic"); // 创建消息订阅

            messageConsumer = session.createConsumer(destination); // 创建消息消费者
            
            messageConsumer.setMessageListener(new Listener());        //注册监听
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }
    
}
package com.activemq.test2;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 消息监听-消息订阅者1的监听
 * @author Administrator
 *
 */
public class Listener implements MessageListener{

    @Override
    public void onMessage(Message message) {
        
        try {
            System.out.println("订阅者1接收到的消息:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }
}

订阅2:

package com.activemq.test2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者    -- 消息订阅者2
 * 普通receive方式接收消息
 * @author Administrator
 *
 */
public class JMSConsumer2 {
    
    private static final String USERNAME = ActiveMQConnectionFactory.DEFAULT_USER; // 默认连接用户
    private static final String PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; // 默认连接密码
    private static final String BROKERURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; // 默认消息总线
    public static void main(String[] args) {
        
        ConnectionFactory connectionFactory = null; // 连接工厂
        Connection connection = null; // 连接对象

        Session session = null; // 会话级session,接收或发送消息的线程
        Destination destination = null; // 消息发送的目的地
        
        MessageConsumer messageConsumer = null;        //消息消费者
        
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD,
                JMSConsumer2.BROKERURL);
        
        try {
            connection = connectionFactory.createConnection();
            
            connection.start(); // 启动连接
            
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建session,false表示不添加事务
            
            destination = session.createTopic("SecondTopic"); // 创建消息订阅

            messageConsumer = session.createConsumer(destination); // 创建消息消费者
            
            messageConsumer.setMessageListener(new Listener2());        //注册监听
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }
    
}
package com.activemq.test2;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 消息监听-消息订阅者1的监听
 * @author Administrator
 *
 */
public class Listener2 implements MessageListener{

    @Override
    public void onMessage(Message message) {
        
        try {
            System.out.println("订阅者2接收到的消息:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }
}

消息一定要先订阅,然后producer再发布,否则先发布再订阅的话后边才订阅的一方是收不到之前发布的消息的!

然后是发布方:

package com.activemq.test2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息生产者
 * 
 * @author Administrator
 *
 */
public class JMSProducer {

    private static final String USERNAME = ActiveMQConnectionFactory.DEFAULT_USER; // 默认连接用户
    private static final String PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; // 默认连接密码
    private static final String BROKERURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; // 默认消息总线

    private static final int SENDNUM = 10; // 发送消息总量

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = null; // 连接工厂
        Connection connection = null; // 连接对象

        Session session = null; // 会话级session,接收或发送消息的线程
        Destination destination = null; // 消息发送的目的地

        MessageProducer messageProducer = null; // 消息生产者

        connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD,
                JMSProducer.BROKERURL);
        try {
            connection = connectionFactory.createConnection();

            connection.start(); // 启动连接
            
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建session,true表示添加事务

            destination = session.createTopic("SecondTopic"); // 创建发布主题
                
            messageProducer = session.createProducer(destination); // 创建消息发布者
            
            sendMessage(session, messageProducer);

            session.commit(); // commit提交
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 发送消息
     * 
     * @param session
     * @param messageProducer
     * @throws Exception
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
        for (int i = 0; i < SENDNUM; i++) {
            TextMessage textMessage = session.createTextMessage("Active MQ发布消息" + i); // 文本消息
            System.out.println("发送消息: Active MQ 发布的消息" + i);
            messageProducer.send(textMessage);
        }
    }
}

运行效果查看:

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏青枫的专栏

day62_Mybatis学习笔记_02

(1)创建扩展PO类   一般User.java类要和数据表表字段一致,最好不要在这里面添加其他字段,今天学习mybatis的逆向工程时,会根据表结构,生成po...

452
来自专栏个人分享

JMS的常用方法

561
来自专栏肖力涛的专栏

Spark 踩坑记:数据库(Hbase+Mysql)

在上篇《spark踩坑记—— 初试》文章中,给大家讲述了spark的基本概念和简要操作 ,这篇文章将对spark操作hbase和mysql的内容进行总结,并且对...

1.4K1
来自专栏SmartSql

SmartCode.ETL 这不是先有鸡还是蛋的问题!

相信不少同学都用过各种代码生成器,这里我就不做详细介绍了,如果想体验 SmartCode.Generator 请至 https://www.cnblogs.co...

1256
来自专栏张善友的专栏

IBatisNet之获取和操作SQL语句

IBatisNet和其他的ORMapping的工具相比较,可以说是一个sqlmap,所以在Why use iBATIS SQL Maps? 中有一条理由是 Y...

1808
来自专栏Java帮帮-微信公众号-技术文章全总结

Hibernate_day02总结

Hibernate_day02总结 今日内容 l Hibernate持久化对象的状态 l Hibernate的一级缓存 l Hibernate操作持久化对象的方...

34711
来自专栏数据库新发现

Oracle数据库恢复:归档日志损坏案例一则

链接:http://www.eygle.com/archives/2010/11/recover_archivelog_corruption.html

692
来自专栏日常分享

MyBatis 基本构成与框架搭建

        根据配置信息(eg:mybatis-config.xml)或者代码来生成SqlSessionFactory。

1142
来自专栏java学习

数据库连接池C3P0,DBCP教程详解示例

连接池 实际开发中“获得连接”或“释放资源”是非常消耗系统资源的两个过程,为了解决此类性能问题,通常情况我们采用连接池技术,来共享连接Connection。...

4826
来自专栏Java开发者杂谈

Netty(1):第一个netty程序

为什么选择Netty   netty是业界最流行的NIO框架之一,它的健壮型,功能,性能,可定制性和可扩展性都是首屈一指的,Hadoop的RPC框架Avro就使...

3677

扫码关注云+社区