rocketmq学习2

前面我们已经通过quickstrat可以看到nameServer的启动:

/**
 * nameServer启动类
 */
public class NamesrvStartup {

    private static InternalLogger log;
    private static Properties properties = null;
    private static CommandLine commandLine = null;

    public static void main(String[] args) {
        main0(args);
    }

    //启动NamesrvController做了:创建namesrvController和启动controller,返回controller
    public static NamesrvController main0(String[] args) {

        try {
            NamesrvController controller = createNamesrvController(args);
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

    //创建namesrvController:首先设置配置信息rocketmq的版本信息
    public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        //PackageConflictDetect.detectFastjson();

        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
            return null;
        }

        //创建namesrvCnfig、nettyServerConfig,并进行配置的填充
        //采用-c和-p两种方式进行填充
        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(9876);
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);

                namesrvConfig.setConfigStorePath(file);

                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }

        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
            MixAll.printObjectProperties(console, namesrvConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            System.exit(0);
        }

        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

        log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

        MixAll.printObjectProperties(log, namesrvConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);

        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);

        return controller;
    }

    //启动namesrvController
    public static NamesrvController start(final NamesrvController controller) throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }

        //初始化:加载kv配置、创建网络对象,开启两个定时任务(心跳检查),注册jvm钩子
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }

        //添加关机钩子函数
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));

        //启动controller
        controller.start();

        return controller;
    }

    public static void shutdown(final NamesrvController controller) {
        controller.shutdown();
    }

    //使用-c和-p的方式进行环境变量参数的添加
    public static Options buildCommandlineOptions(final Options options) {
        Option opt = new Option("c", "configFile", true, "Name server config properties file");
        opt.setRequired(false);
        options.addOption(opt);

        opt = new Option("p", "printConfigItem", false, "Print all config item");
        opt.setRequired(false);
        options.addOption(opt);

        return options;
    }

    public static Properties getProperties() {
        return properties;
    }
}

从启动类中,我们看到:首先创建NamesrvConfig、nettyServerConfig,设置监听端口,将8888改成9876。填充NamesrvConfig、NettyServerConfig、BrokerConfig,获取namesrvAddr,创建Controller,注册钩子函数,启动start。

NamesrvController的属性信息、构造函数:

public class NamesrvController {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    //namesrvConfig配置信息
    private final NamesrvConfig namesrvConfig;
    //NettyServer配置信息
    private final NettyServerConfig nettyServerConfig;

    //定时任务
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "NSScheduledThread"));
    //kv配置管理
    private final KVConfigManager kvConfigManager;
    //路由信息管理
    private final RouteInfoManager routeInfoManager;
    //远程服务
    private RemotingServer remotingServer;
    //brokerHousekeepingService
    private BrokerHousekeepingService brokerHousekeepingService;

    private ExecutorService remotingExecutor;
    //配置
    private Configuration configuration;
    private FileWatchService fileWatchService;

    //namesrvController:从入参可以看到里面包含两个重要的配置namesrvConfig、nettyServerConfig
    //除了这两个重要的配置之外,还添加了kvConfigManager、routeInfoManager、brokerHousekeepingService、
    //Configuration
    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        this.namesrvConfig = namesrvConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.kvConfigManager = new KVConfigManager(this);
        this.routeInfoManager = new RouteInfoManager();
        this.brokerHousekeepingService = new BrokerHousekeepingService(this);
        this.configuration = new Configuration(
            log,
            this.namesrvConfig, this.nettyServerConfig
        );
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    }
}

NamesrvConfig配置信息:

public class NamesrvConfig {
    //rocketmqHome信息
    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
    //K-V配置路径
    private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
    //配置存储路径
    private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
    //productEnvName
    private String productEnvName = "center";
    //clusterTes
    private boolean clusterTest = false;
    //消息有序默认false
    private boolean orderMessageEnable = false;
}

NettysrvComfig配置信息

/**
 * NettyServer配置
 */
public class NettyServerConfig implements Cloneable {
    //监听端口号
    private int listenPort = 8888;
    //worker线程数
    private int serverWorkerThreads = 8;
    //callbackExecutor线程数
    private int serverCallbackExecutorThreads = 0;
    //选择器线程数
    private int serverSelectorThreads = 3;
    //OnewaySemaphoreValue值
    private int serverOnewaySemaphoreValue = 256;
    //异步SemaphoreValue值
    private int serverAsyncSemaphoreValue = 64;
    //通道最大闲置时间
    private int serverChannelMaxIdleTimeSeconds = 120;

    //socket的发送和接收的bufSize
    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    //开启堆外内存
    private boolean serverPooledByteBufAllocatorEnable = true;
}

RouteInfoManager路由信息:

**
 * 路由信息
 */
public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    //broker通道过期时间
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
   //读写锁
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    //topic消息队列路由表
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    //broker基础表
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    //cluster集群表
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    //broker状态表
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    //FilterServer表
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

    //路由信息构造函数
    public RouteInfoManager() {
        this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
        this.brokerAddrTable = new HashMap<String, BrokerData>(128);
        this.clusterAddrTable = new HashMap<String, Set<String>>(32);
        this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
        this.filterServerTable = new HashMap<String, List<String>>(256);
    }

start(final NamesrvController controller)中进行初始化,创建nettyServer对象,开启两个定时任务,添加关闭钩子,然后进行启动操作

