做消息中心要求测试以及预估一下rabbitmq的消费能力,需求是创建1千个队列,每个队列1000条数据合计100W数据,然后每个队列指定一定数量的消费者进行消费,看下吞吐量,监控下cpu以及内存变化.
@RunWith(SpringRunner.class)
@SpringBootTest(classes = TestApplication.class)
@RequiredArgsConstructor
public class Queue_Producer {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 手动创建队列以及push消息
* @throws IOException
*/
@Test
public void createQueue() throws IOException {
//获取工厂创建连接
final ConnectionFactory factory = rabbitTemplate.getConnectionFactory();
final Connection connection = factory.createConnection();
for (int i = 1; i < 1000; i++) {
String currQueueName = "test_quque_" + i;
//创建队列
connection.createChannel(false).queueDeclare(currQueueName, true, false, false, null);
AtomicInteger current = new AtomicInteger(0);
while (current.get() < 1000) {
current.incrementAndGet();
//对每个队列进行push消息
rabbitTemplate.convertAndSend(currQueueName, "当前进入队列" + i + "的为" + current.get());
System.out.println("当前进入队列" + currQueueName + "的为" + current.get());
}
}
connection.close();
}
}
这里指定了1000个队列的队列名,并设置每个队列并发消费者数量
@Component
@RequiredArgsConstructor
@Slf4j
public class BatchQueueCustomer_demo {
private final RabbitTemplate rabbitMq;
/**
* 手动批量创建队列的消费者
* @return
*/
@Bean
public SimpleMessageListenerContainer dealQueue(){
final ExecutorService pool = Executors.newCachedThreadPool();
List<String> queueNames=new LinkedList<>();
for (int i = 1; i <1000 ; i++) {
queueNames.add("test_quque_"+i);
}
final ConnectionFactory factory = rabbitMq.getConnectionFactory();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(factory);
//加入所有的待消费队列到监听器内
container.setQueueNames(queueNames.toArray(new String[queueNames.size()]));
container.setExposeListenerChannel(true);
//每个队列的消费者个数
container.setConcurrentConsumers(1);
//设置最大消费者个数---当消息堆积过多时候我们这里会自动增加消费者
container.setMaxConcurrentConsumers(3);
//设置每个消费者获取的最大的消息数量
container.setPrefetchCount(1);
//设置确认模式为手工确认
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//监听处理
container.setMessageListener(new MqConsumerHandle());
return container;
}
/**
* 消息消费处理类
*/
@Component
public class MqConsumerHandle implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) {
try {
//处理消息
System.out.println(message);
//成功返回码 该消息的index
//是否批量. true:将一次性ack所有小于deliveryTag的消息。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
System.out.println("mq消息转发具体实现异常"+e);
try {
//deliveryTag:该消息的index。
//multiple:是否批量. true:将一次性拒绝所有小于deliveryTag的消息。
//requeue:被拒绝的是否重新入队列。
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
}
接下里就是控制变量了,主要测试系统消费能力,可以分别控制队列数量,以及每个队列的并发消费数量,也可以不同队列按权重灵活控制的并发消费数量,看看我们的系统能力