模式是一个生产者多个消费者模式,一个消息只能别一个消费者消费
package com.shi.work;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.shi.util.RabbitMqUtils;
/**
* work 模式开发
* @author SHF
* @version 创建时间:2018年7月3日 下午5:55:20
*/
public class WorkMQTest {
private final static String QUEUE_NAME = "work_queue";
/**
* 生产者
* @author SHF
* @version 创建时间:2018年7月3日 下午5:56:41
* @throws TimeoutException
* @throws IOException
* @throws InterruptedException
*/
@Test
public void send() throws IOException, TimeoutException, InterruptedException {
//1 获取链接及mq通道
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
//2 申明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//3 循环发送消息
for(int i = 0;i < 100 ; i++) {
//消息内容
String message = "" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println( "[x] sent "+ message );
Thread.sleep(i * 10);
}
channel.close();
connection.close();
}
/**
* 消费者 1
* @author SHF
* @version 创建时间:2018年7月3日 下午6:09:19
* @throws IOException
* @throws TimeoutException
* @throws InterruptedException
*/
@Test
public void recv1() throws IOException, TimeoutException, InterruptedException {
//1 获取链接及mq通道
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
//2 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//3 同一时刻服务器只会发送一条消息给消费者
channel.basicQos(1);
//4 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//5 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
//6 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x1] Received '" + message + "'");
//休眠
Thread.sleep(10);
// 返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
/**
* 消费者2
* @author SHF
* @version 创建时间:2018年7月3日 下午6:09:46
* @throws IOException
* @throws TimeoutException
* @throws InterruptedException
*/
@Test
public void recv2() throws IOException, TimeoutException, InterruptedException {
//1 获取链接及mq通道
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
//2 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//3 同一时刻服务器只会发送一条消息给消费者
channel.basicQos(1);
//4 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//5 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
//6 获取消息
while(true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x2] Received '" + message + "'");
//休眠
Thread.sleep(1000);
// 返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}