前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ 命令行工具源码结构解析

RocketMQ 命令行工具源码结构解析

作者头像
java404
发布2018-12-21 16:44:01
1.2K0
发布2018-12-21 16:44:01
举报
文章被收录于专栏:java 成神之路java 成神之路

概述

RocketMQ 提供有控制台及一系列控制台命令,用于管理员对主题,集群,broker 等信息的管理;

进入 RocketMQ 的bin 目录,可以看到 mqadmin 脚本文件。

执行 mqadmin 脚本显示如下:

显示了 mqadmin 命令支持的所有操作。

如果想具体查新某一个操作的详细命令,可以使用

mqadmin help 命令名称 比如:mqadmin help updateTopic

查看 mqadmin脚本

可以发现 mqadmin 的命令调用的是 tools 命令,设置的启动类为 org.apache.rocketmq.tools.command.MQAdminStartup 。

tools 模块结构

MQAdminStartup 启动类
代码语言:javascript
复制
public static void main(String[] args) {
    main0(args, null);
}

public static void main0(String[] args, RPCHook rpcHook) {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

    //PackageConflictDetect.detectFastjson();

    initCommand();

    try {
        initLogback();
        switch (args.length) {
            case 0:
                printHelp();
                break;
            case 2:
                if (args[0].equals("help")) {
                    SubCommand cmd = findSubCommand(args[1]);
                    if (cmd != null) {
                        Options options = ServerUtil.buildCommandlineOptions(new Options());
                        options = cmd.buildCommandlineOptions(options);
                        if (options != null) {
                            ServerUtil.printCommandLineHelp("mqadmin " + cmd.commandName(), options);
                        }
                    } else {
                        System.out.printf("The sub command %s not exist.%n", args[1]);
                    }
                    break;
                }
            case 1:
            default:
                SubCommand cmd = findSubCommand(args[0]);
                if (cmd != null) {
                    String[] subargs = parseSubArgs(args);

                    Options options = ServerUtil.buildCommandlineOptions(new Options());
                    final CommandLine commandLine =
                        ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options),
                            new PosixParser());
                    if (null == commandLine) {
                        return;
                    }

                    if (commandLine.hasOption('n')) {
                        String namesrvAddr = commandLine.getOptionValue('n');
                        System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
                    }

                    cmd.execute(commandLine, options, rpcHook);
                } else {
                    System.out.printf("The sub command %s not exist.%n", args[0]);
               }
               break;
        }
    } catch (Exception e) {
         e.printStackTrace();
    }
}

1、首先调用initCommand() 方法加载所有的命令。 2、初始化日志 3、判断启动该类main 方法传入的参数。

  • 3.1 如果没有参数,则打印帮助信息。
  • 3.2 如果参数为2个,并且第一个是 help,第二个参数是initCommand() 加载的命令名称,则调用 ServerUtil.printCommandLineHelp() 方法打印指定命令的帮助信息。
  • 3.3 如果参赛为一个、或2个,并且第一个参数不为 help,或多个。并且第一个参赛为 initCommand() 加载的命令,则调用 该initCommand() 加载类中的 execute() 方法。
代码语言:javascript
复制
 cmd.execute(commandLine, options, rpcHook);
