前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用JAVA获取ActiveMQ队列数据和状态

使用JAVA获取ActiveMQ队列数据和状态

作者头像
头发还在
发布2022-11-13 13:31:20
1.8K0
发布2022-11-13 13:31:20
举报
文章被收录于专栏:桃花源桃花源

1、向ActiveMQ中放入消息

代码语言:javascript
复制
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.BytesMessage;
import javax.jms.BytesMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;



public class Putmsg {
    // tcp 地址, tcp://localhost:61616
    private String url;
    private String user;
    private String pwd;
    //目标,队列或Topic名称
    private String qName;
    Session session = null;
    MessageProducer producer = null;
    //目标,TOPIC相关
    TopicSession tsession = null;
    TopicPublisher publisher = null;

    
    /**
     * 
     * @param url
     * @param user
     * @param pwd
     * @param qName
     */
    public Putmsg(String url, String user, String pwd, String qName){
        this.url = url;
        this.user = user;
        this.pwd = pwd;
        this.qName = qName;
    }
    
    /**
     * <b>function:</b> 发送消息
     * @param session
     * @param producer
     * @throws Exception
     */    
    public static void sendMessage(Session session, MessageProducer producer) throws Exception {
        for (int i = 0; i < 5; i++) {
            String message = "发送消息第" + (i + 1) + "条";
            BytesMessage text = session.createBytesMessage();
            
            System.out.println(message);
            producer.send(text);
        }
    }
    
