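At the end of the previous article, we saw that the following code calls the engine to perform backpropagation. Here roots is a vector of the gradient edges of the forward outputs, i.e. their (grad_fn_, 0) pairs; in other words, it is an edge_list.
Engine::execute(roots, inputs, keep_graph, create_graph, accumulate_grad, outputs);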
Looking at the definition of Engine, we can match each of these arguments to a parameter of execute:
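auto Engine::execute(const edge_list& roots, // root nodes of the backward pass
const variable_list& inputs, // gradients of the root nodes
bool keep_graph, // whether the computation graph should be kept
bool create_graph, // whether to build a graph of the derivative, for higher-order gradients
bool accumulate_grad,
const edge_list& outputs // nodes whose gradients should be output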
)
So in this article we first look at the engine from a static perspective, that is, at its data structures and static properties.
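Earlier articles in this series:
[Source Code Analysis] How PyTorch Implements Forward Propagation (1) --- Base Classes (Part 1)
[Source Code Analysis] How PyTorch Implements Forward Propagation (2) --- Base Classes (Part 2)
[Source Code Analysis] How PyTorch Implements Forward Propagation (3) --- Concrete Implementation
[Source Code Analysis] How PyTorch Implements Backward Propagation (1) ---- Calling the Engine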
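Engine is the core of autograd: it implements backpropagation. Backpropagation runs from the roots (the outputs of the forward pass) toward the outputs of the backward pass (the inputs of the forward pass), following the dependencies recorded in the dynamic computation graph that was built during the forward pass.
The entry point of Engine is the execute function.
Engine is declared in torch/csrc/autograd/engine.h (and implemented in engine.cpp); only its member variables are shown here. The most important one is device_ready_queues_, the per-device ReadyQueues shared by all GraphTasks. Do not confuse it with GraphTask's own cpu_ready_queue_, to which users send pending work. The code is: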
// A single instance of this struct should be created through the whole process lifetime.
// The worker thread creation logic and Engine's destructor rely on this.
struct TORCH_API Engine {
// Ensures device_ready_queues_ are initialized only once
std::once_flag start_device_threads_flag_;
// Safe to read device_ready_queues_ without synchronization after initialization
std::vector<std::shared_ptr<ReadyQueue>> device_ready_queues_;
std::vector<std::function<void()>> final_callbacks_;
// To protect reads and writes to final_callbacks_
std::mutex post_callbacks_lock_;
// How many nested reentrant calls are allowed until a new thread is used
int max_recursion_depth_;
struct ThreadPoolShared {
// Data structures used by the threads for executing reentrant backwards
// tasks. See Note [Reentrant backwards]
// Number of available threads for processing new GraphTasks.
unsigned int num_workers_;
// The threads will wait on work_ to be notified of GraphTasks
std::condition_variable work_;
// To protect reads and writes to graphtask_queue_ and num_workers_
// and for synchronizing creating new threads when needed
std::mutex mutex_;
// Workers will process the GraphTasks added to this queue. A GraphTask is
// allocated inside Engine::execute and lives for the duration of execute
std::queue<std::weak_ptr<GraphTask>> graphtasks_queue_;
ThreadPoolShared() : num_workers_(0) {}
};
// Temporary workaround until shutting down threads is done
// We need shared ownership of all these objects because the threads are leaked
// when Engine shuts down, so there may be threads waiting on work_
// for the graphtasks_queue_ to be nonempty.
std::shared_ptr<ThreadPoolShared> thread_pool_shared_;
private:
// Number of non-reentrant threads
std::atomic<uint32_t> non_reentrant_device_thread_count_;
// Destructor will wait for non-reentrant threads to finish
std::condition_variable non_reentrant_device_thread_condvar_;
std::mutex non_reentrant_device_thread_mutex_;
// stop() must be called before the destruction path goes down to the base
// class, in order to avoid a data-race-on-vptr. Use this boolean to guard
// whether stop() has already been called, so we can call this in every
// destructor of the class hierarchy.
bool stopped_{false};
};
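Next we introduce the various base classes, and for each one we try to analyze it together with the code that uses it.
GraphRoot is a Node; Node is what used to be the Function class.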
struct TORCH_API GraphRoot : public Node {
GraphRoot(edge_list functions, variable_list inputs)
: Node(std::move(functions)),
outputs(std::move(inputs)) { // store the incoming inputs in the outputs member
// Ensures calls to stream() on a GraphRoot instance reflect current stream(s)
// on devices of root grad tensors at the time the instance is constructed.
for (const auto& t : outputs) {
add_input_metadata(t);
}
}
variable_list apply(variable_list&& inputs) override {
return outputs; // apply just returns the stored gradients and ignores its argument; other Node subclasses have their own implementations
}
variable_list outputs; // the gradients; used only by apply(), which returns them
};
struct TORCH_API Identity : public Node {
variable_list apply(variable_list&& inputs) override;
};
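To make the shape of GraphRoot concrete, here is a minimal sketch using hypothetical stand-in types (ToyNode, ToyEdge, ToyGraphRoot, with doubles in place of tensors). It is not PyTorch's API, only an illustration of the same structure: the roots become next_edges_, and apply() returns the stored gradients whatever it is given.

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical toy types: a "variable" is just a double.
using ToyVariable = double;
using ToyVariableList = std::vector<ToyVariable>;

struct ToyNode;
struct ToyEdge {
  std::shared_ptr<ToyNode> function;  // the node this edge points to
  int input_nr;                       // which input of that node it feeds
};

struct ToyNode {
  explicit ToyNode(std::vector<ToyEdge> next = {}) : next_edges_(std::move(next)) {}
  virtual ~ToyNode() = default;
  virtual ToyVariableList apply(ToyVariableList&& inputs) = 0;
  std::vector<ToyEdge> next_edges_;  // edges toward the next backward nodes
};

// Same shape as GraphRoot: roots go to the base-class edges,
// the initial gradients are kept in outputs and returned by apply().
struct ToyGraphRoot : ToyNode {
  ToyGraphRoot(std::vector<ToyEdge> roots, ToyVariableList grads)
      : ToyNode(std::move(roots)), outputs(std::move(grads)) {}
  ToyVariableList apply(ToyVariableList&& /*inputs*/) override { return outputs; }
  ToyVariableList outputs;
};
```

Calling apply() on a ToyGraphRoot built with gradients {1.0, 2.0} simply hands those gradients back, which is what lets the engine treat the root like any other node.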
In the engine, GraphRoot is built by the code below. Given how execute is called, it is built from the roots of the backward pass (the starting points) and the roots' gradients, inputs.
// If we receive a single root, skip creating extra root node
bool skip_dummy_node = roots.size() == 1;
auto graph_root = skip_dummy_node ?
roots.at(0).function :
std::make_shared<GraphRoot>(roots, inputs);
Recall how the Node base class of GraphRoot is constructed. GraphRoot builds its Node base from the edge list: the backward roots become the edges of GraphRoot (as a Node), and GraphRoot itself adds the member variable_list outputs, which holds the inputs argument.
explicit Node(edge_list&& next_edges = edge_list())
: Node(/*sequence_nr=*/at::sequence_number::get_and_increment(),
std::move(next_edges)) {}
As shown below:
+------------------------------------+
| GraphRoot |
| |
| variable_list outputs +---------------> inputs: gradients, passed through to downstream nodes
| |
| |
| +----------------------------+ |
| | Node | |
| | | |
| | | |
| | edge_list next_edges_ +-----------> roots: starting points
| | | |
| +----------------------------+ |
| |
+------------------------------------+
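GraphRoot serves as the root of the backward pass. When building graph_root, a single root is used directly, while multiple roots are wrapped in a newly constructed GraphRoot; the dependencies are then computed starting from it: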
// If we receive a single root, skip creating extra root node
bool skip_dummy_node = roots.size() == 1;
auto graph_root = skip_dummy_node ?
roots.at(0).function : // with a single root, use it directly as the graph root
std::make_shared<GraphRoot>(roots, inputs); // with several roots, construct a GraphRoot
auto min_topo_nr = compute_min_topological_nr(outputs);
// Now compute the dependencies for all executable functions
compute_dependencies(graph_root.get(), *graph_task, min_topo_nr);
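The skip_dummy_node decision can be sketched on its own. The Node type and pick_graph_root function below are hypothetical simplifications, not the engine's code; a node only records its outgoing edges so the wrapper is easy to recognize.

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical toy node: just a list of next nodes.
struct Node {
  explicit Node(std::vector<std::shared_ptr<Node>> next = {}) : next_edges(std::move(next)) {}
  std::vector<std::shared_ptr<Node>> next_edges;
};

// Mirrors the skip_dummy_node logic.
std::shared_ptr<Node> pick_graph_root(const std::vector<std::shared_ptr<Node>>& roots) {
  bool skip_dummy_node = roots.size() == 1;
  if (skip_dummy_node) {
    return roots.at(0);  // single root: use it directly
  }
  return std::make_shared<Node>(roots);  // several roots: wrap them in a new root node
}
```

With one root no extra node is allocated; with several, the wrapper's edges point at every root, so a single traversal reaches all of them.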
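First, a basic concept. A GraphTask instance is a resource-management object at the level of the dynamic graph: it holds all the metadata needed for one execution of backward, such as the dependencies of every Node in the graph and the buffers of Nodes that are not ready yet. If reentrant backward is allowed, several GraphTasks may be working at the same time.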
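The main member variables of GraphTask include:
cpu_ready_queue_: CPU-related tasks are sent to this queue. Each GraphTask memoizes its own cpu_ready_queue_, so that when we are executing on a device thread (e.g. a GPU thread) and the next NodeTask should run on the CPU, we know which ready queue to push it to.
mutex_: protects reads and writes to not_ready_, dependencies_, captured_vars_, has_error_, future_result_, cpu_ready_queue_, and leaf_streams.
The definition follows (member variables only):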
// GraphTask holds metadata needed for a single execution of backward()
struct GraphTask: std::enable_shared_from_this<GraphTask> {
std::atomic<uint64_t> outstanding_tasks_{0};
// Indicates if an error occurred while executing any task. When this is
// true, it signals all threads to stop executing.
std::atomic_bool has_error_{false};
std::atomic_bool future_completed_{false};
// It is safe to read grad_mode_ and keep_graph_ without synchronization
bool keep_graph_;
bool grad_mode_;
// To protect reads/writes to not_ready_, dependencies_, captured_vars_,
// has_error_, future_result_, cpu_ready_queue_, and leaf_streams.
std::mutex mutex_;
std::unordered_map<Node*, InputBuffer> not_ready_;
std::unordered_map<Node*, int> dependencies_;
struct ExecInfo {
struct Capture {
Capture(const Capture&) = delete;
Capture(Capture&&) = default;
Capture(int input_idx, int output_idx)
: input_idx_(input_idx), output_idx_(output_idx) {}
int input_idx_; // within Node inputs
int output_idx_; // within the output vector of a GraphTask
// This hook will be executed after a grad is captured. The captured
// grad will be replaced by the return value of the hook.
struct GradCaptureHook {
virtual ~GradCaptureHook() = default;
virtual at::Tensor operator()(const at::Tensor& grad) = 0;
};
// The hooks will be called one by one in the order as they were added.
// The input grad of a hook will be the output of its preceding hook. The
// first hook will take the captured grad as the input. The output of the
// last hook will replace the captured grad.
std::vector<std::unique_ptr<GradCaptureHook>> hooks_;
};
bool should_execute() const {
return needed_ || captures_;
}
bool needed_ = false;
std::unique_ptr<std::vector<Capture>> captures_;
};
// Exec info has a bit complicated semantics. If it's empty, it means the task
// is run in a "default" mode, which means that all next_edges we encounter
// should get executed. If it's not empty, only functions that have an entry
// and this entry has needed == True should be executed. exec_info is only empty
// when the graph is executed via .backward() and the inputs parameter is not passed.
// Otherwise, when executed through .grad(), or when inputs arg is specified for
// .backward(), exec_info will be non-empty.
//
// exec_info_ is safe to read without synchronization
std::unordered_map<Node*, ExecInfo> exec_info_;
// Captures variables are grads captured that we return to the user. After
// execution of the GraphTask is completed, the captured_vars_ are moved
// out of the GraphTask and are no longer valid.
std::vector<Variable> captured_vars_;
at::ThreadLocalState thread_locals_ =
at::ThreadLocalState(/* keep_grad_mode */ false);
std::unordered_set<c10::Stream> leaf_streams;
// The value of worker_device in the thread that created this task.
// See Note [Reentrant backwards]
// Safe to read owner_ and reentrant_depth_ without synchronizaton
int owner_;
// The number of parent graph tasks for this graph task
const int reentrant_depth_;
// Whether or not to stop execution for this GraphTask when an error is
// encountered. When set to true, this would cause Engine::execute() to throw
// an exception as soon as the autograd engine receives an exception.
bool exit_on_error_;
// CPU threads are dedicated to processing CPU work for the backward they invoked.
// So any given graph task maintains its own cpu_ready_queue_ where you should send
// work for it to be done. We memoize the cpu_ready_queue_ per GraphTask so that
// we know which ready queue we should push to if we are on device thread (i.e. GPU)
// and but next NodeTask should be run on CPU.
std::shared_ptr<ReadyQueue> cpu_ready_queue_;
// Future representing the completion of the graph task. Notified when all
// tasks are done.
std::shared_ptr<at::ivalue::Future> future_result_;
// Final callbacks installed during execution of this GraphTask
std::vector<std::function<void()>> final_callbacks_;
// To protect reads and writes to final_callbacks_. Intentionally no reusing
// mutex_ as the two are protecting different data structures.
std::mutex final_callbacks_lock_;
};
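Now let us look at some important member variables.
outstanding_tasks_ is the number of pending NodeTasks and is used to decide whether the GraphTask still needs to run. Each task increments it before it is decremented; when it reaches 0, the GraphTask is finished.
The following code determines whether a GraphTask has finished. It also counts as finished if an error occurred and exit_on_error_ is set.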
bool GraphTask::completed() {
return outstanding_tasks_.load() == 0 ||
(exit_on_error_ && has_error_.load());
}
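A small sketch of the completion condition, using a hypothetical ToyGraphTask that keeps only the three fields completed() reads:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical toy: not the real GraphTask, only the fields completed() needs.
struct ToyGraphTask {
  std::atomic<uint64_t> outstanding_tasks_{0};
  std::atomic_bool has_error_{false};
  bool exit_on_error_ = false;

  // Same condition as GraphTask::completed(): no pending NodeTasks,
  // or an error occurred and the task is configured to stop on errors.
  bool completed() const {
    return outstanding_tasks_.load() == 0 ||
           (exit_on_error_ && has_error_.load());
  }
};
```

An error alone does not end the task unless exit_on_error_ is set; otherwise completion waits for the counter to drain.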
Whenever a NodeTask is added, outstanding_tasks_ is incremented: when a NodeTask is pushed into a ReadyQueue (with incrementOutstandingTasks set), the GraphTask it belongs to increments its outstanding_tasks_ by one.
auto ReadyQueue::push(NodeTask item, bool incrementOutstandingTasks) -> void {
{
// Lock mutex for writing to heap_
std::lock_guard<std::mutex> lock(mutex_);
if (incrementOutstandingTasks) {
std::shared_ptr<GraphTask> graph_task = item.base_.lock();
++graph_task->outstanding_tasks_; // increment
}
heap_.push(std::move(item));
}
not_empty_.notify_one();
}
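The push side pairs with a blocking pop. Below is a hypothetical ToyReadyQueue that assumes FIFO order (PyTorch's ReadyQueue keeps a priority heap instead). It shows how pushing a task bumps its GraphTask's outstanding_tasks_ and how pop() waits on a condition variable; unlike the excerpt above, it checks that lock() succeeded.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

struct ToyGraphTask {
  std::atomic<uint64_t> outstanding_tasks_{0};
};

struct ToyNodeTask {
  std::weak_ptr<ToyGraphTask> base_;  // owning graph task
  int id;                             // stands in for fn_ / inputs_
};

class ToyReadyQueue {
 public:
  void push(ToyNodeTask item, bool incrementOutstandingTasks = true) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (incrementOutstandingTasks) {
        if (auto graph_task = item.base_.lock()) {
          ++graph_task->outstanding_tasks_;  // one more pending task
        }
      }
      queue_.push(std::move(item));
    }
    not_empty_.notify_one();  // wake one waiting consumer
  }

  // Blocks until an item is available.
  ToyNodeTask pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    not_empty_.wait(lock, [this] { return !queue_.empty(); });
    ToyNodeTask task = std::move(queue_.front());
    queue_.pop();
    return task;
  }

 private:
  std::mutex mutex_;
  std::condition_variable not_empty_;
  std::queue<ToyNodeTask> queue_;
};
```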
When a NodeTask finishes, the count is decremented. Here is a simplified version of the code:
auto Engine::thread_main(const std::shared_ptr<GraphTask>& graph_task) -> void {
while (graph_task == nullptr || !graph_task->future_result_->completed()) { // run the GraphTask
std::shared_ptr<GraphTask> local_graph_task;
{
NodeTask task = local_ready_queue->pop();
// Recover the owning GraphTask; skip the task if it has already been destroyed
if (!(local_graph_task = task.base_.lock())) {
continue;
}
if (task.fn_ && !local_graph_task->has_error_.load()) {
// run the NodeTask
evaluate_function(local_graph_task, task.fn_.get(), task.inputs_, local_graph_task->cpu_ready_queue_);
}
}
// Decrement the outstanding tasks.
--local_graph_task->outstanding_tasks_; // the NodeTask has finished, so decrement here
// Check if we've completed execution.
if (local_graph_task->completed()) { // check whether the GraphTask has finished
// post-completion handling
}
}
}
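keep_graph specifies whether resources are released after one backward pass; the resources are those built during the forward pass (such as saved variables). If keep_graph is false, the engine calls fn.will_release_variables() before running fn, and after fn finishes it calls fn.release_variables() to free them.
During backpropagation, Engine::evaluate_function calls
auto outputs = call_function(graph_task, func, inputs);
Inside call_function, if the graph does not need to be kept, fn is notified that its variables are about to be released.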
static variable_list call_function(
std::shared_ptr<GraphTask>& graph_task,
Node* func,
InputBuffer& inputBuffer) {
CheckpointValidGuard cpvguard(graph_task);
auto& fn = *func;
auto inputs =
call_pre_hooks(fn, InputBuffer::variables(std::move(inputBuffer)));
if (!graph_task->keep_graph_) {
fn.will_release_variables(); // the graph need not be kept: notify fn that its variables will be released
}
// rest omitted
}
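The release sequence can be sketched with a hypothetical ToyNode whose saved values stand in for the variables saved during the forward pass. The sketch assumes that, as in PyTorch's engine.cpp, call_function invokes will_release_variables() before running the node and evaluate_function invokes release_variables() after call_function returns.

```cpp
#include <cassert>
#include <vector>

// Hypothetical node holding "saved variables" from the forward pass.
struct ToyNode {
  std::vector<double> saved;  // stands in for SavedVariables
  bool notified = false;
  void will_release_variables() { notified = true; }
  void release_variables() { saved.clear(); saved.shrink_to_fit(); }
  // Pretend backward formula that needs the saved values.
  double apply(double grad) const {
    double s = 0;
    for (double v : saved) s += v;
    return grad * s;
  }
};

// keep_graph gates freeing resources: notify before apply, release after.
double run_node(ToyNode& fn, double grad, bool keep_graph) {
  if (!keep_graph) fn.will_release_variables();
  double out = fn.apply(grad);
  if (!keep_graph) fn.release_variables();
  return out;
}
```

Running the node with keep_graph = true leaves the saved values intact, so it could be run again; with false they are gone afterward, which is why a second backward through the same graph fails unless the graph was retained.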
dependencies_ is used to decide whether downstream nodes can already be executed. Its type is:
std::unordered_map<Node*, int> dependencies_;
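dependencies_ is initialized in compute_dependencies: every time a grad_fn appears in another node's next_edges(), dependencies[this_grad_fn] is incremented by 1. If dependencies[this_grad_fn] is greater than 0, this_grad_fn still has a backward dependency, that is, it must wait until the nodes that reference it have finished before it can run its own backward computation.
For example, consider a graph in which SubBackward0's next_edges point to MulBackward0, and MulBackward0's next_edges point to two PowBackward0 nodes (PowBackward0-1 and PowBackward0-2):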
# MulBackward0 is referenced once by SubBackward0's next_edges, so MulBackward0 must wait until SubBackward0's backward computation finishes before running its own
dependencies[MulBackward0] = 1
# PowBackward0-1 is referenced once by MulBackward0's next_edges
dependencies[PowBackward0-1] = 1
# PowBackward0-2 is referenced once by MulBackward0's next_edges
dependencies[PowBackward0-2] = 1
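The counting rule can be sketched on the example graph above. This is a hypothetical simplification of compute_dependencies that names nodes by strings rather than Node pointers: every node reachable from the root is visited once, and each next edge adds 1 to its target's count.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

// Hypothetical toy graph: node name -> names its next_edges point to.
using Graph = std::unordered_map<std::string, std::vector<std::string>>;

std::unordered_map<std::string, int> compute_dependencies(const Graph& g,
                                                          const std::string& root) {
  std::unordered_map<std::string, int> deps;
  std::unordered_set<std::string> seen{root};
  std::vector<std::string> stack{root};
  while (!stack.empty()) {
    std::string fn = stack.back();
    stack.pop_back();
    auto it = g.find(fn);
    if (it == g.end()) continue;  // no outgoing edges
    for (const auto& next : it->second) {
      ++deps[next];                                       // one more edge into next
      if (seen.insert(next).second) stack.push_back(next);  // visit each node once
    }
  }
  return deps;
}
```

On the graph above this yields 1 for MulBackward0 and for each PowBackward0; a node reached through two different edges (a diamond) would get a count of 2.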
Let us look at the concrete code (irrelevant parts removed).
void Engine::evaluate_function(
std::shared_ptr<GraphTask>& graph_task,
Node* func,
InputBuffer& inputs,
const std::shared_ptr<ReadyQueue>& cpu_ready_queue) {
// run the backward computation
auto outputs = call_function(graph_task, func, inputs);
auto& fn = *func;
int num_outputs = outputs.size();
std::lock_guard<std::mutex> lock(graph_task->mutex_);
for (int i = 0; i < num_outputs; ++i) { // iterate over our outputs
auto& output = outputs[i];
const auto& next = fn.next_edge(i); // the edge for the i-th output
// Check if the next function is ready to be computed
bool is_ready = false;
// get the dependency counts
auto& dependencies = graph_task->dependencies_;
auto it = dependencies.find(next.function.get()); // dependency count of the node this edge leads to
if (it == dependencies.end()) {
auto name = next.function->name();
throw std::runtime_error(std::string("dependency not found for ") + name);
} else if (--it->second == 0) { // this node's backward is done, so the next node has one dependency fewer
dependencies.erase(it); // at 0 no dependencies remain, so remove the entry
is_ready = true; // ready: a NodeTask can be built for the next backward step
}
}
}
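The decrement step, extracted into a standalone hypothetical helper (string keys instead of Node*), looks roughly like this:

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <unordered_map>

// After a node finishes, each downstream node's remaining-dependency count
// drops by one; at 0 the entry is erased and the node is ready.
bool mark_one_dependency_done(std::unordered_map<std::string, int>& dependencies,
                              const std::string& next) {
  auto it = dependencies.find(next);
  if (it == dependencies.end()) {
    throw std::runtime_error("dependency not found for " + next);
  }
  if (--it->second == 0) {
    dependencies.erase(it);
    return true;  // is_ready
  }
  return false;
}
```

Erasing the entry at 0 means a later, unexpected edge into the same node shows up as the "dependency not found" error instead of a silent negative count.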
not_ready_ temporarily stores functions that are not ready yet, together with their inputs. Its type is:
std::unordered_map<Node*, InputBuffer> not_ready_;
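not_ready_ maps each not-yet-ready node to its inputs. Suppose node A has two inputs on the backward path. When the first input is done, the second has not finished its backward computation yet, so A and its first input must be kept somewhere for later processing; that is what not_ready_ is for.
The key of not_ready_ is the not-ready node, and the value is the inputs of that node that are ready so far:
When A's first input is done, (node A, A's input 1) is put into not_ready_, giving (node A, [A's input 1]).
When A's second input is done, it is added to the same buffer, giving (node A, [A's input 1, A's input 2]); A is now ready, so it is pushed onto a ready queue and removed from not_ready_.
Let us look at the code.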
auto& not_ready = graph_task->not_ready_;
auto not_ready_it = not_ready.find(next.function.get());
if (not_ready_it == not_ready.end()) { // the next node is not in not_ready_ yet
// Skip functions that aren't supposed to be executed
if (!exec_info_.empty()) {
auto it = exec_info_.find(next.function.get());
if (it == exec_info_.end() || !it->second.should_execute()) {
continue;
}
}
// No buffers have been allocated for the function
InputBuffer input_buffer(next.function->num_inputs()); // create the input buffer for the next node
// Accumulates into buffer
const auto opt_next_stream = next.function->stream(c10::DeviceType::CUDA);
input_buffer.add(next.input_nr, // add this gradient as one of the next node's inputs
std::move(output),
opt_parent_stream,
opt_next_stream);
if (is_ready) { // is_ready was computed from the dependency counts in the previous snippet; true means the backward computation can run
auto queue = ready_queue(cpu_ready_queue, input_buffer.device());
queue->push(
NodeTask(graph_task, next.function, std::move(input_buffer)));
} else {
// dependencies remain, so it cannot run yet; park it in not_ready_
not_ready.emplace(next.function.get(), std::move(input_buffer));
}
} else { // the next node is already in not_ready_
// The function already has a buffer
auto &input_buffer = not_ready_it->second;
// Accumulates into buffer
const auto opt_next_stream = next.function->stream(c10::DeviceType::CUDA);
input_buffer.add(next.input_nr, // add the newly computed input to input_buffer
std::move(output),
opt_parent_stream,
opt_next_stream);
if (is_ready) { // if it can run now, push it onto a ready queue
auto queue = ready_queue(cpu_ready_queue, input_buffer.device());
queue->push(
NodeTask(graph_task, next.function, std::move(input_buffer)));
not_ready.erase(not_ready_it); // and remove it from not_ready_
}
}
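The buffering logic can be sketched with hypothetical toy types: ToyInputBuffer keeps one slot per input and sums gradients arriving at the same slot (assuming scalar gradients), and a std::deque plays the ready queue. It follows the same two branches as the code above.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical input buffer: one slot per input, accumulated by addition.
struct ToyInputBuffer {
  explicit ToyInputBuffer(std::size_t n) : slots(n, 0.0) {}
  void add(std::size_t input_nr, double grad) { slots.at(input_nr) += grad; }
  std::vector<double> slots;
};

struct ToyNodeTask {
  std::string fn;
  ToyInputBuffer inputs;
};

void route_gradient(std::unordered_map<std::string, ToyInputBuffer>& not_ready,
                    std::deque<ToyNodeTask>& ready_queue,
                    const std::string& next_fn, std::size_t num_inputs,
                    std::size_t input_nr, double grad, bool is_ready) {
  auto it = not_ready.find(next_fn);
  if (it == not_ready.end()) {
    ToyInputBuffer buffer(num_inputs);  // first gradient for this node
    buffer.add(input_nr, grad);
    if (is_ready) {
      ready_queue.push_back({next_fn, std::move(buffer)});
    } else {
      not_ready.emplace(next_fn, std::move(buffer));  // park it
    }
  } else {
    it->second.add(input_nr, grad);  // reuse the parked buffer
    if (is_ready) {
      ready_queue.push_back({next_fn, std::move(it->second)});
      not_ready.erase(it);
    }
  }
}
```

For a node A with two inputs, the first call parks A in not_ready; the second call (with is_ready true) fills the other slot, moves A to the ready queue, and erases the parked entry.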
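ExecInfo mainly determines whether a node needs to be executed; it can also carry hooks (GradCaptureHook) that are invoked on a captured gradient.
Its definition is: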
struct ExecInfo {
struct Capture {
Capture(const Capture&) = delete;
Capture(Capture&&) = default;
Capture(int input_idx, int output_idx)
: input_idx_(input_idx), output_idx_(output_idx) {}
int input_idx_; // within Node inputs
int output_idx_; // within the output vector of a GraphTask
// This hook will be executed after a grad is captured. The captured
// grad will be replaced by the return value of the hook.
struct GradCaptureHook {
virtual ~GradCaptureHook() = default;
virtual at::Tensor operator()(const at::Tensor& grad) = 0;
};
// The hooks will be called one by one in the order as they were added.
// The input grad of a hook will be the output of its preceding hook. The
// first hook will take the captured grad as the input. The output of the
// last hook will replace the captured grad.
std::vector<std::unique_ptr<GradCaptureHook>> hooks_;
};
bool should_execute() const {
return needed_ || captures_;
}
bool needed_ = false;
std::unique_ptr<std::vector<Capture>> captures_;
};
GraphTask holds the following member variable:
// Exec info has a bit complicated semantics. If it's empty, it means the task
// is run in a "default" mode, which means that all next_edges we encounter
// should get executed. If it's not empty, only functions that have an entry
// and this entry has needed == True should be executed. exec_info is only empty
// when the graph is executed via .backward() and the inputs parameter is not passed.
// Otherwise, when executed through .grad(), or when inputs arg is specified for
// .backward(), exec_info will be non-empty.
//
// exec_info_ is safe to read without synchronization
std::unordered_map<Node*, ExecInfo> exec_info_;
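exec_info_ associates Nodes of the GraphTask with an ExecInfo, i.e. their execution information.
When is exec_info_ empty, and when is it non-empty? According to the comment above, it is empty only when the graph is executed via .backward() without the inputs argument; it is non-empty when executed via .grad(), or when inputs is specified for .backward().
So exec_info_ and captured_vars_ exist for grad() and for backward() with inputs specified: they record which gradients need to be computed in that case. Only some nodes then need to run, namely those from which a path leads to outputs.
Engine::execute calls init_to_execute to build the ExecInfo entries.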
if (!outputs.empty()) {
graph_task->init_to_execute(*graph_root, outputs, accumulate_grad, min_topo_nr);
}
The logic, per the source comment, is as follows. init_to_execute populates exec_info so that nodes that should be executed have exec_info[node].needed_ = true. Only certain nodes should be executed: those that have a path to some edge in outputs. The comment describes the algorithm recursively, but the actual code does it iteratively; in the iterative version, a node is responsible for updating its parent's needed_ after all its children have been processed. The key step is inserting ExecInfo entries, e.g. exec_info_.emplace(stack.back().fn_, ExecInfo());. The abridged code is:
void GraphTask::init_to_execute(Node& graph_root, const edge_list& outputs, bool accumulate_grad, uint64_t min_topo_nr) {
// Populates exec_info so nodes that should be executed have `exec_info[node].needed_ = true`
// Only nodes that have a path to any edge in `outputs` should be executed.
// The code below populates exec_info using recursion, but the actual code does this
// iteratively. Refer to the numbering to see how the actual code corresponds.
// A difference to note is that in the iterative version, when you are working with
// the current Node, you are reponsible to update your parent's is_needed after all your
// children have been updated.
//
// is_needed = {fn: True for fn in outputs} # (0)
// seen = {}
// def compute_is_needed(fn):
// for next_edge in fn.next_edges:
// child_fn = next_edge.fn
// if child_fn in seen and is_needed[child_fn]: # (1)
// is_needed[fn] = true
// else:
// seen.add(child_fn)
// if compute_is_needed(child_fn):
// is_needed[fn] = true # (2)
// # (3) exit for-loop
// return is_needed[fn]
// compute_is_needed(graph_root)
//
// NB: you might be wondering why we don't populate `seen` with outputs. We cannot
// because in the case where two outputs lie on the same path, we still need to explore past
// the first output or we would miss the nodes that are required to compute the second output.
// This part handles the grad() API: gradients are computed only for tensors on the paths to the requested tensors
int output_idx = 0;
for (auto & output_edge : outputs) { // 遍历输出边
// (0) `is_needed` above corresponds to `exec_info_[fn].needed_`
Node *output = output_edge.function.get();
auto & info = exec_info_[output];
if (accumulate_grad) {
// if called through `.backward()` we directly set `needed_` for all the outputs to true
info.needed_ = true;
} else {
if (!info.captures_) {
info.captures_ = make_unique<std::vector<ExecInfo::Capture>>();
}
// record which input of this node maps to which output slot (output_idx)
info.captures_->emplace_back(output_edge.input_nr, output_idx++);
}
}
captured_vars_.resize(output_idx);
auto nodeShouldExecute = [this](Node *fn) {
auto it = exec_info_.find(fn);
return it != exec_info_.end() && it->second.should_execute();
};
std::vector<Frame> stack;
std::unordered_set<Node*> seen;
stack.emplace_back(&graph_root);
exec_info_.emplace(stack.back().fn_, ExecInfo()); // add an entry for the graph root; more exec_info_ entries are added during the traversal
while (!stack.empty()) {
auto &frame = stack.back();
const auto fn = frame.fn_;
Node *child_fn = nullptr;
while((child_fn = frame.get_next_fn()) && !seen.emplace(child_fn).second) {
// (1) next child exists AND has already been seen
if (nodeShouldExecute(child_fn)) {
exec_info_[fn].needed_ = true;
}
}
if (child_fn) {
// (2) next child exists but has not been seen
if (child_fn->topological_nr() < min_topo_nr) {
// child created before the first output means this child cannot have
// an edge to output
continue;
}
stack.emplace_back(child_fn);
} else {
// (3) no next child exists for `fn` means its `needed` has already been
// finalized. pop stack and update parent
stack.pop_back();
if (nodeShouldExecute(fn) && !stack.empty()) {
exec_info_[stack.back().fn_].needed_ = true;
}
}
}
}
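For readers who prefer the recursive formulation in the comment, here is a C++ sketch of it over a hypothetical string-keyed graph (Graph, NeededComputer, compute_needed are illustration-only names). It treats already-seen children the way the iterative code does (no re-exploration) and omits the min_topo_nr pruning, so it illustrates the idea rather than being a drop-in equivalent.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

using Graph = std::unordered_map<std::string, std::vector<std::string>>;

struct NeededComputer {
  const Graph& g;
  std::unordered_map<std::string, bool> is_needed;
  std::unordered_set<std::string> seen;

  bool compute_is_needed(const std::string& fn) {
    auto it = g.find(fn);
    if (it != g.end()) {
      for (const auto& child_fn : it->second) {
        if (seen.count(child_fn)) {
          if (is_needed[child_fn]) is_needed[fn] = true;            // (1)
        } else {
          seen.insert(child_fn);
          if (compute_is_needed(child_fn)) is_needed[fn] = true;    // (2)
        }
      }
    }
    return is_needed[fn];  // (3) all children handled
  }
};

std::unordered_map<std::string, bool> compute_needed(const Graph& g, const std::string& root,
                                                     const std::vector<std::string>& outputs) {
  NeededComputer c{g, {}, {}};
  for (const auto& o : outputs) c.is_needed[o] = true;  // (0)
  c.compute_is_needed(root);
  return c.is_needed;
}
```

A node reached a second time through another parent is not explored again; its already-computed flag is simply propagated upward, which is case (1).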
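ExecInfo.Capture.GradCaptureHook is used to post-process a captured gradient.
In practice it is used mainly in the distributed setting, because the distributed engine needs to accumulate gradients, and that has to be done as post-processing after the normal gradient computation.
DistEngine::computeDependencies adds the hooks: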
// Create a dummy GraphRoot and run init_to_execute with it.
GraphRoot dummyRoot(edges, {});
graphTask->init_to_execute(dummyRoot, outputEdges, /*accumulate_grad=*/false, /*min_topo_nr=*/0);
for (auto& mapEntry : graphTask->exec_info_) {
auto& execInfo = mapEntry.second;
if (!execInfo.captures_) {
continue;
}
auto fn = mapEntry.first;
// There may be nodes other than 'AccumulateGrad', e.g. RecvRPCBackward,
// to be captured.
if (auto accumulateGradFn = dynamic_cast<AccumulateGrad*>(fn)) {
for (auto& capture : *execInfo.captures_) {
capture.hooks_.push_back( // add the hook here
std::make_unique<DistAccumulateGradCaptureHook>(
std::dynamic_pointer_cast<AccumulateGrad>(
accumulateGradFn->shared_from_this()),
autogradContext));
}
}
}
Engine::evaluate_function uses them:
auto& exec_info_ = graph_task->exec_info_;
if (!exec_info_.empty()) {
auto& fn_info = exec_info_.at(func);
if (auto* capture_vec = fn_info.captures_.get()) {
// Lock mutex for writing to graph_task->captured_vars_.
std::lock_guard<std::mutex> lock(graph_task->mutex_);
for (const auto& capture : *capture_vec) {
// get the slot in captured_vars_ and post-process it
auto& captured_grad = graph_task->captured_vars_[capture.output_idx_];
// captured_grad is a reference, so assigning to it writes into graph_task->captured_vars_
captured_grad = inputs[capture.input_idx_];
for (auto& hook : capture.hooks_) {
captured_grad = (*hook)(captured_grad); // post-process with each hook in turn
}
}
}
if (!fn_info.needed_) {
// Skip execution if we don't need to execute the function.
return;
}
}
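The capture step boils down to: copy the input gradient into its output slot, then pass it through each hook in order. A sketch with a hypothetical ToyCapture and std::function hooks over doubles:

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical capture: where to read the grad, where to store it,
// and hooks applied in order (each sees the previous hook's output).
struct ToyCapture {
  int input_idx_;
  int output_idx_;
  std::vector<std::function<double(double)>> hooks_;
};

void apply_captures(const std::vector<ToyCapture>& captures,
                    const std::vector<double>& inputs,
                    std::vector<double>& captured_vars) {
  for (const auto& capture : captures) {
    double& captured_grad = captured_vars[capture.output_idx_];  // reference into captured_vars
    captured_grad = inputs[capture.input_idx_];
    for (const auto& hook : capture.hooks_) {
      captured_grad = hook(captured_grad);  // chain the hooks
    }
  }
}
```

With hooks "times 2" then "plus 1", an input gradient of 5 ends up stored as 11 in its output slot.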
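Since captured_vars_ came up above, let us analyze it too.
Captured variables are the captured gradients that we return to the user. After the GraphTask completes, they are moved out of the GraphTask and are no longer valid.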
// Captures variables are grads captured that we return to the user. After
// execution of the GraphTask is completed, the captured_vars_ are moved
// out of the GraphTask and are no longer valid.
std::vector<Variable> captured_vars_;
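captured_vars_ can be post-processed by the GradCaptureHooks mentioned above inside evaluate_function, which is also where it is assigned (see the comments in the code above); we will analyze that function in detail in a later article.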
// This hook will be executed after a grad is captured. The captured
// grad will be replaced by the return value of the hook.
After the engine finishes the backward pass, the output it returns to the caller (e.g. Python code) is captured_vars_:
void GraphTask::mark_as_completed_and_run_post_processing() {
// Allow only one thread one attempt to process this logic.
if (future_completed_.exchange(true)) {
// Future is already marked complete, or being marked as such.
// In case the marking complete is only in progress, we add a
// wait() to guarantee the future is marked complete on exit.
future_result_->wait();
return;
}
try {
// Run post processing, before marking the future as complete.
// Drop lock prior to completing, to avoid holding across callbacks.
std::unique_lock<std::mutex> lock(mutex_);
exec_post_processing();
std::vector<Variable> vars = std::move(captured_vars_); // the output that is finally returned
// Need to unlock before we call markCompleted to avoid holding locks
// when the callbacks are called.
lock.unlock();
// NOLINTNEXTLINE(performance-move-const-arg)
future_result_->markCompleted(std::move(vars)); // the final output of the backward pass
} catch (std::exception& e) {
future_result_->setErrorIfNeeded(std::current_exception());
}
}
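The "only one thread does the post-processing" guard relies on std::atomic<bool>::exchange returning the previous value. A hypothetical ToyCompletion sketch, omitting the Future and the wait() for a completion that is still in progress:

```cpp
#include <atomic>
#include <cassert>
#include <utility>
#include <vector>

struct ToyCompletion {
  std::atomic_bool future_completed_{false};
  int post_processing_runs = 0;
  std::vector<double> captured_vars_{1.0, 2.0};
  std::vector<double> result;

  void mark_as_completed_and_run_post_processing() {
    // exchange(true) returns the old value: only the first caller sees false.
    if (future_completed_.exchange(true)) {
      return;  // already done (or being done) by someone else
    }
    ++post_processing_runs;
    result = std::move(captured_vars_);  // grads handed back to the caller
  }
};
```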
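One might ask why NodeTask is needed as a separate type instead of reusing GraphTask.
The reason is that GraphTask only holds the overall information of a computation graph; it does not know how to compute the gradient of a particular node, so NodeTask is introduced for that. NodeTask objects are what travel through the queues: each one is an executable derivative function. As the definition shows, a NodeTask is built from a GraphTask, a Node, and an InputBuffer. Producers keep pushing NodeTasks into ReadyQueues, and consumers pop NodeTasks from them and process them.
NodeTask is defined as follows: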
struct NodeTask {
std::weak_ptr<GraphTask> base_; // the GraphTask it belongs to
std::shared_ptr<Node> fn_; // the Node to execute, e.g. PowBackward0
// This buffer serves as an implicit "addition" node for all of the
// gradients flowing here. Once all the dependencies are finished, we
// use the contents of this buffer to run the function.
InputBuffer inputs_; // the inputs of fn_
// When worker receives a task with isShutdownTask = true, it will immediately
// exit. The engine sends a shutdown task to every queue upon its destruction.
bool isShutdownTask_;
int getReentrantDepth() const;
NodeTask(
std::weak_ptr<GraphTask> base,
std::shared_ptr<Node> fn,
InputBuffer inputs,
bool isShutdownTask = false)
: base_(base),
fn_(std::move(fn)),
inputs_(std::move(inputs)),
isShutdownTask_(isShutdownTask) {}
};
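Because base_ is a std::weak_ptr, a queued NodeTask does not keep its GraphTask alive; a consumer has to lock() it first. A sketch with hypothetical toy types (in the real engine a shutdown task makes the worker exit rather than merely skip):

```cpp
#include <cassert>
#include <memory>
#include <string>

struct ToyGraphTask { int id; };

struct ToyNodeTask {
  std::weak_ptr<ToyGraphTask> base_;  // does not own the GraphTask
  std::string fn_;
  bool isShutdownTask_ = false;
};

// True if the task should be run: not a shutdown task, and its GraphTask still exists.
bool can_run(const ToyNodeTask& task) {
  return !task.isShutdownTask_ && task.base_.lock() != nullptr;
}
```

Once the last shared_ptr to the GraphTask is released, any of its tasks still sitting in a queue become no-ops for the consumer.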
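NodeTasks can be pushed from both the main thread and worker threads; we look at each in turn.
There are two situations in which the main thread produces a NodeTask; in both, the graph root is wrapped in a NodeTask and pushed onto a ready queue: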
// Now that all the non-thread safe fields of the graph_task have been populated,
// we can enqueue it.
// in the main thread
queue->push(NodeTask(graph_task, std::move(graph_root), std::move(input_buffer)));
// We set the worker_device to CPU_DEVICE only if worker_device was previously
// NO_DEVICE. Setting it to CPU afterwards allow us to detect whether this is
// a re-entrant call or not.
set_device(CPU_DEVICE);
// set the graph_task owner to the current device
graph_task->owner_ = worker_device;
// Now that all the non-thread safe fields of the graph_task have been populated,
// we can enqueue it.
queue->push(NodeTask(graph_task, std::move(graph_root), std::move(input_buffer)));
Recall how graph_root is initialized:
auto graph_root = skip_dummy_node ?
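roots.at(0).function : // with a single root, use it directly as the graph root
std::make_shared<GraphRoot>(roots, inputs); // with several roots, construct a GraphRoot
graph_root is built from roots and inputs. roots are the gradient_edge() of the final output nodes, e.g. [ (MulBackward0 instance, 0), (PowBackward0, 0) ]. If the user does not specify inputs, it defaults to tensor(1.); otherwise it is the given starting gradient.
In the worker thread (thread_main), new NodeTask instances are built and added to a queue as follows.
In evaluate_function, after a node's backward computation finishes, the engine looks for the next nodes that can run. For each one it takes the corresponding next edge of the current node, builds a NodeTask from it, and pushes it onto the ReadyQueue of the appropriate worker thread (chosen from the device of the input buffer).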
for (int i = 0; i < num_outputs; ++i) { // iterate over the outputs
const auto& next = fn.next_edge(i); // the edge leading to the next node
if (not_ready_it == not_ready.end()) {
// Skip functions that aren't supposed to be executed
// No buffers have been allocated for the function
InputBuffer input_buffer(next.function->num_inputs());
// Accumulates into buffer
const auto opt_next_stream = next.function->stream(c10::DeviceType::CUDA);
input_buffer.add(next.input_nr,
std::move(output),
opt_parent_stream,
opt_next_stream);
if (is_ready) {
auto queue = ready_queue(cpu_ready_queue, input_buffer.device());
// push the next NodeTask to be computed
queue->push(
NodeTask(graph_task, next.function, std::move(input_buffer)));
} else {
not_ready.emplace(next.function.get(), std::move(input_buffer));
}
} else {
// The function already has a buffer
auto &input_buffer = not_ready_it->second;
// Accumulates into buffer
const auto opt_next_stream = next.function->stream(c10::DeviceType::CUDA);
input_buffer.add(next.input_nr,
std::move(output),
opt_parent_stream,
opt_next_stream);
if (is_ready) {
auto queue = ready_queue(cpu_ready_queue, input_buffer.device());
// push the next NodeTask to be computed
queue->push(
NodeTask(graph_task, next.function, std::move(input_buffer)));
not_ready.erase(not_ready_it);
}
}
}
Here, const auto& next = fn.next_edge(i); is what finds the next node.
next_edge is implemented as:
const Edge& next_edge(size_t index) const noexcept {
return next_edges_[index];
}
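next_edges_ points to the input nodes of this Node in the forward graph, which are therefore its output nodes in the backward pass.
thread_main contains a workaround: the current worker thread may finish the graph_task while the thread that owns the graph_task is sleeping in pop(). So a dummy function task is sent to the owning thread to wake it up, allowing it to exit thread_main.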
// Check if we've completed execution.
if (local_graph_task->completed()) {
local_graph_task->mark_as_completed_and_run_post_processing();
auto base_owner = local_graph_task->owner_;
// The current worker thread finish the graph_task, but the owning thread
// of the graph_task might be sleeping on pop() if it does not have work.
// So we need to send a dummy function task to the owning thread just to
// ensure that it's not sleeping, so that we can exit the thread_main.
// If it has work, it might see that graph_task->outstanding_tasks_ == 0
// before it gets to the task, but it's a no-op anyway.
//
// NB: This is not necessary if the current thread is the owning thread.
if (worker_device != base_owner) {
// Synchronize outstanding_tasks_ with queue mutex
std::atomic_thread_fence(std::memory_order_release);
ready_queue_by_index(local_graph_task->cpu_ready_queue_, base_owner)
->push(NodeTask(local_graph_task, nullptr, InputBuffer(0)));
}
}
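First, recall the initialization of graph_root: it is built from roots and inputs, where roots are the gradient_edge() of the final output nodes, e.g. [ (MulBackward0 instance, 0), (PowBackward0, 0) ], and inputs defaults to tensor(1.) if the user does not specify it.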
auto graph_root = skip_dummy_node ?
roots.at(0).function : // with a single root, use it directly as the graph root
std::make_shared<GraphRoot>(roots, inputs); // with several roots, construct a GraphRoot
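Next, let us see how tasks are consumed.
When a worker thread is first created, it blocks in queue->pop(), waiting for a producer to insert a task into the queue. When the main thread sends a NodeTask instance to the ReadyQueue, the consuming worker thread wakes up from pop in thread_main.
The worker thread thus obtains the NodeTask and runs its backward computation through evaluate_function. The code is: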
// How a worker thread consumes a NodeTask
NodeTask task = local_ready_queue->pop();
if (task.fn_ && !local_graph_task->has_error_.load()) {
AutoGradMode grad_mode(local_graph_task->grad_mode_);
try {
GraphTaskGuard guard(local_graph_task);
NodeGuard ndguard(task.fn_);
// backward computation
evaluate_function(local_graph_task, task.fn_.get(), task.inputs_, local_graph_task->cpu_ready_queue_);
} catch (std::exception& e) {
thread_on_exception(local_graph_task, task.fn_, e);
}
}
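For the main thread, init_local_ready_queue() is called to initialize local_ready_queue.
Since init_local_ready_queue is called without an argument, a new queue is created if the thread does not already have one.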
void Engine::init_local_ready_queue(std::shared_ptr<ReadyQueue> ready_queue) {
if (ready_queue) {
// if ready_queue provided in the caller, use the caller's ready_queue to initialize local_ready_queue
local_ready_queue = std::move(ready_queue);
} else if (!local_ready_queue){
// otherwise if local_ready_queue not allocated, allocate a new ready_queue
local_ready_queue = std::make_shared<ReadyQueue>();
}
}
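A self-contained sketch of the same initialization rule, assuming (as in engine.cpp) that local_ready_queue is a thread_local std::shared_ptr; ToyReadyQueue is a hypothetical placeholder:

```cpp
#include <cassert>
#include <memory>
#include <utility>

struct ToyReadyQueue {};

// One queue pointer per thread.
thread_local std::shared_ptr<ToyReadyQueue> local_ready_queue;

void init_local_ready_queue(std::shared_ptr<ToyReadyQueue> ready_queue = nullptr) {
  if (ready_queue) {
    local_ready_queue = std::move(ready_queue);  // caller-provided queue wins
  } else if (!local_ready_queue) {
    local_ready_queue = std::make_shared<ToyReadyQueue>();  // allocate once per thread
  }
}
```

Calling it twice without an argument keeps the first queue; passing a queue replaces it, which is how a thread can adopt a queue handed to it by its caller.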
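This is the CPU queue. Compare it with the worker threads' queues: the device worker threads share information through the Engine's device_ready_queues_, whereas the CPU queue is the GraphTask member cpu_ready_queue_, to which users should send the work to be done.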
// CPU threads are dedicated to processing CPU work for the backward they invoked.
// So any given graph task maintains its own cpu_ready_queue_ where you should send
// work for it to be done. We memoize the cpu_ready_queue_ per GraphTask so that
// we know which ready queue we should push to if we are on device thread (i.e. GPU)
// and but next NodeTask should be run on CPU.
std::shared_ptr<ReadyQueue> cpu_ready_queue_;
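Note that the CPU ready queue is unique to each GraphTask, whereas the CUDA device ready queues are shared by all GraphTasks.
So the number of ready queues in the engine is: the number of devices + the number of GraphTasks.
Let us extend the earlier diagram with GraphTask and Engine information: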
+-------------------------+
| GraphTask |
| |
| cpu_ready_queue_ |
| + |
| | |
+-------------------------+
|
+--------------+ | +-----------------+
| Main Thread | v | Worker Thread 1 |
| | 1 +-------+---------+ 2 | |
| | push(NodeTask) | | pop(NodeTask) | |
| +-------------------> | CPU ReadyQueue +-----------------------> |
| | | | | |
| | +-----------------+ | |
| | +----------------------+ | |
| | | Device ReadyQueues | | |
| | | | | |
| | | | 3 | |
| | | +-------------+ | push(NodeTask)| |
| | | | ReadyQueue 1| <-----------------------+ |
| | | +------+------+ | | |
| | | | | | |
+--------------+ | | | +-----------------+
| +------------------+
| | | +-----------------+
+------------------------+ | . | | | Worker Thread 2 |
| Engine | | . | | | |
| | | . | | | |
| | | | | | |
| device_ready_queues_ +---> | +-------------+ | +-------------> |
| | | | ReadyQueue 2| | pop(NodeTask) | |
| | | +-------------+ | 4 | |
+------------------------+ | | | |
| | | |
| +-------------+ | | |
| | ReadyQueue 3| | | |
| +-------------+ | | |
| | | |
+----------------------+ +-----------------+
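This concludes the introduction of the static structure and base classes; the next article covers the dynamic logic.
This article is a repost.
For infringement concerns, please contact cloudcommunity@tencent.com to have it removed.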