前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Rabbit MQ 工作模式 Work Queues

Rabbit MQ 工作模式 Work Queues

作者头像
收心
发布2022-01-14 09:22:26
2570
发布2022-01-14 09:22:26
举报
文章被收录于专栏:Java实战博客

官网地址:https://www.rabbitmq.com/tutorials/tutorial-two-java.html

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。

上面是官网的介绍,什么意思呢?

就是原来的简单工作模式

P 生产的数据 都给C ,C是个废物,扛不住那么大的压力。怕C崩溃了,弄C1、C2,他俩轮询争抢,就避免了单个C 过大的压力

什么意思呢,请看实际效果

就是2个服务平分,怕一个服务吃不了那么多压力,分担一下。

如何实现呢?

Maven代码 见 简单模式

https://cloud.tencent.com/developer/article/1934210

先分析逻辑,再写代码

整体除了 变了个队列名称为 work_queues,其他:

生产者对比简单模式变化,啥也没变,只是循环发送消息

消费者 注释了一些没用的输出,啥也没变。

见代码吧。

生产者

代码语言:javascript
复制
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class WordQueuesprovider {
    public static void main(String[] args) throws IOException, TimeoutException {

        //记得刷新Maven  简单模式   没有交换机,但会用到默认的交换机


        //1 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2 设置参数
        factory.setHost("118.31.127.248");  //不设置 就为 127.0.0.0.1
        factory.setPort(5672);              //不设置 就为 5672
        factory.setVirtualHost("/govbuy");  //不设置 就为默认虚拟机 /
        factory.setUsername("zanglikun");   //不设置 就是默认 guest
        factory.setPassword("zanglikun");   //不设置 就是默认 guest

        //3 创建连接 Connection
        Connection connection = factory.newConnection();

        //4 获取channl
        Channel channel = connection.createChannel();

        //5 创建队列 Queue  如果没有叫hello_word的队列,会自动创建
        /*
        参数:
            1: queue 队列名称
            2: durable 是否持久化  持久化到erlang自带的数据库中 重启 数据依旧存在
            3: exclusive 是否独占 只允许一个消费者监听这个队列  2 当connection时,是否删除队列 一般为flase
            4: autodelete 是否自动删除 当没有消费者,会自动删除
            5: arguement 参数:如何删除队列的参数
         */
        channel.queueDeclare("work_queques",true,false,false,null);

        //6 发送消息到
        /*
        参数:
            1:exchange 交换机名称。简单模式,会使用默认的
            2:routingKey 路由名称
            3:props 配置信息
            4:body 真实发送的数据
         */

        String Body = "Hello Rabbit MQ";

        for (int i = 0; i < 100; i++) {
            //简单模式 没有交换机,所以 路由 与 队列名称一样
            channel.basicPublish("","work_queques",null,("第"+i+"条"+Body).getBytes());
        }

        //7 释放资源
        channel.close();
        connection.close();

    }
}

消费者(同样的代码 创建2个消费者,目的是 启动,让他俩轮询争)

代码语言:javascript
复制
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class WordQueuesconsumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {

        //记得刷新Maven  简单模式   没有交换机,但会用到默认的交换机


        //1 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2 设置参数
        factory.setHost("118.31.127.248");  //不设置 就为 127.0.0.0.1
        factory.setPort(5672);              //不设置 就为 5672
        factory.setVirtualHost("/govbuy");  //不设置 就为默认虚拟机 /
        factory.setUsername("zanglikun");   //不设置 就是默认 guest
        factory.setPassword("zanglikun");   //不设置 就是默认 guest

        //3 创建连接 Connection
        Connection connection = factory.newConnection();

        //4 获取channl
        Channel channel = connection.createChannel();


        //5 此方法不需要参数 是添加方法块{} 然后Alt + Inster --> Override Methords 生成的
        Consumer consumer = new DefaultConsumer(channel){
            // 这是一个回调方法 ,当收到消息后,会自动执行该方法。
            /**
             *
             * @param consumerTag 标识
             * @param envelope  获取一些信息,交换机,路由Key
             * @param properties 配置信息
             * @param body  真实的数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //System.out.println("接收时间是:"+System.currentTimeMillis());
                //System.out.println("consumerTag:"+consumerTag);
                //System.out.println("Exchange:"+envelope.getExchange());
                //System.out.println("RoutingKey:"+envelope.getRoutingKey());
                //System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                //System.out.println();
            }
        };


        //6 获取消息
        /*
            参数
                1:queue 队列名称
                2:Auto ack 是否自动确认
                3:callback 回调对象

         */
        channel.basicConsume("work_queques",true,consumer);


        // 消费者 不需要关闭连接,因为需要监听MQ。

    }
}

启动效果 就是 先启动2个 消费者,让他俩等着,在启动生产者,生产数据。

当然了 2个消费者 同时启动,是轮询,如果先启动一个消费者,就启动一个生产者后,在启动另一个消费者,是什么情况呢?

猜想:一开始是消费者1 不停的消费,然后消费者2进入,就开始轮询了。

试试

把生产者的循环 写死

代码图

开始测试

果然是的,是不是巧合呢,再试试10回,确认下。哈哈,我不测试了。有兴趣的老哥,你自己试试。我姑且默认他是这样的。

特殊说明:

解决问题的光鲜,藏着磕Bug的痛苦。

万物皆入轮回,谁也躲不掉!

以上文章,均是我实际操作,写出来的笔记资料,不会出现全文盗用别人文章!烦请各位,请勿直接盗用!

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 生产者
  • 消费者(同样的代码 创建2个消费者,目的是 启动,让他俩轮询争)
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档