前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[Doris核心原理] Fe启动过程原理分析4 - 初始化QeService, FeServer, HttpServer

[Doris核心原理] Fe启动过程原理分析4 - 初始化QeService, FeServer, HttpServer

作者头像
小伟
发布2022-07-24 09:33:30
8260
发布2022-07-24 09:33:30
举报
文章被收录于专栏:魔都程序缘魔都程序缘

上一篇初略的讲解了[Doris核心原理] -- FE启动过程原理分析3 -- 初始化Catalog -- Fe中集元数据管理、回放日志管理、Doris插件管理、Fe角色管理、垃圾数据回收管理等 等 等 等功能于一体, 这一篇主要讲解在[Doris核心原理] -- FE启动过程原理分析3 -- 初始化Catalog 初始化后, Fe继续初始化 QeService,FeServer,HttpServer 三个核心服务.

初始化代码还是在[Doris核心原理] -- FE启动过程原理分析2 -- 启动类PaloFe.java中. 下面分别介绍三个服务初始化过程.

1. QeService, 一个SQL应用服务

QeService是一个SQL应用层服务, 启动后用户可以通过MySQL客户端像连接MySQL Server一样连接Doris, 并且执行SQL语句. Fe是通过构造函数初始化的, 代码如下:

代码语言:javascript
复制
public QeService(int port, boolean nioEnabled, ConnectScheduler scheduler) {
  //加载帮助w文档

        try {
            HelpModule.getInstance().setUpModule();
        } catch (Exception e) {
            LOG.error("Help module failed, because:", e);
        }
        this.port = port;
        if (nioEnabled) {//NIO服务开启
            mysqlServer = new NMysqlServer(port, scheduler);
        } else {
            mysqlServer = new MysqlServer(port, scheduler);
        }
    }

我们先讲解传入参数:

port: SQL服务监听端口, 默认是9030, 可以通过Fe的配置项query_port更改.

nioEnabled: 是否用NIO实现的MySQL Server服务, 可以通过Fe的配置mysql_service_nio_enabled更改. 默认是true

scheduler: SQL请求计划处理器. 现在的逻辑是来一个SQL请求, 新开一个线程处理. 这个对象在执行环境初始化的时候初始化的.

接下来我们以NMysqlServer为例讲解初始化的过程. 我们先看看构造函数初始化的代码:

代码语言:javascript
复制
public NMysqlServer(int port, ConnectScheduler connectScheduler) {
        this.port = port;
        this.xnioWorker = Xnio.getInstance().createWorkerBuilder()
                .setWorkerName("doris-mysql-nio")
                .setWorkerIoThreads(Config.mysql_service_io_threads_num)
                .setExternalExecutorService(taskService).build();
        // connectScheduler only used for idle check.
        this.acceptListener = new AcceptListener(connectScheduler);
    }

我们可以看到, NMysqlServer是NIO实现的服务. 这里将SQL调度器connectScheduler传入, 构造一个acceptListener实例, 在NMysqlServer初始化完成后, 会调用其start()方法启动.

AcceptListener的作用是监听SQL请求, 并处理. 其核心方法是handleEvent(), 部分核心如下代码:

代码语言:javascript
复制
 public void handleEvent(AcceptingChannel<StreamConnection> channel) {
    try {
        StreamConnection connection = channel.accept();
        ...
        NConnectContext context = new NConnectContext(connection);
        context.setCatalog(Catalog.getCurrentCatalog());
        connectScheduler.submit(context);
        channel.getWorker().execute(() -> {
            try {
                ...
                ConnectProcessor processor = new ConnectProcessor(context);
                context.startAcceptQuery(processor);
            } catch (AfterConnectedException e) {
                context.cleanup();
            } catch (Exception e) {
                LOG.warn("connect processor exception because ", e);
                context.cleanup();
            } finally {
                ConnectContext.remove();
            }
        });
    } catch (IOException e) {
        LOG.warn("Connection accept failed.", e);
    }
}

当MySQL NIO Server收到一个SQL请求时, 会自动调用handleEvent(), 将请求封装成AcceptingChannel<StreamConnection>类型传递给我们处理.

通过 context.startAcceptQuery(processor)方法, 将注册一个ReadListener, 在这个监听器的handleEvent()方法中调用ConnectProcessor实例中的processOnce()方法, 通过调用dispatch()方法, 开始正式处理一个SQL请求, 代码如下所示:

代码语言:javascript
复制
private void dispatch() throws IOException {
        int code = packetBuf.get();
        MysqlCommand command = MysqlCommand.fromCode(code);
        if (command == null) {
            ErrorReport.report(ErrorCode.ERR_UNKNOWN_COM_ERROR);
            ctx.getState().setError("Unknown command(" + command + ")");
            LOG.warn("Unknown command(" + command + ")");
            return;
        }
        ctx.setCommand(command);
        ctx.setStartTime();


        switch (command) {
            case COM_INIT_DB:
                handleInitDb();
                break;
            case COM_QUIT:
                handleQuit();
                break;
            case COM_QUERY:
                handleQuery();
                ctx.setStartTime();
                break;
            case COM_FIELD_LIST:
                handleFieldList();
                break;
            case COM_PING:
                handlePing();
                break;
            default:
                ctx.getState().setError("Unsupported command(" + command + ")");
                LOG.warn("Unsupported command(" + command + ")");
                break;
        }
    }

后面的逻辑则是对各种SQL语句的处理, 我们后面会一一详解.

2. FeServer, 一个RPC服务

FeServer其实是一个基于Thrift的RPC服务, 通过其构造函数我们可以很轻易的知道, 构造函数代码如下:

代码语言:javascript
复制
public void start() throws IOException {

        TProcessor tprocessor = new FrontendService.Processor<FrontendService.Iface>(
                new FrontendServiceImpl(ExecuteEnv.getInstance()));
        server = new ThriftServer(port, tprocessor);
        server.start();
        LOG.info("thrift server started.");
    }

通过观察 TProcessor tprocessor = new FrontendService.Processor<FrontendService.Iface>( new FrontendServiceImpl(ExecuteEnv.getInstance())), 我们可以推测出启动了一个RPC服务处理器, 处理FrontendService.Iface接口中的方法, 这个接口的实现类我们可以通过IDEA轻松知道是FrontendServiceImpl.java.

通过之前的文章: Apache Doris概述和组件介绍, 我们可以得知Fe的RPC服务会被其他Fe和Be调用.

3. HttpServer, 一个Http服务

这个http服务是Doris自己实现的一个Http服务, 支持web页面请求和restful api两类http请求.

一般的, org.apache.doris.http.action包下的全部是web页面请求; org.apache.doris.http.rest包下的全部是restful api.

具体哪些是生效的需要查看HttpServer中registerActions()方法, 这个方法注册了全部的web页面请求和restful api. 当然也包含Prometheus Exporter接口.

本章节到此结束了, 希望大家阅读完后, 对三个服务的基本指责和运行过程有大致了解.

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-01-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 魔都程序缘 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档