前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ学习系列教程三:快速入门

RabbitMQ学习系列教程三:快速入门

原创
作者头像
凯哥Java
修改2019-07-12 10:09:35
4270
修改2019-07-12 10:09:35
举报
文章被收录于专栏:凯哥Java

快速入门:消息的生产者和消费者

生产者的代码步骤:

1:获取到连接的工厂 ConnectionFactory

2:从工厂中获取到一个连接:connection

3:重建一个数据通信的通道,可以发送和接收消息对象:channel

4:通过channel发送消息

5:关闭流

代码编写:

代码语言:javascript
复制
public class Procuder {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1:创建一个connectioFactory工厂对象,并进行配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置ip 端口 vhost等
        connectionFactory.setHost("192.168.31.128");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2:通过工厂对象获取到connection对象
        Connection connection = connectionFactory.newConnection();

        //3:通过connection对象获取到一个消息通信的通道 channel
        Channel channel = connection.createChannel();
        // 4:通过channel发送数据
        /**
         * 参数说明:
         * exchange: 数据路由 routingKey: props: 消息描述body:消息体。字节数组
         */

        for(int i = 0;i<5;i++){
            String mst = "hi Rabbit mq!"+i;
            channel.basicPublish("","mytest001",null,mst.getBytes());
        }
        System.out.println("===>>>生产者发送消息完成。。。");
         //5:关闭流
        channel.close();
        connection.close();

    }
}

消费者的代码步骤:

前三步是一样的。

4:声明一个队列

5:创建一个消费者

6:设置channel

7:获取消息

代码如下:

代码语言:javascript
复制
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1:创建一个connectioFactory工厂对象,并进行配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置ip 端口 vhost等
        connectionFactory.setHost("192.168.31.128");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2:通过工厂对象获取到connection对象
        Connection connection = connectionFactory.newConnection();

        //3:公共connection对象获取到一个消息通信的通道 channel
        Channel channel = connection.createChannel();

        //4:声明(创建)一个队列
        String queueName = "mytest001"; //这里可以使用routingKey
        /**
         *  Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
         Map<String, Object> arguments) throws IOException;
         */
        channel.queueDeclare(queueName,true,false,false,null);
        //5:创建消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //6:设置channel
        //    String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
        /*
        * 参数说明:  queue:队列名称 autoAck:是否自动签收 consumer:消费者对象
         */
        channel.basicConsume(queueName,true,consumer);
        System.out.println("===>>消费者开始处理消息。");
        while(true){
            //7 获取消息
            QueueingConsumer.Delivery delivery =  consumer.nextDelivery();;
            String msg = new String(delivery.getBody());
            System.err.println("消费端: " + msg);
        }
    }
}

运行测试:

生产者:

启动消费者:

接着我们通过浏览器查看管理页面:

查看channel:

在queues中:

我们发现多了个channel和多了个queues。

下节预告:下节我们将讲解重要对象之:exchange 交换机

本文来源:www.kaigejava.com

凯哥公众号:凯哥Java(kaigejava)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档