前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >amq简单使用_rabbitmq发送消息

amq简单使用_rabbitmq发送消息

作者头像
Java架构师必看
发布2022-08-01 08:31:41
7310
发布2022-08-01 08:31:41
举报
文章被收录于专栏:Java架构师必看

大家好,我是架构君,一个会写代码吟诗的架构师。今天说一说amq简单使用_rabbitmq发送消息,希望能够帮助大家进步!!!

queue sender

代码语言:javascript
复制
package org.arrow.amq.test;
import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jms.pool.PooledConnection;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.apache.activemq.jms.pool.PooledProducer;
import org.apache.activemq.jms.pool.PooledSession;

import java.util.Random;


public class Sender { 
   

    public static void main(String[] args) throws JMSException, InterruptedException {
        final String URL = "failover:(tcp://192.168.2.44:61616,tcp://192.168.2.48:61616,tcp://192.168.2.49:61616)";


        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, URL);

        PooledConnectionFactory fact2 = new PooledConnectionFactory();
        fact2.setConnectionFactory(factory);


        Connection connection = fact2.createConnection();

        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);


        Destination desc = session.createQueue("TestQueue");
        MessageProducer producer = session.createProducer(desc);


        Queue replyTo = session.createQueue("TestReplyToQueue");

        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

       while(true){
           int flag = new Random().nextInt(1024);
           ObjectMessage msg = session.createObjectMessage("hello world..." + flag);
            // 使用不同的属性,消费时可以只消费指定的消息
            msg.setStringProperty("group", (flag % 2) + "" );
            // 可以在此放一个replyTo, 告诉消费者回复到哪个消息队列
            // 消费者可以在onMessage时取出来,并手动生成一个生成者发送数据到该队列
            msg.setJMSReplyTo(replyTo);
            producer.send(msg);
           Thread.sleep(1000);
        }
// session.commit();
// System.out.println("sent...");
// session.close();
// connection.close();
    }

}

只听到从架构师办公室传来架构君的声音:

小桃无主自开花,烟草茫茫带晚鸦。有谁来对上联或下联?

receiver

代码语言:javascript
复制
package org.arrow.amq.test;
import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver { 
   

    static int i = 0;

    public static void main(String[] args) throws JMSException {
        final String URL = "failover:(tcp://192.168.2.44:61616,tcp://192.168.2.49:61616,tcp://192.168.2.49:61616)";
        // 连接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, URL);
        // 获取连接
        final Connection connection = factory.createConnection();
        connection.start();
        // 生成session, 参数1true,开启事务,必须commit确认消费,false时,自动应答
        // true + Session.AUTO_ACKNOWLEDGE : 不commit时不会确认消费
        // false + Session.AUTO_ACKNOWLEDGE : 接受到即确认消费
        // false + Session.CLIENT_ACKNOWLEDGE: 不自动确认,需要手动确认 message.acknowledge()确认消费
        // false + Session.DUPS_OK_ACKNOWLEDGE 不需要确认,也会自动消费

        // AUTO_ACKNOWLEDGE:自动确认模式。
        // DUPS_OK_ACKNOWLEDGE:允许确认模式的副本。接收应用程序来处理在会话对象的方法调用返回的消息后会收到一条确认消息,并允许重复确认。
        // CLIENT_ACKNOWLEDGE 客户端手动确认
        // 重要: true时将忽略b值,自动被设为SESSION_TRANSACTED

        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        // 操作目标队列或主题 queue("TestQueue") 或 session.createTopic("TestTopic")
        Destination destination = session.createQueue("TestQueue");
        // 生成消费者,同一队列中两个消息者消费不同属性的消息
        MessageConsumer consumer1 = session.createConsumer(destination, "group='1'");
        MessageConsumer consumer2 = session.createConsumer(destination, "group='0'");
        // 方式1,使用consumer.receive()
// ObjectMessage message = (ObjectMessage)consumer.receive();
// if (message != null) { 
   
// String messageString = (String)message.getObject();
// System.out.println("Receive1 : " + messageString);
// }
        // 方式2:注册一个listener
        consumer1.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                Object object = null;
                try {
                    object = ((ObjectMessage) message).getObject();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
                System.out.println(i ++  + "Receive2-1 : " + (String)object);
            }
        });

        consumer2.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                Object object = null;
                try {
                    object = ((ObjectMessage) message).getObject();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
                System.out.println(i ++ + "Receive2-2 : " + (String)object);
            }
        });


// session.close();
// connection.close();
    }
}

今天文章到此就结束了,感谢您的阅读,Java架构师必看祝您升职加薪,年年好运。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-08-012,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • queue sender
  • receiver
相关产品与服务
消息队列 CMQ
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档