前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >rocketmq学习2

rocketmq学习2

作者头像
路行的亚洲
发布2020-07-23 16:35:28
3850
发布2020-07-23 16:35:28
举报
文章被收录于专栏:后端技术学习后端技术学习

前面我们已经通过quickstrat可以看到nameServer的启动:

/**
 * nameServer启动类
 */
public class NamesrvStartup {

    private static InternalLogger log;
    private static Properties properties = null;
    private static CommandLine commandLine = null;

    public static void main(String[] args) {
        main0(args);
    }

    //启动NamesrvController做了:创建namesrvController和启动controller,返回controller
    public static NamesrvController main0(String[] args) {

        try {
            NamesrvController controller = createNamesrvController(args);
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

    //创建namesrvController:首先设置配置信息rocketmq的版本信息
    public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        //PackageConflictDetect.detectFastjson();

        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
            return null;
        }

        //创建namesrvCnfig、nettyServerConfig,并进行配置的填充
        //采用-c和-p两种方式进行填充
        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(9876);
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);

                namesrvConfig.setConfigStorePath(file);

                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }

        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
            MixAll.printObjectProperties(console, namesrvConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            System.exit(0);
        }

        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

        log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

        MixAll.printObjectProperties(log, namesrvConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);

        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);

        return controller;
    }

    //启动namesrvController
    public static NamesrvController start(final NamesrvController controller) throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }

        //初始化:加载kv配置、创建网络对象,开启两个定时任务(心跳检查),注册jvm钩子
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }

        //添加关机钩子函数
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));

        //启动controller
        controller.start();

        return controller;
    }

    public static void shutdown(final NamesrvController controller) {
        controller.shutdown();
    }

    //使用-c和-p的方式进行环境变量参数的添加
    public static Options buildCommandlineOptions(final Options options) {
        Option opt = new Option("c", "configFile", true, "Name server config properties file");
        opt.setRequired(false);
        options.addOption(opt);

        opt = new Option("p", "printConfigItem", false, "Print all config item");
        opt.setRequired(false);
        options.addOption(opt);

        return options;
    }

    public static Properties getProperties() {
        return properties;
    }
}

从启动类中,我们看到:首先创建NamesrvConfig、nettyServerConfig,设置监听端口,将8888改成9876。填充NamesrvConfig、NettyServerConfig、BrokerConfig,获取namesrvAddr,创建Controller,注册钩子函数,启动start。

NamesrvController的属性信息、构造函数:

public class NamesrvController {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    //namesrvConfig配置信息
    private final NamesrvConfig namesrvConfig;
    //NettyServer配置信息
    private final NettyServerConfig nettyServerConfig;

    //定时任务
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "NSScheduledThread"));
    //kv配置管理
    private final KVConfigManager kvConfigManager;
    //路由信息管理
    private final RouteInfoManager routeInfoManager;
    //远程服务
    private RemotingServer remotingServer;
    //brokerHousekeepingService
    private BrokerHousekeepingService brokerHousekeepingService;

    private ExecutorService remotingExecutor;
    //配置
    private Configuration configuration;
    private FileWatchService fileWatchService;

    //namesrvController:从入参可以看到里面包含两个重要的配置namesrvConfig、nettyServerConfig
    //除了这两个重要的配置之外,还添加了kvConfigManager、routeInfoManager、brokerHousekeepingService、
    //Configuration
    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        this.namesrvConfig = namesrvConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.kvConfigManager = new KVConfigManager(this);
        this.routeInfoManager = new RouteInfoManager();
        this.brokerHousekeepingService = new BrokerHousekeepingService(this);
        this.configuration = new Configuration(
            log,
            this.namesrvConfig, this.nettyServerConfig
        );
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    }
}

NamesrvConfig配置信息:

public class NamesrvConfig {
    //rocketmqHome信息
    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
    //K-V配置路径
    private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
    //配置存储路径
    private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
    //productEnvName
    private String productEnvName = "center";
    //clusterTes
    private boolean clusterTest = false;
    //消息有序默认false
    private boolean orderMessageEnable = false;
}

NettysrvComfig配置信息

/**
 * NettyServer配置
 */
