This ClickHouse source code walkthrough series is based on ClickHouse version 22.10.1. ClickHouse iterates quickly, so the code may differ slightly from what you see, but the overall flow stays much the same. Reading raw source is dry and hard to review later, so when tracing the logic I describe the call relationships between classes with mind maps and flowcharts, and in the second half I pick out the core source code to analyze.
The previous article, ClickHouse 源码解析(二):SQL 的一生(中), analyzed the initialization of the ExecutingGraph in its source-analysis part and closed with a node state-transition diagram, which shows intuitively how a node goes from the Ready state to the Finished state. In this article we pick up from there and study, from the source code, how the ExecutingGraph is scheduled and how node states rotate within it.
To make review easier, here is the mind map drawn earlier: ClickHouse 源码解析(一):SQL 的一生(上)
We focus on the "execute Pipeline" part and study the scheduling flow of a SelectQuery.
Let's first look at what the QueryPipeline built from the QueryPlan looks like, taking a SelectQuery with a join as an example:
SELECT * FROM customer AS c
INNER JOIN order AS o ON c.C_CUSTKEY = o.C_CUSTKEY;
The Pipeline built for this SQL can be inspected with EXPLAIN PIPELINE, as shown below:
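The original screenshot of the output is an image; the following is a rough sketch of what EXPLAIN PIPELINE prints for such a join (abridged and illustrative; the exact processors and port counts depend on the ClickHouse version, table engines and settings):

(Expression)
ExpressionTransform
  (Join)
  JoiningTransform 2 -> 1
    (Expression)
    ExpressionTransform
      (ReadFromMergeTree)
      MergeTreeInOrder 0 -> 1
    (Expression)
    ExpressionTransform
      (ReadFromMergeTree)
      MergeTreeInOrder 0 -> 1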
For example, (ReadFromMergeTree) is an operator in the QueryPlan, and MergeTreeInOrder is the Processor that this operator adds to the Pipeline. The numbers after a Processor, e.g. 0 -> 1, mean that it has 0 InputPorts and 1 OutputPort; if there are no numbers after a Processor, it has 1 InputPort and 1 OutputPort by default. This may still look abstract, so let's turn to the figure below, which shows the QueryPlan of this SQL:
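The figure is an image in the original post; as a rough text rendering (illustrative, assuming the default plan shape for this query), the QueryPlan looks like:

Expression
  Join
    ReadFromMergeTree (customer)
    ReadFromMergeTree (order)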
Now let's see what the QueryPipeline built from this QueryPlan looks like:
Note: a Source has only OutputPorts, and a Sink has only InputPorts.
InputPort and OutputPort are subclasses of Port. Port has a member variable std::shared_ptr<State> state, which holds the data shared by the two connected Ports.
Port.h source (parts omitted):
class Port
{
protected:
    /// Shared state of two connected ports.
    class State
    {
    public:
        struct Data
        {
            /// Note: std::variant can be used. But move constructor for it can't be inlined.
            Chunk chunk;
            std::exception_ptr exception;
        };

    private:
        std::atomic<Data *> data;
    };
};
Processors pass data between each other through this State, like this: after a Processor finishes executing, it pushes its data into the shared State; when the next Processor executes, it pulls the data out of the shared State, processes it, and pushes the result into the next Port. This is how data flows through the whole Pipeline.
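To make this pull/push handshake concrete, here is a simplified sketch of a one-input/one-output transform's prepare(), modeled on ClickHouse's ISimpleTransform::prepare() (abridged and illustrative, not the exact implementation):

/// Simplified sketch of a 1-in/1-out transform's prepare(),
/// modeled on ISimpleTransform::prepare() (abridged).
IProcessor::Status prepare()
{
    auto & input = inputs.front();
    auto & output = outputs.front();

    /// Downstream closed its input: nothing we produce is needed any more.
    if (output.isFinished())
    {
        input.close();
        return Status::Finished;
    }

    /// Downstream has not consumed the previously pushed chunk yet.
    if (!output.canPush())
    {
        input.setNotNeeded();
        return Status::PortFull;
    }

    /// Ask upstream for data, then check whether it has arrived in the shared State.
    input.setNeeded();
    if (!input.hasData())
        return input.isFinished() ? Status::Finished : Status::NeedData;

    /// Pull the chunk out of the shared State; work() will process it,
    /// and a later prepare() call will push the result to the output.
    current_chunk = input.pull();
    return Status::Ready;
}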
In my understanding, the Pipeline is just the data structure through which data flows, while the ExecutingGraph records each Processor's execution state and schedules the whole graph. The rough structure of ExecutingGraph:
/// Graph of executing pipeline.
class ExecutingGraph
{
public:
    /// Edge represents connection between OutputPort and InputPort.
    /// For every connection, two edges are created: direct and backward (it is specified by backward flag).
    struct Edge
    {
        /// Port numbers. They are same for direct and backward edges.
        uint64_t input_port_number;
        uint64_t output_port_number;

        /// Edge version is increased when port's state is changed (e.g. when data is pushed). See Port.h for details.
        /// To compare version with prev_version we can decide if neighbour processor need to be prepared.
        Port::UpdateInfo update_info;
    };

    /// Graph node. Represents single Processor.
    struct Node
    {
        /// Direct edges are for output ports, back edges are for input ports.
        Edges direct_edges;
        Edges back_edges;
    };

    Nodes nodes;
};
In short, the ExecutingGraph consists of multiple Nodes; each Node corresponds to a Processor, and Nodes are connected by Edges. A Node records direct_edges and back_edges, which correspond to its OutputPorts and InputPorts respectively. Converting the QueryPipeline above into an ExecutingGraph gives roughly the picture below:
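Since that figure is also an image, here is a rough text rendering (each arrow below really stands for a pair of Edges, one direct and one backward; the extra processors on the right-hand join side are omitted for brevity):

MergeTreeInOrder -> ExpressionTransform \
                                         -> JoiningTransform -> ExpressionTransform -> LimitsCheckingTransform -> LazyOutputFormat
MergeTreeInOrder -> ExpressionTransform /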
LimitsCheckingTransform and LazyOutputFormat are Processors appended after the QueryPipeline is built to handle common logic; they will come up again during scheduling.
Now that the ExecutingGraph is built, it is finally time to schedule and execute it. Before scheduling starts, the Nodes that get processed first need their state initialized. ClickHouse defines the following six statuses for a Processor:
enum class Status
{
    /// Processor needs some data at its inputs to proceed.
    /// You need to run another processor to generate required input and then call 'prepare' again.
    NeedData,

    /// Processor cannot proceed because output port is full or not isNeeded().
    /// You need to transfer data from output port to the input port of another processor and then call 'prepare' again.
    PortFull,

    /// All work is done (all data is processed or all output are closed), nothing more to do.
    Finished,

    /// No one needs data on output ports.
    /// Unneeded,

    /// You may call 'work' method and processor will do some work synchronously.
    Ready,

    /// You may call 'schedule' method and processor will return descriptor.
    /// You need to poll this descriptor and call work() afterwards.
    Async,

    /// Processor wants to add other processors to pipeline.
    /// New processors must be obtained by expandPipeline() call.
    ExpandPipeline,
};
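To make these statuses concrete, here is roughly how a source processor decides which one to return, abridged from ISource::prepare() in 22.x (details differ between versions):

/// Abridged sketch of ISource::prepare(): a source either asks for work(),
/// pushes the chunk generated by work(), or reports that it is done.
ISource::Status ISource::prepare()
{
    if (finished)
    {
        output.finish();
        return Status::Finished;
    }

    /// Downstream no longer needs data.
    if (output.isFinished())
        return Status::Finished;

    /// The previously pushed chunk has not been consumed yet.
    if (!output.canPush())
        return Status::PortFull;

    /// No chunk generated yet: ask the executor to call work().
    if (!has_input)
        return Status::Ready;

    /// Push the chunk produced by work() into the output port.
    output.pushData(std::move(current_chunk));
    has_input = false;
    return Status::PortFull;
}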
When the ExecutingGraph initializes its nodes, their states are as shown below:
A Node in the Ready state calls its Processor's work() method to process data. Once that finishes, MergeTree calls prepare() to push the data to its OutputPort, leaving the nodes in the following states:
Next, Expression calls prepare() to pull that data and enters the Ready state, leaving the nodes as follows:
The remaining nodes proceed the same way, calling prepare() and work() to update Node states until the whole ExecutingGraph scheduling run completes. A fairly complete state-transition diagram for some of the operators is drawn at the end of the article.
Continuing with the earlier example, the node states after ExecutingGraph initialization look like this:
With the ExecutingGraph initialized, we come back to the PipelineExecutor::initializeExecution() method; the code is as follows:
void PipelineExecutor::initializeExecution(size_t num_threads)
{
    ...

    Queue queue;

    // Initialize the ExecutingGraph.
    graph->initializeExecution(queue);

    // Initialize ExecutorTasks.
    tasks.init(num_threads, use_threads, profile_processors, trace_processors, read_progress_callback.get());

    // Put the nodes whose status is Ready into ExecutorTasks' task_queue, waiting to be scheduled.
    tasks.fill(queue);

    std::unique_lock lock{threads_mutex};
    threads.reserve(num_threads);
}
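For reference, tasks.fill() spreads the ready nodes round-robin across the per-thread task queues; a simplified sketch of ExecutorTasks::fill() (abridged, may differ slightly between versions):

/// Abridged sketch: ready nodes are distributed round-robin over per-thread queues.
void ExecutorTasks::fill(Queue & queue)
{
    std::lock_guard lock(mutex);

    size_t next_thread = 0;
    while (!queue.empty())
    {
        task_queue.push(queue.front(), next_thread);
        queue.pop();

        ++next_thread;
        if (next_thread >= num_threads)
            next_thread = 0;
    }
}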
ExecutorTasks is used by PipelineExecutor and manages the tasks that are ready for execution. Its rough structure:
/// Manage tasks which are ready for execution. Used in PipelineExecutor.
class ExecutorTasks
{
    /// Contexts for every executing thread.
    std::vector<std::unique_ptr<ExecutionThreadContext>> executor_contexts;

    /// Queue with pointers to tasks. Each thread will concurrently read from it until finished flag is set.
    /// Stores processors need to be prepared. Preparing status is already set for them.
    TaskQueue<ExecutingGraph::Node> task_queue;

    /// Queue which stores tasks where processors returned Async status after prepare.
    /// If multiple threads are using, main thread will wait for async tasks.
    /// For single thread, will wait for async tasks only when task_queue is empty.
    PollingQueue async_task_queue;

    /// Maximum amount of threads. Constant after initialization, based on `max_threads` setting.
    size_t num_threads = 0;

    /// Started thread count (allocated by `ConcurrencyControl`). Can increase during execution up to `num_threads`.
    size_t use_threads = 0;

    /// This is the total number of waited async tasks which are not executed yet.
    /// sum(executor_contexts[i].async_tasks.size())
    size_t num_waiting_async_tasks = 0;

    /// A set of currently waiting threads.
    ThreadsQueue threads_queue;
};
ExecutorTasks keeps the processors that are ready in task_queue, and each thread can concurrently take tasks from task_queue and execute them.
Moving further down PipelineExecutor::executeImpl(): since the test data set is small, execution is single-threaded, which brings us to PipelineExecutor::executeSingleThread(). It calls PipelineExecutor::executeStepImpl(), the core method that drives the ExecutingGraph scheduling rotation; the code is as follows:
void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yield_flag)
{
    auto & context = tasks.getThreadContext(thread_num);
    bool yield = false;

    while (!tasks.isFinished() && !yield)
    {
        /// First, find any processor to execute.
        /// Just traverse graph and prepare any processor.
        while (!tasks.isFinished() && !context.hasTask())
            // 1. Try to fetch a task for the current thread and put it into the context.
            tasks.tryGetTask(context);

        while (context.hasTask() && !yield)
        {
            if (tasks.isFinished())
                break;

            // 2. Execute the task on this thread.
            if (!context.executeTask())
                cancel();

            if (tasks.isFinished())
                break;

            if (!checkTimeLimitSoft())
                break;

            /// Try to execute neighbour processor.
            {
                Queue queue;
                Queue async_queue;

                /// Prepare processor after execution.
                // 3. Update the states of the nodes adjacent to the node just executed.
                if (!graph->updateNode(context.getProcessorID(), queue, async_queue))
                    finish();

                /// Push other tasks to global queue.
                // 4. Put the newly readied tasks into the global queue.
                tasks.pushTasks(queue, async_queue, context);
            }

            /// Upscale if possible.
            spawnThreads();

            /// We have executed single processor. Check if we need to yield execution.
            if (yield_flag && *yield_flag)
                yield = true;
        }
    }
}
This function is the loop body of thread scheduling: it executes a Processor and then updates the states of the neighbouring nodes (the updateNode() method was covered in the previous article, so I won't repeat it here). What we need to look at is the context.executeTask() method: the ExecutionThreadContext::executeTask() function calls executeJob(node, read_progress_callback), whose code is as follows:
static void executeJob(ExecutingGraph::Node * node, ReadProgressCallback * read_progress_callback)
{
    try
    {
        node->processor->work();

        /// Update read progress only for source nodes.
        bool is_source = node->back_edges.empty();

        if (is_source && read_progress_callback)
        {
            if (auto read_progress = node->processor->getReadProgress())
            {
                if (read_progress->counters.total_rows_approx)
                    read_progress_callback->addTotalRowsApprox(read_progress->counters.total_rows_approx);

                if (!read_progress_callback->onProgress(read_progress->counters.read_rows, read_progress->counters.read_bytes, read_progress->limits))
                    node->processor->cancel();
            }
        }
    }
    catch (Exception & exception)
    {
        if (checkCanAddAdditionalInfoToException(exception))
            exception.addMessage("While executing " + node->processor->getName());
        throw;
    }
}
This function invokes the node's IProcessor::work() method; work() is the key method in which an operator processes data and passes it along. After the operator runs, the function checks whether the node is a source; if it is a data-source operator, it calls the progress_callback to report progress information.
Take the first operator, MergeTree, as an example: it is an ISource, which inherits from IProcessor, so ISource::work() gets called; the code is as follows:
void ISource::work()
{
    try
    {
        read_progress_was_set = false;

        if (auto chunk = tryGenerate())
        {
            current_chunk.chunk = std::move(*chunk);
            if (current_chunk.chunk)
            {
                has_input = true;
                if (auto_progress && !read_progress_was_set)
                    progress(current_chunk.chunk.getNumRows(), current_chunk.chunk.bytes());
            }
        }
        else
            finished = true;

        if (isCancelled())
            finished = true;
    }
    catch (...)
    {
        finished = true;
        throw;
    }
}
The storage layer generates data and returns Chunks by inheriting from ISource and implementing tryGenerate() and generate(). At this point the first operator in the Pipeline has finished executing; the generated data is then passed along the Pipeline to the next operator, and so on until every operator has processed the data and returned, completing one ExecutingGraph scheduling run.
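Purely as an illustration of that interface, here is a minimal hypothetical source built on ISource (the class name NumbersSource and its column layout are made up; real storage sources such as the MergeTree ones are far more involved):

/// Hypothetical minimal source: emits the numbers [0, limit) in chunks of
/// `step` rows, one chunk per generate() call. Illustrative sketch only.
class NumbersSource : public ISource
{
public:
    NumbersSource(Block header, UInt64 limit_, UInt64 step_)
        : ISource(std::move(header)), limit(limit_), step(step_) {}

    String getName() const override { return "NumbersSource"; }

protected:
    Chunk generate() override
    {
        if (next >= limit)
            return {};  /// An empty chunk marks the source as finished.

        auto column = ColumnUInt64::create();
        UInt64 end = std::min(next + step, limit);
        for (UInt64 i = next; i < end; ++i)
            column->insertValue(i);

        UInt64 rows = end - next;
        next = end;

        Columns columns;
        columns.emplace_back(std::move(column));
        return Chunk(std::move(columns), rows);
    }

private:
    UInt64 limit;
    UInt64 step;
    UInt64 next = 0;
};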
The next article will dive into how the execution layer builds the bridge to the storage layer, and how the storage layer processes and hands over data.
Originality statement: this article was published on the Tencent Cloud Developer Community with the author's authorization; reproduction without permission is prohibited. In case of infringement, please contact cloudcommunity@tencent.com for removal.