前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ClickHouse 源码解析(二):SQL 的一生(中)

ClickHouse 源码解析(二):SQL 的一生(中)

原创
作者头像
用户8447023
发布2022-11-18 14:44:29
1.3K0
发布2022-11-18 14:44:29
举报
文章被收录于专栏:ClickHouse 源码解析

基于 ClickHouse version 22.10.1 学习并写下 ClickHouse 源码解析系列。由于 CK 版本迭代很快,可能导致代码可能有些出入,但是整体流程大差不差吧。由于源码阅读过于枯燥,并且不太利于后续复习,所以在逻辑梳理时,我会使用思维导图或者流程图的方式来描述类与类之间的调用逻辑,后半部分会挑出核心的源码去分析。


概述

上一篇通过思维导图的方式,从宏观上了解了 CK 是如何处理 SQL 的。在源码解析部分对 SelectQuery 这类查询语句如何构建 QueryPipeline 进行了分析。这篇我们紧接着学习 CK 是如何调度 QueryPipeline,数据是如何在 Processor 之间流转的。

逻辑梳理

为了方便复习,先挂上之前画的思维导图。ClickHouse 源码解析(一):SQL 的一生(上)

我们聚焦于执行 Pipeline 这部分,学习 SelectQuery 的调度流程。

Pipeline 机制

我们先来看一下根据 QueryPlan 构建完成的 QueryPipeline 是个什么样子的,这里举例一个带 Join 查询的 SelectQuery:

代码语言:sql
复制
SELECT * FROM customer AS c
INNER JOIN order AS o ON c.C_CUSTKEY = o.C_CUSTKEY;

通过EXPLAIN PIPELINE可以查看这条 SQL 所构建的 Pipeline,如下所示:

比如,(ReadFromMergeTree)就是 QueryPlan 中的一个算子,MergeTreeInOrder则代表实现这个算子向 Pipeline 中添加的 Processor,Processor 后面的数字0 -> 1表示InputPort数量为 0,OutputPort数量为 1,如果 Processor 后面没有数字则默认InputPortOutputPort数量都是 1。酱紫看可能有些抽象,我们来看看下面这个图,这张图画的是 SQL 的 QueryPlan:

然后我们看看根据这个 QueryPlan 构建出来的 QueryPipeline 的样子:

注意:Source只有OutputPortSink只有InputPort

InputPortOutputPortPort的子类,在Port中有个成员变量std::shared_ptr<State> state它保存了两个连接Port的共享数据。

Port.h源码(省略部分代码):

代码语言:javascript
复制
class Port
{
protected:
    /// Shared state of two connected ports.
    class State
    {
    public:
        struct Data
        {
            /// Note: std::variant can be used. But move constructor for it can't be inlined.
            Chunk chunk;
            std::exception_ptr exception;
        };
    private:
        std::atomic<Data *> data;
    }
}

Processor之间传递数据就是通过这个State来实现的,就像下面这个样子:

Processor 在执行完成后,会将数据 Push 到ShareState中,下一个 Processor 在执行时,会从ShareState将数据 Pull 出来进行处理,处理完之后又 Push 下一个 Prot 中,整个 Pipeline 数据就是这样流动起来的。

ExecutingGraph 调度

个人理解,Pipeline 只是作为数据流通的数据结构,ExecutingGraph记录了 Processor 的执行状态,它对整个 Graph 进行调度处理。ExecutingGraph大致结构如下:

代码语言:javascript
复制
/// Graph of executing pipeline.
class ExecutingGraph
{
public:
    /// Edge represents connection between OutputPort and InputPort.
    /// For every connection, two edges are created: direct and backward (it is specified by backward flag).
    struct Edge
    {
        /// Port numbers. They are same for direct and backward edges.
        uint64_t input_port_number;
        uint64_t output_port_number;
        
        /// Edge version is increased when port's state is changed (e.g. when data is pushed). See Port.h for details.
        /// To compare version with prev_version we can decide if neighbour processor need to be prepared.
        Port::UpdateInfo update_info;
    };

    /// Graph node. Represents single Processor.
    struct Node
    {
        /// Direct edges are for output ports, back edges are for input ports.
        Edges direct_edges;
        Edges back_edges;
    };