//进行初始化
public boolean initialize() {
    //加载kv配置信息
    this.kvConfigManager.load();

    //netty远程服务:nettyServer配置、brokerHousekeepingService
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    //远程线程
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
   //注册处理
    this.registerProcessor();
    //下面两个定时任务是心跳检查
   //定时任务:路由信息管理扫描没有激活的broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    //定时任务:kv配置管理调用定时打印所有
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    //注册一个监听去重新加载Ssl上下文
    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        try {
            fileWatchService = new FileWatchService(
                new String[] {
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    boolean certChanged, keyChanged = false;
                    @Override
                    public void onChanged(String path) {
                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        }
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                            certChanged = true;
                        }
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                            keyChanged = true;
                        }
                        if (certChanged && keyChanged) {
                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        }
                    }
                    private void reloadServerSslContext() {
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    }
                });
        } catch (Exception e) {
            log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }

    return true;
}

启动服务:

public void start() throws Exception {
    this.remotingServer.start();

    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
}

启动服务

//reomotingCelint,使用Netty,可以看到DefaultEventExecutorGroup继承MultithreadEventExecutorGroup
//而在Netty中,我们知道MultithreadEventExecutorGroup的构造方法是NioEventLoopGroup的构造方法
//构造方法:DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory)
@Override
public void start() {
    //创建NioEventLoopGroup
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyClientConfig.getClientWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);
            //重写线程方法
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
            }
        });

    //创建引导 客户端 填充信息
    Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
        .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
        .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
        .handler(new ChannelInitializer<SocketChannel>() {
            //重写initChannel方法
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                if (nettyClientConfig.isUseTLS()) {
                    if (null != sslContext) {
                        pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                        log.info("Prepend SSL handler");
                    } else {
                        log.warn("Connections are insecure as SSLContext is null!");
                    }
                }
                //pipeline添加信息,以及handler
                pipeline.addLast(
                    defaultEventExecutorGroup,
                    new NettyEncoder(),
                    new NettyDecoder(),
                    new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                    new NettyConnectManageHandler(),
                    new NettyClientHandler());
            }
        });

    //扫描响应表启动 定时任务
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                NettyRemotingClient.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);

    //如果通道事件监听不为空,则启动
    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }
}

定时任务:

/**
 * <p>
 * This method is periodically invoked to scan and expire deprecated request.
 * 定期调用此方法以扫描和终止已弃用的请求
 * </p>
 */
public void scanResponseTable() {
    final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
    //迭代器
    Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<Integer, ResponseFuture> next = it.next();
        ResponseFuture rep = next.getValue();

        if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
            rep.release();
            it.remove();
            rfList.add(rep);
            log.warn("remove timeout request, " + rep);
        }
    }

    for (ResponseFuture rf : rfList) {
        try {
            executeInvokeCallback(rf);
        } catch (Throwable e) {
            log.warn("scanResponseTable, operationComplete Exception", e);
        }
    }
}

总结:从启动类中,我们看到:首先创建NamesrvConfig、nettyServerConfig,设置监听端口,将8888改成9876。填充NamesrvConfig、NettyServerConfig、BrokerConfig,获取namesrvAddr,创建Controller,注册钩子函数,启动start。

本文分享自微信公众号 - 后端技术学习(gh_9f5627e6cc61),作者:路行的亚洲

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-07-18

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Netty实现简单RPC调用

    我们知道Dubbo是一个RPC框架,那RPC框架需要实现什么?需要实现的是调用远程服务和本地服务一样方便,同时提高调用远程服务的性能。而服务端和客户端之间的关系...

    路行的亚洲
  • java8学习整理

    由于项目中使用java8中的lambda表达式,因此想学习一下java8中的lambda表达式和stream流。

    路行的亚洲
  • RocketMQ学习四-生产者producer

    前面我们已经知道RocketMQ的生产者和消费者依赖NameServer和broker,因此需要先启动nameServer和broker。同时nameServe...

    路行的亚洲
  • Java——简单Java类深入(数据表与简单Java类、一对多映射、双向一对多映射、多对多映射)

    简单Java类是整个项目开发的灵魂,其有严格的开发标准,最为重要的是它要与数据表完全对应。由于目前没有接触过多的程序设计功能,所以对于此处的访问就有了一些限制,...

    Winter_world
  • Android之使用JavaMail发送邮件

    forrestlin
  • 【设计模式系列(三)】彻底搞懂原型模式

    原型模式(Prototype Pattern)是用于创建重复的对象,同时又能保证性能

    你好戴先生
  • 第50节:Java当中的泛型

    这就存在一个问题,如果集合存储元素时,而且存储对象有很多,而且对象类型不相同,就很容易导致隐患。

    达达前端
  • 最全一篇Lombok使用讲解,及原理,真香啊

    Lombok 是一款好用顺手的工具,就像 Google Guava 一样。可用来帮助开发人员消除 Java 的冗长代码,尤其是对于简单的 Java 对象(POJ...

    公众号 IT老哥
  • Java IO

    java中涉及到的io流基本都是从以上四个抽象基类派生出来的,其子类都是以其父类的名字做后缀。

    万能青年
  • 源码分析(1.4万字) | Mybatis接口没有实现类为什么可以执行增删改查

    MyBatis 是一款非常优秀的持久层框架,相对于IBatis更是精进了不少。与此同时它还提供了很多的扩展点,比如最常用的插件;语言驱动器,执行器,对象工厂,对...

    小傅哥

扫码关注云+社区

领取腾讯云代金券