initCommand() 方法
代码语言:javascript
复制
public static void initCommand() {
    initCommand(new UpdateTopicSubCommand());
    initCommand(new DeleteTopicSubCommand());
    initCommand(new UpdateSubGroupSubCommand());
    initCommand(new DeleteSubscriptionGroupCommand());
    initCommand(new UpdateBrokerConfigSubCommand());
    initCommand(new UpdateTopicPermSubCommand());

    initCommand(new TopicRouteSubCommand());
    initCommand(new TopicStatusSubCommand());
    initCommand(new TopicClusterSubCommand());

    initCommand(new BrokerStatusSubCommand());
    initCommand(new QueryMsgByIdSubCommand());
    initCommand(new QueryMsgByKeySubCommand());
    initCommand(new QueryMsgByUniqueKeySubCommand());
    initCommand(new QueryMsgByOffsetSubCommand());
        
    initCommand(new PrintMessageSubCommand());
    initCommand(new PrintMessageByQueueCommand());
    initCommand(new SendMsgStatusCommand());
    initCommand(new BrokerConsumeStatsSubCommad());

    initCommand(new ProducerConnectionSubCommand());
    initCommand(new ConsumerConnectionSubCommand());
    initCommand(new ConsumerProgressSubCommand());
    initCommand(new ConsumerStatusSubCommand());     
    initCommand(new CloneGroupOffsetCommand());

    initCommand(new ClusterListSubCommand());
    initCommand(new TopicListSubCommand());

    initCommand(new UpdateKvConfigCommand());
    initCommand(new DeleteKvConfigCommand());

    initCommand(new WipeWritePermSubCommand());
    initCommand(new ResetOffsetByTimeCommand());

    initCommand(new UpdateOrderConfCommand());
    initCommand(new CleanExpiredCQSubCommand());
    initCommand(new CleanUnusedTopicCommand());

    initCommand(new StartMonitoringSubCommand());
    initCommand(new StatsAllSubCommand());

    initCommand(new AllocateMQSubCommand());

    initCommand(new CheckMsgSendRTCommand());
    initCommand(new CLusterSendMsgRTCommand());

    initCommand(new GetNamesrvConfigCommand());
    initCommand(new UpdateNamesrvConfigCommand());
    initCommand(new GetBrokerConfigCommand());

    initCommand(new QueryConsumeQueueCommand());
    initCommand(new SendMessageCommand());
    initCommand(new ConsumeMessageCommand());
}

丛类名中可以看出跟上面控制台 执行 mqadmin 指令输出命令的名字和这里的类名可以一一对应上。

initCommand 方法
代码语言:javascript
复制
protected static List<SubCommand> subCommandList = new ArrayList<SubCommand>();

public static void initCommand(SubCommand command) {
    subCommandList.add(command);
}

把 init 加载到一个List集合中。

SubCommand 接口定义

所有的操作命令都实现了 SubCommand 接口

代码语言:javascript
复制
public interface SubCommand {
    String commandName();
    String commandDesc();
    Options buildCommandlineOptions(final Options options);
    void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException;
}

1、commandName() 命令名称 2、commandDesc()命令描述 3、buildCommandlineOptions() 构建命令解析器 4、execute() 执行命令

创建 Topic 源码分析

下面我们以创建 Topic 命令来分析实现原理。 updateTopic 命令是创建Topic的命令。

通过该命令可以查看 updateTopic 支持那么多参数。 下面我们来分析下 UpdateTopicPermSubCommand 类的实现

UpdateTopicPermSubCommand 解析
commandName()
代码语言:javascript
复制
@Override
public String commandName() {
    return "updateTopic";
}

命令名称

commandDesc()
代码语言:javascript
复制
@Override
public String commandDesc() {
    return "Update or create topic";
}

命令描述