    Nodes nodes;
};

简单来说,ExcutingGraph由多个Node组成,每个Node相当于 Processor,Node之间通过Edge连接,Node中记录了direct_edgesback_edges分别对应OutputPortInputPort。我们将上面得到的 QueryPipeline 转换为ExecutingGraph大致如下图所示:

LimitsCheckingTransformLazyOutputFormat是在 QueryPipeline 构建完成后添加的通用逻辑处理的 Processor,在后续调度中会用到。

既然ExcutingGraph构建完成,终于要开始调度执行了。那么在调度之前,需要给最先接受处理的Node初始化状态,CK 中 Processor 定义了如下六种状态:

代码语言:javascript
复制
enum class Status
{
    /// Processor needs some data at its inputs to proceed.
    /// You need to run another processor to generate required input and then call 'prepare' again.
    NeedData,

    /// Processor cannot proceed because output port is full or not isNeeded().
    /// You need to transfer data from output port to the input port of another processor and then call 'prepare' again.
    PortFull,

    /// All work is done (all data is processed or all output are closed), nothing more to do.
    Finished,

    /// No one needs data on output ports.
    /// Unneeded,

    /// You may call 'work' method and processor will do some work synchronously.
    Ready,

    /// You may call 'schedule' method and processor will return descriptor.
    /// You need to poll this descriptor and call work() afterwards.
    Async,

    /// Processor wants to add other processors to pipeline.
    /// New processors must be obtained by expandPipeline() call.
    ExpandPipeline,
};

ExcutingGraph在初始化节点,各个节点状态如下图所示:

处于Ready状态的Node,会调用当前 Processor 的work()方法处理数据,执行完成之后,MergeTree调用prepare()方法将数据 Push 到OutputPort,节点状态如下:

紧接着,Expression调用prepare()方法将 Pull 数据并且进入Ready状态,节点状态如下:

后续的节点还是以上面的方式,调用prepare()work()方法更新Node状态,完成整个ExcutingGraph调度逻辑。在文章末尾画了一部分算子较为完整的调度状态轮转图。

源码解析

上面内容大致了解了ExecutingGraph调度流程,接下来我们深入源码去学习 Graph 的节点初始化和 Graph 的调度。

ExecutingGraph 初始化

从思维导图可以看到在执行 Pipeline 之前会调用PipelineExecutor::initializeExecution()方法进行初始化,这个方法中主要是对ExecutingGraph进行初始化,调用ExecutingGraph::initializeExecution(Queue & queue)方法,源码如下:

代码语言:javascript
复制
void ExecutingGraph::initializeExecution(Queue & queue)
{
    std::stack<uint64_t> stack;

    /// Add childless processors to stack.
    uint64_t num_processors = nodes.size();
    for (uint64_t proc = 0; proc < num_processors; ++proc)
    {
        /// 1.将不存在 direct_edges 的 Node 放入栈中,
        /// 在上面的例子中,就是将 LazyOutputFormat 放入栈.
        if (nodes[proc]->direct_edges.empty())
        {
            stack.push(proc);
            /// do not lock mutex, as this function is executed in single thread
            nodes[proc]->status = ExecutingGraph::ExecStatus::Preparing;
        }
    }

    Queue async_queue;

    while (!stack.empty())
    {
        uint64_t proc = stack.top();
        stack.pop();
        
        /// 2.更新 Node 状态.
        updateNode(proc, queue, async_queue);

        if (!async_queue.empty())
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Async is only possible after work() call. Processor {}",
                            async_queue.front()->processor->getName());
    }
}

该方法就是将没有direct_edgesNode放入栈中,调用updateNode()方法更新Node状态,并将可执行Node放入queue中。

updateNode()方法是ExecutingGraph状态机调度轮转的关键函数,无论是初始化还是调度都会涉及到该函数,所以接下来不会直接将整个函数粘出来,而是说到哪一块就粘相关的代码结合流程图来分析,首先我们看一下整个函数大致做了什么事情,精简后的代码如下:

