前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ 生产者 rebalence 原理

RocketMQ 生产者 rebalence 原理

作者头像
java404
发布2018-12-24 13:21:00
7590
发布2018-12-24 13:21:00
举报
文章被收录于专栏:java 成神之路java 成神之路

概述

生产者 producer 在发送消息的时候,每个消息发送到 broker 只存储在某一个 quene 上。那么 producer 是怎么选择 queue 呢?

下面主要通过以下5种方式进行分析。 1、自定义 MessageQueueSelector 实现 2、SelectMessageQueueByHash hash 选择 queue。 3、 SelectMessageQueueByRandom 随机选择 queue。 4、 SelectMessageQueueByMachineRoom 机房选择queue。 5、默认发送队列选择实现

1、自定义 MessageQueueSelector 实现

下面这个示例是 rocketmq 官网上的一个示例。

代码语言:javascript
复制
public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.start();

            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                int orderId = i % 10;
                Message msg =
                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            }

            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

从示例中可以看到 producer.send(msg, new MessageQueueSelector(){}, orderId) 在发送的时候 自定义了一个 MessageQueueSelector。

MessageQueueSelector 的 selelct(List<MessageQueue> mqs, Message msg, Object arg) 方法中有三个参数。

  • List<MessageQueue> mqs :topic 中的所有 queue 的集合。
  • Message msg:发送的消息
  • Object arg:上面示例中 send 方法的第三个参数。

通过实现 select 方法,通过 arg 参数进行取模 mqs.size() 进行选择队列。

RocketMQ 已实现的 MessageQueueSelector

rocketmq 源码中已经提供了几种 MessageQueueSelector 的实现。如下图:

  • SelectMessageQueueByHash:通过 hash 进行选择 queue。
  • SelectMessageQueueByRandom:随机选择 queue。
  • SelectMessageQueueByMachineRoom:机房选择queue。
2、SelectMessageQueueByHash
代码语言:javascript
复制
public class SelectMessageQueueByHash implements MessageQueueSelector {

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }

        value = value % mqs.size();
        return mqs.get(value);
    }
}

通过 arg 的 hash,通过 mqs.size() 进行取模,来选择要存储的队列。

3、SelectMessageQueueByRandom
代码语言:javascript
复制
public class SelectMessageQueueByRandom implements MessageQueueSelector {
    private Random random = new Random(System.currentTimeMillis());

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = random.nextInt(mqs.size());
        return mqs.get(value);
    }
}

随机产生一个小于等于 mqs.size() 的随机正整数,来选择要存储的队列。

4、SelectMessageQueueByMachineRoom
代码语言:javascript
复制
public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
    private Set<String> consumeridcs;

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        return null;
    }

    public Set<String> getConsumeridcs() {
        return consumeridcs;
    }

    public void setConsumeridcs(Set<String> consumeridcs) {
        this.consumeridcs = consumeridcs;
    }
}

这个未实现,还是要通过自己的场景进行实现。

5、默认是轮询进行发送消息

如果直接调用 SendResult send(final Message msg) 方法,RocketMQ 是如何选择队列的呢?

代码语言:javascript
复制
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

1、int index = tpInfo.getSendWhichQueue().getAndIncrement();获取 一个自增的index。 2、然后 int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); 进行选择一个 queue。

通过上面的代码可以看出,默认是通过轮询的方式进行选择发送队列的。

ThreadLocalIndex 实现

代码语言:javascript
复制
public class ThreadLocalIndex {
    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
    private final Random random = new Random();

    public int getAndIncrement() {
        Integer index = this.threadLocalIndex.get();
        if (null == index) {
            index = Math.abs(random.nextInt());
            if (index < 0)
                index = 0;
            this.threadLocalIndex.set(index);
        }

        index = Math.abs(index + 1);
        if (index < 0)
            index = 0;

        this.threadLocalIndex.set(index);
        return index;
    }

    @Override
    public String toString() {
        return "ThreadLocalIndex{" +
            "threadLocalIndex=" + threadLocalIndex.get() +
            '}';
    }
}

从 getAndIncrement() 方法中,可以看出。 为每个线程分配一个随机数,然后每次调用都自增 1。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.12.04 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
  • 1、自定义 MessageQueueSelector 实现
  • RocketMQ 已实现的 MessageQueueSelector
    • 2、SelectMessageQueueByHash
      • 3、SelectMessageQueueByRandom
        • 4、SelectMessageQueueByMachineRoom
        • 5、默认是轮询进行发送消息
        • ThreadLocalIndex 实现
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档