Apache Doris是一个现代化的MPP分析型数据库产品。仅需亚秒级响应时间即可获得查询结果,有效地支持实时数据分析。Apache Doris的分布式架构非常简洁,易于运维,并且可以支持10PB以上的超大数据集。
Apache Doris可以满足多种数据分析需求,例如固定历史报表,实时数据分析,交互式数据分析和探索式数据分析等。令您的数据分析工作更加简单高效!
下载Doris源码详细步骤:https://doris.apache.org/zh-CN/developer-guide/fe-idea-dev.html#_1-%E7%8E%AF%E5%A2%83%E5%87%86%E5%A4%87
我们先看看 FE启动类代码:
if (Strings.isNullOrEmpty(dorisHomeDir)) {
System.err.println("env DORIS_HOME is not set.");
return;
}
if (Strings.isNullOrEmpty(pidDir)) {
System.err.println("env PID_DIR is not set.");
return;
}
CommandLineOptions cmdLineOpts = parseArgs(args);
try {
// 创建 pid 文件
if (!createAndLockPidFile(pidDir + "/fe.pid")) {
throw new IOException("pid file is already locked.");
}
// 初始化 config文件
Config config = new Config();
config.init(dorisHomeDir + "/conf/fe.conf");
// Must init custom config after init config, separately.
// Because the path of custom config file is defined in fe.conf
config.initCustom(Config.custom_config_dir + "/fe_custom.conf");
LdapConfig ldapConfig = new LdapConfig();
if (new File(dorisHomeDir + "/conf/ldap.conf").exists()) {
ldapConfig.init(dorisHomeDir + "/conf/ldap.conf");
}
// check it after Config is initialized, otherwise the config 'check_java_version' won't work.
if (!JdkUtils.checkJavaVersion()) {
throw new IllegalArgumentException("Java version doesn't match");
}
Log4jConfig.initLogging(dorisHomeDir + "/conf/");
// set dns cache ttl
java.security.Security.setProperty("networkaddress.cache.ttl", "60");
// check command line options
checkCommandLineOptions(cmdLineOpts);
LOG.info("Palo FE starting...");
//FE Address 初始化
FrontendOptions.init();
// 检查端口是否正常
checkAllPorts();
if (Config.enable_bdbje_debug_mode) {
// Start in BDB Debug mode
BDBDebugger.get().startDebugMode(dorisHomeDir);
return;
}
// 初始化 Catelog 并且等待加载完成
Catalog.getCurrentCatalog().initialize(args);
Catalog.getCurrentCatalog().waitForReady();
// 第一步 启动 HttpServer 类
// 第二步 启动 FeServer 类
// 第三步 启动 QeService
QeService qeService = new QeService(Config.query_port, Config.mysql_service_nio_enabled, ExecuteEnv.getInstance().getScheduler());
FeServer feServer = new FeServer(Config.rpc_port);
feServer.start();
HttpServer httpServer = new HttpServer();
httpServer.setPort(Config.http_port);
httpServer.setMaxHttpPostSize(Config.jetty_server_max_http_post_size);
httpServer.setAcceptors(Config.jetty_server_acceptors);
httpServer.setSelectors(Config.jetty_server_selectors);
httpServer.setWorkers(Config.jetty_server_workers);
httpServer.setMaxThreads(Config.jetty_threadPool_maxThreads);
httpServer.setMaxThreads(Config.jetty_threadPool_minThreads);
httpServer.start();
qeService.start();
ThreadPoolManager.registerAllThreadPoolMetric();
while (true) {
Thread.sleep(2000);
}
} catch (Throwable e) {
e.printStackTrace();
}
通过上面代码,我们可以清楚了解FE启动时主要执行以下过程:
CataLog 主要职责是维护FE 元数据,接下来我们看看FE启动时,CataLog初始化时,做什么处理:
// 获取本地节点和helper节点信息
getSelfHostPort();
getHelperNodes(args);
// 检查meta文件目录是否创建
File meta = new File(metaDir);
if (!meta.exists()) {
LOG.warn("Doris' meta dir {} does not exist. You need to create it before starting FE", meta.getAbsolutePath());
throw new Exception(meta.getAbsolutePath() + " does not exist, will exit");
}
//检查 BDB和Image目录是否创建
if (Config.edit_log_type.equalsIgnoreCase("bdb")) {
File bdbDir = new File(this.bdbDir);
if (!bdbDir.exists()) {
bdbDir.mkdirs();
}
File imageDir = new File(this.imageDir);
if (!imageDir.exists()) {
imageDir.mkdirs();
}
} else {
throw new Exception("Invalid edit log type: " + Config.edit_log_type);
}
// 初始化插件管理
pluginMgr.init();
auditEventProcessor.start();
// 2.获取集群ID和角色(Observer or Follower)
getClusterIdAndRole();
// 3. 首次加载image文件和回放Elog日志
this.editLog = new EditLog(nodeName);
loadImage(this.imageDir); // 加载image文件
editLog.open(); // 夹杂bdb环境配置
this.globalTransactionMgr.setEditLog(editLog);
this.idGenerator.setEditLog(editLog);
// 4. 创建加载和导出作业标签清理Daemon线程
createLabelCleaner();
// 5. 创建事务清理Daemon线程
createTxnCleaner();
// 6. 开始监听线程状态(MASTER/FOLLOWER/OBSERVER状态转换,以及leader选举工作和元数据同步工作)
createStateListener();
listener.start();
通过上面源码,我们可以发现,CateLog初始化时,执行以下操作:
QeServer职责是与Mysql Client进行通讯,支持Socket和Nio连接,具体源码:
try {
HelpModule.getInstance().setUpModule();
} catch (Exception e) {
LOG.error("Help module failed, because:", e);
}
this.port = port;
if (nioEnabled) {
mysqlServer = new NMysqlServer(port, scheduler);
} else {
mysqlServer = new MysqlServer(port, scheduler);
}
当nioEnabled(可配置) 为true时,使用Nio进行通讯,采用这种方式通信的好处是:
FeServer职责是负责FE和BE之间通信。
try {
switch (type) {
case SIMPLE:
createSimpleServer();
break;
case THREADED_SELECTOR:
createThreadedServer();
break;
default:
createThreadPoolServer();
}
} catch (TTransportException ex) {
LOG.warn("create thrift server failed.", ex);
throw new IOException("create thrift server failed.", ex);
}
ThriftServerEventProcessor eventProcessor = new ThriftServerEventProcessor(this);
server.setServerEventHandler(eventProcessor);
serverThread = new Thread(new Runnable() {
@Override
public void run() {
server.serve();
}
});
serverThread.setDaemon(true);
serverThread.start();
FE的Thrift使用的服务模型分为三种:
HttpServer职责主要是为Rest API和doris Web页面提供接口服务,源码如下:
Map<String, Object> properties = new HashMap<>();
properties.put("server.port", port);
properties.put("server.servlet.context-path", "/");
properties.put("spring.resources.static-locations", "classpath:/static");
properties.put("spring.http.encoding.charset", "UTF-8");
properties.put("spring.http.encoding.enabled", true);
properties.put("spring.http.encoding.force", true);
//enable jetty config
properties.put("server.jetty.acceptors", this.acceptors);
properties.put("server.jetty.max-http-post-size", this.maxHttpPostSize);
properties.put("server.jetty.selectors", this.selectors);
//Worker thread pool is not set by default, set according to your needs
if(this.workers > 0) {
properties.put("server.jetty.workers", this.workers);
}
// This is to disable the spring-boot-devtools restart feature.
// To avoid some unexpected behavior.
System.setProperty("spring.devtools.restart.enabled", "false");
// Value of `DORIS_HOME_DIR` is null in unit test.
if (PaloFe.DORIS_HOME_DIR != null) {
System.setProperty("spring.http.multipart.location", PaloFe.DORIS_HOME_DIR);
}
System.setProperty("spring.banner.image.location", "doris-logo.png");
if (FeConstants.runningUnitTest) {
// this is currently only used for unit test
properties.put("logging.config", getClass().getClassLoader().getResource("log4j2.xml").getPath());
} else {
properties.put("logging.config", Config.custom_config_dir + "/" + SpringLog4j2Config.SPRING_LOG_XML_FILE);
}
new SpringApplicationBuilder()
.sources(HttpServer.class)
.properties(properties)
.run(new String[]{});
HttpServer继承了SpringBootServletInitializer,同时使用了SpringApplicationBuilder类,那么我们就可以很清楚知道,使用Springboot框架提供Rest Api服务。