代码语言:javascript
复制
bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue)
{
    std::stack<Edge *> updated_edges;
    std::stack<uint64_t> updated_processors;
    updated_processors.push(pid);

    UpgradableMutex::ReadGuard read_lock(nodes_mutex);
    
    while (!updated_processors.empty() || !updated_edges.empty())
    {
        // 处理 update_edges 中 edge 指向的 node.
        if (updated_processors.empty()){...}
        
        // 处理 update_processors 中 node,更新 node 状态.
        if (!updated_processors.empty()){...}
    }
    return true;
}

从上面这部分代码可以看出,ExecutingGraph::updateNode()方法首先会创建两个栈updated_edgesupdated_processors分别保存待更新Edge或者Node。然后将该Node放入update_processors这个栈当中,接着会进入到一个循环去处理上面两个栈里的东西,直到两个栈的东西都被处理完。

ok,我们还是以逻辑梳理中的ExcutingGraph举例,在没有执行ExecutingGraph::initializeExecution()初始化节点之前,整个ExcutingGraph状态如下:

为了简洁,我给每个节点起了个编号,后续 Processor 直接用Node_n来表示。

由于Node_8是没有direct_edges,所以会被放入栈中调用updateNode()更新节点状态,并且会将Node_8放入updated_processors中,执行if (!updated_processors.empty()){...}代码块:

代码语言:javascript
复制
if (!updated_processors.empty())
{
    pid = updated_processors.top();
    updated_processors.pop();

    /// In this method we have ownership on node.
    auto & node = *nodes[pid];
    {
        try
        {
            auto & processor = *node.processor;
            IProcessor::Status last_status = node.last_processor_status;
            // 1.调用 Node 对应 IProcessor::prepare() 方法尝试 pull 数据.
            IProcessor::Status status = processor.prepare(node.updated_input_ports, node.updated_output_ports);
            // 记录上一个 Processor 的 status.
            node.last_processor_status = status;
        }
        switch (node.last_processor_status)
        {
            // 2.NeedData 将 Node.status 从 Preparing -> Idle.
            case IProcessor::Status::NeedData:
            case IProcessor::Status::PortFull:
            {
                node.status = ExecutingGraph::ExecStatus::Idle;
                break;
            }
            ...
        }
        
        // 3.将 Node 相邻的待更新 Edge 放入 update_edges 这个栈中.
        if (!need_expand_pipeline)
        {        
            for (auto it = node.post_updated_output_ports.rbegin(); it != node.post_updated_output_ports.rend(); ++it)
            {
                auto * edge = static_cast<ExecutingGraph::Edge *>(*it);
                updated_edges.push(edge);
                edge->update_info.trigger();
            }
        
            for (auto it = node.post_updated_input_ports.rbegin(); it != node.post_updated_input_ports.rend(); ++it)
            {
                auto * edge = static_cast<ExecutingGraph::Edge *>(*it);
                updated_edges.push(edge);
                edge->update_info.trigger();
            }
        
            node.post_updated_input_ports.clear();
            node.post_updated_output_ports.clear();
        }
    }
}

对于Node_8来说,在上面代码中干了两个事情:

  • 调用IProcessor::prepare()尝试拉取数据,并更新节点状态;
  • Node_8相邻的待更新的Edge放入updated_edges中;

那么我们先简单看一下LazyOutputFormat这个 Processor 的prepare()IOutputFormat::prepare()源码:

代码语言:javascript
复制
IOutputFormat::Status IOutputFormat::prepare()
{
    // has_input 标识已经拉取到数据.
    if (has_input)
        return Status::Ready;
    
    // 默认就是 Main kind, Totals/Extremes 暂时不用管.
    for (auto kind : {Main, Totals, Extremes})
    {
        auto & input = getPort(kind);

        if (kind != Main && !input.isConnected())
            continue;

        if (input.isFinished())
            continue;
        
        // 更新相邻的 Edge,分别放入 
        // post_updated_input_ports、post_updated_output_ports
        input.setNeeded();
        
        // 查看 state 是否有数据,如果没有则返回 NeedData.
        if (!input.hasData())
            return Status::NeedData;

        // 如果 state 已经有数据,那么调用 pullData() 方法
        // 将数据拉取到 current_chunk 中,并将 has_input 设为 true.
        current_chunk = input.pull(true);
        current_block_kind = kind;
        has_input = true;
        return Status::Ready;
    }

    finished = true;

    if (!finalized)
        return Status::Ready;

    return Status::Finished;
}

