A Look at carrera's BrokerMonitor

Original article
Author: code4it
Last modified: 2020-01-13 10:56:21
Collected in the column: 码匠的流水账

This article walks through the BrokerMonitor class in carrera, found in the carrera-monitor module of DDMQ.

BrokerMonitor

DDMQ/carrera-monitor/src/main/java/com/xiaojukeji/carrera/monitor/broker/BrokerMonitor.java

public class BrokerMonitor extends BaseConfigMonitor {

    private final static Logger LOGGER = LoggerFactory.getLogger(BrokerMonitor.class);

    // Shared worker pool on which the individual name server and broker checks run.
    private ExecutorService executor = ExecutorUtils.newFixedThreadPool(100, "BrokerMonitor", 200);

    private BrokerMonitorItem monitorItem = null;

    @Override
    protected void initMonitor(String broker, BrokerConfig brokerConfig) throws Exception {
        doMonitor(broker, brokerConfig);
    }

    public BrokerMonitor(MonitorConfig monitorConfig) {
        super("Broker", monitorConfig);
    }

    private void doMonitor(String broker, BrokerConfig config) throws InterruptedException {
        if (monitorItem != null) {
            // stop first.
            LOGGER.info("Stop old monitor broker: {}", broker);
            monitorItem.stop();
        }

        // Create and start a fresh item for this broker config.
        BrokerMonitorItem item = new BrokerMonitorItem(broker, config);
        try {
            item.start();
        } catch (Exception e) {
            LOGGER.error("broker monitor start exception, broker=" + broker, e);
        }
    }

    @Override
    public void shutdown() {
        ExecutorUtils.shutdown(executor);
        monitorItem.stop();
        super.shutdown();
    }

    //......
}
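  • BrokerMonitor extends BaseConfigMonitor. Its initMonitor method delegates to doMonitor, and its shutdown method shuts down the executor and then calls monitorItem.stop(). doMonitor first calls monitorItem.stop() if monitorItem is not null, then creates a new BrokerMonitorItem and calls its start method.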
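The stop-then-restart lifecycle described above can be sketched in isolation. In the excerpt shown, the new item is held only in a local variable and shutdown() calls monitorItem.stop() without a null check; the sketch below assumes the intent is to keep the new item in the field and to make shutdown null-safe. `MonitorLifecycleSketch`, `Item`, `reload`, and `events` are hypothetical names used only for illustration and do not exist in carrera.

```java
import java.util.ArrayList;
import java.util.List;

public class MonitorLifecycleSketch {

    /** Hypothetical stand-in for BrokerMonitorItem; it only records lifecycle events. */
    private static class Item {
        private final String name;
        private final List<String> events;

        Item(String name, List<String> events) {
            this.name = name;
            this.events = events;
        }

        void start() {
            events.add("start " + name);
        }

        void stop() {
            events.add("stop " + name);
        }
    }

    private final List<String> events = new ArrayList<>();
    private Item current; // plays the role of the monitorItem field

    /** Stops the previous item, if any, then starts a new one and remembers it. */
    public synchronized void reload(String name) {
        if (current != null) {
            current.stop();
        }
        Item item = new Item(name, events);
        item.start();
        // Assumption: keep the reference so a later reload or shutdown can stop it.
        current = item;
    }

    /** Null-safe shutdown: there is nothing to stop if reload was never called. */
    public synchronized void shutdown() {
        if (current != null) {
            current.stop();
            current = null;
        }
    }

    public List<String> events() {
        return events;
    }

    public static void main(String[] args) {
        MonitorLifecycleSketch monitor = new MonitorLifecycleSketch();
        monitor.reload("v1");
        monitor.reload("v2");
        monitor.shutdown();
        System.out.println(monitor.events()); // prints [start v1, stop v1, start v2, stop v2]
    }
}
```

Marking both methods synchronized is a design choice of this sketch, so that a config reload and a shutdown cannot interleave; whether carrera needs that depends on how BaseConfigMonitor invokes initMonitor, which the excerpt does not show.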

BrokerMonitorItem

DDMQ/carrera-monitor/src/main/java/com/xiaojukeji/carrera/monitor/broker/BrokerMonitor.java

    class BrokerMonitorItem {
        private String broker;
        private BrokerConfig config;
        private volatile boolean isRunning = false;
        private ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

        public BrokerMonitorItem(String broker, BrokerConfig config) {
            this.broker = broker;
            this.config = config;
        }

        public void start() {
            isRunning = true;

            // Loop until stop() clears the flag: run both checks, then pause for 10 seconds.
            scheduledExecutor.submit(() -> {
                while (isRunning) {

                    monitorNamesvr();
                    monitorBroker();

                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    LOGGER.info("broker<{}> [Active]", broker);
                }
            });
        }

        public void stop() {
            isRunning = false;
            ExecutorUtils.shutdown(scheduledExecutor);
        }

        private void monitorBroker() {
            if (MapUtils.isEmpty(config.getBrokers()) || StringUtils.isBlank(config.getBrokerClusterAddrs())) {

                return;
            }

            String nameSvr = config.getBrokerClusterAddrs().split(";")[0]; // use first namesvr.
            // One task per master; each task makes at most two attempts.
            for (Map.Entry<String, Set<String>> entry : config.getBrokers().entrySet()) {
                String master = entry.getKey();
                Set<String> slaves = entry.getValue();
                executor.execute(() -> {
                    int j = 0;
                    for (; j < 2; ++j) {
                        try {
                            long masterOffset = Utils.checkReceive(broker, nameSvr, master);
                            if (masterOffset <= 0) {
                                continue;
                            }
                            Utils.checkSend(broker, nameSvr, master);
                            if (CollectionUtils.isNotEmpty(slaves)) {
                                for (String slave : slaves) {
                                    long slaveOffset = Utils.checkReceive(broker, nameSvr, slave);
                                    LOGGER.info("ReplicaDelayCheck broker={}, address={}->{}, masterOffset={}, slaveOffset={}, delayNum={}", broker, master.split(":")[0], slave.split(":")[0], masterOffset, slaveOffset, (masterOffset - slaveOffset));

                                    if (slaveOffset <= 0) {
                                        continue;
                                    }
                                    // Alarm when the slave offset trails the master offset by more than 60.
                                    if (masterOffset - slaveOffset > 60) {
                                        LOGGER.error(String.format("[AlarmReplicaDelayErr] broker=%s, address=%s->%s, delayNum=%s", broker, master.split(":")[0], slave.split(":")[0], (masterOffset - slaveOffset)));
                                    }
                                }
                            }
                            break;
                        } catch (Throwable e) {
                            LOGGER.error("broker check broker exception, broker=" + broker, e);
                        }
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                        }
                    }
                    // j reaches 2 only when neither attempt hit the break above.
                    if (j == 2) {
                        LOGGER.error(String.format("[AlarmCheckBrokerErr] broker=%s, namesvr=%s", broker, nameSvr));
                    }
                });
            }
        }

        private void monitorNamesvr() {
            if (StringUtils.isBlank(config.getBrokerClusterAddrs())) {
                LOGGER.info("broker:{}, brokerClusterAddrs is empty", config.getBrokerCluster());
                return;
            }

            // One task per name server address; each task makes at most two attempts.
            for (String nameSvr : config.getBrokerClusterAddrs().split(";")) {
                executor.execute(() -> {
                    int j = 0;
                    for (; j < 2; ++j) {

                        try {
                            Utils.checkNameSvr(nameSvr, broker);
                            LOGGER.info(String.format("[NameSvrCheck] broker=%s, namesvr=%s", broker, nameSvr));
                            break;
                        } catch (Throwable e) {
                            LOGGER.error("broker checkNameSvr exception, broker=" + broker + ", namesvr=" + nameSvr, e);
                        }
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            LOGGER.error("broker checkNameSvr Thread.sleep exception, broker=" + broker, e);
                        }
                    }
                    if (j == 2) {
                        LOGGER.error(String.format("[AlarmNameSvrErr] broker=%s, namesvr=%s", broker, nameSvr));
                    }
                });
            }
        }
    }
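  • BrokerMonitorItem's start method submits a runnable that, while isRunning is true, calls monitorNamesvr and monitorBroker and then sleeps for 10 seconds. monitorNamesvr iterates over the name server addresses in config.getBrokerClusterAddrs() and runs Utils.checkNameSvr(nameSvr, broker) for each. monitorBroker iterates over config.getBrokers().entrySet(); for each master it runs Utils.checkReceive(broker, nameSvr, master) and Utils.checkSend(broker, nameSvr, master), and for each slave it runs Utils.checkReceive(broker, nameSvr, slave), logging [AlarmReplicaDelayErr] when the master offset exceeds the slave offset by more than 60. Each check runs on the shared executor and is attempted at most twice; if neither attempt succeeds, an alarm line ([AlarmNameSvrErr] or [AlarmCheckBrokerErr]) is logged.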
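Both monitor methods share the same shape: try a check, pause, try once more, and raise an alarm if no attempt succeeded. A minimal sketch of that pattern follows, assuming a hypothetical helper named `runWithRetry` that is not part of carrera. Unlike the excerpt, it catches Exception rather than Throwable, returns a boolean instead of comparing a loop counter, and restores the interrupt flag when the pause is interrupted; those are choices of this sketch.

```java
import java.util.concurrent.Callable;

public class RetryCheckSketch {

    /**
     * Runs the check up to maxAttempts times, pausing after each failed attempt.
     * Returns true as soon as one attempt completes without throwing.
     */
    public static boolean runWithRetry(Callable<Void> check, int maxAttempts, long pauseMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                check.call();
                return true;
            } catch (Exception e) {
                System.err.println("check failed, attempt=" + (attempt + 1) + ", cause=" + e);
            }
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and give up early.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The first attempt throws, the second succeeds.
        boolean ok = runWithRetry(() -> {
            if (calls[0]++ == 0) {
                throw new IllegalStateException("first attempt fails");
            }
            return null;
        }, 2, 10L);
        if (!ok) {
            System.err.println("[AlarmCheckErr] all attempts failed");
        }
        System.out.println("ok=" + ok + ", calls=" + calls[0]); // prints ok=true, calls=2
    }
}
```

Returning a boolean keeps the alarm decision with the caller, which would let the name server check and the broker check share one helper while logging their own alarm tags.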

Utils

DDMQ/carrera-monitor/src/main/java/com/xiaojukeji/carrera/monitor/broker/Utils.java

public class Utils {
    private static final Logger logger = LoggerFactory.getLogger(Utils.class);

    private static final Map<String, DefaultMQAdminExt> mqAdminExtMap = new ConcurrentHashMap<>();
    private static final Map<String, DefaultMQPullConsumer> nameSvrCheckMap = new ConcurrentHashMap<>();
    private static final Map<String, DefaultMQPullConsumer> brokerReceiveCheckMap = new ConcurrentHashMap<>();
    private static final Map<String, DefaultMQProducer> brokerSendCheckMap = new ConcurrentHashMap<>();

    //......

    public static void checkNameSvr(String nameSvr, String cluster) throws MQClientException, InterruptedException {
        getNameSvrCheckConsumer(nameSvr, cluster).getDefaultMQPullConsumerImpl().fetchPublishMessageQueues("SELF_TEST_TOPIC");
    }

    public static long checkReceive(String cluster, String nameSvr, String address)
            throws MQClientException, NoSuchFieldException, SecurityException, IllegalArgumentException,
            IllegalAccessException, InterruptedException, RemotingException, MQBrokerException {

        DefaultMQPullConsumer consumer = getReceiveCheckConsumer(nameSvr, cluster, address);
        // Reach into the RocketMQ client's private fields via reflection.
        Field f1 = DefaultMQPullConsumerImpl.class.getDeclaredField("mQClientFactory");
        f1.setAccessible(true);

        MQClientInstance instance = (MQClientInstance) f1.get(consumer.getDefaultMQPullConsumerImpl());

        Field f = MQClientInstance.class.getDeclaredField("brokerAddrTable");
        f.setAccessible(true);

        Field f2 = MQClientInstance.class.getDeclaredField("scheduledExecutorService");
        f2.setAccessible(true);

        // Stop the client's scheduled executor before editing the address table.
        ScheduledExecutorService service = (ScheduledExecutorService) f2.get(instance);
        service.shutdown();
        service.awaitTermination(1000, TimeUnit.SECONDS);

        // Register the target address under a synthetic broker name (broker id 0).
        ConcurrentHashMap<String, HashMap<Long, String>> map = (ConcurrentHashMap<String, HashMap<Long, String>>) f.get(instance);
        HashMap<Long, String> addresses = new HashMap<>();
        addresses.put(0L, address);
        map.put("rmqmonitor_" + address, addresses);

        MessageQueue queue = new MessageQueue("SELF_TEST_TOPIC", "rmqmonitor_" + address, 0);

        boolean pullOk = false;
        long maxOffset = -1;
        for (int i = 0; i < 2; ++i) {
            try {
                maxOffset = consumer.getDefaultMQPullConsumerImpl().maxOffset(queue);
                // Pull one message, starting 10 before the max offset once it exceeds 100, otherwise from 0.
                PullResult result = consumer.pull(queue, "*", maxOffset > 100 ? maxOffset - 10 : 0, 1);
                if (result.getPullStatus() == PullStatus.FOUND) {
                    pullOk = true;
                    break;
                } else if(result.getPullStatus() == PullStatus.NO_NEW_MSG) {
                    // Nothing to pull yet: send a test message, then try again.
                    checkSend(cluster, nameSvr, address);
                    continue;
                }

                logger.warn("pull result failed, PullResult={}, cluster={}, namesvr={}, address={}", result, cluster, nameSvr, address);
            } catch (Throwable e) {
                logger.error("pull exception, cluster={}, namesvr={}, address={}", cluster, nameSvr, address, e);
            }
            Thread.sleep(1000);
        }
        if (!pullOk) {
            logger.error(String.format("[AlarmPullErr] cluster=%s, broker=%s", cluster, address));
        } else {
            logger.info("AlarmPullCheck cluster={}, broker={}", cluster, address);
        }
        return maxOffset;
    }

    public static void checkSend(String cluster, String nameSvr, String address) throws MQClientException, NoSuchFieldException,
            SecurityException, InterruptedException, IllegalArgumentException, IllegalAccessException, UnsupportedEncodingException, MQBrokerException, RemotingException {

        if (!isBrokerTopicWritable(cluster, nameSvr, address)) {
            return;
        }

        DefaultMQProducer producer = getSendCheckProducer(nameSvr, cluster, address);
        MQClientInstance instance = producer.getDefaultMQProducerImpl().getmQClientFactory();
        Field f = MQClientInstance.class.getDeclaredField("brokerAddrTable");
        f.setAccessible(true);

        Field f2 = MQClientInstance.class.getDeclaredField("scheduledExecutorService");
        f2.setAccessible(true);

        // Same steps as in checkReceive: stop the scheduled executor, then register the address.
        ScheduledExecutorService service = (ScheduledExecutorService) f2.get(instance);
        service.shutdown();
        service.awaitTermination(1000, TimeUnit.SECONDS);

        ConcurrentHashMap<String, HashMap<Long, String>> map = (ConcurrentHashMap<String, HashMap<Long, String>>) f
                .get(instance);
        HashMap<Long, String> addresses = new HashMap<>();
        addresses.put(0L, address);
        map.put("rmqmonitor_" + address, addresses);

        MessageQueue queue = new MessageQueue("SELF_TEST_TOPIC", "rmqmonitor_" + address, 0);
        boolean sendOk = false;
        SendResult sendResult = null;
        for (int i = 0; i < 2; i++) {
            try {
                Message msg = new Message("SELF_TEST_TOPIC", // topic
                        "TagA", // tag
                        ("Hello RocketMQ " + i).getBytes()// body
                );
                sendResult = producer.send(msg, queue);
                if (sendResult.getSendStatus() == SendStatus.SEND_OK || sendResult.getSendStatus() == SLAVE_NOT_AVAILABLE) {
                    sendOk = true;
                    break;
                }

                logger.warn("send result failed, SendResult={}, cluster={}, namesvr={}, address={}", sendResult, cluster, nameSvr, address);
            } catch (Exception e) {
                logger.error("send exception, cluster={}, namesvr={}, address={}", cluster, nameSvr, address, e);
            }
            Thread.sleep(1000);
        }

        // Raise an alarm if neither send attempt succeeded.
        if (!sendOk) {
            logger.error(String.format("[AlarmSendErr] cluster=%s, broker=%s, result=%s", cluster, address, sendResult == null ? "null" : sendResult.toString()));
        } else {
            logger.info("AlarmSendCheck cluster={}, broker={}, result={}", cluster, address, sendResult.toString());
        }
    }

    //......
}
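  • checkNameSvr calls getNameSvrCheckConsumer(nameSvr, cluster).getDefaultMQPullConsumerImpl().fetchPublishMessageQueues("SELF_TEST_TOPIC"). checkReceive pulls a message from SELF_TEST_TOPIC at the given broker address and returns the queue's max offset. checkSend sends a test message to SELF_TEST_TOPIC at that address, provided isBrokerTopicWritable returns true. Both methods use reflection to put the target address into the client's brokerAddrTable under the name "rmqmonitor_" + address, and then build the MessageQueue with that same broker name.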
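The reflection step in checkReceive and checkSend uses only standard java.lang.reflect calls (getDeclaredField, setAccessible, get). A minimal, self-contained sketch of the same technique is shown here against a hypothetical `Client` class with a private `addrTable` map; it is not RocketMQ's MQClientInstance, and `readPrivateField` is a helper invented for illustration.

```java
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PrivateFieldSketch {

    /** Hypothetical stand-in for a client class that keeps its address table private. */
    public static class Client {
        private final Map<String, String> addrTable = new ConcurrentHashMap<>();

        public String lookup(String name) {
            return addrTable.get(name);
        }
    }

    /** Reads a private field declared on the target's own class; the caller picks the type. */
    @SuppressWarnings("unchecked")
    public static <T> T readPrivateField(Object target, String fieldName) {
        try {
            Field field = target.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            return (T) field.get(target);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot read field " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        Client client = new Client();
        // Obtain the private map and add an entry the class itself never registered.
        Map<String, String> table = readPrivateField(client, "addrTable");
        table.put("rmqmonitor_10.0.0.1:10911", "10.0.0.1:10911");
        System.out.println(client.lookup("rmqmonitor_10.0.0.1:10911")); // prints 10.0.0.1:10911
    }
}
```

Code written this way depends on private field names, so it is tied to the library version it was written against: if a field is renamed, getDeclaredField throws NoSuchFieldException at run time rather than failing at compile time.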

Summary

BrokerMonitor extends BaseConfigMonitor. Its initMonitor method delegates to doMonitor, and its shutdown method shuts down the executor and calls monitorItem.stop(). doMonitor stops the existing monitorItem if it is not null, then creates a new BrokerMonitorItem and calls its start method.

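Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com to request removal.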
