前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Springboot+Rabbitmq全手动批量创建队列以及批量指定消费者测试性

Springboot+Rabbitmq全手动批量创建队列以及批量指定消费者测试性

作者头像
名字是乱打的
发布2021-12-24 09:19:40
2.5K0
发布2021-12-24 09:19:40
举报
文章被收录于专栏:软件工程

需求

做消息中心要求测试以及预估一下rabbitmq的消费能力,需求是创建1千个队列,每个队列1000条数据合计100W数据,然后每个队列指定一定数量的消费者进行消费,看下吞吐量,监控下cpu以及内存变化.

手动创建1000队列以及push1000消息

代码语言:javascript
复制
@RunWith(SpringRunner.class)
@SpringBootTest(classes = TestApplication.class)
@RequiredArgsConstructor
public class Queue_Producer {


    @Autowired
    RabbitTemplate rabbitTemplate;


    /**
     * 手动创建队列以及push消息
     * @throws IOException
     */
    @Test
    public void createQueue() throws IOException {
      //获取工厂创建连接
        final ConnectionFactory factory = rabbitTemplate.getConnectionFactory();
        final Connection connection = factory.createConnection();
        for (int i = 1; i < 1000; i++) {
            String currQueueName = "test_quque_" + i;
           //创建队列  
          connection.createChannel(false).queueDeclare(currQueueName, true, false, false, null);

            AtomicInteger current = new AtomicInteger(0);
            while (current.get() < 1000) {
                current.incrementAndGet();
               //对每个队列进行push消息
               rabbitTemplate.convertAndSend(currQueueName, "当前进入队列" + i + "的为" + current.get());
                System.out.println("当前进入队列" + currQueueName + "的为" + current.get());
            }
        }
        connection.close();
    }
}
指定消费队列以及控制并发消费者

这里指定了1000个队列的队列名,并设置每个队列并发消费者数量

代码语言:javascript
复制
@Component
@RequiredArgsConstructor
@Slf4j
public class BatchQueueCustomer_demo {
    private final RabbitTemplate rabbitMq;


    /**
     * 手动批量创建队列的消费者
     * @return
     */
    @Bean
    public SimpleMessageListenerContainer dealQueue(){
        final ExecutorService pool = Executors.newCachedThreadPool();
        List<String> queueNames=new LinkedList<>();
        for (int i = 1; i <1000 ; i++) {
            queueNames.add("test_quque_"+i);
        }

        final ConnectionFactory factory = rabbitMq.getConnectionFactory();
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(factory);
        //加入所有的待消费队列到监听器内
        container.setQueueNames(queueNames.toArray(new String[queueNames.size()]));
        container.setExposeListenerChannel(true);
        //每个队列的消费者个数
        container.setConcurrentConsumers(1);
        //设置最大消费者个数---当消息堆积过多时候我们这里会自动增加消费者
        container.setMaxConcurrentConsumers(3);
        //设置每个消费者获取的最大的消息数量
        container.setPrefetchCount(1);
        //设置确认模式为手工确认
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        //监听处理
        container.setMessageListener(new MqConsumerHandle());
        return container;
    }

    /**
     * 消息消费处理类
     */
    @Component
    public class MqConsumerHandle implements ChannelAwareMessageListener {

        @Override
        public void onMessage(Message message, Channel channel) {

            try {
                //处理消息
                System.out.println(message);


                //成功返回码 该消息的index
                //是否批量. true:将一次性ack所有小于deliveryTag的消息。
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception e) {
                System.out.println("mq消息转发具体实现异常"+e);
                try {
                    //deliveryTag:该消息的index。
                    //multiple:是否批量. true:将一次性拒绝所有小于deliveryTag的消息。
                    //requeue:被拒绝的是否重新入队列。
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

}

接下里就是控制变量了,主要测试系统消费能力,可以分别控制队列数量,以及每个队列的并发消费数量,也可以不同队列按权重灵活控制的并发消费数量,看看我们的系统能力

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/6/22 下,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 需求
    • 手动创建1000队列以及push1000消息
      • 指定消费队列以及控制并发消费者
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档