public class NettyServerConfig implements Cloneable {
    //监听端口号
    private int listenPort = 8888;
    //worker线程数
    private int serverWorkerThreads = 8;
    //callbackExecutor线程数
    private int serverCallbackExecutorThreads = 0;
    //选择器线程数
    private int serverSelectorThreads = 3;
    //OnewaySemaphoreValue值
    private int serverOnewaySemaphoreValue = 256;
    //异步SemaphoreValue值
    private int serverAsyncSemaphoreValue = 64;
    //通道最大闲置时间
    private int serverChannelMaxIdleTimeSeconds = 120;

    //socket的发送和接收的bufSize
    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    //开启堆外内存
    private boolean serverPooledByteBufAllocatorEnable = true;
}

RouteInfoManager路由信息:

**
 * 路由信息
 */
public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    //broker通道过期时间
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
   //读写锁
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    //topic消息队列路由表
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    //broker基础表
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    //cluster集群表
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    //broker状态表
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    //FilterServer表
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

    //路由信息构造函数
    public RouteInfoManager() {
        this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
        this.brokerAddrTable = new HashMap<String, BrokerData>(128);
        this.clusterAddrTable = new HashMap<String, Set<String>>(32);
        this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
        this.filterServerTable = new HashMap<String, List<String>>(256);
    }

start(final NamesrvController controller)中进行初始化,创建nettyServer对象,开启两个定时任务,添加关闭钩子,然后进行启动操作

//进行初始化
public boolean initialize() {
    //加载kv配置信息
    this.kvConfigManager.load();

    //netty远程服务:nettyServer配置、brokerHousekeepingService
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    //远程线程
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
   //注册处理
    this.registerProcessor();
    //下面两个定时任务是心跳检查
   //定时任务:路由信息管理扫描没有激活的broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    //定时任务:kv配置管理调用定时打印所有
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    //注册一个监听去重新加载Ssl上下文
    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        try {
            fileWatchService = new FileWatchService(
                new String[] {
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    boolean certChanged, keyChanged = false;
                    @Override
                    public void onChanged(String path) {
                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        }
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                            certChanged = true;
                        }
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                            keyChanged = true;
                        }
                        if (certChanged && keyChanged) {
                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        }
                    }
                    private void reloadServerSslContext() {
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    }
                });
        } catch (Exception e) {
            log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }

    return true;
}

启动服务:

public void start() throws Exception {
    this.remotingServer.start();

    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
}

启动服务

//reomotingCelint,使用Netty,可以看到DefaultEventExecutorGroup继承MultithreadEventExecutorGroup
//而在Netty中,我们知道MultithreadEventExecutorGroup的构造方法是NioEventLoopGroup的构造方法
//构造方法:DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory)
@Override
public void start() {
    //创建NioEventLoopGroup
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyClientConfig.getClientWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);
            //重写线程方法
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
            }
        });

    //创建引导 客户端 填充信息
    Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
        .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
        .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
        .handler(new ChannelInitializer<SocketChannel>() {
            //重写initChannel方法
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                if (nettyClientConfig.isUseTLS()) {
                    if (null != sslContext) {
                        pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                        log.info("Prepend SSL handler");
                    } else {
                        log.warn("Connections are insecure as SSLContext is null!");
                    }
                }
                //pipeline添加信息,以及handler
                pipeline.addLast(
                    defaultEventExecutorGroup,
                    new NettyEncoder(),
                    new NettyDecoder(),
                    new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                    new NettyConnectManageHandler(),
                    new NettyClientHandler());
            }
        });

    //扫描响应表启动 定时任务
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                NettyRemotingClient.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);

    //如果通道事件监听不为空,则启动
    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }
}

定时任务:

/**
 * <p>
 * This method is periodically invoked to scan and expire deprecated request.
 * 定期调用此方法以扫描和终止已弃用的请求
 * </p>
 */
public void scanResponseTable() {
    final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
    //迭代器
    Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<Integer, ResponseFuture> next = it.next();
        ResponseFuture rep = next.getValue();

        if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
            rep.release();
            it.remove();
            rfList.add(rep);
            log.warn("remove timeout request, " + rep);
        }
    }

    for (ResponseFuture rf : rfList) {
        try {
            executeInvokeCallback(rf);
        } catch (Throwable e) {
            log.warn("scanResponseTable, operationComplete Exception", e);
        }
    }
}

总结:从启动类中,我们看到:首先创建NamesrvConfig、nettyServerConfig,设置监听端口,将8888改成9876。填充NamesrvConfig、NettyServerConfig、BrokerConfig,获取namesrvAddr,创建Controller,注册钩子函数,启动start。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-07-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档