目录
上文我们看到了AutogradMetadata,DistAutogradContainer 和 DistAutogradContext 等一系列基础类。我们知道了分布式autograd如何基于RPC进行传递,如何在节点之间交互,节点如何区分维护这些Session。本文继续分析,主要目的是看看反向传播如何切入到引擎之中。
我们回忆一下前面几篇文章的内容。
首先,对于分布式 autograd,我们需要在前向传播期间跟踪所有 RPC,以确保正确执行后向传播。为此,当执行 RPC 时候,我们把 send
和recv
functions 附加到autograd图之上。
send
函数附加到 RPC 的发起源节点之上,其输出边指向 RPC 输入张量的 autograd 函数。在向后传播期间,send
函数的输入是从目标接收的,是对应recv
函数的输出。recv
函数附加到 RPC 的接受目标节点之上,其输入从某些运算符得到,这些运算符使用输入张量在RPC接受目标上执行。在后向传播期间,recv
函数的输出梯度将被发送到源节点之上,并且作为send
方法的输入。send-recv
对被分配一个全局唯一的autograd_message_id
以唯一地标识该send-recv
对。这对于在向后传播期间查找远程节点上的相应函数很有用。torch.distributed.rpc.RRef.to_here()
时,我们都为涉及的张量添加了一个适当的send-recv
对。其次,在前向传播的具体代码之中,我们在上下文中存储每个 autograd 传播的send
和recv
函数。这确保我们在 autograd 图中保存对适当节点的引用以使其保持活动状态。除此之外,这也使得在向后传播期间很容易查找到对应的send
和recv
函数。
再次,以下是 torch/csrc/distributed/rpc/message.h 之中的部分消息定义:
// Messages with autograd info
FORWARD_AUTOGRAD_REQ = 0x0f | MessageTypeFlags::REQUEST_TYPE,
FORWARD_AUTOGRAD_RESP = 0x10 | MessageTypeFlags::RESPONSE_TYPE,
// Messages to propagate gradients on the backward pass.
BACKWARD_AUTOGRAD_REQ = 0x11 | MessageTypeFlags::REQUEST_TYPE,
BACKWARD_AUTOGRAD_RESP = 0x12 | MessageTypeFlags::RESPONSE_TYPE,
在前文,我们看到了 FORWARD_AUTOGRAD_REQ 在前向传播之中如何调用,假设如下代码:rpc.rpc_sync("worker1", torch.add, args=(t1, t2)),其调用序列是:
至此,关于整体流程,我们就有了几个疑问:
我们接下来就围绕这些疑问进行分析,核心就是如何进入 dist.autograd 引擎。
我们首先从计算图来通过几个示例来看看。
首先看看普通计算,这个是 dist.auto 官方图例的本地版本。可以看到是由 AddBackward0,AccumulateGrad 和 MulBackward0 等组成了计算图。
t1 = torch.rand((3, 3), requires_grad=True)
t2 = torch.rand((3, 3), requires_grad=True)
t3 = t1 + t2
t4 = torch.rand((3, 3), requires_grad=True)
t5 = torch.mul(t3, t4)
next_functions = t5.grad_fn.next_functions
具体对应如下图:
接下来看看分布式的例子,这个例子就是官方设计中图例大致对应的代码,我们把 torch.mul(t3, t4) 命名为 t5,加入了 loss。
def worker0():
# On worker 0:
# Setup the autograd context. Computations that take
# part in the distributed backward pass must be within
# the distributed autograd context manager.
with dist_autograd.context() as context_id:
t1 = torch.rand((3, 3), requires_grad=True)
t2 = torch.rand((3, 3), requires_grad=True)
# Perform some computation remotely.
t3 = rpc.rpc_sync("worker1", torch.add, args=(t1, t2))
# Perform some computation locally based on remote result.
t4 = torch.rand((3, 3), requires_grad=True)
t5 = torch.mul(t3, t4)
# Compute some loss.
loss = t5.sum()
# Run the backward pass.
dist_autograd.backward(context_id, [loss])
# Retrieve the gradients from the context.
dist_autograd.get_gradients(context_id)
print(loss)
在分布式之下,t3 是异地运行。
我们把设计图例再展示出来,上面示例代码就是下图的左侧 worker 0,t3 实际就是运行在 worker 1,大家可以看到分布式上下文中的一些特点。
为了更好的说明,我们打印了一些log作为注释。
def _verify_send(send_function):
print(send_function.name())
next_funcs = send_function.next_functions
print(next_funcs[0][0].name())
print(next_funcs[1][0].name())
def _verify_recv(recv_function):
print(recv_function.name())
next_funcs = recv_function.next_functions
print(len(next_funcs))
def worker0():
# On worker 0:
# Setup the autograd context. Computations that take
# part in the distributed backward pass must be within
# the distributed autograd context manager.
with dist_autograd.context() as context_id:
t1 = torch.rand((3, 3), requires_grad=True)
t2 = torch.rand((3, 3), requires_grad=True)
# Perform some computation remotely.
#t3 = rpc.rpc_sync("worker1", my_add, args=(t1, t2))
t3 = rpc.rpc_sync("worker1", torch.add, args=(t1, t2))
# Perform some computation locally based on remote result.
t4 = torch.rand((3, 3), requires_grad=True)
t5 = torch.mul(t3, t4)
# Compute some loss.
loss = t5.sum()
print("--- send ---")
ctx = dist_autograd._retrieve_context(context_id)
send_functions = ctx._send_functions()
_verify_send(list(send_functions.values())[0])
print("--- loss ---")
print(loss)
mul_func = loss.grad_fn.next_functions[0][0]
print(mul_func.name())
next_funcs = mul_func.next_functions
print(next_funcs[0][0].name())
print(next_funcs[1][0].name())
print("---- recv ----")
recv_functions = ctx._recv_functions()
_verify_recv(list(recv_functions.values())[0])
# Run the backward pass.
dist_autograd.backward(context_id, [loss])
# Retrieve the gradients from the context.
dist_autograd.get_gradients(context_id)
打印结果是:
--- send ---
torch::distributed::autograd::SendRpcBackward
torch::autograd::AccumulateGrad
torch::autograd::AccumulateGrad
--- loss ---
tensor(3.5197, grad_fn=<SumBackward0>)
MulBackward0
torch::distributed::autograd::RecvRpcBackward
torch::autograd::AccumulateGrad
---- recv ----
torch::distributed::autograd::RecvRpcBackward
加上分布式相关算子之后,图例如下:
我们接下来要看看如何进入dist autograd 引擎,结合我们图例,就是:
我们找一找如何发起反向传播,按照从下往上的顺序进行。这里也有两种:
我们从上往下看分布式 autograd 的 backward 如何主动调用,比如在示例之中会显示调用。
def worker0():
# On worker 0:
with dist_autograd.context() as context_id:
t1 = torch.rand((3, 3), requires_grad=True)
t2 = torch.rand((3, 3), requires_grad=True)
# Perform some computation remotely.
t3 = rpc.rpc_sync("worker1", torch.add, args=(t1, t2))
# Perform some computation locally based on remote result.
t4 = torch.rand((3, 3), requires_grad=True)
t5 = torch.mul(t3, t4)
# Compute some loss.
loss = t5.sum()
# Run the backward pass.
dist_autograd.backward(context_id, [loss]) // 这里会调用
在 torch/_C/_distributed_autograd.pyi
之中我们可以看到如下注释:
# This module is defined in torch/csrc/distributed/autograd/init.cpp
因此我们去torch/csrc/distributed/autograd/init.cpp文件中看看。
省略了部分代码,这里能看到生成了上下文,定义了 backward,get_gradients等等。
PyObject* dist_autograd_init(PyObject* _unused, PyObject* noargs) {
auto autograd_module =
THPObjectPtr(PyImport_ImportModule("torch.distributed.autograd"));
auto torch_C_module = THPObjectPtr(PyImport_ImportModule("torch._C"));
auto torch_C_m = py::handle(torch_C_module).cast<py::module>();
auto m = torch_C_m.def_submodule("_distributed_autograd", "distributed autograd bindings");
auto module = py::handle(m).cast<py::module>();
auto distAutogradContext =
shared_ptr_class_<DistAutogradContext>(module, "DistAutogradContext")
.def(
"_context_id",
&DistAutogradContext::contextId,
py::call_guard<py::gil_scoped_release>())
.def(
"_recv_functions",
[](const DistAutogradContext& ctx) {
std::map<int64_t, py::object> funcs;
for (const auto& map_entry : ctx.recvFunctions()) {
funcs.emplace(
map_entry.first,
py::reinterpret_steal<py::object>(
torch::autograd::functionToPyObject(
map_entry.second)));
}
return funcs;
})
.def(
"_send_functions",
[](const ContextPtr& ctx) {
std::map<int64_t, py::object> funcs;
for (const auto& map_entry : ctx->sendFunctions()) {
funcs.emplace(
map_entry.first,
py::reinterpret_steal<py::object>(
torch::autograd::functionToPyObject(
map_entry.second)));
}
return funcs;
})
.def("_known_worker_ids", &DistAutogradContext::getKnownWorkerIds);
module.def(
"_new_context",
[]() -> const ContextPtr {
return DistAutogradContainer::getInstance().newContext();
},
py::return_value_policy::reference);
py::options options;
options.disable_function_signatures();
module.def(
"backward",
backward,
py::arg("contextId"),
py::arg("roots"),
py::arg("retain_graph") = false,
py::call_guard<py::gil_scoped_release>());
module.def(
"get_gradients",
[](int64_t contextId) -> py::dict {
const auto& autogradContext =
DistAutogradContainer::getInstance().retrieveContext(contextId);
return torch::jit::toPyObject(IValue(autogradContext->getGradients()));
},
py::arg("context_id"));
Py_RETURN_TRUE;
}
} // namespace
具体 backward 定义在 torch/csrc/distributed/autograd/autograd.cpp。
void backward(
int64_t context_id,
const variable_list& roots,
bool retain_graph) {
RECORD_FUNCTION(
kDistAutogradBackwardProfilingKey, std::vector<c10::IValue>());
try {
DistEngine::getInstance().execute(context_id, roots, retain_graph);
} catch (std::exception& e) {
// FIXME: crashes if exception type is not RuntimeError
throw std::runtime_error(e.what());
}
}
可以看到,最终会调用到 DistEngine::getInstance().execute(context_id, roots, retain_graph) 完成反向传播。这就进入了引擎。
因为是隐式发起,所以代码比较隐蔽,我们这次采用从下至上的方式来剥丝抽茧。我们知道,如果节点之间要求反向传播,会发送BACKWARD_AUTOGRAD_REQ,所以我们从 BACKWARD_AUTOGRAD_REQ 开始发起寻找。
在 torch/csrc/distributed/autograd/rpc_messages/propagate_gradients_req.cpp 之中 PropagateGradientsReq::toMessageImpl 会调用到 BACKWARD_AUTOGRAD_REQ。
Message PropagateGradientsReq::toMessageImpl() && {
std::vector<at::IValue> ivalues;
// Add all the grad tensors.
for (const auto& grad : grads_) {
ivalues.emplace_back(grad);
}
// Now add autograd metadata.
ivalues.emplace_back(autogradMetadata_.autogradContextId);
ivalues.emplace_back(autogradMetadata_.autogradMessageId);
// Add retain graph.
ivalues.emplace_back(retainGraph_);
// Now pickle using JIT pickler.
std::vector<torch::Tensor> tensorTable;
std::vector<char> payload =
jit::pickle(c10::ivalue::Tuple::create(std::move(ivalues)), &tensorTable);
return Message(
std::move(payload),
std::move(tensorTable),
MessageType::BACKWARD_AUTOGRAD_REQ); // 这里会用到
}
继续找谁发出来的 BACKWARD_AUTOGRAD_REQ,就是谁调用到了 toMessageImpl?原来在 torch/csrc/distributed/autograd/functions/recvrpc_backward.cpp 这里构建了 PropagateGradientsReq,会使用 toMessage 来构建一个消息。即,RecvRpcBackward 的调用会发送 BACKWARD_AUTOGRAD_REQ。
variable_list RecvRpcBackward::apply(variable_list&& grads) { // 调用Node
std::vector<Variable> outputGrads;
for (size_t i = 0; i < grads.size(); i++) {
const auto& grad = grads[i];
if (grad.defined()) {
outputGrads.emplace_back(grad);
} else {
// Put in zeros for a tensor with no grad.
outputGrads.emplace_back(input_metadata(i).zeros_like());
}
}
auto sharedContext = autogradContext_.lock();
// Send the gradients over the wire and record the future in the autograd
// context.
PropagateGradientsReq gradCall( // 这里构建了 PropagateGradientsReq
autogradMetadata_,
outputGrads,
sharedContext->retrieveGraphTask()->keep_graph_);
// Send the gradients over to the appropriate node.
auto rpcAgent = rpc::RpcAgent::getCurrentRpcAgent();
auto jitFuture = rpcAgent->send( // 发送出去,就是给后向传播过程的下一个节点
rpcAgent->getWorkerInfo(fromWorkerId_),
std::move(gradCall).toMessage(), // 这里调用了PropagateGradientsReq::toMessageImpl
rpc::kUnsetRpcTimeout,
deviceMap_);
// Record the future in the context.
sharedContext->addOutstandingRpc(jitFuture);
// 'recv' function sends the gradients over the wire using RPC, it doesn't
// need to return anything for any downstream autograd function.
return variable_list();
}
所以我们知道,在 RecvRpcBackward 的执行时候,会发送 BACKWARD_AUTOGRAD_REQ,发送给下一个节点。具体哪里调用 RecvRpcBackward?我们会在下一篇 DistEngine 之中介绍。
此时具体如下,对应就是 worker 0 的 t3 给 worker 1 发送 BACKWARD_AUTOGRAD_REQ 消息。
+
worker 0 | worker 1
|
|
RecvRpcBackward PropagateGradientsReq |
+ + |
| | |
| | |
| | |
v | |
| |
apply() | |
+ | |
| v |
| |
| +------------------------------> toMessageImpl |
| + |
| | |
| Message(BACKWARD_AUTOGRAD_REQ) | |
| <----------------------------------------+ |
| |
| |
v |
|
rpcAgent+>send(Message) +-------------------------------------------->
+ BACKWARD_AUTOGRAD_REQ |
| |
| |
v |
+
对应示例图就是:
我们接下来看看接收方如何处理反向传播,我们再次回到 worker 1,就是图上的 send 节点如何接受反向传播消息。
在生成 TensorPipeAgent 时候,把 RequestCallbackImpl 配置为回调函数。这是 agent 的统一响应函数。前面关于代理接收逻辑时候,我们也提到了,会进入 RequestCallbackNoPython::processRpc 函数。其中可以看到有对 BACKWARD_AUTOGRAD_REQ 的处理逻辑。
这种是 RPC 的正常流程。
void RequestCallbackNoPython::processRpc(
RpcCommandBase& rpc,
const MessageType& messageType,
const int64_t messageId,
const c10::intrusive_ptr<JitFuture>& responseFuture,
std::shared_ptr<LazyStreamContext> ctx) const {
switch (messageType) {
case MessageType::BACKWARD_AUTOGRAD_REQ: {
processBackwardAutogradReq(rpc, messageId, responseFuture); // 这里调用
return;
};
在 processBackwardAutogradReq 之中会:
void RequestCallbackNoPython::processBackwardAutogradReq(
RpcCommandBase& rpc,
const int64_t messageId,
const c10::intrusive_ptr<JitFuture>& responseFuture) const {
auto& gradientsCall = static_cast<PropagateGradientsReq&>(rpc);
const auto& autogradMetadata = gradientsCall.getAutogradMetadata();
// Retrieve the appropriate autograd context.
auto autogradContext = DistAutogradContainer::getInstance().retrieveContext(
autogradMetadata.autogradContextId); // 得到发送者的context id
// Lookup the appropriate 'send' function to enqueue.
std::shared_ptr<SendRpcBackward> sendFunction = // 依据发送者context id和消息id得到sendFunction
autogradContext->retrieveSendFunction(autogradMetadata.autogradMessageId);
// Attach the gradients to the send function.
sendFunction->setGrads(gradientsCall.getGrads()); // 设置梯度
// Now execute the autograd graph using the "distributed engine."
auto execFuture = DistEngine::getInstance().executeSendFunctionAsync( // 调用引擎
autogradContext, sendFunction, gradientsCall.retainGraph());
// Our response is satisfied when the rpcs come back.
execFuture->addCallback([responseFuture, messageId](JitFuture& execFuture) {
if (!execFuture.hasError()) {
Message m = std::move(PropagateGradientsResp()).toMessage();
m.setId(messageId);
responseFuture->markCompleted(
IValue(c10::make_intrusive<Message>(std::move(m))));
} else {
responseFuture->setError(execFuture.exception_ptr());
}
});
}
在 worker 1 的 DistEngine::executeSendFunctionAsync 内部,会进行辗转处理,最终发送 BACKWARD_AUTOGRAD_REQ 到其反向传播的下游,所以我们继续在示例图之上修改拓展,增加一个 BACKWARD_AUTOGRAD_REQ。
我们可以看到有两个途径进入 dist autograd 引擎,启动反向传播: