基于 ClickHouse version 22.10.1 学习并写下 ClickHouse 源码解析系列。由于 CK 版本迭代很快,可能导致代码可能有些出入,但是整体流程大差不差吧。由于源码阅读过于枯燥,并且不太利于后续复习,所以在逻辑梳理时,我会使用思维导图或者流程图的方式来描述类与类之间的调用逻辑,后半部分会挑出核心的源码去分析。
上一篇通过思维导图的方式,从宏观上了解了 CK 是如何处理 SQL 的。在源码解析部分对 SelectQuery 这类查询语句如何构建 QueryPipeline 进行了分析。这篇我们紧接着学习 CK 是如何调度 QueryPipeline,数据是如何在 Processor 之间流转的。
为了方便复习,先挂上之前画的思维导图。ClickHouse 源码解析(一):SQL 的一生(上)
我们聚焦于执行 Pipeline 这部分,学习 SelectQuery 的调度流程。
我们先来看一下根据 QueryPlan 构建完成的 QueryPipeline 是个什么样子的,这里举例一个带 Join 查询的 SelectQuery:
SELECT * FROM customer AS c
INNER JOIN order AS o ON c.C_CUSTKEY = o.C_CUSTKEY;
通过EXPLAIN PIPELINE
可以查看这条 SQL 所构建的 Pipeline,如下所示:
比如,(ReadFromMergeTree)
就是 QueryPlan 中的一个算子,MergeTreeInOrder
则代表实现这个算子向 Pipeline 中添加的 Processor,Processor 后面的数字0 -> 1
表示InputPort
数量为 0,OutputPort
数量为 1,如果 Processor 后面没有数字则默认InputPort
和OutputPort
数量都是 1。酱紫看可能有些抽象,我们来看看下面这个图,这张图画的是 SQL 的 QueryPlan:
然后我们看看根据这个 QueryPlan 构建出来的 QueryPipeline 的样子:
注意:
Source
只有OutputPort
,Sink
只有InputPort
。
InputPort
和OutputPort
是Port
的子类,在Port
中有个成员变量std::shared_ptr<State> state
它保存了两个连接Port
的共享数据。
Port.h
源码(省略部分代码):
class Port
{
protected:
/// Shared state of two connected ports.
class State
{
public:
struct Data
{
/// Note: std::variant can be used. But move constructor for it can't be inlined.
Chunk chunk;
std::exception_ptr exception;
};
private:
std::atomic<Data *> data;
}
}
Processor
之间传递数据就是通过这个State
来实现的,就像下面这个样子:
Processor 在执行完成后,会将数据 Push 到ShareState
中,下一个 Processor 在执行时,会从ShareState
将数据 Pull 出来进行处理,处理完之后又 Push 下一个 Prot 中,整个 Pipeline 数据就是这样流动起来的。
个人理解,Pipeline 只是作为数据流通的数据结构,ExecutingGraph
记录了 Processor 的执行状态,它对整个 Graph 进行调度处理。ExecutingGraph
大致结构如下:
/// Graph of executing pipeline.
class ExecutingGraph
{
public:
/// Edge represents connection between OutputPort and InputPort.
/// For every connection, two edges are created: direct and backward (it is specified by backward flag).
struct Edge
{
/// Port numbers. They are same for direct and backward edges.
uint64_t input_port_number;
uint64_t output_port_number;
/// Edge version is increased when port's state is changed (e.g. when data is pushed). See Port.h for details.
/// To compare version with prev_version we can decide if neighbour processor need to be prepared.
Port::UpdateInfo update_info;
};
/// Graph node. Represents single Processor.
struct Node
{
/// Direct edges are for output ports, back edges are for input ports.
Edges direct_edges;
Edges back_edges;
};
Nodes nodes;
};
简单来说,ExcutingGraph
由多个Node
组成,每个Node
相当于 Processor,Node
之间通过Edge
连接,Node
中记录了direct_edges
、back_edges
分别对应OutputPort
、InputPort
。我们将上面得到的 QueryPipeline 转换为ExecutingGraph
大致如下图所示:
LimitsCheckingTransform
和LazyOutputFormat
是在 QueryPipeline 构建完成后添加的通用逻辑处理的 Processor,在后续调度中会用到。
既然ExcutingGraph
构建完成,终于要开始调度执行了。那么在调度之前,需要给最先接受处理的Node
初始化状态,CK 中 Processor 定义了如下六种状态:
enum class Status
{
/// Processor needs some data at its inputs to proceed.
/// You need to run another processor to generate required input and then call 'prepare' again.
NeedData,
/// Processor cannot proceed because output port is full or not isNeeded().
/// You need to transfer data from output port to the input port of another processor and then call 'prepare' again.
PortFull,
/// All work is done (all data is processed or all output are closed), nothing more to do.
Finished,
/// No one needs data on output ports.
/// Unneeded,
/// You may call 'work' method and processor will do some work synchronously.
Ready,
/// You may call 'schedule' method and processor will return descriptor.
/// You need to poll this descriptor and call work() afterwards.
Async,
/// Processor wants to add other processors to pipeline.
/// New processors must be obtained by expandPipeline() call.
ExpandPipeline,
};
ExcutingGraph
在初始化节点,各个节点状态如下图所示:
处于Ready
状态的Node
,会调用当前 Processor 的work()
方法处理数据,执行完成之后,MergeTree
调用prepare()
方法将数据 Push 到OutputPort
,节点状态如下:
紧接着,Expression
调用prepare()
方法将 Pull 数据并且进入Ready
状态,节点状态如下:
后续的节点还是以上面的方式,调用prepare()
和work()
方法更新Node状态,完成整个ExcutingGraph
调度逻辑。在文章末尾画了一部分算子较为完整的调度状态轮转图。
上面内容大致了解了ExecutingGraph
调度流程,接下来我们深入源码去学习 Graph 的节点初始化和 Graph 的调度。
从思维导图可以看到在执行 Pipeline 之前会调用PipelineExecutor::initializeExecution()
方法进行初始化,这个方法中主要是对ExecutingGraph
进行初始化,调用ExecutingGraph::initializeExecution(Queue & queue)
方法,源码如下:
void ExecutingGraph::initializeExecution(Queue & queue)
{
std::stack<uint64_t> stack;
/// Add childless processors to stack.
uint64_t num_processors = nodes.size();
for (uint64_t proc = 0; proc < num_processors; ++proc)
{
/// 1.将不存在 direct_edges 的 Node 放入栈中,
/// 在上面的例子中,就是将 LazyOutputFormat 放入栈.
if (nodes[proc]->direct_edges.empty())
{
stack.push(proc);
/// do not lock mutex, as this function is executed in single thread
nodes[proc]->status = ExecutingGraph::ExecStatus::Preparing;
}
}
Queue async_queue;
while (!stack.empty())
{
uint64_t proc = stack.top();
stack.pop();
/// 2.更新 Node 状态.
updateNode(proc, queue, async_queue);
if (!async_queue.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Async is only possible after work() call. Processor {}",
async_queue.front()->processor->getName());
}
}
该方法就是将没有direct_edges
的Node
放入栈中,调用updateNode()
方法更新Node
状态,并将可执行Node
放入queue
中。
updateNode()
方法是ExecutingGraph
状态机调度轮转的关键函数,无论是初始化还是调度都会涉及到该函数,所以接下来不会直接将整个函数粘出来,而是说到哪一块就粘相关的代码结合流程图来分析,首先我们看一下整个函数大致做了什么事情,精简后的代码如下:
bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue)
{
std::stack<Edge *> updated_edges;
std::stack<uint64_t> updated_processors;
updated_processors.push(pid);
UpgradableMutex::ReadGuard read_lock(nodes_mutex);
while (!updated_processors.empty() || !updated_edges.empty())
{
// 处理 update_edges 中 edge 指向的 node.
if (updated_processors.empty()){...}
// 处理 update_processors 中 node,更新 node 状态.
if (!updated_processors.empty()){...}
}
return true;
}
从上面这部分代码可以看出,ExecutingGraph::updateNode()
方法首先会创建两个栈updated_edges
、updated_processors
分别保存待更新Edge
或者Node
。然后将该Node
放入update_processors
这个栈当中,接着会进入到一个循环去处理上面两个栈里的东西,直到两个栈的东西都被处理完。
ok,我们还是以逻辑梳理中的ExcutingGraph
举例,在没有执行ExecutingGraph::initializeExecution()
初始化节点之前,整个ExcutingGraph
状态如下:
为了简洁,我给每个节点起了个编号,后续 Processor 直接用Node_n
来表示。
由于Node_8
是没有direct_edges
,所以会被放入栈中调用updateNode()
更新节点状态,并且会将Node_8
放入updated_processors
中,执行if (!updated_processors.empty()){...}
代码块:
if (!updated_processors.empty())
{
pid = updated_processors.top();
updated_processors.pop();
/// In this method we have ownership on node.
auto & node = *nodes[pid];
{
try
{
auto & processor = *node.processor;
IProcessor::Status last_status = node.last_processor_status;
// 1.调用 Node 对应 IProcessor::prepare() 方法尝试 pull 数据.
IProcessor::Status status = processor.prepare(node.updated_input_ports, node.updated_output_ports);
// 记录上一个 Processor 的 status.
node.last_processor_status = status;
}
switch (node.last_processor_status)
{
// 2.NeedData 将 Node.status 从 Preparing -> Idle.
case IProcessor::Status::NeedData:
case IProcessor::Status::PortFull:
{
node.status = ExecutingGraph::ExecStatus::Idle;
break;
}
...
}
// 3.将 Node 相邻的待更新 Edge 放入 update_edges 这个栈中.
if (!need_expand_pipeline)
{
for (auto it = node.post_updated_output_ports.rbegin(); it != node.post_updated_output_ports.rend(); ++it)
{
auto * edge = static_cast<ExecutingGraph::Edge *>(*it);
updated_edges.push(edge);
edge->update_info.trigger();
}
for (auto it = node.post_updated_input_ports.rbegin(); it != node.post_updated_input_ports.rend(); ++it)
{
auto * edge = static_cast<ExecutingGraph::Edge *>(*it);
updated_edges.push(edge);
edge->update_info.trigger();
}
node.post_updated_input_ports.clear();
node.post_updated_output_ports.clear();
}
}
}
对于Node_8
来说,在上面代码中干了两个事情:
IProcessor::prepare()
尝试拉取数据,并更新节点状态;Node_8
相邻的待更新的Edge
放入updated_edges
中;那么我们先简单看一下LazyOutputFormat
这个 Processor 的prepare()
,IOutputFormat::prepare()
源码:
IOutputFormat::Status IOutputFormat::prepare()
{
// has_input 标识已经拉取到数据.
if (has_input)
return Status::Ready;
// 默认就是 Main kind, Totals/Extremes 暂时不用管.
for (auto kind : {Main, Totals, Extremes})
{
auto & input = getPort(kind);
if (kind != Main && !input.isConnected())
continue;
if (input.isFinished())
continue;
// 更新相邻的 Edge,分别放入
// post_updated_input_ports、post_updated_output_ports
input.setNeeded();
// 查看 state 是否有数据,如果没有则返回 NeedData.
if (!input.hasData())
return Status::NeedData;
// 如果 state 已经有数据,那么调用 pullData() 方法
// 将数据拉取到 current_chunk 中,并将 has_input 设为 true.
current_chunk = input.pull(true);
current_block_kind = kind;
has_input = true;
return Status::Ready;
}
finished = true;
if (!finalized)
return Status::Ready;
return Status::Finished;
}
Node_7
也没有 Push 数据到 state 中,所以Node_8
拉取不到数据就返回Status::NeedData
。
Node_8
的prepare()
执行完之后,接着往下走,它需要将相邻的待更新的Edge
放入updated_edges
栈中。
也就是将Node_8
与Node_7
的回边放入栈中,接下来就来到了 while 循环中处理update_edges
的方法,执行if (updated_processors.empty()){...}
代码块:
if (updated_processors.empty())
{
auto * edge = updated_edges.top();
updated_edges.pop();
/// Here we have ownership on edge, but node can be concurrently accessed.
auto & node = *nodes[edge->to];
std::unique_lock lock(node.status_mutex);
ExecutingGraph::ExecStatus status = node.status;
if (status != ExecutingGraph::ExecStatus::Finished)
{
if (edge->backward)
node.updated_output_ports.push_back(edge->output_port_number);
else
node.updated_input_ports.push_back(edge->input_port_number);
if (status == ExecutingGraph::ExecStatus::Idle)
{
node.status = ExecutingGraph::ExecStatus::Preparing;
updated_processors.push(edge->to);
stack_top_lock = std::move(lock);
}
else
nodes[edge->to]->processor->onUpdatePorts();
}
}
上面代码简单来说,就是将Edge
所指向的Node
放入update_processors
栈中,也就是将Node_7
放入栈。
到这里一次循环就算完成了,从Node_7
开始又会和Node_8
执行相似的代码逻辑,大概就是调用Node_7
的Processor::prepare()
尝试更新从 state 拉取数据,如果 state 数据没有准备好,就将Processor.status
更新为NeedData
,紧接着将Node_7
相邻Edge
放入update_edges
栈中,然后又将Node_7
的Edge
指向的Node
放入update_processors
栈中,重复此循环。
整个流程大致如下:
在处理 Node_2 Joining 节点时,在
JoiningTransform::prepare()
方法中,如果它的 inputs 数量大于 1,则会优先处理inputs.back()
节点,也就是 Node_5。
直到执行到Node_3
,由于Node_3
是一个ISource
没有InputPort
,所以它不需要向 state 拉取数据(它自己会从存储层读取数据),在Node_3
会调用ISource::prepare()
,代码如下:
ISource::Status ISource::prepare()
{
if (finished)
{
output.finish();
return Status::Finished;
}
/// Check can output.
if (output.isFinished())
return Status::Finished;
if (!output.canPush())
return Status::PortFull;
// ISource 没有 input,所以会直接返回 Ready.
if (!has_input)
return Status::Ready;
output.pushData(std::move(current_chunk));
has_input = false;
if (isCancelled())
{
output.finish();
return Status::Finished;
}
if (got_exception)
{
finished = true;
output.finish();
return Status::Finished;
}
/// Now, we pushed to output, and it must be full.
return Status::PortFull;
}
在上面代码中,由于ISource
没有 input,则直接返回Status::Ready
。从prepare()
方法返回,接着往下走进入新的分支,代码如下:
switch (node.last_processor_status)
{
case IProcessor::Status::Ready:
{
node.status = ExecutingGraph::ExecStatus::Executing;
queue.push(&node);
break;
}
}
将Node_3
放入最开始传入的queue
队列中,至此update_processors
和update_edges
两个栈都没有东西了,ExecutingGraph
初始化完成,从ExecutingGraph::updateNode()
方法返回,重新回到ExecutingGraph::initializeExecution()
方法。那么现在ExecutingGraph
初始化完成的节点状态如下:
到这里ExecutingGraph
的初始化就完成了,由于本篇过长,就将调度放在下一篇来讲吧。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有