专栏首页java 成神之路RocketMQ 生产者 rebalence 原理

RocketMQ 生产者 rebalence 原理

概述

生产者 producer 在发送消息的时候,每个消息发送到 broker 只存储在某一个 quene 上。那么 producer 是怎么选择 queue 呢?

下面主要通过以下5种方式进行分析。 1、自定义 MessageQueueSelector 实现 2、SelectMessageQueueByHash hash 选择 queue。 3、 SelectMessageQueueByRandom 随机选择 queue。 4、 SelectMessageQueueByMachineRoom 机房选择queue。 5、默认发送队列选择实现

1、自定义 MessageQueueSelector 实现

下面这个示例是 rocketmq 官网上的一个示例。

public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.start();

            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                int orderId = i % 10;
                Message msg =
                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            }

            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

从示例中可以看到 producer.send(msg, new MessageQueueSelector(){}, orderId) 在发送的时候 自定义了一个 MessageQueueSelector。

MessageQueueSelector 的 selelct(List<MessageQueue> mqs, Message msg, Object arg) 方法中有三个参数。

  • List<MessageQueue> mqs :topic 中的所有 queue 的集合。
  • Message msg:发送的消息
  • Object arg:上面示例中 send 方法的第三个参数。

通过实现 select 方法,通过 arg 参数进行取模 mqs.size() 进行选择队列。

RocketMQ 已实现的 MessageQueueSelector

rocketmq 源码中已经提供了几种 MessageQueueSelector 的实现。如下图:

  • SelectMessageQueueByHash:通过 hash 进行选择 queue。
  • SelectMessageQueueByRandom:随机选择 queue。
  • SelectMessageQueueByMachineRoom:机房选择queue。

2、SelectMessageQueueByHash

public class SelectMessageQueueByHash implements MessageQueueSelector {

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }

        value = value % mqs.size();
        return mqs.get(value);
    }
}

通过 arg 的 hash,通过 mqs.size() 进行取模,来选择要存储的队列。

3、SelectMessageQueueByRandom

public class SelectMessageQueueByRandom implements MessageQueueSelector {
    private Random random = new Random(System.currentTimeMillis());

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = random.nextInt(mqs.size());
        return mqs.get(value);
    }
}

随机产生一个小于等于 mqs.size() 的随机正整数,来选择要存储的队列。

4、SelectMessageQueueByMachineRoom

public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
    private Set<String> consumeridcs;

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        return null;
    }

    public Set<String> getConsumeridcs() {
        return consumeridcs;
    }

    public void setConsumeridcs(Set<String> consumeridcs) {
        this.consumeridcs = consumeridcs;
    }
}

这个未实现,还是要通过自己的场景进行实现。

5、默认是轮询进行发送消息

如果直接调用 SendResult send(final Message msg) 方法,RocketMQ 是如何选择队列的呢?

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

1、int index = tpInfo.getSendWhichQueue().getAndIncrement();获取 一个自增的index。 2、然后 int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); 进行选择一个 queue。

通过上面的代码可以看出,默认是通过轮询的方式进行选择发送队列的。

ThreadLocalIndex 实现

public class ThreadLocalIndex {
    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
    private final Random random = new Random();

    public int getAndIncrement() {
        Integer index = this.threadLocalIndex.get();
        if (null == index) {
            index = Math.abs(random.nextInt());
            if (index < 0)
                index = 0;
            this.threadLocalIndex.set(index);
        }

        index = Math.abs(index + 1);
        if (index < 0)
            index = 0;

        this.threadLocalIndex.set(index);
        return index;
    }

    @Override
    public String toString() {
        return "ThreadLocalIndex{" +
            "threadLocalIndex=" + threadLocalIndex.get() +
            '}';
    }
}

从 getAndIncrement() 方法中,可以看出。 为每个线程分配一个随机数,然后每次调用都自增 1。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Socket 实现聊天功能

    java404
  • NIO 之 Channel

    java404
  • InetAddress 解析

    java404
  • Java14的新特性

    上面列出的是大方面的特性,除此之外还有一些api的更新及废弃,主要见JDK 14 Release Notes,这里举几个例子。

    codecraft
  • Java14的新特性

    上面列出的是大方面的特性,除此之外还有一些api的更新及废弃,主要见JDK 14 Release Notes,这里举几个例子。

    codecraft
  • 浅析类装载 顶

    [Loaded com.guanjian.Parent from file:/E:/classload/out/production/classload/] ...

    算法之名
  • 【设计模式】—— 迭代模式Iterator

      模式意图   提供一个方法按顺序遍历一个集合内的元素,而又不需要暴露该对象的内部表示。   应用场景   1 访问一个聚合的对象,而不需要暴露对象的内部表...

    用户1154259
  • [Redis] 分布式缓存中间件 Redis 之 分布式锁实战

    环境准备Redis 如何实现分布式锁线程不安全单机锁分布式锁代码实现Redisson 集成和源码分析Redisson 集成源码分析 `RedissonLock`...

    架构探险之道
  • 第六节:详细讲解Java中的装箱与拆箱及其字符串

    大家好,我是 Vic,今天给大家带来详细讲解Java中的装箱与拆箱及其字符串的概述,希望你们喜欢

    达达前端
  • LeetCode 402. 移掉K位数字(贪心,单调栈)

    给定一个以字符串表示的非负整数 num,移除这个数中的 k 位数字,使得剩下的数字最小。

    Michael阿明

扫码关注云+社区

领取腾讯云代金券