A look at RocketMQ's sendHeartbeatToAllBrokerWithLock

This article walks through RocketMQ's sendHeartbeatToAllBrokerWithLock, from the client-side call down to the broker-side handler.

sendHeartbeatToAllBrokerWithLock

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

public class MQClientInstance {
    private final static long LOCK_TIMEOUT_MILLIS = 3000;
    private final InternalLogger log = ClientLogger.getLog();

    //......

    public void sendHeartbeatToAllBrokerWithLock() {
        if (this.lockHeartbeat.tryLock()) {
            try {
                this.sendHeartbeatToAllBroker();
                this.uploadFilterClassSource();
            } catch (final Exception e) {
                log.error("sendHeartbeatToAllBroker exception", e);
            } finally {
                this.lockHeartbeat.unlock();
            }
        } else {
            log.warn("lock heartBeat, but failed.");
        }
    }

    private void sendHeartbeatToAllBroker() {
        final HeartbeatData heartbeatData = this.prepareHeartbeatData();
        final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
        final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
        if (producerEmpty && consumerEmpty) {
            log.warn("sending heartbeat, but no consumer and no producer");
            return;
        }

        if (!this.brokerAddrTable.isEmpty()) {
            long times = this.sendHeartbeatTimesTotal.getAndIncrement();
            Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, HashMap<Long, String>> entry = it.next();
                String brokerName = entry.getKey();
                HashMap<Long, String> oneTable = entry.getValue();
                if (oneTable != null) {
                    for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                        Long id = entry1.getKey();
                        String addr = entry1.getValue();
                        if (addr != null) {
                            if (consumerEmpty) {
                                if (id != MixAll.MASTER_ID)
                                    continue;
                            }

                            try {
                                int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                                if (!this.brokerVersionTable.containsKey(brokerName)) {
                                    this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                                }
                                this.brokerVersionTable.get(brokerName).put(addr, version);
                                if (times % 20 == 0) {
                                    log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                                    log.info(heartbeatData.toString());
                                }
                            } catch (Exception e) {
                                if (this.isBrokerInNameServer(addr)) {
                                    log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
                                } else {
                                    log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
                                        id, addr, e);
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    private void uploadFilterClassSource() {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> next = it.next();
            MQConsumerInner consumer = next.getValue();
            if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {
                Set<SubscriptionData> subscriptions = consumer.subscriptions();
                for (SubscriptionData sub : subscriptions) {
                    if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {
                        final String consumerGroup = consumer.groupName();
                        final String className = sub.getSubString();
                        final String topic = sub.getTopic();
                        final String filterClassSource = sub.getFilterClassSource();
                        try {
                            this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);
                        } catch (Exception e) {
                            log.error("uploadFilterClassToAllFilterServer Exception", e);
                        }
                    }
                }
            }
        }
    }

    //......
}
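  • MQClientInstance's sendHeartbeatToAllBrokerWithLock method first calls lockHeartbeat.tryLock(). If the lock is acquired, it runs sendHeartbeatToAllBroker() and uploadFilterClassSource(), logs any exception, and releases the lock with lockHeartbeat.unlock() in the finally block; if the lock is not acquired, it only logs a warning and skips this round
  • sendHeartbeatToAllBroker builds heartbeatData via prepareHeartbeatData and returns early when it contains neither producers nor consumers. Otherwise it iterates over brokerAddrTable and calls mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000) for each address, skipping non-master brokers when there are no consumers, and stores the returned version in brokerVersionTable
  • uploadFilterClassSource iterates over consumerTable; for each consumer whose consumeType is ConsumeType.CONSUME_PASSIVELY it iterates over the subscriptions and calls uploadFilterClassToAllFilterServer for every sub where sub.isClassFilterMode() is true and sub.getFilterClassSource() is not null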
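
The tryLock guard described above can be tried out in isolation. The following is a minimal sketch, not RocketMQ code: HeartbeatLockSketch, its counters, and the demo method are hypothetical names introduced only for illustration, and the "heartbeat" is just a Runnable. It shows that a second caller arriving while a heartbeat is in flight does not block; it simply skips the round.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for MQClientInstance; only the locking pattern is modeled.
class HeartbeatLockSketch {
    private final Lock lockHeartbeat = new ReentrantLock();
    final AtomicInteger sent = new AtomicInteger();
    final AtomicInteger skipped = new AtomicInteger();

    // Same shape as sendHeartbeatToAllBrokerWithLock: try once, never wait for the lock.
    void sendHeartbeatWithLock(Runnable heartbeat) {
        if (lockHeartbeat.tryLock()) {
            try {
                heartbeat.run();
                sent.incrementAndGet();
            } catch (Exception e) {
                // The original logs the exception here; the sketch just swallows it.
            } finally {
                lockHeartbeat.unlock();
            }
        } else {
            skipped.incrementAndGet(); // the original logs a warning instead
        }
    }

    // Starts one slow heartbeat on a worker thread, then attempts a second one
    // from the calling thread while the first still holds the lock.
    static int[] demo() {
        HeartbeatLockSketch sketch = new HeartbeatLockSketch();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread slow = new Thread(() -> sketch.sendHeartbeatWithLock(() -> {
            started.countDown();
            awaitQuietly(release);
        }));
        slow.start();
        awaitQuietly(started);                   // the worker now holds the lock
        sketch.sendHeartbeatWithLock(() -> { }); // tryLock fails, so this round is skipped
        release.countDown();
        try {
            slow.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {sketch.sent.get(), sketch.skipped.get()};
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("sent=" + result[0] + ", skipped=" + result[1]);
    }
}
```

Because tryLock() returns immediately, a caller that loses the race drops its heartbeat rather than queueing behind the one in progress, which is presumably acceptable since heartbeats are sent repeatedly.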

sendHearbeat

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

public class MQClientAPIImpl {

    private final static InternalLogger log = ClientLogger.getLog();
    private static boolean sendSmartMsg =
        Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));

    static {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    }

    //......

    public int sendHearbeat(
        final String addr,
        final HeartbeatData heartbeatData,
        final long timeoutMillis
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
        request.setLanguage(clientConfig.getLanguage());
        request.setBody(heartbeatData.encode());
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                return response.getVersion();
            }
            default:
                break;
        }

        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    //......
}
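  • MQClientAPIImpl's sendHearbeat method (the name is spelled this way in the source) first builds the request with RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null) and sets the encoded heartbeatData as its body, then sends it via remotingClient.invokeSync(addr, request, timeoutMillis). On ResponseCode.SUCCESS it returns response.getVersion(); for any other code it throws MQBrokerException(response.getCode(), response.getRemark())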
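
The success-or-throw handling at the end of sendHearbeat can be isolated into a few lines. This is a sketch under stated assumptions: Response and BrokerException are hypothetical stand-ins for RemotingCommand and MQBrokerException, the SUCCESS constant is given an illustrative value, and the exception is unchecked here only to keep the example short (the real method declares MQBrokerException in its throws clause).

```java
// Hypothetical stand-ins: Response models RemotingCommand, BrokerException models MQBrokerException.
class HeartbeatResponseSketch {
    static final int SUCCESS = 0; // illustrative value standing in for ResponseCode.SUCCESS

    static final class Response {
        final int code;
        final int version;
        final String remark;

        Response(int code, int version, String remark) {
            this.code = code;
            this.version = version;
            this.remark = remark;
        }
    }

    // Unchecked for brevity; carries the broker's response code and remark.
    static final class BrokerException extends RuntimeException {
        final int responseCode;

        BrokerException(int responseCode, String remark) {
            super("CODE: " + responseCode + " DESC: " + remark);
            this.responseCode = responseCode;
        }
    }

    // Returns the broker version on success, otherwise surfaces the broker's code and remark.
    static int versionOrThrow(Response response) {
        if (response.code == SUCCESS) {
            return response.version;
        }
        throw new BrokerException(response.code, response.remark);
    }

    public static void main(String[] args) {
        System.out.println(versionOrThrow(new Response(SUCCESS, 317, null)));
        try {
            versionOrThrow(new Response(1, 0, "broker busy"));
        } catch (BrokerException e) {
            System.out.println("heartbeat rejected: " + e.getMessage());
        }
    }
}
```

On the caller side, sendHeartbeatToAllBroker catches this exception per address, so one failing broker does not stop heartbeats to the others.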

processRequest

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java

public class ClientManageProcessor implements NettyRequestProcessor {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final BrokerController brokerController;

    public ClientManageProcessor(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    //......

    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.HEART_BEAT:
                return this.heartBeat(ctx, request);
            case RequestCode.UNREGISTER_CLIENT:
                return this.unregisterClient(ctx, request);
            case RequestCode.CHECK_CLIENT_CONFIG:
                return this.checkClientConfig(ctx, request);
            default:
                break;
        }
        return null;
    }

    public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
            ctx.channel(),
            heartbeatData.getClientID(),
            request.getLanguage(),
            request.getVersion()
        );

        for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
            SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                    data.getGroupName());
            boolean isNotifyConsumerIdsChangedEnable = true;
            if (null != subscriptionGroupConfig) {
                isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
                int topicSysFlag = 0;
                if (data.isUnitMode()) {
                    topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
                }
                String newTopic = MixAll.getRetryTopic(data.getGroupName());
                this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                    newTopic,
                    subscriptionGroupConfig.getRetryQueueNums(),
                    PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
            }

            boolean changed = this.brokerController.getConsumerManager().registerConsumer(
                data.getGroupName(),
                clientChannelInfo,
                data.getConsumeType(),
                data.getMessageModel(),
                data.getConsumeFromWhere(),
                data.getSubscriptionDataSet(),
                isNotifyConsumerIdsChangedEnable
            );

            if (changed) {
                log.info("registerConsumer info changed {} {}",
                    data.toString(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                );
            }
        }

        for (ProducerData data : heartbeatData.getProducerDataSet()) {
            this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
                clientChannelInfo);
        }
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    //......
}
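  • For RequestCode.HEART_BEAT, processRequest calls heartBeat(ctx, request). That method decodes the request body into heartbeatData, builds a ClientChannelInfo from the channel, client ID, language and version, handles the consumerData and producerData sets separately, and finally returns a response with ResponseCode.SUCCESS
  • For each consumerData, it first looks up the subscriptionGroupConfig; when the config is not null, it calls brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod for the group's retry topic. It then calls brokerController.getConsumerManager().registerConsumer and logs when the registration changed
  • For each producerData, it calls brokerController.getProducerManager().registerProducer(data.getGroupName(), clientChannelInfo)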
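
To make the broker-side flow concrete, here is a toy model of the two registration loops. It is a sketch only: BrokerHeartbeatSketch and its maps are hypothetical, clients are reduced to plain ID strings, and "changed" is assumed to mean "this client was not yet registered for the group". The real ConsumerManager's criteria for `changed` are not shown in this article and may well be broader.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the registries touched by heartBeat().
class BrokerHeartbeatSketch {
    private final Map<String, Set<String>> consumers = new HashMap<>();
    private final Map<String, Set<String>> producers = new HashMap<>();

    // Assumption: "changed" means the client ID was new for this consumer group.
    boolean registerConsumer(String group, String clientId) {
        return consumers.computeIfAbsent(group, g -> new HashSet<>()).add(clientId);
    }

    void registerProducer(String group, String clientId) {
        producers.computeIfAbsent(group, g -> new HashSet<>()).add(clientId);
    }

    // Processes one heartbeat and returns how many consumer registrations changed.
    int heartBeat(String clientId, List<String> consumerGroups, List<String> producerGroups) {
        int changed = 0;
        for (String group : consumerGroups) {
            if (registerConsumer(group, clientId)) {
                changed++; // the original logs "registerConsumer info changed" here
            }
        }
        for (String group : producerGroups) {
            registerProducer(group, clientId);
        }
        return changed;
    }

    public static void main(String[] args) {
        BrokerHeartbeatSketch broker = new BrokerHeartbeatSketch();
        List<String> consumerGroups = Arrays.asList("group-a");
        List<String> producerGroups = Collections.singletonList("group-p");
        System.out.println(broker.heartBeat("client-1", consumerGroups, producerGroups));
        System.out.println(broker.heartBeat("client-1", consumerGroups, producerGroups));
    }
}
```

In this model a repeated heartbeat from the same client is idempotent: the first call registers it, and later calls leave the registries as they are.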

Summary

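  • MQClientInstance's sendHeartbeatToAllBrokerWithLock method first calls lockHeartbeat.tryLock(), then runs sendHeartbeatToAllBroker() and uploadFilterClassSource(), and finally calls lockHeartbeat.unlock() in the finally block; if the lock cannot be acquired, it only logs a warning
  • sendHeartbeatToAllBroker builds heartbeatData via prepareHeartbeatData, then iterates over brokerAddrTable and calls mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000), skipping non-master brokers when the client has no consumers
  • uploadFilterClassSource iterates over consumerTable; for each consumer whose consumeType is ConsumeType.CONSUME_PASSIVELY it iterates over the subscriptions and calls uploadFilterClassToAllFilterServer for every sub where sub.isClassFilterMode() is true and sub.getFilterClassSource() is not null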
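
The traversal of brokerAddrTable, including the rule that a client without consumers only heartbeats to master brokers, can be reduced to a pure function. The class HeartbeatTargetsSketch and the method targets below are hypothetical helpers written for this illustration; the MASTER_ID constant stands in for MixAll.MASTER_ID and is assumed to be 0.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: computes which broker addresses would receive a heartbeat.
class HeartbeatTargetsSketch {
    static final long MASTER_ID = 0L; // assumed value of MixAll.MASTER_ID

    // brokerAddrTable maps brokerName -> (brokerId -> address), as in MQClientInstance.
    static List<String> targets(Map<String, Map<Long, String>> brokerAddrTable, boolean consumerEmpty) {
        List<String> result = new ArrayList<>();
        for (Map<Long, String> oneTable : brokerAddrTable.values()) {
            if (oneTable == null) {
                continue;
            }
            for (Map.Entry<Long, String> entry : oneTable.entrySet()) {
                String addr = entry.getValue();
                if (addr == null) {
                    continue;
                }
                // Producer-only clients skip slaves, mirroring the consumerEmpty check.
                if (consumerEmpty && entry.getKey() != MASTER_ID) {
                    continue;
                }
                result.add(addr);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, String> brokerA = new TreeMap<>();
        brokerA.put(0L, "10.0.0.1:10911"); // master
        brokerA.put(1L, "10.0.0.2:10911"); // slave
        Map<String, Map<Long, String>> table = new LinkedHashMap<>();
        table.put("broker-a", brokerA);
        System.out.println("producer-only client: " + targets(table, true));
        System.out.println("client with consumers: " + targets(table, false));
    }
}
```

Separating target selection from the network call is only a convenience for this sketch; the original does both inside one nested loop.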

doc

  • MQClientInstance

This article is shared from the WeChat official account 码匠的流水账 (geek_luandun). Author: 码匠乱炖

Originally published: 2019-12-07