Node_7也没有 Push 数据到 state 中,所以Node_8拉取不到数据就返回Status::NeedData

Node_8prepare()执行完之后,接着往下走,它需要将相邻的待更新的Edge放入updated_edges栈中。

也就是将Node_8Node_7的回边放入栈中,接下来就来到了 while 循环中处理update_edges的方法,执行if (updated_processors.empty()){...}代码块:

代码语言:javascript
复制
if (updated_processors.empty())
{
    auto * edge = updated_edges.top();
    updated_edges.pop();

    /// Here we have ownership on edge, but node can be concurrently accessed.

    auto & node = *nodes[edge->to];

    std::unique_lock lock(node.status_mutex);

    ExecutingGraph::ExecStatus status = node.status;

    if (status != ExecutingGraph::ExecStatus::Finished)
    {
        if (edge->backward)
            node.updated_output_ports.push_back(edge->output_port_number);
        else
            node.updated_input_ports.push_back(edge->input_port_number);

        if (status == ExecutingGraph::ExecStatus::Idle)
        {
            node.status = ExecutingGraph::ExecStatus::Preparing;
            updated_processors.push(edge->to);
            stack_top_lock = std::move(lock);
        }
        else
            nodes[edge->to]->processor->onUpdatePorts();
    }
}

上面代码简单来说,就是将Edge所指向的Node放入update_processors栈中,也就是将Node_7放入栈。

到这里一次循环就算完成了,从Node_7开始又会和Node_8执行相似的代码逻辑,大概就是调用Node_7Processor::prepare()尝试更新从 state 拉取数据,如果 state 数据没有准备好,就将Processor.status更新为NeedData,紧接着将Node_7相邻Edge放入update_edges栈中,然后又将Node_7Edge指向的Node放入update_processors栈中,重复此循环。

整个流程大致如下:

在处理 Node_2 Joining 节点时,在JoiningTransform::prepare()方法中,如果它的 inputs 数量大于 1,则会优先处理inputs.back()节点,也就是 Node_5。

直到执行到Node_3,由于Node_3是一个ISource没有InputPort,所以它不需要向 state 拉取数据(它自己会从存储层读取数据),在Node_3会调用ISource::prepare(),代码如下:

代码语言:javascript
复制
ISource::Status ISource::prepare()
{
    if (finished)
    {
        output.finish();
        return Status::Finished;
    }

    /// Check can output.
    if (output.isFinished())
        return Status::Finished;

    if (!output.canPush())
        return Status::PortFull;
    
    // ISource 没有 input,所以会直接返回 Ready.
    if (!has_input)
        return Status::Ready;

    output.pushData(std::move(current_chunk));
    has_input = false;

    if (isCancelled())
    {
        output.finish();
        return Status::Finished;
    }

    if (got_exception)
    {
        finished = true;
        output.finish();
        return Status::Finished;
    }

    /// Now, we pushed to output, and it must be full.
    return Status::PortFull;
}

在上面代码中,由于ISource没有 input,则直接返回Status::Ready。从prepare()方法返回,接着往下走进入新的分支,代码如下:

代码语言:javascript
复制
switch (node.last_processor_status)
{
    case IProcessor::Status::Ready:
    {
        node.status = ExecutingGraph::ExecStatus::Executing;
        queue.push(&node);
        break;
    }
}

Node_3放入最开始传入的queue队列中,至此update_processorsupdate_edges两个栈都没有东西了,ExecutingGraph初始化完成,从ExecutingGraph::updateNode()方法返回,重新回到ExecutingGraph::initializeExecution()方法。那么现在ExecutingGraph初始化完成的节点状态如下:

到这里ExecutingGraph的初始化就完成了,由于本篇过长,就将调度放在下一篇来讲吧。

调度状态轮转图

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
  • 逻辑梳理
    • Pipeline 机制
      • ExecutingGraph 调度
      • 源码解析
        • ExecutingGraph 初始化
        • 调度状态轮转图
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档