    /**
     * 将指定数据放入到AMQ中
     * @param destPath 目录下所有文本,放入到AMQ中
     * @throws Exception
     */
    public void sendMsg4Path(String destPath){
        try {            
              File direct=new File(destPath);
              
              File[] tempList = direct.listFiles();
              System.out.println("该目录下需要放入到MQ的文件个数:"+tempList.length);
    
              int count = 0;
              
              for (int i = 0; i < tempList.length; i++) {
                  
                  if (tempList[i].isFile()) {
                      try {
                        //遍历文件并生成对应的字节码文件到目录中
                        File file = new File(tempList[i].getAbsolutePath());
                        
                        //可以换成工程目录下的其他文本文件
                        FileInputStream fis= new FileInputStream(file);
                        //获得InputStream,因为FileInputStream 是InputStream的实现类;InputStream是个抽象类;
                        ByteArrayOutputStream bytestream = new ByteArrayOutputStream(); 
                        // ByteArrayOutputStream 是OutputStream的一个实现类 
                        int ch = 0;
                        //byte[] msg = null;
                        
                        while (true) {
                            //取得文本对应的16进制数据
                            ch = fis.read(); 
                            if(ch==-1) break;
                            //将FileInputStream 的内容写到 ByteArrayOutputStream 中
                            bytestream.write(ch);  
                        }
                        
                        bytestream.close();
                        //关闭文件
                        fis.close();
                        
                        byte imgdata[] = bytestream.toByteArray();

                        BytesMessage text = session.createBytesMessage();
                        text.writeBytes(imgdata);                
                        
                        
                        producer.send(text);
                        
                        //TODO setReadOnlyBody(true),输出其长度
                        text.reset();
                        System.out.println("len = " + text.getBodyLength());
                        
                        count = i + 1;
//                        System.out.println("Put the " + count +" file into the MQ! " +  tempList[i]);
                        
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                  }//判断是否为文件
               
              }//在指定目录下循环取文件
              
            
            System.out.println("Put "+ count +" files all fininshed!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
        
        
    /**
         * 将指定数据放入到AMQ中
         * @param destPath 目录
         * @param fileName 文件名,放入到AMQ中的内容
         * @throws Exception
    */
    public void sendMsg4File(String destPath, String fileName){
        try {            
              File direct=new File(destPath);
                  
              File[] tempList = direct.listFiles();
              System.out.println("该目录下需要放入到MQ的文件个数:"+tempList.length);
        
              int count = 0;
                  
              for (int i = 0; i < tempList.length; i++) {
                      
                  if (tempList[i].isFile() && tempList[i].getName().contains(fileName)) {
                      try {
                           //遍历文件并生成对应的字节码文件到目录中
                           File file = new File(tempList[i].getAbsolutePath());
                            
                            //可以换成工程目录下的其他文本文件
                            FileInputStream fis= new FileInputStream(file);
                            //获得InputStream,因为FileInputStream 是InputStream的实现类;InputStream是个抽象类;
                            ByteArrayOutputStream bytestream = new ByteArrayOutputStream(); 
                            // ByteArrayOutputStream 是OutputStream的一个实现类 
                            int ch = 0;
                            //byte[] msg = null;
                            
                            while (true) { 
                                //取得文本对应的16进制数据
                                ch = fis.read(); 
                                if(ch==-1) break;
                                //将FileInputStream 的内容写到 ByteArrayOutputStream 中
                                bytestream.write(ch);  
                            }
                            
                            byte imgdata[] = bytestream.toByteArray();
                            bytestream.close();
                            fis.close();
                            
                            BytesMessage text = session.createBytesMessage();
                            text.writeBytes(imgdata);
                            
                            producer.send(text);
                            count +=1;
    //                        System.out.println("Put the " + count +" file into the MQ! " +  tempList[i]);
                            
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                      }//判断是否为文件
                   
                  }//在指定目录下循环取文件
                  
                
                System.out.println("Put "+ count +" files all fininshed!");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    /**
         * 将指定数据放入到AMQ中
         * @param destPath 目录,或放入到AMQ中的内容
         * 当传入的参数既不是目录也不是文件,就把该参数放入到AMQ中
         * @throws Exception
    */
    public void sendMsg4Str(String msg){                  
        BytesMessage message;
        try {
            message = session.createBytesMessage();                
                
            byte[] bmsg = msg.getBytes();
            message.writeBytes(bmsg);
            System.out.println(msg);
                    
            producer.send(message);
                    
            // TODO
            message.reset();
            System.out.println("len = " + message.getBodyLength());
                    
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }


    public void putmsg2amq() throws Exception {
        
        Connection connection = null;

        try {
            // 创建链接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory(
                    user, pwd, url);
            // 通过工厂创建一个连接
            connection = factory.createConnection();
            // 启动连接
            connection.start();
            // 创建一个session会话
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列
            Destination destination = session.createQueue(qName);
            // 创建消息生产者
            producer = session.createProducer(destination);
            // 设置持久化模式
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            sendMessage(session, producer);
            
        } catch (Exception e) {
            throw e;
        } finally {
            // 关闭释放资源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * 数据同步,将具体报文内容发送到AMQ
     * @param msg
     * @throws Exception
     */
    public void putmsg2amq(String msg) throws Exception {
            
            Connection connection = null;
    //        Session session = null;
            try {
                // 创建链接工厂
                ConnectionFactory factory = new ActiveMQConnectionFactory(
                        user, pwd, url);
                // 通过工厂创建一个连接
                connection = factory.createConnection();
                // 启动连接
                connection.start();
                // 创建一个session会话
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // 创建一个消息队列
                Destination destination = session.createQueue(qName);
                // 创建消息生产者
                producer = session.createProducer(destination);
                // 设置持久化模式
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                
                sendMsg4Str(msg);
                
            } catch (Exception e) {
                throw e;
            } finally {
                // 关闭释放资源
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        }

    /**
     * 将指定目录下所有文件中的内容,发送到AMQ
     * @param msg
     * @throws Exception
     */
    public void putmsg2amqPath(String path) {
            
       Connection connection = null;
       try{
           try {
                // 创建链接工厂
                ConnectionFactory factory = new ActiveMQConnectionFactory(
                        user, pwd, url);
                // 通过工厂创建一个连接
                connection = factory.createConnection();
                // 启动连接
                connection.start();
                // 创建一个session会话
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // 创建一个消息队列
                Destination destination = session.createQueue(qName);
                // 创建消息生产者
                producer = session.createProducer(destination);
                // 设置持久化模式
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    
                sendMsg4Path(path);
                
            } catch (Exception e) {
                System.out.println(e.getMessage());
            } finally {
                // 关闭释放资源
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    
    }

    /**
     * 将指定目录下,指定文件的内容发送到AMQ
     * @param msg
     * @throws Exception
     */
    public void putmsg2amqFile(String path, String file) {
            
        Connection connection = null;
        try{
            try {
                // 创建链接工厂
                ConnectionFactory factory = new ActiveMQConnectionFactory(
                        user, pwd, url);
                // 通过工厂创建一个连接
                connection = factory.createConnection();
                // 启动连接
                connection.start();
                // 创建一个session会话
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // 创建一个消息队列
                Destination destination = session.createQueue(qName);
                // 创建消息制作者
                producer = session.createProducer(destination);
                // 设置持久化模式
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    
                sendMsg4File(path, file);
                
            } catch (Exception e) {
                System.out.println(e.getMessage());
            } finally {
                // 关闭释放资源
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    
    }

    /**
     * 
     * @param msg
     * @throws Exception
     */
    public void putmsg2amqPath(String destPath, String filename) {
            
        Connection connection = null;

        try{
            try {
                // 创建链接工厂
                ConnectionFactory factory = new ActiveMQConnectionFactory(
                        user, pwd, url);
                // 通过工厂创建一个连接
                connection = factory.createConnection();
                // 启动连接
                connection.start();
                // 创建一个session会话
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // 创建一个消息队列
                Destination destination = session.createQueue(qName);
                // 创建消息制作者
                producer = session.createProducer(destination);
                // 设置持久化模式
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    
                try {            
                    File direct=new File(destPath);
                    
                    File[] tempList = direct.listFiles();
                    System.out.println("该目录下需要放入到MQ的文件个数:"+tempList.length);
          
                    int count = 0;
                    
                    for (int i = 0; i < tempList.length; i++) {
                        
                        if (tempList[i].isFile()) {
                            try {
                                //遍历文件并生成对应的字节码文件到目录中
                                File file = new File(tempList[i].getAbsolutePath());
                              
                                //可以换成工程目录下的其他文本文件
                                FileInputStream fis= new FileInputStream(file);
                                //获得InputStream,因为FileInputStream 是InputStream的实现类;InputStream是个抽象类;
                                ByteArrayOutputStream bytestream = new ByteArrayOutputStream(); 
                                // ByteArrayOutputStream 是OutputStream的一个实现类 
                                int ch = 0;
                              
                                while (true) { 
                                    //取得文本对应的16进制数据
                                    ch = fis.read(); 
                                    if(ch==-1) break;
                                    //将FileInputStream 的内容写到 ByteArrayOutputStream 中
                                    bytestream.write(ch);  
                                }
                              
                                bytestream.close();
                                //关闭文件
                                fis.close();
                              
                                byte imgdata[] = bytestream.toByteArray();

                                BytesMessage text = session.createBytesMessage();
                                text.writeBytes(imgdata);                        
                              
                              
                                producer.send(text);
                                
                                count = i + 1;
//                              System.out.println("Put the " + count +" file into the MQ! " +  tempList[i]);
                              
                          } catch (Exception e) {
                              e.printStackTrace();
                          }
                        }//判断是否为文件
                        
                    }//在指定目录下循环取文件

                  System.out.println("Put "+ count +" files all fininshed!");
              } catch (Exception e) {
                  e.printStackTrace();
              }
               
                
            } catch (Exception e) {
                System.out.println(e.getMessage());
            } finally {
                // 关闭释放资源
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    
    }
    
    
    /**
     * 向TOPIC中放入消息
     * @throws Exception
     */
    public void putmsg2Topic(String path){
        
        TopicConnection connection = null;

        try{
            try {
                // 创建链接工厂
                TopicConnectionFactory factory = new ActiveMQConnectionFactory(
                        user, 
                        pwd, 
                        url);
                // 通过工厂创建一个连接
                connection = factory.createTopicConnection();
                // 启动连接
                connection.start();
                // 创建一个session会话
                tsession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
                // 创建一个消息队列
                Topic topic = tsession.createTopic(qName);
                // 创建消息发送者
                publisher = tsession.createPublisher(topic);
                // 设置持久化模式
                publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                sendMessage(tsession, publisher);
                
            } catch (Exception e) {
               System.out.println(e.getMessage());
            } finally {
                // 关闭释放资源
                if (tsession != null) {
                    tsession.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        }catch (Exception e) {
               System.out.println(e.getMessage());
        }
    }

    /**
         * 将指定数据放入到AMQ的TOPIC中
         * @param destPath 目录下所有文本,放入到AMQ TOPIC中
         * @throws Exception
         */
    public void sendMsgTopic4Path(String destPath){
            try {            
                  File direct=new File(destPath);
                  
                  File[] tempList = direct.listFiles();
                  System.out.println("该目录下需要放入到MQ的文件个数:"+tempList.length);
        
                  int count = 0;
                  
                  for (int i = 0; i < tempList.length; i++) {
                      
                      if (tempList[i].isFile()) {
                          try {
                            //遍历文件并生成对应的字节码文件到目录中
                            File file = new File(tempList[i].getAbsolutePath());
                            
                            //可以换成工程目录下的其他文本文件
                            FileInputStream fis= new FileInputStream(file);
                            //获得InputStream,因为FileInputStream 是InputStream的实现类;InputStream是个抽象类;
                            ByteArrayOutputStream bytestream = new ByteArrayOutputStream(); 
                            // ByteArrayOutputStream 是OutputStream的一个实现类 
                            int ch = 0;
                            //byte[] msg = null;
                            
                            while (true) {
                                //取得文本对应的16进制数据
                                ch = fis.read(); 
                                if(ch==-1) break;
                                //将FileInputStream 的内容写到 ByteArrayOutputStream 中
                                bytestream.write(ch);  
                            }
                            
                            bytestream.close();
                            //关闭文件
                            fis.close();
                            
                            byte imgdata[] = bytestream.toByteArray();
    
                            BytesMessage text = tsession.createBytesMessage();
                            text.writeBytes(imgdata);                        
                            
                            
                            publisher.send(text);
                            count = i + 1;
    //                        System.out.println("Put the " + count +" file into the MQ! " +  tempList[i]);
                            
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                      }//判断是否为文件
                   
                  }//在指定目录下循环取文件
                  
                
                System.out.println("Put "+ count +" files all fininshed!");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
}

2、从ActiveMQ中取出消息

代码语言:javascript
复制
import java.io.BufferedOutputStream;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;


/**
 * <b>function:</b> 消息接收者
 */
public class Getmsg {
    private String url;
    private String user;
    private String pwd;
    //目标,队列或Topic名称
    private String qName;
    Session session = null;
    MessageProducer producer = null;
    //目标,TOPIC相关
    TopicSession tsession = null;
    TopicPublisher publisher = null;
    

    
    /**
     * 
     * @param url
     * @param user
     * @param pwd
     * @param qName
     */
    public Getmsg(String url, String user, String pwd, String qName){
        this.url = url;
        this.user = user;
        this.pwd = pwd;
        this.qName = qName;
    }
    
    public BytesMessage getmsg() {
        BytesMessage text = null;
         
        Connection connection = null;
        try {
            // 创建链接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory(
                    user, pwd, url);
            // 通过工厂创建一个连接
            connection = factory.createConnection();
            // 启动连接
            connection.start();
            // 创建一个session会话
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列
            Destination destination = session.createQueue(qName);
            // 创建消息消费者
            MessageConsumer consumer = session.createConsumer(destination);
            
            // 接收数据的时间(等待) 100 ms
            Message message = consumer.receive(100);
            
            text = (BytesMessage) message;
            
        } catch (Exception e) {
            e.getStackTrace();
        } finally {
            // 关闭释放资源
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
        
        return text;
    }
    
    
    public static void main(String[] args) throws Exception {
        //从AMQ队列取得数据并存入文件中
        Getmsg g = new Getmsg("tcp://localhost:61616","amq", "123456", "testmq");

        BytesMessage bm = g.getmsg();
        
        int msgLenth = (int)bm.getBodyLength();
        byte[] bmArr = new byte[msgLenth];
        bm.readBytes(bmArr);
        
        File file = new File("D:/test.txt");
        FileOutputStream fos = new FileOutputStream(file);
        BufferedOutputStream bs = new BufferedOutputStream(fos);
        
        bs.write(bmArr);
        
        bs.close();
        fos.close(); 
    }
}
代码语言:javascript
复制
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;

import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;



/**
 * <b>function:</b> 消息接收者
 */
public class Jmx4Amq {
	private String uri;
    private String user;
    private String pwd;
    //目标,队列或Topic名称
    private String qName;
    private BrokerViewMBean mBean = null;
    private MBeanServerConnection connection = null;
    private JMXConnector connector = null;

    
    
    /**
     * 
     * @param url
     * @param user
     * @param pwd
     * @param qName
     */
    public Jmx4Amq(String uri, String user, String pwd){
    	this.uri = uri;
    	this.user = user;
    	this.pwd = pwd;
    }
    
    /**
     * 对JMX连接中的对象进行初始化
     */
    public void getStatus(){
        try {
        	
        	HashMap<String, Object> prop = new HashMap<String, Object>(); 
        	//jmx.password
        	String[] au = {user,pwd};
            prop.put(JMXConnector.CREDENTIALS, au); 
        	
        	JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://"+
            		uri+"/jmxrmi");
            connector = JMXConnectorFactory.connect(url, prop);
            connector.connect();
            connection = connector.getMBeanServerConnection();

             // 需要注意的是,这里的jms-broker必须和上面配置的名称相同
            ObjectName name = new ObjectName("org.apache.activemq:type=Broker,brokerName=Broker_Name");
            mBean =  (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(connection,  
            		name, BrokerViewMBean.class, true);
            
            for(ObjectName queueName : mBean.getQueues()) {
            	QueueViewMBean queueMBean = (QueueViewMBean)MBeanServerInvocationHandler.
            			newProxyInstance(connection, queueName, QueueViewMBean.class, true);

            	// 消息队列名称
            	System.out.println(queueMBean.getName());
            	// 队列中剩余的消息数
            	System.out.println(queueMBean.getQueueSize());
            	// 消费者数
            	System.out.println(queueMBean.getConsumerCount());
            	// 入队数
            	System.out.println(queueMBean.getEnqueueCount());
            	// 出队数
            	System.out.println(queueMBean.getDequeueCount());
            }
        }catch(Exception e){
        	e.printStackTrace();
        }
    }
    
    
    /**
     * 清空队列中的数据
     */
    public void clearMsg(){
        try {        	
            //遍历AMQ中的对象
            for(ObjectName queueName : mBean.getQueues()) {
                QueueViewMBean queueMBean =  (QueueViewMBean)MBeanServerInvocationHandler
                			.newProxyInstance(connection, queueName, QueueViewMBean.class, true);
                //找到匹配队列,执行purge操作
                if(queueMBean.getName().equals(qName)){
                	queueMBean.purge();
                	break;
                }
            }
            
        }catch(Exception e){
        	e.printStackTrace();
        }
    }
    
    /**
     * 关闭JMX连接
     */
    public void closeJmxConn(){
        try {
			connector.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
    }
    
    public static void main(String[] args) throws Exception {
    	//从AMQ取得数据
    	Jmx4Amq g = new Jmx4Amq("localhost:11099","admin","cacikf88");
        g.getStatus();
    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-07-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、向ActiveMQ中放入消息
  • 2、从ActiveMQ中取出消息
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档