官网地址:https://www.rabbitmq.com/tutorials/tutorial-two-java.html
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。
上面是官网的介绍,什么意思呢?
就是原来的简单工作模式
P 生产的数据 都给C ,C是个废物,扛不住那么大的压力。怕C崩溃了,弄C1、C2,他俩轮询争抢,就避免了单个C 过大的压力
什么意思呢,请看实际效果
就是2个服务平分,怕一个服务吃不了那么多压力,分担一下。
如何实现呢?
Maven代码 见 简单模式
https://cloud.tencent.com/developer/article/1934210
先分析逻辑,再写代码
整体除了 变了个队列名称为 work_queues,其他:
生产者对比简单模式变化,啥也没变,只是循环发送消息
消费者 注释了一些没用的输出,啥也没变。
见代码吧。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class WordQueuesprovider {
public static void main(String[] args) throws IOException, TimeoutException {
//记得刷新Maven 简单模式 没有交换机,但会用到默认的交换机
//1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2 设置参数
factory.setHost("118.31.127.248"); //不设置 就为 127.0.0.0.1
factory.setPort(5672); //不设置 就为 5672
factory.setVirtualHost("/govbuy"); //不设置 就为默认虚拟机 /
factory.setUsername("zanglikun"); //不设置 就是默认 guest
factory.setPassword("zanglikun"); //不设置 就是默认 guest
//3 创建连接 Connection
Connection connection = factory.newConnection();
//4 获取channl
Channel channel = connection.createChannel();
//5 创建队列 Queue 如果没有叫hello_word的队列,会自动创建
/*
参数:
1: queue 队列名称
2: durable 是否持久化 持久化到erlang自带的数据库中 重启 数据依旧存在
3: exclusive 是否独占 只允许一个消费者监听这个队列 2 当connection时,是否删除队列 一般为flase
4: autodelete 是否自动删除 当没有消费者,会自动删除
5: arguement 参数:如何删除队列的参数
*/
channel.queueDeclare("work_queques",true,false,false,null);
//6 发送消息到
/*
参数:
1:exchange 交换机名称。简单模式,会使用默认的
2:routingKey 路由名称
3:props 配置信息
4:body 真实发送的数据
*/
String Body = "Hello Rabbit MQ";
for (int i = 0; i < 100; i++) {
//简单模式 没有交换机,所以 路由 与 队列名称一样
channel.basicPublish("","work_queques",null,("第"+i+"条"+Body).getBytes());
}
//7 释放资源
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class WordQueuesconsumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//记得刷新Maven 简单模式 没有交换机,但会用到默认的交换机
//1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2 设置参数
factory.setHost("118.31.127.248"); //不设置 就为 127.0.0.0.1
factory.setPort(5672); //不设置 就为 5672
factory.setVirtualHost("/govbuy"); //不设置 就为默认虚拟机 /
factory.setUsername("zanglikun"); //不设置 就是默认 guest
factory.setPassword("zanglikun"); //不设置 就是默认 guest
//3 创建连接 Connection
Connection connection = factory.newConnection();
//4 获取channl
Channel channel = connection.createChannel();
//5 此方法不需要参数 是添加方法块{} 然后Alt + Inster --> Override Methords 生成的
Consumer consumer = new DefaultConsumer(channel){
// 这是一个回调方法 ,当收到消息后,会自动执行该方法。
/**
*
* @param consumerTag 标识
* @param envelope 获取一些信息,交换机,路由Key
* @param properties 配置信息
* @param body 真实的数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//System.out.println("接收时间是:"+System.currentTimeMillis());
//System.out.println("consumerTag:"+consumerTag);
//System.out.println("Exchange:"+envelope.getExchange());
//System.out.println("RoutingKey:"+envelope.getRoutingKey());
//System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
//System.out.println();
}
};
//6 获取消息
/*
参数
1:queue 队列名称
2:Auto ack 是否自动确认
3:callback 回调对象
*/
channel.basicConsume("work_queques",true,consumer);
// 消费者 不需要关闭连接,因为需要监听MQ。
}
}
启动效果 就是 先启动2个 消费者,让他俩等着,在启动生产者,生产数据。
当然了 2个消费者 同时启动,是轮询,如果先启动一个消费者,就启动一个生产者后,在启动另一个消费者,是什么情况呢?
猜想:一开始是消费者1 不停的消费,然后消费者2进入,就开始轮询了。
试试
把生产者的循环 写死
代码图
开始测试
果然是的,是不是巧合呢,再试试10回,确认下。哈哈,我不测试了。有兴趣的老哥,你自己试试。我姑且默认他是这样的。
特殊说明:
解决问题的光鲜,藏着磕Bug的痛苦。
万物皆入轮回,谁也躲不掉!
以上文章,均是我实际操作,写出来的笔记资料,不会出现全文盗用别人文章!烦请各位,请勿直接盗用!