上一篇文章中,主要讲解了token的一些通用知识,以及hadoop中,token的实现和通用数据结构及流程。
本文主要讲述yarn任务提交运行过程中涉及的几个重要token:AMRMToken,NMToken,ContainerToken。
【AMRMToken】
用于保证ApplicationMaster(下面均简称AM)与RM之间的安全通信,即AM向RM注册,以及后续向RM申请资源的rpc请求,都会带上该token。
AMRMToken在客户端向RM提交任务后,由RM创建生成,然后通过rpc请求传递给NM;NM通过将token持久化到本地文件,让AM启动后从对应文件中加载到token,这样AM就可以使用正确的token向RM注册并完成rpc请求交互了。接下来就展开说明下。
1)token的生成
客户端提交任务请求后,RM在内部的处理中,为AM构造对应的container启动上下文时,创建了AMRMToken,相关代码如下所示:
// AMLauncher.java
private void launch() throws IOException, YarnException {
...
// 构造 container 启动上下文
ContainerLaunchContext launchContext =
createAMContainerLaunchContext(applicationContext, masterContainerID);
...
}
private ContainerLaunchContext createAMContainerLaunchContext(
ApplicationSubmissionContext applicationMasterContext,
ContainerId containerID) throws IOException {
...
setupTokens(container, containerID);
...
}
protected void setupTokens(ContainerLaunchContext container, ContainerId, containerID)
throws IOException {
...
// 构造 AMRMToken
Token<AMRMTokenIdentifier> amrmToken = createAndSetAMRMToken();
if (amrmToken != null) {
credentials.addToken(amrmToken.getService(), amrmToken);
}
...
}
protected Token<AMRMTokenIdentifier> createAndSetAMRMToken() {
Token<AMRMTokenIdentifier> amrmToken =
this.rmContext.getAMRMTokenSecretManager()
.createAndGetAMRMToken(application.getAppAttemptId());
((RMAppAttemptImpl)application).setAMRMToken(amrmToken);
return amrmToken;
}
2)AMRMToken的传递
a. RM --> NM
在构造完container启动上下文后,将启动上下文随container启动请求(StartContainerRequest)发送给NM。
b. NM --> AM
NM收到请求后,内部构造Container实例对象,并从请求中取出credential保存在实例对象中,在真正需要启动AM时,将token信息写到本地文件中。
// ContainerLauncher.java
public Integer call() {
// token存储在 nmPrivate 中的路径
Path nmPrivateTokensPath =
dirsHandler.getLocalPathForWrite(
getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR +
String.format(TOKEN_FILE_NAME_FMT, containerIdStr));
// Set th token location too.
// 为AM设置环境变量
addToEnvMap(
environment, nmEnvVars,
ApplicationConstants.CONTAINER_TOKEN_FILE_ENV_NAME,
new Path(
containerWorkDir,
FINAL_CONTAINER_TOKENS_FILE).toUri().getPath());
// 将token写入文件中
try (DataOutputStream tokensOutStream =
lfs.create(nmPrivateTokensPath, EnumSet.of(CREATE, OVERWRITE))) {
Credentials creds = container.getCredentials();
creds.writeTokenStorageToStream(tokensOutStream);
}
...
}
//DefaultContainerExecutor.java
public int launchContainer(ContainerStartContext ctx) {
// copy container tokens to work dir
Path tokenDst =
new Path(containerWorkDir, Containerlaunch.FINAL_CONTAINER_TOKENS_FILE);
copyFile(nmPrivateTokensPath, tokenDst, user);
}
从上面的代码可以看到,实际上先将token写入nmPrivate目录中,以container的ID作为文件名,".tokens"作为文件后缀,然后将token文件拷贝到container的工作目录中,并重命名为container.tokens。
例如,存储在nmPrivate目录下的token:
[root@dn-nm-0 container_e301_1652243949356_2011_01_000003]# pwd
/home/hncscwc/hadoop/yarn/nodemanager/local/nmPrivate/application_1652243949356_2011/container_e301_1652243949356_2011_01_000003
[root@dn-nm-0 container_e301_1652243949356_2011_01_000003]# ll
total 52
-rw-r--r-- 1 hadoop hadoop 8 May 13 16:27 container_e301_1652243949356_2011_01_000003.pid
-rw-r--r-- 1 hadoop hadoop 387 May 13 16:27 container_e301_1652243949356_2011_01_000003.tokens
-rw-r--r-- 1 hadoop hadoop 43441 May 13 16:27 launch_container.sh
存储在container工作目录下的token:
[root@dn-nm-0 container_e301_1652243949356_2011_01_000003]# pwd
/home/hncscwc/hadoop/yarn/nodemanager/local/usercache/dcp/appcache/application_1652243949356_2011/container_e301_1652243949356_2011_01_000003
[root@dn-nm-0 container_e301_1652243949356_2011_01_000003]# ll
total 72
lrwxrwxrwx 1 hadoop hadoop 100 May 13 16:27 __app__.jar -> /home/hncscwc/hadoop/yarn/nodemanager/local/usercache/hncscwc/filecache/3906/spark-examples_2.11-2.4.4.jar
-rw-r--r-- 1 hadoop hadoop 387 May 13 16:27 container_tokens
-rwx------ 1 hadoop hadoop 750 May 13 16:27 default_container_executor_session.sh
-rwx------ 1 hadoop hadoop 805 May 13 16:27 default_container_executor.sh
-rwx------ 1 hadoop hadoop 43441 May 13 16:27 launch_container.sh
lrwxrwxrwx 1 hadoop hadoop 91 May 13 16:27 metrics-influxdb.jar -> /home/hncscwc/hadoop/yarn/nodemanager/local/usercache/hncscwc/filecache/3910/metrics-influxdb.jar
lrwxrwxrwx 1 hadoop hadoop 89 May 13 16:27 metrics.properties -> /home/hncscwc/hadoop/yarn/nodemanager/local/usercache/hncscwc/filecache/3909/metrics.properties
lrwxrwxrwx 1 hadoop hadoop 89 May 13 16:27 __spark_conf__ -> /home/hncscwc/hadoop/yarn/nodemanager/local/usercache/hncscwc/filecache/3908/__spark_conf__.zip
lrwxrwxrwx 1 hadoop hadoop 94 May 13 16:27 spark-influxdb-sink.jar -> /home/hncscwc/hadoop/yarn/nodemanager/local/usercache/hncscwc/filecache/3907/spark-influxdb-sink.jar
drwxr-xr-x 2 hadoop hadoop 12288 May 13 16:27 __spark_libs__
drwx--x--- 2 hadoop hadoop 55 May 13 16:27 tmp
3)AM启动后的注册校验
AM进程启动后,构造UGI(UserGroupInformation)对象时,会根据环境变量HADOOP_TOKEN_FILE_LOCATION的值,从指定文件中加载token信息,然后附在rpc请求中向RM进行注册。RM收到请求后由对应的SecretManager(这里对应于AMRMTokenSecretManager)完成认证逻辑。认证的逻辑在上一篇文章有详细介绍。
需要注意的是:CONTAINER_TOKEN_FLIE_ENV_NAME的值与HADOOP_TOKEN_FILE_LOCATION的值是相同的,这样就可以保证正确读取到对应的token。
【NMToken】
NMToken则是用于与NM的安全通信。
从任务提交运行的流程中可以知道,RM和AM都会和NM通信请求启动container,其中RM向NM请求启动AM;而AM则是向NM请求启动任务container。因此,在RM与NM的通信、AM与NM的通信中都会用到NMToken。
1) NM向RM注册获取NMToken的MasterKey
由于NMToken是由RM生成的,但最终在NM中进行校验,因此NM需要和RM使用一样的密钥,这个密钥是在NM向RM注册时获取的,并在心跳请求中更新密钥信息。
// ResourceTrackerService.java
public RegisterNodeManagerResponse registerNodeManager(
egisterNodeManagerRequest request) throws YarnException, IOException {
...
RegisterNodeManagerResponse response =
recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
...
// 返回 containerToken 和 NMToken 的密钥信息
response.setContainerTokenMasterKey(
containerTokenSecretManager.getCurrentKey());
response.setNMTokenMasterKey(
nmTokenSecretManager.getCurrentKey());
}
// NodeStatusUpdaterImpl.java
protected void registerWithRM()
throws YarnException, IOException {
...
regNMResponse =
resourceTracker.registerNodeManager(request);
MasterKey masterKey =
regNMResponse.getContainerTokenMasterKey();
// do this now so that its set before we start heartbeating to RM
// It is expected that status updater is started by this point and
// RM gives the shared secret in registration during
// StatusUpdater#start().
if (masterKey != null) {
this.context.getContainerTokenSecretManager()
.setMasterKey(masterKey);
}
masterKey = regNMResponse.getNMTokenMasterKey();
if (masterKey != null) {
this.context.getNMTokenSecretManager()
.setMasterKey(masterKey);
}
}
2)RM向NM请求启动AM
在请求中会携带NMToken:
// AMLauncher.java
private void launch() throws IOException, YarnException {
connect();
...
}
private void connect() throws IOException {
ContainerId masterContainerID = masterContainer.getId();
containerMgrProxy = getContainerMgrProxy(masterContainerID);
}
protected ContainerManagementProtocol getContainerMgrProxy(
final ContainerId containerId) {
final NodeId node = masterContainer.getNodeId();
final InetSocketAddress containerManagerConnectAddress =
NetUtils.createSocketAddrForHost(node.getHost(), node.getPort());
final YarnRPC rpc = getYarnRPC();
UserGroupInformation currentUser =
UserGroupInformation.createRemoteUser(containerId
.getApplicationAttemptId().toString());
String user =
rmContext.getRMApps()
.get(containerId.getApplicationAttemptId().getApplicationId())
.getUser();
org.apache.hadoop.yarn.api.records.Token token =
rmContext.getNMTokenSecretManager().createNMToken(
containerId.getApplicationAttemptId(), node, user);
currentUser.addToken(ConverterUtils.convertFromYarn(token,
containerManagerConnectAddress));
return NMProxy.createNMProxy(conf, ContainerManagementProtocol.class,
currentUser, rpc, containerManagerConnectAddress);
}
NM在请求处理中校验:
// ContainerManagerImpl.java
public StartContainersResponse startContainers(
StartContainersRequest requests) throws YarnException, IOException {
UserGroupInformation remoteUgi = getRemoteUgi();
NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
authorizeUser(remoteUgi, nmTokenIdentifier);
...
}
protected NMTokenIdentifier selectNMTokenIdentifier(
UserGroupInformation remoteUgi) {
Set<TokenIdentifier> tokenIdentifiers = remoteUgi.getTokenIdentifiers();
NMTokenIdentifier resultId = null;
for (TokenIdentifier id : tokenIdentifiers) {
if (id instanceof NMTokenIdentifier) {
resultId = (NMTokenIdentifier) id;
break;
}
}
return resultId;
}
protected void authorizeUser(UserGroupInformation remoteUgi,
NMTokenIdentifier nmTokenIdentifier) throws YarnException {
if (nmTokenIdentifier == null) {
throw RPCUtil.getRemoteException(INVALID_NMTOKEN_MSG);
}
if (!remoteUgi.getUserName().equals(
nmTokenIdentifier.getApplicationAttemptId().toString())) {
throw RPCUtil.getRemoteException("Expected applicationAttemptId: "
+ remoteUgi.getUserName() + "Found: "
+ nmTokenIdentifier.getApplicationAttemptId());
}
}
3)AM启动向RM注册后,从注册的响应中获取NMToken
// AMRMClientImpl.java
private RegisterApplicationMasterResponse registerApplicationMaster()
throws YarnException, IOException {
RegisterApplicationMasterRequest request =
RegisterApplicationMasterRequest.newInstance(this.appHostName,
this.appHostPort, this.appTrackingUrl);
RegisterApplicationMasterResponse response =
rmClient.registerApplicationMaster(request);
synchronized (this) {
lastResponseId = 0;
if (!response.getNMTokensFromPreviousAttempts().isEmpty()) {
populateNMTokens(response.getNMTokensFromPreviousAttempts());
}
}
return response;
}
// 将Token放到缓存中
protected void populateNMTokens(List<NMToken> nmTokens) {
for (NMToken token : nmTokens) {
String nodeId = token.getNodeId().toString();
if (LOG.isDebugEnabled()) {
if (getNMTokenCache().containsToken(nodeId)) {
LOG.debug("Replacing token for : " + nodeId);
} else {
LOG.debug("Received new token for : " + nodeId);
}
}
getNMTokenCache().setToken(nodeId, token.getToken());
}
}
4)AM向NM请求启动任务container时,将token放到ugi中
从缓存中取出对应NM节点的的token,然后放到ugi中,随请求一并发送给NM。
// NMClientImpl.java
public Map<String, ByteBuffer> startContainer(
Container container, ContainerLaunchContext containerLaunchContext)
throws YarnException, IOException {
...
proxy =
cmProxy.getProxy(container.getNodeId().toString(),
container.getId());
// 注意containerToken
StartContainerRequest scRequest =
StartContainerRequest.newInstance(containerLaunchContext,
container.getContainerToken());
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
StartContainersResponse response =
proxy.getContainerManagementProtocol().startContainers(allRequests);
...
}
// ContainerManagementProtocolProxy.java
public synchronized ContainerManagementProtocolProxyData getProxy(
String containerManagerBindAddr, ContainerId containerId)
throws InvalidToken {
...
if (proxy == null) {
proxy =
new ContainerManagementProtocolProxyData(rpc, containerManagerBindAddr,
containerId, nmTokenCache.getToken(containerManagerBindAddr));
if (maxConnectedNMs > 0) {
addProxyToCache(containerManagerBindAddr, proxy);
}
}
...
}
public ContainerManagementProtocolProxyData(YarnRPC rpc,
String containerManagerBindAddr,
ContainerId containerId, Token token) throws InvalidToken {
this.containerManagerBindAddr = containerManagerBindAddr;
this.activeCallers = 0;
this.scheduledForClose = false;
this.token = token;
this.proxy = newProxy(rpc, containerManagerBindAddr, containerId, token);
}
protected ContainerManagementProtocol newProxy(final YarnRPC rpc,
String containerManagerBindAddr, ContainerId containerId, Token token)
throws InvalidToken {
UserGroupInformation user =
UserGroupInformation.createRemoteUser(containerId
.getApplicationAttemptId().toString());
org.apache.hadoop.security.token.Token<NMTokenIdentifier> nmToken =
ConverterUtils.convertFromYarn(token, cmAddr);
user.addToken(nmToken);
return NMProxy.createNMProxy(conf, ContainerManagementProtocol.class,
user, rpc, cmAddr);
}
【ContainerToken】
在向NM请求启动container时,除了需要NMToken之外,还需要ContainerToken,以验证container的合法性。
ContainerToken和NMToken采用相同的方式,因此密钥的获取方式与流程以及更新,和前面NMToken中讲到的几乎是同一个流程。
首先,同样是在NM的注册与定时心跳请求中,RM向NM同步并更新密钥。RM向NM请求container时,直接创建并带上ContainerToken;而AM则是通过向RM申请资源时,RM创建了ContainerToken,并随请求的应答传递给了AM。此后AM再向NM请求启动container时,则带上了对应的Token信息,有兴趣的朋友可以对照流程走读相关源码。
另外,该token大的类型虽然都是containerToken,但实际上又细分为ApplicaitonMaster和Task两类,分别用于RM与NM通信、AM与NM通信中。
【LocalizerToken】
LocalizerToken主要用于NM的资源本地化服务与NM之间的通信。由于NM资源本地化服务是以一个独立进程的方式运行的,并且会通过rpc协议不断向NM汇报资源下载情况,因此使用Token来保证通信安全。
【总结】
小结一下,本文主要讲解了Yarn运行中涉及的几个token,具体包括token的作用,如何创建,具体使用的流程。
另外,除了上面介绍的几个token之外,各个任务(mr/spark/flink)在运行时,也还存在一些其他的token,例如mr中会用到的ClientToAMToken等,有兴趣的可以自行摸索下~