buildCommandlineOptions()
代码语言:javascript
复制
@Override
public Options buildCommandlineOptions(Options options) {
    Option opt = new Option("b", "brokerAddr", true, "create topic to which broker");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("c", "clusterName", true, "create topic to which cluster");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("t", "topic", true, "topic name");
    opt.setRequired(true);
    options.addOption(opt);

    opt = new Option("r", "readQueueNums", true, "set read queue nums");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("w", "writeQueueNums", true, "set write queue nums");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("p", "perm", true, "set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("o", "order", true, "set topic's order(true|false)");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("u", "unit", true, "is unit topic (true|false)");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("s", "hasUnitSub", true, "has unit sub (true|false)");
    opt.setRequired(false);
    options.addOption(opt);

    return options;
}

从该方法中可以看到定义的命令及其说明。

execute() 方法
代码语言:javascript
复制
@Override
public void execute(final CommandLine commandLine, final Options options,
    RPCHook rpcHook) throws SubCommandException {
    DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
    defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));

    try {
        TopicConfig topicConfig = new TopicConfig();
        topicConfig.setReadQueueNums(8);
        topicConfig.setWriteQueueNums(8);
        topicConfig.setTopicName(commandLine.getOptionValue('t').trim());

        // readQueueNums
        if (commandLine.hasOption('r')) {
            topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim()));
        }

        // writeQueueNums
        if (commandLine.hasOption('w')) {
            topicConfig.setWriteQueueNums(Integer.parseInt(commandLine.getOptionValue('w').trim()));
        }

        // perm
        if (commandLine.hasOption('p')) {
            topicConfig.setPerm(Integer.parseInt(commandLine.getOptionValue('p').trim()));
        }

        boolean isUnit = false;
        if (commandLine.hasOption('u')) {
            isUnit = Boolean.parseBoolean(commandLine.getOptionValue('u').trim());
        }

        boolean isCenterSync = false;
        if (commandLine.hasOption('s')) {
            isCenterSync = Boolean.parseBoolean(commandLine.getOptionValue('s').trim());
        }

        int topicCenterSync = TopicSysFlag.buildSysFlag(isUnit, isCenterSync);
        topicConfig.setTopicSysFlag(topicCenterSync);

        boolean isOrder = false;
        if (commandLine.hasOption('o')) {
            isOrder = Boolean.parseBoolean(commandLine.getOptionValue('o').trim());
        }
        topicConfig.setOrder(isOrder);

        if (commandLine.hasOption('b')) {
            String addr = commandLine.getOptionValue('b').trim();

            defaultMQAdminExt.start();
            defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);

            if (isOrder) {
                String brokerName = CommandUtil.fetchBrokerNameByAddr(defaultMQAdminExt, addr);
                String orderConf = brokerName + ":" + topicConfig.getWriteQueueNums();
                defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), orderConf, false);
                System.out.printf("%s", String.format("set broker orderConf. isOrder=%s, orderConf=[%s]",
                    isOrder, orderConf.toString()));
            }
            System.out.printf("create topic to %s success.%n", addr);
            System.out.printf("%s", topicConfig);
            return;

        } else if (commandLine.hasOption('c')) {
            String clusterName = commandLine.getOptionValue('c').trim();

            defaultMQAdminExt.start();

            Set<String> masterSet =
                CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
            for (String addr : masterSet) {
                defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
                System.out.printf("create topic to %s success.%n", addr);
            }

            if (isOrder) {
                Set<String> brokerNameSet =
                    CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName);
                StringBuilder orderConf = new StringBuilder();
                String splitor = "";
                for (String s : brokerNameSet) {
                    orderConf.append(splitor).append(s).append(":")
                        .append(topicConfig.getWriteQueueNums());
                    splitor = ";";
                }
                defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
                    orderConf.toString(), true);
                System.out.printf("set cluster orderConf. isOrder=%s, orderConf=[%s]", isOrder, orderConf);
            }

            System.out.printf("%s", topicConfig);
            return;
        }

        ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
    } catch (Exception e) {
        throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
    } finally {
        defaultMQAdminExt.shutdown();
    }
}

从上面代码中可以看出,很大一部分代码都是解析 commandLine 参数。 解析出来的参数来填充 TopicConfig 对象。 然后调用 DefaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig) 方法来创建 Topic。

从上面的代码中可以看出 -b 和 -c 参数只能有一个生效。 -b 参数是在指定的 broker 上创建 topic -c 是在指定的集群上每一个 broker 创建 topic。

优先判断的是 -b 参数,如果指定 -b 参数就会在指定的 broker 上创建,而不会在 -c 指定的集群上创建。

其它的 SubCommand 命令的实现方式都一样,就不一一解析了。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.12.01 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
    • 查看 mqadmin脚本
    • tools 模块结构
      • MQAdminStartup 启动类
        • initCommand() 方法
          • initCommand 方法
            • SubCommand 接口定义
            • 创建 Topic 源码分析
              • UpdateTopicPermSubCommand 解析
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档